Package scrapfly

Expand source code
__version__ = '0.8.19'

from typing import Tuple
from .errors import ScrapflyError
from .errors import ScrapflyAspError
from .errors import ScrapflyProxyError
from .errors import ScrapflyScheduleError
from .errors import ScrapflyScrapeError
from .errors import ScrapflySessionError
from .errors import ScrapflyThrottleError
from .errors import ScrapflyWebhookError
from .errors import EncoderError
from .errors import ErrorFactory
from .errors import HttpError
from .errors import UpstreamHttpError
from .errors import UpstreamHttpClientError
from .errors import UpstreamHttpServerError
from .errors import ApiHttpClientError
from .errors import ApiHttpServerError
from .errors import ScreenshotAPIError
from .errors import ExtractionAPIError
from .api_response import ScrapeApiResponse, ScreenshotApiResponse, ExtractionApiResponse, ResponseBodyHandler
from .client import ScrapflyClient, ScraperAPI, MonitoringTargetPeriod, MonitoringAggregation
from .scrape_config import ScrapeConfig
from .screenshot_config import ScreenshotConfig
from .extraction_config import ExtractionConfig


# Names re-exported by ``from scrapfly import *``. Every entry is imported
# above from one of the package's submodules (errors, api_response, client
# and the three *_config modules).
__all__: Tuple[str, ...] = (
    'ScrapflyError',
    'ScrapflyAspError',
    'ScrapflyProxyError',
    'ScrapflyScheduleError',
    'ScrapflyScrapeError',
    'ScrapflySessionError',
    'ScrapflyThrottleError',
    'ScrapflyWebhookError',
    'UpstreamHttpError',
    'UpstreamHttpClientError',
    'UpstreamHttpServerError',
    'ApiHttpClientError',
    'ApiHttpServerError',
    'EncoderError',
    'ScrapeApiResponse',
    'ScreenshotApiResponse',
    'ExtractionApiResponse',
    'ErrorFactory',
    'HttpError',
    'ScrapflyClient',
    'ResponseBodyHandler',
    'ScrapeConfig',
    'ScreenshotConfig',
    'ExtractionConfig',
    'ScreenshotAPIError',
    'ExtractionAPIError',
    'ScraperAPI',
    'MonitoringTargetPeriod',
    'MonitoringAggregation',
)

Sub-modules

scrapfly.api_config
scrapfly.api_response
scrapfly.client
scrapfly.errors
scrapfly.extraction_config
scrapfly.frozen_dict
scrapfly.polyfill
scrapfly.reporter
scrapfly.scrape_config
scrapfly.scrapy
scrapfly.screenshot_config
scrapfly.webhook

Classes

class ApiHttpClientError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ApiHttpClientError(HttpError):
    """Generic HTTP error returned for a bad Scrapfly API response.

    ``ErrorFactory.create`` falls back to this class when a failed API
    response matches neither a status-specific nor a resource-specific
    error class.
    """

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

  • ApiHttpServerError
  • scrapfly.errors.BadApiKeyError
  • scrapfly.errors.PaymentRequired
  • scrapfly.errors.TooManyRequest
class ApiHttpServerError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ApiHttpServerError(ApiHttpClientError):
    """Bad Scrapfly API response whose HTTP status is 500 or above.

    ``ErrorFactory.create`` returns this class for failed API responses with
    ``http_code >= 500``. It derives from ``ApiHttpClientError``, so handlers
    catching the client error also catch this one.
    """

Ancestors

class EncoderError (content: str)

Common base class for all exceptions

Expand source code
class EncoderError(BaseException):
    """Signals that a response payload could not be decoded.

    The offending payload is kept on ``content`` and is what ``str()``
    returns. Note that the class derives from ``BaseException``, so a plain
    ``except Exception`` does not catch it.
    """

    def __init__(self, content:str):
        # No positional args are forwarded, so ``args`` stays empty.
        super().__init__()
        self.content = content

    def __repr__(self):
        payload = self.content
        return "Invalid payload: %s" % payload

    def __str__(self) -> str:
        payload = self.content
        return payload

Ancestors

  • builtins.BaseException
class ErrorFactory
Expand source code
class ErrorFactory:
    """Map a failed Scrapfly API response to the most specific exception class.

    ``RESOURCE_TO_ERROR`` selects an exception from the resource segment of an
    error code such as ``ERR::SCRAPE::BAD_UPSTREAM_RESPONSE``;
    ``HTTP_STATUS_TO_ERROR`` selects one from the HTTP status of a generic
    API error.
    """

    RESOURCE_TO_ERROR = {
        ScrapflyError.RESOURCE_SCRAPE: ScrapflyScrapeError,
        ScrapflyError.RESOURCE_WEBHOOK: ScrapflyWebhookError,
        ScrapflyError.RESOURCE_PROXY: ScrapflyProxyError,
        ScrapflyError.RESOURCE_SCHEDULE: ScrapflyScheduleError,
        ScrapflyError.RESOURCE_ASP: ScrapflyAspError,
        ScrapflyError.RESOURCE_SESSION: ScrapflySessionError
    }

    # Notable http error has own class for more convenience
    # Only applicable for generic API error
    HTTP_STATUS_TO_ERROR = {
        401: BadApiKeyError,
        402: PaymentRequired,
        429: TooManyRequest
    }

    @staticmethod
    def _get_resource(code: str) -> Optional[str]:
        """Return the resource segment (second ``::`` field) of an error code.

        Returns ``None`` when ``code`` is not a string or has no ``::``
        separator. Codes with a number of segments other than three no longer
        raise ``ValueError``; the second segment is returned regardless.
        """
        if isinstance(code, str) and '::' in code:
            # '::' being present guarantees at least two segments.
            return code.split('::')[1]

        return None

    @staticmethod
    def create(api_response: 'ScrapeApiResponse'):
        """Build (without raising) the exception describing ``api_response``.

        Args:
            api_response: response whose ``error`` mapping provides at least
                ``code`` and ``message``.

        Returns:
            An instance of the most specific error class that matches the
            response kind, the HTTP status and the error-code resource.
        """
        is_retryable = False
        kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
        http_code = api_response.status_code
        retry_delay = 5
        retry_times = 3
        description = None
        error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
        code = api_response.error['code']

        # For upstream failures, report the target website's status rather
        # than the status of the API call itself.
        if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
            http_code = api_response.scrape_result['status_code']

        if 'description' in api_response.error:
            description = api_response.error['description']

        message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

        if 'doc_url' in api_response.error:
            error_url = api_response.error['doc_url']

        if 'retryable' in api_response.error:
            is_retryable = api_response.error['retryable']

        resource = ErrorFactory._get_resource(code=code)

        if is_retryable is True and 'X-Retry' in api_response.headers:
            # 'X-Retry' gates the lookup but the delay comes from
            # 'Retry-After', which may be absent or carry an HTTP-date rather
            # than a number of seconds. Building the error must not fail
            # because of that, so the default delay is kept in those cases.
            try:
                retry_delay = int(api_response.headers['Retry-After'])
            except (KeyError, TypeError, ValueError):
                pass

        message = '%s: %s' % (message, description) if description else message

        if retry_delay is not None and is_retryable is True:
            message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

        args = {
            'message': message,
            'code': code,
            'http_status_code': http_code,
            'is_retryable': is_retryable,
            'api_response': api_response,
            'resource': resource,
            'retry_delay': retry_delay,
            'retry_times': retry_times,
            'documentation_url': error_url,
            'request': api_response.request,
            'response': api_response.response
        }

        if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
            if http_code >= 500:
                return ApiHttpServerError(**args)

            is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

            # Status-specific classes only apply to generic API errors; a
            # known resource takes precedence over the HTTP status.
            if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
                return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

            if is_scraper_api_error:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ApiHttpClientError(**args)

        elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
            if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
                if http_code >= 500:
                    return UpstreamHttpServerError(**args)

                if http_code >= 400:
                    return UpstreamHttpClientError(**args)

            if resource in ErrorFactory.RESOURCE_TO_ERROR:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ScrapflyError(**args)

Class variables

var HTTP_STATUS_TO_ERROR
var RESOURCE_TO_ERROR

Static methods

def create(api_response: ScrapeApiResponse)
Expand source code
@staticmethod
def create(api_response: 'ScrapeApiResponse'):
    """Build (without raising) the exception describing ``api_response``.

    Args:
        api_response: response whose ``error`` mapping provides at least
            ``code`` and ``message``.

    Returns:
        An instance of the most specific error class that matches the
        response kind, the HTTP status and the error-code resource.
    """
    is_retryable = False
    kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
    http_code = api_response.status_code
    retry_delay = 5
    retry_times = 3
    description = None
    error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
    code = api_response.error['code']

    # For upstream failures, report the target website's status rather than
    # the status of the API call itself.
    if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
        http_code = api_response.scrape_result['status_code']

    if 'description' in api_response.error:
        description = api_response.error['description']

    message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

    if 'doc_url' in api_response.error:
        error_url = api_response.error['doc_url']

    if 'retryable' in api_response.error:
        is_retryable = api_response.error['retryable']

    resource = ErrorFactory._get_resource(code=code)

    if is_retryable is True and 'X-Retry' in api_response.headers:
        # 'X-Retry' gates the lookup but the delay comes from 'Retry-After',
        # which may be absent or carry an HTTP-date rather than a number of
        # seconds. Building the error must not fail because of that, so the
        # default delay is kept in those cases.
        try:
            retry_delay = int(api_response.headers['Retry-After'])
        except (KeyError, TypeError, ValueError):
            pass

    message = '%s: %s' % (message, description) if description else message

    if retry_delay is not None and is_retryable is True:
        message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

    args = {
        'message': message,
        'code': code,
        'http_status_code': http_code,
        'is_retryable': is_retryable,
        'api_response': api_response,
        'resource': resource,
        'retry_delay': retry_delay,
        'retry_times': retry_times,
        'documentation_url': error_url,
        'request': api_response.request,
        'response': api_response.response
    }

    if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
        if http_code >= 500:
            return ApiHttpServerError(**args)

        is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

        # Status-specific classes only apply to generic API errors; a known
        # resource takes precedence over the HTTP status.
        if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
            return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

        if is_scraper_api_error:
            return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

        return ApiHttpClientError(**args)

    elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
        if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
            if http_code >= 500:
                return UpstreamHttpServerError(**args)

            if http_code >= 400:
                return UpstreamHttpClientError(**args)

        if resource in ErrorFactory.RESOURCE_TO_ERROR:
            return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

        return ScrapflyError(**args)
class ExtractionAPIError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ExtractionAPIError(HttpError):
    """HTTP error type used for Extraction API failures.

    It is the default ``error_class`` of
    ``ExtractionApiResponse.raise_for_result``.
    """

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ExtractionApiResponse (request: requests.models.Request, response: requests.models.Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None)
Expand source code
class ExtractionApiResponse(ApiResponse):
    """API response wrapper for an Extraction API call.

    The raw ``api_result`` is normalised by ``handle_api_result`` into
    ``self.result``: error payloads are stored as-is, successful payloads
    are stored under the ``'result'`` key.
    """

    def __init__(self, request: Request, response: Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None):
        super().__init__(request, response)
        self.extraction_config = extraction_config
        self.result = self.handle_api_result(api_result)

    @property
    def extraction_result(self) -> Optional[Dict]:
        """Extraction payload, or an all-``None`` placeholder when it is empty."""
        payload = self.result.get('result', None)
        if payload:
            return payload

        # handle empty extraction responses
        return {'data': None, 'content_type': None}

    @property
    def data(self) -> Union[Dict, List, str]:  # depends on the LLM prompt
        """Extracted data, or ``None`` when ``error`` is set."""
        if self.error is not None:
            return None

        return self.extraction_result['data']

    @property
    def content_type(self) -> Optional[str]:
        """Content type of the extracted data, or ``None`` when ``error`` is set."""
        if self.error is not None:
            return None

        return self.extraction_result['content_type']

    @property
    def extraction_success(self) -> bool:
        """``True`` when the extraction result carries non-``None`` data."""
        payload = self.extraction_result
        return not (payload is None or payload['data'] is None)

    @property
    def error(self) -> Optional[Dict]:
        """Raw result when there is no extraction result, otherwise ``None``.

        NOTE(review): ``extraction_result`` always returns a dict (it
        substitutes a placeholder for empty payloads), so as written this
        property appears to always return ``None`` -- confirm intent.
        """
        return self.result if self.extraction_result is None else None

    def _is_api_error(self, api_result: Dict) -> bool:
        """Tell whether ``api_result`` is missing or carries an ``error_id``."""
        return True if api_result is None else 'error_id' in api_result

    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        """Wrap ``api_result`` in a ``FrozenDict``; non-errors go under ``'result'``."""
        is_error = self._is_api_error(api_result=api_result) is True
        wrapped = api_result if is_error else {'result': api_result}
        return FrozenDict(wrapped)

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
        """Delegate to the parent check, defaulting to ``ExtractionAPIError``."""
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Ancestors

Instance variables

var content_type : Optional[str]
Expand source code
@property
def content_type(self) -> Optional[str]:
    """Content type of the extracted data, or ``None`` when ``error`` is set."""
    if self.error is not None:
        return None

    return self.extraction_result['content_type']
var data : Union[Dict, List, str]
Expand source code
@property
def data(self) -> Union[Dict, List, str]:  # depends on the LLM prompt
    """Extracted data, or ``None`` when ``error`` is set."""
    if self.error is not None:
        return None

    return self.extraction_result['data']
