Module scrapfly.client

Classes

class ScraperAPI
Expand source code
class ScraperAPI:

    MONITORING_DATA_FORMAT_STRUCTURED = 'structured'
    MONITORING_DATA_FORMAT_PROMETHEUS = 'prometheus'

    MONITORING_PERIOD_SUBSCRIPTION = 'subscription'
    MONITORING_PERIOD_LAST_7D = 'last7d'
    MONITORING_PERIOD_LAST_24H = 'last24h'
    MONITORING_PERIOD_LAST_1H = 'last1h'
    MONITORING_PERIOD_LAST_5m = 'last5m'

    MONITORING_ACCOUNT_AGGREGATION = 'account'
    MONITORING_PROJECT_AGGREGATION = 'project'
    MONITORING_TARGET_AGGREGATION = 'target'

Class variables

var MONITORING_ACCOUNT_AGGREGATION
var MONITORING_DATA_FORMAT_PROMETHEUS
var MONITORING_DATA_FORMAT_STRUCTURED
var MONITORING_PERIOD_LAST_1H
var MONITORING_PERIOD_LAST_24H
var MONITORING_PERIOD_LAST_5m
var MONITORING_PERIOD_LAST_7D
var MONITORING_PERIOD_SUBSCRIPTION
var MONITORING_PROJECT_AGGREGATION
var MONITORING_TARGET_AGGREGATION
class ScrapflyClient (key: str,
host: str = 'https://api.scrapfly.io',
verify=True,
debug: bool = False,
max_concurrency: int = 1,
connect_timeout: int = 30,
web_scraping_api_read_timeout: int = 160,
extraction_api_read_timeout: int = 35,
screenshot_api_read_timeout: int = 60,
read_timeout: int = 30,
default_read_timeout: int = 30,
reporter: Callable | None = None,
**kwargs)
Expand source code
class ScrapflyClient:

    HOST = 'https://api.scrapfly.io'
    DEFAULT_CONNECT_TIMEOUT = 30
    DEFAULT_READ_TIMEOUT = 30

    DEFAULT_WEBSCRAPING_API_READ_TIMEOUT = 160 # 155 real
    DEFAULT_SCREENSHOT_API_READ_TIMEOUT = 60  # 30 real
    DEFAULT_EXTRACTION_API_READ_TIMEOUT = 35 # 30 real
    DEFAULT_CRAWLER_API_READ_TIMEOUT = 30

    host:str
    key:str
    max_concurrency:int
    verify:bool
    debug:bool
    distributed_mode:bool
    connect_timeout:int
    web_scraping_api_read_timeout:int
    screenshot_api_read_timeout:int
    extraction_api_read_timeout:int
    monitoring_api_read_timeout:int
    default_read_timeout:int
    brotli: bool
    reporter:Reporter
    version:str

    # @deprecated
    read_timeout:int

    CONCURRENCY_AUTO = 'auto' # retrieve the allowed concurrency from your account
    DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(
        self,
        key: str,
        host: str = HOST,
        verify=True,
        debug: bool = False,
        max_concurrency:int=1,
        connect_timeout:int = DEFAULT_CONNECT_TIMEOUT,
        web_scraping_api_read_timeout: int = DEFAULT_WEBSCRAPING_API_READ_TIMEOUT,
        extraction_api_read_timeout: int = DEFAULT_EXTRACTION_API_READ_TIMEOUT,
        screenshot_api_read_timeout: int = DEFAULT_SCREENSHOT_API_READ_TIMEOUT,

        # @deprecated
        read_timeout:int = DEFAULT_READ_TIMEOUT,
        default_read_timeout:int = DEFAULT_READ_TIMEOUT,
        reporter:Optional[Callable]=None,
        **kwargs
    ):
        if host[-1] == '/':  # remove last '/' if exists
            host = host[:-1]

        if 'distributed_mode' in kwargs:
            warnings.warn("distributed mode is deprecated and will be remove the next version -"
              " user should handle themself the session name based on the concurrency",
              DeprecationWarning,
              stacklevel=2
            )

        if 'brotli' in kwargs:
            warnings.warn("brotli arg is deprecated and will be remove the next version - "
                "brotli is disabled by default",
                DeprecationWarning,
                stacklevel=2
            )

        self.version = __version__
        self.host = host
        self.key = key
        self.verify = verify
        self.debug = debug
        self.connect_timeout = connect_timeout
        self.web_scraping_api_read_timeout = web_scraping_api_read_timeout
        self.screenshot_api_read_timeout = screenshot_api_read_timeout
        self.extraction_api_read_timeout = extraction_api_read_timeout
        self.monitoring_api_read_timeout = default_read_timeout
        self.default_read_timeout = default_read_timeout

        # @deprecated
        self.read_timeout = default_read_timeout

        self.max_concurrency = max_concurrency
        self.body_handler = ResponseBodyHandler(use_brotli=False)
        self.async_executor = ThreadPoolExecutor()
        self.http_session = None

        if not self.verify and not self.HOST.endswith('.local'):
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        if self.debug is True:
            http.client.HTTPConnection.debuglevel = 5

        if reporter is None:
            from .reporter import NoopReporter

            reporter = NoopReporter()

        self.reporter = Reporter(reporter)

    @property
    def ua(self) -> str:
        return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
            self.version,
            platform.python_version(),
            platform.uname().system,
            platform.uname().machine
        )

    @cached_property
    def _http_handler(self):
        return partial(self.http_session.request if self.http_session else requests.request)

    @property
    def http(self):
        return self._http_handler

    def _scrape_request(self, scrape_config:ScrapeConfig):
        return {
            'method': scrape_config.method,
            'url': self.host + '/scrape',
            'data': scrape_config.body,
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.web_scraping_api_read_timeout),
            'headers': {
                'content-type': scrape_config.headers['content-type'] if scrape_config.method in ['POST', 'PUT', 'PATCH'] else self.body_handler.content_type,
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            'params': scrape_config.to_api_params(key=self.key)
        }
    
    def _screenshot_request(self, screenshot_config:ScreenshotConfig):
        return {
            'method': 'GET',
            'url': self.host + '/screenshot',
            'timeout': (self.connect_timeout, self.screenshot_api_read_timeout),
            'headers': {
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },            
            'params': screenshot_config.to_api_params(key=self.key)
        }        

    def _extraction_request(self, extraction_config:ExtractionConfig):
        headers = {
                'content-type': extraction_config.content_type,
                'accept-encoding': self.body_handler.content_encoding,
                'content-encoding': extraction_config.document_compression_format if extraction_config.document_compression_format else None,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
        }

        if extraction_config.document_compression_format:
            headers['content-encoding'] = extraction_config.document_compression_format.value

        return {
            'method': 'POST',
            'url': self.host + '/extraction',
            'data': extraction_config.body,
            'timeout': (self.connect_timeout, self.extraction_api_read_timeout),
            'headers': headers,
            'params': extraction_config.to_api_params(key=self.key)
        }


    def account(self) -> Union[str, Dict]:
        response = self._http_handler(
            method='GET',
            url=self.host + '/account',
            params={'key': self.key},
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )

        response.raise_for_status()

        if self.body_handler.support(response.headers):
            return self.body_handler(response.content, response.headers['content-type'])

        return response.content.decode('utf-8')

    def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None):
        params = {'key': self.key, 'format': format}

        if period is not None:
            params['period'] = period

        if aggregation is not None:
            params['aggregation'] = ','.join(aggregation)

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics',
            params=params,
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )

        response.raise_for_status()

        if self.body_handler.support(response.headers):
            return self.body_handler(response.content, response.headers['content-type'])

        return response.content.decode('utf-8')

    def get_monitoring_target_metrics(
            self,
            domain:str,
            group_subdomain:bool=False,
            period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H,
            start:Optional[datetime.datetime]=None,
            end:Optional[datetime.datetime]=None,
    ):
        params = {
            'key': self.key,
            'domain': domain,
            'group_subdomain': group_subdomain
        }

        if (start is not None and end is None) or (start is None and end is not None):
            raise ValueError('You must provide both start and end date')

        if start is not None and end is not None:
            params['start'] = start.strftime(self.DATETIME_FORMAT)
            params['end'] = end.strftime(self.DATETIME_FORMAT)
            period = None

        params['period'] = period

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics/target',
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            params=params,
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )

        response.raise_for_status()

        if self.body_handler.support(response.headers):
            return self.body_handler(response.content, response.headers['content-type'])

        return response.content.decode('utf-8')


    def resilient_scrape(
        self,
        scrape_config:ScrapeConfig,
        retry_on_errors:Set[Exception]={ScrapflyError},
        retry_on_status_code:Optional[List[int]]=None,
        tries: int = 5,
        delay: int = 20,
    ) -> ScrapeApiResponse:
        assert retry_on_errors is not None, 'Retry on error is None'
        assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

        @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)
        def inner() -> ScrapeApiResponse:

            try:
                return self.scrape(scrape_config=scrape_config)
            except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
                if retry_on_status_code is not None and e.api_response:
                    if e.api_response.upstream_status_code in retry_on_status_code:
                        raise e
                    else:
                        return e.api_response

                raise e

        return inner()

    def open(self):
        if self.http_session is None:
            self.http_session = Session()
            self.http_session.verify = self.verify
            self.http_session.timeout = (self.connect_timeout, self.default_read_timeout)
            self.http_session.params['key'] = self.key
            self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding
            self.http_session.headers['accept'] = self.body_handler.accept
            self.http_session.headers['user-agent'] = self.ua

    def close(self):
        self.http_session.close()
        self.http_session = None

    def __enter__(self) -> 'ScrapflyClient':
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
        if loop is None:
            loop = asyncio.get_running_loop()

        return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config)

    async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
        if concurrency is None:
            concurrency = self.max_concurrency
        elif concurrency == self.CONCURRENCY_AUTO:
            concurrency = self.account()['subscription']['max_concurrency']

        loop = asyncio.get_running_loop()
        processing_tasks = []
        results = []
        processed_tasks = 0
        expected_tasks = len(scrape_configs)

        def scrape_done_callback(task:Task):
            nonlocal processed_tasks

            try:
                if task.cancelled() is True:
                    return

                error = task.exception()

                if error is not None:
                    results.append(error)
                else:
                    results.append(task.result())
            finally:
                processing_tasks.remove(task)
                processed_tasks += 1

        while scrape_configs or results or processing_tasks:
            logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

            if scrape_configs:
                if len(processing_tasks) < concurrency:
                    # @todo handle backpressure
                    for _ in range(0, concurrency - len(processing_tasks)):
                        try:
                            scrape_config = scrape_configs.pop()
                        except:
                            break

                        scrape_config.raise_on_upstream_error = False
                        task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                        processing_tasks.append(task)
                        task.add_done_callback(scrape_done_callback)

            for _ in results:
                result = results.pop()
                yield result

            await asyncio.sleep(.5)

        logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse:
        """
        Scrape a website
        :param scrape_config: ScrapeConfig
        :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration
        :return: ScrapeApiResponse

        If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error.
        If the error is not none, you will get the following structure for example

        'error': {
            'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
            'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds',
            'retryable': False,
            'http_code': 422,
            'links': {
                'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
            }
        }
        """

        try:
            logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
            request_data = self._scrape_request(scrape_config=scrape_config)
            response = self._http_handler(**request_data)
            scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config)

            self.reporter.report(scrape_api_response=scrape_api_response)

            return scrape_api_response
        except BaseException as e:
            self.reporter.report(error=e)

            if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
                return e.api_response

            raise e

    async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse:
        if loop is None:
            loop = asyncio.get_running_loop()

        return await loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse:
        """
        Take a screenshot
        :param screenshot_config: ScrapeConfig
        :param no_raise: bool - if True, do not raise exception on error while the screenshot api response is a ScrapflyError for seamless integration
        :return: str

        If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error.
        If the error is not none, you will get the following structure for example

        'error': {
            'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
            'message': 'For some reason we were unable to take the screenshot',
            'http_code': 422,
            'links': {
                'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
            }
        }
        """

        try:
            logger.debug('--> %s Screenshoting' % (screenshot_config.url))
            request_data = self._screenshot_request(screenshot_config=screenshot_config)
            response = self._http_handler(**request_data)
            screenshot_api_response = self._handle_screenshot_response(response=response, screenshot_config=screenshot_config)
            return screenshot_api_response
        except BaseException as e:
            self.reporter.report(error=e)

            if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
                return e.api_response

            raise e

    async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse:
        if loop is None:
            loop = asyncio.get_running_loop()

        return await loop.run_in_executor(self.async_executor, self.extract, extraction_config)

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse:
        """
        Extract structured data from text content
        :param extraction_config: ExtractionConfig
        :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration
        :return: str

        If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error.
        If the error is not none, you will get the following structure for example

        'error': {
            'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED',
            'message': 'The content type of the response is not supported for extraction',
            'http_code': 422,
            'links': {
                'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED'
            }
        }
        """

        try:
            logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
            request_data = self._extraction_request(extraction_config=extraction_config)
            response = self._http_handler(**request_data)
            extraction_api_response = self._handle_extraction_response(response=response, extraction_config=extraction_config)
            return extraction_api_response
        except BaseException as e:
            self.reporter.report(error=e)

            if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
                return e.api_response

            raise e

    def _handle_response(self, response:Response, scrape_config:ScrapeConfig) -> ScrapeApiResponse:
        try:
            api_response = self._handle_api_response(
                response=response,
                scrape_config=scrape_config,
                raise_on_upstream_error=scrape_config.raise_on_upstream_error
            )

            if scrape_config.method == 'HEAD':
                logger.debug('<-- [%s %s] %s | %ss' % (
                    api_response.response.status_code,
                    api_response.response.reason,
                    api_response.response.request.url,
                    0
                ))
            else:
                logger.debug('<-- [%s %s] %s | %ss' % (
                    api_response.result['result']['status_code'],
                    api_response.result['result']['reason'],
                    api_response.result['config']['url'],
                    api_response.result['result']['duration'])
                )

                logger.debug('Log url: %s' % api_response.result['result']['log_url'])

            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def _handle_screenshot_response(self, response:Response, screenshot_config:ScreenshotConfig) -> ScreenshotApiResponse:    
        try:
            api_response = self._handle_screenshot_api_response(
                response=response,
                screenshot_config=screenshot_config,
                raise_on_upstream_error=screenshot_config.raise_on_upstream_error
            )
            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise         

    def _handle_extraction_response(self, response:Response, extraction_config:ExtractionConfig) -> ExtractionApiResponse:
        try:
            api_response = self._handle_extraction_api_response(
                response=response,
                extraction_config=extraction_config,
                raise_on_upstream_error=extraction_config.raise_on_upstream_error
            )
            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise    

    def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
        """
        Save a screenshot from a screenshot API response
        :param api_response: ScreenshotApiResponse
        :param name: str - name of the screenshot to save as
        :param path: Optional[str]
        """

        if screenshot_api_response.screenshot_success is not True:
            raise RuntimeError('Screenshot was not successful')

        if not screenshot_api_response.image:
            raise RuntimeError('Screenshot binary does not exist')

        content = screenshot_api_response.image
        extension_name = screenshot_api_response.metadata['extension_name']

        if path:
            os.makedirs(path, exist_ok=True)
            file_path = os.path.join(path, f'{name}.{extension_name}')
        else:
            file_path = f'{name}.{extension_name}'

        if isinstance(content, bytes):
            content = BytesIO(content)

        with open(file_path, 'wb') as f:
            shutil.copyfileobj(content, f, length=131072)

    def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
        """
        Save a screenshot from a scrape result
        :param api_response: ScrapeApiResponse
        :param name: str - name of the screenshot given in the scrape config
        :param path: Optional[str]
        """

        if not api_response.scrape_result['screenshots']:
            raise RuntimeError('Screenshot %s do no exists' % name)

        try:
            api_response.scrape_result['screenshots'][name]
        except KeyError:
            raise RuntimeError('Screenshot %s do no exists' % name)

        screenshot_response = self._http_handler(
            method='GET',
            url=api_response.scrape_result['screenshots'][name]['url'],
            params={'key': self.key},
            verify=self.verify
        )

        screenshot_response.raise_for_status()

        if not name.endswith('.jpg'):
            name += '.jpg'

        api_response.sink(path=path, name=name, content=screenshot_response.content)

    def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
        scrape_result = api_response.result['result']
        scrape_config = api_response.result['config']

        file_content = content or scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                try:
                    mime_type = scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = scrape_config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path else name

            if file_path == file_extension:
                url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)
        return file_path

    def _handle_scrape_large_objects(
        self,
        callback_url:str,
        format: Literal['clob', 'blob']
    ) -> Tuple[Union[BytesIO, str], str]:
        if format not in ['clob', 'blob']:
            raise ContentError('Large objects handle can handles format format [blob, clob], given: %s' % format)

        response = self._http_handler(**{
            'method': 'GET',
            'url': callback_url,
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.default_read_timeout),
            'headers': {
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            'params': {'key': self.key}
        })

        if self.body_handler.support(headers=response.headers):
            content = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            content = response.content

        if format == 'clob':
            return content.decode('utf-8'), 'text'

        return BytesIO(content), 'binary'

    def _handle_api_response(
        self,
        response: Response,
        scrape_config:ScrapeConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScrapeApiResponse:

        if scrape_config.method == 'HEAD':
            body = None
        else:
            if self.body_handler.support(headers=response.headers):
                body = self.body_handler(content=response.content, content_type=response.headers['content-type'])
            else:
                body = response.content.decode('utf-8')

        api_response:ScrapeApiResponse = ScrapeApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            scrape_config=scrape_config,
            large_object_handler=self._handle_scrape_large_objects
        )

        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return api_response

    def _handle_screenshot_api_response(
        self,
        response: Response,
        screenshot_config:ScreenshotConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScreenshotApiResponse:

        if self.body_handler.support(headers=response.headers):
            body = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            body = {'result': response.content}

        api_response:ScreenshotApiResponse = ScreenshotApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            screenshot_config=screenshot_config
        )

        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return api_response

    def _handle_extraction_api_response(
        self,
        response: Response,
        extraction_config:ExtractionConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ExtractionApiResponse:
        
        if self.body_handler.support(headers=response.headers):
            body = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            body = response.content.decode('utf-8')

        api_response:ExtractionApiResponse = ExtractionApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            extraction_config=extraction_config
        )

        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)

        return api_response

    @backoff.on_exception(backoff.expo, exception=ConnectionError, max_tries=5)
    def start_crawl(self, crawler_config: CrawlerConfig) -> CrawlerStartResponse:
        """
        Start a crawler job

        :param crawler_config: CrawlerConfig
        :return: CrawlerStartResponse with UUID and initial status

        Example:
            ```python
            from scrapfly import ScrapflyClient, CrawlerConfig

            client = ScrapflyClient(key='YOUR_API_KEY')
            config = CrawlerConfig(
                url='https://example.com',
                page_limit=100,
                max_depth=3
            )

            response = client.start_crawl(config)
            print(f"Crawler started: {response.uuid}")
            ```
        """
        # Get crawler config params (without key)
        body_params = crawler_config.to_api_params()

        # API key must be passed as query parameter, not in body
        query_params = {'key': self.key}

        timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

        url = f'{self.host}/crawl'
        logger.debug(f"Crawler API POST {url}?key=***")
        logger.debug(f"Crawler API body: {body_params}")

        response = self._http_handler(
            method='POST',
            url=url,
            params=query_params,  # key as query param
            json=body_params,      # config in body
            timeout=timeout,
            headers={'User-Agent': self.ua},
            verify=self.verify
        )

        if response.status_code not in (200, 201):
            # Log error details for debugging
            try:
                error_detail = response.json()
            except:
                error_detail = response.text
            logger.debug(f"Crawler API error ({response.status_code}): {error_detail}")
            self._handle_crawler_error_response(response)

        result = response.json()
        return CrawlerStartResponse(result)

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def get_crawl_status(self, uuid: str) -> CrawlerStatusResponse:
        """
        Get crawler job status

        :param uuid: Crawler job UUID
        :return: CrawlerStatusResponse with progress information

        Example:
            ```python
            status = client.get_crawl_status(uuid)
            print(f"Status: {status.status}")
            print(f"Progress: {status.progress_pct:.1f}%")
            print(f"Crawled: {status.urls_crawled}/{status.urls_discovered}")

            if status.is_complete:
                print("Crawl completed!")
            ```
        """
        timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

        response = self._http_handler(
            method='GET',
            url=f'{self.host}/crawl/{uuid}/status',
            params={'key': self.key},  # key as query param (already correct)
            timeout=timeout,
            headers={'User-Agent': self.ua},
            verify=self.verify
        )

        if response.status_code != 200:
            self._handle_crawler_error_response(response)

        result = response.json()
        return CrawlerStatusResponse(result)

    def cancel_crawl(self, crawl_uuid: str) -> bool:
        """
        Cancel a running crawler job

        :param crawl_uuid: Crawler job UUID to cancel
        :return: True if cancelled successfully

        Example:
            ```python
            # Start a crawl
            crawl = client.start_crawl(config)

            # Cancel it
            client.cancel_crawl(crawl.uuid)
            ```
        """
        timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

        response = self._http_handler(
            method='DELETE',
            url=f'{self.host}/crawl/{crawl_uuid}',
            params={'key': self.key},
            timeout=timeout,
            headers={'User-Agent': self.ua},
            verify=self.verify
        )

        if response.status_code not in (200, 204):
            self._handle_crawler_error_response(response)

        return True

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def get_crawl_artifact(
        self,
        uuid: str,
        artifact_type: str = 'warc'
    ) -> CrawlerArtifactResponse:
        """
        Download crawler job artifact

        :param uuid: Crawler job UUID
        :param artifact_type: Artifact type ('warc' or 'har')
        :return: CrawlerArtifactResponse with WARC data and parsing utilities

        Example:
            ```python
            # Wait for crawl to complete
            while True:
                status = client.get_crawl_status(uuid)
                if status.is_complete:
                    break
                time.sleep(5)

            # Download artifact
            artifact = client.get_crawl_artifact(uuid)

            # Easy mode: get all pages
            pages = artifact.get_pages()
            for page in pages:
                print(f"{page['url']}: {page['status_code']}")

            # Memory-efficient: iterate
            for record in artifact.iter_responses():
                process(record.content)

            # Save to file
            artifact.save('crawl.warc.gz')
            ```
        """
        timeout = (self.connect_timeout, 300)  # 5 minutes for large downloads

        response = self._http_handler(
            method='GET',
            url=f'{self.host}/crawl/{uuid}/artifact',
            params={
                'key': self.key,
                'type': artifact_type
            },
            timeout=timeout,
            headers={'User-Agent': self.ua},
            verify=self.verify
        )

        if response.status_code != 200:
            self._handle_crawler_error_response(response)

        return CrawlerArtifactResponse(response.content, artifact_type=artifact_type)

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def get_crawl_contents(
        self,
        uuid: str,
        format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html'
    ) -> Dict[str, Any]:
        """
        Get crawl contents in a specific format

        Retrieves extracted content from crawled pages in the format(s) specified
        in your crawl configuration (via content_formats parameter).

        :param uuid: Crawler job UUID
        :param format: Content format - 'html', 'clean_html', 'markdown', 'json', 'text',
                      'extracted_data', 'page_metadata'
        :return: Dictionary with format {"contents": {url: content, ...}, "links": {...}}

        Example:
            ```python
            # Get all content in markdown format
            result = client.get_crawl_contents(uuid, format='markdown')
            contents = result['contents']

            # Access specific URL
            for url, content in contents.items():
                print(f"{url}: {len(content)} chars")
            ```
        """
        timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

        params = {
            'key': self.key,
            'format': format
        }

        response = self._http_handler(
            method='GET',
            url=f'{self.host}/crawl/{uuid}/contents',
            params=params,
            timeout=timeout,
            headers={'User-Agent': self.ua},
            verify=self.verify
        )

        if response.status_code != 200:
            self._handle_crawler_error_response(response)

        return response.json()

    def _handle_crawler_error_response(self, response: Response):
        """Handle error responses from Crawler API"""
        try:
            error_data = response.json()
            error_msg = error_data.get('message', 'Unknown error')
            error_code = error_data.get('code', 'ERR::CRAWLER::UNKNOWN')
        except Exception:
            error_msg = response.text
            error_code = 'ERR::CRAWLER::UNKNOWN'

        raise HttpError(
            message=f"Crawler API error ({response.status_code}): {error_msg}",
            code=error_code,
            http_status_code=response.status_code,
            request=response.request,
            response=response
        )

