Package scrapfly
Expand source code
__version__ = '0.8.18'
from typing import Tuple
from .errors import ScrapflyError
from .errors import ScrapflyAspError
from .errors import ScrapflyProxyError
from .errors import ScrapflyScheduleError
from .errors import ScrapflyScrapeError
from .errors import ScrapflySessionError
from .errors import ScrapflyThrottleError
from .errors import ScrapflyWebhookError
from .errors import EncoderError
from .errors import ErrorFactory
from .errors import HttpError
from .errors import UpstreamHttpError
from .errors import UpstreamHttpClientError
from .errors import UpstreamHttpServerError
from .errors import ApiHttpClientError
from .errors import ApiHttpServerError
from .errors import ScreenshotAPIError
from .errors import ExtractionAPIError
from .api_response import ScrapeApiResponse, ScreenshotApiResponse, ExtractionApiResponse, ResponseBodyHandler
from .client import ScrapflyClient, ScraperAPI, MonitoringTargetPeriod, MonitoringAggregation
from .scrape_config import ScrapeConfig
from .screenshot_config import ScreenshotConfig
from .extraction_config import ExtractionConfig
# Names exported by `from scrapfly import *`; every entry is imported above.
__all__: Tuple[str, ...] = (
    # Error classes from .errors
    'ScrapflyError',
    'ScrapflyAspError',
    'ScrapflyProxyError',
    'ScrapflyScheduleError',
    'ScrapflyScrapeError',
    'ScrapflySessionError',
    'ScrapflyThrottleError',
    'ScrapflyWebhookError',
    'UpstreamHttpError',
    'UpstreamHttpClientError',
    'UpstreamHttpServerError',
    'ApiHttpClientError',
    'ApiHttpServerError',
    'EncoderError',
    # Response wrappers from .api_response
    'ScrapeApiResponse',
    'ScreenshotApiResponse',
    'ExtractionApiResponse',
    'ErrorFactory',
    'HttpError',
    'ScrapflyClient',
    'ResponseBodyHandler',
    # Request configuration objects
    'ScrapeConfig',
    'ScreenshotConfig',
    'ExtractionConfig',
    'ScreenshotAPIError',
    'ExtractionAPIError',
    # Remaining names from .client
    'ScraperAPI',
    'MonitoringTargetPeriod',
    'MonitoringAggregation',
)
Sub-modules
scrapfly.api_config
scrapfly.api_response
scrapfly.client
scrapfly.errors
scrapfly.extraction_config
scrapfly.frozen_dict
scrapfly.polyfill
scrapfly.reporter
scrapfly.scrape_config
scrapfly.scrapy
scrapfly.screenshot_config
scrapfly.webhook
Classes
class ApiHttpClientError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ApiHttpClientError(HttpError):
    """Marker subclass of HttpError; it adds no attributes or behavior.

    It exists so callers can catch this category of error on its own.
    """
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- ApiHttpServerError
- scrapfly.errors.BadApiKeyError
- scrapfly.errors.PaymentRequired
- scrapfly.errors.TooManyRequest
class ApiHttpServerError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ApiHttpServerError(ApiHttpClientError):
    """Marker subclass of ApiHttpClientError; it adds no attributes or behavior.

    Because of the inheritance chain, an ``except ApiHttpClientError`` clause
    also catches this class.
    """
Ancestors
- ApiHttpClientError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class EncoderError (content: str)
-
Common base class for all exceptions
Expand source code
class EncoderError(BaseException):
    """Error carrying a payload that could not be decoded.

    Note that it derives from BaseException, so ``except Exception`` does not
    catch it.
    """

    def __init__(self, content: str):
        """Store the offending payload; no positional args reach BaseException."""
        super().__init__()
        self.content = content

    def __str__(self) -> str:
        """Return the raw payload text."""
        return self.content

    def __repr__(self):
        """Return the payload prefixed with a short description."""
        return "Invalid payload: %s" % self.content
Ancestors
- builtins.BaseException
class ErrorFactory
-
Expand source code
class ErrorFactory:
    """Builds the most specific error object for a failed API response."""

    # Resource segment of an error code (``ERR::<RESOURCE>::<NAME>``) -> error class.
    RESOURCE_TO_ERROR = {
        ScrapflyError.RESOURCE_SCRAPE: ScrapflyScrapeError,
        ScrapflyError.RESOURCE_WEBHOOK: ScrapflyWebhookError,
        ScrapflyError.RESOURCE_PROXY: ScrapflyProxyError,
        ScrapflyError.RESOURCE_SCHEDULE: ScrapflyScheduleError,
        ScrapflyError.RESOURCE_ASP: ScrapflyAspError,
        ScrapflyError.RESOURCE_SESSION: ScrapflySessionError
    }

    # Notable http error has own class for more convenience
    # Only applicable for generic API error
    HTTP_STATUS_TO_ERROR = {
        401: BadApiKeyError,
        402: PaymentRequired,
        429: TooManyRequest
    }

    @staticmethod
    def _get_resource(code: str) -> Optional[str]:
        """Return the resource segment (second ``::`` field) of an error code.

        Returns None when ``code`` is not a string or contains no ``::``.
        Codes with more or fewer than three segments no longer raise
        ValueError; the second segment is returned whenever it exists.
        """
        if isinstance(code, str) and '::' in code:
            # '::' is present, so split() yields at least two parts.
            return code.split('::')[1]

        return None

    @staticmethod
    def create(api_response: 'ScrapeApiResponse'):
        """Create (not raise) the error instance describing ``api_response``.

        The class is chosen from ``api_response.success``, the HTTP status
        code, and the resource segment of ``api_response.error['code']``.

        Returns:
            An instance of one of the error classes referenced below, or None
            when the code is BAD_UPSTREAM_RESPONSE-free and no branch matches
            (the function can fall off the end without returning).
        """
        is_retryable = False
        kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
        http_code = api_response.status_code
        retry_delay = 5
        retry_times = 3
        description = None
        error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
        code = api_response.error['code']

        if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
            # Report the status code of the scraped site, not of the API call.
            http_code = api_response.scrape_result['status_code']

        if 'description' in api_response.error:
            description = api_response.error['description']

        message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

        if 'doc_url' in api_response.error:
            error_url = api_response.error['doc_url']

        if 'retryable' in api_response.error:
            is_retryable = api_response.error['retryable']

        resource = ErrorFactory._get_resource(code=code)

        if is_retryable is True and 'X-Retry' in api_response.headers:
            # The gate header and the header holding the value differ, so the
            # value may be absent or non-numeric. Keep the default delay then,
            # rather than failing while building the error to report.
            if 'Retry-After' in api_response.headers:
                try:
                    retry_delay = int(api_response.headers['Retry-After'])
                except (TypeError, ValueError):
                    pass

        message = '%s: %s' % (message, description) if description else message

        if retry_delay is not None and is_retryable is True:
            message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

        args = {
            'message': message,
            'code': code,
            'http_status_code': http_code,
            'is_retryable': is_retryable,
            'api_response': api_response,
            'resource': resource,
            'retry_delay': retry_delay,
            'retry_times': retry_times,
            'documentation_url': error_url,
            'request': api_response.request,
            'response': api_response.response
        }

        if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
            if http_code >= 500:
                return ApiHttpServerError(**args)

            is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

            # Status-specific classes only apply to generic (non-resource) errors.
            if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
                return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

            if is_scraper_api_error:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ApiHttpClientError(**args)

        elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
            if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
                if http_code >= 500:
                    return UpstreamHttpServerError(**args)

                if http_code >= 400:
                    return UpstreamHttpClientError(**args)

            if resource in ErrorFactory.RESOURCE_TO_ERROR:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ScrapflyError(**args)
Class variables
var HTTP_STATUS_TO_ERROR
var RESOURCE_TO_ERROR
Static methods
def create(api_response: ScrapeApiResponse)
-
Expand source code
@staticmethod
def create(api_response: 'ScrapeApiResponse'):
    """Create (not raise) the error instance describing ``api_response``.

    The class is chosen from ``api_response.success``, the HTTP status code,
    and the resource segment of ``api_response.error['code']``.

    Returns:
        An instance of one of the error classes referenced below.
    """
    is_retryable = False
    kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
    http_code = api_response.status_code
    retry_delay = 5
    retry_times = 3
    description = None
    error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
    code = api_response.error['code']

    if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
        # Report the status code of the scraped site, not of the API call.
        http_code = api_response.scrape_result['status_code']

    if 'description' in api_response.error:
        description = api_response.error['description']

    message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

    if 'doc_url' in api_response.error:
        error_url = api_response.error['doc_url']

    if 'retryable' in api_response.error:
        is_retryable = api_response.error['retryable']

    resource = ErrorFactory._get_resource(code=code)

    if is_retryable is True and 'X-Retry' in api_response.headers:
        # The gate header and the header holding the value differ, so the
        # value may be absent or non-numeric. Keep the default delay then,
        # rather than failing while building the error to report.
        if 'Retry-After' in api_response.headers:
            try:
                retry_delay = int(api_response.headers['Retry-After'])
            except (TypeError, ValueError):
                pass

    message = '%s: %s' % (message, description) if description else message

    if retry_delay is not None and is_retryable is True:
        message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

    args = {
        'message': message,
        'code': code,
        'http_status_code': http_code,
        'is_retryable': is_retryable,
        'api_response': api_response,
        'resource': resource,
        'retry_delay': retry_delay,
        'retry_times': retry_times,
        'documentation_url': error_url,
        'request': api_response.request,
        'response': api_response.response
    }

    if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
        if http_code >= 500:
            return ApiHttpServerError(**args)

        is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

        # Status-specific classes only apply to generic (non-resource) errors.
        if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
            return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

        if is_scraper_api_error:
            return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

        return ApiHttpClientError(**args)

    elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
        if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
            if http_code >= 500:
                return UpstreamHttpServerError(**args)

            if http_code >= 400:
                return UpstreamHttpClientError(**args)

        if resource in ErrorFactory.RESOURCE_TO_ERROR:
            return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

        return ScrapflyError(**args)
class ExtractionAPIError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ExtractionAPIError(HttpError):
    """Marker subclass of HttpError; it adds no attributes or behavior.

    ExtractionApiResponse.raise_for_result uses it as the default error class.
    """
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ExtractionApiResponse (request: requests.models.Request, response: requests.models.Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None)
-
Expand source code
class ExtractionApiResponse(ApiResponse):
    """Response wrapper for an extraction call.

    ``self.result`` holds either the API error payload unchanged or the
    extraction output wrapped as ``{'result': <api_result>}``.
    """

    def __init__(self, request: Request, response: Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None):
        """Store the HTTP exchange and config, then normalize ``api_result``."""
        super().__init__(request, response)
        self.extraction_config = extraction_config
        self.result = self.handle_api_result(api_result)

    @property
    def extraction_result(self) -> Optional[Dict]:
        """Return the extraction output, or a placeholder dict when it is empty.

        This property never returns None: a missing or falsy result yields
        ``{'data': None, 'content_type': None}``.
        """
        extraction_result = self.result.get('result', None)
        if not extraction_result:  # handle empty extraction responses
            return {'data': None, 'content_type': None}
        else:
            return extraction_result

    @property
    def data(self) -> Optional[Union[Dict, List, str]]:  # depends on the LLM prompt
        """Return the extracted data, or None when the response is an error."""
        if self.error is None:
            return self.extraction_result['data']

        return None

    @property
    def content_type(self) -> Optional[str]:
        """Return the content type of the extracted data, or None on error."""
        if self.error is None:
            return self.extraction_result['content_type']

        return None

    @property
    def extraction_success(self) -> bool:
        """Return True when the extraction produced non-None data."""
        extraction_result = self.extraction_result
        if extraction_result is None or extraction_result['data'] is None:
            return False

        return True

    @property
    def error(self) -> Optional[Dict]:
        """Return the API error payload, or None when there is no error.

        The previous check (``self.extraction_result is None``) could never be
        true because ``extraction_result`` substitutes a placeholder dict, so
        errors were never reported. The stored payload is inspected directly,
        using the same ``error_id`` marker as ``_is_api_error``.
        """
        if 'error_id' in self.result:
            return self.result

        return None

    def _is_api_error(self, api_result: Dict) -> bool:
        """Return True when ``api_result`` is None or carries an ``error_id``."""
        if api_result is None:
            return True

        return 'error_id' in api_result

    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        """Wrap ``api_result`` in a FrozenDict, leaving error payloads unwrapped."""
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        return FrozenDict({'result': api_result})

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
        """Delegate to the parent check, defaulting to ExtractionAPIError."""
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
Ancestors
Instance variables
var content_type : Optional[str]
-
Expand source code
@property
def content_type(self) -> Optional[str]:
    """Return the content type of the extraction result, or None on error."""
    if self.error is not None:
        return None

    return self.extraction_result['content_type']
var data : Union[Dict, List, str]
-
Expand source code
@property
def data(self) -> Union[Dict, List, str]:
    """Return the extracted data, or None when ``self.error`` is set."""
    # depends on the LLM prompt
    if self.error is not None:
        return None

    return self.extraction_result['data']
