Package scrapfly

Sub-modules

scrapfly.api_config
scrapfly.api_response
scrapfly.client
scrapfly.crawler

Scrapfly Crawler API …

scrapfly.errors
scrapfly.extraction_config
scrapfly.frozen_dict
scrapfly.polyfill
scrapfly.reporter
scrapfly.scrape_config
scrapfly.scrapy
scrapfly.screenshot_config
scrapfly.webhook

Functions

def parse_warc(warc_data: bytes | BinaryIO) ‑> WarcParser
Expand source code
def parse_warc(warc_data: Union[bytes, BinaryIO]) -> WarcParser:
    """
    Build a WARC parser for the given data.

    Args:
        warc_data: Raw WARC content, either as bytes or a binary file-like object

    Returns:
        WarcParser: A parser wrapping the supplied data

    Example:
        ```python
        from scrapfly import parse_warc

        # Quick way to get all pages
        pages = parse_warc(warc_bytes).get_pages()
        for page in pages:
            print(f"{page['url']}: {page['status_code']}")
        ```
    """
    parser = WarcParser(warc_data)
    return parser

Convenience function to create a WARC parser

Args

warc_data
WARC data as bytes or file-like object

Returns

WarcParser
Parser instance

Example

from scrapfly import parse_warc

# Quick way to get all pages
pages = parse_warc(warc_bytes).get_pages()
for page in pages:
    print(f"{page['url']}: {page['status_code']}")
def webhook_from_payload(payload: Dict,
signing_secrets: Tuple[str] | None = None,
signature: str | None = None) ‑> CrawlStartedWebhook | CrawlUrlDiscoveredWebhook | CrawlUrlFailedWebhook | CrawlCompletedWebhook
Expand source code
def webhook_from_payload(
    payload: Dict,
    signing_secrets: Optional[Tuple[str, ...]] = None,
    signature: Optional[str] = None
) -> CrawlerWebhook:
    """
    Create a typed webhook instance from a raw payload dictionary.

    This helper automatically determines the webhook type based on the 'event' field
    and returns the appropriate typed webhook instance.

    Args:
        payload: The webhook payload as a dictionary
        signing_secrets: Optional tuple of signing secrets (hex strings) for verification
        signature: Optional webhook signature header for verification

    Returns:
        A typed webhook instance (CrawlStartedWebhook, CrawlUrlDiscoveredWebhook, etc.)

    Raises:
        ValueError: If the event type is unknown
        WebhookSignatureMissMatch: If signature verification fails

    Example:
        ```python
        from scrapfly import webhook_from_payload

        # From Flask request
        @app.route('/webhook', methods=['POST'])
        def handle_webhook():
            webhook = webhook_from_payload(
                request.json,
                signing_secrets=('your-secret-key',),
                signature=request.headers.get('X-Scrapfly-Webhook-Signature')
            )

            if isinstance(webhook, CrawlCompletedWebhook):
                print(f"Crawl {webhook.uuid} completed!")
                print(f"Crawled {webhook.urls_crawled} URLs")

            return '', 200
        ```
    """
    # Verify signature if provided. The message is re-serialized with compact
    # separators so it matches the canonical form the signature was computed over.
    if signing_secrets and signature:
        from ..api_response import ResponseBodyHandler
        from json import dumps

        handler = ResponseBodyHandler(signing_secrets=signing_secrets)
        message = dumps(payload, separators=(',', ':')).encode('utf-8')
        if not handler.verify(message, signature):
            from ..errors import WebhookSignatureMissMatch
            raise WebhookSignatureMissMatch()

    # Dispatch on the 'event' field to the matching typed webhook class.
    event = payload.get('event')
    event_to_class = {
        CrawlerWebhookEvent.STARTED.value: CrawlStartedWebhook,
        CrawlerWebhookEvent.URL_DISCOVERED.value: CrawlUrlDiscoveredWebhook,
        CrawlerWebhookEvent.URL_FAILED.value: CrawlUrlFailedWebhook,
        CrawlerWebhookEvent.COMPLETED.value: CrawlCompletedWebhook,
    }
    webhook_class = event_to_class.get(event)
    if webhook_class is None:
        raise ValueError(f"Unknown crawler webhook event type: {event}")
    return webhook_class.from_dict(payload)

Create a typed webhook instance from a raw payload dictionary.

This helper automatically determines the webhook type based on the 'event' field and returns the appropriate typed webhook instance.

Args

payload
The webhook payload as a dictionary
signing_secrets
Optional tuple of signing secrets (hex strings) for verification
signature
Optional webhook signature header for verification

Returns

A typed webhook instance (CrawlStartedWebhook, CrawlUrlDiscoveredWebhook, etc.)

Raises

ValueError
If the event type is unknown
WebhookSignatureMissMatch
If signature verification fails

Example

from scrapfly import webhook_from_payload

# From Flask request
@app.route('/webhook', methods=['POST'])
def handle_webhook():
    webhook = webhook_from_payload(
        request.json,
        signing_secrets=('your-secret-key',),
        signature=request.headers.get('X-Scrapfly-Webhook-Signature')
    )

    if isinstance(webhook, CrawlCompletedWebhook):
        print(f"Crawl {webhook.uuid} completed!")
        print(f"Crawled {webhook.urls_crawled} URLs")

    return '', 200

Classes

class ApiHttpClientError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ApiHttpClientError(HttpError):
    """Base error for HTTP client-side failures returned by the Scrapfly API.

    Subclassed by more specific errors such as bad API key, payment required,
    and rate-limit (too many requests) errors, as well as ApiHttpServerError.
    """
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

  • ApiHttpServerError
  • scrapfly.errors.BadApiKeyError
  • scrapfly.errors.PaymentRequired
  • scrapfly.errors.TooManyRequest
class ApiHttpServerError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ApiHttpServerError(ApiHttpClientError):
    """Error raised for HTTP server-side failures returned by the Scrapfly API."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • ApiHttpClientError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

class Crawl (client: ScrapflyClient,
config: CrawlerConfig)
Expand source code
class Crawl:
    """
    High-level abstraction for managing a crawler job

    The Crawl object maintains the state of a crawler job and provides
    convenient methods for managing its lifecycle.

    Example:
        ```python
        from scrapfly import ScrapflyClient, CrawlerConfig, Crawl

        client = ScrapflyClient(key='your-key')
        config = CrawlerConfig(url='https://example.com', page_limit=10)

        # Create and start crawl
        crawl = Crawl(client, config)
        crawl.crawl()  # Start the crawler

        # Wait for completion
        crawl.wait()

        # Get results
        pages = crawl.warc().get_pages()
        for page in pages:
            print(f"{page['url']}: {page['status_code']}")

        # Or read specific URLs
        html = crawl.read('https://example.com/page1', format='html')
        ```
    """

    def __init__(self, client: 'ScrapflyClient', config: CrawlerConfig):
        """
        Initialize a Crawl object

        Args:
            client: ScrapflyClient instance
            config: CrawlerConfig with crawler settings
        """
        self._client = client
        self._config = config
        self._uuid: Optional[str] = None
        self._status_cache: Optional[CrawlerStatusResponse] = None
        self._artifact_cache: Optional[CrawlerArtifactResponse] = None

    def _ensure_started(self) -> None:
        """Raise ScrapflyCrawlerError if the crawler job has not been started yet."""
        if self._uuid is None:
            raise ScrapflyCrawlerError(
                message="Crawler not started yet. Call crawl() first.",
                code="NOT_STARTED",
                http_status_code=400
            )

    @property
    def uuid(self) -> Optional[str]:
        """Get the crawler job UUID (None if not started)"""
        return self._uuid

    @property
    def started(self) -> bool:
        """Check if the crawler has been started"""
        return self._uuid is not None

    def crawl(self) -> 'Crawl':
        """
        Start the crawler job

        Returns:
            Self for method chaining

        Raises:
            ScrapflyCrawlerError: If crawler already started

        Example:
            ```python
            crawl = Crawl(client, config)
            crawl.crawl()  # Start crawling
            ```
        """
        if self._uuid is not None:
            raise ScrapflyCrawlerError(
                message="Crawler already started",
                code="ALREADY_STARTED",
                http_status_code=400
            )

        response = self._client.start_crawl(self._config)
        self._uuid = response.uuid
        return self

    def status(self, refresh: bool = True) -> CrawlerStatusResponse:
        """
        Get current crawler status

        Args:
            refresh: If True, fetch fresh status from API. If False, return cached status.

        Returns:
            CrawlerStatusResponse with current status

        Raises:
            ScrapflyCrawlerError: If crawler not started yet

        Example:
            ```python
            status = crawl.status()
            print(f"Progress: {status.progress_pct}%")
            print(f"URLs crawled: {status.urls_crawled}")
            ```
        """
        self._ensure_started()

        if refresh or self._status_cache is None:
            self._status_cache = self._client.get_crawl_status(self._uuid)

        return self._status_cache

    def wait(
        self,
        poll_interval: int = 5,
        max_wait: Optional[int] = None,
        verbose: bool = False
    ) -> 'Crawl':
        """
        Wait for crawler to complete

        Polls the status endpoint until the crawler finishes.

        Args:
            poll_interval: Seconds between status checks (default: 5)
            max_wait: Maximum seconds to wait (None = wait forever)
            verbose: If True, print progress updates

        Returns:
            Self for method chaining

        Raises:
            ScrapflyCrawlerError: If crawler not started, failed, cancelled, or timed out

        Example:
            ```python
            # Wait with progress updates
            crawl.crawl().wait(verbose=True)

            # Wait with timeout
            crawl.crawl().wait(max_wait=300)  # 5 minutes max
            ```
        """
        self._ensure_started()

        start_time = time.time()
        poll_count = 0

        while True:
            status = self.status(refresh=True)
            poll_count += 1

            if verbose:
                logger.info(f"Poll #{poll_count}: {status.status} - "
                           f"{status.progress_pct:.1f}% - "
                           f"{status.urls_crawled}/{status.urls_discovered} URLs")

            if status.is_complete:
                if verbose:
                    logger.info("✓ Crawler completed successfully!")
                return self
            elif status.is_failed:
                raise ScrapflyCrawlerError(
                    message=f"Crawler failed with status: {status.status}",
                    code="FAILED",
                    http_status_code=400
                )
            elif status.is_cancelled:
                raise ScrapflyCrawlerError(
                    message="Crawler was cancelled",
                    code="CANCELLED",
                    http_status_code=400
                )

            # Check timeout before sleeping so we never overshoot by a full interval
            if max_wait is not None:
                elapsed = time.time() - start_time
                if elapsed > max_wait:
                    raise ScrapflyCrawlerError(
                        message=f"Timeout waiting for crawler (>{max_wait}s)",
                        code="TIMEOUT",
                        http_status_code=400
                    )

            time.sleep(poll_interval)

    def cancel(self) -> bool:
        """
        Cancel the running crawler job

        Returns:
            True if cancelled successfully

        Raises:
            ScrapflyCrawlerError: If crawler not started yet

        Example:
            ```python
            # Start a crawl
            crawl = Crawl(client, config).crawl()

            # Cancel it
            crawl.cancel()
            ```
        """
        self._ensure_started()

        return self._client.cancel_crawl(self._uuid)

    def warc(self, artifact_type: str = 'warc') -> CrawlerArtifactResponse:
        """
        Download the crawler artifact (WARC file)

        Args:
            artifact_type: Type of artifact to download (default: 'warc')

        Returns:
            CrawlerArtifactResponse with parsed WARC data

        Raises:
            ScrapflyCrawlerError: If crawler not started yet

        Example:
            ```python
            # Get WARC artifact
            artifact = crawl.warc()

            # Get all pages
            pages = artifact.get_pages()

            # Iterate through responses
            for record in artifact.iter_responses():
                print(record.url)
            ```
        """
        self._ensure_started()

        # Cache the artifact: it is immutable once the crawl is complete
        if self._artifact_cache is None:
            self._artifact_cache = self._client.get_crawl_artifact(
                self._uuid,
                artifact_type=artifact_type
            )

        return self._artifact_cache

    def har(self) -> CrawlerArtifactResponse:
        """
        Download the crawler artifact in HAR (HTTP Archive) format

        Returns:
            CrawlerArtifactResponse with parsed HAR data

        Raises:
            ScrapflyCrawlerError: If crawler not started yet

        Example:
            ```python
            # Get HAR artifact
            artifact = crawl.har()

            # Get all pages
            pages = artifact.get_pages()

            # Iterate through HAR entries
            for entry in artifact.iter_responses():
                print(f"{entry.url}: {entry.status_code}")
                print(f"Timing: {entry.time}ms")
            ```
        """
        self._ensure_started()

        return self._client.get_crawl_artifact(
            self._uuid,
            artifact_type='har'
        )

    def read(self, url: str, format: ContentFormat = 'html') -> Optional[CrawlContent]:
        """
        Read content from a specific URL in the crawl results

        Args:
            url: The URL to retrieve content for
            format: Content format - 'html', 'markdown', 'text', 'clean_html', 'json',
                   'extracted_data', 'page_metadata'

        Returns:
            CrawlContent object with content and metadata, or None if URL not found

        Raises:
            ScrapflyCrawlerError: If crawler not started yet

        Example:
            ```python
            # Get HTML content for a specific URL
            content = crawl.read('https://example.com/page1')
            if content:
                print(f"URL: {content.url}")
                print(f"Status: {content.status_code}")
                print(f"Duration: {content.duration}s")
                print(content.content)

            # Get markdown content
            content = crawl.read('https://example.com/page1', format='markdown')
            if content:
                print(content.content)

            # Check if URL was crawled
            if crawl.read('https://example.com/missing') is None:
                print("URL not found in crawl results")
            ```
        """
        self._ensure_started()

        # For HTML format, we can get it from the WARC artifact (faster)
        if format == 'html':
            artifact = self.warc()
            for record in artifact.iter_responses():
                if record.url == url:
                    # Extract metadata from WARC headers
                    warc_headers = record.warc_headers or {}
                    duration_str = warc_headers.get('WARC-Scrape-Duration')
                    duration = float(duration_str) if duration_str else None

                    return CrawlContent(
                        url=record.url,
                        content=record.content.decode('utf-8', errors='replace'),
                        status_code=record.status_code,
                        headers=record.headers,
                        duration=duration,
                        log_id=warc_headers.get('WARC-Scrape-Log-Id'),
                        country=warc_headers.get('WARC-Scrape-Country'),
                        crawl_uuid=self._uuid
                    )
            return None

        # For other formats (markdown, text, etc.), use the contents API
        try:
            result = self._client.get_crawl_contents(
                self._uuid,
                format=format
            )

            # The API returns: {"contents": {url: {format: content, ...}, ...}, "links": {...}}
            contents = result.get('contents', {})

            if url in contents:
                content_data = contents[url]
                # Content is always a dict with format keys (e.g., {"html": "...", "markdown": "..."})
                content_str = content_data.get(format)

                if content_str:
                    # For non-HTML formats from contents API, we don't have full metadata
                    # Try to get status code from WARC if possible
                    status_code = 200  # Default
                    headers = {}
                    duration = None
                    log_id = None
                    country = None

                    # Metadata enrichment is best-effort: a missing/failed WARC
                    # download must not prevent returning the content itself
                    try:
                        artifact = self.warc()
                        for record in artifact.iter_responses():
                            if record.url == url:
                                status_code = record.status_code
                                headers = record.headers
                                warc_headers = record.warc_headers or {}
                                duration_str = warc_headers.get('WARC-Scrape-Duration')
                                duration = float(duration_str) if duration_str else None
                                log_id = warc_headers.get('WARC-Scrape-Log-Id')
                                country = warc_headers.get('WARC-Scrape-Country')
                                break
                    except Exception:
                        pass

                    return CrawlContent(
                        url=url,
                        content=content_str,
                        status_code=status_code,
                        headers=headers,
                        duration=duration,
                        log_id=log_id,
                        country=country,
                        crawl_uuid=self._uuid
                    )

            return None

        except Exception:
            # If contents API fails, return None (URL treated as not found)
            return None

    def read_iter(
        self,
        pattern: str,
        format: ContentFormat = 'html'
    ) -> Iterator[CrawlContent]:
        """
        Iterate through URLs matching a pattern and yield their content

        Supports wildcard patterns using * and ? for flexible URL matching.

        Args:
            pattern: URL pattern with wildcards (* matches any characters, ? matches one)
                    Examples: "/products?page=*", "https://example.com/*/detail", "*/product/*"
            format: Content format to retrieve

        Yields:
            CrawlContent objects for each matching URL

        Raises:
            ScrapflyCrawlerError: If crawler not started yet

        Example:
            ```python
            # Get all product pages in markdown
            for content in crawl.read_iter(pattern="*/products?page=*", format="markdown"):
                print(f"{content.url}: {len(content.content)} chars")
                print(f"Duration: {content.duration}s")

            # Get all detail pages
            for content in crawl.read_iter(pattern="*/detail/*"):
                process(content.content)

            # Pattern matching examples:
            # "/products?page=*" matches /products?page=1, /products?page=2, etc.
            # "*/product/*" matches any URL with /product/ in the path
            # "https://example.com/page?" matches https://example.com/page1, page2, etc.
            ```
        """
        self._ensure_started()

        # For HTML format, use WARC artifact (faster)
        if format == 'html':
            artifact = self.warc()
            for record in artifact.iter_responses():
                if fnmatch.fnmatch(record.url, pattern):
                    # Extract metadata from WARC headers
                    warc_headers = record.warc_headers or {}
                    duration_str = warc_headers.get('WARC-Scrape-Duration')
                    duration = float(duration_str) if duration_str else None

                    yield CrawlContent(
                        url=record.url,
                        content=record.content.decode('utf-8', errors='replace'),
                        status_code=record.status_code,
                        headers=record.headers,
                        duration=duration,
                        log_id=warc_headers.get('WARC-Scrape-Log-Id'),
                        country=warc_headers.get('WARC-Scrape-Country'),
                        crawl_uuid=self._uuid
                    )
        else:
            # For other formats, use contents API
            try:
                result = self._client.get_crawl_contents(
                    self._uuid,
                    format=format
                )

                contents = result.get('contents', {})

                # Build a metadata cache from WARC for non-HTML formats.
                # Best-effort: a failed WARC download must not break iteration.
                metadata_cache = {}
                try:
                    artifact = self.warc()
                    for record in artifact.iter_responses():
                        warc_headers = record.warc_headers or {}
                        duration_str = warc_headers.get('WARC-Scrape-Duration')
                        metadata_cache[record.url] = {
                            'status_code': record.status_code,
                            'headers': record.headers,
                            'duration': float(duration_str) if duration_str else None,
                            'log_id': warc_headers.get('WARC-Scrape-Log-Id'),
                            'country': warc_headers.get('WARC-Scrape-Country')
                        }
                except Exception:
                    pass

                # Iterate through matching URLs
                for url, content_data in contents.items():
                    if fnmatch.fnmatch(url, pattern):
                        # Content is always a dict with format keys (e.g., {"html": "...", "markdown": "..."})
                        content = content_data.get(format)

                        if content:
                            # Get metadata from cache or use defaults
                            metadata = metadata_cache.get(url, {})
                            yield CrawlContent(
                                url=url,
                                content=content,
                                status_code=metadata.get('status_code', 200),
                                headers=metadata.get('headers', {}),
                                duration=metadata.get('duration'),
                                log_id=metadata.get('log_id'),
                                country=metadata.get('country'),
                                crawl_uuid=self._uuid
                            )

            except Exception:
                # If contents API fails, yield nothing
                return

    def read_batch(
        self,
        urls: List[str],
        formats: Optional[List[ContentFormat]] = None
    ) -> Dict[str, Dict[str, str]]:
        """
        Retrieve content for multiple URLs in a single batch request

        This is more efficient than calling read() multiple times as it retrieves
        all content in a single API call. Maximum 100 URLs per request.

        Args:
            urls: List of URLs to retrieve (max 100)
            formats: List of content formats to retrieve (e.g., ['markdown', 'text'])
                    If None, defaults to ['html']

        Returns:
            Dictionary mapping URLs to their content in requested formats:
            {
                'https://example.com/page1': {
                    'markdown': '# Page 1...',
                    'text': 'Page 1...'
                },
                'https://example.com/page2': {
                    'markdown': '# Page 2...',
                    'text': 'Page 2...'
                }
            }

        Example:
            ```python
            # Get markdown and text for multiple URLs
            urls = ['https://example.com/page1', 'https://example.com/page2']
            contents = crawl.read_batch(urls, formats=['markdown', 'text'])

            for url, formats in contents.items():
                markdown = formats.get('markdown', '')
                text = formats.get('text', '')
                print(f"{url}: {len(markdown)} chars markdown, {len(text)} chars text")
            ```

        Raises:
            ValueError: If more than 100 URLs are provided
            ScrapflyCrawlerError: If crawler not started or request fails
        """
        self._ensure_started()

        if len(urls) > 100:
            raise ValueError("Maximum 100 URLs per batch request")

        if not urls:
            return {}

        # Default to html if no formats specified
        if formats is None:
            formats = ['html']

        # Build URL with formats parameter
        formats_str = ','.join(formats)
        url = f"{self._client.host}/crawl/{self._uuid}/contents/batch"
        params = {
            'key': self._client.key,
            'formats': formats_str
        }

        # Prepare request body (newline-separated URLs)
        body = '\n'.join(urls)

        # Make request
        import requests
        response = requests.post(
            url,
            params=params,
            data=body.encode('utf-8'),
            headers={'Content-Type': 'text/plain'},
            verify=self._client.verify
        )

        if response.status_code != 200:
            raise ScrapflyCrawlerError(
                message=f"Batch content request failed: {response.status_code}",
                code="BATCH_REQUEST_FAILED",
                http_status_code=response.status_code
            )

        # Parse multipart response
        content_type = response.headers.get('Content-Type', '')
        if not content_type.startswith('multipart/related'):
            raise ScrapflyCrawlerError(
                message=f"Unexpected content type: {content_type}",
                code="INVALID_RESPONSE",
                http_status_code=500
            )

        # Extract boundary from Content-Type header
        boundary = None
        for part in content_type.split(';'):
            part = part.strip()
            if part.startswith('boundary='):
                boundary = part.split('=', 1)[1]
                break

        if not boundary:
            raise ScrapflyCrawlerError(
                message="No boundary found in multipart response",
                code="INVALID_RESPONSE",
                http_status_code=500
            )

        # Parse multipart message
        # Prepend Content-Type header to make it a valid email message for the parser
        message_bytes = f"Content-Type: {content_type}\r\n\r\n".encode('utf-8') + response.content
        parser = BytesParser(policy=default)
        message = parser.parsebytes(message_bytes)

        # Extract content from each part
        result = {}

        for part in message.walk():
            # Skip the container itself
            if part.get_content_maintype() == 'multipart':
                continue

            # Get the URL from Content-Location header
            content_location = part.get('Content-Location')
            if not content_location:
                continue

            # Get content type to determine format
            part_content_type = part.get_content_type()
            format_type = None

            # Map MIME types to format names
            if 'markdown' in part_content_type:
                format_type = 'markdown'
            elif 'plain' in part_content_type:
                format_type = 'text'
            elif 'html' in part_content_type:
                format_type = 'html'
            elif 'json' in part_content_type:
                format_type = 'json'

            if not format_type:
                continue

            # Get content
            content = part.get_content()
            if isinstance(content, bytes):
                content = content.decode('utf-8', errors='replace')

            # Initialize URL dict if needed
            if content_location not in result:
                result[content_location] = {}

            # Store content
            result[content_location][format_type] = content

        return result

    def stats(self) -> Dict[str, Any]:
        """
        Get comprehensive statistics about the crawl

        Returns:
            Dictionary with crawl statistics

        Example:
            ```python
            stats = crawl.stats()
            print(f"URLs discovered: {stats['urls_discovered']}")
            print(f"URLs crawled: {stats['urls_crawled']}")
            print(f"Success rate: {stats['success_rate']:.1f}%")
            print(f"Total size: {stats['total_size_kb']:.2f} KB")
            ```
        """
        status = self.status(refresh=False)

        # Basic stats from status
        stats_dict = {
            'uuid': self._uuid,
            'status': status.status,
            'urls_discovered': status.urls_discovered,
            'urls_crawled': status.urls_crawled,
            'urls_pending': status.urls_pending,
            'urls_failed': status.urls_failed,
            'progress_pct': status.progress_pct,
            'is_complete': status.is_complete,
            'is_running': status.is_running,
            'is_failed': status.is_failed,
        }

        # Calculate basic crawl rate (crawled vs discovered)
        if status.urls_discovered > 0:
            stats_dict['crawl_rate'] = (status.urls_crawled / status.urls_discovered) * 100

        # Add artifact stats if available
        if self._artifact_cache is not None:
            pages = self._artifact_cache.get_pages()
            total_size = sum(len(p['content']) for p in pages)
            avg_size = total_size / len(pages) if pages else 0

            stats_dict.update({
                'pages_downloaded': len(pages),
                'total_size_bytes': total_size,
                'total_size_kb': total_size / 1024,
                'total_size_mb': total_size / (1024 * 1024),
                'avg_page_size_bytes': avg_size,
                'avg_page_size_kb': avg_size / 1024,
            })

            # Calculate download rate (pages vs discovered)
            if status.urls_discovered > 0:
                stats_dict['download_rate'] = (len(pages) / status.urls_discovered) * 100

        return stats_dict

    def __repr__(self):
        if self._uuid is None:
            return "Crawl(not started)"

        status_str = "unknown"
        if self._status_cache:
            status_str = self._status_cache.status

        return f"Crawl(uuid={self._uuid}, status={status_str})"

High-level abstraction for managing a crawler job

The Crawl object maintains the state of a crawler job and provides convenient methods for managing its lifecycle.

Example

from scrapfly import ScrapflyClient, CrawlerConfig, Crawl

client = ScrapflyClient(key='your-key')
config = CrawlerConfig(url='https://example.com', page_limit=10)

# Create and start crawl
crawl = Crawl(client, config)
crawl.crawl()  # Start the crawler

# Wait for completion
crawl.wait()

# Get results
pages = crawl.warc().get_pages()
for page in pages:
    print(f"{page['url']}: {page['status_code']}")

# Or read specific URLs
html = crawl.read('https://example.com/page1', format='html')

Initialize a Crawl object

Args

client
ScrapflyClient instance
config
CrawlerConfig with crawler settings

Instance variables

prop started : bool
Expand source code
@property
def started(self) -> bool:
    """Whether crawl() has already been called and assigned a job UUID."""
    job_id = self._uuid
    return job_id is not None

Check if the crawler has been started

prop uuid : str | None
Expand source code
@property
def uuid(self) -> Optional[str]:
    """The crawler job's UUID, or None when the job has not been started."""
    job_id = self._uuid
    return job_id

Get the crawler job UUID (None if not started)

Methods

def cancel(self) ‑> bool
Expand source code
def cancel(self) -> bool:
    """
    Cancel the crawler job this instance started.

    Returns:
        True if cancelled successfully

    Raises:
        ScrapflyCrawlerError: If crawler not started yet

    Example:
        ```python
        # Start a crawl
        crawl = Crawl(client, config).crawl()

        # Cancel it
        crawl.cancel()
        ```
    """
    # A job can only be cancelled after crawl() has assigned a UUID.
    if self._uuid is not None:
        return self._client.cancel_crawl(self._uuid)

    raise ScrapflyCrawlerError(
        message="Crawler not started yet. Call crawl() first.",
        code="NOT_STARTED",
        http_status_code=400
    )

Cancel the running crawler job

Returns

True if cancelled successfully

Raises

ScrapflyCrawlerError
If crawler not started yet

Example

# Start a crawl
crawl = Crawl(client, config).crawl()

# Cancel it
crawl.cancel()
def crawl(self) ‑> Crawl
Expand source code
def crawl(self) -> 'Crawl':
    """
    Start the crawler job and remember its UUID.

    Returns:
        Self for method chaining

    Raises:
        ScrapflyCrawlerError: If the crawler was already started

    Example:
        ```python
        crawl = Crawl(client, config)
        crawl.crawl()  # Start crawling
        ```
    """
    # Guard against double-starts: a Crawl instance maps to exactly one job.
    if self._uuid is not None:
        raise ScrapflyCrawlerError(
            message="Crawler already started",
            code="ALREADY_STARTED",
            http_status_code=400
        )

    # The client returns a response whose uuid identifies the new job.
    self._uuid = self._client.start_crawl(self._config).uuid
    return self

Start the crawler job

Returns

Self for method chaining

Raises

ScrapflyCrawlerError
If crawler already started

Example

crawl = Crawl(client, config)
crawl.crawl()  # Start crawling
def har(self) ‑> CrawlerArtifactResponse
Expand source code
def har(self) -> "CrawlerArtifactResponse":
    """
    Download the crawler artifact in HAR (HTTP Archive) format.

    Unlike warc(), this call is not cached — each call downloads the artifact.

    Returns:
        CrawlerArtifactResponse with parsed HAR data

    Raises:
        ScrapflyCrawlerError: If crawler not started yet

    Example:
        ```python
        # Get HAR artifact
        artifact = crawl.har()

        # Get all pages
        pages = artifact.get_pages()

        # Iterate through HAR entries
        for entry in artifact.iter_responses():
            print(f"{entry.url}: {entry.status_code}")
            print(f"Timing: {entry.time}ms")
        ```
    """
    if self._uuid is None:
        raise ScrapflyCrawlerError(
            message="Crawler not started yet. Call crawl() first.",
            code="NOT_STARTED",
            http_status_code=400
        )

    return self._client.get_crawl_artifact(
        self._uuid,
        artifact_type='har'
    )

Download the crawler artifact in HAR (HTTP Archive) format

Returns

CrawlerArtifactResponse with parsed HAR data

Raises

ScrapflyCrawlerError
If crawler not started yet

Example

# Get HAR artifact
artifact = crawl.har()

# Get all pages
pages = artifact.get_pages()

# Iterate through HAR entries
for entry in artifact.iter_responses():
    print(f"{entry.url}: {entry.status_code}")
    print(f"Timing: {entry.time}ms")