Class variables

var CONCURRENCY_AUTO
var DATETIME_FORMAT
var DEFAULT_CONNECT_TIMEOUT
var DEFAULT_CRAWLER_API_READ_TIMEOUT
var DEFAULT_EXTRACTION_API_READ_TIMEOUT
var DEFAULT_READ_TIMEOUT
var DEFAULT_SCREENSHOT_API_READ_TIMEOUT
var DEFAULT_WEBSCRAPING_API_READ_TIMEOUT
var HOST
var brotli : bool
var connect_timeout : int
var debug : bool
var default_read_timeout : int
var distributed_mode : bool
var extraction_api_read_timeout : int
var host : str
var key : str
var max_concurrency : int
var monitoring_api_read_timeout : int
var read_timeout : int
var reporter : scrapfly.reporter.Reporter
var screenshot_api_read_timeout : int
var verify : bool
var version : str
var web_scraping_api_read_timeout : int

Instance variables

prop http
Expand source code
@property
def http(self):
    return self._http_handler
prop ua : str
Expand source code
@property
def ua(self) -> str:
    return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
        self.version,
        platform.python_version(),
        platform.uname().system,
        platform.uname().machine
    )

Methods

def account(self) ‑> str | Dict
Expand source code
def account(self) -> Union[str, Dict]:
    response = self._http_handler(
        method='GET',
        url=self.host + '/account',
        params={'key': self.key},
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )

    response.raise_for_status()

    if self.body_handler.support(response.headers):
        return self.body_handler(response.content, response.headers['content-type'])

    return response.content.decode('utf-8')