var error : Optional[Dict]
-
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Return the API error payload, or None when there is no error.

    The previous check (``self.extraction_result is None``) could never be
    true because ``extraction_result`` substitutes a placeholder dict for a
    missing result, so errors were never reported. The stored payload is
    inspected directly, using the same ``error_id`` marker as
    ``_is_api_error``.
    """
    if 'error_id' in self.result:
        return self.result

    return None
var extraction_result : Optional[Dict]
-
Expand source code
@property
def extraction_result(self) -> Optional[Dict]:
    """Return the stored extraction output, or a placeholder when it is empty.

    A missing or falsy ``'result'`` entry yields
    ``{'data': None, 'content_type': None}`` instead of None.
    """
    stored = self.result.get('result', None)
    if stored:
        return stored

    # handle empty extraction responses
    return {'data': None, 'content_type': None}
var extraction_success : bool
-
Expand source code
@property
def extraction_success(self) -> bool:
    """Return True when the extraction result exists and its data is not None."""
    outcome = self.extraction_result
    return not (outcome is None or outcome['data'] is None)
Methods
def handle_api_result(self, api_result: bytes) ‑> FrozenDict
-
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict:
    """Wrap ``api_result`` in a FrozenDict.

    Error payloads (as judged by ``_is_api_error``) are stored unchanged;
    anything else is nested under the ``'result'`` key.
    """
    is_error = self._is_api_error(api_result=api_result) is True
    payload = api_result if is_error else {'result': api_result}
    return FrozenDict(payload)
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ExtractionAPIError)
-
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
    """Run the parent class check, defaulting ``error_class`` to ExtractionAPIError."""
    super().raise_for_result(
        raise_on_upstream_error=raise_on_upstream_error,
        error_class=error_class,
    )
Inherited members
class ExtractionConfig (body: str, content_type: str, url: Optional[str] = None, charset: Optional[str] = None, template: Optional[str] = None, ephemeral_template: Optional[Dict] = None, extraction_prompt: Optional[str] = None, extraction_model: Optional[str] = None, is_document_compressed: Optional[bool] = None, document_compression_format: Optional[CompressionFormat] = None, webhook: Optional[str] = None, raise_on_upstream_error: bool = True)
-
Expand source code
class ExtractionConfig(BaseApiConfig):
    """Parameters of one extraction request: the document and how to extract from it."""

    body: str
    content_type: str
    url: Optional[str] = None
    charset: Optional[str] = None
    template: Optional[str] = None  # a saved template name
    ephemeral_template: Optional[Dict]  # ephemerally declared json template
    extraction_prompt: Optional[str] = None
    extraction_model: Optional[str] = None
    is_document_compressed: Optional[bool] = None
    document_compression_format: Optional[CompressionFormat] = None
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    def __init__(
        self,
        body: str,
        content_type: str,
        url: Optional[str] = None,
        charset: Optional[str] = None,
        template: Optional[str] = None,  # a saved template name
        ephemeral_template: Optional[Dict] = None,  # ephemerally declared json template
        extraction_prompt: Optional[str] = None,
        extraction_model: Optional[str] = None,
        is_document_compressed: Optional[bool] = None,
        document_compression_format: Optional[CompressionFormat] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True
    ):
        """Store the settings and gzip the body when auto-compression is requested.

        Raises:
            ExtractionConfigError: if a compression format is given without
                ``is_document_compressed``, or if auto-compression is requested
                for a format other than gzip.
        """
        self.key = None
        self.body = body
        self.content_type = content_type
        self.url = url
        self.charset = charset
        self.template = template
        self.ephemeral_template = ephemeral_template
        self.extraction_prompt = extraction_prompt
        self.extraction_model = extraction_model
        self.is_document_compressed = is_document_compressed
        self.document_compression_format = document_compression_format
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

        if self.document_compression_format is not None:
            if self.is_document_compressed is None:
                raise ExtractionConfigError(
                    'When declaring compression format, your must declare the is_document_compressed parameter to compress the document or skip it.'
                )

            if self.is_document_compressed is False:
                # The caller declared a format for a body that is not yet
                # compressed: compress it here (gzip only).
                if self.document_compression_format == CompressionFormat.GZIP:
                    import gzip
                    self.body = gzip.compress(bytes(self.body, 'utf-8'))
                else:
                    raise ExtractionConfigError(
                        f'Auto compression for {self.document_compression_format.value} format is not available. You can manually compress to {self.document_compression_format.value} or choose the gzip format for auto compression.'
                    )

    def to_api_params(self, key: str) -> Dict:
        """Build the query parameters for the extraction request.

        Args:
            key: API key used when ``self.key`` is not set.

        Raises:
            ExtractionConfigError: if both ``template`` and
                ``ephemeral_template`` are set.
        """
        params = {
            'key': self.key or key,
            'content_type': self.content_type
        }

        if self.url:
            params['url'] = self.url

        if self.charset:
            params['charset'] = self.charset

        if self.template and self.ephemeral_template:
            raise ExtractionConfigError('You cannot pass both parameters template and ephemeral_template. You must choose')

        if self.template:
            params['extraction_template'] = self.template

        if self.ephemeral_template:
            # Serialize into a local: overwriting self.ephemeral_template made
            # a second call JSON-encode the already-encoded string.
            serialized_template = json.dumps(self.ephemeral_template)
            params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(serialized_template.encode('utf-8')).decode('utf-8')

        if self.extraction_prompt:
            params['extraction_prompt'] = quote_plus(self.extraction_prompt)

        if self.extraction_model:
            params['extraction_model'] = self.extraction_model

        if self.webhook:
            params['webhook_name'] = self.webhook

        return params
Ancestors
Class variables
var body : str
var charset : Optional[str]
var content_type : str
var document_compression_format : Optional[CompressionFormat]
var ephemeral_template : Optional[Dict]
var extraction_model : Optional[str]
var extraction_prompt : Optional[str]
var is_document_compressed : Optional[bool]
var raise_on_upstream_error : bool
var template : Optional[str]
var url : Optional[str]
var webhook : Optional[str]
Methods
def to_api_params(self, key: str) ‑> Dict
-
Expand source code
def to_api_params(self, key: str) -> Dict:
    """Build the query parameters for the extraction request.

    Args:
        key: API key used when ``self.key`` is not set.

    Returns:
        A dict holding ``key`` and ``content_type`` plus every optional
        setting that is truthy on this config.

    Raises:
        ExtractionConfigError: if both ``template`` and ``ephemeral_template``
            are set.
    """
    params = {
        'key': self.key or key,
        'content_type': self.content_type
    }

    if self.url:
        params['url'] = self.url

    if self.charset:
        params['charset'] = self.charset

    if self.template and self.ephemeral_template:
        raise ExtractionConfigError('You cannot pass both parameters template and ephemeral_template. You must choose')

    if self.template:
        params['extraction_template'] = self.template

    if self.ephemeral_template:
        # Serialize into a local: overwriting self.ephemeral_template made a
        # second call JSON-encode the already-encoded string.
        serialized_template = json.dumps(self.ephemeral_template)
        params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(serialized_template.encode('utf-8')).decode('utf-8')

    if self.extraction_prompt:
        params['extraction_prompt'] = quote_plus(self.extraction_prompt)

    if self.extraction_model:
        params['extraction_model'] = self.extraction_model

    if self.webhook:
        params['webhook_name'] = self.webhook

    return params
class HttpError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class HttpError(ScrapflyError):
    """ScrapflyError that also carries the HTTP request and optional response."""

    def __init__(self, request:Request, response:Optional[Response]=None, **kwargs):
        """Store the HTTP exchange; remaining keyword args go to ScrapflyError."""
        self.request = request
        self.response = response
        super().__init__(**kwargs)

    def __str__(self) -> str:
        """Return a one-line description of the failure.

        The text is taken, in order, from the upstream scrape result, from the
        API response error message, or from the raw HTTP response status.
        """
        if isinstance(self, UpstreamHttpError):
            return f"Target website responded with {self.api_response.scrape_result['status_code']} - {self.api_response.scrape_result['reason']}"

        if self.api_response is not None:
            return self.api_response.error_message

        if self.response is None:
            # `response` is optional; without it there is no status line to
            # format, so fall back to the message instead of raising
            # AttributeError while the error is being rendered.
            return str(self.message)

        text = f"{self.response.status_code} - {self.response.reason}"

        if isinstance(self, (ApiHttpClientError, ApiHttpServerError)):
            text += " - " + self.message

        return text
Ancestors
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- ApiHttpClientError
- scrapfly.errors.ExtractionAPIError
- scrapfly.errors.QuotaLimitReached
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.ScreenshotAPIError
- scrapfly.errors.TooManyConcurrentRequest
- scrapfly.errors.UpstreamHttpError
class ResponseBodyHandler (use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None)
-
Expand source code
class ResponseBodyHandler:
    """Decompresses, optionally signature-checks, and decodes JSON/msgpack bodies."""

    # Shared by every instance; __init__ adds 'br' / 'zstd' once when the
    # corresponding module can be imported.
    SUPPORTED_COMPRESSION = ['gzip', 'deflate']
    SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']

    class JSONDateTimeDecoder(JSONDecoder):
        """JSONDecoder that runs every decoded object through ``_date_parser``."""

        def __init__(self, *args, **kargs):
            JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs)

    # brotli under perform at same gzip level and upper level destroy the cpu so
    # the trade off do not worth it for most of usage
    def __init__(self, use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None):
        """Detect optional codecs and pick the wire format.

        Args:
            use_brotli: advertise brotli ('br') when a brotli module is importable.
            signing_secrets: hex-encoded secrets accepted by ``verify``.
        """
        if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
            try:
                try:
                    import brotlicffi as brotli
                except ImportError:
                    import brotli
                self.SUPPORTED_COMPRESSION.insert(0, 'br')
            except ImportError:
                pass

        try:
            import zstd
            # The list is class-level, so without this guard every new instance
            # appended another 'zstd' and content_encoding kept growing.
            if 'zstd' not in self.SUPPORTED_COMPRESSION:
                self.SUPPORTED_COMPRESSION.append('zstd')
        except ImportError:
            pass

        self.content_encoding: str = ', '.join(self.SUPPORTED_COMPRESSION)
        self._signing_secret: Optional[Tuple[str]] = None

        if signing_secrets:
            _secrets = set()

            for signing_secret in signing_secrets:
                _secrets.add(binascii.unhexlify(signing_secret))

            self._signing_secret = tuple(_secrets)

        try:  # automatically use msgpack if available https://msgpack.org/
            import msgpack
            self.accept = 'application/msgpack;charset=utf-8'
            self.content_type = 'application/msgpack;charset=utf-8'
            self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
        except ImportError:
            self.accept = 'application/json;charset=utf-8'
            self.content_type = 'application/json;charset=utf-8'
            self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)

    def support(self, headers: Dict) -> bool:
        """Return True when ``headers['content-type']`` names a supported type."""
        if 'content-type' not in headers:
            return False

        for content_type in self.SUPPORTED_CONTENT_TYPES:
            if headers['content-type'].find(content_type) != -1:
                return True

        return False

    def verify(self, message: bytes, signature: str) -> bool:
        """Return True when ``signature`` is the upper-case hex HMAC-SHA256 of
        ``message`` under any configured secret.

        Returns False when no secret is configured. The comparison is
        constant-time so it does not leak how much of the signature matched.
        """
        if not self._signing_secret:
            return False

        received = signature.encode('utf-8')

        for signing_secret in self._signing_secret:
            expected = hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper()
            if hmac.compare_digest(expected.encode('utf-8'), received):
                return True

        return False

    def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
        """Decompress, verify, and decode a raw body.

        The signature is checked against the decompressed bytes, and only when
        both a secret and a signature are present. Bodies whose content type is
        neither JSON nor msgpack are returned as decompressed bytes.

        Raises:
            WebhookSignatureMissMatch: if the signature check fails.
        """
        if content_encoding == 'gzip' or content_encoding == 'gz':
            import gzip
            content = gzip.decompress(content)
        elif content_encoding == 'deflate':
            import zlib
            content = zlib.decompress(content)
        elif content_encoding == 'brotli' or content_encoding == 'br':
            # Same preference order as __init__, which advertises 'br' when
            # either module is importable.
            try:
                import brotlicffi as brotli
            except ImportError:
                import brotli
            content = brotli.decompress(content)
        elif content_encoding == 'zstd':
            import zstd
            content = zstd.decompress(content)

        if self._signing_secret is not None and signature is not None:
            if not self.verify(content, signature):
                raise WebhookSignatureMissMatch()

        if content_type.startswith('application/json'):
            content = loads(content, cls=self.JSONDateTimeDecoder)
        elif content_type.startswith('application/msgpack'):
            import msgpack
            content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

        return content

    def __call__(self, content: bytes, content_type: str) -> Union[str, Dict]:
        """Decode ``content`` according to ``content_type``.

        Raises:
            Exception: if the content type is neither JSON nor msgpack.
            EncoderError: if decoding fails; it carries the body as text, or
                base64-encoded when the body is not valid UTF-8.
        """
        content_loader = None

        if content_type.find('application/json') != -1:
            content_loader = partial(loads, cls=self.JSONDateTimeDecoder)
        elif content_type.find('application/msgpack') != -1:
            import msgpack
            content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)

        if content_loader is None:
            raise Exception('Unsupported content type')

        try:
            return content_loader(content)
        except Exception as e:
            try:
                raise EncoderError(content=content.decode('utf-8')) from e
            except UnicodeError:
                raise EncoderError(content=base64.b64encode(content).decode('utf-8')) from e
Class variables
var JSONDateTimeDecoder
-
Simple JSON http://json.org decoder
Performs the following translations in decoding by default:
+---------------+-------------------+
| JSON          | Python            |
+===============+===================+
| object        | dict              |
+---------------+-------------------+
| array         | list              |
+---------------+-------------------+
| string        | str               |
+---------------+-------------------+
| number (int)  | int               |
+---------------+-------------------+
| number (real) | float             |
+---------------+-------------------+
| true          | True              |
+---------------+-------------------+
| false         | False             |
+---------------+-------------------+
| null          | None              |
+---------------+-------------------+
It also understands NaN, Infinity, and -Infinity as their corresponding float values, which is outside the JSON spec.
var SUPPORTED_COMPRESSION
var SUPPORTED_CONTENT_TYPES
Methods
def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) ‑> Dict
-
Expand source code
def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
    """Decompress, verify, and decode a raw body.

    The signature is checked against the decompressed bytes, and only when
    both a signing secret and a signature are present. Bodies whose content
    type is neither JSON nor msgpack are returned as decompressed bytes.

    Raises:
        WebhookSignatureMissMatch: if the signature check fails.
    """
    if content_encoding == 'gzip' or content_encoding == 'gz':
        import gzip
        content = gzip.decompress(content)
    elif content_encoding == 'deflate':
        import zlib
        content = zlib.decompress(content)
    elif content_encoding == 'brotli' or content_encoding == 'br':
        # Prefer brotlicffi and fall back to brotli, so an install that only
        # has brotlicffi can still decode a 'br' body.
        try:
            import brotlicffi as brotli
        except ImportError:
            import brotli
        content = brotli.decompress(content)
    elif content_encoding == 'zstd':
        import zstd
        content = zstd.decompress(content)

    if self._signing_secret is not None and signature is not None:
        if not self.verify(content, signature):
            raise WebhookSignatureMissMatch()

    if content_type.startswith('application/json'):
        content = loads(content, cls=self.JSONDateTimeDecoder)
    elif content_type.startswith('application/msgpack'):
        import msgpack
        content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

    return content
def support(self, headers: Dict) ‑> bool
-
Expand source code
def support(self, headers: Dict) -> bool:
    """Return True when the ``content-type`` header names a supported type.

    A missing ``content-type`` key yields False; otherwise each entry of
    ``SUPPORTED_CONTENT_TYPES`` is looked for as a substring of the value.
    """
    if 'content-type' not in headers:
        return False

    declared = headers['content-type']
    return any(declared.find(candidate) != -1 for candidate in self.SUPPORTED_CONTENT_TYPES)
def verify(self, message: bytes, signature: str) ‑> bool
-
Expand source code
def verify(self, message: bytes, signature: str) -> bool:
    """Return True when ``signature`` matches ``message`` under any signing secret.

    The expected value is the upper-case hex HMAC-SHA256 digest of ``message``.
    Returns False when no secret is configured (iterating None used to raise
    TypeError). The comparison is constant-time so it does not leak how much
    of the signature matched.
    """
    if not self._signing_secret:
        return False

    received = signature.encode('utf-8')

    for signing_secret in self._signing_secret:
        expected = hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper()
        if hmac.compare_digest(expected.encode('utf-8'), received):
            return True

    return False
class ScrapeApiResponse (request: requests.models.Request, response: requests.models.Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler: Optional[Callable] = None)
-
Expand source code
class ScrapeApiResponse(ApiResponse):
    """Response wrapper for a scrape call.

    ``self.result`` holds the decoded API payload. For HEAD requests, which
    carry no body, an equivalent payload is synthesized from response headers.
    """

    scrape_config:ScrapeConfig
    large_object_handler:Callable

    def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler:Optional[Callable]=None):
        """Store the exchange and normalize ``api_result`` into ``self.result``.

        Raises:
            HttpError: if ``api_result`` is a plain string instead of a
                decoded payload (reported as a retryable 502).
        """
        super().__init__(request, response)
        self.scrape_config = scrape_config
        self.large_object_handler = large_object_handler

        if self.scrape_config.method == 'HEAD':
            api_result = {
                'result': {
                    'request_headers': {},
                    'status': 'DONE',
                    # Was `200 >= status < 300`, which is only true for
                    # status <= 200; the intended check is the 2xx range.
                    'success': 200 <= self.response.status_code < 300,
                    'response_headers': self.response.headers,
                    'status_code': self.response.status_code,
                    'reason': self.response.reason,
                    'format': 'text',
                    'content': ''
                },
                'context': {},
                'config': self.scrape_config.__dict__
            }

            if 'X-Scrapfly-Reject-Code' in self.response.headers:
                api_result['result']['error'] = {
                    'code': self.response.headers['X-Scrapfly-Reject-Code'],
                    'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']),
                    'message': self.response.headers['X-Scrapfly-Reject-Description'],
                    'error_id': self.response.headers['X-Scrapfly-Reject-ID'],
                    'retryable': True if self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes' else False,
                    'doc_url': '',
                    'links': {}
                }

                if 'X-Scrapfly-Reject-Doc' in self.response.headers:
                    api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc']
                    api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc']

        if isinstance(api_result, str):
            raise HttpError(
                request=request,
                response=response,
                message='Bad gateway',
                code=502,
                http_status_code=502,
                is_retryable=True
            )

        self.result = self.handle_api_result(api_result=api_result)

    @property
    def scrape_result(self) -> Optional[Dict]:
        """Return the ``'result'`` section of the payload, or None if absent."""
        return self.result.get('result', None)

    @property
    def config(self) -> Optional[Dict]:
        """Return the ``'config'`` section, or None when there is no scrape result."""
        if self.scrape_result is None:
            return None

        return self.result['config']

    @property
    def context(self) -> Optional[Dict]:
        """Return the ``'context'`` section, or None when there is no scrape result."""
        if self.scrape_result is None:
            return None

        return self.result['context']

    @property
    def content(self) -> str:
        """Return the scraped content, or '' when there is no scrape result."""
        if self.scrape_result is None:
            return ''

        return self.scrape_result['content']

    @property
    def success(self) -> bool:
        """Return True when the Scrapfly API answered with a 2xx status.

        Warning: success means the API replied correctly to the call; the
        scrape itself can still be unsuccessful if the upstream replied with
        an error status code (see ``scrape_success``).
        """
        # Was `200 >= status <= 299`, which is only true for status <= 200.
        return 200 <= self.response.status_code <= 299

    @property
    def scrape_success(self) -> bool:
        """Return the ``success`` flag of the scrape result; False if there is none."""
        scrape_result = self.scrape_result

        if not scrape_result:
            return False

        return self.scrape_result['success']

    @property
    def error(self) -> Optional[Dict]:
        """Return the scrape error dict when the scrape failed, else None."""
        if self.scrape_result is None:
            return None

        if self.scrape_success is False:
            return self.scrape_result['error']

        return None

    @property
    def upstream_status_code(self) -> Optional[int]:
        """Return the status code recorded in the scrape result, if any."""
        if self.scrape_result is None:
            return None

        if 'status_code' in self.scrape_result:
            return self.scrape_result['status_code']

        return None

    @cached_property
    def soup(self) -> 'BeautifulSoup':
        """Parse the content with BeautifulSoup (lxml parser).

        Raises:
            ContentError: if the result format is not 'text'.
        """
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")

        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(self.content, "lxml")
            return soup
        except ImportError as e:
            # NOTE(review): unlike `selector`, this logs and returns None
            # instead of re-raising; confirm whether that is intended.
            logger.error('You must install scrapfly[parser] to enable this feature')

    @cached_property
    def selector(self) -> 'Selector':
        """Build a parsel Selector over the content.

        Raises:
            ContentError: if the result format is not 'text'.
            ImportError: if parsel is not installed.
        """
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")

        try:
            from parsel import Selector
            return Selector(text=self.content)
        except ImportError as e:
            logger.error('You must install parsel or scrapy package to enable this feature')
            raise e

    def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
        """Normalize the decoded payload and freeze it.

        Error payloads are frozen unchanged. Otherwise header dicts become
        case-insensitive and, when a large-object handler is set, 'clob'/'blob'
        content is fetched through it.
        """
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        try:
            # An empty header set can arrive as a list; normalize it to a dict.
            if isinstance(api_result['config']['headers'], list):
                api_result['config']['headers'] = {}
        except TypeError:
            logger.info(api_result)
            raise

        with suppress(KeyError):
            api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
            api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

        if self.large_object_handler is not None and api_result['result']['content']:
            content_format = api_result['result']['format']

            if content_format in ['clob', 'blob']:
                api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format)
            elif content_format == 'binary' and isinstance(api_result['result']['content'], bytes):
                api_result['result']['content'] = BytesIO(b64decode(api_result['result']['content']))

        return FrozenDict(api_result)

    def _is_api_error(self, api_result: Dict) -> bool:
        """Return True when the payload represents an API-level error."""
        if self.scrape_config.method == 'HEAD':
            # NOTE(review): __init__ reads X-Scrapfly-Reject-* headers while
            # this checks X-Reject-Reason; confirm the header name.
            if 'X-Reject-Reason' in self.response.headers:
                return True
            return False

        if api_result is None:
            return True

        return 'error_id' in api_result

    def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
        """Rebuild a ``requests`` Response that mirrors the scraped site's reply.

        Returns None when there is no result or the API call itself did not
        return 200.

        Raises:
            RuntimeError: if ``_class`` is not ``requests.Response``.
        """
        if _class != Response:
            raise RuntimeError('only Response from requests package is supported at the moment')

        if self.result is None:
            return None

        if self.response.status_code != 200:
            return None

        response = Response()
        response.status_code = self.scrape_result['status_code']
        response.reason = self.scrape_result['reason']

        if self.scrape_result['content']:
            if isinstance(self.scrape_result['content'], BytesIO):
                response._content = self.scrape_result['content'].getvalue()
            elif isinstance(self.scrape_result['content'], bytes):
                response._content = self.scrape_result['content']
            elif isinstance(self.scrape_result['content'], str):
                response._content = self.scrape_result['content'].encode('utf-8')
        else:
            response._content = None

        response.headers.update(self.scrape_result['response_headers'])
        response.url = self.scrape_result['url']

        response.request = Request(
            method=self.config['method'],
            url=self.config['url'],
            headers=self.scrape_result['request_headers'],
            data=self.config['body'] if self.config['body'] else None
        )

        if 'set-cookie' in response.headers:
            # NOTE(review): this assumes the header value is an iterable of
            # raw cookie strings; a plain str would be iterated per character.
            for raw_cookie in response.headers['set-cookie']:
                for name, cookie in SimpleCookie(raw_cookie).items():
                    expires = cookie.get('expires')

                    if expires == '':
                        expires = None

                    if expires:
                        try:
                            expires = parse(expires).timestamp()
                        except ValueError:
                            expires = None

                    if isinstance(expires, str):
                        if '.' in expires:
                            expires = float(expires)
                        else:
                            expires = int(expires)

                    response.cookies.set_cookie(Cookie(
                        version=cookie.get('version') if cookie.get('version') else None,
                        name=name,
                        value=cookie.value,
                        path=cookie.get('path', ''),
                        expires=expires,
                        comment=cookie.get('comment'),
                        domain=cookie.get('domain', ''),
                        secure=cookie.get('secure'),
                        port=None,
                        port_specified=False,
                        domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                        domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                        path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                        discard=False,
                        comment_url=None,
                        rest={
                            'httponly': cookie.get('httponly'),
                            'samesite': cookie.get('samesite'),
                            'max-age': cookie.get('max-age')
                        }
                    ))

        return response

    def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
        """Write ``content`` (default: the scraped content) to a file.

        When ``file`` is not given, a destination path is derived from
        ``path``, ``name``, the scraped URL, and the response content type,
        and the file is opened in binary write mode. The file object is closed
        on exit in both cases.
        """
        file_content = content or self.scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                try:
                    mime_type = self.scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = self.config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path is not None else name

            if file_path == file_extension:
                # The URL had no usable last segment: derive a name from the
                # whole URL instead.
                url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
        """Raise the error built by ErrorFactory when the scrape finished unsuccessfully.

        Upstream (target website) errors are raised only when
        ``raise_on_upstream_error`` is True; every other error is always raised.
        """
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

        if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
            error = ErrorFactory.create(api_response=self)

            if error:
                if isinstance(error, UpstreamHttpError):
                    if raise_on_upstream_error is True:
                        raise error
                else:
                    raise error
Ancestors
Class variables
var large_object_handler : Callable
var scrape_config : ScrapeConfig
Instance variables
var config : Optional[Dict]
-
Expand source code
@property
def config(self) -> Optional[Dict]:
    """Return the ``config`` section of the API result, or ``None`` when no scrape result is available."""
    return None if self.scrape_result is None else self.result['config']
var content : str
-
Expand source code
@property
def content(self) -> str:
    """Return the ``content`` entry of the scrape result, or an empty string when there is no scrape result."""
    return '' if self.scrape_result is None else self.scrape_result['content']
var context : Optional[Dict]
-
Expand source code
@property
def context(self) -> Optional[Dict]:
    """Return the ``context`` section of the API result, or ``None`` when no scrape result is available."""
    return None if self.scrape_result is None else self.result['context']
var error : Optional[Dict]
-
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Return the ``error`` entry of the scrape result when the scrape failed, otherwise ``None``."""
    if self.scrape_result is not None and self.scrape_success is False:
        return self.scrape_result['error']
    return None
