Package scrapfly
Expand source code
__version__ = '0.8.10'
from typing import Tuple
from .errors import ScrapflyError
from .errors import ScrapflyAspError
from .errors import ScrapflyProxyError
from .errors import ScrapflyScheduleError
from .errors import ScrapflyScrapeError
from .errors import ScrapflySessionError
from .errors import ScrapflyThrottleError
from .errors import ScrapflyWebhookError
from .errors import EncoderError
from .errors import ErrorFactory
from .errors import HttpError
from .errors import UpstreamHttpError
from .errors import UpstreamHttpClientError
from .errors import UpstreamHttpServerError
from .errors import ApiHttpClientError
from .errors import ApiHttpServerError
from .api_response import ScrapeApiResponse, ResponseBodyHandler
from .client import ScrapflyClient
from .scrape_config import ScrapeConfig
__all__:Tuple[str, ...] = (
'ScrapflyError',
'ScrapflyAspError',
'ScrapflyProxyError',
'ScrapflyScheduleError',
'ScrapflyScrapeError',
'ScrapflySessionError',
'ScrapflyThrottleError',
'ScrapflyWebhookError',
'UpstreamHttpError',
'UpstreamHttpClientError',
'UpstreamHttpServerError',
'ApiHttpClientError',
'ApiHttpServerError',
'EncoderError',
'ScrapeApiResponse',
'ErrorFactory',
'HttpError',
'ScrapflyClient',
'ResponseBodyHandler',
'ScrapeConfig'
)
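A minimal usage sketch of the exported names above (the API key and URL are placeholders; API errors raised by the client derive from ScrapflyError):

from scrapfly import ScrapflyClient, ScrapeConfig, ScrapflyError

client = ScrapflyClient(key='YOUR_API_KEY')

try:
    api_response = client.scrape(scrape_config=ScrapeConfig(url='https://httpbin.org/html'))
    print(api_response.content)
except ScrapflyError as e:
    print('scrape failed:', e)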
Sub-modules
scrapfly.api_response
scrapfly.client
scrapfly.errors
scrapfly.frozen_dict
scrapfly.polyfill
scrapfly.reporter
scrapfly.scrape_config
scrapfly.scrapy
Classes
class ApiHttpClientError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ApiHttpClientError(HttpError): pass
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- ApiHttpServerError
- scrapfly.errors.BadApiKeyError
- scrapfly.errors.PaymentRequired
- scrapfly.errors.TooManyRequest
class ApiHttpServerError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ApiHttpServerError(ApiHttpClientError): pass
Ancestors
- ApiHttpClientError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class EncoderError (content: str)
-
Common base class for all exceptions
Expand source code
class EncoderError(BaseException):

    def __init__(self, content:str):
        self.content = content
        super().__init__()

    def __str__(self) -> str:
        return self.content

    def __repr__(self):
        return "Invalid payload: %s" % self.content
Ancestors
- builtins.BaseException
class ErrorFactory
-
Expand source code
class ErrorFactory: RESOURCE_TO_ERROR = { ScrapflyError.RESOURCE_SCRAPE: ScrapflyScrapeError, ScrapflyError.RESOURCE_WEBHOOK: ScrapflyWebhookError, ScrapflyError.RESOURCE_PROXY: ScrapflyProxyError, ScrapflyError.RESOURCE_SCHEDULE: ScrapflyScheduleError, ScrapflyError.RESOURCE_ASP: ScrapflyAspError, ScrapflyError.RESOURCE_SESSION: ScrapflySessionError } # Notable http error has own class for more convenience # Only applicable for generic API error HTTP_STATUS_TO_ERROR = { 401: BadApiKeyError, 402: PaymentRequired, 429: TooManyRequest } @staticmethod def _get_resource(code: str) -> Optional[Tuple[str, str]]: if isinstance(code, str) and '::' in code: _, resource, _ = code.split('::') return resource return None @staticmethod def create(api_response: 'ScrapeApiResponse'): is_retryable = False kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR http_code = api_response.status_code retry_delay = 5 retry_times = 3 description = None error_url = 'https://scrapfly.io/docs/scrape-api/errors#api' code = api_response.error['code'] if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE': http_code = api_response.scrape_result['status_code'] if 'description' in api_response.error: description = api_response.error['description'] message = '%s %s %s' % (str(http_code), code, api_response.error['message']) if 'doc_url' in api_response.error: error_url = api_response.error['doc_url'] if 'retryable' in api_response.error: is_retryable = api_response.error['retryable'] resource = ErrorFactory._get_resource(code=code) if is_retryable is True: if 'X-Retry' in api_response.headers: retry_delay = int(api_response.headers['Retry-After']) message = '%s: %s' % (message, description) if description else message if retry_delay is not None and is_retryable is True: message = '%s. Retry delay : %s seconds' % (message, str(retry_delay)) args = { 'message': message, 'code': code, 'http_status_code': http_code, 'is_retryable': is_retryable, 'api_response': api_response, 'resource': resource, 'retry_delay': retry_delay, 'retry_times': retry_times, 'documentation_url': error_url, 'request': api_response.request, 'response': api_response.response } if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE: if http_code >= 500: return ApiHttpServerError(**args) is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error: return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args) if is_scraper_api_error: return ErrorFactory.RESOURCE_TO_ERROR[resource](**args) return ApiHttpClientError(**args) elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR: if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE': if http_code >= 500: return UpstreamHttpServerError(**args) if http_code >= 400: return UpstreamHttpClientError(**args) if resource in ErrorFactory.RESOURCE_TO_ERROR: return ErrorFactory.RESOURCE_TO_ERROR[resource](**args) return ScrapflyError(**args)
Class variables
var HTTP_STATUS_TO_ERROR
var RESOURCE_TO_ERROR
Static methods
def create(api_response: ScrapeApiResponse)
-
Expand source code
@staticmethod def create(api_response: 'ScrapeApiResponse'): is_retryable = False kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR http_code = api_response.status_code retry_delay = 5 retry_times = 3 description = None error_url = 'https://scrapfly.io/docs/scrape-api/errors#api' code = api_response.error['code'] if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE': http_code = api_response.scrape_result['status_code'] if 'description' in api_response.error: description = api_response.error['description'] message = '%s %s %s' % (str(http_code), code, api_response.error['message']) if 'doc_url' in api_response.error: error_url = api_response.error['doc_url'] if 'retryable' in api_response.error: is_retryable = api_response.error['retryable'] resource = ErrorFactory._get_resource(code=code) if is_retryable is True: if 'X-Retry' in api_response.headers: retry_delay = int(api_response.headers['Retry-After']) message = '%s: %s' % (message, description) if description else message if retry_delay is not None and is_retryable is True: message = '%s. Retry delay : %s seconds' % (message, str(retry_delay)) args = { 'message': message, 'code': code, 'http_status_code': http_code, 'is_retryable': is_retryable, 'api_response': api_response, 'resource': resource, 'retry_delay': retry_delay, 'retry_times': retry_times, 'documentation_url': error_url, 'request': api_response.request, 'response': api_response.response } if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE: if http_code >= 500: return ApiHttpServerError(**args) is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error: return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args) if is_scraper_api_error: return ErrorFactory.RESOURCE_TO_ERROR[resource](**args) return ApiHttpClientError(**args) elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR: if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE': if http_code >= 500: return UpstreamHttpServerError(**args) if http_code >= 400: return UpstreamHttpClientError(**args) if resource in ErrorFactory.RESOURCE_TO_ERROR: return ErrorFactory.RESOURCE_TO_ERROR[resource](**args) return ScrapflyError(**args)
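create() is normally invoked for you by ScrapeApiResponse.raise_for_result(); below is a hedged sketch of acting on the retry metadata it passes to the raised error, assuming the base constructor stores the is_retryable and retry_delay keyword arguments from the args dict above as attributes of the same name:

import time
from scrapfly import ScrapflyClient, ScrapeConfig, ScrapflyError

client = ScrapflyClient(key='YOUR_API_KEY')

for attempt in range(3):
    try:
        api_response = client.scrape(scrape_config=ScrapeConfig(url='https://example.com'))
        break
    except ScrapflyError as e:
        if not e.is_retryable:
            raise
        time.sleep(e.retry_delay or 5)  # delay suggested by the factory (5 seconds by default)

The client also ships a resilient_scrape() helper (see ScrapflyClient below) that wraps this pattern with exponential backoff.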
class HttpError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class HttpError(ScrapflyError):

    def __init__(self, request:Request, response:Optional[Response]=None, **kwargs):
        self.request = request
        self.response = response
        super().__init__(**kwargs)

    def __str__(self) -> str:
        if isinstance(self, UpstreamHttpError):
            text = f"Target website responded with {self.api_response.scrape_result['status_code']} - {self.api_response.scrape_result['reason']}"
        else:
            text = f"{self.response.status_code} - {self.response.reason}"

        if isinstance(self, (ApiHttpClientError, ApiHttpServerError)):
            try:
                text += self.response.content.decode('utf-8')
            except UnicodeError:
                raise EncoderError(content=base64.b64encode(self.response.content).decode('utf-8'))
        elif isinstance(self, ScraperAPIError):
            print(self.api_response.error)
            text += f" | {self.api_response.error['code']} - {self.api_response.error['message']} - {self.api_response.error['links']}"

        return text
Ancestors
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- ApiHttpClientError
- scrapfly.errors.QuotaLimitReached
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.TooManyConcurrentRequest
- scrapfly.errors.UpstreamHttpError
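A hedged sketch of handling this hierarchy around a scrape call (the URL is a placeholder; ApiHttpServerError is caught before ApiHttpClientError because it subclasses it):

from scrapfly import ScrapflyClient, ScrapeConfig
from scrapfly import UpstreamHttpServerError, UpstreamHttpClientError, ApiHttpServerError, ApiHttpClientError

client = ScrapflyClient(key='YOUR_API_KEY')

try:
    client.scrape(scrape_config=ScrapeConfig(url='https://example.com'))
except UpstreamHttpServerError as e:
    print('target website replied with a 5xx status:', e)
except UpstreamHttpClientError as e:
    print('target website replied with a 4xx status:', e)
except ApiHttpServerError as e:
    print('Scrapfly API replied with a 5xx status:', e)
except ApiHttpClientError as e:
    print('Scrapfly API replied with a 4xx status:', e)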
class ResponseBodyHandler (use_brotli: bool = False)
-
Expand source code
class ResponseBodyHandler: SUPPORTED_COMPRESSION = ['gzip', 'deflate'] SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json'] class JSONDateTimeDecoder(JSONDecoder): def __init__(self, *args, **kargs): JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs) # brotli under perform at same gzip level and upper level destroy the cpu so # the trade off do not worth it for most of usage def __init__(self, use_brotli:bool=False): if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION: try: try: import brotlicffi as brotli self.SUPPORTED_COMPRESSION.insert(0, 'br') except ImportError: import brotli self.SUPPORTED_COMPRESSION.insert(0, 'br') except ImportError: pass self.content_encoding = ', '.join(self.SUPPORTED_COMPRESSION) try: # automatically use msgpack if available https://msgpack.org/ import msgpack self.accept = 'application/msgpack;charset=utf-8' self.content_type = 'application/msgpack;charset=utf-8' self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False) except ImportError: self.accept = 'application/json;charset=utf-8' self.content_type = 'application/json;charset=utf-8' self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder) def support(self, headers:Dict) -> bool: if 'content-type' not in headers: return False for content_type in self.SUPPORTED_CONTENT_TYPES: if headers['content-type'].find(content_type) != -1: return True return False def __call__(self, content: bytes) -> Union[str, Dict]: try: return self.content_loader(content) except Exception as e: try: raise EncoderError(content=content.decode('utf-8')) from e except UnicodeError: raise EncoderError(content=base64.b64encode(content).decode('utf-8')) from e
Class variables
var JSONDateTimeDecoder
-
Simple JSON http://json.org decoder
Performs the following translations in decoding by default:
+---------------+-------------------+
| JSON          | Python            |
+===============+===================+
| object        | dict              |
+---------------+-------------------+
| array         | list              |
+---------------+-------------------+
| string        | str               |
+---------------+-------------------+
| number (int)  | int               |
+---------------+-------------------+
| number (real) | float             |
+---------------+-------------------+
| true          | True              |
+---------------+-------------------+
| false         | False             |
+---------------+-------------------+
| null          | None              |
+---------------+-------------------+
It also understands NaN, Infinity, and -Infinity as their corresponding float values, which is outside the JSON spec.
var SUPPORTED_COMPRESSION
var SUPPORTED_CONTENT_TYPES
Methods
def support(self, headers: Dict) ‑> bool
-
Expand source code
def support(self, headers:Dict) -> bool:
    if 'content-type' not in headers:
        return False

    for content_type in self.SUPPORTED_CONTENT_TYPES:
        if headers['content-type'].find(content_type) != -1:
            return True

    return False
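A small illustration of the content negotiation above (header values are illustrative; the accept and content_encoding values depend on which optional packages are installed):

handler = ResponseBodyHandler(use_brotli=False)

handler.support({'content-type': 'application/json;charset=utf-8'})  # True
handler.support({'content-type': 'text/html'})                       # False

handler.accept            # 'application/msgpack;charset=utf-8' if msgpack is installed, else the JSON variant
handler.content_encoding  # 'gzip, deflate', with 'br' prepended when a brotli package is available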
class ScrapeApiResponse (request: requests.models.Request, response: requests.models.Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None)
-
Expand source code
class ScrapeApiResponse: def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None): self.request = request self.response = response self.scrape_config = scrape_config if self.scrape_config.method == 'HEAD': api_result = { 'result': { 'request_headers': {}, 'status': 'DONE', 'success': 200 >= self.response.status_code < 300, 'response_headers': self.response.headers, 'status_code': self.response.status_code, 'reason': self.response.reason, 'format': 'text', 'content': '' }, 'context': {}, 'config': self.scrape_config.__dict__ } if 'X-Scrapfly-Reject-Code' in self.response.headers: api_result['result']['error'] = { 'code': self.response.headers['X-Scrapfly-Reject-Code'], 'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']), 'message': self.response.headers['X-Scrapfly-Reject-Description'], 'error_id': self.response.headers['X-Scrapfly-Reject-ID'], 'retryable': True if self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes' else False, 'doc_url': '', 'links': {} } if 'X-Scrapfly-Reject-Doc' in self.response.headers: api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc'] api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc'] if isinstance(api_result, str): raise HttpError( request=request, response=response, message='Bad gateway', code=502, http_status_code=502, is_retryable=True ) self.result = self.handle_api_result(api_result=api_result) @property def scrape_result(self) -> Dict: return self.result['result'] @property def config(self) -> Dict: return self.result['config'] @property def context(self) -> Dict: return self.result['context'] @property def content(self) -> str: return self.scrape_result['content'] @property def success(self) -> bool: """ /!\ Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code """ return 200 >= self.response.status_code <= 299 @property def scrape_success(self) -> bool: return self.scrape_result['success'] @property def error(self) -> Optional[Dict]: if self.scrape_success is False: return self.scrape_result['error'] @property def status_code(self) -> int: """ /!\ This is the status code of our API, not the upstream website """ return self.response.status_code @property def upstream_status_code(self) -> Optional[int]: if 'status_code' in self.scrape_result: return self.scrape_result['status_code'] return None def prevent_extra_usage(self): if self.remaining_quota == 0: raise ExtraUsageForbidden( message='All Pre Paid Quota Used', code='ERR::ACCOUNT::PREVENT_EXTRA_USAGE', http_status_code=429, is_retryable=False ) @property def remaining_quota(self) -> Optional[int]: remaining_scrape = self.response.headers.get('X-Scrapfly-Remaining-Scrape') if remaining_scrape: remaining_scrape = int(remaining_scrape) return remaining_scrape @property def cost(self) -> Optional[int]: cost = self.response.headers.get('X-Scrapfly-Api-Cost') if cost: cost = int(cost) return cost @property def duration_ms(self) -> Optional[float]: duration = self.response.headers.get('X-Scrapfly-Response-Time') if duration: duration = float(duration) return duration @property def headers(self) -> CaseInsensitiveDict: return self.response.headers def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]: if self._is_api_error(api_result=api_result) is True: return FrozenDict(api_result) try: if isinstance(api_result['config']['headers'], 
list): api_result['config']['headers'] = {} except TypeError: logger.info(api_result) raise with suppress(KeyError): api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers']) api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers']) if api_result['result']['format'] == 'binary' and api_result['result']['content']: api_result['result']['content'] = BytesIO(b64decode(api_result['result']['content'])) return FrozenDict(api_result) @cached_property def soup(self) -> 'BeautifulSoup': try: from bs4 import BeautifulSoup soup = BeautifulSoup(self.content, "lxml") return soup except ImportError as e: logger.error('You must install scrapfly[parser] to enable this feature') @cached_property def selector(self) -> 'Selector': try: from parsel import Selector return Selector(text=self.content) except ImportError as e: logger.error('You must install parsel or scrapy package to enable this feature') raise e @property def error_message(self) : if self.error: message = "<-- %s | %s - %s." % (self.response.status_code, self.error['code'], self.error['message']) if self.error['links']: message += "Checkout the related doc: %s" % list(self.error['links'].values())[0] return message return '<-- %s - %s %s | Doc: %s' % (self.response.status_code, self.http_status_code, self.code, self.documentation_url) def _is_api_error(self, api_result: Dict) -> bool: if self.scrape_config.method == 'HEAD': if 'X-Reject-Reason' in self.response.headers: return True return False if api_result is None: return True return 'error_id' in api_result def raise_for_result(self, raise_on_upstream_error: bool = True): try: self.response.raise_for_status() except HTTPError as e: if 'http_code' in self.result: if e.response.status_code >= 500: raise ApiHttpServerError( request=e.request, response=e.response, message=self.result['message'], code='', resource='', http_status_code=e.response.status_code, documentation_url=self.result.get('links') ) from e else: raise ApiHttpClientError( request=e.request, response=e.response, message=self.result['message'], code='', resource='API', http_status_code=self.result['http_code'], documentation_url=self.result.get('links') ) from e if self.result['result']['status'] == 'DONE' and self.scrape_success is False: error = ErrorFactory.create(api_response=self) if error: if isinstance(error, UpstreamHttpError): if raise_on_upstream_error is True: raise error else: raise error def upstream_result_into_response(self, _class=Response) -> Optional[Response]: if _class != Response: raise RuntimeError('only Response from requests package is supported at the moment') if self.result is None: return None if self.response.status_code != 200: return None response = Response() response.status_code = self.scrape_result['status_code'] response.reason = self.scrape_result['reason'] if self.scrape_result['content']: if isinstance(self.scrape_result['content'], BytesIO): response._content = self.scrape_result['content'].getvalue() elif isinstance(self.scrape_result['content'], bytes): response._content = self.scrape_result['content'] elif isinstance(self.scrape_result['content'], str): response._content = self.scrape_result['content'].encode('utf-8') else: response._content = None response.headers.update(self.scrape_result['response_headers']) response.url = self.scrape_result['url'] response.request = Request( method=self.config['method'], url=self.config['url'], headers=self.scrape_result['request_headers'], 
data=self.config['body'] if self.config['body'] else None ) if 'set-cookie' in response.headers: for raw_cookie in response.headers['set-cookie']: for name, cookie in SimpleCookie(raw_cookie).items(): expires = cookie.get('expires') if expires == '': expires = None if expires: try: expires = parse(expires).timestamp() except ValueError: expires = None if type(expires) == str: if '.' in expires: expires = float(expires) else: expires = int(expires) response.cookies.set_cookie(Cookie( version=cookie.get('version') if cookie.get('version') else None, name=name, value=cookie.value, path=cookie.get('path', ''), expires=expires, comment=cookie.get('comment'), domain=cookie.get('domain', ''), secure=cookie.get('secure'), port=None, port_specified=False, domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '', domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False, path_specified=cookie.get('path') != '' and cookie.get('path') is not None, discard=False, comment_url=None, rest={ 'httponly': cookie.get('httponly'), 'samesite': cookie.get('samesite'), 'max-age': cookie.get('max-age') } )) return response def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content:Optional[Union[str, bytes]]=None): file_content = content or self.scrape_result['content'] file_path = None file_extension = None if name: name_parts = name.split('.') if len(name_parts) > 1: file_extension = name_parts[-1] if not file: if file_extension is None: try: mime_type = self.scrape_result['response_headers']['content-type'] except KeyError: mime_type = 'application/octet-stream' if ';' in mime_type: mime_type = mime_type.split(';')[0] file_extension = '.' + mime_type.split('/')[1] if not name: name = self.config['url'].split('/')[-1] if name.find(file_extension) == -1: name += file_extension file_path = path + '/' + name if path is not None else name if file_path == file_extension: url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-') if url[-1] == '-': url = url[:-1] url += file_extension file_path = url file = open(file_path, 'wb') if isinstance(file_content, str): file_content = BytesIO(file_content.encode('utf-8')) elif isinstance(file_content, bytes): file_content = BytesIO(file_content) file_content.seek(0) with file as f: shutil.copyfileobj(file_content, f, length=131072) logger.info('file %s created' % file_path)
Instance variables
var config : Dict
-
Expand source code
@property
def config(self) -> Dict:
    return self.result['config']
var content : str
-
Expand source code
@property
def content(self) -> str:
    return self.scrape_result['content']
var context : Dict
-
Expand source code
@property
def context(self) -> Dict:
    return self.result['context']
var cost : Optional[int]
-
Expand source code
@property
def cost(self) -> Optional[int]:
    cost = self.response.headers.get('X-Scrapfly-Api-Cost')

    if cost:
        cost = int(cost)

    return cost
var duration_ms : Optional[float]
-
Expand source code
@property
def duration_ms(self) -> Optional[float]:
    duration = self.response.headers.get('X-Scrapfly-Response-Time')

    if duration:
        duration = float(duration)

    return duration
var error : Optional[Dict]
-
Expand source code
@property
def error(self) -> Optional[Dict]:
    if self.scrape_success is False:
        return self.scrape_result['error']
var error_message
-
Expand source code
@property
def error_message(self):
    if self.error:
        message = "<-- %s | %s - %s." % (self.response.status_code, self.error['code'], self.error['message'])

        if self.error['links']:
            message += "Checkout the related doc: %s" % list(self.error['links'].values())[0]

        return message

    return '<-- %s - %s %s | Doc: %s' % (self.response.status_code, self.http_status_code, self.code, self.documentation_url)
var headers : requests.structures.CaseInsensitiveDict
-
Expand source code
@property
def headers(self) -> CaseInsensitiveDict:
    return self.response.headers
var remaining_quota : Optional[int]
-
Expand source code
@property
def remaining_quota(self) -> Optional[int]:
    remaining_scrape = self.response.headers.get('X-Scrapfly-Remaining-Scrape')

    if remaining_scrape:
        remaining_scrape = int(remaining_scrape)

    return remaining_scrape
var scrape_result : Dict
-
Expand source code
@property
def scrape_result(self) -> Dict:
    return self.result['result']
var scrape_success : bool
-
Expand source code
@property
def scrape_success(self) -> bool:
    return self.scrape_result['success']
var selector
-
Expand source code
@cached_property
def selector(self) -> 'Selector':
    try:
        from parsel import Selector
        return Selector(text=self.content)
    except ImportError as e:
        logger.error('You must install parsel or scrapy package to enable this feature')
        raise e
var soup
-
Expand source code
@cached_property
def soup(self) -> 'BeautifulSoup':
    try:
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(self.content, "lxml")
        return soup
    except ImportError as e:
        logger.error('You must install scrapfly[parser] to enable this feature')
var status_code : int
-
/!\ This is the status code of our API, not the upstream website
Expand source code
@property
def status_code(self) -> int:
    """
    /!\ This is the status code of our API, not the upstream website
    """
    return self.response.status_code
var success : bool
-
/!\ Success means the Scrapfly API replied to the call correctly; the scrape itself can still be unsuccessful if the upstream website replied with an error status code
Expand source code
@property
def success(self) -> bool:
    """
    /!\ Success means the Scrapfly API replied to the call correctly; the scrape itself can still be
    unsuccessful if the upstream website replied with an error status code
    """
    return 200 <= self.response.status_code <= 299
var upstream_status_code : Optional[int]
-
Expand source code
@property
def upstream_status_code(self) -> Optional[int]:
    if 'status_code' in self.scrape_result:
        return self.scrape_result['status_code']

    return None
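Taken together, the properties above expose both the API call metadata and the scraped page itself; a brief sketch, assuming a ScrapflyClient instance named client as in the package-level example:

api_response = client.scrape(scrape_config=ScrapeConfig(url='https://example.com'))

api_response.status_code           # status code of the Scrapfly API call
api_response.upstream_status_code  # status code returned by the scraped website
api_response.cost                  # API credits billed (X-Scrapfly-Api-Cost header)
api_response.remaining_quota       # scrapes left on the account (X-Scrapfly-Remaining-Scrape header)
api_response.duration_ms           # API response time (X-Scrapfly-Response-Time header)
api_response.content               # body of the scraped page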
Methods
def handle_api_result(self, api_result: Dict) ‑> Optional[FrozenDict]
-
Expand source code
def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
    if self._is_api_error(api_result=api_result) is True:
        return FrozenDict(api_result)

    try:
        if isinstance(api_result['config']['headers'], list):
            api_result['config']['headers'] = {}
    except TypeError:
        logger.info(api_result)
        raise

    with suppress(KeyError):
        api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
        api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

    if api_result['result']['format'] == 'binary' and api_result['result']['content']:
        api_result['result']['content'] = BytesIO(b64decode(api_result['result']['content']))

    return FrozenDict(api_result)
def prevent_extra_usage(self)
-
Expand source code
def prevent_extra_usage(self):
    if self.remaining_quota == 0:
        raise ExtraUsageForbidden(
            message='All Pre Paid Quota Used',
            code='ERR::ACCOUNT::PREVENT_EXTRA_USAGE',
            http_status_code=429,
            is_retryable=False
        )
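For example, prevent_extra_usage() can guard a crawl loop so it stops before consuming extra usage (urls and process are hypothetical; client as in the package-level example):

for url in urls:
    api_response = client.scrape(scrape_config=ScrapeConfig(url=url))
    api_response.prevent_extra_usage()  # raises ExtraUsageForbidden once remaining_quota reaches 0
    process(api_response.content)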
def raise_for_result(self, raise_on_upstream_error: bool = True)
-
Expand source code
def raise_for_result(self, raise_on_upstream_error: bool = True):
    try:
        self.response.raise_for_status()
    except HTTPError as e:
        if 'http_code' in self.result:
            if e.response.status_code >= 500:
                raise ApiHttpServerError(
                    request=e.request,
                    response=e.response,
                    message=self.result['message'],
                    code='',
                    resource='',
                    http_status_code=e.response.status_code,
                    documentation_url=self.result.get('links')
                ) from e
            else:
                raise ApiHttpClientError(
                    request=e.request,
                    response=e.response,
                    message=self.result['message'],
                    code='',
                    resource='API',
                    http_status_code=self.result['http_code'],
                    documentation_url=self.result.get('links')
                ) from e

    if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
        error = ErrorFactory.create(api_response=self)

        if error:
            if isinstance(error, UpstreamHttpError):
                if raise_on_upstream_error is True:
                    raise error
            else:
                raise error
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Union[TextIO, _io.BytesIO, ForwardRef(None)] = None, content: Union[str, bytes, ForwardRef(None)] = None)
-
Expand source code
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content:Optional[Union[str, bytes]]=None): file_content = content or self.scrape_result['content'] file_path = None file_extension = None if name: name_parts = name.split('.') if len(name_parts) > 1: file_extension = name_parts[-1] if not file: if file_extension is None: try: mime_type = self.scrape_result['response_headers']['content-type'] except KeyError: mime_type = 'application/octet-stream' if ';' in mime_type: mime_type = mime_type.split(';')[0] file_extension = '.' + mime_type.split('/')[1] if not name: name = self.config['url'].split('/')[-1] if name.find(file_extension) == -1: name += file_extension file_path = path + '/' + name if path is not None else name if file_path == file_extension: url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-') if url[-1] == '-': url = url[:-1] url += file_extension file_path = url file = open(file_path, 'wb') if isinstance(file_content, str): file_content = BytesIO(file_content.encode('utf-8')) elif isinstance(file_content, bytes): file_content = BytesIO(file_content) file_content.seek(0) with file as f: shutil.copyfileobj(file_content, f, length=131072) logger.info('file %s created' % file_path)
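A minimal sketch of persisting a scrape to disk with sink() (the path and name are illustrative):

api_response = client.scrape(scrape_config=ScrapeConfig(url='https://example.com'))
api_response.sink(path='/tmp', name='example.html')  # writes /tmp/example.html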
def upstream_result_into_response(self) ‑> Optional[requests.models.Response]
-
Expand source code
def upstream_result_into_response(self, _class=Response) -> Optional[Response]: if _class != Response: raise RuntimeError('only Response from requests package is supported at the moment') if self.result is None: return None if self.response.status_code != 200: return None response = Response() response.status_code = self.scrape_result['status_code'] response.reason = self.scrape_result['reason'] if self.scrape_result['content']: if isinstance(self.scrape_result['content'], BytesIO): response._content = self.scrape_result['content'].getvalue() elif isinstance(self.scrape_result['content'], bytes): response._content = self.scrape_result['content'] elif isinstance(self.scrape_result['content'], str): response._content = self.scrape_result['content'].encode('utf-8') else: response._content = None response.headers.update(self.scrape_result['response_headers']) response.url = self.scrape_result['url'] response.request = Request( method=self.config['method'], url=self.config['url'], headers=self.scrape_result['request_headers'], data=self.config['body'] if self.config['body'] else None ) if 'set-cookie' in response.headers: for raw_cookie in response.headers['set-cookie']: for name, cookie in SimpleCookie(raw_cookie).items(): expires = cookie.get('expires') if expires == '': expires = None if expires: try: expires = parse(expires).timestamp() except ValueError: expires = None if type(expires) == str: if '.' in expires: expires = float(expires) else: expires = int(expires) response.cookies.set_cookie(Cookie( version=cookie.get('version') if cookie.get('version') else None, name=name, value=cookie.value, path=cookie.get('path', ''), expires=expires, comment=cookie.get('comment'), domain=cookie.get('domain', ''), secure=cookie.get('secure'), port=None, port_specified=False, domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '', domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False, path_specified=cookie.get('path') != '' and cookie.get('path') is not None, discard=False, comment_url=None, rest={ 'httponly': cookie.get('httponly'), 'samesite': cookie.get('samesite'), 'max-age': cookie.get('max-age') } )) return response
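A sketch of bridging into code that expects a requests.Response object (the method returns None when the API call itself did not succeed):

response = api_response.upstream_result_into_response()

if response is not None:
    response.status_code                  # upstream status code
    response.headers.get('content-type')  # upstream response headers
    response.cookies                      # cookie jar rebuilt from set-cookie headers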
class ScrapeConfig (url: str, retry: bool = True, method: str = 'GET', country: Optional[str] = None, render_js: bool = False, cache: bool = False, cache_clear: bool = False, ssl: bool = False, dns: bool = False, asp: bool = False, debug: bool = False, raise_on_upstream_error: bool = True, cache_ttl: Optional[int] = None, proxy_pool: Optional[str] = None, session: Optional[str] = None, tags: Optional[Set[str]] = None, correlation_id: Optional[str] = None, cookies: Optional[requests.structures.CaseInsensitiveDict] = None, body: Optional[str] = None, data: Optional[Dict] = None, headers: Union[requests.structures.CaseInsensitiveDict, Dict[str, str], ForwardRef(None)] = None, js: str = None, rendering_wait: int = None, wait_for_selector: Optional[str] = None, screenshots: Optional[Dict] = None, session_sticky_proxy: Optional[bool] = None, webhook: Optional[str] = None, timeout: Optional[int] = None, js_scenario: Optional[Dict] = None, extract: Optional[Dict] = None, os: Optional[str] = None, lang: Optional[List[str]] = None, auto_scroll: Optional[bool] = None, cost_budget: Optional[int] = None)
-
Expand source code
class ScrapeConfig: PUBLIC_DATACENTER_POOL = 'public_datacenter_pool' PUBLIC_RESIDENTIAL_POOL = 'public_residential_pool' url: str retry: bool = True method: str = 'GET' country: Optional[str] = None render_js: bool = False cache: bool = False cache_clear:bool = False ssl:bool = False dns:bool = False asp:bool = False debug: bool = False raise_on_upstream_error:bool = True cache_ttl:Optional[int] = None proxy_pool:Optional[str] = None session: Optional[str] = None tags: Optional[List[str]] = None correlation_id: Optional[str] = None cookies: Optional[CaseInsensitiveDict] = None body: Optional[str] = None data: Optional[Dict] = None headers: Optional[CaseInsensitiveDict] = None js: str = None rendering_wait: int = None wait_for_selector: Optional[str] = None session_sticky_proxy:bool = True screenshots:Optional[Dict]=None webhook:Optional[str]=None timeout:Optional[int]=None # in milliseconds js_scenario: Dict = None extract: Dict = None lang:Optional[List[str]] = None os:Optional[str] = None auto_scroll:Optional[bool] = None cost_budget:Optional[int] = None def __init__( self, url: str, retry: bool = True, method: str = 'GET', country: Optional[str] = None, render_js: bool = False, cache: bool = False, cache_clear:bool = False, ssl:bool = False, dns:bool = False, asp:bool = False, debug: bool = False, raise_on_upstream_error:bool = True, cache_ttl:Optional[int] = None, proxy_pool:Optional[str] = None, session: Optional[str] = None, tags: Optional[Set[str]] = None, correlation_id: Optional[str] = None, cookies: Optional[CaseInsensitiveDict] = None, body: Optional[str] = None, data: Optional[Dict] = None, headers: Optional[Union[CaseInsensitiveDict, Dict[str, str]]] = None, js: str = None, rendering_wait: int = None, wait_for_selector: Optional[str] = None, screenshots:Optional[Dict]=None, session_sticky_proxy:Optional[bool] = None, webhook:Optional[str] = None, timeout:Optional[int] = None, # in milliseconds js_scenario:Optional[Dict] = None, extract:Optional[Dict] = None, os:Optional[str] = None, lang:Optional[List[str]] = None, auto_scroll:Optional[bool] = None, cost_budget:Optional[int] = None ): assert(type(url) is str) if isinstance(tags, List): tags = set(tags) cookies = cookies or {} headers = headers or {} self.cookies = CaseInsensitiveDict(cookies) self.headers = CaseInsensitiveDict(headers) self.url = url self.retry = retry self.method = method self.country = country self.session_sticky_proxy = session_sticky_proxy self.render_js = render_js self.cache = cache self.cache_clear = cache_clear self.asp = asp self.webhook = webhook self.session = session self.debug = debug self.cache_ttl = cache_ttl self.proxy_pool = proxy_pool self.tags = tags or set() self.correlation_id = correlation_id self.wait_for_selector = wait_for_selector self.body = body self.data = data self.js = js self.rendering_wait = rendering_wait self.raise_on_upstream_error = raise_on_upstream_error self.screenshots = screenshots self.key = None self.dns = dns self.ssl = ssl self.js_scenario = js_scenario self.timeout = timeout self.extract = extract self.lang = lang self.os = os self.auto_scroll = auto_scroll self.cost_budget = cost_budget if cookies: _cookies = [] for name, value in cookies.items(): _cookies.append(name + '=' + value) if 'cookie' in self.headers: if self.headers['cookie'][-1] != ';': self.headers['cookie'] += ';' else: self.headers['cookie'] = '' self.headers['cookie'] += '; '.join(_cookies) if self.body and self.data: raise ScrapeConfigError('You cannot pass both parameters body and data. 
You must choose') if method in ['POST', 'PUT', 'PATCH']: if self.body is None and self.data is not None: if 'content-type' not in self.headers: self.headers['content-type'] = 'application/x-www-form-urlencoded' self.body = urlencode(data) else: if self.headers['content-type'].find('application/json') != -1: self.body = json.dumps(data) elif self.headers['content-type'].find('application/x-www-form-urlencoded') != -1: self.body = urlencode(data) else: raise ScrapeConfigError('Content-Type "%s" not supported, use body parameter to pass pre encoded body according to your content type' % self.headers['content-type']) elif self.body is None and self.data is None: self.headers['content-type'] = 'text/plain' def _bool_to_http(self, _bool:bool) -> str: return 'true' if _bool is True else 'false' def to_api_params(self, key:str) -> Dict: params = { 'key': self.key if self.key is not None else key, 'url': self.url } if self.country is not None: params['country'] = self.country for name, value in self.headers.items(): params['headers[%s]' % name] = value if self.webhook is not None: params['webhook_name'] = self.webhook if self.timeout is not None: params['timeout'] = self.timeout if self.extract is not None: params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8') if self.cost_budget is not None: params['cost_budget'] = self.cost_budget if self.render_js is True: params['render_js'] = self._bool_to_http(self.render_js) if self.wait_for_selector is not None: params['wait_for_selector'] = self.wait_for_selector if self.js: params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8') if self.js_scenario: params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8') if self.rendering_wait: params['rendering_wait'] = self.rendering_wait if self.screenshots is not None: for name, element in self.screenshots.items(): params['screenshots[%s]' % name] = element if self.auto_scroll is True: params['auto_scroll'] = self._bool_to_http(self.auto_scroll) else: if self.wait_for_selector is not None: logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled') if self.screenshots: logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled') if self.js_scenario: logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled') if self.js: logging.warning('Params "js" is ignored. Works only if render_js is enabled') if self.rendering_wait: logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled') if self.asp is True: params['asp'] = self._bool_to_http(self.asp) if self.retry is False: params['retry'] = self._bool_to_http(self.retry) if self.cache is True: params['cache'] = self._bool_to_http(self.cache) if self.cache_clear is True: params['cache_clear'] = self._bool_to_http(self.cache_clear) if self.cache_ttl is not None: params['cache_ttl'] = self.cache_ttl else: if self.cache_clear is True: logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled') if self.cache_ttl is not None: logging.warning('Params "cache_ttl" is ignored. 
Works only if cache is enabled') if self.dns is True: params['dns'] = self._bool_to_http(self.dns) if self.ssl is True: params['ssl'] = self._bool_to_http(self.ssl) if self.tags: params['tags'] = ','.join(self.tags) if self.correlation_id: params['correlation_id'] = self.correlation_id if self.session: params['session'] = self.session if self.session_sticky_proxy is True: # false by default params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy) else: if self.session_sticky_proxy: logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled') if self.debug is True: params['debug'] = self._bool_to_http(self.debug) if self.proxy_pool is not None: params['proxy_pool'] = self.proxy_pool if self.lang is not None: params['lang'] = ','.join(self.lang) if self.os is not None: params['os'] = self.os return params @staticmethod def from_exported_config(config:str) -> 'ScrapeConfig': try: from msgpack import loads as msgpack_loads except ImportError as e: print('You must install msgpack package - run: pip install "scrapfly-sdk[seepdup] or pip install msgpack') raise data = msgpack_loads(base64.b64decode(config)) headers = {} for name, value in data['headers'].items(): if isinstance(value, Iterable): headers[name] = '; '.join(value) else: headers[name] = value return ScrapeConfig( url=data['url'], retry=data['retry'], headers=headers, session=data['session'], session_sticky_proxy=data['session_sticky_proxy'], cache=data['cache'], cache_ttl=data['cache_ttl'], cache_clear=data['cache_clear'], render_js=data['render_js'], method=data['method'], asp=data['asp'], body=data['body'], ssl=data['ssl'], dns=data['dns'], country=data['country'], debug=data['debug'], correlation_id=data['correlation_id'], tags=data['tags'], js=data['js'], rendering_wait=data['rendering_wait'], screenshots=data['screenshots'] or {}, proxy_pool=data['proxy_pool'], auto_scroll=data['auto_scroll'], cost_budget=data['cost_budget'] )
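A hedged example of a more involved configuration (the URL, selector, and values are illustrative):

config = ScrapeConfig(
    url='https://example.com/products',
    render_js=True,                     # enable the headless browser
    wait_for_selector='.product-list',  # only applied when render_js is enabled
    asp=True,                           # anti scraping protection bypass
    country='us',
    cache=True,
    cache_ttl=3600,                     # only applied when cache is enabled
)
api_response = client.scrape(scrape_config=config)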
Class variables
var PUBLIC_DATACENTER_POOL
var PUBLIC_RESIDENTIAL_POOL
var asp : bool
var auto_scroll : Optional[bool]
var body : Optional[str]
var cache : bool
var cache_clear : bool
var cache_ttl : Optional[int]
var correlation_id : Optional[str]
var cost_budget : Optional[int]
var country : Optional[str]
var data : Optional[Dict]
var debug : bool
var dns : bool
var extract : Dict
var headers : Optional[requests.structures.CaseInsensitiveDict]
var js : str
var js_scenario : Dict
var lang : Optional[List[str]]
var method : str
var os : Optional[str]
var proxy_pool : Optional[str]
var raise_on_upstream_error : bool
var render_js : bool
var rendering_wait : int
var retry : bool
var screenshots : Optional[Dict]
var session : Optional[str]
var session_sticky_proxy : bool
var ssl : bool
var timeout : Optional[int]
var url : str
var wait_for_selector : Optional[str]
var webhook : Optional[str]
Static methods
def from_exported_config(config: str) ‑> ScrapeConfig
-
Expand source code
@staticmethod def from_exported_config(config:str) -> 'ScrapeConfig': try: from msgpack import loads as msgpack_loads except ImportError as e: print('You must install msgpack package - run: pip install "scrapfly-sdk[seepdup] or pip install msgpack') raise data = msgpack_loads(base64.b64decode(config)) headers = {} for name, value in data['headers'].items(): if isinstance(value, Iterable): headers[name] = '; '.join(value) else: headers[name] = value return ScrapeConfig( url=data['url'], retry=data['retry'], headers=headers, session=data['session'], session_sticky_proxy=data['session_sticky_proxy'], cache=data['cache'], cache_ttl=data['cache_ttl'], cache_clear=data['cache_clear'], render_js=data['render_js'], method=data['method'], asp=data['asp'], body=data['body'], ssl=data['ssl'], dns=data['dns'], country=data['country'], debug=data['debug'], correlation_id=data['correlation_id'], tags=data['tags'], js=data['js'], rendering_wait=data['rendering_wait'], screenshots=data['screenshots'] or {}, proxy_pool=data['proxy_pool'], auto_scroll=data['auto_scroll'], cost_budget=data['cost_budget'] )
Methods
def to_api_params(self, key: str) ‑> Dict
-
Expand source code
def to_api_params(self, key:str) -> Dict: params = { 'key': self.key if self.key is not None else key, 'url': self.url } if self.country is not None: params['country'] = self.country for name, value in self.headers.items(): params['headers[%s]' % name] = value if self.webhook is not None: params['webhook_name'] = self.webhook if self.timeout is not None: params['timeout'] = self.timeout if self.extract is not None: params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8') if self.cost_budget is not None: params['cost_budget'] = self.cost_budget if self.render_js is True: params['render_js'] = self._bool_to_http(self.render_js) if self.wait_for_selector is not None: params['wait_for_selector'] = self.wait_for_selector if self.js: params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8') if self.js_scenario: params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8') if self.rendering_wait: params['rendering_wait'] = self.rendering_wait if self.screenshots is not None: for name, element in self.screenshots.items(): params['screenshots[%s]' % name] = element if self.auto_scroll is True: params['auto_scroll'] = self._bool_to_http(self.auto_scroll) else: if self.wait_for_selector is not None: logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled') if self.screenshots: logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled') if self.js_scenario: logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled') if self.js: logging.warning('Params "js" is ignored. Works only if render_js is enabled') if self.rendering_wait: logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled') if self.asp is True: params['asp'] = self._bool_to_http(self.asp) if self.retry is False: params['retry'] = self._bool_to_http(self.retry) if self.cache is True: params['cache'] = self._bool_to_http(self.cache) if self.cache_clear is True: params['cache_clear'] = self._bool_to_http(self.cache_clear) if self.cache_ttl is not None: params['cache_ttl'] = self.cache_ttl else: if self.cache_clear is True: logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled') if self.cache_ttl is not None: logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled') if self.dns is True: params['dns'] = self._bool_to_http(self.dns) if self.ssl is True: params['ssl'] = self._bool_to_http(self.ssl) if self.tags: params['tags'] = ','.join(self.tags) if self.correlation_id: params['correlation_id'] = self.correlation_id if self.session: params['session'] = self.session if self.session_sticky_proxy is True: # false by default params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy) else: if self.session_sticky_proxy: logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled') if self.debug is True: params['debug'] = self._bool_to_http(self.debug) if self.proxy_pool is not None: params['proxy_pool'] = self.proxy_pool if self.lang is not None: params['lang'] = ','.join(self.lang) if self.os is not None: params['os'] = self.os return params
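For example, a config with JavaScript rendering enabled roughly serializes as follows (a sketch; the key value is a placeholder):

params = ScrapeConfig(url='https://example.com', render_js=True, country='us').to_api_params(key='YOUR_API_KEY')
# {'key': 'YOUR_API_KEY', 'url': 'https://example.com', 'country': 'us', 'render_js': 'true'}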
class ScrapflyAspError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ScrapflyAspError(ScraperAPIError): pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyClient (key: str, host: Optional[str] = 'https://api.scrapfly.io', verify=True, debug: bool = False, max_concurrency: int = 1, connect_timeout: int = 30, read_timeout: int = 160, reporter: Optional[Callable] = None, **kwargs)
-
Expand source code
class ScrapflyClient: HOST = 'https://api.scrapfly.io' DEFAULT_CONNECT_TIMEOUT = 30 DEFAULT_READ_TIMEOUT = 160 # 155 real host:str key:str max_concurrency:int verify:bool debug:bool distributed_mode:bool connect_timeout:int read_timeout:int brotli: bool reporter:Reporter version:str CONCURRENCY_AUTO = 'auto' # retrieve the allowed concurrency from your account def __init__( self, key: str, host: Optional[str] = HOST, verify=True, debug: bool = False, max_concurrency:int=1, connect_timeout:int = DEFAULT_CONNECT_TIMEOUT, read_timeout:int = DEFAULT_READ_TIMEOUT, reporter:Optional[Callable]=None, **kwargs ): if host[-1] == '/': # remove last '/' if exists host = host[:-1] if 'distributed_mode' in kwargs: warnings.warn("distributed mode is deprecated and will be remove the next version -" " user should handle themself the session name based on the concurrency", DeprecationWarning, stacklevel=2 ) if 'brotli' in kwargs: warnings.warn("brotli arg is deprecated and will be remove the next version - " "brotli is disabled by default", DeprecationWarning, stacklevel=2 ) self.version = __version__ self.host = host self.key = key self.verify = verify self.debug = debug self.connect_timeout = connect_timeout self.read_timeout = read_timeout self.max_concurrency = max_concurrency self.body_handler = ResponseBodyHandler(use_brotli=False) self.async_executor = ThreadPoolExecutor() self.http_session = None if not self.verify and not self.HOST.endswith('.local'): urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) if self.debug is True: http.client.HTTPConnection.debuglevel = 5 if reporter is None: from .reporter import NoopReporter reporter = NoopReporter() self.reporter = Reporter(reporter) @property def ua(self) -> str: return 'ScrapflySDK/%s (Python %s, %s, %s)' % ( self.version, platform.python_version(), platform.uname().system, platform.uname().machine ) @cached_property def _http_handler(self): return partial(self.http_session.request if self.http_session else requests.request) @property def http(self): return self._http_handler def _scrape_request(self, scrape_config:ScrapeConfig): return { 'method': scrape_config.method, 'url': self.host + '/scrape', 'data': scrape_config.body, 'verify': self.verify, 'timeout': (self.connect_timeout, self.read_timeout), 'headers': { 'content-type': scrape_config.headers['content-type'] if scrape_config.method in ['POST', 'PUT', 'PATCH'] else self.body_handler.content_type, 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, 'params': scrape_config.to_api_params(key=self.key) } def account(self) -> Union[str, Dict]: response = self._http_handler( method='GET', url=self.host + '/account', params={'key': self.key}, verify=self.verify, headers={ 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, ) response.raise_for_status() if self.body_handler.support(response.headers): return self.body_handler(response.content) return response.content.decode('utf-8') def resilient_scrape( self, scrape_config:ScrapeConfig, retry_on_errors:Set[Exception]={ScrapflyError}, retry_on_status_code:Optional[List[int]]=None, tries: int = 5, delay: int = 20, ) -> ScrapeApiResponse: assert retry_on_errors is not None, 'Retry on error is None' assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()' @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay) def inner() -> ScrapeApiResponse: 
            try:
                return self.scrape(scrape_config=scrape_config)
            except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
                if retry_on_status_code is not None and e.api_response:
                    if e.api_response.upstream_status_code in retry_on_status_code:
                        raise e
                    else:
                        return e.api_response
                raise e

        return inner()

    def open(self):
        if self.http_session is None:
            self.http_session = Session()
            self.http_session.verify = self.verify
            self.http_session.timeout = (self.connect_timeout, self.read_timeout)
            self.http_session.params['key'] = self.key
            self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding
            self.http_session.headers['accept'] = self.body_handler.accept
            self.http_session.headers['user-agent'] = self.ua

    def close(self):
        self.http_session.close()
        self.http_session = None

    def __enter__(self) -> 'ScrapflyClient':
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
        if loop is None:
            loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config)

    async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
        if concurrency is None:
            concurrency = self.max_concurrency
        elif concurrency == self.CONCURRENCY_AUTO:
            concurrency = self.account()['subscription']['max_concurrency']

        loop = asyncio.get_running_loop()
        processing_tasks = []
        results = []
        processed_tasks = 0
        expected_tasks = len(scrape_configs)

        def scrape_done_callback(task:Task):
            nonlocal processed_tasks
            try:
                if task.cancelled() is True:
                    return
                error = task.exception()
                if error is not None:
                    results.append(error)
                else:
                    results.append(task.result())
            finally:
                processing_tasks.remove(task)
                processed_tasks += 1

        while scrape_configs or results or processing_tasks:
            logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

            if scrape_configs:
                if len(processing_tasks) < concurrency:
                    # @todo handle backpressure
                    for _ in range(0, concurrency - len(processing_tasks)):
                        try:
                            scrape_config = scrape_configs.pop()
                        except:
                            break

                        scrape_config.raise_on_upstream_error = False
                        task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                        processing_tasks.append(task)
                        task.add_done_callback(scrape_done_callback)

            for _ in results:
                result = results.pop()
                yield result

            await asyncio.sleep(.5)

        logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def scrape(self, scrape_config:ScrapeConfig) -> ScrapeApiResponse:
        try:
            logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
            request_data = self._scrape_request(scrape_config=scrape_config)
            response = self._http_handler(**request_data)
            scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config)
            self.reporter.report(scrape_api_response=scrape_api_response)
            return scrape_api_response
        except BaseException as e:
            self.reporter.report(error=e)
            raise e

    def _handle_response(self, response:Response, scrape_config:ScrapeConfig) -> ScrapeApiResponse:
        try:
            api_response = self._handle_api_response(
                response=response,
                scrape_config=scrape_config,
                raise_on_upstream_error=scrape_config.raise_on_upstream_error
            )

            if scrape_config.method == 'HEAD':
                logger.debug('<-- [%s %s] %s | %ss' % (
                    api_response.response.status_code,
                    api_response.response.reason,
                    api_response.response.request.url,
                    0
                ))
            else:
                logger.debug('<-- [%s %s] %s | %ss' % (
                    api_response.result['result']['status_code'],
                    api_response.result['result']['reason'],
                    api_response.result['config']['url'],
                    api_response.result['result']['duration'])
                )
                logger.debug('Log url: %s' % api_response.result['result']['log_url'])

            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except ScrapflyScrapeError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except HttpError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def save_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
        if not api_response.scrape_result['screenshots']:
            raise RuntimeError('Screenshot %s do no exists' % name)

        try:
            api_response.scrape_result['screenshots'][name]
        except KeyError:
            raise RuntimeError('Screenshot %s do no exists' % name)

        screenshot_response = self._http_handler(
            method='GET',
            url=api_response.scrape_result['screenshots'][name]['url'],
            params={'key': self.key},
            verify=self.verify
        )
        screenshot_response.raise_for_status()

        if not name.endswith('.jpg'):
            name += '.jpg'

        api_response.sink(path=path, name=name, content=screenshot_response.content)

    def screenshot(self, url:str, path:Optional[str]=None, name:Optional[str]=None) -> str:
        # for advance configuration, take screenshots via scrape method with ScrapeConfig
        api_response = self.scrape(scrape_config=ScrapeConfig(
            url=url,
            render_js=True,
            screenshots={'main': 'fullpage'}
        ))

        name = name or 'main.jpg'

        if not name.endswith('.jpg'):
            name += '.jpg'

        response = self._http_handler(
            method='GET',
            url=api_response.scrape_result['screenshots']['main']['url'],
            params={'key': self.key}
        )
        response.raise_for_status()

        return self.sink(api_response, path=path, name=name, content=response.content)

    def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
        scrape_result = api_response.result['result']
        scrape_config = api_response.result['config']

        file_content = content or scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                try:
                    mime_type = scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = scrape_config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path else name

            if file_path == file_extension:
                url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension
                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)

        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)
        return file_path

    def _handle_api_response(
        self,
        response: Response,
        scrape_config:ScrapeConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScrapeApiResponse:
        if scrape_config.method == 'HEAD':
            body = None
        else:
            if self.body_handler.support(headers=response.headers):
                body = self.body_handler(response.content)
            else:
                body = response.content.decode('utf-8')

        api_response:ScrapeApiResponse = ScrapeApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            scrape_config=scrape_config
        )

        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return api_response
Class variables
var CONCURRENCY_AUTO
var DEFAULT_CONNECT_TIMEOUT
var DEFAULT_READ_TIMEOUT
var HOST
var brotli : bool
var connect_timeout : int
var debug : bool
var distributed_mode : bool
var host : str
var key : str
var max_concurrency : int
var read_timeout : int
var reporter : scrapfly.reporter.Reporter
var verify : bool
var version : str
Instance variables
var http
-
Expand source code
@property
def http(self):
    return self._http_handler
var ua : str
-
Expand source code
@property
def ua(self) -> str:
    return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
        self.version,
        platform.python_version(),
        platform.uname().system,
        platform.uname().machine
    )
Methods
def account(self) ‑> Union[str, Dict]
-
Expand source code
def account(self) -> Union[str, Dict]:
    response = self._http_handler(
        method='GET',
        url=self.host + '/account',
        params={'key': self.key},
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )

    response.raise_for_status()

    if self.body_handler.support(response.headers):
        return self.body_handler(response.content)

    return response.content.decode('utf-8')
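For orientation, a minimal usage sketch. The API key is a placeholder, and constructing the client with key=... is an assumption based on the key attribute listed above; the printed key path follows the subscription structure referenced by concurrent_scrape.

from scrapfly import ScrapflyClient

client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key, not a real one
account_info = client.account()
# subscription details as returned by the /account endpoint
print(account_info['subscription']['max_concurrency'])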
async def async_scrape(self, scrape_config: ScrapeConfig, loop: Optional[asyncio.events.AbstractEventLoop] = None) ‑> ScrapeApiResponse
-
Expand source code
async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
    if loop is None:
        loop = asyncio.get_running_loop()
    return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config)
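A hedged sketch of driving async_scrape from asyncio; the key and URL are placeholders. Under the hood the blocking scrape() call runs in the client's executor, so this integrates with an existing event loop.

import asyncio
from scrapfly import ScrapflyClient, ScrapeConfig

async def main():
    client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key
    api_response = await client.async_scrape(ScrapeConfig(url='https://httpbin.org/html'))
    print(api_response.result['result']['status_code'])

asyncio.run(main())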
def close(self)
-
Expand source code
def close(self):
    self.http_session.close()
    self.http_session = None
async def concurrent_scrape(self, scrape_configs: List[ScrapeConfig], concurrency: Optional[int] = None)
-
Expand source code
async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
    if concurrency is None:
        concurrency = self.max_concurrency
    elif concurrency == self.CONCURRENCY_AUTO:
        concurrency = self.account()['subscription']['max_concurrency']

    loop = asyncio.get_running_loop()
    processing_tasks = []
    results = []
    processed_tasks = 0
    expected_tasks = len(scrape_configs)

    def scrape_done_callback(task:Task):
        nonlocal processed_tasks
        try:
            if task.cancelled() is True:
                return
            error = task.exception()
            if error is not None:
                results.append(error)
            else:
                results.append(task.result())
        finally:
            processing_tasks.remove(task)
            processed_tasks += 1

    while scrape_configs or results or processing_tasks:
        logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

        if scrape_configs:
            if len(processing_tasks) < concurrency:
                # @todo handle backpressure
                for _ in range(0, concurrency - len(processing_tasks)):
                    try:
                        scrape_config = scrape_configs.pop()
                    except:
                        break

                    scrape_config.raise_on_upstream_error = False
                    task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                    processing_tasks.append(task)
                    task.add_done_callback(scrape_done_callback)

        for _ in results:
            result = results.pop()
            yield result

        await asyncio.sleep(.5)

    logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))
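A hedged sketch of consuming this async generator; the key, URL, and concurrency value are placeholders. Note that failed scrapes are yielded as exception objects rather than raised, and that the method pops from the list you pass in.

import asyncio
from scrapfly import ScrapflyClient, ScrapeConfig

async def main():
    client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key
    configs = [ScrapeConfig(url='https://httpbin.org/html') for _ in range(5)]

    # results are yielded as tasks complete, in no guaranteed order
    async for result in client.concurrent_scrape(configs, concurrency=2):
        if isinstance(result, Exception):
            print('scrape failed:', result)
        else:
            print('scraped:', result.result['config']['url'])

asyncio.run(main())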
def open(self)
-
Expand source code
def open(self):
    if self.http_session is None:
        self.http_session = Session()
        self.http_session.verify = self.verify
        self.http_session.timeout = (self.connect_timeout, self.read_timeout)
        self.http_session.params['key'] = self.key
        self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding
        self.http_session.headers['accept'] = self.body_handler.accept
        self.http_session.headers['user-agent'] = self.ua
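Since __enter__ and __exit__ call open() and close() (see the class source above), the client can be used as a context manager so the session is opened and torn down automatically. A minimal sketch with a placeholder key and URL:

from scrapfly import ScrapflyClient, ScrapeConfig

with ScrapflyClient(key='YOUR-API-KEY') as client:  # placeholder key
    api_response = client.scrape(ScrapeConfig(url='https://httpbin.org/html'))
# the underlying requests.Session is closed when the block exits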
def resilient_scrape(self, scrape_config: ScrapeConfig, retry_on_errors: Set[Exception] = {ScrapflyError}, retry_on_status_code: Optional[List[int]] = None, tries: int = 5, delay: int = 20) ‑> ScrapeApiResponse
-
Expand source code
def resilient_scrape(
    self,
    scrape_config:ScrapeConfig,
    retry_on_errors:Set[Exception]={ScrapflyError},
    retry_on_status_code:Optional[List[int]]=None,
    tries: int = 5,
    delay: int = 20,
) -> ScrapeApiResponse:
    assert retry_on_errors is not None, 'Retry on error is None'
    assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

    @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)
    def inner() -> ScrapeApiResponse:
        try:
            return self.scrape(scrape_config=scrape_config)
        except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
            if retry_on_status_code is not None and e.api_response:
                if e.api_response.upstream_status_code in retry_on_status_code:
                    raise e
                else:
                    return e.api_response
            raise e

    return inner()
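A hedged sketch; the key, URL, error set, and status codes are illustrative choices. retry_on_status_code lists upstream statuses that should trigger a retry; an upstream error with any other status is returned as its api_response instead of raised.

from scrapfly import ScrapflyClient, ScrapeConfig, ScrapflyScrapeError

client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key

api_response = client.resilient_scrape(
    scrape_config=ScrapeConfig(url='https://httpbin.org/html'),
    retry_on_errors={ScrapflyScrapeError},   # exceptions that trigger exponential backoff
    retry_on_status_code=[502, 503],         # upstream statuses worth retrying
    tries=3,
)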
def save_screenshot(self, api_response: ScrapeApiResponse, name: str, path: Optional[str] = None)
-
Expand source code
def save_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
    if not api_response.scrape_result['screenshots']:
        raise RuntimeError('Screenshot %s do no exists' % name)

    try:
        api_response.scrape_result['screenshots'][name]
    except KeyError:
        raise RuntimeError('Screenshot %s do no exists' % name)

    screenshot_response = self._http_handler(
        method='GET',
        url=api_response.scrape_result['screenshots'][name]['url'],
        params={'key': self.key},
        verify=self.verify
    )
    screenshot_response.raise_for_status()

    if not name.endswith('.jpg'):
        name += '.jpg'

    api_response.sink(path=path, name=name, content=screenshot_response.content)
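A hedged sketch assuming a scrape that requested a 'main' full-page screenshot; the key, URL, and screenshots/ directory are placeholders, and the final write goes through ScrapeApiResponse.sink (not shown in this listing).

from scrapfly import ScrapflyClient, ScrapeConfig

client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key
api_response = client.scrape(ScrapeConfig(
    url='https://httpbin.org/html',
    render_js=True,
    screenshots={'main': 'fullpage'},
))
# downloads the captured screenshot and stores it as main.jpg
client.save_screenshot(api_response, name='main', path='screenshots')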
def scrape(self, scrape_config: ScrapeConfig) ‑> ScrapeApiResponse
-
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def scrape(self, scrape_config:ScrapeConfig) -> ScrapeApiResponse:
    try:
        logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
        request_data = self._scrape_request(scrape_config=scrape_config)
        response = self._http_handler(**request_data)
        scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config)
        self.reporter.report(scrape_api_response=scrape_api_response)
        return scrape_api_response
    except BaseException as e:
        self.reporter.report(error=e)
        raise e
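A minimal usage sketch; the key and URL are placeholders. The content and status code accessors follow the result structure used by _handle_response and sink above.

from scrapfly import ScrapflyClient, ScrapeConfig

client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key
api_response = client.scrape(ScrapeConfig(url='https://httpbin.org/html'))
print(api_response.result['result']['status_code'])  # upstream status code
print(api_response.scrape_result['content'][:200])   # upstream page body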
def screenshot(self, url: str, path: Optional[str] = None, name: Optional[str] = None) ‑> str
-
Expand source code
def screenshot(self, url:str, path:Optional[str]=None, name:Optional[str]=None) -> str:
    # for advance configuration, take screenshots via scrape method with ScrapeConfig
    api_response = self.scrape(scrape_config=ScrapeConfig(
        url=url,
        render_js=True,
        screenshots={'main': 'fullpage'}
    ))

    name = name or 'main.jpg'

    if not name.endswith('.jpg'):
        name += '.jpg'

    response = self._http_handler(
        method='GET',
        url=api_response.scrape_result['screenshots']['main']['url'],
        params={'key': self.key}
    )
    response.raise_for_status()

    return self.sink(api_response, path=path, name=name, content=response.content)
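A hedged sketch; the key, URL, path, and name are placeholders, and the target directory is assumed to already exist since sink opens the file directly.

from scrapfly import ScrapflyClient

client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key
file_path = client.screenshot(url='https://httpbin.org/html', path='screenshots', name='httpbin')
print(file_path)  # e.g. screenshots/httpbin.jpg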
def sink(self, api_response: ScrapeApiResponse, content: Optional[Union[str, bytes]] = None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) ‑> str
-
Expand source code
def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
    scrape_result = api_response.result['result']
    scrape_config = api_response.result['config']

    file_content = content or scrape_result['content']
    file_path = None
    file_extension = None

    if name:
        name_parts = name.split('.')
        if len(name_parts) > 1:
            file_extension = name_parts[-1]

    if not file:
        if file_extension is None:
            try:
                mime_type = scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            if ';' in mime_type:
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            name = scrape_config['url'].split('/')[-1]

        if name.find(file_extension) == -1:
            name += file_extension

        file_path = path + '/' + name if path else name

        if file_path == file_extension:
            url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

            if url[-1] == '-':
                url = url[:-1]

            url += file_extension
            file_path = url

        file = open(file_path, 'wb')

    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)

    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    logger.info('file %s created' % file_path)
    return file_path
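A hedged sketch; the key, URL, and output directory are placeholders, the directory is assumed to already exist, and the file extension is derived from the response content-type as shown above.

from scrapfly import ScrapflyClient, ScrapeConfig

client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key
api_response = client.scrape(ScrapeConfig(url='https://httpbin.org/html'))

# writes the scraped body to disk; a name without an extension gets one from content-type
file_path = client.sink(api_response, path='output', name='page')
print(file_path)  # e.g. output/page.html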
class ScrapflyError (message: str, code: str, http_status_code: int, resource: Optional[str] = None, is_retryable: bool = False, retry_delay: Optional[int] = None, retry_times: Optional[int] = None, documentation_url: Optional[str] = None, api_response: Optional['ApiResponse'] = None)
-
Common base class for all non-exit exceptions.
Expand source code
class ScrapflyError(Exception):
    KIND_HTTP_BAD_RESPONSE = 'HTTP_BAD_RESPONSE'
    KIND_SCRAPFLY_ERROR = 'SCRAPFLY_ERROR'

    RESOURCE_PROXY = 'PROXY'
    RESOURCE_THROTTLE = 'THROTTLE'
    RESOURCE_SCRAPE = 'SCRAPE'
    RESOURCE_ASP = 'ASP'
    RESOURCE_SCHEDULE = 'SCHEDULE'
    RESOURCE_WEBHOOK = 'WEBHOOK'
    RESOURCE_SESSION = 'SESSION'

    def __init__(
        self,
        message: str,
        code: str,
        http_status_code: int,
        resource: Optional[str]=None,
        is_retryable: bool = False,
        retry_delay: Optional[int] = None,
        retry_times: Optional[int] = None,
        documentation_url: Optional[str] = None,
        api_response: Optional['ApiResponse'] = None
    ):
        self.message = message
        self.code = code
        self.retry_delay = retry_delay
        self.retry_times = retry_times
        self.resource = resource
        self.is_retryable = is_retryable
        self.documentation_url = documentation_url
        self.api_response = api_response
        self.http_status_code = http_status_code

        super().__init__(self.message, str(self.code))

    def __str__(self):
        message = self.message

        if self.documentation_url is not None:
            message += '. Learn more: %s' % self.documentation_url

        return message
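A hedged sketch of catching the base exception and inspecting the retry metadata set in __init__; the key and URL are placeholders.

import time
from scrapfly import ScrapflyClient, ScrapeConfig, ScrapflyError

client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key

try:
    api_response = client.scrape(ScrapeConfig(url='https://httpbin.org/html'))
except ScrapflyError as e:
    print(e.code, e.http_status_code, e.documentation_url)
    if e.is_retryable and e.retry_delay:
        time.sleep(e.retry_delay)  # honor the suggested delay before retrying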
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses
- scrapfly.errors.ExtraUsageForbidden
- scrapfly.errors.HttpError
Class variables
var KIND_HTTP_BAD_RESPONSE
var KIND_SCRAPFLY_ERROR
var RESOURCE_ASP
var RESOURCE_PROXY
var RESOURCE_SCHEDULE
var RESOURCE_SCRAPE
var RESOURCE_SESSION
var RESOURCE_THROTTLE
var RESOURCE_WEBHOOK
class ScrapflyProxyError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ScrapflyProxyError(ScraperAPIError): pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyScheduleError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ScrapflyScheduleError(ScraperAPIError): pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyScrapeError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ScrapflyScrapeError(ScraperAPIError): pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflySessionError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ScrapflySessionError(ScraperAPIError): pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyThrottleError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ScrapflyThrottleError(ScraperAPIError): pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyWebhookError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ScrapflyWebhookError(ScraperAPIError): pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class UpstreamHttpClientError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class UpstreamHttpClientError(UpstreamHttpError): pass
Ancestors
- scrapfly.errors.UpstreamHttpError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- UpstreamHttpServerError
class UpstreamHttpError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class UpstreamHttpError(HttpError): pass
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- UpstreamHttpClientError
class UpstreamHttpServerError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class UpstreamHttpServerError(UpstreamHttpClientError): pass
Ancestors
- UpstreamHttpClientError
- scrapfly.errors.UpstreamHttpError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
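A hedged sketch of distinguishing upstream 4xx from 5xx failures; because UpstreamHttpServerError subclasses UpstreamHttpClientError (see the ancestors above), the server-error clause must come first. The key and URL are placeholders.

from scrapfly import ScrapflyClient, ScrapeConfig, UpstreamHttpClientError, UpstreamHttpServerError

client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key

try:
    api_response = client.scrape(ScrapeConfig(url='https://httpbin.org/status/503'))
except UpstreamHttpServerError as e:
    # the target website answered with a 5xx status code
    print('upstream server error:', e)
except UpstreamHttpClientError as e:
    # the target website answered with a 4xx status code
    print('upstream client error:', e)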