async def async_extraction(self,
extraction_config: ExtractionConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ExtractionApiResponse
Expand source code
async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse:
    if loop is None:
        loop = asyncio.get_running_loop()

    return await loop.run_in_executor(self.async_executor, self.extract, extraction_config)
async def async_scrape(self,
scrape_config: ScrapeConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ScrapeApiResponse
Expand source code
async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
    if loop is None:
        loop = asyncio.get_running_loop()

    return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config)
async def async_screenshot(self,
screenshot_config: ScreenshotConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ScreenshotApiResponse
Expand source code
async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse:
    if loop is None:
        loop = asyncio.get_running_loop()

    return await loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)
def cancel_crawl(self, crawl_uuid: str) ‑> bool
Expand source code
def cancel_crawl(self, crawl_uuid: str) -> bool:
    """
    Cancel a running crawler job

    :param crawl_uuid: Crawler job UUID to cancel
    :return: True if cancelled successfully

    Example:
        ```python
        # Start a crawl
        crawl = client.start_crawl(config)

        # Cancel it
        client.cancel_crawl(crawl.uuid)
        ```
    """
    timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

    response = self._http_handler(
        method='DELETE',
        url=f'{self.host}/crawl/{crawl_uuid}',
        params={'key': self.key},
        timeout=timeout,
        headers={'User-Agent': self.ua},
        verify=self.verify
    )

    if response.status_code not in (200, 204):
        self._handle_crawler_error_response(response)

    return True

Cancel a running crawler job

:param crawl_uuid: Crawler job UUID to cancel :return: True if cancelled successfully

Example

# Start a crawl
crawl = client.start_crawl(config)

# Cancel it
client.cancel_crawl(crawl.uuid)
def close(self)
Expand source code
def close(self):
    self.http_session.close()
    self.http_session = None
async def concurrent_scrape(self,
scrape_configs: List[ScrapeConfig],
concurrency: int | None = None)
Expand source code
async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
    if concurrency is None:
        concurrency = self.max_concurrency
    elif concurrency == self.CONCURRENCY_AUTO:
        concurrency = self.account()['subscription']['max_concurrency']

    loop = asyncio.get_running_loop()
    processing_tasks = []
    results = []
    processed_tasks = 0
    expected_tasks = len(scrape_configs)

    def scrape_done_callback(task:Task):
        nonlocal processed_tasks

        try:
            if task.cancelled() is True:
                return

            error = task.exception()

            if error is not None:
                results.append(error)
            else:
                results.append(task.result())
        finally:
            processing_tasks.remove(task)
            processed_tasks += 1

    while scrape_configs or results or processing_tasks:
        logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

        if scrape_configs:
            if len(processing_tasks) < concurrency:
                # @todo handle backpressure
                for _ in range(0, concurrency - len(processing_tasks)):
                    try:
                        scrape_config = scrape_configs.pop()
                    except:
                        break

                    scrape_config.raise_on_upstream_error = False
                    task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                    processing_tasks.append(task)
                    task.add_done_callback(scrape_done_callback)

        for _ in results:
            result = results.pop()
            yield result

        await asyncio.sleep(.5)

    logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))