var scrape_result : Optional[Dict]
-
Expand source code
@property
def scrape_result(self) -> Optional[Dict]:
    """Return the ``result`` section of the API result, or ``None`` when that key is absent."""
    payload = self.result
    return payload.get('result')
var scrape_success : bool
-
Expand source code
@property
def scrape_success(self) -> bool:
    """Return the ``success`` flag of the scrape result; ``False`` when the scrape result is missing or empty."""
    if self.scrape_result:
        return self.scrape_result['success']
    return False
var selector
-
Expand source code
def __get__(self, instance, owner=None):
    """
    Descriptor getter: return the value cached on ``instance``, computing it on first access.

    The value is stored in ``instance.__dict__`` under ``self.attrname``. When it
    is not there yet, ``self.func(instance)`` is called while holding
    ``self.lock`` and the result is stored before being returned.

    Raises:
        TypeError: if ``__set_name__`` was never called (``self.attrname`` is
            ``None``), if the instance has no ``__dict__``, or if that
            ``__dict__`` does not support item assignment.
    """
    # Accessed through the class rather than an instance: return the descriptor itself.
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    # First lookup happens without the lock; _NOT_FOUND marks a missing entry.
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        with self.lock:
            # check if another thread filled cache while we awaited lock
            val = cache.get(self.attrname, _NOT_FOUND)
            if val is _NOT_FOUND:
                val = self.func(instance)
                try:
                    cache[self.attrname] = val
                except TypeError:
                    msg = (
                        f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                        f"does not support item assignment for caching {self.attrname!r} property."
                    )
                    raise TypeError(msg) from None
    return val
var soup
-
Expand source code
def __get__(self, instance, owner=None):
    """
    Descriptor getter: return the value cached on ``instance``, computing it on first access.

    The value is stored in ``instance.__dict__`` under ``self.attrname``. When it
    is not there yet, ``self.func(instance)`` is called while holding
    ``self.lock`` and the result is stored before being returned.

    Raises:
        TypeError: if ``__set_name__`` was never called (``self.attrname`` is
            ``None``), if the instance has no ``__dict__``, or if that
            ``__dict__`` does not support item assignment.
    """
    # Accessed through the class rather than an instance: return the descriptor itself.
    if instance is None:
        return self
    if self.attrname is None:
        raise TypeError(
            "Cannot use cached_property instance without calling __set_name__ on it.")
    try:
        cache = instance.__dict__
    except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
        msg = (
            f"No '__dict__' attribute on {type(instance).__name__!r} "
            f"instance to cache {self.attrname!r} property."
        )
        raise TypeError(msg) from None
    # First lookup happens without the lock; _NOT_FOUND marks a missing entry.
    val = cache.get(self.attrname, _NOT_FOUND)
    if val is _NOT_FOUND:
        with self.lock:
            # check if another thread filled cache while we awaited lock
            val = cache.get(self.attrname, _NOT_FOUND)
            if val is _NOT_FOUND:
                val = self.func(instance)
                try:
                    cache[self.attrname] = val
                except TypeError:
                    msg = (
                        f"The '__dict__' attribute on {type(instance).__name__!r} instance "
                        f"does not support item assignment for caching {self.attrname!r} property."
                    )
                    raise TypeError(msg) from None
    return val
var success : bool
-
/!\ Success means the Scrapfly API replied correctly to the call, but the scrape itself can still be unsuccessful if the upstream website replied with an error status code.
Expand source code
@property
def success(self) -> bool:
    """
    Tell whether the Scrapfly API answered the call with a 2xx HTTP status.

    Warning: success means the Scrapfly API replied correctly to the call, but
    the scrape itself can still be unsuccessful if the upstream website replied
    with an error status code.

    Returns:
        ``True`` when ``self.response.status_code`` is within 200..299 inclusive.
    """
    # The previous check was `200 >= status_code <= 299`, which rejected
    # 201..299 and accepted every code up to and including 200.
    return 200 <= self.response.status_code <= 299
var upstream_status_code : Optional[int]
-
Expand source code
@property
def upstream_status_code(self) -> Optional[int]:
    """Return the ``status_code`` entry of the scrape result, or ``None`` when the result or the entry is missing."""
    if self.scrape_result is not None and 'status_code' in self.scrape_result:
        return self.scrape_result['status_code']
    return None
Methods
def handle_api_result(self, api_result: Dict) ‑> Optional[FrozenDict]
-
Expand source code
def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
    """
    Normalise a decoded API payload in place and wrap it in a ``FrozenDict``.

    Payloads that ``self._is_api_error`` flags as errors are wrapped untouched.
    Otherwise the config headers, the request/response headers and (when a
    large object handler is configured) the content are normalised first.

    Raises:
        TypeError: re-raised after logging the payload when
            ``api_result['config']['headers']`` cannot be indexed.
    """
    # NOTE(review): nesting reconstructed from a flattened listing — confirm against the module source.
    if self._is_api_error(api_result=api_result) is True:
        return FrozenDict(api_result)

    try:
        # A list-typed ``headers`` value is replaced by an empty dict.
        if isinstance(api_result['config']['headers'], list):
            api_result['config']['headers'] = {}
    except TypeError:
        logger.info(api_result)
        raise

    # Missing header keys are tolerated; present ones become case-insensitive mappings.
    with suppress(KeyError):
        api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
        api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

    if self.large_object_handler is not None and api_result['result']['content']:
        content_format = api_result['result']['format']

        if content_format in ['clob', 'blob']:
            # For clob/blob the content value is passed to the handler as ``callback_url``;
            # the handler returns the replacement content and format.
            api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format)
        elif content_format == 'binary' and isinstance(api_result['result']['content'], bytes):
            # Binary content held as bytes is base64-decoded into an in-memory stream.
            api_result['result']['content'] = BytesIO(b64decode(api_result['result']['content']))

    return FrozenDict(api_result)
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ApiHttpClientError)
-
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
    """
    Raise the error matching a finished but unsuccessful scrape.

    The parent implementation runs first. Then, when the result status is
    ``'DONE'`` and ``scrape_success`` is ``False``, an error is built with
    ``ErrorFactory.create``. An ``UpstreamHttpError`` is raised only if
    ``raise_on_upstream_error`` is ``True``; any other error is always raised.
    """
    super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

    scrape_failed = self.result['result']['status'] == 'DONE' and self.scrape_success is False
    if not scrape_failed:
        return

    error = ErrorFactory.create(api_response=self)
    if not error:
        return

    # Upstream errors can be silenced by the caller; every other error cannot.
    if not isinstance(error, UpstreamHttpError) or raise_on_upstream_error is True:
        raise error
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Union[TextIO, _io.BytesIO, ForwardRef(None)] = None, content: Union[str, bytes, ForwardRef(None)] = None)
-
Expand source code
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
    """
    Write ``content`` (or, when falsy, the scrape result content) to a file.

    Args:
        path: directory prefix joined to the file name with ``'/'``; only used when ``file`` is not given.
        name: file name; when falsy it is derived from the last URL segment.
        file: already opened file object to write into; when falsy a file is opened in ``'wb'`` mode.
        content: data to write; ``str`` is encoded as UTF-8.
    """
    # NOTE(review): nesting reconstructed from a flattened listing — confirm against the module source.
    file_content = content or self.scrape_result['content']
    file_path = None
    file_extension = None

    if name:
        name_parts = name.split('.')
        if len(name_parts) > 1:
            # Extension taken from the name carries no leading dot (unlike the mime-derived one below).
            file_extension = name_parts[-1]

    if not file:
        if file_extension is None:
            try:
                mime_type = self.scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            # Drop parameters such as "; charset=..." before using the subtype as the extension.
            if ';' in mime_type:
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            name = self.config['url'].split('/')[-1]

        if name.find(file_extension) == -1:
            name += file_extension

        file_path = path + '/' + name if path is not None else name

        # The name is only the extension (URL ended with '/'): build a file name from the whole URL instead.
        if file_path == file_extension:
            url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')

            if url[-1] == '-':
                url = url[:-1]

            url += file_extension

            file_path = url

        file = open(file_path, 'wb')

    # Wrap raw str/bytes in a stream so copyfileobj can read from it.
    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)
    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    # file_path is still None here when the caller supplied ``file``.
    logger.info('file %s created' % file_path)