def read(self,
url: str,
format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html') ‑> CrawlContent | None
Expand source code
def read(self, url: str, format: "ContentFormat" = 'html') -> "Optional[CrawlContent]":
    """
    Read content from a specific URL in the crawl results

    Args:
        url: The URL to retrieve content for
        format: Content format - 'html', 'markdown', 'text', 'clean_html', 'json',
               'extracted_data', 'page_metadata'

    Returns:
        CrawlContent object with content and metadata, or None if URL not found
        (or if the contents API request fails)

    Raises:
        ScrapflyCrawlerError: If crawler not started yet

    Example:
        ```python
        # Get HTML content for a specific URL
        content = crawl.read('https://example.com/page1')
        if content:
            print(f"URL: {content.url}")
            print(f"Status: {content.status_code}")
            print(f"Duration: {content.duration}s")
            print(content.content)

        # Get markdown content
        content = crawl.read('https://example.com/page1', format='markdown')
        if content:
            print(content.content)

        # Check if URL was crawled
        if crawl.read('https://example.com/missing') is None:
            print("URL not found in crawl results")
        ```
    """
    if self._uuid is None:
        raise ScrapflyCrawlerError(
            message="Crawler not started yet. Call crawl() first.",
            code="NOT_STARTED",
            http_status_code=400
        )

    def _warc_meta(record):
        # Pull scrape metadata out of the WARC record's extension headers.
        warc_headers = record.warc_headers or {}
        duration_str = warc_headers.get('WARC-Scrape-Duration')
        return {
            'duration': float(duration_str) if duration_str else None,
            'log_id': warc_headers.get('WARC-Scrape-Log-Id'),
            'country': warc_headers.get('WARC-Scrape-Country'),
        }

    # For HTML the (cached) WARC artifact already holds the raw body — no extra API call.
    if format == 'html':
        artifact = self.warc()
        for record in artifact.iter_responses():
            if record.url == url:
                meta = _warc_meta(record)
                return CrawlContent(
                    url=record.url,
                    content=record.content.decode('utf-8', errors='replace'),
                    status_code=record.status_code,
                    headers=record.headers,
                    duration=meta['duration'],
                    log_id=meta['log_id'],
                    country=meta['country'],
                    crawl_uuid=self._uuid
                )
        return None

    # Other formats (markdown, text, ...) come from the contents API.
    try:
        result = self._client.get_crawl_contents(
            self._uuid,
            format=format
        )

        # API shape: {"contents": {url: {format: content, ...}, ...}, "links": {...}}
        contents = result.get('contents', {})
        content_str = contents.get(url, {}).get(format)
        if not content_str:
            return None

        # The contents API carries no scrape metadata; best-effort lookup in the WARC.
        status_code = 200  # Default when the URL is absent from the WARC
        headers = {}
        meta = {'duration': None, 'log_id': None, 'country': None}
        try:
            for record in self.warc().iter_responses():
                if record.url == url:
                    status_code = record.status_code
                    headers = record.headers
                    meta = _warc_meta(record)
                    break
        except Exception:
            # Metadata is optional; fall back to the defaults above.
            pass

        return CrawlContent(
            url=url,
            content=content_str,
            status_code=status_code,
            headers=headers,
            duration=meta['duration'],
            log_id=meta['log_id'],
            country=meta['country'],
            crawl_uuid=self._uuid
        )

    except Exception:
        # Contents API failure is treated the same as "URL not found".
        return None

Read content from a specific URL in the crawl results

Args

url
The URL to retrieve content for
format
Content format - 'html', 'markdown', 'text', 'clean_html', 'json', 'extracted_data', 'page_metadata'

Returns

CrawlContent object with content and metadata, or None if URL not found

Example

# Get HTML content for a specific URL
content = crawl.read('https://example.com/page1')
if content:
    print(f"URL: {content.url}")
    print(f"Status: {content.status_code}")
    print(f"Duration: {content.duration}s")
    print(content.content)

# Get markdown content
content = crawl.read('https://example.com/page1', format='markdown')
if content:
    print(content.content)

# Check if URL was crawled
if crawl.read('https://example.com/missing') is None:
    print("URL not found in crawl results")
def read_batch(self,
urls: List[str],
formats: List[Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata']] = None) ‑> Dict[str, Dict[str, str]]
Expand source code
def read_batch(
    self,
    urls: List[str],
    formats: Optional[List["ContentFormat"]] = None
) -> Dict[str, Dict[str, str]]:
    """
    Retrieve content for multiple URLs in a single batch request

    This is more efficient than calling read() multiple times as it retrieves
    all content in a single API call. Maximum 100 URLs per request.

    Args:
        urls: List of URLs to retrieve (max 100)
        formats: List of content formats to retrieve (e.g., ['markdown', 'text'])
                If None, defaults to ['html']

    Returns:
        Dictionary mapping URLs to their content in requested formats:
        {
            'https://example.com/page1': {
                'markdown': '# Page 1...',
                'text': 'Page 1...'
            },
            'https://example.com/page2': {
                'markdown': '# Page 2...',
                'text': 'Page 2...'
            }
        }

    Example:
        ```python
        # Get markdown and text for multiple URLs
        urls = ['https://example.com/page1', 'https://example.com/page2']
        contents = crawl.read_batch(urls, formats=['markdown', 'text'])

        for url, formats in contents.items():
            markdown = formats.get('markdown', '')
            text = formats.get('text', '')
            print(f"{url}: {len(markdown)} chars markdown, {len(text)} chars text")
        ```

    Raises:
        ValueError: If more than 100 URLs are provided
        ScrapflyCrawlerError: If crawler not started or request fails
    """
    if self._uuid is None:
        raise ScrapflyCrawlerError(
            message="Crawler not started yet. Call crawl() first.",
            code="NOT_STARTED",
            http_status_code=400
        )

    # API hard limit: at most 100 URLs per batch call.
    if len(urls) > 100:
        raise ValueError("Maximum 100 URLs per batch request")

    if not urls:
        return {}

    # Default to html if no formats specified
    if formats is None:
        formats = ['html']

    # Build URL with formats parameter
    formats_str = ','.join(formats)
    url = f"{self._client.host}/crawl/{self._uuid}/contents/batch"
    params = {
        'key': self._client.key,
        'formats': formats_str
    }

    # The endpoint takes the URL list as a newline-separated plain-text body.
    body = '\n'.join(urls)

    import requests
    response = requests.post(
        url,
        params=params,
        data=body.encode('utf-8'),
        headers={'Content-Type': 'text/plain'},
        verify=self._client.verify
    )

    if response.status_code != 200:
        raise ScrapflyCrawlerError(
            message=f"Batch content request failed: {response.status_code}",
            code="BATCH_REQUEST_FAILED",
            http_status_code=response.status_code
        )

    # The server replies with a multipart/related document, one part per URL/format.
    content_type = response.headers.get('Content-Type', '')
    if not content_type.startswith('multipart/related'):
        raise ScrapflyCrawlerError(
            message=f"Unexpected content type: {content_type}",
            code="INVALID_RESPONSE",
            http_status_code=500
        )

    # Validate that a boundary parameter is present (RFC 2046 allows it quoted).
    boundary = None
    for part in content_type.split(';'):
        part = part.strip()
        if part.startswith('boundary='):
            boundary = part.split('=', 1)[1].strip('"')
            break

    if not boundary:
        raise ScrapflyCrawlerError(
            message="No boundary found in multipart response",
            code="INVALID_RESPONSE",
            http_status_code=500
        )

    # Prepend the Content-Type header so the email parser sees a valid MIME message.
    message_bytes = f"Content-Type: {content_type}\r\n\r\n".encode('utf-8') + response.content
    parser = BytesParser(policy=default)
    message = parser.parsebytes(message_bytes)

    # Extract content from each part, keyed by URL then by format.
    result = {}

    for part in message.walk():
        # Skip the multipart container itself.
        if part.get_content_maintype() == 'multipart':
            continue

        # Each payload part identifies its URL via Content-Location.
        content_location = part.get('Content-Location')
        if not content_location:
            continue

        # Map the part's MIME type back to a format name.
        part_content_type = part.get_content_type()
        format_type = None

        if 'markdown' in part_content_type:
            format_type = 'markdown'
        elif 'plain' in part_content_type:
            format_type = 'text'
        elif 'html' in part_content_type:
            format_type = 'html'
        elif 'json' in part_content_type:
            format_type = 'json'

        if not format_type:
            continue

        content = part.get_content()
        if isinstance(content, bytes):
            content = content.decode('utf-8', errors='replace')

        if content_location not in result:
            result[content_location] = {}

        result[content_location][format_type] = content

    return result

Retrieve content for multiple URLs in a single batch request

This is more efficient than calling read() multiple times as it retrieves all content in a single API call. Maximum 100 URLs per request.

Args

urls
List of URLs to retrieve (max 100)
formats
List of content formats to retrieve (e.g., ['markdown', 'text']) If None, defaults to ['html']

Returns

Dictionary mapping URLs to their content in requested formats: { 'https://example.com/page1': { 'markdown': '# Page 1…', 'text': 'Page 1…' }, 'https://example.com/page2': { 'markdown': '# Page 2…', 'text': 'Page 2…' } }

Example

# Get markdown and text for multiple URLs
urls = ['https://example.com/page1', 'https://example.com/page2']
contents = crawl.read_batch(urls, formats=['markdown', 'text'])

for url, formats in contents.items():
    markdown = formats.get('markdown', '')
    text = formats.get('text', '')
    print(f"{url}: {len(markdown)} chars markdown, {len(text)} chars text")

Raises

ValueError
If more than 100 URLs are provided
ScrapflyCrawlerError
If crawler not started or request fails
def read_iter(self,
pattern: str,
format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html') ‑> Iterator[CrawlContent]
Expand source code
def read_iter(
    self,
    pattern: str,
    format: "ContentFormat" = 'html'
) -> Iterator["CrawlContent"]:
    """
    Iterate through URLs matching a pattern and yield their content

    Supports wildcard patterns using * and ? for flexible URL matching.

    Args:
        pattern: URL pattern with wildcards (* matches any characters, ? matches one)
                Examples: "/products?page=*", "https://example.com/*/detail", "*/product/*"
        format: Content format to retrieve

    Yields:
        CrawlContent objects for each matching URL

    Raises:
        ScrapflyCrawlerError: If crawler not started yet

    Example:
        ```python
        # Get all product pages in markdown
        for content in crawl.read_iter(pattern="*/products?page=*", format="markdown"):
            print(f"{content.url}: {len(content.content)} chars")
            print(f"Duration: {content.duration}s")

        # Get all detail pages
        for content in crawl.read_iter(pattern="*/detail/*"):
            process(content.content)

        # Pattern matching examples:
        # "/products?page=*" matches /products?page=1, /products?page=2, etc.
        # "*/product/*" matches any URL with /product/ in the path
        # "https://example.com/page?" matches https://example.com/page1, page2, etc.
        ```
    """
    if self._uuid is None:
        raise ScrapflyCrawlerError(
            message="Crawler not started yet. Call crawl() first.",
            code="NOT_STARTED",
            http_status_code=400
        )

    # For HTML the (cached) WARC artifact already has the raw bodies — no extra API call.
    if format == 'html':
        artifact = self.warc()
        for record in artifact.iter_responses():
            if fnmatch.fnmatch(record.url, pattern):
                # Extract scrape metadata from the WARC extension headers.
                warc_headers = record.warc_headers or {}
                duration_str = warc_headers.get('WARC-Scrape-Duration')
                duration = float(duration_str) if duration_str else None

                yield CrawlContent(
                    url=record.url,
                    content=record.content.decode('utf-8', errors='replace'),
                    status_code=record.status_code,
                    headers=record.headers,
                    duration=duration,
                    log_id=warc_headers.get('WARC-Scrape-Log-Id'),
                    country=warc_headers.get('WARC-Scrape-Country'),
                    crawl_uuid=self._uuid
                )
    else:
        # For other formats, use the contents API.
        try:
            result = self._client.get_crawl_contents(
                self._uuid,
                format=format
            )

            contents = result.get('contents', {})

            # Build a metadata cache from the WARC; the contents API has no metadata.
            metadata_cache = {}
            try:
                artifact = self.warc()
                for record in artifact.iter_responses():
                    warc_headers = record.warc_headers or {}
                    duration_str = warc_headers.get('WARC-Scrape-Duration')
                    metadata_cache[record.url] = {
                        'status_code': record.status_code,
                        'headers': record.headers,
                        'duration': float(duration_str) if duration_str else None,
                        'log_id': warc_headers.get('WARC-Scrape-Log-Id'),
                        'country': warc_headers.get('WARC-Scrape-Country')
                    }
            except Exception:
                # Metadata is optional; yield with defaults below.
                pass

            for url, content_data in contents.items():
                if fnmatch.fnmatch(url, pattern):
                    # Content is always a dict keyed by format (e.g. {"markdown": "..."}).
                    content = content_data.get(format)

                    if content:
                        metadata = metadata_cache.get(url, {})
                        yield CrawlContent(
                            url=url,
                            content=content,
                            status_code=metadata.get('status_code', 200),
                            headers=metadata.get('headers', {}),
                            duration=metadata.get('duration'),
                            log_id=metadata.get('log_id'),
                            country=metadata.get('country'),
                            crawl_uuid=self._uuid
                        )

        except Exception:
            # If the contents API fails, yield nothing.
            return

Iterate through URLs matching a pattern and yield their content

Supports wildcard patterns using * and ? for flexible URL matching.

Args

pattern
URL pattern with wildcards (* matches any characters, ? matches one) Examples: "/products?page=*", "https://example.com/*/detail", "*/product/*"
format
Content format to retrieve

Yields

CrawlContent objects for each matching URL

Example

# Get all product pages in markdown
for content in crawl.read_iter(pattern="*/products?page=*", format="markdown"):
    print(f"{content.url}: {len(content.content)} chars")
    print(f"Duration: {content.duration}s")

# Get all detail pages
for content in crawl.read_iter(pattern="*/detail/*"):
    process(content.content)

# Pattern matching examples:
# "/products?page=*" matches /products?page=1, /products?page=2, etc.
# "*/product/*" matches any URL with /product/ in the path
# "https://example.com/page?" matches https://example.com/page1, page2, etc.
def stats(self) ‑> Dict[str, Any]
Expand source code
def stats(self) -> Dict[str, Any]:
    """
    Get comprehensive statistics about the crawl

    Combines counters from the (cached) status with size statistics computed
    from the WARC artifact, when warc() has already downloaded it.

    Returns:
        Dictionary with crawl statistics

    Raises:
        ScrapflyCrawlerError: If crawler not started yet (via status())

    Example:
        ```python
        stats = crawl.stats()
        print(f"URLs discovered: {stats['urls_discovered']}")
        print(f"URLs crawled: {stats['urls_crawled']}")
        print(f"Crawl rate: {stats['crawl_rate']:.1f}%")
        print(f"Total size: {stats['total_size_kb']:.2f} KB")
        ```
    """
    # Use the cached status to avoid an extra API round-trip.
    status = self.status(refresh=False)

    # Basic stats from status
    stats_dict = {
        'uuid': self._uuid,
        'status': status.status,
        'urls_discovered': status.urls_discovered,
        'urls_crawled': status.urls_crawled,
        'urls_pending': status.urls_pending,
        'urls_failed': status.urls_failed,
        'progress_pct': status.progress_pct,
        'is_complete': status.is_complete,
        'is_running': status.is_running,
        'is_failed': status.is_failed,
    }

    # 'crawl_rate': percentage of discovered URLs that were crawled.
    if status.urls_discovered > 0:
        stats_dict['crawl_rate'] = (status.urls_crawled / status.urls_discovered) * 100

    # Artifact-derived stats are only available once warc() populated the cache.
    if self._artifact_cache is not None:
        pages = self._artifact_cache.get_pages()
        total_size = sum(len(p['content']) for p in pages)
        avg_size = total_size / len(pages) if pages else 0

        stats_dict.update({
            'pages_downloaded': len(pages),
            'total_size_bytes': total_size,
            'total_size_kb': total_size / 1024,
            'total_size_mb': total_size / (1024 * 1024),
            'avg_page_size_bytes': avg_size,
            'avg_page_size_kb': avg_size / 1024,
        })

        # 'download_rate': percentage of discovered URLs with a downloaded page.
        if status.urls_discovered > 0:
            stats_dict['download_rate'] = (len(pages) / status.urls_discovered) * 100

    return stats_dict

Get comprehensive statistics about the crawl

Returns

Dictionary with crawl statistics

Example

stats = crawl.stats()
print(f"URLs discovered: {stats['urls_discovered']}")
print(f"URLs crawled: {stats['urls_crawled']}")
print(f"Crawl rate: {stats['crawl_rate']:.1f}%")
print(f"Total size: {stats['total_size_kb']:.2f} KB")
def status(self, refresh: bool = True) ‑> CrawlerStatusResponse
Expand source code
def status(self, refresh: bool = True) -> "CrawlerStatusResponse":
    """
    Get current crawler status

    Args:
        refresh: If True, fetch fresh status from the API. If False, return the
            cached status (still fetches once when nothing is cached yet).

    Returns:
        CrawlerStatusResponse with current status

    Raises:
        ScrapflyCrawlerError: If crawler not started yet

    Example:
        ```python
        status = crawl.status()
        print(f"Progress: {status.progress_pct}%")
        print(f"URLs crawled: {status.urls_crawled}")
        ```
    """
    if self._uuid is None:
        raise ScrapflyCrawlerError(
            message="Crawler not started yet. Call crawl() first.",
            code="NOT_STARTED",
            http_status_code=400
        )

    # Fetch when explicitly asked, or when we have never fetched a status before.
    if refresh or self._status_cache is None:
        self._status_cache = self._client.get_crawl_status(self._uuid)

    return self._status_cache

Get current crawler status

Args

refresh
If True, fetch fresh status from API. If False, return cached status.

Returns

CrawlerStatusResponse with current status

Raises

ScrapflyCrawlerError
If crawler not started yet

Example

status = crawl.status()
print(f"Progress: {status.progress_pct}%")
print(f"URLs crawled: {status.urls_crawled}")
def wait(self, poll_interval: int = 5, max_wait: int | None = None, verbose: bool = False) ‑> Crawl
Expand source code
def wait(
    self,
    poll_interval: int = 5,
    max_wait: Optional[int] = None,
    verbose: bool = False
) -> 'Crawl':
    """
    Wait for crawler to complete

    Polls the status endpoint until the crawler finishes.

    Args:
        poll_interval: Seconds between status checks (default: 5)
        max_wait: Maximum seconds to wait (None = wait forever)
        verbose: If True, log progress updates

    Returns:
        Self for method chaining

    Raises:
        ScrapflyCrawlerError: If crawler not started, failed, was cancelled,
            or exceeded max_wait

    Example:
        ```python
        # Wait with progress updates
        crawl.crawl().wait(verbose=True)

        # Wait with timeout
        crawl.crawl().wait(max_wait=300)  # 5 minutes max
        ```
    """
    if self._uuid is None:
        raise ScrapflyCrawlerError(
            message="Crawler not started yet. Call crawl() first.",
            code="NOT_STARTED",
            http_status_code=400
        )

    start_time = time.time()
    poll_count = 0

    while True:
        # Always force a refresh; we need the latest state to decide.
        status = self.status(refresh=True)
        poll_count += 1

        if verbose:
            logger.info(f"Poll #{poll_count}: {status.status} - "
                       f"{status.progress_pct:.1f}% - "
                       f"{status.urls_crawled}/{status.urls_discovered} URLs")

        if status.is_complete:
            if verbose:
                logger.info("✓ Crawler completed successfully!")
            return self
        elif status.is_failed:
            raise ScrapflyCrawlerError(
                message=f"Crawler failed with status: {status.status}",
                code="FAILED",
                http_status_code=400
            )
        elif status.is_cancelled:
            raise ScrapflyCrawlerError(
                message="Crawler was cancelled",
                code="CANCELLED",
                http_status_code=400
            )

        # Give up once the elapsed time exceeds the caller's budget.
        if max_wait is not None:
            elapsed = time.time() - start_time
            if elapsed > max_wait:
                raise ScrapflyCrawlerError(
                    message=f"Timeout waiting for crawler (>{max_wait}s)",
                    code="TIMEOUT",
                    http_status_code=400
                )

        time.sleep(poll_interval)

Wait for crawler to complete

Polls the status endpoint until the crawler finishes.

Args

poll_interval
Seconds between status checks (default: 5)
max_wait
Maximum seconds to wait (None = wait forever)
verbose
If True, print progress updates

Returns

Self for method chaining

Raises

ScrapflyCrawlerError
If crawler not started, failed, or timed out

Example

# Wait with progress updates
crawl.crawl().wait(verbose=True)

# Wait with timeout
crawl.crawl().wait(max_wait=300)  # 5 minutes max
def warc(self, artifact_type: str = 'warc') ‑> CrawlerArtifactResponse
Expand source code
def warc(self, artifact_type: str = 'warc') -> "CrawlerArtifactResponse":
    """
    Download the crawler artifact (WARC file)

    The downloaded artifact is cached on the instance; subsequent calls return
    the cached response without hitting the API again.

    Args:
        artifact_type: Type of artifact to download (default: 'warc')

    Returns:
        CrawlerArtifactResponse with parsed WARC data

    Raises:
        ScrapflyCrawlerError: If crawler not started yet

    Example:
        ```python
        # Get WARC artifact
        artifact = crawl.warc()

        # Get all pages
        pages = artifact.get_pages()

        # Iterate through responses
        for record in artifact.iter_responses():
            print(record.url)
        ```
    """
    if self._uuid is None:
        raise ScrapflyCrawlerError(
            message="Crawler not started yet. Call crawl() first.",
            code="NOT_STARTED",
            http_status_code=400
        )

    # NOTE(review): the cache is shared across artifact types — once populated,
    # a call with a different artifact_type returns the previously cached artifact.
    if self._artifact_cache is None:
        self._artifact_cache = self._client.get_crawl_artifact(
            self._uuid,
            artifact_type=artifact_type
        )

    return self._artifact_cache

Download the crawler artifact (WARC file)

Args

artifact_type
Type of artifact to download (default: 'warc')

Returns

CrawlerArtifactResponse with parsed WARC data

Raises

ScrapflyCrawlerError
If crawler not started yet

Example

# Get WARC artifact
artifact = crawl.warc()

# Get all pages
pages = artifact.get_pages()

# Iterate through responses
for record in artifact.iter_responses():
    print(record.url)
class CrawlCompletedWebhook (event: str,
uuid: str,
timestamp: datetime.datetime,
status: str,
urls_discovered: int,
urls_crawled: int,
urls_failed: int)
Expand source code
@dataclass
class CrawlCompletedWebhook(CrawlerWebhookBase):
    """
    Webhook payload for the crawl.completed event.

    Emitted when a crawler job finishes, whether it succeeded or failed.

    Fields added on top of the base webhook:
    - status: final crawler status (COMPLETED, FAILED, etc.)
    - urls_discovered: total number of URLs discovered
    - urls_crawled: number of URLs successfully crawled
    - urls_failed: number of URLs that failed

    Example payload:
    {
        "event": "crawl.completed",
        "uuid": "550e8400-e29b-41d4-a716-446655440000",
        "status": "COMPLETED",
        "urls_discovered": 100,
        "urls_crawled": 95,
        "urls_failed": 5,
        "timestamp": "2025-01-16T10:35:00Z"
    }
    """
    status: str
    urls_discovered: int
    urls_crawled: int
    urls_failed: int

    @classmethod
    def from_dict(cls, data: Dict) -> 'CrawlCompletedWebhook':
        """Build a typed webhook instance from the raw payload dictionary."""
        common = CrawlerWebhookBase.from_dict(data)
        counters = {
            key: data[key]
            for key in ('status', 'urls_discovered', 'urls_crawled', 'urls_failed')
        }
        return cls(
            event=common.event,
            uuid=common.uuid,
            timestamp=common.timestamp,
            **counters
        )

Webhook payload for crawl.completed event.

Sent when a crawler job completes (successfully or with errors).

Additional fields: - status: Final crawler status (COMPLETED, FAILED, etc.) - urls_discovered: Total number of URLs discovered - urls_crawled: Number of URLs successfully crawled - urls_failed: Number of URLs that failed

Example payload: { "event": "crawl.completed", "uuid": "550e8400-e29b-41d4-a716-446655440000", "status": "COMPLETED", "urls_discovered": 100, "urls_crawled": 95, "urls_failed": 5, "timestamp": "2025-01-16T10:35:00Z" }

Ancestors

Instance variables

var status : str
var urls_crawled : int
var urls_discovered : int
var urls_failed : int

Inherited members

class CrawlContent (url: str,
content: str,
status_code: int,
headers: Dict[str, str] | None = None,
duration: float | None = None,
log_id: str | None = None,
country: str | None = None,
crawl_uuid: str | None = None)
Expand source code
class CrawlContent:
    """
    Result wrapper for a single page fetched by the crawler.

    Bundles the page body together with its HTTP metadata, playing the
    same role for crawler results that ScrapeApiResponse plays for
    individual scrapes.

    Attributes:
        url: The crawled URL (mandatory)
        content: Page content in requested format (mandatory)
        status_code: HTTP response status code (mandatory)
        headers: HTTP response headers (optional)
        duration: Request duration in seconds (optional)
        log_id: Scrape log ID for debugging (optional)
        log_url: URL to view scrape logs (optional)
        country: Country the request was made from (optional)

    Example:
        ```python
        # Get content for a URL
        content = crawl.read('https://example.com', format='markdown')

        print(f"URL: {content.url}")
        print(f"Status: {content.status_code}")
        print(f"Duration: {content.duration}s")
        print(f"Content: {content.content}")

        # Access metadata
        if content.log_url:
            print(f"View logs: {content.log_url}")
        ```
    """

    def __init__(
        self,
        url: str,
        content: str,
        status_code: int,
        headers: Optional[Dict[str, str]] = None,
        duration: Optional[float] = None,
        log_id: Optional[str] = None,
        country: Optional[str] = None,
        crawl_uuid: Optional[str] = None
    ):
        """
        Initialize CrawlContent

        Args:
            url: The crawled URL
            content: Page content in requested format
            status_code: HTTP response status code
            headers: HTTP response headers
            duration: Request duration in seconds
            log_id: Scrape log ID
            country: Country the request was made from
            crawl_uuid: Crawl job UUID
        """
        self.url = url
        self.content = content
        self.status_code = status_code
        # Normalize a missing/empty headers value to a fresh dict so
        # callers can always index or iterate without a None check.
        self.headers = headers or {}
        self.duration = duration
        self.log_id = log_id
        self.country = country
        self._crawl_uuid = crawl_uuid

    @property
    def log_url(self) -> Optional[str]:
        """
        Dashboard URL for this request's scrape log.

        Returns:
            Log URL if log_id is available, None otherwise
        """
        if not self.log_id:
            return None
        return f"https://scrapfly.io/dashboard/logs/{self.log_id}"

    @property
    def success(self) -> bool:
        """Check if the request was successful (2xx status code)"""
        return 200 <= self.status_code <= 299

    @property
    def error(self) -> bool:
        """Check if the request resulted in an error (4xx/5xx status code)"""
        return not self.status_code < 400

    def __repr__(self) -> str:
        return (f"CrawlContent(url={self.url!r}, status={self.status_code}, "
                f"content_length={len(self.content)})")

    def __str__(self) -> str:
        # The string form of a crawl result is its raw content.
        return self.content

    def __len__(self) -> int:
        """Get content length"""
        return len(self.content)

Response object for a single crawled URL

Provides access to content and metadata for a crawled page. Similar to ScrapeApiResponse but for crawler results.

Attributes

url
The crawled URL (mandatory)
content
Page content in requested format (mandatory)
status_code
HTTP response status code (mandatory)
headers
HTTP response headers (optional)
duration
Request duration in seconds (optional)
log_id
Scrape log ID for debugging (optional)
log_url
URL to view scrape logs (optional)
country
Country the request was made from (optional)

Example

# Get content for a URL
content = crawl.read('https://example.com', format='markdown')

print(f"URL: {content.url}")
print(f"Status: {content.status_code}")
print(f"Duration: {content.duration}s")
print(f"Content: {content.content}")

# Access metadata
if content.log_url:
    print(f"View logs: {content.log_url}")

Initialize CrawlContent

Args

url
The crawled URL
content
Page content in requested format
status_code
HTTP response status code
headers
HTTP response headers
duration
Request duration in seconds
log_id
Scrape log ID
country
Country the request was made from
crawl_uuid
Crawl job UUID

Instance variables

prop error : bool
Expand source code
@property
def error(self) -> bool:
    """Check if the request resulted in an error (4xx/5xx status code)"""
    return self.status_code >= 400

Check if the request resulted in an error (4xx/5xx status code)

prop log_url : str | None
Expand source code
@property
def log_url(self) -> Optional[str]:
    """
    Get URL to view scrape logs

    Returns:
        Log URL if log_id is available, None otherwise
    """
    if self.log_id:
        return f"https://scrapfly.io/dashboard/logs/{self.log_id}"
    return None

Get URL to view scrape logs

Returns

Log URL if log_id is available, None otherwise

prop success : bool
Expand source code
@property
def success(self) -> bool:
    """Check if the request was successful (2xx status code)"""
    return 200 <= self.status_code < 300

Check if the request was successful (2xx status code)

class CrawlStartedWebhook (event: str, uuid: str, timestamp: datetime.datetime, status: str)
Expand source code
@dataclass
class CrawlStartedWebhook(CrawlerWebhookBase):
    """
    Typed payload for the crawl.started webhook event.

    Emitted when a crawler job begins running.

    Field added on top of the base payload:
    - status: Current crawler status (should be 'RUNNING')

    Example payload:
    {
        "event": "crawl.started",
        "uuid": "550e8400-e29b-41d4-a716-446655440000",
        "status": "RUNNING",
        "timestamp": "2025-01-16T10:30:00Z"
    }
    """
    status: str

    @classmethod
    def from_dict(cls, data: Dict) -> 'CrawlStartedWebhook':
        """Build a CrawlStartedWebhook from a raw payload dictionary."""
        # Shared fields come from the base parser; only 'status' is extra.
        common = CrawlerWebhookBase.from_dict(data)
        return cls(
            event=common.event,
            uuid=common.uuid,
            timestamp=common.timestamp,
            status=data['status'],
        )

Webhook payload for crawl.started event.

Sent when a crawler job starts running.

Additional fields: - status: Current crawler status (should be 'RUNNING')

Example payload: { "event": "crawl.started", "uuid": "550e8400-e29b-41d4-a716-446655440000", "status": "RUNNING", "timestamp": "2025-01-16T10:30:00Z" }

Ancestors

Instance variables

var status : str

Inherited members

class CrawlUrlDiscoveredWebhook (event: str, uuid: str, timestamp: datetime.datetime, url: str, depth: int)
Expand source code
@dataclass
class CrawlUrlDiscoveredWebhook(CrawlerWebhookBase):
    """
    Typed payload for the crawl.url_discovered webhook event.

    Emitted each time the crawler finds a new URL.

    Fields added on top of the base payload:
    - url: The discovered URL
    - depth: Depth level of the URL from the starting URL

    Example payload:
    {
        "event": "crawl.url_discovered",
        "uuid": "550e8400-e29b-41d4-a716-446655440000",
        "url": "https://example.com/page",
        "depth": 1,
        "timestamp": "2025-01-16T10:30:05Z"
    }
    """
    url: str
    depth: int

    @classmethod
    def from_dict(cls, data: Dict) -> 'CrawlUrlDiscoveredWebhook':
        """Build a CrawlUrlDiscoveredWebhook from a raw payload dictionary."""
        # Shared fields come from the base parser; 'url' and 'depth' are extra.
        common = CrawlerWebhookBase.from_dict(data)
        return cls(
            event=common.event,
            uuid=common.uuid,
            timestamp=common.timestamp,
            url=data['url'],
            depth=data['depth'],
        )

Webhook payload for crawl.url_discovered event.

Sent when a new URL is discovered during crawling.

Additional fields: - url: The discovered URL - depth: Depth level of the URL from the starting URL

Example payload: { "event": "crawl.url_discovered", "uuid": "550e8400-e29b-41d4-a716-446655440000", "url": "https://example.com/page", "depth": 1, "timestamp": "2025-01-16T10:30:05Z" }

Ancestors

Instance variables

var depth : int
var url : str

Inherited members

class CrawlUrlFailedWebhook (event: str,
uuid: str,
timestamp: datetime.datetime,
url: str,
error: str,
status_code: int | None = None)
Expand source code
@dataclass
class CrawlUrlFailedWebhook(CrawlerWebhookBase):
    """
    Typed payload for the crawl.url_failed webhook event.

    Emitted when a URL could not be crawled.

    Fields added on top of the base payload:
    - url: The URL that failed
    - error: Error message describing the failure
    - status_code: HTTP status code if available (optional)

    Example payload:
    {
        "event": "crawl.url_failed",
        "uuid": "550e8400-e29b-41d4-a716-446655440000",
        "url": "https://example.com/page",
        "error": "HTTP 404 Not Found",
        "status_code": 404,
        "timestamp": "2025-01-16T10:30:10Z"
    }
    """
    url: str
    error: str
    status_code: Optional[int] = None

    @classmethod
    def from_dict(cls, data: Dict) -> 'CrawlUrlFailedWebhook':
        """Build a CrawlUrlFailedWebhook from a raw payload dictionary."""
        # Shared fields come from the base parser; the failure details are
        # extra, with status_code optional (absent for non-HTTP failures).
        common = CrawlerWebhookBase.from_dict(data)
        return cls(
            event=common.event,
            uuid=common.uuid,
            timestamp=common.timestamp,
            url=data['url'],
            error=data['error'],
            status_code=data.get('status_code'),
        )

Webhook payload for crawl.url_failed event.

Sent when a URL fails to be crawled.

Additional fields: - url: The URL that failed - error: Error message describing the failure - status_code: HTTP status code if available (optional)

Example payload: { "event": "crawl.url_failed", "uuid": "550e8400-e29b-41d4-a716-446655440000", "url": "https://example.com/page", "error": "HTTP 404 Not Found", "status_code": 404, "timestamp": "2025-01-16T10:30:10Z" }

Ancestors

Instance variables

var error : str
var status_code : int | None
var url : str

Inherited members

class CrawlerArtifactResponse (artifact_data: bytes, artifact_type: str = 'warc')
Expand source code
class CrawlerArtifactResponse:
    """
    Response from downloading crawler artifacts.

    Returned by ScrapflyClient.get_crawl_artifact() method.

    Gives high-level access to crawl results and parses WARC/HAR data on
    demand, so users do not need to understand either archive format.

    Example:
        ```python
        # Get WARC artifact (default)
        artifact = client.get_crawl_artifact(uuid)

        # Get HAR artifact
        artifact = client.get_crawl_artifact(uuid, artifact_type='har')

        # Easy mode: get all pages as dicts
        pages = artifact.get_pages()
        for page in pages:
            print(f"{page['url']}: {page['status_code']}")
            html = page['content'].decode('utf-8')

        # Memory-efficient: iterate one page at a time
        for record in artifact.iter_responses():
            print(f"{record.url}: {record.status_code}")
            process(record.content)

        # Save to file
        artifact.save('crawl_results.warc.gz')
        ```
    """

    def __init__(self, artifact_data: bytes, artifact_type: str = 'warc'):
        """
        Initialize from artifact data.

        Args:
            artifact_data: Raw artifact file bytes
            artifact_type: Type of artifact ('warc' or 'har')
        """
        self._artifact_data = artifact_data
        self._artifact_type = artifact_type
        # Parsers are created lazily on first access via the `parser` property.
        self._warc_parser: Optional[WarcParser] = None
        self._har_parser: Optional[HarArchive] = None

    @property
    def artifact_type(self) -> str:
        """Get artifact type ('warc' or 'har')"""
        return self._artifact_type

    @property
    def artifact_data(self) -> bytes:
        """Get raw artifact data (for advanced users)"""
        return self._artifact_data

    @property
    def warc_data(self) -> bytes:
        """Get raw WARC data (deprecated, use artifact_data)"""
        return self._artifact_data

    @property
    def parser(self) -> Union[WarcParser, HarArchive]:
        """Lazily build and cache the parser matching the artifact type."""
        if self._artifact_type == 'har':
            if self._har_parser is None:
                self._har_parser = HarArchive(self._artifact_data)
            return self._har_parser
        if self._warc_parser is None:
            self._warc_parser = parse_warc(self._artifact_data)
        return self._warc_parser

    def iter_records(self) -> Iterator[Union[WarcRecord, HarEntry]]:
        """
        Iterate through all records.

        For WARC: iterates through all WARC records
        For HAR: iterates through all HAR entries

        Yields:
            WarcRecord or HarEntry: Each record in the artifact
        """
        if self._artifact_type == 'har':
            return self.parser.iter_entries()
        return self.parser.iter_records()

    def iter_responses(self) -> Iterator[Union[WarcRecord, HarEntry]]:
        """
        Iterate through HTTP response records only.

        More memory-efficient than get_pages() for large crawls.

        For WARC: iterates through response records
        For HAR: iterates through all entries (HAR only contains responses)

        Yields:
            WarcRecord or HarEntry: HTTP response records with url, status_code, headers, content
        """
        if self._artifact_type == 'har':
            return self.parser.iter_entries()
        return self.parser.iter_responses()

    def get_pages(self) -> List[Dict]:
        """
        Get all crawled pages as simple dictionaries.

        The easiest way to access crawl results; works with both WARC
        and HAR formats.

        Returns:
            List of dicts with keys: url, status_code, headers, content

        Example:
            ```python
            pages = artifact.get_pages()
            for page in pages:
                print(f"{page['url']}: {len(page['content'])} bytes")
                html = page['content'].decode('utf-8')
            ```
        """
        if self._artifact_type == 'har':
            # HAR entries are normalized into the same dict shape the
            # WARC parser produces.
            return [
                {
                    'url': entry.url,
                    'status_code': entry.status_code,
                    'headers': entry.response_headers,
                    'content': entry.content,
                }
                for entry in self.parser.iter_entries()
            ]
        return self.parser.get_pages()

    @property
    def total_pages(self) -> int:
        """Get total number of pages in the artifact"""
        return len(self.get_pages())

    def save(self, filepath: str):
        """
        Save the raw artifact data to a file.

        Args:
            filepath: Path to save the artifact file

        Example:
            ```python
            artifact.save('crawl_results.warc.gz')
            ```
        """
        with open(filepath, 'wb') as out:
            out.write(self.warc_data)

    def __repr__(self):
        return f"CrawlerArtifactResponse(size={len(self.warc_data)} bytes)"

Response from downloading crawler artifacts

Returned by ScrapflyClient.get_crawl_artifact() method.

Provides high-level access to crawl results with automatic WARC/HAR parsing. Users don't need to understand WARC or HAR format to use this class.

Example

# Get WARC artifact (default)
artifact = client.get_crawl_artifact(uuid)

# Get HAR artifact
artifact = client.get_crawl_artifact(uuid, artifact_type='har')

# Easy mode: get all pages as dicts
pages = artifact.get_pages()
for page in pages:
    print(f"{page['url']}: {page['status_code']}")
    html = page['content'].decode('utf-8')

# Memory-efficient: iterate one page at a time
for record in artifact.iter_responses():
    print(f"{record.url}: {record.status_code}")
    process(record.content)

# Save to file
artifact.save('crawl_results.warc.gz')

Initialize from artifact data

Args

artifact_data
Raw artifact file bytes
artifact_type
Type of artifact ('warc' or 'har')

Instance variables

prop artifact_data : bytes
Expand source code
@property
def artifact_data(self) -> bytes:
    """Get raw artifact data (for advanced users)"""
    return self._artifact_data

Get raw artifact data (for advanced users)

prop artifact_type : str
Expand source code
@property
def artifact_type(self) -> str:
    """Get artifact type ('warc' or 'har')"""
    return self._artifact_type

Get artifact type ('warc' or 'har')

prop parser : WarcParser | HarArchive
Expand source code
@property
def parser(self) -> Union[WarcParser, HarArchive]:
    """Get artifact parser instance (lazy-loaded)"""
    if self._artifact_type == 'har':
        if self._har_parser is None:
            self._har_parser = HarArchive(self._artifact_data)
        return self._har_parser
    else:
        if self._warc_parser is None:
            self._warc_parser = parse_warc(self._artifact_data)
        return self._warc_parser

Get artifact parser instance (lazy-loaded)

prop total_pages : int
Expand source code
@property
def total_pages(self) -> int:
    """Get total number of pages in the artifact"""
    return len(self.get_pages())

Get total number of pages in the artifact

prop warc_data : bytes
Expand source code
@property
def warc_data(self) -> bytes:
    """Get raw WARC data (deprecated, use artifact_data)"""
    return self._artifact_data

Get raw WARC data (deprecated, use artifact_data)

Methods

def get_pages(self) ‑> List[Dict]
Expand source code
def get_pages(self) -> List[Dict]:
    """
    Get all crawled pages as simple dictionaries

    This is the easiest way to access crawl results.
    Works with both WARC and HAR formats.

    Returns:
        List of dicts with keys: url, status_code, headers, content

    Example:
        ```python
        pages = artifact.get_pages()
        for page in pages:
            print(f"{page['url']}: {len(page['content'])} bytes")
            html = page['content'].decode('utf-8')
        ```
    """
    if self._artifact_type == 'har':
        # Convert HAR entries to page dicts
        pages = []
        for entry in self.parser.iter_entries():
            pages.append({
                'url': entry.url,
                'status_code': entry.status_code,
                'headers': entry.response_headers,
                'content': entry.content
            })
        return pages
    else:
        return self.parser.get_pages()

Get all crawled pages as simple dictionaries

This is the easiest way to access crawl results. Works with both WARC and HAR formats.

Returns

List of dicts with keys: url, status_code, headers, content

Example

pages = artifact.get_pages()
for page in pages:
    print(f"{page['url']}: {len(page['content'])} bytes")
    html = page['content'].decode('utf-8')
def iter_records(self) ‑> Iterator[WarcRecord | HarEntry]
Expand source code
def iter_records(self) -> Iterator[Union[WarcRecord, HarEntry]]:
    """
    Iterate through all records

    For WARC: iterates through all WARC records
    For HAR: iterates through all HAR entries

    Yields:
        WarcRecord or HarEntry: Each record in the artifact
    """
    if self._artifact_type == 'har':
        return self.parser.iter_entries()
    else:
        return self.parser.iter_records()

Iterate through all records

For WARC: iterates through all WARC records For HAR: iterates through all HAR entries

Yields

WarcRecord or HarEntry
Each record in the artifact
def iter_responses(self) ‑> Iterator[WarcRecord | HarEntry]
Expand source code
def iter_responses(self) -> Iterator[Union[WarcRecord, HarEntry]]:
    """
    Iterate through HTTP response records only

    This is more memory-efficient than get_pages() for large crawls.

    For WARC: iterates through response records
    For HAR: iterates through all entries (HAR only contains responses)

    Yields:
        WarcRecord or HarEntry: HTTP response records with url, status_code, headers, content
    """
    if self._artifact_type == 'har':
        return self.parser.iter_entries()
    else:
        return self.parser.iter_responses()

Iterate through HTTP response records only

This is more memory-efficient than get_pages() for large crawls.

For WARC: iterates through response records For HAR: iterates through all entries (HAR only contains responses)

Yields

WarcRecord or HarEntry
HTTP response records with url, status_code, headers, content
def save(self, filepath: str)
Expand source code
def save(self, filepath: str):
    """
    Save WARC data to file

    Args:
        filepath: Path to save the WARC file

    Example:
        ```python
        artifact.save('crawl_results.warc.gz')
        ```
    """
    with open(filepath, 'wb') as f:
        f.write(self.warc_data)

Save WARC data to file

Args

filepath
Path to save the WARC file

Example

artifact.save('crawl_results.warc.gz')
class CrawlerConfig (url: str,
page_limit: int | None = None,
max_depth: int | None = None,
max_duration: int | None = None,
exclude_paths: List[str] | None = None,
include_only_paths: List[str] | None = None,
ignore_base_path_restriction: bool = False,
follow_external_links: bool = False,
allowed_external_domains: List[str] | None = None,
headers: Dict[str, str] | None = None,
delay: int | None = None,
user_agent: str | None = None,
max_concurrency: int | None = None,
rendering_delay: int | None = None,
use_sitemaps: bool = False,
respect_robots_txt: bool = False,
ignore_no_follow: bool = False,
cache: bool = False,
cache_ttl: int | None = None,
cache_clear: bool = False,
content_formats: List[Literal['html', 'markdown', 'text', 'clean_html']] | None = None,
extraction_rules: Dict | None = None,
asp: bool = False,
proxy_pool: str | None = None,
country: str | None = None,
webhook_name: str | None = None,
webhook_events: List[str] | None = None,
max_api_credit: int | None = None)
Expand source code
class CrawlerConfig(BaseApiConfig):
    """
    Configuration for Scrapfly Crawler API

    The Crawler API performs recursive website crawling with advanced
    configuration, content extraction, and artifact storage.

    Example:
        ```python
        from scrapfly import ScrapflyClient, CrawlerConfig
        client = ScrapflyClient(key='YOUR_API_KEY')
        config = CrawlerConfig(
            url='https://example.com',
            page_limit=100,
            max_depth=3,
            content_formats=['markdown', 'html']
        )

        # Start crawl
        start_response = client.start_crawl(config)
        uuid = start_response.uuid

        # Poll status
        status = client.get_crawl_status(uuid)

        # Get results when complete
        if status.is_complete:
            artifact = client.get_crawl_artifact(uuid)
            pages = artifact.get_pages()
        ```
    """

    WEBHOOK_CRAWLER_STARTED = 'crawler_started'
    WEBHOOK_CRAWLER_URL_VISITED = 'crawler_url_visited'
    WEBHOOK_CRAWLER_URL_SKIPPED = 'crawler_url_skipped'
    WEBHOOK_CRAWLER_URL_DISCOVERED = 'crawler_url_discovered'
    WEBHOOK_CRAWLER_URL_FAILED = 'crawler_url_failed'
    WEBHOOK_CRAWLER_STOPPED = 'crawler_stopped'
    WEBHOOK_CRAWLER_CANCELLED = 'crawler_cancelled'
    WEBHOOK_CRAWLER_FINISHED = 'crawler_finished'

    ALL_WEBHOOK_EVENTS = [
        WEBHOOK_CRAWLER_STARTED,
        WEBHOOK_CRAWLER_URL_VISITED,
        WEBHOOK_CRAWLER_URL_SKIPPED,
        WEBHOOK_CRAWLER_URL_DISCOVERED,
        WEBHOOK_CRAWLER_URL_FAILED,
        WEBHOOK_CRAWLER_STOPPED,
        WEBHOOK_CRAWLER_CANCELLED,
        WEBHOOK_CRAWLER_FINISHED,
    ]

    def __init__(
        self,
        url: str,
        # Crawl limits
        page_limit: Optional[int] = None,
        max_depth: Optional[int] = None,
        max_duration: Optional[int] = None,

        # Path filtering (mutually exclusive)
        exclude_paths: Optional[List[str]] = None,
        include_only_paths: Optional[List[str]] = None,

        # Advanced crawl options
        ignore_base_path_restriction: bool = False,
        follow_external_links: bool = False,
        allowed_external_domains: Optional[List[str]] = None,

        # Request configuration
        headers: Optional[Dict[str, str]] = None,
        delay: Optional[int] = None,
        user_agent: Optional[str] = None,
        max_concurrency: Optional[int] = None,
        rendering_delay: Optional[int] = None,

        # Crawl strategy options
        use_sitemaps: bool = False,
        respect_robots_txt: bool = False,
        ignore_no_follow: bool = False,

        # Cache options
        cache: bool = False,
        cache_ttl: Optional[int] = None,
        cache_clear: bool = False,

        # Content extraction
        content_formats: Optional[List[Literal['html', 'markdown', 'text', 'clean_html']]] = None,
        extraction_rules: Optional[Dict] = None,

        # Web scraping features
        asp: bool = False,
        proxy_pool: Optional[str] = None,
        country: Optional[str] = None,

        # Webhook integration
        webhook_name: Optional[str] = None,
        webhook_events: Optional[List[str]] = None,

        # Cost control
        max_api_credit: Optional[int] = None
    ):
        """
        Initialize a CrawlerConfig

        Args:
            url: Starting URL for the crawl (required)
            page_limit: Maximum number of pages to crawl
            max_depth: Maximum crawl depth from starting URL
            max_duration: Maximum crawl duration in seconds

            exclude_paths: List of path patterns to exclude (mutually exclusive with include_only_paths)
            include_only_paths: List of path patterns to include only (mutually exclusive with exclude_paths)

            ignore_base_path_restriction: Allow crawling outside the base path
            follow_external_links: Follow links to external domains
            allowed_external_domains: List of external domains allowed when follow_external_links is True

            headers: Custom HTTP headers for requests
            delay: Delay between requests in milliseconds
            user_agent: Custom user agent string
            max_concurrency: Maximum concurrent requests
            rendering_delay: Delay for JavaScript rendering in milliseconds

            use_sitemaps: Use sitemap.xml to discover URLs
            respect_robots_txt: Respect robots.txt rules
            ignore_no_follow: Ignore rel="nofollow" attributes

            cache: Enable caching
            cache_ttl: Cache time-to-live in seconds
            cache_clear: Clear cache before crawling

            content_formats: List of content formats to extract ('html', 'markdown', 'text', 'clean_html')
            extraction_rules: Custom extraction rules

            asp: Enable Anti-Scraping Protection bypass
            proxy_pool: Proxy pool to use (e.g., 'public_residential_pool')
            country: Target country for geo-located content

            webhook_name: Webhook name for event notifications
            webhook_events: List of webhook events to trigger

            max_api_credit: Maximum API credits to spend on this crawl

        Raises:
            ValueError: If both exclude_paths and include_only_paths are given,
                or if webhook_events contains an unknown event name.
        """
        if exclude_paths and include_only_paths:
            raise ValueError("exclude_paths and include_only_paths are mutually exclusive")

        # Only explicitly-set options are sent to the API; unset values are
        # omitted so the server applies its own defaults.
        params = {
            'url': url,
        }

        # Crawl limits
        if page_limit is not None:
            params['page_limit'] = page_limit
        if max_depth is not None:
            params['max_depth'] = max_depth
        if max_duration is not None:
            params['max_duration'] = max_duration

        # Path filtering
        if exclude_paths:
            params['exclude_paths'] = exclude_paths
        if include_only_paths:
            params['include_only_paths'] = include_only_paths

        # Advanced options
        if ignore_base_path_restriction:
            params['ignore_base_path_restriction'] = True
        if follow_external_links:
            params['follow_external_links'] = True
        if allowed_external_domains:
            params['allowed_external_domains'] = allowed_external_domains

        # Request configuration
        if headers:
            params['headers'] = headers
        if delay is not None:
            params['delay'] = delay
        if user_agent:
            params['user_agent'] = user_agent
        if max_concurrency is not None:
            params['max_concurrency'] = max_concurrency
        if rendering_delay is not None:
            params['rendering_delay'] = rendering_delay

        # Crawl strategy
        if use_sitemaps:
            params['use_sitemaps'] = True
        if respect_robots_txt:
            params['respect_robots_txt'] = True
        if ignore_no_follow:
            params['ignore_no_follow'] = True

        # Cache
        if cache:
            params['cache'] = True
        if cache_ttl is not None:
            params['cache_ttl'] = cache_ttl
        if cache_clear:
            params['cache_clear'] = True

        # Content extraction
        if content_formats:
            params['content_formats'] = content_formats
        if extraction_rules:
            params['extraction_rules'] = extraction_rules

        # Web scraping features
        if asp:
            params['asp'] = True
        if proxy_pool:
            params['proxy_pool'] = proxy_pool
        if country:
            params['country'] = country

        # Webhooks
        if webhook_name:
            params['webhook_name'] = webhook_name

        if webhook_events:
            # Validate with an explicit raise rather than `assert`: asserts
            # are stripped when Python runs with -O, which would silently
            # disable this input validation. ValueError also matches the
            # mutually-exclusive-paths check above.
            invalid_events = [event for event in webhook_events if event not in self.ALL_WEBHOOK_EVENTS]
            if invalid_events:
                raise ValueError(
                    f"Invalid webhook events. Valid events are: {self.ALL_WEBHOOK_EVENTS}"
                )

            params['webhook_events'] = webhook_events

        # Cost control
        if max_api_credit is not None:
            params['max_api_credit'] = max_api_credit

        self._params = params

    def to_api_params(self, key: Optional[str] = None) -> Dict:
        """
        Convert config to API parameters

        :param key: API key (optional, can be added by client)
        :return: Dictionary of API parameters
        """
        params = self._params.copy()
        if key:
            params['key'] = key
        return params

Configuration for Scrapfly Crawler API

The Crawler API performs recursive website crawling with advanced configuration, content extraction, and artifact storage.

Example

from scrapfly import ScrapflyClient, CrawlerConfig
client = ScrapflyClient(key='YOUR_API_KEY')
config = CrawlerConfig(
    url='https://example.com',
    page_limit=100,
    max_depth=3,
    content_formats=['markdown', 'html']
)

# Start crawl
start_response = client.start_crawl(config)
uuid = start_response.uuid

# Poll status
status = client.get_crawl_status(uuid)

# Get results when complete
if status.is_complete:
    artifact = client.get_crawl_artifact(uuid)
    pages = artifact.get_pages()

Initialize a CrawlerConfig

Args

url
Starting URL for the crawl (required)
page_limit
Maximum number of pages to crawl
max_depth
Maximum crawl depth from starting URL
max_duration
Maximum crawl duration in seconds
exclude_paths
List of path patterns to exclude (mutually exclusive with include_only_paths)
include_only_paths
List of path patterns to include only (mutually exclusive with exclude_paths)
ignore_base_path_restriction
Allow crawling outside the base path
follow_external_links
Follow links to external domains
allowed_external_domains
List of external domains allowed when follow_external_links is True
headers
Custom HTTP headers for requests
delay
Delay between requests in milliseconds
user_agent
Custom user agent string
max_concurrency
Maximum concurrent requests
rendering_delay
Delay for JavaScript rendering in milliseconds
use_sitemaps
Use sitemap.xml to discover URLs
respect_robots_txt
Respect robots.txt rules
ignore_no_follow
Ignore rel="nofollow" attributes
cache
Enable caching
cache_ttl
Cache time-to-live in seconds
cache_clear
Clear cache before crawling
content_formats
List of content formats to extract ('html', 'markdown', 'text', 'clean_html')
extraction_rules
Custom extraction rules
asp
Enable Anti-Scraping Protection bypass
proxy_pool
Proxy pool to use (e.g., 'public_residential_pool')
country
Target country for geo-located content
webhook_name
Webhook name for event notifications
webhook_events
List of webhook events to trigger
max_api_credit
Maximum API credits to spend on this crawl

Ancestors

Class variables

var ALL_WEBHOOK_EVENTS
var WEBHOOK_CRAWLER_CANCELLED
var WEBHOOK_CRAWLER_FINISHED
var WEBHOOK_CRAWLER_STARTED
var WEBHOOK_CRAWLER_STOPPED
var WEBHOOK_CRAWLER_URL_DISCOVERED
var WEBHOOK_CRAWLER_URL_FAILED
var WEBHOOK_CRAWLER_URL_SKIPPED
var WEBHOOK_CRAWLER_URL_VISITED

Methods

def to_api_params(self, key: str | None = None) ‑> Dict
Expand source code
def to_api_params(self, key: Optional[str] = None) -> Dict:
    """
    Build the dictionary of API parameters for this config.

    :param key: API key (optional; the client may inject it later)
    :return: Dictionary of API parameters
    """
    params = dict(self._params)
    if key:
        params['key'] = key
    return params

Convert config to API parameters

:param key: API key (optional, can be added by client)
:return: Dictionary of API parameters

class CrawlerError (message: str,
code: str,
http_status_code: int,
resource: str | None = None,
is_retryable: bool = False,
retry_delay: int | None = None,
retry_times: int | None = None,
documentation_url: str | None = None,
api_response: ForwardRef('ApiResponse') | None = None)
Expand source code
class CrawlerError(ScrapflyError):
    """Base exception for Crawler API errors."""

Base exception for Crawler API errors

Ancestors

Subclasses

class CrawlerStartResponse (response_data: Dict[str, Any])
Expand source code
class CrawlerStartResponse:
    """
    Response returned when a crawler job is started.

    Produced by ScrapflyClient.start_crawl().

    Attributes:
        uuid: Unique identifier for the crawler job
        status: Initial status (typically 'PENDING')
    """

    def __init__(self, response_data: Dict[str, Any]):
        """
        Build the response wrapper from the raw API payload.

        Args:
            response_data: Raw API response dictionary
        """
        self._data = response_data
        # The API reports the identifier as 'crawler_uuid'; fall back to 'uuid'.
        self.uuid = response_data.get('crawler_uuid') or response_data.get('uuid')
        self.status = response_data.get('status')

    def __repr__(self):
        return "CrawlerStartResponse(uuid=%s, status=%s)" % (self.uuid, self.status)

Response from starting a crawler job

Returned by ScrapflyClient.start_crawl() method.

Attributes

uuid
Unique identifier for the crawler job
status
Initial status (typically 'PENDING')

Initialize from API response

Args

response_data
Raw API response dictionary
class CrawlerStatusResponse (response_data: Dict[str, Any])
Expand source code
class CrawlerStatusResponse:
    """
    Snapshot of a crawler job's status and progress.

    Produced by ScrapflyClient.get_crawl_status().

    Attributes:
        uuid: Crawler job UUID
        status: Current status (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED, DONE)
        is_success: Whether the crawler job completed successfully
        is_finished: Whether the crawler job has finished (regardless of outcome)
        api_credit_cost: Total API credits consumed by this crawl
        stop_reason: Why the crawler stopped (e.g. 'seed_url_failed'), None if still running
        urls_discovered: Total URLs discovered so far
        urls_crawled: Number of URLs successfully crawled
        urls_pending: Number of URLs waiting to be crawled
        urls_failed: Number of URLs that failed to crawl
    """

    # Status constants
    STATUS_PENDING = 'PENDING'
    STATUS_RUNNING = 'RUNNING'
    STATUS_COMPLETED = 'COMPLETED'
    STATUS_DONE = 'DONE'
    STATUS_FAILED = 'FAILED'
    STATUS_CANCELLED = 'CANCELLED'

    def __init__(self, response_data: Dict[str, Any]):
        """
        Build the status wrapper from the raw API payload.

        Args:
            response_data: Raw API response dictionary
        """
        self._data = response_data
        # Status responses carry the identifier as 'crawler_uuid'; fall back to 'uuid'.
        self.uuid = response_data.get('crawler_uuid') or response_data.get('uuid')
        self.status = response_data.get('status')
        self.is_success = response_data.get('is_success', False)
        self.is_finished = response_data.get('is_finished', False)

        state = response_data.get('state', {})
        if state:
            # Actual API response structure: counters live under 'state'
            # with different field names than the attributes we expose.
            source, field_map = state, (
                ('urls_discovered', 'urls_extracted'),
                ('urls_crawled', 'urls_visited'),
                ('urls_pending', 'urls_to_crawl'),
                ('urls_failed', 'urls_failed'),
            )
            self.stop_reason = state.get('stop_reason')
            self.api_credit_cost = state.get('api_credit_used', 0)
        else:
            # Fallback for a flatter payload shape.
            source, field_map = response_data, (
                ('urls_discovered', 'urls_discovered'),
                ('urls_crawled', 'urls_crawled'),
                ('urls_pending', 'urls_pending'),
                ('urls_failed', 'urls_failed'),
            )
            self.stop_reason = None
            self.api_credit_cost = response_data.get('api_credit_cost', 0)

        for attr, field in field_map:
            setattr(self, attr, source.get(field, 0))

    @property
    def is_complete(self) -> bool:
        """Check if crawler job is complete."""
        return self.status in {self.STATUS_COMPLETED, self.STATUS_DONE}

    @property
    def is_running(self) -> bool:
        """Check if crawler job is pending or actively running."""
        return self.status in {self.STATUS_PENDING, self.STATUS_RUNNING}

    @property
    def is_failed(self) -> bool:
        """Check if crawler job failed."""
        return self.STATUS_FAILED == self.status

    @property
    def is_cancelled(self) -> bool:
        """Check if crawler job was cancelled."""
        return self.STATUS_CANCELLED == self.status

    @property
    def progress_pct(self) -> float:
        """
        Progress as a percentage (0-100).

        Returns 0.0 when nothing has been discovered yet, avoiding a
        division by zero.
        """
        discovered = self.urls_discovered
        if discovered == 0:
            return 0.0
        return (self.urls_crawled / discovered) * 100

    def __repr__(self):
        return (
            f"CrawlerStatusResponse(uuid={self.uuid}, status={self.status}, "
            f"progress={self.progress_pct:.1f}%, "
            f"crawled={self.urls_crawled}/{self.urls_discovered})"
        )

Response from checking crawler job status

Returned by ScrapflyClient.get_crawl_status() method.

Provides real-time progress tracking for crawler jobs.

Attributes

uuid
Crawler job UUID
status
Current status (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED, DONE)
is_success
Whether the crawler job completed successfully
is_finished
Whether the crawler job has finished (regardless of success/failure)
api_credit_cost
Total API credits consumed by this crawl
stop_reason
Reason why the crawler stopped (e.g., 'seed_url_failed', 'page_limit_reached'), None if still running
urls_discovered
Total URLs discovered so far
urls_crawled
Number of URLs successfully crawled
urls_pending
Number of URLs waiting to be crawled
urls_failed
Number of URLs that failed to crawl

Initialize from API response

Args

response_data
Raw API response dictionary

Class variables

var STATUS_CANCELLED
var STATUS_COMPLETED
var STATUS_DONE
var STATUS_FAILED
var STATUS_PENDING
var STATUS_RUNNING

Instance variables

prop is_cancelled : bool
Expand source code
@property
def is_cancelled(self) -> bool:
    """Return True when the job's status is the CANCELLED state."""
    return self.STATUS_CANCELLED == self.status

Check if crawler job was cancelled

prop is_complete : bool
Expand source code
@property
def is_complete(self) -> bool:
    """Return True once the job reached COMPLETED or DONE."""
    return self.status in {self.STATUS_COMPLETED, self.STATUS_DONE}

Check if crawler job is complete

prop is_failed : bool
Expand source code
@property
def is_failed(self) -> bool:
    """Return True when the job's status is the FAILED state."""
    return self.STATUS_FAILED == self.status

Check if crawler job failed

prop is_running : bool
Expand source code
@property
def is_running(self) -> bool:
    """Return True while the job is pending or actively running."""
    return self.status in {self.STATUS_PENDING, self.STATUS_RUNNING}

Check if crawler job is currently running

prop progress_pct : float
Expand source code
@property
def progress_pct(self) -> float:
    """
    Progress as a percentage.

    Returns:
        Percentage of discovered URLs already crawled (0-100);
        0.0 when nothing has been discovered yet (avoids division by zero).
    """
    discovered = self.urls_discovered
    if discovered == 0:
        return 0.0
    return (self.urls_crawled / discovered) * 100

Calculate progress percentage

Returns

Progress as percentage (0-100)

class CrawlerWebhookBase (event: str, uuid: str, timestamp: datetime.datetime)
Expand source code
@dataclass
class CrawlerWebhookBase:
    """
    Common base for every crawler webhook payload.

    Shared fields:
    - event: The event type (crawl.started, crawl.url_discovered, etc.)
    - uuid: The crawler job UUID
    - timestamp: When the event occurred (ISO 8601 format)
    """
    event: str
    uuid: str
    timestamp: datetime

    @classmethod
    def from_dict(cls, data: Dict) -> 'CrawlerWebhookBase':
        """Build a webhook instance from a raw dictionary payload."""
        raw_ts = data.get('timestamp')
        if isinstance(raw_ts, str):
            # Normalize a trailing 'Z' (UTC designator) to an explicit
            # offset so datetime.fromisoformat accepts it.
            normalized = raw_ts[:-1] + '+00:00' if raw_ts.endswith('Z') else raw_ts
            raw_ts = datetime.fromisoformat(normalized)

        return cls(event=data['event'], uuid=data['uuid'], timestamp=raw_ts)

Base class for all crawler webhook payloads.

All webhook events share these common fields:

- event: The event type (crawl.started, crawl.url_discovered, etc.)
- uuid: The crawler job UUID
- timestamp: When the event occurred (ISO 8601 format)

Subclasses

Static methods

def from_dict(data: Dict) ‑> CrawlerWebhookBase

Create webhook instance from dictionary payload

Instance variables

var event : str
var timestamp : datetime.datetime
var uuid : str
class CrawlerWebhookEvent (value, names=None, *, module=None, qualname=None, type=None, start=1)
Expand source code
class CrawlerWebhookEvent(Enum):
    """Enumeration of crawler webhook event types."""

    STARTED = 'crawl.started'
    URL_DISCOVERED = 'crawl.url_discovered'
    URL_FAILED = 'crawl.url_failed'
    COMPLETED = 'crawl.completed'

Crawler webhook event types

Ancestors

  • enum.Enum

Class variables

var COMPLETED
var STARTED
var URL_DISCOVERED
var URL_FAILED
class EncoderError (content: str)
Expand source code
class EncoderError(Exception):
    """
    Raised when a payload cannot be encoded.

    Attributes:
        content: Description of the invalid payload.
    """

    def __init__(self, content: str):
        self.content = content
        # Derive from Exception (not BaseException, per PEP 8) and pass the
        # message to the base class so e.args, pickling and generic logging
        # all see it — the previous super().__init__() dropped it.
        super().__init__(content)

    def __str__(self) -> str:
        return self.content

    def __repr__(self):
        return "Invalid payload: %s" % self.content

Common base class for all exceptions

Ancestors

  • builtins.BaseException
class ErrorFactory
Expand source code
class ErrorFactory:
    """Translate raw API error responses into typed Scrapfly exceptions."""

    # Maps the resource part of an error code (ERR::<resource>::...) to the
    # dedicated exception class for that resource.
    RESOURCE_TO_ERROR = {
        ScrapflyError.RESOURCE_SCRAPE: ScrapflyScrapeError,
        ScrapflyError.RESOURCE_WEBHOOK: ScrapflyWebhookError,
        ScrapflyError.RESOURCE_PROXY: ScrapflyProxyError,
        ScrapflyError.RESOURCE_SCHEDULE: ScrapflyScheduleError,
        ScrapflyError.RESOURCE_ASP: ScrapflyAspError,
        ScrapflyError.RESOURCE_SESSION: ScrapflySessionError
    }

    # Notable http error has own class for more convenience
    # Only applicable for generic API error
    HTTP_STATUS_TO_ERROR = {
        401: BadApiKeyError,
        402: PaymentRequired,
        429: TooManyRequest
    }

    @staticmethod
    def _get_resource(code: str) -> Optional[str]:
        """
        Extract the resource segment from an error code.

        :param code: Error code shaped like 'ERR::<resource>::<detail>'
        :return: The resource segment, or None when the code has no resource part

        Note: the return annotation previously claimed Tuple[str, str] but the
        function has always returned a single string; the annotation is fixed.
        """
        if isinstance(code, str) and '::' in code:
            _, resource, _ = code.split('::')
            return resource

        return None

    @staticmethod
    def create(api_response: 'ScrapeApiResponse'):
        """
        Build the most specific exception for a failed API response.

        :param api_response: Response wrapper carrying the error payload
        :return: An instance of the matching ScrapflyError subclass
        """
        is_retryable = False
        kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
        http_code = api_response.status_code
        retry_delay = 5
        retry_times = 3
        description = None
        error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
        code = api_response.error['code']

        if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
            # Report the upstream website's status code rather than the API's.
            http_code = api_response.scrape_result['status_code']

        if 'description' in api_response.error:
            description = api_response.error['description']

        message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

        if 'doc_url' in api_response.error:
            error_url = api_response.error['doc_url']

        if 'retryable' in api_response.error:
            is_retryable = api_response.error['retryable']

        resource = ErrorFactory._get_resource(code=code)

        if is_retryable is True:
            # Bug fix: the previous code tested for an 'X-Retry' header but
            # then read 'Retry-After', raising KeyError whenever only
            # 'X-Retry' was present. Test and read the same header.
            if 'Retry-After' in api_response.headers:
                retry_delay = int(api_response.headers['Retry-After'])

        message = '%s: %s' % (message, description) if description else message

        if retry_delay is not None and is_retryable is True:
            message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

        args = {
            'message': message,
            'code': code,
            'http_status_code': http_code,
            'is_retryable': is_retryable,
            'api_response': api_response,
            'resource': resource,
            'retry_delay': retry_delay,
            'retry_times': retry_times,
            'documentation_url': error_url,
            'request': api_response.request,
            'response': api_response.response
        }

        if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
            # Transport-level failure: server error, notable client error,
            # resource-specific error, then generic client error.
            if http_code >= 500:
                return ApiHttpServerError(**args)

            is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

            if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
                return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

            if is_scraper_api_error:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ApiHttpClientError(**args)

        elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
            # API-level error: distinguish upstream website failures from
            # resource-specific errors, else fall back to the base error.
            if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
                if http_code >= 500:
                    return UpstreamHttpServerError(**args)

                if http_code >= 400:
                    return UpstreamHttpClientError(**args)

            if resource in ErrorFactory.RESOURCE_TO_ERROR:
                return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

            return ScrapflyError(**args)

Class variables

var HTTP_STATUS_TO_ERROR
var RESOURCE_TO_ERROR

Static methods

def create(api_response: ScrapeApiResponse)
Expand source code
@staticmethod
def create(api_response: 'ScrapeApiResponse'):
    """
    Build the most specific exception for a failed API response.

    :param api_response: Response wrapper carrying the error payload
    :return: An instance of the matching ScrapflyError subclass
    """
    is_retryable = False
    kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR
    http_code = api_response.status_code
    retry_delay = 5
    retry_times = 3
    description = None
    error_url = 'https://scrapfly.io/docs/scrape-api/errors#api'
    code = api_response.error['code']

    if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
        # Report the upstream website's status code rather than the API's.
        http_code = api_response.scrape_result['status_code']

    if 'description' in api_response.error:
        description = api_response.error['description']

    message = '%s %s %s' % (str(http_code), code, api_response.error['message'])

    if 'doc_url' in api_response.error:
        error_url = api_response.error['doc_url']

    if 'retryable' in api_response.error:
        is_retryable = api_response.error['retryable']

    resource = ErrorFactory._get_resource(code=code)

    if is_retryable is True:
        # Bug fix: the previous code tested for an 'X-Retry' header but then
        # read 'Retry-After', raising KeyError whenever only 'X-Retry' was
        # present. Test and read the same header.
        if 'Retry-After' in api_response.headers:
            retry_delay = int(api_response.headers['Retry-After'])

    message = '%s: %s' % (message, description) if description else message

    if retry_delay is not None and is_retryable is True:
        message = '%s. Retry delay : %s seconds' % (message, str(retry_delay))

    args = {
        'message': message,
        'code': code,
        'http_status_code': http_code,
        'is_retryable': is_retryable,
        'api_response': api_response,
        'resource': resource,
        'retry_delay': retry_delay,
        'retry_times': retry_times,
        'documentation_url': error_url,
        'request': api_response.request,
        'response': api_response.response
    }

    if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE:
        # Transport-level failure: server error, notable client error,
        # resource-specific error, then generic client error.
        if http_code >= 500:
            return ApiHttpServerError(**args)

        is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR

        if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error:
            return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args)

        if is_scraper_api_error:
            return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

        return ApiHttpClientError(**args)

    elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR:
        # API-level error: distinguish upstream website failures from
        # resource-specific errors, else fall back to the base error.
        if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE':
            if http_code >= 500:
                return UpstreamHttpServerError(**args)

            if http_code >= 400:
                return UpstreamHttpClientError(**args)

        if resource in ErrorFactory.RESOURCE_TO_ERROR:
            return ErrorFactory.RESOURCE_TO_ERROR[resource](**args)

        return ScrapflyError(**args)
class ExtractionAPIError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ExtractionAPIError(HttpError):
    """HTTP-level error raised for failed Extraction API calls."""

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ExtractionApiResponse (request: requests.models.Request,
response: requests.models.Response,
extraction_config: ExtractionConfig,
api_result: bytes | None = None)
Expand source code
class ExtractionApiResponse(ApiResponse):
    """Wrapper around an Extraction API HTTP response.

    Freezes the raw API payload and exposes the extracted data, its
    content type and any error information as properties.
    """

    def __init__(self, request: Request, response: Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None):
        # Keep the originating config so callers can correlate the response
        # with the request parameters that produced it.
        super().__init__(request, response)
        self.extraction_config = extraction_config
        self.result = self.handle_api_result(api_result)

    @property
    def extraction_result(self) -> Optional[Dict]:
        """The 'result' payload; empty payloads yield a None-valued placeholder dict."""
        extraction_result = self.result.get('result', None)
        if not extraction_result:  # handle empty extraction responses
            return {'data': None, 'content_type': None}
        else:
            return extraction_result

    @property
    def data(self) -> Union[Dict, List, str]:  # depends on the LLM prompt
        """Extracted payload, or None when the response carries an error."""
        if self.error is None:
            return self.extraction_result['data']

        return None

    @property
    def content_type(self) -> Optional[str]:
        """Content type of the extracted data, or None when the response errored."""
        if self.error is None:
            return self.extraction_result['content_type']

        return None

    @property
    def extraction_success(self) -> bool:
        """True when an extraction result with non-None data is present."""
        extraction_result = self.extraction_result
        if extraction_result is None or extraction_result['data'] is None:
            return False

        return True

    @property
    def error(self) -> Optional[Dict]:
        """Raw error payload when no extraction result exists, else None.

        NOTE(review): extraction_result never actually returns None (it
        substitutes a {'data': None, 'content_type': None} placeholder), so
        this property appears to always return None — confirm whether error
        payloads are meant to be surfaced here or only via raise_for_result.
        """
        if self.extraction_result is None:
            return self.result

        return None

    def _is_api_error(self, api_result: Dict) -> bool:
        # Error payloads are identified by the presence of 'error_id';
        # a missing payload is also treated as an error.
        if api_result is None:
            return True

        return 'error_id' in api_result

    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        """Freeze the payload; successful results are nested under 'result'."""
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        return FrozenDict({'result': api_result})

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
        """Delegate to the parent check, raising ExtractionAPIError by default."""
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Ancestors

Instance variables

prop content_type : str | None
Expand source code
@property
def content_type(self) -> Optional[str]:
    """Content type of the extracted data, or None when the response errored."""
    if self.error is not None:
        return None
    return self.extraction_result['content_type']
prop data : Dict | List | str
Expand source code
@property
def data(self) -> Union[Dict, List, str]:  # shape depends on the LLM prompt
    """Extracted payload, or None when the response carries an error."""
    if self.error is not None:
        return None
    return self.extraction_result['data']
prop error : Dict | None
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Raw result payload when no extraction result is present, else None."""
    if self.extraction_result is not None:
        return None
    return self.result
prop extraction_result : Dict | None
Expand source code
@property
def extraction_result(self) -> Optional[Dict]:
    """The 'result' payload; missing/empty payloads yield a None-valued placeholder."""
    payload = self.result.get('result', None)
    if payload:
        return payload
    # Substitute a uniform empty shape for missing or empty extraction responses.
    return {'data': None, 'content_type': None}
prop extraction_success : bool
Expand source code
@property
def extraction_success(self) -> bool:
    """True when an extraction result with non-None data is present."""
    outcome = self.extraction_result
    return not (outcome is None or outcome['data'] is None)

Methods

def handle_api_result(self, api_result: bytes) ‑> FrozenDict
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict:
    """Wrap the raw API payload in an immutable FrozenDict.

    Error payloads are frozen as-is; successful payloads are nested
    under a 'result' key.
    """
    is_error = self._is_api_error(api_result=api_result) is True
    wrapped = api_result if is_error else {'result': api_result}
    return FrozenDict(wrapped)
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ExtractionAPIError)
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError):
    """Validate the result via the parent class, raising ExtractionAPIError by default."""
    super().raise_for_result(error_class=error_class, raise_on_upstream_error=raise_on_upstream_error)

Inherited members

class ExtractionConfig (body: str | bytes,
content_type: str,
url: str | None = None,
charset: str | None = None,
extraction_template: str | None = None,
extraction_ephemeral_template: Dict | None = None,
extraction_prompt: str | None = None,
extraction_model: str | None = None,
is_document_compressed: bool | None = None,
document_compression_format: CompressionFormat | None = None,
webhook: str | None = None,
raise_on_upstream_error: bool = True,
template: str | None = None,
ephemeral_template: Dict | None = None)
Expand source code
class ExtractionConfig(BaseApiConfig):
    """Configuration for a Scrapfly Extraction API request.

    Holds the document body plus extraction options (saved/ephemeral
    templates, LLM prompt/model) and handles document compression:
    compressed bodies are detected from their content, and uncompressed
    bodies are compressed on the fly when a format is declared.
    """

    body: Union[str, bytes]
    content_type: str
    url: Optional[str] = None
    charset: Optional[str] = None
    extraction_template: Optional[str] = None  # a saved template name
    extraction_ephemeral_template: Optional[Dict]  # ephemerally declared json template
    extraction_prompt: Optional[str] = None
    extraction_model: Optional[str] = None
    is_document_compressed: Optional[bool] = None
    document_compression_format: Optional[CompressionFormat] = None
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    # deprecated options
    template: Optional[str] = None
    ephemeral_template: Optional[Dict] = None

    def __init__(
        self,
        body: Union[str, bytes],
        content_type: str,
        url: Optional[str] = None,
        charset: Optional[str] = None,
        extraction_template: Optional[str] = None,  # a saved template name
        extraction_ephemeral_template: Optional[Dict] = None,  # ephemerally declared json template
        extraction_prompt: Optional[str] = None,
        extraction_model: Optional[str] = None,
        is_document_compressed: Optional[bool] = None,
        document_compression_format: Optional[CompressionFormat] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True,

        # deprecated options
        template: Optional[str] = None,
        ephemeral_template: Optional[Dict] = None     
    ):
        """
        Initialize the extraction configuration.

        :param body: Document to extract from (str, or bytes possibly pre-compressed)
        :param content_type: MIME type of the document
        :param url: Optional source URL of the document
        :param charset: Optional character set of the document
        :param extraction_template: Name of a saved extraction template
        :param extraction_ephemeral_template: Inline JSON template declaration
        :param extraction_prompt: LLM extraction prompt
        :param extraction_model: LLM model name
        :param is_document_compressed: Whether the body is already compressed
        :param document_compression_format: Declared compression format of the body
        :param webhook: Webhook name to notify
        :param raise_on_upstream_error: Raise when the upstream document fetch failed
        :param template: Deprecated alias of extraction_template
        :param ephemeral_template: Deprecated alias of extraction_ephemeral_template
        :raises ExtractionConfigError: When the declared compression format
            disagrees with the detected one, or zstandard is not installed
        """
        # Remap deprecated aliases onto their current names, warning the caller.
        if template:
            warnings.warn(
                "Deprecation warning: 'template' is deprecated. Use 'extraction_template' instead."
            )
            extraction_template = template

        if ephemeral_template:
            warnings.warn(
                "Deprecation warning: 'ephemeral_template' is deprecated. Use 'extraction_ephemeral_template' instead."
            )
            extraction_ephemeral_template = ephemeral_template

        self.key = None  # API key; injected later via to_api_params()
        self.body = body
        self.content_type = content_type
        self.url = url
        self.charset = charset
        self.extraction_template = extraction_template
        self.extraction_ephemeral_template = extraction_ephemeral_template
        self.extraction_prompt = extraction_prompt
        self.extraction_model = extraction_model
        self.is_document_compressed = is_document_compressed
        self.document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

        if isinstance(body, bytes) or document_compression_format:
            # Detect the body's actual compression format (overrides the
            # caller-supplied is_document_compressed flag).
            compression_format = detect_compression_format(body)

            if compression_format is not None:
                self.is_document_compressed = True

                # A declared format that disagrees with the detected one is an error.
                if self.document_compression_format and compression_format != self.document_compression_format:
                    raise ExtractionConfigError(
                        f'The detected compression format `{compression_format}` does not match declared format `{self.document_compression_format}`. '
                        f'You must pass the compression format or disable compression.'
                    )
                
                self.document_compression_format = compression_format
            
            else:
                self.is_document_compressed = False

            if self.is_document_compressed is False:
                # Body is not compressed yet: compress it here with the declared
                # format, if any. ('compression_foramt' is a pre-existing local
                # variable name typo, kept as-is.)
                compression_foramt = CompressionFormat(self.document_compression_format) if self.document_compression_format else None
                
                if isinstance(self.body, str) and compression_foramt:
                    self.body = self.body.encode('utf-8')

                if compression_foramt == CompressionFormat.GZIP:
                    import gzip
                    self.body = gzip.compress(self.body)

                elif compression_foramt == CompressionFormat.ZSTD:
                    try:
                        import zstandard as zstd
                    except ImportError:
                        raise ExtractionConfigError(
                            f'zstandard is not installed. You must run pip install zstandard'
                            f' to auto compress into zstd or use compression formats.'
                        )
                    self.body = zstd.compress(self.body)
                
                elif compression_foramt == CompressionFormat.DEFLATE:
                    import zlib
                    compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS) # raw deflate compression
                    self.body = compressor.compress(self.body) + compressor.flush()

    def to_api_params(self, key: str) -> Dict:
        """
        Convert this config into Extraction API request parameters.

        :param key: API key (used unless self.key is already set)
        :return: Dictionary of API parameters
        :raises ExtractionConfigError: When both a saved and an ephemeral
            template are supplied
        """
        params = {
            'key': self.key or key,
            'content_type': self.content_type
        }

        if self.url:
            params['url'] = self.url

        if self.charset:
            params['charset'] = self.charset

        if self.extraction_template and self.extraction_ephemeral_template:
            raise ExtractionConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

        if self.extraction_template:
            params['extraction_template'] = self.extraction_template

        if self.extraction_ephemeral_template:
            # NOTE(review): this replaces the dict on self with its JSON string,
            # so a second to_api_params() call would serialize the string again —
            # confirm the config is intended to be single-use here.
            self.extraction_ephemeral_template = json.dumps(self.extraction_ephemeral_template)
            params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(self.extraction_ephemeral_template.encode('utf-8')).decode('utf-8')

        if self.extraction_prompt:
            params['extraction_prompt'] = quote_plus(self.extraction_prompt)

        if self.extraction_model:
            params['extraction_model'] = self.extraction_model

        if self.webhook:
            params['webhook_name'] = self.webhook

        return params

    def to_dict(self) -> Dict:
        """
        Export the ExtractionConfig instance to a plain dictionary.

        Decompresses the body in place first (when compressed) so the
        exported dict carries the original text document.
        """

        if self.is_document_compressed is True:
                compression_foramt = CompressionFormat(self.document_compression_format) if self.document_compression_format else None

                if compression_foramt == CompressionFormat.GZIP:
                    import gzip
                    self.body = gzip.decompress(self.body)
                    
                elif compression_foramt == CompressionFormat.ZSTD:
                    import zstandard as zstd
                    self.body = zstd.decompress(self.body)

                elif compression_foramt == CompressionFormat.DEFLATE:
                    import zlib
                    decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
                    self.body = decompressor.decompress(self.body) + decompressor.flush()

                if isinstance(self.body, bytes):
                    self.body = self.body.decode('utf-8')
                    self.is_document_compressed = False

        return {
            'body': self.body,
            'content_type': self.content_type,
            'url': self.url,
            'charset': self.charset,
            'extraction_template': self.extraction_template,
            'extraction_ephemeral_template': self.extraction_ephemeral_template,
            'extraction_prompt': self.extraction_prompt,
            'extraction_model': self.extraction_model,
            'is_document_compressed': self.is_document_compressed,
            'document_compression_format': CompressionFormat(self.document_compression_format).value if self.document_compression_format else None,
            'webhook': self.webhook,
            'raise_on_upstream_error': self.raise_on_upstream_error,
        }
    
    @staticmethod
    def from_dict(extraction_config_dict: Dict) -> 'ExtractionConfig':
        """Create an ExtractionConfig instance from a dictionary."""
        body = extraction_config_dict.get('body', None)
        content_type = extraction_config_dict.get('content_type', None)
        url = extraction_config_dict.get('url', None)
        charset = extraction_config_dict.get('charset', None)
        extraction_template = extraction_config_dict.get('extraction_template', None)
        extraction_ephemeral_template = extraction_config_dict.get('extraction_ephemeral_template', None)
        extraction_prompt = extraction_config_dict.get('extraction_prompt', None)
        extraction_model = extraction_config_dict.get('extraction_model', None)
        is_document_compressed = extraction_config_dict.get('is_document_compressed', None)

        document_compression_format = extraction_config_dict.get('document_compression_format', None)
        document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None
        
        webhook = extraction_config_dict.get('webhook', None)
        raise_on_upstream_error = extraction_config_dict.get('raise_on_upstream_error', True)

        return ExtractionConfig(
            body=body,
            content_type=content_type,
            url=url,
            charset=charset,
            extraction_template=extraction_template,
            extraction_ephemeral_template=extraction_ephemeral_template,
            extraction_prompt=extraction_prompt,
            extraction_model=extraction_model,
            is_document_compressed=is_document_compressed,
            document_compression_format=document_compression_format,
            webhook=webhook,
            raise_on_upstream_error=raise_on_upstream_error
        )

Ancestors

Class variables

var body : str | bytes
var charset : str | None
var content_type : str
var document_compression_format : CompressionFormat | None
var ephemeral_template : Dict | None
var extraction_ephemeral_template : Dict | None
var extraction_model : str | None
var extraction_prompt : str | None
var extraction_template : str | None
var is_document_compressed : bool | None
var raise_on_upstream_error : bool
var template : str | None
var url : str | None
var webhook : str | None

Static methods

def from_dict(extraction_config_dict: Dict) ‑> ExtractionConfig
Expand source code
@staticmethod
def from_dict(extraction_config_dict: Dict) -> 'ExtractionConfig':
    """Create an ExtractionConfig instance from a dictionary."""
    body = extraction_config_dict.get('body', None)
    content_type = extraction_config_dict.get('content_type', None)
    url = extraction_config_dict.get('url', None)
    charset = extraction_config_dict.get('charset', None)
    extraction_template = extraction_config_dict.get('extraction_template', None)
    extraction_ephemeral_template = extraction_config_dict.get('extraction_ephemeral_template', None)
    extraction_prompt = extraction_config_dict.get('extraction_prompt', None)
    extraction_model = extraction_config_dict.get('extraction_model', None)
    is_document_compressed = extraction_config_dict.get('is_document_compressed', None)

    document_compression_format = extraction_config_dict.get('document_compression_format', None)
    document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None
    
    webhook = extraction_config_dict.get('webhook', None)
    raise_on_upstream_error = extraction_config_dict.get('raise_on_upstream_error', True)

    return ExtractionConfig(
        body=body,
        content_type=content_type,
        url=url,
        charset=charset,
        extraction_template=extraction_template,
        extraction_ephemeral_template=extraction_ephemeral_template,
        extraction_prompt=extraction_prompt,
        extraction_model=extraction_model,
        is_document_compressed=is_document_compressed,
        document_compression_format=document_compression_format,
        webhook=webhook,
        raise_on_upstream_error=raise_on_upstream_error
    )

Create an ExtractionConfig instance from a dictionary.

Methods

def to_api_params(self, key: str) ‑> Dict
Expand source code
def to_api_params(self, key: str) -> Dict:
    """
    Build the query parameters for an Extraction API request.

    Args:
        key: fallback API key used when the config does not carry its own.

    Returns:
        Dict of API query parameters.

    Raises:
        ExtractionConfigError: if both extraction_template and
            extraction_ephemeral_template are set.
    """
    params = {
        'key': self.key or key,
        'content_type': self.content_type
    }

    if self.url:
        params['url'] = self.url

    if self.charset:
        params['charset'] = self.charset

    if self.extraction_template and self.extraction_ephemeral_template:
        raise ExtractionConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

    if self.extraction_template:
        params['extraction_template'] = self.extraction_template

    if self.extraction_ephemeral_template:
        # Serialize into a local instead of overwriting the attribute: the
        # previous code replaced self.extraction_ephemeral_template with its
        # JSON string, which double-encoded the template (and changed the
        # produced parameter) on any subsequent call of this method.
        ephemeral_json = json.dumps(self.extraction_ephemeral_template)
        params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(ephemeral_json.encode('utf-8')).decode('utf-8')

    if self.extraction_prompt:
        params['extraction_prompt'] = quote_plus(self.extraction_prompt)

    if self.extraction_model:
        params['extraction_model'] = self.extraction_model

    if self.webhook:
        # The API expects the webhook under the 'webhook_name' parameter.
        params['webhook_name'] = self.webhook

    return params
def to_dict(self) ‑> Dict
Expand source code
def to_dict(self) -> Dict:
    """
    Export the ExtractionConfig instance to a plain dictionary.

    If the body is still marked as compressed, it is decompressed (and
    decoded to UTF-8 text) in place before export, so the returned dict
    always carries a readable body.
    """
    if self.is_document_compressed is True:
        fmt = CompressionFormat(self.document_compression_format) if self.document_compression_format else None

        if fmt == CompressionFormat.GZIP:
            import gzip
            self.body = gzip.decompress(self.body)
        elif fmt == CompressionFormat.ZSTD:
            import zstandard as zstd
            self.body = zstd.decompress(self.body)
        elif fmt == CompressionFormat.DEFLATE:
            import zlib
            # Raw deflate stream (no zlib header), hence the negative wbits.
            inflater = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
            self.body = inflater.decompress(self.body) + inflater.flush()

        if isinstance(self.body, bytes):
            self.body = self.body.decode('utf-8')
            self.is_document_compressed = False

    return {
        'body': self.body,
        'content_type': self.content_type,
        'url': self.url,
        'charset': self.charset,
        'extraction_template': self.extraction_template,
        'extraction_ephemeral_template': self.extraction_ephemeral_template,
        'extraction_prompt': self.extraction_prompt,
        'extraction_model': self.extraction_model,
        'is_document_compressed': self.is_document_compressed,
        'document_compression_format': CompressionFormat(self.document_compression_format).value if self.document_compression_format else None,
        'webhook': self.webhook,
        'raise_on_upstream_error': self.raise_on_upstream_error,
    }

Export the ExtractionConfig instance to a plain dictionary.

class HarArchive (har_data: bytes)
Expand source code
class HarArchive:
    """Parser and accessor for HAR (HTTP Archive) format data"""

    def __init__(self, har_data: bytes):
        """
        Initialize HAR archive from bytes

        Args:
            har_data: HAR file content as bytes (JSON format, may be gzipped)
        """
        # Decompress if gzipped
        if isinstance(har_data, bytes):
            if har_data[:2] == b'\x1f\x8b':  # gzip magic number
                har_data = gzip.decompress(har_data)
            har_data = har_data.decode('utf-8')

        # Parse the special format: {"log":{...,"entries":[]}}{"entry1"}{"entry2"}...
        # First object is HAR log structure, subsequent objects are individual entries
        # Uses JSONDecoder.raw_decode() repeatedly, since the input may be a
        # concatenation of JSON objects rather than a single document.
        objects = []
        decoder = json.JSONDecoder()
        idx = 0
        while idx < len(har_data):
            har_data_stripped = har_data[idx:].lstrip()
            if not har_data_stripped:
                break
            try:
                obj, end_idx = decoder.raw_decode(har_data_stripped)
                objects.append(obj)
                # Advance idx by the whitespace that was stripped plus the
                # characters raw_decode consumed for this object.
                idx += len(har_data[idx:]) - len(har_data_stripped) + end_idx
            except json.JSONDecodeError:
                # Stop at the first undecodable region; objects parsed so far
                # are kept. NOTE(review): trailing garbage is silently ignored.
                break

        # First object should be the HAR log structure
        if objects and 'log' in objects[0]:
            self._data = objects[0]
            self._log = self._data.get('log', {})
            # Remaining objects are the entries
            self._entries = objects[1:] if len(objects) > 1 else []
        else:
            # Fallback: standard HAR format
            # NOTE(review): re-parses the full text; if the input was a
            # concatenated stream without a 'log' key this loads() will raise.
            self._data = json.loads(har_data) if isinstance(har_data, str) else {}
            self._log = self._data.get('log', {})
            self._entries = self._log.get('entries', [])

    @property
    def version(self) -> str:
        """Get HAR version"""
        return self._log.get('version', '')

    @property
    def creator(self) -> Dict[str, Any]:
        """Get creator information"""
        return self._log.get('creator', {})

    @property
    def pages(self) -> List[Dict[str, Any]]:
        """Get pages list"""
        return self._log.get('pages', [])

    def get_entries(self) -> List[HarEntry]:
        """
        Get all entries as list

        Returns:
            List of HarEntry objects
        """
        return [HarEntry(entry) for entry in self._entries]

    def iter_entries(self) -> Iterator[HarEntry]:
        """
        Iterate through all HAR entries

        Yields:
            HarEntry objects
        """
        for entry in self._entries:
            yield HarEntry(entry)

    def get_urls(self) -> List[str]:
        """
        Get all URLs in the archive

        Returns:
            List of unique URLs (first-seen order preserved)
        """
        urls = []
        for entry in self._entries:
            url = entry.get('request', {}).get('url', '')
            if url and url not in urls:
                urls.append(url)
        return urls

    def find_by_url(self, url: str) -> Optional[HarEntry]:
        """
        Find entry by exact URL match

        Args:
            url: URL to search for

        Returns:
            First matching HarEntry or None
        """
        for entry in self.iter_entries():
            if entry.url == url:
                return entry
        return None

    def filter_by_status(self, status_code: int) -> List[HarEntry]:
        """
        Filter entries by status code

        Args:
            status_code: HTTP status code to filter by

        Returns:
            List of matching HarEntry objects
        """
        return [entry for entry in self.iter_entries()
                if entry.status_code == status_code]

    def filter_by_content_type(self, content_type: str) -> List[HarEntry]:
        """
        Filter entries by content type (case-insensitive substring match)

        Args:
            content_type: Content type to filter by (e.g., 'text/html')

        Returns:
            List of matching HarEntry objects
        """
        return [entry for entry in self.iter_entries()
                if content_type.lower() in entry.content_type.lower()]

    def __len__(self) -> int:
        """Get number of entries"""
        return len(self._entries)

    def __repr__(self) -> str:
        return f"<HarArchive {len(self._entries)} entries>"

Parser and accessor for HAR (HTTP Archive) format data

Initialize HAR archive from bytes

Args

har_data
HAR file content as bytes (JSON format, may be gzipped)

Instance variables

prop creator : Dict[str, Any]
Expand source code
@property
def creator(self) -> Dict[str, Any]:
    """Get creator information"""
    return self._log.get('creator', {})

Get creator information

prop pages : List[Dict[str, Any]]
Expand source code
@property
def pages(self) -> List[Dict[str, Any]]:
    """Get pages list"""
    return self._log.get('pages', [])

Get pages list

prop version : str
Expand source code
@property
def version(self) -> str:
    """Get HAR version"""
    return self._log.get('version', '')

Get HAR version

Methods

def filter_by_content_type(self, content_type: str) ‑> List[HarEntry]
Expand source code
def filter_by_content_type(self, content_type: str) -> List[HarEntry]:
    """
    Filter entries by content type (substring match)

    Args:
        content_type: Content type to filter by (e.g., 'text/html')

    Returns:
        List of matching HarEntry objects
    """
    return [entry for entry in self.iter_entries()
            if content_type.lower() in entry.content_type.lower()]

Filter entries by content type (substring match)

Args

content_type
Content type to filter by (e.g., 'text/html')

Returns

List of matching HarEntry objects

def filter_by_status(self, status_code: int) ‑> List[HarEntry]
Expand source code
def filter_by_status(self, status_code: int) -> List[HarEntry]:
    """
    Filter entries by status code

    Args:
        status_code: HTTP status code to filter by

    Returns:
        List of matching HarEntry objects
    """
    return [entry for entry in self.iter_entries()
            if entry.status_code == status_code]

Filter entries by status code

Args

status_code
HTTP status code to filter by

Returns

List of matching HarEntry objects

def find_by_url(self, url: str) ‑> HarEntry | None
Expand source code
def find_by_url(self, url: str) -> Optional[HarEntry]:
    """
    Find entry by exact URL match

    Args:
        url: URL to search for

    Returns:
        First matching HarEntry or None
    """
    for entry in self.iter_entries():
        if entry.url == url:
            return entry
    return None

Find entry by exact URL match

Args

url
URL to search for

Returns

First matching HarEntry or None

def get_entries(self) ‑> List[HarEntry]
Expand source code
def get_entries(self) -> List[HarEntry]:
    """
    Get all entries as list

    Returns:
        List of HarEntry objects
    """
    return [HarEntry(entry) for entry in self._entries]

Get all entries as list

Returns

List of HarEntry objects

def get_urls(self) ‑> List[str]
Expand source code
def get_urls(self) -> List[str]:
    """
    Get all URLs in the archive

    Returns:
        List of unique URLs
    """
    urls = []
    for entry in self._entries:
        url = entry.get('request', {}).get('url', '')
        if url and url not in urls:
            urls.append(url)
    return urls

Get all URLs in the archive

Returns

List of unique URLs

def iter_entries(self) ‑> Iterator[HarEntry]
Expand source code
def iter_entries(self) -> Iterator[HarEntry]:
    """
    Iterate through all HAR entries

    Yields:
        HarEntry objects
    """
    for entry in self._entries:
        yield HarEntry(entry)

Iterate through all HAR entries

Yields

HarEntry objects

class HarEntry (entry_data: Dict[str, Any])
Expand source code
class HarEntry:
    """Represents a single HAR entry (HTTP request/response pair)"""

    def __init__(self, entry_data: Dict[str, Any]):
        """
        Initialize from HAR entry dict

        Args:
            entry_data: HAR entry dictionary
        """
        self._data = entry_data
        self._request = entry_data.get('request', {})
        self._response = entry_data.get('response', {})

    @property
    def url(self) -> str:
        """Get request URL"""
        return self._request.get('url', '')

    @property
    def method(self) -> str:
        """Get HTTP method"""
        return self._request.get('method', 'GET')

    @property
    def status_code(self) -> int:
        """Get response status code (0 when missing or unparsable)"""
        status = self._response.get('status') if self._response else None
        if status is None:
            return 0
        # HAR data might carry the status as a string; coerce defensively.
        try:
            return int(status)
        except (ValueError, TypeError):
            return 0

    @property
    def status_text(self) -> str:
        """Get response status text"""
        return self._response.get('statusText', '')

    @property
    def request_headers(self) -> Dict[str, str]:
        """Get request headers as dict"""
        return {h['name']: h['value'] for h in self._request.get('headers', [])}

    @property
    def response_headers(self) -> Dict[str, str]:
        """Get response headers as dict"""
        return {h['name']: h['value'] for h in self._response.get('headers', [])}

    @property
    def content(self) -> bytes:
        """Get response content as bytes"""
        body = self._response.get('content', {})
        text = body.get('text', '')

        # Content may be base64-encoded per the HAR spec.
        if body.get('encoding', '') == 'base64':
            import base64
            return base64.b64decode(text)

        # Otherwise return as UTF-8 bytes.
        return text.encode('utf-8') if isinstance(text, str) else text

    @property
    def content_type(self) -> str:
        """Get response content type"""
        return self._response.get('content', {}).get('mimeType', '')

    @property
    def content_size(self) -> int:
        """Get response content size"""
        return self._response.get('content', {}).get('size', 0)

    @property
    def started_datetime(self) -> str:
        """Get when request was started (ISO 8601 format)"""
        return self._data.get('startedDateTime', '')

    @property
    def time(self) -> float:
        """Get total elapsed time in milliseconds"""
        return self._data.get('time', 0.0)

    @property
    def timings(self) -> Dict[str, float]:
        """Get detailed timing information"""
        return self._data.get('timings', {})

    def __repr__(self) -> str:
        return f"<HarEntry {self.method} {self.url} [{self.status_code}]>"

Represents a single HAR entry (HTTP request/response pair)

Initialize from HAR entry dict

Args

entry_data
HAR entry dictionary

Instance variables

prop content : bytes
Expand source code
@property
def content(self) -> bytes:
    """Get response content as bytes"""
    content_data = self._response.get('content', {})
    text = content_data.get('text', '')

    # Handle base64 encoding if present
    encoding = content_data.get('encoding', '')
    if encoding == 'base64':
        import base64
        return base64.b64decode(text)

    # Return as UTF-8 bytes
    if isinstance(text, str):
        return text.encode('utf-8')
    return text

Get response content as bytes

prop content_size : int
Expand source code
@property
def content_size(self) -> int:
    """Get response content size"""
    return self._response.get('content', {}).get('size', 0)

Get response content size

prop content_type : str
Expand source code
@property
def content_type(self) -> str:
    """Get response content type"""
    return self._response.get('content', {}).get('mimeType', '')

Get response content type

prop method : str
Expand source code
@property
def method(self) -> str:
    """Get HTTP method"""
    return self._request.get('method', 'GET')

Get HTTP method

prop request_headers : Dict[str, str]
Expand source code
@property
def request_headers(self) -> Dict[str, str]:
    """Get request headers as dict"""
    headers = {}
    for header in self._request.get('headers', []):
        headers[header['name']] = header['value']
    return headers

Get request headers as dict

prop response_headers : Dict[str, str]
Expand source code
@property
def response_headers(self) -> Dict[str, str]:
    """Get response headers as dict"""
    headers = {}
    for header in self._response.get('headers', []):
        headers[header['name']] = header['value']
    return headers

Get response headers as dict

prop started_datetime : str
Expand source code
@property
def started_datetime(self) -> str:
    """Get when request was started (ISO 8601 format)"""
    return self._data.get('startedDateTime', '')

Get when request was started (ISO 8601 format)

prop status_code : int
Expand source code
@property
def status_code(self) -> int:
    """Get response status code"""
    # Handle case where response doesn't exist or status is missing
    if not self._response:
        return 0
    status = self._response.get('status')
    if status is None:
        return 0
    # Ensure it's an int (HAR data might have status as string)
    try:
        return int(status)
    except (ValueError, TypeError):
        return 0

Get response status code

prop status_text : str
Expand source code
@property
def status_text(self) -> str:
    """Get response status text"""
    return self._response.get('statusText', '')

Get response status text

prop time : float
Expand source code
@property
def time(self) -> float:
    """Get total elapsed time in milliseconds"""
    return self._data.get('time', 0.0)

Get total elapsed time in milliseconds

prop timings : Dict[str, float]
Expand source code
@property
def timings(self) -> Dict[str, float]:
    """Get detailed timing information"""
    return self._data.get('timings', {})

Get detailed timing information

prop url : str
Expand source code
@property
def url(self) -> str:
    """Get request URL"""
    return self._request.get('url', '')

Get request URL

class HttpError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class HttpError(ScrapflyError):
    """HTTP-level error carrying the originating request and optional response."""

    def __init__(self, request: Request, response: Optional[Response] = None, **kwargs):
        self.request = request
        self.response = response
        super().__init__(**kwargs)

    def __str__(self) -> str:
        # Upstream errors report the target website's status rather than ours.
        if isinstance(self, UpstreamHttpError):
            scrape_result = self.api_response.scrape_result
            return f"Target website responded with {scrape_result['status_code']} - {scrape_result['reason']}"

        # Prefer the API's own error message when a parsed response exists.
        if self.api_response is not None:
            return self.api_response.error_message

        parts = [f"{self.response.status_code} - {self.response.reason}"]

        # Include detailed error message for all HTTP errors
        if self.message:
            parts.append(self.message)

        return " - ".join(parts)

Common base class for all non-exit exceptions.

Ancestors

Subclasses

  • ApiHttpClientError
  • scrapfly.errors.ExtractionAPIError
  • scrapfly.errors.QuotaLimitReached
  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.ScreenshotAPIError
  • scrapfly.errors.TooManyConcurrentRequest
  • scrapfly.errors.UpstreamHttpError
class ResponseBodyHandler (use_brotli: bool = False, signing_secrets: Tuple[str] | None = None)
Expand source code
class ResponseBodyHandler:
    """Negotiates and decodes Scrapfly API response bodies.

    Handles content-encoding (gzip/deflate always; brotli and zstd when the
    corresponding packages are importable), optional HMAC-SHA256 webhook
    signature verification, and content-type decoding (msgpack preferred,
    JSON fallback).
    """

    SUPPORTED_COMPRESSION = ['gzip', 'deflate']
    SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']

    class JSONDateTimeDecoder(JSONDecoder):
        # JSON decoder wired to _date_parser so date-like values are revived.
        def __init__(self, *args, **kargs):
            JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs)

    # brotli under perform at same gzip level and upper level destroy the cpu so
    # the trade off do not worth it for most of usage
    def __init__(self, use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None):
        """
        Args:
            use_brotli: advertise brotli if a brotli package is installed.
            signing_secrets: hex-encoded secrets used by verify().
        """
        # NOTE(review): the inserts/appends below mutate the class-level
        # SUPPORTED_COMPRESSION list, so negotiated encodings are shared by
        # every instance created afterwards.
        if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
            try:
                try:
                    import brotlicffi as brotli
                    self.SUPPORTED_COMPRESSION.insert(0, 'br')
                except ImportError:
                    import brotli
                    self.SUPPORTED_COMPRESSION.insert(0, 'br')
            except ImportError:
                pass

        try:
            from compression import zstd as _zstd  # noqa: F401 - Python 3.14+ native zstd
            if 'zstd' not in self.SUPPORTED_COMPRESSION:
                self.SUPPORTED_COMPRESSION.append('zstd')
        except ImportError:
            try:
                import zstandard  # noqa: F401 - aligned with urllib3 for transparent decompression
                if 'zstd' not in self.SUPPORTED_COMPRESSION:
                    self.SUPPORTED_COMPRESSION.append('zstd')
            except ImportError:
                pass

        # Value for the Accept-Encoding header sent with API requests.
        self.content_encoding: str = ', '.join(self.SUPPORTED_COMPRESSION)
        # Signing secrets are stored as raw bytes (unhexlified), deduplicated.
        self._signing_secret: Optional[Tuple[str]] = None

        if signing_secrets:
            _secrets = set()

            for signing_secret in signing_secrets:
                _secrets.add(binascii.unhexlify(signing_secret))

            self._signing_secret = tuple(_secrets)

        try:  # automatically use msgpack if available https://msgpack.org/
            import msgpack
            self.accept = 'application/msgpack;charset=utf-8'
            self.content_type = 'application/msgpack;charset=utf-8'
            self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
        except ImportError:
            self.accept = 'application/json;charset=utf-8'
            self.content_type = 'application/json;charset=utf-8'
            self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)

    def support(self, headers: Dict) -> bool:
        """Return True if the response's content-type is one we can decode.

        Expects lowercase header keys (NOTE(review): 'Content-Type' with
        original casing would not match — confirm callers normalize).
        """
        if 'content-type' not in headers:
            return False

        for content_type in self.SUPPORTED_CONTENT_TYPES:
            if headers['content-type'].find(content_type) != -1:
                return True

        return False

    def verify(self, message: bytes, signature: str) -> bool:
        """Check an uppercase hex HMAC-SHA256 signature against all secrets."""
        for signing_secret in self._signing_secret:
            if hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper() == signature:
                return True

        return False

    def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
        """Decompress, verify (if configured), and decode a response body.

        Raises:
            WebhookSignatureMissMatch: if a signature is provided and fails
                verification against the configured secrets.
        """
        if content_encoding == 'gzip' or content_encoding == 'gz':
            import gzip
            content = gzip.decompress(content)
        elif content_encoding == 'deflate':
            import zlib
            content = zlib.decompress(content)
        elif content_encoding == 'brotli' or content_encoding == 'br':
            import brotli
            content = brotli.decompress(content)
        elif content_encoding == 'zstd':
            try:
                from compression import zstd as _zstd  # Python 3.14+
                content = _zstd.decompress(content)
            except ImportError:
                import zstandard
                content = zstandard.decompress(content)

        # Signature is computed over the decompressed payload.
        if self._signing_secret is not None and signature is not None:
            if not self.verify(content, signature):
                raise WebhookSignatureMissMatch()

        if content_type.startswith('application/json'):
            content = loads(content, cls=self.JSONDateTimeDecoder)
        elif content_type.startswith('application/msgpack'):
            import msgpack
            content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

        return content

    def __call__(self, content: bytes, content_type: str) -> Union[str, Dict]:
        """Decode an already-decompressed body by content type.

        Raises:
            Exception: for unsupported content types.
            EncoderError: when decoding fails; carries the raw content
                (base64-encoded if not valid UTF-8) for diagnostics.
        """
        content_loader = None

        if content_type.find('application/json') != -1:
            content_loader = partial(loads, cls=self.JSONDateTimeDecoder)
        elif content_type.find('application/msgpack') != -1:
            import msgpack
            content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)

        if content_loader is None:
            raise Exception('Unsupported content type')

        try:
            return content_loader(content)
        except Exception as e:
            try:
                raise EncoderError(content=content.decode('utf-8')) from e
            except UnicodeError:
                raise EncoderError(content=base64.b64encode(content).decode('utf-8')) from e

Class variables

var JSONDateTimeDecoder

Simple JSON https://json.org decoder

Performs the following translations in decoding by default:

Performs the following translations in decoding by default: JSON object → dict, array → list, string → str, number (int) → int, number (real) → float, true → True, false → False, null → None.

It also understands NaN, Infinity, and -Infinity as their corresponding float values, which is outside the JSON spec.

var SUPPORTED_COMPRESSION
var SUPPORTED_CONTENT_TYPES

Methods

def read(self,
content: bytes,
content_encoding: str,
content_type: str,
signature: str | None) ‑> Dict
Expand source code
def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
    if content_encoding == 'gzip' or content_encoding == 'gz':
        import gzip
        content = gzip.decompress(content)
    elif content_encoding == 'deflate':
        import zlib
        content = zlib.decompress(content)
    elif content_encoding == 'brotli' or content_encoding == 'br':
        import brotli
        content = brotli.decompress(content)
    elif content_encoding == 'zstd':
        try:
            from compression import zstd as _zstd  # Python 3.14+
            content = _zstd.decompress(content)
        except ImportError:
            import zstandard
            content = zstandard.decompress(content)

    if self._signing_secret is not None and signature is not None:
        if not self.verify(content, signature):
            raise WebhookSignatureMissMatch()

    if content_type.startswith('application/json'):
        content = loads(content, cls=self.JSONDateTimeDecoder)
    elif content_type.startswith('application/msgpack'):
        import msgpack
        content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

    return content
def support(self, headers: Dict) ‑> bool
Expand source code
def support(self, headers: Dict) -> bool:
    if 'content-type' not in headers:
        return False

    for content_type in self.SUPPORTED_CONTENT_TYPES:
        if headers['content-type'].find(content_type) != -1:
            return True

    return False
def verify(self, message: bytes, signature: str) ‑> bool
Expand source code
def verify(self, message: bytes, signature: str) -> bool:
    for signing_secret in self._signing_secret:
        if hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper() == signature:
            return True

    return False
class ScrapeApiResponse (request: requests.models.Request,
response: requests.models.Response,
scrape_config: ScrapeConfig,
api_result: Dict | None = None,
large_object_handler: Callable | None = None)
Expand source code
class ScrapeApiResponse(ApiResponse):
    """Typed wrapper around a Scrapfly scrape API HTTP response.

    Exposes the scrape result and config, conversion of the upstream
    result back into a `requests.Response`, and helpers to persist the
    scraped content to disk.
    """

    scrape_config: ScrapeConfig     # configuration the scrape was performed with
    large_object_handler: Callable  # resolver callback for clob/blob content references

    def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler: Optional[Callable] = None):
        """Build the response wrapper and normalize the API payload.

        Raises:
            HttpError: when the API returned a plain-string body instead of
                a structured result (treated as a 502 bad gateway).
        """
        super().__init__(request, response)
        self.scrape_config = scrape_config
        self.large_object_handler = large_object_handler

        # HEAD calls carry no response body, so an api_result is synthesized
        # from the response status line and headers.
        if self.scrape_config.method == 'HEAD':
            api_result = {
                'result': {
                    'request_headers': {},
                    'status': 'DONE',
                    # bugfix: was `200 >= self.response.status_code < 300`,
                    # which is only true for codes <= 200 and can never mark
                    # e.g. 201/204 as success; the intent is "2xx range".
                    'success': 200 <= self.response.status_code < 300,
                    'response_headers': self.response.headers,
                    'status_code': self.response.status_code,
                    'reason': self.response.reason,
                    'format': 'text',
                    'content': ''
                },
                'context': {},
                'config': self.scrape_config.__dict__
            }

            # rejection details are only available through response headers on HEAD
            if 'X-Scrapfly-Reject-Code' in self.response.headers:
                api_result['result']['error'] = {
                    'code': self.response.headers['X-Scrapfly-Reject-Code'],
                    'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']),
                    'message': self.response.headers['X-Scrapfly-Reject-Description'],
                    'error_id': self.response.headers['X-Scrapfly-Reject-ID'],
                    'retryable': True if self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes' else False,
                    'doc_url': '',
                    'links': {}
                }

                if 'X-Scrapfly-Reject-Doc' in self.response.headers:
                    api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc']
                    api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc']

        # a bare string body means the gateway failed to produce a JSON result
        if isinstance(api_result, str):
            raise HttpError(
                request=request,
                response=response,
                message='Bad gateway',
                code=502,
                http_status_code=502,
                is_retryable=True
            )

        self.result = self.handle_api_result(api_result=api_result)

    @property
    def scrape_result(self) -> Optional[Dict]:
        """The `result` section of the API payload, or None if absent."""
        return self.result.get('result', None)

    @property
    def config(self) -> Optional[Dict]:
        """The scrape configuration echoed back by the API, or None."""
        if self.scrape_result is None:
            return None

        return self.result['config']

    @property
    def context(self) -> Optional[Dict]:
        """The `context` section of the API payload, or None."""
        if self.scrape_result is None:
            return None

        return self.result['context']

    @property
    def content(self) -> str:
        """Scraped content; empty string when no result is available."""
        if self.scrape_result is None:
            return ''

        return self.scrape_result['content']

    @property
    def success(self) -> bool:
        """
            Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code
        """
        # bugfix: was `200 >= self.response.status_code <= 299`, which is
        # only true for exactly 200; the intent is the whole 2xx range.
        return 200 <= self.response.status_code <= 299

    @property
    def scrape_success(self) -> bool:
        """True if the upstream scrape itself succeeded."""
        scrape_result = self.scrape_result

        if not scrape_result:
            return False

        return self.scrape_result['success']

    @property
    def error(self) -> Optional[Dict]:
        """Error details when the scrape failed, None otherwise."""
        if self.scrape_result is None:
            return None

        if self.scrape_success is False:
            return self.scrape_result['error']

        return None

    @cached_property
    def soup(self) -> 'BeautifulSoup':
        """Parse the text content with BeautifulSoup (requires scrapfly[parser]).

        Raises:
            ContentError: when the result content is binary.
            ImportError: when bs4 is not installed.
        """
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")

        try:
            from bs4 import BeautifulSoup
            soup = BeautifulSoup(self.content, "lxml")
            return soup
        except ImportError as e:
            logger.error('You must install scrapfly[parser] to enable this feature')
            # bugfix: re-raise instead of silently caching None, matching
            # the behavior of the `selector` property below.
            raise e

    @cached_property
    def selector(self) -> 'Selector':
        """Parse the text content with a parsel Selector (requires parsel/scrapy).

        Raises:
            ContentError: when the result content is binary.
            ImportError: when parsel is not installed.
        """
        if self.scrape_result['format'] != 'text':
            raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")

        try:
            from parsel import Selector
            return Selector(text=self.content)
        except ImportError as e:
            logger.error('You must install parsel or scrapy package to enable this feature')
            raise e

    def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
        """Normalize the raw API payload into an immutable result dict.

        Header dicts become case-insensitive, large-object references are
        resolved through `large_object_handler`, and base64 binary content
        is decoded into a BytesIO buffer.
        """
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        try:
            # headers may be serialized as an (empty) list - normalize to a dict
            if isinstance(api_result['config']['headers'], list):
                api_result['config']['headers'] = {}
        except TypeError:
            logger.info(api_result)
            raise

        with suppress(KeyError):
            api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
            api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

        if self.large_object_handler is not None and api_result['result']['content']:
            content_format = api_result['result']['format']

            if content_format in ['clob', 'blob']:
                # content holds a callback URL; fetch the real payload
                api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format)
            elif content_format == 'binary':
                base64_payload = api_result['result']['content']

                if isinstance(base64_payload, bytes):
                    base64_payload = base64_payload.decode('utf-8')

                api_result['result']['content'] = BytesIO(b64decode(base64_payload))

        return FrozenDict(api_result)

    def _is_api_error(self, api_result: Dict) -> bool:
        """True when the payload represents an API-level error."""
        if self.scrape_config.method == 'HEAD':
            if 'X-Reject-Reason' in self.response.headers:
                return True
            return False

        if api_result is None:
            return True

        return 'error_id' in api_result

    def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
        """Rebuild the upstream website's response as a `requests.Response`.

        Returns None when there is no result or the API call itself failed.
        """
        if _class != Response:
            raise RuntimeError('only Response from requests package is supported at the moment')

        if self.result is None:
            return None

        if self.response.status_code != 200:
            return None

        response = Response()
        response.status_code = self.scrape_result['status_code']
        response.reason = self.scrape_result['reason']

        if self.scrape_result['content']:
            if isinstance(self.scrape_result['content'], BytesIO):
                response._content = self.scrape_result['content'].getvalue()
            elif isinstance(self.scrape_result['content'], bytes):
                response._content = self.scrape_result['content']
            elif isinstance(self.scrape_result['content'], str):
                response._content = self.scrape_result['content'].encode('utf-8')
        else:
            response._content = None

        response.headers.update(self.scrape_result['response_headers'])
        response.url = self.scrape_result['url']

        response.request = Request(
            method=self.config['method'],
            url=self.config['url'],
            headers=self.scrape_result['request_headers'],
            data=self.config['body'] if self.config['body'] else None
        )

        # NOTE(review): assumes the 'set-cookie' header value is an iterable
        # of raw cookie strings (not a single string) - confirm how the API
        # serializes repeated set-cookie headers.
        if 'set-cookie' in response.headers:
            for raw_cookie in response.headers['set-cookie']:
                for name, cookie in SimpleCookie(raw_cookie).items():
                    expires = cookie.get('expires')

                    if expires == '':
                        expires = None

                    if expires:
                        try:
                            expires = parse(expires).timestamp()
                        except ValueError:
                            expires = None

                    # defensive: at this point expires is a float or None;
                    # keep the string coercion for safety but use isinstance
                    # (was `type(expires) == str`)
                    if isinstance(expires, str):
                        if '.' in expires:
                            expires = float(expires)
                        else:
                            expires = int(expires)

                    response.cookies.set_cookie(Cookie(
                        version=cookie.get('version') if cookie.get('version') else None,
                        name=name,
                        value=cookie.value,
                        path=cookie.get('path', ''),
                        expires=expires,
                        comment=cookie.get('comment'),
                        domain=cookie.get('domain', ''),
                        secure=cookie.get('secure'),
                        port=None,
                        port_specified=False,
                        domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                        domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                        path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                        discard=False,
                        comment_url=None,
                        rest={
                            'httponly': cookie.get('httponly'),
                            'samesite': cookie.get('samesite'),
                            'max-age': cookie.get('max-age')
                        }
                    ))

        return response

    def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
        """Write the scraped content (or ``content``) to ``file`` or to a
        file path derived from ``path``/``name``/the content-type."""
        file_content = content or self.scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                file_extension = name_parts[-1]

        if not file:
            # derive an extension from the response content-type when the
            # caller did not provide one through `name`
            if file_extension is None:
                try:
                    mime_type = self.scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                name = self.config['url'].split('/')[-1]

            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path is not None else name

            # URL ended with '/': build a filename from the URL itself
            if file_path == file_extension:
                url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

            file = open(file_path, 'wb')

        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
        """Raise the typed error matching the API/scrape outcome, if any.

        Upstream HTTP errors are only raised when ``raise_on_upstream_error``
        is True; all other scrape errors are always raised.
        """
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
        if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
            error = ErrorFactory.create(api_response=self)
            if error:
                if isinstance(error, UpstreamHttpError):
                    if raise_on_upstream_error is True:
                        raise error
                else:
                    raise error

Ancestors

Class variables

var large_object_handler : Callable
var scrape_config : ScrapeConfig

Instance variables

prop config : Dict | None
Expand source code
@property
def config(self) -> Optional[Dict]:
    if self.scrape_result is None:
        return None

    return self.result['config']
prop content : str
Expand source code
@property
def content(self) -> str:
    if self.scrape_result is None:
        return ''

    return self.scrape_result['content']
prop context : Dict | None
Expand source code
@property
def context(self) -> Optional[Dict]:
    if self.scrape_result is None:
        return None

    return self.result['context']
prop error : Dict | None
Expand source code
@property
def error(self) -> Optional[Dict]:
    if self.scrape_result is None:
        return None

    if self.scrape_success is False:
        return self.scrape_result['error']
prop scrape_result : Dict | None
Expand source code
@property
def scrape_result(self) -> Optional[Dict]:
    return self.result.get('result', None)
prop scrape_success : bool
Expand source code
@property
def scrape_success(self) -> bool:
    scrape_result = self.scrape_result

    if not scrape_result:
        return False

    return self.scrape_result['success']
var selector : Selector
Expand source code
@cached_property
def selector(self) -> 'Selector':
    if self.scrape_result['format'] != 'text':
        raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")

    try:
        from parsel import Selector
        return Selector(text=self.content)
    except ImportError as e:
        logger.error('You must install parsel or scrapy package to enable this feature')
        raise e
var soup : BeautifulSoup
Expand source code
@cached_property
def soup(self) -> 'BeautifulSoup':
    if self.scrape_result['format'] != 'text':
        raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")

    try:
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(self.content, "lxml")
        return soup
    except ImportError as e:
        logger.error('You must install scrapfly[parser] to enable this feature')
prop success : bool
Expand source code
@property
def success(self) -> bool:
    """
        Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code
    """
    return 200 >= self.response.status_code <= 299

Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code

prop upstream_status_code : int | None
Expand source code
@property
def upstream_status_code(self) -> Optional[int]:
    if self.scrape_result is None:
        return None

    if 'status_code' in self.scrape_result:
        return self.scrape_result['status_code']

    return None

Methods

def handle_api_result(self, api_result: Dict) ‑> FrozenDict | None
Expand source code
def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]:
    if self._is_api_error(api_result=api_result) is True:
        return FrozenDict(api_result)

    try:
        if isinstance(api_result['config']['headers'], list):
            api_result['config']['headers'] = {}
    except TypeError:
        logger.info(api_result)
        raise

    with suppress(KeyError):
        api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers'])
        api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers'])

    if self.large_object_handler is not None and api_result['result']['content']:
        content_format = api_result['result']['format']

        if content_format in ['clob', 'blob']:
            api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format)
        elif content_format == 'binary':
            base64_payload = api_result['result']['content']

            if isinstance(base64_payload, bytes):
                base64_payload = base64_payload.decode('utf-8')

            api_result['result']['content'] = BytesIO(b64decode(base64_payload))

    return FrozenDict(api_result)
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ApiHttpClientError)
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError):
    super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
    if self.result['result']['status'] == 'DONE' and self.scrape_success is False:
        error = ErrorFactory.create(api_response=self)
        if error:
            if isinstance(error, UpstreamHttpError):
                if raise_on_upstream_error is True:
                    raise error
            else:
                raise error
def sink(self,
path: str | None = None,
name: str | None = None,
file: TextIO | _io.BytesIO | None = None,
content: str | bytes | None = None)
Expand source code
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None):
    file_content = content or self.scrape_result['content']
    file_path = None
    file_extension = None

    if name:
        name_parts = name.split('.')
        if len(name_parts) > 1:
            file_extension = name_parts[-1]

    if not file:
        if file_extension is None:
            try:
                mime_type = self.scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            if ';' in mime_type:
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            name = self.config['url'].split('/')[-1]

        if name.find(file_extension) == -1:
            name += file_extension

        file_path = path + '/' + name if path is not None else name

        if file_path == file_extension:
            url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-')

            if url[-1] == '-':
                url = url[:-1]

            url += file_extension

            file_path = url

        file = open(file_path, 'wb')

    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)
    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    logger.info('file %s created' % file_path)
def upstream_result_into_response(self) ‑> requests.models.Response | None
Expand source code
def upstream_result_into_response(self, _class=Response) -> Optional[Response]:
    if _class != Response:
        raise RuntimeError('only Response from requests package is supported at the moment')

    if self.result is None:
        return None

    if self.response.status_code != 200:
        return None

    response = Response()
    response.status_code = self.scrape_result['status_code']
    response.reason = self.scrape_result['reason']

    if self.scrape_result['content']:
        if isinstance(self.scrape_result['content'], BytesIO):
            response._content = self.scrape_result['content'].getvalue()
        elif isinstance(self.scrape_result['content'], bytes):
            response._content = self.scrape_result['content']
        elif isinstance(self.scrape_result['content'], str):
            response._content = self.scrape_result['content'].encode('utf-8')
    else:
        response._content = None

    response.headers.update(self.scrape_result['response_headers'])
    response.url = self.scrape_result['url']

    response.request = Request(
        method=self.config['method'],
        url=self.config['url'],
        headers=self.scrape_result['request_headers'],
        data=self.config['body'] if self.config['body'] else None
    )

    if 'set-cookie' in response.headers:
        for raw_cookie in response.headers['set-cookie']:
            for name, cookie in SimpleCookie(raw_cookie).items():
                expires = cookie.get('expires')

                if expires == '':
                    expires = None

                if expires:
                    try:
                        expires = parse(expires).timestamp()
                    except ValueError:
                        expires = None

                if type(expires) == str:
                    if '.' in expires:
                        expires = float(expires)
                    else:
                        expires = int(expires)

                response.cookies.set_cookie(Cookie(
                    version=cookie.get('version') if cookie.get('version') else None,
                    name=name,
                    value=cookie.value,
                    path=cookie.get('path', ''),
                    expires=expires,
                    comment=cookie.get('comment'),
                    domain=cookie.get('domain', ''),
                    secure=cookie.get('secure'),
                    port=None,
                    port_specified=False,
                    domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '',
                    domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False,
                    path_specified=cookie.get('path') != '' and cookie.get('path') is not None,
                    discard=False,
                    comment_url=None,
                    rest={
                        'httponly': cookie.get('httponly'),
                        'samesite': cookie.get('samesite'),
                        'max-age': cookie.get('max-age')
                    }
                ))

    return response

Inherited members

class ScrapeConfig (url: str,
retry: bool = True,
method: str = 'GET',
country: str | None = None,
render_js: bool = False,
cache: bool = False,
cache_clear: bool = False,
ssl: bool = False,
dns: bool = False,
asp: bool = False,
debug: bool = False,
raise_on_upstream_error: bool = True,
cache_ttl: int | None = None,
proxy_pool: str | None = None,
session: str | None = None,
tags: List[str] | Set[str] | None = None,
format: Format | None = None,
format_options: List[FormatOption] | None = None,
extraction_template: str | None = None,
extraction_ephemeral_template: Dict | None = None,
extraction_prompt: str | None = None,
extraction_model: str | None = None,
correlation_id: str | None = None,
cookies: requests.structures.CaseInsensitiveDict | None = None,
body: str | None = None,
data: Dict | None = None,
headers: requests.structures.CaseInsensitiveDict | Dict[str, str] | None = None,
js: str = None,
rendering_wait: int = None,
rendering_stage: Literal['complete', 'domcontentloaded'] = 'complete',
wait_for_selector: str | None = None,
screenshots: Dict | None = None,
screenshot_flags: List[ScreenshotFlag] | None = None,
session_sticky_proxy: bool | None = None,
webhook: str | None = None,
timeout: int | None = None,
js_scenario: List | None = None,
extract: Dict | None = None,
os: str | None = None,
lang: List[str] | None = None,
auto_scroll: bool | None = None,
cost_budget: int | None = None)
Expand source code
class ScrapeConfig(BaseApiConfig):
    """Configuration for a single Scrapfly scrape API call."""

    PUBLIC_DATACENTER_POOL = 'public_datacenter_pool'
    PUBLIC_RESIDENTIAL_POOL = 'public_residential_pool'

    url: str
    retry: bool = True
    method: str = 'GET'
    country: Optional[str] = None
    render_js: bool = False
    cache: bool = False
    cache_clear: bool = False
    ssl: bool = False
    dns: bool = False
    asp: bool = False
    debug: bool = False
    raise_on_upstream_error: bool = True
    cache_ttl: Optional[int] = None
    proxy_pool: Optional[str] = None
    session: Optional[str] = None
    tags: Optional[List[str]] = None
    # bugfix: these two attributes had trailing commas after their defaults,
    # which made the class-level values the tuple `(None,)` instead of None.
    format: Optional[Format] = None  # raw(unchanged)
    format_options: Optional[List[FormatOption]] = None
    extraction_template: Optional[str] = None  # a saved template name
    extraction_ephemeral_template: Optional[Dict] = None  # ephemerally declared json template
    extraction_prompt: Optional[str] = None
    extraction_model: Optional[str] = None
    correlation_id: Optional[str] = None
    cookies: Optional[CaseInsensitiveDict] = None
    body: Optional[str] = None
    data: Optional[Dict] = None
    headers: Optional[CaseInsensitiveDict] = None
    js: Optional[str] = None
    rendering_wait: Optional[int] = None
    rendering_stage: Literal["complete", "domcontentloaded"] = "complete"
    wait_for_selector: Optional[str] = None
    session_sticky_proxy: bool = True
    screenshots: Optional[Dict] = None
    screenshot_flags: Optional[List[ScreenshotFlag]] = None
    webhook: Optional[str] = None
    timeout: Optional[int] = None  # in milliseconds
    js_scenario: Optional[Dict] = None
    extract: Optional[Dict] = None
    lang: Optional[List[str]] = None
    os: Optional[str] = None
    auto_scroll: Optional[bool] = None
    cost_budget: Optional[int] = None
    def __init__(
        self,
        url: str,
        retry: bool = True,
        method: str = 'GET',
        country: Optional[str] = None,
        render_js: bool = False,
        cache: bool = False,
        cache_clear:bool = False,
        ssl:bool = False,
        dns:bool = False,
        asp:bool = False,
        debug: bool = False,
        raise_on_upstream_error:bool = True,
        cache_ttl:Optional[int] = None,
        proxy_pool:Optional[str] = None,
        session: Optional[str] = None,
        tags: Optional[Union[List[str], Set[str]]] = None,
        format: Optional[Format] = None, # raw(unchanged)
        format_options: Optional[List[FormatOption]] = None, # raw(unchanged)
        extraction_template: Optional[str] = None,  # a saved template name
        extraction_ephemeral_template: Optional[Dict] = None,  # ephemerally declared json template
        extraction_prompt: Optional[str] = None,
        extraction_model: Optional[str] = None,
        correlation_id: Optional[str] = None,
        cookies: Optional[CaseInsensitiveDict] = None,
        body: Optional[str] = None,
        data: Optional[Dict] = None,
        headers: Optional[Union[CaseInsensitiveDict, Dict[str, str]]] = None,
        js: str = None,
        rendering_wait: int = None,
        rendering_stage: Literal["complete", "domcontentloaded"] = "complete",
        wait_for_selector: Optional[str] = None,
        screenshots:Optional[Dict]=None,
        screenshot_flags: Optional[List[ScreenshotFlag]] = None,
        session_sticky_proxy:Optional[bool] = None,
        webhook:Optional[str] = None,
        timeout:Optional[int] = None, # in milliseconds
        js_scenario:Optional[List] = None,
        extract:Optional[Dict] = None,
        os:Optional[str] = None,
        lang:Optional[List[str]] = None,
        auto_scroll:Optional[bool] = None,
        cost_budget:Optional[int] = None
    ):
        """Store and normalize scrape parameters.

        Cookies are merged into the ``cookie`` request header, tags are
        deduplicated into a set, and for POST/PUT/PATCH requests ``data``
        is encoded into ``body`` according to the ``content-type`` header
        (defaulting to form-urlencoded).

        Raises:
            ScrapeConfigError: if both ``body`` and ``data`` are given, or
                if ``data`` cannot be encoded for the declared content-type.
        """
        assert(type(url) is str)

        # normalize tags to a set so duplicates collapse
        if isinstance(tags, List):
            tags = set(tags)

        cookies = cookies or {}
        headers = headers or {}

        # case-insensitive so later `content-type` / `cookie` lookups work
        # regardless of the caller's header casing
        self.cookies = CaseInsensitiveDict(cookies)
        self.headers = CaseInsensitiveDict(headers)
        self.url = url
        self.retry = retry
        self.method = method
        self.country = country
        self.session_sticky_proxy = session_sticky_proxy
        self.render_js = render_js
        self.cache = cache
        self.cache_clear = cache_clear
        self.asp = asp
        self.webhook = webhook
        self.session = session
        self.debug = debug
        self.cache_ttl = cache_ttl
        self.proxy_pool = proxy_pool
        self.tags = tags or set()
        self.format = format
        self.format_options = format_options
        self.extraction_template = extraction_template
        self.extraction_ephemeral_template = extraction_ephemeral_template
        self.extraction_prompt = extraction_prompt
        self.extraction_model = extraction_model
        self.correlation_id = correlation_id
        self.wait_for_selector = wait_for_selector
        self.body = body
        self.data = data
        self.js = js
        self.rendering_wait = rendering_wait
        self.rendering_stage = rendering_stage
        self.raise_on_upstream_error = raise_on_upstream_error
        self.screenshots = screenshots
        self.screenshot_flags = screenshot_flags
        self.key = None  # API key override; resolved in to_api_params
        self.dns = dns
        self.ssl = ssl
        self.js_scenario = js_scenario
        self.timeout = timeout
        self.extract = extract
        self.lang = lang
        self.os = os
        self.auto_scroll = auto_scroll
        self.cost_budget = cost_budget

        # cookies travel upstream via the `cookie` header: append them to
        # any cookie header the caller already provided
        if cookies:
            _cookies = []

            for name, value in cookies.items():
                _cookies.append(name + '=' + value)

            if 'cookie' in self.headers:
                if self.headers['cookie'][-1] != ';':
                    self.headers['cookie'] += ';'
            else:
                self.headers['cookie'] = ''

            self.headers['cookie'] += '; '.join(_cookies)

        if self.body and self.data:
            raise ScrapeConfigError('You cannot pass both parameters body and data. You must choose')

        # encode `data` into `body` according to the declared content-type;
        # without a content-type, default to form-urlencoded
        if method in ['POST', 'PUT', 'PATCH']:
            if self.body is None and self.data is not None:
                if 'content-type' not in self.headers:
                    self.headers['content-type'] = 'application/x-www-form-urlencoded'
                    self.body = urlencode(data)
                else:
                    if self.headers['content-type'].find('application/json') != -1:
                        self.body = json.dumps(data)
                    elif self.headers['content-type'].find('application/x-www-form-urlencoded') != -1:
                        self.body = urlencode(data)
                    else:
                        raise ScrapeConfigError('Content-Type "%s" not supported, use body parameter to pass pre encoded body according to your content type' % self.headers['content-type'])
            elif self.body is None and self.data is None:
                # bodyless write request: mark it as plain text explicitly
                self.headers['content-type'] = 'text/plain'

    def to_api_params(self, key:str) -> Dict:
        """
        Serialize this config into the flat query-parameter dict expected by
        the Scrapfly Web Scraping API.

        This method is side-effect free: it does not mutate the config, so it
        can safely be called multiple times on the same instance.

        Args:
            key: fallback API key used when the config itself has no key set.

        Returns:
            Dict of query parameters. Booleans are converted to their HTTP
            representation and complex values (js, js_scenario, extract,
            ephemeral extraction templates) are url-safe base64 encoded.

        Raises:
            ScrapeConfigError: if both extraction_template and
                extraction_ephemeral_template are set.
        """
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.country is not None:
            params['country'] = self.country

        for name, value in self.headers.items():
            params['headers[%s]' % name] = value

        if self.webhook is not None:
            params['webhook_name'] = self.webhook

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.extract is not None:
            params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')

        if self.cost_budget is not None:
            params['cost_budget'] = self.cost_budget

        if self.render_js is True:
            params['render_js'] = self._bool_to_http(self.render_js)

            if self.wait_for_selector is not None:
                params['wait_for_selector'] = self.wait_for_selector

            if self.js:
                params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

            if self.js_scenario:
                params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')

            if self.rendering_wait:
                params['rendering_wait'] = self.rendering_wait

            if self.rendering_stage:
                params['rendering_stage'] = self.rendering_stage

            if self.screenshots is not None:
                for name, element in self.screenshots.items():
                    params['screenshots[%s]' % name] = element

                if self.screenshot_flags is not None:
                    # coerce to ScreenshotFlag for validation, but build the
                    # param locally instead of reassigning self.screenshot_flags
                    # so the config object is left untouched
                    params["screenshot_flags"] = ",".join(ScreenshotFlag(flag).value for flag in self.screenshot_flags)
            elif self.screenshot_flags is not None:
                # this warning previously sat in an unreachable `else` branch
                # guarded by the same condition as the `if`
                logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')

            if self.auto_scroll is True:
                params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
        else:
            # render_js disabled: warn about every parameter that requires it
            if self.wait_for_selector is not None:
                logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')

            if self.screenshots:
                logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')

            if self.js_scenario:
                logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')

            if self.js:
                logging.warning('Params "js" is ignored. Works only if render_js is enabled')

            if self.rendering_wait:
                logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')

        if self.asp is True:
            params['asp'] = self._bool_to_http(self.asp)

        if self.retry is False:
            # retry is on by default server-side; only sent when disabled
            params['retry'] = self._bool_to_http(self.retry)

        if self.cache is True:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_clear is True:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)

            if self.cache_ttl is not None:
                params['cache_ttl'] = self.cache_ttl
        else:
            if self.cache_clear is True:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

        if self.dns is True:
            params['dns'] = self._bool_to_http(self.dns)

        if self.ssl is True:
            params['ssl'] = self._bool_to_http(self.ssl)

        if self.tags:
            params['tags'] = ','.join(self.tags)

        if self.format:
            params['format'] = Format(self.format).value
            if self.format_options:
                params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options)

        if self.extraction_template and self.extraction_ephemeral_template:
            raise ScrapeConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

        if self.extraction_template:
            params['extraction_template'] = self.extraction_template

        if self.extraction_ephemeral_template:
            # encode from a local copy: the previous implementation reassigned
            # self.extraction_ephemeral_template to the JSON string, so a second
            # call would double-encode it and send a corrupted template
            ephemeral_template_json = json.dumps(self.extraction_ephemeral_template)
            params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(ephemeral_template_json.encode('utf-8')).decode('utf-8')

        if self.extraction_prompt:
            params['extraction_prompt'] = quote_plus(self.extraction_prompt)

        if self.extraction_model:
            params['extraction_model'] = self.extraction_model

        if self.correlation_id:
            params['correlation_id'] = self.correlation_id

        if self.session:
            params['session'] = self.session

            if self.session_sticky_proxy is True: # false by default
                params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
        else:
            if self.session_sticky_proxy:
                logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')

        if self.debug is True:
            params['debug'] = self._bool_to_http(self.debug)

        if self.proxy_pool is not None:
            params['proxy_pool'] = self.proxy_pool

        if self.lang is not None:
            params['lang'] = ','.join(self.lang)

        if self.os is not None:
            params['os'] = self.os

        return params

    @staticmethod
    def from_exported_config(config:str) -> 'ScrapeConfig':
        """
        Rebuild a ScrapeConfig from a base64-encoded msgpack export.

        Args:
            config: base64 string produced by the Scrapfly config export.

        Returns:
            ScrapeConfig: reconstructed configuration.

        Raises:
            ImportError: if the optional msgpack dependency is not installed.
        """
        try:
            from msgpack import loads as msgpack_loads
        except ImportError:
            print('You must install msgpack package - run: pip install "scrapfly-sdk[speedup]" or pip install msgpack')
            raise

        data = msgpack_loads(base64.b64decode(config))

        headers = {}

        for name, value in data['headers'].items():
            # multi-valued headers are joined into a single line; check str
            # first because str is itself Iterable and would otherwise be
            # joined character by character ('; '.join("abc") -> "a; b; c")
            if isinstance(value, str):
                headers[name] = value
            elif isinstance(value, Iterable):
                headers[name] = '; '.join(value)
            else:
                headers[name] = value

        return ScrapeConfig(
            url=data['url'],
            retry=data['retry'],
            headers=headers,
            session=data['session'],
            session_sticky_proxy=data['session_sticky_proxy'],
            cache=data['cache'],
            cache_ttl=data['cache_ttl'],
            cache_clear=data['cache_clear'],
            render_js=data['render_js'],
            method=data['method'],
            asp=data['asp'],
            body=data['body'],
            ssl=data['ssl'],
            dns=data['dns'],
            country=data['country'],
            debug=data['debug'],
            correlation_id=data['correlation_id'],
            tags=data['tags'],
            format=data['format'],
            js=data['js'],
            rendering_wait=data['rendering_wait'],
            screenshots=data['screenshots'] or {},
            screenshot_flags=data['screenshot_flags'],
            proxy_pool=data['proxy_pool'],
            auto_scroll=data['auto_scroll'],
            cost_budget=data['cost_budget']
        )

    def to_dict(self) -> Dict:
        """
        Export the ScrapeConfig instance to a plain dictionary.
        Useful for JSON-serialization or other external storage.
        """

        return {
            'url': self.url,
            'retry': self.retry,
            'method': self.method,
            'country': self.country,
            'render_js': self.render_js,
            'cache': self.cache,
            'cache_clear': self.cache_clear,
            'ssl': self.ssl,
            'dns': self.dns,
            'asp': self.asp,
            'debug': self.debug,
            'raise_on_upstream_error': self.raise_on_upstream_error,
            'cache_ttl': self.cache_ttl,
            'proxy_pool': self.proxy_pool,
            'session': self.session,
            # tags may be None per the class annotation; export a list either way
            'tags': list(self.tags) if self.tags else [],
            'format': Format(self.format).value if self.format else None,
            'format_options': [FormatOption(option).value for option in self.format_options] if self.format_options else None,
            'extraction_template': self.extraction_template,
            'extraction_ephemeral_template': self.extraction_ephemeral_template,
            'extraction_prompt': self.extraction_prompt,
            'extraction_model': self.extraction_model,
            'correlation_id': self.correlation_id,
            # plain dicts, not CaseInsensitiveDict: the docstring promises a
            # JSON-serializable structure and json.dumps rejects
            # CaseInsensitiveDict values. Keys are deduplicated
            # case-insensitively before conversion, as before.
            'cookies': dict(CaseInsensitiveDict(self.cookies or {})),
            'body': self.body,
            'data': None if self.body else self.data,
            'headers': dict(CaseInsensitiveDict(self.headers or {})),
            'js': self.js,
            'rendering_wait': self.rendering_wait,
            'wait_for_selector': self.wait_for_selector,
            'session_sticky_proxy': self.session_sticky_proxy,
            'screenshots': self.screenshots,
            'screenshot_flags': [ScreenshotFlag(flag).value for flag in self.screenshot_flags] if self.screenshot_flags else None,
            'webhook': self.webhook,
            'timeout': self.timeout,
            'js_scenario': self.js_scenario,
            'extract': self.extract,
            'lang': self.lang,
            'os': self.os,
            'auto_scroll': self.auto_scroll,
            'cost_budget': self.cost_budget,
        }

    @staticmethod
    def from_dict(scrape_config_dict: Dict) -> 'ScrapeConfig':
        """
        Create a ScrapeConfig instance from a dictionary.

        Missing keys fall back to the same defaults as the constructor; the
        'format', 'format_options' and 'screenshot_flags' entries are coerced
        back into their enum types.
        """
        cfg = scrape_config_dict

        # enum-typed fields are coerced up front; empty/None values stay None
        raw_format = cfg.get('format', None)
        raw_format_options = cfg.get('format_options', None)
        raw_screenshot_flags = cfg.get('screenshot_flags', [])

        return ScrapeConfig(
            url=cfg.get('url', None),
            retry=cfg.get('retry', False),
            method=cfg.get('method', 'GET'),
            country=cfg.get('country', None),
            render_js=cfg.get('render_js', False),
            cache=cfg.get('cache', False),
            cache_clear=cfg.get('cache_clear', False),
            ssl=cfg.get('ssl', False),
            dns=cfg.get('dns', False),
            asp=cfg.get('asp', False),
            debug=cfg.get('debug', False),
            raise_on_upstream_error=cfg.get('raise_on_upstream_error', True),
            cache_ttl=cfg.get('cache_ttl', None),
            proxy_pool=cfg.get('proxy_pool', None),
            session=cfg.get('session', None),
            tags=cfg.get('tags', []),
            format=Format(raw_format) if raw_format else None,
            format_options=[FormatOption(option) for option in raw_format_options] if raw_format_options else None,
            extraction_template=cfg.get('extraction_template', None),
            extraction_ephemeral_template=cfg.get('extraction_ephemeral_template', None),
            extraction_prompt=cfg.get('extraction_prompt', None),
            extraction_model=cfg.get('extraction_model', None),
            correlation_id=cfg.get('correlation_id', None),
            cookies=cfg.get('cookies', {}),
            body=cfg.get('body', None),
            data=cfg.get('data', None),
            headers=cfg.get('headers', {}),
            js=cfg.get('js', None),
            rendering_wait=cfg.get('rendering_wait', None),
            wait_for_selector=cfg.get('wait_for_selector', None),
            screenshots=cfg.get('screenshots', []),
            screenshot_flags=[ScreenshotFlag(flag) for flag in raw_screenshot_flags] if raw_screenshot_flags else None,
            session_sticky_proxy=cfg.get('session_sticky_proxy', False),
            webhook=cfg.get('webhook', None),
            timeout=cfg.get('timeout', None),
            js_scenario=cfg.get('js_scenario', None),
            extract=cfg.get('extract', None),
            os=cfg.get('os', None),
            lang=cfg.get('lang', None),
            auto_scroll=cfg.get('auto_scroll', None),
            cost_budget=cfg.get('cost_budget', None),
        )

Ancestors

Class variables

var PUBLIC_DATACENTER_POOL
var PUBLIC_RESIDENTIAL_POOL
var asp : bool
var auto_scroll : bool | None
var body : str | None
var cache : bool
var cache_clear : bool
var cache_ttl : int | None
var cookies : requests.structures.CaseInsensitiveDict | None
var correlation_id : str | None
var cost_budget : int | None
var country : str | None
var data : Dict | None
var debug : bool
var dns : bool
var extract : Dict
var extraction_ephemeral_template : Dict | None
var extraction_model : str | None
var extraction_prompt : str | None
var extraction_template : str | None
var format : Format | None
var format_options : List[FormatOption] | None
var headers : requests.structures.CaseInsensitiveDict | None
var js : str
var js_scenario : Dict
var lang : List[str] | None
var method : str
var os : str | None
var proxy_pool : str | None
var raise_on_upstream_error : bool
var render_js : bool
var rendering_stage : Literal['complete', 'domcontentloaded']
var rendering_wait : int
var retry : bool
var screenshot_flags : List[ScreenshotFlag] | None
var screenshots : Dict | None
var session : str | None
var session_sticky_proxy : bool
var ssl : bool
var tags : List[str] | None
var timeout : int | None
var url : str
var wait_for_selector : str | None
var webhook : str | None

Static methods

def from_dict(scrape_config_dict: Dict) ‑> ScrapeConfig
Expand source code
@staticmethod
def from_dict(scrape_config_dict: Dict) -> 'ScrapeConfig':
    """Create a ScrapeConfig instance from a dictionary."""
    url = scrape_config_dict.get('url', None)
    retry = scrape_config_dict.get('retry', False)
    method = scrape_config_dict.get('method', 'GET')
    country = scrape_config_dict.get('country', None)
    render_js = scrape_config_dict.get('render_js', False)
    cache = scrape_config_dict.get('cache', False)
    cache_clear = scrape_config_dict.get('cache_clear', False)
    ssl = scrape_config_dict.get('ssl', False)
    dns = scrape_config_dict.get('dns', False)
    asp = scrape_config_dict.get('asp', False)
    debug = scrape_config_dict.get('debug', False)
    raise_on_upstream_error = scrape_config_dict.get('raise_on_upstream_error', True)
    cache_ttl = scrape_config_dict.get('cache_ttl', None)
    proxy_pool = scrape_config_dict.get('proxy_pool', None)
    session = scrape_config_dict.get('session', None)
    tags = scrape_config_dict.get('tags', [])

    format = scrape_config_dict.get('format', None)
    format = Format(format) if format else None

    format_options = scrape_config_dict.get('format_options', None)
    format_options = [FormatOption(option) for option in format_options] if format_options else None

    extraction_template = scrape_config_dict.get('extraction_template', None)
    extraction_ephemeral_template = scrape_config_dict.get('extraction_ephemeral_template', None)
    extraction_prompt = scrape_config_dict.get('extraction_prompt', None)
    extraction_model = scrape_config_dict.get('extraction_model', None)
    correlation_id = scrape_config_dict.get('correlation_id', None)
    cookies = scrape_config_dict.get('cookies', {})
    body = scrape_config_dict.get('body', None)
    data = scrape_config_dict.get('data', None)
    headers = scrape_config_dict.get('headers', {})
    js = scrape_config_dict.get('js', None)
    rendering_wait = scrape_config_dict.get('rendering_wait', None)
    wait_for_selector = scrape_config_dict.get('wait_for_selector', None)
    screenshots = scrape_config_dict.get('screenshots', [])
    
    screenshot_flags = scrape_config_dict.get('screenshot_flags', [])
    screenshot_flags = [ScreenshotFlag(flag) for flag in screenshot_flags] if screenshot_flags else None

    session_sticky_proxy = scrape_config_dict.get('session_sticky_proxy', False)
    webhook = scrape_config_dict.get('webhook', None)
    timeout = scrape_config_dict.get('timeout', None)
    js_scenario = scrape_config_dict.get('js_scenario', None)
    extract = scrape_config_dict.get('extract', None)
    os = scrape_config_dict.get('os', None)
    lang = scrape_config_dict.get('lang', None)
    auto_scroll = scrape_config_dict.get('auto_scroll', None)
    cost_budget = scrape_config_dict.get('cost_budget', None)

    return ScrapeConfig(
        url=url,
        retry=retry,
        method=method,
        country=country,
        render_js=render_js,
        cache=cache,
        cache_clear=cache_clear,
        ssl=ssl,
        dns=dns,
        asp=asp,
        debug=debug,
        raise_on_upstream_error=raise_on_upstream_error,
        cache_ttl=cache_ttl,
        proxy_pool=proxy_pool,
        session=session,
        tags=tags,
        format=format,
        format_options=format_options,
        extraction_template=extraction_template,
        extraction_ephemeral_template=extraction_ephemeral_template,
        extraction_prompt=extraction_prompt,
        extraction_model=extraction_model,
        correlation_id=correlation_id,
        cookies=cookies,
        body=body,
        data=data,
        headers=headers,
        js=js,
        rendering_wait=rendering_wait,
        wait_for_selector=wait_for_selector,
        screenshots=screenshots,
        screenshot_flags=screenshot_flags,
        session_sticky_proxy=session_sticky_proxy,
        webhook=webhook,
        timeout=timeout,
        js_scenario=js_scenario,
        extract=extract,
        os=os,
        lang=lang,
        auto_scroll=auto_scroll,
        cost_budget=cost_budget,
    )

Create a ScrapeConfig instance from a dictionary.

def from_exported_config(config: str) ‑> ScrapeConfig
Expand source code
@staticmethod
def from_exported_config(config:str) -> 'ScrapeConfig':
    try:
        from msgpack import loads as msgpack_loads
    except ImportError as e:
    print('You must install msgpack package - run: pip install "scrapfly-sdk[speedup]" or pip install msgpack')
        raise

    data = msgpack_loads(base64.b64decode(config))

    headers = {}

    for name, value in data['headers'].items():
        if isinstance(value, Iterable):
            headers[name] = '; '.join(value)
        else:
            headers[name] = value

    return ScrapeConfig(
        url=data['url'],
        retry=data['retry'],
        headers=headers,
        session=data['session'],
        session_sticky_proxy=data['session_sticky_proxy'],
        cache=data['cache'],
        cache_ttl=data['cache_ttl'],
        cache_clear=data['cache_clear'],
        render_js=data['render_js'],
        method=data['method'],
        asp=data['asp'],
        body=data['body'],
        ssl=data['ssl'],
        dns=data['dns'],
        country=data['country'],
        debug=data['debug'],
        correlation_id=data['correlation_id'],
        tags=data['tags'],
        format=data['format'],
        js=data['js'],
        rendering_wait=data['rendering_wait'],
        screenshots=data['screenshots'] or {},
        screenshot_flags=data['screenshot_flags'],
        proxy_pool=data['proxy_pool'],
        auto_scroll=data['auto_scroll'],
        cost_budget=data['cost_budget']
    )

Methods

def to_api_params(self, key: str) ‑> Dict
Expand source code
def to_api_params(self, key:str) -> Dict:
    params = {
        'key': self.key or key,
        'url': self.url
    }

    if self.country is not None:
        params['country'] = self.country

    for name, value in self.headers.items():
        params['headers[%s]' % name] = value

    if self.webhook is not None:
        params['webhook_name'] = self.webhook

    if self.timeout is not None:
        params['timeout'] = self.timeout

    if self.extract is not None:
        params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')

    if self.cost_budget is not None:
        params['cost_budget'] = self.cost_budget

    if self.render_js is True:
        params['render_js'] = self._bool_to_http(self.render_js)

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.js:
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

        if self.js_scenario:
            params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')

        if self.rendering_wait:
            params['rendering_wait'] = self.rendering_wait
        
        if self.rendering_stage:
            params['rendering_stage'] = self.rendering_stage

        if self.screenshots is not None:
            for name, element in self.screenshots.items():
                params['screenshots[%s]' % name] = element

        if self.screenshot_flags is not None:
            self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags]
            params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags)
        else:
            if self.screenshot_flags is not None:
                logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')

        if self.auto_scroll is True:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
    else:
        if self.wait_for_selector is not None:
            logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')

        if self.screenshots:
            logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')

        if self.js_scenario:
            logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')

        if self.js:
            logging.warning('Params "js" is ignored. Works only if render_js is enabled')

        if self.rendering_wait:
            logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')

    if self.asp is True:
        params['asp'] = self._bool_to_http(self.asp)

    if self.retry is False:
        params['retry'] = self._bool_to_http(self.retry)

    if self.cache is True:
        params['cache'] = self._bool_to_http(self.cache)

        if self.cache_clear is True:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)

        if self.cache_ttl is not None:
            params['cache_ttl'] = self.cache_ttl
    else:
        if self.cache_clear is True:
            logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

        if self.cache_ttl is not None:
            logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

    if self.dns is True:
        params['dns'] = self._bool_to_http(self.dns)

    if self.ssl is True:
        params['ssl'] = self._bool_to_http(self.ssl)

    if self.tags:
        params['tags'] = ','.join(self.tags)

    if self.format:
        params['format'] = Format(self.format).value
        if self.format_options:
            params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options)

    if self.extraction_template and self.extraction_ephemeral_template:
        raise ScrapeConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

    if self.extraction_template:
        params['extraction_template'] = self.extraction_template

    if self.extraction_ephemeral_template:
        self.extraction_ephemeral_template = json.dumps(self.extraction_ephemeral_template)
        params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(self.extraction_ephemeral_template.encode('utf-8')).decode('utf-8')

    if self.extraction_prompt:
        params['extraction_prompt'] = quote_plus(self.extraction_prompt)

    if self.extraction_model:
        params['extraction_model'] = self.extraction_model

    if self.correlation_id:
        params['correlation_id'] = self.correlation_id

    if self.session:
        params['session'] = self.session

        if self.session_sticky_proxy is True: # false by default
            params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
    else:
        if self.session_sticky_proxy:
            logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')

    if self.debug is True:
        params['debug'] = self._bool_to_http(self.debug)

    if self.proxy_pool is not None:
        params['proxy_pool'] = self.proxy_pool

    if self.lang is not None:
        params['lang'] = ','.join(self.lang)

    if self.os is not None:
        params['os'] = self.os

    return params
def to_dict(self) ‑> Dict
Expand source code
def to_dict(self) -> Dict:
    """
    Export the ScrapeConfig instance to a plain dictionary. 
    Useful for JSON-serialization or other external storage.
    """
    
    return {
        'url': self.url,
        'retry': self.retry,
        'method': self.method,
        'country': self.country,
        'render_js': self.render_js,
        'cache': self.cache,
        'cache_clear': self.cache_clear,
        'ssl': self.ssl,
        'dns': self.dns,
        'asp': self.asp,
        'debug': self.debug,
        'raise_on_upstream_error': self.raise_on_upstream_error,
        'cache_ttl': self.cache_ttl,
        'proxy_pool': self.proxy_pool,
        'session': self.session,
        'tags': list(self.tags),
        'format': Format(self.format).value if self.format else None,
        'format_options': [FormatOption(option).value for option in self.format_options] if self.format_options else None,
        'extraction_template': self.extraction_template,
        'extraction_ephemeral_template': self.extraction_ephemeral_template,
        'extraction_prompt': self.extraction_prompt,
        'extraction_model': self.extraction_model,
        'correlation_id': self.correlation_id,
        'cookies': CaseInsensitiveDict(self.cookies),
        'body': self.body,
        'data': None if self.body else self.data,
        'headers': CaseInsensitiveDict(self.headers),
        'js': self.js,
        'rendering_wait': self.rendering_wait,
        'wait_for_selector': self.wait_for_selector,
        'session_sticky_proxy': self.session_sticky_proxy,
        'screenshots': self.screenshots,
        'screenshot_flags': [ScreenshotFlag(flag).value for flag in self.screenshot_flags] if self.screenshot_flags else None,
        'webhook': self.webhook,
        'timeout': self.timeout,
        'js_scenario': self.js_scenario,
        'extract': self.extract,
        'lang': self.lang,
        'os': self.os,
        'auto_scroll': self.auto_scroll,
        'cost_budget': self.cost_budget,
    }

Export the ScrapeConfig instance to a plain dictionary. Useful for JSON-serialization or other external storage.

class ScraperAPI
Expand source code
class ScraperAPI:

    MONITORING_DATA_FORMAT_STRUCTURED = 'structured'
    MONITORING_DATA_FORMAT_PROMETHEUS = 'prometheus'

    MONITORING_PERIOD_SUBSCRIPTION = 'subscription'
    MONITORING_PERIOD_LAST_7D = 'last7d'
    MONITORING_PERIOD_LAST_24H = 'last24h'
    MONITORING_PERIOD_LAST_1H = 'last1h'
    MONITORING_PERIOD_LAST_5m = 'last5m'

    MONITORING_ACCOUNT_AGGREGATION = 'account'
    MONITORING_PROJECT_AGGREGATION = 'project'
    MONITORING_TARGET_AGGREGATION = 'target'

Class variables

var MONITORING_ACCOUNT_AGGREGATION
var MONITORING_DATA_FORMAT_PROMETHEUS
var MONITORING_DATA_FORMAT_STRUCTURED
var MONITORING_PERIOD_LAST_1H
var MONITORING_PERIOD_LAST_24H
var MONITORING_PERIOD_LAST_5m
var MONITORING_PERIOD_LAST_7D
var MONITORING_PERIOD_SUBSCRIPTION
var MONITORING_PROJECT_AGGREGATION
var MONITORING_TARGET_AGGREGATION
class ScrapflyAspError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyAspError(ScraperAPIError):
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyClient (key: str,
host: str = 'https://api.scrapfly.io',
verify=True,
debug: bool = False,
max_concurrency: int = 1,
connect_timeout: int = 30,
web_scraping_api_read_timeout: int = 160,
extraction_api_read_timeout: int = 35,
screenshot_api_read_timeout: int = 60,
read_timeout: int = 30,
default_read_timeout: int = 30,
reporter: Callable | None = None,
**kwargs)
Expand source code
class ScrapflyClient:
    """
    Synchronous client for the Scrapfly APIs (web scraping, screenshot,
    extraction, monitoring and crawler endpoints).
    """

    HOST = 'https://api.scrapfly.io'
    DEFAULT_CONNECT_TIMEOUT = 30
    DEFAULT_READ_TIMEOUT = 30

    # per-API read timeouts; set slightly above the server-side budget
    # noted inline as "real"
    DEFAULT_WEBSCRAPING_API_READ_TIMEOUT = 160 # 155 real
    DEFAULT_SCREENSHOT_API_READ_TIMEOUT = 60  # 30 real
    DEFAULT_EXTRACTION_API_READ_TIMEOUT = 35 # 30 real
    DEFAULT_CRAWLER_API_READ_TIMEOUT = 30

    # instance attribute declarations — values are assigned in __init__
    host:str
    key:str
    max_concurrency:int
    verify:bool
    debug:bool
    distributed_mode:bool
    connect_timeout:int
    web_scraping_api_read_timeout:int
    screenshot_api_read_timeout:int
    extraction_api_read_timeout:int
    monitoring_api_read_timeout:int
    default_read_timeout:int
    brotli: bool
    reporter:Reporter
    version:str

    # @deprecated
    read_timeout:int

    CONCURRENCY_AUTO = 'auto' # retrieve the allowed concurrency from your account
    DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(
        self,
        key: str,
        host: str = HOST,
        verify=True,
        debug: bool = False,
        max_concurrency:int=1,
        connect_timeout:int = DEFAULT_CONNECT_TIMEOUT,
        web_scraping_api_read_timeout: int = DEFAULT_WEBSCRAPING_API_READ_TIMEOUT,
        extraction_api_read_timeout: int = DEFAULT_EXTRACTION_API_READ_TIMEOUT,
        screenshot_api_read_timeout: int = DEFAULT_SCREENSHOT_API_READ_TIMEOUT,

        # @deprecated
        read_timeout:int = DEFAULT_READ_TIMEOUT,
        default_read_timeout:int = DEFAULT_READ_TIMEOUT,
        reporter:Optional[Callable]=None,
        **kwargs
    ):
        """
        :param key: Scrapfly API key
        :param host: API base URL; trailing slashes are stripped
        :param verify: verify the API host's TLS certificate
        :param debug: enable verbose http.client wire debugging
        :param max_concurrency: default concurrency used by concurrent_scrape
        :param connect_timeout: TCP connect timeout (seconds) for all API calls
        :param web_scraping_api_read_timeout: read timeout for /scrape calls
        :param extraction_api_read_timeout: read timeout for /extraction calls
        :param screenshot_api_read_timeout: read timeout for /screenshot calls
        :param read_timeout: deprecated, kept for backward compatibility
        :param default_read_timeout: read timeout for the remaining endpoints
        :param reporter: optional callable wrapped into a Reporter for metrics/errors
        """
        # normalize the base URL; rstrip handles multiple trailing slashes and
        # an empty string (the previous `host[:-1]` removed only one slash and
        # raised IndexError on '')
        host = host.rstrip('/')

        if 'distributed_mode' in kwargs:
            warnings.warn("distributed mode is deprecated and will be removed in the next version -"
              " users should handle the session name themselves based on the concurrency",
              DeprecationWarning,
              stacklevel=2
            )

        if 'brotli' in kwargs:
            warnings.warn("brotli arg is deprecated and will be removed in the next version - "
                "brotli is disabled by default",
                DeprecationWarning,
                stacklevel=2
            )

        self.version = __version__
        self.host = host
        self.key = key
        self.verify = verify
        self.debug = debug
        self.connect_timeout = connect_timeout
        self.web_scraping_api_read_timeout = web_scraping_api_read_timeout
        self.screenshot_api_read_timeout = screenshot_api_read_timeout
        self.extraction_api_read_timeout = extraction_api_read_timeout
        self.monitoring_api_read_timeout = default_read_timeout
        self.default_read_timeout = default_read_timeout

        # @deprecated
        self.read_timeout = default_read_timeout

        self.max_concurrency = max_concurrency
        self.body_handler = ResponseBodyHandler(use_brotli=False)
        self.async_executor = ThreadPoolExecutor()
        self.http_session = None

        # bug fix: compare the *instance* host, not the class constant HOST —
        # the constant never ends with '.local', so the original check was inert
        if not self.verify and not self.host.endswith('.local'):
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        if self.debug is True:
            http.client.HTTPConnection.debuglevel = 5

        if reporter is None:
            from .reporter import NoopReporter

            reporter = NoopReporter()

        self.reporter = Reporter(reporter)

    @property
    def ua(self) -> str:
        """User-Agent string advertising SDK version, Python version and platform."""
        uname = platform.uname()
        return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
            self.version,
            platform.python_version(),
            uname.system,
            uname.machine,
        )

    @property
    def _http_handler(self):
        """
        Callable used to perform every HTTP request.

        Resolved on each access (plain property instead of the original
        ``cached_property``): caching froze whichever handler existed at
        first access, so a session created by :meth:`open` *after* a first
        request was silently ignored. The original ``partial(...)`` wrapper
        bound no arguments and added nothing.
        """
        return self.http_session.request if self.http_session is not None else requests.request

    @property
    def http(self):
        """Public alias for the low-level HTTP request handler."""
        return self._http_handler

    def _scrape_request(self, scrape_config:ScrapeConfig):
        """Build the keyword arguments for a Web Scraping API call."""
        sends_body = scrape_config.method in ('POST', 'PUT', 'PATCH')
        request_headers = {
            # echo the caller's content-type when a body is sent, otherwise
            # advertise the handler's own type
            'content-type': scrape_config.headers['content-type'] if sends_body else self.body_handler.content_type,
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua,
        }
        return {
            'method': scrape_config.method,
            'url': self.host + '/scrape',
            'data': scrape_config.body,
            'verify': self.verify,
            'timeout': (self.connect_timeout, self.web_scraping_api_read_timeout),
            'headers': request_headers,
            'params': scrape_config.to_api_params(key=self.key),
        }
    
    def _screenshot_request(self, screenshot_config:ScreenshotConfig):
        """Build the keyword arguments for a Screenshot API call."""
        request_headers = {
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua,
        }
        return {
            'method': 'GET',
            'url': self.host + '/screenshot',
            'timeout': (self.connect_timeout, self.screenshot_api_read_timeout),
            'headers': request_headers,
            'params': screenshot_config.to_api_params(key=self.key),
        }

    def _extraction_request(self, extraction_config:ExtractionConfig):
        """
        Build the keyword arguments for an Extraction API call.

        The ``content-encoding`` header is set only when the document is
        compressed. The original code first stored the raw compression-format
        object in the header and then overwrote it with its ``.value`` —
        the first assignment was redundant (and left a ``None`` entry when no
        compression was used, which requests simply drops).
        """
        headers = {
            'content-type': extraction_config.content_type,
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        }

        if extraction_config.document_compression_format:
            headers['content-encoding'] = extraction_config.document_compression_format.value

        return {
            'method': 'POST',
            'url': self.host + '/extraction',
            'data': extraction_config.body,
            'timeout': (self.connect_timeout, self.extraction_api_read_timeout),
            'headers': headers,
            'params': extraction_config.to_api_params(key=self.key)
        }


    def account(self) -> Union[str, Dict]:
        """Fetch account details; decoded dict when supported, raw text otherwise."""
        response = self._http_handler(
            method='GET',
            url=self.host + '/account',
            params={'key': self.key},
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )
        response.raise_for_status()

        if not self.body_handler.support(response.headers):
            return response.content.decode('utf-8')
        return self.body_handler(response.content, response.headers['content-type'])

    def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None):
        """
        Retrieve scrape monitoring metrics.

        :param format: output format (see ScraperAPI.MONITORING_DATA_FORMAT_*)
        :param period: optional reporting period (see ScraperAPI.MONITORING_PERIOD_*)
        :param aggregation: optional aggregation levels, sent comma-separated
        """
        query = {'key': self.key, 'format': format}

        if period is not None:
            query['period'] = period
        if aggregation is not None:
            query['aggregation'] = ','.join(aggregation)

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics',
            params=query,
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )
        response.raise_for_status()

        if not self.body_handler.support(response.headers):
            return response.content.decode('utf-8')
        return self.body_handler(response.content, response.headers['content-type'])

    def get_monitoring_target_metrics(
            self,
            domain:str,
            group_subdomain:bool=False,
            period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H,
            start:Optional[datetime.datetime]=None,
            end:Optional[datetime.datetime]=None,
    ):
        """
        Retrieve per-target (domain) monitoring metrics.

        Provide either a period, or both ``start`` and ``end`` datetimes —
        an explicit date range overrides the period.

        :raises ValueError: if only one of start/end is given
        """
        # exactly zero or two of (start, end) must be set
        if (start is None) != (end is None):
            raise ValueError('You must provide both start and end date')

        query = {
            'key': self.key,
            'domain': domain,
            'group_subdomain': group_subdomain
        }

        if start is not None:
            query['start'] = start.strftime(self.DATETIME_FORMAT)
            query['end'] = end.strftime(self.DATETIME_FORMAT)
            period = None

        # an explicit range clears the period (sent as None)
        query['period'] = period

        response = self._http_handler(
            method='GET',
            url=self.host + '/scrape/monitoring/metrics/target',
            timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
            params=query,
            verify=self.verify,
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
        )
        response.raise_for_status()

        if not self.body_handler.support(response.headers):
            return response.content.decode('utf-8')
        return self.body_handler(response.content, response.headers['content-type'])


    def resilient_scrape(
        self,
        scrape_config:ScrapeConfig,
        retry_on_errors:Optional[Set[Exception]]=None,
        retry_on_status_code:Optional[List[int]]=None,
        tries: int = 5,
        delay: int = 20,
    ) -> ScrapeApiResponse:
        """
        Scrape with automatic exponential-backoff retries.

        :param scrape_config: ScrapeConfig to scrape
        :param retry_on_errors: set of exception types that trigger a retry;
            defaults to {ScrapflyError} (None means use that default —
            previously this was a mutable default argument)
        :param retry_on_status_code: upstream status codes that should be
            retried; other upstream HTTP errors return their api_response
        :param tries: maximum number of attempts
        :param delay: maximum total backoff time in seconds
        """
        # avoid the mutable-default-argument anti-pattern
        if retry_on_errors is None:
            retry_on_errors = {ScrapflyError}
        assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

        @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)
        def inner() -> ScrapeApiResponse:
            try:
                return self.scrape(scrape_config=scrape_config)
            except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
                if retry_on_status_code is not None and e.api_response:
                    if e.api_response.upstream_status_code in retry_on_status_code:
                        # re-raise so backoff retries this status code
                        raise e
                    else:
                        return e.api_response

                raise e

        return inner()

    def open(self):
        """Create the pooled HTTP session; no-op when one already exists."""
        if self.http_session is not None:
            return

        session = Session()
        session.verify = self.verify
        session.timeout = (self.connect_timeout, self.default_read_timeout)
        session.params['key'] = self.key
        session.headers['accept-encoding'] = self.body_handler.content_encoding
        session.headers['accept'] = self.body_handler.accept
        session.headers['user-agent'] = self.ua
        self.http_session = session

    def close(self):
        """
        Close and discard the pooled HTTP session.

        Safe to call when no session is open (or to call twice): the original
        unconditionally dereferenced ``self.http_session`` and raised
        ``AttributeError`` if :meth:`open` had never been called.
        """
        if self.http_session is not None:
            self.http_session.close()
            self.http_session = None

    def __enter__(self) -> 'ScrapflyClient':
        """Context-manager entry: open the pooled session and return self."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context-manager exit: always close the session, exception or not."""
        self.close()

    async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
        """Run the blocking scrape() on the client's thread pool."""
        target_loop = loop if loop is not None else asyncio.get_running_loop()
        return await target_loop.run_in_executor(self.async_executor, self.scrape, scrape_config)

    async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
        """
        Async generator scraping many configs with bounded concurrency.

        Yields each ScrapeApiResponse (or the exception raised for that
        config) in completion order, not input order.

        :param scrape_configs: configs to scrape; the list is consumed
        :param concurrency: max parallel scrapes; defaults to
            self.max_concurrency, or CONCURRENCY_AUTO to use the account limit
        """
        if concurrency is None:
            concurrency = self.max_concurrency
        elif concurrency == self.CONCURRENCY_AUTO:
            concurrency = self.account()['subscription']['max_concurrency']

        loop = asyncio.get_running_loop()
        processing_tasks = []
        results = []
        processed_tasks = 0
        expected_tasks = len(scrape_configs)

        def scrape_done_callback(task:Task):
            nonlocal processed_tasks

            try:
                if task.cancelled() is True:
                    return

                error = task.exception()
                # store the exception itself so the consumer can inspect it
                results.append(error if error is not None else task.result())
            finally:
                processing_tasks.remove(task)
                processed_tasks += 1

        while scrape_configs or results or processing_tasks:
            logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

            # top the pool up to the requested concurrency
            # (replaces the original bare `except:` around list.pop)
            while scrape_configs and len(processing_tasks) < concurrency:
                scrape_config = scrape_configs.pop()
                scrape_config.raise_on_upstream_error = False
                task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                processing_tasks.append(task)
                task.add_done_callback(scrape_done_callback)

            # drain every completed result; the original iterated the list
            # while popping from it, which yielded only about half of the
            # finished results per pass and deferred the rest
            while results:
                yield results.pop()

            await asyncio.sleep(.5)

        logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse:
        """
        Scrape a website.

        :param scrape_config: ScrapeConfig describing the request
        :param no_raise: when True, a ScrapflyError carrying an api_response is
            returned instead of raised; callers must then inspect
            api_response.scrape_result['error'] (fields: code, message,
            retryable, http_code, links) to handle failures
        :return: ScrapeApiResponse
        """
        try:
            logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
            response = self._http_handler(**self._scrape_request(scrape_config=scrape_config))
            api_response = self._handle_response(response=response, scrape_config=scrape_config)
            self.reporter.report(scrape_api_response=api_response)
            return api_response
        except BaseException as e:
            self.reporter.report(error=e)

            if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
                return e.api_response

            raise

    async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse:
        """Run the blocking screenshot() on the client's thread pool."""
        target_loop = loop if loop is not None else asyncio.get_running_loop()
        return await target_loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse:
        """
        Take a screenshot via the Screenshot API.

        :param screenshot_config: ScreenshotConfig describing the capture
        :param no_raise: when True, a ScrapflyError carrying an api_response is
            returned instead of raised; callers must then inspect the
            response's error attribute (fields: code, message, http_code, links)
        :return: ScreenshotApiResponse
        """
        try:
            logger.debug('--> %s Screenshoting' % (screenshot_config.url))
            response = self._http_handler(**self._screenshot_request(screenshot_config=screenshot_config))
            return self._handle_screenshot_response(response=response, screenshot_config=screenshot_config)
        except BaseException as e:
            self.reporter.report(error=e)

            if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
                return e.api_response

            raise

    async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse:
        """Run the blocking extract() on the client's thread pool."""
        target_loop = loop if loop is not None else asyncio.get_running_loop()
        return await target_loop.run_in_executor(self.async_executor, self.extract, extraction_config)

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse:
        """
        Extract structured data from text content via the Extraction API.

        :param extraction_config: ExtractionConfig describing the document
        :param no_raise: when True, a ScrapflyError carrying an api_response is
            returned instead of raised; callers must then inspect the
            response's error attribute (fields: code, message, http_code, links)
        :return: ExtractionApiResponse
        """
        try:
            logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
            response = self._http_handler(**self._extraction_request(extraction_config=extraction_config))
            return self._handle_extraction_response(response=response, extraction_config=extraction_config)
        except BaseException as e:
            self.reporter.report(error=e)

            if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
                return e.api_response

            raise

    def _handle_response(self, response:Response, scrape_config:ScrapeConfig) -> ScrapeApiResponse:
        """Wrap the raw HTTP response as a ScrapeApiResponse, logging outcome and errors."""
        try:
            api_response = self._handle_api_response(
                response=response,
                scrape_config=scrape_config,
                raise_on_upstream_error=scrape_config.raise_on_upstream_error
            )

            if scrape_config.method == 'HEAD':
                # HEAD carries no API body — log from the transport response
                logger.debug('<-- [%s %s] %s | %ss' % (
                    api_response.response.status_code,
                    api_response.response.reason,
                    api_response.response.request.url,
                    0
                ))
                return api_response

            result = api_response.result['result']
            logger.debug('<-- [%s %s] %s | %ss' % (
                result['status_code'],
                result['reason'],
                api_response.result['config']['url'],
                result['duration'])
            )
            logger.debug('Log url: %s' % result['log_url'])

            return api_response
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def _handle_screenshot_response(self, response:Response, screenshot_config:ScreenshotConfig) -> ScreenshotApiResponse:
        """Wrap the raw HTTP response as a ScreenshotApiResponse, logging errors."""
        try:
            return self._handle_screenshot_api_response(
                response=response,
                screenshot_config=screenshot_config,
                raise_on_upstream_error=screenshot_config.raise_on_upstream_error
            )
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def _handle_extraction_response(self, response:Response, extraction_config:ExtractionConfig) -> ExtractionApiResponse:
        """Wrap the raw HTTP response as an ExtractionApiResponse, logging errors."""
        try:
            return self._handle_extraction_api_response(
                response=response,
                extraction_config=extraction_config,
                raise_on_upstream_error=extraction_config.raise_on_upstream_error
            )
        except UpstreamHttpError as e:
            logger.critical(e.api_response.error_message)
            raise
        except HttpError as e:
            if e.api_response is not None:
                logger.critical(e.api_response.error_message)
            else:
                logger.critical(e.message)
            raise
        except ScrapflyError as e:
            logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url))
            raise

    def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
        """
        Write the binary image of a screenshot API response to disk.

        :param screenshot_api_response: successful ScreenshotApiResponse
        :param name: file name without extension (extension comes from metadata)
        :param path: optional target directory, created if missing
        :raises RuntimeError: when the screenshot failed or has no binary
        """
        if screenshot_api_response.screenshot_success is not True:
            raise RuntimeError('Screenshot was not successful')
        if not screenshot_api_response.image:
            raise RuntimeError('Screenshot binary does not exist')

        extension_name = screenshot_api_response.metadata['extension_name']
        file_name = f'{name}.{extension_name}'

        if path:
            os.makedirs(path, exist_ok=True)
            file_path = os.path.join(path, file_name)
        else:
            file_path = file_name

        content = screenshot_api_response.image
        if isinstance(content, bytes):
            content = BytesIO(content)

        with open(file_path, 'wb') as f:
            shutil.copyfileobj(content, f, length=131072)

    def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
        """
        Download and persist a screenshot captured during a scrape.

        :param api_response: ScrapeApiResponse holding screenshot metadata
        :param name: screenshot name as given in the scrape config
        :param path: optional target directory
        :raises RuntimeError: when no screenshot with that name exists
        """
        screenshots = api_response.scrape_result['screenshots']
        if not screenshots or name not in screenshots:
            raise RuntimeError('Screenshot %s do no exists' % name)

        screenshot_response = self._http_handler(
            method='GET',
            url=screenshots[name]['url'],
            params={'key': self.key},
            verify=self.verify
        )
        screenshot_response.raise_for_status()

        # always persist with a .jpg extension
        file_name = name if name.endswith('.jpg') else name + '.jpg'
        api_response.sink(path=path, name=file_name, content=screenshot_response.content)

    def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
        """
        Persist a scrape result's content to disk (or a caller-provided file object).

        :param api_response: scrape API response whose content is written
        :param content: optional override for the body to write
        :param path: optional directory prefix for the generated file name
        :param name: optional file name; its extension, if any, is reused,
            otherwise one is inferred from the response content-type
        :param file: optional already-open binary file object; when given,
            path/name-based file creation is skipped entirely
        :return: the path written to (stays None when an explicit file object
            was passed — only the generated-path branch sets it)
        """
        scrape_result = api_response.result['result']
        scrape_config = api_response.result['config']

        file_content = content or scrape_result['content']
        file_path = None
        file_extension = None

        if name:
            name_parts = name.split('.')
            if len(name_parts) > 1:
                # extension given explicitly in the name (note: no leading dot
                # here, unlike the mime-derived branch below)
                file_extension = name_parts[-1]

        if not file:
            if file_extension is None:
                # infer the extension from the upstream content-type header
                try:
                    mime_type = scrape_result['response_headers']['content-type']
                except KeyError:
                    mime_type = 'application/octet-stream'

                # drop charset/parameter suffix, e.g. 'text/html; charset=utf-8'
                if ';' in mime_type:
                    mime_type = mime_type.split(';')[0]

                file_extension = '.' + mime_type.split('/')[1]

            if not name:
                # default the name to the last URL path segment
                name = scrape_config['url'].split('/')[-1]

            # append the extension only when the name does not already contain it
            if name.find(file_extension) == -1:
                name += file_extension

            file_path = path + '/' + name if path else name

            if file_path == file_extension:
                # URL ended with '/': derive a file name from the whole URL,
                # replacing slashes with dashes
                url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

                if url[-1] == '-':
                    url = url[:-1]

                url += file_extension

                file_path = url

            file = open(file_path, 'wb')

        # normalize the payload to a binary stream before copying
        if isinstance(file_content, str):
            file_content = BytesIO(file_content.encode('utf-8'))
        elif isinstance(file_content, bytes):
            file_content = BytesIO(file_content)

        file_content.seek(0)
        with file as f:
            shutil.copyfileobj(file_content, f, length=131072)

        logger.info('file %s created' % file_path)
        return file_path

    def _handle_scrape_large_objects(
        self,
        callback_url:str,
        format: Literal['clob', 'blob']
    ) -> Tuple[Union[BytesIO, str], str]:
        """
        Download a large object referenced by a scrape result.

        :param callback_url: URL provided by the API for the large object
        :param format: 'clob' for text or 'blob' for binary content
        :return: (decoded str, 'text') for clob, (BytesIO, 'binary') for blob
        :raises ContentError: on an unsupported format value
        """
        if format not in ('clob', 'blob'):
            raise ContentError('Large objects handle can handles format format [blob, clob], given: %s' % format)

        response = self._http_handler(
            method='GET',
            url=callback_url,
            verify=self.verify,
            timeout=(self.connect_timeout, self.default_read_timeout),
            headers={
                'accept-encoding': self.body_handler.content_encoding,
                'accept': self.body_handler.accept,
                'user-agent': self.ua
            },
            params={'key': self.key},
        )

        if self.body_handler.support(headers=response.headers):
            content = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            content = response.content

        if format == 'clob':
            return content.decode('utf-8'), 'text'
        return BytesIO(content), 'binary'

    def _handle_api_response(
        self,
        response: Response,
        scrape_config:ScrapeConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScrapeApiResponse:
        """Decode the API body and build a validated ScrapeApiResponse."""
        body = None
        if scrape_config.method != 'HEAD':
            # HEAD responses have no body to decode
            if self.body_handler.support(headers=response.headers):
                body = self.body_handler(content=response.content, content_type=response.headers['content-type'])
            else:
                body = response.content.decode('utf-8')

        api_response:ScrapeApiResponse = ScrapeApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            scrape_config=scrape_config,
            large_object_handler=self._handle_scrape_large_objects
        )
        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)
        return api_response

    def _handle_screenshot_api_response(
        self,
        response: Response,
        screenshot_config:ScreenshotConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ScreenshotApiResponse:
        """Decode the API body and build a validated ScreenshotApiResponse."""
        if self.body_handler.support(headers=response.headers):
            body = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            # raw image bytes — wrap them in the expected result envelope
            body = {'result': response.content}

        api_response:ScreenshotApiResponse = ScreenshotApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            screenshot_config=screenshot_config
        )
        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)
        return api_response

    def _handle_extraction_api_response(
        self,
        response: Response,
        extraction_config:ExtractionConfig,
        raise_on_upstream_error: Optional[bool] = True
    ) -> ExtractionApiResponse:
        """Decode the API body and build a validated ExtractionApiResponse."""
        if self.body_handler.support(headers=response.headers):
            body = self.body_handler(content=response.content, content_type=response.headers['content-type'])
        else:
            body = response.content.decode('utf-8')

        api_response:ExtractionApiResponse = ExtractionApiResponse(
            response=response,
            request=response.request,
            api_result=body,
            extraction_config=extraction_config
        )
        api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error)
        return api_response

    @backoff.on_exception(backoff.expo, exception=ConnectionError, max_tries=5)
    def start_crawl(self, crawler_config: CrawlerConfig) -> CrawlerStartResponse:
        """
        Start a crawler job

        :param crawler_config: CrawlerConfig
        :return: CrawlerStartResponse with UUID and initial status

        Example:
            ```python
            from scrapfly import ScrapflyClient, CrawlerConfig

            client = ScrapflyClient(key='YOUR_API_KEY')
            config = CrawlerConfig(
                url='https://example.com',
                page_limit=100,
                max_depth=3
            )

            response = client.start_crawl(config)
            print(f"Crawler started: {response.uuid}")
            ```
        """
        # Get crawler config params (without key)
        body_params = crawler_config.to_api_params()

        # API key must be passed as query parameter, not in body
        query_params = {'key': self.key}

        timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

        url = f'{self.host}/crawl'
        logger.debug(f"Crawler API POST {url}?key=***")
        logger.debug(f"Crawler API body: {body_params}")

        response = self._http_handler(
            method='POST',
            url=url,
            params=query_params,  # key as query param
            json=body_params,      # config in body
            timeout=timeout,
            headers={'User-Agent': self.ua},
            verify=self.verify
        )

        if response.status_code not in (200, 201):
            # Log error details for debugging; response.json() raises a
            # ValueError subclass on non-JSON bodies — catch that instead of
            # the original bare `except:` which also swallowed SystemExit etc.
            try:
                error_detail = response.json()
            except ValueError:
                error_detail = response.text
            logger.debug(f"Crawler API error ({response.status_code}): {error_detail}")
            self._handle_crawler_error_response(response)

        result = response.json()
        return CrawlerStartResponse(result)

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def get_crawl_status(self, uuid: str) -> CrawlerStatusResponse:
        """
        Fetch the current status of a crawler job.

        :param uuid: Crawler job UUID
        :return: CrawlerStatusResponse with progress information
        :raises HttpError: via _handle_crawler_error_response on non-200 answers

        Example:
            ```python
            status = client.get_crawl_status(uuid)
            print(f"Status: {status.status}")
            print(f"Progress: {status.progress_pct:.1f}%")
            print(f"Crawled: {status.urls_crawled}/{status.urls_discovered}")

            if status.is_complete:
                print("Crawl completed!")
            ```
        """
        response = self._http_handler(
            method='GET',
            url=f'{self.host}/crawl/{uuid}/status',
            params={'key': self.key},  # API key travels as query parameter
            timeout=(self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT),
            headers={'User-Agent': self.ua},
            verify=self.verify
        )

        if response.status_code != 200:
            self._handle_crawler_error_response(response)

        return CrawlerStatusResponse(response.json())

    def cancel_crawl(self, crawl_uuid: str) -> bool:
        """
        Cancel a running crawler job.

        :param crawl_uuid: Crawler job UUID to cancel
        :return: True if cancelled successfully
        :raises HttpError: via _handle_crawler_error_response on failure

        Example:
            ```python
            # Start a crawl
            crawl = client.start_crawl(config)

            # Cancel it
            client.cancel_crawl(crawl.uuid)
            ```
        """
        response = self._http_handler(
            method='DELETE',
            url=f'{self.host}/crawl/{crawl_uuid}',
            params={'key': self.key},
            timeout=(self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT),
            headers={'User-Agent': self.ua},
            verify=self.verify
        )

        # 204 "No Content" is a legitimate success answer for a DELETE
        if response.status_code not in (200, 204):
            self._handle_crawler_error_response(response)

        return True

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def get_crawl_artifact(
        self,
        uuid: str,
        artifact_type: str = 'warc'
    ) -> CrawlerArtifactResponse:
        """
        Download the artifact produced by a crawler job.

        :param uuid: Crawler job UUID
        :param artifact_type: Artifact type ('warc' or 'har')
        :return: CrawlerArtifactResponse with WARC data and parsing utilities
        :raises HttpError: via _handle_crawler_error_response on non-200 answers

        Example:
            ```python
            # Wait for crawl to complete
            while True:
                status = client.get_crawl_status(uuid)
                if status.is_complete:
                    break
                time.sleep(5)

            # Download artifact
            artifact = client.get_crawl_artifact(uuid)

            # Easy mode: get all pages
            pages = artifact.get_pages()
            for page in pages:
                print(f"{page['url']}: {page['status_code']}")

            # Memory-efficient: iterate
            for record in artifact.iter_responses():
                process(record.content)

            # Save to file
            artifact.save('crawl.warc.gz')
            ```
        """
        # Artifacts can be large: allow a 5-minute read timeout for the download
        download_read_timeout = 300

        response = self._http_handler(
            method='GET',
            url=f'{self.host}/crawl/{uuid}/artifact',
            params={'key': self.key, 'type': artifact_type},
            timeout=(self.connect_timeout, download_read_timeout),
            headers={'User-Agent': self.ua},
            verify=self.verify
        )

        if response.status_code != 200:
            self._handle_crawler_error_response(response)

        return CrawlerArtifactResponse(response.content, artifact_type=artifact_type)

    @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
    def get_crawl_contents(
        self,
        uuid: str,
        format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html'
    ) -> Dict[str, Any]:
        """
        Retrieve crawl contents in a specific format.

        The crawl must have been configured (via its content_formats parameter)
        to produce the requested format.

        :param uuid: Crawler job UUID
        :param format: Content format - 'html', 'clean_html', 'markdown', 'json', 'text',
                      'extracted_data', 'page_metadata'
        :return: Dictionary with format {"contents": {url: content, ...}, "links": {...}}
        :raises HttpError: via _handle_crawler_error_response on non-200 answers

        Example:
            ```python
            # Get all content in markdown format
            result = client.get_crawl_contents(uuid, format='markdown')
            contents = result['contents']

            # Access specific URL
            for url, content in contents.items():
                print(f"{url}: {len(content)} chars")
            ```
        """
        response = self._http_handler(
            method='GET',
            url=f'{self.host}/crawl/{uuid}/contents',
            params={'key': self.key, 'format': format},
            timeout=(self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT),
            headers={'User-Agent': self.ua},
            verify=self.verify
        )

        if response.status_code != 200:
            self._handle_crawler_error_response(response)

        return response.json()

    def _handle_crawler_error_response(self, response: Response):
        """
        Raise an HttpError describing a failed Crawler API response.

        Prefers the structured 'message'/'code' fields from the JSON body;
        falls back to the raw response text when the body is not usable JSON.

        :param response: the failed HTTP response
        :raises HttpError: always
        """
        try:
            payload = response.json()
            message = payload.get('message', 'Unknown error')
            code = payload.get('code', 'ERR::CRAWLER::UNKNOWN')
        except Exception:
            # Body was not JSON (or not a mapping): surface the raw text instead
            message = response.text
            code = 'ERR::CRAWLER::UNKNOWN'

        raise HttpError(
            message=f"Crawler API error ({response.status_code}): {message}",
            code=code,
            http_status_code=response.status_code,
            request=response.request,
            response=response
        )