def extract(self,
extraction_config: ExtractionConfig,
no_raise: bool = False) ‑> ExtractionApiResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse:
    """
    Extract structured data from text content
    :param extraction_config: ExtractionConfig
    :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration
    :return: str

    If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error.
    If the error is not none, you will get the following structure for example

    'error': {
        'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED',
        'message': 'The content type of the response is not supported for extraction',
        'http_code': 422,
        'links': {
            'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED'
        }
    }
    """

    try:
        logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
        request_data = self._extraction_request(extraction_config=extraction_config)
        response = self._http_handler(**request_data)
        extraction_api_response = self._handle_extraction_response(response=response, extraction_config=extraction_config)
        return extraction_api_response
    except BaseException as e:
        self.reporter.report(error=e)

        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response

        raise e

Extract structured data from text content :param extraction_config: ExtractionConfig :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration :return: str

If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED', 'message': 'The content type of the response is not supported for extraction', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED' } }

def get_crawl_artifact(self, uuid: str, artifact_type: str = 'warc') ‑> CrawlerArtifactResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def get_crawl_artifact(
    self,
    uuid: str,
    artifact_type: str = 'warc'
) -> CrawlerArtifactResponse:
    """
    Download crawler job artifact

    :param uuid: Crawler job UUID
    :param artifact_type: Artifact type ('warc' or 'har')
    :return: CrawlerArtifactResponse with WARC data and parsing utilities

    Example:
        ```python
        # Wait for crawl to complete
        while True:
            status = client.get_crawl_status(uuid)
            if status.is_complete:
                break
            time.sleep(5)

        # Download artifact
        artifact = client.get_crawl_artifact(uuid)

        # Easy mode: get all pages
        pages = artifact.get_pages()
        for page in pages:
            print(f"{page['url']}: {page['status_code']}")

        # Memory-efficient: iterate
        for record in artifact.iter_responses():
            process(record.content)

        # Save to file
        artifact.save('crawl.warc.gz')
        ```
    """
    timeout = (self.connect_timeout, 300)  # 5 minutes for large downloads

    response = self._http_handler(
        method='GET',
        url=f'{self.host}/crawl/{uuid}/artifact',
        params={
            'key': self.key,
            'type': artifact_type
        },
        timeout=timeout,
        headers={'User-Agent': self.ua},
        verify=self.verify
    )

    if response.status_code != 200:
        self._handle_crawler_error_response(response)

    return CrawlerArtifactResponse(response.content, artifact_type=artifact_type)