def upstream_result_into_response(self) ‑> Optional[requests.models.Response]
-
Expand source code
def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
    """
    Rebuild a ``requests`` ``Response`` object from the scrape result.

    Status code, reason, body, headers, URL, originating request and cookies
    are copied from ``self.scrape_result`` / ``self.config``.

    Returns:
        The rebuilt response, or ``None`` when ``self.result`` is ``None`` or
        the API response status code is not 200.

    Raises:
        RuntimeError: if ``_class`` is anything other than ``Response``.
    """
    # NOTE(review): nesting reconstructed from a flattened listing — confirm against the module source.
    if _class != Response:
        raise RuntimeError('only Response from requests package is supported at the moment')

    if self.result is None:
        return None

    if self.response.status_code != 200:
        return None

    response = Response()
    response.status_code = self.scrape_result['status_code']
    response.reason = self.scrape_result['reason']

    # The body is stored as bytes; other content types leave ``_content`` untouched.
    if self.scrape_result['content']:
        if isinstance(self.scrape_result['content'], BytesIO):
            response._content = self.scrape_result['content'].getvalue()
        elif isinstance(self.scrape_result['content'], bytes):
            response._content = self.scrape_result['content']
        elif isinstance(self.scrape_result['content'], str):
            response._content = self.scrape_result['content'].encode('utf-8')
    else:
        response._content = None

    response.headers.update(self.scrape_result['response_headers'])
    response.url = self.scrape_result['url']

    response.request = Request(
        method=self.config['method'],
        url=self.config['url'],
        headers=self.scrape_result['request_headers'],
        data=self.config['body'] if self.config['body'] else None
    )

    if 'set-cookie' in response.headers:
        # assumes the 'set-cookie' header value is a list of raw cookie strings — TODO confirm
        for raw_cookie in response.headers['set-cookie']:
            for name, cookie in SimpleCookie(raw_cookie).items():
                expires = cookie.get('expires')

                if expires == '':
                    expires = None

                if expires:
                    try:
                        # Convert the textual expiry date to a timestamp; unparsable dates are dropped.
                        expires = parse(expires).timestamp()
                    except ValueError:
                        expires = None

                # NOTE(review): after the block above ``expires`` is either None or a
                # timestamp, so this str branch looks unreachable — confirm.
                if type(expires) == str:
                    if '.' in expires:
                        expires = float(expires)
                    else:
                        expires = int(expires)

                response.cookies.set_cookie(Cookie(
                    version=cookie.get('version') if cookie.get('version') else None,
                    name=name,
                    value=cookie.value,
                    path=cookie.get('path', ''),
                    expires=expires,
                    comment=cookie.get('comment'),
                    domain=cookie.get('domain', ''),
                    secure=cookie.get('secure'),
                    port=None,
                    port_specified=False,
                    domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                    domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                    path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                    discard=False,
                    comment_url=None,
                    rest={
                        'httponly': cookie.get('httponly'),
                        'samesite': cookie.get('samesite'),
                        'max-age': cookie.get('max-age')
                    }
                ))

    return response
Inherited members
class ScrapeConfig (url: str, retry: bool = True, method: str = 'GET', country: Optional[str] = None, render_js: bool = False, cache: bool = False, cache_clear: bool = False, ssl: bool = False, dns: bool = False, asp: bool = False, debug: bool = False, raise_on_upstream_error: bool = True, cache_ttl: Optional[int] = None, proxy_pool: Optional[str] = None, session: Optional[str] = None, tags: Union[List[str], Set[str], ForwardRef(None)] = None, format: Optional[Format] = None, format_options: Optional[List[FormatOption]] = None, correlation_id: Optional[str] = None, cookies: Optional[requests.structures.CaseInsensitiveDict] = None, body: Optional[str] = None, data: Optional[Dict] = None, headers: Union[requests.structures.CaseInsensitiveDict, Dict[str, str], ForwardRef(None)] = None, js: str = None, rendering_wait: int = None, wait_for_selector: Optional[str] = None, screenshots: Optional[Dict] = None, screenshot_flags: Optional[List[ScreenshotFlag]] = None, session_sticky_proxy: Optional[bool] = None, webhook: Optional[str] = None, timeout: Optional[int] = None, js_scenario: Optional[List] = None, extract: Optional[Dict] = None, os: Optional[str] = None, lang: Optional[List[str]] = None, auto_scroll: Optional[bool] = None, cost_budget: Optional[int] = None)
-
Expand source code
class ScrapeConfig(BaseApiConfig):
    """
    Options of a single scrape request and their translation into API query parameters.

    The constructor stores the options, normalises cookies and headers into
    case-insensitive mappings, and prepares the request body.
    ``to_api_params`` turns the stored options into the flat parameter dict sent
    to the API. ``from_exported_config`` rebuilds a config from an exported
    base64/msgpack string.
    """

    PUBLIC_DATACENTER_POOL = 'public_datacenter_pool'
    PUBLIC_RESIDENTIAL_POOL = 'public_residential_pool'

    url: str
    retry: bool = True
    method: str = 'GET'
    country: Optional[str] = None
    render_js: bool = False
    cache: bool = False
    cache_clear: bool = False
    ssl: bool = False
    dns: bool = False
    asp: bool = False
    debug: bool = False
    raise_on_upstream_error: bool = True
    cache_ttl: Optional[int] = None
    proxy_pool: Optional[str] = None
    session: Optional[str] = None
    tags: Optional[List[str]] = None
    # A trailing comma used to turn this default into the tuple ``(None,)``.
    format: Optional[Format] = None  # raw(unchanged)
    correlation_id: Optional[str] = None
    cookies: Optional[CaseInsensitiveDict] = None
    body: Optional[str] = None
    data: Optional[Dict] = None
    headers: Optional[CaseInsensitiveDict] = None
    js: str = None
    rendering_wait: int = None
    wait_for_selector: Optional[str] = None
    session_sticky_proxy: bool = True
    screenshots: Optional[Dict] = None
    # A trailing comma used to turn this default into the tuple ``(None,)``.
    screenshot_flags: Optional[List[ScreenshotFlag]] = None
    webhook: Optional[str] = None
    timeout: Optional[int] = None  # in milliseconds
    js_scenario: Dict = None
    extract: Dict = None
    lang: Optional[List[str]] = None
    os: Optional[str] = None
    auto_scroll: Optional[bool] = None
    cost_budget: Optional[int] = None

    def __init__(
        self,
        url: str,
        retry: bool = True,
        method: str = 'GET',
        country: Optional[str] = None,
        render_js: bool = False,
        cache: bool = False,
        cache_clear:bool = False,
        ssl:bool = False,
        dns:bool = False,
        asp:bool = False,
        debug: bool = False,
        raise_on_upstream_error:bool = True,
        cache_ttl:Optional[int] = None,
        proxy_pool:Optional[str] = None,
        session: Optional[str] = None,
        tags: Optional[Union[List[str], Set[str]]] = None,
        format: Optional[Format] = None, # raw(unchanged)
        format_options: Optional[List[FormatOption]] = None, # raw(unchanged)
        correlation_id: Optional[str] = None,
        cookies: Optional[CaseInsensitiveDict] = None,
        body: Optional[str] = None,
        data: Optional[Dict] = None,
        headers: Optional[Union[CaseInsensitiveDict, Dict[str, str]]] = None,
        js: str = None,
        rendering_wait: int = None,
        wait_for_selector: Optional[str] = None,
        screenshots:Optional[Dict]=None,
        screenshot_flags: Optional[List[ScreenshotFlag]] = None,
        session_sticky_proxy:Optional[bool] = None,
        webhook:Optional[str] = None,
        timeout:Optional[int] = None, # in milliseconds
        js_scenario:Optional[List] = None,
        extract:Optional[Dict] = None,
        os:Optional[str] = None,
        lang:Optional[List[str]] = None,
        auto_scroll:Optional[bool] = None,
        cost_budget:Optional[int] = None
    ):
        """
        Store the scrape options and derive the cookie header and request body.

        Cookies are appended to the ``cookie`` header. For POST/PUT/PATCH
        requests, ``data`` is encoded into ``body`` according to the
        ``content-type`` header (form-urlencoded when the header is absent).

        Raises:
            ScrapeConfigError: when both ``body`` and ``data`` are given, or
                when ``data`` is given with an unsupported content type.
        """
        assert(type(url) is str)

        if isinstance(tags, List):
            tags = set(tags)

        cookies = cookies or {}
        headers = headers or {}

        self.cookies = CaseInsensitiveDict(cookies)
        self.headers = CaseInsensitiveDict(headers)
        self.url = url
        self.retry = retry
        self.method = method
        self.country = country
        self.session_sticky_proxy = session_sticky_proxy
        self.render_js = render_js
        self.cache = cache
        self.cache_clear = cache_clear
        self.asp = asp
        self.webhook = webhook
        self.session = session
        self.debug = debug
        self.cache_ttl = cache_ttl
        self.proxy_pool = proxy_pool
        self.tags = tags or set()
        self.format = format
        self.format_options = format_options
        self.correlation_id = correlation_id
        self.wait_for_selector = wait_for_selector
        self.body = body
        self.data = data
        self.js = js
        self.rendering_wait = rendering_wait
        self.raise_on_upstream_error = raise_on_upstream_error
        self.screenshots = screenshots
        self.screenshot_flags = screenshot_flags
        self.key = None
        self.dns = dns
        self.ssl = ssl
        self.js_scenario = js_scenario
        self.timeout = timeout
        self.extract = extract
        self.lang = lang
        self.os = os
        self.auto_scroll = auto_scroll
        self.cost_budget = cost_budget

        if cookies:
            _cookies = [name + '=' + value for name, value in cookies.items()]

            if 'cookie' in self.headers:
                # Terminate the existing cookie header before appending to it.
                if self.headers['cookie'][-1] != ';':
                    self.headers['cookie'] += ';'
            else:
                self.headers['cookie'] = ''

            self.headers['cookie'] += '; '.join(_cookies)

        if self.body and self.data:
            raise ScrapeConfigError('You cannot pass both parameters body and data. You must choose')

        if method in ['POST', 'PUT', 'PATCH']:
            if self.body is None and self.data is not None:
                if 'content-type' not in self.headers:
                    # No content type given: default to a form-urlencoded body.
                    self.headers['content-type'] = 'application/x-www-form-urlencoded'
                    self.body = urlencode(data)
                else:
                    if self.headers['content-type'].find('application/json') != -1:
                        self.body = json.dumps(data)
                    elif self.headers['content-type'].find('application/x-www-form-urlencoded') != -1:
                        self.body = urlencode(data)
                    else:
                        raise ScrapeConfigError('Content-Type "%s" not supported, use body parameter to pass pre encoded body according to your content type' % self.headers['content-type'])
            elif self.body is None and self.data is None:
                self.headers['content-type'] = 'text/plain'

    def to_api_params(self, key:str) -> Dict:
        """
        Build the flat parameter dict sent to the scrape API.

        Options that depend on another option (``render_js``, ``screenshots``,
        ``cache``, ``session``) are sent only when that option is enabled;
        otherwise a warning is logged and they are ignored.

        Args:
            key: API key used when the config has no ``key`` of its own.

        Returns:
            The parameter dict; always contains ``key`` and ``url``.
        """
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.country is not None:
            params['country'] = self.country

        for name, value in self.headers.items():
            params['headers[%s]' % name] = value

        if self.webhook is not None:
            params['webhook_name'] = self.webhook

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.extract is not None:
            params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')

        if self.cost_budget is not None:
            params['cost_budget'] = self.cost_budget

        if self.render_js is True:
            params['render_js'] = self._bool_to_http(self.render_js)

            if self.wait_for_selector is not None:
                params['wait_for_selector'] = self.wait_for_selector

            if self.js:
                params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

            if self.js_scenario:
                params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')

            if self.rendering_wait:
                params['rendering_wait'] = self.rendering_wait

            if self.screenshots is not None:
                for name, element in self.screenshots.items():
                    params['screenshots[%s]' % name] = element

                if self.screenshot_flags is not None:
                    # Flags are normalised to ScreenshotFlag members and stored back on the config.
                    self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags]
                    params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags)
            else:
                if self.screenshot_flags is not None:
                    logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')

            if self.auto_scroll is True:
                params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
        else:
            if self.wait_for_selector is not None:
                logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')

            if self.screenshots:
                logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')

            if self.js_scenario:
                logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')

            if self.js:
                logging.warning('Params "js" is ignored. Works only if render_js is enabled')

            if self.rendering_wait:
                logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')

        if self.asp is True:
            params['asp'] = self._bool_to_http(self.asp)

        if self.retry is False:
            params['retry'] = self._bool_to_http(self.retry)

        if self.cache is True:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_clear is True:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)

            if self.cache_ttl is not None:
                params['cache_ttl'] = self.cache_ttl
        else:
            if self.cache_clear is True:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

        if self.dns is True:
            params['dns'] = self._bool_to_http(self.dns)

        if self.ssl is True:
            params['ssl'] = self._bool_to_http(self.ssl)

        if self.tags:
            params['tags'] = ','.join(self.tags)

        if self.format:
            params['format'] = Format(self.format).value
            if self.format_options:
                # Options are appended to the format as "<format>:<opt>,<opt>".
                params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options)

        if self.correlation_id:
            params['correlation_id'] = self.correlation_id

        if self.session:
            params['session'] = self.session

            if self.session_sticky_proxy is True: # false by default
                params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
        else:
            if self.session_sticky_proxy:
                logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')

        if self.debug is True:
            params['debug'] = self._bool_to_http(self.debug)

        if self.proxy_pool is not None:
            params['proxy_pool'] = self.proxy_pool

        if self.lang is not None:
            params['lang'] = ','.join(self.lang)

        if self.os is not None:
            params['os'] = self.os

        return params

    @staticmethod
    def from_exported_config(config:str) -> 'ScrapeConfig':
        """
        Rebuild a ``ScrapeConfig`` from an exported, base64-encoded msgpack string.

        Args:
            config: base64 text whose decoded bytes are a msgpack document.

        Raises:
            ImportError: when the optional ``msgpack`` package is not installed.
        """
        try:
            from msgpack import loads as msgpack_loads
        except ImportError as e:
            print('You must install msgpack package - run: pip install "scrapfly-sdk[seepdup] or pip install msgpack')
            raise

        data = msgpack_loads(base64.b64decode(config))

        headers = {}

        for name, value in data['headers'].items():
            # str/bytes are Iterable too; joining them would split the value
            # character by character, so only real collections are joined.
            if isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
                headers[name] = '; '.join(value)
            else:
                headers[name] = value

        return ScrapeConfig(
            url=data['url'],
            retry=data['retry'],
            headers=headers,
            session=data['session'],
            session_sticky_proxy=data['session_sticky_proxy'],
            cache=data['cache'],
            cache_ttl=data['cache_ttl'],
            cache_clear=data['cache_clear'],
            render_js=data['render_js'],
            method=data['method'],
            asp=data['asp'],
            body=data['body'],
            ssl=data['ssl'],
            dns=data['dns'],
            country=data['country'],
            debug=data['debug'],
            correlation_id=data['correlation_id'],
            tags=data['tags'],
            format=data['format'],
            js=data['js'],
            rendering_wait=data['rendering_wait'],
            screenshots=data['screenshots'] or {},
            screenshot_flags=data['screenshot_flags'],
            proxy_pool=data['proxy_pool'],
            auto_scroll=data['auto_scroll'],
            cost_budget=data['cost_budget']
        )
