Module scrapfly.api_response
import base64
import binascii
import hashlib
import hmac
import re
import logging
import shutil
from base64 import b64decode
from contextlib import suppress
from datetime import datetime
from functools import partial
try:
from functools import cached_property
except ImportError:
from .polyfill.cached_property import cached_property
from http.cookiejar import Cookie
from http.cookies import SimpleCookie
from io import BytesIO
from json import JSONDecoder, loads
from dateutil.parser import parse
from requests import Request, Response, HTTPError
from typing import List, Dict, Optional, Iterable, Union, TextIO, Tuple, Callable, Type
from requests.structures import CaseInsensitiveDict
from .scrape_config import ScrapeConfig
from .screenshot_config import ScreenshotConfig
from .extraction_config import ExtractionConfig
from .errors import ErrorFactory, ScreenshotAPIError, ExtractionAPIError, EncoderError, ApiHttpClientError, \
ApiHttpServerError, UpstreamHttpError, HttpError, \
ExtraUsageForbidden, WebhookSignatureMissMatch, ContentError
from .frozen_dict import FrozenDict
logger = logging.getLogger(__name__)
_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
def _date_parser(value):
if isinstance(value, Dict):
over = value.items()
else:
over = enumerate(value)
for k, v in over:
        if isinstance(v, str):
            # only strings short enough to plausibly match _DATE_FORMAT are tried
            if len(v) <= 26:
try:
value[k] = datetime.strptime(v, _DATE_FORMAT)
except ValueError:
value[k] = v
else:
value[k] = v
elif isinstance(v, Iterable):
value[k] = _date_parser(v)
else:
value[k] = v
return value
class ResponseBodyHandler:
SUPPORTED_COMPRESSION = ['gzip', 'deflate']
SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']
class JSONDateTimeDecoder(JSONDecoder):
        def __init__(self, *args, **kwargs):
            JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kwargs)
    # Brotli underperforms gzip at comparable compression levels, and higher levels
    # are CPU-intensive, so the trade-off is not worth it for most usage.
def __init__(self, use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None):
if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
try:
try:
import brotlicffi as brotli
self.SUPPORTED_COMPRESSION.insert(0, 'br')
except ImportError:
import brotli
self.SUPPORTED_COMPRESSION.insert(0, 'br')
except ImportError:
pass
try:
import zstd
self.SUPPORTED_COMPRESSION.append('zstd')
except ImportError:
pass
self.content_encoding: str = ', '.join(self.SUPPORTED_COMPRESSION)
self._signing_secret: Optional[Tuple[str]] = None
if signing_secrets:
_secrets = set()
for signing_secret in signing_secrets:
_secrets.add(binascii.unhexlify(signing_secret))
self._signing_secret = tuple(_secrets)
try: # automatically use msgpack if available https://msgpack.org/
import msgpack
self.accept = 'application/msgpack;charset=utf-8'
self.content_type = 'application/msgpack;charset=utf-8'
self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
except ImportError:
self.accept = 'application/json;charset=utf-8'
self.content_type = 'application/json;charset=utf-8'
self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)
def support(self, headers: Dict) -> bool:
if 'content-type' not in headers:
return False
for content_type in self.SUPPORTED_CONTENT_TYPES:
if headers['content-type'].find(content_type) != -1:
return True
return False
def verify(self, message: bytes, signature: str) -> bool:
for signing_secret in self._signing_secret:
if hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper() == signature:
return True
return False
def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
if content_encoding == 'gzip' or content_encoding == 'gz':
import gzip
content = gzip.decompress(content)
elif content_encoding == 'deflate':
import zlib
content = zlib.decompress(content)
elif content_encoding == 'brotli' or content_encoding == 'br':
import brotli
content = brotli.decompress(content)
elif content_encoding == 'zstd':
import zstd
content = zstd.decompress(content)
if self._signing_secret is not None and signature is not None:
if not self.verify(content, signature):
raise WebhookSignatureMissMatch()
if content_type.startswith('application/json'):
content = loads(content, cls=self.JSONDateTimeDecoder)
elif content_type.startswith('application/msgpack'):
import msgpack
content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)
return content
def __call__(self, content: bytes, content_type: str) -> Union[str, Dict]:
content_loader = None
if content_type.find('application/json') != -1:
content_loader = partial(loads, cls=self.JSONDateTimeDecoder)
elif content_type.find('application/msgpack') != -1:
import msgpack
content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
if content_loader is None:
raise Exception('Unsupported content type')
try:
return content_loader(content)
except Exception as e:
try:
raise EncoderError(content=content.decode('utf-8')) from e
except UnicodeError:
raise EncoderError(content=base64.b64encode(content).decode('utf-8')) from e
class ApiResponse:
def __init__(self, request: Request, response: Response):
self.request = request
self.response = response
@property
def headers(self) -> CaseInsensitiveDict:
return self.response.headers
@property
def status_code(self) -> int:
"""
        /!\ This is the status code of the Scrapfly API, not the upstream website
"""
return self.response.status_code
@property
def remaining_quota(self) -> Optional[int]:
remaining_scrape = self.response.headers.get('X-Scrapfly-Remaining-Scrape')
if remaining_scrape:
remaining_scrape = int(remaining_scrape)
return remaining_scrape
@property
def cost(self) -> Optional[int]:
cost = self.response.headers.get('X-Scrapfly-Api-Cost')
if cost:
cost = int(cost)
return cost
@property
def duration_ms(self) -> Optional[float]:
duration = self.response.headers.get('X-Scrapfly-Response-Time')
if duration:
duration = float(duration)
return duration
@property
def error_message(self):
if self.error is not None:
message = "<-- %s | %s - %s." % (self.response.status_code, self.error['code'], self.error['message'])
if self.error['links']:
message += " Checkout the related doc: %s" % list(self.error['links'].values())[0]
return message
message = "<-- %s | %s." % (self.response.status_code, self.result['message'])
if self.result.get('links'):
message += " Checkout the related doc: %s" % ", ".join(self.result['links'])
return message
def prevent_extra_usage(self):
if self.remaining_quota == 0:
raise ExtraUsageForbidden(
message='All Pre Paid Quota Used',
code='ERR::ACCOUNT::PREVENT_EXTRA_USAGE',
http_status_code=429,
is_retryable=False
)
def raise_for_result(
        self, raise_on_upstream_error: bool, error_class: Type[Union[ApiHttpClientError, ScreenshotAPIError, ExtractionAPIError]]
):
try:
self.response.raise_for_status()
except HTTPError as e:
if 'error_id' in self.result:
if e.response.status_code >= 500:
raise ApiHttpServerError(
request=e.request,
response=e.response,
message=self.result['message'],
code='',
resource='',
http_status_code=e.response.status_code,
documentation_url=self.result.get('links'),
api_response=self,
) from e
# respect raise_on_upstream_error with screenshot and extraction only
elif error_class in (ScreenshotAPIError, ExtractionAPIError):
if raise_on_upstream_error:
raise error_class(
request=e.request,
response=e.response,
message=self.result['message'],
code='',
resource='API',
http_status_code=self.result['http_code'],
documentation_url=self.result.get('links'),
api_response=self,
) from e
else:
raise error_class(
request=e.request,
response=e.response,
message=self.result['message'],
code='',
resource='API',
http_status_code=self.result['http_code'],
documentation_url=self.result.get('links'),
api_response=self,
) from e
class ScrapeApiResponse(ApiResponse):
    scrape_config: ScrapeConfig
    large_object_handler: Callable
    def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler: Optional[Callable] = None):
super().__init__(request, response)
self.scrape_config = scrape_config
self.large_object_handler = large_object_handler
if self.scrape_config.method == 'HEAD':
api_result = {
'result': {
'request_headers': {},
'status': 'DONE',
                    'success': 200 <= self.response.status_code < 300,
'response_headers': self.response.headers,
'status_code': self.response.status_code,
'reason': self.response.reason,
'format': 'text',
'content': ''
},
'context': {},
'config': self.scrape_config.__dict__
}
if 'X-Scrapfly-Reject-Code' in self.response.headers:
api_result['result']['error'] = {
'code': self.response.headers['X-Scrapfly-Reject-Code'],
'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']),
'message': self.response.headers['X-Scrapfly-Reject-Description'],
'error_id': self.response.headers['X-Scrapfly-Reject-ID'],
'retryable': True if self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes' else False,
'doc_url': '',
'links': {}
}
if 'X-Scrapfly-Reject-Doc' in self.response.headers:
api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc']
api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc']
if isinstance(api_result, str):
raise HttpError(
request=request,
response=response,
message='Bad gateway',
code=502,
http_status_code=502,
is_retryable=True
)
self.result = self.handle_api_result(api_result=api_result)
@property
def scrape_result(self) -> Optional[Dict]:
return self.result.get('result', None)
@property
def config(self) -> Optional[Dict]:
if self.scrape_result is None:
return None
return self.result['config']
@property
def context(self) -> Optional[Dict]:
if self.scrape_result is None:
return None
return self.result['context']
@property
def content(self) -> str:
if self.scrape_result is None:
return ''
return self.scrape_result['content']
@property
def success(self) -> bool:
"""
        /!\ Success means the Scrapfly API replied to the call correctly; the scrape itself can still be unsuccessful if the upstream website replies with an error status code
        """
        return 200 <= self.response.status_code <= 299
@property
def scrape_success(self) -> bool:
scrape_result = self.scrape_result
if not scrape_result:
return False
return self.scrape_result['success']
@property
def error(self) -> Optional[Dict]:
if self.scrape_result is None:
return None
if self.scrape_success is False:
return self.scrape_result['error']
@property
def upstream_status_code(self) -> Optional[int]:
if self.scrape_result is None:
return None
if 'status_code' in self.scrape_result:
return self.scrape_result['status_code']
return None
@cached_property
def soup(self) -> 'BeautifulSoup':
if self.scrape_result['format'] != 'text':
raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(self.content, "lxml")
return soup
        except ImportError as e:
            logger.error('You must install scrapfly[parser] to enable this feature')
            raise e
@cached_property
def selector(self) -> 'Selector':
if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into selector, the format of data is binary - must be text content")
try:
from parsel import Selector
return Selector(text=self.content)
except ImportError as e:
            logger.error('You must install the parsel or scrapy package to enable this feature')
raise e
def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
if self._is_api_error(api_result=api_result) is True:
return FrozenDict(api_result)
try:
if isinstance(api_result['config']['headers'], list):
api_result['config']['headers'] = {}
except TypeError:
logger.info(api_result)
raise
with suppress(KeyError):
api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])
if self.large_object_handler is not None and api_result['result']['content']:
content_format = api_result['result']['format']
if content_format in ['clob', 'blob']:
api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format)
elif content_format == 'binary' and isinstance(api_result['result']['content'], bytes):
api_result['result']['content'] = BytesIO(b64decode(api_result['result']['content']))
return FrozenDict(api_result)
def _is_api_error(self, api_result: Dict) -> bool:
if self.scrape_config.method == 'HEAD':
if 'X-Reject-Reason' in self.response.headers:
return True
return False
if api_result is None:
return True
return 'error_id' in api_result
def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
if _class != Response:
raise RuntimeError('only Response from requests package is supported at the moment')
if self.result is None:
return None
if self.response.status_code != 200:
return None
response = Response()
response.status_code = self.scrape_result['status_code']
response.reason = self.scrape_result['reason']
if self.scrape_result['content']:
if isinstance(self.scrape_result['content'], BytesIO):
response._content = self.scrape_result['content'].getvalue()
elif isinstance(self.scrape_result['content'], bytes):
response._content = self.scrape_result['content']
elif isinstance(self.scrape_result['content'], str):
response._content = self.scrape_result['content'].encode('utf-8')
else:
response._content = None
response.headers.update(self.scrape_result['response_headers'])
response.url = self.scrape_result['url']
response.request = Request(
method=self.config['method'],
url=self.config['url'],
headers=self.scrape_result['request_headers'],
data=self.config['body'] if self.config['body'] else None
)
if 'set-cookie' in response.headers:
for raw_cookie in response.headers['set-cookie']:
for name, cookie in SimpleCookie(raw_cookie).items():
expires = cookie.get('expires')
if expires == '':
expires = None
if expires:
try:
expires = parse(expires).timestamp()
except ValueError:
expires = None
if type(expires) == str:
if '.' in expires:
expires = float(expires)
else:
expires = int(expires)
response.cookies.set_cookie(Cookie(
version=cookie.get('version') if cookie.get('version') else None,
name=name,
value=cookie.value,
path=cookie.get('path', ''),
expires=expires,
comment=cookie.get('comment'),
domain=cookie.get('domain', ''),
secure=cookie.get('secure'),
port=None,
port_specified=False,
domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
discard=False,
comment_url=None,
rest={
'httponly': cookie.get('httponly'),
'samesite': cookie.get('samesite'),
'max-age': cookie.get('max-age')
}
))
return response
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
file_content = content or self.scrape_result['content']
file_path = None
file_extension = None
if name:
name_parts = name.split('.')
if len(name_parts) > 1:
file_extension = name_parts[-1]
if not file:
if file_extension is None:
try:
mime_type = self.scrape_result['response_headers']['content-type']
except KeyError:
mime_type = 'application/octet-stream'
if ';' in mime_type:
mime_type = mime_type.split(';')[0]
file_extension = '.' + mime_type.split('/')[1]
if not name:
name = self.config['url'].split('/')[-1]
if name.find(file_extension) == -1:
name += file_extension
file_path = path + '/' + name if path is not None else name
if file_path == file_extension:
url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')
if url[-1] == '-':
url = url[:-1]
url += file_extension
file_path = url
file = open(file_path, 'wb')
if isinstance(file_content, str):
file_content = BytesIO(file_content.encode('utf-8'))
elif isinstance(file_content, bytes):
file_content = BytesIO(file_content)
file_content.seek(0)
with file as f:
shutil.copyfileobj(file_content, f, length=131072)
logger.info('file %s created' % file_path)
def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
error = ErrorFactory.create(api_response=self)
if error:
if isinstance(error, UpstreamHttpError):
if raise_on_upstream_error is True:
raise error
else:
raise error
class ScreenshotApiResponse(ApiResponse):
def __init__(self, request: Request, response: Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None):
super().__init__(request, response)
self.screenshot_config = screenshot_config
self.result = self.handle_api_result(api_result)
@property
    def image(self) -> Optional[bytes]:
        binary = self.result.get('result', None)
        if binary is None:
            return b''
return binary
@property
def metadata(self) -> Optional[Dict]:
if not self.image:
return {}
content_type = self.response.headers.get('content-type')
extension_name = content_type[content_type.find('/') + 1:].split(';')[0]
return {
'extension_name': extension_name,
'upstream-status-code': self.response.headers.get('X-Scrapfly-Upstream-Http-Code'),
'upstream-url': self.response.headers.get('X-Scrapfly-Upstream-Url')
}
@property
def screenshot_success(self) -> bool:
if not self.image:
return False
return True
@property
def error(self) -> Optional[Dict]:
if self.image:
return None
if self.screenshot_success is False:
return self.result
def _is_api_error(self, api_result: Dict) -> bool:
if api_result is None:
return True
return 'error_id' in api_result
    def handle_api_result(self, api_result: bytes) -> Union[FrozenDict, bytes]:
if self._is_api_error(api_result=api_result) is True:
return FrozenDict(api_result)
return api_result
def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
class ExtractionApiResponse(ApiResponse):
def __init__(self, request: Request, response: Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None):
super().__init__(request, response)
self.extraction_config = extraction_config
self.result = self.handle_api_result(api_result)
@property
def extraction_result(self) -> Optional[Dict]:
extraction_result = self.result.get('result', None)
if not extraction_result: # handle empty extraction responses
return {'data': None, 'content_type': None}
else:
return extraction_result
@property
def data(self) -> Union[Dict, List, str]: # depends on the LLM prompt
if self.error is None:
return self.extraction_result['data']
return None
@property
def content_type(self) -> Optional[str]:
if self.error is None:
return self.extraction_result['content_type']
return None
@property
def extraction_success(self) -> bool:
extraction_result = self.extraction_result
if extraction_result is None or extraction_result['data'] is None:
return False
return True
@property
def error(self) -> Optional[Dict]:
if self.extraction_result is None:
return self.result
return None
def _is_api_error(self, api_result: Dict) -> bool:
if api_result is None:
return True
return 'error_id' in api_result
def handle_api_result(self, api_result: bytes) -> FrozenDict:
if self._is_api_error(api_result=api_result) is True:
return FrozenDict(api_result)
return FrozenDict({'result': api_result})
def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
Classes
class ApiResponse (request: requests.models.Request, response: requests.models.Response)
Subclasses
ScrapeApiResponse
ScreenshotApiResponse
ExtractionApiResponse
Instance variables
var cost : Optional[int]
var duration_ms : Optional[float]
var error_message
var headers : requests.structures.CaseInsensitiveDict
var remaining_quota : Optional[int]
var status_code : int
-
/!\ This is the status code of the Scrapfly API, not the upstream website
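A minimal sketch of reading the properties listed above; `api_response` here is a hypothetical ApiResponse obtained from a previous API call:

# `api_response` is hypothetical; cost, remaining_quota and duration_ms parse
# X-Scrapfly-* response headers and are None when the header is absent
print(api_response.status_code)      # status of the Scrapfly API call itself
print(api_response.cost)             # X-Scrapfly-Api-Cost, as int
print(api_response.remaining_quota)  # X-Scrapfly-Remaining-Scrape, as int
print(api_response.duration_ms)      # X-Scrapfly-Response-Time, as float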
Methods
def prevent_extra_usage(self)
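A short sketch of guarding against extra usage before issuing more calls; `api_response` is again hypothetical:

from scrapfly.errors import ExtraUsageForbidden

try:
    # raises once the quota reported by the last response reaches zero
    api_response.prevent_extra_usage()
except ExtraUsageForbidden:
    pass  # stop scheduling further scrapes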
def raise_for_result(self, raise_on_upstream_error: bool, error_class: Type[Union[ApiHttpClientError, scrapfly.errors.ScreenshotAPIError, scrapfly.errors.ExtractionAPIError]])
class ExtractionApiResponse (request: requests.models.Request, response: requests.models.Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None)
Ancestors
ApiResponse
Instance variables
var content_type : Optional[str]
var data : Union[Dict, List, str]
var error : Optional[Dict]
var extraction_result : Optional[Dict]
var extraction_success : bool
Methods
def handle_api_result(self, api_result: bytes) ‑> FrozenDict
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ExtractionAPIError)
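A minimal sketch of consuming an extraction result, assuming `extraction_response` (hypothetical) is an ExtractionApiResponse returned by a previous extraction call:

extraction_response.raise_for_result()  # raise if the API call itself failed
# empty results surface as {'data': None, 'content_type': None}
if extraction_response.extraction_success:
    print(extraction_response.content_type)  # e.g. 'application/json'
    print(extraction_response.data)          # Dict, List or str, depending on the prompt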
Inherited members
ApiResponse: cost, duration_ms, error_message, headers, prevent_extra_usage, remaining_quota, status_code
class ResponseBodyHandler (use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None)
Class variables
var JSONDateTimeDecoder
-
Simple JSON (http://json.org) decoder, extended here so that every decoded object is passed through _date_parser, turning date strings into datetime objects. Performs the following translations in decoding by default:

JSON          | Python
------------- | ------
object        | dict
array         | list
string        | str
number (int)  | int
number (real) | float
true          | True
false         | False
null          | None

It also understands NaN, Infinity, and -Infinity as their corresponding float values, which is outside the JSON spec. A usage sketch follows the class variables list below.
var SUPPORTED_COMPRESSION
var SUPPORTED_CONTENT_TYPES
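For illustration, a runnable sketch of the datetime-aware decoder: strings matching the _DATE_FORMAT pattern ('%Y-%m-%d %H:%M:%S') are converted to datetime objects while decoding; the payload is made up:

from json import loads
from scrapfly.api_response import ResponseBodyHandler

payload = '{"result": {"created_at": "2024-05-01 12:30:00"}}'  # hypothetical payload
data = loads(payload, cls=ResponseBodyHandler.JSONDateTimeDecoder)
print(type(data['result']['created_at']))  # <class 'datetime.datetime'>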
Methods
def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) ‑> Dict
def support(self, headers: Dict) ‑> bool
def verify(self, message: bytes, signature: str) ‑> bool
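A runnable sketch of decoding a signed, compressed webhook body with read(); the secret and payload are made up for the demonstration, and a mismatching signature raises WebhookSignatureMissMatch:

import binascii, gzip, hashlib, hmac
from scrapfly.api_response import ResponseBodyHandler

secret_hex = '6d79536563726574'  # hypothetical hex-encoded signing secret
body = gzip.compress(b'{"result": {"status": "DONE"}}')  # hypothetical webhook body
# signatures are uppercase hex HMAC-SHA256 digests computed over the decompressed body
signature = hmac.new(binascii.unhexlify(secret_hex), gzip.decompress(body), hashlib.sha256).hexdigest().upper()

handler = ResponseBodyHandler(signing_secrets=(secret_hex,))
result = handler.read(content=body, content_encoding='gzip', content_type='application/json', signature=signature)
print(result)  # {'result': {'status': 'DONE'}}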
class ScrapeApiResponse (request: requests.models.Request, response: requests.models.Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler: Optional[Callable] = None)
Ancestors
ApiResponse
Class variables
var large_object_handler : Callable
var scrape_config : ScrapeConfig
Instance variables
var config : Optional[Dict]
var content : str
var context : Optional[Dict]
var error : Optional[Dict]
var scrape_result : Optional[Dict]
var scrape_success : bool
var selector
var soup
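A sketch of the selector and soup shortcuts above, assuming `scrape_response` (hypothetical) is a ScrapeApiResponse with text content and the optional parser packages are installed:

title = scrape_response.selector.css('title::text').get()  # parsel Selector (requires parsel)
heading = scrape_response.soup.find('h1')                   # BeautifulSoup document (requires scrapfly[parser])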
var success : bool
-
/!\ Success means the Scrapfly API replied to the call correctly; the scrape itself can still be unsuccessful if the upstream website replies with an error status code
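A sketch distinguishing the two success flags; `scrape_response` is hypothetical:

# success covers only the API call; scrape_success covers the scrape itself
if scrape_response.success and not scrape_response.scrape_success:
    print(scrape_response.upstream_status_code)  # e.g. 403 from the target website
    print(scrape_response.error)                 # error dict from the scrape result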
var upstream_status_code : Optional[int]
Methods
def handle_api_result(self, api_result: Dict) ‑> Optional[FrozenDict]
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ApiHttpClientError)
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None)
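A sketch of writing scraped content to disk with sink(); `scrape_response` is hypothetical, and when no name is given the file name is derived from the scraped URL and the response content-type:

# the 'downloads' directory must already exist; writes downloads/page.html
scrape_response.sink(path='downloads', name='page.html')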
def upstream_result_into_response(self, _class=requests.models.Response) ‑> Optional[requests.models.Response]
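A sketch of rebuilding the upstream response; `scrape_response` is hypothetical and the returned object is a plain requests.Response:

upstream = scrape_response.upstream_result_into_response()
if upstream is not None:
    print(upstream.status_code, upstream.headers.get('content-type'))
    print(upstream.cookies.get_dict())  # cookies rebuilt from the set-cookie headers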
Inherited members
ApiResponse: cost, duration_ms, error_message, headers, prevent_extra_usage, remaining_quota, status_code
class ScreenshotApiResponse (request: requests.models.Request, response: requests.models.Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None)
Ancestors
ApiResponse
Instance variables
var error : Optional[Dict]
var image : Optional[bytes]
var metadata : Optional[Dict]
var screenshot_success : bool
Methods
def handle_api_result(self, api_result: bytes) ‑> Union[FrozenDict, bytes]
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ScreenshotAPIError)
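A sketch of persisting a captured screenshot; `screenshot_response` is hypothetical:

if screenshot_response.screenshot_success:
    extension = screenshot_response.metadata['extension_name']  # e.g. 'png', derived from content-type
    with open('screenshot.' + extension, 'wb') as f:
        f.write(screenshot_response.image)  # raw binary body returned by the API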
Inherited members
ApiResponse: cost, duration_ms, error_message, headers, prevent_extra_usage, remaining_quota, status_code