Class variables

var CONCURRENCY_AUTO
var DATETIME_FORMAT
var DEFAULT_CONNECT_TIMEOUT
var DEFAULT_CRAWLER_API_READ_TIMEOUT
var DEFAULT_EXTRACTION_API_READ_TIMEOUT
var DEFAULT_READ_TIMEOUT
var DEFAULT_SCREENSHOT_API_READ_TIMEOUT
var DEFAULT_WEBSCRAPING_API_READ_TIMEOUT
var HOST
var brotli : bool
var connect_timeout : int
var debug : bool
var default_read_timeout : int
var distributed_mode : bool
var extraction_api_read_timeout : int
var host : str
var key : str
var max_concurrency : int
var monitoring_api_read_timeout : int
var read_timeout : int
var reporter : scrapfly.reporter.Reporter
var screenshot_api_read_timeout : int
var verify : bool
var version : str
var web_scraping_api_read_timeout : int

Instance variables

prop http
Expand source code
@property
def http(self):
    return self._http_handler
prop ua : str
Expand source code
@property
def ua(self) -> str:
    return 'ScrapflySDK/%s (Python %s, %s, %s)' % (
        self.version,
        platform.python_version(),
        platform.uname().system,
        platform.uname().machine
    )

Methods

def account(self) ‑> str | Dict
Expand source code
def account(self) -> Union[str, Dict]:
    response = self._http_handler(
        method='GET',
        url=self.host + '/account',
        params={'key': self.key},
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )

    response.raise_for_status()

    if self.body_handler.support(response.headers):
        return self.body_handler(response.content, response.headers['content-type'])

    return response.content.decode('utf-8')