Ancestors
Class variables
var PUBLIC_DATACENTER_POOL
var PUBLIC_RESIDENTIAL_POOL
var asp : bool
var auto_scroll : Optional[bool]
var body : Optional[str]
var cache : bool
var cache_clear : bool
var cache_ttl : Optional[int]
var correlation_id : Optional[str]
var cost_budget : Optional[int]
var country : Optional[str]
var data : Optional[Dict]
var debug : bool
var dns : bool
var extract : Dict
var format : Optional[Format]
var headers : Optional[requests.structures.CaseInsensitiveDict]
var js : str
var js_scenario : Dict
var lang : Optional[List[str]]
var method : str
var os : Optional[str]
var proxy_pool : Optional[str]
var raise_on_upstream_error : bool
var render_js : bool
var rendering_wait : int
var retry : bool
var screenshot_flags : Optional[List[ScreenshotFlag]]
var screenshots : Optional[Dict]
var session : Optional[str]
var session_sticky_proxy : bool
var ssl : bool
var timeout : Optional[int]
var url : str
var wait_for_selector : Optional[str]
var webhook : Optional[str]
Static methods
def from_exported_config(config: str) ‑> ScrapeConfig
-
Expand source code
@staticmethod
def from_exported_config(config:str) -> 'ScrapeConfig':
    """
    Rebuild a ``ScrapeConfig`` from an exported, base64-encoded msgpack string.

    Args:
        config: base64 text whose decoded bytes are a msgpack document.

    Returns:
        A new ``ScrapeConfig`` populated from the decoded document.

    Raises:
        ImportError: when the optional ``msgpack`` package is not installed.
    """
    try:
        from msgpack import loads as msgpack_loads
    except ImportError as e:
        print('You must install msgpack package - run: pip install "scrapfly-sdk[seepdup] or pip install msgpack')
        raise

    data = msgpack_loads(base64.b64decode(config))

    headers = {}

    for name, value in data['headers'].items():
        # str/bytes are Iterable too; joining them would split the value
        # character by character, so only real collections are joined.
        if isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
            headers[name] = '; '.join(value)
        else:
            headers[name] = value

    return ScrapeConfig(
        url=data['url'],
        retry=data['retry'],
        headers=headers,
        session=data['session'],
        session_sticky_proxy=data['session_sticky_proxy'],
        cache=data['cache'],
        cache_ttl=data['cache_ttl'],
        cache_clear=data['cache_clear'],
        render_js=data['render_js'],
        method=data['method'],
        asp=data['asp'],
        body=data['body'],
        ssl=data['ssl'],
        dns=data['dns'],
        country=data['country'],
        debug=data['debug'],
        correlation_id=data['correlation_id'],
        tags=data['tags'],
        format=data['format'],
        js=data['js'],
        rendering_wait=data['rendering_wait'],
        screenshots=data['screenshots'] or {},
        screenshot_flags=data['screenshot_flags'],
        proxy_pool=data['proxy_pool'],
        auto_scroll=data['auto_scroll'],
        cost_budget=data['cost_budget']
    )
Methods
def to_api_params(self, key: str) ‑> Dict
-
Expand source code
def to_api_params(self, key:str) -> Dict:
    """
    Translate the stored scrape options into the flat parameter dict sent to the API.

    Options that depend on another option (``render_js``, ``screenshots``,
    ``cache``, ``session``) are emitted only when that option is enabled;
    otherwise a warning is logged and they are left out.

    Args:
        key: API key used when the config carries no ``key`` of its own.

    Returns:
        The parameter dict; ``key`` and ``url`` are always present.
    """
    def _encode(text):
        # URL-safe base64 of the UTF-8 bytes, returned as text.
        return base64.urlsafe_b64encode(text.encode('utf-8')).decode('utf-8')

    params = {'key': self.key or key, 'url': self.url}

    if self.country is not None:
        params['country'] = self.country

    for header_name, header_value in self.headers.items():
        params['headers[%s]' % header_name] = header_value

    if self.webhook is not None:
        params['webhook_name'] = self.webhook

    if self.timeout is not None:
        params['timeout'] = self.timeout

    if self.extract is not None:
        params['extract'] = _encode(json.dumps(self.extract))

    if self.cost_budget is not None:
        params['cost_budget'] = self.cost_budget

    if self.render_js is not True:
        # Browser-only options cannot be honoured without JS rendering.
        if self.wait_for_selector is not None:
            logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')

        if self.screenshots:
            logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')

        if self.js_scenario:
            logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')

        if self.js:
            logging.warning('Params "js" is ignored. Works only if render_js is enabled')

        if self.rendering_wait:
            logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')
    else:
        params['render_js'] = self._bool_to_http(self.render_js)

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.js:
            params['js'] = _encode(self.js)

        if self.js_scenario:
            params['js_scenario'] = _encode(json.dumps(self.js_scenario))

        if self.rendering_wait:
            params['rendering_wait'] = self.rendering_wait

        if self.screenshots is None:
            if self.screenshot_flags is not None:
                logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')
        else:
            for shot_name, shot_target in self.screenshots.items():
                params['screenshots[%s]' % shot_name] = shot_target

            if self.screenshot_flags is not None:
                # Flags are normalised to ScreenshotFlag members and stored back on the config.
                self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags]
                params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags)

        if self.auto_scroll is True:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

    if self.asp is True:
        params['asp'] = self._bool_to_http(self.asp)

    if self.retry is False:
        params['retry'] = self._bool_to_http(self.retry)

    if self.cache is not True:
        if self.cache_clear is True:
            logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

        if self.cache_ttl is not None:
            logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')
    else:
        params['cache'] = self._bool_to_http(self.cache)

        if self.cache_clear is True:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)

        if self.cache_ttl is not None:
            params['cache_ttl'] = self.cache_ttl

    if self.dns is True:
        params['dns'] = self._bool_to_http(self.dns)

    if self.ssl is True:
        params['ssl'] = self._bool_to_http(self.ssl)

    if self.tags:
        params['tags'] = ','.join(self.tags)

    if self.format:
        params['format'] = Format(self.format).value
        if self.format_options:
            # Options are appended to the format as "<format>:<opt>,<opt>".
            option_values = ','.join(FormatOption(option).value for option in self.format_options)
            params['format'] += ':' + option_values

    if self.correlation_id:
        params['correlation_id'] = self.correlation_id

    if not self.session:
        if self.session_sticky_proxy:
            logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')
    else:
        params['session'] = self.session

        if self.session_sticky_proxy is True:  # false by default
            params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)

    if self.debug is True:
        params['debug'] = self._bool_to_http(self.debug)

    if self.proxy_pool is not None:
        params['proxy_pool'] = self.proxy_pool

    if self.lang is not None:
        params['lang'] = ','.join(self.lang)

    if self.os is not None:
        params['os'] = self.os

    return params
class ScraperAPI
-
Expand source code
class ScraperAPI:
    """Namespace of string constants naming monitoring data formats, periods and aggregation levels."""

    # Data formats
    MONITORING_DATA_FORMAT_STRUCTURED = 'structured'
    MONITORING_DATA_FORMAT_PROMETHEUS = 'prometheus'

    # Periods
    MONITORING_PERIOD_SUBSCRIPTION = 'subscription'
    MONITORING_PERIOD_LAST_7D = 'last7d'
    MONITORING_PERIOD_LAST_24H = 'last24h'
    MONITORING_PERIOD_LAST_1H = 'last1h'
    MONITORING_PERIOD_LAST_5m = 'last5m'

    # Aggregation levels
    MONITORING_ACCOUNT_AGGREGATION = 'account'
    MONITORING_PROJECT_AGGREGATION = 'project'
    MONITORING_TARGET_AGGREGATION = 'target'
Class variables
var MONITORING_ACCOUNT_AGGREGATION
var MONITORING_DATA_FORMAT_PROMETHEUS
var MONITORING_DATA_FORMAT_STRUCTURED
var MONITORING_PERIOD_LAST_1H
var MONITORING_PERIOD_LAST_24H
var MONITORING_PERIOD_LAST_5m
var MONITORING_PERIOD_LAST_7D
var MONITORING_PERIOD_SUBSCRIPTION
var MONITORING_PROJECT_AGGREGATION
var MONITORING_TARGET_AGGREGATION
class ScrapflyAspError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
class ScrapflyAspError(ScraperAPIError):
    """Marker subclass of ``ScraperAPIError`` for ASP errors; it adds no behaviour of its own."""
    pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyClient (key: str, host: Optional[str] = 'https://api.scrapfly.io', verify=True, debug: bool = False, max_concurrency: int = 1, connect_timeout: int = 30, web_scraping_api_read_timeout: int = 160, extraction_api_read_timeout: int = 35, screenshot_api_read_timeout: int = 60, read_timeout: int = 30, default_read_timeout: int = 30, reporter: Optional[Callable] = None, **kwargs)
