Package scrapfly

Sub-modules

scrapfly.api_config
scrapfly.api_response
scrapfly.client
scrapfly.errors
scrapfly.extraction_config
scrapfly.frozen_dict
scrapfly.polyfill
scrapfly.reporter
scrapfly.scrape_config
scrapfly.scrapy
scrapfly.screenshot_config
scrapfly.webhook

Classes

class ApiHttpClientError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ApiHttpClientError(HttpError):
    """HTTP error reported for a bad API response.

    ``ErrorFactory.create`` returns this class as the fallback for a bad API
    response with a status code below 500 when neither a status-specific nor
    a resource-specific error class applies.
    """
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

  • ApiHttpServerError
  • scrapfly.errors.BadApiKeyError
  • scrapfly.errors.PaymentRequired
  • scrapfly.errors.TooManyRequest

Inherited members

class ApiHttpServerError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ApiHttpServerError(ApiHttpClientError):
    """Error returned by ``ErrorFactory.create`` for a bad API response whose status code is 500 or above."""
    pass

Common base class for all non-exit exceptions.

Ancestors

Inherited members

class EncoderError (content: str)
Expand source code
class EncoderError(BaseException):
    """Error describing an invalid payload.

    ``content`` is the text returned by ``str()`` and embedded in ``repr()``.
    """

    def __init__(self, content:str):
        self.content = content
        # Forward the content to BaseException so ``args`` is populated.
        # With empty ``args`` the exception cannot be rebuilt by copy/pickle,
        # because reconstruction calls the class with ``*args``.
        super().__init__(content)

    def __str__(self) -> str:
        return self.content

    def __repr__(self):
        return "Invalid payload: %s" % self.content

Common base class for all exceptions

Ancestors

  • builtins.BaseException
class ErrorFactory
Expand source code
class ErrorFactory:
    """Build the exception instance that best describes a failed API response."""

    # Error class to use for each resource identifier found in an error code
    RESOURCE_TO_ERROR = {
        ScrapflyError.RESOURCE_SCRAPE: ScrapflyScrapeError,
        ScrapflyError.RESOURCE_WEBHOOK: ScrapflyWebhookError,
        ScrapflyError.RESOURCE_PROXY: ScrapflyProxyError,
        ScrapflyError.RESOURCE_SCHEDULE: ScrapflyScheduleError,
        ScrapflyError.RESOURCE_ASP: ScrapflyAspError,
        ScrapflyError.RESOURCE_SESSION: ScrapflySessionError
    }

    # Notable http error has own class for more convenience
    # Only applicable for generic API error
    HTTP_STATUS_TO_ERROR = {
        401: BadApiKeyError,
        402: PaymentRequired,
        429: TooManyRequest
    }

    @staticmethod
    def _get_resource(code: str) -> Optional[str]:
        """Return the middle segment of an ``A::B::C`` error code.

        Returns ``None`` when ``code`` is not a string or is not made of
        exactly three ``::``-separated segments.
        """
        if isinstance(code, str) and '::' in code:
            segments = code.split('::')

            # A code with any other number of segments used to raise
            # ValueError during tuple unpacking; treat it as "no resource".
            if len(segments) == 3:
                return segments[1]

        return None

    @staticmethod
    def create(api_response: 'ScrapeApiResponse'):
        """Return (not raise) the error instance matching ``api_response``.

        The error class is chosen from the response kind (bad HTTP response
        vs. Scrapfly error), the HTTP status code and the resource segment of
        the error code. The message is built from the status code, the error
        code, the error message and, when present, the description and the
        retry delay.
        """
        is_retryable = False
        kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
        http_code = api_response.status_code
        retry_delay = 5
        retry_times = 3
        description = None
        error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
        code = api_response.error['code']

        # For upstream failures, report the status code of the scraped website
        if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
            http_code = api_response.scrape_result['status_code']

        if 'description' in api_response.error:
            description = api_response.error['description']

        message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

        if 'doc_url' in api_response.error:
            error_url = api_response.error['doc_url']

        if 'retryable' in api_response.error:
            is_retryable = api_response.error['retryable']

        resource = ErrorFactory._get_resource(code=code)

        if is_retryable is True:
            # Guard on the header that is actually read. The previous check
            # looked for 'X-Retry' and then indexed 'Retry-After', which could
            # raise KeyError or ignore a valid Retry-After value.
            if 'Retry-After' in api_response.headers:
                try:
                    retry_delay = int(api_response.headers['Retry-After'])
                except (TypeError, ValueError):
                    # Retry-After may also be an HTTP-date; keep the default delay
                    pass

        message = '%s: %s' % (message, description) if description else message

        if retry_delay is not None and is_retryable is True:
            message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

        args = {
            'message': message,
            'code': code,
            'http_status_code': http_code,
            'is_retryable': is_retryable,
            'api_response': api_response,
            'resource': resource,
            'retry_delay': retry_delay,
            'retry_times': retry_times,
            'documentation_url': error_url,
            'request': api_response.request,
            'response': api_response.response
        }

        if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
            if http_code >= 500:
                return ApiHttpServerError(**args)

            is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

            # Status-specific classes only apply to generic (non-resource) API errors
            if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
                return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

            if is_scraper_api_error:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ApiHttpClientError(**args)

        elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
            if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
                if http_code >= 500:
                    return UpstreamHttpServerError(**args)

                if http_code >= 400:
                    return UpstreamHttpClientError(**args)

            if resource in ErrorFactory.RESOURCE_TO_ERROR:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ScrapflyError(**args)

Class variables

var HTTP_STATUS_TO_ERROR

Maps notable HTTP status codes (401, 402, 429) to their dedicated error classes. Only applied to generic API errors.

var RESOURCE_TO_ERROR

Maps each `ScrapflyError.RESOURCE_*` resource identifier to the error class used for that resource.

Static methods

def create(api_response: ScrapeApiResponse)
Expand source code
@staticmethod
def create(api_response: 'ScrapeApiResponse'):
    """Return (not raise) the error instance matching ``api_response``.

    The error class is chosen from the response kind (bad HTTP response vs.
    Scrapfly error), the HTTP status code and the resource segment of the
    error code. The message is built from the status code, the error code,
    the error message and, when present, the description and the retry delay.
    """
    is_retryable = False
    kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
    http_code = api_response.status_code
    retry_delay = 5
    retry_times = 3
    description = None
    error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
    code = api_response.error['code']

    # For upstream failures, report the status code of the scraped website
    if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
        http_code = api_response.scrape_result['status_code']

    if 'description' in api_response.error:
        description = api_response.error['description']

    message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

    if 'doc_url' in api_response.error:
        error_url = api_response.error['doc_url']

    if 'retryable' in api_response.error:
        is_retryable = api_response.error['retryable']

    resource = ErrorFactory._get_resource(code=code)

    if is_retryable is True:
        # Guard on the header that is actually read. The previous check looked
        # for 'X-Retry' and then indexed 'Retry-After', which could raise
        # KeyError or ignore a valid Retry-After value.
        if 'Retry-After' in api_response.headers:
            try:
                retry_delay = int(api_response.headers['Retry-After'])
            except (TypeError, ValueError):
                # Retry-After may also be an HTTP-date; keep the default delay
                pass

    message = '%s: %s' % (message, description) if description else message

    if retry_delay is not None and is_retryable is True:
        message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

    args = {
        'message': message,
        'code': code,
        'http_status_code': http_code,
        'is_retryable': is_retryable,
        'api_response': api_response,
        'resource': resource,
        'retry_delay': retry_delay,
        'retry_times': retry_times,
        'documentation_url': error_url,
        'request': api_response.request,
        'response': api_response.response
    }

    if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
        if http_code >= 500:
            return ApiHttpServerError(**args)

        is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

        # Status-specific classes only apply to generic (non-resource) API errors
        if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
            return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

        if is_scraper_api_error:
            return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

        return ApiHttpClientError(**args)

    elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
        if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
            if http_code >= 500:
                return UpstreamHttpServerError(**args)

            if http_code >= 400:
                return UpstreamHttpClientError(**args)

        if resource in ErrorFactory.RESOURCE_TO_ERROR:
            return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

        return ScrapflyError(**args)
class ExtractionAPIError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ExtractionAPIError(HttpError):
    """Default ``error_class`` used by ``ExtractionApiResponse.raise_for_result``."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Inherited members

class ExtractionApiResponse (request: requests.models.Request,
response: requests.models.Response,
extraction_config: ExtractionConfig,
api_result: bytes | None = None)
Expand source code
class ExtractionApiResponse(ApiResponse):
    """Response wrapper for an Extraction API call.

    ``result`` holds either the API error payload as-is, or
    ``{'result': <api_result>}`` when the payload is not an API error
    (see ``handle_api_result``).
    """

    def __init__(self, request: Request, response: Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None):
        super().__init__(request, response)
        self.extraction_config = extraction_config
        self.result = self.handle_api_result(api_result)

    @property
    def extraction_result(self) -> Optional[Dict]:
        """Extraction payload, or a ``data``/``content_type`` placeholder of ``None`` values when it is empty or missing."""
        extraction_result = self.result.get('result', None)
        if not extraction_result:  # handle empty extraction responses
            return {'data': None, 'content_type': None}
        else:
            return extraction_result

    @property
    def data(self) -> Optional[Union[Dict, List, str]]:  # depends on the LLM prompt
        """Extracted data, or ``None`` when the response is an error."""
        if self.error is None:
            return self.extraction_result['data']

        return None

    @property
    def content_type(self) -> Optional[str]:
        """Content type of the extracted data, or ``None`` when the response is an error."""
        if self.error is None:
            return self.extraction_result['content_type']

        return None

    @property
    def extraction_success(self) -> bool:
        """``True`` when the extraction produced non-``None`` data."""
        # extraction_result always returns a mapping (placeholder when empty)
        return self.extraction_result['data'] is not None

    @property
    def error(self) -> Optional[Dict]:
        """Raw API error payload, or ``None`` when the response carries a result.

        The check reads the raw ``result`` mapping: ``extraction_result``
        substitutes a placeholder for missing results and therefore never
        returns ``None``, which made the previous check unreachable.
        """
        if self.result.get('result', None) is None:
            return self.result

        return None

    def _is_api_error(self, api_result: Dict) -> bool:
        """Return ``True`` when ``api_result`` is ``None`` or contains ``error_id``."""
        if api_result is None:
            return True

        return 'error_id' in api_result

    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        """Freeze the API payload: errors as-is, anything else under the ``result`` key."""
        if self._is_api_error(api_result=api_result) is True:
            # NOTE(review): api_result may be None here — confirm FrozenDict accepts None
            return FrozenDict(api_result)

        return FrozenDict({'result': api_result})

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
        """Delegate to the parent ``raise_for_result``, defaulting ``error_class`` to ``ExtractionAPIError``."""
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Ancestors

Instance variables

prop content_type : str | None
Expand source code
@property
def content_type(self) -> Optional[str]:
    """Content type of the extraction result, or ``None`` when ``error`` is set."""
    if self.error is not None:
        return None

    return self.extraction_result['content_type']
prop data : Dict | List | str
Expand source code
@property
def data(self) -> Optional[Union[Dict, List, str]]:  # depends on the LLM prompt
    """Extracted data, or ``None`` when ``error`` is set.

    The return annotation includes ``None`` because the error path returns it.
    """
    if self.error is None:
        return self.extraction_result['data']

    return None
prop error : Dict | None
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Raw API error payload, or ``None`` when the response carries a result.

    The check reads the raw ``result`` mapping instead of
    ``extraction_result``: that property substitutes a placeholder dict for
    missing results and never returns ``None``, so the previous check could
    never report an error.
    """
    if self.result.get('result', None) is None:
        return self.result

    return None
prop extraction_result : Dict | None
Expand source code
@property
def extraction_result(self) -> Optional[Dict]:
    """Extraction payload stored under ``result``, or a placeholder when it is empty or missing."""
    payload = self.result.get('result', None)
    # An empty extraction response is replaced by a placeholder of None values
    return payload if payload else {'data': None, 'content_type': None}
prop extraction_success : bool
Expand source code
@property
def extraction_success(self) -> bool:
    """``True`` when an extraction result exists and its ``data`` is not ``None``."""
    outcome = self.extraction_result
    return not (outcome is None or outcome['data'] is None)

Methods

def handle_api_result(self, api_result: bytes) ‑> FrozenDict
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict:
    """Freeze the API payload: errors are kept as-is, anything else is stored under ``result``."""
    is_error = self._is_api_error(api_result=api_result)
    payload = api_result if is_error is True else {'result': api_result}
    return FrozenDict(payload)
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ExtractionAPIError)
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
    """Delegate to the parent ``raise_for_result``, defaulting ``error_class`` to ``ExtractionAPIError``.

    The parent's return value is not propagated; this method returns ``None``.
    """
    super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Inherited members

class ExtractionConfig (body: str | bytes,
content_type: str,
url: str | None = None,
charset: str | None = None,
extraction_template: str | None = None,
extraction_ephemeral_template: Dict | None = None,
extraction_prompt: str | None = None,
extraction_model: str | None = None,
is_document_compressed: bool | None = None,
document_compression_format: CompressionFormat | None = None,
webhook: str | None = None,
raise_on_upstream_error: bool = True,
template: str | None = None,
ephemeral_template: Dict | None = None)
Expand source code
class ExtractionConfig(BaseApiConfig):
    """Configuration of an Extraction API call.

    Holds the document (``body`` / ``content_type``), the extraction rules
    (saved template, ephemeral template, prompt or model) and the optional
    compression settings of the document.
    """

    body: Union[str, bytes]
    content_type: str
    url: Optional[str] = None
    charset: Optional[str] = None
    extraction_template: Optional[str] = None  # a saved template name
    extraction_ephemeral_template: Optional[Dict] = None  # ephemeraly declared json template
    extraction_prompt: Optional[str] = None
    extraction_model: Optional[str] = None
    is_document_compressed: Optional[bool] = None
    document_compression_format: Optional[CompressionFormat] = None
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    # deprecated options
    template: Optional[str] = None
    ephemeral_template: Optional[Dict] = None

    def __init__(
        self,
        body: Union[str, bytes],
        content_type: str,
        url: Optional[str] = None,
        charset: Optional[str] = None,
        extraction_template: Optional[str] = None,  # a saved template name
        extraction_ephemeral_template: Optional[Dict] = None,  # ephemeraly declared json template
        extraction_prompt: Optional[str] = None,
        extraction_model: Optional[str] = None,
        is_document_compressed: Optional[bool] = None,
        document_compression_format: Optional[CompressionFormat] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True,

        # deprecated options
        template: Optional[str] = None,
        ephemeral_template: Optional[Dict] = None
    ):
        """Store the options and, when a compression format is declared, compress the body.

        Raises:
            ExtractionConfigError: when the detected compression format does
                not match the declared one, or when zstd compression is
                requested but ``zstandard`` is not installed.
        """
        # Deprecated aliases override their replacement when given
        if template:
            warnings.warn(
                "Deprecation warning: 'template' is deprecated. Use 'extraction_template' instead."
            )
            extraction_template = template

        if ephemeral_template:
            warnings.warn(
                "Deprecation warning: 'ephemeral_template' is deprecated. Use 'extraction_ephemeral_template' instead."
            )
            extraction_ephemeral_template = ephemeral_template

        self.key = None
        self.body = body
        self.content_type = content_type
        self.url = url
        self.charset = charset
        self.extraction_template = extraction_template
        self.extraction_ephemeral_template = extraction_ephemeral_template
        self.extraction_prompt = extraction_prompt
        self.extraction_model = extraction_model
        self.is_document_compressed = is_document_compressed
        self.document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

        if isinstance(body, bytes) or document_compression_format:
            compression_format = detect_compression_format(body)

            if compression_format is not None:
                self.is_document_compressed = True

                if self.document_compression_format and compression_format != self.document_compression_format:
                    raise ExtractionConfigError(
                        f'The detected compression format `{compression_format}` does not match declared format `{self.document_compression_format}`. '
                        f'You must pass the compression format or disable compression.'
                    )

                self.document_compression_format = compression_format

            else:
                self.is_document_compressed = False

            if self.is_document_compressed is False:
                # The body is not compressed yet: compress it into the declared format, if any.
                # NOTE(review): is_document_compressed stays False after compressing here —
                # confirm callers rely on document_compression_format instead.
                declared_format = CompressionFormat(self.document_compression_format) if self.document_compression_format else None

                if isinstance(self.body, str) and declared_format:
                    self.body = self.body.encode('utf-8')

                if declared_format == CompressionFormat.GZIP:
                    import gzip
                    self.body = gzip.compress(self.body)

                elif declared_format == CompressionFormat.ZSTD:
                    try:
                        import zstandard as zstd
                    except ImportError:
                        raise ExtractionConfigError(
                            f'zstandard is not installed. You must run pip install zstandard'
                            f' to auto compress into zstd or use compression formats.'
                        )
                    self.body = zstd.compress(self.body)

                elif declared_format == CompressionFormat.DEFLATE:
                    import zlib
                    compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS) # raw deflate compression
                    self.body = compressor.compress(self.body) + compressor.flush()

    def to_api_params(self, key: str) -> Dict:
        """Build the API parameters for this config.

        ``self.key`` takes precedence over ``key`` when set. The instance is
        left untouched, so the method can be called repeatedly.

        Raises:
            ExtractionConfigError: when both ``extraction_template`` and
                ``extraction_ephemeral_template`` are set.
        """
        params = {
            'key': self.key or key,
            'content_type': self.content_type
        }

        if self.url:
            params['url'] = self.url

        if self.charset:
            params['charset'] = self.charset

        if self.extraction_template and self.extraction_ephemeral_template:
            raise ExtractionConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

        if self.extraction_template:
            params['extraction_template'] = self.extraction_template

        if self.extraction_ephemeral_template:
            # Serialize into a local: overwriting the attribute with the JSON
            # string made a second call double-encode the template.
            serialized_template = json.dumps(self.extraction_ephemeral_template)
            params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(serialized_template.encode('utf-8')).decode('utf-8')

        if self.extraction_prompt:
            params['extraction_prompt'] = quote_plus(self.extraction_prompt)

        if self.extraction_model:
            params['extraction_model'] = self.extraction_model

        if self.webhook:
            params['webhook_name'] = self.webhook

        return params

    def to_dict(self) -> Dict:
        """
        Export the ExtractionConfig instance to a plain dictionary.

        A compressed body is exported decompressed and decoded as UTF-8 text.
        The instance itself is not modified.
        """
        body = self.body
        is_document_compressed = self.is_document_compressed

        if is_document_compressed is True:
            compression_format = CompressionFormat(self.document_compression_format) if self.document_compression_format else None

            if compression_format == CompressionFormat.GZIP:
                import gzip
                body = gzip.decompress(body)

            elif compression_format == CompressionFormat.ZSTD:
                import zstandard as zstd
                body = zstd.decompress(body)

            elif compression_format == CompressionFormat.DEFLATE:
                import zlib
                decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
                body = decompressor.decompress(body) + decompressor.flush()

            if isinstance(body, bytes):
                body = body.decode('utf-8')
                is_document_compressed = False

        return {
            'body': body,
            'content_type': self.content_type,
            'url': self.url,
            'charset': self.charset,
            'extraction_template': self.extraction_template,
            'extraction_ephemeral_template': self.extraction_ephemeral_template,
            'extraction_prompt': self.extraction_prompt,
            'extraction_model': self.extraction_model,
            'is_document_compressed': is_document_compressed,
            'document_compression_format': CompressionFormat(self.document_compression_format).value if self.document_compression_format else None,
            'webhook': self.webhook,
            'raise_on_upstream_error': self.raise_on_upstream_error,
        }

    @staticmethod
    def from_dict(extraction_config_dict: Dict) -> 'ExtractionConfig':
        """Create an ExtractionConfig instance from a dictionary."""
        body = extraction_config_dict.get('body', None)
        content_type = extraction_config_dict.get('content_type', None)
        url = extraction_config_dict.get('url', None)
        charset = extraction_config_dict.get('charset', None)
        extraction_template = extraction_config_dict.get('extraction_template', None)
        extraction_ephemeral_template = extraction_config_dict.get('extraction_ephemeral_template', None)
        extraction_prompt = extraction_config_dict.get('extraction_prompt', None)
        extraction_model = extraction_config_dict.get('extraction_model', None)
        is_document_compressed = extraction_config_dict.get('is_document_compressed', None)

        document_compression_format = extraction_config_dict.get('document_compression_format', None)
        document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None

        webhook = extraction_config_dict.get('webhook', None)
        raise_on_upstream_error = extraction_config_dict.get('raise_on_upstream_error', True)

        return ExtractionConfig(
            body=body,
            content_type=content_type,
            url=url,
            charset=charset,
            extraction_template=extraction_template,
            extraction_ephemeral_template=extraction_ephemeral_template,
            extraction_prompt=extraction_prompt,
            extraction_model=extraction_model,
            is_document_compressed=is_document_compressed,
            document_compression_format=document_compression_format,
            webhook=webhook,
            raise_on_upstream_error=raise_on_upstream_error
        )

Ancestors

Class variables

var body : str | bytes

The type of the None singleton.

var charset : str | None

The type of the None singleton.

var content_type : str

The type of the None singleton.

var document_compression_format : CompressionFormat | None

The type of the None singleton.

var ephemeral_template : Dict | None

Deprecated. Use `extraction_ephemeral_template` instead.

var extraction_ephemeral_template : Dict | None

The type of the None singleton.

var extraction_model : str | None

The type of the None singleton.

var extraction_prompt : str | None

The type of the None singleton.

var extraction_template : str | None

The type of the None singleton.

var is_document_compressed : bool | None

The type of the None singleton.

var raise_on_upstream_error : bool

The type of the None singleton.

var template : str | None

Deprecated. Use `extraction_template` instead.

var url : str | None

The type of the None singleton.

var webhook : str | None

The type of the None singleton.

Static methods

def from_dict(extraction_config_dict: Dict) ‑> ExtractionConfig
Expand source code
@staticmethod
def from_dict(extraction_config_dict: Dict) -> 'ExtractionConfig':
    """Create an ExtractionConfig instance from a dictionary."""
    lookup = extraction_config_dict.get

    # Turn the stored compression format (if any) back into the enum
    raw_format = lookup('document_compression_format', None)
    compression = CompressionFormat(raw_format) if raw_format else None

    return ExtractionConfig(
        body=lookup('body', None),
        content_type=lookup('content_type', None),
        url=lookup('url', None),
        charset=lookup('charset', None),
        extraction_template=lookup('extraction_template', None),
        extraction_ephemeral_template=lookup('extraction_ephemeral_template', None),
        extraction_prompt=lookup('extraction_prompt', None),
        extraction_model=lookup('extraction_model', None),
        is_document_compressed=lookup('is_document_compressed', None),
        document_compression_format=compression,
        webhook=lookup('webhook', None),
        raise_on_upstream_error=lookup('raise_on_upstream_error', True)
    )

Create an ExtractionConfig instance from a dictionary.

Methods

def to_api_params(self, key: str) ‑> Dict
Expand source code
def to_api_params(self, key: str) -> Dict:
    """Build the API parameters for this config.

    ``self.key`` takes precedence over ``key`` when set. The instance is left
    untouched, so the method can be called repeatedly.

    Raises:
        ExtractionConfigError: when both ``extraction_template`` and
            ``extraction_ephemeral_template`` are set.
    """
    params = {
        'key': self.key or key,
        'content_type': self.content_type
    }

    if self.url:
        params['url'] = self.url

    if self.charset:
        params['charset'] = self.charset

    if self.extraction_template and self.extraction_ephemeral_template:
        raise ExtractionConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

    if self.extraction_template:
        params['extraction_template'] = self.extraction_template

    if self.extraction_ephemeral_template:
        # Serialize into a local: overwriting the attribute with the JSON
        # string made a second call double-encode the template.
        serialized_template = json.dumps(self.extraction_ephemeral_template)
        params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(serialized_template.encode('utf-8')).decode('utf-8')

    if self.extraction_prompt:
        params['extraction_prompt'] = quote_plus(self.extraction_prompt)

    if self.extraction_model:
        params['extraction_model'] = self.extraction_model

    if self.webhook:
        params['webhook_name'] = self.webhook

    return params
def to_dict(self) ‑> Dict
Expand source code
def to_dict(self) -> Dict:
    """
    Export the ExtractionConfig instance to a plain dictionary.
    """

    if self.is_document_compressed is True:
            compression_foramt = CompressionFormat(self.document_compression_format) if self.document_compression_format else None

            if compression_foramt == CompressionFormat.GZIP:
                import gzip
                self.body = gzip.decompress(self.body)
                
            elif compression_foramt == CompressionFormat.ZSTD:
                import zstandard as zstd
                self.body = zstd.decompress(self.body)

            elif compression_foramt == CompressionFormat.DEFLATE:
                import zlib
                decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
                self.body = decompressor.decompress(self.body) + decompressor.flush()

            if isinstance(self.body, bytes):
                self.body = self.body.decode('utf-8')
                self.is_document_compressed = False

    return {
        'body': self.body,
        'content_type': self.content_type,
        'url': self.url,
        'charset': self.charset,
        'extraction_template': self.extraction_template,
        'extraction_ephemeral_template': self.extraction_ephemeral_template,
        'extraction_prompt': self.extraction_prompt,
        'extraction_model': self.extraction_model,
        'is_document_compressed': self.is_document_compressed,
        'document_compression_format': CompressionFormat(self.document_compression_format).value if self.document_compression_format else None,
        'webhook': self.webhook,
        'raise_on_upstream_error': self.raise_on_upstream_error,
    }

Export the ExtractionConfig instance to a plain dictionary.

class HttpError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class HttpError(ScrapflyError):
    """Scrapfly error that carries the originating HTTP request and, when available, the response."""

    def __init__(self, request:Request, response:Optional[Response]=None, **kwargs):
        self.request = request
        self.response = response
        super().__init__(**kwargs)

    def __str__(self) -> str:
        """Describe the error from the most specific source available.

        Upstream errors report the scraped website's status. Otherwise the
        API response's error message is used when present, then the raw HTTP
        response status line.
        """
        if isinstance(self, UpstreamHttpError):
            return f"Target website responded with {self.api_response.scrape_result['status_code']} - {self.api_response.scrape_result['reason']}"

        if self.api_response is not None:
            return self.api_response.error_message

        if self.response is None:
            # ``response`` is optional; without it there is no status line to
            # format, so fall back to the error message instead of failing.
            return self.message

        text = f"{self.response.status_code} - {self.response.reason}"

        if isinstance(self, (ApiHttpClientError, ApiHttpServerError)):
            text += " - " + self.message

        return text

Common base class for all non-exit exceptions.

Ancestors

Subclasses

  • ApiHttpClientError
  • scrapfly.errors.ExtractionAPIError
  • scrapfly.errors.QuotaLimitReached
  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.ScreenshotAPIError
  • scrapfly.errors.TooManyConcurrentRequest
  • scrapfly.errors.UpstreamHttpError

Inherited members

class ResponseBodyHandler (use_brotli: bool = False, signing_secrets: Tuple[str] | None = None)
Expand source code
class ResponseBodyHandler:
    """
        Decode response bodies: decompression, optional HMAC signature verification
        and deserialization (JSON or msgpack).
    """

    SUPPORTED_COMPRESSION = ['gzip', 'deflate']
    SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']

    class JSONDateTimeDecoder(JSONDecoder):
        """JSON decoder that runs every decoded object through ``_date_parser``."""

        def __init__(self, *args, **kargs):
            JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs)

    @staticmethod
    def _load_brotli():
        """Return the first importable brotli implementation (brotlicffi, then brotli), or None."""
        try:
            import brotlicffi as brotli
        except ImportError:
            try:
                import brotli
            except ImportError:
                return None

        return brotli

    # brotli underperforms at the same level as gzip, and higher levels are too CPU hungry,
    # so the trade-off is not worth it for most usages: it stays opt-in
    def __init__(self, use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None):
        """
            :param use_brotli: advertise brotli ('br') when a brotli package is importable
            :param signing_secrets: hex encoded secrets accepted when verifying signatures
        """
        # Work on a per-instance copy: mutating the class-level list appended 'zstd' again on
        # every instantiation and leaked 'br' into handlers created with use_brotli=False
        self.SUPPORTED_COMPRESSION = list(type(self).SUPPORTED_COMPRESSION)

        if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
            if self._load_brotli() is not None:
                self.SUPPORTED_COMPRESSION.insert(0, 'br')

        if 'zstd' not in self.SUPPORTED_COMPRESSION:
            try:
                import zstd
                self.SUPPORTED_COMPRESSION.append('zstd')
            except ImportError:
                pass

        self.content_encoding: str = ', '.join(self.SUPPORTED_COMPRESSION)
        self._signing_secret: Optional[Tuple[bytes, ...]] = None

        if signing_secrets:
            # secrets are provided hex encoded; the set drops duplicates
            self._signing_secret = tuple({binascii.unhexlify(signing_secret) for signing_secret in signing_secrets})

        try:  # automatically use msgpack if available https://msgpack.org/
            import msgpack
            self.accept = 'application/msgpack;charset=utf-8'
            self.content_type = 'application/msgpack;charset=utf-8'
            self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
        except ImportError:
            self.accept = 'application/json;charset=utf-8'
            self.content_type = 'application/json;charset=utf-8'
            self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)

    def support(self, headers: Dict) -> bool:
        """Tell whether the 'content-type' header names one of SUPPORTED_CONTENT_TYPES."""
        if 'content-type' not in headers:
            return False

        for content_type in self.SUPPORTED_CONTENT_TYPES:
            if headers['content-type'].find(content_type) != -1:
                return True

        return False

    def verify(self, message: bytes, signature: str) -> bool:
        """
            Check the signature against the upper-cased hex HMAC-SHA256 of the message,
            for each configured secret. Returns False when no secret is configured.
        """
        if not self._signing_secret or not isinstance(signature, str):
            return False

        received = signature.encode('utf-8')

        for signing_secret in self._signing_secret:
            expected = hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper()

            # constant-time comparison, so timing does not reveal how many characters matched
            if hmac.compare_digest(expected.encode('utf-8'), received):
                return True

        return False

    def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
        """
            Decompress, verify and deserialize a body.

            :param content: raw body
            :param content_encoding: gzip/gz, deflate, brotli/br or zstd; anything else leaves the body as is
            :param content_type: application/json or application/msgpack; anything else returns the body undecoded
            :param signature: signature to check, only verified when secrets are configured
            :raises WebhookSignatureMissMatch: when the signature matches none of the secrets
            :raises ImportError: when the body is brotli encoded and no brotli package is installed
        """
        if content_encoding == 'gzip' or content_encoding == 'gz':
            import gzip
            content = gzip.decompress(content)
        elif content_encoding == 'deflate':
            import zlib
            content = zlib.decompress(content)
        elif content_encoding == 'brotli' or content_encoding == 'br':
            # accept the same two implementations the constructor does
            brotli = self._load_brotli()

            if brotli is None:
                raise ImportError('brotli or brotlicffi is required to decode brotli content')

            content = brotli.decompress(content)
        elif content_encoding == 'zstd':
            import zstd
            content = zstd.decompress(content)

        # the signature is checked against the decompressed body
        if self._signing_secret is not None and signature is not None:
            if not self.verify(content, signature):
                raise WebhookSignatureMissMatch()

        if content_type.startswith('application/json'):
            content = loads(content, cls=self.JSONDateTimeDecoder)
        elif content_type.startswith('application/msgpack'):
            import msgpack
            content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

        return content

    def __call__(self, content: bytes, content_type: str) -> Union[str, Dict]:
        """
            Deserialize a body according to its content type.

            :raises Exception: when the content type is neither JSON nor msgpack
            :raises EncoderError: when decoding fails; carries the payload as text, or base64 when it is not UTF-8
        """
        content_loader = None

        if content_type.find('application/json') != -1:
            content_loader = partial(loads, cls=self.JSONDateTimeDecoder)
        elif content_type.find('application/msgpack') != -1:
            import msgpack
            content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)

        if content_loader is None:
            raise Exception('Unsupported content type')

        try:
            return content_loader(content)
        except Exception as e:
            try:
                raise EncoderError(content=content.decode('utf-8')) from e
            except UnicodeError:
                raise EncoderError(content=base64.b64encode(content).decode('utf-8')) from e

Class variables

var JSONDateTimeDecoder

Simple JSON https://json.org decoder

Performs the following translations in decoding by default:

+---------------+-------------------+ | JSON | Python | +===============+===================+ | object | dict | +---------------+-------------------+ | array | list | +---------------+-------------------+ | string | str | +---------------+-------------------+ | number (int) | int | +---------------+-------------------+ | number (real) | float | +---------------+-------------------+ | true | True | +---------------+-------------------+ | false | False | +---------------+-------------------+ | null | None | +---------------+-------------------+

It also understands NaN, Infinity, and -Infinity as their corresponding float values, which is outside the JSON spec.

var SUPPORTED_COMPRESSION

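Content encodings the handler can decode. Starts as `['gzip', 'deflate']`; `br` and `zstd` are added by the constructor when the matching packages can be imported.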

var SUPPORTED_CONTENT_TYPES

Content types the handler can deserialize: `application/msgpack` and `application/json`.

Methods

def read(self,
content: bytes,
content_encoding: str,
content_type: str,
signature: str | None) ‑> Dict
Expand source code
def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
    """
        Decompress, verify and deserialize a body.

        :param content: raw body
        :param content_encoding: gzip/gz, deflate, brotli/br or zstd; anything else leaves the body as is
        :param content_type: application/json or application/msgpack; anything else returns the body undecoded
        :param signature: signature to check, only verified when secrets are configured
        :raises WebhookSignatureMissMatch: when the signature matches none of the secrets
        :raises ImportError: when the decompression package for the encoding is not installed
    """
    if content_encoding == 'gzip' or content_encoding == 'gz':
        import gzip
        content = gzip.decompress(content)
    elif content_encoding == 'deflate':
        import zlib
        content = zlib.decompress(content)
    elif content_encoding == 'brotli' or content_encoding == 'br':
        # brotlicffi is a drop-in alternative to brotli; accept whichever is installed
        try:
            import brotlicffi as brotli
        except ImportError:
            import brotli
        content = brotli.decompress(content)
    elif content_encoding == 'zstd':
        import zstd
        content = zstd.decompress(content)

    # the signature is checked against the decompressed body
    if self._signing_secret is not None and signature is not None:
        if not self.verify(content, signature):
            raise WebhookSignatureMissMatch()

    if content_type.startswith('application/json'):
        content = loads(content, cls=self.JSONDateTimeDecoder)
    elif content_type.startswith('application/msgpack'):
        import msgpack
        content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

    return content
def support(self, headers: Dict) ‑> bool
Expand source code
def support(self, headers: Dict) -> bool:
    """Tell whether the 'content-type' header names one of SUPPORTED_CONTENT_TYPES."""
    if 'content-type' in headers:
        # substring match, so parameters such as ';charset=utf-8' do not matter
        return any(
            headers['content-type'].find(candidate) != -1
            for candidate in self.SUPPORTED_CONTENT_TYPES
        )

    return False
def verify(self, message: bytes, signature: str) ‑> bool
Expand source code
def verify(self, message: bytes, signature: str) -> bool:
    """
        Check the signature against the upper-cased hex HMAC-SHA256 of the message,
        for each configured secret. Returns False when no secret is configured.
    """
    if not self._signing_secret or not isinstance(signature, str):
        return False

    received = signature.encode('utf-8')

    for signing_secret in self._signing_secret:
        expected = hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper()

        # constant-time comparison, so timing does not reveal how many characters matched
        if hmac.compare_digest(expected.encode('utf-8'), received):
            return True

    return False
class ScrapeApiResponse (request: requests.models.Request,
response: requests.models.Response,
scrape_config: ScrapeConfig,
api_result: Dict | None = None,
large_object_handler: Callable | None = None)
Expand source code
class ScrapeApiResponse(ApiResponse):
    """Response of a scrape call: wraps the HTTP exchange and exposes the decoded API result."""
    scrape_config:ScrapeConfig
    large_object_handler:Callable

    def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler:Optional[Callable]=None):
        """
            :param request: request sent to the API
            :param response: raw response received from the API
            :param scrape_config: configuration used for the scrape
            :param api_result: decoded API payload; rebuilt from the response headers for HEAD scrapes
            :param large_object_handler: callable fetching 'clob'/'blob' content from its callback url
            :raises HttpError: when the payload is a plain string instead of a decoded structure
        """
        super().__init__(request, response)
        self.scrape_config = scrape_config
        self.large_object_handler = large_object_handler

        if self.scrape_config.method == 'HEAD':
            # whatever payload was passed in is replaced by one rebuilt from the response headers
            api_result = {
                'result': {
                    'request_headers': {},
                    'status': 'DONE',
                    # was `200 >= status`, which rejected every 2xx code except 200
                    'success': 200 <= self.response.status_code < 300,
                    'response_headers': self.response.headers,
                    'status_code': self.response.status_code,
                    'reason': self.response.reason,
                    'format': 'text',
                    'content': ''
                },
                'context': {},
                'config': self.scrape_config.__dict__
            }

            if 'X-Scrapfly-Reject-Code' in self.response.headers:
                api_result['result']['error'] = {
                    'code': self.response.headers['X-Scrapfly-Reject-Code'],
                    'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']),
                    'message': self.response.headers['X-Scrapfly-Reject-Description'],
                    'error_id': self.response.headers['X-Scrapfly-Reject-ID'],
                    'retryable': self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes',
                    'doc_url': '',
                    'links': {}
                }

                if 'X-Scrapfly-Reject-Doc' in self.response.headers:
                    api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc']
                    api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc']

        # a plain string payload is reported as a retryable bad gateway
        if isinstance(api_result, str):
            raise HttpError(
                request=request,
                response=response,
                message='Bad gateway',
                code=502,
                http_status_code=502,
                is_retryable=True
            )

        self.result = self.handle_api_result(api_result=api_result)

    @property
    def scrape_result(self) -> Optional[Dict]:
        """The 'result' section of the API payload, or None when absent."""
        return self.result.get('result', None)

    @property
    def config(self) -> Optional[Dict]:
        """The 'config' section of the API payload, or None when there is no scrape result."""
        if self.scrape_result is None:
            return None

        return self.result['config']

    @property
    def context(self) -> Optional[Dict]:
        """The 'context' section of the API payload, or None when there is no scrape result."""
        if self.scrape_result is None:
            return None

        return self.result['context']

    @property
    def content(self) -> str:
        """The scraped content, or an empty string when there is no scrape result."""
        if self.scrape_result is None:
            return ''

        return self.scrape_result['content']

    @property
    def success(self) -> bool:
        """
            Success means the Scrapfly API replied correctly to the call (2xx status code);
            the scrape itself can still be unsuccessful if the upstream replied with an error status code
        """
        # was `200 >= status <= 299`, which is true for every code up to 200 and false for 201-299
        return 200 <= self.response.status_code <= 299

    @property
    def scrape_success(self) -> bool:
        """The 'success' flag of the scrape result; False when the result is missing or empty."""
        scrape_result = self.scrape_result

        if not scrape_result:
            return False

        return self.scrape_result['success']

    @property
    def error(self) -> Optional[Dict]:
        """The 'error' entry of the scrape result when the scrape failed, None otherwise."""
        if self.scrape_result is None:
            return None

        if self.scrape_success is False:
            return self.scrape_result['error']

    @property
    def upstream_status_code(self) -> Optional[int]:
        """The 'status_code' of the scrape result, or None when unavailable."""
        if self.scrape_result is None:
            return None

        if 'status_code' in self.scrape_result:
            return self.scrape_result['status_code']

        return None

    @cached_property
    def soup(self) -> 'BeautifulSoup':
        """
            The content parsed with BeautifulSoup (lxml parser).

            :raises ContentError: when the content format is not text
            :raises ImportError: when bs4 is not installed
        """
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")

        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(self.content, "lxml")
            return soup
        except ImportError:
            logger.error('You must install scrapfly[parser] to enable this feature')
            # re-raise like `selector` does: returning None here would be cached by cached_property
            raise

    @cached_property
    def selector(self) -> 'Selector':
        """
            The content wrapped in a parsel Selector.

            :raises ContentError: when the content format is not text
            :raises ImportError: when parsel is not installed
        """
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into selector, the format of data is binary - must be text content")

        try:
            from parsel import Selector
            return Selector(text=self.content)
        except ImportError as e:
            logger.error('You must install parsel or scrapy package to enable this feature')
            raise e

    def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
        """
            Normalize the API payload and freeze it.

            Error payloads are frozen untouched. Otherwise headers become case insensitive and,
            when a large object handler is set, 'clob'/'blob' content is fetched through it
            and 'binary' content is base64 decoded into a BytesIO.
        """
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        try:
            # a list of headers is replaced by an empty dict
            if isinstance(api_result['config']['headers'], list):
                api_result['config']['headers'] = {}
        except TypeError:
            logger.info(api_result)
            raise

        with suppress(KeyError):
            api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
            api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

        if self.large_object_handler is not None and api_result['result']['content']:
            content_format = api_result['result']['format']

            if content_format in ['clob', 'blob']:
                # for large objects the content holds the url to fetch the real content from
                api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format)
            elif content_format == 'binary':
                base64_payload = api_result['result']['content']

                if isinstance(base64_payload, bytes):
                    base64_payload = base64_payload.decode('utf-8')

                api_result['result']['content'] = BytesIO(b64decode(base64_payload))

        return FrozenDict(api_result)

    def _is_api_error(self, api_result: Dict) -> bool:
        """Tell whether the payload is an API level error rather than a scrape result."""
        if self.scrape_config.method == 'HEAD':
            # NOTE(review): __init__ reads rejections from 'X-Scrapfly-Reject-*' headers while this
            # checks 'X-Reject-Reason' - confirm which header the API really sends
            if 'X-Reject-Reason' in self.response.headers:
                return True
            return False

        if api_result is None:
            return True

        return 'error_id' in api_result

    def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
        """
            Rebuild a requests Response that mirrors what the scraped website answered.

            :return: the response, or None when there is no result or the API call did not return 200
            :raises RuntimeError: when _class is not the requests Response class
        """
        if _class != Response:
            raise RuntimeError('only Response from requests package is supported at the moment')

        if self.result is None:
            return None

        if self.response.status_code != 200:
            return None

        response = Response()
        response.status_code = self.scrape_result['status_code']
        response.reason = self.scrape_result['reason']

        if self.scrape_result['content']:
            if isinstance(self.scrape_result['content'], BytesIO):
                response._content = self.scrape_result['content'].getvalue()
            elif isinstance(self.scrape_result['content'], bytes):
                response._content = self.scrape_result['content']
            elif isinstance(self.scrape_result['content'], str):
                response._content = self.scrape_result['content'].encode('utf-8')
        else:
            response._content = None

        response.headers.update(self.scrape_result['response_headers'])
        response.url = self.scrape_result['url']

        response.request = Request(
            method=self.config['method'],
            url=self.config['url'],
            headers=self.scrape_result['request_headers'],
            data=self.config['body'] if self.config['body'] else None
        )

        if 'set-cookie' in response.headers:
            raw_cookies = response.headers['set-cookie']

            # a single header value is a plain string: iterating it directly would
            # parse it one character at a time
            if isinstance(raw_cookies, str):
                raw_cookies = [raw_cookies]

            for raw_cookie in raw_cookies:
                for name, cookie in SimpleCookie(raw_cookie).items():
                    expires = cookie.get('expires')

                    if expires == '':
                        expires = None

                    if expires:
                        try:
                            expires = parse(expires).timestamp()
                        except ValueError:
                            # unparsable date: keep the cookie without expiry
                            expires = None

                    response.cookies.set_cookie(Cookie(
                        version=cookie.get('version') if cookie.get('version') else None,
                        name=name,
                        value=cookie.value,
                        path=cookie.get('path', ''),
                        expires=expires,
                        comment=cookie.get('comment'),
                        domain=cookie.get('domain', ''),
                        secure=cookie.get('secure'),
                        port=None,
                        port_specified=False,
                        domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                        domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                        path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                        discard=False,
                        comment_url=None,
                        rest={
                            'httponly': cookie.get('httponly'),
                            'samesite': cookie.get('samesite'),
                            'max-age': cookie.get('max-age')
                        }
                    ))

        return response

    def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
        """
            Write the scraped content (or the given content) to a file.

            :param path: directory prepended to the file name when no file object is given
            :param name: file name; derived from the last url segment when omitted
            :param file: already opened file object to write into; it is closed once written
            :param content: content to write instead of the scrape result content
        """
        file_content = content or self.scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                # NOTE(review): kept without the leading dot, unlike the mime derived extension below
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                # no extension in the name: derive one from the response mime type
                try:
                    mime_type = self.scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = self.config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path is not None else name

            if file_path == file_extension:
                # the url ends with '/', so the name is only an extension: build it from the whole url
                url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
        """
            Raise the error matching a failed result.

            :param raise_on_upstream_error: when False, upstream website errors are not raised
            :param error_class: forwarded to the parent implementation
        """
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
        if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
            error = ErrorFactory.create(api_response=self)
            if error:
                if isinstance(error, UpstreamHttpError):
                    if raise_on_upstream_error is True:
                        raise error
                else:
                    raise error

Ancestors

Class variables

var large_object_handler : Callable

Optional callable that `handle_api_result` uses to fetch `clob`/`blob` content from its callback URL.

var scrape_config : ScrapeConfig

The `ScrapeConfig` passed to the constructor for this response.

Instance variables

prop config : Dict | None
Expand source code
@property
def config(self) -> Optional[Dict]:
    """The 'config' section of the API payload, or None when there is no scrape result."""
    return None if self.scrape_result is None else self.result['config']
prop content : str
Expand source code
@property
def content(self) -> str:
    """The scraped content, or an empty string when there is no scrape result."""
    if self.scrape_result is not None:
        return self.scrape_result['content']

    return ''
prop context : Dict | None
Expand source code
@property
def context(self) -> Optional[Dict]:
    """The 'context' section of the API payload, or None when there is no scrape result."""
    return None if self.scrape_result is None else self.result['context']
prop error : Dict | None
Expand source code
@property
def error(self) -> Optional[Dict]:
    """The 'error' entry of the scrape result when the scrape failed, None otherwise."""
    if self.scrape_result is not None and self.scrape_success is False:
        return self.scrape_result['error']

    return None
prop scrape_result : Dict | None
Expand source code
@property
def scrape_result(self) -> Optional[Dict]:
    """The 'result' section of the API payload, or None when absent."""
    payload = self.result
    return payload.get('result', None)
prop scrape_success : bool
Expand source code
@property
def scrape_success(self) -> bool:
    """The 'success' flag of the scrape result; False when the result is missing or empty."""
    if self.scrape_result:
        return self.scrape_result['success']

    return False
var selector : Selector
Expand source code
@cached_property
def selector(self) -> 'Selector':
    """
        The content wrapped in a parsel Selector.

        :raises ContentError: when the content format is not text
        :raises ImportError: when parsel is not installed
    """
    if self.scrape_result['format'] != 'text':
        # the previous message was copied from `soup` and wrongly mentioned beautiful soup
        raise ContentError("Unable to cast into selector, the format of data is binary - must be text content")

    try:
        from parsel import Selector
        return Selector(text=self.content)
    except ImportError as e:
        logger.error('You must install parsel or scrapy package to enable this feature')
        raise e
var soup : BeautifulSoup
Expand source code
@cached_property
def soup(self) -> 'BeautifulSoup':
    """
        The content parsed with BeautifulSoup (lxml parser).

        :raises ContentError: when the content format is not text
        :raises ImportError: when bs4 is not installed
    """
    if self.scrape_result['format'] != 'text':
        raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")

    try:
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(self.content, "lxml")
        return soup
    except ImportError:
        logger.error('You must install scrapfly[parser] to enable this feature')
        # re-raise instead of silently returning None, which cached_property would then cache
        raise
prop success : bool
Expand source code
@property
def success(self) -> bool:
    """
        Success means the Scrapfly API replied correctly to the call (2xx status code);
        the scrape itself can still be unsuccessful if the upstream replied with an error status code
    """
    # was `200 >= status <= 299`, which is true for every code up to 200 and false for 201-299
    return 200 <= self.response.status_code <= 299

Success means the Scrapfly API replied correctly to the call; the scrape itself can still be unsuccessful if the upstream website replied with an error status code.

prop upstream_status_code : int | None
Expand source code
@property
def upstream_status_code(self) -> Optional[int]:
    """The 'status_code' of the scrape result, or None when unavailable."""
    if self.scrape_result is not None and 'status_code' in self.scrape_result:
        return self.scrape_result['status_code']

    return None

Methods

def handle_api_result(self, api_result: Dict) ‑> FrozenDict | None
Expand source code
def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
    """
        Normalize the API payload and freeze it.

        Error payloads are frozen untouched. Otherwise headers become case insensitive and,
        when a large object handler is set, 'clob'/'blob' content is fetched through it
        and 'binary' content is base64 decoded into a BytesIO.
    """
    if self._is_api_error(api_result=api_result) is True:
        return FrozenDict(api_result)

    try:
        config_section = api_result['config']

        # a list of headers is replaced by an empty dict
        if isinstance(config_section['headers'], list):
            config_section['headers'] = {}
    except TypeError:
        logger.info(api_result)
        raise

    with suppress(KeyError):
        section = api_result['result']
        section['request_headers'] = CaseInsensitiveDict(section['request_headers'])
        section['response_headers'] = CaseInsensitiveDict(section['response_headers'])

    if self.large_object_handler is None or not api_result['result']['content']:
        return FrozenDict(api_result)

    scraped = api_result['result']
    kind = scraped['format']

    if kind in ('clob', 'blob'):
        # for large objects the content holds the url to fetch the real content from
        scraped['content'], scraped['format'] = self.large_object_handler(callback_url=scraped['content'], format=kind)
    elif kind == 'binary':
        encoded = scraped['content']

        if isinstance(encoded, bytes):
            encoded = encoded.decode('utf-8')

        scraped['content'] = BytesIO(b64decode(encoded))

    return FrozenDict(api_result)
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ApiHttpClientError)
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
    """
        Raise the error matching a failed result.

        :param raise_on_upstream_error: when False, upstream website errors are not raised
        :param error_class: forwarded to the parent implementation
    """
    super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

    # only a finished scrape that did not succeed can produce an error
    if self.result['result']['status'] != 'DONE' or self.scrape_success is not False:
        return

    failure = ErrorFactory.create(api_response=self)

    if not failure:
        return

    # upstream errors are raised only on demand, any other error always
    if not isinstance(failure, UpstreamHttpError) or raise_on_upstream_error is True:
        raise failure
def sink(self,
path: str | None = None,
name: str | None = None,
file: TextIO | _io.BytesIO | None = None,
content: str | bytes | None = None)
Expand source code
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
    """
        Write the scraped content (or the given content) to a file.

        :param path: directory prepended to the file name when no file object is given
        :param name: file name; derived from the last url segment when omitted
        :param file: already opened file object to write into; it is closed once written
        :param content: content to write instead of the scrape result content
    """
    # an empty/None content falls back to the scrape result content
    file_content = content or self.scrape_result['content']
    file_path = None
    file_extension = None

    if name:
        name_parts = name.split('.')
        if len(name_parts) > 1:
            # NOTE(review): kept without the leading dot, unlike the mime derived extension below
            file_extension = name_parts[-1]

    if not file:
        if file_extension is None:
            # no extension in the name: derive one from the response mime type
            try:
                mime_type = self.scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            # drop mime parameters such as ';charset=utf-8'
            if ';' in mime_type:
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            # default name: last segment of the scraped url (empty when the url ends with '/')
            name = self.config['url'].split('/')[-1]

        # substring check: the extension is appended only when it appears nowhere in the name
        if name.find(file_extension) == -1:
            name += file_extension

        file_path = path + '/' + name if path is not None else name

        if file_path == file_extension:
            # the name is only an extension: build the file name from the whole url instead
            url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')

            if url[-1] == '-':
                url = url[:-1]

            url += file_extension

            file_path = url

        file = open(file_path, 'wb')

    # normalize text/bytes content to a binary stream; other objects are used as is
    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)
    # the with block closes the target, including a file object passed by the caller
    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    # file_path is still None here when the caller supplied the file object
    logger.info('file %s created' % file_path)
def upstream_result_into_response(self) ‑> requests.models.Response | None
Expand source code
def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
    """
        Rebuild a requests Response that mirrors what the scraped website answered.

        :return: the response, or None when there is no result or the API call did not return 200
        :raises RuntimeError: when _class is not the requests Response class
    """
    if _class != Response:
        raise RuntimeError('only Response from requests package is supported at the moment')

    if self.result is None:
        return None

    if self.response.status_code != 200:
        return None

    response = Response()
    response.status_code = self.scrape_result['status_code']
    response.reason = self.scrape_result['reason']

    if self.scrape_result['content']:
        if isinstance(self.scrape_result['content'], BytesIO):
            response._content = self.scrape_result['content'].getvalue()
        elif isinstance(self.scrape_result['content'], bytes):
            response._content = self.scrape_result['content']
        elif isinstance(self.scrape_result['content'], str):
            response._content = self.scrape_result['content'].encode('utf-8')
    else:
        response._content = None

    response.headers.update(self.scrape_result['response_headers'])
    response.url = self.scrape_result['url']

    response.request = Request(
        method=self.config['method'],
        url=self.config['url'],
        headers=self.scrape_result['request_headers'],
        data=self.config['body'] if self.config['body'] else None
    )

    if 'set-cookie' in response.headers:
        raw_cookies = response.headers['set-cookie']

        # a single header value is a plain string: iterating it directly would
        # parse it one character at a time
        if isinstance(raw_cookies, str):
            raw_cookies = [raw_cookies]

        for raw_cookie in raw_cookies:
            for name, cookie in SimpleCookie(raw_cookie).items():
                expires = cookie.get('expires')

                if expires == '':
                    expires = None

                if expires:
                    try:
                        expires = parse(expires).timestamp()
                    except ValueError:
                        # unparsable date: keep the cookie without expiry
                        expires = None

                response.cookies.set_cookie(Cookie(
                    version=cookie.get('version') if cookie.get('version') else None,
                    name=name,
                    value=cookie.value,
                    path=cookie.get('path', ''),
                    expires=expires,
                    comment=cookie.get('comment'),
                    domain=cookie.get('domain', ''),
                    secure=cookie.get('secure'),
                    port=None,
                    port_specified=False,
                    domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                    domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                    path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                    discard=False,
                    comment_url=None,
                    rest={
                        'httponly': cookie.get('httponly'),
                        'samesite': cookie.get('samesite'),
                        'max-age': cookie.get('max-age')
                    }
                ))

    return response

Inherited members

class ScrapeConfig (url: str,
retry: bool = True,
method: str = 'GET',
country: str | None = None,
render_js: bool = False,
cache: bool = False,
cache_clear: bool = False,
ssl: bool = False,
dns: bool = False,
asp: bool = False,
debug: bool = False,
raise_on_upstream_error: bool = True,
cache_ttl: int | None = None,
proxy_pool: str | None = None,
session: str | None = None,
tags: List[str] | Set[str] | None = None,
format: Format | None = None,
format_options: List[FormatOption] | None = None,
extraction_template: str | None = None,
extraction_ephemeral_template: Dict | None = None,
extraction_prompt: str | None = None,
extraction_model: str | None = None,
correlation_id: str | None = None,
cookies: requests.structures.CaseInsensitiveDict | None = None,
body: str | None = None,
data: Dict | None = None,
headers: requests.structures.CaseInsensitiveDict | Dict[str, str] | None = None,
js: str = None,
rendering_wait: int = None,
rendering_stage: Literal['complete', 'domcontentloaded'] = 'complete',
wait_for_selector: str | None = None,
screenshots: Dict | None = None,
screenshot_flags: List[ScreenshotFlag] | None = None,
session_sticky_proxy: bool | None = None,
webhook: str | None = None,
timeout: int | None = None,
js_scenario: List | None = None,
extract: Dict | None = None,
os: str | None = None,
lang: List[str] | None = None,
auto_scroll: bool | None = None,
cost_budget: int | None = None)
Expand source code
class ScrapeConfig(BaseApiConfig):
    """Options describing a single scrape request.

    The constructor stores every option on the instance, merges ``cookies``
    into the ``cookie`` header and, for POST/PUT/PATCH requests, encodes
    ``data`` into ``body`` according to the ``content-type`` header.
    ``to_api_params`` flattens the instance into a parameter dictionary;
    ``to_dict`` / ``from_dict`` convert to and from plain dictionaries.
    """

    PUBLIC_DATACENTER_POOL = 'public_datacenter_pool'
    PUBLIC_RESIDENTIAL_POOL = 'public_residential_pool'

    url: str
    retry: bool = True
    method: str = 'GET'
    country: Optional[str] = None
    render_js: bool = False
    cache: bool = False
    cache_clear:bool = False
    ssl:bool = False
    dns:bool = False
    asp:bool = False
    debug: bool = False
    raise_on_upstream_error:bool = True
    cache_ttl:Optional[int] = None
    proxy_pool:Optional[str] = None
    session: Optional[str] = None
    tags: Optional[List[str]] = None
    # No trailing comma here: `= None,` would make the default the tuple (None,).
    format: Optional[Format] = None  # raw(unchanged)
    format_options: Optional[List[FormatOption]] = None
    extraction_template: Optional[str] = None  # a saved template name
    extraction_ephemeral_template: Optional[Dict] = None  # ephemerally declared json template
    extraction_prompt: Optional[str] = None
    extraction_model: Optional[str] = None
    correlation_id: Optional[str] = None
    cookies: Optional[CaseInsensitiveDict] = None
    body: Optional[str] = None
    data: Optional[Dict] = None
    headers: Optional[CaseInsensitiveDict] = None
    js: str = None
    rendering_wait: int = None
    rendering_stage: Literal["complete", "domcontentloaded"] = "complete"
    wait_for_selector: Optional[str] = None
    session_sticky_proxy:bool = True
    screenshots:Optional[Dict]=None
    screenshot_flags: Optional[List[ScreenshotFlag]] = None
    webhook:Optional[str]=None
    timeout:Optional[int]=None # in milliseconds
    js_scenario: Dict = None
    extract: Dict = None
    lang:Optional[List[str]] = None
    os:Optional[str] = None
    auto_scroll:Optional[bool] = None
    cost_budget:Optional[int] = None

    def __init__(
        self,
        url: str,
        retry: bool = True,
        method: str = 'GET',
        country: Optional[str] = None,
        render_js: bool = False,
        cache: bool = False,
        cache_clear:bool = False,
        ssl:bool = False,
        dns:bool = False,
        asp:bool = False,
        debug: bool = False,
        raise_on_upstream_error:bool = True,
        cache_ttl:Optional[int] = None,
        proxy_pool:Optional[str] = None,
        session: Optional[str] = None,
        tags: Optional[Union[List[str], Set[str]]] = None,
        format: Optional[Format] = None, # raw(unchanged)
        format_options: Optional[List[FormatOption]] = None, # raw(unchanged)
        extraction_template: Optional[str] = None,  # a saved template name
        extraction_ephemeral_template: Optional[Dict] = None,  # ephemerally declared json template
        extraction_prompt: Optional[str] = None,
        extraction_model: Optional[str] = None,
        correlation_id: Optional[str] = None,
        cookies: Optional[CaseInsensitiveDict] = None,
        body: Optional[str] = None,
        data: Optional[Dict] = None,
        headers: Optional[Union[CaseInsensitiveDict, Dict[str, str]]] = None,
        js: str = None,
        rendering_wait: int = None,
        rendering_stage: Literal["complete", "domcontentloaded"] = "complete",
        wait_for_selector: Optional[str] = None,
        screenshots:Optional[Dict]=None,
        screenshot_flags: Optional[List[ScreenshotFlag]] = None,
        session_sticky_proxy:Optional[bool] = None,
        webhook:Optional[str] = None,
        timeout:Optional[int] = None, # in milliseconds
        js_scenario:Optional[List] = None,
        extract:Optional[Dict] = None,
        os:Optional[str] = None,
        lang:Optional[List[str]] = None,
        auto_scroll:Optional[bool] = None,
        cost_budget:Optional[int] = None
    ):
        """Store the scrape options and normalise cookies, headers and body.

        ``tags`` given as a list are converted to a set. ``cookies`` are
        appended to the ``cookie`` header. For POST/PUT/PATCH requests with
        ``data`` and no ``body``, ``data`` is encoded into ``body`` as JSON or
        form-urlencoded depending on the ``content-type`` header
        (form-urlencoded when the header is absent).

        Raises:
            AssertionError: if ``url`` is not a ``str``.
            ScrapeConfigError: if both ``body`` and ``data`` are given, or if
                ``data`` has to be encoded for an unsupported ``content-type``.
        """
        assert(type(url) is str)

        if isinstance(tags, List):
            tags = set(tags)

        cookies = cookies or {}
        headers = headers or {}

        self.cookies = CaseInsensitiveDict(cookies)
        self.headers = CaseInsensitiveDict(headers)
        self.url = url
        self.retry = retry
        self.method = method
        self.country = country
        self.session_sticky_proxy = session_sticky_proxy
        self.render_js = render_js
        self.cache = cache
        self.cache_clear = cache_clear
        self.asp = asp
        self.webhook = webhook
        self.session = session
        self.debug = debug
        self.cache_ttl = cache_ttl
        self.proxy_pool = proxy_pool
        self.tags = tags or set()
        self.format = format
        self.format_options = format_options
        self.extraction_template = extraction_template
        self.extraction_ephemeral_template = extraction_ephemeral_template
        self.extraction_prompt = extraction_prompt
        self.extraction_model = extraction_model
        self.correlation_id = correlation_id
        self.wait_for_selector = wait_for_selector
        self.body = body
        self.data = data
        self.js = js
        self.rendering_wait = rendering_wait
        self.rendering_stage = rendering_stage
        self.raise_on_upstream_error = raise_on_upstream_error
        self.screenshots = screenshots
        self.screenshot_flags = screenshot_flags
        self.key = None
        self.dns = dns
        self.ssl = ssl
        self.js_scenario = js_scenario
        self.timeout = timeout
        self.extract = extract
        self.lang = lang
        self.os = os
        self.auto_scroll = auto_scroll
        self.cost_budget = cost_budget

        if cookies:
            cookie_pairs = '; '.join(name + '=' + value for name, value in cookies.items())

            # Append to an existing cookie header, making sure it ends with a
            # separator. An empty/absent header is treated as "no header" so
            # indexing an empty string can no longer raise IndexError.
            existing = self.headers.get('cookie')

            if existing:
                if not existing.endswith(';'):
                    existing += ';'
            else:
                existing = ''

            self.headers['cookie'] = existing + cookie_pairs

        if self.body and self.data:
            raise ScrapeConfigError('You cannot pass both parameters body and data. You must choose')

        if method in ['POST', 'PUT', 'PATCH']:
            if self.body is None and self.data is not None:
                if 'content-type' not in self.headers:
                    # No explicit content type: default to a form-encoded body.
                    self.headers['content-type'] = 'application/x-www-form-urlencoded'
                    self.body = urlencode(data)
                else:
                    if self.headers['content-type'].find('application/json') != -1:
                        self.body = json.dumps(data)
                    elif self.headers['content-type'].find('application/x-www-form-urlencoded') != -1:
                        self.body = urlencode(data)
                    else:
                        raise ScrapeConfigError('Content-Type "%s" not supported, use body parameter to pass pre encoded body according to your content type' % self.headers['content-type'])
            elif self.body is None and self.data is None:
                self.headers['content-type'] = 'text/plain'

    def to_api_params(self, key:str) -> Dict:
        """Flatten this configuration into a dictionary of API parameters.

        Options that depend on another feature (``render_js``, ``cache``,
        ``session``, ``screenshots``) are only emitted when that feature is
        enabled; otherwise a warning is logged and the option is left out.

        Args:
            key: API key used when ``self.key`` is not set.

        Returns:
            The parameter dictionary.

        Raises:
            ScrapeConfigError: if both ``extraction_template`` and
                ``extraction_ephemeral_template`` are set.
        """
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.country is not None:
            params['country'] = self.country

        for name, value in self.headers.items():
            params['headers[%s]' % name] = value

        if self.webhook is not None:
            params['webhook_name'] = self.webhook

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.extract is not None:
            params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')

        if self.cost_budget is not None:
            params['cost_budget'] = self.cost_budget

        if self.render_js is True:
            params['render_js'] = self._bool_to_http(self.render_js)

            if self.wait_for_selector is not None:
                params['wait_for_selector'] = self.wait_for_selector

            if self.js:
                params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

            if self.js_scenario:
                params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')

            if self.rendering_wait:
                params['rendering_wait'] = self.rendering_wait

            if self.rendering_stage:
                params['rendering_stage'] = self.rendering_stage

            if self.screenshots:
                for name, element in self.screenshots.items():
                    params['screenshots[%s]' % name] = element

                if self.screenshot_flags is not None:
                    self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags]
                    params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags)
            elif self.screenshot_flags is not None:
                # Previously this warning sat in an unreachable branch; flags
                # without screenshots are now dropped with a warning, like the
                # other dependent options in this method.
                logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')

            if self.auto_scroll is True:
                params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
        else:
            if self.wait_for_selector is not None:
                logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')

            if self.screenshots:
                logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')

            if self.js_scenario:
                logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')

            if self.js:
                logging.warning('Params "js" is ignored. Works only if render_js is enabled')

            if self.rendering_wait:
                logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')

        if self.asp is True:
            params['asp'] = self._bool_to_http(self.asp)

        if self.retry is False:
            params['retry'] = self._bool_to_http(self.retry)

        if self.cache is True:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_clear is True:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)

            if self.cache_ttl is not None:
                params['cache_ttl'] = self.cache_ttl
        else:
            if self.cache_clear is True:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

        if self.dns is True:
            params['dns'] = self._bool_to_http(self.dns)

        if self.ssl is True:
            params['ssl'] = self._bool_to_http(self.ssl)

        if self.tags:
            params['tags'] = ','.join(self.tags)

        if self.format:
            params['format'] = Format(self.format).value
            if self.format_options:
                params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options)

        if self.extraction_template and self.extraction_ephemeral_template:
            raise ScrapeConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

        if self.extraction_template:
            params['extraction_template'] = self.extraction_template

        if self.extraction_ephemeral_template:
            # Encode into a local: overwriting the attribute with its JSON text
            # made a second call double-encode the template.
            template_json = json.dumps(self.extraction_ephemeral_template)
            params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(template_json.encode('utf-8')).decode('utf-8')

        if self.extraction_prompt:
            params['extraction_prompt'] = quote_plus(self.extraction_prompt)

        if self.extraction_model:
            params['extraction_model'] = self.extraction_model

        if self.correlation_id:
            params['correlation_id'] = self.correlation_id

        if self.session:
            params['session'] = self.session

            if self.session_sticky_proxy is True: # false by default
                params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
        else:
            if self.session_sticky_proxy:
                logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')

        if self.debug is True:
            params['debug'] = self._bool_to_http(self.debug)

        if self.proxy_pool is not None:
            params['proxy_pool'] = self.proxy_pool

        if self.lang is not None:
            params['lang'] = ','.join(self.lang)

        if self.os is not None:
            params['os'] = self.os

        return params

    @staticmethod
    def from_exported_config(config:str) -> 'ScrapeConfig':
        """Build a ScrapeConfig from an exported configuration string.

        ``config`` is base64-decoded and then unpacked with ``msgpack``.
        Header values that are non-string iterables are joined with ``'; '``.

        Raises:
            ImportError: if ``msgpack`` is not installed (an install hint is
                printed first).
        """
        try:
            from msgpack import loads as msgpack_loads
        except ImportError:
            print('You must install msgpack package - run: pip install "scrapfly-sdk[seepdup] or pip install msgpack')
            raise

        data = msgpack_loads(base64.b64decode(config))

        headers = {}

        for name, value in data['headers'].items():
            # A str is itself Iterable; joining it would interleave '; '
            # between its characters, so only join real collections.
            if not isinstance(value, str) and isinstance(value, Iterable):
                headers[name] = '; '.join(value)
            else:
                headers[name] = value

        return ScrapeConfig(
            url=data['url'],
            retry=data['retry'],
            headers=headers,
            session=data['session'],
            session_sticky_proxy=data['session_sticky_proxy'],
            cache=data['cache'],
            cache_ttl=data['cache_ttl'],
            cache_clear=data['cache_clear'],
            render_js=data['render_js'],
            method=data['method'],
            asp=data['asp'],
            body=data['body'],
            ssl=data['ssl'],
            dns=data['dns'],
            country=data['country'],
            debug=data['debug'],
            correlation_id=data['correlation_id'],
            tags=data['tags'],
            format=data['format'],
            js=data['js'],
            rendering_wait=data['rendering_wait'],
            screenshots=data['screenshots'] or {},
            screenshot_flags=data['screenshot_flags'],
            proxy_pool=data['proxy_pool'],
            auto_scroll=data['auto_scroll'],
            cost_budget=data['cost_budget']
        )

    def to_dict(self) -> Dict:
        """
        Export the ScrapeConfig instance to a plain dictionary.
        Useful for JSON-serialization or other external storage.

        Enum-typed options (format, format_options, screenshot_flags) are
        exported as their values; ``data`` is exported as None when ``body``
        is set.
        """

        return {
            'url': self.url,
            'retry': self.retry,
            'method': self.method,
            'country': self.country,
            'render_js': self.render_js,
            'cache': self.cache,
            'cache_clear': self.cache_clear,
            'ssl': self.ssl,
            'dns': self.dns,
            'asp': self.asp,
            'debug': self.debug,
            'raise_on_upstream_error': self.raise_on_upstream_error,
            'cache_ttl': self.cache_ttl,
            'proxy_pool': self.proxy_pool,
            'session': self.session,
            'tags': list(self.tags),
            'format': Format(self.format).value if self.format else None,
            'format_options': [FormatOption(option).value for option in self.format_options] if self.format_options else None,
            'extraction_template': self.extraction_template,
            'extraction_ephemeral_template': self.extraction_ephemeral_template,
            'extraction_prompt': self.extraction_prompt,
            'extraction_model': self.extraction_model,
            'correlation_id': self.correlation_id,
            'cookies': CaseInsensitiveDict(self.cookies),
            'body': self.body,
            'data': None if self.body else self.data,
            'headers': CaseInsensitiveDict(self.headers),
            'js': self.js,
            'rendering_wait': self.rendering_wait,
            # Exported so a to_dict()/from_dict() round trip keeps the stage.
            'rendering_stage': self.rendering_stage,
            'wait_for_selector': self.wait_for_selector,
            'session_sticky_proxy': self.session_sticky_proxy,
            'screenshots': self.screenshots,
            'screenshot_flags': [ScreenshotFlag(flag).value for flag in self.screenshot_flags] if self.screenshot_flags else None,
            'webhook': self.webhook,
            'timeout': self.timeout,
            'js_scenario': self.js_scenario,
            'extract': self.extract,
            'lang': self.lang,
            'os': self.os,
            'auto_scroll': self.auto_scroll,
            'cost_budget': self.cost_budget,
        }

    @staticmethod
    def from_dict(scrape_config_dict: Dict) -> 'ScrapeConfig':
        """Create a ScrapeConfig instance from a dictionary.

        Missing keys fall back to the defaults given below. ``format``,
        ``format_options`` and ``screenshot_flags`` values are converted to
        their enum types.
        """
        get = scrape_config_dict.get

        raw_format = get('format', None)
        raw_format_options = get('format_options', None)
        raw_screenshot_flags = get('screenshot_flags', [])

        return ScrapeConfig(
            url=get('url', None),
            # NOTE(review): defaults to False here while the constructor
            # defaults to True - confirm which one is intended.
            retry=get('retry', False),
            method=get('method', 'GET'),
            country=get('country', None),
            render_js=get('render_js', False),
            cache=get('cache', False),
            cache_clear=get('cache_clear', False),
            ssl=get('ssl', False),
            dns=get('dns', False),
            asp=get('asp', False),
            debug=get('debug', False),
            raise_on_upstream_error=get('raise_on_upstream_error', True),
            cache_ttl=get('cache_ttl', None),
            proxy_pool=get('proxy_pool', None),
            session=get('session', None),
            tags=get('tags', []),
            format=Format(raw_format) if raw_format else None,
            format_options=[FormatOption(option) for option in raw_format_options] if raw_format_options else None,
            extraction_template=get('extraction_template', None),
            extraction_ephemeral_template=get('extraction_ephemeral_template', None),
            extraction_prompt=get('extraction_prompt', None),
            extraction_model=get('extraction_model', None),
            correlation_id=get('correlation_id', None),
            cookies=get('cookies', {}),
            body=get('body', None),
            data=get('data', None),
            headers=get('headers', {}),
            js=get('js', None),
            rendering_wait=get('rendering_wait', None),
            rendering_stage=get('rendering_stage', 'complete'),
            wait_for_selector=get('wait_for_selector', None),
            # Default to None, not []: to_api_params() calls .items() on it.
            screenshots=get('screenshots', None),
            screenshot_flags=[ScreenshotFlag(flag) for flag in raw_screenshot_flags] if raw_screenshot_flags else None,
            session_sticky_proxy=get('session_sticky_proxy', False),
            webhook=get('webhook', None),
            timeout=get('timeout', None),
            js_scenario=get('js_scenario', None),
            extract=get('extract', None),
            os=get('os', None),
            lang=get('lang', None),
            auto_scroll=get('auto_scroll', None),
            cost_budget=get('cost_budget', None),
        )

Ancestors

Class variables

var PUBLIC_DATACENTER_POOL

Identifier of the public datacenter proxy pool (`'public_datacenter_pool'`).

var PUBLIC_RESIDENTIAL_POOL

Identifier of the public residential proxy pool (`'public_residential_pool'`).

var asp : bool

The type of the None singleton.

var auto_scroll : bool | None

The type of the None singleton.

var body : str | None

The type of the None singleton.

var cache : bool

The type of the None singleton.

var cache_clear : bool

The type of the None singleton.

var cache_ttl : int | None

The type of the None singleton.

var cookies : requests.structures.CaseInsensitiveDict | None

The type of the None singleton.

var correlation_id : str | None

The type of the None singleton.

var cost_budget : int | None

The type of the None singleton.

var country : str | None

The type of the None singleton.

var data : Dict | None

The type of the None singleton.

var debug : bool

The type of the None singleton.

var dns : bool

The type of the None singleton.

var extract : Dict

The type of the None singleton.

var extraction_ephemeral_template : Dict | None

The type of the None singleton.

var extraction_model : str | None

The type of the None singleton.

var extraction_prompt : str | None

The type of the None singleton.

var extraction_template : str | None

The type of the None singleton.

var format : Format | None

The type of the None singleton.

var format_options : List[FormatOption] | None

The type of the None singleton.

var headers : requests.structures.CaseInsensitiveDict | None

The type of the None singleton.

var js : str

The type of the None singleton.

var js_scenario : Dict

The type of the None singleton.

var lang : List[str] | None

The type of the None singleton.

var method : str

The type of the None singleton.

var os : str | None

The type of the None singleton.

var proxy_pool : str | None

The type of the None singleton.

var raise_on_upstream_error : bool

The type of the None singleton.

var render_js : bool

The type of the None singleton.

var rendering_stage : Literal['complete', 'domcontentloaded']

The type of the None singleton.

var rendering_wait : int

The type of the None singleton.

var retry : bool

The type of the None singleton.

var screenshot_flags : List[ScreenshotFlag] | None

The type of the None singleton.

var screenshots : Dict | None

The type of the None singleton.

var session : str | None

The type of the None singleton.

var session_sticky_proxy : bool

The type of the None singleton.

var ssl : bool

The type of the None singleton.

var tags : List[str] | None

The type of the None singleton.

var timeout : int | None

The type of the None singleton.

var url : str

The type of the None singleton.

var wait_for_selector : str | None

The type of the None singleton.

var webhook : str | None

The type of the None singleton.

Static methods

def from_dict(scrape_config_dict: Dict) ‑> ScrapeConfig
Expand source code
@staticmethod
def from_dict(scrape_config_dict: Dict) -> 'ScrapeConfig':
    """Create a ScrapeConfig instance from a dictionary.

    Missing keys fall back to the defaults given below. ``format``,
    ``format_options`` and ``screenshot_flags`` values are converted to
    their enum types.
    """
    get = scrape_config_dict.get

    raw_format = get('format', None)
    raw_format_options = get('format_options', None)
    raw_screenshot_flags = get('screenshot_flags', [])

    return ScrapeConfig(
        url=get('url', None),
        # NOTE(review): defaults to False here while the constructor
        # defaults to True - confirm which one is intended.
        retry=get('retry', False),
        method=get('method', 'GET'),
        country=get('country', None),
        render_js=get('render_js', False),
        cache=get('cache', False),
        cache_clear=get('cache_clear', False),
        ssl=get('ssl', False),
        dns=get('dns', False),
        asp=get('asp', False),
        debug=get('debug', False),
        raise_on_upstream_error=get('raise_on_upstream_error', True),
        cache_ttl=get('cache_ttl', None),
        proxy_pool=get('proxy_pool', None),
        session=get('session', None),
        tags=get('tags', []),
        format=Format(raw_format) if raw_format else None,
        format_options=[FormatOption(option) for option in raw_format_options] if raw_format_options else None,
        extraction_template=get('extraction_template', None),
        extraction_ephemeral_template=get('extraction_ephemeral_template', None),
        extraction_prompt=get('extraction_prompt', None),
        extraction_model=get('extraction_model', None),
        correlation_id=get('correlation_id', None),
        cookies=get('cookies', {}),
        body=get('body', None),
        data=get('data', None),
        headers=get('headers', {}),
        js=get('js', None),
        rendering_wait=get('rendering_wait', None),
        # Read back when present so the rendering stage is not silently reset.
        rendering_stage=get('rendering_stage', 'complete'),
        wait_for_selector=get('wait_for_selector', None),
        # Default to None, not []: to_api_params() calls .items() on it.
        screenshots=get('screenshots', None),
        screenshot_flags=[ScreenshotFlag(flag) for flag in raw_screenshot_flags] if raw_screenshot_flags else None,
        session_sticky_proxy=get('session_sticky_proxy', False),
        webhook=get('webhook', None),
        timeout=get('timeout', None),
        js_scenario=get('js_scenario', None),
        extract=get('extract', None),
        os=get('os', None),
        lang=get('lang', None),
        auto_scroll=get('auto_scroll', None),
        cost_budget=get('cost_budget', None),
    )

Create a ScrapeConfig instance from a dictionary.

def from_exported_config(config: str) ‑> ScrapeConfig
Expand source code
@staticmethod
def from_exported_config(config:str) -> 'ScrapeConfig':
    """Build a ScrapeConfig from an exported configuration string.

    ``config`` is base64-decoded and then unpacked with ``msgpack``.
    Header values that are non-string iterables are joined with ``'; '``.

    Raises:
        ImportError: if ``msgpack`` is not installed (an install hint is
            printed first).
    """
    try:
        from msgpack import loads as msgpack_loads
    except ImportError:
        print('You must install msgpack package - run: pip install "scrapfly-sdk[seepdup] or pip install msgpack')
        raise

    data = msgpack_loads(base64.b64decode(config))

    headers = {}

    for name, value in data['headers'].items():
        # A str is itself Iterable; joining it would interleave '; '
        # between its characters, so only join real collections.
        if not isinstance(value, str) and isinstance(value, Iterable):
            headers[name] = '; '.join(value)
        else:
            headers[name] = value

    return ScrapeConfig(
        url=data['url'],
        retry=data['retry'],
        headers=headers,
        session=data['session'],
        session_sticky_proxy=data['session_sticky_proxy'],
        cache=data['cache'],
        cache_ttl=data['cache_ttl'],
        cache_clear=data['cache_clear'],
        render_js=data['render_js'],
        method=data['method'],
        asp=data['asp'],
        body=data['body'],
        ssl=data['ssl'],
        dns=data['dns'],
        country=data['country'],
        debug=data['debug'],
        correlation_id=data['correlation_id'],
        tags=data['tags'],
        format=data['format'],
        js=data['js'],
        rendering_wait=data['rendering_wait'],
        screenshots=data['screenshots'] or {},
        screenshot_flags=data['screenshot_flags'],
        proxy_pool=data['proxy_pool'],
        auto_scroll=data['auto_scroll'],
        cost_budget=data['cost_budget']
    )

Methods

def to_api_params(self, key: str) ‑> Dict
Expand source code
def to_api_params(self, key:str) -> Dict:
    """
    Build the query parameters sent to the API for this configuration.

    Options that only make sense together with another one (render_js, cache, session)
    are left out and a warning is logged when the enabling option is off.

    :param key: str - API key used when the config does not carry its own key
    :return: Dict of query parameters
    :raises ScrapeConfigError: when both extraction_template and extraction_ephemeral_template are set
    """
    params = {
        'key': self.key or key,
        'url': self.url
    }

    if self.country is not None:
        params['country'] = self.country

    for name, value in self.headers.items():
        params['headers[%s]' % name] = value

    if self.webhook is not None:
        params['webhook_name'] = self.webhook

    if self.timeout is not None:
        params['timeout'] = self.timeout

    if self.extract is not None:
        params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')

    if self.cost_budget is not None:
        params['cost_budget'] = self.cost_budget

    if self.render_js is True:
        params['render_js'] = self._bool_to_http(self.render_js)

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.js:
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

        if self.js_scenario:
            params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')

        if self.rendering_wait:
            params['rendering_wait'] = self.rendering_wait

        if self.rendering_stage:
            params['rendering_stage'] = self.rendering_stage

        if self.screenshots is not None:
            for name, element in self.screenshots.items():
                params['screenshots[%s]' % name] = element

        if self.screenshot_flags is not None:
            # Normalise to enum members so an unknown flag fails here rather than on the API side
            self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags]
            params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags)

            # The flags are still sent as before; the warning was previously unreachable
            if self.screenshot_flags and not self.screenshots:
                logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')

        if self.auto_scroll is True:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
    else:
        if self.wait_for_selector is not None:
            logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')

        if self.screenshots:
            logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')

        if self.js_scenario:
            logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')

        if self.js:
            logging.warning('Params "js" is ignored. Works only if render_js is enabled')

        if self.rendering_wait:
            logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')

    if self.asp is True:
        params['asp'] = self._bool_to_http(self.asp)

    # retry is only transmitted when explicitly disabled
    if self.retry is False:
        params['retry'] = self._bool_to_http(self.retry)

    if self.cache is True:
        params['cache'] = self._bool_to_http(self.cache)

        if self.cache_clear is True:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)

        if self.cache_ttl is not None:
            params['cache_ttl'] = self.cache_ttl
    else:
        if self.cache_clear is True:
            logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

        if self.cache_ttl is not None:
            logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

    if self.dns is True:
        params['dns'] = self._bool_to_http(self.dns)

    if self.ssl is True:
        params['ssl'] = self._bool_to_http(self.ssl)

    if self.tags:
        params['tags'] = ','.join(self.tags)

    if self.format:
        params['format'] = Format(self.format).value
        if self.format_options:
            params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options)

    if self.extraction_template and self.extraction_ephemeral_template:
        raise ScrapeConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

    if self.extraction_template:
        params['extraction_template'] = self.extraction_template

    if self.extraction_ephemeral_template:
        # Serialise into a local: overwriting the attribute would JSON-encode the
        # template a second time on the next call (a retried request, for example)
        ephemeral_template = json.dumps(self.extraction_ephemeral_template)
        params['extraction_template'] = 'ephemeral:' + base64.urlsafe_b64encode(ephemeral_template.encode('utf-8')).decode('utf-8')

    if self.extraction_prompt:
        params['extraction_prompt'] = quote_plus(self.extraction_prompt)

    if self.extraction_model:
        params['extraction_model'] = self.extraction_model

    if self.correlation_id:
        params['correlation_id'] = self.correlation_id

    if self.session:
        params['session'] = self.session

        if self.session_sticky_proxy is True: # false by default
            params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
    else:
        if self.session_sticky_proxy:
            logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')

    if self.debug is True:
        params['debug'] = self._bool_to_http(self.debug)

    if self.proxy_pool is not None:
        params['proxy_pool'] = self.proxy_pool

    if self.lang is not None:
        params['lang'] = ','.join(self.lang)

    if self.os is not None:
        params['os'] = self.os

    return params
def to_dict(self) ‑> Dict
Expand source code
def to_dict(self) -> Dict:
    """
    Dump every setting of this ScrapeConfig into a plain dictionary,
    e.g. to serialize it as JSON or keep it in an external store.
    """

    # Values that need a conversion are prepared first, the rest is copied as is
    tags = list(self.tags)
    format_value = Format(self.format).value if self.format else None

    format_options = None
    if self.format_options:
        format_options = [FormatOption(option).value for option in self.format_options]

    cookies = CaseInsensitiveDict(self.cookies)
    # body takes precedence over data
    data = None if self.body else self.data
    headers = CaseInsensitiveDict(self.headers)

    screenshot_flags = None
    if self.screenshot_flags:
        screenshot_flags = [ScreenshotFlag(flag).value for flag in self.screenshot_flags]

    exported = dict(
        url=self.url,
        retry=self.retry,
        method=self.method,
        country=self.country,
        render_js=self.render_js,
        cache=self.cache,
        cache_clear=self.cache_clear,
        ssl=self.ssl,
        dns=self.dns,
        asp=self.asp,
        debug=self.debug,
        raise_on_upstream_error=self.raise_on_upstream_error,
        cache_ttl=self.cache_ttl,
        proxy_pool=self.proxy_pool,
        session=self.session,
        tags=tags,
        format=format_value,
        format_options=format_options,
        extraction_template=self.extraction_template,
        extraction_ephemeral_template=self.extraction_ephemeral_template,
        extraction_prompt=self.extraction_prompt,
        extraction_model=self.extraction_model,
        correlation_id=self.correlation_id,
        cookies=cookies,
        body=self.body,
        data=data,
        headers=headers,
        js=self.js,
        rendering_wait=self.rendering_wait,
        wait_for_selector=self.wait_for_selector,
        session_sticky_proxy=self.session_sticky_proxy,
        screenshots=self.screenshots,
        screenshot_flags=screenshot_flags,
        webhook=self.webhook,
        timeout=self.timeout,
        js_scenario=self.js_scenario,
        extract=self.extract,
        lang=self.lang,
        os=self.os,
        auto_scroll=self.auto_scroll,
        cost_budget=self.cost_budget,
    )

    return exported

Export the ScrapeConfig instance to a plain dictionary. Useful for JSON-serialization or other external storage.

class ScraperAPI
Expand source code
class ScraperAPI:
    """
    String constants for the scrape monitoring endpoints.

    They are used as default argument values by ScrapflyClient.get_monitoring_metrics
    (format) and ScrapflyClient.get_monitoring_target_metrics (period).
    """

    # Values for the "format" parameter of the monitoring metrics
    MONITORING_DATA_FORMAT_STRUCTURED = 'structured'
    MONITORING_DATA_FORMAT_PROMETHEUS = 'prometheus'

    # Values for the "period" parameter
    MONITORING_PERIOD_SUBSCRIPTION = 'subscription'
    MONITORING_PERIOD_LAST_7D = 'last7d'
    MONITORING_PERIOD_LAST_24H = 'last24h'
    MONITORING_PERIOD_LAST_1H = 'last1h'
    MONITORING_PERIOD_LAST_5m = 'last5m'

    # Values for the "aggregation" parameter
    MONITORING_ACCOUNT_AGGREGATION = 'account'
    MONITORING_PROJECT_AGGREGATION = 'project'
    MONITORING_TARGET_AGGREGATION = 'target'

Class variables

var MONITORING_ACCOUNT_AGGREGATION

Monitoring aggregation level, value 'account'.

var MONITORING_DATA_FORMAT_PROMETHEUS

Monitoring data format, value 'prometheus'.

var MONITORING_DATA_FORMAT_STRUCTURED

Monitoring data format, value 'structured'.

var MONITORING_PERIOD_LAST_1H

Monitoring period, value 'last1h'.

var MONITORING_PERIOD_LAST_24H

Monitoring period, value 'last24h'.

var MONITORING_PERIOD_LAST_5m

Monitoring period, value 'last5m'.

var MONITORING_PERIOD_LAST_7D

Monitoring period, value 'last7d'.

var MONITORING_PERIOD_SUBSCRIPTION

Monitoring period, value 'subscription'.

var MONITORING_PROJECT_AGGREGATION

Monitoring aggregation level, value 'project'.

var MONITORING_TARGET_AGGREGATION

Monitoring aggregation level, value 'target'.

class ScrapflyAspError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyAspError(ScraperAPIError):
    """Error class that ErrorFactory associates with the ASP (anti scraping protection) resource."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Inherited members

class ScrapflyClient (key: str,
host: str | None = 'https://api.scrapfly.io',
verify=True,
debug: bool = False,
max_concurrency: int = 1,
connect_timeout: int = 30,
web_scraping_api_read_timeout: int = 160,
extraction_api_read_timeout: int = 35,
screenshot_api_read_timeout: int = 60,
read_timeout: int = 30,
default_read_timeout: int = 30,
reporter: Callable | None = None,
**kwargs)
Expand source code
class ScrapflyClient:

    # Default API endpoint and timeouts, used as default values of the constructor arguments
    HOST = 'https://api.scrapfly.io'
    DEFAULT_CONNECT_TIMEOUT = 30
    DEFAULT_READ_TIMEOUT = 30

    # Read timeouts per API; the trailing comments carry the original author's "real" values
    DEFAULT_WEBSCRAPING_API_READ_TIMEOUT = 160 # 155 real
    DEFAULT_SCREENSHOT_API_READ_TIMEOUT = 60  # 30 real
    DEFAULT_EXTRACTION_API_READ_TIMEOUT = 35 # 30 real

    # Instance attributes (annotations only, values are assigned in __init__)
    host:str
    key:str
    max_concurrency:int
    verify:bool
    debug:bool
    # distributed_mode and brotli are only declared here: __init__ emits a
    # DeprecationWarning when they are passed as keyword arguments
    distributed_mode:bool
    connect_timeout:int
    web_scraping_api_read_timeout:int
    screenshot_api_read_timeout:int
    extraction_api_read_timeout:int
    monitoring_api_read_timeout:int
    default_read_timeout:int
    brotli: bool
    reporter:Reporter
    version:str

    # @deprecated
    read_timeout:int

    # Sentinel accepted by concurrent_scrape(concurrency=...)
    CONCURRENCY_AUTO = 'auto' # retrieve the allowed concurrency from your account
    # strftime format applied to the start/end dates of get_monitoring_target_metrics
    DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(
        self,
        key: str,
        host: Optional[str] = HOST,
        verify=True,
        debug: bool = False,
        max_concurrency:int=1,
        connect_timeout:int = DEFAULT_CONNECT_TIMEOUT,
        web_scraping_api_read_timeout: int = DEFAULT_WEBSCRAPING_API_READ_TIMEOUT,
        extraction_api_read_timeout: int = DEFAULT_EXTRACTION_API_READ_TIMEOUT,
        screenshot_api_read_timeout: int = DEFAULT_SCREENSHOT_API_READ_TIMEOUT,

        # @deprecated
        read_timeout:int = DEFAULT_READ_TIMEOUT,
        default_read_timeout:int = DEFAULT_READ_TIMEOUT,
        reporter:Optional[Callable]=None,
        **kwargs
    ):
        """
        Create a client for the Scrapfly API.

        :param key: str - API key
        :param host: Optional[str] - API endpoint, a trailing '/' is removed; None or '' falls back to HOST
        :param verify: TLS verification setting handed to the HTTP requests
        :param debug: bool - when True, turns on http.client connection debugging
        :param max_concurrency: int - default concurrency of concurrent_scrape
        :param connect_timeout: int - connect timeout, in seconds
        :param web_scraping_api_read_timeout: int - read timeout of scrape calls
        :param extraction_api_read_timeout: int - read timeout of extraction calls
        :param screenshot_api_read_timeout: int - read timeout of screenshot calls
        :param read_timeout: int - deprecated and not used; self.read_timeout mirrors default_read_timeout
        :param default_read_timeout: int - read timeout of the other calls (monitoring, http session)
        :param reporter: Optional[Callable] - wrapped in Reporter; NoopReporter is used when None
        :param kwargs: only the deprecated 'distributed_mode' and 'brotli' keys are looked at
        """
        # host is annotated Optional: fall back to the default endpoint instead of failing on None / ''
        host = host or self.HOST

        if host.endswith('/'):  # remove last '/' if exists
            host = host[:-1]

        if 'distributed_mode' in kwargs:
            warnings.warn("distributed mode is deprecated and will be remove the next version -"
              " user should handle themself the session name based on the concurrency",
              DeprecationWarning,
              stacklevel=2
            )

        if 'brotli' in kwargs:
            warnings.warn("brotli arg is deprecated and will be remove the next version - "
                "brotli is disabled by default",
                DeprecationWarning,
                stacklevel=2
            )

        self.version = __version__
        self.host = host
        self.key = key
        self.verify = verify
        self.debug = debug
        self.connect_timeout = connect_timeout
        self.web_scraping_api_read_timeout = web_scraping_api_read_timeout
        self.screenshot_api_read_timeout = screenshot_api_read_timeout
        self.extraction_api_read_timeout = extraction_api_read_timeout
        self.monitoring_api_read_timeout = default_read_timeout
        self.default_read_timeout = default_read_timeout

        # @deprecated
        self.read_timeout = default_read_timeout

        self.max_concurrency = max_concurrency
        self.body_handler = ResponseBodyHandler(use_brotli=False)
        self.async_executor = ThreadPoolExecutor()
        self.http_session = None

        # Check the configured host: the class constant HOST never ends with '.local'
        if not self.verify and not self.host.endswith('.local'):
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        if self.debug is True:
            http.client.HTTPConnection.debuglevel = 5

        if reporter is None:
            from .reporter import NoopReporter

            reporter = NoopReporter()

        self.reporter = Reporter(reporter)

    @property
    def ua(self) -> str:
        return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
            self.version,
            platform.python_version(),
            platform.uname().system,
            platform.uname().machine
        )

    @cached_property
    def _http_handler(self):
        return partial(self.http_session.request if self.http_session else requests.request)

    @property
    def http(self):
        """Public accessor returning the same callable as _http_handler."""
        return self._http_handler

    def _scrape_request(self, scrape_config:ScrapeConfig):
        """Assemble the keyword arguments of the HTTP call to the /scrape endpoint."""
        # Requests carrying a body forward the content type of the scrape config
        if scrape_config.method in ['POST', 'PUT', 'PATCH']:
            content_type = scrape_config.headers['content-type']
        else:
            content_type = self.body_handler.content_type

        request_headers = {
            'content-type': content_type,
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        request = dict(
            method=scrape_config.method,
            url=self.host + '/scrape',
            data=scrape_config.body,
            verify=self.verify,
            timeout=(self.connect_timeout, self.web_scraping_api_read_timeout),
            headers=request_headers,
        )
        request['params'] = scrape_config.to_api_params(key=self.key)

        return request
    
    def _screenshot_request(self, screenshot_config:ScreenshotConfig):
        """
        Assemble the keyword arguments of the HTTP call to the /screenshot endpoint.

        :param screenshot_config: ScreenshotConfig
        :return: dict of keyword arguments for the http handler
        """
        return {
            'method': 'GET',
            'url': self.host + '/screenshot',
            # same TLS verification setting as the scrape requests
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.screenshot_api_read_timeout),
            'headers': {
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            'params': screenshot_config.to_api_params(key=self.key)
        }

    def _extraction_request(self, extraction_config:ExtractionConfig):
        """
        Assemble the keyword arguments of the HTTP call to the /extraction endpoint.

        :param extraction_config: ExtractionConfig
        :return: dict of keyword arguments for the http handler
        """
        headers = {
            'content-type': extraction_config.content_type,
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        # content-encoding is only declared when the document is compressed
        if extraction_config.document_compression_format:
            headers['content-encoding'] = extraction_config.document_compression_format.value

        return {
            'method': 'POST',
            'url': self.host + '/extraction',
            'data': extraction_config.body,
            # same TLS verification setting as the scrape requests
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.extraction_api_read_timeout),
            'headers': headers,
            'params': extraction_config.to_api_params(key=self.key)
        }


    def account(self) -> Union[str, Dict]:
        response = self._http_handler(
            method='GET',
            url=self.host + '/account',
            params={'key': self.key},
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )

        response.raise_for_status()

        if self.body_handler.support(response.headers):
            return self.body_handler(response.content, response.headers['content-type'])

        return response.content.decode('utf-8')

    def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None):
        """
        Query the /scrape/monitoring/metrics endpoint.

        :param format: str - data format, see the ScraperAPI.MONITORING_DATA_FORMAT_* constants
        :param period: Optional[str] - only sent when given
        :param aggregation: Optional[List] - sent as a comma separated list when given
        :return: body decoded by the body handler when supported, utf-8 text otherwise
        """
        query = {'key': self.key, 'format': format}

        if period is not None:
            query['period'] = period

        if aggregation is not None:
            query['aggregation'] = ','.join(aggregation)

        request_headers = {
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics',
            params=query,
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            verify=self.verify,
            headers=request_headers,
        )

        response.raise_for_status()

        if not self.body_handler.support(response.headers):
            return response.content.decode('utf-8')

        return self.body_handler(response.content, response.headers['content-type'])

    def get_monitoring_target_metrics(
            self,
            domain:str,
            group_subdomain:bool=False,
            period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H,
            start:Optional[datetime.datetime]=None,
            end:Optional[datetime.datetime]=None,
    ):
        """
        Query the /scrape/monitoring/metrics/target endpoint for one domain.

        :param domain: str
        :param group_subdomain: bool
        :param period: replaced by None when an explicit start / end range is given
        :param start: Optional[datetime.datetime] - must be given together with end
        :param end: Optional[datetime.datetime] - must be given together with start
        :return: body decoded by the body handler when supported, utf-8 text otherwise
        :raises ValueError: when only one of start / end is provided
        """
        # start and end only make sense as a pair
        if (start is None) != (end is None):
            raise ValueError('You must provide both start and end date')

        query = {
            'key': self.key,
            'domain': domain,
            'group_subdomain': group_subdomain
        }

        if start is not None:
            query['start'] = start.strftime(self.DATETIME_FORMAT)
            query['end'] = end.strftime(self.DATETIME_FORMAT)
            period = None

        query['period'] = period

        request_headers = {
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics/target',
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            params=query,
            verify=self.verify,
            headers=request_headers,
        )

        response.raise_for_status()

        if not self.body_handler.support(response.headers):
            return response.content.decode('utf-8')

        return self.body_handler(response.content, response.headers['content-type'])


    def resilient_scrape(
        self,
        scrape_config:ScrapeConfig,
        retry_on_errors:Set[Exception]={ScrapflyError},
        retry_on_status_code:Optional[List[int]]=None,
        tries: int = 5,
        delay: int = 20,
    ) -> ScrapeApiResponse:
        """
        Scrape with exponential backoff retries.

        :param scrape_config: ScrapeConfig
        :param retry_on_errors: set of exception classes that trigger a retry
        :param retry_on_status_code: upstream status codes worth retrying; for any other
               upstream status the api response carried by the error is returned
        :param tries: int - passed to backoff as max_tries
        :param delay: int - passed to backoff as max_time
        :return: ScrapeApiResponse
        """
        assert retry_on_errors is not None, 'Retry on error is None'
        assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

        retry_policy = backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)

        def attempt() -> ScrapeApiResponse:
            try:
                return self.scrape(scrape_config=scrape_config)
            except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
                # Without a status filter or an api response the error is simply propagated
                if retry_on_status_code is None or not e.api_response:
                    raise e

                if e.api_response.upstream_status_code in retry_on_status_code:
                    raise e

                return e.api_response

        return retry_policy(attempt)()

    def open(self):
        """Create and configure the shared HTTP session, unless one is already open."""
        if self.http_session is not None:
            return

        session = self.http_session = Session()
        session.verify = self.verify
        session.timeout = (self.connect_timeout, self.default_read_timeout)
        session.params['key'] = self.key

        # Default headers sent with every request of the session
        session.headers['accept-encoding'] = self.body_handler.content_encoding
        session.headers['accept'] = self.body_handler.accept
        session.headers['user-agent'] = self.ua

    def close(self):
        self.http_session.close()
        self.http_session = None

    def __enter__(self) -> 'ScrapflyClient':
        """Context manager entry: open the HTTP session and return the client itself."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the HTTP session; exceptions are not suppressed (returns None)."""
        self.close()

    async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
        """Run the blocking scrape() in the client's executor and await its result."""
        event_loop = asyncio.get_running_loop() if loop is None else loop
        future = event_loop.run_in_executor(self.async_executor, self.scrape, scrape_config)

        return await future

    async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
        """
        Scrape several configurations concurrently, yielding each outcome as soon as it is available.

        Yielded items are either the scrape result or the exception raised by the scrape.
        The given list is consumed (configs are popped from its end) and every config gets
        raise_on_upstream_error set to False.

        :param scrape_configs: List[ScrapeConfig] - emptied while the scrapes are scheduled
        :param concurrency: Optional[int] - None uses self.max_concurrency,
               CONCURRENCY_AUTO reads the allowed concurrency from account()
        """
        if concurrency is None:
            concurrency = self.max_concurrency
        elif concurrency == self.CONCURRENCY_AUTO:
            concurrency = self.account()['subscription']['max_concurrency']

        loop = asyncio.get_running_loop()
        processing_tasks = []
        results = []
        processed_tasks = 0
        expected_tasks = len(scrape_configs)

        def scrape_done_callback(task:Task):
            nonlocal processed_tasks

            try:
                if task.cancelled() is True:
                    return

                error = task.exception()

                if error is not None:
                    results.append(error)
                else:
                    results.append(task.result())
            finally:
                processing_tasks.remove(task)
                processed_tasks += 1

        while scrape_configs or results or processing_tasks:
            logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

            # @todo handle backpressure
            free_slots = concurrency - len(processing_tasks)

            # Fill the free slots, stopping as soon as the configs run out
            while scrape_configs and free_slots > 0:
                scrape_config = scrape_configs.pop()
                free_slots -= 1

                scrape_config.raise_on_upstream_error = False
                task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                processing_tasks.append(task)
                task.add_done_callback(scrape_done_callback)

            # Drain every available result: popping while iterating over the same list
            # skipped about half of them on each pass and delayed them by the sleep below
            while results:
                yield results.pop()

            await asyncio.sleep(.5)

        logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse:
        """
        Scrape a website.

        The outcome (response or error) is forwarded to the reporter. The call is retried
        by backoff on NetworkError.

        :param scrape_config: ScrapeConfig
        :param no_raise: bool - when True, a ScrapflyError carrying an api response is not raised,
               its api response is returned instead
        :return: ScrapeApiResponse

        With no_raise=True, check api_response.scrape_result.error to handle the failure.
        When it is not none it looks, for example, like

        'error': {
            'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
            'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds',
            'retryable': False,
            'http_code': 422,
            'links': {
                'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
            }
        }
        """

        try:
            logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
            response = self._http_handler(**self._scrape_request(scrape_config=scrape_config))
            scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config)
            self.reporter.report(scrape_api_response=scrape_api_response)
        except BaseException as e:
            self.reporter.report(error=e)

            recoverable = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

            if not recoverable:
                raise e

            return e.api_response

        return scrape_api_response

    async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse:
        """Run the blocking screenshot() in the client's executor and await its result."""
        event_loop = asyncio.get_running_loop() if loop is None else loop
        future = event_loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)

        return await future

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse:
        """
        Take a screenshot.

        Errors are forwarded to the reporter. The call is retried by backoff on NetworkError.

        :param screenshot_config: ScreenshotConfig
        :param no_raise: bool - when True, a ScrapflyError carrying an api response is not raised,
               its api response is returned instead
        :return: ScreenshotApiResponse

        With no_raise=True, check screenshot_api_response.error to handle the failure.
        When it is not none it looks, for example, like

        'error': {
            'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
            'message': 'For some reason we were unable to take the screenshot',
            'http_code': 422,
            'links': {
                'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
            }
        }
        """

        try:
            logger.debug('--> %s Screenshoting' % (screenshot_config.url))
            response = self._http_handler(**self._screenshot_request(screenshot_config=screenshot_config))

            return self._handle_screenshot_response(response=response, screenshot_config=screenshot_config)
        except BaseException as e:
            self.reporter.report(error=e)

            recoverable = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

            if not recoverable:
                raise e

            return e.api_response

    async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse:
        """Run the blocking extract() in the client's executor and await its result."""
        event_loop = asyncio.get_running_loop() if loop is None else loop
        future = event_loop.run_in_executor(self.async_executor, self.extract, extraction_config)

        return await future

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse:
        """
        Extract structured data from text content.

        Errors are forwarded to the reporter. The call is retried by backoff on NetworkError.

        :param extraction_config: ExtractionConfig
        :param no_raise: bool - when True, a ScrapflyError carrying an api response is not raised,
               its api response is returned instead
        :return: ExtractionApiResponse

        With no_raise=True, check extraction_api_response.error to handle the failure.
        When it is not none it looks, for example, like

        'error': {
            'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED',
            'message': 'The content type of the response is not supported for extraction',
            'http_code': 422,
            'links': {
                'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED'
            }
        }
        """

        try:
            logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
            response = self._http_handler(**self._extraction_request(extraction_config=extraction_config))

            return self._handle_extraction_response(response=response, extraction_config=extraction_config)
        except BaseException as e:
            self.reporter.report(error=e)

            recoverable = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

            if not recoverable:
                raise e

            return e.api_response

    def _handle_response(self, response:Response, scrape_config:ScrapeConfig) -> ScrapeApiResponse:
        """
        Convert the raw HTTP response of a scrape into a ScrapeApiResponse and log a summary.

        Errors raised during the conversion are logged as critical and re-raised unchanged.
        """
        try:
            api_response = self._handle_api_response(
                response=response,
                scrape_config=scrape_config,
                raise_on_upstream_error=scrape_config.raise_on_upstream_error
            )

            is_head = scrape_config.method == 'HEAD'

            if is_head:
                # HEAD summaries are built from the HTTP response itself, with a duration of 0
                http_response = api_response.response
                summary = (http_response.status_code, http_response.reason, http_response.request.url, 0)
            else:
                summary = (
                    api_response.result['result']['status_code'],
                    api_response.result['result']['reason'],
                    api_response.result['config']['url'],
                    api_response.result['result']['duration'],
                )

            logger.debug('<-- [%s %s] %s | %ss' % summary)

            if not is_head:
                logger.debug('Log url: %s' % api_response.result['result']['log_url'])

            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            # Prefer the message of the api response when the error carries one
            message = e.api_response.error_message if e.api_response is not None else e.message
            logger.critical(message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def _handle_screenshot_response(self, response:Response, screenshot_config:ScreenshotConfig) -> ScreenshotApiResponse:
        """
        Convert the raw HTTP response of a screenshot call into a ScreenshotApiResponse.

        Errors raised during the conversion are logged as critical and re-raised unchanged.
        """
        try:
            return self._handle_screenshot_api_response(
                response=response,
                screenshot_config=screenshot_config,
                raise_on_upstream_error=screenshot_config.raise_on_upstream_error
            )
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            # Prefer the message of the api response when the error carries one
            message = e.api_response.error_message if e.api_response is not None else e.message
            logger.critical(message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def _handle_extraction_response(self, response:Response, extraction_config:ExtractionConfig) -> ExtractionApiResponse:
        """
        Convert the raw HTTP response of an extraction call into an ExtractionApiResponse.

        Errors raised during the conversion are logged as critical and re-raised unchanged.
        """
        try:
            return self._handle_extraction_api_response(
                response=response,
                extraction_config=extraction_config,
                raise_on_upstream_error=extraction_config.raise_on_upstream_error
            )
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            # Prefer the message of the api response when the error carries one
            message = e.api_response.error_message if e.api_response is not None else e.message
            logger.critical(message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
        """
        Write the image of a Screenshot API response to disk
        :param screenshot_api_response: ScreenshotApiResponse
        :param name: str - file name without extension; the extension comes from the response metadata
        :param path: Optional[str] - target directory, created when missing; current directory when omitted
        :raises RuntimeError: when the screenshot was not successful or the response holds no image
        """

        if screenshot_api_response.screenshot_success is not True:
            raise RuntimeError('Screenshot was not successful')

        image = screenshot_api_response.image

        if not image:
            raise RuntimeError('Screenshot binary does not exist')

        file_name = f"{name}.{screenshot_api_response.metadata['extension_name']}"

        if path:
            os.makedirs(path, exist_ok=True)
            target = os.path.join(path, file_name)
        else:
            target = file_name

        # Raw bytes are wrapped so both bytes and file-like images go through the same copy
        source = BytesIO(image) if isinstance(image, bytes) else image

        with open(target, 'wb') as output:
            shutil.copyfileobj(source, output, length=131072)

    def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
        """
        Save a screenshot from a scrape result
        :param api_response: ScrapeApiResponse
        :param name: str - name of the screenshot given in the scrape config
        :param path: Optional[str] - forwarded to api_response.sink
        :raises RuntimeError: when the scrape result holds no screenshot named ``name``

        The screenshot is downloaded from the url stored in the scrape result and handed to
        api_response.sink; '.jpg' is appended to the file name when it is missing.
        """

        screenshots = api_response.scrape_result['screenshots']

        if not screenshots or name not in screenshots:
            raise RuntimeError('Screenshot %s do no exists' % name)

        screenshot_response = self._http_handler(
            method='GET',
            url=screenshots[name]['url'],
            params={'key': self.key},
            verify=self.verify,
            # Explicit timeout, like the other API calls of this client: without it the download can block forever
            timeout=(self.connect_timeout, self.default_read_timeout)
        )

        screenshot_response.raise_for_status()

        if not name.endswith('.jpg'):
            name += '.jpg'

        api_response.sink(path=path, name=name, content=screenshot_response.content)

    def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
        """
        Write the scraped content (or the explicit ``content``) to a file
        :param api_response: ScrapeApiResponse - supplies the default content, the content-type header and the scraped url
        :param content: Optional[Union[str, bytes]] - data written instead of the scraped content; str is encoded as utf-8
        :param path: Optional[str] - directory prepended to the file name (it is not created here)
        :param name: Optional[str] - file name; defaults to the last segment of the scraped url
        :param file: already opened file object to write into; it is closed once the content is written
        :return: path of the written file, or None when ``file`` was supplied by the caller

        When the name has no extension, one is derived from the content-type response header
        (application/octet-stream when the header is missing).
        """
        scrape_result = api_response.result['result']
        scrape_config = api_response.result['config']

        file_content = content or scrape_result['content']
        file_path = None

        # Normalise the payload BEFORE opening any file: a payload that cannot be used
        # must not leave an empty file or an unclosed handle behind.
        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)

        if not file:
            file_extension = None

            if name and '.' in name:
                file_extension = name.split('.')[-1]

            if file_extension is None:
                try:
                    mime_type = scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                # Drop parameters such as "; charset=utf-8"
                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = scrape_config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            if name == file_extension:
                # The url ends with "/" so no file name could be derived from it: build one from
                # the whole url. The check is done on the name (not on the joined path) so that
                # it also applies when a target directory is given.
                url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

                if url.endswith('-'):
                    url = url[:-1]

                name = url + file_extension

            file_path = path + '/' + name if path else name
            file = open(file_path, 'wb')

        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)
        return file_path

    def _handle_scrape_large_objects(
        self,
        callback_url:str,
        format: Literal['clob', 'blob']
    ) -> Tuple[Union[BytesIO, str], str]:
        if format not in ['clob', 'blob']:
            raise ContentError('Large objects handle can handles format format [blob, clob], given: %s' % format)

        response = self._http_handler(**{
            'method': 'GET',
            'url': callback_url,
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.default_read_timeout),
            'headers': {
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            'params': {'key': self.key}
        })

        if self.body_handler.support(headers=response.headers):
            content = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            content = response.content

        if format == 'clob':
            return content.decode('utf-8'), 'text'

        return BytesIO(content), 'binary'

    def _handle_api_response(
        self,
        response: Response,
        scrape_config:ScrapeConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScrapeApiResponse:
        """
        Build a ScrapeApiResponse from the raw http response and validate it
        :param response: Response - http response received from the API
        :param scrape_config: ScrapeConfig - config of the request
        :param raise_on_upstream_error: Optional[bool] - forwarded to ScrapeApiResponse.raise_for_result
        :return: ScrapeApiResponse
        """
        if scrape_config.method == 'HEAD':
            # No body is decoded for HEAD requests
            decoded_body = None
        elif self.body_handler.support(headers=response.headers):
            decoded_body = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            decoded_body = response.content.decode('utf-8')

        scrape_api_response = ScrapeApiResponse(
            response=response,
            request=response.request,
            api_result=decoded_body,
            scrape_config=scrape_config,
            large_object_handler=self._handle_scrape_large_objects
        )
        scrape_api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return scrape_api_response

    def _handle_screenshot_api_response(
        self,
        response: Response,
        screenshot_config:ScreenshotConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScreenshotApiResponse:
        """
        Build a ScreenshotApiResponse from the raw http response and validate it
        :param response: Response - http response received from the API
        :param screenshot_config: ScreenshotConfig - config of the request
        :param raise_on_upstream_error: Optional[bool] - forwarded to ScreenshotApiResponse.raise_for_result
        :return: ScreenshotApiResponse
        """
        handler = self.body_handler

        if handler.support(headers=response.headers):
            decoded_body = handler(content=response.content, content_type=response.headers['content-type'])
        else:
            # Payload the body handler does not support is kept raw under the 'result' key
            decoded_body = {'result': response.content}

        screenshot_api_response = ScreenshotApiResponse(
            response=response,
            request=response.request,
            api_result=decoded_body,
            screenshot_config=screenshot_config
        )
        screenshot_api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return screenshot_api_response

    def _handle_extraction_api_response(
        self,
        response: Response,
        extraction_config:ExtractionConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ExtractionApiResponse:
        """
        Build an ExtractionApiResponse from the raw http response and validate it
        :param response: Response - http response received from the API
        :param extraction_config: ExtractionConfig - config of the request
        :param raise_on_upstream_error: Optional[bool] - forwarded to ExtractionApiResponse.raise_for_result
        :return: ExtractionApiResponse
        """
        handler = self.body_handler

        if handler.support(headers=response.headers):
            decoded_body = handler(content=response.content, content_type=response.headers['content-type'])
        else:
            # Payload the body handler does not support is treated as utf-8 text
            decoded_body = response.content.decode('utf-8')

        extraction_api_response = ExtractionApiResponse(
            response=response,
            request=response.request,
            api_result=decoded_body,
            extraction_config=extraction_config
        )
        extraction_api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return extraction_api_response

Class variables

var CONCURRENCY_AUTO

The type of the None singleton.

var DATETIME_FORMAT

The type of the None singleton.

var DEFAULT_CONNECT_TIMEOUT

The type of the None singleton.

var DEFAULT_EXTRACTION_API_READ_TIMEOUT

The type of the None singleton.

var DEFAULT_READ_TIMEOUT

The type of the None singleton.

var DEFAULT_SCREENSHOT_API_READ_TIMEOUT

The type of the None singleton.

var DEFAULT_WEBSCRAPING_API_READ_TIMEOUT

The type of the None singleton.

var HOST

The type of the None singleton.

var brotli : bool

The type of the None singleton.

var connect_timeout : int

The type of the None singleton.

var debug : bool

The type of the None singleton.

var default_read_timeout : int

The type of the None singleton.

var distributed_mode : bool

The type of the None singleton.

var extraction_api_read_timeout : int

The type of the None singleton.

var host : str

The type of the None singleton.

var key : str

The type of the None singleton.

var max_concurrency : int

The type of the None singleton.

var monitoring_api_read_timeout : int

The type of the None singleton.

var read_timeout : int

The type of the None singleton.

var reporter : scrapfly.reporter.Reporter

The type of the None singleton.

var screenshot_api_read_timeout : int

The type of the None singleton.

var verify : bool

The type of the None singleton.

var version : str

The type of the None singleton.

var web_scraping_api_read_timeout : int

The type of the None singleton.

Instance variables

prop http
Expand source code
@property
def http(self):
    """Read-only access to the callable stored in self._http_handler, which this client uses to send its http requests."""
    return self._http_handler
prop ua : str
Expand source code
@property
def ua(self) -> str:
    """User-agent string sent by the SDK: SDK version, Python version, operating system and machine type."""
    host = platform.uname()
    sdk_details = (self.version, platform.python_version(), host.system, host.machine)

    return 'ScrapflySDK/%s (Python %s, %s, %s)' % sdk_details

Methods

def account(self) ‑> str | Dict
Expand source code
def account(self) -> Union[str, Dict]:
    """
    Fetch the account information (GET <host>/account)
    :return: the body decoded by self.body_handler when it supports the response headers, the utf-8 decoded text otherwise
    :raises: whatever response.raise_for_status() raises on an http error status
    """
    response = self._http_handler(
        method='GET',
        url=self.host + '/account',
        params={'key': self.key},
        verify=self.verify,
        # Explicit timeout, like the other API calls of this client: without it the request can block forever
        timeout=(self.connect_timeout, self.default_read_timeout),
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )

    response.raise_for_status()

    if self.body_handler.support(response.headers):
        return self.body_handler(response.content, response.headers['content-type'])

    return response.content.decode('utf-8')
async def async_extraction(self,
extraction_config: ExtractionConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ExtractionApiResponse
Expand source code
async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse:
    """
    Awaitable version of extract(): the blocking call runs in self.async_executor
    :param extraction_config: ExtractionConfig
    :param loop: Optional[AbstractEventLoop] - loop used to schedule the call; the running loop when omitted
    :return: ExtractionApiResponse
    """
    event_loop = asyncio.get_running_loop() if loop is None else loop
    pending = event_loop.run_in_executor(self.async_executor, self.extract, extraction_config)

    return await pending
async def async_scrape(self,
scrape_config: ScrapeConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ScrapeApiResponse
Expand source code
async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
    """
    Awaitable version of scrape(): the blocking call runs in self.async_executor
    :param scrape_config: ScrapeConfig
    :param loop: Optional[AbstractEventLoop] - loop used to schedule the call; the running loop when omitted
    :return: ScrapeApiResponse
    """
    event_loop = asyncio.get_running_loop() if loop is None else loop
    pending = event_loop.run_in_executor(self.async_executor, self.scrape, scrape_config)

    return await pending
async def async_screenshot(self,
screenshot_config: ScreenshotConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ScreenshotApiResponse
Expand source code
async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse:
    """
    Awaitable version of screenshot(): the blocking call runs in self.async_executor
    :param screenshot_config: ScreenshotConfig
    :param loop: Optional[AbstractEventLoop] - loop used to schedule the call; the running loop when omitted
    :return: ScreenshotApiResponse
    """
    event_loop = asyncio.get_running_loop() if loop is None else loop
    pending = event_loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)

    return await pending
def close(self)
Expand source code
def close(self):
    """
    Close the http session, if one is open, and forget it.

    Safe to call more than once or without an open session: self.http_session is None
    in that case and there is nothing to close.
    """
    if self.http_session is None:
        return

    self.http_session.close()
    self.http_session = None
async def concurrent_scrape(self,
scrape_configs: List[ScrapeConfig],
concurrency: int | None = None)
Expand source code
async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
    """
    Scrape several configs concurrently and yield every outcome as soon as it is available.

    Async generator. Each yielded item is either the value returned by async_scrape() or the
    exception raised by that scrape (exceptions are yielded, not raised). Items come out as the
    scrapes finish, in no guaranteed order.

    Note: ``scrape_configs`` is consumed in place (configs are popped from its end) and
    raise_on_upstream_error is set to False on every config before it is scraped.

    :param scrape_configs: List[ScrapeConfig]
    :param concurrency: Optional[int] - max number of scrapes running at once; None uses
        self.max_concurrency, self.CONCURRENCY_AUTO reads the limit from the account subscription
    """
    if concurrency is None:
        concurrency = self.max_concurrency
    elif concurrency == self.CONCURRENCY_AUTO:
        concurrency = self.account()['subscription']['max_concurrency']

    loop = asyncio.get_running_loop()
    processing_tasks = []
    results = []
    processed_tasks = 0
    expected_tasks = len(scrape_configs)

    def scrape_done_callback(task:Task):
        nonlocal processed_tasks

        try:
            if task.cancelled() is True:
                return

            error = task.exception()

            if error is not None:
                results.append(error)
            else:
                results.append(task.result())
        finally:
            processing_tasks.remove(task)
            processed_tasks += 1

    while scrape_configs or results or processing_tasks:
        logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

        # @todo handle backpressure
        free_slots = concurrency - len(processing_tasks)

        while scrape_configs and free_slots > 0:
            scrape_config = scrape_configs.pop()
            scrape_config.raise_on_upstream_error = False
            task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
            processing_tasks.append(task)
            task.add_done_callback(scrape_done_callback)
            free_slots -= 1

        # Drain with an explicit loop: popping from a list while iterating over it
        # skips entries, which delayed part of the results until a later pass.
        while results:
            yield results.pop()

        if processing_tasks:
            # Wake up as soon as one scrape finishes; the timeout keeps the periodic progress log
            await asyncio.wait(processing_tasks, timeout=.5, return_when=asyncio.FIRST_COMPLETED)
        elif scrape_configs:
            # Nothing could be scheduled (e.g. concurrency <= 0): keep the polling pace instead of spinning
            await asyncio.sleep(.5)

    logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))
def extract(self,
extraction_config: ExtractionConfig,
no_raise: bool = False) ‑> ExtractionApiResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse:
    """
    Extract structured data from text content
    :param extraction_config: ExtractionConfig
    :param no_raise: bool - when True, a ScrapflyError carrying an api response is not raised: that api response is returned instead
    :return: ExtractionApiResponse

    Every error is handed to self.reporter before it is re-raised or swallowed through no_raise.
    With no_raise=True, check the extraction_api_response.error attribute to handle the error.
    When it is not None it has, for example, the following structure

    'error': {
        'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED',
        'message': 'The content type of the response is not supported for extraction',
        'http_code': 422,
        'links': {
            'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED'
        }
    }
    """

    try:
        logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
        raw_response = self._http_handler(**self._extraction_request(extraction_config=extraction_config))
        return self._handle_extraction_response(response=raw_response, extraction_config=extraction_config)
    except BaseException as e:
        self.reporter.report(error=e)

        swallow = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

        if not swallow:
            raise e

        return e.api_response

Extract structured data from text content :param extraction_config: ExtractionConfig :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration :return: ExtractionApiResponse

If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED', 'message': 'The content type of the response is not supported for extraction', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED' } }

def get_monitoring_metrics(self,
format: str = 'structured',
period: str | None = None,
aggregation: List[Literal['account', 'project', 'target']] | None = None)
Expand source code
def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None):
    """
    Fetch the monitoring metrics (GET <host>/scrape/monitoring/metrics)
    :param format: str - sent as the 'format' query parameter
    :param period: Optional[str] - sent as the 'period' query parameter when given
    :param aggregation: Optional[List[MonitoringAggregation]] - sent comma-joined as the 'aggregation' query parameter when given
    :return: the body decoded by self.body_handler when it supports the response headers, the utf-8 decoded text otherwise
    """
    query = {'key': self.key, 'format': format}

    if period is not None:
        query['period'] = period

    if aggregation is not None:
        query['aggregation'] = ','.join(aggregation)

    request_headers = {
        'accept-encoding': self.body_handler.content_encoding,
        'accept': self.body_handler.accept,
        'user-agent': self.ua
    }

    metrics_response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics',
        params=query,
        timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
        verify=self.verify,
        headers=request_headers,
    )
    metrics_response.raise_for_status()

    if not self.body_handler.support(metrics_response.headers):
        return metrics_response.content.decode('utf-8')

    return self.body_handler(metrics_response.content, metrics_response.headers['content-type'])
def get_monitoring_target_metrics(self,
domain: str,
group_subdomain: bool = False,
period: Literal['subscription', 'last7d', 'last24h', 'last1h', 'last5m'] | None = 'last24h',
start: datetime.datetime | None = None,
end: datetime.datetime | None = None)
Expand source code
def get_monitoring_target_metrics(
        self,
        domain:str,
        group_subdomain:bool=False,
        period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H,
        start:Optional[datetime.datetime]=None,
        end:Optional[datetime.datetime]=None,
):
    """
    Fetch the monitoring metrics of one target (GET <host>/scrape/monitoring/metrics/target)
    :param domain: str - sent as the 'domain' query parameter
    :param group_subdomain: bool - sent as the 'group_subdomain' query parameter
    :param period: Optional[MonitoringTargetPeriod] - sent as 'period'; replaced by None when start and end are given
    :param start: Optional[datetime.datetime] - start of a custom range, formatted with self.DATETIME_FORMAT
    :param end: Optional[datetime.datetime] - end of a custom range, formatted with self.DATETIME_FORMAT
    :return: the body decoded by self.body_handler when it supports the response headers, the utf-8 decoded text otherwise
    :raises ValueError: when only one of start / end is given
    """
    query = {
        'key': self.key,
        'domain': domain,
        'group_subdomain': group_subdomain
    }

    # start and end only make sense together
    if (start is None) != (end is None):
        raise ValueError('You must provide both start and end date')

    if start is not None:
        # A custom range takes precedence over the predefined period
        query['start'] = start.strftime(self.DATETIME_FORMAT)
        query['end'] = end.strftime(self.DATETIME_FORMAT)
        period = None

    query['period'] = period

    request_headers = {
        'accept-encoding': self.body_handler.content_encoding,
        'accept': self.body_handler.accept,
        'user-agent': self.ua
    }

    metrics_response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics/target',
        timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
        params=query,
        verify=self.verify,
        headers=request_headers,
    )
    metrics_response.raise_for_status()

    if not self.body_handler.support(metrics_response.headers):
        return metrics_response.content.decode('utf-8')

    return self.body_handler(metrics_response.content, metrics_response.headers['content-type'])
def open(self)
Expand source code
def open(self):
    """
    Create and configure self.http_session; does nothing when a session already exists.

    The session receives the verify flag, a timeout attribute, the api key as default query
    parameter and the accept-encoding / accept / user-agent headers of this client.
    NOTE(review): confirm the http library actually honours a ``timeout`` attribute set on the session.
    """
    if self.http_session is not None:
        return

    session = Session()
    self.http_session = session

    session.verify = self.verify
    session.timeout = (self.connect_timeout, self.default_read_timeout)
    session.params['key'] = self.key

    default_headers = (
        ('accept-encoding', self.body_handler.content_encoding),
        ('accept', self.body_handler.accept),
        ('user-agent', self.ua),
    )

    for header_name, header_value in default_headers:
        session.headers[header_name] = header_value
def resilient_scrape(self,
scrape_config: ScrapeConfig,
retry_on_errors: Set[Exception] = {<class 'scrapfly.errors.ScrapflyError'>},
retry_on_status_code: List[int] | None = None,
tries: int = 5,
delay: int = 20) ‑> ScrapeApiResponse
Expand source code
def resilient_scrape(
    self,
    scrape_config:ScrapeConfig,
    retry_on_errors:Set[Exception]={ScrapflyError},
    retry_on_status_code:Optional[List[int]]=None,
    tries: int = 5,
    delay: int = 20,
) -> ScrapeApiResponse:
    """
    Scrape with automatic retries (exponential backoff)
    :param scrape_config: ScrapeConfig
    :param retry_on_errors: set of exception classes that trigger a retry
    :param retry_on_status_code: Optional[List[int]] - upstream status codes worth retrying; an upstream
        error with any other status code is not raised, its api response is returned instead
    :param tries: int - passed to backoff as max_tries
    :param delay: int - passed to backoff as max_time
    :return: ScrapeApiResponse
    """
    assert retry_on_errors is not None, 'Retry on error is None'
    assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

    retry_policy = backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)

    def inner() -> ScrapeApiResponse:
        try:
            return self.scrape(scrape_config=scrape_config)
        except (UpstreamHttpClientError, UpstreamHttpServerError) as upstream_error:
            # Without a status code filter (or without an api response) the error goes to the retry policy
            if retry_on_status_code is None or not upstream_error.api_response:
                raise upstream_error

            if upstream_error.api_response.upstream_status_code in retry_on_status_code:
                raise upstream_error

            return upstream_error.api_response

    return retry_policy(inner)()
def save_scrape_screenshot(self,
api_response: ScrapeApiResponse,
name: str,
path: str | None = None)
Expand source code
def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
    """
    Save a screenshot from a scrape result
    :param api_response: ScrapeApiResponse
    :param name: str - name of the screenshot given in the scrape config
    :param path: Optional[str] - forwarded to api_response.sink
    :raises RuntimeError: when the scrape result holds no screenshot named ``name``

    The screenshot is downloaded from the url stored in the scrape result and handed to
    api_response.sink; '.jpg' is appended to the file name when it is missing.
    """

    screenshots = api_response.scrape_result['screenshots']

    if not screenshots or name not in screenshots:
        raise RuntimeError('Screenshot %s do no exists' % name)

    screenshot_response = self._http_handler(
        method='GET',
        url=screenshots[name]['url'],
        params={'key': self.key},
        verify=self.verify,
        # Explicit timeout, like the other API calls of this client: without it the download can block forever
        timeout=(self.connect_timeout, self.default_read_timeout)
    )

    screenshot_response.raise_for_status()

    if not name.endswith('.jpg'):
        name += '.jpg'

    api_response.sink(path=path, name=name, content=screenshot_response.content)

Save a screenshot from a scrape result :param api_response: ScrapeApiResponse :param name: str - name of the screenshot given in the scrape config :param path: Optional[str]

def save_screenshot(self,
screenshot_api_response: ScreenshotApiResponse,
name: str,
path: str | None = None)
Expand source code
def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
    """
    Write the image of a Screenshot API response to disk
    :param screenshot_api_response: ScreenshotApiResponse
    :param name: str - file name without extension; the extension comes from the response metadata
    :param path: Optional[str] - target directory, created when missing; current directory when omitted
    :raises RuntimeError: when the screenshot was not successful or the response holds no image
    """

    if screenshot_api_response.screenshot_success is not True:
        raise RuntimeError('Screenshot was not successful')

    image = screenshot_api_response.image

    if not image:
        raise RuntimeError('Screenshot binary does not exist')

    file_name = f"{name}.{screenshot_api_response.metadata['extension_name']}"

    if path:
        os.makedirs(path, exist_ok=True)
        target = os.path.join(path, file_name)
    else:
        target = file_name

    # Raw bytes are wrapped so both bytes and file-like images go through the same copy
    source = BytesIO(image) if isinstance(image, bytes) else image

    with open(target, 'wb') as output:
        shutil.copyfileobj(source, output, length=131072)

Save a screenshot from a screenshot API response :param screenshot_api_response: ScreenshotApiResponse :param name: str - name of the screenshot to save as :param path: Optional[str]

def scrape(self,
scrape_config: ScrapeConfig,
no_raise: bool = False) ‑> ScrapeApiResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse:
    """
    Scrape a website
    :param scrape_config: ScrapeConfig
    :param no_raise: bool - when True, a ScrapflyError carrying an api response is not raised: that api response is returned instead
    :return: ScrapeApiResponse

    Successful responses and errors are both handed to self.reporter.
    With no_raise=True, check the api_response.scrape_result.error attribute to handle the error.
    When it is not None it has, for example, the following structure

    'error': {
        'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
        'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds',
        'retryable': False,
        'http_code': 422,
        'links': {
            'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
        }
    }
    """

    try:
        logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
        raw_response = self._http_handler(**self._scrape_request(scrape_config=scrape_config))
        result = self._handle_response(response=raw_response, scrape_config=scrape_config)

        self.reporter.report(scrape_api_response=result)

        return result
    except BaseException as e:
        self.reporter.report(error=e)

        swallow = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

        if not swallow:
            raise e

        return e.api_response

Scrape a website :param scrape_config: ScrapeConfig :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration :return: ScrapeApiResponse

If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED', 'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds', 'retryable': False, 'http_code': 422, 'links': { 'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED' } }

def screenshot(self,
screenshot_config: ScreenshotConfig,
no_raise: bool = False) ‑> ScreenshotApiResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse:
    """
    Take a screenshot
    :param screenshot_config: ScrapeConfig
    :param no_raise: bool - if True, do not raise exception on error while the screenshot api response is a ScrapflyError for seamless integration
    :return: str

    If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error.
    If the error is not none, you will get the following structure for example

    'error': {
        'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
        'message': 'For some reason we were unable to take the screenshot',
        'http_code': 422,
        'links': {
            'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
        }
    }
    """

    try:
        logger.debug('--> %s Screenshoting' % (screenshot_config.url))
        request_data = self._screenshot_request(screenshot_config=screenshot_config)
        response = self._http_handler(**request_data)
        screenshot_api_response = self._handle_screenshot_response(response=response, screenshot_config=screenshot_config)
        return screenshot_api_response
    except BaseException as e:
        self.reporter.report(error=e)

        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response

        raise e

Take a screenshot :param screenshot_config: ScreenshotConfig :param no_raise: bool - if True, do not raise exception on error while the screenshot api response is a ScrapflyError for seamless integration :return: ScreenshotApiResponse

If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT', 'message': 'For some reason we were unable to take the screenshot', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT' } }

def sink(self,
api_response: ScrapeApiResponse,
content: str | bytes | None = None,
path: str | None = None,
name: str | None = None,
file: TextIO | _io.BytesIO | None = None) ‑> str
Expand source code
def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
    """
    Write the scraped content (or the explicit ``content``) to a file
    :param api_response: ScrapeApiResponse - supplies the default content, the content-type header and the scraped url
    :param content: Optional[Union[str, bytes]] - data written instead of the scraped content; str is encoded as utf-8
    :param path: Optional[str] - directory prepended to the file name (it is not created here)
    :param name: Optional[str] - file name; defaults to the last segment of the scraped url
    :param file: already opened file object to write into; it is closed once the content is written
    :return: path of the written file, or None when ``file`` was supplied by the caller

    When the name has no extension, one is derived from the content-type response header
    (application/octet-stream when the header is missing).
    """
    scrape_result = api_response.result['result']
    scrape_config = api_response.result['config']

    file_content = content or scrape_result['content']
    file_path = None

    # Normalise the payload BEFORE opening any file: a payload that cannot be used
    # must not leave an empty file or an unclosed handle behind.
    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)

    if not file:
        file_extension = None

        if name and '.' in name:
            file_extension = name.split('.')[-1]

        if file_extension is None:
            try:
                mime_type = scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            # Drop parameters such as "; charset=utf-8"
            if ';' in mime_type:
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            name = scrape_config['url'].split('/')[-1]

        if name.find(file_extension) == -1:
            name += file_extension

        if name == file_extension:
            # The url ends with "/" so no file name could be derived from it: build one from
            # the whole url. The check is done on the name (not on the joined path) so that
            # it also applies when a target directory is given.
            url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

            if url.endswith('-'):
                url = url[:-1]

            name = url + file_extension

        file_path = path + '/' + name if path else name
        file = open(file_path, 'wb')

    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    logger.info('file %s created' % file_path)
    return file_path
class ScrapflyError (message: str,
code: str,
http_status_code: int,
resource: str | None = None,
is_retryable: bool = False,
retry_delay: int | None = None,
retry_times: int | None = None,
documentation_url: str | None = None,
api_response: ForwardRef('ApiResponse') | None = None)
Expand source code
class ScrapflyError(Exception):
    """Base exception carrying the details of a Scrapfly error.

    The exception ``args`` are ``(message, str(code))``. ``str(error)``
    yields the message, followed by a "Learn more" link when a
    documentation URL is available.
    """

    # Error kinds
    KIND_HTTP_BAD_RESPONSE = 'HTTP_BAD_RESPONSE'
    KIND_SCRAPFLY_ERROR = 'SCRAPFLY_ERROR'

    # Resource identifiers
    RESOURCE_PROXY = 'PROXY'
    RESOURCE_THROTTLE = 'THROTTLE'
    RESOURCE_SCRAPE = 'SCRAPE'
    RESOURCE_ASP = 'ASP'
    RESOURCE_SCHEDULE = 'SCHEDULE'
    RESOURCE_WEBHOOK = 'WEBHOOK'
    RESOURCE_SESSION = 'SESSION'

    def __init__(
        self,
        message: str,
        code: str,
        http_status_code: int,
        resource: Optional[str]=None,
        is_retryable: bool = False,
        retry_delay: Optional[int] = None,
        retry_times: Optional[int] = None,
        documentation_url: Optional[str] = None,
        api_response: Optional['ApiResponse'] = None
    ):
        # What went wrong
        self.message = message
        self.code = code
        self.http_status_code = http_status_code
        self.resource = resource

        # Retry hints
        self.is_retryable = is_retryable
        self.retry_delay = retry_delay
        self.retry_times = retry_times

        # Extra context
        self.documentation_url = documentation_url
        self.api_response = api_response

        super().__init__(message, str(code))

    def __str__(self):
        if self.documentation_url is None:
            return self.message

        return self.message + '. Learn more: %s' % self.documentation_url

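Base exception for Scrapfly errors. Each instance carries the error message and code, the HTTP status code, the related resource, retry hints (is_retryable, retry_delay, retry_times), an optional documentation URL and an optional API response.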

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

  • scrapfly.errors.ExtraUsageForbidden
  • scrapfly.errors.HttpError

Class variables

var KIND_HTTP_BAD_RESPONSE

String constant 'HTTP_BAD_RESPONSE'.

var KIND_SCRAPFLY_ERROR

String constant 'SCRAPFLY_ERROR'.

var RESOURCE_ASP

String constant 'ASP'.

var RESOURCE_PROXY

String constant 'PROXY'.

var RESOURCE_SCHEDULE

String constant 'SCHEDULE'.

var RESOURCE_SCRAPE

String constant 'SCRAPE'.

var RESOURCE_SESSION

String constant 'SESSION'.

var RESOURCE_THROTTLE

String constant 'THROTTLE'.

var RESOURCE_WEBHOOK

String constant 'WEBHOOK'.

class ScrapflyProxyError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyProxyError(ScraperAPIError):
    """Scraper API error for the PROXY resource.

    ErrorFactory.RESOURCE_TO_ERROR registers this class for
    ScrapflyError.RESOURCE_PROXY.
    """

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Inherited members

class ScrapflyScheduleError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyScheduleError(ScraperAPIError):
    """Scraper API error for the SCHEDULE resource.

    ErrorFactory.RESOURCE_TO_ERROR registers this class for
    ScrapflyError.RESOURCE_SCHEDULE.
    """

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Inherited members

class ScrapflyScrapeError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyScrapeError(ScraperAPIError):
    """Scraper API error for the SCRAPE resource.

    ErrorFactory.RESOURCE_TO_ERROR registers this class for
    ScrapflyError.RESOURCE_SCRAPE.
    """

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Inherited members

class ScrapflySessionError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflySessionError(ScraperAPIError):
    """Scraper API error for the SESSION resource.

    ErrorFactory.RESOURCE_TO_ERROR registers this class for
    ScrapflyError.RESOURCE_SESSION.
    """

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Inherited members

class ScrapflyThrottleError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyThrottleError(ScraperAPIError):
    """Scraper API error related to throttling.

    NOTE(review): presumably tied to ScrapflyError.RESOURCE_THROTTLE; confirm
    where it is raised, since the mapping is not visible in this excerpt.
    """

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Inherited members

class ScrapflyWebhookError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyWebhookError(ScraperAPIError):
    """Scraper API error for the WEBHOOK resource.

    ErrorFactory.RESOURCE_TO_ERROR registers this class for
    ScrapflyError.RESOURCE_WEBHOOK.
    """

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Inherited members

class ScreenshotAPIError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScreenshotAPIError(HttpError):
    """HTTP error for the Screenshot API.

    ScreenshotApiResponse.raise_for_result uses this class as its default
    ``error_class``.
    """

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Inherited members

class ScreenshotApiResponse (request: requests.models.Request,
response: requests.models.Response,
screenshot_config: ScreenshotConfig,
api_result: bytes | None = None)
Expand source code
class ScreenshotApiResponse(ApiResponse):
    """API response for a screenshot request.

    ``result`` holds the value returned by ``handle_api_result``: the API
    result itself on success, or a ``FrozenDict`` of it when it is an error
    payload (``None`` or containing ``error_id``).
    """

    def __init__(self, request: Request, response: Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None):
        super().__init__(request, response)
        self.screenshot_config = screenshot_config
        self.result = self.handle_api_result(api_result)

    @property
    def image(self) -> Optional[str]:
        """Return the value stored under the ``result`` key, or ``''`` if it is absent or None."""
        binary = self.result.get('result', None)
        if binary is None:
            return ''

        return binary

    @property
    def metadata(self) -> Optional[Dict]:
        """Return details about the screenshot, read from the response headers.

        Returns an empty dict when there is no image. Otherwise the dict has
        the keys ``extension_name`` (content-type subtype without parameters,
        ``''`` when the header is missing), ``upstream-status-code`` and
        ``upstream-url``.
        """
        if not self.image:
            return {}

        # headers.get() yields None for a missing header; fall back to an empty
        # string so a response without content-type does not raise.
        content_type = self.response.headers.get('content-type') or ''
        extension_name = content_type[content_type.find('/') + 1:].split(';')[0]

        return {
            'extension_name': extension_name,
            'upstream-status-code': self.response.headers.get('X-Scrapfly-Upstream-Http-Code'),
            'upstream-url': self.response.headers.get('X-Scrapfly-Upstream-Url')
        }

    @property
    def screenshot_success(self) -> bool:
        """Return True when ``image`` is non-empty."""
        if not self.image:
            return False

        return True

    @property
    def error(self) -> Optional[Dict]:
        """Return the whole ``result`` when there is no image, else None."""
        if self.image:
            return None

        if self.screenshot_success is False:
            return self.result

    def _is_api_error(self, api_result: Dict) -> bool:
        """Treat a missing result or one containing ``error_id`` as an API error."""
        if api_result is None:
            return True

        return 'error_id' in api_result

    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        """Wrap error results in a FrozenDict; return other results unchanged."""
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        return api_result

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
        """Delegate to the parent implementation, defaulting to ScreenshotAPIError."""
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Ancestors

Instance variables

prop error : Dict | None
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Return the whole ``result`` when no image is available, else None."""
    # An image, or a success flag that is anything but False, means no error.
    if self.image or self.screenshot_success is not False:
        return None

    return self.result
prop image : str | None
Expand source code
@property
def image(self) -> Optional[str]:
    """Return the value stored under the ``result`` key, or ``''`` if it is absent or None."""
    payload = self.result.get('result', None)
    return '' if payload is None else payload
prop metadata : Dict | None
Expand source code
@property
def metadata(self) -> Optional[Dict]:
    """Return details about the screenshot, read from the response headers.

    Returns an empty dict when there is no image. Otherwise the dict has the
    keys ``extension_name`` (content-type subtype without parameters, ``''``
    when the header is missing), ``upstream-status-code`` and ``upstream-url``.
    """
    if not self.image:
        return {}

    # headers.get() yields None for a missing header; fall back to an empty
    # string so a response without content-type does not raise.
    content_type = self.response.headers.get('content-type') or ''
    extension_name = content_type[content_type.find('/') + 1:].split(';')[0]

    return {
        'extension_name': extension_name,
        'upstream-status-code': self.response.headers.get('X-Scrapfly-Upstream-Http-Code'),
        'upstream-url': self.response.headers.get('X-Scrapfly-Upstream-Url')
    }
prop screenshot_success : bool
Expand source code
@property
def screenshot_success(self) -> bool:
    """Return True when ``image`` is non-empty, False otherwise."""
    return bool(self.image)

Methods

def handle_api_result(self, api_result: bytes) ‑> FrozenDict
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict:
    """Wrap a result flagged by ``_is_api_error`` in a FrozenDict; return any other result unchanged."""
    flagged_as_error = self._is_api_error(api_result=api_result)
    return FrozenDict(api_result) if flagged_as_error is True else api_result
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ScreenshotAPIError)
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
    """Delegate to the parent ``raise_for_result``, with ScreenshotAPIError as the default error class."""
    super().raise_for_result(
        raise_on_upstream_error=raise_on_upstream_error,
        error_class=error_class,
    )

Inherited members

class ScreenshotConfig (url: str,
format: Format | None = None,
capture: str | None = None,
resolution: str | None = None,
country: str | None = None,
timeout: int | None = None,
rendering_wait: int | None = None,
wait_for_selector: str | None = None,
options: List[Options] | None = None,
auto_scroll: bool | None = None,
js: str | None = None,
cache: bool | None = None,
cache_ttl: bool | None = None,
cache_clear: bool | None = None,
webhook: str | None = None,
raise_on_upstream_error: bool = True)
Expand source code
class ScreenshotConfig(BaseApiConfig):
    """Configuration of a screenshot request.

    Holds the request options, converts them into API query parameters with
    ``to_api_params`` and round-trips through plain dictionaries with
    ``to_dict`` / ``from_dict``.
    """

    url: str
    format: Optional[Format] = None
    capture: Optional[str] = None
    resolution: Optional[str] = None
    country: Optional[str] = None
    timeout: Optional[int] = None # in milliseconds
    rendering_wait: Optional[int] = None # in milliseconds
    wait_for_selector: Optional[str] = None
    options: Optional[List[Options]] = None
    auto_scroll: Optional[bool] = None
    js: Optional[str] = None
    cache: Optional[bool] = None
    cache_ttl: Optional[int] = None # a duration, not a flag; booleans are still accepted
    cache_clear: Optional[bool] = None
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    def __init__(
        self,
        url: str,
        format: Optional[Format] = None,
        capture: Optional[str] = None,
        resolution: Optional[str] = None,
        country: Optional[str] = None,
        timeout: Optional[int] = None, # in milliseconds
        rendering_wait: Optional[int] = None, # in milliseconds
        wait_for_selector: Optional[str] = None,
        options: Optional[List[Options]] = None,
        auto_scroll: Optional[bool] = None,
        js: Optional[str] = None,
        cache: Optional[bool] = None,
        cache_ttl: Optional[int] = None,
        cache_clear: Optional[bool] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True
    ):
        """Store the screenshot options; ``url`` must be a ``str``."""
        assert(type(url) is str)

        self.url = url
        # Per-config API key; when left as None, to_api_params uses the key it is given.
        self.key = None
        self.format = format
        self.capture = capture
        self.resolution = resolution
        self.country = country
        self.timeout = timeout
        self.rendering_wait = rendering_wait
        self.wait_for_selector = wait_for_selector
        # Normalise every entry to an Options member.
        self.options = [Options(flag) for flag in options] if options else None
        self.auto_scroll = auto_scroll
        self.js = js
        self.cache = cache
        self.cache_ttl = cache_ttl
        self.cache_clear = cache_clear
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

    def to_api_params(self, key:str) -> Dict:
        """Build the query parameters for the API call.

        :param key: API key used when the config has no key of its own.
        :return: dict holding ``key``, ``url`` and every option that is set.
        """
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.format:
            params['format'] = Format(self.format).value

        if self.capture:
            params['capture'] = self.capture

        if self.resolution:
            params['resolution'] = self.resolution

        if self.country is not None:
            params['country'] = self.country

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.rendering_wait is not None:
            params['rendering_wait'] = self.rendering_wait

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.options is not None:
            params["options"] = ",".join(flag.value for flag in self.options)

        if self.auto_scroll is not None:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

        if self.js:
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

        if self.cache is not None:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_ttl is not None:
                # cache_ttl is a duration: send numeric values untouched instead
                # of squeezing them through the boolean converter. Real booleans
                # keep the previous conversion for backward compatibility.
                if isinstance(self.cache_ttl, bool):
                    params['cache_ttl'] = self._bool_to_http(self.cache_ttl)
                else:
                    params['cache_ttl'] = self.cache_ttl

            if self.cache_clear is not None:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)

        else:
            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

            if self.cache_clear is not None:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

        if self.webhook is not None:
            # The API parameter is named differently from the attribute.
            params['webhook_name'] = self.webhook

        return params

    def to_dict(self) -> Dict:
        """
        Export the ScreenshotConfig instance to a plain dictionary.
        """
        return {
            'url': self.url,
            'format': Format(self.format).value if self.format else None,
            'capture': self.capture,
            'resolution': self.resolution,
            'country': self.country,
            'timeout': self.timeout,
            'rendering_wait': self.rendering_wait,
            'wait_for_selector': self.wait_for_selector,
            'options': [Options(option).value for option in self.options] if self.options else None,
            'auto_scroll': self.auto_scroll,
            'js': self.js,
            'cache': self.cache,
            'cache_ttl': self.cache_ttl,
            'cache_clear': self.cache_clear,
            'webhook': self.webhook,
            'raise_on_upstream_error': self.raise_on_upstream_error
        }

    @staticmethod
    def from_dict(screenshot_config_dict: Dict) -> 'ScreenshotConfig':
        """Create a ScreenshotConfig instance from a dictionary.

        Missing keys default to None, except ``raise_on_upstream_error``
        which defaults to True.
        """
        url = screenshot_config_dict.get('url', None)

        format = screenshot_config_dict.get('format', None)
        format = Format(format) if format else None

        capture = screenshot_config_dict.get('capture', None)
        resolution = screenshot_config_dict.get('resolution', None)
        country = screenshot_config_dict.get('country', None)
        timeout = screenshot_config_dict.get('timeout', None)
        rendering_wait = screenshot_config_dict.get('rendering_wait', None)
        wait_for_selector = screenshot_config_dict.get('wait_for_selector', None)

        options = screenshot_config_dict.get('options', None)
        options = [Options(option) for option in options] if options else None

        auto_scroll = screenshot_config_dict.get('auto_scroll', None)
        js = screenshot_config_dict.get('js', None)
        cache = screenshot_config_dict.get('cache', None)
        cache_ttl = screenshot_config_dict.get('cache_ttl', None)
        cache_clear = screenshot_config_dict.get('cache_clear', None)
        webhook = screenshot_config_dict.get('webhook', None)
        raise_on_upstream_error = screenshot_config_dict.get('raise_on_upstream_error', True)

        return ScreenshotConfig(
            url=url,
            format=format,
            capture=capture,
            resolution=resolution,
            country=country,
            timeout=timeout,
            rendering_wait=rendering_wait,
            wait_for_selector=wait_for_selector,
            options=options,
            auto_scroll=auto_scroll,
            js=js,
            cache=cache,
            cache_ttl=cache_ttl,
            cache_clear=cache_clear,
            webhook=webhook,
            raise_on_upstream_error=raise_on_upstream_error
        )

Ancestors

Class variables

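var auto_scroll : bool | None

Optional flag, None by default; when set it is sent as the auto_scroll API parameter.

var cache : bool | None

Optional flag, None by default; when set it is sent as the cache API parameter.

var cache_clear : bool | None

Optional flag, None by default; sent as the cache_clear API parameter only when cache is also set.

var cache_ttl : bool | None

Optional value, None by default; sent as the cache_ttl API parameter only when cache is also set.

var capture : str | None

Optional string, None by default; when non-empty it is sent as the capture API parameter.

var country : str | None

Optional string, None by default; when set it is sent as the country API parameter.

var format : Format | None

Optional Format value, None by default; when set its enum value is sent as the format API parameter.

var js : str | None

Optional string, None by default; when non-empty it is sent URL-safe base64 encoded as the js API parameter.

var options : List[Options] | None

Optional list of Options flags, None by default; when set the flag values are sent comma-separated as the options API parameter.

var raise_on_upstream_error : bool

Boolean, True by default; not part of the API parameters built by to_api_params.

var rendering_wait : int | None

Optional duration in milliseconds, None by default; when set it is sent as the rendering_wait API parameter.

var resolution : str | None

Optional string, None by default; when non-empty it is sent as the resolution API parameter.

var timeout : int | None

Optional duration in milliseconds, None by default; when set it is sent as the timeout API parameter.

var url : str

Required string; always sent as the url API parameter.

var wait_for_selector : str | None

Optional string, None by default; when set it is sent as the wait_for_selector API parameter.

var webhook : str | None

Optional string, None by default; when set it is sent as the webhook_name API parameter.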

Static methods

def from_dict(screenshot_config_dict: Dict) ‑> ScreenshotConfig
Expand source code
@staticmethod
def from_dict(screenshot_config_dict: Dict) -> 'ScreenshotConfig':
    """Build a ScreenshotConfig from a plain dictionary.

    Missing keys default to None, except ``raise_on_upstream_error`` which
    defaults to True. Non-empty ``format`` and ``options`` values are
    converted to ``Format`` / ``Options`` members.
    """
    lookup = screenshot_config_dict.get

    raw_format = lookup('format', None)
    raw_options = lookup('options', None)

    return ScreenshotConfig(
        url=lookup('url', None),
        format=Format(raw_format) if raw_format else None,
        capture=lookup('capture', None),
        resolution=lookup('resolution', None),
        country=lookup('country', None),
        timeout=lookup('timeout', None),
        rendering_wait=lookup('rendering_wait', None),
        wait_for_selector=lookup('wait_for_selector', None),
        options=[Options(option) for option in raw_options] if raw_options else None,
        auto_scroll=lookup('auto_scroll', None),
        js=lookup('js', None),
        cache=lookup('cache', None),
        cache_ttl=lookup('cache_ttl', None),
        cache_clear=lookup('cache_clear', None),
        webhook=lookup('webhook', None),
        raise_on_upstream_error=lookup('raise_on_upstream_error', True)
    )

Create a ScreenshotConfig instance from a dictionary.

Methods

def to_api_params(self, key: str) ‑> Dict
Expand source code
def to_api_params(self, key:str) -> Dict:
    """Build the query parameters for the API call.

    :param key: API key used when the config has no key of its own.
    :return: dict holding ``key``, ``url`` and every option that is set.
    """
    params = {
        'key': self.key or key,
        'url': self.url
    }

    if self.format:
        params['format'] = Format(self.format).value

    if self.capture:
        params['capture'] = self.capture

    if self.resolution:
        params['resolution'] = self.resolution

    if self.country is not None:
        params['country'] = self.country

    if self.timeout is not None:
        params['timeout'] = self.timeout

    if self.rendering_wait is not None:
        params['rendering_wait'] = self.rendering_wait

    if self.wait_for_selector is not None:
        params['wait_for_selector'] = self.wait_for_selector

    if self.options is not None:
        params["options"] = ",".join(flag.value for flag in self.options)

    if self.auto_scroll is not None:
        params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

    if self.js:
        params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

    if self.cache is not None:
        params['cache'] = self._bool_to_http(self.cache)

        if self.cache_ttl is not None:
            # cache_ttl is a duration: send numeric values untouched instead of
            # squeezing them through the boolean converter. Real booleans keep
            # the previous conversion for backward compatibility.
            if isinstance(self.cache_ttl, bool):
                params['cache_ttl'] = self._bool_to_http(self.cache_ttl)
            else:
                params['cache_ttl'] = self.cache_ttl

        if self.cache_clear is not None:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)

    else:
        if self.cache_ttl is not None:
            logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

        if self.cache_clear is not None:
            logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

    if self.webhook is not None:
        # The API parameter is named differently from the attribute.
        params['webhook_name'] = self.webhook

    return params
def to_dict(self) ‑> Dict
Expand source code
def to_dict(self) -> Dict:
    """
    Serialise this config into a plain dictionary.

    ``format`` and ``options`` are exported as their enum values (None when
    unset or empty); every other attribute is copied as-is.
    """
    format_value = Format(self.format).value if self.format else None
    option_values = [Options(option).value for option in self.options] if self.options else None

    exported = {'url': self.url, 'format': format_value}

    for field in ('capture', 'resolution', 'country', 'timeout', 'rendering_wait', 'wait_for_selector'):
        exported[field] = getattr(self, field)

    exported['options'] = option_values

    for field in ('auto_scroll', 'js', 'cache', 'cache_ttl', 'cache_clear', 'webhook', 'raise_on_upstream_error'):
        exported[field] = getattr(self, field)

    return exported

Export the ScreenshotConfig instance to a plain dictionary.

class UpstreamHttpClientError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class UpstreamHttpClientError(UpstreamHttpError):
    """Upstream HTTP error on the client side.

    NOTE(review): presumably raised for upstream 4xx responses; confirm.
    UpstreamHttpServerError derives from this class, so catching this class
    also catches it.
    """

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.UpstreamHttpError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

Inherited members

class UpstreamHttpError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class UpstreamHttpError(HttpError):
    """Base class of the upstream HTTP errors.

    Parent of UpstreamHttpClientError. NOTE(review): "upstream" presumably
    means the scraped website rather than the Scrapfly API; confirm.
    """

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

Inherited members

class UpstreamHttpServerError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class UpstreamHttpServerError(UpstreamHttpClientError):
    """Upstream HTTP error on the server side.

    NOTE(review): presumably raised for upstream 5xx responses; confirm.
    Derives from UpstreamHttpClientError, so handlers for that class catch
    this error too.
    """

Common base class for all non-exit exceptions.

Ancestors

Inherited members