var error : Optional[Dict]
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Raw result when ``extraction_result`` is ``None``, otherwise ``None``."""
    return self.result if self.extraction_result is None else None
var extraction_result : Optional[Dict]
Expand source code
@property
def extraction_result(self) -> Optional[Dict]:
    """Extraction payload, or an all-``None`` placeholder when it is empty."""
    payload = self.result.get('result', None)
    if payload:
        return payload

    # handle empty extraction responses
    return {'data': None, 'content_type': None}
var extraction_success : bool
Expand source code
@property
def extraction_success(self) -> bool:
    """``True`` when the extraction result carries non-``None`` data."""
    payload = self.extraction_result
    return not (payload is None or payload['data'] is None)

Methods

def handle_api_result(self, api_result: bytes) ‑> FrozenDict
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict:
    """Wrap ``api_result`` in a ``FrozenDict``.

    Results flagged by ``_is_api_error`` are wrapped as-is; any other
    result is stored under the ``'result'`` key.
    """
    is_error = self._is_api_error(api_result=api_result) is True
    wrapped = api_result if is_error else {'result': api_result}
    return FrozenDict(wrapped)
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ExtractionAPIError)
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
    """Delegate to the parent ``raise_for_result``.

    Both arguments are forwarded unchanged; the only difference from the
    parent call is that ``error_class`` defaults to ``ExtractionAPIError``.
    """
    super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Inherited members

class ExtractionConfig (body: str, content_type: str, url: Optional[str] = None, charset: Optional[str] = None, extraction_template: Optional[str] = None, extraction_ephemeral_template: Optional[Dict] = None, extraction_prompt: Optional[str] = None, extraction_model: Optional[str] = None, is_document_compressed: Optional[bool] = None, document_compression_format: Optional[CompressionFormat] = None, webhook: Optional[str] = None, raise_on_upstream_error: bool = True, template: Optional[str] = None, ephemeral_template: Optional[Dict] = None)
Expand source code
class ExtractionConfig(BaseApiConfig):
    """Configuration of a single Extraction API request.

    ``body`` and ``content_type`` describe the document to extract from; the
    ``extraction_*`` options select how it is extracted. ``template`` and
    ``ephemeral_template`` are deprecated aliases of ``extraction_template``
    and ``extraction_ephemeral_template``.
    """

    body: str
    content_type: str
    url: Optional[str] = None
    charset: Optional[str] = None
    extraction_template: Optional[str] = None  # a saved template name
    extraction_ephemeral_template: Optional[Dict] = None  # ephemerally declared json template
    extraction_prompt: Optional[str] = None
    extraction_model: Optional[str] = None
    is_document_compressed: Optional[bool] = None
    document_compression_format: Optional[CompressionFormat] = None
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    # deprecated options
    template: Optional[str] = None
    ephemeral_template: Optional[Dict] = None

    def __init__(
        self,
        body: str,
        content_type: str,
        url: Optional[str] = None,
        charset: Optional[str] = None,
        extraction_template: Optional[str] = None,  # a saved template name
        extraction_ephemeral_template: Optional[Dict] = None,  # ephemerally declared json template
        extraction_prompt: Optional[str] = None,
        extraction_model: Optional[str] = None,
        is_document_compressed: Optional[bool] = None,
        document_compression_format: Optional[CompressionFormat] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True,

        # deprecated options
        template: Optional[str] = None,
        ephemeral_template: Optional[Dict] = None
    ):
        """Store the options and, when requested, gzip-compress ``body``.

        Raises:
            ExtractionConfigError: if ``document_compression_format`` is given
                without ``is_document_compressed``, or if auto compression is
                requested for a format other than gzip.
        """
        # A deprecated alias, when given, overrides its replacement option.
        if template:
            warnings.warn(
                "Deprecation warning: 'template' is deprecated. Use 'extraction_template' instead.",
                stacklevel=2
            )
            extraction_template = template

        if ephemeral_template:
            warnings.warn(
                "Deprecation warning: 'ephemeral_template' is deprecated. Use 'extraction_ephemeral_template' instead.",
                stacklevel=2
            )
            extraction_ephemeral_template = ephemeral_template

        self.key = None
        self.body = body
        self.content_type = content_type
        self.url = url
        self.charset = charset
        self.extraction_template = extraction_template
        self.extraction_ephemeral_template = extraction_ephemeral_template
        self.extraction_prompt = extraction_prompt
        self.extraction_model = extraction_model
        self.is_document_compressed = is_document_compressed
        self.document_compression_format = document_compression_format
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

        if self.document_compression_format is not None:
            if self.is_document_compressed is None:
                raise ExtractionConfigError(
                    'When declaring compression format, your must declare the is_document_compressed parameter to compress the document or skip it.'
                )
            # is_document_compressed=False means the caller hands over an
            # uncompressed body and expects it to be compressed here.
            if self.is_document_compressed is False:
                if self.document_compression_format == CompressionFormat.GZIP:
                    import gzip
                    # Accept a body that is already bytes instead of failing
                    # in bytes(<bytes>, 'utf-8').
                    raw_body = self.body.encode('utf-8') if isinstance(self.body, str) else bytes(self.body)
                    self.body = gzip.compress(raw_body)
                else:
                    raise ExtractionConfigError(
                        f'Auto compression for {self.document_compression_format.value} format is not available. You can manually compress to {self.document_compression_format.value} or choose the gzip format for auto compression.'
                    )

    def to_api_params(self, key: str) -> Dict:
        """Build the query parameters for this extraction request.

        Args:
            key: API key used when the config carries no key of its own.

        Returns:
            A dict with ``key`` and ``content_type`` plus every optional
            parameter that is set.

        Raises:
            ExtractionConfigError: if both ``extraction_template`` and
                ``extraction_ephemeral_template`` are set.
        """
        params = {
            'key': self.key or key,
            'content_type': self.content_type
        }

        if self.url:
            params['url'] = self.url

        if self.charset:
            params['charset'] = self.charset

        if self.extraction_template and self.extraction_ephemeral_template:
            raise ExtractionConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

        if self.extraction_template:
            params['extraction_template'] = self.extraction_template

        if self.extraction_ephemeral_template:
            # Serialise into a local: overwriting the attribute with its JSON
            # string would make a second call encode the string again.
            template_json = json.dumps(self.extraction_ephemeral_template)
            params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(template_json.encode('utf-8')).decode('utf-8')

        if self.extraction_prompt:
            params['extraction_prompt'] = quote_plus(self.extraction_prompt)

        if self.extraction_model:
            params['extraction_model'] = self.extraction_model

        if self.webhook:
            params['webhook_name'] = self.webhook

        return params

Ancestors

Class variables

var body : str
var charset : Optional[str]
var content_type : str
var document_compression_format : Optional[CompressionFormat]
var ephemeral_template : Optional[Dict]
var extraction_ephemeral_template : Optional[Dict]
var extraction_model : Optional[str]
var extraction_prompt : Optional[str]
var extraction_template : Optional[str]
var is_document_compressed : Optional[bool]
var raise_on_upstream_error : bool
var template : Optional[str]
var url : Optional[str]
var webhook : Optional[str]

Methods

def to_api_params(self, key: str) ‑> Dict
Expand source code
def to_api_params(self, key: str) -> Dict:
    """Build the query parameters for this extraction request.

    Args:
        key: API key used when the config carries no key of its own.

    Returns:
        A dict with ``key`` and ``content_type`` plus every optional
        parameter that is set.

    Raises:
        ExtractionConfigError: if both ``extraction_template`` and
            ``extraction_ephemeral_template`` are set.
    """
    params = {
        'key': self.key or key,
        'content_type': self.content_type
    }

    if self.url:
        params['url'] = self.url

    if self.charset:
        params['charset'] = self.charset

    if self.extraction_template and self.extraction_ephemeral_template:
        raise ExtractionConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

    if self.extraction_template:
        params['extraction_template'] = self.extraction_template

    if self.extraction_ephemeral_template:
        # Serialise into a local: overwriting the attribute with its JSON
        # string would make a second call encode the string again.
        template_json = json.dumps(self.extraction_ephemeral_template)
        params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(template_json.encode('utf-8')).decode('utf-8')

    if self.extraction_prompt:
        params['extraction_prompt'] = quote_plus(self.extraction_prompt)

    if self.extraction_model:
        params['extraction_model'] = self.extraction_model

    if self.webhook:
        params['webhook_name'] = self.webhook

    return params
class HttpError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class HttpError(ScrapflyError):
    """Scrapfly error that keeps the HTTP request and response it stems from.

    ``request`` and ``response`` are stored on the instance; every other
    keyword argument is forwarded to ``ScrapflyError``.
    """

    def __init__(self, request:Request, response:Optional[Response]=None, **kwargs):
        self.request = request
        self.response = response
        super().__init__(**kwargs)

    def __str__(self) -> str:
        """Describe the error from the most specific information available."""
        # Upstream errors describe the target website's response.
        if isinstance(self, UpstreamHttpError):
            return f"Target website responded with {self.api_response.scrape_result['status_code']} - {self.api_response.scrape_result['reason']}"

        # Otherwise prefer the message carried by the API response.
        if self.api_response is not None:
            return self.api_response.error_message

        # Last resort: the raw HTTP status line, plus the error message for
        # API errors.
        parts = [f"{self.response.status_code} - {self.response.reason}"]

        if isinstance(self, (ApiHttpClientError, ApiHttpServerError)):
            parts.append(self.message)

        return " - ".join(parts)

Ancestors

Subclasses

  • ApiHttpClientError
  • scrapfly.errors.ExtractionAPIError
  • scrapfly.errors.QuotaLimitReached
  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.ScreenshotAPIError
  • scrapfly.errors.TooManyConcurrentRequest
  • scrapfly.errors.UpstreamHttpError
class ResponseBodyHandler (use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None)
Expand source code
class ResponseBodyHandler:
    """Decompress, verify and decode API and webhook response bodies.

    An instance exposes the negotiated ``content_encoding``, ``accept`` and
    ``content_type`` values and can be called to decode a body according to
    its content type.
    """

    SUPPORTED_COMPRESSION = ['gzip', 'deflate']
    SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']

    class JSONDateTimeDecoder(JSONDecoder):
        """JSON decoder that runs every decoded object through ``_date_parser``."""

        def __init__(self, *args, **kargs):
            JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs)

    # brotli under perform at same gzip level and upper level destroy the cpu so
    # the trade off do not worth it for most of usage
    def __init__(self, use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None):
        """Detect the optional codecs and pick the body serialisation format.

        Args:
            use_brotli: advertise ``br`` when a brotli module can be imported.
            signing_secrets: hex-encoded secrets accepted by ``verify``.
        """
        # Work on a per-instance copy: mutating the class-level list appended
        # 'zstd' again on every instantiation and leaked 'br' into handlers
        # created with use_brotli=False.
        self.SUPPORTED_COMPRESSION = list(type(self).SUPPORTED_COMPRESSION)

        if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
            try:
                try:
                    import brotlicffi as brotli  # noqa: F401
                except ImportError:
                    import brotli  # noqa: F401
            except ImportError:
                pass  # no brotli implementation installed: do not advertise it
            else:
                self.SUPPORTED_COMPRESSION.insert(0, 'br')

        try:
            import zstd  # noqa: F401
        except ImportError:
            pass  # zstd is optional
        else:
            if 'zstd' not in self.SUPPORTED_COMPRESSION:
                self.SUPPORTED_COMPRESSION.append('zstd')

        self.content_encoding: str = ', '.join(self.SUPPORTED_COMPRESSION)
        self._signing_secret: Optional[Tuple[bytes, ...]] = None

        if signing_secrets:
            # Secrets are given hex-encoded; the set drops duplicates.
            self._signing_secret = tuple({binascii.unhexlify(secret) for secret in signing_secrets})

        try:  # automatically use msgpack if available https://msgpack.org/
            import msgpack
            self.accept = 'application/msgpack;charset=utf-8'
            self.content_type = 'application/msgpack;charset=utf-8'
            self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
        except ImportError:
            self.accept = 'application/json;charset=utf-8'
            self.content_type = 'application/json;charset=utf-8'
            self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)

    def support(self, headers: Dict) -> bool:
        """Tell whether the ``content-type`` header names a supported format."""
        if 'content-type' not in headers:
            return False

        for content_type in self.SUPPORTED_CONTENT_TYPES:
            if headers['content-type'].find(content_type) != -1:
                return True

        return False

    def verify(self, message: bytes, signature: str) -> bool:
        """Check ``signature`` against the HMAC-SHA256 of ``message``.

        The expected value is the upper-case hex digest computed with any of
        the configured secrets. Returns ``False`` when no secret is
        configured or when ``signature`` is not a string.
        """
        if not self._signing_secret or not isinstance(signature, str):
            return False

        received = signature.encode('utf-8')

        for signing_secret in self._signing_secret:
            expected = hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper()
            # Constant-time comparison avoids leaking how many leading
            # characters of the signature matched.
            if hmac.compare_digest(expected.encode('ascii'), received):
                return True

        return False

    def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
        """Decompress, optionally verify, then decode ``content``.

        Raises:
            WebhookSignatureMissMatch: if secrets are configured, a signature
                is given and it does not match the decompressed content.
        """
        if content_encoding == 'gzip' or content_encoding == 'gz':
            import gzip
            content = gzip.decompress(content)
        elif content_encoding == 'deflate':
            import zlib
            content = zlib.decompress(content)
        elif content_encoding == 'brotli' or content_encoding == 'br':
            # Same lookup order as __init__, so a brotlicffi-only install
            # can decode what the handler advertised.
            try:
                import brotlicffi as brotli
            except ImportError:
                import brotli
            content = brotli.decompress(content)
        elif content_encoding == 'zstd':
            import zstd
            content = zstd.decompress(content)

        # The signature covers the decompressed payload.
        if self._signing_secret is not None and signature is not None:
            if not self.verify(content, signature):
                raise WebhookSignatureMissMatch()

        if content_type.startswith('application/json'):
            content = loads(content, cls=self.JSONDateTimeDecoder)
        elif content_type.startswith('application/msgpack'):
            import msgpack
            content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

        return content

    def __call__(self, content: bytes, content_type: str) -> Union[str, Dict]:
        """Decode ``content`` as JSON or msgpack depending on ``content_type``.

        Raises:
            Exception: if the content type is neither JSON nor msgpack.
            EncoderError: if decoding fails; it carries the payload as text,
                or base64-encoded when it is not valid UTF-8.
        """
        content_loader = None

        if content_type.find('application/json') != -1:
            content_loader = partial(loads, cls=self.JSONDateTimeDecoder)
        elif content_type.find('application/msgpack') != -1:
            import msgpack
            content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)

        if content_loader is None:
            raise Exception('Unsupported content type')

        try:
            return content_loader(content)
        except Exception as e:
            try:
                raise EncoderError(content=content.decode('utf-8')) from e
            except UnicodeError:
                raise EncoderError(content=base64.b64encode(content).decode('utf-8')) from e

Class variables

var JSONDateTimeDecoder

Simple JSON http://json.org decoder

Performs the following translations in decoding by default:

+---------------+-------------------+
| JSON          | Python            |
+===============+===================+
| object        | dict              |
+---------------+-------------------+
| array         | list              |
+---------------+-------------------+
| string        | str               |
+---------------+-------------------+
| number (int)  | int               |
+---------------+-------------------+
| number (real) | float             |
+---------------+-------------------+
| true          | True              |
+---------------+-------------------+
| false         | False             |
+---------------+-------------------+
| null          | None              |
+---------------+-------------------+

It also understands NaN, Infinity, and -Infinity as their corresponding float values, which is outside the JSON spec.

var SUPPORTED_COMPRESSION
var SUPPORTED_CONTENT_TYPES

Methods

def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) ‑> Dict
Expand source code
def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
    """Decompress, optionally verify, then decode ``content``.

    Args:
        content: raw body bytes.
        content_encoding: compression of the body; unknown values leave the
            body untouched.
        content_type: JSON and msgpack bodies are decoded, anything else is
            returned as-is.
        signature: signature to check when signing secrets are configured.

    Raises:
        WebhookSignatureMissMatch: if secrets are configured, a signature is
            given and it does not match the decompressed content.
    """
    if content_encoding == 'gzip' or content_encoding == 'gz':
        import gzip
        content = gzip.decompress(content)
    elif content_encoding == 'deflate':
        import zlib
        content = zlib.decompress(content)
    elif content_encoding == 'brotli' or content_encoding == 'br':
        # Prefer brotlicffi and fall back to brotli, so an install that only
        # provides brotlicffi can still decode 'br' bodies.
        try:
            import brotlicffi as brotli
        except ImportError:
            import brotli
        content = brotli.decompress(content)
    elif content_encoding == 'zstd':
        import zstd
        content = zstd.decompress(content)

    # The signature covers the decompressed payload.
    if self._signing_secret is not None and signature is not None:
        if not self.verify(content, signature):
            raise WebhookSignatureMissMatch()

    if content_type.startswith('application/json'):
        content = loads(content, cls=self.JSONDateTimeDecoder)
    elif content_type.startswith('application/msgpack'):
        import msgpack
        content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

    return content
def support(self, headers: Dict) ‑> bool
Expand source code
def support(self, headers: Dict) -> bool:
    """Tell whether the ``content-type`` header names a supported format.

    Returns ``False`` when the header is absent; otherwise ``True`` as soon
    as one of ``SUPPORTED_CONTENT_TYPES`` occurs in the header value.
    """
    if 'content-type' in headers:
        return any(
            headers['content-type'].find(candidate) != -1
            for candidate in self.SUPPORTED_CONTENT_TYPES
        )

    return False
def verify(self, message: bytes, signature: str) ‑> bool
Expand source code
def verify(self, message: bytes, signature: str) -> bool:
    """Check ``signature`` against the HMAC-SHA256 of ``message``.

    The expected value is the upper-case hex digest computed with any of the
    configured secrets.

    Returns:
        ``True`` if one secret yields ``signature``; ``False`` otherwise,
        including when no secret is configured or ``signature`` is not a
        string.
    """
    if not self._signing_secret or not isinstance(signature, str):
        return False

    received = signature.encode('utf-8')

    for signing_secret in self._signing_secret:
        expected = hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper()
        # Constant-time comparison avoids leaking how many leading characters
        # of the signature matched.
        if hmac.compare_digest(expected.encode('ascii'), received):
            return True

    return False
class ScrapeApiResponse (request: requests.models.Request, response: requests.models.Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler: Optional[Callable] = None)
Expand source code
class ScrapeApiResponse(ApiResponse):
    """
    Pair the HTTP response of a scrape call with its decoded API payload.

    The processed payload is stored in ``self.result``; the properties below
    read the ``result``, ``config`` and ``context`` sections out of it.
    """
    scrape_config:ScrapeConfig
    large_object_handler:Optional[Callable]

    def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler:Optional[Callable]=None):
        """
        :param request: the request that was sent to the API
        :param response: the HTTP response received from the API
        :param scrape_config: configuration used for this scrape
        :param api_result: decoded API payload; replaced by a payload rebuilt from the
            HTTP status and headers when the scrape method is HEAD
        :param large_object_handler: optional callable used by ``handle_api_result`` to
            resolve ``clob`` / ``blob`` content
        :raises HttpError: (502, retryable) when ``api_result`` is still a raw string
        """
        super().__init__(request, response)
        self.scrape_config = scrape_config
        self.large_object_handler = large_object_handler

        if self.scrape_config.method == 'HEAD':
            # For HEAD the payload is rebuilt from the HTTP status line and headers.
            api_result = {
                'result': {
                    'request_headers': {},
                    'status': 'DONE',
                    # Any 2xx status counts as success (was `200 >= code`, i.e. code <= 200).
                    'success': 200 <= self.response.status_code < 300,
                    'response_headers': self.response.headers,
                    'status_code': self.response.status_code,
                    'reason': self.response.reason,
                    'format': 'text',
                    'content': ''
                },
                'context': {},
                'config': self.scrape_config.__dict__
            }

            if 'X-Scrapfly-Reject-Code' in self.response.headers:
                # Rejection details are read from the X-Scrapfly-Reject-* headers.
                api_result['result']['error'] = {
                    'code': self.response.headers['X-Scrapfly-Reject-Code'],
                    'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']),
                    'message': self.response.headers['X-Scrapfly-Reject-Description'],
                    'error_id': self.response.headers['X-Scrapfly-Reject-ID'],
                    'retryable': self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes',
                    'doc_url': '',
                    'links': {}
                }

                if 'X-Scrapfly-Reject-Doc' in self.response.headers:
                    api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc']
                    api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc']

        if isinstance(api_result, str):
            # A payload that is still a raw string is reported as a retryable bad gateway.
            raise HttpError(
                request=request,
                response=response,
                message='Bad gateway',
                code=502,
                http_status_code=502,
                is_retryable=True
            )

        self.result = self.handle_api_result(api_result=api_result)

    @property
    def scrape_result(self) -> Optional[Dict]:
        """The ``result`` section of the API payload, or None when it is absent."""
        return self.result.get('result', None)

    @property
    def config(self) -> Optional[Dict]:
        """The ``config`` section of the payload, or None when there is no scrape result."""
        if self.scrape_result is None:
            return None

        return self.result['config']

    @property
    def context(self) -> Optional[Dict]:
        """The ``context`` section of the payload, or None when there is no scrape result."""
        if self.scrape_result is None:
            return None

        return self.result['context']

    @property
    def content(self) -> str:
        """The scraped content, or an empty string when there is no scrape result."""
        if self.scrape_result is None:
            return ''

        return self.scrape_result['content']

    @property
    def success(self) -> bool:
        """
        Whether the API call itself returned a 2xx HTTP status.

        Warning: success means the Scrapfly API replied correctly to the call; the
        scrape can still be unsuccessful if the upstream replied with an error
        status code (see ``scrape_success``).
        """
        return 200 <= self.response.status_code <= 299

    @property
    def scrape_success(self) -> bool:
        """The ``success`` flag of the scrape result; False when the result is missing or empty."""
        scrape_result = self.scrape_result

        if not scrape_result:
            return False

        return scrape_result['success']

    @property
    def error(self) -> Optional[Dict]:
        """The ``error`` entry of the scrape result when ``scrape_success`` is False, else None."""
        if self.scrape_result is None:
            return None

        if self.scrape_success is False:
            return self.scrape_result['error']

        return None

    @property
    def upstream_status_code(self) -> Optional[int]:
        """The ``status_code`` recorded in the scrape result, or None when unavailable."""
        if self.scrape_result is None:
            return None

        if 'status_code' in self.scrape_result:
            return self.scrape_result['status_code']

        return None

    @cached_property
    def soup(self) -> 'BeautifulSoup':
        """
        Parse the text content with BeautifulSoup using the ``lxml`` parser.

        :return: the parsed document, or None when an ImportError occurs (the
            problem is logged instead of raised)
        :raises ContentError: when the result format is not ``text``
        """
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")

        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(self.content, "lxml")
            return soup
        except ImportError as e:
            logger.error('You must install scrapfly[parser] to enable this feature')
            # Kept as a logged best-effort: callers receive None rather than an exception.
            return None

    @cached_property
    def selector(self) -> 'Selector':
        """
        Build a parsel ``Selector`` over the text content.

        :raises ContentError: when the result format is not ``text``
        :raises ImportError: when parsel cannot be imported (logged, then re-raised)
        """
        if self.scrape_result['format'] != 'text':
            # The message previously named Beautiful Soup, which this property does not use.
            raise ContentError("Unable to cast into selector, the format of data is binary - must be text content")

        try:
            from parsel import Selector
            return Selector(text=self.content)
        except ImportError as e:
            logger.error('You must install parsel or scrapy package to enable this feature')
            raise e

    def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
        """
        Normalise the decoded API payload and freeze it.

        Error payloads are frozen untouched. Otherwise a list-typed
        ``config.headers`` is replaced by an empty dict, request / response
        headers are wrapped in ``CaseInsensitiveDict``, and, when a
        ``large_object_handler`` is set, ``clob`` / ``blob`` content is resolved
        through it while bytes-typed ``binary`` content is base64-decoded into a
        ``BytesIO``.

        :param api_result: decoded payload (mutated in place)
        :return: the payload wrapped in a ``FrozenDict``
        :raises TypeError: re-raised after logging the payload when ``config`` cannot be indexed
        """
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        try:
            if isinstance(api_result['config']['headers'], list):
                api_result['config']['headers'] = {}
        except TypeError:
            logger.info(api_result)
            raise

        # Header sections may be missing; in that case they are simply left alone.
        with suppress(KeyError):
            api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
            api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

        if self.large_object_handler is not None and api_result['result']['content']:
            content_format = api_result['result']['format']

            if content_format in ['clob', 'blob']:
                # Here `content` is passed to the handler as the callback URL of the large object.
                api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format)
            elif content_format == 'binary' and isinstance(api_result['result']['content'], bytes):
                api_result['result']['content'] = BytesIO(b64decode(api_result['result']['content']))

        return FrozenDict(api_result)

    def _is_api_error(self, api_result: Dict) -> bool:
        """
        Tell whether the payload represents an API-level error.

        For HEAD scrapes this depends on the ``X-Reject-Reason`` response header;
        otherwise a missing payload or one carrying ``error_id`` is an error.
        """
        if self.scrape_config.method == 'HEAD':
            # NOTE(review): __init__ reads rejection details from 'X-Scrapfly-Reject-*'
            # headers while this checks 'X-Reject-Reason' - confirm the header name.
            if 'X-Reject-Reason' in self.response.headers:
                return True
            return False

        if api_result is None:
            return True

        return 'error_id' in api_result

    def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
        """
        Rebuild a ``requests`` ``Response`` that mirrors the upstream (scraped) response.

        :param _class: response class to build; only ``Response`` is accepted
        :return: the rebuilt response, or None when there is no result or the API
            call did not return HTTP 200
        :raises RuntimeError: when ``_class`` is not ``Response``
        """
        if _class != Response:
            raise RuntimeError('only Response from requests package is supported at the moment')

        if self.result is None:
            return None

        if self.response.status_code != 200:
            return None

        response = Response()
        response.status_code = self.scrape_result['status_code']
        response.reason = self.scrape_result['reason']

        if self.scrape_result['content']:
            if isinstance(self.scrape_result['content'], BytesIO):
                response._content = self.scrape_result['content'].getvalue()
            elif isinstance(self.scrape_result['content'], bytes):
                response._content = self.scrape_result['content']
            elif isinstance(self.scrape_result['content'], str):
                response._content = self.scrape_result['content'].encode('utf-8')
        else:
            response._content = None

        response.headers.update(self.scrape_result['response_headers'])
        response.url = self.scrape_result['url']

        response.request = Request(
            method=self.config['method'],
            url=self.config['url'],
            headers=self.scrape_result['request_headers'],
            data=self.config['body'] if self.config['body'] else None
        )

        if 'set-cookie' in response.headers:
            raw_cookies = response.headers['set-cookie']

            # A single header value is one string; iterating it directly would walk
            # it character by character and no cookie would ever be parsed.
            if isinstance(raw_cookies, str):
                raw_cookies = [raw_cookies]

            for raw_cookie in raw_cookies:
                for name, cookie in SimpleCookie(raw_cookie).items():
                    # Empty `expires` means "not set"; otherwise convert the date to a timestamp.
                    expires = cookie.get('expires') or None

                    if expires is not None:
                        try:
                            expires = parse(expires).timestamp()
                        except (ValueError, OverflowError):
                            # Unparseable or out-of-range dates are dropped.
                            expires = None

                    response.cookies.set_cookie(Cookie(
                        version=cookie.get('version') if cookie.get('version') else None,
                        name=name,
                        value=cookie.value,
                        path=cookie.get('path', ''),
                        expires=expires,
                        comment=cookie.get('comment'),
                        domain=cookie.get('domain', ''),
                        secure=cookie.get('secure'),
                        port=None,
                        port_specified=False,
                        domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                        domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                        path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                        discard=False,
                        comment_url=None,
                        rest={
                            'httponly': cookie.get('httponly'),
                            'samesite': cookie.get('samesite'),
                            'max-age': cookie.get('max-age')
                        }
                    ))

        return response

    def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
        """
        Write content to a file.

        :param path: directory prefix joined to the file name with ``/``
        :param name: file name; when it contains a dot its last part is taken as the extension
        :param file: already opened file object to write into; when omitted a file is
            opened in binary mode at the derived path
        :param content: data to write; defaults to the scrape result content
        """
        file_content = content or self.scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                # Taken without the leading dot, unlike the MIME-derived extension below.
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                # No extension in the name: derive one from the response content type.
                try:
                    mime_type = self.scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                # Fall back to the last path segment of the scraped URL.
                name = self.config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path is not None else name

            if file_path == file_extension:
                # Only an extension is left (empty name, no path): build the name from the URL.
                url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        # The context manager also closes a file object supplied by the caller.
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
        """
        Raise the error matching a finished but unsuccessful scrape.

        Runs the parent check first. Then, when the result status is ``DONE``
        and the scrape failed, the error built by ``ErrorFactory`` is raised;
        ``UpstreamHttpError`` instances are only raised when
        ``raise_on_upstream_error`` is True.

        :param raise_on_upstream_error: whether upstream HTTP errors are raised
        :param error_class: forwarded to the parent implementation
        """
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
        if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
            error = ErrorFactory.create(api_response=self)
            if error:
                if isinstance(error, UpstreamHttpError):
                    if raise_on_upstream_error is True:
                        raise error
                else:
                    raise error

Ancestors

Class variables

var large_object_handler : Callable
var scrape_config : ScrapeConfig

Instance variables

var config : Optional[Dict]
Expand source code
@property
def config(self) -> Optional[Dict]:
    """Return the ``config`` section of the payload, or None when there is no scrape result."""
    return None if self.scrape_result is None else self.result['config']
var content : str
Expand source code
@property
def content(self) -> str:
    """Return the scraped content, or an empty string when there is no scrape result."""
    return '' if self.scrape_result is None else self.scrape_result['content']
var context : Optional[Dict]
Expand source code
@property
def context(self) -> Optional[Dict]:
    """Return the ``context`` section of the payload, or None when there is no scrape result."""
    return None if self.scrape_result is None else self.result['context']
var error : Optional[Dict]
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Return the ``error`` entry of the scrape result when the scrape failed, else None."""
    scrape = self.scrape_result

    if scrape is not None and self.scrape_success is False:
        return scrape['error']

    return None