-
Expand source code
class ScrapflyClient:
    """HTTP client for the Scrapfly scrape, screenshot, extraction and monitoring endpoints.

    Requests go through ``requests.request`` by default. Calling :meth:`open`
    (or using the client as a context manager) creates a ``Session`` that is
    used for subsequent requests until :meth:`close` is called.
    """

    HOST = 'https://api.scrapfly.io'
    DEFAULT_CONNECT_TIMEOUT = 30
    DEFAULT_READ_TIMEOUT = 30

    DEFAULT_WEBSCRAPING_API_READ_TIMEOUT = 160  # 155 real
    DEFAULT_SCREENSHOT_API_READ_TIMEOUT = 60  # 30 real
    DEFAULT_EXTRACTION_API_READ_TIMEOUT = 35  # 30 real

    host:str
    key:str
    max_concurrency:int
    verify:bool
    debug:bool
    distributed_mode:bool
    connect_timeout:int
    web_scraping_api_read_timeout:int
    screenshot_api_read_timeout:int
    extraction_api_read_timeout:int
    monitoring_api_read_timeout:int
    default_read_timeout:int
    brotli: bool
    reporter:Reporter
    version:str

    # @deprecated
    read_timeout:int

    CONCURRENCY_AUTO = 'auto'  # retrieve the allowed concurrency from your account
    DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(
        self,
        key: str,
        host: Optional[str] = HOST,
        verify=True,
        debug: bool = False,
        max_concurrency:int=1,
        connect_timeout:int = DEFAULT_CONNECT_TIMEOUT,
        web_scraping_api_read_timeout: int = DEFAULT_WEBSCRAPING_API_READ_TIMEOUT,
        extraction_api_read_timeout: int = DEFAULT_EXTRACTION_API_READ_TIMEOUT,
        screenshot_api_read_timeout: int = DEFAULT_SCREENSHOT_API_READ_TIMEOUT,

        # @deprecated
        read_timeout:int = DEFAULT_READ_TIMEOUT,
        default_read_timeout:int = DEFAULT_READ_TIMEOUT,
        reporter:Optional[Callable]=None,
        **kwargs
    ):
        """
        :param key: API key sent as the ``key`` query parameter
        :param host: base URL of the API; ``None`` falls back to ``HOST``
        :param verify: forwarded as the ``verify`` option of every request
        :param debug: when True, sets ``http.client.HTTPConnection.debuglevel`` to 5
        :param max_concurrency: default concurrency of :meth:`concurrent_scrape`
        :param connect_timeout: connect timeout (seconds) of every request
        :param web_scraping_api_read_timeout: read timeout of scrape requests
        :param extraction_api_read_timeout: read timeout of extraction requests
        :param screenshot_api_read_timeout: read timeout of screenshot requests
        :param read_timeout: deprecated; accepted but not read by this constructor
        :param default_read_timeout: read timeout of monitoring and other requests
        :param reporter: optional callable wrapped in ``Reporter``; a ``NoopReporter`` is used when omitted
        :param kwargs: only the deprecated ``distributed_mode`` and ``brotli`` keys are inspected
        """
        # Optional[str]: tolerate None and '' instead of failing on host[-1].
        if host is None:
            host = self.HOST

        if host.endswith('/'):  # remove last '/' if exists
            host = host[:-1]

        if 'distributed_mode' in kwargs:
            warnings.warn("distributed mode is deprecated and will be remove the next version -"
              " user should handle themself the session name based on the concurrency",
              DeprecationWarning,
              stacklevel=2
            )

        if 'brotli' in kwargs:
            warnings.warn("brotli arg is deprecated and will be remove the next version - "
                "brotli is disabled by default",
                DeprecationWarning,
                stacklevel=2
            )

        self.version = __version__
        self.host = host
        self.key = key
        self.verify = verify
        self.debug = debug
        self.connect_timeout = connect_timeout
        self.web_scraping_api_read_timeout = web_scraping_api_read_timeout
        self.screenshot_api_read_timeout = screenshot_api_read_timeout
        self.extraction_api_read_timeout = extraction_api_read_timeout
        self.monitoring_api_read_timeout = default_read_timeout
        self.default_read_timeout = default_read_timeout

        # @deprecated
        self.read_timeout = default_read_timeout

        self.max_concurrency = max_concurrency
        self.body_handler = ResponseBodyHandler(use_brotli=False)
        self.async_executor = ThreadPoolExecutor()
        self.http_session = None

        # NOTE(review): this tests the class constant HOST, not self.host, so the
        # '.local' exclusion can never apply to a custom host - confirm intent.
        if not self.verify and not self.HOST.endswith('.local'):
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        if self.debug is True:
            http.client.HTTPConnection.debuglevel = 5

        if reporter is None:
            from .reporter import NoopReporter

            reporter = NoopReporter()

        self.reporter = Reporter(reporter)

    @property
    def ua(self) -> str:
        """User-agent string built from the SDK version and the ``platform`` module."""
        return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
            self.version,
            platform.python_version(),
            platform.uname().system,
            platform.uname().machine
        )

    @cached_property
    def _http_handler(self):
        """Callable used to send requests.

        The value is cached per instance; :meth:`open` and :meth:`close` drop
        the cached value so the handler always follows the current session.
        """
        return partial(self.http_session.request if self.http_session else requests.request)

    @property
    def http(self):
        """Public accessor for the request callable."""
        return self._http_handler

    def _scrape_request(self, scrape_config:ScrapeConfig):
        """Build the keyword arguments of the ``/scrape`` request."""
        return {
            'method': scrape_config.method,
            'url': self.host + '/scrape',
            'data': scrape_config.body,
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.web_scraping_api_read_timeout),
            'headers': {
                'content-type': scrape_config.headers['content-type'] if scrape_config.method in ['POST', 'PUT', 'PATCH'] else self.body_handler.content_type,
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            'params': scrape_config.to_api_params(key=self.key)
        }

    def _screenshot_request(self, screenshot_config:ScreenshotConfig):
        """Build the keyword arguments of the ``/screenshot`` request."""
        return {
            'method': 'GET',
            'url': self.host + '/screenshot',
            # Same as _scrape_request: honour verify when no session is open.
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.screenshot_api_read_timeout),
            'headers': {
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            'params': screenshot_config.to_api_params(key=self.key)
        }

    def _extraction_request(self, extraction_config:ExtractionConfig):
        """Build the keyword arguments of the ``/extraction`` request."""
        headers = {
            'content-type': extraction_config.content_type,
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        # Only advertise a content-encoding when the document is compressed.
        if extraction_config.document_compression_format:
            headers['content-encoding'] = extraction_config.document_compression_format.value

        return {
            'method': 'POST',
            'url': self.host + '/extraction',
            'data': extraction_config.body,
            # Same as _scrape_request: honour verify when no session is open.
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.extraction_api_read_timeout),
            'headers': headers,
            'params': extraction_config.to_api_params(key=self.key)
        }

    def account(self) -> Union[str, Dict]:
        """GET ``/account`` and return the decoded body.

        :return: the body decoded by ``body_handler`` when it supports the
            response headers, otherwise the raw content decoded as UTF-8
        :raises: whatever ``response.raise_for_status()`` raises on an HTTP error
        """
        response = self._http_handler(
            method='GET',
            url=self.host + '/account',
            params={'key': self.key},
            # Without a timeout the call could block indefinitely.
            timeout=(self.connect_timeout, self.default_read_timeout),
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )

        response.raise_for_status()

        if self.body_handler.support(response.headers):
            return self.body_handler(response.content, response.headers['content-type'])

        return response.content.decode('utf-8')

    def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None):
        """GET ``/scrape/monitoring/metrics``.

        :param format: sent as the ``format`` query parameter
        :param period: sent as ``period`` when not None
        :param aggregation: joined with ',' and sent as ``aggregation`` when not None
        :return: body decoded by ``body_handler`` when supported, else UTF-8 text
        """
        params = {'key': self.key, 'format': format}

        if period is not None:
            params['period'] = period

        if aggregation is not None:
            params['aggregation'] = ','.join(aggregation)

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics',
            params=params,
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )

        response.raise_for_status()

        if self.body_handler.support(response.headers):
            return self.body_handler(response.content, response.headers['content-type'])

        return response.content.decode('utf-8')

    def get_monitoring_target_metrics(
            self,
            domain:str,
            group_subdomain:bool=False,
            period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H,
            start:Optional[datetime.datetime]=None,
            end:Optional[datetime.datetime]=None,
    ):
        """GET ``/scrape/monitoring/metrics/target`` for one domain.

        :param domain: sent as the ``domain`` query parameter
        :param group_subdomain: sent as the ``group_subdomain`` query parameter
        :param period: sent as ``period``; replaced by None when ``start``/``end`` are given
        :param start: range start, formatted with ``DATETIME_FORMAT``
        :param end: range end, formatted with ``DATETIME_FORMAT``
        :raises ValueError: when only one of ``start`` / ``end`` is provided
        """
        params = {
            'key': self.key,
            'domain': domain,
            'group_subdomain': group_subdomain
        }

        if (start is not None and end is None) or (start is None and end is not None):
            raise ValueError('You must provide both start and end date')

        if start is not None and end is not None:
            params['start'] = start.strftime(self.DATETIME_FORMAT)
            params['end'] = end.strftime(self.DATETIME_FORMAT)
            period = None  # an explicit range overrides the period

        params['period'] = period

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics/target',
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            params=params,
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )

        response.raise_for_status()

        if self.body_handler.support(response.headers):
            return self.body_handler(response.content, response.headers['content-type'])

        return response.content.decode('utf-8')

    def resilient_scrape(
        self,
        scrape_config:ScrapeConfig,
        retry_on_errors:Set[Exception]={ScrapflyError},
        retry_on_status_code:Optional[List[int]]=None,
        tries: int = 5,
        delay: int = 20,
    ) -> ScrapeApiResponse:
        """Scrape with ``backoff.expo`` retries.

        :param scrape_config: configuration passed to :meth:`scrape`
        :param retry_on_errors: set of exception classes that trigger a retry
        :param retry_on_status_code: when given, an upstream HTTP error whose status
            code is not in this list is returned as a response instead of raised
        :param tries: forwarded to backoff as ``max_tries``
        :param delay: forwarded to backoff as ``max_time``
        """
        assert retry_on_errors is not None, 'Retry on error is None'
        assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

        @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)
        def inner() -> ScrapeApiResponse:
            try:
                return self.scrape(scrape_config=scrape_config)
            except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
                if retry_on_status_code is not None and e.api_response:
                    if e.api_response.upstream_status_code in retry_on_status_code:
                        raise e
                    else:
                        return e.api_response

                raise e

        return inner()

    def open(self):
        """Create the HTTP session if none is open and route requests through it."""
        if self.http_session is None:
            self.http_session = Session()
            self.http_session.verify = self.verify
            self.http_session.timeout = (self.connect_timeout, self.default_read_timeout)
            self.http_session.params['key'] = self.key
            self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding
            self.http_session.headers['accept'] = self.body_handler.accept
            self.http_session.headers['user-agent'] = self.ua
            # Drop a handler cached before the session existed, otherwise the
            # session would never be used.
            self.__dict__.pop('_http_handler', None)

    def close(self):
        """Close the HTTP session, if any; safe to call repeatedly."""
        if self.http_session is not None:
            self.http_session.close()
            self.http_session = None

        # Forget the handler bound to the closed session.
        self.__dict__.pop('_http_handler', None)

    def __enter__(self) -> 'ScrapflyClient':
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
        """Run :meth:`scrape` in ``async_executor`` and await its result."""
        if loop is None:
            loop = asyncio.get_running_loop()

        return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config)

    async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
        """Async generator scraping several configs concurrently.

        Yields each ``ScrapeApiResponse`` - or the exception raised by the scrape -
        in completion order. ``scrape_configs`` is consumed in place (items are
        popped from its end) and ``raise_on_upstream_error`` is set to False on
        every config before it is scheduled.

        :param scrape_configs: configs to scrape
        :param concurrency: maximum number of running scrapes; None uses
            ``max_concurrency``, ``CONCURRENCY_AUTO`` reads it from :meth:`account`
        """
        if concurrency is None:
            concurrency = self.max_concurrency
        elif concurrency == self.CONCURRENCY_AUTO:
            concurrency = self.account()['subscription']['max_concurrency']

        loop = asyncio.get_running_loop()
        processing_tasks = []
        results = []
        processed_tasks = 0
        expected_tasks = len(scrape_configs)

        def scrape_done_callback(task:Task):
            nonlocal processed_tasks

            try:
                if task.cancelled() is True:
                    return

                error = task.exception()

                if error is not None:
                    results.append(error)
                else:
                    results.append(task.result())
            finally:
                processing_tasks.remove(task)
                processed_tasks += 1

        while scrape_configs or results or processing_tasks:
            logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

            # @todo handle backpressure
            # Fill the free slots; the loop condition replaces the former bare except.
            while scrape_configs and len(processing_tasks) < concurrency:
                scrape_config = scrape_configs.pop()
                scrape_config.raise_on_upstream_error = False
                task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                processing_tasks.append(task)
                task.add_done_callback(scrape_done_callback)

            # Drain everything that is ready. Popping while iterating the same
            # list used to leave about half of the results for the next tick.
            while results:
                yield results.pop(0)

            await asyncio.sleep(.5)

        logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse:
        """
        Scrape a website
        :param scrape_config: ScrapeConfig
        :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration
        :return: ScrapeApiResponse

        If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error.
        If the error is not none, you will get the following structure for example

        'error': {
            'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
            'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds',
            'retryable': False,
            'http_code': 422,
            'links': {
                'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
            }
        }
        """
        try:
            logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
            request_data = self._scrape_request(scrape_config=scrape_config)
            response = self._http_handler(**request_data)
            scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config)

            self.reporter.report(scrape_api_response=scrape_api_response)

            return scrape_api_response
        except BaseException as e:
            self.reporter.report(error=e)

            if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
                return e.api_response

            raise e

    async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse:
        """Run :meth:`screenshot` in ``async_executor`` and await its result."""
        if loop is None:
            loop = asyncio.get_running_loop()

        return await loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse:
        """
        Take a screenshot
        :param screenshot_config: ScreenshotConfig
        :param no_raise: bool - if True, do not raise exception on error while the screenshot api response is a ScrapflyError for seamless integration
        :return: ScreenshotApiResponse

        If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error.
        If the error is not none, you will get the following structure for example

        'error': {
            'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
            'message': 'For some reason we were unable to take the screenshot',
            'http_code': 422,
            'links': {
                'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
            }
        }
        """
        try:
            logger.debug('--> %s Screenshoting' % (screenshot_config.url))
            request_data = self._screenshot_request(screenshot_config=screenshot_config)
            response = self._http_handler(**request_data)
            screenshot_api_response = self._handle_screenshot_response(response=response, screenshot_config=screenshot_config)
            return screenshot_api_response
        except BaseException as e:
            self.reporter.report(error=e)

            if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
                return e.api_response

            raise e

    async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse:
        """Run :meth:`extract` in ``async_executor`` and await its result."""
        if loop is None:
            loop = asyncio.get_running_loop()

        return await loop.run_in_executor(self.async_executor, self.extract, extraction_config)

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse:
        """
        Extract structured data from text content
        :param extraction_config: ExtractionConfig
        :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration
        :return: ExtractionApiResponse

        If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error.
        If the error is not none, you will get the following structure for example

        'error': {
            'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED',
            'message': 'The content type of the response is not supported for extraction',
            'http_code': 422,
            'links': {
                'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED'
            }
        }
        """
        try:
            logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
            request_data = self._extraction_request(extraction_config=extraction_config)
            response = self._http_handler(**request_data)
            extraction_api_response = self._handle_extraction_response(response=response, extraction_config=extraction_config)
            return extraction_api_response
        except BaseException as e:
            self.reporter.report(error=e)

            if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
                return e.api_response

            raise e

    def _handle_response(self, response:Response, scrape_config:ScrapeConfig) -> ScrapeApiResponse:
        """Wrap a scrape HTTP response, log its outcome, and re-raise API errors after logging."""
        try:
            api_response = self._handle_api_response(
                response=response,
                scrape_config=scrape_config,
                raise_on_upstream_error=scrape_config.raise_on_upstream_error
            )

            if scrape_config.method == 'HEAD':
                logger.debug('<-- [%s %s] %s | %ss' % (
                    api_response.response.status_code,
                    api_response.response.reason,
                    api_response.response.request.url,
                    0
                ))
            else:
                logger.debug('<-- [%s %s] %s | %ss' % (
                    api_response.result['result']['status_code'],
                    api_response.result['result']['reason'],
                    api_response.result['config']['url'],
                    api_response.result['result']['duration'])
                )

                logger.debug('Log url: %s' % api_response.result['result']['log_url'])

            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def _handle_screenshot_response(self, response:Response, screenshot_config:ScreenshotConfig) -> ScreenshotApiResponse:
        """Wrap a screenshot HTTP response and re-raise API errors after logging."""
        try:
            api_response = self._handle_screenshot_api_response(
                response=response,
                screenshot_config=screenshot_config,
                raise_on_upstream_error=screenshot_config.raise_on_upstream_error
            )
            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def _handle_extraction_response(self, response:Response, extraction_config:ExtractionConfig) -> ExtractionApiResponse:
        """Wrap an extraction HTTP response and re-raise API errors after logging."""
        try:
            api_response = self._handle_extraction_api_response(
                response=response,
                extraction_config=extraction_config,
                raise_on_upstream_error=extraction_config.raise_on_upstream_error
            )
            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
        """
        Save a screenshot from a screenshot API response
        :param screenshot_api_response: ScreenshotApiResponse
        :param name: str - name of the screenshot to save as
        :param path: Optional[str] - directory to write into (created if missing)
        :raises RuntimeError: when the screenshot failed or carries no image
        """

        if screenshot_api_response.screenshot_success is not True:
            raise RuntimeError('Screenshot was not successful')

        if not screenshot_api_response.image:
            raise RuntimeError('Screenshot binary does not exist')

        content = screenshot_api_response.image
        extension_name = screenshot_api_response.metadata['extension_name']

        if path:
            os.makedirs(path, exist_ok=True)
            file_path = os.path.join(path, f'{name}.{extension_name}')
        else:
            file_path = f'{name}.{extension_name}'

        if isinstance(content, bytes):
            content = BytesIO(content)

        with open(file_path, 'wb') as f:
            shutil.copyfileobj(content, f, length=131072)

    def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
        """
        Save a screenshot from a scrape result
        :param api_response: ScrapeApiResponse
        :param name: str - name of the screenshot given in the scrape config
        :param path: Optional[str]
        :raises RuntimeError: when the scrape result has no screenshot called ``name``
        """

        if not api_response.scrape_result['screenshots']:
            raise RuntimeError('Screenshot %s do no exists' % name)

        try:
            screenshot = api_response.scrape_result['screenshots'][name]
        except KeyError:
            # The KeyError adds nothing for the caller; hide it from the traceback.
            raise RuntimeError('Screenshot %s do no exists' % name) from None

        screenshot_response = self._http_handler(
            method='GET',
            url=screenshot['url'],
            params={'key': self.key},
            # Without a timeout the download could block indefinitely.
            timeout=(self.connect_timeout, self.default_read_timeout),
            verify=self.verify
        )

        screenshot_response.raise_for_status()

        if not name.endswith('.jpg'):
            name += '.jpg'

        api_response.sink(path=path, name=name, content=screenshot_response.content)

    def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
        """Write scraped content (or ``content``) to ``file`` or to a file on disk.

        :param api_response: scrape response providing the default content, URL and headers
        :param content: overrides the scraped content when truthy
        :param path: directory prefix of the created file
        :param name: file name; derived from the scraped URL when omitted
        :param file: already opened file object; when given no path is computed
        :return: path of the created file (None when ``file`` was supplied)
        """
        scrape_result = api_response.result['result']
        scrape_config = api_response.result['config']

        file_content = content or scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                # No extension in the name: derive one from the response mime type.
                try:
                    mime_type = scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = scrape_config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path else name

            # The URL ended with '/': fall back to a name built from the whole URL.
            if file_path == file_extension:
                url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)
        return file_path

    def _handle_scrape_large_objects(
        self,
        callback_url:str,
        format: Literal['clob', 'blob']
    ) -> Tuple[Union[BytesIO, str], str]:
        """Download a large object and return ``(content, 'text' | 'binary')``.

        :param callback_url: URL the object is fetched from
        :param format: 'clob' returns UTF-8 decoded text, 'blob' a ``BytesIO``
        :raises ContentError: when ``format`` is neither 'clob' nor 'blob'
        """
        if format not in ['clob', 'blob']:
            raise ContentError('Large objects handle can handles format format [blob, clob], given: %s' % format)

        response = self._http_handler(**{
            'method': 'GET',
            'url': callback_url,
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.default_read_timeout),
            'headers': {
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            'params': {'key': self.key}
        })

        if self.body_handler.support(headers=response.headers):
            content = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            content = response.content

        if format == 'clob':
            return content.decode('utf-8'), 'text'

        return BytesIO(content), 'binary'

    def _handle_api_response(
        self,
        response: Response,
        scrape_config:ScrapeConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScrapeApiResponse:
        """Decode a scrape response body and build the ``ScrapeApiResponse``."""
        if scrape_config.method == 'HEAD':
            body = None
        else:
            if self.body_handler.support(headers=response.headers):
                body = self.body_handler(content=response.content, content_type=response.headers['content-type'])
            else:
                body = response.content.decode('utf-8')

        api_response:ScrapeApiResponse = ScrapeApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            scrape_config=scrape_config,
            large_object_handler=self._handle_scrape_large_objects
        )

        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return api_response

    def _handle_screenshot_api_response(
        self,
        response: Response,
        screenshot_config:ScreenshotConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScreenshotApiResponse:
        """Decode a screenshot response body and build the ``ScreenshotApiResponse``."""
        if self.body_handler.support(headers=response.headers):
            body = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            body = {'result': response.content}

        api_response:ScreenshotApiResponse = ScreenshotApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            screenshot_config=screenshot_config
        )

        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return api_response

    def _handle_extraction_api_response(
        self,
        response: Response,
        extraction_config:ExtractionConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ExtractionApiResponse:
        """Decode an extraction response body and build the ``ExtractionApiResponse``."""
        if self.body_handler.support(headers=response.headers):
            body = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            body = response.content.decode('utf-8')

        api_response:ExtractionApiResponse = ExtractionApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            extraction_config=extraction_config
        )

        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return api_response
Class variables
var CONCURRENCY_AUTO
var DATETIME_FORMAT
var DEFAULT_CONNECT_TIMEOUT
var DEFAULT_EXTRACTION_API_READ_TIMEOUT
var DEFAULT_READ_TIMEOUT
var DEFAULT_SCREENSHOT_API_READ_TIMEOUT
var DEFAULT_WEBSCRAPING_API_READ_TIMEOUT
var HOST
var brotli : bool
var connect_timeout : int
var debug : bool
var default_read_timeout : int
var distributed_mode : bool
var extraction_api_read_timeout : int
var host : str
var key : str
var max_concurrency : int
var monitoring_api_read_timeout : int
var read_timeout : int
var reporter : scrapfly.reporter.Reporter
var screenshot_api_read_timeout : int
var verify : bool
var version : str
var web_scraping_api_read_timeout : int
Instance variables
var http
-
Expand source code
@property
def http(self):
    """Public accessor for the callable this client sends requests with."""
    handler = self._http_handler
    return handler
var ua : str
-
Expand source code
@property
def ua(self) -> str:
    """User-agent string: SDK version, Python version, OS name and machine type."""
    python_version = platform.python_version()
    system_name = platform.uname().system
    machine_name = platform.uname().machine
    template = 'ScrapflySDK/%s (Python %s, %s, %s)'
    return template % (self.version, python_version, system_name, machine_name)
Methods
def account(self) ‑> Union[str, Dict]
-
Expand source code
def account(self) -> Union[str, Dict]:
    """GET ``/account`` and return the decoded body.

    :return: the body decoded by ``body_handler`` when it supports the
        response headers, otherwise the raw content decoded as UTF-8
    :raises: whatever ``response.raise_for_status()`` raises on an HTTP error
    """
    response = self._http_handler(
        method='GET',
        url=self.host + '/account',
        params={'key': self.key},
        # Without a timeout the call could block indefinitely.
        timeout=(self.connect_timeout, self.default_read_timeout),
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )

    response.raise_for_status()

    if self.body_handler.support(response.headers):
        return self.body_handler(response.content, response.headers['content-type'])

    return response.content.decode('utf-8')
async def async_extraction(self, extraction_config: ExtractionConfig, loop: Optional[asyncio.events.AbstractEventLoop] = None) ‑> ExtractionApiResponse
-
Expand source code
async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse:
    """Run the blocking ``extract`` call in ``async_executor`` and await its result.

    :param extraction_config: configuration passed to ``extract``
    :param loop: event loop to schedule on; the running loop is used when None
    """
    event_loop = asyncio.get_running_loop() if loop is None else loop
    pending = event_loop.run_in_executor(self.async_executor, self.extract, extraction_config)
    return await pending