async def async_extraction(self,
extraction_config: ExtractionConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ExtractionApiResponse
Expand source code
async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse:
    if loop is None:
        loop = asyncio.get_running_loop()

    return await loop.run_in_executor(self.async_executor, self.extract, extraction_config)
async def async_scrape(self,
scrape_config: ScrapeConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ScrapeApiResponse
Expand source code
async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse:
    if loop is None:
        loop = asyncio.get_running_loop()

    return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config)
async def async_screenshot(self,
screenshot_config: ScreenshotConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ScreenshotApiResponse
Expand source code
async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse:
    if loop is None:
        loop = asyncio.get_running_loop()

    return await loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config)
def cancel_crawl(self, crawl_uuid: str) ‑> bool
Expand source code
def cancel_crawl(self, crawl_uuid: str) -> bool:
    """
    Cancel a running crawler job

    :param crawl_uuid: Crawler job UUID to cancel
    :return: True if cancelled successfully

    Example:
        ```python
        # Start a crawl
        crawl = client.start_crawl(config)

        # Cancel it
        client.cancel_crawl(crawl.uuid)
        ```
    """
    timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

    response = self._http_handler(
        method='DELETE',
        url=f'{self.host}/crawl/{crawl_uuid}',
        params={'key': self.key},
        timeout=timeout,
        headers={'User-Agent': self.ua},
        verify=self.verify
    )

    if response.status_code not in (200, 204):
        self._handle_crawler_error_response(response)

    return True

Cancel a running crawler job

:param crawl_uuid: Crawler job UUID to cancel :return: True if cancelled successfully

Example

# Start a crawl
crawl = client.start_crawl(config)

# Cancel it
client.cancel_crawl(crawl.uuid)
def close(self)
Expand source code
def close(self):
    self.http_session.close()
    self.http_session = None
async def concurrent_scrape(self,
scrape_configs: List[ScrapeConfig],
concurrency: int | None = None)
Expand source code
async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None):
    if concurrency is None:
        concurrency = self.max_concurrency
    elif concurrency == self.CONCURRENCY_AUTO:
        concurrency = self.account()['subscription']['max_concurrency']

    loop = asyncio.get_running_loop()
    processing_tasks = []
    results = []
    processed_tasks = 0
    expected_tasks = len(scrape_configs)

    def scrape_done_callback(task:Task):
        nonlocal processed_tasks

        try:
            if task.cancelled() is True:
                return

            error = task.exception()

            if error is not None:
                results.append(error)
            else:
                results.append(task.result())
        finally:
            processing_tasks.remove(task)
            processed_tasks += 1

    while scrape_configs or results or processing_tasks:
        logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))

        if scrape_configs:
            if len(processing_tasks) < concurrency:
                # @todo handle backpressure
                for _ in range(0, concurrency - len(processing_tasks)):
                    try:
                        scrape_config = scrape_configs.pop()
                    except:
                        break

                    scrape_config.raise_on_upstream_error = False
                    task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop))
                    processing_tasks.append(task)
                    task.add_done_callback(scrape_done_callback)

        for _ in results:
            result = results.pop()
            yield result

        await asyncio.sleep(.5)

    logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks)))
def extract(self,
extraction_config: ExtractionConfig,
no_raise: bool = False) ‑> ExtractionApiResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse:
    """
    Extract structured data from text content
    :param extraction_config: ExtractionConfig
    :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration
    :return: str

    If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error.
    If the error is not none, you will get the following structure for example

    'error': {
        'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED',
        'message': 'The content type of the response is not supported for extraction',
        'http_code': 422,
        'links': {
            'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED'
        }
    }
    """

    try:
        logger.debug('--> %s Extracting data from' % (extraction_config.content_type))
        request_data = self._extraction_request(extraction_config=extraction_config)
        response = self._http_handler(**request_data)
        extraction_api_response = self._handle_extraction_response(response=response, extraction_config=extraction_config)
        return extraction_api_response
    except BaseException as e:
        self.reporter.report(error=e)

        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response

        raise e

Extract structured data from text content :param extraction_config: ExtractionConfig :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration :return: ExtractionApiResponse

If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED', 'message': 'The content type of the response is not supported for extraction', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED' } }

def get_crawl_artifact(self, uuid: str, artifact_type: str = 'warc') ‑> CrawlerArtifactResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def get_crawl_artifact(
    self,
    uuid: str,
    artifact_type: str = 'warc'
) -> CrawlerArtifactResponse:
    """
    Download crawler job artifact

    :param uuid: Crawler job UUID
    :param artifact_type: Artifact type ('warc' or 'har')
    :return: CrawlerArtifactResponse with WARC data and parsing utilities

    Example:
        ```python
        # Wait for crawl to complete
        while True:
            status = client.get_crawl_status(uuid)
            if status.is_complete:
                break
            time.sleep(5)

        # Download artifact
        artifact = client.get_crawl_artifact(uuid)

        # Easy mode: get all pages
        pages = artifact.get_pages()
        for page in pages:
            print(f"{page['url']}: {page['status_code']}")

        # Memory-efficient: iterate
        for record in artifact.iter_responses():
            process(record.content)

        # Save to file
        artifact.save('crawl.warc.gz')
        ```
    """
    timeout = (self.connect_timeout, 300)  # 5 minutes for large downloads

    response = self._http_handler(
        method='GET',
        url=f'{self.host}/crawl/{uuid}/artifact',
        params={
            'key': self.key,
            'type': artifact_type
        },
        timeout=timeout,
        headers={'User-Agent': self.ua},
        verify=self.verify
    )

    if response.status_code != 200:
        self._handle_crawler_error_response(response)

    return CrawlerArtifactResponse(response.content, artifact_type=artifact_type)