var scrape_result : Optional[Dict]
Expand source code
@property
def scrape_result(self) -> Optional[Dict]:
    """Return the ``result`` section of the API payload, or None when it is absent."""
    api_payload = self.result
    return api_payload.get('result', None)
var scrape_success : bool
Expand source code
@property
def scrape_success(self) -> bool:
    """Return the ``success`` flag of the scrape result; False when it is missing or empty."""
    outcome = self.scrape_result
    return outcome['success'] if outcome else False
var selector
Expand source code
def __get__(self, instance, owner=None):
    """
    Descriptor access: compute the wrapped function's value once per instance and cache it.

    Returns the descriptor itself when accessed without an instance. Otherwise
    the value stored in the instance ``__dict__`` under ``self.attrname`` is
    returned, calling ``self.func(instance)`` to fill it on first access.

    Raises TypeError when ``attrname`` was never set, when the instance has no
    ``__dict__``, or when that ``__dict__`` does not accept item assignment.
    """
    if instance is None:
        # Accessed without an instance: hand back the descriptor object.
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    # First lookup happens without holding the lock.
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        with self.lock:
            # check if another thread filled cache while we awaited lock
            val = cache.get(self.attrname, _NOT_FOUND)
            if val is _NOT_FOUND:
                val = self.func(instance)
                try:
                    cache[self.attrname] = val
                except TypeError:
                    msg = (
                        f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                        f"does not support item assignment for caching {self.attrname!r} property."
                    )
                    raise TypeError(msg) from None
    return val
var soup
Expand source code
def __get__(self, instance, owner=None):
    """
    Descriptor access: compute the wrapped function's value once per instance and cache it.

    Returns the descriptor itself when accessed without an instance. Otherwise
    the value stored in the instance ``__dict__`` under ``self.attrname`` is
    returned, calling ``self.func(instance)`` to fill it on first access.

    Raises TypeError when ``attrname`` was never set, when the instance has no
    ``__dict__``, or when that ``__dict__`` does not accept item assignment.
    """
    if instance is None:
        # Accessed without an instance: hand back the descriptor object.
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    # First lookup happens without holding the lock.
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        with self.lock:
            # check if another thread filled cache while we awaited lock
            val = cache.get(self.attrname, _NOT_FOUND)
            if val is _NOT_FOUND:
                val = self.func(instance)
                try:
                    cache[self.attrname] = val
                except TypeError:
                    msg = (
                        f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                        f"does not support item assignment for caching {self.attrname!r} property."
                    )
                    raise TypeError(msg) from None
    return val
var success : bool

/!\ Success means the Scrapfly API replied correctly to the call, but the scrape can still be unsuccessful if the upstream replied with an error status code.

Expand source code
@property
def success(self) -> bool:
    """
    Whether the API call itself returned a 2xx HTTP status.

    Warning: success means the Scrapfly API replied correctly to the call; the
    scrape can still be unsuccessful if the upstream replied with an error
    status code.
    """
    # The chained comparison was inverted (`200 >= code <= 299` is just
    # `code <= 200`), so 201-299 reported failure and 1xx reported success.
    return 200 <= self.response.status_code <= 299
var upstream_status_code : Optional[int]
Expand source code
@property
def upstream_status_code(self) -> Optional[int]:
    """Return the ``status_code`` recorded in the scrape result, or None when unavailable."""
    scrape = self.scrape_result

    if scrape is None or 'status_code' not in scrape:
        return None

    return scrape['status_code']

Methods

def handle_api_result(self, api_result: Dict) ‑> Optional[FrozenDict]
Expand source code
def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
    """
    Normalise the decoded API payload and freeze it.

    Error payloads are frozen untouched. Otherwise a list-typed
    ``config.headers`` becomes an empty dict, request / response headers are
    wrapped in ``CaseInsensitiveDict``, and, when a ``large_object_handler`` is
    set, ``clob`` / ``blob`` content is resolved through it while bytes-typed
    ``binary`` content is base64-decoded into a ``BytesIO``.

    :param api_result: decoded payload (mutated in place)
    :return: the payload wrapped in a ``FrozenDict``
    :raises TypeError: re-raised after logging the payload when ``config`` cannot be indexed
    """
    is_error = self._is_api_error(api_result=api_result)
    if is_error is True:
        return FrozenDict(api_result)

    try:
        config_section = api_result['config']
        if isinstance(config_section['headers'], list):
            config_section['headers'] = {}
    except TypeError:
        logger.info(api_result)
        raise

    # Header sections may be missing; in that case they are left untouched.
    with suppress(KeyError):
        scrape = api_result['result']
        scrape['request_headers'] = CaseInsensitiveDict(scrape['request_headers'])
        scrape['response_headers'] = CaseInsensitiveDict(scrape['response_headers'])

    handler = self.large_object_handler
    if handler is not None and api_result['result']['content']:
        scrape = api_result['result']
        content_format = scrape['format']

        if content_format in ('clob', 'blob'):
            # Here `content` is passed to the handler as the callback URL of the large object.
            scrape['content'], scrape['format'] = handler(callback_url=scrape['content'], format=content_format)
        elif content_format == 'binary' and isinstance(scrape['content'], bytes):
            scrape['content'] = BytesIO(b64decode(scrape['content']))

    return FrozenDict(api_result)
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ApiHttpClientError)
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
    """
    Raise the error matching a finished but unsuccessful scrape.

    Runs the parent check first. Then, when the result status is ``DONE`` and
    the scrape failed, the error built by ``ErrorFactory`` is raised;
    ``UpstreamHttpError`` instances are only raised when
    ``raise_on_upstream_error`` is True.

    :param raise_on_upstream_error: whether upstream HTTP errors are raised
    :param error_class: forwarded to the parent implementation
    """
    super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

    scrape_done = self.result['result']['status'] == 'DONE'
    if not scrape_done or self.scrape_success is not False:
        return

    error = ErrorFactory.create(api_response=self)
    if not error:
        return

    # Upstream HTTP errors are opt-out; every other error is always raised.
    if isinstance(error, UpstreamHttpError) and raise_on_upstream_error is not True:
        return

    raise error
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Union[TextIO, _io.BytesIO, ForwardRef(None)] = None, content: Union[str, bytes, ForwardRef(None)] = None)
Expand source code
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
    """
    Write content to a file.

    :param path: directory prefix joined to the file name with ``/``
    :param name: file name; when it contains a dot its last part is taken as the extension
    :param file: already opened file object to write into; when omitted a file is
        opened in binary mode at the derived path
    :param content: data to write; defaults to the scrape result content
    """
    file_content = content or self.scrape_result['content']
    file_path = None
    file_extension = None

    if name:
        name_parts = name.split('.')
        if len(name_parts) > 1:
            # Taken without the leading dot, unlike the MIME-derived extension below.
            file_extension = name_parts[-1]

    if not file:
        if file_extension is None:
            # No extension in the name: derive one from the response content type.
            try:
                mime_type = self.scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            if ';' in mime_type:
                # Drop parameters that follow the media type.
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            # Fall back to the last path segment of the scraped URL.
            name = self.config['url'].split('/')[-1]

        if name.find(file_extension) == -1:
            name += file_extension

        file_path = path + '/' + name if path is not None else name

        if file_path == file_extension:
            # Only an extension is left (empty name, no path): build the name from the URL.
            url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')

            if url[-1] == '-':
                url = url[:-1]

            url += file_extension

            file_path = url

        file = open(file_path, 'wb')

    # Wrap str / bytes content in a BytesIO so it can be streamed below.
    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)
    # The context manager also closes a file object supplied by the caller.
    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    # file_path is still None here when the caller supplied `file`.
    logger.info('file %s created' % file_path)
def upstream_result_into_response(self) ‑> Optional[requests.models.Response]
Expand source code
def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
    """
    Rebuild a ``requests`` ``Response`` that mirrors the upstream (scraped) response.

    Status, reason, body, headers, URL, the originating request and the cookies
    found in the ``set-cookie`` header are copied from the scrape result.

    :param _class: response class to build; only ``Response`` is accepted
    :return: the rebuilt response, or None when there is no result or the API
        call did not return HTTP 200
    :raises RuntimeError: when ``_class`` is not ``Response``
    """
    if _class != Response:
        raise RuntimeError('only Response from requests package is supported at the moment')

    if self.result is None:
        return None

    if self.response.status_code != 200:
        return None

    response = Response()
    response.status_code = self.scrape_result['status_code']
    response.reason = self.scrape_result['reason']

    if self.scrape_result['content']:
        if isinstance(self.scrape_result['content'], BytesIO):
            response._content = self.scrape_result['content'].getvalue()
        elif isinstance(self.scrape_result['content'], bytes):
            response._content = self.scrape_result['content']
        elif isinstance(self.scrape_result['content'], str):
            response._content = self.scrape_result['content'].encode('utf-8')
    else:
        response._content = None

    response.headers.update(self.scrape_result['response_headers'])
    response.url = self.scrape_result['url']

    response.request = Request(
        method=self.config['method'],
        url=self.config['url'],
        headers=self.scrape_result['request_headers'],
        data=self.config['body'] if self.config['body'] else None
    )

    if 'set-cookie' in response.headers:
        raw_cookies = response.headers['set-cookie']

        # A single header value is one string; iterating it directly would walk
        # it character by character and no cookie would ever be parsed.
        if isinstance(raw_cookies, str):
            raw_cookies = [raw_cookies]

        for raw_cookie in raw_cookies:
            for name, cookie in SimpleCookie(raw_cookie).items():
                # Empty `expires` means "not set"; otherwise convert the date to a timestamp.
                # (The former str -> int/float branch was unreachable: after parsing,
                # the value is always a float or None.)
                expires = cookie.get('expires') or None

                if expires is not None:
                    try:
                        expires = parse(expires).timestamp()
                    except (ValueError, OverflowError):
                        # Unparseable or out-of-range dates are dropped.
                        expires = None

                response.cookies.set_cookie(Cookie(
                    version=cookie.get('version') if cookie.get('version') else None,
                    name=name,
                    value=cookie.value,
                    path=cookie.get('path', ''),
                    expires=expires,
                    comment=cookie.get('comment'),
                    domain=cookie.get('domain', ''),
                    secure=cookie.get('secure'),
                    port=None,
                    port_specified=False,
                    domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                    domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                    path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                    discard=False,
                    comment_url=None,
                    rest={
                        'httponly': cookie.get('httponly'),
                        'samesite': cookie.get('samesite'),
                        'max-age': cookie.get('max-age')
                    }
                ))

    return response