Download crawler job artifact

:param uuid: Crawler job UUID :param artifact_type: Artifact type ('warc' or 'har') :return: CrawlerArtifactResponse with WARC data and parsing utilities

Example

# Wait for crawl to complete
while True:
    status = client.get_crawl_status(uuid)
    if status.is_complete:
        break
    time.sleep(5)

# Download artifact
artifact = client.get_crawl_artifact(uuid)

# Easy mode: get all pages
pages = artifact.get_pages()
for page in pages:
    print(f"{page['url']}: {page['status_code']}")

# Memory-efficient: iterate
for record in artifact.iter_responses():
    process(record.content)

# Save to file
artifact.save('crawl.warc.gz')
def get_crawl_contents(self,
uuid: str,
format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html') ‑> Dict[str, Any]
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def get_crawl_contents(
    self,
    uuid: str,
    format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html'
) -> Dict[str, Any]:
    """
    Get crawl contents in a specific format

    Retrieves extracted content from crawled pages in the format(s) specified
    in your crawl configuration (via content_formats parameter).

    :param uuid: Crawler job UUID
    :param format: Content format - 'html', 'clean_html', 'markdown', 'json', 'text',
                  'extracted_data', 'page_metadata'
    :return: Dictionary with format {"contents": {url: content, ...}, "links": {...}}

    Example:
        ```python
        # Get all content in markdown format
        result = client.get_crawl_contents(uuid, format='markdown')
        contents = result['contents']

        # Access specific URL
        for url, content in contents.items():
            print(f"{url}: {len(content)} chars")
        ```
    """
    timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

    params = {
        'key': self.key,
        'format': format
    }

    response = self._http_handler(
        method='GET',
        url=f'{self.host}/crawl/{uuid}/contents',
        params=params,
        timeout=timeout,
        headers={'User-Agent': self.ua},
        verify=self.verify
    )

    if response.status_code != 200:
        self._handle_crawler_error_response(response)

    return response.json()

Get crawl contents in a specific format

Retrieves extracted content from crawled pages in the format(s) specified in your crawl configuration (via content_formats parameter).

:param uuid: Crawler job UUID :param format: Content format - 'html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata' :return: Dictionary with format {"contents": {url: content, …}, "links": {…}}

Example

# Get all content in markdown format
result = client.get_crawl_contents(uuid, format='markdown')
contents = result['contents']

# Access specific URL
for url, content in contents.items():
    print(f"{url}: {len(content)} chars")
def get_crawl_status(self, uuid: str) ‑> CrawlerStatusResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def get_crawl_status(self, uuid: str) -> CrawlerStatusResponse:
    """
    Get crawler job status

    :param uuid: Crawler job UUID
    :return: CrawlerStatusResponse with progress information

    Example:
        ```python
        status = client.get_crawl_status(uuid)
        print(f"Status: {status.status}")
        print(f"Progress: {status.progress_pct:.1f}%")
        print(f"Crawled: {status.urls_crawled}/{status.urls_discovered}")

        if status.is_complete:
            print("Crawl completed!")
        ```
    """
    timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

    response = self._http_handler(
        method='GET',
        url=f'{self.host}/crawl/{uuid}/status',
        params={'key': self.key},  # key as query param (already correct)
        timeout=timeout,
        headers={'User-Agent': self.ua},
        verify=self.verify
    )

    if response.status_code != 200:
        self._handle_crawler_error_response(response)

    result = response.json()
    return CrawlerStatusResponse(result)

Get crawler job status

:param uuid: Crawler job UUID :return: CrawlerStatusResponse with progress information

Example

status = client.get_crawl_status(uuid)
print(f"Status: {status.status}")
print(f"Progress: {status.progress_pct:.1f}%")
print(f"Crawled: {status.urls_crawled}/{status.urls_discovered}")

if status.is_complete:
    print("Crawl completed!")
def get_monitoring_metrics(self,
format: str = 'structured',
period: str | None = None,
aggregation: List[Literal['account', 'project', 'target']] | None = None)
Expand source code
def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None):
    params = {'key': self.key, 'format': format}

    if period is not None:
        params['period'] = period

    if aggregation is not None:
        params['aggregation'] = ','.join(aggregation)

    response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics',
        params=params,
        timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )

    response.raise_for_status()

    if self.body_handler.support(response.headers):
        return self.body_handler(response.content, response.headers['content-type'])

    return response.content.decode('utf-8')
