Module scrapfly.api_response
Expand source code
import re
import logging as logger
import shutil
from base64 import b64decode
from contextlib import suppress
from datetime import datetime
from functools import partial
try:
from functools import cached_property
except ImportError:
from .polyfill.cached_property import cached_property
from http.cookiejar import Cookie
from http.cookies import SimpleCookie
from io import BytesIO
from json import JSONDecoder, loads
from dateutil.parser import parse
from requests import Request, Response, HTTPError
from typing import Dict, Optional, Iterable, Union, TextIO
from requests.structures import CaseInsensitiveDict
from .scrape_config import ScrapeConfig
from .errors import ErrorFactory, EncoderError, ApiHttpClientError, ApiHttpServerError, UpstreamHttpError, HttpError, ExtraUsageForbidden
from .frozen_dict import FrozenDict
logger.getLogger(__name__)
_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
def _date_parser(value):
if isinstance(value, Dict):
over = value.items()
else:
over = enumerate(value)
for k, v in over:
if isinstance(v, str):
if len(v) <= 26:
try:
value[k] = datetime.strptime(v, _DATE_FORMAT)
except ValueError:
value[k] = v
else:
value[k] = v
elif isinstance(v, Iterable):
value[k] = _date_parser(v)
else:
value[k] = v
return value
class ResponseBodyHandler:
SUPPORTED_COMPRESSION = ['gzip', 'deflate']
SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']
class JSONDateTimeDecoder(JSONDecoder):
def __init__(self, *args, **kargs):
JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs)
# brotli underperforms gzip at comparable compression levels and higher levels are CPU-intensive,
# so the trade-off is not worth it for most usage
def __init__(self, use_brotli:bool=False):
if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
try:
try:
import brotlicffi as brotli
self.SUPPORTED_COMPRESSION.insert(0, 'br')
except ImportError:
import brotli
self.SUPPORTED_COMPRESSION.insert(0, 'br')
except ImportError:
pass
self.content_encoding = ', '.join(self.SUPPORTED_COMPRESSION)
try: # automatically use msgpack if available https://msgpack.org/
import msgpack
self.accept = 'application/msgpack;charset=utf-8'
self.content_type = 'application/msgpack;charset=utf-8'
self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
except ImportError:
self.accept = 'application/json;charset=utf-8'
self.content_type = 'application/json;charset=utf-8'
self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)
def support(self, headers:Dict) -> bool:
if 'content-type' not in headers:
return False
for content_type in self.SUPPORTED_CONTENT_TYPES:
if headers['content-type'].find(content_type) != -1:
return True
return False
def __call__(self, content: bytes) -> Union[str, Dict]:
try:
return self.content_loader(content)
except Exception as e:
raise EncoderError(content=content.decode('utf-8')) from e
class ScrapeApiResponse:
def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None):
self.request = request
self.response = response
self.scrape_config = scrape_config
if self.scrape_config.method == 'HEAD':
api_result = {
'result': {
'request_headers': {},
'status': 'DONE',
'success': 200 <= self.response.status_code < 300,
'response_headers': self.response.headers,
'status_code': self.response.status_code,
'reason': self.response.reason,
'format': 'text',
'content': ''
},
'context': {},
'config': self.scrape_config.__dict__
}
if 'X-Scrapfly-Reject-Code' in self.response.headers:
api_result['result']['error'] = {
'code': self.response.headers['X-Scrapfly-Reject-Code'],
'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']),
'message': self.response.headers['X-Scrapfly-Reject-Description'],
'error_id': self.response.headers['X-Scrapfly-Reject-ID'],
'retryable': True if self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes' else False,
'doc_url': '',
'links': {}
}
if 'X-Scrapfly-Reject-Doc' in self.response.headers:
api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc']
api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc']
if isinstance(api_result, str):
raise HttpError(
request=request,
response=response,
message='Bad gateway',
code=502,
http_status_code=502,
is_retryable=True
)
self.result = self.handle_api_result(api_result=api_result)
@property
def scrape_result(self) -> Dict:
return self.result['result']
@property
def config(self) -> Dict:
return self.result['config']
@property
def context(self) -> Dict:
return self.result['context']
@property
def content(self) -> str:
return self.scrape_result['content']
@property
def success(self) -> bool:
"""
/!\ Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code
"""
return 200 >= self.response.status_code <= 299
@property
def scrape_success(self) -> bool:
return self.scrape_result['success']
@property
def error(self) -> Optional[Dict]:
if self.scrape_success is False:
return self.scrape_result['error']
@property
def status_code(self) -> int:
"""
/!\ This is the status code of our API, not the upstream website
"""
return self.response.status_code
@property
def upstream_status_code(self) -> Optional[int]:
if 'status_code' in self.scrape_result:
return self.scrape_result['status_code']
return None
def prevent_extra_usage(self):
if self.remaining_quota == 0:
raise ExtraUsageForbidden(
message='All Pre Paid Quota Used',
code='ERR::ACCOUNT::PREVENT_EXTRA_USAGE',
http_status_code=429,
is_retryable=False
)
@property
def remaining_quota(self) -> Optional[int]:
remaining_scrape = self.response.headers.get('X-Scrapfly-Remaining-Scrape')
if remaining_scrape:
remaining_scrape = int(remaining_scrape)
return remaining_scrape
@property
def cost(self) -> Optional[int]:
cost = self.response.headers.get('X-Scrapfly-Api-Cost')
if cost:
cost = int(cost)
return cost
@property
def duration_ms(self) -> Optional[float]:
duration = self.response.headers.get('X-Scrapfly-Response-Time')
if duration:
duration = float(duration)
return duration
@property
def headers(self) -> CaseInsensitiveDict:
return self.response.headers
def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
if self._is_api_error(api_result=api_result) is True:
return FrozenDict(api_result)
try:
if isinstance(api_result['config']['headers'], list):
api_result['config']['headers'] = {}
except TypeError:
logger.info(api_result)
raise
with suppress(KeyError):
api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])
if api_result['result']['format'] == 'binary' and api_result['result']['content']:
api_result['result']['content'] = BytesIO(b64decode(api_result['result']['content']))
return FrozenDict(api_result)
@cached_property
def soup(self) -> 'BeautifulSoup':
try:
from bs4 import BeautifulSoup
soup = BeautifulSoup(self.content, "lxml")
return soup
except ImportError as e:
logger.error('You must install scrapfly[parser] to enable this feature')
@cached_property
def selector(self) -> 'Selector':
try:
from scrapy import Selector
return Selector(text=self.content)
except ImportError as e:
logger.error('You must install scrapfly[scrapy] to enable this feature')
raise e
@property
def error_message(self) :
if self.error:
message = "<-- %s | %s - %s." % (self.response.status_code, self.error['code'], self.error['message'])
if self.error['links']:
message += "Checkout the related doc: %s" % list(self.error['links'].values())[0]
return message
return '<-- %s - %s %s | Doc: %s' % (self.response.status_code, self.http_status_code, self.code, self.documentation_url)
def _is_api_error(self, api_result: Dict) -> bool:
if self.scrape_config.method == 'HEAD':
if 'X-Reject-Reason' in self.response.headers:
return True
return False
if api_result is None:
return True
return 'error_id' in api_result
def raise_for_result(self, raise_on_upstream_error: bool = True):
try:
self.response.raise_for_status()
except HTTPError as e:
if 'http_code' in self.result:
if e.response.status_code >= 500:
raise ApiHttpServerError(
request=e.request,
response=e.response,
message=self.result['message'],
code='',
resource='',
http_status_code=e.response.status_code,
documentation_url=self.result.get('links')
) from e
else:
raise ApiHttpClientError(
request=e.request,
response=e.response,
message=self.result['message'],
code='',
resource='API',
http_status_code=self.result['http_code'],
documentation_url=self.result.get('links')
) from e
if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
error = ErrorFactory.create(api_response=self)
if error:
if isinstance(error, UpstreamHttpError):
if raise_on_upstream_error is True:
raise error
else:
raise error
def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
if _class != Response:
raise RuntimeError('only Response from requests package is supported at the moment')
if self.result is None:
return None
if self.response.status_code != 200:
return None
response = Response()
response.status_code = self.scrape_result['status_code']
response.reason = self.scrape_result['reason']
if self.scrape_result['content']:
if isinstance(self.scrape_result['content'], BytesIO):
response._content = self.scrape_result['content'].getvalue()
elif isinstance(self.scrape_result['content'], bytes):
response._content = self.scrape_result['content']
elif isinstance(self.scrape_result['content'], str):
response._content = self.scrape_result['content'].encode('utf-8')
else:
response._content = None
response.headers.update(self.scrape_result['response_headers'])
response.url = self.scrape_result['url']
response.request = Request(
method=self.config['method'],
url=self.config['url'],
headers=self.scrape_result['request_headers'],
data=self.config['body'] if self.config['body'] else None
)
if 'set-cookie' in response.headers:
for raw_cookie in response.headers['set-cookie']:
for name, cookie in SimpleCookie(raw_cookie).items():
expires = cookie.get('expires')
if expires == '':
expires = None
if expires:
try:
expires = parse(expires).timestamp()
except ValueError:
expires = None
if type(expires) == str:
if '.' in expires:
expires = float(expires)
else:
expires = int(expires)
response.cookies.set_cookie(Cookie(
version=cookie.get('version') if cookie.get('version') else None,
name=name,
value=cookie.value,
path=cookie.get('path', ''),
expires=expires,
comment=cookie.get('comment'),
domain=cookie.get('domain', ''),
secure=cookie.get('secure'),
port=None,
port_specified=False,
domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
discard=False,
comment_url=None,
rest={
'httponly': cookie.get('httponly'),
'samesite': cookie.get('samesite'),
'max-age': cookie.get('max-age')
}
))
return response
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content:Optional[Union[str, bytes]]=None):
file_content = content or self.scrape_result['content']
file_path = None
file_extension = None
if name:
name_parts = name.split('.')
if len(name_parts) > 1:
file_extension = name_parts[-1]
if not file:
if file_extension is None:
try:
mime_type = self.scrape_result['response_headers']['content-type']
except KeyError:
mime_type = 'application/octet-stream'
if ';' in mime_type:
mime_type = mime_type.split(';')[0]
file_extension = '.' + mime_type.split('/')[1]
if not name:
name = self.config['url'].split('/')[-1]
if name.find(file_extension) == -1:
name += file_extension
file_path = path + '/' + name if path is not None else name
if file_path == file_extension:
url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')
if url[-1] == '-':
url = url[:-1]
url += file_extension
file_path = url
file = open(file_path, 'wb')
if isinstance(file_content, str):
file_content = BytesIO(file_content.encode('utf-8'))
elif isinstance(file_content, bytes):
file_content = BytesIO(file_content)
file_content.seek(0)
with file as f:
shutil.copyfileobj(file_content, f, length=131072)
logger.info('file %s created' % file_path)
Classes
class ResponseBodyHandler (use_brotli: bool = False)
-
Class variables
var JSONDateTimeDecoder
-
Simple JSON (https://json.org) decoder.
Performs the following translations in decoding by default:

    JSON          | Python
    --------------+--------
    object        | dict
    array         | list
    string        | str
    number (int)  | int
    number (real) | float
    true          | True
    false         | False
    null          | None

It also understands NaN, Infinity, and -Infinity as their corresponding float values, which is outside the JSON spec. A small decoding sketch follows the class variables below.
var SUPPORTED_COMPRESSION
var SUPPORTED_CONTENT_TYPES
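For illustration, a minimal decoding sketch with JSONDateTimeDecoder; the payload below is made up and the scrapfly package is assumed to be installed. Strings matching the module's 'YYYY-MM-DD HH:MM:SS' format are converted to datetime objects.

from json import loads
from scrapfly.api_response import ResponseBodyHandler

# Hypothetical payload; the date string matches the module-level _DATE_FORMAT.
payload = b'{"context": {"started_at": "2023-01-15 10:30:00"}}'
data = loads(payload, cls=ResponseBodyHandler.JSONDateTimeDecoder)
print(type(data['context']['started_at']))  # <class 'datetime.datetime'>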
Methods
def support(self, headers: Dict) ‑> bool
-
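A minimal usage sketch of the handler, its support() check, and its __call__ decoding path. It assumes the scrapfly package is installed and msgpack is not, so the JSON loader is negotiated; the headers and body below are illustrative, not a real API exchange.

from scrapfly.api_response import ResponseBodyHandler

handler = ResponseBodyHandler(use_brotli=False)

# Headers to send with the API request.
request_headers = {
    'accept': handler.accept,                     # 'application/json;charset=utf-8' when msgpack is absent
    'accept-encoding': handler.content_encoding,  # 'gzip, deflate' ('br' first if brotli is installed)
}

# Pretend this came back from the API.
response_headers = {'content-type': 'application/json;charset=utf-8'}
raw_body = b'{"result": {"success": true}}'

if handler.support(response_headers):
    data = handler(raw_body)          # decoded dict; raises EncoderError on malformed payloads
    print(data['result']['success'])  # True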
class ScrapeApiResponse (request: requests.models.Request, response: requests.models.Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None)
-
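A ScrapeApiResponse is normally not built by hand. A hedged sketch of the typical flow, assuming the sibling ScrapflyClient from this SDK; the API key and URL are placeholders.

from scrapfly import ScrapflyClient, ScrapeConfig

client = ScrapflyClient(key='YOUR-API-KEY')  # placeholder key
api_response = client.scrape(ScrapeConfig(url='https://httpbin.org/html'))

print(api_response.status_code)           # Scrapfly API status code
print(api_response.upstream_status_code)  # upstream website status code
print(api_response.scrape_success)        # did the scrape itself succeed?
html = api_response.content               # scraped page body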
Instance variables
var config : Dict
-
var content : str
-
var context : Dict
-
var cost : Optional[int]
-
var duration_ms : Optional[float]
-
var error : Optional[Dict]
-
var error_message
-
var headers : requests.structures.CaseInsensitiveDict
-
var remaining_quota : Optional[int]
-
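A short sketch of the usage metadata read from the API response headers, continuing the client sketch above; each property returns None when the corresponding header is absent.

print(api_response.cost)             # API credits charged for this scrape
print(api_response.remaining_quota)  # prepaid scrapes left, when reported
print(api_response.duration_ms)      # API processing time in milliseconds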
var scrape_result : Dict
-
var scrape_success : bool
-
var selector
-
Scrapy/Parsel Selector built from the scraped content (requires the scrapfly[scrapy] extra).
var soup
-
BeautifulSoup document built from the scraped content using the lxml parser (requires the scrapfly[parser] extra).
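Continuing the client sketch above, a hedged parsing sketch; it assumes the scrapfly[scrapy] and scrapfly[parser] extras are installed and that the scraped page has a <title> element. The selectors are illustrative.

title_via_scrapy = api_response.selector.css('title::text').get()  # Scrapy/Parsel Selector
title_via_bs4 = api_response.soup.find('title').get_text()         # BeautifulSoup document
print(title_via_scrapy, title_via_bs4)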
var status_code : int
-
/!\ This is the status code of our API, not the upstream website
var success : bool
-
/!\ Success means the Scrapfly API replied to the call correctly; the scrape itself can still fail if the upstream website replied with an error status code
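A sketch of how the two success flags differ in practice, continuing the client sketch above.

if api_response.success and not api_response.scrape_success:
    # The API call worked, but the upstream website returned an error.
    print(api_response.error['code'])  # e.g. an 'ERR::...' code string
    print(api_response.error_message)  # human-readable summary, with doc link when available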
var upstream_status_code : Optional[int]
-
Methods
def handle_api_result(self, api_result: Dict) ‑> Optional[FrozenDict]
-
def prevent_extra_usage(self)
-
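A hedged sketch of guarding against paid extra usage before issuing more scrapes, continuing the client sketch above; the exception class comes from scrapfly.errors as imported by this module.

from scrapfly.errors import ExtraUsageForbidden

try:
    api_response.prevent_extra_usage()  # raises only when remaining_quota == 0
except ExtraUsageForbidden:
    print('Prepaid quota exhausted; last scrape cost: %s' % api_response.cost)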
def raise_for_result(self, raise_on_upstream_error: bool = True)
-
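A hedged sketch of turning API and upstream failures into exceptions right after a scrape, continuing the client sketch above; the exception classes come from scrapfly.errors as imported by this module.

from scrapfly.errors import ApiHttpClientError, ApiHttpServerError, UpstreamHttpError

try:
    api_response.raise_for_result(raise_on_upstream_error=True)
except UpstreamHttpError as e:
    print('Upstream website error: %s' % e)
except (ApiHttpClientError, ApiHttpServerError) as e:
    print('Scrapfly API error: %s' % e)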
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None)
-
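A hedged usage sketch, continuing the client sketch above. With no arguments, sink() derives the file name from the scraped URL and the response content-type; the explicit variant assumes a 'downloads' directory already exists.

api_response.sink()                                    # derives a name like '<last-url-segment>.html' in the current directory
api_response.sink(path='downloads', name='page.html')  # explicit destination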
def upstream_result_into_response(self, _class=requests.models.Response) ‑> Optional[requests.models.Response]
-
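A hedged sketch of bridging the scrape result into requests-based code, continuing the client sketch above; the method returns None when the API call itself did not return 200.

response = api_response.upstream_result_into_response()
if response is not None:
    print(response.status_code, response.headers.get('content-type'))
    print(response.cookies.get_dict())  # upstream cookies replayed into a requests cookie jar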