Inherited members

class ScrapeConfig (url: str, retry: bool = True, method: str = 'GET', country: Optional[str] = None, render_js: bool = False, cache: bool = False, cache_clear: bool = False, ssl: bool = False, dns: bool = False, asp: bool = False, debug: bool = False, raise_on_upstream_error: bool = True, cache_ttl: Optional[int] = None, proxy_pool: Optional[str] = None, session: Optional[str] = None, tags: Union[List[str], Set[str], ForwardRef(None)] = None, format: Optional[Format] = None, format_options: Optional[List[FormatOption]] = None, extraction_template: Optional[str] = None, extraction_ephemeral_template: Optional[Dict] = None, extraction_prompt: Optional[str] = None, extraction_model: Optional[str] = None, correlation_id: Optional[str] = None, cookies: Optional[requests.structures.CaseInsensitiveDict] = None, body: Optional[str] = None, data: Optional[Dict] = None, headers: Union[requests.structures.CaseInsensitiveDict, Dict[str, str], ForwardRef(None)] = None, js: str = None, rendering_wait: int = None, wait_for_selector: Optional[str] = None, screenshots: Optional[Dict] = None, screenshot_flags: Optional[List[ScreenshotFlag]] = None, session_sticky_proxy: Optional[bool] = None, webhook: Optional[str] = None, timeout: Optional[int] = None, js_scenario: Optional[List] = None, extract: Optional[Dict] = None, os: Optional[str] = None, lang: Optional[List[str]] = None, auto_scroll: Optional[bool] = None, cost_budget: Optional[int] = None)
Expand source code
class ScrapeConfig(BaseApiConfig):
    """Settings for a single scrape call.

    The constructor stores every option on the instance, folds ``cookies``
    into the ``cookie`` header and, for POST/PUT/PATCH requests, encodes
    ``data`` into ``body``.  :meth:`to_api_params` flattens the instance into
    the parameter dictionary that is sent along with the request.
    """

    PUBLIC_DATACENTER_POOL = 'public_datacenter_pool'
    PUBLIC_RESIDENTIAL_POOL = 'public_residential_pool'

    url: str
    retry: bool = True
    method: str = 'GET'
    country: Optional[str] = None
    render_js: bool = False
    cache: bool = False
    cache_clear:bool = False
    ssl:bool = False
    dns:bool = False
    asp:bool = False
    debug: bool = False
    raise_on_upstream_error:bool = True
    cache_ttl:Optional[int] = None
    proxy_pool:Optional[str] = None
    session: Optional[str] = None
    tags: Optional[List[str]] = None
    # No trailing comma here: `= None,` would make the default the tuple (None,)
    format: Optional[Format] = None  # raw(unchanged)
    format_options: Optional[List[FormatOption]] = None
    extraction_template: Optional[str] = None  # a saved template name
    extraction_ephemeral_template: Optional[Dict] = None  # ephemeraly declared json template
    extraction_prompt: Optional[str] = None
    extraction_model: Optional[str] = None
    correlation_id: Optional[str] = None
    cookies: Optional[CaseInsensitiveDict] = None
    body: Optional[str] = None
    data: Optional[Dict] = None
    headers: Optional[CaseInsensitiveDict] = None
    js: str = None
    rendering_wait: int = None
    wait_for_selector: Optional[str] = None
    session_sticky_proxy:bool = True
    screenshots:Optional[Dict]=None
    screenshot_flags: Optional[List[ScreenshotFlag]] = None
    webhook:Optional[str]=None
    timeout:Optional[int]=None # in milliseconds
    js_scenario: Dict = None
    extract: Dict = None
    lang:Optional[List[str]] = None
    os:Optional[str] = None
    auto_scroll:Optional[bool] = None
    cost_budget:Optional[int] = None

    def __init__(
        self,
        url: str,
        retry: bool = True,
        method: str = 'GET',
        country: Optional[str] = None,
        render_js: bool = False,
        cache: bool = False,
        cache_clear:bool = False,
        ssl:bool = False,
        dns:bool = False,
        asp:bool = False,
        debug: bool = False,
        raise_on_upstream_error:bool = True,
        cache_ttl:Optional[int] = None,
        proxy_pool:Optional[str] = None,
        session: Optional[str] = None,
        tags: Optional[Union[List[str], Set[str]]] = None,
        format: Optional[Format] = None, # raw(unchanged)
        format_options: Optional[List[FormatOption]] = None, # raw(unchanged)
        extraction_template: Optional[str] = None,  # a saved template name
        extraction_ephemeral_template: Optional[Dict] = None,  # ephemeraly declared json template
        extraction_prompt: Optional[str] = None,
        extraction_model: Optional[str] = None,
        correlation_id: Optional[str] = None,
        cookies: Optional[CaseInsensitiveDict] = None,
        body: Optional[str] = None,
        data: Optional[Dict] = None,
        headers: Optional[Union[CaseInsensitiveDict, Dict[str, str]]] = None,
        js: str = None,
        rendering_wait: int = None,
        wait_for_selector: Optional[str] = None,
        screenshots:Optional[Dict]=None,
        screenshot_flags: Optional[List[ScreenshotFlag]] = None,
        session_sticky_proxy:Optional[bool] = None,
        webhook:Optional[str] = None,
        timeout:Optional[int] = None, # in milliseconds
        js_scenario:Optional[List] = None,
        extract:Optional[Dict] = None,
        os:Optional[str] = None,
        lang:Optional[List[str]] = None,
        auto_scroll:Optional[bool] = None,
        cost_budget:Optional[int] = None
    ):
        """Store the options and normalise cookies, headers and the request body.

        ``cookies`` are appended to the ``cookie`` header as ``name=value``
        pairs.  For POST/PUT/PATCH requests without an explicit ``body``,
        ``data`` is encoded according to the ``content-type`` header
        (form-urlencoded when the header is absent, JSON or form-urlencoded
        when it is present); when neither ``body`` nor ``data`` is given the
        content type is set to ``text/plain``.

        :raises ScrapeConfigError: if both ``body`` and ``data`` are given, or
            if ``data`` is given with a content type other than JSON or
            form-urlencoded.
        """
        assert isinstance(url, str), 'url must be a str'

        if isinstance(tags, list):
            tags = set(tags)

        cookies = cookies or {}
        headers = headers or {}

        self.cookies = CaseInsensitiveDict(cookies)
        self.headers = CaseInsensitiveDict(headers)
        self.url = url
        self.retry = retry
        self.method = method
        self.country = country
        self.session_sticky_proxy = session_sticky_proxy
        self.render_js = render_js
        self.cache = cache
        self.cache_clear = cache_clear
        self.asp = asp
        self.webhook = webhook
        self.session = session
        self.debug = debug
        self.cache_ttl = cache_ttl
        self.proxy_pool = proxy_pool
        self.tags = tags or set()
        self.format = format
        self.format_options = format_options
        self.extraction_template = extraction_template
        self.extraction_ephemeral_template = extraction_ephemeral_template
        self.extraction_prompt = extraction_prompt
        self.extraction_model = extraction_model
        self.correlation_id = correlation_id
        self.wait_for_selector = wait_for_selector
        self.body = body
        self.data = data
        self.js = js
        self.rendering_wait = rendering_wait
        self.raise_on_upstream_error = raise_on_upstream_error
        self.screenshots = screenshots
        self.screenshot_flags = screenshot_flags
        self.key = None
        self.dns = dns
        self.ssl = ssl
        self.js_scenario = js_scenario
        self.timeout = timeout
        self.extract = extract
        self.lang = lang
        self.os = os
        self.auto_scroll = auto_scroll
        self.cost_budget = cost_budget

        if cookies:
            # str.format-style interpolation so non-string values (e.g. ints) do not raise
            pairs = ['%s=%s' % (name, value) for name, value in cookies.items()]

            # An existing cookie header may be empty; only terminate it with ';'
            # when there is something to terminate (indexing [-1] on '' would raise).
            existing = self.headers.get('cookie', '')

            if existing and not existing.endswith(';'):
                existing += ';'

            self.headers['cookie'] = existing + '; '.join(pairs)

        if self.body and self.data:
            raise ScrapeConfigError('You cannot pass both parameters body and data. You must choose')

        if method in ['POST', 'PUT', 'PATCH']:
            if self.body is None and self.data is not None:
                if 'content-type' not in self.headers:
                    self.headers['content-type'] = 'application/x-www-form-urlencoded'
                    self.body = urlencode(data)
                else:
                    content_type = self.headers['content-type']

                    if 'application/json' in content_type:
                        self.body = json.dumps(data)
                    elif 'application/x-www-form-urlencoded' in content_type:
                        self.body = urlencode(data)
                    else:
                        raise ScrapeConfigError('Content-Type "%s" not supported, use body parameter to pass pre encoded body according to your content type' % self.headers['content-type'])
            elif self.body is None and self.data is None:
                self.headers['content-type'] = 'text/plain'

    def to_api_params(self, key:str) -> Dict:
        """Build the parameter dictionary describing this configuration.

        Options that depend on another option (``render_js``, ``cache``,
        ``session``, ``screenshots``) are left out, and a warning is logged,
        when that prerequisite is not enabled.  The call does not modify
        ``extraction_ephemeral_template``, so it can safely be repeated
        (e.g. when a request is retried).

        :param key: API key used when ``self.key`` is not set.
        :return: mapping of API parameter names to their values.
        :raises ScrapeConfigError: if both ``extraction_template`` and
            ``extraction_ephemeral_template`` are set.
        """
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.country is not None:
            params['country'] = self.country

        for name, value in self.headers.items():
            params['headers[%s]' % name] = value

        if self.webhook is not None:
            params['webhook_name'] = self.webhook

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.extract is not None:
            params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')

        if self.cost_budget is not None:
            params['cost_budget'] = self.cost_budget

        if self.render_js is True:
            params['render_js'] = self._bool_to_http(self.render_js)

            if self.wait_for_selector is not None:
                params['wait_for_selector'] = self.wait_for_selector

            if self.js:
                params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

            if self.js_scenario:
                params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')

            if self.rendering_wait:
                params['rendering_wait'] = self.rendering_wait

            if self.screenshots is not None:
                for name, element in self.screenshots.items():
                    params['screenshots[%s]' % name] = element

            if self.screenshot_flags is not None:
                # Flags only make sense together with at least one screenshot;
                # the warning below used to sit in an unreachable branch.
                if self.screenshots:
                    self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags]
                    params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags)
                else:
                    logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')

            if self.auto_scroll is True:
                params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
        else:
            if self.wait_for_selector is not None:
                logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')

            if self.screenshots:
                logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')

            if self.js_scenario:
                logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')

            if self.js:
                logging.warning('Params "js" is ignored. Works only if render_js is enabled')

            if self.rendering_wait:
                logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')

        if self.asp is True:
            params['asp'] = self._bool_to_http(self.asp)

        if self.retry is False:
            params['retry'] = self._bool_to_http(self.retry)

        if self.cache is True:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_clear is True:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)

            if self.cache_ttl is not None:
                params['cache_ttl'] = self.cache_ttl
        else:
            if self.cache_clear is True:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

        if self.dns is True:
            params['dns'] = self._bool_to_http(self.dns)

        if self.ssl is True:
            params['ssl'] = self._bool_to_http(self.ssl)

        if self.tags:
            params['tags'] = ','.join(self.tags)

        if self.format:
            params['format'] = Format(self.format).value
            if self.format_options:
                params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options)

        if self.extraction_template and self.extraction_ephemeral_template:
            raise ScrapeConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

        if self.extraction_template:
            params['extraction_template'] = self.extraction_template

        if self.extraction_ephemeral_template:
            # Serialise into a local: writing the JSON string back onto the
            # instance made a second call encode the already-encoded string.
            template_json = json.dumps(self.extraction_ephemeral_template)
            params['extraction_template'] = 'ephemeral:' + base64.urlsafe_b64encode(template_json.encode('utf-8')).decode('utf-8')

        if self.extraction_prompt:
            params['extraction_prompt'] = quote_plus(self.extraction_prompt)

        if self.extraction_model:
            params['extraction_model'] = self.extraction_model

        if self.correlation_id:
            params['correlation_id'] = self.correlation_id

        if self.session:
            params['session'] = self.session

            if self.session_sticky_proxy is True: # false by default
                params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
        else:
            if self.session_sticky_proxy:
                logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')

        if self.debug is True:
            params['debug'] = self._bool_to_http(self.debug)

        if self.proxy_pool is not None:
            params['proxy_pool'] = self.proxy_pool

        if self.lang is not None:
            params['lang'] = ','.join(self.lang)

        if self.os is not None:
            params['os'] = self.os

        return params

    @staticmethod
    def from_exported_config(config:str) -> 'ScrapeConfig':
        """Rebuild a :class:`ScrapeConfig` from an exported configuration string.

        :param config: base64 text wrapping a msgpack-encoded mapping of
            scrape options.
        :return: a new ``ScrapeConfig`` populated from that mapping.
        :raises ImportError: if the ``msgpack`` package is not installed.
        """
        try:
            from msgpack import loads as msgpack_loads
        except ImportError:
            print('You must install msgpack package - run: pip install "scrapfly-sdk[seepdup] or pip install msgpack')
            raise

        data = msgpack_loads(base64.b64decode(config))

        headers = {}

        for name, value in data['headers'].items():
            # str and bytes are Iterable too; joining them would put '; '
            # between every character, so only join real collections.
            if isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
                headers[name] = '; '.join(value)
            else:
                headers[name] = value

        return ScrapeConfig(
            url=data['url'],
            retry=data['retry'],
            headers=headers,
            session=data['session'],
            session_sticky_proxy=data['session_sticky_proxy'],
            cache=data['cache'],
            cache_ttl=data['cache_ttl'],
            cache_clear=data['cache_clear'],
            render_js=data['render_js'],
            method=data['method'],
            asp=data['asp'],
            body=data['body'],
            ssl=data['ssl'],
            dns=data['dns'],
            country=data['country'],
            debug=data['debug'],
            correlation_id=data['correlation_id'],
            tags=data['tags'],
            format=data['format'],
            js=data['js'],
            rendering_wait=data['rendering_wait'],
            screenshots=data['screenshots'] or {},
            screenshot_flags=data['screenshot_flags'],
            proxy_pool=data['proxy_pool'],
            auto_scroll=data['auto_scroll'],
            cost_budget=data['cost_budget']
        )

Ancestors

Class variables

var PUBLIC_DATACENTER_POOL
var PUBLIC_RESIDENTIAL_POOL
var asp : bool
var auto_scroll : Optional[bool]
var body : Optional[str]
var cache : bool
var cache_clear : bool
var cache_ttl : Optional[int]
var cookies : Optional[requests.structures.CaseInsensitiveDict]
var correlation_id : Optional[str]
var cost_budget : Optional[int]
var country : Optional[str]
var data : Optional[Dict]
var debug : bool
var dns : bool
var extract : Dict
var extraction_ephemeral_template : Optional[Dict]
var extraction_model : Optional[str]
var extraction_prompt : Optional[str]
var extraction_template : Optional[str]
var format : Optional[Format]
var format_options : Optional[List[FormatOption]]
var headers : Optional[requests.structures.CaseInsensitiveDict]
var js : str
var js_scenario : Dict
var lang : Optional[List[str]]
var method : str
var os : Optional[str]
var proxy_pool : Optional[str]
var raise_on_upstream_error : bool
var render_js : bool
var rendering_wait : int
var retry : bool
var screenshot_flags : Optional[List[ScreenshotFlag]]
var screenshots : Optional[Dict]
var session : Optional[str]
var session_sticky_proxy : bool
var ssl : bool
var tags : Optional[List[str]]
var timeout : Optional[int]
var url : str
var wait_for_selector : Optional[str]
var webhook : Optional[str]

Static methods

