Package scrapfly
Sub-modules
scrapfly.api_config
scrapfly.api_response
scrapfly.client
scrapfly.errors
scrapfly.extraction_config
scrapfly.frozen_dict
scrapfly.polyfill
scrapfly.reporter
scrapfly.scrape_config
scrapfly.scrapy
scrapfly.screenshot_config
scrapfly.webhook
Classes
class ApiHttpClientError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)

    class ApiHttpClientError(HttpError):
        pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- ApiHttpServerError
- scrapfly.errors.BadApiKeyError
- scrapfly.errors.PaymentRequired
- scrapfly.errors.TooManyRequest
Inherited members
class ApiHttpServerError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)

    class ApiHttpServerError(ApiHttpClientError):
        pass
Common base class for all non-exit exceptions.
Ancestors
- ApiHttpClientError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Inherited members
class EncoderError (content: str)
    class EncoderError(BaseException):
        def __init__(self, content: str):
            self.content = content
            super().__init__()

        def __str__(self) -> str:
            return self.content

        def __repr__(self):
            return "Invalid payload: %s" % self.content

Common base class for all exceptions.
Ancestors
- builtins.BaseException
class ErrorFactory
    class ErrorFactory:
        RESOURCE_TO_ERROR = {
            ScrapflyError.RESOURCE_SCRAPE: ScrapflyScrapeError,
            ScrapflyError.RESOURCE_WEBHOOK: ScrapflyWebhookError,
            ScrapflyError.RESOURCE_PROXY: ScrapflyProxyError,
            ScrapflyError.RESOURCE_SCHEDULE: ScrapflyScheduleError,
            ScrapflyError.RESOURCE_ASP: ScrapflyAspError,
            ScrapflyError.RESOURCE_SESSION: ScrapflySessionError
        }

        # Notable HTTP errors get their own class for convenience.
        # Only applicable to generic API errors.
        HTTP_STATUS_TO_ERROR = {
            401: BadApiKeyError,
            402: PaymentRequired,
            429: TooManyRequest
        }

        @staticmethod
        def _get_resource(code: str) -> Optional[str]:
            if isinstance(code, str) and '::' in code:
                _, resource, _ = code.split('::')
                return resource
            return None

        @staticmethod
        def create(api_response: 'ScrapeApiResponse'):
            is_retryable = False
            kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
            http_code = api_response.status_code
            retry_delay = 5
            retry_times = 3
            description = None
            error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
            code = api_response.error['code']

            if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
                http_code = api_response.scrape_result['status_code']

            if 'description' in api_response.error:
                description = api_response.error['description']

            message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

            if 'doc_url' in api_response.error:
                error_url = api_response.error['doc_url']

            if 'retryable' in api_response.error:
                is_retryable = api_response.error['retryable']

            resource = ErrorFactory._get_resource(code=code)

            if is_retryable is True:
                if 'X-Retry' in api_response.headers:
                    retry_delay = int(api_response.headers['Retry-After'])

            message = '%s: %s' % (message, description) if description else message

            if retry_delay is not None and is_retryable is True:
                message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

            args = {
                'message': message,
                'code': code,
                'http_status_code': http_code,
                'is_retryable': is_retryable,
                'api_response': api_response,
                'resource': resource,
                'retry_delay': retry_delay,
                'retry_times': retry_times,
                'documentation_url': error_url,
                'request': api_response.request,
                'response': api_response.response
            }

            if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
                if http_code >= 500:
                    return ApiHttpServerError(**args)

                is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

                if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
                    return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

                if is_scraper_api_error:
                    return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

                return ApiHttpClientError(**args)

            elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
                if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
                    if http_code >= 500:
                        return UpstreamHttpServerError(**args)
                    if http_code >= 400:
                        return UpstreamHttpClientError(**args)

                if resource in ErrorFactory.RESOURCE_TO_ERROR:
                    return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

                return ScrapflyError(**args)
Class variables
var HTTP_STATUS_TO_ERROR
var RESOURCE_TO_ERROR
Static methods
def create(api_response: ScrapeApiResponse)
    @staticmethod
    def create(api_response: 'ScrapeApiResponse'):
        is_retryable = False
        kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
        http_code = api_response.status_code
        retry_delay = 5
        retry_times = 3
        description = None
        error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
        code = api_response.error['code']

        if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
            http_code = api_response.scrape_result['status_code']

        if 'description' in api_response.error:
            description = api_response.error['description']

        message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

        if 'doc_url' in api_response.error:
            error_url = api_response.error['doc_url']

        if 'retryable' in api_response.error:
            is_retryable = api_response.error['retryable']

        resource = ErrorFactory._get_resource(code=code)

        if is_retryable is True:
            if 'X-Retry' in api_response.headers:
                retry_delay = int(api_response.headers['Retry-After'])

        message = '%s: %s' % (message, description) if description else message

        if retry_delay is not None and is_retryable is True:
            message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

        args = {
            'message': message,
            'code': code,
            'http_status_code': http_code,
            'is_retryable': is_retryable,
            'api_response': api_response,
            'resource': resource,
            'retry_delay': retry_delay,
            'retry_times': retry_times,
            'documentation_url': error_url,
            'request': api_response.request,
            'response': api_response.response
        }

        if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
            if http_code >= 500:
                return ApiHttpServerError(**args)

            is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

            if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
                return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

            if is_scraper_api_error:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ApiHttpClientError(**args)

        elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
            if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
                if http_code >= 500:
                    return UpstreamHttpServerError(**args)
                if http_code >= 400:
                    return UpstreamHttpClientError(**args)

            if resource in ErrorFactory.RESOURCE_TO_ERROR:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ScrapflyError(**args)
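In practice the SDK calls create() when an API response carries an error payload, and raise_for_result() raises whatever it returns. A minimal handling sketch (api_response stands for any ScrapeApiResponse; the attribute names mirror the args dict built above, which is passed as keyword arguments to the error classes):

    from scrapfly.errors import ScrapflyError, UpstreamHttpServerError

    try:
        api_response.raise_for_result()  # raises the exception built by ErrorFactory.create()
    except UpstreamHttpServerError as error:
        print(error)  # the scraped website itself answered with a 5xx status
    except ScrapflyError as error:
        if error.is_retryable:  # retry hints attached from the args dict above
            print('retry in %s seconds' % error.retry_delay)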
class ExtractionAPIError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)

    class ExtractionAPIError(HttpError):
        pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Inherited members
class ExtractionApiResponse (request: requests.models.Request,
response: requests.models.Response,
extraction_config: ExtractionConfig,
api_result: bytes | None = None)

    class ExtractionApiResponse(ApiResponse):
        def __init__(self, request: Request, response: Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None):
            super().__init__(request, response)
            self.extraction_config = extraction_config
            self.result = self.handle_api_result(api_result)

        @property
        def extraction_result(self) -> Optional[Dict]:
            extraction_result = self.result.get('result', None)
            if not extraction_result:  # handle empty extraction responses
                return {'data': None, 'content_type': None}
            else:
                return extraction_result

        @property
        def data(self) -> Union[Dict, List, str]:  # depends on the LLM prompt
            if self.error is None:
                return self.extraction_result['data']
            return None

        @property
        def content_type(self) -> Optional[str]:
            if self.error is None:
                return self.extraction_result['content_type']
            return None

        @property
        def extraction_success(self) -> bool:
            extraction_result = self.extraction_result
            if extraction_result is None or extraction_result['data'] is None:
                return False
            return True

        @property
        def error(self) -> Optional[Dict]:
            if self.extraction_result is None:
                return self.result
            return None

        def _is_api_error(self, api_result: Dict) -> bool:
            if api_result is None:
                return True
            return 'error_id' in api_result

        def handle_api_result(self, api_result: bytes) -> FrozenDict:
            if self._is_api_error(api_result=api_result) is True:
                return FrozenDict(api_result)
            return FrozenDict({'result': api_result})

        def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
            super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
Ancestors
- ApiResponse
Instance variables
prop content_type : str | None
    @property
    def content_type(self) -> Optional[str]:
        if self.error is None:
            return self.extraction_result['content_type']
        return None

prop data : Dict | List | str

    @property
    def data(self) -> Union[Dict, List, str]:  # depends on the LLM prompt
        if self.error is None:
            return self.extraction_result['data']
        return None

prop error : Dict | None

    @property
    def error(self) -> Optional[Dict]:
        if self.extraction_result is None:
            return self.result
        return None

prop extraction_result : Dict | None

    @property
    def extraction_result(self) -> Optional[Dict]:
        extraction_result = self.result.get('result', None)
        if not extraction_result:  # handle empty extraction responses
            return {'data': None, 'content_type': None}
        else:
            return extraction_result

prop extraction_success : bool

    @property
    def extraction_success(self) -> bool:
        extraction_result = self.extraction_result
        if extraction_result is None or extraction_result['data'] is None:
            return False
        return True
Methods
def handle_api_result(self, api_result: bytes) ‑> FrozenDict
    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)
        return FrozenDict({'result': api_result})
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ExtractionAPIError)

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
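A short usage sketch, assuming an ExtractionApiResponse obtained from the extraction API (the client call producing it is out of scope here):

    api_response.raise_for_result()       # raises ExtractionAPIError on failure
    if api_response.extraction_success:
        print(api_response.content_type)  # e.g. 'application/json'
        print(api_response.data)          # Dict, List or str, depending on prompt/template
    else:
        print(api_response.error)         # the raw API error payload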
Inherited members
class ExtractionConfig (body: str | bytes,
content_type: str,
url: str | None = None,
charset: str | None = None,
extraction_template: str | None = None,
extraction_ephemeral_template: Dict | None = None,
extraction_prompt: str | None = None,
extraction_model: str | None = None,
is_document_compressed: bool | None = None,
document_compression_format: CompressionFormat | None = None,
webhook: str | None = None,
raise_on_upstream_error: bool = True,
template: str | None = None,
ephemeral_template: Dict | None = None)

    class ExtractionConfig(BaseApiConfig):
        body: Union[str, bytes]
        content_type: str
        url: Optional[str] = None
        charset: Optional[str] = None
        extraction_template: Optional[str] = None  # a saved template name
        extraction_ephemeral_template: Optional[Dict] = None  # ephemerally declared json template
        extraction_prompt: Optional[str] = None
        extraction_model: Optional[str] = None
        is_document_compressed: Optional[bool] = None
        document_compression_format: Optional[CompressionFormat] = None
        webhook: Optional[str] = None
        raise_on_upstream_error: bool = True

        # deprecated options
        template: Optional[str] = None
        ephemeral_template: Optional[Dict] = None

        def __init__(
            self,
            body: Union[str, bytes],
            content_type: str,
            url: Optional[str] = None,
            charset: Optional[str] = None,
            extraction_template: Optional[str] = None,  # a saved template name
            extraction_ephemeral_template: Optional[Dict] = None,  # ephemerally declared json template
            extraction_prompt: Optional[str] = None,
            extraction_model: Optional[str] = None,
            is_document_compressed: Optional[bool] = None,
            document_compression_format: Optional[CompressionFormat] = None,
            webhook: Optional[str] = None,
            raise_on_upstream_error: bool = True,
            # deprecated options
            template: Optional[str] = None,
            ephemeral_template: Optional[Dict] = None
        ):
            if template:
                warnings.warn(
                    "Deprecation warning: 'template' is deprecated. Use 'extraction_template' instead."
                )
                extraction_template = template

            if ephemeral_template:
                warnings.warn(
                    "Deprecation warning: 'ephemeral_template' is deprecated. Use 'extraction_ephemeral_template' instead."
                )
                extraction_ephemeral_template = ephemeral_template

            self.key = None
            self.body = body
            self.content_type = content_type
            self.url = url
            self.charset = charset
            self.extraction_template = extraction_template
            self.extraction_ephemeral_template = extraction_ephemeral_template
            self.extraction_prompt = extraction_prompt
            self.extraction_model = extraction_model
            self.is_document_compressed = is_document_compressed
            self.document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None
            self.webhook = webhook
            self.raise_on_upstream_error = raise_on_upstream_error

            if isinstance(body, bytes) or document_compression_format:
                compression_format = detect_compression_format(body)

                if compression_format is not None:
                    self.is_document_compressed = True

                    if self.document_compression_format and compression_format != self.document_compression_format:
                        raise ExtractionConfigError(
                            f'The detected compression format `{compression_format}` does not match declared format `{self.document_compression_format}`. '
                            f'You must pass the compression format or disable compression.'
                        )

                    self.document_compression_format = compression_format
                else:
                    self.is_document_compressed = False

                if self.is_document_compressed is False:
                    compression_format = CompressionFormat(self.document_compression_format) if self.document_compression_format else None

                    if isinstance(self.body, str) and compression_format:
                        self.body = self.body.encode('utf-8')

                    if compression_format == CompressionFormat.GZIP:
                        import gzip
                        self.body = gzip.compress(self.body)
                    elif compression_format == CompressionFormat.ZSTD:
                        try:
                            import zstandard as zstd
                        except ImportError:
                            raise ExtractionConfigError(
                                f'zstandard is not installed. You must run pip install zstandard'
                                f' to auto compress into zstd or use compression formats.'
                            )
                        self.body = zstd.compress(self.body)
                    elif compression_format == CompressionFormat.DEFLATE:
                        import zlib
                        compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS)  # raw deflate compression
                        self.body = compressor.compress(self.body) + compressor.flush()

        def to_api_params(self, key: str) -> Dict:
            params = {
                'key': self.key or key,
                'content_type': self.content_type
            }

            if self.url:
                params['url'] = self.url

            if self.charset:
                params['charset'] = self.charset

            if self.extraction_template and self.extraction_ephemeral_template:
                raise ExtractionConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

            if self.extraction_template:
                params['extraction_template'] = self.extraction_template

            if self.extraction_ephemeral_template:
                self.extraction_ephemeral_template = json.dumps(self.extraction_ephemeral_template)
                params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(self.extraction_ephemeral_template.encode('utf-8')).decode('utf-8')

            if self.extraction_prompt:
                params['extraction_prompt'] = quote_plus(self.extraction_prompt)

            if self.extraction_model:
                params['extraction_model'] = self.extraction_model

            if self.webhook:
                params['webhook_name'] = self.webhook

            return params

        def to_dict(self) -> Dict:
            """
            Export the ExtractionConfig instance to a plain dictionary.
            """
            if self.is_document_compressed is True:
                compression_format = CompressionFormat(self.document_compression_format) if self.document_compression_format else None

                if compression_format == CompressionFormat.GZIP:
                    import gzip
                    self.body = gzip.decompress(self.body)
                elif compression_format == CompressionFormat.ZSTD:
                    import zstandard as zstd
                    self.body = zstd.decompress(self.body)
                elif compression_format == CompressionFormat.DEFLATE:
                    import zlib
                    decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
                    self.body = decompressor.decompress(self.body) + decompressor.flush()

                if isinstance(self.body, bytes):
                    self.body = self.body.decode('utf-8')

                self.is_document_compressed = False

            return {
                'body': self.body,
                'content_type': self.content_type,
                'url': self.url,
                'charset': self.charset,
                'extraction_template': self.extraction_template,
                'extraction_ephemeral_template': self.extraction_ephemeral_template,
                'extraction_prompt': self.extraction_prompt,
                'extraction_model': self.extraction_model,
                'is_document_compressed': self.is_document_compressed,
                'document_compression_format': CompressionFormat(self.document_compression_format).value if self.document_compression_format else None,
                'webhook': self.webhook,
                'raise_on_upstream_error': self.raise_on_upstream_error,
            }

        @staticmethod
        def from_dict(extraction_config_dict: Dict) -> 'ExtractionConfig':
            """Create an ExtractionConfig instance from a dictionary."""
            body = extraction_config_dict.get('body', None)
            content_type = extraction_config_dict.get('content_type', None)
            url = extraction_config_dict.get('url', None)
            charset = extraction_config_dict.get('charset', None)
            extraction_template = extraction_config_dict.get('extraction_template', None)
            extraction_ephemeral_template = extraction_config_dict.get('extraction_ephemeral_template', None)
            extraction_prompt = extraction_config_dict.get('extraction_prompt', None)
            extraction_model = extraction_config_dict.get('extraction_model', None)
            is_document_compressed = extraction_config_dict.get('is_document_compressed', None)
            document_compression_format = extraction_config_dict.get('document_compression_format', None)
            document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None
            webhook = extraction_config_dict.get('webhook', None)
            raise_on_upstream_error = extraction_config_dict.get('raise_on_upstream_error', True)

            return ExtractionConfig(
                body=body,
                content_type=content_type,
                url=url,
                charset=charset,
                extraction_template=extraction_template,
                extraction_ephemeral_template=extraction_ephemeral_template,
                extraction_prompt=extraction_prompt,
                extraction_model=extraction_model,
                is_document_compressed=is_document_compressed,
                document_compression_format=document_compression_format,
                webhook=webhook,
                raise_on_upstream_error=raise_on_upstream_error
            )
Ancestors
- BaseApiConfig
Class variables
var body : str | bytes
var charset : str | None
var content_type : str
var document_compression_format : CompressionFormat | None
var ephemeral_template : Dict | None
var extraction_ephemeral_template : Dict | None
var extraction_model : str | None
var extraction_prompt : str | None
var extraction_template : str | None
var is_document_compressed : bool | None
var raise_on_upstream_error : bool
var template : str | None
var url : str | None
var webhook : str | None
Static methods
def from_dict(extraction_config_dict: Dict) ‑> ExtractionConfig
    @staticmethod
    def from_dict(extraction_config_dict: Dict) -> 'ExtractionConfig':
        """Create an ExtractionConfig instance from a dictionary."""
        body = extraction_config_dict.get('body', None)
        content_type = extraction_config_dict.get('content_type', None)
        url = extraction_config_dict.get('url', None)
        charset = extraction_config_dict.get('charset', None)
        extraction_template = extraction_config_dict.get('extraction_template', None)
        extraction_ephemeral_template = extraction_config_dict.get('extraction_ephemeral_template', None)
        extraction_prompt = extraction_config_dict.get('extraction_prompt', None)
        extraction_model = extraction_config_dict.get('extraction_model', None)
        is_document_compressed = extraction_config_dict.get('is_document_compressed', None)
        document_compression_format = extraction_config_dict.get('document_compression_format', None)
        document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None
        webhook = extraction_config_dict.get('webhook', None)
        raise_on_upstream_error = extraction_config_dict.get('raise_on_upstream_error', True)

        return ExtractionConfig(
            body=body,
            content_type=content_type,
            url=url,
            charset=charset,
            extraction_template=extraction_template,
            extraction_ephemeral_template=extraction_ephemeral_template,
            extraction_prompt=extraction_prompt,
            extraction_model=extraction_model,
            is_document_compressed=is_document_compressed,
            document_compression_format=document_compression_format,
            webhook=webhook,
            raise_on_upstream_error=raise_on_upstream_error
        )
Create an ExtractionConfig instance from a dictionary.
Methods
def to_api_params(self, key: str) ‑> Dict
    def to_api_params(self, key: str) -> Dict:
        params = {
            'key': self.key or key,
            'content_type': self.content_type
        }

        if self.url:
            params['url'] = self.url

        if self.charset:
            params['charset'] = self.charset

        if self.extraction_template and self.extraction_ephemeral_template:
            raise ExtractionConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

        if self.extraction_template:
            params['extraction_template'] = self.extraction_template

        if self.extraction_ephemeral_template:
            self.extraction_ephemeral_template = json.dumps(self.extraction_ephemeral_template)
            params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(self.extraction_ephemeral_template.encode('utf-8')).decode('utf-8')

        if self.extraction_prompt:
            params['extraction_prompt'] = quote_plus(self.extraction_prompt)

        if self.extraction_model:
            params['extraction_model'] = self.extraction_model

        if self.webhook:
            params['webhook_name'] = self.webhook

        return params
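Note the ephemeral template encoding: the dict is JSON-serialized, urlsafe-base64 encoded and prefixed with 'ephemeral:'. A sketch (the template body shown is purely illustrative, not a documented schema):

    config = ExtractionConfig(
        body='<html><h1>Product</h1></html>',
        content_type='text/html',
        extraction_ephemeral_template={'selectors': []},  # hypothetical template content
    )
    params = config.to_api_params(key='YOUR_API_KEY')
    # params['extraction_template'] -> 'ephemeral:eyJzZWxlY3RvcnMiOiBbXX0='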
def to_dict(self) ‑> Dict
    def to_dict(self) -> Dict:
        """
        Export the ExtractionConfig instance to a plain dictionary.
        """
        if self.is_document_compressed is True:
            compression_format = CompressionFormat(self.document_compression_format) if self.document_compression_format else None

            if compression_format == CompressionFormat.GZIP:
                import gzip
                self.body = gzip.decompress(self.body)
            elif compression_format == CompressionFormat.ZSTD:
                import zstandard as zstd
                self.body = zstd.decompress(self.body)
            elif compression_format == CompressionFormat.DEFLATE:
                import zlib
                decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
                self.body = decompressor.decompress(self.body) + decompressor.flush()

            if isinstance(self.body, bytes):
                self.body = self.body.decode('utf-8')

            self.is_document_compressed = False

        return {
            'body': self.body,
            'content_type': self.content_type,
            'url': self.url,
            'charset': self.charset,
            'extraction_template': self.extraction_template,
            'extraction_ephemeral_template': self.extraction_ephemeral_template,
            'extraction_prompt': self.extraction_prompt,
            'extraction_model': self.extraction_model,
            'is_document_compressed': self.is_document_compressed,
            'document_compression_format': CompressionFormat(self.document_compression_format).value if self.document_compression_format else None,
            'webhook': self.webhook,
            'raise_on_upstream_error': self.raise_on_upstream_error,
        }
Export the ExtractionConfig instance to a plain dictionary.
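Together with from_dict() this makes configs serializable, e.g. for queuing or persistence. A minimal round-trip sketch:

    config = ExtractionConfig(body='<html></html>', content_type='text/html')
    exported = config.to_dict()                     # note: decompresses body back to str
    restored = ExtractionConfig.from_dict(exported)
    assert restored.content_type == config.content_type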
class HttpError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)

    class HttpError(ScrapflyError):
        def __init__(self, request: Request, response: Optional[Response] = None, **kwargs):
            self.request = request
            self.response = response
            super().__init__(**kwargs)

        def __str__(self) -> str:
            if isinstance(self, UpstreamHttpError):
                return f"Target website responded with {self.api_response.scrape_result['status_code']} - {self.api_response.scrape_result['reason']}"

            if self.api_response is not None:
                return self.api_response.error_message

            text = f"{self.response.status_code} - {self.response.reason}"

            if isinstance(self, (ApiHttpClientError, ApiHttpServerError)):
                text += " - " + self.message

            return text
Common base class for all non-exit exceptions.
Ancestors
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- ApiHttpClientError
- scrapfly.errors.ExtractionAPIError
- scrapfly.errors.QuotaLimitReached
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.ScreenshotAPIError
- scrapfly.errors.TooManyConcurrentRequest
- scrapfly.errors.UpstreamHttpError
Inherited members
class ResponseBodyHandler (use_brotli: bool = False, signing_secrets: Tuple[str] | None = None)
    class ResponseBodyHandler:

        SUPPORTED_COMPRESSION = ['gzip', 'deflate']
        SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']

        class JSONDateTimeDecoder(JSONDecoder):
            def __init__(self, *args, **kargs):
                JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs)

        # brotli underperforms gzip at the same compression level, and higher levels
        # are CPU-heavy, so the trade-off is not worth it for most usage
        def __init__(self, use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None):
            if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
                try:
                    try:
                        import brotlicffi as brotli
                        self.SUPPORTED_COMPRESSION.insert(0, 'br')
                    except ImportError:
                        import brotli
                        self.SUPPORTED_COMPRESSION.insert(0, 'br')
                except ImportError:
                    pass

            try:
                import zstd
                self.SUPPORTED_COMPRESSION.append('zstd')
            except ImportError:
                pass

            self.content_encoding: str = ', '.join(self.SUPPORTED_COMPRESSION)
            self._signing_secret: Optional[Tuple[str]] = None

            if signing_secrets:
                _secrets = set()
                for signing_secret in signing_secrets:
                    _secrets.add(binascii.unhexlify(signing_secret))
                self._signing_secret = tuple(_secrets)

            try:  # automatically use msgpack if available https://msgpack.org/
                import msgpack
                self.accept = 'application/msgpack;charset=utf-8'
                self.content_type = 'application/msgpack;charset=utf-8'
                self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
            except ImportError:
                self.accept = 'application/json;charset=utf-8'
                self.content_type = 'application/json;charset=utf-8'
                self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)

        def support(self, headers: Dict) -> bool:
            if 'content-type' not in headers:
                return False

            for content_type in self.SUPPORTED_CONTENT_TYPES:
                if headers['content-type'].find(content_type) != -1:
                    return True

            return False

        def verify(self, message: bytes, signature: str) -> bool:
            for signing_secret in self._signing_secret:
                if hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper() == signature:
                    return True

            return False

        def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
            if content_encoding == 'gzip' or content_encoding == 'gz':
                import gzip
                content = gzip.decompress(content)
            elif content_encoding == 'deflate':
                import zlib
                content = zlib.decompress(content)
            elif content_encoding == 'brotli' or content_encoding == 'br':
                import brotli
                content = brotli.decompress(content)
            elif content_encoding == 'zstd':
                import zstd
                content = zstd.decompress(content)

            if self._signing_secret is not None and signature is not None:
                if not self.verify(content, signature):
                    raise WebhookSignatureMissMatch()

            if content_type.startswith('application/json'):
                content = loads(content, cls=self.JSONDateTimeDecoder)
            elif content_type.startswith('application/msgpack'):
                import msgpack
                content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

            return content

        def __call__(self, content: bytes, content_type: str) -> Union[str, Dict]:
            content_loader = None

            if content_type.find('application/json') != -1:
                content_loader = partial(loads, cls=self.JSONDateTimeDecoder)
            elif content_type.find('application/msgpack') != -1:
                import msgpack
                content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)

            if content_loader is None:
                raise Exception('Unsupported content type')

            try:
                return content_loader(content)
            except Exception as e:
                try:
                    raise EncoderError(content=content.decode('utf-8')) from e
                except UnicodeError:
                    raise EncoderError(content=base64.b64encode(content).decode('utf-8')) from e
Class variables
var JSONDateTimeDecoder
Simple JSON https://json.org decoder

Performs the following translations in decoding by default:

+---------------+-------------------+
| JSON          | Python            |
+===============+===================+
| object        | dict              |
+---------------+-------------------+
| array         | list              |
+---------------+-------------------+
| string        | str               |
+---------------+-------------------+
| number (int)  | int               |
+---------------+-------------------+
| number (real) | float             |
+---------------+-------------------+
| true          | True              |
+---------------+-------------------+
| false         | False             |
+---------------+-------------------+
| null          | None              |
+---------------+-------------------+

It also understands NaN, Infinity, and -Infinity as their corresponding float values, which is outside the JSON spec.

var SUPPORTED_COMPRESSION
var SUPPORTED_CONTENT_TYPES
Methods
def read(self,
content: bytes,
content_encoding: str,
content_type: str,
signature: str | None) ‑> Dict

    def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
        if content_encoding == 'gzip' or content_encoding == 'gz':
            import gzip
            content = gzip.decompress(content)
        elif content_encoding == 'deflate':
            import zlib
            content = zlib.decompress(content)
        elif content_encoding == 'brotli' or content_encoding == 'br':
            import brotli
            content = brotli.decompress(content)
        elif content_encoding == 'zstd':
            import zstd
            content = zstd.decompress(content)

        if self._signing_secret is not None and signature is not None:
            if not self.verify(content, signature):
                raise WebhookSignatureMissMatch()

        if content_type.startswith('application/json'):
            content = loads(content, cls=self.JSONDateTimeDecoder)
        elif content_type.startswith('application/msgpack'):
            import msgpack
            content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

        return content
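A decoding sketch for a webhook delivery; the request_body/headers variables and the signature header name are illustrative, use whatever your web framework exposes:

    handler = ResponseBodyHandler(signing_secrets=('aa01bb02',))  # hypothetical hex-encoded secret
    data = handler.read(
        content=request_body,                                     # raw bytes of the webhook POST
        content_encoding=headers.get('content-encoding', ''),
        content_type=headers['content-type'],
        signature=headers.get('x-signature'),                     # hypothetical header name
    )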
def support(self, headers: Dict) ‑> bool
    def support(self, headers: Dict) -> bool:
        if 'content-type' not in headers:
            return False

        for content_type in self.SUPPORTED_CONTENT_TYPES:
            if headers['content-type'].find(content_type) != -1:
                return True

        return False
def verify(self, message: bytes, signature: str) ‑> bool
    def verify(self, message: bytes, signature: str) -> bool:
        for signing_secret in self._signing_secret:
            if hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper() == signature:
                return True

        return False
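Signatures are uppercase hex HMAC-SHA256 digests keyed by the unhexlified signing secret, so a valid signature can be reproduced like this (the secret value is made up):

    import binascii, hashlib, hmac

    secret_hex = 'aa01bb02'
    handler = ResponseBodyHandler(signing_secrets=(secret_hex,))
    body = b'{"result": {}}'
    signature = hmac.new(binascii.unhexlify(secret_hex), body, hashlib.sha256).hexdigest().upper()
    assert handler.verify(body, signature) is True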
class ScrapeApiResponse (request: requests.models.Request,
response: requests.models.Response,
scrape_config: ScrapeConfig,
api_result: Dict | None = None,
large_object_handler: Callable | None = None)

    class ScrapeApiResponse(ApiResponse):
        scrape_config: ScrapeConfig
        large_object_handler: Callable

        def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler: Optional[Callable] = None):
            super().__init__(request, response)
            self.scrape_config = scrape_config
            self.large_object_handler = large_object_handler

            if self.scrape_config.method == 'HEAD':
                api_result = {
                    'result': {
                        'request_headers': {},
                        'status': 'DONE',
                        'success': 200 <= self.response.status_code < 300,
                        'response_headers': self.response.headers,
                        'status_code': self.response.status_code,
                        'reason': self.response.reason,
                        'format': 'text',
                        'content': ''
                    },
                    'context': {},
                    'config': self.scrape_config.__dict__
                }

                if 'X-Scrapfly-Reject-Code' in self.response.headers:
                    api_result['result']['error'] = {
                        'code': self.response.headers['X-Scrapfly-Reject-Code'],
                        'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']),
                        'message': self.response.headers['X-Scrapfly-Reject-Description'],
                        'error_id': self.response.headers['X-Scrapfly-Reject-ID'],
                        'retryable': True if self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes' else False,
                        'doc_url': '',
                        'links': {}
                    }

                    if 'X-Scrapfly-Reject-Doc' in self.response.headers:
                        api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc']
                        api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc']

            if isinstance(api_result, str):
                raise HttpError(
                    request=request,
                    response=response,
                    message='Bad gateway',
                    code=502,
                    http_status_code=502,
                    is_retryable=True
                )

            self.result = self.handle_api_result(api_result=api_result)

        @property
        def scrape_result(self) -> Optional[Dict]:
            return self.result.get('result', None)

        @property
        def config(self) -> Optional[Dict]:
            if self.scrape_result is None:
                return None
            return self.result['config']

        @property
        def context(self) -> Optional[Dict]:
            if self.scrape_result is None:
                return None
            return self.result['context']

        @property
        def content(self) -> str:
            if self.scrape_result is None:
                return ''
            return self.scrape_result['content']

        @property
        def success(self) -> bool:
            """
            Success means the Scrapfly API replied correctly to the call; the scrape itself
            can still be unsuccessful if the upstream website replied with an error status code.
            """
            return 200 <= self.response.status_code <= 299

        @property
        def scrape_success(self) -> bool:
            scrape_result = self.scrape_result
            if not scrape_result:
                return False
            return self.scrape_result['success']

        @property
        def error(self) -> Optional[Dict]:
            if self.scrape_result is None:
                return None
            if self.scrape_success is False:
                return self.scrape_result['error']

        @property
        def upstream_status_code(self) -> Optional[int]:
            if self.scrape_result is None:
                return None
            if 'status_code' in self.scrape_result:
                return self.scrape_result['status_code']
            return None

        @cached_property
        def soup(self) -> 'BeautifulSoup':
            if self.scrape_result['format'] != 'text':
                raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")
            try:
                from bs4 import BeautifulSoup
                soup = BeautifulSoup(self.content, "lxml")
                return soup
            except ImportError as e:
                logger.error('You must install scrapfly[parser] to enable this feature')

        @cached_property
        def selector(self) -> 'Selector':
            if self.scrape_result['format'] != 'text':
                raise ContentError("Unable to cast into a parsel selector, the format of data is binary - must be text content")
            try:
                from parsel import Selector
                return Selector(text=self.content)
            except ImportError as e:
                logger.error('You must install parsel or scrapy package to enable this feature')
                raise e

        def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
            if self._is_api_error(api_result=api_result) is True:
                return FrozenDict(api_result)

            try:
                if isinstance(api_result['config']['headers'], list):
                    api_result['config']['headers'] = {}
            except TypeError:
                logger.info(api_result)
                raise

            with suppress(KeyError):
                api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
                api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

            if self.large_object_handler is not None and api_result['result']['content']:
                content_format = api_result['result']['format']

                if content_format in ['clob', 'blob']:
                    api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format)
                elif content_format == 'binary':
                    base64_payload = api_result['result']['content']
                    if isinstance(base64_payload, bytes):
                        base64_payload = base64_payload.decode('utf-8')
                    api_result['result']['content'] = BytesIO(b64decode(base64_payload))

            return FrozenDict(api_result)

        def _is_api_error(self, api_result: Dict) -> bool:
            if self.scrape_config.method == 'HEAD':
                if 'X-Reject-Reason' in self.response.headers:
                    return True
                return False

            if api_result is None:
                return True

            return 'error_id' in api_result

        def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
            if _class != Response:
                raise RuntimeError('only Response from requests package is supported at the moment')

            if self.result is None:
                return None

            if self.response.status_code != 200:
                return None

            response = Response()
            response.status_code = self.scrape_result['status_code']
            response.reason = self.scrape_result['reason']

            if self.scrape_result['content']:
                if isinstance(self.scrape_result['content'], BytesIO):
                    response._content = self.scrape_result['content'].getvalue()
                elif isinstance(self.scrape_result['content'], bytes):
                    response._content = self.scrape_result['content']
                elif isinstance(self.scrape_result['content'], str):
                    response._content = self.scrape_result['content'].encode('utf-8')
            else:
                response._content = None

            response.headers.update(self.scrape_result['response_headers'])
            response.url = self.scrape_result['url']

            response.request = Request(
                method=self.config['method'],
                url=self.config['url'],
                headers=self.scrape_result['request_headers'],
                data=self.config['body'] if self.config['body'] else None
            )

            if 'set-cookie' in response.headers:
                for raw_cookie in response.headers['set-cookie']:
                    for name, cookie in SimpleCookie(raw_cookie).items():
                        expires = cookie.get('expires')

                        if expires == '':
                            expires = None

                        if expires:
                            try:
                                expires = parse(expires).timestamp()
                            except ValueError:
                                expires = None

                        if type(expires) == str:
                            if '.' in expires:
                                expires = float(expires)
                            else:
                                expires = int(expires)

                        response.cookies.set_cookie(Cookie(
                            version=cookie.get('version') if cookie.get('version') else None,
                            name=name,
                            value=cookie.value,
                            path=cookie.get('path', ''),
                            expires=expires,
                            comment=cookie.get('comment'),
                            domain=cookie.get('domain', ''),
                            secure=cookie.get('secure'),
                            port=None,
                            port_specified=False,
                            domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                            domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                            path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                            discard=False,
                            comment_url=None,
                            rest={
                                'httponly': cookie.get('httponly'),
                                'samesite': cookie.get('samesite'),
                                'max-age': cookie.get('max-age')
                            }
                        ))

            return response

        def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
            file_content = content or self.scrape_result['content']
            file_path = None
            file_extension = None

            if name:
                name_parts = name.split('.')
                if len(name_parts) > 1:
                    file_extension = name_parts[-1]

            if not file:
                if file_extension is None:
                    try:
                        mime_type = self.scrape_result['response_headers']['content-type']
                    except KeyError:
                        mime_type = 'application/octet-stream'

                    if ';' in mime_type:
                        mime_type = mime_type.split(';')[0]

                    file_extension = '.' + mime_type.split('/')[1]

                if not name:
                    name = self.config['url'].split('/')[-1]

                if name.find(file_extension) == -1:
                    name += file_extension

                file_path = path + '/' + name if path is not None else name

                if file_path == file_extension:
                    url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')
                    if url[-1] == '-':
                        url = url[:-1]
                    url += file_extension
                    file_path = url

                file = open(file_path, 'wb')

            if isinstance(file_content, str):
                file_content = BytesIO(file_content.encode('utf-8'))
            elif isinstance(file_content, bytes):
                file_content = BytesIO(file_content)

            file_content.seek(0)
            with file as f:
                shutil.copyfileobj(file_content, f, length=131072)

            logger.info('file %s created' % file_path)

        def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
            super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

            if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
                error = ErrorFactory.create(api_response=self)
                if error:
                    if isinstance(error, UpstreamHttpError):
                        if raise_on_upstream_error is True:
                            raise error
                    else:
                        raise error
Ancestors
- ApiResponse
Class variables
var large_object_handler : Callable
var scrape_config : ScrapeConfig
Instance variables
prop config : Dict | None
    @property
    def config(self) -> Optional[Dict]:
        if self.scrape_result is None:
            return None
        return self.result['config']
prop content : str
    @property
    def content(self) -> str:
        if self.scrape_result is None:
            return ''
        return self.scrape_result['content']
prop context : Dict | None
    @property
    def context(self) -> Optional[Dict]:
        if self.scrape_result is None:
            return None
        return self.result['context']
prop error : Dict | None
    @property
    def error(self) -> Optional[Dict]:
        if self.scrape_result is None:
            return None
        if self.scrape_success is False:
            return self.scrape_result['error']
prop scrape_result : Dict | None
    @property
    def scrape_result(self) -> Optional[Dict]:
        return self.result.get('result', None)
prop scrape_success : bool
    @property
    def scrape_success(self) -> bool:
        scrape_result = self.scrape_result
        if not scrape_result:
            return False
        return self.scrape_result['success']
var selector : Selector
    @cached_property
    def selector(self) -> 'Selector':
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into a parsel selector, the format of data is binary - must be text content")
        try:
            from parsel import Selector
            return Selector(text=self.content)
        except ImportError as e:
            logger.error('You must install parsel or scrapy package to enable this feature')
            raise e
var soup : BeautifulSoup
    @cached_property
    def soup(self) -> 'BeautifulSoup':
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")
        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(self.content, "lxml")
            return soup
        except ImportError as e:
            logger.error('You must install scrapfly[parser] to enable this feature')
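Both parsers are optional extras. A usage sketch, assuming a successful text-format scrape and the parsing dependencies (parsel, beautifulsoup4 + lxml) installed:

    title = api_response.selector.css('title::text').get()   # parsel Selector
    heading = api_response.soup.find('h1')                   # BeautifulSoup document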
prop success : bool
    @property
    def success(self) -> bool:
        """
        Success means the Scrapfly API replied correctly to the call; the scrape itself
        can still be unsuccessful if the upstream website replied with an error status code.
        """
        return 200 <= self.response.status_code <= 299

Success means the Scrapfly API replied correctly to the call; the scrape itself can still be unsuccessful if the upstream website replied with an error status code.
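success and scrape_success therefore answer two different questions; a typical check:

    if api_response.success and not api_response.scrape_success:
        # the API call worked, but the target website answered with an error
        print(api_response.upstream_status_code, api_response.error)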
prop upstream_status_code : int | None
    @property
    def upstream_status_code(self) -> Optional[int]:
        if self.scrape_result is None:
            return None
        if 'status_code' in self.scrape_result:
            return self.scrape_result['status_code']
        return None
Methods
def handle_api_result(self, api_result: Dict) ‑> FrozenDict | None
    def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        try:
            if isinstance(api_result['config']['headers'], list):
                api_result['config']['headers'] = {}
        except TypeError:
            logger.info(api_result)
            raise

        with suppress(KeyError):
            api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
            api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

        if self.large_object_handler is not None and api_result['result']['content']:
            content_format = api_result['result']['format']

            if content_format in ['clob', 'blob']:
                api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format)
            elif content_format == 'binary':
                base64_payload = api_result['result']['content']
                if isinstance(base64_payload, bytes):
                    base64_payload = base64_payload.decode('utf-8')
                api_result['result']['content'] = BytesIO(b64decode(base64_payload))

        return FrozenDict(api_result)
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ApiHttpClientError)

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

        if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
            error = ErrorFactory.create(api_response=self)
            if error:
                if isinstance(error, UpstreamHttpError):
                    if raise_on_upstream_error is True:
                        raise error
                else:
                    raise error
def sink(self,
path: str | None = None,
name: str | None = None,
file: TextIO | _io.BytesIO | None = None,
content: str | bytes | None = None)

    def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
        file_content = content or self.scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                try:
                    mime_type = self.scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = self.config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path is not None else name

            if file_path == file_extension:
                url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')
                if url[-1] == '-':
                    url = url[:-1]
                url += file_extension
                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)
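For example (paths are illustrative); when the name has no extension, one is inferred from the response content-type header:

    api_response.sink(path='downloads', name='page')  # writes e.g. downloads/page.html
    api_response.sink()                               # derives the file name from the scraped URL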
def upstream_result_into_response(self) ‑> requests.models.Response | None
    def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
        if _class != Response:
            raise RuntimeError('only Response from requests package is supported at the moment')

        if self.result is None:
            return None

        if self.response.status_code != 200:
            return None

        response = Response()
        response.status_code = self.scrape_result['status_code']
        response.reason = self.scrape_result['reason']

        if self.scrape_result['content']:
            if isinstance(self.scrape_result['content'], BytesIO):
                response._content = self.scrape_result['content'].getvalue()
            elif isinstance(self.scrape_result['content'], bytes):
                response._content = self.scrape_result['content']
            elif isinstance(self.scrape_result['content'], str):
                response._content = self.scrape_result['content'].encode('utf-8')
        else:
            response._content = None

        response.headers.update(self.scrape_result['response_headers'])
        response.url = self.scrape_result['url']

        response.request = Request(
            method=self.config['method'],
            url=self.config['url'],
            headers=self.scrape_result['request_headers'],
            data=self.config['body'] if self.config['body'] else None
        )

        if 'set-cookie' in response.headers:
            for raw_cookie in response.headers['set-cookie']:
                for name, cookie in SimpleCookie(raw_cookie).items():
                    expires = cookie.get('expires')

                    if expires == '':
                        expires = None

                    if expires:
                        try:
                            expires = parse(expires).timestamp()
                        except ValueError:
                            expires = None

                    if type(expires) == str:
                        if '.' in expires:
                            expires = float(expires)
                        else:
                            expires = int(expires)

                    response.cookies.set_cookie(Cookie(
                        version=cookie.get('version') if cookie.get('version') else None,
                        name=name,
                        value=cookie.value,
                        path=cookie.get('path', ''),
                        expires=expires,
                        comment=cookie.get('comment'),
                        domain=cookie.get('domain', ''),
                        secure=cookie.get('secure'),
                        port=None,
                        port_specified=False,
                        domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                        domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                        path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                        discard=False,
                        comment_url=None,
                        rest={
                            'httponly': cookie.get('httponly'),
                            'samesite': cookie.get('samesite'),
                            'max-age': cookie.get('max-age')
                        }
                    ))

        return response
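This lets the scrape result be handed to code that expects a plain requests.Response, e.g.:

    response = api_response.upstream_result_into_response()
    if response is not None:
        response.raise_for_status()
        print(response.url, response.status_code, len(response.content))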
Inherited members
class ScrapeConfig (url: str,
retry: bool = True,
method: str = 'GET',
country: str | None = None,
render_js: bool = False,
cache: bool = False,
cache_clear: bool = False,
ssl: bool = False,
dns: bool = False,
asp: bool = False,
debug: bool = False,
raise_on_upstream_error: bool = True,
cache_ttl: int | None = None,
proxy_pool: str | None = None,
session: str | None = None,
tags: List[str] | Set[str] | None = None,
format: Format | None = None,
format_options: List[FormatOption] | None = None,
extraction_template: str | None = None,
extraction_ephemeral_template: Dict | None = None,
extraction_prompt: str | None = None,
extraction_model: str | None = None,
correlation_id: str | None = None,
cookies: requests.structures.CaseInsensitiveDict | None = None,
body: str | None = None,
data: Dict | None = None,
headers: requests.structures.CaseInsensitiveDict | Dict[str, str] | None = None,
js: str = None,
rendering_wait: int = None,
rendering_stage: Literal['complete', 'domcontentloaded'] = 'complete',
wait_for_selector: str | None = None,
screenshots: Dict | None = None,
screenshot_flags: List[ScreenshotFlag] | None = None,
session_sticky_proxy: bool | None = None,
webhook: str | None = None,
timeout: int | None = None,
js_scenario: List | None = None,
extract: Dict | None = None,
os: str | None = None,
lang: List[str] | None = None,
auto_scroll: bool | None = None,
cost_budget: int | None = None)
    class ScrapeConfig(BaseApiConfig):
        PUBLIC_DATACENTER_POOL = 'public_datacenter_pool'
        PUBLIC_RESIDENTIAL_POOL = 'public_residential_pool'

        url: str
        retry: bool = True
        method: str = 'GET'
        country: Optional[str] = None
        render_js: bool = False
        cache: bool = False
        cache_clear: bool = False
        ssl: bool = False
        dns: bool = False
        asp: bool = False
        debug: bool = False
        raise_on_upstream_error: bool = True
        cache_ttl: Optional[int] = None
        proxy_pool: Optional[str] = None
        session: Optional[str] = None
        tags: Optional[List[str]] = None
        format: Optional[Format] = None  # raw(unchanged)
        format_options: Optional[List[FormatOption]] = None
        extraction_template: Optional[str] = None  # a saved template name
        extraction_ephemeral_template: Optional[Dict] = None  # ephemerally declared json template
        extraction_prompt: Optional[str] = None
        extraction_model: Optional[str] = None
        correlation_id: Optional[str] = None
        cookies: Optional[CaseInsensitiveDict] = None
        body: Optional[str] = None
        data: Optional[Dict] = None
        headers: Optional[CaseInsensitiveDict] = None
        js: str = None
        rendering_wait: int = None
        rendering_stage: Literal["complete", "domcontentloaded"] = "complete"
        wait_for_selector: Optional[str] = None
        session_sticky_proxy: bool = True
        screenshots: Optional[Dict] = None
        screenshot_flags: Optional[List[ScreenshotFlag]] = None
        webhook: Optional[str] = None
        timeout: Optional[int] = None  # in milliseconds
        js_scenario: Dict = None
        extract: Dict = None
        lang: Optional[List[str]] = None
        os: Optional[str] = None
        auto_scroll: Optional[bool] = None
        cost_budget: Optional[int] = None

        def __init__(
            self,
            url: str,
            retry: bool = True,
            method: str = 'GET',
            country: Optional[str] = None,
            render_js: bool = False,
            cache: bool = False,
            cache_clear: bool = False,
            ssl: bool = False,
            dns: bool = False,
            asp: bool = False,
            debug: bool = False,
            raise_on_upstream_error: bool = True,
            cache_ttl: Optional[int] = None,
            proxy_pool: Optional[str] = None,
            session: Optional[str] = None,
            tags: Optional[Union[List[str], Set[str]]] = None,
            format: Optional[Format] = None,  # raw(unchanged)
            format_options: Optional[List[FormatOption]] = None,  # raw(unchanged)
            extraction_template: Optional[str] = None,  # a saved template name
            extraction_ephemeral_template: Optional[Dict] = None,  # ephemerally declared json template
            extraction_prompt: Optional[str] = None,
            extraction_model: Optional[str] = None,
            correlation_id: Optional[str] = None,
            cookies: Optional[CaseInsensitiveDict] = None,
            body: Optional[str] = None,
            data: Optional[Dict] = None,
            headers: Optional[Union[CaseInsensitiveDict, Dict[str, str]]] = None,
            js: str = None,
            rendering_wait: int = None,
            rendering_stage: Literal["complete", "domcontentloaded"] = "complete",
            wait_for_selector: Optional[str] = None,
            screenshots: Optional[Dict] = None,
            screenshot_flags: Optional[List[ScreenshotFlag]] = None,
            session_sticky_proxy: Optional[bool] = None,
            webhook: Optional[str] = None,
            timeout: Optional[int] = None,  # in milliseconds
            js_scenario: Optional[List] = None,
            extract: Optional[Dict] = None,
            os: Optional[str] = None,
            lang: Optional[List[str]] = None,
            auto_scroll: Optional[bool] = None,
            cost_budget: Optional[int] = None
        ):
            assert(type(url) is str)

            if isinstance(tags, List):
                tags = set(tags)

            cookies = cookies or {}
            headers = headers or {}

            self.cookies = CaseInsensitiveDict(cookies)
            self.headers = CaseInsensitiveDict(headers)
            self.url = url
            self.retry = retry
            self.method = method
            self.country = country
            self.session_sticky_proxy = session_sticky_proxy
            self.render_js = render_js
            self.cache = cache
            self.cache_clear = cache_clear
            self.asp = asp
            self.webhook = webhook
            self.session = session
            self.debug = debug
            self.cache_ttl = cache_ttl
            self.proxy_pool = proxy_pool
            self.tags = tags or set()
            self.format = format
            self.format_options = format_options
            self.extraction_template = extraction_template
            self.extraction_ephemeral_template = extraction_ephemeral_template
            self.extraction_prompt = extraction_prompt
            self.extraction_model = extraction_model
            self.correlation_id = correlation_id
            self.wait_for_selector = wait_for_selector
            self.body = body
            self.data = data
            self.js = js
            self.rendering_wait = rendering_wait
            self.rendering_stage = rendering_stage
            self.raise_on_upstream_error = raise_on_upstream_error
            self.screenshots = screenshots
            self.screenshot_flags = screenshot_flags
            self.key = None
            self.dns = dns
            self.ssl = ssl
            self.js_scenario = js_scenario
            self.timeout = timeout
            self.extract = extract
            self.lang = lang
            self.os = os
            self.auto_scroll = auto_scroll
            self.cost_budget = cost_budget

            if cookies:
                _cookies = []

                for name, value in cookies.items():
                    _cookies.append(name + '=' + value)

                if 'cookie' in self.headers:
                    if self.headers['cookie'][-1] != ';':
                        self.headers['cookie'] += ';'
                else:
                    self.headers['cookie'] = ''

                self.headers['cookie'] += '; '.join(_cookies)

            if self.body and self.data:
                raise ScrapeConfigError('You cannot pass both parameters body and data. You must choose')

            if method in ['POST', 'PUT', 'PATCH']:
                if self.body is None and self.data is not None:
                    if 'content-type' not in self.headers:
                        self.headers['content-type'] = 'application/x-www-form-urlencoded'
                        self.body = urlencode(data)
                    else:
                        if self.headers['content-type'].find('application/json') != -1:
                            self.body = json.dumps(data)
                        elif self.headers['content-type'].find('application/x-www-form-urlencoded') != -1:
                            self.body = urlencode(data)
                        else:
                            raise ScrapeConfigError('Content-Type "%s" not supported, use body parameter to pass pre encoded body according to your content type' % self.headers['content-type'])
                elif self.body is None and self.data is None:
                    self.headers['content-type'] = 'text/plain'

        def to_api_params(self, key: str) -> Dict:
            params = {
                'key': self.key or key,
                'url': self.url
            }

            if self.country is not None:
                params['country'] = self.country

            for name, value in self.headers.items():
                params['headers[%s]' % name] = value

            if self.webhook is not None:
                params['webhook_name'] = self.webhook

            if self.timeout is not None:
                params['timeout'] = self.timeout

            if self.extract is not None:
                params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')

            if self.cost_budget is not None:
                params['cost_budget'] = self.cost_budget

            if self.render_js is True:
                params['render_js'] = self._bool_to_http(self.render_js)

                if self.wait_for_selector is not None:
                    params['wait_for_selector'] = self.wait_for_selector

                if self.js:
                    params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

                if self.js_scenario:
                    params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')

                if self.rendering_wait:
                    params['rendering_wait'] = self.rendering_wait

                if self.rendering_stage:
                    params['rendering_stage'] = self.rendering_stage

                if self.screenshots is not None:
                    for name, element in self.screenshots.items():
                        params['screenshots[%s]' % name] = element

                    if self.screenshot_flags is not None:
                        self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags]
                        params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags)
                else:
                    if self.screenshot_flags is not None:
                        logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')

                if self.auto_scroll is True:
                    params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
            else:
                if self.wait_for_selector is not None:
                    logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')

                if self.screenshots:
                    logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')

                if self.js_scenario:
                    logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')

                if self.js:
                    logging.warning('Params "js" is ignored. Works only if render_js is enabled')

                if self.rendering_wait:
                    logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')

            if self.asp is True:
                params['asp'] = self._bool_to_http(self.asp)

            if self.retry is False:
                params['retry'] = self._bool_to_http(self.retry)

            if self.cache is True:
                params['cache'] = self._bool_to_http(self.cache)

                if self.cache_clear is True:
                    params['cache_clear'] = self._bool_to_http(self.cache_clear)

                if self.cache_ttl is not None:
                    params['cache_ttl'] = self.cache_ttl
            else:
                if self.cache_clear is True:
                    logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

                if self.cache_ttl is not None:
                    logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

            if self.dns is True:
                params['dns'] = self._bool_to_http(self.dns)

            if self.ssl is True:
                params['ssl'] = self._bool_to_http(self.ssl)

            if self.tags:
                params['tags'] = ','.join(self.tags)

            if self.format:
                params['format'] = Format(self.format).value
                if self.format_options:
                    params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options)

            if self.extraction_template and self.extraction_ephemeral_template:
                raise ScrapeConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

            if self.extraction_template:
                params['extraction_template'] = self.extraction_template

            if self.extraction_ephemeral_template:
                self.extraction_ephemeral_template = json.dumps(self.extraction_ephemeral_template)
                params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(self.extraction_ephemeral_template.encode('utf-8')).decode('utf-8')

            if self.extraction_prompt:
                params['extraction_prompt'] = quote_plus(self.extraction_prompt)

            if self.extraction_model:
                params['extraction_model'] = self.extraction_model

            if self.correlation_id:
                params['correlation_id'] = self.correlation_id

            if self.session:
                params['session'] = self.session

                if self.session_sticky_proxy is True:  # false by default
                    params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
            else:
                if self.session_sticky_proxy:
                    logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')

            if self.debug is True:
                params['debug'] = self._bool_to_http(self.debug)

            if self.proxy_pool is not None:
                params['proxy_pool'] = self.proxy_pool

            if self.lang is not None:
                params['lang'] = ','.join(self.lang)

            if self.os is not None:
                params['os'] = self.os

            return params

        @staticmethod
        def from_exported_config(config: str) -> 'ScrapeConfig':
            try:
                from msgpack import loads as msgpack_loads
            except ImportError as e:
                print('You must install msgpack package - run: pip install "scrapfly-sdk[speedup]" or pip install msgpack')
                raise

            data = msgpack_loads(base64.b64decode(config))

            headers = {}
            for name, value in data['headers'].items():
                if isinstance(value, Iterable):
                    headers[name] = '; '.join(value)
                else:
                    headers[name] = value

            return ScrapeConfig(
                url=data['url'],
                retry=data['retry'],
                headers=headers,
                session=data['session'],
                session_sticky_proxy=data['session_sticky_proxy'],
                cache=data['cache'],
                cache_ttl=data['cache_ttl'],
                cache_clear=data['cache_clear'],
                render_js=data['render_js'],
                method=data['method'],
                asp=data['asp'],
                body=data['body'],
                ssl=data['ssl'],
                dns=data['dns'],
                country=data['country'],
                debug=data['debug'],
                correlation_id=data['correlation_id'],
                tags=data['tags'],
                format=data['format'],
                js=data['js'],
                rendering_wait=data['rendering_wait'],
                screenshots=data['screenshots'] or {},
                screenshot_flags=data['screenshot_flags'],
                proxy_pool=data['proxy_pool'],
                auto_scroll=data['auto_scroll'],
                cost_budget=data['cost_budget']
            )

        def to_dict(self) -> Dict:
            """
            Export the ScrapeConfig instance to a plain dictionary.
            Useful for JSON-serialization or other external storage.
            """
            return {
                'url': self.url,
                'retry': self.retry,
                'method': self.method,
                'country': self.country,
                'render_js': self.render_js,
                'cache': self.cache,
                'cache_clear': self.cache_clear,
                'ssl': self.ssl,
                'dns': self.dns,
                'asp': self.asp,
                'debug': self.debug,
                'raise_on_upstream_error': self.raise_on_upstream_error,
                'cache_ttl': self.cache_ttl,
                'proxy_pool': self.proxy_pool,
                'session': self.session,
                'tags': list(self.tags),
                'format': Format(self.format).value if self.format else None,
                'format_options': [FormatOption(option).value for option in self.format_options] if self.format_options else None,
                'extraction_template': self.extraction_template,
                'extraction_ephemeral_template': self.extraction_ephemeral_template,
                'extraction_prompt': self.extraction_prompt,
                'extraction_model': self.extraction_model,
                'correlation_id': self.correlation_id,
                'cookies': CaseInsensitiveDict(self.cookies),
                'body': self.body,
                'data': None if self.body else self.data,
                'headers': CaseInsensitiveDict(self.headers),
                'js': self.js,
                'rendering_wait': self.rendering_wait,
                'wait_for_selector': self.wait_for_selector,
                'session_sticky_proxy': self.session_sticky_proxy,
                'screenshots': self.screenshots,
                'screenshot_flags': [ScreenshotFlag(flag).value for flag in self.screenshot_flags] if self.screenshot_flags else None,
                'webhook': self.webhook,
                'timeout': self.timeout,
                'js_scenario': self.js_scenario,
                'extract': self.extract,
                'lang': self.lang,
                'os': self.os,
                'auto_scroll': self.auto_scroll,
                'cost_budget': self.cost_budget,
            }

        @staticmethod
        def from_dict(scrape_config_dict: Dict) -> 'ScrapeConfig':
            """Create a ScrapeConfig instance from a dictionary."""
            url = scrape_config_dict.get('url', None)
            retry = scrape_config_dict.get('retry', False)
            method = scrape_config_dict.get('method', 'GET')
            country = scrape_config_dict.get('country', None)
            render_js = scrape_config_dict.get('render_js', False)
            cache = scrape_config_dict.get('cache', False)
cache_clear = scrape_config_dict.get('cache_clear', False) ssl = scrape_config_dict.get('ssl', False) dns = scrape_config_dict.get('dns', False) asp = scrape_config_dict.get('asp', False) debug = scrape_config_dict.get('debug', False) raise_on_upstream_error = scrape_config_dict.get('raise_on_upstream_error', True) cache_ttl = scrape_config_dict.get('cache_ttl', None) proxy_pool = scrape_config_dict.get('proxy_pool', None) session = scrape_config_dict.get('session', None) tags = scrape_config_dict.get('tags', []) format = scrape_config_dict.get('format', None) format = Format(format) if format else None format_options = scrape_config_dict.get('format_options', None) format_options = [FormatOption(option) for option in format_options] if format_options else None extraction_template = scrape_config_dict.get('extraction_template', None) extraction_ephemeral_template = scrape_config_dict.get('extraction_ephemeral_template', None) extraction_prompt = scrape_config_dict.get('extraction_prompt', None) extraction_model = scrape_config_dict.get('extraction_model', None) correlation_id = scrape_config_dict.get('correlation_id', None) cookies = scrape_config_dict.get('cookies', {}) body = scrape_config_dict.get('body', None) data = scrape_config_dict.get('data', None) headers = scrape_config_dict.get('headers', {}) js = scrape_config_dict.get('js', None) rendering_wait = scrape_config_dict.get('rendering_wait', None) wait_for_selector = scrape_config_dict.get('wait_for_selector', None) screenshots = scrape_config_dict.get('screenshots', []) screenshot_flags = scrape_config_dict.get('screenshot_flags', []) screenshot_flags = [ScreenshotFlag(flag) for flag in screenshot_flags] if screenshot_flags else None session_sticky_proxy = scrape_config_dict.get('session_sticky_proxy', False) webhook = scrape_config_dict.get('webhook', None) timeout = scrape_config_dict.get('timeout', None) js_scenario = scrape_config_dict.get('js_scenario', None) extract = scrape_config_dict.get('extract', None) os = scrape_config_dict.get('os', None) lang = scrape_config_dict.get('lang', None) auto_scroll = scrape_config_dict.get('auto_scroll', None) cost_budget = scrape_config_dict.get('cost_budget', None) return ScrapeConfig( url=url, retry=retry, method=method, country=country, render_js=render_js, cache=cache, cache_clear=cache_clear, ssl=ssl, dns=dns, asp=asp, debug=debug, raise_on_upstream_error=raise_on_upstream_error, cache_ttl=cache_ttl, proxy_pool=proxy_pool, session=session, tags=tags, format=format, format_options=format_options, extraction_template=extraction_template, extraction_ephemeral_template=extraction_ephemeral_template, extraction_prompt=extraction_prompt, extraction_model=extraction_model, correlation_id=correlation_id, cookies=cookies, body=body, data=data, headers=headers, js=js, rendering_wait=rendering_wait, wait_for_selector=wait_for_selector, screenshots=screenshots, screenshot_flags=screenshot_flags, session_sticky_proxy=session_sticky_proxy, webhook=webhook, timeout=timeout, js_scenario=js_scenario, extract=extract, os=os, lang=lang, auto_scroll=auto_scroll, cost_budget=cost_budget, )
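A minimal usage sketch (all values are placeholders; the API key is normally injected by ScrapflyClient rather than by the config itself):

config = ScrapeConfig(
    url='https://example.com',      # placeholder target
    render_js=True,                 # enable the headless browser
    wait_for_selector='.content',   # placeholder CSS selector, requires render_js
    country='us',                   # route through US proxies
)
# to_api_params() flattens the config into the query parameters sent to the Scrape API
params = config.to_api_params(key='YOUR-API-KEY')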
Ancestors
- scrapfly.api_config.BaseApiConfig
Class variables
var PUBLIC_DATACENTER_POOL
var PUBLIC_RESIDENTIAL_POOL
var asp : bool
var auto_scroll : bool | None
var body : str | None
var cache : bool
var cache_clear : bool
var cache_ttl : int | None
var cookies : requests.structures.CaseInsensitiveDict | None
var correlation_id : str | None
var cost_budget : int | None
var country : str | None
var data : Dict | None
var debug : bool
var dns : bool
var extract : Dict
var extraction_ephemeral_template : Dict | None
var extraction_model : str | None
var extraction_prompt : str | None
var extraction_template : str | None
var format : Format | None
var format_options : List[FormatOption] | None
var headers : requests.structures.CaseInsensitiveDict | None
var js : str
var js_scenario : Dict
var lang : List[str] | None
var method : str
var os : str | None
var proxy_pool : str | None
var raise_on_upstream_error : bool
var render_js : bool
var rendering_stage : Literal['complete', 'domcontentloaded']
var rendering_wait : int
var retry : bool
var screenshot_flags : List[ScreenshotFlag] | None
var screenshots : Dict | None
var session : str | None
var session_sticky_proxy : bool
var ssl : bool
var tags : List[str] | None
var timeout : int | None
var url : str
var wait_for_selector : str | None
var webhook : str | None
Static methods
def from_dict(scrape_config_dict: Dict) ‑> ScrapeConfig
-
Expand source code
@staticmethod
def from_dict(scrape_config_dict: Dict) -> 'ScrapeConfig':
    """Create a ScrapeConfig instance from a dictionary."""
    url = scrape_config_dict.get('url', None)
    retry = scrape_config_dict.get('retry', False)
    method = scrape_config_dict.get('method', 'GET')
    country = scrape_config_dict.get('country', None)
    render_js = scrape_config_dict.get('render_js', False)
    cache = scrape_config_dict.get('cache', False)
    cache_clear = scrape_config_dict.get('cache_clear', False)
    ssl = scrape_config_dict.get('ssl', False)
    dns = scrape_config_dict.get('dns', False)
    asp = scrape_config_dict.get('asp', False)
    debug = scrape_config_dict.get('debug', False)
    raise_on_upstream_error = scrape_config_dict.get('raise_on_upstream_error', True)
    cache_ttl = scrape_config_dict.get('cache_ttl', None)
    proxy_pool = scrape_config_dict.get('proxy_pool', None)
    session = scrape_config_dict.get('session', None)
    tags = scrape_config_dict.get('tags', [])
    format = scrape_config_dict.get('format', None)
    format = Format(format) if format else None
    format_options = scrape_config_dict.get('format_options', None)
    format_options = [FormatOption(option) for option in format_options] if format_options else None
    extraction_template = scrape_config_dict.get('extraction_template', None)
    extraction_ephemeral_template = scrape_config_dict.get('extraction_ephemeral_template', None)
    extraction_prompt = scrape_config_dict.get('extraction_prompt', None)
    extraction_model = scrape_config_dict.get('extraction_model', None)
    correlation_id = scrape_config_dict.get('correlation_id', None)
    cookies = scrape_config_dict.get('cookies', {})
    body = scrape_config_dict.get('body', None)
    data = scrape_config_dict.get('data', None)
    headers = scrape_config_dict.get('headers', {})
    js = scrape_config_dict.get('js', None)
    rendering_wait = scrape_config_dict.get('rendering_wait', None)
    wait_for_selector = scrape_config_dict.get('wait_for_selector', None)
    screenshots = scrape_config_dict.get('screenshots', [])
    screenshot_flags = scrape_config_dict.get('screenshot_flags', [])
    screenshot_flags = [ScreenshotFlag(flag) for flag in screenshot_flags] if screenshot_flags else None
    session_sticky_proxy = scrape_config_dict.get('session_sticky_proxy', False)
    webhook = scrape_config_dict.get('webhook', None)
    timeout = scrape_config_dict.get('timeout', None)
    js_scenario = scrape_config_dict.get('js_scenario', None)
    extract = scrape_config_dict.get('extract', None)
    os = scrape_config_dict.get('os', None)
    lang = scrape_config_dict.get('lang', None)
    auto_scroll = scrape_config_dict.get('auto_scroll', None)
    cost_budget = scrape_config_dict.get('cost_budget', None)
    return ScrapeConfig(
        url=url, retry=retry, method=method, country=country, render_js=render_js,
        cache=cache, cache_clear=cache_clear, ssl=ssl, dns=dns, asp=asp, debug=debug,
        raise_on_upstream_error=raise_on_upstream_error, cache_ttl=cache_ttl,
        proxy_pool=proxy_pool, session=session, tags=tags, format=format,
        format_options=format_options, extraction_template=extraction_template,
        extraction_ephemeral_template=extraction_ephemeral_template,
        extraction_prompt=extraction_prompt, extraction_model=extraction_model,
        correlation_id=correlation_id, cookies=cookies, body=body, data=data,
        headers=headers, js=js, rendering_wait=rendering_wait,
        wait_for_selector=wait_for_selector, screenshots=screenshots,
        screenshot_flags=screenshot_flags, session_sticky_proxy=session_sticky_proxy,
        webhook=webhook, timeout=timeout, js_scenario=js_scenario, extract=extract,
        os=os, lang=lang, auto_scroll=auto_scroll, cost_budget=cost_budget,
    )
Create a ScrapeConfig instance from a dictionary.
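A round-trip sketch pairing to_dict() with from_dict() for external storage (the URL is a placeholder; default=dict is needed because the exported headers and cookies are CaseInsensitiveDict instances):

import json

config = ScrapeConfig(url='https://example.com', render_js=True, cache=True, cache_ttl=3600)
stored = json.dumps(config.to_dict(), default=dict)   # serialize CaseInsensitiveDict values
restored = ScrapeConfig.from_dict(json.loads(stored))
assert restored.url == config.url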
def from_exported_config(config: str) ‑> ScrapeConfig
-
Expand source code
@staticmethod
def from_exported_config(config: str) -> 'ScrapeConfig':
    try:
        from msgpack import loads as msgpack_loads
    except ImportError:
        print('You must install the msgpack package - run: pip install "scrapfly-sdk[speedup]" or pip install msgpack')
        raise

    data = msgpack_loads(base64.b64decode(config))

    headers = {}
    for name, value in data['headers'].items():
        if isinstance(value, Iterable):
            headers[name] = '; '.join(value)
        else:
            headers[name] = value

    return ScrapeConfig(
        url=data['url'],
        retry=data['retry'],
        headers=headers,
        session=data['session'],
        session_sticky_proxy=data['session_sticky_proxy'],
        cache=data['cache'],
        cache_ttl=data['cache_ttl'],
        cache_clear=data['cache_clear'],
        render_js=data['render_js'],
        method=data['method'],
        asp=data['asp'],
        body=data['body'],
        ssl=data['ssl'],
        dns=data['dns'],
        country=data['country'],
        debug=data['debug'],
        correlation_id=data['correlation_id'],
        tags=data['tags'],
        format=data['format'],
        js=data['js'],
        rendering_wait=data['rendering_wait'],
        screenshots=data['screenshots'] or {},
        screenshot_flags=data['screenshot_flags'],
        proxy_pool=data['proxy_pool'],
        auto_scroll=data['auto_scroll'],
        cost_budget=data['cost_budget']
    )
Methods
def to_api_params(self, key: str) ‑> Dict
-
Expand source code
def to_api_params(self, key: str) -> Dict:
    params = {
        'key': self.key or key,
        'url': self.url
    }
    if self.country is not None:
        params['country'] = self.country
    for name, value in self.headers.items():
        params['headers[%s]' % name] = value
    if self.webhook is not None:
        params['webhook_name'] = self.webhook
    if self.timeout is not None:
        params['timeout'] = self.timeout
    if self.extract is not None:
        params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')
    if self.cost_budget is not None:
        params['cost_budget'] = self.cost_budget
    if self.render_js is True:
        params['render_js'] = self._bool_to_http(self.render_js)
        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector
        if self.js:
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')
        if self.js_scenario:
            params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')
        if self.rendering_wait:
            params['rendering_wait'] = self.rendering_wait
        if self.rendering_stage:
            params['rendering_stage'] = self.rendering_stage
        if self.screenshots is not None:
            for name, element in self.screenshots.items():
                params['screenshots[%s]' % name] = element
            if self.screenshot_flags is not None:
                self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags]
                params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags)
        else:
            if self.screenshot_flags is not None:
                logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')
        if self.auto_scroll is True:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
    else:
        if self.wait_for_selector is not None:
            logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')
        if self.screenshots:
            logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')
        if self.js_scenario:
            logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')
        if self.js:
            logging.warning('Params "js" is ignored. Works only if render_js is enabled')
        if self.rendering_wait:
            logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')
    if self.asp is True:
        params['asp'] = self._bool_to_http(self.asp)
    if self.retry is False:
        params['retry'] = self._bool_to_http(self.retry)
    if self.cache is True:
        params['cache'] = self._bool_to_http(self.cache)
        if self.cache_clear is True:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)
        if self.cache_ttl is not None:
            params['cache_ttl'] = self.cache_ttl
    else:
        if self.cache_clear is True:
            logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')
        if self.cache_ttl is not None:
            logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')
    if self.dns is True:
        params['dns'] = self._bool_to_http(self.dns)
    if self.ssl is True:
        params['ssl'] = self._bool_to_http(self.ssl)
    if self.tags:
        params['tags'] = ','.join(self.tags)
    if self.format:
        params['format'] = Format(self.format).value
        if self.format_options:
            params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options)
    if self.extraction_template and self.extraction_ephemeral_template:
        raise ScrapeConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')
    if self.extraction_template:
        params['extraction_template'] = self.extraction_template
    if self.extraction_ephemeral_template:
        self.extraction_ephemeral_template = json.dumps(self.extraction_ephemeral_template)
        params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(self.extraction_ephemeral_template.encode('utf-8')).decode('utf-8')
    if self.extraction_prompt:
        params['extraction_prompt'] = quote_plus(self.extraction_prompt)
    if self.extraction_model:
        params['extraction_model'] = self.extraction_model
    if self.correlation_id:
        params['correlation_id'] = self.correlation_id
    if self.session:
        params['session'] = self.session
        if self.session_sticky_proxy is True:  # false by default
            params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
    else:
        if self.session_sticky_proxy:
            logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')
    if self.debug is True:
        params['debug'] = self._bool_to_http(self.debug)
    if self.proxy_pool is not None:
        params['proxy_pool'] = self.proxy_pool
    if self.lang is not None:
        params['lang'] = ','.join(self.lang)
    if self.os is not None:
        params['os'] = self.os
    return params
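Note the gating above: browser-only options are dropped (with a logging warning) when render_js is off. A small sketch of that behavior, with placeholder values:

config = ScrapeConfig(url='https://example.com', wait_for_selector='.price')  # render_js left off
params = config.to_api_params(key='YOUR-API-KEY')
# logs: Params "wait_for_selector" is ignored. Works only if render_js is enabled
assert 'wait_for_selector' not in params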
def to_dict(self) ‑> Dict
-
Expand source code
def to_dict(self) -> Dict:
    """
    Export the ScrapeConfig instance to a plain dictionary.
    Useful for JSON-serialization or other external storage.
    """
    return {
        'url': self.url,
        'retry': self.retry,
        'method': self.method,
        'country': self.country,
        'render_js': self.render_js,
        'cache': self.cache,
        'cache_clear': self.cache_clear,
        'ssl': self.ssl,
        'dns': self.dns,
        'asp': self.asp,
        'debug': self.debug,
        'raise_on_upstream_error': self.raise_on_upstream_error,
        'cache_ttl': self.cache_ttl,
        'proxy_pool': self.proxy_pool,
        'session': self.session,
        'tags': list(self.tags),
        'format': Format(self.format).value if self.format else None,
        'format_options': [FormatOption(option).value for option in self.format_options] if self.format_options else None,
        'extraction_template': self.extraction_template,
        'extraction_ephemeral_template': self.extraction_ephemeral_template,
        'extraction_prompt': self.extraction_prompt,
        'extraction_model': self.extraction_model,
        'correlation_id': self.correlation_id,
        'cookies': CaseInsensitiveDict(self.cookies),
        'body': self.body,
        'data': None if self.body else self.data,
        'headers': CaseInsensitiveDict(self.headers),
        'js': self.js,
        'rendering_wait': self.rendering_wait,
        'wait_for_selector': self.wait_for_selector,
        'session_sticky_proxy': self.session_sticky_proxy,
        'screenshots': self.screenshots,
        'screenshot_flags': [ScreenshotFlag(flag).value for flag in self.screenshot_flags] if self.screenshot_flags else None,
        'webhook': self.webhook,
        'timeout': self.timeout,
        'js_scenario': self.js_scenario,
        'extract': self.extract,
        'lang': self.lang,
        'os': self.os,
        'auto_scroll': self.auto_scroll,
        'cost_budget': self.cost_budget,
    }
Export the ScrapeConfig instance to a plain dictionary. Useful for JSON-serialization or other external storage.
class ScraperAPI
-
Expand source code
class ScraperAPI:
    MONITORING_DATA_FORMAT_STRUCTURED = 'structured'
    MONITORING_DATA_FORMAT_PROMETHEUS = 'prometheus'

    MONITORING_PERIOD_SUBSCRIPTION = 'subscription'
    MONITORING_PERIOD_LAST_7D = 'last7d'
    MONITORING_PERIOD_LAST_24H = 'last24h'
    MONITORING_PERIOD_LAST_1H = 'last1h'
    MONITORING_PERIOD_LAST_5m = 'last5m'

    MONITORING_ACCOUNT_AGGREGATION = 'account'
    MONITORING_PROJECT_AGGREGATION = 'project'
    MONITORING_TARGET_AGGREGATION = 'target'
Class variables
var MONITORING_ACCOUNT_AGGREGATION
var MONITORING_DATA_FORMAT_PROMETHEUS
var MONITORING_DATA_FORMAT_STRUCTURED
var MONITORING_PERIOD_LAST_1H
var MONITORING_PERIOD_LAST_24H
var MONITORING_PERIOD_LAST_5m
var MONITORING_PERIOD_LAST_7D
var MONITORING_PERIOD_SUBSCRIPTION
var MONITORING_PROJECT_AGGREGATION
var MONITORING_TARGET_AGGREGATION
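These constants are plain strings meant to be passed to the monitoring methods on ScrapflyClient; a sketch assuming a placeholder API key:

client = ScrapflyClient(key='YOUR-API-KEY')
metrics = client.get_monitoring_metrics(
    format=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED,
    period=ScraperAPI.MONITORING_PERIOD_LAST_24H,
    aggregation=[ScraperAPI.MONITORING_PROJECT_AGGREGATION],
)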
class ScrapflyAspError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyAspError(ScraperAPIError): pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Inherited members
class ScrapflyClient (key: str,
host: str | None = 'https://api.scrapfly.io',
verify=True,
debug: bool = False,
max_concurrency: int = 1,
connect_timeout: int = 30,
web_scraping_api_read_timeout: int = 160,
extraction_api_read_timeout: int = 35,
screenshot_api_read_timeout: int = 60,
read_timeout: int = 30,
default_read_timeout: int = 30,
reporter: Callable | None = None,
**kwargs)-
Expand source code
class ScrapflyClient: HOST = 'https://api.scrapfly.io' DEFAULT_CONNECT_TIMEOUT = 30 DEFAULT_READ_TIMEOUT = 30 DEFAULT_WEBSCRAPING_API_READ_TIMEOUT = 160 # 155 real DEFAULT_SCREENSHOT_API_READ_TIMEOUT = 60 # 30 real DEFAULT_EXTRACTION_API_READ_TIMEOUT = 35 # 30 real host:str key:str max_concurrency:int verify:bool debug:bool distributed_mode:bool connect_timeout:int web_scraping_api_read_timeout:int screenshot_api_read_timeout:int extraction_api_read_timeout:int monitoring_api_read_timeout:int default_read_timeout:int brotli: bool reporter:Reporter version:str # @deprecated read_timeout:int CONCURRENCY_AUTO = 'auto' # retrieve the allowed concurrency from your account DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S' def __init__( self, key: str, host: Optional[str] = HOST, verify=True, debug: bool = False, max_concurrency:int=1, connect_timeout:int = DEFAULT_CONNECT_TIMEOUT, web_scraping_api_read_timeout: int = DEFAULT_WEBSCRAPING_API_READ_TIMEOUT, extraction_api_read_timeout: int = DEFAULT_EXTRACTION_API_READ_TIMEOUT, screenshot_api_read_timeout: int = DEFAULT_SCREENSHOT_API_READ_TIMEOUT, # @deprecated read_timeout:int = DEFAULT_READ_TIMEOUT, default_read_timeout:int = DEFAULT_READ_TIMEOUT, reporter:Optional[Callable]=None, **kwargs ): if host[-1] == '/': # remove last '/' if exists host = host[:-1] if 'distributed_mode' in kwargs: warnings.warn("distributed mode is deprecated and will be remove the next version -" " user should handle themself the session name based on the concurrency", DeprecationWarning, stacklevel=2 ) if 'brotli' in kwargs: warnings.warn("brotli arg is deprecated and will be remove the next version - " "brotli is disabled by default", DeprecationWarning, stacklevel=2 ) self.version = __version__ self.host = host self.key = key self.verify = verify self.debug = debug self.connect_timeout = connect_timeout self.web_scraping_api_read_timeout = web_scraping_api_read_timeout self.screenshot_api_read_timeout = screenshot_api_read_timeout self.extraction_api_read_timeout = extraction_api_read_timeout self.monitoring_api_read_timeout = default_read_timeout self.default_read_timeout = default_read_timeout # @deprecated self.read_timeout = default_read_timeout self.max_concurrency = max_concurrency self.body_handler = ResponseBodyHandler(use_brotli=False) self.async_executor = ThreadPoolExecutor() self.http_session = None if not self.verify and not self.HOST.endswith('.local'): urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) if self.debug is True: http.client.HTTPConnection.debuglevel = 5 if reporter is None: from .reporter import NoopReporter reporter = NoopReporter() self.reporter = Reporter(reporter) @property def ua(self) -> str: return 'ScrapflySDK/%s (Python %s, %s, %s)' % ( self.version, platform.python_version(), platform.uname().system, platform.uname().machine ) @cached_property def _http_handler(self): return partial(self.http_session.request if self.http_session else requests.request) @property def http(self): return self._http_handler def _scrape_request(self, scrape_config:ScrapeConfig): return { 'method': scrape_config.method, 'url': self.host + '/scrape', 'data': scrape_config.body, 'verify': self.verify, 'timeout': (self.connect_timeout, self.web_scraping_api_read_timeout), 'headers': { 'content-type': scrape_config.headers['content-type'] if scrape_config.method in ['POST', 'PUT', 'PATCH'] else self.body_handler.content_type, 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, 'params': 
scrape_config.to_api_params(key=self.key) } def _screenshot_request(self, screenshot_config:ScreenshotConfig): return { 'method': 'GET', 'url': self.host + '/screenshot', 'timeout': (self.connect_timeout, self.screenshot_api_read_timeout), 'headers': { 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, 'params': screenshot_config.to_api_params(key=self.key) } def _extraction_request(self, extraction_config:ExtractionConfig): headers = { 'content-type': extraction_config.content_type, 'accept-encoding': self.body_handler.content_encoding, 'content-encoding': extraction_config.document_compression_format if extraction_config.document_compression_format else None, 'accept': self.body_handler.accept, 'user-agent': self.ua } if extraction_config.document_compression_format: headers['content-encoding'] = extraction_config.document_compression_format.value return { 'method': 'POST', 'url': self.host + '/extraction', 'data': extraction_config.body, 'timeout': (self.connect_timeout, self.extraction_api_read_timeout), 'headers': headers, 'params': extraction_config.to_api_params(key=self.key) } def account(self) -> Union[str, Dict]: response = self._http_handler( method='GET', url=self.host + '/account', params={'key': self.key}, verify=self.verify, headers={ 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, ) response.raise_for_status() if self.body_handler.support(response.headers): return self.body_handler(response.content, response.headers['content-type']) return response.content.decode('utf-8') def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None): params = {'key': self.key, 'format': format} if period is not None: params['period'] = period if aggregation is not None: params['aggregation'] = ','.join(aggregation) response = self._http_handler( method='GET', url=self.host + '/scrape/monitoring/metrics', params=params, timeout=(self.connect_timeout, self.monitoring_api_read_timeout), verify=self.verify, headers={ 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, ) response.raise_for_status() if self.body_handler.support(response.headers): return self.body_handler(response.content, response.headers['content-type']) return response.content.decode('utf-8') def get_monitoring_target_metrics( self, domain:str, group_subdomain:bool=False, period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H, start:Optional[datetime.datetime]=None, end:Optional[datetime.datetime]=None, ): params = { 'key': self.key, 'domain': domain, 'group_subdomain': group_subdomain } if (start is not None and end is None) or (start is None and end is not None): raise ValueError('You must provide both start and end date') if start is not None and end is not None: params['start'] = start.strftime(self.DATETIME_FORMAT) params['end'] = end.strftime(self.DATETIME_FORMAT) period = None params['period'] = period response = self._http_handler( method='GET', url=self.host + '/scrape/monitoring/metrics/target', timeout=(self.connect_timeout, self.monitoring_api_read_timeout), params=params, verify=self.verify, headers={ 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, ) response.raise_for_status() if 
self.body_handler.support(response.headers): return self.body_handler(response.content, response.headers['content-type']) return response.content.decode('utf-8') def resilient_scrape( self, scrape_config:ScrapeConfig, retry_on_errors:Set[Exception]={ScrapflyError}, retry_on_status_code:Optional[List[int]]=None, tries: int = 5, delay: int = 20, ) -> ScrapeApiResponse: assert retry_on_errors is not None, 'Retry on error is None' assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()' @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay) def inner() -> ScrapeApiResponse: try: return self.scrape(scrape_config=scrape_config) except (UpstreamHttpClientError, UpstreamHttpServerError) as e: if retry_on_status_code is not None and e.api_response: if e.api_response.upstream_status_code in retry_on_status_code: raise e else: return e.api_response raise e return inner() def open(self): if self.http_session is None: self.http_session = Session() self.http_session.verify = self.verify self.http_session.timeout = (self.connect_timeout, self.default_read_timeout) self.http_session.params['key'] = self.key self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding self.http_session.headers['accept'] = self.body_handler.accept self.http_session.headers['user-agent'] = self.ua def close(self): self.http_session.close() self.http_session = None def __enter__(self) -> 'ScrapflyClient': self.open() return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse: if loop is None: loop = asyncio.get_running_loop() return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config) async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None): if concurrency is None: concurrency = self.max_concurrency elif concurrency == self.CONCURRENCY_AUTO: concurrency = self.account()['subscription']['max_concurrency'] loop = asyncio.get_running_loop() processing_tasks = [] results = [] processed_tasks = 0 expected_tasks = len(scrape_configs) def scrape_done_callback(task:Task): nonlocal processed_tasks try: if task.cancelled() is True: return error = task.exception() if error is not None: results.append(error) else: results.append(task.result()) finally: processing_tasks.remove(task) processed_tasks += 1 while scrape_configs or results or processing_tasks: logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks))) if scrape_configs: if len(processing_tasks) < concurrency: # @todo handle backpressure for _ in range(0, concurrency - len(processing_tasks)): try: scrape_config = scrape_configs.pop() except: break scrape_config.raise_on_upstream_error = False task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop)) processing_tasks.append(task) task.add_done_callback(scrape_done_callback) for _ in results: result = results.pop() yield result await asyncio.sleep(.5) logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks))) @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse: """ Scrape a website :param scrape_config: ScrapeConfig :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless 
integration :return: ScrapeApiResponse If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error. If the error is not none, you will get the following structure for example 'error': { 'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED', 'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds', 'retryable': False, 'http_code': 422, 'links': { 'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED' } } """ try: logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url)) request_data = self._scrape_request(scrape_config=scrape_config) response = self._http_handler(**request_data) scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config) self.reporter.report(scrape_api_response=scrape_api_response) return scrape_api_response except BaseException as e: self.reporter.report(error=e) if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None: return e.api_response raise e async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse: if loop is None: loop = asyncio.get_running_loop() return await loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config) @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse: """ Take a screenshot :param screenshot_config: ScrapeConfig :param no_raise: bool - if True, do not raise exception on error while the screenshot api response is a ScrapflyError for seamless integration :return: str If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error. 
If the error is not none, you will get the following structure for example 'error': { 'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT', 'message': 'For some reason we were unable to take the screenshot', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT' } } """ try: logger.debug('--> %s Screenshoting' % (screenshot_config.url)) request_data = self._screenshot_request(screenshot_config=screenshot_config) response = self._http_handler(**request_data) screenshot_api_response = self._handle_screenshot_response(response=response, screenshot_config=screenshot_config) return screenshot_api_response except BaseException as e: self.reporter.report(error=e) if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None: return e.api_response raise e async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse: if loop is None: loop = asyncio.get_running_loop() return await loop.run_in_executor(self.async_executor, self.extract, extraction_config) @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse: """ Extract structured data from text content :param extraction_config: ExtractionConfig :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration :return: str If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example 'error': { 'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED', 'message': 'The content type of the response is not supported for extraction', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED' } } """ try: logger.debug('--> %s Extracting data from' % (extraction_config.content_type)) request_data = self._extraction_request(extraction_config=extraction_config) response = self._http_handler(**request_data) extraction_api_response = self._handle_extraction_response(response=response, extraction_config=extraction_config) return extraction_api_response except BaseException as e: self.reporter.report(error=e) if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None: return e.api_response raise e def _handle_response(self, response:Response, scrape_config:ScrapeConfig) -> ScrapeApiResponse: try: api_response = self._handle_api_response( response=response, scrape_config=scrape_config, raise_on_upstream_error=scrape_config.raise_on_upstream_error ) if scrape_config.method == 'HEAD': logger.debug('<-- [%s %s] %s | %ss' % ( api_response.response.status_code, api_response.response.reason, api_response.response.request.url, 0 )) else: logger.debug('<-- [%s %s] %s | %ss' % ( api_response.result['result']['status_code'], api_response.result['result']['reason'], api_response.result['config']['url'], api_response.result['result']['duration']) ) logger.debug('Log url: %s' % api_response.result['result']['log_url']) return api_response except UpstreamHttpError as e: logger.critical(e.api_response.error_message) raise except HttpError as e: if e.api_response is not None: logger.critical(e.api_response.error_message) else: logger.critical(e.message) raise except ScrapflyError as e: 
logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url)) raise def _handle_screenshot_response(self, response:Response, screenshot_config:ScreenshotConfig) -> ScreenshotApiResponse: try: api_response = self._handle_screenshot_api_response( response=response, screenshot_config=screenshot_config, raise_on_upstream_error=screenshot_config.raise_on_upstream_error ) return api_response except UpstreamHttpError as e: logger.critical(e.api_response.error_message) raise except HttpError as e: if e.api_response is not None: logger.critical(e.api_response.error_message) else: logger.critical(e.message) raise except ScrapflyError as e: logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url)) raise def _handle_extraction_response(self, response:Response, extraction_config:ExtractionConfig) -> ExtractionApiResponse: try: api_response = self._handle_extraction_api_response( response=response, extraction_config=extraction_config, raise_on_upstream_error=extraction_config.raise_on_upstream_error ) return api_response except UpstreamHttpError as e: logger.critical(e.api_response.error_message) raise except HttpError as e: if e.api_response is not None: logger.critical(e.api_response.error_message) else: logger.critical(e.message) raise except ScrapflyError as e: logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url)) raise def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None): """ Save a screenshot from a screenshot API response :param api_response: ScreenshotApiResponse :param name: str - name of the screenshot to save as :param path: Optional[str] """ if screenshot_api_response.screenshot_success is not True: raise RuntimeError('Screenshot was not successful') if not screenshot_api_response.image: raise RuntimeError('Screenshot binary does not exist') content = screenshot_api_response.image extension_name = screenshot_api_response.metadata['extension_name'] if path: os.makedirs(path, exist_ok=True) file_path = os.path.join(path, f'{name}.{extension_name}') else: file_path = f'{name}.{extension_name}' if isinstance(content, bytes): content = BytesIO(content) with open(file_path, 'wb') as f: shutil.copyfileobj(content, f, length=131072) def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None): """ Save a screenshot from a scrape result :param api_response: ScrapeApiResponse :param name: str - name of the screenshot given in the scrape config :param path: Optional[str] """ if not api_response.scrape_result['screenshots']: raise RuntimeError('Screenshot %s do no exists' % name) try: api_response.scrape_result['screenshots'][name] except KeyError: raise RuntimeError('Screenshot %s do no exists' % name) screenshot_response = self._http_handler( method='GET', url=api_response.scrape_result['screenshots'][name]['url'], params={'key': self.key}, verify=self.verify ) screenshot_response.raise_for_status() if not name.endswith('.jpg'): name += '.jpg' api_response.sink(path=path, name=name, content=screenshot_response.content) def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str: scrape_result = api_response.result['result'] scrape_config = api_response.result['config'] file_content = content or scrape_result['content'] file_path = None file_extension = None if name: name_parts = name.split('.') if len(name_parts) > 1: file_extension = 
name_parts[-1] if not file: if file_extension is None: try: mime_type = scrape_result['response_headers']['content-type'] except KeyError: mime_type = 'application/octet-stream' if ';' in mime_type: mime_type = mime_type.split(';')[0] file_extension = '.' + mime_type.split('/')[1] if not name: name = scrape_config['url'].split('/')[-1] if name.find(file_extension) == -1: name += file_extension file_path = path + '/' + name if path else name if file_path == file_extension: url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-') if url[-1] == '-': url = url[:-1] url += file_extension file_path = url file = open(file_path, 'wb') if isinstance(file_content, str): file_content = BytesIO(file_content.encode('utf-8')) elif isinstance(file_content, bytes): file_content = BytesIO(file_content) file_content.seek(0) with file as f: shutil.copyfileobj(file_content, f, length=131072) logger.info('file %s created' % file_path) return file_path def _handle_scrape_large_objects( self, callback_url:str, format: Literal['clob', 'blob'] ) -> Tuple[Union[BytesIO, str], str]: if format not in ['clob', 'blob']: raise ContentError('Large objects handle can handles format format [blob, clob], given: %s' % format) response = self._http_handler(**{ 'method': 'GET', 'url': callback_url, 'verify': self.verify, 'timeout': (self.connect_timeout, self.default_read_timeout), 'headers': { 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, 'params': {'key': self.key} }) if self.body_handler.support(headers=response.headers): content = self.body_handler(content=response.content, content_type=response.headers['content-type']) else: content = response.content if format == 'clob': return content.decode('utf-8'), 'text' return BytesIO(content), 'binary' def _handle_api_response( self, response: Response, scrape_config:ScrapeConfig, raise_on_upstream_error: Optional[bool] = True ) -> ScrapeApiResponse: if scrape_config.method == 'HEAD': body = None else: if self.body_handler.support(headers=response.headers): body = self.body_handler(content=response.content, content_type=response.headers['content-type']) else: body = response.content.decode('utf-8') api_response:ScrapeApiResponse = ScrapeApiResponse( response=response, request=response.request, api_result=body, scrape_config=scrape_config, large_object_handler=self._handle_scrape_large_objects ) api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error) return api_response def _handle_screenshot_api_response( self, response: Response, screenshot_config:ScreenshotConfig, raise_on_upstream_error: Optional[bool] = True ) -> ScreenshotApiResponse: if self.body_handler.support(headers=response.headers): body = self.body_handler(content=response.content, content_type=response.headers['content-type']) else: body = {'result': response.content} api_response:ScreenshotApiResponse = ScreenshotApiResponse( response=response, request=response.request, api_result=body, screenshot_config=screenshot_config ) api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error) return api_response def _handle_extraction_api_response( self, response: Response, extraction_config:ExtractionConfig, raise_on_upstream_error: Optional[bool] = True ) -> ExtractionApiResponse: if self.body_handler.support(headers=response.headers): body = self.body_handler(content=response.content, content_type=response.headers['content-type']) else: body = response.content.decode('utf-8') 
api_response:ExtractionApiResponse = ExtractionApiResponse( response=response, request=response.request, api_result=body, extraction_config=extraction_config ) api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error) return api_response
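Because open() and close() manage an underlying requests Session and __enter__/__exit__ delegate to them, the client works as a context manager; a minimal sketch with a placeholder key and URL:

with ScrapflyClient(key='YOUR-API-KEY') as client:
    api_response = client.scrape(ScrapeConfig(url='https://example.com'))
    print(api_response.scrape_result['status_code'])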
Class variables
var CONCURRENCY_AUTO
var DATETIME_FORMAT
var DEFAULT_CONNECT_TIMEOUT
var DEFAULT_EXTRACTION_API_READ_TIMEOUT
var DEFAULT_READ_TIMEOUT
var DEFAULT_SCREENSHOT_API_READ_TIMEOUT
var DEFAULT_WEBSCRAPING_API_READ_TIMEOUT
var HOST
var brotli : bool
var connect_timeout : int
var debug : bool
var default_read_timeout : int
var distributed_mode : bool
var extraction_api_read_timeout : int
var host : str
var key : str
var max_concurrency : int
var monitoring_api_read_timeout : int
var read_timeout : int
var reporter : scrapfly.reporter.Reporter
var screenshot_api_read_timeout : int
var verify : bool
var version : str
var web_scraping_api_read_timeout : int
Instance variables
prop http
-
Expand source code
@property
def http(self):
    return self._http_handler
prop ua : str
-
Expand source code
@property
def ua(self) -> str:
    return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
        self.version,
        platform.python_version(),
        platform.uname().system,
        platform.uname().machine
    )
Methods
def account(self) ‑> str | Dict
-
Expand source code
def account(self) -> Union[str, Dict]:
    response = self._http_handler(
        method='GET',
        url=self.host + '/account',
        params={'key': self.key},
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )
    response.raise_for_status()
    if self.body_handler.support(response.headers):
        return self.body_handler(response.content, response.headers['content-type'])
    return response.content.decode('utf-8')
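The returned mapping includes subscription details; concurrent_scrape() itself reads the path below, so it is safe to rely on (key is a placeholder):

client = ScrapflyClient(key='YOUR-API-KEY')
account = client.account()  # decoded to a dict when the response encoding is supported
print(account['subscription']['max_concurrency'])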
async def async_extraction(self,
extraction_config: ExtractionConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ExtractionApiResponse-
Expand source code
async def async_extraction(self, extraction_config: ExtractionConfig, loop: Optional[AbstractEventLoop] = None) -> ExtractionApiResponse:
    if loop is None:
        loop = asyncio.get_running_loop()
    return await loop.run_in_executor(self.async_executor, self.extract, extraction_config)
async def async_scrape(self,
scrape_config: ScrapeConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ScrapeApiResponse-
Expand source code
async def async_scrape(self, scrape_config: ScrapeConfig, loop: Optional[AbstractEventLoop] = None) -> ScrapeApiResponse:
    if loop is None:
        loop = asyncio.get_running_loop()
    return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config)
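async_scrape() offloads the blocking scrape() call onto the client's thread pool; a minimal asyncio sketch with placeholder values:

import asyncio

async def main():
    client = ScrapflyClient(key='YOUR-API-KEY')
    api_response = await client.async_scrape(ScrapeConfig(url='https://example.com'))
    print(api_response.scrape_result['status_code'])

asyncio.run(main())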
async def async_screenshot(self,
screenshot_config: ScreenshotConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ScreenshotApiResponse-
Expand source code
async def async_screenshot(self, screenshot_config: ScreenshotConfig, loop: Optional[AbstractEventLoop] = None) -> ScreenshotApiResponse:
    if loop is None:
        loop = asyncio.get_running_loop()
    return await loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)
def close(self)
-
Expand source code
def close(self):
    self.http_session.close()
    self.http_session = None
async def concurrent_scrape(self,
scrape_configs: List[ScrapeConfig],
concurrency: int | None = None)-
Expand source code
async def concurrent_scrape(self, scrape_configs: List[ScrapeConfig], concurrency: Optional[int] = None):
    if concurrency is None:
        concurrency = self.max_concurrency
    elif concurrency == self.CONCURRENCY_AUTO:
        concurrency = self.account()['subscription']['max_concurrency']

    loop = asyncio.get_running_loop()
    processing_tasks = []
    results = []
    processed_tasks = 0
    expected_tasks = len(scrape_configs)

    def scrape_done_callback(task: Task):
        nonlocal processed_tasks
        try:
            if task.cancelled() is True:
                return
            error = task.exception()
            if error is not None:
                results.append(error)
            else:
                results.append(task.result())
        finally:
            processing_tasks.remove(task)
            processed_tasks += 1

    while scrape_configs or results or processing_tasks:
        logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

        if scrape_configs:
            if len(processing_tasks) < concurrency:
                # @todo handle backpressure
                for _ in range(0, concurrency - len(processing_tasks)):
                    try:
                        scrape_config = scrape_configs.pop()
                    except:
                        break
                    scrape_config.raise_on_upstream_error = False
                    task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                    processing_tasks.append(task)
                    task.add_done_callback(scrape_done_callback)

        for _ in results:
            result = results.pop()
            yield result

        await asyncio.sleep(.5)

    logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))
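concurrent_scrape() is an async generator: results are yielded as tasks finish, not in submission order, and failed tasks are yielded as exception objects rather than raised. A consumption sketch with placeholder URLs:

import asyncio

async def main():
    client = ScrapflyClient(key='YOUR-API-KEY', max_concurrency=2)
    configs = [ScrapeConfig(url='https://example.com/page/%d' % i) for i in range(5)]
    async for result in client.concurrent_scrape(configs):
        if isinstance(result, Exception):
            print('failed:', result)
        else:
            print('ok:', result.scrape_result['status_code'])

asyncio.run(main())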
def extract(self,
extraction_config: ExtractionConfig,
no_raise: bool = False) ‑> ExtractionApiResponse-
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def extract(self, extraction_config: ExtractionConfig, no_raise: bool = False) -> ExtractionApiResponse:
    """
    Extract structured data from text content
    :param extraction_config: ExtractionConfig
    :param no_raise: bool - if True, do not raise an exception on error while the extraction API response is a ScrapflyError, for seamless integration
    :return: str

    If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error.
    If the error is not None, you will get the following structure, for example:

    'error': {
        'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED',
        'message': 'The content type of the response is not supported for extraction',
        'http_code': 422,
        'links': {
            'Checkout the related doc': 'https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED'
        }
    }
    """
    try:
        logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
        request_data = self._extraction_request(extraction_config=extraction_config)
        response = self._http_handler(**request_data)
        extraction_api_response = self._handle_extraction_response(response=response, extraction_config=extraction_config)
        return extraction_api_response
    except BaseException as e:
        self.reporter.report(error=e)
        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response
        raise e
Extract structured data from text content.
:param extraction_config: ExtractionConfig
:param no_raise: bool - if True, do not raise an exception on error while the extraction API response is a ScrapflyError, for seamless integration
:return: str
If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error. If the error is not None, you will get the following structure, for example:
'error': { 'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED', 'message': 'The content type of the response is not supported for extraction', 'http_code': 422, 'links': { 'Checkout the related doc': 'https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED' } }
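A no_raise sketch; extraction_config is assumed to be an ExtractionConfig built per the scrapfly.extraction_config module (its constructor is documented there):

client = ScrapflyClient(key='YOUR-API-KEY')
api_response = client.extract(extraction_config, no_raise=True)  # extraction_config built elsewhere
if api_response.error is not None:
    print(api_response.error['code'], api_response.error['message'])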
def get_monitoring_metrics(self,
format: str = 'structured',
period: str | None = None,
aggregation: List[Literal['account', 'project', 'target']] | None = None)-
Expand source code
def get_monitoring_metrics(self, format: str = ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period: Optional[str] = None, aggregation: Optional[List[MonitoringAggregation]] = None):
    params = {'key': self.key, 'format': format}
    if period is not None:
        params['period'] = period
    if aggregation is not None:
        params['aggregation'] = ','.join(aggregation)
    response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics',
        params=params,
        timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )
    response.raise_for_status()
    if self.body_handler.support(response.headers):
        return self.body_handler(response.content, response.headers['content-type'])
    return response.content.decode('utf-8')
def get_monitoring_target_metrics(self,
domain: str,
group_subdomain: bool = False,
period: Literal['subscription', 'last7d', 'last24h', 'last1h', 'last5m'] | None = 'last24h',
start: datetime.datetime | None = None,
end: datetime.datetime | None = None)-
Expand source code
def get_monitoring_target_metrics(
    self,
    domain: str,
    group_subdomain: bool = False,
    period: Optional[MonitoringTargetPeriod] = ScraperAPI.MONITORING_PERIOD_LAST_24H,
    start: Optional[datetime.datetime] = None,
    end: Optional[datetime.datetime] = None,
):
    params = {
        'key': self.key,
        'domain': domain,
        'group_subdomain': group_subdomain
    }

    if (start is not None and end is None) or (start is None and end is not None):
        raise ValueError('You must provide both start and end date')

    if start is not None and end is not None:
        params['start'] = start.strftime(self.DATETIME_FORMAT)
        params['end'] = end.strftime(self.DATETIME_FORMAT)
        period = None

    params['period'] = period

    response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics/target',
        timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
        params=params,
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )

    response.raise_for_status()

    if self.body_handler.support(response.headers):
        return self.body_handler(response.content, response.headers['content-type'])

    return response.content.decode('utf-8')
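A short sketch assuming an existing client instance. Note the constraint enforced above: start and end must be provided together, and providing them overrides period:

import datetime

end = datetime.datetime.utcnow()
start = end - datetime.timedelta(days=7)

target_metrics = client.get_monitoring_target_metrics(
    domain='example.com',
    group_subdomain=True,
    start=start,   # passing only one of start/end raises ValueError
    end=end,
)
print(target_metrics)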
def open(self)
-
Expand source code
def open(self):
    if self.http_session is None:
        self.http_session = Session()
        self.http_session.verify = self.verify
        self.http_session.timeout = (self.connect_timeout, self.default_read_timeout)
        self.http_session.params['key'] = self.key
        self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding
        self.http_session.headers['accept'] = self.body_handler.accept
        self.http_session.headers['user-agent'] = self.ua
def resilient_scrape(self,
scrape_config: ScrapeConfig,
retry_on_errors: Set[Exception] = {scrapfly.errors.ScrapflyError},
retry_on_status_code: List[int] | None = None,
tries: int = 5,
delay: int = 20) ‑> ScrapeApiResponse-
Expand source code
def resilient_scrape(
    self,
    scrape_config: ScrapeConfig,
    retry_on_errors: Set[Exception] = {ScrapflyError},
    retry_on_status_code: Optional[List[int]] = None,
    tries: int = 5,
    delay: int = 20,
) -> ScrapeApiResponse:
    assert retry_on_errors is not None, 'Retry on error is None'
    assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

    @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)
    def inner() -> ScrapeApiResponse:
        try:
            return self.scrape(scrape_config=scrape_config)
        except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
            if retry_on_status_code is not None and e.api_response:
                if e.api_response.upstream_status_code in retry_on_status_code:
                    raise e
                else:
                    return e.api_response

            raise e

    return inner()
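A usage sketch, assuming an existing client instance: retry on any ScrapflyError, and additionally retry upstream 5xx responses; upstream errors with other status codes are returned as-is rather than raised:

from scrapfly import ScrapeConfig, ScrapflyError

api_response = client.resilient_scrape(
    scrape_config=ScrapeConfig(url='https://example.com'),
    retry_on_errors={ScrapflyError},
    retry_on_status_code=[500, 502, 503],
    tries=3,
    delay=10,  # passed to backoff as max_time, i.e. a total time budget
)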
def save_scrape_screenshot(self,
api_response: ScrapeApiResponse,
name: str,
path: str | None = None)-
Expand source code
def save_scrape_screenshot(self, api_response: ScrapeApiResponse, name: str, path: Optional[str] = None):
    """
    Save a screenshot from a scrape result
    :param api_response: ScrapeApiResponse
    :param name: str - name of the screenshot given in the scrape config
    :param path: Optional[str]
    """

    if not api_response.scrape_result['screenshots']:
        raise RuntimeError('Screenshot %s does not exist' % name)

    try:
        api_response.scrape_result['screenshots'][name]
    except KeyError:
        raise RuntimeError('Screenshot %s does not exist' % name)

    screenshot_response = self._http_handler(
        method='GET',
        url=api_response.scrape_result['screenshots'][name]['url'],
        params={'key': self.key},
        verify=self.verify
    )

    screenshot_response.raise_for_status()

    if not name.endswith('.jpg'):
        name += '.jpg'

    api_response.sink(path=path, name=name, content=screenshot_response.content)
Save a screenshot from a scrape result
:param api_response: ScrapeApiResponse
:param name: str - name of the screenshot given in the scrape config
:param path: Optional[str]
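A sketch of the round trip, with the caveat that the ScrapeConfig screenshot parameters ('main' mapped to a full-page capture, render_js) are assumptions about the Scrape API rather than something documented in this entry:

from scrapfly import ScrapeConfig

api_response = client.scrape(ScrapeConfig(
    url='https://example.com',
    render_js=True,                    # screenshots require rendering (assumption)
    screenshots={'main': 'fullpage'},  # name -> capture target (assumption)
))

# 'main' must match the name declared in the scrape config;
# the file is saved as screenshots/main.jpg.
client.save_scrape_screenshot(api_response=api_response, name='main', path='screenshots')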
def save_screenshot(self,
screenshot_api_response: ScreenshotApiResponse,
name: str,
path: str | None = None)-
Expand source code
def save_screenshot(self, screenshot_api_response: ScreenshotApiResponse, name: str, path: Optional[str] = None):
    """
    Save a screenshot from a screenshot API response
    :param screenshot_api_response: ScreenshotApiResponse
    :param name: str - name of the screenshot to save as
    :param path: Optional[str]
    """

    if screenshot_api_response.screenshot_success is not True:
        raise RuntimeError('Screenshot was not successful')

    if not screenshot_api_response.image:
        raise RuntimeError('Screenshot binary does not exist')

    content = screenshot_api_response.image
    extension_name = screenshot_api_response.metadata['extension_name']

    if path:
        os.makedirs(path, exist_ok=True)
        file_path = os.path.join(path, f'{name}.{extension_name}')
    else:
        file_path = f'{name}.{extension_name}'

    if isinstance(content, bytes):
        content = BytesIO(content)

    with open(file_path, 'wb') as f:
        shutil.copyfileobj(content, f, length=131072)
Save a screenshot from a screenshot API response
:param screenshot_api_response: ScreenshotApiResponse
:param name: str - name of the screenshot to save as
:param path: Optional[str]
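For example, pairing it with screenshot (documented below), assuming an existing client instance:

from scrapfly import ScreenshotConfig

screenshot_api_response = client.screenshot(
    screenshot_config=ScreenshotConfig(url='https://example.com')
)

# Writes shots/example.<extension_name>, where the extension comes from
# the response content-type; the directory is created if missing.
client.save_screenshot(
    screenshot_api_response=screenshot_api_response,
    name='example',
    path='shots',
)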
def scrape(self,
scrape_config: ScrapeConfig,
no_raise: bool = False) ‑> ScrapeApiResponse-
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def scrape(self, scrape_config: ScrapeConfig, no_raise: bool = False) -> ScrapeApiResponse:
    """
    Scrape a website
    :param scrape_config: ScrapeConfig
    :param no_raise: bool - if True, do not raise an exception on error; the API response then carries a ScrapflyError for seamless integration
    :return: ScrapeApiResponse

    If you use no_raise=True, make sure to check the api_response.scrape_result['error'] attribute to handle the error.
    If the error is not None, you will get the following structure, for example:

    'error': {
        'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
        'message': 'The ASP shield failed to solve the challenge against the anti-scraping protection - heuristic_engine bypass failed, please retry in few seconds',
        'retryable': False,
        'http_code': 422,
        'links': {
            'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate',
            'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
        }
    }
    """

    try:
        logger.debug('--> %s Scraping %s' % (scrape_config.method, scrape_config.url))
        request_data = self._scrape_request(scrape_config=scrape_config)
        response = self._http_handler(**request_data)
        scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config)
        self.reporter.report(scrape_api_response=scrape_api_response)
        return scrape_api_response
    except BaseException as e:
        self.reporter.report(error=e)

        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response

        raise e
Scrape a website
:param scrape_config: ScrapeConfig
:param no_raise: bool - if True, do not raise an exception on error; the API response then carries a ScrapflyError for seamless integration
:return: ScrapeApiResponse
If you use no_raise=True, make sure to check the api_response.scrape_result['error'] attribute to handle the error. If the error is not None, you will get the following structure, for example:
'error': {
    'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
    'message': 'The ASP shield failed to solve the challenge against the anti-scraping protection - heuristic_engine bypass failed, please retry in few seconds',
    'retryable': False,
    'http_code': 422,
    'links': {
        'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate',
        'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
    }
}
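A minimal sketch of the no_raise flow, assuming an existing client instance and treating the asp flag as an illustrative ScrapeConfig option:

from scrapfly import ScrapeConfig

api_response = client.scrape(
    scrape_config=ScrapeConfig(url='https://example.com', asp=True),
    no_raise=True,
)

# With no_raise=True, errors surface in the scrape result instead of raising.
error = api_response.scrape_result['error']
if error is not None:
    print(error['code'], 'retryable:', error.get('retryable'))
else:
    print('status:', api_response.scrape_result['status_code'])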
def screenshot(self,
screenshot_config: ScreenshotConfig,
no_raise: bool = False) ‑> ScreenshotApiResponse-
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def screenshot(self, screenshot_config: ScreenshotConfig, no_raise: bool = False) -> ScreenshotApiResponse:
    """
    Take a screenshot
    :param screenshot_config: ScreenshotConfig
    :param no_raise: bool - if True, do not raise an exception on error; the screenshot API response then carries a ScrapflyError for seamless integration
    :return: ScreenshotApiResponse

    If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error.
    If the error is not None, you will get the following structure, for example:

    'error': {
        'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
        'message': 'For some reason we were unable to take the screenshot',
        'http_code': 422,
        'links': {
            'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
        }
    }
    """

    try:
        logger.debug('--> Taking a screenshot of %s' % (screenshot_config.url))
        request_data = self._screenshot_request(screenshot_config=screenshot_config)
        response = self._http_handler(**request_data)
        screenshot_api_response = self._handle_screenshot_response(response=response, screenshot_config=screenshot_config)
        return screenshot_api_response
    except BaseException as e:
        self.reporter.report(error=e)

        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response

        raise e
Take a screenshot
:param screenshot_config: ScreenshotConfig
:param no_raise: bool - if True, do not raise an exception on error; the screenshot API response then carries a ScrapflyError for seamless integration
:return: ScreenshotApiResponse
If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error. If the error is not None, you will get the following structure, for example:
'error': {
    'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
    'message': 'For some reason we were unable to take the screenshot',
    'http_code': 422,
    'links': {
        'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
    }
}
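A usage sketch assuming an existing client instance; the resolution and capture values are illustrative:

from scrapfly import ScreenshotConfig

screenshot_api_response = client.screenshot(
    screenshot_config=ScreenshotConfig(
        url='https://example.com',
        resolution='1920x1080',  # illustrative value
        capture='fullpage',      # illustrative value
    ),
    no_raise=True,
)

if screenshot_api_response.error is not None:
    print('failed:', screenshot_api_response.error)
else:
    client.save_screenshot(screenshot_api_response, name='example')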
def sink(self,
api_response: ScrapeApiResponse,
content: str | bytes | None = None,
path: str | None = None,
name: str | None = None,
file: TextIO | _io.BytesIO | None = None) ‑> str -
Expand source code
def sink(self, api_response: ScrapeApiResponse, content: Optional[Union[str, bytes]] = None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
    scrape_result = api_response.result['result']
    scrape_config = api_response.result['config']

    file_content = content or scrape_result['content']
    file_path = None
    file_extension = None

    if name:
        name_parts = name.split('.')
        if len(name_parts) > 1:
            file_extension = name_parts[-1]

    if not file:
        if file_extension is None:
            try:
                mime_type = scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            if ';' in mime_type:
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            name = scrape_config['url'].split('/')[-1]

        if name.find(file_extension) == -1:
            name += file_extension

        file_path = path + '/' + name if path else name

        if file_path == file_extension:
            url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

            if url[-1] == '-':
                url = url[:-1]

            url += file_extension
            file_path = url

        file = open(file_path, 'wb')

    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)

    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    logger.info('file %s created' % file_path)

    return file_path
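A short sketch assuming an existing client instance. With no name or file given, sink derives the file name from the scraped URL and the extension from the upstream content-type header:

from scrapfly import ScrapeConfig

api_response = client.scrape(ScrapeConfig(url='https://example.com/index.html'))

file_path = client.sink(api_response, path='dumps')  # e.g. dumps/index.html
print('saved to', file_path)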
class ScrapflyError (message: str,
code: str,
http_status_code: int,
resource: str | None = None,
is_retryable: bool = False,
retry_delay: int | None = None,
retry_times: int | None = None,
documentation_url: str | None = None,
api_response: ApiResponse | None = None)-
Expand source code
class ScrapflyError(Exception):
    KIND_HTTP_BAD_RESPONSE = 'HTTP_BAD_RESPONSE'
    KIND_SCRAPFLY_ERROR = 'SCRAPFLY_ERROR'

    RESOURCE_PROXY = 'PROXY'
    RESOURCE_THROTTLE = 'THROTTLE'
    RESOURCE_SCRAPE = 'SCRAPE'
    RESOURCE_ASP = 'ASP'
    RESOURCE_SCHEDULE = 'SCHEDULE'
    RESOURCE_WEBHOOK = 'WEBHOOK'
    RESOURCE_SESSION = 'SESSION'

    def __init__(
        self,
        message: str,
        code: str,
        http_status_code: int,
        resource: Optional[str] = None,
        is_retryable: bool = False,
        retry_delay: Optional[int] = None,
        retry_times: Optional[int] = None,
        documentation_url: Optional[str] = None,
        api_response: Optional['ApiResponse'] = None
    ):
        self.message = message
        self.code = code
        self.retry_delay = retry_delay
        self.retry_times = retry_times
        self.resource = resource
        self.is_retryable = is_retryable
        self.documentation_url = documentation_url
        self.api_response = api_response
        self.http_status_code = http_status_code

        super().__init__(self.message, str(self.code))

    def __str__(self):
        message = self.message

        if self.documentation_url is not None:
            message += '. Learn more: %s' % self.documentation_url

        return message
Common base class for all non-exit exceptions.
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses
- scrapfly.errors.ExtraUsageForbidden
- scrapfly.errors.HttpError
Class variables
var KIND_HTTP_BAD_RESPONSE
var KIND_SCRAPFLY_ERROR
var RESOURCE_ASP
var RESOURCE_PROXY
var RESOURCE_SCHEDULE
var RESOURCE_SCRAPE
var RESOURCE_SESSION
var RESOURCE_THROTTLE
var RESOURCE_WEBHOOK
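Since ScrapflyError carries the retry metadata populated by ErrorFactory, a catch-and-retry sketch can lean on it directly (client and the URL are assumptions not shown in this entry):

import time

from scrapfly import ScrapeConfig, ScrapflyError

try:
    api_response = client.scrape(ScrapeConfig(url='https://example.com'))
except ScrapflyError as e:
    if e.is_retryable and e.retry_delay is not None:
        time.sleep(e.retry_delay)  # honor the delay suggested by the API
    else:
        # __str__ appends the documentation URL when one is available
        print('%s (%s): %s' % (e.code, e.http_status_code, e))
        raise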
class ScrapflyProxyError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyProxyError(ScraperAPIError): pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Inherited members
class ScrapflyScheduleError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyScheduleError(ScraperAPIError): pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Inherited members
class ScrapflyScrapeError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyScrapeError(ScraperAPIError): pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Inherited members
class ScrapflySessionError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflySessionError(ScraperAPIError): pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Inherited members
class ScrapflyThrottleError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyThrottleError(ScraperAPIError): pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Inherited members
class ScrapflyWebhookError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyWebhookError(ScraperAPIError): pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Inherited members
class ScreenshotAPIError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScreenshotAPIError(HttpError): pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Inherited members
class ScreenshotApiResponse (request: requests.models.Request,
response: requests.models.Response,
screenshot_config: ScreenshotConfig,
api_result: bytes | None = None)-
Expand source code
class ScreenshotApiResponse(ApiResponse):
    def __init__(self, request: Request, response: Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None):
        super().__init__(request, response)
        self.screenshot_config = screenshot_config
        self.result = self.handle_api_result(api_result)

    @property
    def image(self) -> Optional[str]:
        binary = self.result.get('result', None)
        if binary is None:
            return ''
        return binary

    @property
    def metadata(self) -> Optional[Dict]:
        if not self.image:
            return {}

        content_type = self.response.headers.get('content-type')
        extension_name = content_type[content_type.find('/') + 1:].split(';')[0]

        return {
            'extension_name': extension_name,
            'upstream-status-code': self.response.headers.get('X-Scrapfly-Upstream-Http-Code'),
            'upstream-url': self.response.headers.get('X-Scrapfly-Upstream-Url')
        }

    @property
    def screenshot_success(self) -> bool:
        if not self.image:
            return False
        return True

    @property
    def error(self) -> Optional[Dict]:
        if self.image:
            return None

        if self.screenshot_success is False:
            return self.result

    def _is_api_error(self, api_result: Dict) -> bool:
        if api_result is None:
            return True

        return 'error_id' in api_result

    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        return api_result

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
Ancestors
- ApiResponse
Instance variables
prop error : Dict | None
-
Expand source code
@property
def error(self) -> Optional[Dict]:
    if self.image:
        return None

    if self.screenshot_success is False:
        return self.result
prop image : str | None
-
Expand source code
@property
def image(self) -> Optional[str]:
    binary = self.result.get('result', None)
    if binary is None:
        return ''
    return binary
prop metadata : Dict | None
-
Expand source code
@property
def metadata(self) -> Optional[Dict]:
    if not self.image:
        return {}

    content_type = self.response.headers.get('content-type')
    extension_name = content_type[content_type.find('/') + 1:].split(';')[0]

    return {
        'extension_name': extension_name,
        'upstream-status-code': self.response.headers.get('X-Scrapfly-Upstream-Http-Code'),
        'upstream-url': self.response.headers.get('X-Scrapfly-Upstream-Url')
    }
prop screenshot_success : bool
-
Expand source code
@property
def screenshot_success(self) -> bool:
    if not self.image:
        return False
    return True
Methods
def handle_api_result(self, api_result: bytes) ‑> FrozenDict
-
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict:
    if self._is_api_error(api_result=api_result) is True:
        return FrozenDict(api_result)

    return api_result
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ScreenshotAPIError)-
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
    super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
Inherited members
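Putting the properties together — a sketch assuming an existing client instance; the metadata keys come straight from the property source above:

from scrapfly import ScreenshotConfig

screenshot_api_response = client.screenshot(
    ScreenshotConfig(url='https://example.com'), no_raise=True
)

if screenshot_api_response.screenshot_success:
    print('format:', screenshot_api_response.metadata['extension_name'])
    print('upstream:', screenshot_api_response.metadata['upstream-url'])
else:
    # on failure, error exposes the raw API result
    print('failed:', screenshot_api_response.error)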
class ScreenshotConfig (url: str,
format: Format | None = None,
capture: str | None = None,
resolution: str | None = None,
country: str | None = None,
timeout: int | None = None,
rendering_wait: int | None = None,
wait_for_selector: str | None = None,
options: List[Options] | None = None,
auto_scroll: bool | None = None,
js: str | None = None,
cache: bool | None = None,
cache_ttl: bool | None = None,
cache_clear: bool | None = None,
webhook: str | None = None,
raise_on_upstream_error: bool = True)-
Expand source code
class ScreenshotConfig(BaseApiConfig):
    url: str
    format: Optional[Format] = None
    capture: Optional[str] = None
    resolution: Optional[str] = None
    country: Optional[str] = None
    timeout: Optional[int] = None  # in milliseconds
    rendering_wait: Optional[int] = None  # in milliseconds
    wait_for_selector: Optional[str] = None
    options: Optional[List[Options]] = None
    auto_scroll: Optional[bool] = None
    js: Optional[str] = None
    cache: Optional[bool] = None
    cache_ttl: Optional[bool] = None
    cache_clear: Optional[bool] = None
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    def __init__(
        self,
        url: str,
        format: Optional[Format] = None,
        capture: Optional[str] = None,
        resolution: Optional[str] = None,
        country: Optional[str] = None,
        timeout: Optional[int] = None,  # in milliseconds
        rendering_wait: Optional[int] = None,  # in milliseconds
        wait_for_selector: Optional[str] = None,
        options: Optional[List[Options]] = None,
        auto_scroll: Optional[bool] = None,
        js: Optional[str] = None,
        cache: Optional[bool] = None,
        cache_ttl: Optional[bool] = None,
        cache_clear: Optional[bool] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True
    ):
        assert(type(url) is str)

        self.url = url
        self.key = None
        self.format = format
        self.capture = capture
        self.resolution = resolution
        self.country = country
        self.timeout = timeout
        self.rendering_wait = rendering_wait
        self.wait_for_selector = wait_for_selector
        self.options = [Options(flag) for flag in options] if options else None
        self.auto_scroll = auto_scroll
        self.js = js
        self.cache = cache
        self.cache_ttl = cache_ttl
        self.cache_clear = cache_clear
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

    def to_api_params(self, key: str) -> Dict:
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.format:
            params['format'] = Format(self.format).value

        if self.capture:
            params['capture'] = self.capture

        if self.resolution:
            params['resolution'] = self.resolution

        if self.country is not None:
            params['country'] = self.country

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.rendering_wait is not None:
            params['rendering_wait'] = self.rendering_wait

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.options is not None:
            params["options"] = ",".join(flag.value for flag in self.options)

        if self.auto_scroll is not None:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

        if self.js:
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

        if self.cache is not None:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_ttl is not None:
                params['cache_ttl'] = self._bool_to_http(self.cache_ttl)

            if self.cache_clear is not None:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)
        else:
            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

            if self.cache_clear is not None:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

        if self.webhook is not None:
            params['webhook_name'] = self.webhook

        return params

    def to_dict(self) -> Dict:
        """
        Export the ScreenshotConfig instance to a plain dictionary.
        """
        return {
            'url': self.url,
            'format': Format(self.format).value if self.format else None,
            'capture': self.capture,
            'resolution': self.resolution,
            'country': self.country,
            'timeout': self.timeout,
            'rendering_wait': self.rendering_wait,
            'wait_for_selector': self.wait_for_selector,
            'options': [Options(option).value for option in self.options] if self.options else None,
            'auto_scroll': self.auto_scroll,
            'js': self.js,
            'cache': self.cache,
            'cache_ttl': self.cache_ttl,
            'cache_clear': self.cache_clear,
            'webhook': self.webhook,
            'raise_on_upstream_error': self.raise_on_upstream_error
        }

    @staticmethod
    def from_dict(screenshot_config_dict: Dict) -> 'ScreenshotConfig':
        """Create a ScreenshotConfig instance from a dictionary."""
        url = screenshot_config_dict.get('url', None)
        format = screenshot_config_dict.get('format', None)
        format = Format(format) if format else None
        capture = screenshot_config_dict.get('capture', None)
        resolution = screenshot_config_dict.get('resolution', None)
        country = screenshot_config_dict.get('country', None)
        timeout = screenshot_config_dict.get('timeout', None)
        rendering_wait = screenshot_config_dict.get('rendering_wait', None)
        wait_for_selector = screenshot_config_dict.get('wait_for_selector', None)
        options = screenshot_config_dict.get('options', None)
        options = [Options(option) for option in options] if options else None
        auto_scroll = screenshot_config_dict.get('auto_scroll', None)
        js = screenshot_config_dict.get('js', None)
        cache = screenshot_config_dict.get('cache', None)
        cache_ttl = screenshot_config_dict.get('cache_ttl', None)
        cache_clear = screenshot_config_dict.get('cache_clear', None)
        webhook = screenshot_config_dict.get('webhook', None)
        raise_on_upstream_error = screenshot_config_dict.get('raise_on_upstream_error', True)

        return ScreenshotConfig(
            url=url,
            format=format,
            capture=capture,
            resolution=resolution,
            country=country,
            timeout=timeout,
            rendering_wait=rendering_wait,
            wait_for_selector=wait_for_selector,
            options=options,
            auto_scroll=auto_scroll,
            js=js,
            cache=cache,
            cache_ttl=cache_ttl,
            cache_clear=cache_clear,
            webhook=webhook,
            raise_on_upstream_error=raise_on_upstream_error
        )
Ancestors
- BaseApiConfig
Class variables
var auto_scroll : bool | None
var cache : bool | None
var cache_clear : bool | None
var cache_ttl : bool | None
var capture : str | None
var country : str | None
var format : Format | None
var js : str | None
var options : List[Options] | None
var raise_on_upstream_error : bool
var rendering_wait : int | None
var resolution : str | None
var timeout : int | None
var url : str
var wait_for_selector : str | None
var webhook : str | None
Static methods
def from_dict(screenshot_config_dict: Dict) ‑> ScreenshotConfig
-
Expand source code
@staticmethod
def from_dict(screenshot_config_dict: Dict) -> 'ScreenshotConfig':
    """Create a ScreenshotConfig instance from a dictionary."""
    url = screenshot_config_dict.get('url', None)
    format = screenshot_config_dict.get('format', None)
    format = Format(format) if format else None
    capture = screenshot_config_dict.get('capture', None)
    resolution = screenshot_config_dict.get('resolution', None)
    country = screenshot_config_dict.get('country', None)
    timeout = screenshot_config_dict.get('timeout', None)
    rendering_wait = screenshot_config_dict.get('rendering_wait', None)
    wait_for_selector = screenshot_config_dict.get('wait_for_selector', None)
    options = screenshot_config_dict.get('options', None)
    options = [Options(option) for option in options] if options else None
    auto_scroll = screenshot_config_dict.get('auto_scroll', None)
    js = screenshot_config_dict.get('js', None)
    cache = screenshot_config_dict.get('cache', None)
    cache_ttl = screenshot_config_dict.get('cache_ttl', None)
    cache_clear = screenshot_config_dict.get('cache_clear', None)
    webhook = screenshot_config_dict.get('webhook', None)
    raise_on_upstream_error = screenshot_config_dict.get('raise_on_upstream_error', True)

    return ScreenshotConfig(
        url=url,
        format=format,
        capture=capture,
        resolution=resolution,
        country=country,
        timeout=timeout,
        rendering_wait=rendering_wait,
        wait_for_selector=wait_for_selector,
        options=options,
        auto_scroll=auto_scroll,
        js=js,
        cache=cache,
        cache_ttl=cache_ttl,
        cache_clear=cache_clear,
        webhook=webhook,
        raise_on_upstream_error=raise_on_upstream_error
    )
Create a ScreenshotConfig instance from a dictionary.
Methods
def to_api_params(self, key: str) ‑> Dict
-
Expand source code
def to_api_params(self, key: str) -> Dict:
    params = {
        'key': self.key or key,
        'url': self.url
    }

    if self.format:
        params['format'] = Format(self.format).value

    if self.capture:
        params['capture'] = self.capture

    if self.resolution:
        params['resolution'] = self.resolution

    if self.country is not None:
        params['country'] = self.country

    if self.timeout is not None:
        params['timeout'] = self.timeout

    if self.rendering_wait is not None:
        params['rendering_wait'] = self.rendering_wait

    if self.wait_for_selector is not None:
        params['wait_for_selector'] = self.wait_for_selector

    if self.options is not None:
        params["options"] = ",".join(flag.value for flag in self.options)

    if self.auto_scroll is not None:
        params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

    if self.js:
        params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

    if self.cache is not None:
        params['cache'] = self._bool_to_http(self.cache)

        if self.cache_ttl is not None:
            params['cache_ttl'] = self._bool_to_http(self.cache_ttl)

        if self.cache_clear is not None:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)
    else:
        if self.cache_ttl is not None:
            logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

        if self.cache_clear is not None:
            logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

    if self.webhook is not None:
        params['webhook_name'] = self.webhook

    return params
def to_dict(self) ‑> Dict
-
Expand source code
def to_dict(self) -> Dict:
    """
    Export the ScreenshotConfig instance to a plain dictionary.
    """
    return {
        'url': self.url,
        'format': Format(self.format).value if self.format else None,
        'capture': self.capture,
        'resolution': self.resolution,
        'country': self.country,
        'timeout': self.timeout,
        'rendering_wait': self.rendering_wait,
        'wait_for_selector': self.wait_for_selector,
        'options': [Options(option).value for option in self.options] if self.options else None,
        'auto_scroll': self.auto_scroll,
        'js': self.js,
        'cache': self.cache,
        'cache_ttl': self.cache_ttl,
        'cache_clear': self.cache_clear,
        'webhook': self.webhook,
        'raise_on_upstream_error': self.raise_on_upstream_error
    }
Export the ScreenshotConfig instance to a plain dictionary.
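A round-trip sketch; Format.PNG is an assumed enum member, since this entry does not enumerate the Format and Options values defined in scrapfly.screenshot_config:

config = ScreenshotConfig(
    url='https://example.com',
    format=Format.PNG,    # assumed enum member
    rendering_wait=2000,  # milliseconds
    cache=True,
)

# to_dict()/from_dict() round-trip the config; to_api_params() renders the
# HTTP query parameters (booleans converted, js base64-encoded).
restored = ScreenshotConfig.from_dict(config.to_dict())
params = restored.to_api_params(key='YOUR-API-KEY')
print(params)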
class UpstreamHttpClientError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class UpstreamHttpClientError(UpstreamHttpError): pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.UpstreamHttpError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- UpstreamHttpServerError
Inherited members
class UpstreamHttpError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class UpstreamHttpError(HttpError): pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- UpstreamHttpClientError
Inherited members
class UpstreamHttpServerError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class UpstreamHttpServerError(UpstreamHttpClientError): pass
Common base class for all non-exit exceptions.
Ancestors
- UpstreamHttpClientError
- scrapfly.errors.UpstreamHttpError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Inherited members