def get_monitoring_target_metrics(self,
domain: str,
group_subdomain: bool = False,
period: Literal['subscription', 'last7d', 'last24h', 'last1h', 'last5m'] | None = 'last24h',
start: datetime.datetime | None = None,
end: datetime.datetime | None = None)
Expand source code
def get_monitoring_target_metrics(
        self,
        domain:str,
        group_subdomain:bool=False,
        period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H,
        start:Optional[datetime.datetime]=None,
        end:Optional[datetime.datetime]=None,
):
    params = {
        'key': self.key,
        'domain': domain,
        'group_subdomain': group_subdomain
    }

    if (start is not None and end is None) or (start is None and end is not None):
        raise ValueError('You must provide both start and end date')

    if start is not None and end is not None:
        params['start'] = start.strftime(self.DATETIME_FORMAT)
        params['end'] = end.strftime(self.DATETIME_FORMAT)
        period = None

    params['period'] = period

    response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics/target',
        timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
        params=params,
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )

    response.raise_for_status()

    if self.body_handler.support(response.headers):
        return self.body_handler(response.content, response.headers['content-type'])

    return response.content.decode('utf-8')
def open(self)
Expand source code
def open(self):
    if self.http_session is None:
        self.http_session = Session()
        self.http_session.verify = self.verify
        self.http_session.timeout = (self.connect_timeout, self.default_read_timeout)
        self.http_session.params['key'] = self.key
        self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding
        self.http_session.headers['accept'] = self.body_handler.accept
        self.http_session.headers['user-agent'] = self.ua
