Module scrapfly.api_response

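Response handling for the Scrapfly scrape API. ResponseBodyHandler negotiates compression and content type (msgpack when available, JSON otherwise) and decodes API response bodies; ScrapeApiResponse wraps one API call and exposes the scrape result, billing headers (cost, remaining quota), error details, and helpers to replay the upstream result as a requests.Response or write the content to disk.
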
Source code
import re
import logging
import shutil

from base64 import b64decode
from contextlib import suppress
from datetime import datetime
from functools import partial

try:
    from functools import cached_property
except ImportError:
    from .polyfill.cached_property import cached_property

from http.cookiejar import Cookie
from http.cookies import SimpleCookie
from io import BytesIO
from json import JSONDecoder, loads

from dateutil.parser import parse
from requests import Request, Response, HTTPError
from typing import Dict, Optional, Iterable, Union, TextIO

from requests.structures import CaseInsensitiveDict

from .scrape_config import ScrapeConfig
from .errors import ErrorFactory, EncoderError, ApiHttpClientError, ApiHttpServerError, UpstreamHttpError, HttpError, ExtraUsageForbidden
from .frozen_dict import FrozenDict

logger = logging.getLogger(__name__)

_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'


def _date_parser(value):
    if isinstance(value, dict):
        over = value.items()
    else:
        over = enumerate(value)

    for k, v in over:
        if isinstance(v, str):
            if len(v) <= 26:  # cheap length guard before trying strptime
                try:
                    value[k] = datetime.strptime(v, _DATE_FORMAT)
                except ValueError:
                    value[k] = v
            else:
                value[k] = v
        elif isinstance(v, Iterable):
            value[k] = _date_parser(v)
        else:
            value[k] = v

    return value


class ResponseBodyHandler:

    SUPPORTED_COMPRESSION = ['gzip', 'deflate']
    SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']

    class JSONDateTimeDecoder(JSONDecoder):
        def __init__(self, *args, **kargs):
            JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs)

    # brotli underperforms gzip at comparable compression levels, and higher
    # levels are CPU-intensive, so the trade-off is not worth it for most usage
    def __init__(self, use_brotli: bool = False):
        if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
            try:
                try:
                    import brotlicffi as brotli
                    self.SUPPORTED_COMPRESSION.insert(0, 'br')
                except ImportError:
                    import brotli
                    self.SUPPORTED_COMPRESSION.insert(0, 'br')
            except ImportError:
                pass

        self.content_encoding = ', '.join(self.SUPPORTED_COMPRESSION)

        try:  # automatically use msgpack if available https://msgpack.org/
            import msgpack
            self.accept = 'application/msgpack;charset=utf-8'
            self.content_type = 'application/msgpack;charset=utf-8'
            self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
        except ImportError:
            self.accept = 'application/json;charset=utf-8'
            self.content_type = 'application/json;charset=utf-8'
            self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)

    def support(self, headers:Dict) -> bool:
        if 'content-type' not in headers:
            return False

        for content_type in self.SUPPORTED_CONTENT_TYPES:
            if headers['content-type'].find(content_type) != -1:
                return True

        return False

    def __call__(self, content: bytes) -> Union[str, Dict]:
        try:
            return self.content_loader(content)
        except Exception as e:
            raise EncoderError(content=content.decode('utf-8')) from e


class ScrapeApiResponse:

    def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None):
        self.request = request
        self.response = response
        self.scrape_config = scrape_config

        if self.scrape_config.method == 'HEAD':
            api_result = {
                'result': {
                    'request_headers': {},
                    'status': 'DONE',
                    'success': 200 <= self.response.status_code < 300,
                    'response_headers': self.response.headers,
                    'status_code': self.response.status_code,
                    'reason': self.response.reason,
                    'format': 'text',
                    'content': ''
                },
                'context': {},
                'config': self.scrape_config.__dict__
            }

            if 'X-Scrapfly-Reject-Code' in self.response.headers:
                api_result['result']['error'] = {
                    'code': self.response.headers['X-Scrapfly-Reject-Code'],
                    'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']),
                    'message': self.response.headers['X-Scrapfly-Reject-Description'],
                    'error_id': self.response.headers['X-Scrapfly-Reject-ID'],
                    'retryable': self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes',
                    'doc_url': '',
                    'links': {}
                }

                if 'X-Scrapfly-Reject-Doc' in self.response.headers:
                    api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc']
                    api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc']

        if isinstance(api_result, str):
            raise HttpError(
                request=request,
                response=response,
                message='Bad gateway',
                code=502,
                http_status_code=502,
                is_retryable=True
            )

        self.result = self.handle_api_result(api_result=api_result)

    @property
    def scrape_result(self) -> Dict:
        return self.result['result']

    @property
    def config(self) -> Dict:
        return self.result['config']

    @property
    def context(self) -> Dict:
        return self.result['context']

    @property
    def content(self) -> str:
        return self.scrape_result['content']

    @property
    def success(self) -> bool:
        """
            /!\ Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code
        """
        return 200 >= self.response.status_code <= 299

    @property
    def scrape_success(self) -> bool:
        return self.scrape_result['success']

    @property
    def error(self) -> Optional[Dict]:
        if self.scrape_success is False:
            return self.scrape_result['error']

    @property
    def status_code(self) -> int:
        """
            /!\ This is the status code of our API, not of the upstream website
        """
        return self.response.status_code

    @property
    def upstream_status_code(self) -> Optional[int]:
        if 'status_code' in self.scrape_result:
            return self.scrape_result['status_code']

        return None

    def prevent_extra_usage(self):
        if self.remaining_quota == 0:
            raise ExtraUsageForbidden(
                message='All Pre Paid Quota Used',
                code='ERR::ACCOUNT::PREVENT_EXTRA_USAGE',
                http_status_code=429,
                is_retryable=False
            )

    @property
    def remaining_quota(self) -> Optional[int]:
        remaining_scrape = self.response.headers.get('X-Scrapfly-Remaining-Scrape')

        if remaining_scrape:
            remaining_scrape = int(remaining_scrape)

        return remaining_scrape

    @property
    def cost(self) -> Optional[int]:
        cost = self.response.headers.get('X-Scrapfly-Api-Cost')

        if cost:
            cost = int(cost)

        return cost

    @property
    def duration_ms(self) -> Optional[float]:
        duration = self.response.headers.get('X-Scrapfly-Response-Time')

        if duration:
            duration = float(duration)

        return duration

    @property
    def headers(self) -> CaseInsensitiveDict:
        return self.response.headers

    def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        try:
            if isinstance(api_result['config']['headers'], list):
                api_result['config']['headers'] = {}
        except TypeError:
            logger.info(api_result)
            raise

        with suppress(KeyError):
            api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
            api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

        if api_result['result']['format'] == 'binary' and api_result['result']['content']:
            api_result['result']['content'] = BytesIO(b64decode(api_result['result']['content']))

        return FrozenDict(api_result)

    @cached_property
    def soup(self) -> 'BeautifulSoup':
        try:
            from bs4 import BeautifulSoup
            return BeautifulSoup(self.content, "lxml")
        except ImportError as e:
            logger.error('You must install scrapfly[parser] to enable this feature')
            raise e

    @cached_property
    def selector(self) -> 'Selector':
        try:
            from scrapy import Selector
            return Selector(text=self.content)
        except ImportError as e:
            logger.error('You must install scrapfly[scrapy] to enable this feature')
            raise e

    @property
    def error_message(self):
        if self.error:
            message = "<-- %s | %s - %s." % (self.response.status_code, self.error['code'], self.error['message'])

            if self.error['links']:
                message += "Checkout the related doc: %s" % list(self.error['links'].values())[0]

            return message

        # fall back to API-level error fields carried on the payload itself
        return '<-- %s - %s %s | Doc: %s' % (self.response.status_code, self.result.get('http_code'), self.result.get('code'), self.result.get('links'))

    def _is_api_error(self, api_result: Dict) -> bool:
        if self.scrape_config.method == 'HEAD':
            if 'X-Reject-Reason' in self.response.headers:
                return True
            return False

        if api_result is None:
            return True

        return 'error_id' in api_result

    def raise_for_result(self, raise_on_upstream_error: bool = True):

        try:
            self.response.raise_for_status()
        except HTTPError as e:
            if 'http_code' in self.result:
                if e.response.status_code >= 500:
                    raise ApiHttpServerError(
                        request=e.request,
                        response=e.response,
                        message=self.result['message'],
                        code='',
                        resource='',
                        http_status_code=e.response.status_code,
                        documentation_url=self.result.get('links')
                    ) from e
                else:
                    raise ApiHttpClientError(
                        request=e.request,
                        response=e.response,
                        message=self.result['message'],
                        code='',
                        resource='API',
                        http_status_code=self.result['http_code'],
                        documentation_url=self.result.get('links')
                    ) from e

        if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
            error = ErrorFactory.create(api_response=self)

            if error:
                if isinstance(error, UpstreamHttpError):
                    if raise_on_upstream_error is True:
                        raise error
                else:
                    raise error

    def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
        if _class != Response:
            raise RuntimeError('only Response from requests package is supported at the moment')

        if self.result is None:
            return None

        if self.response.status_code != 200:
            return None

        response = Response()
        response.status_code = self.scrape_result['status_code']
        response.reason = self.scrape_result['reason']

        if self.scrape_result['content']:
            if isinstance(self.scrape_result['content'], BytesIO):
                response._content = self.scrape_result['content'].getvalue()
            elif isinstance(self.scrape_result['content'], bytes):
                response._content = self.scrape_result['content']
            elif isinstance(self.scrape_result['content'], str):
                response._content = self.scrape_result['content'].encode('utf-8')
        else:
            response._content = None
        
        response.headers.update(self.scrape_result['response_headers'])
        response.url = self.scrape_result['url']

        response.request = Request(
            method=self.config['method'],
            url=self.config['url'],
            headers=self.scrape_result['request_headers'],
            data=self.config['body'] if self.config['body'] else None
        )

        if 'set-cookie' in response.headers:
            for raw_cookie in response.headers['set-cookie']:
                for name, cookie in SimpleCookie(raw_cookie).items():
                    expires = cookie.get('expires')

                    if expires == '':
                        expires = None

                    if expires:
                        try:
                            expires = parse(expires).timestamp()
                        except ValueError:
                            expires = None

                    if isinstance(expires, str):
                        if '.' in expires:
                            expires = float(expires)
                        else:
                            expires = int(expires)

                    response.cookies.set_cookie(Cookie(
                        version=cookie.get('version') if cookie.get('version') else None,
                        name=name,
                        value=cookie.value,
                        path=cookie.get('path', ''),
                        expires=expires,
                        comment=cookie.get('comment'),
                        domain=cookie.get('domain', ''),
                        secure=cookie.get('secure'),
                        port=None,
                        port_specified=False,
                        domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                        domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                        path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                        discard=False,
                        comment_url=None,
                        rest={
                            'httponly': cookie.get('httponly'),
                            'samesite': cookie.get('samesite'),
                            'max-age': cookie.get('max-age')
                        }
                    ))

        return response

    def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
        file_content = content or self.scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                try:
                    mime_type = self.scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = self.config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path is not None else name

            if file_path == file_extension:
                url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)

Classes

class ResponseBodyHandler (use_brotli: bool = False)

Class variables

var JSONDateTimeDecoder

Simple JSON https://json.org decoder

Performs the following translations in decoding by default:

+---------------+-------------------+
| JSON          | Python            |
+===============+===================+
| object        | dict              |
+---------------+-------------------+
| array         | list              |
+---------------+-------------------+
| string        | str               |
+---------------+-------------------+
| number (int)  | int               |
+---------------+-------------------+
| number (real) | float             |
+---------------+-------------------+
| true          | True              |
+---------------+-------------------+
| false         | False             |
+---------------+-------------------+
| null          | None              |
+---------------+-------------------+

It also understands NaN, Infinity, and -Infinity as their corresponding float values, which is outside the JSON spec.

var SUPPORTED_COMPRESSION
var SUPPORTED_CONTENT_TYPES
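
A small, hedged illustration of the date handling (the payload below is synthetic): _date_parser, installed as the decoder's object_hook, converts string values of at most 26 characters matching '%Y-%m-%d %H:%M:%S' into datetime objects and leaves everything else untouched.

from json import loads
from scrapfly.api_response import ResponseBodyHandler

payload = '{"result": {"created_at": "2023-01-15 10:30:00", "status": "DONE"}}'
decoded = loads(payload, cls=ResponseBodyHandler.JSONDateTimeDecoder)

print(type(decoded['result']['created_at']))  # <class 'datetime.datetime'>
print(decoded['result']['status'])            # DONE (left as a string)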

Methods

def support(self, headers: Dict) ‑> bool

Returns True when the given headers carry a supported content-type (application/msgpack or application/json).

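A brief usage sketch, with one caveat: the active loader depends on whether msgpack is installed, so the body handed to the handler must match handler.content_type.

from scrapfly.api_response import ResponseBodyHandler

handler = ResponseBodyHandler(use_brotli=False)

# Values a client could advertise when calling the API:
print(handler.accept)            # application/msgpack or application/json, charset utf-8
print(handler.content_encoding)  # 'gzip, deflate', plus 'br' when brotli is enabled

# Decode a body (assumes the JSON loader is active, i.e. msgpack is not installed):
if handler.support(headers={'content-type': 'application/json;charset=utf-8'}):
    data = handler(b'{"status": "DONE"}')  # raises EncoderError on undecodable input
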
class ScrapeApiResponse (request: requests.models.Request, response: requests.models.Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None)

Instance variables

var config : Dict
var content : str
var context : Dict
var cost : Optional[int]
var duration_ms : Optional[float]
var error : Optional[Dict]
var error_message

A human-readable error summary, including a link to related documentation when available.

var headers : requests.structures.CaseInsensitiveDict
var remaining_quota : Optional[int]
var scrape_result : Dict
var scrape_success : bool
var selector

A scrapy.Selector built from the scrape content (cached on first access; requires the scrapfly[scrapy] extra).

var soup

A BeautifulSoup document parsed from the scrape content with lxml (cached on first access; requires the scrapfly[parser] extra).

var status_code : int

/!\ This is the status code of our API, not of the upstream website

var success : bool

/!\ Success means the Scrapfly API replied correctly to the call; the scrape itself can still be unsuccessful if the upstream website replied with an error status code

var upstream_status_code : Optional[int]
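
The two success notions are easy to conflate: success reflects only the Scrapfly API's own 2xx status, while scrape_success reflects the upstream scrape. A hedged sketch of the usual check (how api_response is obtained is outside this module):

from scrapfly.api_response import ScrapeApiResponse

def extract_content(api_response: ScrapeApiResponse) -> str:
    if not api_response.success:
        # the API call itself failed (non-2xx from the Scrapfly API)
        raise RuntimeError(api_response.error_message)
    if not api_response.scrape_success:
        # the API answered, but the upstream scrape failed;
        # error and upstream_status_code carry the details
        raise RuntimeError(api_response.error_message)
    return api_response.content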

Methods

def handle_api_result(self, api_result: Dict) ‑> Optional[FrozenDict]

Normalizes the raw API payload: request/response header mappings become CaseInsensitiveDict, binary content is base64-decoded into a BytesIO, and the payload is wrapped in a FrozenDict. API-level error payloads are frozen as-is.

def prevent_extra_usage(self)

Raises ExtraUsageForbidden when the remaining prepaid quota (the X-Scrapfly-Remaining-Scrape header) is exactly zero.

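prevent_extra_usage raises only when the X-Scrapfly-Remaining-Scrape header is present and exactly zero; without the header, remaining_quota is None and nothing happens. A hedged guard sketch:

from scrapfly.errors import ExtraUsageForbidden

def has_quota_left(api_response) -> bool:
    try:
        api_response.prevent_extra_usage()
    except ExtraUsageForbidden:
        return False  # all prepaid quota consumed; stop before incurring extra usage
    return True
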
def raise_for_result(self, raise_on_upstream_error: bool = True)

Raises ApiHttpServerError or ApiHttpClientError when the API call itself failed, and the error produced by ErrorFactory when the scrape finished unsuccessfully; upstream errors are raised only when raise_on_upstream_error is True.

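A hedged error-handling sketch using the error classes this module already imports from scrapfly.errors; passing raise_on_upstream_error=False tolerates upstream failures while still raising API-level errors:

from scrapfly.errors import ApiHttpClientError, ApiHttpServerError, UpstreamHttpError

def check(api_response) -> None:
    try:
        api_response.raise_for_result(raise_on_upstream_error=True)
    except UpstreamHttpError:
        pass  # the target website replied with an error status; inspect or retry
    except (ApiHttpServerError, ApiHttpClientError):
        raise  # the Scrapfly API call itself failed
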
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None)

Writes the scrape content (or the given content) to a file. The file name is derived from the name argument, or from the scraped URL and the response content-type; an already-open binary file object is used as-is and closed after copying.

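Hedged usage sketch; api_response stands for a previously obtained ScrapeApiResponse:

# Default: file name derived from the scraped URL and the content-type header.
api_response.sink()

# Explicit destination directory and file name:
api_response.sink(path='out', name='page.html')

# Pre-opened binary file object; sink() closes it after copying:
api_response.sink(file=open('page.bin', 'wb'))
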
def upstream_result_into_response(self, _class=Response) ‑> Optional[requests.models.Response]

Rebuilds the upstream website's reply as a requests.Response, including status code, reason, headers, body, and cookies parsed from set-cookie headers. Returns None unless the API call itself returned HTTP 200.

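A hedged closing sketch: the rebuilt requests.Response carries the upstream status code, reason, headers, body, and cookies, so code written against requests can consume it directly; None comes back unless the API call itself returned HTTP 200.

from typing import Optional
import requests

def as_requests_response(api_response) -> Optional[requests.Response]:
    response = api_response.upstream_result_into_response()
    if response is not None:
        response.raise_for_status()      # now acts on the *upstream* status code
        for cookie in response.cookies:  # rebuilt from upstream set-cookie headers
            print(cookie.name, cookie.value, cookie.expires)
    return response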