def from_exported_config(config: str) ‑> ScrapeConfig
Expand source code
@staticmethod
def from_exported_config(config:str) -> 'ScrapeConfig':
    """Rebuild a ``ScrapeConfig`` from an exported configuration string.

    :param config: base64 text wrapping a msgpack-encoded mapping of scrape
        options.
    :return: a new ``ScrapeConfig`` populated from that mapping.
    :raises ImportError: if the ``msgpack`` package is not installed.
    """
    try:
        from msgpack import loads as msgpack_loads
    except ImportError:
        print('You must install msgpack package - run: pip install "scrapfly-sdk[seepdup] or pip install msgpack')
        raise

    data = msgpack_loads(base64.b64decode(config))

    headers = {}

    for name, value in data['headers'].items():
        # str and bytes are Iterable too; joining them would put '; ' between
        # every character, so only join real collections.
        if isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
            headers[name] = '; '.join(value)
        else:
            headers[name] = value

    return ScrapeConfig(
        url=data['url'],
        retry=data['retry'],
        headers=headers,
        session=data['session'],
        session_sticky_proxy=data['session_sticky_proxy'],
        cache=data['cache'],
        cache_ttl=data['cache_ttl'],
        cache_clear=data['cache_clear'],
        render_js=data['render_js'],
        method=data['method'],
        asp=data['asp'],
        body=data['body'],
        ssl=data['ssl'],
        dns=data['dns'],
        country=data['country'],
        debug=data['debug'],
        correlation_id=data['correlation_id'],
        tags=data['tags'],
        format=data['format'],
        js=data['js'],
        rendering_wait=data['rendering_wait'],
        screenshots=data['screenshots'] or {},
        screenshot_flags=data['screenshot_flags'],
        proxy_pool=data['proxy_pool'],
        auto_scroll=data['auto_scroll'],
        cost_budget=data['cost_budget']
    )

Methods

def to_api_params(self, key: str) ‑> Dict
Expand source code
def to_api_params(self, key:str) -> Dict:
    """Build the parameter dictionary describing this configuration.

    Options that depend on another option (``render_js``, ``cache``,
    ``session``, ``screenshots``) are left out, and a warning is logged, when
    that prerequisite is not enabled.  The call does not modify
    ``extraction_ephemeral_template``, so it can safely be repeated (e.g.
    when a request is retried).

    :param key: API key used when ``self.key`` is not set.
    :return: mapping of API parameter names to their values.
    :raises ScrapeConfigError: if both ``extraction_template`` and
        ``extraction_ephemeral_template`` are set.
    """
    params = {
        'key': self.key or key,
        'url': self.url
    }

    if self.country is not None:
        params['country'] = self.country

    for name, value in self.headers.items():
        params['headers[%s]' % name] = value

    if self.webhook is not None:
        params['webhook_name'] = self.webhook

    if self.timeout is not None:
        params['timeout'] = self.timeout

    if self.extract is not None:
        params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')

    if self.cost_budget is not None:
        params['cost_budget'] = self.cost_budget

    if self.render_js is True:
        params['render_js'] = self._bool_to_http(self.render_js)

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.js:
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

        if self.js_scenario:
            params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')

        if self.rendering_wait:
            params['rendering_wait'] = self.rendering_wait

        if self.screenshots is not None:
            for name, element in self.screenshots.items():
                params['screenshots[%s]' % name] = element

        if self.screenshot_flags is not None:
            # Flags only make sense together with at least one screenshot;
            # the warning below used to sit in an unreachable branch.
            if self.screenshots:
                self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags]
                params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags)
            else:
                logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')

        if self.auto_scroll is True:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
    else:
        if self.wait_for_selector is not None:
            logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')

        if self.screenshots:
            logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')

        if self.js_scenario:
            logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')

        if self.js:
            logging.warning('Params "js" is ignored. Works only if render_js is enabled')

        if self.rendering_wait:
            logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')

    if self.asp is True:
        params['asp'] = self._bool_to_http(self.asp)

    if self.retry is False:
        params['retry'] = self._bool_to_http(self.retry)

    if self.cache is True:
        params['cache'] = self._bool_to_http(self.cache)

        if self.cache_clear is True:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)

        if self.cache_ttl is not None:
            params['cache_ttl'] = self.cache_ttl
    else:
        if self.cache_clear is True:
            logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

        if self.cache_ttl is not None:
            logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

    if self.dns is True:
        params['dns'] = self._bool_to_http(self.dns)

    if self.ssl is True:
        params['ssl'] = self._bool_to_http(self.ssl)

    if self.tags:
        params['tags'] = ','.join(self.tags)

    if self.format:
        params['format'] = Format(self.format).value
        if self.format_options:
            params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options)

    if self.extraction_template and self.extraction_ephemeral_template:
        raise ScrapeConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

    if self.extraction_template:
        params['extraction_template'] = self.extraction_template

    if self.extraction_ephemeral_template:
        # Serialise into a local: writing the JSON string back onto the
        # instance made a second call encode the already-encoded string.
        template_json = json.dumps(self.extraction_ephemeral_template)
        params['extraction_template'] = 'ephemeral:' + base64.urlsafe_b64encode(template_json.encode('utf-8')).decode('utf-8')

    if self.extraction_prompt:
        params['extraction_prompt'] = quote_plus(self.extraction_prompt)

    if self.extraction_model:
        params['extraction_model'] = self.extraction_model

    if self.correlation_id:
        params['correlation_id'] = self.correlation_id

    if self.session:
        params['session'] = self.session

        if self.session_sticky_proxy is True: # false by default
            params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
    else:
        if self.session_sticky_proxy:
            logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')

    if self.debug is True:
        params['debug'] = self._bool_to_http(self.debug)

    if self.proxy_pool is not None:
        params['proxy_pool'] = self.proxy_pool

    if self.lang is not None:
        params['lang'] = ','.join(self.lang)

    if self.os is not None:
        params['os'] = self.os

    return params
class ScraperAPI
Expand source code
class ScraperAPI:
    """Namespace of string constants accepted by the monitoring calls."""

    # How monitoring data is grouped
    MONITORING_ACCOUNT_AGGREGATION = 'account'
    MONITORING_PROJECT_AGGREGATION = 'project'
    MONITORING_TARGET_AGGREGATION = 'target'

    # Time window covered by the metrics
    MONITORING_PERIOD_LAST_5m = 'last5m'
    MONITORING_PERIOD_LAST_1H = 'last1h'
    MONITORING_PERIOD_LAST_24H = 'last24h'
    MONITORING_PERIOD_LAST_7D = 'last7d'
    MONITORING_PERIOD_SUBSCRIPTION = 'subscription'

    # Output format of the metrics payload
    MONITORING_DATA_FORMAT_PROMETHEUS = 'prometheus'
    MONITORING_DATA_FORMAT_STRUCTURED = 'structured'

Class variables

var MONITORING_ACCOUNT_AGGREGATION
var MONITORING_DATA_FORMAT_PROMETHEUS
var MONITORING_DATA_FORMAT_STRUCTURED
var MONITORING_PERIOD_LAST_1H
var MONITORING_PERIOD_LAST_24H
var MONITORING_PERIOD_LAST_5m
var MONITORING_PERIOD_LAST_7D
var MONITORING_PERIOD_SUBSCRIPTION
var MONITORING_PROJECT_AGGREGATION
var MONITORING_TARGET_AGGREGATION
class ScrapflyAspError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyAspError(ScraperAPIError):
    """Marker subclass of ``ScraperAPIError``; it adds no behaviour of its own.

    NOTE(review): presumably raised for errors tied to the ``asp`` scrape
    option - confirm against the error factory.
    """

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyClient (key: str, host: Optional[str] = 'https://api.scrapfly.io', verify=True, debug: bool = False, max_concurrency: int = 1, connect_timeout: int = 30, web_scraping_api_read_timeout: int = 160, extraction_api_read_timeout: int = 35, screenshot_api_read_timeout: int = 60, read_timeout: int = 30, default_read_timeout: int = 30, reporter: Optional[Callable] = None, **kwargs)
Expand source code
class ScrapflyClient:

    HOST = 'https://api.scrapfly.io'
    DEFAULT_CONNECT_TIMEOUT = 30
    DEFAULT_READ_TIMEOUT = 30

    DEFAULT_WEBSCRAPING_API_READ_TIMEOUT = 160 # 155 real
    DEFAULT_SCREENSHOT_API_READ_TIMEOUT = 60  # 30 real
    DEFAULT_EXTRACTION_API_READ_TIMEOUT = 35 # 30 real

    host:str
    key:str
    max_concurrency:int
    verify:bool
    debug:bool
    distributed_mode:bool
    connect_timeout:int
    web_scraping_api_read_timeout:int
    screenshot_api_read_timeout:int
    extraction_api_read_timeout:int
    monitoring_api_read_timeout:int
    default_read_timeout:int
    brotli: bool
    reporter:Reporter
    version:str

    # @deprecated
    read_timeout:int

    CONCURRENCY_AUTO = 'auto' # retrieve the allowed concurrency from your account
    DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(
        self,
        key: str,
        host: Optional[str] = HOST,
        verify=True,
        debug: bool = False,
        max_concurrency:int=1,
        connect_timeout:int = DEFAULT_CONNECT_TIMEOUT,
        web_scraping_api_read_timeout: int = DEFAULT_WEBSCRAPING_API_READ_TIMEOUT,
        extraction_api_read_timeout: int = DEFAULT_EXTRACTION_API_READ_TIMEOUT,
        screenshot_api_read_timeout: int = DEFAULT_SCREENSHOT_API_READ_TIMEOUT,

        # @deprecated
        read_timeout:int = DEFAULT_READ_TIMEOUT,
        default_read_timeout:int = DEFAULT_READ_TIMEOUT,
        reporter:Optional[Callable]=None,
        **kwargs
    ):
        """Configure the client.

        :param key: API key sent with every request.
        :param host: base URL of the API; ``None`` or an empty string falls
            back to :attr:`HOST`.  One trailing ``/`` is removed.
        :param verify: forwarded as the ``verify`` option of HTTP requests.
        :param debug: when ``True``, sets ``http.client`` debug level to 5.
        :param max_concurrency: stored on the instance as ``max_concurrency``.
        :param connect_timeout: connect timeout used for requests.
        :param web_scraping_api_read_timeout: read timeout for scrape calls.
        :param extraction_api_read_timeout: read timeout for extraction calls.
        :param screenshot_api_read_timeout: read timeout for screenshot calls.
        :param read_timeout: deprecated; the value is accepted but not used,
            ``self.read_timeout`` mirrors ``default_read_timeout``.
        :param default_read_timeout: read timeout for the monitoring calls and
            the value stored as ``default_read_timeout``.
        :param reporter: callable wrapped in ``Reporter``; a ``NoopReporter``
            is used when omitted.
        :param kwargs: only ``distributed_mode`` and ``brotli`` are looked at,
            and both merely trigger a ``DeprecationWarning``.
        """
        # `host` is typed Optional: indexing None or '' with [-1] used to raise.
        host = host or self.HOST

        if host.endswith('/'):  # remove last '/' if exists
            host = host[:-1]

        if 'distributed_mode' in kwargs:
            warnings.warn("distributed mode is deprecated and will be remove the next version -"
              " user should handle themself the session name based on the concurrency",
              DeprecationWarning,
              stacklevel=2
            )

        if 'brotli' in kwargs:
            warnings.warn("brotli arg is deprecated and will be remove the next version - "
                "brotli is disabled by default",
                DeprecationWarning,
                stacklevel=2
            )

        self.version = __version__
        self.host = host
        self.key = key
        self.verify = verify
        self.debug = debug
        self.connect_timeout = connect_timeout
        self.web_scraping_api_read_timeout = web_scraping_api_read_timeout
        self.screenshot_api_read_timeout = screenshot_api_read_timeout
        self.extraction_api_read_timeout = extraction_api_read_timeout
        self.monitoring_api_read_timeout = default_read_timeout
        self.default_read_timeout = default_read_timeout

        # @deprecated
        self.read_timeout = default_read_timeout

        self.max_concurrency = max_concurrency
        self.body_handler = ResponseBodyHandler(use_brotli=False)
        self.async_executor = ThreadPoolExecutor()
        self.http_session = None

        # NOTE(review): this tests the class constant HOST, not the configured
        # self.host, so the '.local' exemption can never trigger - confirm intent.
        if not self.verify and not self.HOST.endswith('.local'):
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        if self.debug is True:
            http.client.HTTPConnection.debuglevel = 5

        if reporter is None:
            from .reporter import NoopReporter

            reporter = NoopReporter()

        self.reporter = Reporter(reporter)

    @property
    def ua(self) -> str:
        """User-agent string: SDK version, Python version, OS name and machine type."""
        uname = platform.uname()
        python_version = platform.python_version()

        return f'ScrapflySDK/{self.version} (Python {python_version}, {uname.system}, {uname.machine})'

    @cached_property
    def _http_handler(self):
        """Request callable, computed once per instance.

        Uses ``self.http_session.request`` when a session is set at the time
        of first access, otherwise the module-level ``requests.request``.
        """
        if self.http_session:
            request = self.http_session.request
        else:
            request = requests.request

        return partial(request)

    @property
    def http(self):
        """Public accessor returning the cached request callable (``_http_handler``)."""
        handler = self._http_handler
        return handler

    def _scrape_request(self, scrape_config:ScrapeConfig):
        """Assemble the keyword arguments of the HTTP call for ``/scrape``.

        For POST/PUT/PATCH the ``content-type`` comes from the scrape config's
        own headers; for any other method the body handler's content type is
        used.
        """
        # NOTE(review): the lookup below raises KeyError when a POST/PUT/PATCH
        # config carries no 'content-type' header - confirm this cannot happen.
        if scrape_config.method in ['POST', 'PUT', 'PATCH']:
            content_type = scrape_config.headers['content-type']
        else:
            content_type = self.body_handler.content_type

        request_headers = {
            'content-type': content_type,
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }
        timeouts = (self.connect_timeout, self.web_scraping_api_read_timeout)

        return {
            'method': scrape_config.method,
            'url': self.host + '/scrape',
            'data': scrape_config.body,
            'verify': self.verify,
            'timeout': timeouts,
            'headers': request_headers,
            'params': scrape_config.to_api_params(key=self.key)
        }
    
    def _screenshot_request(self, screenshot_config:ScreenshotConfig):
        """Assemble the keyword arguments of the HTTP call for ``/screenshot``.

        :param screenshot_config: configuration whose ``to_api_params`` supplies
            the query parameters.
        :return: dict with ``method``, ``url``, ``verify``, ``timeout``,
            ``headers`` and ``params`` entries.
        """
        return {
            'method': 'GET',
            'url': self.host + '/screenshot',
            # Forward the client's TLS setting, as _scrape_request does;
            # without it a client built with verify=False still verified here.
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.screenshot_api_read_timeout),
            'headers': {
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            'params': screenshot_config.to_api_params(key=self.key)
        }

    def _extraction_request(self, extraction_config:ExtractionConfig):
        """Assemble the keyword arguments of the HTTP call for ``/extraction``.

        :param extraction_config: configuration providing the body, content
            type, optional compression format and the query parameters.
        :return: dict with ``method``, ``url``, ``data``, ``verify``,
            ``timeout``, ``headers`` and ``params`` entries.
        """
        compression = extraction_config.document_compression_format

        headers = {
            'content-type': extraction_config.content_type,
            'accept-encoding': self.body_handler.content_encoding,
            # The header carries the format's `.value`; the key stays present
            # with None when no compression format is set, as before.
            'content-encoding': compression.value if compression else None,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        return {
            'method': 'POST',
            'url': self.host + '/extraction',
            'data': extraction_config.body,
            # Forward the client's TLS setting, as _scrape_request does.
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.extraction_api_read_timeout),
            'headers': headers,
            'params': extraction_config.to_api_params(key=self.key)
        }


    def account(self) -> Union[str, Dict]:
        """Fetch ``/account`` for the configured API key.

        :return: the body decoded by ``self.body_handler`` when it supports the
            response headers, otherwise the raw content decoded as UTF-8 text.
        :raises requests.HTTPError: via ``raise_for_status`` on an error status.
        """
        response = self._http_handler(
            method='GET',
            url=self.host + '/account',
            params={'key': self.key},
            # Bound the call like the monitoring requests do; without a
            # timeout the request could block indefinitely.
            timeout=(self.connect_timeout, self.default_read_timeout),
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )

        response.raise_for_status()

        if self.body_handler.support(response.headers):
            return self.body_handler(response.content, response.headers['content-type'])

        return response.content.decode('utf-8')

    def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None):
        """Fetch ``/scrape/monitoring/metrics``.

        :param format: value sent as the ``format`` parameter.
        :param period: sent as ``period`` when not ``None``.
        :param aggregation: joined with commas and sent as ``aggregation``
            when not ``None``.
        :return: the body decoded by ``self.body_handler`` when it supports the
            response headers, otherwise the raw content decoded as UTF-8 text.
        """
        query = {'key': self.key, 'format': format}

        if period is not None:
            query['period'] = period

        if aggregation is not None:
            query['aggregation'] = ','.join(aggregation)

        request_headers = {
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics',
            params=query,
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            verify=self.verify,
            headers=request_headers,
        )

        response.raise_for_status()

        if not self.body_handler.support(response.headers):
            return response.content.decode('utf-8')

        return self.body_handler(response.content, response.headers['content-type'])

    def get_monitoring_target_metrics(
            self,
            domain:str,
            group_subdomain:bool=False,
            period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H,
            start:Optional[datetime.datetime]=None,
            end:Optional[datetime.datetime]=None,
    ):
        """Fetch ``/scrape/monitoring/metrics/target`` for one domain.

        ``start`` and ``end`` must be given together; when they are, they are
        formatted with ``DATETIME_FORMAT`` and ``period`` is replaced by
        ``None``.

        :raises ValueError: if only one of ``start`` / ``end`` is provided.
        :return: the body decoded by ``self.body_handler`` when it supports the
            response headers, otherwise the raw content decoded as UTF-8 text.
        """
        query = {
            'key': self.key,
            'domain': domain,
            'group_subdomain': group_subdomain
        }

        has_start = start is not None
        has_end = end is not None

        # Exactly one bound supplied -> reject
        if has_start != has_end:
            raise ValueError('You must provide both start and end date')

        if has_start:
            query['start'] = start.strftime(self.DATETIME_FORMAT)
            query['end'] = end.strftime(self.DATETIME_FORMAT)
            period = None

        query['period'] = period

        request_headers = {
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics/target',
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            params=query,
            verify=self.verify,
            headers=request_headers,
        )

        response.raise_for_status()

        if not self.body_handler.support(response.headers):
            return response.content.decode('utf-8')

        return self.body_handler(response.content, response.headers['content-type'])


    def resilient_scrape(
        self,
        scrape_config:ScrapeConfig,
        retry_on_errors:Set[Exception]={ScrapflyError},
        retry_on_status_code:Optional[List[int]]=None,
        tries: int = 5,
        delay: int = 20,
    ) -> ScrapeApiResponse:
        """
        Scrape with an exponential backoff retry around `scrape`

        :param scrape_config: ScrapeConfig
        :param retry_on_errors: Set[Exception] - exception classes handed to backoff as the errors to retry on, must be a set
        :param retry_on_status_code: Optional[List[int]] - when set and an upstream http error carries an api response:
               the error is re-raised if its upstream status code is in the list, otherwise the api response is returned instead of raising
        :param tries: int - passed to backoff as `max_tries`
        :param delay: int - passed to backoff as `max_time`
        :return: ScrapeApiResponse
        """
        assert retry_on_errors is not None, 'Retry on error is None'
        assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

        def inner() -> ScrapeApiResponse:
            try:
                return self.scrape(scrape_config=scrape_config)
            except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
                # Without a status filter or an api response there is nothing to decide on
                if retry_on_status_code is None or not e.api_response:
                    raise e

                if e.api_response.upstream_status_code not in retry_on_status_code:
                    return e.api_response

                raise e

        retrying = backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)

        return retrying(inner)()

    def open(self):
        """
        Create and configure the shared HTTP session, does nothing when one is already open

        The session gets the client verify flag, timeouts, api key and the default request headers
        """
        if self.http_session is not None:
            return

        session = self.http_session = Session()
        session.verify = self.verify
        session.timeout = (self.connect_timeout, self.default_read_timeout)
        session.params['key'] = self.key

        default_headers = session.headers
        default_headers['accept-encoding'] = self.body_handler.content_encoding
        default_headers['accept'] = self.body_handler.accept
        default_headers['user-agent'] = self.ua

    def close(self):
        """
        Close the shared HTTP session and forget it

        Does nothing when no session is open, so calling it twice (or without a
        prior `open`) is safe instead of failing on `None.close()`
        """
        if self.http_session is None:
            return

        self.http_session.close()
        self.http_session = None

    def __enter__(self) -> 'ScrapflyClient':
        """
        Enter the context manager: open the HTTP session

        :return: the client itself
        """
        self.open()

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Leave the context manager: close the HTTP session

        Returns None, so an exception raised inside the `with` block is not suppressed
        """
        self.close()

    async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
        """
        Run `scrape` in the client executor and await its result

        :param scrape_config: ScrapeConfig
        :param loop: Optional[AbstractEventLoop] - event loop to use, defaults to the running loop
        :return: ScrapeApiResponse
        """
        event_loop = asyncio.get_running_loop() if loop is None else loop
        pending = event_loop.run_in_executor(self.async_executor, self.scrape, scrape_config)

        return await pending

    async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
        """
        Scrape several configurations concurrently and yield every outcome as it becomes available

        :param scrape_configs: List[ScrapeConfig] - consumed in place: configs are popped from the end of the list
               when they get scheduled, and `raise_on_upstream_error` is set to False on each of them
        :param concurrency: Optional[int] - maximum number of scrapes in flight; None uses `max_concurrency`,
               CONCURRENCY_AUTO uses the `max_concurrency` of the account subscription
        :return: async generator yielding, for each config, either the result of `async_scrape` or the
                 exception it raised (exceptions are yielded, not raised)
        """
        if concurrency is None:
            concurrency = self.max_concurrency
        elif concurrency == self.CONCURRENCY_AUTO:
            # NOTE: account() is a synchronous call made from within the coroutine
            concurrency = self.account()['subscription']['max_concurrency']

        loop = asyncio.get_running_loop()
        processing_tasks = []
        results = []
        processed_tasks = 0
        expected_tasks = len(scrape_configs)

        def scrape_done_callback(task:Task):
            nonlocal processed_tasks

            try:
                # A cancelled task has neither a result nor an exception to report
                if task.cancelled():
                    return

                error = task.exception()
                results.append(task.result() if error is None else error)
            finally:
                processing_tasks.remove(task)
                processed_tasks += 1

        while scrape_configs or results or processing_tasks:
            logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

            # @todo handle backpressure
            free_slots = concurrency - len(processing_tasks)

            while scrape_configs and free_slots > 0:
                scrape_config = scrape_configs.pop()
                scrape_config.raise_on_upstream_error = False

                task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                processing_tasks.append(task)
                task.add_done_callback(scrape_done_callback)
                free_slots -= 1

            # Drain everything that is ready. Iterating the list while popping from it
            # would stop halfway and delay the rest until after the next sleep
            while results:
                yield results.pop()

            await asyncio.sleep(.5)

        logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse:
        """
        Scrape a website
        :param scrape_config: ScrapeConfig
        :param no_raise: bool - if True, a ScrapflyError carrying an api response is not raised, its api response is returned instead
        :return: ScrapeApiResponse

        Every outcome (response or error) is passed to the reporter.

        With no_raise=True, check the api_response.scrape_result.error attribute to handle the error.
        When the error is not None, it has the following structure for example

        'error': {
            'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
            'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds',
            'retryable': False,
            'http_code': 422,
            'links': {
                'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
            }
        }
        """

        try:
            logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
            response = self._http_handler(**self._scrape_request(scrape_config=scrape_config))
            scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config)

            self.reporter.report(scrape_api_response=scrape_api_response)
        except BaseException as e:
            self.reporter.report(error=e)

            recoverable = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

            if not recoverable:
                raise e

            return e.api_response

        return scrape_api_response

    async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse:
        """
        Run `screenshot` in the client executor and await its result

        :param screenshot_config: ScreenshotConfig
        :param loop: Optional[AbstractEventLoop] - event loop to use, defaults to the running loop
        :return: ScreenshotApiResponse
        """
        event_loop = asyncio.get_running_loop() if loop is None else loop
        pending = event_loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)

        return await pending

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse:
        """
        Take a screenshot
        :param screenshot_config: ScreenshotConfig
        :param no_raise: bool - if True, a ScrapflyError carrying an api response is not raised, its api response is returned instead
        :return: ScreenshotApiResponse

        Errors are passed to the reporter before being raised or converted.

        With no_raise=True, check the screenshot_api_response.error attribute to handle the error.
        When the error is not None, it has the following structure for example

        'error': {
            'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
            'message': 'For some reason we were unable to take the screenshot',
            'http_code': 422,
            'links': {
                'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
            }
        }
        """

        try:
            logger.debug('--> %s Screenshoting' % (screenshot_config.url))
            response = self._http_handler(**self._screenshot_request(screenshot_config=screenshot_config))

            return self._handle_screenshot_response(response=response, screenshot_config=screenshot_config)
        except BaseException as e:
            self.reporter.report(error=e)

            recoverable = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

            if not recoverable:
                raise e

            return e.api_response

    async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse:
        """
        Run `extract` in the client executor and await its result

        :param extraction_config: ExtractionConfig
        :param loop: Optional[AbstractEventLoop] - event loop to use, defaults to the running loop
        :return: ExtractionApiResponse
        """
        event_loop = asyncio.get_running_loop() if loop is None else loop
        pending = event_loop.run_in_executor(self.async_executor, self.extract, extraction_config)

        return await pending

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse:
        """
        Extract structured data from text content
        :param extraction_config: ExtractionConfig
        :param no_raise: bool - if True, a ScrapflyError carrying an api response is not raised, its api response is returned instead
        :return: ExtractionApiResponse

        Errors are passed to the reporter before being raised or converted.

        With no_raise=True, check the extraction_api_response.error attribute to handle the error.
        When the error is not None, it has the following structure for example

        'error': {
            'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED',
            'message': 'The content type of the response is not supported for extraction',
            'http_code': 422,
            'links': {
                'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED'
            }
        }
        """

        try:
            logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
            response = self._http_handler(**self._extraction_request(extraction_config=extraction_config))

            return self._handle_extraction_response(response=response, extraction_config=extraction_config)
        except BaseException as e:
            self.reporter.report(error=e)

            recoverable = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

            if not recoverable:
                raise e

            return e.api_response

    def _handle_response(self, response:Response, scrape_config:ScrapeConfig) -> ScrapeApiResponse:
        """
        Turn the raw HTTP response of a scrape into a ScrapeApiResponse and log its outcome

        :param response: Response
        :param scrape_config: ScrapeConfig - its `raise_on_upstream_error` flag is forwarded to `_handle_api_response`
        :return: ScrapeApiResponse
        :raises: UpstreamHttpError, HttpError and ScrapflyError are logged as critical and re-raised unchanged
        """
        try:
            api_response = self._handle_api_response(
                response=response,
                scrape_config=scrape_config,
                raise_on_upstream_error=scrape_config.raise_on_upstream_error
            )

            is_head = scrape_config.method == 'HEAD'

            if is_head:
                # No body was parsed for a HEAD request: summarize from the raw HTTP response
                raw = api_response.response
                summary = (raw.status_code, raw.reason, raw.request.url, 0)
            else:
                scrape_result = api_response.result['result']
                summary = (
                    scrape_result['status_code'],
                    scrape_result['reason'],
                    api_response.result['config']['url'],
                    scrape_result['duration'],
                )

            logger.debug('<-- [%s %s] %s | %ss' % summary)

            if not is_head:
                logger.debug('Log url: %s' % api_response.result['result']['log_url'])

            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            reason = e.message if e.api_response is None else e.api_response.error_message
            logger.critical(reason)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def _handle_screenshot_response(self, response:Response, screenshot_config:ScreenshotConfig) -> ScreenshotApiResponse:
        """
        Turn the raw HTTP response of a screenshot request into a ScreenshotApiResponse

        :param response: Response
        :param screenshot_config: ScreenshotConfig - its `raise_on_upstream_error` flag is forwarded to `_handle_screenshot_api_response`
        :return: ScreenshotApiResponse
        :raises: UpstreamHttpError, HttpError and ScrapflyError are logged as critical and re-raised unchanged
        """
        try:
            return self._handle_screenshot_api_response(
                response=response,
                screenshot_config=screenshot_config,
                raise_on_upstream_error=screenshot_config.raise_on_upstream_error
            )
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            reason = e.message if e.api_response is None else e.api_response.error_message
            logger.critical(reason)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def _handle_extraction_response(self, response:Response, extraction_config:ExtractionConfig) -> ExtractionApiResponse:
        """
        Turn the raw HTTP response of an extraction request into an ExtractionApiResponse

        :param response: Response
        :param extraction_config: ExtractionConfig - its `raise_on_upstream_error` flag is forwarded to `_handle_extraction_api_response`
        :return: ExtractionApiResponse
        :raises: UpstreamHttpError, HttpError and ScrapflyError are logged as critical and re-raised unchanged
        """
        try:
            return self._handle_extraction_api_response(
                response=response,
                extraction_config=extraction_config,
                raise_on_upstream_error=extraction_config.raise_on_upstream_error
            )
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            reason = e.message if e.api_response is None else e.api_response.error_message
            logger.critical(reason)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
        """
        Save the image of a screenshot API response to `<name>.<extension>`
        :param screenshot_api_response: ScreenshotApiResponse
        :param name: str - file name without extension, the extension comes from the response metadata `extension_name`
        :param path: Optional[str] - target directory, created when missing; the current directory is used when not given
        :raises RuntimeError: when the screenshot was not successful or the response holds no image
        """

        if screenshot_api_response.screenshot_success is not True:
            raise RuntimeError('Screenshot was not successful')

        image = screenshot_api_response.image

        if not image:
            raise RuntimeError('Screenshot binary does not exist')

        file_name = '{}.{}'.format(name, screenshot_api_response.metadata['extension_name'])

        if path:
            os.makedirs(path, exist_ok=True)
            target = os.path.join(path, file_name)
        else:
            target = file_name

        # copyfileobj needs a readable stream, raw bytes get wrapped
        source = BytesIO(image) if isinstance(image, bytes) else image

        with open(target, 'wb') as output:
            shutil.copyfileobj(source, output, length=131072)

    def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
        """
        Download a screenshot referenced by a scrape result and store it through `api_response.sink`
        :param api_response: ScrapeApiResponse
        :param name: str - name of the screenshot given in the scrape config, `.jpg` is appended to the file name when missing
        :param path: Optional[str] - forwarded to `api_response.sink`
        :raises RuntimeError: when the scrape result has no screenshots or none with this name
        """

        screenshots = api_response.scrape_result['screenshots']

        if not screenshots:
            raise RuntimeError('Screenshot %s do no exists' % name)

        try:
            screenshot = screenshots[name]
        except KeyError:
            raise RuntimeError('Screenshot %s do no exists' % name)

        screenshot_response = self._http_handler(
            method='GET',
            url=screenshot['url'],
            params={'key': self.key},
            verify=self.verify
        )
        screenshot_response.raise_for_status()

        file_name = name if name.endswith('.jpg') else name + '.jpg'

        api_response.sink(path=path, name=file_name, content=screenshot_response.content)

    def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
        """
        Write the content of a scrape result (or the given content) to a file

        :param api_response: ScrapeApiResponse
        :param content: Optional[Union[str, bytes]] - content to write, defaults to the content of the scrape result; str is encoded as utf-8
        :param path: Optional[str] - directory prepended to the file name (it is not created)
        :param name: Optional[str] - file name, defaults to the last segment of the scraped url; when it has no extension,
               one is derived from the `content-type` response header of the scrape result
        :param file: Optional[Union[TextIO, BytesIO]] - already opened file object to write to; no path is computed in that case and the object is closed after the write
        :return: str - path of the written file, None when `file` was given
        """
        scrape_result = api_response.result['result']
        scrape_config = api_response.result['config']

        file_content = content or scrape_result['content']

        # Turn the content into a readable binary stream before any file gets opened,
        # so unusable content does not leave an empty file and an unclosed handle behind
        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)

        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                try:
                    mime_type = scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                # Drop the mime parameters such as "; charset=utf-8"
                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = scrape_config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            if name == file_extension:
                # The url ends with a slash, no file name could be taken from it: build one from
                # the whole url. Checked on the bare name so it also applies when a path is given
                url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

                if url.endswith('-'):
                    url = url[:-1]

                name = url + file_extension

            file_path = path + '/' + name if path else name

            file = open(file_path, 'wb')

        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)
        return file_path

    def _handle_scrape_large_objects(
        self,
        callback_url:str,
        format: Literal['clob', 'blob']
    ) -> Tuple[Union[BytesIO, str], str]:
        """
        Download a large object from its callback url

        :param callback_url: str - url the object is fetched from, the api key is sent as query parameter
        :param format: Literal['clob', 'blob'] - `clob` is returned as utf-8 decoded text, `blob` as a BytesIO
        :return: Tuple[Union[BytesIO, str], str] - (content, 'text') for clob, (content, 'binary') for blob
        :raises ContentError: when the format is neither `clob` nor `blob`
        """
        if format not in ['clob', 'blob']:
            raise ContentError('Large objects handle can handles format format [blob, clob], given: %s' % format)

        response = self._http_handler(
            method='GET',
            url=callback_url,
            verify=self.verify,
            timeout=(self.connect_timeout, self.default_read_timeout),
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            params={'key': self.key},
        )

        if not self.body_handler.support(headers=response.headers):
            payload = response.content
        else:
            payload = self.body_handler(content=response.content, content_type=response.headers['content-type'])

        if format != 'clob':
            return BytesIO(payload), 'binary'

        return payload.decode('utf-8'), 'text'

    def _handle_api_response(
        self,
        response: Response,
        scrape_config:ScrapeConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScrapeApiResponse:
        """
        Build the ScrapeApiResponse of a raw HTTP response and validate it with `raise_for_result`

        :param response: Response
        :param scrape_config: ScrapeConfig - for a HEAD scrape the body is not parsed and the api result is None
        :param raise_on_upstream_error: Optional[bool] - forwarded to `raise_for_result`
        :return: ScrapeApiResponse
        """
        body = None

        if scrape_config.method != 'HEAD':
            if not self.body_handler.support(headers=response.headers):
                body = response.content.decode('utf-8')
            else:
                body = self.body_handler(content=response.content, content_type=response.headers['content-type'])

        api_response = ScrapeApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            scrape_config=scrape_config,
            large_object_handler=self._handle_scrape_large_objects
        )
        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return api_response

    def _handle_screenshot_api_response(
        self,
        response: Response,
        screenshot_config:ScreenshotConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScreenshotApiResponse:
        """
        Build the ScreenshotApiResponse of a raw HTTP response and validate it with `raise_for_result`

        :param response: Response - when the body handler does not support its headers, the raw content is wrapped as {'result': content}
        :param screenshot_config: ScreenshotConfig
        :param raise_on_upstream_error: Optional[bool] - forwarded to `raise_for_result`
        :return: ScreenshotApiResponse
        """
        if not self.body_handler.support(headers=response.headers):
            body = {'result': response.content}
        else:
            body = self.body_handler(content=response.content, content_type=response.headers['content-type'])

        api_response = ScreenshotApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            screenshot_config=screenshot_config
        )
        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return api_response

    def _handle_extraction_api_response(
        self,
        response: Response,
        extraction_config:ExtractionConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ExtractionApiResponse:
        """
        Build the ExtractionApiResponse of a raw HTTP response and validate it with `raise_for_result`

        :param response: Response - when the body handler does not support its headers, the content is decoded as utf-8 text
        :param extraction_config: ExtractionConfig
        :param raise_on_upstream_error: Optional[bool] - forwarded to `raise_for_result`
        :return: ExtractionApiResponse
        """
        if not self.body_handler.support(headers=response.headers):
            body = response.content.decode('utf-8')
        else:
            body = self.body_handler(content=response.content, content_type=response.headers['content-type'])

        api_response = ExtractionApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            extraction_config=extraction_config
        )
        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return api_response

Class variables

var CONCURRENCY_AUTO
var DATETIME_FORMAT
var DEFAULT_CONNECT_TIMEOUT
var DEFAULT_EXTRACTION_API_READ_TIMEOUT
var DEFAULT_READ_TIMEOUT
var DEFAULT_SCREENSHOT_API_READ_TIMEOUT
var DEFAULT_WEBSCRAPING_API_READ_TIMEOUT
var HOST
var brotli : bool
var connect_timeout : int
var debug : bool
var default_read_timeout : int
var distributed_mode : bool
var extraction_api_read_timeout : int
var host : str
var key : str
var max_concurrency : int
var monitoring_api_read_timeout : int
var read_timeout : int
var reporter : scrapfly.reporter.Reporter
var screenshot_api_read_timeout : int
var verify : bool
var version : str
var web_scraping_api_read_timeout : int

Instance variables

var http
Expand source code
@property
def http(self):
    """Return the HTTP handler the client sends its requests with"""
    handler = self._http_handler

    return handler
var ua : str
Expand source code
@property
def ua(self) -> str:
    """Return the user agent sent by the SDK: its version, the python version, the system name and the machine type"""
    host_info = platform.uname()
    details = (self.version, platform.python_version(), host_info.system, host_info.machine)

    return 'ScrapflySDK/%s (Python %s, %s, %s)' % details

Methods

def account(self) ‑> Union[str, Dict]
Expand source code
def account(self) -> Union[str, Dict]:
    """
    Fetch the account information from the `/account` API endpoint

    :return: the body decoded by the body handler when it supports the response headers,
             otherwise the raw body decoded as utf-8 text
    :raises: whatever `response.raise_for_status()` raises for an error status
    """
    request_headers = {
        'accept-encoding': self.body_handler.content_encoding,
        'accept': self.body_handler.accept,
        'user-agent': self.ua
    }

    response = self._http_handler(
        method='GET',
        url=self.host + '/account',
        params={'key': self.key},
        verify=self.verify,
        headers=request_headers,
    )
    response.raise_for_status()

    # Plain text fallback when the body handler cannot decode this response
    if not self.body_handler.support(response.headers):
        return response.content.decode('utf-8')

    return self.body_handler(response.content, response.headers['content-type'])
async def async_extraction(self, extraction_config: ExtractionConfig, loop: Optional[asyncio.events.AbstractEventLoop] = None) ‑> ExtractionApiResponse
Expand source code
async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse:
    """
    Run `extract` in the client executor and await its result

    :param extraction_config: ExtractionConfig
    :param loop: Optional[AbstractEventLoop] - event loop to use, defaults to the running loop
    :return: ExtractionApiResponse
    """
    event_loop = asyncio.get_running_loop() if loop is None else loop
    pending = event_loop.run_in_executor(self.async_executor, self.extract, extraction_config)

    return await pending
async def async_scrape(self, scrape_config: ScrapeConfig, loop: Optional[asyncio.events.AbstractEventLoop] = None) ‑> ScrapeApiResponse
Expand source code
async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
    """
    Run `scrape` in the client executor and await its result

    :param scrape_config: ScrapeConfig
    :param loop: Optional[AbstractEventLoop] - event loop to use, defaults to the running loop
    :return: ScrapeApiResponse
    """
    event_loop = asyncio.get_running_loop() if loop is None else loop
    pending = event_loop.run_in_executor(self.async_executor, self.scrape, scrape_config)

    return await pending
async def async_screenshot(self, screenshot_config: ScreenshotConfig, loop: Optional[asyncio.events.AbstractEventLoop] = None) ‑> ScreenshotApiResponse
Expand source code
async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse:
    """
    Run `screenshot` in the client executor and await its result

    :param screenshot_config: ScreenshotConfig
    :param loop: Optional[AbstractEventLoop] - event loop to use, defaults to the running loop
    :return: ScreenshotApiResponse
    """
    event_loop = asyncio.get_running_loop() if loop is None else loop
    pending = event_loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)

    return await pending
def close(self)
Expand source code
def close(self):
    """
    Close the shared HTTP session and forget it

    Does nothing when no session is open, so calling it twice (or without a
    prior `open`) is safe instead of failing on `None.close()`
    """
    if self.http_session is None:
        return

    self.http_session.close()
    self.http_session = None
async def concurrent_scrape(self, scrape_configs: List[ScrapeConfig], concurrency: Optional[int] = None)
Expand source code
async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
    """
    Scrape several configurations concurrently and yield every outcome as it becomes available

    :param scrape_configs: List[ScrapeConfig] - consumed in place: configs are popped from the end of the list
           when they get scheduled, and `raise_on_upstream_error` is set to False on each of them
    :param concurrency: Optional[int] - maximum number of scrapes in flight; None uses `max_concurrency`,
           CONCURRENCY_AUTO uses the `max_concurrency` of the account subscription
    :return: async generator yielding, for each config, either the result of `async_scrape` or the
             exception it raised (exceptions are yielded, not raised)
    """
    if concurrency is None:
        concurrency = self.max_concurrency
    elif concurrency == self.CONCURRENCY_AUTO:
        # NOTE: account() is a synchronous call made from within the coroutine
        concurrency = self.account()['subscription']['max_concurrency']

    loop = asyncio.get_running_loop()
    processing_tasks = []
    results = []
    processed_tasks = 0
    expected_tasks = len(scrape_configs)

    def scrape_done_callback(task:Task):
        nonlocal processed_tasks

        try:
            # A cancelled task has neither a result nor an exception to report
            if task.cancelled():
                return

            error = task.exception()
            results.append(task.result() if error is None else error)
        finally:
            processing_tasks.remove(task)
            processed_tasks += 1

    while scrape_configs or results or processing_tasks:
        logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

        # @todo handle backpressure
        free_slots = concurrency - len(processing_tasks)

        while scrape_configs and free_slots > 0:
            scrape_config = scrape_configs.pop()
            scrape_config.raise_on_upstream_error = False

            task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
            processing_tasks.append(task)
            task.add_done_callback(scrape_done_callback)
            free_slots -= 1

        # Drain everything that is ready. Iterating the list while popping from it
        # would stop halfway and delay the rest until after the next sleep
        while results:
            yield results.pop()

        await asyncio.sleep(.5)

    logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))
def extract(self, extraction_config: ExtractionConfig, no_raise: bool = False) ‑> ExtractionApiResponse

Extract structured data from text content :param extraction_config: ExtractionConfig :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration :return: ExtractionApiResponse

If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error. If the error is not None, you will get a structure like the following, for example

'error': { 'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED', 'message': 'The content type of the response is not supported for extraction', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED' } }

Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse:
    """
    Extract structured data from text content
    :param extraction_config: ExtractionConfig
    :param no_raise: bool - if True, a ScrapflyError carrying an api response is not raised, its api response is returned instead
    :return: ExtractionApiResponse

    Errors are passed to the reporter before being raised or converted.

    With no_raise=True, check the extraction_api_response.error attribute to handle the error.
    When the error is not None, it has the following structure for example

    'error': {
        'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED',
        'message': 'The content type of the response is not supported for extraction',
        'http_code': 422,
        'links': {
            'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED'
        }
    }
    """

    try:
        logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
        response = self._http_handler(**self._extraction_request(extraction_config=extraction_config))

        return self._handle_extraction_response(response=response, extraction_config=extraction_config)
    except BaseException as e:
        self.reporter.report(error=e)

        recoverable = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

        if not recoverable:
            raise e

        return e.api_response
def get_monitoring_metrics(self, format: str = 'structured', period: Optional[str] = None, aggregation: Optional[List[Literal['account', 'project', 'target']]] = None)
Expand source code
def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None):
    """
    Query the `/scrape/monitoring/metrics` API endpoint

    :param format: str - forwarded as the `format` query parameter
    :param period: Optional[str] - forwarded as the `period` query parameter, omitted when None
    :param aggregation: Optional[List[MonitoringAggregation]] - joined with commas into the `aggregation` query parameter, omitted when None
    :return: the body decoded by the body handler when it supports the response headers,
             otherwise the raw body decoded as utf-8 text
    :raises: whatever `response.raise_for_status()` raises for an error status
    """
    query = {'key': self.key, 'format': format}

    if period is not None:
        query.update(period=period)

    if aggregation is not None:
        query.update(aggregation=','.join(aggregation))

    response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics',
        params=query,
        timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )
    response.raise_for_status()

    # Plain text fallback when the body handler cannot decode this response
    if not self.body_handler.support(response.headers):
        return response.content.decode('utf-8')

    return self.body_handler(response.content, response.headers['content-type'])
def get_monitoring_target_metrics(self, domain: str, group_subdomain: bool = False, period: Optional[Literal['subscription', 'last7d', 'last24h', 'last1h', 'last5m']] = 'last24h', start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None)
Expand source code
def get_monitoring_target_metrics(
        self,
        domain:str,
        group_subdomain:bool=False,
        period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H,
        start:Optional[datetime.datetime]=None,
        end:Optional[datetime.datetime]=None,
):
    """
    Query the `/scrape/monitoring/metrics/target` endpoint for one domain
    :param domain: str - sent as the `domain` query parameter
    :param group_subdomain: bool - sent as the `group_subdomain` query parameter
    :param period: Optional[MonitoringTargetPeriod] - sent as `period`; replaced by None when an explicit start/end range is given
    :param start: Optional[datetime.datetime] - range start, formatted with `self.DATETIME_FORMAT`
    :param end: Optional[datetime.datetime] - range end, formatted with `self.DATETIME_FORMAT`
    :return: the body handler output when it supports the response headers, otherwise the body decoded as UTF-8

    Raises ValueError when only one of `start` / `end` is provided.
    """
    has_start = start is not None
    has_end = end is not None

    # A custom range needs both bounds
    if has_start != has_end:
        raise ValueError('You must provide both start and end date')

    query = {
        'key': self.key,
        'domain': domain,
        'group_subdomain': group_subdomain
    }

    if has_start and has_end:
        query['start'] = start.strftime(self.DATETIME_FORMAT)
        query['end'] = end.strftime(self.DATETIME_FORMAT)
        # An explicit range takes precedence over the named period
        period = None

    query['period'] = period

    response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics/target',
        timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
        params=query,
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )
    response.raise_for_status()

    if not self.body_handler.support(response.headers):
        return response.content.decode('utf-8')

    return self.body_handler(response.content, response.headers['content-type'])
def open(self)
Expand source code
def open(self):
    """
    Create and configure `self.http_session` if none exists yet; does nothing otherwise.
    """
    if self.http_session is not None:
        return

    # Attach the session first, then configure it through the local alias
    session = self.http_session = Session()
    session.verify = self.verify
    session.timeout = (self.connect_timeout, self.default_read_timeout)
    session.params['key'] = self.key
    session.headers['accept-encoding'] = self.body_handler.content_encoding
    session.headers['accept'] = self.body_handler.accept
    session.headers['user-agent'] = self.ua
def resilient_scrape(self, scrape_config: ScrapeConfig, retry_on_errors: Set[Exception] = {<class 'scrapfly.errors.ScrapflyError'>}, retry_on_status_code: Optional[List[int]] = None, tries: int = 5, delay: int = 20) ‑> ScrapeApiResponse
Expand source code
def resilient_scrape(
    self,
    scrape_config:ScrapeConfig,
    retry_on_errors:Set[Exception]={ScrapflyError},
    retry_on_status_code:Optional[List[int]]=None,
    tries: int = 5,
    delay: int = 20,
) -> ScrapeApiResponse:
    """
    Run `self.scrape` with exponential backoff retries
    :param scrape_config: ScrapeConfig - forwarded to `self.scrape`
    :param retry_on_errors: Set[Exception] - exception classes that trigger a retry; must be a set
    :param retry_on_status_code: Optional[List[int]] - upstream status codes that should be retried; for any other upstream status the api response attached to the error is returned instead of raising
    :param tries: int - passed to backoff as `max_tries`
    :param delay: int - passed to backoff as `max_time`
    :return: ScrapeApiResponse
    """
    assert retry_on_errors is not None, 'Retry on error is None'
    assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

    @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)
    def attempt() -> ScrapeApiResponse:
        try:
            return self.scrape(scrape_config=scrape_config)
        except (UpstreamHttpClientError, UpstreamHttpServerError) as error:
            # Without a status filter or an attached response, let backoff decide
            if retry_on_status_code is None or not error.api_response:
                raise error

            # Listed status code: re-raise so the error can be retried
            if error.api_response.upstream_status_code in retry_on_status_code:
                raise error

            return error.api_response

    return attempt()
def save_scrape_screenshot(self, api_response: ScrapeApiResponse, name: str, path: Optional[str] = None)

Save a screenshot from a scrape result :param api_response: ScrapeApiResponse :param name: str - name of the screenshot given in the scrape config :param path: Optional[str]

Expand source code
def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
    """
    Download a screenshot referenced by a scrape result and store it through `api_response.sink`
    :param api_response: ScrapeApiResponse
    :param name: str - name of the screenshot given in the scrape config
    :param path: Optional[str] - forwarded to `api_response.sink`

    Raises RuntimeError when the scrape result holds no screenshots or none under `name`.
    """
    screenshots = api_response.scrape_result['screenshots']

    if not screenshots:
        raise RuntimeError('Screenshot %s do no exists' % name)

    try:
        screenshot = screenshots[name]
    except KeyError:
        raise RuntimeError('Screenshot %s do no exists' % name)

    screenshot_response = self._http_handler(
        method='GET',
        url=screenshot['url'],
        params={'key': self.key},
        verify=self.verify
    )
    screenshot_response.raise_for_status()

    # The file is always stored with a .jpg suffix
    file_name = name if name.endswith('.jpg') else name + '.jpg'

    api_response.sink(path=path, name=file_name, content=screenshot_response.content)
def save_screenshot(self, screenshot_api_response: ScreenshotApiResponse, name: str, path: Optional[str] = None)

Save a screenshot from a screenshot API response :param screenshot_api_response: ScreenshotApiResponse :param name: str - name of the screenshot to save as :param path: Optional[str]

Expand source code
def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
    """
    Write the image of a screenshot API response to disk
    :param screenshot_api_response: ScreenshotApiResponse
    :param name: str - file name without extension; the extension comes from the response metadata
    :param path: Optional[str] - target directory, created when missing; defaults to the current directory

    Raises RuntimeError when the screenshot was not successful or carries no image.
    """
    if screenshot_api_response.screenshot_success is not True:
        raise RuntimeError('Screenshot was not successful')

    image = screenshot_api_response.image

    if not image:
        raise RuntimeError('Screenshot binary does not exist')

    extension_name = screenshot_api_response.metadata['extension_name']
    file_name = f'{name}.{extension_name}'

    if path:
        os.makedirs(path, exist_ok=True)
        file_path = os.path.join(path, file_name)
    else:
        file_path = file_name

    # copyfileobj needs a readable stream, so wrap raw bytes
    stream = BytesIO(image) if isinstance(image, bytes) else image

    with open(file_path, 'wb') as target:
        shutil.copyfileobj(stream, target, length=131072)
def scrape(self, scrape_config: ScrapeConfig, no_raise: bool = False) ‑> ScrapeApiResponse

Scrape a website :param scrape_config: ScrapeConfig :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration :return: ScrapeApiResponse

If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED', 'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds', 'retryable': False, 'http_code': 422, 'links': { 'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED' } }

Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse:
    """
    Scrape a website
    :param scrape_config: ScrapeConfig
    :param no_raise: bool - if True, a ScrapflyError that carries an api response is not raised; that api response is returned instead
    :return: ScrapeApiResponse

    The call is retried with exponential backoff (up to 5 tries) on NetworkError.
    Both the successful response and any raised error are passed to `self.reporter.report`.

    If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error.
    If the error is not none, you will get the following structure for example

    'error': {
        'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
        'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds',
        'retryable': False,
        'http_code': 422,
        'links': {
            'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
        }
    }
    """

    try:
        logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
        http_response = self._http_handler(**self._scrape_request(scrape_config=scrape_config))
        api_response = self._handle_response(response=http_response, scrape_config=scrape_config)

        self.reporter.report(scrape_api_response=api_response)

        return api_response
    except BaseException as error:
        self.reporter.report(error=error)

        # Only Scrapfly errors that carry a response can be returned instead of raised
        returnable = no_raise and isinstance(error, ScrapflyError) and error.api_response is not None

        if not returnable:
            raise error

        return error.api_response
def screenshot(self, screenshot_config: ScreenshotConfig, no_raise: bool = False) ‑> ScreenshotApiResponse

Take a screenshot :param screenshot_config: ScreenshotConfig :param no_raise: bool - if True, do not raise exception on error while the screenshot api response is a ScrapflyError for seamless integration :return: ScreenshotApiResponse

If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT', 'message': 'For some reason we were unable to take the screenshot', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT' } }

Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse:
    """
    Take a screenshot
    :param screenshot_config: ScreenshotConfig
    :param no_raise: bool - if True, a ScrapflyError that carries an api response is not raised; that api response is returned instead
    :return: ScreenshotApiResponse

    The call is retried with exponential backoff (up to 5 tries) on NetworkError.
    Any raised error is passed to `self.reporter.report`.

    If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error.
    If the error is not none, you will get the following structure for example

    'error': {
        'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
        'message': 'For some reason we were unable to take the screenshot',
        'http_code': 422,
        'links': {
            'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
        }
    }
    """

    try:
        logger.debug('--> %s Screenshoting' % (screenshot_config.url))
        http_response = self._http_handler(**self._screenshot_request(screenshot_config=screenshot_config))
        return self._handle_screenshot_response(response=http_response, screenshot_config=screenshot_config)
    except BaseException as error:
        self.reporter.report(error=error)

        # Only Scrapfly errors that carry a response can be returned instead of raised
        returnable = no_raise and isinstance(error, ScrapflyError) and error.api_response is not None

        if not returnable:
            raise error

        return error.api_response
def sink(self, api_response: ScrapeApiResponse, content: Union[str, bytes, ForwardRef(None)] = None, path: Optional[str] = None, name: Optional[str] = None, file: Union[TextIO, _io.BytesIO, ForwardRef(None)] = None) ‑> str
Expand source code
def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
    """
    Write scraped content (or explicitly given content) to a file
    :param api_response: ScrapeApiResponse - its `result['result']` and `result['config']` supply the default content, headers and url
    :param content: Optional[Union[str, bytes]] - content to write; falls back to the scraped content when empty
    :param path: Optional[str] - directory prefix joined to the file name with '/'
    :param name: Optional[str] - file name; derived from the last url segment when empty
    :param file: Optional[Union[TextIO, BytesIO]] - already opened target; when given, no path is computed
    :return: the path of the file that was opened here, or None when `file` was supplied by the caller
    """
    scrape_result = api_response.result['result']
    scrape_config = api_response.result['config']

    # Explicit content wins; any falsy value falls back to the scraped content
    file_content = content or scrape_result['content']
    file_path = None
    file_extension = None

    if name:
        name_parts = name.split('.')
        if len(name_parts) > 1:
            # NOTE(review): extension taken from the name has no leading dot,
            # unlike the mime-derived one below
            file_extension = name_parts[-1]

    if not file:
        if file_extension is None:
            # No extension in the name: derive one from the response content-type
            try:
                mime_type = scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            # Drop parameters such as "; charset=utf-8"
            if ';' in mime_type:
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            # Default name: last path segment of the scraped url (may be empty)
            name = scrape_config['url'].split('/')[-1]

        # Appended only when the extension text occurs nowhere in the name
        if name.find(file_extension) == -1:
            name += file_extension

        file_path = path + '/' + name if path else name

        # The name was empty, so the path is only the extension: build a name from the url
        if file_path == file_extension:
            url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

            if url[-1] == '-':
                url = url[:-1]

            url += file_extension

            file_path = url

        file = open(file_path, 'wb')

    # Normalise str / bytes content to a stream for copyfileobj
    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)
    # The with block closes the target, including a caller-supplied `file`
    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    # NOTE(review): file_path is still None here when `file` was supplied by the caller
    logger.info('file %s created' % file_path)
    return file_path
class ScrapflyError (message: str, code: str, http_status_code: int, resource: Optional[str] = None, is_retryable: bool = False, retry_delay: Optional[int] = None, retry_times: Optional[int] = None, documentation_url: Optional[str] = None, api_response: Optional[ForwardRef('ApiResponse')] = None)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyError(Exception):
    """
    Base exception of the package.

    Stores the error message, code, HTTP status code, optional resource,
    retry hints, documentation url and the related api response as attributes.
    `args` is `(message, str(code))`.
    """

    KIND_HTTP_BAD_RESPONSE = 'HTTP_BAD_RESPONSE'
    KIND_SCRAPFLY_ERROR = 'SCRAPFLY_ERROR'

    RESOURCE_PROXY = 'PROXY'
    RESOURCE_THROTTLE = 'THROTTLE'
    RESOURCE_SCRAPE = 'SCRAPE'
    RESOURCE_ASP = 'ASP'
    RESOURCE_SCHEDULE = 'SCHEDULE'
    RESOURCE_WEBHOOK = 'WEBHOOK'
    RESOURCE_SESSION = 'SESSION'

    def __init__(
        self,
        message: str,
        code: str,
        http_status_code: int,
        resource: Optional[str]=None,
        is_retryable: bool = False,
        retry_delay: Optional[int] = None,
        retry_times: Optional[int] = None,
        documentation_url: Optional[str] = None,
        api_response: Optional['ApiResponse'] = None
    ):
        # What happened
        self.message = message
        self.code = code
        self.http_status_code = http_status_code
        self.resource = resource

        # Retry hints
        self.is_retryable = is_retryable
        self.retry_delay = retry_delay
        self.retry_times = retry_times

        # Extra context
        self.documentation_url = documentation_url
        self.api_response = api_response

        super().__init__(self.message, str(self.code))

    def __str__(self):
        """Return the message, followed by the documentation link when one is set."""
        if self.documentation_url is None:
            return self.message

        return self.message + '. Learn more: %s' % self.documentation_url

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

  • scrapfly.errors.ExtraUsageForbidden
  • scrapfly.errors.HttpError

Class variables

var KIND_HTTP_BAD_RESPONSE
var KIND_SCRAPFLY_ERROR
var RESOURCE_ASP
var RESOURCE_PROXY
var RESOURCE_SCHEDULE
var RESOURCE_SCRAPE
var RESOURCE_SESSION
var RESOURCE_THROTTLE
var RESOURCE_WEBHOOK
class ScrapflyProxyError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyProxyError(ScraperAPIError):
    """
    Subclass of ScraperAPIError that adds no attributes or behaviour of its own.

    NOTE(review): presumably raised for errors of the PROXY resource - confirm against the error factory.
    """
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyScheduleError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyScheduleError(ScraperAPIError):
    """
    Subclass of ScraperAPIError that adds no attributes or behaviour of its own.

    NOTE(review): presumably raised for errors of the SCHEDULE resource - confirm against the error factory.
    """
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyScrapeError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyScrapeError(ScraperAPIError):
    """
    Subclass of ScraperAPIError that adds no attributes or behaviour of its own.

    NOTE(review): presumably raised for errors of the SCRAPE resource - confirm against the error factory.
    """
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflySessionError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflySessionError(ScraperAPIError):
    """
    Subclass of ScraperAPIError that adds no attributes or behaviour of its own.

    NOTE(review): presumably raised for errors of the SESSION resource - confirm against the error factory.
    """
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyThrottleError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyThrottleError(ScraperAPIError):
    """
    Subclass of ScraperAPIError that adds no attributes or behaviour of its own.

    NOTE(review): presumably raised for errors of the THROTTLE resource - confirm against the error factory.
    """
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyWebhookError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyWebhookError(ScraperAPIError):
    """
    Subclass of ScraperAPIError that adds no attributes or behaviour of its own.

    NOTE(review): presumably raised for errors of the WEBHOOK resource - confirm against the error factory.
    """
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScreenshotAPIError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScreenshotAPIError(HttpError):
    """
    Subclass of HttpError that adds no attributes or behaviour of its own.

    Used as the default `error_class` of `ScreenshotApiResponse.raise_for_result`.
    """
    pass

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScreenshotApiResponse (request: requests.models.Request, response: requests.models.Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None)
Expand source code
class ScreenshotApiResponse(ApiResponse):
    """
    Api response wrapper for the screenshot API.

    `result` holds whatever `handle_api_result` returns for the given `api_result`;
    the image is read from its 'result' entry.
    """

    def __init__(self, request: Request, response: Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None):
        super().__init__(request, response)
        self.screenshot_config = screenshot_config
        self.result = self.handle_api_result(api_result)

    @property
    def image(self) -> Optional[str]:
        """The 'result' entry of `self.result`, or '' when it is missing or None."""
        binary = self.result.get('result', None)
        return '' if binary is None else binary

    @property
    def metadata(self) -> Optional[Dict]:
        """Extension name and upstream details read from the response headers; {} when there is no image."""
        if not self.image:
            return {}

        headers = self.response.headers
        content_type = headers.get('content-type')
        # Keep what follows the first '/', without any ';' parameters
        subtype_start = content_type.find('/') + 1
        extension_name = content_type[subtype_start:].split(';')[0]

        return {
            'extension_name': extension_name,
            'upstream-status-code': headers.get('X-Scrapfly-Upstream-Http-Code'),
            'upstream-url': headers.get('X-Scrapfly-Upstream-Url')
        }

    @property
    def screenshot_success(self) -> bool:
        """True when an image is present."""
        return bool(self.image)

    @property
    def error(self) -> Optional[Dict]:
        """`self.result` when there is no image and the screenshot failed, otherwise None."""
        if not self.image and self.screenshot_success is False:
            return self.result

        return None

    def _is_api_error(self, api_result: Dict) -> bool:
        # A missing result or one carrying an 'error_id' counts as an error
        return api_result is None or 'error_id' in api_result

    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        """Wrap error results in a FrozenDict; return any other result unchanged."""
        failed = self._is_api_error(api_result=api_result)

        if failed is True:
            return FrozenDict(api_result)

        return api_result

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
        """Delegate to the parent implementation with ScreenshotAPIError as the default error class."""
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Ancestors

Instance variables

var error : Optional[Dict]
Expand source code
@property
def error(self) -> Optional[Dict]:
    """`self.result` when there is no image and the screenshot failed, otherwise None."""
    if not self.image and self.screenshot_success is False:
        return self.result

    return None
var image : Optional[str]
Expand source code
@property
def image(self) -> Optional[str]:
    """The 'result' entry of `self.result`, or '' when it is missing or None."""
    binary = self.result.get('result', None)
    return '' if binary is None else binary
var metadata : Optional[Dict]
Expand source code
@property
def metadata(self) -> Optional[Dict]:
    """Extension name and upstream details read from the response headers; {} when there is no image."""
    if not self.image:
        return {}

    headers = self.response.headers
    content_type = headers.get('content-type')
    # Keep what follows the first '/', without any ';' parameters
    subtype_start = content_type.find('/') + 1
    extension_name = content_type[subtype_start:].split(';')[0]

    return {
        'extension_name': extension_name,
        'upstream-status-code': headers.get('X-Scrapfly-Upstream-Http-Code'),
        'upstream-url': headers.get('X-Scrapfly-Upstream-Url')
    }
var screenshot_success : bool
Expand source code
@property
def screenshot_success(self) -> bool:
    """True when an image is present."""
    return bool(self.image)

Methods

def handle_api_result(self, api_result: bytes) ‑> FrozenDict
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict:
    """Wrap error results in a FrozenDict; return any other result unchanged."""
    failed = self._is_api_error(api_result=api_result)

    if failed is True:
        return FrozenDict(api_result)

    return api_result
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ScreenshotAPIError)
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
    # Pure delegation to the parent implementation; the only difference is
    # the default error_class, which is ScreenshotAPIError here.
    super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Inherited members

class ScreenshotConfig (url: str, format: Optional[Format] = None, capture: Optional[str] = None, resolution: Optional[str] = None, country: Optional[str] = None, timeout: Optional[int] = None, rendering_wait: Optional[int] = None, wait_for_selector: Optional[str] = None, options: Optional[List[Options]] = None, auto_scroll: Optional[bool] = None, js: Optional[str] = None, cache: Optional[bool] = None, cache_ttl: Optional[bool] = None, cache_clear: Optional[bool] = None, webhook: Optional[str] = None, raise_on_upstream_error: bool = True)
Expand source code
class ScreenshotConfig(BaseApiConfig):
    """
    Options of a screenshot API call.

    The constructor stores every option as given (options are coerced to
    `Options` members); `to_api_params` turns them into the query parameters
    of the request.
    """

    url: str
    format: Optional[Format] = None
    capture: Optional[str] = None
    resolution: Optional[str] = None
    country: Optional[str] = None
    timeout: Optional[int] = None # in milliseconds
    rendering_wait: Optional[int] = None # in milliseconds
    wait_for_selector: Optional[str] = None
    options: Optional[List[Options]] = None
    auto_scroll: Optional[bool] = None
    js: Optional[str] = None
    cache: Optional[bool] = None
    cache_ttl: Optional[int] = None # cache time to live; sent as-is
    cache_clear: Optional[bool] = None
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    def __init__(
        self,
        url: str,
        format: Optional[Format] = None,
        capture: Optional[str] = None,
        resolution: Optional[str] = None,
        country: Optional[str] = None,
        timeout: Optional[int] = None, # in milliseconds
        rendering_wait: Optional[int] = None, # in milliseconds
        wait_for_selector: Optional[str] = None,
        options: Optional[List[Options]] = None,
        auto_scroll: Optional[bool] = None,
        js: Optional[str] = None,
        cache: Optional[bool] = None,
        cache_ttl: Optional[int] = None,
        cache_clear: Optional[bool] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True
    ):
        assert(type(url) is str)

        self.url = url
        # Per-config API key; when None, to_api_params uses the key it is given
        self.key = None
        self.format = format
        self.capture = capture
        self.resolution = resolution
        self.country = country
        self.timeout = timeout
        self.rendering_wait = rendering_wait
        self.wait_for_selector = wait_for_selector
        self.options = [Options(flag) for flag in options] if options else None
        self.auto_scroll = auto_scroll
        self.js = js
        self.cache = cache
        self.cache_ttl = cache_ttl
        self.cache_clear = cache_clear
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

    def to_api_params(self, key:str) -> Dict:
        """
        Build the query parameters of the screenshot request
        :param key: str - API key used when the config has no key of its own
        :return: Dict - only the options that are set are included

        `cache_ttl` and `cache_clear` are only sent when `cache` is set;
        otherwise a warning is logged for each of them that is set.
        """
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.format:
            params['format'] = Format(self.format).value

        if self.capture:
            params['capture'] = self.capture

        if self.resolution:
            params['resolution'] = self.resolution

        if self.country is not None:
            params['country'] = self.country

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.rendering_wait is not None:
            params['rendering_wait'] = self.rendering_wait

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.options is not None:
            params["options"] = ",".join(flag.value for flag in self.options)

        if self.auto_scroll is not None:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

        if self.js:
            # Scripts are sent URL-safe base64 encoded
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

        if self.cache is not None:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_ttl is not None:
                if isinstance(self.cache_ttl, bool):
                    # Kept for callers that followed the former bool annotation
                    params['cache_ttl'] = self._bool_to_http(self.cache_ttl)
                else:
                    # A TTL is a number: send it unchanged instead of
                    # converting it with the boolean helper
                    params['cache_ttl'] = self.cache_ttl

            if self.cache_clear is not None:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)

        else:
            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

            if self.cache_clear is not None:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

        if self.webhook is not None:
            params['webhook_name'] = self.webhook

        return params

Ancestors

Class variables

var auto_scroll : Optional[bool]
var cache : Optional[bool]
var cache_clear : Optional[bool]
var cache_ttl : Optional[bool]
var capture : Optional[str]
var country : Optional[str]
var format : Optional[Format]
var js : Optional[str]
var options : Optional[List[Options]]
var raise_on_upstream_error : bool
var rendering_wait : Optional[int]
var resolution : Optional[str]
var timeout : Optional[int]
var url : str
var wait_for_selector : Optional[str]
var webhook : Optional[str]

Methods

def to_api_params(self, key: str) ‑> Dict
Expand source code
def to_api_params(self, key:str) -> Dict:
    """
    Build the query parameters of the screenshot request
    :param key: str - API key used when the config has no key of its own
    :return: Dict - only the options that are set are included

    `cache_ttl` and `cache_clear` are only sent when `cache` is set;
    otherwise a warning is logged for each of them that is set.
    """
    params = {
        'key': self.key or key,
        'url': self.url
    }

    if self.format:
        params['format'] = Format(self.format).value

    if self.capture:
        params['capture'] = self.capture

    if self.resolution:
        params['resolution'] = self.resolution

    if self.country is not None:
        params['country'] = self.country

    if self.timeout is not None:
        params['timeout'] = self.timeout

    if self.rendering_wait is not None:
        params['rendering_wait'] = self.rendering_wait

    if self.wait_for_selector is not None:
        params['wait_for_selector'] = self.wait_for_selector

    if self.options is not None:
        params["options"] = ",".join(flag.value for flag in self.options)

    if self.auto_scroll is not None:
        params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

    if self.js:
        # Scripts are sent URL-safe base64 encoded
        params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

    if self.cache is not None:
        params['cache'] = self._bool_to_http(self.cache)

        if self.cache_ttl is not None:
            if isinstance(self.cache_ttl, bool):
                # Kept for callers that followed the former bool annotation
                params['cache_ttl'] = self._bool_to_http(self.cache_ttl)
            else:
                # A TTL is a number: send it unchanged instead of
                # converting it with the boolean helper
                params['cache_ttl'] = self.cache_ttl

        if self.cache_clear is not None:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)

    else:
        if self.cache_ttl is not None:
            logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

        if self.cache_clear is not None:
            logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

    if self.webhook is not None:
        params['webhook_name'] = self.webhook

    return params
class UpstreamHttpClientError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class UpstreamHttpClientError(UpstreamHttpError):
    """
    Subclass of UpstreamHttpError that adds no attributes or behaviour of its own.

    NOTE(review): presumably raised for upstream 4xx responses - confirm against the error factory.
    """
    pass

Ancestors

  • scrapfly.errors.UpstreamHttpError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

class UpstreamHttpError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class UpstreamHttpError(HttpError):
    """
    Subclass of HttpError that adds no attributes or behaviour of its own.

    Base class of UpstreamHttpClientError.
    """
    pass

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

class UpstreamHttpServerError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class UpstreamHttpServerError(UpstreamHttpClientError):
    """
    Subclass of UpstreamHttpClientError that adds no attributes or behaviour of its own.

    Because of this inheritance, `except UpstreamHttpClientError` also catches this error.
    NOTE(review): confirm that deriving from the client error (rather than UpstreamHttpError) is intentional.
    """
    pass

Ancestors