Package scrapfly

__version__ = '0.8.15'

from typing import Tuple
from .errors import ScrapflyError
from .errors import ScrapflyAspError
from .errors import ScrapflyProxyError
from .errors import ScrapflyScheduleError
from .errors import ScrapflyScrapeError
from .errors import ScrapflySessionError
from .errors import ScrapflyThrottleError
from .errors import ScrapflyWebhookError
from .errors import EncoderError
from .errors import ErrorFactory
from .errors import HttpError
from .errors import UpstreamHttpError
from .errors import UpstreamHttpClientError
from .errors import UpstreamHttpServerError
from .errors import ApiHttpClientError
from .errors import ApiHttpServerError
from .api_response import ScrapeApiResponse, ResponseBodyHandler
from .client import ScrapflyClient
from .scrape_config import ScrapeConfig

__all__:Tuple[str, ...] = (
    'ScrapflyError',
    'ScrapflyAspError',
    'ScrapflyProxyError',
    'ScrapflyScheduleError',
    'ScrapflyScrapeError',
    'ScrapflySessionError',
    'ScrapflyThrottleError',
    'ScrapflyWebhookError',
    'UpstreamHttpError',
    'UpstreamHttpClientError',
    'UpstreamHttpServerError',
    'ApiHttpClientError',
    'ApiHttpServerError',
    'EncoderError',
    'ScrapeApiResponse',
    'ErrorFactory',
    'HttpError',
    'ScrapflyClient',
    'ResponseBodyHandler',
    'ScrapeConfig'
)
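
For orientation, here is a minimal, hedged usage sketch of the exported API (the API key and URL are placeholders; scrape() is the usual entry point on ScrapflyClient):

from scrapfly import ScrapeConfig, ScrapflyClient, ScrapflyError

client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key

try:
    # returns a ScrapeApiResponse (documented below) when the API call succeeds
    api_response = client.scrape(ScrapeConfig(url='https://httpbin.org/html'))
    print(api_response.content)
except ScrapflyError as e:  # base class of every error exported above
    print(e)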

Sub-modules

scrapfly.api_response
scrapfly.client
scrapfly.errors
scrapfly.frozen_dict
scrapfly.polyfill
scrapfly.reporter
scrapfly.scrape_config
scrapfly.scrapy
scrapfly.webhook

Classes

class ApiHttpClientError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

class ApiHttpClientError(HttpError):
    pass

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

  • ApiHttpServerError
  • scrapfly.errors.BadApiKeyError
  • scrapfly.errors.PaymentRequired
  • scrapfly.errors.TooManyRequest
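
Because these subclasses all derive from ApiHttpClientError, a broad handler catches them too. A hedged sketch (client and config as in the package example above); note that ApiHttpServerError must be caught first since it subclasses ApiHttpClientError:

try:
    client.scrape(ScrapeConfig(url='https://httpbin.org/html'))
except ApiHttpServerError:
    pass  # 5xx replies from the Scrapfly API itself
except ApiHttpClientError:
    pass  # also catches BadApiKeyError, PaymentRequired and TooManyRequest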
class ApiHttpServerError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

class ApiHttpServerError(ApiHttpClientError):
    pass

Ancestors

  • ApiHttpClientError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

class EncoderError (content: str)

Common base class for all exceptions

class EncoderError(BaseException):

    def __init__(self, content:str):
        self.content = content
        super().__init__()

    def __str__(self) -> str:
        return self.content

    def __repr__(self):
        return "Invalid payload: %s" % self.content

Ancestors

  • builtins.BaseException
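
A small illustration of the two string forms (the payload value is a placeholder):

from scrapfly import EncoderError

try:
    raise EncoderError(content='{"broken json')
except EncoderError as e:
    print(str(e))   # {"broken json
    print(repr(e))  # Invalid payload: {"broken json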
class ErrorFactory
class ErrorFactory:
    RESOURCE_TO_ERROR = {
        ScrapflyError.RESOURCE_SCRAPE: ScrapflyScrapeError,
        ScrapflyError.RESOURCE_WEBHOOK: ScrapflyWebhookError,
        ScrapflyError.RESOURCE_PROXY: ScrapflyProxyError,
        ScrapflyError.RESOURCE_SCHEDULE: ScrapflyScheduleError,
        ScrapflyError.RESOURCE_ASP: ScrapflyAspError,
        ScrapflyError.RESOURCE_SESSION: ScrapflySessionError
    }

    # Notable HTTP errors have their own class for convenience.
    # Only applicable to generic API errors.
    HTTP_STATUS_TO_ERROR = {
        401: BadApiKeyError,
        402: PaymentRequired,
        429: TooManyRequest
    }

    @staticmethod
    def _get_resource(code: str) -> Optional[str]:

        if isinstance(code, str) and '::' in code:
            _, resource, _ = code.split('::')
            return resource

        return None

    @staticmethod
    def create(api_response: 'ScrapeApiResponse'):
        is_retryable = False
        kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
        http_code = api_response.status_code
        retry_delay = 5
        retry_times = 3
        description = None
        error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
        code = api_response.error['code']

        if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
            http_code = api_response.scrape_result['status_code']

        if 'description' in api_response.error:
            description = api_response.error['description']

        message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

        if 'doc_url' in api_response.error:
            error_url = api_response.error['doc_url']

        if 'retryable' in api_response.error:
            is_retryable = api_response.error['retryable']

        resource = ErrorFactory._get_resource(code=code)

        if is_retryable is True:
            if 'Retry-After' in api_response.headers:
                retry_delay = int(api_response.headers['Retry-After'])

        message = '%s: %s' % (message, description) if description else message

        if retry_delay is not None and is_retryable is True:
            message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

        args = {
            'message': message,
            'code': code,
            'http_status_code': http_code,
            'is_retryable': is_retryable,
            'api_response': api_response,
            'resource': resource,
            'retry_delay': retry_delay,
            'retry_times': retry_times,
            'documentation_url': error_url,
            'request': api_response.request,
            'response': api_response.response
        }

        if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
            if http_code >= 500:
                return ApiHttpServerError(**args)

            is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

            if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
                return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

            if is_scraper_api_error:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ApiHttpClientError(**args)

        elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
            if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
                if http_code >= 500:
                    return UpstreamHttpServerError(**args)

                if http_code >= 400:
                    return UpstreamHttpClientError(**args)

            if resource in ErrorFactory.RESOURCE_TO_ERROR:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ScrapflyError(**args)
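
A rough sketch of how an error code string is routed above; the code value is hypothetical and the RESOURCE_* constant values are assumed to match the middle segment of the code:

# error codes follow the KIND::RESOURCE::NAME convention parsed by _get_resource()
code = 'ERR::ASP::SHIELD_ERROR'    # hypothetical error code
_, resource, _ = code.split('::')  # -> 'ASP'

# assuming ScrapflyError.RESOURCE_ASP == 'ASP', create() would pick:
# ErrorFactory.RESOURCE_TO_ERROR[resource] -> ScrapflyAspError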

Class variables

var HTTP_STATUS_TO_ERROR
var RESOURCE_TO_ERROR

Static methods

def create(api_response: ScrapeApiResponse)
class HttpError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

class HttpError(ScrapflyError):

    def __init__(self, request:Request, response:Optional[Response]=None, **kwargs):
        self.request = request
        self.response = response
        super().__init__(**kwargs)

    def __str__(self) -> str:
        if isinstance(self, UpstreamHttpError):
            text = f"Target website responded with {self.api_response.scrape_result['status_code']} - {self.api_response.scrape_result['reason']}"
        else:
            text = f"{self.response.status_code} - {self.response.reason}"

            if isinstance(self, (ApiHttpClientError, ApiHttpServerError)):
                try:
                    text += self.response.content.decode('utf-8')
                except UnicodeError:
                    raise EncoderError(content=base64.b64encode(self.response.content).decode('utf-8'))
            elif isinstance(self, ScraperAPIError):
                text += f" | {self.api_response.error['code']} - {self.api_response.error['message']} - {self.api_response.error['links']}"

        return text
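
In practice these branches matter when calling ScrapeApiResponse.raise_for_result() (defined below). A hedged sketch, with api_response taken from a previous scrape() call:

try:
    api_response.raise_for_result()
except UpstreamHttpError as e:
    # the target website answered with an error status; str(e) reads scrape_result
    print(e)
except (ApiHttpClientError, ApiHttpServerError) as e:
    # the Scrapfly API call itself failed; str(e) appends the API response body
    print(e)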

Ancestors

  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

  • ApiHttpClientError
  • scrapfly.errors.QuotaLimitReached
  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.TooManyConcurrentRequest
  • scrapfly.errors.UpstreamHttpError
class ResponseBodyHandler (use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None)
class ResponseBodyHandler:

    SUPPORTED_COMPRESSION = ['gzip', 'deflate']
    SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']

    class JSONDateTimeDecoder(JSONDecoder):
        def __init__(self, *args, **kargs):
            JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs)

    # Brotli underperforms gzip at comparable compression levels, and higher levels
    # are very CPU-intensive, so the trade-off is not worth it for most usage.
    def __init__(self, use_brotli:bool=False, signing_secrets:Optional[Tuple[str]]=None):
        if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
            try:
                try:
                    import brotlicffi as brotli
                    self.SUPPORTED_COMPRESSION.insert(0, 'br')
                except ImportError:
                    import brotli
                    self.SUPPORTED_COMPRESSION.insert(0, 'br')
            except ImportError:
                pass

        self.content_encoding:str = ', '.join(self.SUPPORTED_COMPRESSION)
        self._signing_secret:Optional[Tuple[str]] = None

        if signing_secrets:
            _secrets = set()

            for signing_secret in signing_secrets:
                _secrets.add(binascii.unhexlify(signing_secret))

            self._signing_secret = tuple(_secrets)

        try:  # automatically use msgpack if available https://msgpack.org/
            import msgpack
            self.accept = 'application/msgpack;charset=utf-8'
            self.content_type = 'application/msgpack;charset=utf-8'
            self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
        except ImportError:
            self.accept = 'application/json;charset=utf-8'
            self.content_type = 'application/json;charset=utf-8'
            self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)

    def support(self, headers:Dict) -> bool:
        if 'content-type' not in headers:
            return False

        for content_type in self.SUPPORTED_CONTENT_TYPES:
            if headers['content-type'].find(content_type) != -1:
                return True

        return False

    def verify(self, message:bytes, signature:str) -> bool:
        for signing_secret in self._signing_secret:
            if hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper() == signature:
                return True

        return False

    def read(self, content: bytes, content_encoding:str, content_type:str, signature:Optional[str]) -> Dict:
        if content_encoding == 'gzip':
            import gzip
            content = gzip.decompress(content)
        elif content_encoding == 'deflate':
            import zlib
            content = zlib.decompress(content)
        elif content_encoding in ('br', 'brotli'):
            import brotli
            content = brotli.decompress(content)

        if self._signing_secret is not None and signature is not None:
            if not self.verify(content, signature):
                raise WebhookSignatureMissMatch()

        if content_type.startswith('application/json'):
            content = loads(content, cls=self.JSONDateTimeDecoder)
        elif content_type.startswith('application/msgpack'):
            import msgpack
            content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

        return content

    def __call__(self, content: bytes) -> Union[str, Dict]:
        try:
            return self.content_loader(content)
        except Exception as e:
            try:
                raise EncoderError(content=content.decode('utf-8')) from e
            except UnicodeError:
                raise EncoderError(content=base64.b64encode(content).decode('utf-8')) from e

Class variables

var JSONDateTimeDecoder

Simple JSON http://json.org decoder

Performs the following translations in decoding by default:

+---------------+-------------------+
| JSON          | Python            |
+===============+===================+
| object        | dict              |
+---------------+-------------------+
| array         | list              |
+---------------+-------------------+
| string        | str               |
+---------------+-------------------+
| number (int)  | int               |
+---------------+-------------------+
| number (real) | float             |
+---------------+-------------------+
| true          | True              |
+---------------+-------------------+
| false         | False             |
+---------------+-------------------+
| null          | None              |
+---------------+-------------------+

It also understands NaN, Infinity, and -Infinity as their corresponding float values, which is outside the JSON spec.

var SUPPORTED_COMPRESSION
var SUPPORTED_CONTENT_TYPES

Methods

def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) ‑> Dict

def support(self, headers: Dict) ‑> bool

def verify(self, message: bytes, signature: str) ‑> bool
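
For example, support() only inspects the content-type header (a hedged illustration):

handler = ResponseBodyHandler()
handler.support({'content-type': 'application/json; charset=utf-8'})  # True
handler.support({'content-type': 'text/html'})                        # False
handler.support({})                                                   # False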
class ScrapeApiResponse (request: requests.models.Request, response: requests.models.Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None)
class ScrapeApiResponse:

    def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None):
        self.request = request
        self.response = response
        self.scrape_config = scrape_config

        if self.scrape_config.method == 'HEAD':
            api_result = {
                'result': {
                    'request_headers': {},
                    'status': 'DONE',
                    'success': 200 <= self.response.status_code < 300,
                    'response_headers': self.response.headers,
                    'status_code': self.response.status_code,
                    'reason': self.response.reason,
                    'format': 'text',
                    'content': ''
                },
                'context': {},
                'config': self.scrape_config.__dict__
            }

            if 'X-Scrapfly-Reject-Code' in self.response.headers:
                api_result['result']['error'] = {
                    'code': self.response.headers['X-Scrapfly-Reject-Code'],
                    'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']),
                    'message': self.response.headers['X-Scrapfly-Reject-Description'],
                    'error_id': self.response.headers['X-Scrapfly-Reject-ID'],
                    'retryable': True if self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes' else False,
                    'doc_url': '',
                    'links': {}
                }

                if 'X-Scrapfly-Reject-Doc' in self.response.headers:
                    api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc']
                    api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc']

        if isinstance(api_result, str):
            raise HttpError(
                request=request,
                response=response,
                message='Bad gateway',
                code=502,
                http_status_code=502,
                is_retryable=True
            )

        self.result = self.handle_api_result(api_result=api_result)

    @property
    def scrape_result(self) -> Dict:
        return self.result['result']

    @property
    def config(self) -> Dict:
        return self.result['config']

    @property
    def context(self) -> Dict:
        return self.result['context']

    @property
    def content(self) -> str:
        return self.scrape_result['content']

    @property
    def success(self) -> bool:
        """
            /!\ Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code
        """
        return 200 >= self.response.status_code <= 299

    @property
    def scrape_success(self) -> bool:
        return self.scrape_result['success']

    @property
    def error(self) -> Optional[Dict]:
        if self.scrape_success is False:
            return self.scrape_result['error']

    @property
    def status_code(self) -> int:
        """
            /!\ This is the status code of our API, not the upstream website
        """
        return self.response.status_code

    @property
    def upstream_status_code(self) -> Optional[int]:
        if 'status_code' in self.scrape_result:
            return self.scrape_result['status_code']

        return None

    def prevent_extra_usage(self):
        if self.remaining_quota == 0:
            raise ExtraUsageForbidden(
                message='All Pre Paid Quota Used',
                code='ERR::ACCOUNT::PREVENT_EXTRA_USAGE',
                http_status_code=429,
                is_retryable=False
            )

    @property
    def remaining_quota(self) -> Optional[int]:
        remaining_scrape = self.response.headers.get('X-Scrapfly-Remaining-Scrape')

        if remaining_scrape:
            remaining_scrape = int(remaining_scrape)

        return remaining_scrape

    @property
    def cost(self) -> Optional[int]:
        cost = self.response.headers.get('X-Scrapfly-Api-Cost')

        if cost:
            cost = int(cost)

        return cost

    @property
    def duration_ms(self) -> Optional[float]:
        duration = self.response.headers.get('X-Scrapfly-Response-Time')

        if duration:
            duration = float(duration)

        return duration

    @property
    def headers(self) -> CaseInsensitiveDict:
        return self.response.headers

    def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        try:
            if isinstance(api_result['config']['headers'], list):
                api_result['config']['headers'] = {}
        except TypeError:
            logger.info(api_result)
            raise

        with suppress(KeyError):
            api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
            api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

        if api_result['result']['format'] == 'binary' and api_result['result']['content']:
            api_result['result']['content'] = BytesIO(b64decode(api_result['result']['content']))

        return FrozenDict(api_result)

    @cached_property
    def soup(self) -> 'BeautifulSoup':
        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(self.content, "lxml")
            return soup
        except ImportError as e:
            logger.error('You must install scrapfly[parser] to enable this feature')
            raise e

    @cached_property
    def selector(self) -> 'Selector':
        try:
            from parsel import Selector
            return Selector(text=self.content)
        except ImportError as e:
            logger.error('You must install parsel or scrapy package to enable this feature')
            raise e

    @property
    def error_message(self):
        if self.error:
            message = "<-- %s | %s - %s." % (self.response.status_code, self.error['code'], self.error['message'])

            if self.error['links']:
                message += " Check out the related doc: %s" % list(self.error['links'].values())[0]

            return message

        return '<-- %s - %s %s | Doc: %s' % (self.response.status_code, self.http_status_code, self.code, self.documentation_url)

    def _is_api_error(self, api_result: Dict) -> bool:
        if self.scrape_config.method == 'HEAD':
            if 'X-Reject-Reason' in self.response.headers:
                return True
            return False

        if api_result is None:
            return True

        return 'error_id' in api_result

    def raise_for_result(self, raise_on_upstream_error: bool = True):

        try:
            self.response.raise_for_status()
        except HTTPError as e:
            if 'http_code' in self.result:
                if e.response.status_code >= 500:
                    raise ApiHttpServerError(
                        request=e.request,
                        response=e.response,
                        message=self.result['message'],
                        code='',
                        resource='',
                        http_status_code=e.response.status_code,
                        documentation_url=self.result.get('links')
                    ) from e
                else:
                    raise ApiHttpClientError(
                        request=e.request,
                        response=e.response,
                        message=self.result['message'],
                        code='',
                        resource='API',
                        http_status_code=self.result['http_code'],
                        documentation_url=self.result.get('links')
                    ) from e

        if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
            error = ErrorFactory.create(api_response=self)

            if error:
                if isinstance(error, UpstreamHttpError):
                    if raise_on_upstream_error is True:
                        raise error
                else:
                    raise error

    def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
        if _class != Response:
            raise RuntimeError('only Response from requests package is supported at the moment')

        if self.result is None:
            return None

        if self.response.status_code != 200:
            return None

        response = Response()
        response.status_code = self.scrape_result['status_code']
        response.reason = self.scrape_result['reason']

        if self.scrape_result['content']:
            if isinstance(self.scrape_result['content'], BytesIO):
                response._content = self.scrape_result['content'].getvalue()
            elif isinstance(self.scrape_result['content'], bytes):
                response._content = self.scrape_result['content']
            elif isinstance(self.scrape_result['content'], str):
                response._content = self.scrape_result['content'].encode('utf-8')
        else:
            response._content = None

        response.headers.update(self.scrape_result['response_headers'])
        response.url = self.scrape_result['url']

        response.request = Request(
            method=self.config['method'],
            url=self.config['url'],
            headers=self.scrape_result['request_headers'],
            data=self.config['body'] if self.config['body'] else None
        )

        if 'set-cookie' in response.headers:
            for raw_cookie in response.headers['set-cookie']:
                for name, cookie in SimpleCookie(raw_cookie).items():
                    expires = cookie.get('expires')

                    if expires == '':
                        expires = None

                    if expires:
                        try:
                            expires = parse(expires).timestamp()
                        except ValueError:
                            expires = None

                    if type(expires) == str:
                        if '.' in expires:
                            expires = float(expires)
                        else:
                            expires = int(expires)

                    response.cookies.set_cookie(Cookie(
                        version=cookie.get('version') if cookie.get('version') else None,
                        name=name,
                        value=cookie.value,
                        path=cookie.get('path', ''),
                        expires=expires,
                        comment=cookie.get('comment'),
                        domain=cookie.get('domain', ''),
                        secure=cookie.get('secure'),
                        port=None,
                        port_specified=False,
                        domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                        domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                        path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                        discard=False,
                        comment_url=None,
                        rest={
                            'httponly': cookie.get('httponly'),
                            'samesite': cookie.get('samesite'),
                            'max-age': cookie.get('max-age')
                        }
                    ))

        return response

    def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content:Optional[Union[str, bytes]]=None):
        file_content = content or self.scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                try:
                    mime_type = self.scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = self.config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path is not None else name

            if file_path == file_extension:
                url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)
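
Putting the accessors together, a hedged sketch of typical response handling (api_response as returned by ScrapflyClient.scrape; the output path is a placeholder):

api_response.raise_for_result()               # raises a ScrapflyError subclass on failure

if api_response.scrape_success:
    print(api_response.upstream_status_code)  # status code of the target website
    print(api_response.cost)                  # API credits spent, read from response headers
    html = api_response.content               # upstream body (str, or BytesIO when binary)
    api_response.sink(path='/tmp', name='page.html')  # writes content to /tmp/page.html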

Instance variables

var config : Dict
var content : str
var context : Dict
var cost : Optional[int]
var duration_ms : Optional[float]
var error : Optional[Dict]
var error_message
var headers : requests.structures.CaseInsensitiveDict
var remaining_quota : Optional[int]
var scrape_result : Dict
var scrape_success : bool
var selector
var soup
var status_code : int

/!\ This is the status code of our API, not the upstream website

var success : bool

/!\ Success means the Scrapfly API replied to the call correctly; the scrape itself can still be unsuccessful if the upstream website replies with an error status code

var upstream_status_code : Optional[int]

Methods

def handle_api_result(self, api_result: Dict) ‑> Optional[FrozenDict]

def prevent_extra_usage(self)

def raise_for_result(self, raise_on_upstream_error: bool = True)

def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None)

def upstream_result_into_response(self, _class=Response) ‑> Optional[requests.models.Response]
class ScrapeConfig (url: str, retry: bool = True, method: str = 'GET', country: Optional[str] = None, render_js: bool = False, cache: bool = False, cache_clear: bool = False, ssl: bool = False, dns: bool = False, asp: bool = False, debug: bool = False, raise_on_upstream_error: bool = True, cache_ttl: Optional[int] = None, proxy_pool: Optional[str] = None, session: Optional[str] = None, tags: Optional[Set[str]] = None, correlation_id: Optional[str] = None, cookies: Optional[requests.structures.CaseInsensitiveDict] = None, body: Optional[str] = None, data: Optional[Dict] = None, headers: Optional[Union[requests.structures.CaseInsensitiveDict, Dict[str, str]]] = None, js: str = None, rendering_wait: int = None, wait_for_selector: Optional[str] = None, screenshots: Optional[Dict] = None, session_sticky_proxy: Optional[bool] = None, webhook: Optional[str] = None, timeout: Optional[int] = None, js_scenario: Optional[Dict] = None, extract: Optional[Dict] = None, os: Optional[str] = None, lang: Optional[List[str]] = None, auto_scroll: Optional[bool] = None, cost_budget: Optional[int] = None)
class ScrapeConfig:

    PUBLIC_DATACENTER_POOL = 'public_datacenter_pool'
    PUBLIC_RESIDENTIAL_POOL = 'public_residential_pool'

    url: str
    retry: bool = True
    method: str = 'GET'
    country: Optional[str] = None
    render_js: bool = False
    cache: bool = False
    cache_clear:bool = False
    ssl:bool = False
    dns:bool = False
    asp:bool = False
    debug: bool = False
    raise_on_upstream_error:bool = True
    cache_ttl:Optional[int] = None
    proxy_pool:Optional[str] = None
    session: Optional[str] = None
    tags: Optional[List[str]] = None
    correlation_id: Optional[str] = None
    cookies: Optional[CaseInsensitiveDict] = None
    body: Optional[str] = None
    data: Optional[Dict] = None
    headers: Optional[CaseInsensitiveDict] = None
    js: str = None
    rendering_wait: int = None
    wait_for_selector: Optional[str] = None
    session_sticky_proxy:bool = True
    screenshots:Optional[Dict]=None
    webhook:Optional[str]=None
    timeout:Optional[int]=None # in milliseconds
    js_scenario: Dict = None
    extract: Dict = None
    lang:Optional[List[str]] = None
    os:Optional[str] = None
    auto_scroll:Optional[bool] = None
    cost_budget:Optional[int] = None

    def __init__(
        self,
        url: str,
        retry: bool = True,
        method: str = 'GET',
        country: Optional[str] = None,
        render_js: bool = False,
        cache: bool = False,
        cache_clear:bool = False,
        ssl:bool = False,
        dns:bool = False,
        asp:bool = False,
        debug: bool = False,
        raise_on_upstream_error:bool = True,
        cache_ttl:Optional[int] = None,
        proxy_pool:Optional[str] = None,
        session: Optional[str] = None,
        tags: Optional[Set[str]] = None,
        correlation_id: Optional[str] = None,
        cookies: Optional[CaseInsensitiveDict] = None,
        body: Optional[str] = None,
        data: Optional[Dict] = None,
        headers: Optional[Union[CaseInsensitiveDict, Dict[str, str]]] = None,
        js: str = None,
        rendering_wait: int = None,
        wait_for_selector: Optional[str] = None,
        screenshots:Optional[Dict]=None,
        session_sticky_proxy:Optional[bool] = None,
        webhook:Optional[str] = None,
        timeout:Optional[int] = None, # in milliseconds
        js_scenario:Optional[Dict] = None,
        extract:Optional[Dict] = None,
        os:Optional[str] = None,
        lang:Optional[List[str]] = None,
        auto_scroll:Optional[bool] = None,
        cost_budget:Optional[int] = None
    ):
        assert(type(url) is str)

        if isinstance(tags, List):
            tags = set(tags)

        cookies = cookies or {}
        headers = headers or {}

        self.cookies = CaseInsensitiveDict(cookies)
        self.headers = CaseInsensitiveDict(headers)
        self.url = url
        self.retry = retry
        self.method = method
        self.country = country
        self.session_sticky_proxy = session_sticky_proxy
        self.render_js = render_js
        self.cache = cache
        self.cache_clear = cache_clear
        self.asp = asp
        self.webhook = webhook
        self.session = session
        self.debug = debug
        self.cache_ttl = cache_ttl
        self.proxy_pool = proxy_pool
        self.tags = tags or set()
        self.correlation_id = correlation_id
        self.wait_for_selector = wait_for_selector
        self.body = body
        self.data = data
        self.js = js
        self.rendering_wait = rendering_wait
        self.raise_on_upstream_error = raise_on_upstream_error
        self.screenshots = screenshots
        self.key = None
        self.dns = dns
        self.ssl = ssl
        self.js_scenario = js_scenario
        self.timeout = timeout
        self.extract = extract
        self.lang = lang
        self.os = os
        self.auto_scroll = auto_scroll
        self.cost_budget = cost_budget

        if cookies:
            _cookies = []

            for name, value in cookies.items():
                _cookies.append(name + '=' + value)

            if 'cookie' in self.headers:
                if self.headers['cookie'][-1] != ';':
                    self.headers['cookie'] += ';'
            else:
                self.headers['cookie'] = ''

            self.headers['cookie'] += '; '.join(_cookies)

        if self.body and self.data:
            raise ScrapeConfigError('You cannot pass both parameters body and data. You must choose')

        if method in ['POST', 'PUT', 'PATCH']:
            if self.body is None and self.data is not None:
                if 'content-type' not in self.headers:
                    self.headers['content-type'] = 'application/x-www-form-urlencoded'
                    self.body = urlencode(data)
                else:
                    if self.headers['content-type'].find('application/json') != -1:
                        self.body = json.dumps(data)
                    elif self.headers['content-type'].find('application/x-www-form-urlencoded') != -1:
                        self.body = urlencode(data)
                    else:
                        raise ScrapeConfigError('Content-Type "%s" not supported, use body parameter to pass pre encoded body according to your content type' % self.headers['content-type'])
            elif self.body is None and self.data is None:
                self.headers['content-type'] = 'text/plain'

    def _bool_to_http(self, _bool:bool) -> str:
        return 'true' if _bool is True else 'false'

    def to_api_params(self, key:str) -> Dict:
        params = {
            'key': self.key if self.key is not None else key,
            'url': self.url
        }

        if self.country is not None:
            params['country'] = self.country

        for name, value in self.headers.items():
            params['headers[%s]' % name] = value

        if self.webhook is not None:
            params['webhook_name'] = self.webhook

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.extract is not None:
            params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')

        if self.cost_budget is not None:
            params['cost_budget'] = self.cost_budget

        if self.render_js is True:
            params['render_js'] = self._bool_to_http(self.render_js)

            if self.wait_for_selector is not None:
                params['wait_for_selector'] = self.wait_for_selector

            if self.js:
                params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

            if self.js_scenario:
                params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')

            if self.rendering_wait:
                params['rendering_wait'] = self.rendering_wait

            if self.screenshots is not None:
                for name, element in self.screenshots.items():
                    params['screenshots[%s]' % name] = element

            if self.auto_scroll is True:
                params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
        else:
            if self.wait_for_selector is not None:
                logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')

            if self.screenshots:
                logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')

            if self.js_scenario:
                logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')

            if self.js:
                logging.warning('Params "js" is ignored. Works only if render_js is enabled')

            if self.rendering_wait:
                logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')

        if self.asp is True:
            params['asp'] = self._bool_to_http(self.asp)

        if self.retry is False:
            params['retry'] = self._bool_to_http(self.retry)

        if self.cache is True:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_clear is True:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)

            if self.cache_ttl is not None:
                params['cache_ttl'] = self.cache_ttl
        else:
            if self.cache_clear is True:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

        if self.dns is True:
            params['dns'] = self._bool_to_http(self.dns)

        if self.ssl is True:
            params['ssl'] = self._bool_to_http(self.ssl)

        if self.tags:
            params['tags'] = ','.join(self.tags)

        if self.correlation_id:
            params['correlation_id'] = self.correlation_id

        if self.session:
            params['session'] = self.session

            if self.session_sticky_proxy is True: # false by default
                params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
        else:
            if self.session_sticky_proxy:
                logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')

        if self.debug is True:
            params['debug'] = self._bool_to_http(self.debug)

        if self.proxy_pool is not None:
            params['proxy_pool'] = self.proxy_pool

        if self.lang is not None:
            params['lang'] = ','.join(self.lang)

        if self.os is not None:
            params['os'] = self.os

        return params

    @staticmethod
    def from_exported_config(config:str) -> 'ScrapeConfig':
        try:
            from msgpack import loads as msgpack_loads
        except ImportError as e:
            print('You must install the msgpack package - run: pip install "scrapfly-sdk[speedup]" or pip install msgpack')
            raise

        data = msgpack_loads(base64.b64decode(config))

        headers = {}

        for name, value in data['headers'].items():
            if isinstance(value, (list, tuple)):
                headers[name] = '; '.join(value)
            else:
                headers[name] = value

        return ScrapeConfig(
            url=data['url'],
            retry=data['retry'],
            headers=headers,
            session=data['session'],
            session_sticky_proxy=data['session_sticky_proxy'],
            cache=data['cache'],
            cache_ttl=data['cache_ttl'],
            cache_clear=data['cache_clear'],
            render_js=data['render_js'],
            method=data['method'],
            asp=data['asp'],
            body=data['body'],
            ssl=data['ssl'],
            dns=data['dns'],
            country=data['country'],
            debug=data['debug'],
            correlation_id=data['correlation_id'],
            tags=data['tags'],
            js=data['js'],
            rendering_wait=data['rendering_wait'],
            screenshots=data['screenshots'] or {},
            proxy_pool=data['proxy_pool'],
            auto_scroll=data['auto_scroll'],
            cost_budget=data['cost_budget']
        )
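
Two hedged configuration sketches (URL, selector and key are placeholders). The first exercises the render_js-dependent parameters; the second shows how data is form-encoded into body for POST requests:

config = ScrapeConfig(
    url='https://example.com/products',  # placeholder URL
    render_js=True,                      # required by wait_for_selector below
    wait_for_selector='.product',        # placeholder CSS selector
    country='us',
    asp=True,
    cache=True,
    cache_ttl=3600,
)
params = config.to_api_params(key='YOUR-API-KEY')  # placeholder key
# boolean flags are serialized as 'true'/'false', e.g. params['render_js'] == 'true'

post = ScrapeConfig(url='https://example.com/api', method='POST', data={'q': 'laptop'})
assert post.body == 'q=laptop'  # content-type defaults to application/x-www-form-urlencoded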

Class variables

var PUBLIC_DATACENTER_POOL
var PUBLIC_RESIDENTIAL_POOL
var asp : bool
var auto_scroll : Optional[bool]
var body : Optional[str]
var cache : bool
var cache_clear : bool
var cache_ttl : Optional[int]
var cookies : Optional[requests.structures.CaseInsensitiveDict]
var correlation_id : Optional[str]
var cost_budget : Optional[int]
var country : Optional[str]
var data : Optional[Dict]
var debug : bool
var dns : bool
var extract : Dict
var headers : Optional[requests.structures.CaseInsensitiveDict]
var js : str
var js_scenario : Dict
var lang : Optional[List[str]]
var method : str
var os : Optional[str]
var proxy_pool : Optional[str]
var raise_on_upstream_error : bool
var render_js : bool
var rendering_wait : int
var retry : bool
var screenshots : Optional[Dict]
var session : Optional[str]
var session_sticky_proxy : bool
var ssl : bool
var tags : Optional[List[str]]
var timeout : Optional[int]
var url : str
var wait_for_selector : Optional[str]
var webhook : Optional[str]

Static methods

def from_exported_config(config: str) ‑> ScrapeConfig
Expand source code
@staticmethod
def from_exported_config(config:str) -> 'ScrapeConfig':
    try:
        from msgpack import loads as msgpack_loads
    except ImportError:
        print('You must install the msgpack package - run: pip install "scrapfly-sdk[speedups]" or pip install msgpack')
        raise

    data = msgpack_loads(base64.b64decode(config))

    headers = {}

    for name, value in data['headers'].items():
        # a plain str is also Iterable and must not be split char by char
        if isinstance(value, Iterable) and not isinstance(value, str):
            headers[name] = '; '.join(value)
        else:
            headers[name] = value

    return ScrapeConfig(
        url=data['url'],
        retry=data['retry'],
        headers=headers,
        session=data['session'],
        session_sticky_proxy=data['session_sticky_proxy'],
        cache=data['cache'],
        cache_ttl=data['cache_ttl'],
        cache_clear=data['cache_clear'],
        render_js=data['render_js'],
        method=data['method'],
        asp=data['asp'],
        body=data['body'],
        ssl=data['ssl'],
        dns=data['dns'],
        country=data['country'],
        debug=data['debug'],
        correlation_id=data['correlation_id'],
        tags=data['tags'],
        js=data['js'],
        rendering_wait=data['rendering_wait'],
        screenshots=data['screenshots'] or {},
        proxy_pool=data['proxy_pool'],
        auto_scroll=data['auto_scroll'],
        cost_budget=data['cost_budget']
    )
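
A minimal round-trip sketch of this helper (assuming the optional msgpack dependency is installed; the payload below is hypothetical, but it carries every key the method reads):

import base64
from msgpack import dumps as msgpack_dumps
from scrapfly import ScrapeConfig

# Hypothetical exported payload: from_exported_config() expects all of
# these keys to be present in the msgpack document.
data = {
    'url': 'https://example.com', 'retry': True, 'headers': {},
    'session': None, 'session_sticky_proxy': False, 'cache': False,
    'cache_ttl': None, 'cache_clear': False, 'render_js': False,
    'method': 'GET', 'asp': False, 'body': None, 'ssl': False,
    'dns': False, 'country': None, 'debug': False, 'correlation_id': None,
    'tags': None, 'js': None, 'rendering_wait': None, 'screenshots': None,
    'proxy_pool': None, 'auto_scroll': None, 'cost_budget': None,
}
exported = base64.b64encode(msgpack_dumps(data)).decode('utf-8')

config = ScrapeConfig.from_exported_config(exported)
print(config.url)  # https://example.com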

Methods

def to_api_params(self, key: str) ‑> Dict
Expand source code
def to_api_params(self, key:str) -> Dict:
    params = {
        'key': self.key if self.key is not None else key,
        'url': self.url
    }

    if self.country is not None:
        params['country'] = self.country

    for name, value in self.headers.items():
        params['headers[%s]' % name] = value

    if self.webhook is not None:
        params['webhook_name'] = self.webhook

    if self.timeout is not None:
        params['timeout'] = self.timeout

    if self.extract is not None:
        params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')

    if self.cost_budget is not None:
        params['cost_budget'] = self.cost_budget

    if self.render_js is True:
        params['render_js'] = self._bool_to_http(self.render_js)

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.js:
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

        if self.js_scenario:
            params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')

        if self.rendering_wait:
            params['rendering_wait'] = self.rendering_wait

        if self.screenshots is not None:
            for name, element in self.screenshots.items():
                params['screenshots[%s]' % name] = element

        if self.auto_scroll is True:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
    else:
        if self.wait_for_selector is not None:
            logging.warning('Param "wait_for_selector" is ignored; it works only when render_js is enabled')

        if self.screenshots:
            logging.warning('Param "screenshots" is ignored; it works only when render_js is enabled')

        if self.js_scenario:
            logging.warning('Param "js_scenario" is ignored; it works only when render_js is enabled')

        if self.js:
            logging.warning('Param "js" is ignored; it works only when render_js is enabled')

        if self.rendering_wait:
            logging.warning('Param "rendering_wait" is ignored; it works only when render_js is enabled')

    if self.asp is True:
        params['asp'] = self._bool_to_http(self.asp)

    if self.retry is False:
        params['retry'] = self._bool_to_http(self.retry)

    if self.cache is True:
        params['cache'] = self._bool_to_http(self.cache)

        if self.cache_clear is True:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)

        if self.cache_ttl is not None:
            params['cache_ttl'] = self.cache_ttl
    else:
        if self.cache_clear is True:
            logging.warning('Param "cache_clear" is ignored; it works only when cache is enabled')

        if self.cache_ttl is not None:
            logging.warning('Param "cache_ttl" is ignored; it works only when cache is enabled')

    if self.dns is True:
        params['dns'] = self._bool_to_http(self.dns)

    if self.ssl is True:
        params['ssl'] = self._bool_to_http(self.ssl)

    if self.tags:
        params['tags'] = ','.join(self.tags)

    if self.correlation_id:
        params['correlation_id'] = self.correlation_id

    if self.session:
        params['session'] = self.session

        if self.session_sticky_proxy is True: # false by default
            params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
    else:
        if self.session_sticky_proxy:
            logging.warning('Param "session_sticky_proxy" is ignored; it works only when session is enabled')

    if self.debug is True:
        params['debug'] = self._bool_to_http(self.debug)

    if self.proxy_pool is not None:
        params['proxy_pool'] = self.proxy_pool

    if self.lang is not None:
        params['lang'] = ','.join(self.lang)

    if self.os is not None:
        params['os'] = self.os

    return params
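
A short sketch of the flattening this method performs (the API key is a placeholder; booleans are serialized via _bool_to_http and js/extract payloads are base64-encoded, as shown in the source above):

from scrapfly import ScrapeConfig

config = ScrapeConfig(
    url='https://example.com',
    render_js=True,
    rendering_wait=2000,
    cache=True,
    cache_ttl=3600,
    tags=['product', 'listing'],
)

params = config.to_api_params(key='YOUR_API_KEY')  # placeholder key
print(params['cache_ttl'])  # 3600
print(params['tags'])       # product,listing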
class ScrapflyAspError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyAspError(ScraperAPIError):
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyClient (key: str, host: Optional[str] = 'https://api.scrapfly.io', verify=True, debug: bool = False, max_concurrency: int = 1, connect_timeout: int = 30, read_timeout: int = 160, reporter: Optional[Callable] = None, **kwargs)
Expand source code
class ScrapflyClient:

    HOST = 'https://api.scrapfly.io'
    DEFAULT_CONNECT_TIMEOUT = 30
    DEFAULT_READ_TIMEOUT = 160 # 155 real

    host:str
    key:str
    max_concurrency:int
    verify:bool
    debug:bool
    distributed_mode:bool
    connect_timeout:int
    read_timeout:int
    brotli: bool
    reporter:Reporter
    version:str

    CONCURRENCY_AUTO = 'auto' # retrieve the allowed concurrency from your account

    def __init__(
        self,
        key: str,
        host: Optional[str] = HOST,
        verify=True,
        debug: bool = False,
        max_concurrency:int=1,
        connect_timeout:int = DEFAULT_CONNECT_TIMEOUT,
        read_timeout:int = DEFAULT_READ_TIMEOUT,
        reporter:Optional[Callable]=None,
        **kwargs
    ):
        if host[-1] == '/':  # remove last '/' if exists
            host = host[:-1]

        if 'distributed_mode' in kwargs:
            warnings.warn("distributed mode is deprecated and will be removed in the next version -"
              " users should handle the session name themselves based on the concurrency",
              DeprecationWarning,
              stacklevel=2
            )

        if 'brotli' in kwargs:
            warnings.warn("brotli arg is deprecated and will be removed in the next version - "
                "brotli is disabled by default",
                DeprecationWarning,
                stacklevel=2
            )

        self.version = __version__
        self.host = host
        self.key = key
        self.verify = verify
        self.debug = debug
        self.connect_timeout = connect_timeout
        self.read_timeout = read_timeout
        self.max_concurrency = max_concurrency
        self.body_handler = ResponseBodyHandler(use_brotli=False)
        self.async_executor = ThreadPoolExecutor()
        self.http_session = None

        if not self.verify and not self.host.endswith('.local'):
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        if self.debug is True:
            http.client.HTTPConnection.debuglevel = 5

        if reporter is None:
            from .reporter import NoopReporter

            reporter = NoopReporter()

        self.reporter = Reporter(reporter)

    @property
    def ua(self) -> str:
        return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
            self.version,
            platform.python_version(),
            platform.uname().system,
            platform.uname().machine
        )

    @cached_property
    def _http_handler(self):
        # route through the pooled session when open() has been called
        return self.http_session.request if self.http_session else requests.request

    @property
    def http(self):
        return self._http_handler

    def _scrape_request(self, scrape_config:ScrapeConfig):
        return {
            'method': scrape_config.method,
            'url': self.host + '/scrape',
            'data': scrape_config.body,
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.read_timeout),
            'headers': {
                'content-type': scrape_config.headers['content-type'] if scrape_config.method in ['POST', 'PUT', 'PATCH'] else self.body_handler.content_type,
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            'params': scrape_config.to_api_params(key=self.key)
        }

    def account(self) -> Union[str, Dict]:
        response = self._http_handler(
            method='GET',
            url=self.host + '/account',
            params={'key': self.key},
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )

        response.raise_for_status()

        if self.body_handler.support(response.headers):
            return self.body_handler(response.content)

        return response.content.decode('utf-8')

    def resilient_scrape(
        self,
        scrape_config:ScrapeConfig,
        retry_on_errors:Set[Exception]={ScrapflyError},
        retry_on_status_code:Optional[List[int]]=None,
        tries: int = 5,
        delay: int = 20,
    ) -> ScrapeApiResponse:
        assert retry_on_errors is not None, 'Retry on error is None'
        assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

        @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)
        def inner() -> ScrapeApiResponse:

            try:
                return self.scrape(scrape_config=scrape_config)
            except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
                if retry_on_status_code is not None and e.api_response:
                    if e.api_response.upstream_status_code in retry_on_status_code:
                        raise e
                    else:
                        return e.api_response

                raise e

        return inner()

    def open(self):
        if self.http_session is None:
            self.http_session = Session()
            self.http_session.verify = self.verify
            self.http_session.timeout = (self.connect_timeout, self.read_timeout)
            self.http_session.params['key'] = self.key
            self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding
            self.http_session.headers['accept'] = self.body_handler.accept
            self.http_session.headers['user-agent'] = self.ua

    def close(self):
        self.http_session.close()
        self.http_session = None

    def __enter__(self) -> 'ScrapflyClient':
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
        if loop is None:
            loop = asyncio.get_running_loop()

        return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config)

    async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
        if concurrency is None:
            concurrency = self.max_concurrency
        elif concurrency == self.CONCURRENCY_AUTO:
            concurrency = self.account()['subscription']['max_concurrency']

        loop = asyncio.get_running_loop()
        processing_tasks = []
        results = []
        processed_tasks = 0
        expected_tasks = len(scrape_configs)

        def scrape_done_callback(task:Task):
            nonlocal processed_tasks

            try:
                if task.cancelled() is True:
                    return

                error = task.exception()

                if error is not None:
                    results.append(error)
                else:
                    results.append(task.result())
            finally:
                processing_tasks.remove(task)
                processed_tasks += 1

        while scrape_configs or results or processing_tasks:
            logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

            if scrape_configs:
                if len(processing_tasks) < concurrency:
                    # @todo handle backpressure
                    for _ in range(0, concurrency - len(processing_tasks)):
                        try:
                            scrape_config = scrape_configs.pop()
                        except IndexError:  # queue drained
                            break

                        scrape_config.raise_on_upstream_error = False
                        task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                        processing_tasks.append(task)
                        task.add_done_callback(scrape_done_callback)

            while results:
                yield results.pop()

            await asyncio.sleep(.5)

        logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def scrape(self, scrape_config:ScrapeConfig) -> ScrapeApiResponse:

        try:
            logger.debug('--> %s Scraping %s' % (scrape_config.method, scrape_config.url))
            request_data = self._scrape_request(scrape_config=scrape_config)
            response = self._http_handler(**request_data)
            scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config)

            self.reporter.report(scrape_api_response=scrape_api_response)

            return scrape_api_response
        except BaseException as e:
            self.reporter.report(error=e)
            raise e

    def _handle_response(self, response:Response, scrape_config:ScrapeConfig) -> ScrapeApiResponse:
        try:
            api_response = self._handle_api_response(
                response=response,
                scrape_config=scrape_config,
                raise_on_upstream_error=scrape_config.raise_on_upstream_error
            )

            if scrape_config.method == 'HEAD':
                logger.debug('<-- [%s %s] %s | %ss' % (
                    api_response.response.status_code,
                    api_response.response.reason,
                    api_response.response.request.url,
                    0
                ))
            else:
                logger.debug('<-- [%s %s] %s | %ss' % (
                    api_response.result['result']['status_code'],
                    api_response.result['result']['reason'],
                    api_response.result['config']['url'],
                    api_response.result['result']['duration'])
                )

                logger.debug('Log url: %s' % api_response.result['result']['log_url'])

            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except ScrapflyScrapeError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except HttpError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def save_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):

        if not api_response.scrape_result['screenshots']:
            raise RuntimeError('Screenshot %s does not exist' % name)

        try:
            api_response.scrape_result['screenshots'][name]
        except KeyError:
            raise RuntimeError('Screenshot %s does not exist' % name)

        screenshot_response = self._http_handler(
            method='GET',
            url=api_response.scrape_result['screenshots'][name]['url'],
            params={'key': self.key},
            verify=self.verify
        )

        screenshot_response.raise_for_status()

        if not name.endswith('.jpg'):
            name += '.jpg'

        api_response.sink(path=path, name=name, content=screenshot_response.content)

    def screenshot(self, url:str, path:Optional[str]=None, name:Optional[str]=None) -> str:
        # for advanced configuration, take screenshots via the scrape method with ScrapeConfig
        api_response = self.scrape(scrape_config=ScrapeConfig(
            url=url,
            render_js=True,
            screenshots={'main': 'fullpage'}
        ))

        name = name or 'main.jpg'

        if not name.endswith('.jpg'):
            name += '.jpg'

        response = self._http_handler(
            method='GET',
            url=api_response.scrape_result['screenshots']['main']['url'],
            params={'key': self.key}
        )

        response.raise_for_status()

        return self.sink(api_response, path=path, name=name, content=response.content)

    def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
        scrape_result = api_response.result['result']
        scrape_config = api_response.result['config']

        file_content = content or scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                try:
                    mime_type = scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = scrape_config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path else name

            if file_path == file_extension:
                url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)
        return file_path

    def _handle_api_response(
        self,
        response: Response,
        scrape_config:ScrapeConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScrapeApiResponse:

        if scrape_config.method == 'HEAD':
            body = None
        else:
            if self.body_handler.support(headers=response.headers):
                body = self.body_handler(response.content)
            else:
                body = response.content.decode('utf-8')

        api_response:ScrapeApiResponse = ScrapeApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            scrape_config=scrape_config
        )

        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return api_response
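
A typical synchronous workflow, using the client as a context manager so open() and close() manage the pooled requests session (the key is a placeholder):

from scrapfly import ScrapflyClient, ScrapeConfig

with ScrapflyClient(key='YOUR_API_KEY') as client:  # placeholder key
    api_response = client.scrape(ScrapeConfig(url='https://example.com'))
    print(api_response.scrape_result['content'][:200])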

Class variables

var CONCURRENCY_AUTO
var DEFAULT_CONNECT_TIMEOUT
var DEFAULT_READ_TIMEOUT
var HOST
var brotli : bool
var connect_timeout : int
var debug : bool
var distributed_mode : bool
var host : str
var key : str
var max_concurrency : int
var read_timeout : int
var reporter : scrapfly.reporter.Reporter
var verify : bool
var version : str

Instance variables

var http
Expand source code
@property
def http(self):
    return self._http_handler
var ua : str
Expand source code
@property
def ua(self) -> str:
    return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
        self.version,
        platform.python_version(),
        platform.uname().system,
        platform.uname().machine
    )

Methods

def account(self) ‑> Union[str, Dict]
Expand source code
def account(self) -> Union[str, Dict]:
    response = self._http_handler(
        method='GET',
        url=self.host + '/account',
        params={'key': self.key},
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )

    response.raise_for_status()

    if self.body_handler.support(response.headers):
        return self.body_handler(response.content)

    return response.content.decode('utf-8')
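
A quick sketch of reading account metadata (placeholder key; the subscription keys shown are the ones concurrent_scrape relies on):

from scrapfly import ScrapflyClient

client = ScrapflyClient(key='YOUR_API_KEY')  # placeholder key
account = client.account()  # a decoded dict when the response encoding is supported
print(account['subscription']['max_concurrency'])
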
async def async_scrape(self, scrape_config: ScrapeConfig, loop: Optional[asyncio.events.AbstractEventLoop] = None) ‑> ScrapeApiResponse
Expand source code
async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
    if loop is None:
        loop = asyncio.get_running_loop()

    return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config)
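
Since this simply offloads the blocking scrape() onto the client's thread pool, it can be awaited from any coroutine; a minimal sketch (placeholder key):

import asyncio
from scrapfly import ScrapflyClient, ScrapeConfig

async def main():
    client = ScrapflyClient(key='YOUR_API_KEY')  # placeholder key
    api_response = await client.async_scrape(ScrapeConfig(url='https://example.com'))
    print(api_response.scrape_result['content'][:200])

asyncio.run(main())
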
def close(self)
Expand source code
def close(self):
    self.http_session.close()
    self.http_session = None
async def concurrent_scrape(self, scrape_configs: List[ScrapeConfig], concurrency: Optional[int] = None)
Expand source code
async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
    if concurrency is None:
        concurrency = self.max_concurrency
    elif concurrency == self.CONCURRENCY_AUTO:
        concurrency = self.account()['subscription']['max_concurrency']

    loop = asyncio.get_running_loop()
    processing_tasks = []
    results = []
    processed_tasks = 0
    expected_tasks = len(scrape_configs)

    def scrape_done_callback(task:Task):
        nonlocal processed_tasks

        try:
            if task.cancelled() is True:
                return

            error = task.exception()

            if error is not None:
                results.append(error)
            else:
                results.append(task.result())
        finally:
            processing_tasks.remove(task)
            processed_tasks += 1

    while scrape_configs or results or processing_tasks:
        logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

        if scrape_configs:
            if len(processing_tasks) < concurrency:
                # @todo handle backpressure
                for _ in range(0, concurrency - len(processing_tasks)):
                    try:
                        scrape_config = scrape_configs.pop()
                    except IndexError:  # queue drained
                        break

                    scrape_config.raise_on_upstream_error = False
                    task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                    processing_tasks.append(task)
                    task.add_done_callback(scrape_done_callback)

        while results:
            yield results.pop()

        await asyncio.sleep(.5)

    logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))
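
Because this is an async generator, results are consumed with async for as they complete; failed tasks surface as exception objects rather than raising. A sketch (placeholder key):

import asyncio
from scrapfly import ScrapflyClient, ScrapeConfig

async def main():
    client = ScrapflyClient(key='YOUR_API_KEY', max_concurrency=2)  # placeholder key
    configs = [ScrapeConfig(url='https://example.com/page/%d' % i) for i in range(5)]

    async for result in client.concurrent_scrape(configs):
        if isinstance(result, Exception):
            print('failed:', result)
        else:
            print('ok:', result.result['config']['url'])

asyncio.run(main())
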
def open(self)
Expand source code
def open(self):
    if self.http_session is None:
        self.http_session = Session()
        self.http_session.verify = self.verify
        self.http_session.timeout = (self.connect_timeout, self.read_timeout)
        self.http_session.params['key'] = self.key
        self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding
        self.http_session.headers['accept'] = self.body_handler.accept
        self.http_session.headers['user-agent'] = self.ua
def resilient_scrape(self, scrape_config: ScrapeConfig, retry_on_errors: Set[Exception] = {<class 'scrapfly.errors.ScrapflyError'>}, retry_on_status_code: Optional[List[int]] = None, tries: int = 5, delay: int = 20) ‑> ScrapeApiResponse
Expand source code
def resilient_scrape(
    self,
    scrape_config:ScrapeConfig,
    retry_on_errors:Set[Exception]={ScrapflyError},
    retry_on_status_code:Optional[List[int]]=None,
    tries: int = 5,
    delay: int = 20,
) -> ScrapeApiResponse:
    assert retry_on_errors is not None, 'Retry on error is None'
    assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

    @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)
    def inner() -> ScrapeApiResponse:

        try:
            return self.scrape(scrape_config=scrape_config)
        except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
            if retry_on_status_code is not None and e.api_response:
                if e.api_response.upstream_status_code in retry_on_status_code:
                    raise e
                else:
                    return e.api_response

            raise e

    return inner()
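
A usage sketch (placeholder key): retry_on_errors must be a set of exception classes, and tries/delay bound the exponential backoff.

from scrapfly import ScrapflyClient, ScrapeConfig, ScrapflyError

client = ScrapflyClient(key='YOUR_API_KEY')  # placeholder key
api_response = client.resilient_scrape(
    scrape_config=ScrapeConfig(url='https://example.com'),
    retry_on_errors={ScrapflyError},  # which exceptions trigger a retry
    tries=3,
    delay=10,
)
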
def save_screenshot(self, api_response: ScrapeApiResponse, name: str, path: Optional[str] = None)
Expand source code
def save_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):

    if not api_response.scrape_result['screenshots']:
        raise RuntimeError('Screenshot %s does not exist' % name)

    try:
        api_response.scrape_result['screenshots'][name]
    except KeyError:
        raise RuntimeError('Screenshot %s does not exist' % name)

    screenshot_response = self._http_handler(
        method='GET',
        url=api_response.scrape_result['screenshots'][name]['url'],
        params={'key': self.key},
        verify=self.verify
    )

    screenshot_response.raise_for_status()

    if not name.endswith('.jpg'):
        name += '.jpg'

    api_response.sink(path=path, name=name, content=screenshot_response.content)
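
This pairs with a scrape that requested screenshots; name must match a key of the screenshots mapping passed to ScrapeConfig. A sketch (placeholder key):

from scrapfly import ScrapflyClient, ScrapeConfig

client = ScrapflyClient(key='YOUR_API_KEY')  # placeholder key
api_response = client.scrape(ScrapeConfig(
    url='https://example.com',
    render_js=True,
    screenshots={'main': 'fullpage'},
))
client.save_screenshot(api_response, name='main')  # writes main.jpg
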
def scrape(self, scrape_config: ScrapeConfig) ‑> ScrapeApiResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def scrape(self, scrape_config:ScrapeConfig) -> ScrapeApiResponse:

    try:
        logger.debug('--> %s Scraping %s' % (scrape_config.method, scrape_config.url))
        request_data = self._scrape_request(scrape_config=scrape_config)
        response = self._http_handler(**request_data)
        scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config)

        self.reporter.report(scrape_api_response=scrape_api_response)

        return scrape_api_response
    except BaseException as e:
        self.reporter.report(error=e)
        raise e
def screenshot(self, url: str, path: Optional[str] = None, name: Optional[str] = None) ‑> str
Expand source code
def screenshot(self, url:str, path:Optional[str]=None, name:Optional[str]=None) -> str:
    # for advanced configuration, take screenshots via the scrape method with ScrapeConfig
    api_response = self.scrape(scrape_config=ScrapeConfig(
        url=url,
        render_js=True,
        screenshots={'main': 'fullpage'}
    ))

    name = name or 'main.jpg'

    if not name.endswith('.jpg'):
        name += '.jpg'

    response = self._http_handler(
        method='GET',
        url=api_response.scrape_result['screenshots']['main']['url'],
        params={'key': self.key}
    )

    response.raise_for_status()

    return self.sink(api_response, path=path, name=name, content=response.content)
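
A one-shot convenience sketch (placeholder key); the return value is the path written by sink():

from scrapfly import ScrapflyClient

client = ScrapflyClient(key='YOUR_API_KEY')  # placeholder key
file_path = client.screenshot(url='https://example.com', name='homepage')
print(file_path)  # homepage.jpg
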
def sink(self, api_response: ScrapeApiResponse, content: Union[str, bytes, ForwardRef(None)] = None, path: Optional[str] = None, name: Optional[str] = None, file: Union[TextIO, _io.BytesIO, ForwardRef(None)] = None) ‑> str
Expand source code
def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
    scrape_result = api_response.result['result']
    scrape_config = api_response.result['config']

    file_content = content or scrape_result['content']
    file_path = None
    file_extension = None

    if name:
        name_parts = name.split('.')
        if len(name_parts) > 1:
            file_extension = name_parts[-1]

    if not file:
        if file_extension is None:
            try:
                mime_type = scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            if ';' in mime_type:
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            name = scrape_config['url'].split('/')[-1]

        if name.find(file_extension) == -1:
            name += file_extension

        file_path = path + '/' + name if path else name

        if file_path == file_extension:
            url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

            if url[-1] == '-':
                url = url[:-1]

            url += file_extension

            file_path = url

        file = open(file_path, 'wb')

    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)
    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    logger.info('file %s created' % file_path)
    return file_path
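
A sketch of persisting a scrape body to disk (placeholder key); when name carries no extension, one is derived from the response content-type as shown above:

from scrapfly import ScrapflyClient, ScrapeConfig

client = ScrapflyClient(key='YOUR_API_KEY')  # placeholder key
api_response = client.scrape(ScrapeConfig(url='https://example.com'))
path = client.sink(api_response, name='example.html')  # writes ./example.html
print(path)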
class ScrapflyError (message: str, code: str, http_status_code: int, resource: Optional[str] = None, is_retryable: bool = False, retry_delay: Optional[int] = None, retry_times: Optional[int] = None, documentation_url: Optional[str] = None, api_response: Optional[ForwardRef('ApiResponse')] = None)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyError(Exception):
    KIND_HTTP_BAD_RESPONSE = 'HTTP_BAD_RESPONSE'
    KIND_SCRAPFLY_ERROR = 'SCRAPFLY_ERROR'

    RESOURCE_PROXY = 'PROXY'
    RESOURCE_THROTTLE = 'THROTTLE'
    RESOURCE_SCRAPE = 'SCRAPE'
    RESOURCE_ASP = 'ASP'
    RESOURCE_SCHEDULE = 'SCHEDULE'
    RESOURCE_WEBHOOK = 'WEBHOOK'
    RESOURCE_SESSION = 'SESSION'

    def __init__(
        self,
        message: str,
        code: str,
        http_status_code: int,
        resource: Optional[str]=None,
        is_retryable: bool = False,
        retry_delay: Optional[int] = None,
        retry_times: Optional[int] = None,
        documentation_url: Optional[str] = None,
        api_response: Optional['ApiResponse'] = None
    ):
        self.message = message
        self.code = code
        self.retry_delay = retry_delay
        self.retry_times = retry_times
        self.resource = resource
        self.is_retryable = is_retryable
        self.documentation_url = documentation_url
        self.api_response = api_response
        self.http_status_code = http_status_code

        super().__init__(self.message, str(self.code))

    def __str__(self):
        message = self.message

        if self.documentation_url is not None:
            message += '. Learn more: %s' % self.documentation_url

        return message

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

  • scrapfly.errors.ExtraUsageForbidden
  • scrapfly.errors.HttpError

Class variables

var KIND_HTTP_BAD_RESPONSE
var KIND_SCRAPFLY_ERROR
var RESOURCE_ASP
var RESOURCE_PROXY
var RESOURCE_SCHEDULE
var RESOURCE_SCRAPE
var RESOURCE_SESSION
var RESOURCE_THROTTLE
var RESOURCE_WEBHOOK
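
Since every SDK error derives from ScrapflyError, a single handler can inspect the structured fields; a sketch (placeholder key):

from scrapfly import ScrapflyClient, ScrapeConfig, ScrapflyError

client = ScrapflyClient(key='YOUR_API_KEY')  # placeholder key
try:
    client.scrape(ScrapeConfig(url='https://example.com'))
except ScrapflyError as e:
    # __str__ appends the documentation URL when one is attached
    print(e.code, e.is_retryable, str(e))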
class ScrapflyProxyError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyProxyError(ScraperAPIError):
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyScheduleError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyScheduleError(ScraperAPIError):
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyScrapeError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyScrapeError(ScraperAPIError):
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflySessionError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflySessionError(ScraperAPIError):
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyThrottleError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyThrottleError(ScraperAPIError):
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyWebhookError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ScrapflyWebhookError(ScraperAPIError):
    pass

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class UpstreamHttpClientError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class UpstreamHttpClientError(UpstreamHttpError):
    pass

Ancestors

  • scrapfly.errors.UpstreamHttpError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

  • UpstreamHttpServerError

class UpstreamHttpError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class UpstreamHttpError(HttpError):
    pass

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

  • UpstreamHttpClientError

class UpstreamHttpServerError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class UpstreamHttpServerError(UpstreamHttpClientError):
    pass

Ancestors