def resilient_scrape(self,
scrape_config: ScrapeConfig,
retry_on_errors: Set[Exception] = {<class 'scrapfly.errors.ScrapflyError'>},
retry_on_status_code: List[int] | None = None,
tries: int = 5,
delay: int = 20) ‑> ScrapeApiResponse
Expand source code
def resilient_scrape(
    self,
    scrape_config:ScrapeConfig,
    retry_on_errors:Set[Exception]={ScrapflyError},
    retry_on_status_code:Optional[List[int]]=None,
    tries: int = 5,
    delay: int = 20,
) -> ScrapeApiResponse:
    assert retry_on_errors is not None, 'Retry on error is None'
    assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

    @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)
    def inner() -> ScrapeApiResponse:

        try:
            return self.scrape(scrape_config=scrape_config)
        except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
            if retry_on_status_code is not None and e.api_response:
                if e.api_response.upstream_status_code in retry_on_status_code:
                    raise e
                else:
                    return e.api_response

            raise e

    return inner()
def save_scrape_screenshot(self,
api_response: ScrapeApiResponse,
name: str,
path: str | None = None)
Expand source code
def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
    """
    Save a screenshot from a scrape result
    :param api_response: ScrapeApiResponse
    :param name: str - name of the screenshot given in the scrape config
    :param path: Optional[str]
    """

    if not api_response.scrape_result['screenshots']:
        raise RuntimeError('Screenshot %s do no exists' % name)

    try:
        api_response.scrape_result['screenshots'][name]
    except KeyError:
        raise RuntimeError('Screenshot %s do no exists' % name)

    screenshot_response = self._http_handler(
        method='GET',
        url=api_response.scrape_result['screenshots'][name]['url'],
        params={'key': self.key},
        verify=self.verify
    )

    screenshot_response.raise_for_status()

    if not name.endswith('.jpg'):
        name += '.jpg'

    api_response.sink(path=path, name=name, content=screenshot_response.content)

Save a screenshot from a scrape result :param api_response: ScrapeApiResponse :param name: str - name of the screenshot given in the scrape config :param path: Optional[str]

def save_screenshot(self,
screenshot_api_response: ScreenshotApiResponse,
name: str,
path: str | None = None)
Expand source code
def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
    """
    Save a screenshot from a screenshot API response
    :param api_response: ScreenshotApiResponse
    :param name: str - name of the screenshot to save as
    :param path: Optional[str]
    """

    if screenshot_api_response.screenshot_success is not True:
        raise RuntimeError('Screenshot was not successful')

    if not screenshot_api_response.image:
        raise RuntimeError('Screenshot binary does not exist')

    content = screenshot_api_response.image
    extension_name = screenshot_api_response.metadata['extension_name']

    if path:
        os.makedirs(path, exist_ok=True)
        file_path = os.path.join(path, f'{name}.{extension_name}')
    else:
        file_path = f'{name}.{extension_name}'

    if isinstance(content, bytes):
        content = BytesIO(content)

    with open(file_path, 'wb') as f:
        shutil.copyfileobj(content, f, length=131072)