async def async_scrape(self, scrape_config: ScrapeConfig, loop: Optional[asyncio.events.AbstractEventLoop] = None) ‑> ScrapeApiResponse
-
Expand source code
async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
    """Run the blocking ``scrape`` call in ``async_executor`` and await its result.

    :param scrape_config: configuration passed to ``scrape``
    :param loop: event loop to schedule on; the running loop is used when None
    """
    event_loop = asyncio.get_running_loop() if loop is None else loop
    pending = event_loop.run_in_executor(self.async_executor, self.scrape, scrape_config)
    return await pending
async def async_screenshot(self, screenshot_config: ScreenshotConfig, loop: Optional[asyncio.events.AbstractEventLoop] = None) ‑> ScreenshotApiResponse
-
Expand source code
async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse:
    """Run the blocking ``screenshot`` call in ``async_executor`` and await its result.

    :param screenshot_config: configuration passed to ``screenshot``
    :param loop: event loop to schedule on; the running loop is used when None
    """
    event_loop = asyncio.get_running_loop() if loop is None else loop
    pending = event_loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)
    return await pending
def close(self)
-
Expand source code
def close(self):
    """Close the HTTP session, if any; safe to call without a prior ``open()``."""
    if self.http_session is not None:
        self.http_session.close()
        self.http_session = None

    # ``_http_handler`` is a cached property: forget the value bound to the
    # closed session so the next request picks a fresh transport.
    self.__dict__.pop('_http_handler', None)
async def concurrent_scrape(self, scrape_configs: List[ScrapeConfig], concurrency: Optional[int] = None)
-
Expand source code
async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
    """Async generator scraping several configs concurrently.

    Yields each ``ScrapeApiResponse`` - or the exception raised by the scrape -
    in completion order. ``scrape_configs`` is consumed in place (items are
    popped from its end) and ``raise_on_upstream_error`` is set to False on
    every config before it is scheduled.

    :param scrape_configs: configs to scrape
    :param concurrency: maximum number of running scrapes; None uses
        ``max_concurrency``, ``CONCURRENCY_AUTO`` reads it from ``account()``
    """
    if concurrency is None:
        concurrency = self.max_concurrency
    elif concurrency == self.CONCURRENCY_AUTO:
        concurrency = self.account()['subscription']['max_concurrency']

    loop = asyncio.get_running_loop()
    processing_tasks = []
    results = []
    processed_tasks = 0
    expected_tasks = len(scrape_configs)

    def scrape_done_callback(task:Task):
        nonlocal processed_tasks

        try:
            if task.cancelled() is True:
                return

            error = task.exception()

            if error is not None:
                results.append(error)
            else:
                results.append(task.result())
        finally:
            processing_tasks.remove(task)
            processed_tasks += 1

    while scrape_configs or results or processing_tasks:
        logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

        # @todo handle backpressure
        # Fill the free slots; the loop condition replaces the former bare except.
        while scrape_configs and len(processing_tasks) < concurrency:
            scrape_config = scrape_configs.pop()
            scrape_config.raise_on_upstream_error = False
            task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
            processing_tasks.append(task)
            task.add_done_callback(scrape_done_callback)

        # Drain everything that is ready. Popping while iterating the same
        # list used to leave about half of the results for the next tick.
        while results:
            yield results.pop(0)

        await asyncio.sleep(.5)

    logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))
def extract(self, extraction_config: ExtractionConfig, no_raise: bool = False) ‑> ExtractionApiResponse
-
Extract structured data from text content :param extraction_config: ExtractionConfig :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration :return: ExtractionApiResponse
If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example
'error': { 'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED', 'message': 'The content type of the response is not supported for extraction', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED' } }
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse:
    """
    Send a document to the extraction API and return the parsed response.

    :param extraction_config: ExtractionConfig describing the document and the extraction
    :param no_raise: bool - when True, a ScrapflyError that carries an api_response
        is returned as that response instead of being raised
    :return: ExtractionApiResponse

    With no_raise=True, check the extraction_api_response.error attribute.
    When it is not None it has, for example, the following structure

    'error': {
        'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED',
        'message': 'The content type of the response is not supported for extraction',
        'http_code': 422,
        'links': {
            'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED'
        }
    }
    """
    try:
        logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
        request_kwargs = self._extraction_request(extraction_config=extraction_config)
        http_response = self._http_handler(**request_kwargs)
        return self._handle_extraction_response(response=http_response, extraction_config=extraction_config)
    except BaseException as error:
        # Every failure is reported before deciding whether to swallow it.
        self.reporter.report(error=error)

        swallow = no_raise and isinstance(error, ScrapflyError) and error.api_response is not None
        if swallow:
            return error.api_response

        raise error
def get_monitoring_metrics(self, format: str = 'structured', period: Optional[str] = None, aggregation: Optional[List[Literal['account', 'project', 'target']]] = None)
-
Expand source code
def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None):
    """Query ``/scrape/monitoring/metrics``.

    :param format: sent as the ``format`` query parameter
    :param period: sent as ``period`` when not None
    :param aggregation: joined with ',' and sent as ``aggregation`` when not None
    :return: body decoded by ``body_handler`` when it supports the response
        headers, otherwise the content decoded as UTF-8
    """
    query = {'key': self.key, 'format': format}

    # Optional filters are only sent when explicitly provided.
    if period is not None:
        query['period'] = period
    if aggregation is not None:
        query['aggregation'] = ','.join(aggregation)

    request_headers = {
        'accept-encoding': self.body_handler.content_encoding,
        'accept': self.body_handler.accept,
        'user-agent': self.ua
    }
    timeouts = (self.connect_timeout, self.monitoring_api_read_timeout)

    response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics',
        params=query,
        timeout=timeouts,
        verify=self.verify,
        headers=request_headers,
    )
    response.raise_for_status()

    if not self.body_handler.support(response.headers):
        return response.content.decode('utf-8')

    return self.body_handler(response.content, response.headers['content-type'])
def get_monitoring_target_metrics(self, domain: str, group_subdomain: bool = False, period: Optional[Literal['subscription', 'last7d', 'last24h', 'last1h', 'last5m']] = 'last24h', start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None)
-
Expand source code
def get_monitoring_target_metrics(
        self,
        domain:str,
        group_subdomain:bool=False,
        period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H,
        start:Optional[datetime.datetime]=None,
        end:Optional[datetime.datetime]=None,
):
    """Query ``/scrape/monitoring/metrics/target`` for one domain.

    :param domain: sent as the ``domain`` query parameter
    :param group_subdomain: sent as the ``group_subdomain`` query parameter
    :param period: sent as ``period``; replaced by None when a range is given
    :param start: range start, formatted with ``DATETIME_FORMAT``
    :param end: range end, formatted with ``DATETIME_FORMAT``
    :raises ValueError: when only one of ``start`` / ``end`` is provided
    :return: body decoded by ``body_handler`` when it supports the response
        headers, otherwise the content decoded as UTF-8
    """
    # Exactly one bound given: a half-open range is rejected.
    if (start is None) != (end is None):
        raise ValueError('You must provide both start and end date')

    query = {
        'key': self.key,
        'domain': domain,
        'group_subdomain': group_subdomain
    }

    if start is not None:
        query['start'] = start.strftime(self.DATETIME_FORMAT)
        query['end'] = end.strftime(self.DATETIME_FORMAT)
        query['period'] = None  # an explicit range overrides the period
    else:
        query['period'] = period

    request_headers = {
        'accept-encoding': self.body_handler.content_encoding,
        'accept': self.body_handler.accept,
        'user-agent': self.ua
    }

    response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics/target',
        timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
        params=query,
        verify=self.verify,
        headers=request_headers,
    )
    response.raise_for_status()

    if not self.body_handler.support(response.headers):
        return response.content.decode('utf-8')

    return self.body_handler(response.content, response.headers['content-type'])
def open(self)
-
Expand source code
def open(self):
    """
    Create and configure the HTTP session of the client.
    Does nothing when a session is already attached to the client.
    """
    if self.http_session is not None:
        return

    # Attach first, then configure through a local alias
    session = self.http_session = Session()

    session.verify = self.verify
    session.timeout = (self.connect_timeout, self.default_read_timeout)
    session.params['key'] = self.key
    session.headers['accept-encoding'] = self.body_handler.content_encoding
    session.headers['accept'] = self.body_handler.accept
    session.headers['user-agent'] = self.ua
def resilient_scrape(self, scrape_config: ScrapeConfig, retry_on_errors: Set[Exception] = {<class 'scrapfly.errors.ScrapflyError'>}, retry_on_status_code: Optional[List[int]] = None, tries: int = 5, delay: int = 20) ‑> ScrapeApiResponse
-
Expand source code
def resilient_scrape(
    self,
    scrape_config:ScrapeConfig,
    retry_on_errors:Set[Exception]={ScrapflyError},
    retry_on_status_code:Optional[List[int]]=None,
    tries: int = 5,
    delay: int = 20,
) -> ScrapeApiResponse:
    """
    Scrape with retries driven by `backoff.on_exception`
    :param scrape_config: ScrapeConfig
    :param retry_on_errors: set of exception classes that trigger a retry
    :param retry_on_status_code: Optional[List[int]] - upstream status codes that trigger a retry; other upstream errors return their api response instead of raising
    :param tries: int - passed to backoff as `max_tries`
    :param delay: int - passed to backoff as `max_time`
    :return: ScrapeApiResponse
    """
    assert retry_on_errors is not None, 'Retry on error is None'
    assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

    @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)
    def inner() -> ScrapeApiResponse:
        try:
            return self.scrape(scrape_config=scrape_config)
        except (UpstreamHttpClientError, UpstreamHttpServerError) as upstream_error:
            failed_response = upstream_error.api_response

            # Without a status-code filter, or without a response to hand back, just propagate
            if retry_on_status_code is None or not failed_response:
                raise upstream_error

            # Listed status codes propagate so that backoff can retry them
            if failed_response.upstream_status_code in retry_on_status_code:
                raise upstream_error

            return failed_response

    return inner()
def save_scrape_screenshot(self, api_response: ScrapeApiResponse, name: str, path: Optional[str] = None)
-
Save a screenshot from a scrape result :param api_response: ScrapeApiResponse :param name: str - name of the screenshot given in the scrape config :param path: Optional[str]
Expand source code
def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
    """
    Save a screenshot from a scrape result
    :param api_response: ScrapeApiResponse
    :param name: str - name of the screenshot given in the scrape config
    :param path: Optional[str]
    :raises RuntimeError: when the scrape result holds no screenshot registered under `name`
    """
    screenshots = api_response.scrape_result['screenshots']

    # Single lookup: covers both "no screenshots at all" and "unknown name"
    if not screenshots or name not in screenshots:
        raise RuntimeError('Screenshot %s does not exist' % name)

    screenshot_response = self._http_handler(
        method='GET',
        url=screenshots[name]['url'],
        params={'key': self.key},
        verify=self.verify
    )

    screenshot_response.raise_for_status()

    # The file is always written with a .jpg suffix
    if not name.endswith('.jpg'):
        name += '.jpg'

    api_response.sink(path=path, name=name, content=screenshot_response.content)
def save_screenshot(self, screenshot_api_response: ScreenshotApiResponse, name: str, path: Optional[str] = None)
-
Save a screenshot from a screenshot API response :param screenshot_api_response: ScreenshotApiResponse :param name: str - name of the screenshot to save as :param path: Optional[str]
Expand source code
def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
    """
    Save a screenshot from a screenshot API response
    :param screenshot_api_response: ScreenshotApiResponse
    :param name: str - name of the screenshot to save as (the extension is appended from the response metadata)
    :param path: Optional[str] - target directory, created when missing
    :raises RuntimeError: when the screenshot was not successful or carries no binary
    """
    if screenshot_api_response.screenshot_success is not True:
        raise RuntimeError('Screenshot was not successful')

    if not screenshot_api_response.image:
        raise RuntimeError('Screenshot binary does not exist')

    content = screenshot_api_response.image
    extension_name = screenshot_api_response.metadata['extension_name']

    file_path = f'{name}.{extension_name}'

    if path:
        os.makedirs(path, exist_ok=True)
        file_path = os.path.join(path, file_path)

    # Raw bytes are wrapped so they can be streamed like a file object
    if isinstance(content, bytes):
        content = BytesIO(content)

    with open(file_path, 'wb') as f:
        shutil.copyfileobj(content, f, length=131072)
def scrape(self, scrape_config: ScrapeConfig, no_raise: bool = False) ‑> ScrapeApiResponse
-
Scrape a website :param scrape_config: ScrapeConfig :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration :return: ScrapeApiResponse
If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error. If the error is not none, you will get the following structure for example
'error': { 'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED', 'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds', 'retryable': False, 'http_code': 422, 'links': { 'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED' } }
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse:
    """
    Scrape a website
    :param scrape_config: ScrapeConfig
    :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration
    :return: ScrapeApiResponse

    If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error.
    If the error is not none, you will get the following structure for example

    'error': {
        'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
        'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds',
        'retryable': False,
        'http_code': 422,
        'links': {
            'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate',
            'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
        }
    }
    """
    try:
        logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
        http_response = self._http_handler(**self._scrape_request(scrape_config=scrape_config))
        result = self._handle_response(response=http_response, scrape_config=scrape_config)

        self.reporter.report(scrape_api_response=result)

        return result
    except BaseException as e:
        # Every failure is reported before deciding whether to swallow it
        self.reporter.report(error=e)

        swallow = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

        if not swallow:
            raise e

        return e.api_response
def screenshot(self, screenshot_config: ScreenshotConfig, no_raise: bool = False) ‑> ScreenshotApiResponse
-
Take a screenshot :param screenshot_config: ScreenshotConfig :param no_raise: bool - if True, do not raise exception on error while the screenshot api response is a ScrapflyError for seamless integration :return: ScreenshotApiResponse
If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example
'error': { 'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT', 'message': 'For some reason we were unable to take the screenshot', 'http_code': 422, 'links': { 'Checkout the related doc': 'https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT' } }
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse:
    """
    Take a screenshot
    :param screenshot_config: ScreenshotConfig
    :param no_raise: bool - if True, do not raise exception on error while the screenshot api response is a ScrapflyError for seamless integration
    :return: ScreenshotApiResponse

    If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error.
    If the error is not none, you will get the following structure for example

    'error': {
        'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
        'message': 'For some reason we were unable to take the screenshot',
        'http_code': 422,
        'links': {
            'Checkout the related doc': 'https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
        }
    }
    """
    try:
        logger.debug('--> %s Screenshoting' % (screenshot_config.url))
        http_response = self._http_handler(**self._screenshot_request(screenshot_config=screenshot_config))
        return self._handle_screenshot_response(response=http_response, screenshot_config=screenshot_config)
    except BaseException as e:
        # Every failure is reported before deciding whether to swallow it
        self.reporter.report(error=e)

        swallow = no_raise and isinstance(e, ScrapflyError) and e.api_response is not None

        if not swallow:
            raise e

        return e.api_response