Download crawler job artifact

:param uuid: Crawler job UUID :param artifact_type: Artifact type ('warc' or 'har') :return: CrawlerArtifactResponse with WARC data and parsing utilities

Example

# Wait for crawl to complete
while True:
    status = client.get_crawl_status(uuid)
    if status.is_complete:
        break
    time.sleep(5)

# Download artifact
artifact = client.get_crawl_artifact(uuid)

# Easy mode: get all pages
pages = artifact.get_pages()
for page in pages:
    print(f"{page['url']}: {page['status_code']}")

# Memory-efficient: iterate
for record in artifact.iter_responses():
    process(record.content)

# Save to file
artifact.save('crawl.warc.gz')
def get_crawl_contents(self,
uuid: str,
format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html') ‑> Dict[str, Any]
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def get_crawl_contents(
    self,
    uuid: str,
    format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html'
) -> Dict[str, Any]:
    """
    Get crawl contents in a specific format

    Retrieves extracted content from crawled pages in the format(s) specified
    in your crawl configuration (via content_formats parameter).

    :param uuid: Crawler job UUID
    :param format: Content format - 'html', 'clean_html', 'markdown', 'json', 'text',
                  'extracted_data', 'page_metadata'
    :return: Dictionary with format {"contents": {url: content, ...}, "links": {...}}

    Example:
        ```python
        # Get all content in markdown format
        result = client.get_crawl_contents(uuid, format='markdown')
        contents = result['contents']

        # Access specific URL
        for url, content in contents.items():
            print(f"{url}: {len(content)} chars")
        ```
    """
    timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

    params = {
        'key': self.key,
        'format': format
    }

    response = self._http_handler(
        method='GET',
        url=f'{self.host}/crawl/{uuid}/contents',
        params=params,
        timeout=timeout,
        headers={'User-Agent': self.ua},
        verify=self.verify
    )

    if response.status_code != 200:
        self._handle_crawler_error_response(response)

    return response.json()

Get crawl contents in a specific format

Retrieves extracted content from crawled pages in the format(s) specified in your crawl configuration (via content_formats parameter).

:param uuid: Crawler job UUID :param format: Content format - 'html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata' :return: Dictionary with format {"contents": {url: content, …}, "links": {…}}

Example

# Get all content in markdown format
result = client.get_crawl_contents(uuid, format='markdown')
contents = result['contents']

# Access specific URL
for url, content in contents.items():
    print(f"{url}: {len(content)} chars")
def get_crawl_status(self, uuid: str) ‑> CrawlerStatusResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def get_crawl_status(self, uuid: str) -> CrawlerStatusResponse:
    """
    Get crawler job status

    :param uuid: Crawler job UUID
    :return: CrawlerStatusResponse with progress information

    Example:
        ```python
        status = client.get_crawl_status(uuid)
        print(f"Status: {status.status}")
        print(f"Progress: {status.progress_pct:.1f}%")
        print(f"Crawled: {status.urls_crawled}/{status.urls_discovered}")

        if status.is_complete:
            print("Crawl completed!")
        ```
    """
    timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

    response = self._http_handler(
        method='GET',
        url=f'{self.host}/crawl/{uuid}/status',
        params={'key': self.key},  # key as query param (already correct)
        timeout=timeout,
        headers={'User-Agent': self.ua},
        verify=self.verify
    )

    if response.status_code != 200:
        self._handle_crawler_error_response(response)

    result = response.json()
    return CrawlerStatusResponse(result)

Get crawler job status

:param uuid: Crawler job UUID :return: CrawlerStatusResponse with progress information

Example

status = client.get_crawl_status(uuid)
print(f"Status: {status.status}")
print(f"Progress: {status.progress_pct:.1f}%")
print(f"Crawled: {status.urls_crawled}/{status.urls_discovered}")

if status.is_complete:
    print("Crawl completed!")
def get_monitoring_metrics(self,
format: str = 'structured',
period: str | None = None,
aggregation: List[Literal['account', 'project', 'target']] | None = None)
Expand source code
def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None):
    params = {'key': self.key, 'format': format}

    if period is not None:
        params['period'] = period

    if aggregation is not None:
        params['aggregation'] = ','.join(aggregation)

    response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics',
        params=params,
        timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )

    response.raise_for_status()

    if self.body_handler.support(response.headers):
        return self.body_handler(response.content, response.headers['content-type'])

    return response.content.decode('utf-8')
def get_monitoring_target_metrics(self,
domain: str,
group_subdomain: bool = False,
period: Literal['subscription', 'last7d', 'last24h', 'last1h', 'last5m'] | None = 'last24h',
start: datetime.datetime | None = None,
end: datetime.datetime | None = None)
Expand source code
def get_monitoring_target_metrics(
        self,
        domain:str,
        group_subdomain:bool=False,
        period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H,
        start:Optional[datetime.datetime]=None,
        end:Optional[datetime.datetime]=None,
):
    params = {
        'key': self.key,
        'domain': domain,
        'group_subdomain': group_subdomain
    }

    if (start is not None and end is None) or (start is None and end is not None):
        raise ValueError('You must provide both start and end date')

    if start is not None and end is not None:
        params['start'] = start.strftime(self.DATETIME_FORMAT)
        params['end'] = end.strftime(self.DATETIME_FORMAT)
        period = None

    params['period'] = period

    response = self._http_handler(
        method='GET',
        url=self.host + '/scrape/monitoring/metrics/target',
        timeout=(self.connect_timeout, self.monitoring_api_read_timeout),
        params=params,
        verify=self.verify,
        headers={
            'accept-encoding': self.body_handler.content_encoding,
            'accept': self.body_handler.accept,
            'user-agent': self.ua
        },
    )

    response.raise_for_status()

    if self.body_handler.support(response.headers):
        return self.body_handler(response.content, response.headers['content-type'])

    return response.content.decode('utf-8')
def open(self)
Expand source code
def open(self):
    if self.http_session is None:
        self.http_session = Session()
        self.http_session.verify = self.verify
        self.http_session.timeout = (self.connect_timeout, self.default_read_timeout)
        self.http_session.params['key'] = self.key
        self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding
        self.http_session.headers['accept'] = self.body_handler.accept
        self.http_session.headers['user-agent'] = self.ua
def resilient_scrape(self,
scrape_config: ScrapeConfig,
retry_on_errors: Set[Exception] = {<class 'scrapfly.errors.ScrapflyError'>},
retry_on_status_code: List[int] | None = None,
tries: int = 5,
delay: int = 20) ‑> ScrapeApiResponse
Expand source code
def resilient_scrape(
    self,
    scrape_config:ScrapeConfig,
    retry_on_errors:Set[Exception]={ScrapflyError},
    retry_on_status_code:Optional[List[int]]=None,
    tries: int = 5,
    delay: int = 20,
) -> ScrapeApiResponse:
    assert retry_on_errors is not None, 'Retry on error is None'
    assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()'

    @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay)
    def inner() -> ScrapeApiResponse:

        try:
            return self.scrape(scrape_config=scrape_config)
        except (UpstreamHttpClientError, UpstreamHttpServerError) as e:
            if retry_on_status_code is not None and e.api_response:
                if e.api_response.upstream_status_code in retry_on_status_code:
                    raise e
                else:
                    return e.api_response

            raise e

    return inner()
def save_scrape_screenshot(self,
api_response: ScrapeApiResponse,
name: str,
path: str | None = None)
Expand source code
def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None):
    """
    Save a screenshot from a scrape result
    :param api_response: ScrapeApiResponse
    :param name: str - name of the screenshot given in the scrape config
    :param path: Optional[str]
    """

    if not api_response.scrape_result['screenshots']:
        raise RuntimeError('Screenshot %s do no exists' % name)

    try:
        api_response.scrape_result['screenshots'][name]
    except KeyError:
        raise RuntimeError('Screenshot %s do no exists' % name)

    screenshot_response = self._http_handler(
        method='GET',
        url=api_response.scrape_result['screenshots'][name]['url'],
        params={'key': self.key},
        verify=self.verify
    )

    screenshot_response.raise_for_status()

    if not name.endswith('.jpg'):
        name += '.jpg'

    api_response.sink(path=path, name=name, content=screenshot_response.content)

Save a screenshot from a scrape result :param api_response: ScrapeApiResponse :param name: str - name of the screenshot given in the scrape config :param path: Optional[str]

def save_screenshot(self,
screenshot_api_response: ScreenshotApiResponse,
name: str,
path: str | None = None)
Expand source code
def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None):
    """
    Save a screenshot from a screenshot API response
    :param api_response: ScreenshotApiResponse
    :param name: str - name of the screenshot to save as
    :param path: Optional[str]
    """

    if screenshot_api_response.screenshot_success is not True:
        raise RuntimeError('Screenshot was not successful')

    if not screenshot_api_response.image:
        raise RuntimeError('Screenshot binary does not exist')

    content = screenshot_api_response.image
    extension_name = screenshot_api_response.metadata['extension_name']

    if path:
        os.makedirs(path, exist_ok=True)
        file_path = os.path.join(path, f'{name}.{extension_name}')
    else:
        file_path = f'{name}.{extension_name}'

    if isinstance(content, bytes):
        content = BytesIO(content)

    with open(file_path, 'wb') as f:
        shutil.copyfileobj(content, f, length=131072)

Save a screenshot from a screenshot API response :param screenshot_api_response: ScreenshotApiResponse :param name: str - name of the screenshot to save as :param path: Optional[str]

def scrape(self,
scrape_config: ScrapeConfig,
no_raise: bool = False) ‑> ScrapeApiResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse:
    """
    Scrape a website
    :param scrape_config: ScrapeConfig
    :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration
    :return: ScrapeApiResponse

    If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error.
    If the error is not none, you will get the following structure for example

    'error': {
        'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
        'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds',
        'retryable': False,
        'http_code': 422,
        'links': {
            'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED'
        }
    }
    """

    try:
        logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
        request_data = self._scrape_request(scrape_config=scrape_config)
        response = self._http_handler(**request_data)
        scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config)

        self.reporter.report(scrape_api_response=scrape_api_response)

        return scrape_api_response
    except BaseException as e:
        self.reporter.report(error=e)

        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response

        raise e

Scrape a website :param scrape_config: ScrapeConfig :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration :return: ScrapeApiResponse

If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error. If the error is not none, you will get the following structure for example

'error': { 'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED', 'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds', 'retryable': False, 'http_code': 422, 'links': { 'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED' } }

def screenshot(self,
screenshot_config: ScreenshotConfig,
no_raise: bool = False) ‑> ScreenshotApiResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse:
    """
    Take a screenshot

    :param screenshot_config: ScreenshotConfig describing the page and capture options
    :param no_raise: bool - if True, do not raise an exception on error; the ScrapflyError's
        api response is returned instead for seamless integration
    :return: ScreenshotApiResponse

    If you use no_raise=True, make sure to check the screenshot_api_response.error attribute
    to handle the error. If the error is not None, you will get the following structure for example

    'error': {
        'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
        'message': 'For some reason we were unable to take the screenshot',
        'http_code': 422,
        'links': {
            'Related Error Doc': 'https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT'
        }
    }
    """

    try:
        # Fixed log typo ("Screenshoting" -> "Screenshotting").
        logger.debug('--> %s Screenshotting' % (screenshot_config.url))
        request_data = self._screenshot_request(screenshot_config=screenshot_config)
        response = self._http_handler(**request_data)
        screenshot_api_response = self._handle_screenshot_response(response=response, screenshot_config=screenshot_config)
        return screenshot_api_response
    except BaseException as e:
        # Report every failure; BaseException is re-raised below unless no_raise applies.
        self.reporter.report(error=e)

        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response

        raise e

Take a screenshot :param screenshot_config: ScreenshotConfig :param no_raise: bool - if True, do not raise an exception on error; the ScrapflyError's api response is returned instead for seamless integration :return: ScreenshotApiResponse

If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error. If the error is not None, it will have the following structure, for example:

'error': { 'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT', 'message': 'For some reason we were unable to take the screenshot', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT' } }

def sink(self,
api_response: ScrapeApiResponse,
content: str | bytes | None = None,
path: str | None = None,
name: str | None = None,
file:  | _io.BytesIO | None = None) ‑> str
Expand source code
def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str:
    """
    Write a scrape result's content to disk (or to a caller-supplied file object).

    :param api_response: scrape API response whose content is persisted
    :param content: optional override of the content to write; defaults to the scrape result's content
    :param name: optional file name; an extension embedded in it is reused
    :param path: optional directory prefix prepended to the generated name
    :param file: optional already-open binary file object; when given, no path is derived
    :return: the created file path
    """
    scrape_result = api_response.result['result']
    scrape_config = api_response.result['config']

    file_content = content or scrape_result['content']
    file_path = None
    file_extension = None

    if name:
        name_parts = name.split('.')
        if len(name_parts) > 1:
            file_extension = name_parts[-1]

    if not file:
        if file_extension is None:
            # Derive the extension from the response content type when the
            # caller-supplied name did not carry one.
            try:
                mime_type = scrape_result['response_headers']['content-type']
            except KeyError:
                mime_type = 'application/octet-stream'

            # Strip parameters: "text/html; charset=utf-8" -> "text/html"
            if ';' in mime_type:
                mime_type = mime_type.split(';')[0]

            file_extension = '.' + mime_type.split('/')[1]

        if not name:
            name = scrape_config['url'].split('/')[-1]

        # NOTE(review): an extension taken from `name` has no leading dot while
        # the mime-derived one does; the substring check below works either way.
        if name.find(file_extension) == -1:
            name += file_extension

        file_path = path + '/' + name if path else name

        if file_path == file_extension:
            # URL ended with '/': build a file name from the URL itself.
            url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-')

            if url[-1] == '-':
                url = url[:-1]

            url += file_extension

            file_path = url

        file = open(file_path, 'wb')

    # Normalize str/bytes content into a seekable binary stream.
    if isinstance(file_content, str):
        file_content = BytesIO(file_content.encode('utf-8'))
    elif isinstance(file_content, bytes):
        file_content = BytesIO(file_content)

    file_content.seek(0)
    with file as f:
        shutil.copyfileobj(file_content, f, length=131072)

    # NOTE(review): when a `file` object was passed in, file_path stays None and
    # is returned despite the declared `-> str`; confirm callers handle this.
    logger.info('file %s created' % file_path)
    return file_path
def start_crawl(self,
crawler_config: CrawlerConfig) ‑> CrawlerStartResponse
Expand source code
@backoff.on_exception(backoff.expo, exception=ConnectionError, max_tries=5)
def start_crawl(self, crawler_config: CrawlerConfig) -> CrawlerStartResponse:
    """
    Start a crawler job

    :param crawler_config: CrawlerConfig
    :return: CrawlerStartResponse with UUID and initial status

    Example:
        ```python
        from scrapfly import ScrapflyClient, CrawlerConfig

        client = ScrapflyClient(key='YOUR_API_KEY')
        config = CrawlerConfig(
            url='https://example.com',
            page_limit=100,
            max_depth=3
        )

        response = client.start_crawl(config)
        print(f"Crawler started: {response.uuid}")
        ```
    """
    # Get crawler config params (without key)
    body_params = crawler_config.to_api_params()

    # API key must be passed as query parameter, not in body
    query_params = {'key': self.key}

    timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)

    url = f'{self.host}/crawl'
    logger.debug(f"Crawler API POST {url}?key=***")
    logger.debug(f"Crawler API body: {body_params}")

    response = self._http_handler(
        method='POST',
        url=url,
        params=query_params,  # key as query param
        json=body_params,      # config in body
        timeout=timeout,
        headers={'User-Agent': self.ua},
        verify=self.verify
    )

    if response.status_code not in (200, 201):
        # Log error details for debugging; the error body may not be valid JSON.
        try:
            error_detail = response.json()
        except Exception:
            # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt propagate.
            error_detail = response.text
        logger.debug(f"Crawler API error ({response.status_code}): {error_detail}")
        self._handle_crawler_error_response(response)

    result = response.json()
    return CrawlerStartResponse(result)

Start a crawler job

:param crawler_config: CrawlerConfig :return: CrawlerStartResponse with UUID and initial status

Example

from scrapfly import ScrapflyClient, CrawlerConfig

client = ScrapflyClient(key='YOUR_API_KEY')
config = CrawlerConfig(
    url='https://example.com',
    page_limit=100,
    max_depth=3
)

response = client.start_crawl(config)
print(f"Crawler started: {response.uuid}")
class ScrapflyCrawlerError (message: str,
code: str,
http_status_code: int,
resource: str | None = None,
is_retryable: bool = False,
retry_delay: int | None = None,
retry_times: int | None = None,
documentation_url: str | None = None,
api_response: ForwardRef('ApiResponse') | None = None)
Expand source code
class ScrapflyCrawlerError(CrawlerError):
    """Exception raised when a crawler job fails or is cancelled"""

Exception raised when a crawler job fails or is cancelled

Ancestors

class ScrapflyError (message: str,
code: str,
http_status_code: int,
resource: str | None = None,
is_retryable: bool = False,
retry_delay: int | None = None,
retry_times: int | None = None,
documentation_url: str | None = None,
api_response: ForwardRef('ApiResponse') | None = None)
Expand source code
class ScrapflyError(Exception):
    """Base exception for all Scrapfly SDK errors.

    Carries the API error code, the HTTP status code, the affected resource
    and optional retry / documentation metadata alongside the message.
    """

    KIND_HTTP_BAD_RESPONSE = 'HTTP_BAD_RESPONSE'
    KIND_SCRAPFLY_ERROR = 'SCRAPFLY_ERROR'

    RESOURCE_PROXY = 'PROXY'
    RESOURCE_THROTTLE = 'THROTTLE'
    RESOURCE_SCRAPE = 'SCRAPE'
    RESOURCE_ASP = 'ASP'
    RESOURCE_SCHEDULE = 'SCHEDULE'
    RESOURCE_WEBHOOK = 'WEBHOOK'
    RESOURCE_SESSION = 'SESSION'

    def __init__(
        self,
        message: str,
        code: str,
        http_status_code: int,
        resource: Optional[str]=None,
        is_retryable: bool = False,
        retry_delay: Optional[int] = None,
        retry_times: Optional[int] = None,
        documentation_url: Optional[str] = None,
        api_response: Optional['ApiResponse'] = None
    ):
        # Keep every argument on the instance so error handlers can inspect it.
        self.message = message
        self.code = code
        self.http_status_code = http_status_code
        self.resource = resource
        self.is_retryable = is_retryable
        self.retry_delay = retry_delay
        self.retry_times = retry_times
        self.documentation_url = documentation_url
        self.api_response = api_response

        # Exception args are (message, code) — unchanged contract.
        super().__init__(self.message, str(self.code))

    def __str__(self) -> str:
        """Render the message, appending the documentation link when one is set."""
        if self.documentation_url is None:
            return self.message
        return f'{self.message}. Learn more: {self.documentation_url}'

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

  • CrawlerError
  • scrapfly.errors.ExtraUsageForbidden
  • scrapfly.errors.HttpError

Class variables

var KIND_HTTP_BAD_RESPONSE
var KIND_SCRAPFLY_ERROR
var RESOURCE_ASP
var RESOURCE_PROXY
var RESOURCE_SCHEDULE
var RESOURCE_SCRAPE
var RESOURCE_SESSION
var RESOURCE_THROTTLE
var RESOURCE_WEBHOOK
class ScrapflyProxyError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyProxyError(ScraperAPIError):
    """Scraper API error raised for proxy-related failures."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyScheduleError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyScheduleError(ScraperAPIError):
    """Scraper API error raised for schedule-related failures."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyScrapeError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyScrapeError(ScraperAPIError):
    """Scraper API error raised for scrape-related failures."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflySessionError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflySessionError(ScraperAPIError):
    """Scraper API error raised for session-related failures."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyThrottleError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyThrottleError(ScraperAPIError):
    """Scraper API error raised for throttling-related failures."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScrapflyWebhookError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScrapflyWebhookError(ScraperAPIError):
    """Scraper API error raised for webhook-related failures."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.ScraperAPIError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScreenshotAPIError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class ScreenshotAPIError(HttpError):
    """HTTP error raised by the Screenshot API."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException
class ScreenshotApiResponse (request: requests.models.Request,
response: requests.models.Response,
screenshot_config: ScreenshotConfig,
api_result: bytes | None = None)
Expand source code
class ScreenshotApiResponse(ApiResponse):
    """Typed response wrapper for the Screenshot API.

    On API error ``result`` is a FrozenDict error document; otherwise it holds
    the raw api_result payload.
    """

    def __init__(self, request: Request, response: Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None):
        super().__init__(request, response)
        self.screenshot_config = screenshot_config
        # FrozenDict error document on API error, raw api_result otherwise.
        self.result = self.handle_api_result(api_result)

    @property
    def image(self) -> Optional[str]:
        """Screenshot binary from the result, or '' when absent."""
        # NOTE(review): on success handle_api_result returns the raw api_result
        # (annotated bytes), which has no .get — confirm result is dict-like here.
        binary = self.result.get('result', None)
        if binary is None:
            return ''

        return binary

    @property
    def metadata(self) -> Optional[Dict]:
        """Extension name plus upstream status/url headers; {} when there is no image."""
        if not self.image:
            return {}

        # NOTE(review): assumes the content-type header is present — confirm,
        # otherwise the slicing below raises on None.
        content_type = self.response.headers.get('content-type')
        extension_name = content_type[content_type.find('/') + 1:].split(';')[0]

        return {
            'extension_name': extension_name,
            'upstream-status-code': self.response.headers.get('X-Scrapfly-Upstream-Http-Code'),
            'upstream-url': self.response.headers.get('X-Scrapfly-Upstream-Url')
        }

    @property
    def screenshot_success(self) -> bool:
        """True when an image was returned."""
        if not self.image:
            return False

        return True

    @property
    def error(self) -> Optional[Dict]:
        """API error document when the screenshot failed, else None."""
        if self.image:
            return None

        if self.screenshot_success is False:
            return self.result

    def _is_api_error(self, api_result: Dict) -> bool:
        # A missing payload or one carrying 'error_id' marks an API error.
        if api_result is None:
            return True

        return 'error_id' in api_result

    def handle_api_result(self, api_result: bytes) -> FrozenDict:
        # NOTE(review): when api_result is None this calls FrozenDict(None) —
        # confirm FrozenDict tolerates None.
        if self._is_api_error(api_result=api_result) is True:
            return FrozenDict(api_result)

        return api_result

    def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
        """Raise error_class on an error result (delegates to ApiResponse)."""
        super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Ancestors

Instance variables

prop error : Dict | None
Expand source code
@property
def error(self) -> Optional[Dict]:
    """API error document when the screenshot failed, else None."""
    if self.image:
        return None

    if self.screenshot_success is False:
        return self.result
prop image : str | None
Expand source code
@property
def image(self) -> Optional[str]:
    """Screenshot binary from the result, or '' when absent."""
    binary = self.result.get('result', None)
    if binary is None:
        return ''

    return binary
prop metadata : Dict | None
Expand source code
@property
def metadata(self) -> Optional[Dict]:
    """Extension name plus upstream status/url headers; {} when there is no image."""
    if not self.image:
        return {}

    # NOTE(review): assumes the content-type header is present — confirm.
    content_type = self.response.headers.get('content-type')
    extension_name = content_type[content_type.find('/') + 1:].split(';')[0]

    return {
        'extension_name': extension_name,
        'upstream-status-code': self.response.headers.get('X-Scrapfly-Upstream-Http-Code'),
        'upstream-url': self.response.headers.get('X-Scrapfly-Upstream-Url')
    }
prop screenshot_success : bool
Expand source code
@property
def screenshot_success(self) -> bool:
    """True when an image was returned."""
    if not self.image:
        return False

    return True

Methods

def handle_api_result(self, api_result: bytes) ‑> FrozenDict
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict:
    """Freeze error payloads; pass successful payloads through unchanged."""
    # NOTE(review): a None api_result reaches FrozenDict(None) — confirm
    # FrozenDict tolerates None.
    if self._is_api_error(api_result=api_result) is True:
        return FrozenDict(api_result)

    return api_result
def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ScreenshotAPIError)
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError):
    """Raise error_class on an error result (delegates to ApiResponse)."""
    super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)

Inherited members

class ScreenshotConfig (url: str,
format: Format | None = None,
capture: str | None = None,
resolution: str | None = None,
country: str | None = None,
timeout: int | None = None,
rendering_wait: int | None = None,
wait_for_selector: str | None = None,
options: List[Options] | None = None,
auto_scroll: bool | None = None,
js: str | None = None,
cache: bool | None = None,
cache_ttl: bool | None = None,
cache_clear: bool | None = None,
vision_deficiency: VisionDeficiency | None = None,
webhook: str | None = None,
raise_on_upstream_error: bool = True)
Expand source code
class ScreenshotConfig(BaseApiConfig):
    """Configuration of a Screenshot API call; see to_api_params for the wire format."""

    url: str
    format: Optional[Format] = None
    capture: Optional[str] = None
    resolution: Optional[str] = None
    country: Optional[str] = None
    timeout: Optional[int] = None # in milliseconds
    rendering_wait: Optional[int] = None # in milliseconds
    wait_for_selector: Optional[str] = None
    options: Optional[List[Options]] = None
    auto_scroll: Optional[bool] = None
    js: Optional[str] = None
    cache: Optional[bool] = None
    # NOTE(review): cache_ttl is annotated bool, but a TTL is usually a duration — confirm.
    cache_ttl: Optional[bool] = None
    cache_clear: Optional[bool] = None
    # NOTE(review): vision_deficiency is set in __init__ but not declared among
    # these class-level variables — confirm the omission is intentional.
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    def __init__(
        self,
        url: str,
        format: Optional[Format] = None,
        capture: Optional[str] = None,
        resolution: Optional[str] = None,
        country: Optional[str] = None,
        timeout: Optional[int] = None, # in milliseconds
        rendering_wait: Optional[int] = None, # in milliseconds
        wait_for_selector: Optional[str] = None,
        options: Optional[List[Options]] = None,
        auto_scroll: Optional[bool] = None,
        js: Optional[str] = None,
        cache: Optional[bool] = None,
        cache_ttl: Optional[bool] = None,
        cache_clear: Optional[bool] = None,
        vision_deficiency: Optional[VisionDeficiency] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True
    ):
        """Build a screenshot configuration; only url is required."""
        # NOTE(review): assert is stripped under `python -O`; a raised TypeError
        # would validate unconditionally.
        assert(type(url) is str)

        self.url = url
        self.key = None
        self.format = format
        self.capture = capture
        self.resolution = resolution
        self.country = country
        self.timeout = timeout
        self.rendering_wait = rendering_wait
        self.wait_for_selector = wait_for_selector
        # Coerce raw flag values into Options enum members.
        self.options = [Options(flag) for flag in options] if options else None
        self.auto_scroll = auto_scroll
        self.js = js
        self.cache = cache
        self.cache_ttl = cache_ttl
        self.cache_clear = cache_clear
        self.vision_deficiency = vision_deficiency
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

    def to_api_params(self, key:str) -> Dict:
        """Serialize this config into Screenshot API query parameters."""
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.format:
            params['format'] = Format(self.format).value

        if self.capture:
            params['capture'] = self.capture

        if self.resolution:
            params['resolution'] = self.resolution

        if self.country is not None:
            params['country'] = self.country

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.rendering_wait is not None:
            params['rendering_wait'] = self.rendering_wait

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.options is not None:
            params["options"] = ",".join(flag.value for flag in self.options)

        if self.auto_scroll is not None:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

        if self.js:
            # JS snippets travel base64-url-encoded.
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

        if self.cache is not None:
            params['cache'] = self._bool_to_http(self.cache)

            # NOTE(review): cache_ttl is sent through _bool_to_http — confirm a
            # boolean (not a duration) is what the API expects here.
            if self.cache_ttl is not None:
                params['cache_ttl'] = self._bool_to_http(self.cache_ttl)

            if self.cache_clear is not None:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)

        else:
            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

            if self.cache_clear is not None:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

        if self.vision_deficiency is not None:
            params['vision_deficiency'] = self.vision_deficiency.value

        if self.webhook is not None:
            params['webhook_name'] = self.webhook

        return params

    def to_dict(self) -> Dict:
        """
        Export the ScreenshotConfig instance to a plain dictionary.

        Enum-valued fields are exported as their raw values.
        """
        return {
            'url': self.url,
            'format': Format(self.format).value if self.format else None,
            'capture': self.capture,
            'resolution': self.resolution,
            'country': self.country,
            'timeout': self.timeout,
            'rendering_wait': self.rendering_wait,
            'wait_for_selector': self.wait_for_selector,
            'options': [Options(option).value for option in self.options] if self.options else None,
            'auto_scroll': self.auto_scroll,
            'js': self.js,
            'cache': self.cache,
            'cache_ttl': self.cache_ttl,
            'cache_clear': self.cache_clear,
            'vision_deficiency': self.vision_deficiency.value if self.vision_deficiency else None,
            'webhook': self.webhook,
            'raise_on_upstream_error': self.raise_on_upstream_error
        }

    @staticmethod
    def from_dict(screenshot_config_dict: Dict) -> 'ScreenshotConfig':
        """Create a ScreenshotConfig instance from a dictionary."""
        url = screenshot_config_dict.get('url', None)

        # Raw values are coerced back into their enum types where applicable.
        format = screenshot_config_dict.get('format', None)
        format = Format(format) if format else None

        capture = screenshot_config_dict.get('capture', None)
        resolution = screenshot_config_dict.get('resolution', None)
        country = screenshot_config_dict.get('country', None)
        timeout = screenshot_config_dict.get('timeout', None)
        rendering_wait = screenshot_config_dict.get('rendering_wait', None)
        wait_for_selector = screenshot_config_dict.get('wait_for_selector', None)

        options = screenshot_config_dict.get('options', None)
        options = [Options(option) for option in options] if options else None

        auto_scroll = screenshot_config_dict.get('auto_scroll', None)
        js = screenshot_config_dict.get('js', None)
        cache = screenshot_config_dict.get('cache', None)
        cache_ttl = screenshot_config_dict.get('cache_ttl', None)
        cache_clear = screenshot_config_dict.get('cache_clear', None)
        vision_deficiency = screenshot_config_dict.get('vision_deficiency', None)
        webhook = screenshot_config_dict.get('webhook', None)
        raise_on_upstream_error = screenshot_config_dict.get('raise_on_upstream_error', True)

        return ScreenshotConfig(
            url=url,
            format=format,
            capture=capture,
            resolution=resolution,
            country=country,
            timeout=timeout,
            rendering_wait=rendering_wait,
            wait_for_selector=wait_for_selector,
            options=options,
            auto_scroll=auto_scroll,
            js=js,
            cache=cache,
            cache_ttl=cache_ttl,
            cache_clear=cache_clear,
            vision_deficiency=vision_deficiency,
            webhook=webhook,
            raise_on_upstream_error=raise_on_upstream_error
        )

Ancestors

Class variables

var auto_scroll : bool | None
var cache : bool | None
var cache_clear : bool | None
var cache_ttl : bool | None
var capture : str | None
var country : str | None
var formatFormat | None
var js : str | None
var options : List[Options] | None
var raise_on_upstream_error : bool
var rendering_wait : int | None
var resolution : str | None
var timeout : int | None
var url : str
var wait_for_selector : str | None
var webhook : str | None

Static methods

def from_dict(screenshot_config_dict: Dict) ‑> ScreenshotConfig
Expand source code
@staticmethod
def from_dict(screenshot_config_dict: Dict) -> 'ScreenshotConfig':
    """Create a ScreenshotConfig instance from a dictionary."""
    url = screenshot_config_dict.get('url', None)

    # Raw values are coerced back into their enum types where applicable.
    format = screenshot_config_dict.get('format', None)
    format = Format(format) if format else None

    capture = screenshot_config_dict.get('capture', None)
    resolution = screenshot_config_dict.get('resolution', None)
    country = screenshot_config_dict.get('country', None)
    timeout = screenshot_config_dict.get('timeout', None)
    rendering_wait = screenshot_config_dict.get('rendering_wait', None)
    wait_for_selector = screenshot_config_dict.get('wait_for_selector', None)

    options = screenshot_config_dict.get('options', None)
    options = [Options(option) for option in options] if options else None

    auto_scroll = screenshot_config_dict.get('auto_scroll', None)
    js = screenshot_config_dict.get('js', None)
    cache = screenshot_config_dict.get('cache', None)
    cache_ttl = screenshot_config_dict.get('cache_ttl', None)
    cache_clear = screenshot_config_dict.get('cache_clear', None)
    vision_deficiency = screenshot_config_dict.get('vision_deficiency', None)
    webhook = screenshot_config_dict.get('webhook', None)
    raise_on_upstream_error = screenshot_config_dict.get('raise_on_upstream_error', True)

    return ScreenshotConfig(
        url=url,
        format=format,
        capture=capture,
        resolution=resolution,
        country=country,
        timeout=timeout,
        rendering_wait=rendering_wait,
        wait_for_selector=wait_for_selector,
        options=options,
        auto_scroll=auto_scroll,
        js=js,
        cache=cache,
        cache_ttl=cache_ttl,
        cache_clear=cache_clear,
        vision_deficiency=vision_deficiency,
        webhook=webhook,
        raise_on_upstream_error=raise_on_upstream_error
    )

Create a ScreenshotConfig instance from a dictionary.

Methods

def to_api_params(self, key: str) ‑> Dict
Expand source code
def to_api_params(self, key:str) -> Dict:
    """Serialize this config into Screenshot API query parameters."""
    params = {
        'key': self.key or key,
        'url': self.url
    }

    if self.format:
        params['format'] = Format(self.format).value

    if self.capture:
        params['capture'] = self.capture

    if self.resolution:
        params['resolution'] = self.resolution

    if self.country is not None:
        params['country'] = self.country

    if self.timeout is not None:
        params['timeout'] = self.timeout

    if self.rendering_wait is not None:
        params['rendering_wait'] = self.rendering_wait

    if self.wait_for_selector is not None:
        params['wait_for_selector'] = self.wait_for_selector

    if self.options is not None:
        params["options"] = ",".join(flag.value for flag in self.options)

    if self.auto_scroll is not None:
        params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

    if self.js:
        # JS snippets travel base64-url-encoded.
        params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

    if self.cache is not None:
        params['cache'] = self._bool_to_http(self.cache)

        # NOTE(review): cache_ttl is sent through _bool_to_http — confirm a
        # boolean (not a duration) is what the API expects here.
        if self.cache_ttl is not None:
            params['cache_ttl'] = self._bool_to_http(self.cache_ttl)

        if self.cache_clear is not None:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)

    else:
        if self.cache_ttl is not None:
            logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

        if self.cache_clear is not None:
            logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

    if self.vision_deficiency is not None:
        params['vision_deficiency'] = self.vision_deficiency.value

    if self.webhook is not None:
        params['webhook_name'] = self.webhook

    return params
def to_dict(self) ‑> Dict
Expand source code
def to_dict(self) -> Dict:
    """
    Export the ScreenshotConfig instance to a plain dictionary.

    Enum-valued fields are exported as their raw values.
    """
    return {
        'url': self.url,
        'format': Format(self.format).value if self.format else None,
        'capture': self.capture,
        'resolution': self.resolution,
        'country': self.country,
        'timeout': self.timeout,
        'rendering_wait': self.rendering_wait,
        'wait_for_selector': self.wait_for_selector,
        'options': [Options(option).value for option in self.options] if self.options else None,
        'auto_scroll': self.auto_scroll,
        'js': self.js,
        'cache': self.cache,
        'cache_ttl': self.cache_ttl,
        'cache_clear': self.cache_clear,
        'vision_deficiency': self.vision_deficiency.value if self.vision_deficiency else None,
        'webhook': self.webhook,
        'raise_on_upstream_error': self.raise_on_upstream_error
    }

Export the ScreenshotConfig instance to a plain dictionary.

class UpstreamHttpClientError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class UpstreamHttpClientError(UpstreamHttpError):
    """Upstream (target website) HTTP error for client-side (4xx-style) responses."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.UpstreamHttpError
  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

class UpstreamHttpError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class UpstreamHttpError(HttpError):
    """Base class for HTTP errors originating from the upstream (target) website."""
    pass

Common base class for all non-exit exceptions.

Ancestors

  • scrapfly.errors.HttpError
  • ScrapflyError
  • builtins.Exception
  • builtins.BaseException

Subclasses

class UpstreamHttpServerError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)
Expand source code
class UpstreamHttpServerError(UpstreamHttpClientError):
    """Upstream (target website) server-side HTTP error."""
    # NOTE(review): inherits from UpstreamHttpClientError rather than directly
    # from UpstreamHttpError — confirm this hierarchy is intentional.
    pass

Common base class for all non-exit exceptions.

Ancestors

class WarcParser (warc_data: bytes | )
Expand source code
class WarcParser:
    """
    Parser for WARC files with automatic decompression

    Provides methods to iterate through WARC records and extract page data.

    Example:
        ```python
        # From bytes
        parser = WarcParser(warc_bytes)

        # Iterate all records
        for record in parser.iter_records():
            print(f"{record.url}: {record.status_code}")

        # Get only HTTP responses
        for record in parser.iter_responses():
            print(f"Page: {record.url}")
            html = record.content.decode('utf-8')

        # Get all pages as simple dicts
        pages = parser.get_pages()
        for page in pages:
            print(f"{page['url']}: {page['status_code']}")
        ```
    """

    def __init__(self, warc_data: Union[bytes, BinaryIO]):
        """
        Initialize WARC parser

        Args:
            warc_data: WARC data as bytes or file-like object
                      (supports both gzip-compressed and uncompressed)
        """
        if isinstance(warc_data, bytes):
            # gzip streams start with the magic number 0x1f 0x8b
            if warc_data[:2] == b'\x1f\x8b':
                try:
                    warc_data = gzip.decompress(warc_data)
                except Exception:
                    # Magic number matched but the stream is corrupt or
                    # truncated; fall through and parse the raw bytes as-is.
                    pass
            self._data = BytesIO(warc_data)
        else:
            self._data = warc_data

    def iter_records(self) -> Iterator["WarcRecord"]:
        """
        Iterate through all WARC records

        Yields:
            WarcRecord: Each record in the WARC file
        """
        self._data.seek(0)

        while True:
            # Each record starts with a version line, e.g. b'WARC/1.0'
            version_line = self._read_line()
            if not version_line or not version_line.startswith(b'WARC/'):
                break

            # Read WARC headers
            warc_headers = self._read_headers()
            if not warc_headers:
                break

            # Content-Length announces the size of the record's payload;
            # a malformed value means we cannot locate the payload boundary,
            # so stop rather than mis-reading the rest of the stream.
            try:
                content_length = int(warc_headers.get('Content-Length', 0))
            except ValueError:
                break

            # Read content block
            content_block = self._data.read(content_length)

            # Each record is terminated by two empty lines
            self._read_line()
            self._read_line()

            # Parse the record
            record = self._parse_record(warc_headers, content_block)
            if record:
                yield record

    def iter_responses(self) -> Iterator["WarcRecord"]:
        """
        Iterate through HTTP response records only

        Filters out non-response records (requests, metadata, etc.)

        Yields:
            WarcRecord: HTTP response records only
        """
        for record in self.iter_records():
            if record.record_type == 'response' and record.status_code:
                yield record

    def get_pages(self) -> List[Dict]:
        """
        Get all crawled pages as simple dictionaries

        This is the easiest way to access crawl results without dealing
        with WARC format details.

        Returns:
            List of dicts with keys: url, status_code, headers, content

        Example:
            ```python
            pages = parser.get_pages()
            for page in pages:
                print(f"{page['url']}: {len(page['content'])} bytes")
                html = page['content'].decode('utf-8')
            ```
        """
        pages = []
        for record in self.iter_responses():
            pages.append({
                'url': record.url,
                'status_code': record.status_code,
                'headers': record.headers,
                'content': record.content
            })
        return pages

    def _read_line(self) -> bytes:
        """Read a single line, stripped of its CR/LF terminator.

        Note: returns b'' both for a blank line and at end of stream.
        """
        line = self._data.readline()
        return line.rstrip(b'\r\n')

    def _read_headers(self) -> Dict[str, str]:
        """Read 'Key: Value' headers until an empty line is reached."""
        headers = {}
        while True:
            line = self._read_line()
            if not line:
                break

            # Parse header line; lines without a colon are silently skipped
            if b':' in line:
                key, value = line.split(b':', 1)
                headers[key.decode('utf-8').strip()] = value.decode('utf-8').strip()

        return headers

    def _parse_record(self, warc_headers: Dict[str, str], content_block: bytes) -> Optional["WarcRecord"]:
        """Build a WarcRecord from WARC headers and the raw content block.

        Returns None for unrecognized record types.
        """
        record_type = warc_headers.get('WARC-Type', '')
        url = warc_headers.get('WARC-Target-URI', '')

        if record_type == 'response':
            # Response records embed a full HTTP message: split it into
            # headers/body and pull the status code from the status line.
            http_headers, body = self._parse_http_response(content_block)
            status_code = self._extract_status_code(content_block)

            return WarcRecord(
                record_type=record_type,
                url=url,
                headers=http_headers,
                content=body,
                status_code=status_code,
                warc_headers=warc_headers
            )
        elif record_type in ['request', 'metadata', 'warcinfo']:
            # Other record types - store raw content without HTTP parsing
            return WarcRecord(
                record_type=record_type,
                url=url,
                headers={},
                content=content_block,
                status_code=None,
                warc_headers=warc_headers
            )

        return None

    def _parse_http_response(self, content_block: bytes) -> Tuple[Dict[str, str], bytes]:
        """Split an embedded HTTP response into (headers dict, body bytes)."""
        try:
            # Headers end at the first double newline; tolerate bare LF
            parts = content_block.split(b'\r\n\r\n', 1)
            if len(parts) < 2:
                parts = content_block.split(b'\n\n', 1)

            if len(parts) == 2:
                header_section, body = parts
            else:
                # No header/body separator at all: treat everything as headers
                header_section, body = content_block, b''

            # Parse headers
            headers = {}
            lines = header_section.split(b'\r\n') if b'\r\n' in header_section else header_section.split(b'\n')

            # Skip the status line (e.g. "HTTP/1.1 200 OK")
            for line in lines[1:]:
                if b':' in line:
                    key, value = line.split(b':', 1)
                    headers[key.decode('utf-8', errors='ignore').strip()] = value.decode('utf-8', errors='ignore').strip()

            return headers, body

        except Exception:
            # Best-effort parser: on any failure return the raw block as body
            return {}, content_block

    def _extract_status_code(self, content_block: bytes) -> Optional[int]:
        """Extract the HTTP status code from the response status line."""
        try:
            # Status line is the first line (e.g. "HTTP/1.1 200 OK")
            first_line = content_block.split(b'\r\n', 1)[0] if b'\r\n' in content_block else content_block.split(b'\n', 1)[0]
            # Accept both dotted versions ("HTTP/1.1 200") and versions
            # without a dot ("HTTP/2 200")
            match = re.match(rb'HTTP/\d+(?:\.\d+)?\s+(\d+)', first_line)
            if match:
                return int(match.group(1))
        except Exception:
            pass
        return None

Parser for WARC files with automatic decompression

Provides methods to iterate through WARC records and extract page data.

Example

# From bytes
parser = WarcParser(warc_bytes)

# Iterate all records
for record in parser.iter_records():
    print(f"{record.url}: {record.status_code}")

# Get only HTTP responses
for record in parser.iter_responses():
    print(f"Page: {record.url}")
    html = record.content.decode('utf-8')

# Get all pages as simple dicts
pages = parser.get_pages()
for page in pages:
    print(f"{page['url']}: {page['status_code']}")

Initialize WARC parser

Args

warc_data
WARC data as bytes or file-like object (supports both gzip-compressed and uncompressed)

Methods

def get_pages(self) ‑> List[Dict]
Expand source code
def get_pages(self) -> List[Dict]:
    """
    Return every crawled page as a plain dictionary.

    The simplest way to consume crawl results without touching WARC
    format internals.

    Returns:
        List of dicts with keys: url, status_code, headers, content

    Example:
        ```python
        pages = parser.get_pages()
        for page in pages:
            print(f"{page['url']}: {len(page['content'])} bytes")
            html = page['content'].decode('utf-8')
        ```
    """
    return [
        {
            'url': rec.url,
            'status_code': rec.status_code,
            'headers': rec.headers,
            'content': rec.content,
        }
        for rec in self.iter_responses()
    ]

Get all crawled pages as simple dictionaries

This is the easiest way to access crawl results without dealing with WARC format details.

Returns

List of dicts with keys
url, status_code, headers, content

Example

pages = parser.get_pages()
for page in pages:
    print(f"{page['url']}: {len(page['content'])} bytes")
    html = page['content'].decode('utf-8')
def iter_records(self) ‑> Iterator[WarcRecord]
Expand source code
def iter_records(self) -> Iterator["WarcRecord"]:
    """
    Walk every record in the WARC stream from the beginning.

    Yields:
        WarcRecord: one item per record in the archive
    """
    self._data.seek(0)

    # A well-formed record opens with a version line such as b'WARC/1.0';
    # anything else (including end of stream) terminates iteration.
    while (version := self._read_line()) and version.startswith(b'WARC/'):
        headers = self._read_headers()
        if not headers:
            break

        # The payload size is announced in the WARC headers.
        body = self._data.read(int(headers.get('Content-Length', 0)))

        # Each record is terminated by two empty lines; consume them.
        self._read_line()
        self._read_line()

        parsed = self._parse_record(headers, body)
        if parsed:
            yield parsed

Iterate through all WARC records

Yields

WarcRecord
Each record in the WARC file
def iter_responses(self) ‑> Iterator[WarcRecord]
Expand source code
def iter_responses(self) -> Iterator["WarcRecord"]:
    """
    Yield only the HTTP response records of the archive.

    Non-response records (requests, metadata, etc.) are skipped, as are
    responses without a parsed status code.

    Yields:
        WarcRecord: HTTP response records only
    """
    yield from (
        rec
        for rec in self.iter_records()
        if rec.record_type == 'response' and rec.status_code
    )

Iterate through HTTP response records only

Filters out non-response records (requests, metadata, etc.)

Yields

WarcRecord
HTTP response records only
class WarcRecord (record_type: str,
url: str,
headers: Dict[str, str],
content: bytes,
status_code: int | None,
warc_headers: Dict[str, str])
Expand source code
@dataclass
class WarcRecord:
    """
    A single record from a WARC archive.

    WARC files are sequences of such records, each one capturing a single
    HTTP transaction or a piece of archive metadata.
    """
    record_type: str  # e.g. 'response', 'request', 'metadata', 'warcinfo'
    url: str  # URL the record refers to (WARC-Target-URI)
    headers: Dict[str, str]  # parsed HTTP headers
    content: bytes  # raw body / content block
    status_code: Optional[int]  # HTTP status (only set for response records)
    warc_headers: Dict[str, str]  # WARC-level headers of this record

    def __repr__(self):
        summary = (self.record_type, self.url, self.status_code)
        return "WarcRecord(type={}, url={}, status={})".format(*summary)

Represents a single WARC record

A WARC file contains multiple records, each representing a captured HTTP transaction or metadata.

Instance variables

var content : bytes
var headers : Dict[str, str]
var record_type : str
var status_code : int | None
var url : str
var warc_headers : Dict[str, str]