Save a screenshot from a screenshot API response :param api_response: ScreenshotApiResponse :param name: str - name of the screenshot to save as :param path: Optional[str]

def scrape(self,
scrape_config: ScrapeConfig,
no_raise: bool = False) ‑> ScrapeApiResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse:
    """
    Scrape a website
    :param scrape_config: ScrapeConfig
    :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration
    :return: ScrapeApiResponse

    If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error.
    If the error is not none, you will get the following structure for example

    'error': {
        'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
        'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds',
        'retryable': False,
        'http_code': 422,
        'links': {
            'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
        }
    }
    """

    try:
        logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
        request_data = self._scrape_request(scrape_config=scrape_config)
        response = self._http_handler(**request_data)
        scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config)

        self.reporter.report(scrape_api_response=scrape_api_response)

        return scrape_api_response
    except BaseException as e:
        self.reporter.report(error=e)

        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response

        raise e

Scrape a website :param scrape_config: ScrapeConfig :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration :return: ScrapeApiResponse

If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED', 'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds', 'retryable': False, 'http_code': 422, 'links': { 'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED' } }

def screenshot(self,
screenshot_config: ScreenshotConfig,
no_raise: bool = False) ‑> ScreenshotApiResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse:
    """
    Take a screenshot
    :param screenshot_config: ScrapeConfig
    :param no_raise: bool - if True, do not raise exception on error while the screenshot api response is a ScrapflyError for seamless integration
    :return: str

    If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error.
    If the error is not none, you will get the following structure for example

    'error': {
        'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
        'message': 'For some reason we were unable to take the screenshot',
        'http_code': 422,
        'links': {
            'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
        }
    }
    """

    try:
        logger.debug('--> %s Screenshoting' % (screenshot_config.url))
        request_data = self._screenshot_request(screenshot_config=screenshot_config)
        response = self._http_handler(**request_data)
        screenshot_api_response = self._handle_screenshot_response(response=response, screenshot_config=screenshot_config)
        return screenshot_api_response
    except BaseException as e:
        self.reporter.report(error=e)

        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response

        raise e

Take a screenshot :param screenshot_config: ScrapeConfig :param no_raise: bool - if True, do not raise exception on error while the screenshot api response is a ScrapflyError for seamless integration :return: str

If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT', 'message': 'For some reason we were unable to take the screenshot', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT' } }

def sink(self,
api_response: ScrapeApiResponse,
content: str | bytes | None = None,
path: str | None = None,
name: str | None = None,
file:  | _io.BytesIO | None = None) ‑> str
Expand source code
def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
    scrape_result = api_response.result['result']
    scrape_config = api_response.result['config']

    file_content = content or scrape_result['content']
    file_path = None
    file_extension = None

    if name:
        name_parts = name.split('.')
        if len(name_parts) > 1:
            file_extension = name_parts[-1]

    if not file:
        if file_extension is None:
            try:
                mime_type = scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            if ';' in mime_type:
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            name = scrape_config['url'].split('/')[-1]

        if name.find(file_extension) == -1:
            name += file_extension

        file_path = path + '/' + name if path else name

        if file_path == file_extension:
            url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

            if url[-1] == '-':
                url = url[:-1]

            url += file_extension

            file_path = url

        file = open(file_path, 'wb')

    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)
    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    logger.info('file %s created' % file_path)
    return file_path
def start_crawl(self,
crawler_config: CrawlerConfig) ‑> CrawlerStartResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=ConnectionError, max_tries=5)
def start_crawl(self, crawler_config: CrawlerConfig) -> CrawlerStartResponse:
    """
    Start a crawler job

    :param crawler_config: CrawlerConfig
    :return: CrawlerStartResponse with UUID and initial status

    Example:
        ```python
        from scrapfly import ScrapflyClient, CrawlerConfig

        client = ScrapflyClient(key='YOUR_API_KEY')
        config = CrawlerConfig(
            url='https://example.com',
            page_limit=100,
            max_depth=3
        )

        response = client.start_crawl(config)
        print(f"Crawler started: {response.uuid}")
        ```
    """
    # Get crawler config params (without key)
    body_params = crawler_config.to_api_params()

    # API key must be passed as query parameter, not in body
    query_params = {'key': self.key}

    timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

    url = f'{self.host}/crawl'
    logger.debug(f"Crawler API POST {url}?key=***")
    logger.debug(f"Crawler API body: {body_params}")

    response = self._http_handler(
        method='POST',
        url=url,
        params=query_params,  # key as query param
        json=body_params,      # config in body
        timeout=timeout,
        headers={'User-Agent': self.ua},
        verify=self.verify
    )

    if response.status_code not in (200, 201):
        # Log error details for debugging
        try:
            error_detail = response.json()
        except:
            error_detail = response.text
        logger.debug(f"Crawler API error ({response.status_code}): {error_detail}")
        self._handle_crawler_error_response(response)

    result = response.json()
    return CrawlerStartResponse(result)

Start a crawler job

:param crawler_config: CrawlerConfig :return: CrawlerStartResponse with UUID and initial status

Example

from scrapfly import ScrapflyClient, CrawlerConfig

client = ScrapflyClient(key='YOUR_API_KEY')
config = CrawlerConfig(
    url='https://example.com',
    page_limit=100,
    max_depth=3
)

response = client.start_crawl(config)
print(f"Crawler started: {response.uuid}")