def sink(self, api_response: ScrapeApiResponse, content: Union[str, bytes, ForwardRef(None)] = None, path: Optional[str] = None, name: Optional[str] = None, file: Union[TextIO, _io.BytesIO, ForwardRef(None)] = None) ‑> str
-
Expand source code
def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
    """
    Write the scraped content (or the given `content`) to a file
    :param api_response: ScrapeApiResponse - source of the default content, response headers and url
    :param content: Optional[Union[str, bytes]] - overrides the scraped content when truthy
    :param path: Optional[str] - directory prefix, joined to the name with '/'
    :param name: Optional[str] - file name; defaults to the last path segment of the scraped url
    :param file: Optional[Union[TextIO, BytesIO]] - already opened target; when given, no file is opened here
    :return: the path of the file opened by this method (stays None when `file` is supplied by the caller)
    """
    scrape_result = api_response.result['result']
    scrape_config = api_response.result['config']

    # An explicit, truthy `content` wins over the scraped body
    file_content = content or scrape_result['content']
    file_path = None
    file_extension = None

    if name:
        name_parts = name.split('.')
        if len(name_parts) > 1:
            # NOTE(review): this extension has no leading dot, unlike the mime-derived one below
            file_extension = name_parts[-1]

    if not file:
        if file_extension is None:
            # No extension in the name: derive one from the response content type
            try:
                mime_type = scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            # Drop parameters such as "; charset=utf-8"
            if ';' in mime_type:
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            name = scrape_config['url'].split('/')[-1]

        # Substring check, not a suffix check: the extension is appended only if absent anywhere in the name
        if name.find(file_extension) == -1:
            name += file_extension

        file_path = path + '/' + name if path else name

        # The name was empty (url ending with '/'): build a file name from the whole url instead
        if file_path == file_extension:
            url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

            if url[-1] == '-':
                url = url[:-1]

            url += file_extension

            file_path = url

        file = open(file_path, 'wb')

    # Normalise str / bytes into a seekable binary stream; other objects are used as given
    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)
    # The target is closed on exit, including a caller-supplied `file`
    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    logger.info('file %s created' % file_path)
    return file_path
class ScrapflyError (message: str, code: str, http_status_code: int, resource: Optional[str] = None, is_retryable: bool = False, retry_delay: Optional[int] = None, retry_times: Optional[int] = None, documentation_url: Optional[str] = None, api_response: Optional[ForwardRef('ApiResponse')] = None)
-
Common base class for all non-exit exceptions.
Expand source code
class ScrapflyError(Exception):
    """
    Base exception of the package.

    Carries the error message and code together with retry hints, the
    related resource, a documentation link and, when available, the api
    response that produced the error.
    """

    KIND_HTTP_BAD_RESPONSE = 'HTTP_BAD_RESPONSE'
    KIND_SCRAPFLY_ERROR = 'SCRAPFLY_ERROR'

    RESOURCE_PROXY = 'PROXY'
    RESOURCE_THROTTLE = 'THROTTLE'
    RESOURCE_SCRAPE = 'SCRAPE'
    RESOURCE_ASP = 'ASP'
    RESOURCE_SCHEDULE = 'SCHEDULE'
    RESOURCE_WEBHOOK = 'WEBHOOK'
    RESOURCE_SESSION = 'SESSION'

    def __init__(
        self,
        message: str,
        code: str,
        http_status_code: int,
        resource: Optional[str]=None,
        is_retryable: bool = False,
        retry_delay: Optional[int] = None,
        retry_times: Optional[int] = None,
        documentation_url: Optional[str] = None,
        api_response: Optional['ApiResponse'] = None
    ):
        # Identity of the error
        self.message = message
        self.code = code
        self.http_status_code = http_status_code
        self.resource = resource

        # Retry hints
        self.is_retryable = is_retryable
        self.retry_delay = retry_delay
        self.retry_times = retry_times

        # Extra context
        self.documentation_url = documentation_url
        self.api_response = api_response

        # Exception.args is (message, code-as-string)
        super().__init__(self.message, str(self.code))

    def __str__(self):
        if self.documentation_url is None:
            return self.message

        return self.message + '. Learn more: %s' % self.documentation_url
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses
- scrapfly.errors.ExtraUsageForbidden
- scrapfly.errors.HttpError
Class variables
var KIND_HTTP_BAD_RESPONSE
var KIND_SCRAPFLY_ERROR
var RESOURCE_ASP
var RESOURCE_PROXY
var RESOURCE_SCHEDULE
var RESOURCE_SCRAPE
var RESOURCE_SESSION
var RESOURCE_THROTTLE
var RESOURCE_WEBHOOK
class ScrapflyProxyError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
# Marker subclass of ScraperAPIError: it adds no attributes or behavior.
# NOTE(review): presumably raised for errors on the PROXY resource - confirm against the error factory.
class ScrapflyProxyError(ScraperAPIError):
    pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyScheduleError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
# Marker subclass of ScraperAPIError: it adds no attributes or behavior.
# NOTE(review): presumably raised for errors on the SCHEDULE resource - confirm against the error factory.
class ScrapflyScheduleError(ScraperAPIError):
    pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyScrapeError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
# Marker subclass of ScraperAPIError: it adds no attributes or behavior.
# NOTE(review): presumably raised for errors on the SCRAPE resource - confirm against the error factory.
class ScrapflyScrapeError(ScraperAPIError):
    pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflySessionError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
# Marker subclass of ScraperAPIError: it adds no attributes or behavior.
# NOTE(review): presumably raised for errors on the SESSION resource - confirm against the error factory.
class ScrapflySessionError(ScraperAPIError):
    pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyThrottleError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
# Marker subclass of ScraperAPIError: it adds no attributes or behavior.
# NOTE(review): presumably raised for errors on the THROTTLE resource - confirm against the error factory.
class ScrapflyThrottleError(ScraperAPIError):
    pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyWebhookError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
# Marker subclass of ScraperAPIError: it adds no attributes or behavior.
# NOTE(review): presumably raised for errors on the WEBHOOK resource - confirm against the error factory.
class ScrapflyWebhookError(ScraperAPIError):
    pass
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScreenshotAPIError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
# Marker subclass of HttpError: it adds no attributes or behavior.
# It is the default `error_class` of ScreenshotApiResponse.raise_for_result.
class ScreenshotAPIError(HttpError):
    pass
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScreenshotApiResponse (request: requests.models.Request, response: requests.models.Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None)
-
Expand source code
class ScreenshotApiResponse(ApiResponse):
    """
    Api response of a screenshot request.

    `result` holds the handled api result; the screenshot binary is read
    from its 'result' key and the image metadata from the response headers.
    """

    def __init__(self, request: Request, response: Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None):
        super().__init__(request, response)
        self.screenshot_config = screenshot_config
        self.result = self.handle_api_result(api_result)

    @property
    def image(self) -> Optional[str]:
        """Screenshot binary stored under 'result', or '' when there is none."""
        payload = self.result.get('result', None)
        return '' if payload is None else payload

    @property
    def metadata(self) -> Optional[Dict]:
        """Extension name and upstream details read from the response headers; {} without image."""
        if not self.image:
            return {}

        headers = self.response.headers
        content_type = headers.get('content-type')
        # Keep what follows the first '/', then drop parameters such as "; charset=..."
        subtype = content_type[content_type.find('/') + 1:]

        return {
            'extension_name': subtype.split(';')[0],
            'upstream-status-code': headers.get('X-Scrapfly-Upstream-Http-Code'),
            'upstream-url': headers.get('X-Scrapfly-Upstream-Url')
        }

    @property
    def screenshot_success(self) -> bool:
        """True when an image is available."""
        return bool(self.image)

    @property
    def error(self) -> Optional[Dict]:
        """The handled api result when the screenshot failed, None otherwise."""
        if self.image or self.screenshot_success is not False:
            return None

        return self.result

    def _is_api_error(self, api_result: Dict) -> bool:
        # A missing result is treated as an error, as is any payload carrying 'error_id'
        return api_result is None or 'error_id' in api_result

    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        """Wrap error payloads in a FrozenDict; hand back any other payload unchanged."""
        is_error = self._is_api_error(api_result=api_result)
        return FrozenDict(api_result) if is_error is True else api_result

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
        """Delegate to the parent implementation with ScreenshotAPIError as default error class."""
        super().raise_for_result(
            raise_on_upstream_error=raise_on_upstream_error,
            error_class=error_class,
        )
Ancestors
Instance variables
var error : Optional[Dict]
-
Expand source code
@property
def error(self) -> Optional[Dict]:
    """The handled api result when the screenshot failed, None otherwise."""
    if self.image or self.screenshot_success is not False:
        return None

    return self.result
var image : Optional[str]
-
Expand source code
@property
def image(self) -> Optional[str]:
    """Screenshot binary stored under 'result', or '' when there is none."""
    payload = self.result.get('result', None)
    return '' if payload is None else payload
var metadata : Optional[Dict]
-
Expand source code
@property
def metadata(self) -> Optional[Dict]:
    """Extension name and upstream details read from the response headers; {} without image."""
    if not self.image:
        return {}

    headers = self.response.headers
    content_type = headers.get('content-type')
    # Keep what follows the first '/', then drop parameters such as "; charset=..."
    subtype = content_type[content_type.find('/') + 1:]

    return {
        'extension_name': subtype.split(';')[0],
        'upstream-status-code': headers.get('X-Scrapfly-Upstream-Http-Code'),
        'upstream-url': headers.get('X-Scrapfly-Upstream-Url')
    }
var screenshot_success : bool
-
Expand source code
@property
def screenshot_success(self) -> bool:
    """True when an image is available."""
    return bool(self.image)
Methods
def handle_api_result(self, api_result: bytes) ‑> FrozenDict
-
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict:
    """Wrap error payloads in a FrozenDict; hand back any other payload unchanged."""
    is_error = self._is_api_error(api_result=api_result)
    return FrozenDict(api_result) if is_error is True else api_result
def raise_for_result(self, raise_on_upstream_error=True, error_class=scrapfly.errors.ScreenshotAPIError)
-
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
    """Delegate to the parent implementation with ScreenshotAPIError as default error class."""
    super().raise_for_result(
        raise_on_upstream_error=raise_on_upstream_error,
        error_class=error_class,
    )
Inherited members
class ScreenshotConfig (url: str, format: Optional[Format] = None, capture: Optional[str] = None, resolution: Optional[str] = None, country: Optional[str] = None, timeout: Optional[int] = None, rendering_wait: Optional[int] = None, wait_for_selector: Optional[str] = None, options: Optional[List[Options]] = None, auto_scroll: Optional[bool] = None, js: Optional[str] = None, cache: Optional[bool] = None, cache_ttl: Optional[bool] = None, cache_clear: Optional[bool] = None, webhook: Optional[str] = None, raise_on_upstream_error: bool = True)
-
Expand source code
class ScreenshotConfig(BaseApiConfig):
    """
    Configuration of a screenshot request.

    The constructor stores the options as given (option flags are
    normalised to `Options` members) and `to_api_params` turns them into
    the query parameters of the request.
    """

    url: str
    format: Optional[Format] = None
    capture: Optional[str] = None
    resolution: Optional[str] = None
    country: Optional[str] = None
    timeout: Optional[int] = None # in milliseconds
    rendering_wait: Optional[int] = None # in milliseconds
    wait_for_selector: Optional[str] = None
    options: Optional[List[Options]] = None
    auto_scroll: Optional[bool] = None
    js: Optional[str] = None
    cache: Optional[bool] = None
    cache_ttl: Optional[int] = None # time to live; bool is still accepted for backward compatibility
    cache_clear: Optional[bool] = None
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    def __init__(
        self,
        url: str,
        format: Optional[Format] = None,
        capture: Optional[str] = None,
        resolution: Optional[str] = None,
        country: Optional[str] = None,
        timeout: Optional[int] = None, # in milliseconds
        rendering_wait: Optional[int] = None, # in milliseconds
        wait_for_selector: Optional[str] = None,
        options: Optional[List[Options]] = None,
        auto_scroll: Optional[bool] = None,
        js: Optional[str] = None,
        cache: Optional[bool] = None,
        cache_ttl: Optional[int] = None,
        cache_clear: Optional[bool] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True
    ):
        assert(type(url) is str)

        self.url = url
        # Per-config api key; when left to None the key given to to_api_params is used
        self.key = None
        self.format = format
        self.capture = capture
        self.resolution = resolution
        self.country = country
        self.timeout = timeout
        self.rendering_wait = rendering_wait
        self.wait_for_selector = wait_for_selector
        self.options = [Options(flag) for flag in options] if options else None
        self.auto_scroll = auto_scroll
        self.js = js
        self.cache = cache
        self.cache_ttl = cache_ttl
        self.cache_clear = cache_clear
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

    def to_api_params(self, key:str) -> Dict:
        """
        Build the query parameters of the screenshot request
        :param key: str - api key used when the config carries none
        :return: Dict - only the options that are set are included
        """
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.format:
            params['format'] = Format(self.format).value

        if self.capture:
            params['capture'] = self.capture

        if self.resolution:
            params['resolution'] = self.resolution

        if self.country is not None:
            params['country'] = self.country

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.rendering_wait is not None:
            params['rendering_wait'] = self.rendering_wait

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.options is not None:
            params["options"] = ",".join(flag.value for flag in self.options)

        if self.auto_scroll is not None:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

        if self.js:
            # The script travels url-safe base64 encoded
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

        if self.cache is not None:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_ttl is not None:
                if isinstance(self.cache_ttl, bool):
                    # Historical behavior for callers that passed a bool
                    params['cache_ttl'] = self._bool_to_http(self.cache_ttl)
                else:
                    # A time to live is a number: send it untouched instead of coercing it to a boolean
                    params['cache_ttl'] = self.cache_ttl

            if self.cache_clear is not None:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)

        else:
            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

            if self.cache_clear is not None:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

        if self.webhook is not None:
            params['webhook_name'] = self.webhook

        return params
Ancestors
Class variables
var auto_scroll : Optional[bool]
var cache : Optional[bool]
var cache_clear : Optional[bool]
var cache_ttl : Optional[bool]
var capture : Optional[str]
var country : Optional[str]
var format : Optional[Format]
var js : Optional[str]
var options : Optional[List[Options]]
var raise_on_upstream_error : bool
var rendering_wait : Optional[int]
var resolution : Optional[str]
var timeout : Optional[int]
var url : str
var wait_for_selector : Optional[str]
var webhook : Optional[str]
Methods
def to_api_params(self, key: str) ‑> Dict
-
Expand source code
def to_api_params(self, key:str) -> Dict:
    """
    Build the query parameters of the screenshot request
    :param key: str - api key used when the config carries none
    :return: Dict - only the options that are set are included
    """
    params = {
        'key': self.key or key,
        'url': self.url
    }

    if self.format:
        params['format'] = Format(self.format).value

    if self.capture:
        params['capture'] = self.capture

    if self.resolution:
        params['resolution'] = self.resolution

    if self.country is not None:
        params['country'] = self.country

    if self.timeout is not None:
        params['timeout'] = self.timeout

    if self.rendering_wait is not None:
        params['rendering_wait'] = self.rendering_wait

    if self.wait_for_selector is not None:
        params['wait_for_selector'] = self.wait_for_selector

    if self.options is not None:
        params["options"] = ",".join(flag.value for flag in self.options)

    if self.auto_scroll is not None:
        params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

    if self.js:
        # The script travels url-safe base64 encoded
        params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

    if self.cache is not None:
        params['cache'] = self._bool_to_http(self.cache)

        if self.cache_ttl is not None:
            if isinstance(self.cache_ttl, bool):
                # Historical behavior for callers that passed a bool
                params['cache_ttl'] = self._bool_to_http(self.cache_ttl)
            else:
                # A time to live is a number: send it untouched instead of coercing it to a boolean
                params['cache_ttl'] = self.cache_ttl

        if self.cache_clear is not None:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)

    else:
        if self.cache_ttl is not None:
            logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

        if self.cache_clear is not None:
            logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

    if self.webhook is not None:
        params['webhook_name'] = self.webhook

    return params
class UpstreamHttpClientError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
# Marker subclass of UpstreamHttpError: it adds no attributes or behavior.
# NOTE(review): presumably used for upstream 4xx responses - confirm against the error factory.
class UpstreamHttpClientError(UpstreamHttpError):
    pass
Ancestors
- scrapfly.errors.UpstreamHttpError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
class UpstreamHttpError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
# Marker subclass of HttpError: it adds no attributes or behavior.
# Base of UpstreamHttpClientError (and, through it, of UpstreamHttpServerError).
class UpstreamHttpError(HttpError):
    pass
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
class UpstreamHttpServerError (request: requests.models.Request, response: Optional[requests.models.Response] = None, **kwargs)
-
Common base class for all non-exit exceptions.
Expand source code
# Marker subclass: it adds no attributes or behavior.
# NOTE(review): this derives from UpstreamHttpClientError, so `except UpstreamHttpClientError`
# also catches server errors. Deriving from UpstreamHttpError looks like the intent, but
# changing the base would alter what existing callers catch - confirm before touching it.
class UpstreamHttpServerError(UpstreamHttpClientError):
    pass
Ancestors
- UpstreamHttpClientError
- scrapfly.errors.UpstreamHttpError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException