Package scrapfly
Sub-modules
scrapfly.api_config
scrapfly.api_response
scrapfly.client
scrapfly.crawler — Scrapfly Crawler API …
scrapfly.errors
scrapfly.extraction_config
scrapfly.frozen_dict
scrapfly.polyfill
scrapfly.reporter
scrapfly.scrape_config
scrapfly.scrapy
scrapfly.screenshot_config
scrapfly.webhook
Functions
def parse_warc(warc_data: bytes | BinaryIO) ‑> WarcParser -
Expand source code
def parse_warc(warc_data: Union[bytes, BinaryIO]) -> WarcParser: """ Convenience function to create a WARC parser Args: warc_data: WARC data as bytes or file-like object Returns: WarcParser: Parser instance Example: ```python from scrapfly import parse_warc # Quick way to get all pages pages = parse_warc(warc_bytes).get_pages() for page in pages: print(f"{page['url']}: {page['status_code']}") ``` """ return WarcParser(warc_data)Convenience function to create a WARC parser
Args
warc_data — WARC data as bytes or file-like object
Returns
WarcParser — Parser instance
Example
from scrapfly import parse_warc # Quick way to get all pages pages = parse_warc(warc_bytes).get_pages() for page in pages: print(f"{page['url']}: {page['status_code']}")
def webhook_from_payload(payload: Dict,
signing_secrets: Tuple[str] | None = None,
signature: str | None = None) ‑> CrawlStartedWebhook | CrawlUrlDiscoveredWebhook | CrawlUrlFailedWebhook | CrawlCompletedWebhook-
Expand source code
def webhook_from_payload( payload: Dict, signing_secrets: Optional[Tuple[str]] = None, signature: Optional[str] = None ) -> CrawlerWebhook: """ Create a typed webhook instance from a raw payload dictionary. This helper automatically determines the webhook type based on the 'event' field and returns the appropriate typed webhook instance. Args: payload: The webhook payload as a dictionary signing_secrets: Optional tuple of signing secrets (hex strings) for verification signature: Optional webhook signature header for verification Returns: A typed webhook instance (CrawlStartedWebhook, CrawlUrlDiscoveredWebhook, etc.) Raises: ValueError: If the event type is unknown WebhookSignatureMissMatch: If signature verification fails Example: ```python from scrapfly import webhook_from_payload # From Flask request @app.route('/webhook', methods=['POST']) def handle_webhook(): webhook = webhook_from_payload( request.json, signing_secrets=('your-secret-key',), signature=request.headers.get('X-Scrapfly-Webhook-Signature') ) if isinstance(webhook, CrawlCompletedWebhook): print(f"Crawl {webhook.uuid} completed!") print(f"Crawled {webhook.urls_crawled} URLs") return '', 200 ``` """ # Verify signature if provided if signing_secrets and signature: from ..api_response import ResponseBodyHandler from json import dumps handler = ResponseBodyHandler(signing_secrets=signing_secrets) message = dumps(payload, separators=(',', ':')).encode('utf-8') if not handler.verify(message, signature): from ..errors import WebhookSignatureMissMatch raise WebhookSignatureMissMatch() # Determine event type and create appropriate webhook instance event = payload.get('event') if event == CrawlerWebhookEvent.STARTED.value: return CrawlStartedWebhook.from_dict(payload) elif event == CrawlerWebhookEvent.URL_DISCOVERED.value: return CrawlUrlDiscoveredWebhook.from_dict(payload) elif event == CrawlerWebhookEvent.URL_FAILED.value: return CrawlUrlFailedWebhook.from_dict(payload) elif event == 
CrawlerWebhookEvent.COMPLETED.value: return CrawlCompletedWebhook.from_dict(payload) else: raise ValueError(f"Unknown crawler webhook event type: {event}")
Create a typed webhook instance from a raw payload dictionary.
This helper automatically determines the webhook type based on the 'event' field and returns the appropriate typed webhook instance.
Args
payload — The webhook payload as a dictionary
signing_secrets — Optional tuple of signing secrets (hex strings) for verification
signature — Optional webhook signature header for verification
Returns
A typed webhook instance (CrawlStartedWebhook, CrawlUrlDiscoveredWebhook, etc.)
Raises
ValueError — If the event type is unknown
WebhookSignatureMissMatch — If signature verification fails
Example
from scrapfly import webhook_from_payload # From Flask request @app.route('/webhook', methods=['POST']) def handle_webhook(): webhook = webhook_from_payload( request.json, signing_secrets=('your-secret-key',), signature=request.headers.get('X-Scrapfly-Webhook-Signature') ) if isinstance(webhook, CrawlCompletedWebhook): print(f"Crawl {webhook.uuid} completed!") print(f"Crawled {webhook.urls_crawled} URLs") return '', 200
Classes
class ApiHttpClientError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ApiHttpClientError(HttpError): pass
Common base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- ApiHttpServerError
- scrapfly.errors.BadApiKeyError
- scrapfly.errors.PaymentRequired
- scrapfly.errors.TooManyRequest
class ApiHttpServerError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ApiHttpServerError(ApiHttpClientError): pass
Common base class for all non-exit exceptions.
Ancestors
- ApiHttpClientError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class Crawl (client: ScrapflyClient,
config: CrawlerConfig)-
Expand source code
class Crawl: """ High-level abstraction for managing a crawler job The Crawl object maintains the state of a crawler job and provides convenient methods for managing its lifecycle. Example: ```python from scrapfly import ScrapflyClient, CrawlerConfig, Crawl client = ScrapflyClient(key='your-key') config = CrawlerConfig(url='https://example.com', page_limit=10) # Create and start crawl crawl = Crawl(client, config) crawl.crawl() # Start the crawler # Wait for completion crawl.wait() # Get results pages = crawl.warc().get_pages() for page in pages: print(f"{page['url']}: {page['status_code']}") # Or read specific URLs html = crawl.read('https://example.com/page1', format='html') ``` """ def __init__(self, client: 'ScrapflyClient', config: CrawlerConfig): """ Initialize a Crawl object Args: client: ScrapflyClient instance config: CrawlerConfig with crawler settings """ self._client = client self._config = config self._uuid: Optional[str] = None self._status_cache: Optional[CrawlerStatusResponse] = None self._artifact_cache: Optional[CrawlerArtifactResponse] = None @property def uuid(self) -> Optional[str]: """Get the crawler job UUID (None if not started)""" return self._uuid @property def started(self) -> bool: """Check if the crawler has been started""" return self._uuid is not None def crawl(self) -> 'Crawl': """ Start the crawler job Returns: Self for method chaining Raises: RuntimeError: If crawler already started Example: ```python crawl = Crawl(client, config) crawl.crawl() # Start crawling ``` """ if self._uuid is not None: raise ScrapflyCrawlerError( message="Crawler already started", code="ALREADY_STARTED", http_status_code=400 ) response = self._client.start_crawl(self._config) self._uuid = response.uuid return self def status(self, refresh: bool = True) -> CrawlerStatusResponse: """ Get current crawler status Args: refresh: If True, fetch fresh status from API. If False, return cached status. 
Returns: CrawlerStatusResponse with current status Raises: RuntimeError: If crawler not started yet Example: ```python status = crawl.status() print(f"Progress: {status.progress_pct}%") print(f"URLs crawled: {status.urls_crawled}") ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) if refresh or self._status_cache is None: self._status_cache = self._client.get_crawl_status(self._uuid) return self._status_cache def wait( self, poll_interval: int = 5, max_wait: Optional[int] = None, verbose: bool = False ) -> 'Crawl': """ Wait for crawler to complete Polls the status endpoint until the crawler finishes. Args: poll_interval: Seconds between status checks (default: 5) max_wait: Maximum seconds to wait (None = wait forever) verbose: If True, print progress updates Returns: Self for method chaining Raises: RuntimeError: If crawler not started, failed, or timed out Example: ```python # Wait with progress updates crawl.crawl().wait(verbose=True) # Wait with timeout crawl.crawl().wait(max_wait=300) # 5 minutes max ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. 
Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) start_time = time.time() poll_count = 0 while True: status = self.status(refresh=True) poll_count += 1 if verbose: logger.info(f"Poll #{poll_count}: {status.status} - " f"{status.progress_pct:.1f}% - " f"{status.urls_crawled}/{status.urls_discovered} URLs") if status.is_complete: if verbose: logger.info(f"✓ Crawler completed successfully!") return self elif status.is_failed: raise ScrapflyCrawlerError( message=f"Crawler failed with status: {status.status}", code="FAILED", http_status_code=400 ) elif status.is_cancelled: raise ScrapflyCrawlerError( message="Crawler was cancelled", code="CANCELLED", http_status_code=400 ) # Check timeout if max_wait is not None: elapsed = time.time() - start_time if elapsed > max_wait: raise ScrapflyCrawlerError( message=f"Timeout waiting for crawler (>{max_wait}s)", code="TIMEOUT", http_status_code=400 ) time.sleep(poll_interval) def cancel(self) -> bool: """ Cancel the running crawler job Returns: True if cancelled successfully Raises: ScrapflyCrawlerError: If crawler not started yet Example: ```python # Start a crawl crawl = Crawl(client, config).crawl() # Cancel it crawl.cancel() ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) return self._client.cancel_crawl(self._uuid) def warc(self, artifact_type: str = 'warc') -> CrawlerArtifactResponse: """ Download the crawler artifact (WARC file) Args: artifact_type: Type of artifact to download (default: 'warc') Returns: CrawlerArtifactResponse with parsed WARC data Raises: RuntimeError: If crawler not started yet Example: ```python # Get WARC artifact artifact = crawl.warc() # Get all pages pages = artifact.get_pages() # Iterate through responses for record in artifact.iter_responses(): print(record.url) ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. 
Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) if self._artifact_cache is None: self._artifact_cache = self._client.get_crawl_artifact( self._uuid, artifact_type=artifact_type ) return self._artifact_cache def har(self) -> CrawlerArtifactResponse: """ Download the crawler artifact in HAR (HTTP Archive) format Returns: CrawlerArtifactResponse with parsed HAR data Raises: RuntimeError: If crawler not started yet Example: ```python # Get HAR artifact artifact = crawl.har() # Get all pages pages = artifact.get_pages() # Iterate through HAR entries for entry in artifact.iter_responses(): print(f"{entry.url}: {entry.status_code}") print(f"Timing: {entry.time}ms") ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) return self._client.get_crawl_artifact( self._uuid, artifact_type='har' ) def read(self, url: str, format: ContentFormat = 'html') -> Optional[CrawlContent]: """ Read content from a specific URL in the crawl results Args: url: The URL to retrieve content for format: Content format - 'html', 'markdown', 'text', 'clean_html', 'json', 'extracted_data', 'page_metadata' Returns: CrawlContent object with content and metadata, or None if URL not found Example: ```python # Get HTML content for a specific URL content = crawl.read('https://example.com/page1') if content: print(f"URL: {content.url}") print(f"Status: {content.status_code}") print(f"Duration: {content.duration}s") print(content.content) # Get markdown content content = crawl.read('https://example.com/page1', format='markdown') if content: print(content.content) # Check if URL was crawled if crawl.read('https://example.com/missing') is None: print("URL not found in crawl results") ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. 
Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) # For HTML format, we can get it from the WARC artifact (faster) if format == 'html': artifact = self.warc() for record in artifact.iter_responses(): if record.url == url: # Extract metadata from WARC headers warc_headers = record.warc_headers or {} duration_str = warc_headers.get('WARC-Scrape-Duration') duration = float(duration_str) if duration_str else None return CrawlContent( url=record.url, content=record.content.decode('utf-8', errors='replace'), status_code=record.status_code, headers=record.headers, duration=duration, log_id=warc_headers.get('WARC-Scrape-Log-Id'), country=warc_headers.get('WARC-Scrape-Country'), crawl_uuid=self._uuid ) return None # For other formats (markdown, text, etc.), use the contents API try: result = self._client.get_crawl_contents( self._uuid, format=format ) # The API returns: {"contents": {url: {format: content, ...}, ...}, "links": {...}} contents = result.get('contents', {}) if url in contents: content_data = contents[url] # Content is always a dict with format keys (e.g., {"html": "...", "markdown": "..."}) content_str = content_data.get(format) if content_str: # For non-HTML formats from contents API, we don't have full metadata # Try to get status code from WARC if possible status_code = 200 # Default headers = {} duration = None log_id = None country = None # Try to get metadata from WARC try: artifact = self.warc() for record in artifact.iter_responses(): if record.url == url: status_code = record.status_code headers = record.headers warc_headers = record.warc_headers or {} duration_str = warc_headers.get('WARC-Scrape-Duration') duration = float(duration_str) if duration_str else None log_id = warc_headers.get('WARC-Scrape-Log-Id') country = warc_headers.get('WARC-Scrape-Country') break except: pass return CrawlContent( url=url, content=content_str, status_code=status_code, headers=headers, duration=duration, log_id=log_id, country=country, 
crawl_uuid=self._uuid ) return None except Exception: # If contents API fails, return None return None def read_iter( self, pattern: str, format: ContentFormat = 'html' ) -> Iterator[CrawlContent]: """ Iterate through URLs matching a pattern and yield their content Supports wildcard patterns using * and ? for flexible URL matching. Args: pattern: URL pattern with wildcards (* matches any characters, ? matches one) Examples: "/products?page=*", "https://example.com/*/detail", "*/product/*" format: Content format to retrieve Yields: CrawlContent objects for each matching URL Example: ```python # Get all product pages in markdown for content in crawl.read_iter(pattern="*/products?page=*", format="markdown"): print(f"{content.url}: {len(content.content)} chars") print(f"Duration: {content.duration}s") # Get all detail pages for content in crawl.read_iter(pattern="*/detail/*"): process(content.content) # Pattern matching examples: # "/products?page=*" matches /products?page=1, /products?page=2, etc. # "*/product/*" matches any URL with /product/ in the path # "https://example.com/page?" matches https://example.com/page1, page2, etc. ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. 
Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) # For HTML format, use WARC artifact (faster) if format == 'html': artifact = self.warc() for record in artifact.iter_responses(): if fnmatch.fnmatch(record.url, pattern): # Extract metadata from WARC headers warc_headers = record.warc_headers or {} duration_str = warc_headers.get('WARC-Scrape-Duration') duration = float(duration_str) if duration_str else None yield CrawlContent( url=record.url, content=record.content.decode('utf-8', errors='replace'), status_code=record.status_code, headers=record.headers, duration=duration, log_id=warc_headers.get('WARC-Scrape-Log-Id'), country=warc_headers.get('WARC-Scrape-Country'), crawl_uuid=self._uuid ) else: # For other formats, use contents API try: result = self._client.get_crawl_contents( self._uuid, format=format ) contents = result.get('contents', {}) # Build a metadata cache from WARC for non-HTML formats metadata_cache = {} try: artifact = self.warc() for record in artifact.iter_responses(): warc_headers = record.warc_headers or {} duration_str = warc_headers.get('WARC-Scrape-Duration') metadata_cache[record.url] = { 'status_code': record.status_code, 'headers': record.headers, 'duration': float(duration_str) if duration_str else None, 'log_id': warc_headers.get('WARC-Scrape-Log-Id'), 'country': warc_headers.get('WARC-Scrape-Country') } except: pass # Iterate through matching URLs for url, content_data in contents.items(): if fnmatch.fnmatch(url, pattern): # Content is always a dict with format keys (e.g., {"html": "...", "markdown": "..."}) content = content_data.get(format) if content: # Get metadata from cache or use defaults metadata = metadata_cache.get(url, {}) yield CrawlContent( url=url, content=content, status_code=metadata.get('status_code', 200), headers=metadata.get('headers', {}), duration=metadata.get('duration'), log_id=metadata.get('log_id'), country=metadata.get('country'), crawl_uuid=self._uuid ) except Exception: # If contents API 
fails, yield nothing return def read_batch( self, urls: List[str], formats: List[ContentFormat] = None ) -> Dict[str, Dict[str, str]]: """ Retrieve content for multiple URLs in a single batch request This is more efficient than calling read() multiple times as it retrieves all content in a single API call. Maximum 100 URLs per request. Args: urls: List of URLs to retrieve (max 100) formats: List of content formats to retrieve (e.g., ['markdown', 'text']) If None, defaults to ['html'] Returns: Dictionary mapping URLs to their content in requested formats: { 'https://example.com/page1': { 'markdown': '# Page 1...', 'text': 'Page 1...' }, 'https://example.com/page2': { 'markdown': '# Page 2...', 'text': 'Page 2...' } } Example: ```python # Get markdown and text for multiple URLs urls = ['https://example.com/page1', 'https://example.com/page2'] contents = crawl.read_batch(urls, formats=['markdown', 'text']) for url, formats in contents.items(): markdown = formats.get('markdown', '') text = formats.get('text', '') print(f"{url}: {len(markdown)} chars markdown, {len(text)} chars text") ``` Raises: ValueError: If more than 100 URLs are provided ScrapflyCrawlerError: If crawler not started or request fails """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. 
Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) if len(urls) > 100: raise ValueError("Maximum 100 URLs per batch request") if not urls: return {} # Default to html if no formats specified if formats is None: formats = ['html'] # Build URL with formats parameter formats_str = ','.join(formats) url = f"{self._client.host}/crawl/{self._uuid}/contents/batch" params = { 'key': self._client.key, 'formats': formats_str } # Prepare request body (newline-separated URLs) body = '\n'.join(urls) # Make request import requests response = requests.post( url, params=params, data=body.encode('utf-8'), headers={'Content-Type': 'text/plain'}, verify=self._client.verify ) if response.status_code != 200: raise ScrapflyCrawlerError( message=f"Batch content request failed: {response.status_code}", code="BATCH_REQUEST_FAILED", http_status_code=response.status_code ) # Parse multipart response content_type = response.headers.get('Content-Type', '') if not content_type.startswith('multipart/related'): raise ScrapflyCrawlerError( message=f"Unexpected content type: {content_type}", code="INVALID_RESPONSE", http_status_code=500 ) # Extract boundary from Content-Type header boundary = None for part in content_type.split(';'): part = part.strip() if part.startswith('boundary='): boundary = part.split('=', 1)[1] break if not boundary: raise ScrapflyCrawlerError( message="No boundary found in multipart response", code="INVALID_RESPONSE", http_status_code=500 ) # Parse multipart message # Prepend Content-Type header to make it a valid email message for the parser message_bytes = f"Content-Type: {content_type}\r\n\r\n".encode('utf-8') + response.content parser = BytesParser(policy=default) message = parser.parsebytes(message_bytes) # Extract content from each part result = {} for part in message.walk(): # Skip the container itself if part.get_content_maintype() == 'multipart': continue # Get the URL from Content-Location header content_location = part.get('Content-Location') if not 
content_location: continue # Get content type to determine format part_content_type = part.get_content_type() format_type = None # Map MIME types to format names if 'markdown' in part_content_type: format_type = 'markdown' elif 'plain' in part_content_type: format_type = 'text' elif 'html' in part_content_type: format_type = 'html' elif 'json' in part_content_type: format_type = 'json' if not format_type: continue # Get content content = part.get_content() if isinstance(content, bytes): content = content.decode('utf-8', errors='replace') # Initialize URL dict if needed if content_location not in result: result[content_location] = {} # Store content result[content_location][format_type] = content return result def stats(self) -> Dict[str, Any]: """ Get comprehensive statistics about the crawl Returns: Dictionary with crawl statistics Example: ```python stats = crawl.stats() print(f"URLs discovered: {stats['urls_discovered']}") print(f"URLs crawled: {stats['urls_crawled']}") print(f"Success rate: {stats['success_rate']:.1f}%") print(f"Total size: {stats['total_size_kb']:.2f} KB") ``` """ status = self.status(refresh=False) # Basic stats from status stats_dict = { 'uuid': self._uuid, 'status': status.status, 'urls_discovered': status.urls_discovered, 'urls_crawled': status.urls_crawled, 'urls_pending': status.urls_pending, 'urls_failed': status.urls_failed, 'progress_pct': status.progress_pct, 'is_complete': status.is_complete, 'is_running': status.is_running, 'is_failed': status.is_failed, } # Calculate basic crawl rate (crawled vs discovered) if status.urls_discovered > 0: stats_dict['crawl_rate'] = (status.urls_crawled / status.urls_discovered) * 100 # Add artifact stats if available if self._artifact_cache is not None: pages = self._artifact_cache.get_pages() total_size = sum(len(p['content']) for p in pages) avg_size = total_size / len(pages) if pages else 0 stats_dict.update({ 'pages_downloaded': len(pages), 'total_size_bytes': total_size, 'total_size_kb': 
total_size / 1024, 'total_size_mb': total_size / (1024 * 1024), 'avg_page_size_bytes': avg_size, 'avg_page_size_kb': avg_size / 1024, }) # Calculate download rate (pages vs discovered) if status.urls_discovered > 0: stats_dict['download_rate'] = (len(pages) / status.urls_discovered) * 100 return stats_dict def __repr__(self): if self._uuid is None: return f"Crawl(not started)" status_str = "unknown" if self._status_cache: status_str = self._status_cache.status return f"Crawl(uuid={self._uuid}, status={status_str})"
High-level abstraction for managing a crawler job
The Crawl object maintains the state of a crawler job and provides convenient methods for managing its lifecycle.
Example
from scrapfly import ScrapflyClient, CrawlerConfig, Crawl client = ScrapflyClient(key='your-key') config = CrawlerConfig(url='https://example.com', page_limit=10) # Create and start crawl crawl = Crawl(client, config) crawl.crawl() # Start the crawler # Wait for completion crawl.wait() # Get results pages = crawl.warc().get_pages() for page in pages: print(f"{page['url']}: {page['status_code']}") # Or read specific URLs html = crawl.read('https://example.com/page1', format='html')
Initialize a Crawl object
Args
client — ScrapflyClient instance
config — CrawlerConfig with crawler settings
Instance variables
prop started : bool-
Expand source code
@property def started(self) -> bool: """Check if the crawler has been started""" return self._uuid is not None
Check if the crawler has been started
prop uuid : str | None-
Expand source code
@property def uuid(self) -> Optional[str]: """Get the crawler job UUID (None if not started)""" return self._uuid
Get the crawler job UUID (None if not started)
Methods
def cancel(self) ‑> bool-
Expand source code
def cancel(self) -> bool: """ Cancel the running crawler job Returns: True if cancelled successfully Raises: ScrapflyCrawlerError: If crawler not started yet Example: ```python # Start a crawl crawl = Crawl(client, config).crawl() # Cancel it crawl.cancel() ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) return self._client.cancel_crawl(self._uuid)
Cancel the running crawler job
Returns
True if cancelled successfully
Raises
ScrapflyCrawlerError — If crawler not started yet
Example
# Start a crawl crawl = Crawl(client, config).crawl() # Cancel it crawl.cancel()
def crawl(self) ‑> Crawl-
Expand source code
def crawl(self) -> 'Crawl': """ Start the crawler job Returns: Self for method chaining Raises: RuntimeError: If crawler already started Example: ```python crawl = Crawl(client, config) crawl.crawl() # Start crawling ``` """ if self._uuid is not None: raise ScrapflyCrawlerError( message="Crawler already started", code="ALREADY_STARTED", http_status_code=400 ) response = self._client.start_crawl(self._config) self._uuid = response.uuid return self
Start the crawler job
Returns
Self for method chaining
Raises
RuntimeError — If crawler already started
Example
crawl = Crawl(client, config) crawl.crawl() # Start crawling
def har(self) ‑> CrawlerArtifactResponse-
Expand source code
def har(self) -> CrawlerArtifactResponse: """ Download the crawler artifact in HAR (HTTP Archive) format Returns: CrawlerArtifactResponse with parsed HAR data Raises: RuntimeError: If crawler not started yet Example: ```python # Get HAR artifact artifact = crawl.har() # Get all pages pages = artifact.get_pages() # Iterate through HAR entries for entry in artifact.iter_responses(): print(f"{entry.url}: {entry.status_code}") print(f"Timing: {entry.time}ms") ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) return self._client.get_crawl_artifact( self._uuid, artifact_type='har' )Download the crawler artifact in HAR (HTTP Archive) format
Returns
CrawlerArtifactResponse with parsed HAR data
Raises
RuntimeError — If crawler not started yet
Example
# Get HAR artifact artifact = crawl.har() # Get all pages pages = artifact.get_pages() # Iterate through HAR entries for entry in artifact.iter_responses(): print(f"{entry.url}: {entry.status_code}") print(f"Timing: {entry.time}ms")
def read(self,
url: str,
format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html') ‑> CrawlContent | None-
Expand source code
def read(self, url: str, format: ContentFormat = 'html') -> Optional[CrawlContent]: """ Read content from a specific URL in the crawl results Args: url: The URL to retrieve content for format: Content format - 'html', 'markdown', 'text', 'clean_html', 'json', 'extracted_data', 'page_metadata' Returns: CrawlContent object with content and metadata, or None if URL not found Example: ```python # Get HTML content for a specific URL content = crawl.read('https://example.com/page1') if content: print(f"URL: {content.url}") print(f"Status: {content.status_code}") print(f"Duration: {content.duration}s") print(content.content) # Get markdown content content = crawl.read('https://example.com/page1', format='markdown') if content: print(content.content) # Check if URL was crawled if crawl.read('https://example.com/missing') is None: print("URL not found in crawl results") ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) # For HTML format, we can get it from the WARC artifact (faster) if format == 'html': artifact = self.warc() for record in artifact.iter_responses(): if record.url == url: # Extract metadata from WARC headers warc_headers = record.warc_headers or {} duration_str = warc_headers.get('WARC-Scrape-Duration') duration = float(duration_str) if duration_str else None return CrawlContent( url=record.url, content=record.content.decode('utf-8', errors='replace'), status_code=record.status_code, headers=record.headers, duration=duration, log_id=warc_headers.get('WARC-Scrape-Log-Id'), country=warc_headers.get('WARC-Scrape-Country'), crawl_uuid=self._uuid ) return None # For other formats (markdown, text, etc.), use the contents API try: result = self._client.get_crawl_contents( self._uuid, format=format ) # The API returns: {"contents": {url: {format: content, ...}, ...}, "links": {...}} contents = result.get('contents', {}) if url in contents: content_data = 
contents[url] # Content is always a dict with format keys (e.g., {"html": "...", "markdown": "..."}) content_str = content_data.get(format) if content_str: # For non-HTML formats from contents API, we don't have full metadata # Try to get status code from WARC if possible status_code = 200 # Default headers = {} duration = None log_id = None country = None # Try to get metadata from WARC try: artifact = self.warc() for record in artifact.iter_responses(): if record.url == url: status_code = record.status_code headers = record.headers warc_headers = record.warc_headers or {} duration_str = warc_headers.get('WARC-Scrape-Duration') duration = float(duration_str) if duration_str else None log_id = warc_headers.get('WARC-Scrape-Log-Id') country = warc_headers.get('WARC-Scrape-Country') break except: pass return CrawlContent( url=url, content=content_str, status_code=status_code, headers=headers, duration=duration, log_id=log_id, country=country, crawl_uuid=self._uuid ) return None except Exception: # If contents API fails, return None return NoneRead content from a specific URL in the crawl results
Args
url — The URL to retrieve content for
format — Content format - 'html', 'markdown', 'text', 'clean_html', 'json', 'extracted_data', 'page_metadata'
Returns
CrawlContent object with content and metadata, or None if URL not found
Example
# Get HTML content for a specific URL content = crawl.read('https://example.com/page1') if content: print(f"URL: {content.url}") print(f"Status: {content.status_code}") print(f"Duration: {content.duration}s") print(content.content) # Get markdown content content = crawl.read('https://example.com/page1', format='markdown') if content: print(content.content) # Check if URL was crawled if crawl.read('https://example.com/missing') is None: print("URL not found in crawl results")
def read_batch(self,
urls: List[str],
formats: List[Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata']] = None) ‑> Dict[str, Dict[str, str]]-
Expand source code
def read_batch(
    self,
    urls: List[str],
    formats: Optional[List[ContentFormat]] = None
) -> Dict[str, Dict[str, str]]:
    """
    Retrieve content for multiple URLs in a single batch request.

    More efficient than calling read() repeatedly, since all content is
    fetched in a single API call. Maximum 100 URLs per request.

    Args:
        urls: List of URLs to retrieve (max 100)
        formats: Content formats to retrieve (e.g. ['markdown', 'text']).
            Defaults to ['html'] when None.

    Returns:
        Dictionary mapping each URL to its content in the requested formats:
        {
            'https://example.com/page1': {'markdown': '# Page 1...', 'text': 'Page 1...'},
            'https://example.com/page2': {'markdown': '# Page 2...', 'text': 'Page 2...'}
        }

    Raises:
        ValueError: If more than 100 URLs are provided.
        ScrapflyCrawlerError: If the crawler has not been started or the
            batch request fails.

    Example:
        ```python
        urls = ['https://example.com/page1', 'https://example.com/page2']
        contents = crawl.read_batch(urls, formats=['markdown', 'text'])
        for url, formats in contents.items():
            print(f"{url}: {len(formats.get('markdown', ''))} chars markdown")
        ```
    """
    if self._uuid is None:
        raise ScrapflyCrawlerError(
            message="Crawler not started yet. Call crawl() first.",
            code="NOT_STARTED",
            http_status_code=400
        )

    if len(urls) > 100:
        raise ValueError("Maximum 100 URLs per batch request")

    if not urls:
        return {}

    # Default to html if no formats specified
    if formats is None:
        formats = ['html']

    url = f"{self._client.host}/crawl/{self._uuid}/contents/batch"
    params = {
        'key': self._client.key,
        'formats': ','.join(formats)
    }

    # The batch endpoint expects a newline-separated list of URLs as plain text
    body = '\n'.join(urls)

    import requests
    response = requests.post(
        url,
        params=params,
        data=body.encode('utf-8'),
        headers={'Content-Type': 'text/plain'},
        verify=self._client.verify
    )

    if response.status_code != 200:
        raise ScrapflyCrawlerError(
            message=f"Batch content request failed: {response.status_code}",
            code="BATCH_REQUEST_FAILED",
            http_status_code=response.status_code
        )

    content_type = response.headers.get('Content-Type', '')
    if not content_type.startswith('multipart/related'):
        raise ScrapflyCrawlerError(
            message=f"Unexpected content type: {content_type}",
            code="INVALID_RESPONSE",
            http_status_code=500
        )

    # Extract the multipart boundary; RFC 2045 allows it to be quoted,
    # so strip surrounding double quotes if present.
    boundary = None
    for part in content_type.split(';'):
        part = part.strip()
        if part.startswith('boundary='):
            boundary = part.split('=', 1)[1].strip('"')
            break

    if not boundary:
        raise ScrapflyCrawlerError(
            message="No boundary found in multipart response",
            code="INVALID_RESPONSE",
            http_status_code=500
        )

    # Prepend a Content-Type header so the payload parses as a valid
    # email-style message for the stdlib multipart parser.
    message_bytes = f"Content-Type: {content_type}\r\n\r\n".encode('utf-8') + response.content
    message = BytesParser(policy=default).parsebytes(message_bytes)

    result: Dict[str, Dict[str, str]] = {}
    for part in message.walk():
        # Skip the multipart container itself
        if part.get_content_maintype() == 'multipart':
            continue

        # Each part identifies its source URL via Content-Location
        content_location = part.get('Content-Location')
        if not content_location:
            continue

        # Map MIME types back to format names
        part_content_type = part.get_content_type()
        if 'markdown' in part_content_type:
            format_type = 'markdown'
        elif 'plain' in part_content_type:
            format_type = 'text'
        elif 'html' in part_content_type:
            format_type = 'html'
        elif 'json' in part_content_type:
            format_type = 'json'
        else:
            continue

        content = part.get_content()
        if isinstance(content, bytes):
            content = content.decode('utf-8', errors='replace')

        result.setdefault(content_location, {})[format_type] = content

    return result
This is more efficient than calling read() multiple times as it retrieves all content in a single API call. Maximum 100 URLs per request.
Args
urls- List of URLs to retrieve (max 100)
formats- List of content formats to retrieve (e.g., ['markdown', 'text']) If None, defaults to ['html']
Returns
Dictionary mapping URLs to their content in requested formats: { 'https://example.com/page1': { 'markdown': '# Page 1…', 'text': 'Page 1…' }, 'https://example.com/page2': { 'markdown': '# Page 2…', 'text': 'Page 2…' } }
Example
# Get markdown and text for multiple URLs urls = ['https://example.com/page1', 'https://example.com/page2'] contents = crawl.read_batch(urls, formats=['markdown', 'text']) for url, formats in contents.items(): markdown = formats.get('markdown', '') text = formats.get('text', '') print(f"{url}: {len(markdown)} chars markdown, {len(text)} chars text")Raises
ValueError- If more than 100 URLs are provided
ScrapflyCrawlerError- If crawler not started or request fails
def read_iter(self,
pattern: str,
format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html') ‑> Iterator[CrawlContent]-
Expand source code
def read_iter(
    self,
    pattern: str,
    format: ContentFormat = 'html'
) -> Iterator[CrawlContent]:
    """
    Iterate through URLs matching a pattern and yield their content.

    Supports wildcard patterns using * and ? for flexible URL matching.

    Args:
        pattern: URL pattern with wildcards (* matches any characters, ? matches one)
                 Examples: "/products?page=*", "https://example.com/*/detail", "*/product/*"
        format: Content format to retrieve

    Yields:
        CrawlContent objects for each matching URL

    Raises:
        ScrapflyCrawlerError: If the crawler has not been started yet.

    Example:
        ```python
        # Get all product pages in markdown
        for content in crawl.read_iter(pattern="*/products?page=*", format="markdown"):
            print(f"{content.url}: {len(content.content)} chars")

        # Get all detail pages
        for content in crawl.read_iter(pattern="*/detail/*"):
            process(content.content)
        ```
    """
    if self._uuid is None:
        raise ScrapflyCrawlerError(
            message="Crawler not started yet. Call crawl() first.",
            code="NOT_STARTED",
            http_status_code=400
        )

    if format == 'html':
        # For HTML, the WARC artifact is faster than the contents API
        artifact = self.warc()
        for record in artifact.iter_responses():
            if fnmatch.fnmatch(record.url, pattern):
                warc_headers = record.warc_headers or {}
                duration_str = warc_headers.get('WARC-Scrape-Duration')
                duration = float(duration_str) if duration_str else None
                yield CrawlContent(
                    url=record.url,
                    content=record.content.decode('utf-8', errors='replace'),
                    status_code=record.status_code,
                    headers=record.headers,
                    duration=duration,
                    log_id=warc_headers.get('WARC-Scrape-Log-Id'),
                    country=warc_headers.get('WARC-Scrape-Country'),
                    crawl_uuid=self._uuid
                )
    else:
        # For other formats, use the contents API (best-effort: failures
        # end the iteration instead of raising)
        try:
            result = self._client.get_crawl_contents(self._uuid, format=format)
            contents = result.get('contents', {})

            # Build a metadata cache from the WARC artifact so non-HTML
            # results still carry status code / headers / timings.
            metadata_cache = {}
            try:
                artifact = self.warc()
                for record in artifact.iter_responses():
                    warc_headers = record.warc_headers or {}
                    duration_str = warc_headers.get('WARC-Scrape-Duration')
                    metadata_cache[record.url] = {
                        'status_code': record.status_code,
                        'headers': record.headers,
                        'duration': float(duration_str) if duration_str else None,
                        'log_id': warc_headers.get('WARC-Scrape-Log-Id'),
                        'country': warc_headers.get('WARC-Scrape-Country')
                    }
            except Exception:
                # Metadata is optional enrichment; was a bare `except:`
                # which also swallowed KeyboardInterrupt/SystemExit.
                logger.debug("Failed to build WARC metadata cache", exc_info=True)

            for url, content_data in contents.items():
                if fnmatch.fnmatch(url, pattern):
                    # Content is always a dict keyed by format name
                    content = content_data.get(format)
                    if content:
                        metadata = metadata_cache.get(url, {})
                        yield CrawlContent(
                            url=url,
                            content=content,
                            status_code=metadata.get('status_code', 200),
                            headers=metadata.get('headers', {}),
                            duration=metadata.get('duration'),
                            log_id=metadata.get('log_id'),
                            country=metadata.get('country'),
                            crawl_uuid=self._uuid
                        )
        except Exception:
            # Contents API failed entirely: log for debuggability (the old
            # code dropped the error silently) and yield nothing.
            logger.debug("Contents API request failed in read_iter", exc_info=True)
            return
Supports wildcard patterns using * and ? for flexible URL matching.
Args
pattern- URL pattern with wildcards (* matches any characters, ? matches one) Examples: "/products?page=*", "https://example.com/*/detail", "*/product/*"
format- Content format to retrieve
Yields
CrawlContent objects for each matching URL
Example
# Get all product pages in markdown for content in crawl.read_iter(pattern="*/products?page=*", format="markdown"): print(f"{content.url}: {len(content.content)} chars") print(f"Duration: {content.duration}s") # Get all detail pages for content in crawl.read_iter(pattern="*/detail/*"): process(content.content) # Pattern matching examples: # "/products?page=*" matches /products?page=1, /products?page=2, etc. # "*/product/*" matches any URL with /product/ in the path # "https://example.com/page?" matches <https://example.com/page1,> page2, etc. def stats(self) ‑> Dict[str, Any]-
Expand source code
def stats(self) -> Dict[str, Any]: """ Get comprehensive statistics about the crawl Returns: Dictionary with crawl statistics Example: ```python stats = crawl.stats() print(f"URLs discovered: {stats['urls_discovered']}") print(f"URLs crawled: {stats['urls_crawled']}") print(f"Success rate: {stats['success_rate']:.1f}%") print(f"Total size: {stats['total_size_kb']:.2f} KB") ``` """ status = self.status(refresh=False) # Basic stats from status stats_dict = { 'uuid': self._uuid, 'status': status.status, 'urls_discovered': status.urls_discovered, 'urls_crawled': status.urls_crawled, 'urls_pending': status.urls_pending, 'urls_failed': status.urls_failed, 'progress_pct': status.progress_pct, 'is_complete': status.is_complete, 'is_running': status.is_running, 'is_failed': status.is_failed, } # Calculate basic crawl rate (crawled vs discovered) if status.urls_discovered > 0: stats_dict['crawl_rate'] = (status.urls_crawled / status.urls_discovered) * 100 # Add artifact stats if available if self._artifact_cache is not None: pages = self._artifact_cache.get_pages() total_size = sum(len(p['content']) for p in pages) avg_size = total_size / len(pages) if pages else 0 stats_dict.update({ 'pages_downloaded': len(pages), 'total_size_bytes': total_size, 'total_size_kb': total_size / 1024, 'total_size_mb': total_size / (1024 * 1024), 'avg_page_size_bytes': avg_size, 'avg_page_size_kb': avg_size / 1024, }) # Calculate download rate (pages vs discovered) if status.urls_discovered > 0: stats_dict['download_rate'] = (len(pages) / status.urls_discovered) * 100 return stats_dictGet comprehensive statistics about the crawl
Returns
Dictionary with crawl statistics
Example
stats = crawl.stats() print(f"URLs discovered: {stats['urls_discovered']}") print(f"URLs crawled: {stats['urls_crawled']}") print(f"Success rate: {stats['success_rate']:.1f}%") print(f"Total size: {stats['total_size_kb']:.2f} KB") def status(self, refresh: bool = True) ‑> CrawlerStatusResponse-
Expand source code
def status(self, refresh: bool = True) -> CrawlerStatusResponse: """ Get current crawler status Args: refresh: If True, fetch fresh status from API. If False, return cached status. Returns: CrawlerStatusResponse with current status Raises: RuntimeError: If crawler not started yet Example: ```python status = crawl.status() print(f"Progress: {status.progress_pct}%") print(f"URLs crawled: {status.urls_crawled}") ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) if refresh or self._status_cache is None: self._status_cache = self._client.get_crawl_status(self._uuid) return self._status_cacheGet current crawler status
Args
refresh- If True, fetch fresh status from API. If False, return cached status.
Returns
CrawlerStatusResponse with current status
Raises
RuntimeError- If crawler not started yet
Example
status = crawl.status() print(f"Progress: {status.progress_pct}%") print(f"URLs crawled: {status.urls_crawled}") def wait(self, poll_interval: int = 5, max_wait: int | None = None, verbose: bool = False) ‑> Crawl-
Expand source code
def wait( self, poll_interval: int = 5, max_wait: Optional[int] = None, verbose: bool = False ) -> 'Crawl': """ Wait for crawler to complete Polls the status endpoint until the crawler finishes. Args: poll_interval: Seconds between status checks (default: 5) max_wait: Maximum seconds to wait (None = wait forever) verbose: If True, print progress updates Returns: Self for method chaining Raises: RuntimeError: If crawler not started, failed, or timed out Example: ```python # Wait with progress updates crawl.crawl().wait(verbose=True) # Wait with timeout crawl.crawl().wait(max_wait=300) # 5 minutes max ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) start_time = time.time() poll_count = 0 while True: status = self.status(refresh=True) poll_count += 1 if verbose: logger.info(f"Poll #{poll_count}: {status.status} - " f"{status.progress_pct:.1f}% - " f"{status.urls_crawled}/{status.urls_discovered} URLs") if status.is_complete: if verbose: logger.info(f"✓ Crawler completed successfully!") return self elif status.is_failed: raise ScrapflyCrawlerError( message=f"Crawler failed with status: {status.status}", code="FAILED", http_status_code=400 ) elif status.is_cancelled: raise ScrapflyCrawlerError( message="Crawler was cancelled", code="CANCELLED", http_status_code=400 ) # Check timeout if max_wait is not None: elapsed = time.time() - start_time if elapsed > max_wait: raise ScrapflyCrawlerError( message=f"Timeout waiting for crawler (>{max_wait}s)", code="TIMEOUT", http_status_code=400 ) time.sleep(poll_interval)Wait for crawler to complete
Polls the status endpoint until the crawler finishes.
Args
poll_interval- Seconds between status checks (default: 5)
max_wait- Maximum seconds to wait (None = wait forever)
verbose- If True, print progress updates
Returns
Self for method chaining
Raises
RuntimeError- If crawler not started, failed, or timed out
Example
# Wait with progress updates crawl.crawl().wait(verbose=True) # Wait with timeout crawl.crawl().wait(max_wait=300) # 5 minutes max def warc(self, artifact_type: str = 'warc') ‑> CrawlerArtifactResponse-
Expand source code
def warc(self, artifact_type: str = 'warc') -> CrawlerArtifactResponse: """ Download the crawler artifact (WARC file) Args: artifact_type: Type of artifact to download (default: 'warc') Returns: CrawlerArtifactResponse with parsed WARC data Raises: RuntimeError: If crawler not started yet Example: ```python # Get WARC artifact artifact = crawl.warc() # Get all pages pages = artifact.get_pages() # Iterate through responses for record in artifact.iter_responses(): print(record.url) ``` """ if self._uuid is None: raise ScrapflyCrawlerError( message="Crawler not started yet. Call crawl() first.", code="NOT_STARTED", http_status_code=400 ) if self._artifact_cache is None: self._artifact_cache = self._client.get_crawl_artifact( self._uuid, artifact_type=artifact_type ) return self._artifact_cacheDownload the crawler artifact (WARC file)
Args
artifact_type- Type of artifact to download (default: 'warc')
Returns
CrawlerArtifactResponse with parsed WARC data
Raises
RuntimeError- If crawler not started yet
Example
# Get WARC artifact artifact = crawl.warc() # Get all pages pages = artifact.get_pages() # Iterate through responses for record in artifact.iter_responses(): print(record.url)
class CrawlCompletedWebhook (event: str,
uuid: str,
timestamp: datetime.datetime,
status: str,
urls_discovered: int,
urls_crawled: int,
urls_failed: int)-
Expand source code
@dataclass class CrawlCompletedWebhook(CrawlerWebhookBase): """ Webhook payload for crawl.completed event. Sent when a crawler job completes (successfully or with errors). Additional fields: - status: Final crawler status (COMPLETED, FAILED, etc.) - urls_discovered: Total number of URLs discovered - urls_crawled: Number of URLs successfully crawled - urls_failed: Number of URLs that failed Example payload: { "event": "crawl.completed", "uuid": "550e8400-e29b-41d4-a716-446655440000", "status": "COMPLETED", "urls_discovered": 100, "urls_crawled": 95, "urls_failed": 5, "timestamp": "2025-01-16T10:35:00Z" } """ status: str urls_discovered: int urls_crawled: int urls_failed: int @classmethod def from_dict(cls, data: Dict) -> 'CrawlCompletedWebhook': """Create webhook instance from dictionary payload""" base = CrawlerWebhookBase.from_dict(data) return cls( event=base.event, uuid=base.uuid, timestamp=base.timestamp, status=data['status'], urls_discovered=data['urls_discovered'], urls_crawled=data['urls_crawled'], urls_failed=data['urls_failed'] )Webhook payload for crawl.completed event.
Sent when a crawler job completes (successfully or with errors).
Additional fields: - status: Final crawler status (COMPLETED, FAILED, etc.) - urls_discovered: Total number of URLs discovered - urls_crawled: Number of URLs successfully crawled - urls_failed: Number of URLs that failed
Example payload: { "event": "crawl.completed", "uuid": "550e8400-e29b-41d4-a716-446655440000", "status": "COMPLETED", "urls_discovered": 100, "urls_crawled": 95, "urls_failed": 5, "timestamp": "2025-01-16T10:35:00Z" }
Ancestors
Instance variables
var status : strvar urls_crawled : intvar urls_discovered : intvar urls_failed : int
Inherited members
class CrawlContent (url: str,
content: str,
status_code: int,
headers: Dict[str, str] | None = None,
duration: float | None = None,
log_id: str | None = None,
country: str | None = None,
crawl_uuid: str | None = None)-
Expand source code
class CrawlContent:
    """
    Response object for a single crawled URL.

    Gives access to the content and metadata of one crawled page. Similar
    to ScrapeApiResponse, but for crawler results.

    Attributes:
        url: The crawled URL (mandatory)
        content: Page content in requested format (mandatory)
        status_code: HTTP response status code (mandatory)
        headers: HTTP response headers (optional)
        duration: Request duration in seconds (optional)
        log_id: Scrape log ID for debugging (optional)
        log_url: URL to view scrape logs (optional)
        country: Country the request was made from (optional)

    Example:
        ```python
        content = crawl.read('https://example.com', format='markdown')
        print(f"URL: {content.url}")
        print(f"Status: {content.status_code}")
        print(f"Content: {content.content}")
        if content.log_url:
            print(f"View logs: {content.log_url}")
        ```
    """

    def __init__(
        self,
        url: str,
        content: str,
        status_code: int,
        headers: Optional[Dict[str, str]] = None,
        duration: Optional[float] = None,
        log_id: Optional[str] = None,
        country: Optional[str] = None,
        crawl_uuid: Optional[str] = None
    ):
        """
        Initialize CrawlContent.

        Args:
            url: The crawled URL
            content: Page content in requested format
            status_code: HTTP response status code
            headers: HTTP response headers
            duration: Request duration in seconds
            log_id: Scrape log ID
            country: Country the request was made from
            crawl_uuid: Crawl job UUID
        """
        self.url = url
        self.content = content
        self.status_code = status_code
        self.headers = headers or {}
        self.duration = duration
        self.log_id = log_id
        self.country = country
        self._crawl_uuid = crawl_uuid

    @property
    def log_url(self) -> Optional[str]:
        """
        URL to view scrape logs.

        Returns:
            Log URL when log_id is available, None otherwise.
        """
        if not self.log_id:
            return None
        return f"https://scrapfly.io/dashboard/logs/{self.log_id}"

    @property
    def success(self) -> bool:
        """True when the request succeeded (2xx status code)"""
        return 200 <= self.status_code < 300

    @property
    def error(self) -> bool:
        """True when the request resulted in an error (4xx/5xx status code)"""
        return self.status_code >= 400

    def __repr__(self) -> str:
        return (f"CrawlContent(url={self.url!r}, status={self.status_code}, "
                f"content_length={len(self.content)})")

    def __str__(self) -> str:
        return self.content

    def __len__(self) -> int:
        """Length of the content string"""
        return len(self.content)
Provides access to content and metadata for a crawled page. Similar to ScrapeApiResponse but for crawler results.
Attributes
url- The crawled URL (mandatory)
content- Page content in requested format (mandatory)
status_code- HTTP response status code (mandatory)
headers- HTTP response headers (optional)
duration- Request duration in seconds (optional)
log_id- Scrape log ID for debugging (optional)
log_url- URL to view scrape logs (optional)
country- Country the request was made from (optional)
Example
# Get content for a URL content = crawl.read('https://example.com', format='markdown') print(f"URL: {content.url}") print(f"Status: {content.status_code}") print(f"Duration: {content.duration}s") print(f"Content: {content.content}") # Access metadata if content.log_url: print(f"View logs: {content.log_url}")Initialize CrawlContent
Args
url- The crawled URL
content- Page content in requested format
status_code- HTTP response status code
headers- HTTP response headers
duration- Request duration in seconds
log_id- Scrape log ID
country- Country the request was made from
crawl_uuid- Crawl job UUID
Instance variables
prop error : bool-
Expand source code
@property def error(self) -> bool: """Check if the request resulted in an error (4xx/5xx status code)""" return self.status_code >= 400Check if the request resulted in an error (4xx/5xx status code)
prop log_url : str | None-
Expand source code
@property def log_url(self) -> Optional[str]: """ Get URL to view scrape logs Returns: Log URL if log_id is available, None otherwise """ if self.log_id: return f"https://scrapfly.io/dashboard/logs/{self.log_id}" return NoneGet URL to view scrape logs
Returns
Log URL if log_id is available, None otherwise
prop success : bool-
Expand source code
@property def success(self) -> bool: """Check if the request was successful (2xx status code)""" return 200 <= self.status_code < 300Check if the request was successful (2xx status code)
class CrawlStartedWebhook (event: str, uuid: str, timestamp: datetime.datetime, status: str)-
Expand source code
@dataclass class CrawlStartedWebhook(CrawlerWebhookBase): """ Webhook payload for crawl.started event. Sent when a crawler job starts running. Additional fields: - status: Current crawler status (should be 'RUNNING') Example payload: { "event": "crawl.started", "uuid": "550e8400-e29b-41d4-a716-446655440000", "status": "RUNNING", "timestamp": "2025-01-16T10:30:00Z" } """ status: str @classmethod def from_dict(cls, data: Dict) -> 'CrawlStartedWebhook': """Create webhook instance from dictionary payload""" base = CrawlerWebhookBase.from_dict(data) return cls( event=base.event, uuid=base.uuid, timestamp=base.timestamp, status=data['status'] )Webhook payload for crawl.started event.
Sent when a crawler job starts running.
Additional fields: - status: Current crawler status (should be 'RUNNING')
Example payload: { "event": "crawl.started", "uuid": "550e8400-e29b-41d4-a716-446655440000", "status": "RUNNING", "timestamp": "2025-01-16T10:30:00Z" }
Ancestors
Instance variables
var status : str
Inherited members
class CrawlUrlDiscoveredWebhook (event: str, uuid: str, timestamp: datetime.datetime, url: str, depth: int)-
Expand source code
@dataclass class CrawlUrlDiscoveredWebhook(CrawlerWebhookBase): """ Webhook payload for crawl.url_discovered event. Sent when a new URL is discovered during crawling. Additional fields: - url: The discovered URL - depth: Depth level of the URL from the starting URL Example payload: { "event": "crawl.url_discovered", "uuid": "550e8400-e29b-41d4-a716-446655440000", "url": "https://example.com/page", "depth": 1, "timestamp": "2025-01-16T10:30:05Z" } """ url: str depth: int @classmethod def from_dict(cls, data: Dict) -> 'CrawlUrlDiscoveredWebhook': """Create webhook instance from dictionary payload""" base = CrawlerWebhookBase.from_dict(data) return cls( event=base.event, uuid=base.uuid, timestamp=base.timestamp, url=data['url'], depth=data['depth'] )Webhook payload for crawl.url_discovered event.
Sent when a new URL is discovered during crawling.
Additional fields: - url: The discovered URL - depth: Depth level of the URL from the starting URL
Example payload: { "event": "crawl.url_discovered", "uuid": "550e8400-e29b-41d4-a716-446655440000", "url": "https://example.com/page", "depth": 1, "timestamp": "2025-01-16T10:30:05Z" }
Ancestors
Instance variables
var depth : intvar url : str
Inherited members
class CrawlUrlFailedWebhook (event: str,
uuid: str,
timestamp: datetime.datetime,
url: str,
error: str,
status_code: int | None = None)-
Expand source code
@dataclass class CrawlUrlFailedWebhook(CrawlerWebhookBase): """ Webhook payload for crawl.url_failed event. Sent when a URL fails to be crawled. Additional fields: - url: The URL that failed - error: Error message describing the failure - status_code: HTTP status code if available (optional) Example payload: { "event": "crawl.url_failed", "uuid": "550e8400-e29b-41d4-a716-446655440000", "url": "https://example.com/page", "error": "HTTP 404 Not Found", "status_code": 404, "timestamp": "2025-01-16T10:30:10Z" } """ url: str error: str status_code: Optional[int] = None @classmethod def from_dict(cls, data: Dict) -> 'CrawlUrlFailedWebhook': """Create webhook instance from dictionary payload""" base = CrawlerWebhookBase.from_dict(data) return cls( event=base.event, uuid=base.uuid, timestamp=base.timestamp, url=data['url'], error=data['error'], status_code=data.get('status_code') )Webhook payload for crawl.url_failed event.
Sent when a URL fails to be crawled.
Additional fields: - url: The URL that failed - error: Error message describing the failure - status_code: HTTP status code if available (optional)
Example payload: { "event": "crawl.url_failed", "uuid": "550e8400-e29b-41d4-a716-446655440000", "url": "https://example.com/page", "error": "HTTP 404 Not Found", "status_code": 404, "timestamp": "2025-01-16T10:30:10Z" }
Ancestors
Instance variables
var error : strvar status_code : int | Nonevar url : str
Inherited members
class CrawlerArtifactResponse (artifact_data: bytes, artifact_type: str = 'warc')-
Expand source code
class CrawlerArtifactResponse:
    """
    Response from downloading crawler artifacts.

    Returned by ScrapflyClient.get_crawl_artifact() method.

    Provides high-level access to crawl results with automatic WARC/HAR
    parsing. Users don't need to understand the WARC or HAR format to use
    this class.

    Example:
        ```python
        # Get WARC artifact (default)
        artifact = client.get_crawl_artifact(uuid)

        # Get HAR artifact
        artifact = client.get_crawl_artifact(uuid, artifact_type='har')

        # Easy mode: get all pages as dicts
        pages = artifact.get_pages()
        for page in pages:
            print(f"{page['url']}: {page['status_code']}")

        # Memory-efficient: iterate one page at a time
        for record in artifact.iter_responses():
            process(record.content)

        # Save to file
        artifact.save('crawl_results.warc.gz')
        ```
    """

    def __init__(self, artifact_data: bytes, artifact_type: str = 'warc'):
        """
        Initialize from artifact data.

        Args:
            artifact_data: Raw artifact file bytes
            artifact_type: Type of artifact ('warc' or 'har')
        """
        self._artifact_data = artifact_data
        self._artifact_type = artifact_type
        # Parsers are lazy-loaded by the `parser` property
        self._warc_parser: Optional[WarcParser] = None
        self._har_parser: Optional[HarArchive] = None

    @property
    def artifact_type(self) -> str:
        """Get artifact type ('warc' or 'har')"""
        return self._artifact_type

    @property
    def artifact_data(self) -> bytes:
        """Get raw artifact data (for advanced users)"""
        return self._artifact_data

    @property
    def warc_data(self) -> bytes:
        """Get raw artifact data (deprecated, use artifact_data)"""
        return self._artifact_data

    @property
    def parser(self) -> Union[WarcParser, HarArchive]:
        """Get artifact parser instance (lazy-loaded)"""
        if self._artifact_type == 'har':
            if self._har_parser is None:
                self._har_parser = HarArchive(self._artifact_data)
            return self._har_parser
        if self._warc_parser is None:
            self._warc_parser = parse_warc(self._artifact_data)
        return self._warc_parser

    def iter_records(self) -> Iterator[Union[WarcRecord, HarEntry]]:
        """
        Iterate through all records.

        For WARC: iterates through all WARC records.
        For HAR: iterates through all HAR entries.

        Yields:
            WarcRecord or HarEntry: Each record in the artifact
        """
        if self._artifact_type == 'har':
            return self.parser.iter_entries()
        return self.parser.iter_records()

    def iter_responses(self) -> Iterator[Union[WarcRecord, HarEntry]]:
        """
        Iterate through HTTP response records only.

        More memory-efficient than get_pages() for large crawls.
        For WARC: iterates through response records.
        For HAR: iterates through all entries (HAR only contains responses).

        Yields:
            WarcRecord or HarEntry: HTTP response records with url,
            status_code, headers, content
        """
        if self._artifact_type == 'har':
            return self.parser.iter_entries()
        return self.parser.iter_responses()

    def get_pages(self) -> List[Dict]:
        """
        Get all crawled pages as simple dictionaries.

        This is the easiest way to access crawl results. Works with both
        WARC and HAR formats.

        Returns:
            List of dicts with keys: url, status_code, headers, content

        Example:
            ```python
            pages = artifact.get_pages()
            for page in pages:
                print(f"{page['url']}: {len(page['content'])} bytes")
            ```
        """
        if self._artifact_type == 'har':
            # Normalize HAR entries to the same page-dict shape as WARC
            return [
                {
                    'url': entry.url,
                    'status_code': entry.status_code,
                    'headers': entry.response_headers,
                    'content': entry.content,
                }
                for entry in self.parser.iter_entries()
            ]
        return self.parser.get_pages()

    @property
    def total_pages(self) -> int:
        """Get total number of pages in the artifact"""
        return len(self.get_pages())

    def save(self, filepath: str):
        """
        Save the raw artifact data (WARC or HAR) to a file.

        Args:
            filepath: Path to save the artifact file

        Example:
            ```python
            artifact.save('crawl_results.warc.gz')
            ```
        """
        # Write via artifact_data instead of the deprecated warc_data alias
        with open(filepath, 'wb') as f:
            f.write(self._artifact_data)

    def __repr__(self):
        return f"CrawlerArtifactResponse(size={len(self._artifact_data)} bytes)"
Returned by ScrapflyClient.get_crawl_artifact() method.
Provides high-level access to crawl results with automatic WARC/HAR parsing. Users don't need to understand WARC or HAR format to use this class.
Example
# Get WARC artifact (default) artifact = client.get_crawl_artifact(uuid) # Get HAR artifact artifact = client.get_crawl_artifact(uuid, artifact_type='har') # Easy mode: get all pages as dicts pages = artifact.get_pages() for page in pages: print(f"{page['url']}: {page['status_code']}") html = page['content'].decode('utf-8') # Memory-efficient: iterate one page at a time for record in artifact.iter_responses(): print(f"{record.url}: {record.status_code}") process(record.content) # Save to file artifact.save('crawl_results.warc.gz')Initialize from artifact data
Args
artifact_data- Raw artifact file bytes
artifact_type- Type of artifact ('warc' or 'har')
Instance variables
prop artifact_data : bytes-
Expand source code
@property def artifact_data(self) -> bytes: """Get raw artifact data (for advanced users)""" return self._artifact_dataGet raw artifact data (for advanced users)
prop artifact_type : str-
Expand source code
@property def artifact_type(self) -> str: """Get artifact type ('warc' or 'har')""" return self._artifact_typeGet artifact type ('warc' or 'har')
prop parser : WarcParser | HarArchive-
Expand source code
@property def parser(self) -> Union[WarcParser, HarArchive]: """Get artifact parser instance (lazy-loaded)""" if self._artifact_type == 'har': if self._har_parser is None: self._har_parser = HarArchive(self._artifact_data) return self._har_parser else: if self._warc_parser is None: self._warc_parser = parse_warc(self._artifact_data) return self._warc_parserGet artifact parser instance (lazy-loaded)
prop total_pages : int-
Expand source code
@property def total_pages(self) -> int: """Get total number of pages in the artifact""" return len(self.get_pages())Get total number of pages in the artifact
prop warc_data : bytes-
Expand source code
@property def warc_data(self) -> bytes: """Get raw WARC data (deprecated, use artifact_data)""" return self._artifact_dataGet raw WARC data (deprecated, use artifact_data)
Methods
def get_pages(self) ‑> List[Dict]-
Expand source code
def get_pages(self) -> List[Dict]: """ Get all crawled pages as simple dictionaries This is the easiest way to access crawl results. Works with both WARC and HAR formats. Returns: List of dicts with keys: url, status_code, headers, content Example: ```python pages = artifact.get_pages() for page in pages: print(f"{page['url']}: {len(page['content'])} bytes") html = page['content'].decode('utf-8') ``` """ if self._artifact_type == 'har': # Convert HAR entries to page dicts pages = [] for entry in self.parser.iter_entries(): pages.append({ 'url': entry.url, 'status_code': entry.status_code, 'headers': entry.response_headers, 'content': entry.content }) return pages else: return self.parser.get_pages()Get all crawled pages as simple dictionaries
This is the easiest way to access crawl results. Works with both WARC and HAR formats.
Returns
List of dicts with keys: url, status_code, headers, content
Example
pages = artifact.get_pages() for page in pages: print(f"{page['url']}: {len(page['content'])} bytes") html = page['content'].decode('utf-8') def iter_records(self) ‑> Iterator[WarcRecord | HarEntry]-
Expand source code
def iter_records(self) -> Iterator[Union[WarcRecord, HarEntry]]: """ Iterate through all records For WARC: iterates through all WARC records For HAR: iterates through all HAR entries Yields: WarcRecord or HarEntry: Each record in the artifact """ if self._artifact_type == 'har': return self.parser.iter_entries() else: return self.parser.iter_records()Iterate through all records
For WARC: iterates through all WARC records For HAR: iterates through all HAR entries
Yields
WarcRecord or HarEntry: Each record in the artifact
def iter_responses(self) ‑> Iterator[WarcRecord | HarEntry]-
Expand source code
def iter_responses(self) -> Iterator[Union[WarcRecord, HarEntry]]: """ Iterate through HTTP response records only This is more memory-efficient than get_pages() for large crawls. For WARC: iterates through response records For HAR: iterates through all entries (HAR only contains responses) Yields: WarcRecord or HarEntry: HTTP response records with url, status_code, headers, content """ if self._artifact_type == 'har': return self.parser.iter_entries() else: return self.parser.iter_responses()Iterate through HTTP response records only
This is more memory-efficient than get_pages() for large crawls.
For WARC: iterates through response records For HAR: iterates through all entries (HAR only contains responses)
Yields
WarcRecord or HarEntry: HTTP response records with url, status_code, headers, content
def save(self, filepath: str)-
Expand source code
def save(self, filepath: str): """ Save WARC data to file Args: filepath: Path to save the WARC file Example: ```python artifact.save('crawl_results.warc.gz') ``` """ with open(filepath, 'wb') as f: f.write(self.warc_data)Save WARC data to file
Args
filepath- Path to save the WARC file
Example
artifact.save('crawl_results.warc.gz')
class CrawlerConfig (url: str,
page_limit: int | None = None,
max_depth: int | None = None,
max_duration: int | None = None,
exclude_paths: List[str] | None = None,
include_only_paths: List[str] | None = None,
ignore_base_path_restriction: bool = False,
follow_external_links: bool = False,
allowed_external_domains: List[str] | None = None,
headers: Dict[str, str] | None = None,
delay: int | None = None,
user_agent: str | None = None,
max_concurrency: int | None = None,
rendering_delay: int | None = None,
use_sitemaps: bool = False,
respect_robots_txt: bool = False,
ignore_no_follow: bool = False,
cache: bool = False,
cache_ttl: int | None = None,
cache_clear: bool = False,
content_formats: List[Literal['html', 'markdown', 'text', 'clean_html']] | None = None,
extraction_rules: Dict | None = None,
asp: bool = False,
proxy_pool: str | None = None,
country: str | None = None,
webhook_name: str | None = None,
webhook_events: List[str] | None = None,
max_api_credit: int | None = None)-
Expand source code
class CrawlerConfig(BaseApiConfig): """ Configuration for Scrapfly Crawler API The Crawler API performs recursive website crawling with advanced configuration, content extraction, and artifact storage. Example: ```python from scrapfly import ScrapflyClient, CrawlerConfig client = ScrapflyClient(key='YOUR_API_KEY') config = CrawlerConfig( url='https://example.com', page_limit=100, max_depth=3, content_formats=['markdown', 'html'] ) # Start crawl start_response = client.start_crawl(config) uuid = start_response.uuid # Poll status status = client.get_crawl_status(uuid) # Get results when complete if status.is_complete: artifact = client.get_crawl_artifact(uuid) pages = artifact.get_pages() ``` """ WEBHOOK_CRAWLER_STARTED = 'crawler_started' WEBHOOK_CRAWLER_URL_VISITED = 'crawler_url_visited' WEBHOOK_CRAWLER_URL_SKIPPED = 'crawler_url_skipped' WEBHOOK_CRAWLER_URL_DISCOVERED = 'crawler_url_discovered' WEBHOOK_CRAWLER_URL_FAILED = 'crawler_url_failed' WEBHOOK_CRAWLER_STOPPED = 'crawler_stopped' WEBHOOK_CRAWLER_CANCELLED = 'crawler_cancelled' WEBHOOK_CRAWLER_FINISHED = 'crawler_finished' ALL_WEBHOOK_EVENTS = [ WEBHOOK_CRAWLER_STARTED, WEBHOOK_CRAWLER_URL_VISITED, WEBHOOK_CRAWLER_URL_SKIPPED, WEBHOOK_CRAWLER_URL_DISCOVERED, WEBHOOK_CRAWLER_URL_FAILED, WEBHOOK_CRAWLER_STOPPED, WEBHOOK_CRAWLER_CANCELLED, WEBHOOK_CRAWLER_FINISHED, ] def __init__( self, url: str, # Crawl limits page_limit: Optional[int] = None, max_depth: Optional[int] = None, max_duration: Optional[int] = None, # Path filtering (mutually exclusive) exclude_paths: Optional[List[str]] = None, include_only_paths: Optional[List[str]] = None, # Advanced crawl options ignore_base_path_restriction: bool = False, follow_external_links: bool = False, allowed_external_domains: Optional[List[str]] = None, # Request configuration headers: Optional[Dict[str, str]] = None, delay: Optional[int] = None, user_agent: Optional[str] = None, max_concurrency: Optional[int] = None, rendering_delay: Optional[int] = None, # Crawl 
strategy options use_sitemaps: bool = False, respect_robots_txt: bool = False, ignore_no_follow: bool = False, # Cache options cache: bool = False, cache_ttl: Optional[int] = None, cache_clear: bool = False, # Content extraction content_formats: Optional[List[Literal['html', 'markdown', 'text', 'clean_html']]] = None, extraction_rules: Optional[Dict] = None, # Web scraping features asp: bool = False, proxy_pool: Optional[str] = None, country: Optional[str] = None, # Webhook integration webhook_name: Optional[str] = None, webhook_events: Optional[List[str]] = None, # Cost control max_api_credit: Optional[int] = None ): """ Initialize a CrawlerConfig Args: url: Starting URL for the crawl (required) page_limit: Maximum number of pages to crawl max_depth: Maximum crawl depth from starting URL max_duration: Maximum crawl duration in seconds exclude_paths: List of path patterns to exclude (mutually exclusive with include_only_paths) include_only_paths: List of path patterns to include only (mutually exclusive with exclude_paths) ignore_base_path_restriction: Allow crawling outside the base path follow_external_links: Follow links to external domains allowed_external_domains: List of external domains allowed when follow_external_links is True headers: Custom HTTP headers for requests delay: Delay between requests in milliseconds user_agent: Custom user agent string max_concurrency: Maximum concurrent requests rendering_delay: Delay for JavaScript rendering in milliseconds use_sitemaps: Use sitemap.xml to discover URLs respect_robots_txt: Respect robots.txt rules ignore_no_follow: Ignore rel="nofollow" attributes cache: Enable caching cache_ttl: Cache time-to-live in seconds cache_clear: Clear cache before crawling content_formats: List of content formats to extract ('html', 'markdown', 'text', 'clean_html') extraction_rules: Custom extraction rules asp: Enable Anti-Scraping Protection bypass proxy_pool: Proxy pool to use (e.g., 'public_residential_pool') country: Target 
country for geo-located content webhook_name: Webhook name for event notifications webhook_events: List of webhook events to trigger max_api_credit: Maximum API credits to spend on this crawl """ if exclude_paths and include_only_paths: raise ValueError("exclude_paths and include_only_paths are mutually exclusive") params = { 'url': url, } # Add optional parameters if page_limit is not None: params['page_limit'] = page_limit if max_depth is not None: params['max_depth'] = max_depth if max_duration is not None: params['max_duration'] = max_duration # Path filtering if exclude_paths: params['exclude_paths'] = exclude_paths if include_only_paths: params['include_only_paths'] = include_only_paths # Advanced options if ignore_base_path_restriction: params['ignore_base_path_restriction'] = True if follow_external_links: params['follow_external_links'] = True if allowed_external_domains: params['allowed_external_domains'] = allowed_external_domains # Request configuration if headers: params['headers'] = headers if delay is not None: params['delay'] = delay if user_agent: params['user_agent'] = user_agent if max_concurrency is not None: params['max_concurrency'] = max_concurrency if rendering_delay is not None: params['rendering_delay'] = rendering_delay # Crawl strategy if use_sitemaps: params['use_sitemaps'] = True if respect_robots_txt: params['respect_robots_txt'] = True if ignore_no_follow: params['ignore_no_follow'] = True # Cache if cache: params['cache'] = True if cache_ttl is not None: params['cache_ttl'] = cache_ttl if cache_clear: params['cache_clear'] = True # Content extraction if content_formats: params['content_formats'] = content_formats if extraction_rules: params['extraction_rules'] = extraction_rules # Web scraping features if asp: params['asp'] = True if proxy_pool: params['proxy_pool'] = proxy_pool if country: params['country'] = country # Webhooks if webhook_name: params['webhook_name'] = webhook_name if webhook_events: assert all( event in 
self.ALL_WEBHOOK_EVENTS for event in webhook_events ), f"Invalid webhook events. Valid events are: {self.ALL_WEBHOOK_EVENTS}" params['webhook_events'] = webhook_events # Cost control if max_api_credit is not None: params['max_api_credit'] = max_api_credit self._params = params def to_api_params(self, key: Optional[str] = None) -> Dict: """ Convert config to API parameters :param key: API key (optional, can be added by client) :return: Dictionary of API parameters """ params = self._params.copy() if key: params['key'] = key return paramsConfiguration for Scrapfly Crawler API
The Crawler API performs recursive website crawling with advanced configuration, content extraction, and artifact storage.
Example
from scrapfly import ScrapflyClient, CrawlerConfig client = ScrapflyClient(key='YOUR_API_KEY') config = CrawlerConfig( url='https://example.com', page_limit=100, max_depth=3, content_formats=['markdown', 'html'] ) # Start crawl start_response = client.start_crawl(config) uuid = start_response.uuid # Poll status status = client.get_crawl_status(uuid) # Get results when complete if status.is_complete: artifact = client.get_crawl_artifact(uuid) pages = artifact.get_pages()Initialize a CrawlerConfig
Args
url- Starting URL for the crawl (required)
page_limit- Maximum number of pages to crawl
max_depth- Maximum crawl depth from starting URL
max_duration- Maximum crawl duration in seconds
exclude_paths- List of path patterns to exclude (mutually exclusive with include_only_paths)
include_only_paths- List of path patterns to include only (mutually exclusive with exclude_paths)
ignore_base_path_restriction- Allow crawling outside the base path
follow_external_links- Follow links to external domains
allowed_external_domains- List of external domains allowed when follow_external_links is True
headers- Custom HTTP headers for requests
delay- Delay between requests in milliseconds
user_agent- Custom user agent string
max_concurrency- Maximum concurrent requests
rendering_delay- Delay for JavaScript rendering in milliseconds
use_sitemaps- Use sitemap.xml to discover URLs
respect_robots_txt- Respect robots.txt rules
ignore_no_follow- Ignore rel="nofollow" attributes
cache- Enable caching
cache_ttl- Cache time-to-live in seconds
cache_clear- Clear cache before crawling
content_formats- List of content formats to extract ('html', 'markdown', 'text', 'clean_html')
extraction_rules- Custom extraction rules
asp- Enable Anti-Scraping Protection bypass
proxy_pool- Proxy pool to use (e.g., 'public_residential_pool')
country- Target country for geo-located content
webhook_name- Webhook name for event notifications
webhook_events- List of webhook events to trigger
max_api_credit- Maximum API credits to spend on this crawl
Ancestors
Class variables
var ALL_WEBHOOK_EVENTSvar WEBHOOK_CRAWLER_CANCELLEDvar WEBHOOK_CRAWLER_FINISHEDvar WEBHOOK_CRAWLER_STARTEDvar WEBHOOK_CRAWLER_STOPPEDvar WEBHOOK_CRAWLER_URL_DISCOVEREDvar WEBHOOK_CRAWLER_URL_FAILEDvar WEBHOOK_CRAWLER_URL_SKIPPEDvar WEBHOOK_CRAWLER_URL_VISITED
Methods
def to_api_params(self, key: str | None = None) ‑> Dict-
Expand source code
def to_api_params(self, key: Optional[str] = None) -> Dict: """ Convert config to API parameters :param key: API key (optional, can be added by client) :return: Dictionary of API parameters """ params = self._params.copy() if key: params['key'] = key return paramsConvert config to API parameters
:param key: API key (optional, can be added by client) :return: Dictionary of API parameters
class CrawlerError (message: str,
code: str,
http_status_code: int,
resource: str | None = None,
is_retryable: bool = False,
retry_delay: int | None = None,
retry_times: int | None = None,
documentation_url: str | None = None,
api_response: ForwardRef('ApiResponse') | None = None)-
Expand source code
class CrawlerError(ScrapflyError): """Base exception for Crawler API errors""" passBase exception for Crawler API errors
Ancestors
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
class CrawlerStartResponse (response_data: Dict[str, Any])-
Expand source code
class CrawlerStartResponse: """ Response from starting a crawler job Returned by ScrapflyClient.start_crawl() method. Attributes: uuid: Unique identifier for the crawler job status: Initial status (typically 'PENDING') """ def __init__(self, response_data: Dict[str, Any]): """ Initialize from API response Args: response_data: Raw API response dictionary """ self._data = response_data # API returns 'crawler_uuid' not 'uuid' self.uuid = response_data.get('crawler_uuid') or response_data.get('uuid') self.status = response_data.get('status') def __repr__(self): return f"CrawlerStartResponse(uuid={self.uuid}, status={self.status})"Response from starting a crawler job
Returned by ScrapflyClient.start_crawl() method.
Attributes
uuid- Unique identifier for the crawler job
status- Initial status (typically 'PENDING')
Initialize from API response
Args
response_data- Raw API response dictionary
class CrawlerStatusResponse (response_data: Dict[str, Any])-
Expand source code
class CrawlerStatusResponse: """ Response from checking crawler job status Returned by ScrapflyClient.get_crawl_status() method. Provides real-time progress tracking for crawler jobs. Attributes: uuid: Crawler job UUID status: Current status (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED, DONE) is_success: Whether the crawler job completed successfully is_finished: Whether the crawler job has finished (regardless of success/failure) api_credit_cost: Total API credits consumed by this crawl stop_reason: Reason why the crawler stopped (e.g., 'seed_url_failed', 'page_limit_reached'), None if still running urls_discovered: Total URLs discovered so far urls_crawled: Number of URLs successfully crawled urls_pending: Number of URLs waiting to be crawled urls_failed: Number of URLs that failed to crawl """ # Status constants STATUS_PENDING = 'PENDING' STATUS_RUNNING = 'RUNNING' STATUS_COMPLETED = 'COMPLETED' STATUS_DONE = 'DONE' STATUS_FAILED = 'FAILED' STATUS_CANCELLED = 'CANCELLED' def __init__(self, response_data: Dict[str, Any]): """ Initialize from API response Args: response_data: Raw API response dictionary """ self._data = response_data # API returns crawler_uuid in status response self.uuid = response_data.get('crawler_uuid') or response_data.get('uuid') self.status = response_data.get('status') # New fields from API self.is_success = response_data.get('is_success', False) self.is_finished = response_data.get('is_finished', False) # Parse state dict if present (actual API format) state = response_data.get('state', {}) if state: # Actual API response structure self.urls_discovered = state.get('urls_extracted', 0) self.urls_crawled = state.get('urls_visited', 0) self.urls_pending = state.get('urls_to_crawl', 0) self.urls_failed = state.get('urls_failed', 0) self.stop_reason = state.get('stop_reason') # API credit cost is in the state dict as 'api_credit_used' self.api_credit_cost = state.get('api_credit_used', 0) else: # Fallback for simpler format (if docs change) 
self.urls_discovered = response_data.get('urls_discovered', 0) self.urls_crawled = response_data.get('urls_crawled', 0) self.urls_pending = response_data.get('urls_pending', 0) self.urls_failed = response_data.get('urls_failed', 0) self.stop_reason = None self.api_credit_cost = response_data.get('api_credit_cost', 0) @property def is_complete(self) -> bool: """Check if crawler job is complete""" return self.status in (self.STATUS_COMPLETED, self.STATUS_DONE) @property def is_running(self) -> bool: """Check if crawler job is currently running""" return self.status in (self.STATUS_PENDING, self.STATUS_RUNNING) @property def is_failed(self) -> bool: """Check if crawler job failed""" return self.status == self.STATUS_FAILED @property def is_cancelled(self) -> bool: """Check if crawler job was cancelled""" return self.status == self.STATUS_CANCELLED @property def progress_pct(self) -> float: """ Calculate progress percentage Returns: Progress as percentage (0-100) """ if self.urls_discovered == 0: return 0.0 return (self.urls_crawled / self.urls_discovered) * 100 def __repr__(self): return (f"CrawlerStatusResponse(uuid={self.uuid}, status={self.status}, " f"progress={self.progress_pct:.1f}%, " f"crawled={self.urls_crawled}/{self.urls_discovered})")Response from checking crawler job status
Returned by ScrapflyClient.get_crawl_status() method.
Provides real-time progress tracking for crawler jobs.
Attributes
uuid- Crawler job UUID
status- Current status (PENDING, RUNNING, COMPLETED, FAILED, CANCELLED, DONE)
is_success- Whether the crawler job completed successfully
is_finished- Whether the crawler job has finished (regardless of success/failure)
api_credit_cost- Total API credits consumed by this crawl
stop_reason- Reason why the crawler stopped (e.g., 'seed_url_failed', 'page_limit_reached'), None if still running
urls_discovered- Total URLs discovered so far
urls_crawled- Number of URLs successfully crawled
urls_pending- Number of URLs waiting to be crawled
urls_failed- Number of URLs that failed to crawl
Initialize from API response
Args
response_data- Raw API response dictionary
Class variables
var STATUS_CANCELLEDvar STATUS_COMPLETEDvar STATUS_DONEvar STATUS_FAILEDvar STATUS_PENDINGvar STATUS_RUNNING
Instance variables
prop is_cancelled : bool-
Expand source code
@property def is_cancelled(self) -> bool: """Check if crawler job was cancelled""" return self.status == self.STATUS_CANCELLEDCheck if crawler job was cancelled
prop is_complete : bool-
Expand source code
@property def is_complete(self) -> bool: """Check if crawler job is complete""" return self.status in (self.STATUS_COMPLETED, self.STATUS_DONE)Check if crawler job is complete
prop is_failed : bool-
Expand source code
@property def is_failed(self) -> bool: """Check if crawler job failed""" return self.status == self.STATUS_FAILEDCheck if crawler job failed
prop is_running : bool-
Expand source code
@property def is_running(self) -> bool: """Check if crawler job is currently running""" return self.status in (self.STATUS_PENDING, self.STATUS_RUNNING)Check if crawler job is currently running
prop progress_pct : float-
Expand source code
@property def progress_pct(self) -> float: """ Calculate progress percentage Returns: Progress as percentage (0-100) """ if self.urls_discovered == 0: return 0.0 return (self.urls_crawled / self.urls_discovered) * 100Calculate progress percentage
Returns
Progress as percentage (0-100)
class CrawlerWebhookBase (event: str, uuid: str, timestamp: datetime.datetime)-
Expand source code
@dataclass class CrawlerWebhookBase: """ Base class for all crawler webhook payloads. All webhook events share these common fields: - event: The event type (crawl.started, crawl.url_discovered, etc.) - uuid: The crawler job UUID - timestamp: When the event occurred (ISO 8601 format) """ event: str uuid: str timestamp: datetime @classmethod def from_dict(cls, data: Dict) -> 'CrawlerWebhookBase': """Create webhook instance from dictionary payload""" # Parse timestamp if it's a string timestamp = data.get('timestamp') if isinstance(timestamp, str): # Handle ISO 8601 format if timestamp.endswith('Z'): timestamp = timestamp[:-1] + '+00:00' timestamp = datetime.fromisoformat(timestamp) return cls( event=data['event'], uuid=data['uuid'], timestamp=timestamp )Base class for all crawler webhook payloads.
All webhook events share these common fields: - event: The event type (crawl.started, crawl.url_discovered, etc.) - uuid: The crawler job UUID - timestamp: When the event occurred (ISO 8601 format)
Subclasses
Static methods
def from_dict(data: Dict) ‑> CrawlerWebhookBase-
Create webhook instance from dictionary payload
Instance variables
var event : strvar timestamp : datetime.datetimevar uuid : str
class CrawlerWebhookEvent (value, names=None, *, module=None, qualname=None, type=None, start=1)-
Expand source code
class CrawlerWebhookEvent(Enum): """Crawler webhook event types""" STARTED = 'crawl.started' URL_DISCOVERED = 'crawl.url_discovered' URL_FAILED = 'crawl.url_failed' COMPLETED = 'crawl.completed'Crawler webhook event types
Ancestors
- enum.Enum
Class variables
var COMPLETEDvar STARTEDvar URL_DISCOVEREDvar URL_FAILED
class EncoderError (content: str)-
Expand source code
class EncoderError(BaseException): def __init__(self, content:str): self.content = content super().__init__() def __str__(self) -> str: return self.content def __repr__(self): return "Invalid payload: %s" % self.contentCommon base class for all exceptions
Ancestors
- builtins.BaseException
class ErrorFactory-
Expand source code
class ErrorFactory: RESOURCE_TO_ERROR = { ScrapflyError.RESOURCE_SCRAPE: ScrapflyScrapeError, ScrapflyError.RESOURCE_WEBHOOK: ScrapflyWebhookError, ScrapflyError.RESOURCE_PROXY: ScrapflyProxyError, ScrapflyError.RESOURCE_SCHEDULE: ScrapflyScheduleError, ScrapflyError.RESOURCE_ASP: ScrapflyAspError, ScrapflyError.RESOURCE_SESSION: ScrapflySessionError } # Notable http error has own class for more convenience # Only applicable for generic API error HTTP_STATUS_TO_ERROR = { 401: BadApiKeyError, 402: PaymentRequired, 429: TooManyRequest } @staticmethod def _get_resource(code: str) -> Optional[Tuple[str, str]]: if isinstance(code, str) and '::' in code: _, resource, _ = code.split('::') return resource return None @staticmethod def create(api_response: 'ScrapeApiResponse'): is_retryable = False kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR http_code = api_response.status_code retry_delay = 5 retry_times = 3 description = None error_url = 'https://scrapfly.io/docs/scrape-api/errors#api' code = api_response.error['code'] if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE': http_code = api_response.scrape_result['status_code'] if 'description' in api_response.error: description = api_response.error['description'] message = '%s %s %s' % (str(http_code), code, api_response.error['message']) if 'doc_url' in api_response.error: error_url = api_response.error['doc_url'] if 'retryable' in api_response.error: is_retryable = api_response.error['retryable'] resource = ErrorFactory._get_resource(code=code) if is_retryable is True: if 'X-Retry' in api_response.headers: retry_delay = int(api_response.headers['Retry-After']) message = '%s: %s' % (message, description) if description else message if retry_delay is not None and is_retryable is True: message = '%s. 
Retry delay : %s seconds' % (message, str(retry_delay)) args = { 'message': message, 'code': code, 'http_status_code': http_code, 'is_retryable': is_retryable, 'api_response': api_response, 'resource': resource, 'retry_delay': retry_delay, 'retry_times': retry_times, 'documentation_url': error_url, 'request': api_response.request, 'response': api_response.response } if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE: if http_code >= 500: return ApiHttpServerError(**args) is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error: return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args) if is_scraper_api_error: return ErrorFactory.RESOURCE_TO_ERROR[resource](**args) return ApiHttpClientError(**args) elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR: if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE': if http_code >= 500: return UpstreamHttpServerError(**args) if http_code >= 400: return UpstreamHttpClientError(**args) if resource in ErrorFactory.RESOURCE_TO_ERROR: return ErrorFactory.RESOURCE_TO_ERROR[resource](**args) return ScrapflyError(**args)Class variables
var HTTP_STATUS_TO_ERRORvar RESOURCE_TO_ERROR
Static methods
def create(api_response: ScrapeApiResponse)-
Expand source code
@staticmethod def create(api_response: 'ScrapeApiResponse'): is_retryable = False kind = ScrapflyError.KIND_HTTP_BAD_RESPONSE if api_response.success is False else ScrapflyError.KIND_SCRAPFLY_ERROR http_code = api_response.status_code retry_delay = 5 retry_times = 3 description = None error_url = 'https://scrapfly.io/docs/scrape-api/errors#api' code = api_response.error['code'] if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE': http_code = api_response.scrape_result['status_code'] if 'description' in api_response.error: description = api_response.error['description'] message = '%s %s %s' % (str(http_code), code, api_response.error['message']) if 'doc_url' in api_response.error: error_url = api_response.error['doc_url'] if 'retryable' in api_response.error: is_retryable = api_response.error['retryable'] resource = ErrorFactory._get_resource(code=code) if is_retryable is True: if 'X-Retry' in api_response.headers: retry_delay = int(api_response.headers['Retry-After']) message = '%s: %s' % (message, description) if description else message if retry_delay is not None and is_retryable is True: message = '%s. 
Retry delay : %s seconds' % (message, str(retry_delay)) args = { 'message': message, 'code': code, 'http_status_code': http_code, 'is_retryable': is_retryable, 'api_response': api_response, 'resource': resource, 'retry_delay': retry_delay, 'retry_times': retry_times, 'documentation_url': error_url, 'request': api_response.request, 'response': api_response.response } if kind == ScrapflyError.KIND_HTTP_BAD_RESPONSE: if http_code >= 500: return ApiHttpServerError(**args) is_scraper_api_error = resource in ErrorFactory.RESOURCE_TO_ERROR if http_code in ErrorFactory.HTTP_STATUS_TO_ERROR and not is_scraper_api_error: return ErrorFactory.HTTP_STATUS_TO_ERROR[http_code](**args) if is_scraper_api_error: return ErrorFactory.RESOURCE_TO_ERROR[resource](**args) return ApiHttpClientError(**args) elif kind == ScrapflyError.KIND_SCRAPFLY_ERROR: if code == 'ERR::SCRAPE::BAD_UPSTREAM_RESPONSE': if http_code >= 500: return UpstreamHttpServerError(**args) if http_code >= 400: return UpstreamHttpClientError(**args) if resource in ErrorFactory.RESOURCE_TO_ERROR: return ErrorFactory.RESOURCE_TO_ERROR[resource](**args) return ScrapflyError(**args)
class ExtractionAPIError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ExtractionAPIError(HttpError): passCommon base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ExtractionApiResponse (request: requests.models.Request,
response: requests.models.Response,
extraction_config: ExtractionConfig,
api_result: bytes | None = None)-
Expand source code
class ExtractionApiResponse(ApiResponse): def __init__(self, request: Request, response: Response, extraction_config: ExtractionConfig, api_result: Optional[bytes] = None): super().__init__(request, response) self.extraction_config = extraction_config self.result = self.handle_api_result(api_result) @property def extraction_result(self) -> Optional[Dict]: extraction_result = self.result.get('result', None) if not extraction_result: # handle empty extraction responses return {'data': None, 'content_type': None} else: return extraction_result @property def data(self) -> Union[Dict, List, str]: # depends on the LLM prompt if self.error is None: return self.extraction_result['data'] return None @property def content_type(self) -> Optional[str]: if self.error is None: return self.extraction_result['content_type'] return None @property def extraction_success(self) -> bool: extraction_result = self.extraction_result if extraction_result is None or extraction_result['data'] is None: return False return True @property def error(self) -> Optional[Dict]: if self.extraction_result is None: return self.result return None def _is_api_error(self, api_result: Dict) -> bool: if api_result is None: return True return 'error_id' in api_result def handle_api_result(self, api_result: bytes) -> FrozenDict: if self._is_api_error(api_result=api_result) is True: return FrozenDict(api_result) return FrozenDict({'result': api_result}) def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError): super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)Ancestors
Instance variables
prop content_type : str | None-
Expand source code
@property def content_type(self) -> Optional[str]: if self.error is None: return self.extraction_result['content_type'] return None prop data : Dict | List | str-
Expand source code
@property def data(self) -> Union[Dict, List, str]: # depends on the LLM prompt if self.error is None: return self.extraction_result['data'] return None prop error : Dict | None-
Expand source code
@property def error(self) -> Optional[Dict]: if self.extraction_result is None: return self.result return None prop extraction_result : Dict | None-
Expand source code
@property def extraction_result(self) -> Optional[Dict]: extraction_result = self.result.get('result', None) if not extraction_result: # handle empty extraction responses return {'data': None, 'content_type': None} else: return extraction_result prop extraction_success : bool-
Expand source code
@property def extraction_success(self) -> bool: extraction_result = self.extraction_result if extraction_result is None or extraction_result['data'] is None: return False return True
Methods
def handle_api_result(self, api_result: bytes) ‑> FrozenDict-
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict: if self._is_api_error(api_result=api_result) is True: return FrozenDict(api_result) return FrozenDict({'result': api_result}) def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ExtractionAPIError)-
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ExtractionAPIError): super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
Inherited members
class ExtractionConfig (body: str | bytes,
content_type: str,
url: str | None = None,
charset: str | None = None,
extraction_template: str | None = None,
extraction_ephemeral_template: Dict | None = None,
extraction_prompt: str | None = None,
extraction_model: str | None = None,
is_document_compressed: bool | None = None,
document_compression_format: CompressionFormat | None = None,
webhook: str | None = None,
raise_on_upstream_error: bool = True,
template: str | None = None,
ephemeral_template: Dict | None = None)-
Expand source code
class ExtractionConfig(BaseApiConfig):
    """Configuration for a Scrapfly Extraction API call.

    Wraps the document to extract from (``body``), its ``content_type`` and the
    extraction strategy: a saved template, an ephemeral (inline) JSON template,
    or an LLM prompt/model pair. Also handles optional body compression.
    """

    body: Union[str, bytes]
    content_type: str
    url: Optional[str] = None
    charset: Optional[str] = None
    extraction_template: Optional[str] = None  # a saved template name
    extraction_ephemeral_template: Optional[Dict]  # ephemerally declared json template
    extraction_prompt: Optional[str] = None
    extraction_model: Optional[str] = None
    is_document_compressed: Optional[bool] = None
    document_compression_format: Optional[CompressionFormat] = None
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    # deprecated options
    template: Optional[str] = None
    ephemeral_template: Optional[Dict] = None

    def __init__(
        self,
        body: Union[str, bytes],
        content_type: str,
        url: Optional[str] = None,
        charset: Optional[str] = None,
        extraction_template: Optional[str] = None,  # a saved template name
        extraction_ephemeral_template: Optional[Dict] = None,  # ephemerally declared json template
        extraction_prompt: Optional[str] = None,
        extraction_model: Optional[str] = None,
        is_document_compressed: Optional[bool] = None,
        document_compression_format: Optional[CompressionFormat] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True,
        # deprecated options
        template: Optional[str] = None,
        ephemeral_template: Optional[Dict] = None
    ):
        """Build an extraction configuration.

        Args:
            body: document to extract from, as text or (possibly compressed) bytes
            content_type: MIME type of the document
            url: optional source URL of the document
            charset: optional character set of the document
            extraction_template: name of a saved extraction template
            extraction_ephemeral_template: inline JSON template declaration
            extraction_prompt: LLM extraction prompt
            extraction_model: LLM model name
            is_document_compressed: whether ``body`` is already compressed
            document_compression_format: declared/target compression format
            webhook: webhook name to notify
            raise_on_upstream_error: raise on upstream (target) errors
            template: deprecated alias of ``extraction_template``
            ephemeral_template: deprecated alias of ``extraction_ephemeral_template``

        Raises:
            ExtractionConfigError: on compression format mismatch or missing
                zstandard dependency when zstd compression is requested.
        """
        if template:
            warnings.warn(
                "Deprecation warning: 'template' is deprecated. Use 'extraction_template' instead."
            )
            extraction_template = template

        if ephemeral_template:
            warnings.warn(
                "Deprecation warning: 'ephemeral_template' is deprecated. Use 'extraction_ephemeral_template' instead."
            )
            extraction_ephemeral_template = ephemeral_template

        self.key = None
        self.body = body
        self.content_type = content_type
        self.url = url
        self.charset = charset
        self.extraction_template = extraction_template
        self.extraction_ephemeral_template = extraction_ephemeral_template
        self.extraction_prompt = extraction_prompt
        self.extraction_model = extraction_model
        self.is_document_compressed = is_document_compressed
        self.document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

        if isinstance(body, bytes) or document_compression_format:
            # FIX: only sniff magic bytes on a bytes body — compression cannot be
            # detected on a str (the original passed a str through to the detector).
            compression_format = detect_compression_format(body) if isinstance(body, bytes) else None
            if compression_format is not None:
                self.is_document_compressed = True
                if self.document_compression_format and compression_format != self.document_compression_format:
                    raise ExtractionConfigError(
                        f'The detected compression format `{compression_format}` does not match declared format `{self.document_compression_format}`. '
                        f'You must pass the compression format or disable compression.'
                    )
                self.document_compression_format = compression_format
            else:
                self.is_document_compressed = False

        if self.is_document_compressed is False:
            # Body is not compressed yet: compress it into the declared format, if any.
            compression_format = CompressionFormat(self.document_compression_format) if self.document_compression_format else None
            if isinstance(self.body, str) and compression_format:
                self.body = self.body.encode('utf-8')
            if compression_format == CompressionFormat.GZIP:
                import gzip
                self.body = gzip.compress(self.body)
            elif compression_format == CompressionFormat.ZSTD:
                try:
                    import zstandard as zstd
                except ImportError:
                    raise ExtractionConfigError(
                        f'zstandard is not installed. You must run pip install zstandard'
                        f' to auto compress into zstd or use compression formats.'
                    )
                self.body = zstd.compress(self.body)
            elif compression_format == CompressionFormat.DEFLATE:
                import zlib
                compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS)  # raw deflate compression
                self.body = compressor.compress(self.body) + compressor.flush()
            # NOTE(review): is_document_compressed remains False after auto-compressing
            # above, so to_dict() will not decompress the body — confirm intended.

    def to_api_params(self, key: str) -> Dict:
        """Serialize this config into Extraction API query parameters.

        Args:
            key: API key to use when the instance has none set

        Raises:
            ExtractionConfigError: if both a saved template and an ephemeral
                template are configured.
        """
        params = {
            'key': self.key or key,
            'content_type': self.content_type
        }

        if self.url:
            params['url'] = self.url

        if self.charset:
            params['charset'] = self.charset

        if self.extraction_template and self.extraction_ephemeral_template:
            raise ExtractionConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')

        if self.extraction_template:
            params['extraction_template'] = self.extraction_template

        if self.extraction_ephemeral_template:
            # FIX: serialize into a local instead of overwriting the attribute.
            # The original replaced the Dict attribute with its JSON string, so a
            # second call (or a later to_dict()) operated on corrupted state.
            ephemeral_template_json = json.dumps(self.extraction_ephemeral_template)
            params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(ephemeral_template_json.encode('utf-8')).decode('utf-8')

        if self.extraction_prompt:
            params['extraction_prompt'] = quote_plus(self.extraction_prompt)

        if self.extraction_model:
            params['extraction_model'] = self.extraction_model

        if self.webhook:
            params['webhook_name'] = self.webhook

        return params

    def to_dict(self) -> Dict:
        """
        Export the ExtractionConfig instance to a plain dictionary.

        NOTE: this decompresses and decodes ``self.body`` in place, so the
        exported dict (and this instance afterwards) carries a plain-text body.
        """
        if self.is_document_compressed is True:
            compression_format = CompressionFormat(self.document_compression_format) if self.document_compression_format else None
            if compression_format == CompressionFormat.GZIP:
                import gzip
                self.body = gzip.decompress(self.body)
            elif compression_format == CompressionFormat.ZSTD:
                import zstandard as zstd
                self.body = zstd.decompress(self.body)
            elif compression_format == CompressionFormat.DEFLATE:
                import zlib
                decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
                self.body = decompressor.decompress(self.body) + decompressor.flush()
            if isinstance(self.body, bytes):
                self.body = self.body.decode('utf-8')
            self.is_document_compressed = False

        return {
            'body': self.body,
            'content_type': self.content_type,
            'url': self.url,
            'charset': self.charset,
            'extraction_template': self.extraction_template,
            'extraction_ephemeral_template': self.extraction_ephemeral_template,
            'extraction_prompt': self.extraction_prompt,
            'extraction_model': self.extraction_model,
            'is_document_compressed': self.is_document_compressed,
            'document_compression_format': CompressionFormat(self.document_compression_format).value if self.document_compression_format else None,
            'webhook': self.webhook,
            'raise_on_upstream_error': self.raise_on_upstream_error,
        }

    @staticmethod
    def from_dict(extraction_config_dict: Dict) -> 'ExtractionConfig':
        """Create an ExtractionConfig instance from a dictionary."""
        body = extraction_config_dict.get('body', None)
        content_type = extraction_config_dict.get('content_type', None)
        url = extraction_config_dict.get('url', None)
        charset = extraction_config_dict.get('charset', None)
        extraction_template = extraction_config_dict.get('extraction_template', None)
        extraction_ephemeral_template = extraction_config_dict.get('extraction_ephemeral_template', None)
        extraction_prompt = extraction_config_dict.get('extraction_prompt', None)
        extraction_model = extraction_config_dict.get('extraction_model', None)
        is_document_compressed = extraction_config_dict.get('is_document_compressed', None)
        document_compression_format = extraction_config_dict.get('document_compression_format', None)
        document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None
        webhook = extraction_config_dict.get('webhook', None)
        raise_on_upstream_error = extraction_config_dict.get('raise_on_upstream_error', True)

        return ExtractionConfig(
            body=body,
            content_type=content_type,
            url=url,
            charset=charset,
            extraction_template=extraction_template,
            extraction_ephemeral_template=extraction_ephemeral_template,
            extraction_prompt=extraction_prompt,
            extraction_model=extraction_model,
            is_document_compressed=is_document_compressed,
            document_compression_format=document_compression_format,
            webhook=webhook,
            raise_on_upstream_error=raise_on_upstream_error
        )
Class variables
var body : str | bytesvar charset : str | Nonevar content_type : strvar document_compression_format : CompressionFormat | Nonevar ephemeral_template : Dict | Nonevar extraction_ephemeral_template : Dict | Nonevar extraction_model : str | Nonevar extraction_prompt : str | Nonevar extraction_template : str | Nonevar is_document_compressed : bool | Nonevar raise_on_upstream_error : boolvar template : str | Nonevar url : str | Nonevar webhook : str | None
Static methods
def from_dict(extraction_config_dict: Dict) ‑> ExtractionConfig-
Expand source code
@staticmethod def from_dict(extraction_config_dict: Dict) -> 'ExtractionConfig': """Create an ExtractionConfig instance from a dictionary.""" body = extraction_config_dict.get('body', None) content_type = extraction_config_dict.get('content_type', None) url = extraction_config_dict.get('url', None) charset = extraction_config_dict.get('charset', None) extraction_template = extraction_config_dict.get('extraction_template', None) extraction_ephemeral_template = extraction_config_dict.get('extraction_ephemeral_template', None) extraction_prompt = extraction_config_dict.get('extraction_prompt', None) extraction_model = extraction_config_dict.get('extraction_model', None) is_document_compressed = extraction_config_dict.get('is_document_compressed', None) document_compression_format = extraction_config_dict.get('document_compression_format', None) document_compression_format = CompressionFormat(document_compression_format) if document_compression_format else None webhook = extraction_config_dict.get('webhook', None) raise_on_upstream_error = extraction_config_dict.get('raise_on_upstream_error', True) return ExtractionConfig( body=body, content_type=content_type, url=url, charset=charset, extraction_template=extraction_template, extraction_ephemeral_template=extraction_ephemeral_template, extraction_prompt=extraction_prompt, extraction_model=extraction_model, is_document_compressed=is_document_compressed, document_compression_format=document_compression_format, webhook=webhook, raise_on_upstream_error=raise_on_upstream_error )Create an ExtractionConfig instance from a dictionary.
Methods
def to_api_params(self, key: str) ‑> Dict-
Expand source code
def to_api_params(self, key: str) -> Dict: params = { 'key': self.key or key, 'content_type': self.content_type } if self.url: params['url'] = self.url if self.charset: params['charset'] = self.charset if self.extraction_template and self.extraction_ephemeral_template: raise ExtractionConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose') if self.extraction_template: params['extraction_template'] = self.extraction_template if self.extraction_ephemeral_template: self.extraction_ephemeral_template = json.dumps(self.extraction_ephemeral_template) params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(self.extraction_ephemeral_template.encode('utf-8')).decode('utf-8') if self.extraction_prompt: params['extraction_prompt'] = quote_plus(self.extraction_prompt) if self.extraction_model: params['extraction_model'] = self.extraction_model if self.webhook: params['webhook_name'] = self.webhook return params def to_dict(self) ‑> Dict-
Expand source code
def to_dict(self) -> Dict: """ Export the ExtractionConfig instance to a plain dictionary. """ if self.is_document_compressed is True: compression_foramt = CompressionFormat(self.document_compression_format) if self.document_compression_format else None if compression_foramt == CompressionFormat.GZIP: import gzip self.body = gzip.decompress(self.body) elif compression_foramt == CompressionFormat.ZSTD: import zstandard as zstd self.body = zstd.decompress(self.body) elif compression_foramt == CompressionFormat.DEFLATE: import zlib decompressor = zlib.decompressobj(wbits=-zlib.MAX_WBITS) self.body = decompressor.decompress(self.body) + decompressor.flush() if isinstance(self.body, bytes): self.body = self.body.decode('utf-8') self.is_document_compressed = False return { 'body': self.body, 'content_type': self.content_type, 'url': self.url, 'charset': self.charset, 'extraction_template': self.extraction_template, 'extraction_ephemeral_template': self.extraction_ephemeral_template, 'extraction_prompt': self.extraction_prompt, 'extraction_model': self.extraction_model, 'is_document_compressed': self.is_document_compressed, 'document_compression_format': CompressionFormat(self.document_compression_format).value if self.document_compression_format else None, 'webhook': self.webhook, 'raise_on_upstream_error': self.raise_on_upstream_error, }Export the ExtractionConfig instance to a plain dictionary.
class HarArchive (har_data: bytes)-
Expand source code
class HarArchive:
    """Parser and accessor for HAR (HTTP Archive) format data"""

    def __init__(self, har_data: bytes):
        """
        Initialize HAR archive from bytes

        Args:
            har_data: HAR file content as bytes (JSON format, may be gzipped)
        """
        # Decompress if gzipped
        if isinstance(har_data, bytes):
            if har_data[:2] == b'\x1f\x8b':  # gzip magic number
                har_data = gzip.decompress(har_data)
            har_data = har_data.decode('utf-8')
        # NOTE(review): annotation says bytes, but the isinstance guard implies a
        # str is also tolerated — confirm callers.
        # Parse the special format: {"log":{...,"entries":[]}}{"entry1"}{"entry2"}...
        # First object is HAR log structure, subsequent objects are individual entries
        objects = []
        decoder = json.JSONDecoder()
        idx = 0
        while idx < len(har_data):
            har_data_stripped = har_data[idx:].lstrip()
            if not har_data_stripped:
                break
            try:
                obj, end_idx = decoder.raw_decode(har_data_stripped)
                objects.append(obj)
                # Advance past the whitespace that lstrip() skipped plus the
                # span consumed by the decoded object.
                idx += len(har_data[idx:]) - len(har_data_stripped) + end_idx
            except json.JSONDecodeError:
                break
        # First object should be the HAR log structure
        if objects and 'log' in objects[0]:
            self._data = objects[0]
            self._log = self._data.get('log', {})
            # Remaining objects are the entries
            self._entries = objects[1:] if len(objects) > 1 else []
        else:
            # Fallback: standard HAR format
            self._data = json.loads(har_data) if isinstance(har_data, str) else {}
            self._log = self._data.get('log', {})
            self._entries = self._log.get('entries', [])

    @property
    def version(self) -> str:
        """Get HAR version"""
        return self._log.get('version', '')

    @property
    def creator(self) -> Dict[str, Any]:
        """Get creator information"""
        return self._log.get('creator', {})

    @property
    def pages(self) -> List[Dict[str, Any]]:
        """Get pages list"""
        return self._log.get('pages', [])

    def get_entries(self) -> List[HarEntry]:
        """
        Get all entries as list

        Returns:
            List of HarEntry objects
        """
        return [HarEntry(entry) for entry in self._entries]

    def iter_entries(self) -> Iterator[HarEntry]:
        """
        Iterate through all HAR entries

        Yields:
            HarEntry objects
        """
        for entry in self._entries:
            yield HarEntry(entry)

    def get_urls(self) -> List[str]:
        """
        Get all URLs in the archive

        Returns:
            List of unique URLs (first-seen order preserved)
        """
        urls = []
        for entry in self._entries:
            url = entry.get('request', {}).get('url', '')
            if url and url not in urls:
                urls.append(url)
        return urls

    def find_by_url(self, url: str) -> Optional[HarEntry]:
        """
        Find entry by exact URL match

        Args:
            url: URL to search for

        Returns:
            First matching HarEntry or None
        """
        for entry in self.iter_entries():
            if entry.url == url:
                return entry
        return None

    def filter_by_status(self, status_code: int) -> List[HarEntry]:
        """
        Filter entries by status code

        Args:
            status_code: HTTP status code to filter by

        Returns:
            List of matching HarEntry objects
        """
        return [entry for entry in self.iter_entries() if entry.status_code == status_code]

    def filter_by_content_type(self, content_type: str) -> List[HarEntry]:
        """
        Filter entries by content type (substring match, case-insensitive)

        Args:
            content_type: Content type to filter by (e.g., 'text/html')

        Returns:
            List of matching HarEntry objects
        """
        return [entry for entry in self.iter_entries() if content_type.lower() in entry.content_type.lower()]

    def __len__(self) -> int:
        """Get number of entries"""
        return len(self._entries)

    def __repr__(self) -> str:
        return f"<HarArchive {len(self._entries)} entries>"
Initialize HAR archive from bytes
Args
har_data- HAR file content as bytes (JSON format, may be gzipped)
Instance variables
prop creator : Dict[str, Any]-
Expand source code
@property def creator(self) -> Dict[str, Any]: """Get creator information""" return self._log.get('creator', {})Get creator information
prop pages : List[Dict[str, Any]]-
Expand source code
@property def pages(self) -> List[Dict[str, Any]]: """Get pages list""" return self._log.get('pages', [])Get pages list
prop version : str-
Expand source code
@property def version(self) -> str: """Get HAR version""" return self._log.get('version', '')Get HAR version
Methods
def filter_by_content_type(self, content_type: str) ‑> List[HarEntry]-
Expand source code
def filter_by_content_type(self, content_type: str) -> List[HarEntry]: """ Filter entries by content type (substring match) Args: content_type: Content type to filter by (e.g., 'text/html') Returns: List of matching HarEntry objects """ return [entry for entry in self.iter_entries() if content_type.lower() in entry.content_type.lower()]Filter entries by content type (substring match)
Args
content_type- Content type to filter by (e.g., 'text/html')
Returns
List of matching HarEntry objects
def filter_by_status(self, status_code: int) ‑> List[HarEntry]-
Expand source code
def filter_by_status(self, status_code: int) -> List[HarEntry]: """ Filter entries by status code Args: status_code: HTTP status code to filter by Returns: List of matching HarEntry objects """ return [entry for entry in self.iter_entries() if entry.status_code == status_code]Filter entries by status code
Args
status_code- HTTP status code to filter by
Returns
List of matching HarEntry objects
def find_by_url(self, url: str) ‑> HarEntry | None-
Expand source code
def find_by_url(self, url: str) -> Optional[HarEntry]: """ Find entry by exact URL match Args: url: URL to search for Returns: First matching HarEntry or None """ for entry in self.iter_entries(): if entry.url == url: return entry return NoneFind entry by exact URL match
Args
url- URL to search for
Returns
First matching HarEntry or None
def get_entries(self) ‑> List[HarEntry]-
Expand source code
def get_entries(self) -> List[HarEntry]: """ Get all entries as list Returns: List of HarEntry objects """ return [HarEntry(entry) for entry in self._entries]Get all entries as list
Returns
List of HarEntry objects
def get_urls(self) ‑> List[str]-
Expand source code
def get_urls(self) -> List[str]: """ Get all URLs in the archive Returns: List of unique URLs """ urls = [] for entry in self._entries: url = entry.get('request', {}).get('url', '') if url and url not in urls: urls.append(url) return urlsGet all URLs in the archive
Returns
List of unique URLs
def iter_entries(self) ‑> Iterator[HarEntry]-
Expand source code
def iter_entries(self) -> Iterator[HarEntry]: """ Iterate through all HAR entries Yields: HarEntry objects """ for entry in self._entries: yield HarEntry(entry)Iterate through all HAR entries
Yields
HarEntry objects
class HarEntry (entry_data: Dict[str, Any])-
Expand source code
class HarEntry:
    """One HAR log entry: a single HTTP request/response exchange."""

    def __init__(self, entry_data: Dict[str, Any]):
        """
        Wrap a raw HAR entry dictionary.

        Args:
            entry_data: HAR entry dictionary (request/response keys optional)
        """
        self._data = entry_data
        self._request = entry_data.get('request', {})
        self._response = entry_data.get('response', {})

    @staticmethod
    def _header_map(raw_headers) -> Dict[str, str]:
        """Collapse a HAR ``[{'name':..,'value':..}, ...]`` header list into a dict."""
        return {item['name']: item['value'] for item in raw_headers}

    @property
    def url(self) -> str:
        """Request URL ('' when absent)."""
        return self._request.get('url', '')

    @property
    def method(self) -> str:
        """HTTP method (defaults to 'GET')."""
        return self._request.get('method', 'GET')

    @property
    def status_code(self) -> int:
        """Response status code as int; 0 when missing or unparseable."""
        raw_status = self._response.get('status') if self._response else None
        if raw_status is None:
            return 0
        # HAR data may carry the status as a string — coerce defensively.
        try:
            return int(raw_status)
        except (ValueError, TypeError):
            return 0

    @property
    def status_text(self) -> str:
        """Response status text ('' when absent)."""
        return self._response.get('statusText', '')

    @property
    def request_headers(self) -> Dict[str, str]:
        """Request headers flattened to a name -> value dict."""
        return self._header_map(self._request.get('headers', []))

    @property
    def response_headers(self) -> Dict[str, str]:
        """Response headers flattened to a name -> value dict."""
        return self._header_map(self._response.get('headers', []))

    @property
    def content(self) -> bytes:
        """Response body as bytes; decodes base64-encoded HAR content."""
        payload = self._response.get('content', {})
        text = payload.get('text', '')
        if payload.get('encoding', '') == 'base64':
            import base64
            return base64.b64decode(text)
        # Plain text is returned UTF-8 encoded; bytes pass through untouched.
        if isinstance(text, str):
            return text.encode('utf-8')
        return text

    @property
    def content_type(self) -> str:
        """Response MIME type ('' when absent)."""
        return self._response.get('content', {}).get('mimeType', '')

    @property
    def content_size(self) -> int:
        """Declared response content size (0 when absent)."""
        return self._response.get('content', {}).get('size', 0)

    @property
    def started_datetime(self) -> str:
        """ISO 8601 timestamp of when the request started ('' when absent)."""
        return self._data.get('startedDateTime', '')

    @property
    def time(self) -> float:
        """Total elapsed time in milliseconds (0.0 when absent)."""
        return self._data.get('time', 0.0)

    @property
    def timings(self) -> Dict[str, float]:
        """Detailed phase timing information ({} when absent)."""
        return self._data.get('timings', {})

    def __repr__(self) -> str:
        return f"<HarEntry {self.method} {self.url} [{self.status_code}]>"
Initialize from HAR entry dict
Args
entry_data- HAR entry dictionary
Instance variables
prop content : bytes-
Expand source code
@property def content(self) -> bytes: """Get response content as bytes""" content_data = self._response.get('content', {}) text = content_data.get('text', '') # Handle base64 encoding if present encoding = content_data.get('encoding', '') if encoding == 'base64': import base64 return base64.b64decode(text) # Return as UTF-8 bytes if isinstance(text, str): return text.encode('utf-8') return textGet response content as bytes
prop content_size : int-
Expand source code
@property def content_size(self) -> int: """Get response content size""" return self._response.get('content', {}).get('size', 0)Get response content size
prop content_type : str-
Expand source code
@property def content_type(self) -> str: """Get response content type""" return self._response.get('content', {}).get('mimeType', '')Get response content type
prop method : str-
Expand source code
@property def method(self) -> str: """Get HTTP method""" return self._request.get('method', 'GET')Get HTTP method
prop request_headers : Dict[str, str]-
Expand source code
@property def request_headers(self) -> Dict[str, str]: """Get request headers as dict""" headers = {} for header in self._request.get('headers', []): headers[header['name']] = header['value'] return headersGet request headers as dict
prop response_headers : Dict[str, str]-
Expand source code
@property def response_headers(self) -> Dict[str, str]: """Get response headers as dict""" headers = {} for header in self._response.get('headers', []): headers[header['name']] = header['value'] return headersGet response headers as dict
prop started_datetime : str-
Expand source code
@property def started_datetime(self) -> str: """Get when request was started (ISO 8601 format)""" return self._data.get('startedDateTime', '')Get when request was started (ISO 8601 format)
prop status_code : int-
Expand source code
@property def status_code(self) -> int: """Get response status code""" # Handle case where response doesn't exist or status is missing if not self._response: return 0 status = self._response.get('status') if status is None: return 0 # Ensure it's an int (HAR data might have status as string) try: return int(status) except (ValueError, TypeError): return 0Get response status code
prop status_text : str-
Expand source code
@property def status_text(self) -> str: """Get response status text""" return self._response.get('statusText', '')Get response status text
prop time : float-
Expand source code
@property def time(self) -> float: """Get total elapsed time in milliseconds""" return self._data.get('time', 0.0)Get total elapsed time in milliseconds
prop timings : Dict[str, float]-
Expand source code
@property def timings(self) -> Dict[str, float]: """Get detailed timing information""" return self._data.get('timings', {})Get detailed timing information
prop url : str-
Expand source code
@property def url(self) -> str: """Get request URL""" return self._request.get('url', '')Get request URL
class HttpError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class HttpError(ScrapflyError):
    """HTTP-level error for failed Scrapfly API or upstream requests.

    Carries the originating request and (optionally) the HTTP response so
    callers can inspect what failed.
    """

    def __init__(self, request:Request, response:Optional[Response]=None, **kwargs):
        # Keep the originating request/response available for inspection.
        self.request = request
        self.response = response
        super().__init__(**kwargs)

    def __str__(self) -> str:
        # Upstream errors report the *target website's* status, not Scrapfly's own.
        if isinstance(self, UpstreamHttpError):
            return f"Target website responded with {self.api_response.scrape_result['status_code']} - {self.api_response.scrape_result['reason']}"
        # NOTE(review): `api_response` and `message` are not set in this block —
        # presumably provided by the ScrapflyError base via **kwargs; confirm.
        if self.api_response is not None:
            return self.api_response.error_message
        text = f"{self.response.status_code} - {self.response.reason}"
        # Include detailed error message for all HTTP errors
        if self.message:
            text += f" - {self.message}"
        return text
Ancestors
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
- ApiHttpClientError
- scrapfly.errors.ExtractionAPIError
- scrapfly.errors.QuotaLimitReached
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.ScreenshotAPIError
- scrapfly.errors.TooManyConcurrentRequest
- scrapfly.errors.UpstreamHttpError
class ResponseBodyHandler (use_brotli: bool = False, signing_secrets: Tuple[str] | None = None)-
Expand source code
class ResponseBodyHandler:
    """Negotiates and decodes API response bodies.

    Builds the Accept / Accept-Encoding values based on which optional
    compression and serialization libraries are importable, verifies webhook
    HMAC signatures, and deserializes JSON or msgpack payloads.
    """

    # NOTE(review): these are class-level lists mutated in __init__
    # (insert/append below), so capability detection is shared across ALL
    # instances — confirm that is intended.
    SUPPORTED_COMPRESSION = ['gzip', 'deflate']
    SUPPORTED_CONTENT_TYPES = ['application/msgpack', 'application/json']

    class JSONDateTimeDecoder(JSONDecoder):
        # JSON decoder that revives date/datetime values via _date_parser.
        def __init__(self, *args, **kargs):
            JSONDecoder.__init__(self, *args, object_hook=_date_parser, **kargs)

    # brotli under perform at same gzip level and upper level destroy the cpu so
    # the trade off do not worth it for most of usage
    def __init__(self, use_brotli: bool = False, signing_secrets: Optional[Tuple[str]] = None):
        """
        Args:
            use_brotli: advertise brotli support when a brotli library is importable
            signing_secrets: hex-encoded webhook signing secrets used by verify()
        """
        if use_brotli is True and 'br' not in self.SUPPORTED_COMPRESSION:
            try:
                # Prefer brotlicffi, fall back to brotli; skip silently if neither exists.
                try:
                    import brotlicffi as brotli
                    self.SUPPORTED_COMPRESSION.insert(0, 'br')
                except ImportError:
                    import brotli
                    self.SUPPORTED_COMPRESSION.insert(0, 'br')
            except ImportError:
                pass
        try:
            from compression import zstd as _zstd  # noqa: F401 - Python 3.14+ native zstd
            if 'zstd' not in self.SUPPORTED_COMPRESSION:
                self.SUPPORTED_COMPRESSION.append('zstd')
        except ImportError:
            try:
                import zstandard  # noqa: F401 - aligned with urllib3 for transparent decompression
                if 'zstd' not in self.SUPPORTED_COMPRESSION:
                    self.SUPPORTED_COMPRESSION.append('zstd')
            except ImportError:
                pass

        # Advertised Accept-Encoding value, e.g. "gzip, deflate, zstd".
        self.content_encoding: str = ', '.join(self.SUPPORTED_COMPRESSION)

        self._signing_secret: Optional[Tuple[str]] = None

        if signing_secrets:
            # De-duplicate and store raw (unhexlified) secrets for HMAC.
            _secrets = set()
            for signing_secret in signing_secrets:
                _secrets.add(binascii.unhexlify(signing_secret))
            self._signing_secret = tuple(_secrets)

        try:  # automatically use msgpack if available https://msgpack.org/
            import msgpack
            self.accept = 'application/msgpack;charset=utf-8'
            self.content_type = 'application/msgpack;charset=utf-8'
            self.content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)
        except ImportError:
            self.accept = 'application/json;charset=utf-8'
            self.content_type = 'application/json;charset=utf-8'
            self.content_loader = partial(loads, cls=self.JSONDateTimeDecoder)

    def support(self, headers: Dict) -> bool:
        """Return True when the response content-type is one we can decode."""
        if 'content-type' not in headers:
            return False

        for content_type in self.SUPPORTED_CONTENT_TYPES:
            if headers['content-type'].find(content_type) != -1:
                return True

        return False

    def verify(self, message: bytes, signature: str) -> bool:
        """Check `message` against `signature` with any configured secret.

        NOTE(review): iterates self._signing_secret directly — calling this
        when no signing secrets were configured (None) would raise; read()
        guards for that, direct callers must too.
        """
        for signing_secret in self._signing_secret:
            if hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper() == signature:
                return True

        return False

    def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
        """Decompress, verify (if a signature and secrets exist) and deserialize.

        Raises:
            WebhookSignatureMissMatch: when signature verification fails.
        """
        if content_encoding == 'gzip' or content_encoding == 'gz':
            import gzip
            content = gzip.decompress(content)
        elif content_encoding == 'deflate':
            import zlib
            content = zlib.decompress(content)
        elif content_encoding == 'brotli' or content_encoding == 'br':
            import brotli
            content = brotli.decompress(content)
        elif content_encoding == 'zstd':
            try:
                from compression import zstd as _zstd  # Python 3.14+
                content = _zstd.decompress(content)
            except ImportError:
                import zstandard
                content = zstandard.decompress(content)

        # Verification happens on the *decompressed* payload.
        if self._signing_secret is not None and signature is not None:
            if not self.verify(content, signature):
                raise WebhookSignatureMissMatch()

        if content_type.startswith('application/json'):
            content = loads(content, cls=self.JSONDateTimeDecoder)
        elif content_type.startswith('application/msgpack'):
            import msgpack
            content = msgpack.loads(content, object_hook=_date_parser, strict_map_key=False)

        return content

    def __call__(self, content: bytes, content_type: str) -> Union[str, Dict]:
        """Deserialize `content` according to `content_type`.

        Raises:
            EncoderError: when deserialization fails; carries the payload as
                utf-8 text or, when not decodable, base64.
        """
        content_loader = None

        if content_type.find('application/json') != -1:
            content_loader = partial(loads, cls=self.JSONDateTimeDecoder)
        elif content_type.find('application/msgpack') != -1:
            import msgpack
            content_loader = partial(msgpack.loads, object_hook=_date_parser, strict_map_key=False)

        if content_loader is None:
            raise Exception('Unsupported content type')

        try:
            return content_loader(content)
        except Exception as e:
            try:
                raise EncoderError(content=content.decode('utf-8')) from e
            except UnicodeError:
                raise EncoderError(content=base64.b64encode(content).decode('utf-8')) from e
var JSONDateTimeDecoder-
Simple JSON https://json.org decoder
Performs the following translations in decoding by default:
+---------------+-------------------+ | JSON | Python | +===============+===================+ | object | dict | +---------------+-------------------+ | array | list | +---------------+-------------------+ | string | str | +---------------+-------------------+ | number (int) | int | +---------------+-------------------+ | number (real) | float | +---------------+-------------------+ | true | True | +---------------+-------------------+ | false | False | +---------------+-------------------+ | null | None | +---------------+-------------------+
It also understands
NaN,Infinity, and-Infinityas their correspondingfloatvalues, which is outside the JSON spec. var SUPPORTED_COMPRESSIONvar SUPPORTED_CONTENT_TYPES
Methods
def read(self,
content: bytes,
content_encoding: str,
content_type: str,
signature: str | None) ‑> Dict-
Expand source code
def read(self, content: bytes, content_encoding: str, content_type: str, signature: Optional[str]) -> Dict:
    """Decompress, signature-check and deserialize a raw API payload.

    Args:
        content: raw body bytes as received.
        content_encoding: transport encoding ('gzip'/'gz', 'deflate',
            'brotli'/'br', 'zstd'); any other value is passed through untouched.
        content_type: MIME type selecting the deserializer (json/msgpack);
            other types are returned as raw bytes.
        signature: optional HMAC signature checked against the decompressed payload.

    Raises:
        WebhookSignatureMissMatch: when a signing secret is configured and the
            provided signature does not match.
    """
    data = content
    # Decompress first: the signature is computed over the plain payload.
    if content_encoding in ('gzip', 'gz'):
        import gzip
        data = gzip.decompress(data)
    elif content_encoding == 'deflate':
        import zlib
        data = zlib.decompress(data)
    elif content_encoding in ('brotli', 'br'):
        import brotli
        data = brotli.decompress(data)
    elif content_encoding == 'zstd':
        try:
            from compression import zstd as _zstd  # Python 3.14+
            data = _zstd.decompress(data)
        except ImportError:
            import zstandard
            data = zstandard.decompress(data)

    if self._signing_secret is not None and signature is not None:
        if not self.verify(data, signature):
            raise WebhookSignatureMissMatch()

    if content_type.startswith('application/json'):
        data = loads(data, cls=self.JSONDateTimeDecoder)
    elif content_type.startswith('application/msgpack'):
        import msgpack
        data = msgpack.loads(data, object_hook=_date_parser, strict_map_key=False)

    return data
Expand source code
def support(self, headers: Dict) -> bool:
    """Return True when the response headers declare a deserializable content type."""
    if 'content-type' not in headers:
        return False
    declared = headers['content-type']
    # substring containment is equivalent to the original `.find(...) != -1`
    return any(supported in declared for supported in self.SUPPORTED_CONTENT_TYPES)
Expand source code
def verify(self, message: bytes, signature: str) -> bool:
    """Verify a webhook payload signature against the configured secrets.

    Computes HMAC-SHA256 of *message* with each signing secret and compares the
    upper-cased hex digest against *signature*.

    Fixed: uses hmac.compare_digest (constant-time) instead of `==` so the
    comparison of an attacker-supplied signature does not leak timing information.

    :param message: raw payload bytes the signature was computed over
    :param signature: upper-case hex HMAC-SHA256 digest to check
    :return: True when any configured secret produces a matching digest
    """
    for signing_secret in self._signing_secret:
        expected = hmac.new(signing_secret, message, hashlib.sha256).hexdigest().upper()
        if hmac.compare_digest(expected, signature):
            return True
    return False
class ScrapeApiResponse (request: requests.models.Request,
response: requests.models.Response,
scrape_config: ScrapeConfig,
api_result: Dict | None = None,
large_object_handler: Callable | None = None)-
Expand source code
# -- ScrapeApiResponse (pdoc source listing; whitespace flattened by the doc renderer) --
# Wraps one Scrapfly scrape API exchange: for HEAD requests it synthesizes an
# api_result from the X-Scrapfly-Reject-* headers, freezes the API payload via
# handle_api_result, and exposes typed accessors (content, soup, selector, error,
# upstream_result_into_response, sink, ...).
# NOTE(review): both `'success': 200 >= self.response.status_code < 300` (HEAD path)
# and the `success` property's `200 >= self.response.status_code <= 299` are chained
# comparisons that test `status_code <= 200`, not the intended 2xx range — confirm
# and fix upstream.
class ScrapeApiResponse(ApiResponse): scrape_config:ScrapeConfig large_object_handler:Callable def __init__(self, request: Request, response: Response, scrape_config: ScrapeConfig, api_result: Optional[Dict] = None, large_object_handler:Optional[Callable]=None): super().__init__(request, response) self.scrape_config = scrape_config self.large_object_handler = large_object_handler if self.scrape_config.method == 'HEAD': api_result = { 'result': { 'request_headers': {}, 'status': 'DONE', 'success': 200 >= self.response.status_code < 300, 'response_headers': self.response.headers, 'status_code': self.response.status_code, 'reason': self.response.reason, 'format': 'text', 'content': '' }, 'context': {}, 'config': self.scrape_config.__dict__ } if 'X-Scrapfly-Reject-Code' in self.response.headers: api_result['result']['error'] = { 'code': self.response.headers['X-Scrapfly-Reject-Code'], 'http_code': int(self.response.headers['X-Scrapfly-Reject-Http-Code']), 'message': self.response.headers['X-Scrapfly-Reject-Description'], 'error_id': self.response.headers['X-Scrapfly-Reject-ID'], 'retryable': True if self.response.headers['X-Scrapfly-Reject-Retryable'] == 'yes' else False, 'doc_url': '', 'links': {} } if 'X-Scrapfly-Reject-Doc' in self.response.headers: api_result['result']['error']['doc_url'] = self.response.headers['X-Scrapfly-Reject-Doc'] api_result['result']['error']['links']['Related Docs'] = self.response.headers['X-Scrapfly-Reject-Doc'] if isinstance(api_result, str): raise HttpError( request=request, response=response, message='Bad gateway', code=502, http_status_code=502, is_retryable=True ) self.result = self.handle_api_result(api_result=api_result) @property def scrape_result(self) -> Optional[Dict]: return self.result.get('result', None) @property def config(self) -> Optional[Dict]: if self.scrape_result is None: return None return self.result['config'] @property def context(self) -> Optional[Dict]: if self.scrape_result is None: return None return 
self.result['context'] @property def content(self) -> str: if self.scrape_result is None: return '' return self.scrape_result['content'] @property def success(self) -> bool: """ Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code """ return 200 >= self.response.status_code <= 299 @property def scrape_success(self) -> bool: scrape_result = self.scrape_result if not scrape_result: return False return self.scrape_result['success'] @property def error(self) -> Optional[Dict]: if self.scrape_result is None: return None if self.scrape_success is False: return self.scrape_result['error'] @property def upstream_status_code(self) -> Optional[int]: if self.scrape_result is None: return None if 'status_code' in self.scrape_result: return self.scrape_result['status_code'] return None @cached_property def soup(self) -> 'BeautifulSoup': if self.scrape_result['format'] != 'text': raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content") try: from bs4 import BeautifulSoup soup = BeautifulSoup(self.content, "lxml") return soup except ImportError as e: logger.error('You must install scrapfly[parser] to enable this feature') @cached_property def selector(self) -> 'Selector': if self.scrape_result['format'] != 'text': raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content") try: from parsel import Selector return Selector(text=self.content) except ImportError as e: logger.error('You must install parsel or scrapy package to enable this feature') raise e def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]: if self._is_api_error(api_result=api_result) is True: return FrozenDict(api_result) try: if isinstance(api_result['config']['headers'], list): api_result['config']['headers'] = {} except TypeError: logger.info(api_result) raise with suppress(KeyError): 
api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers']) api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers']) if self.large_object_handler is not None and api_result['result']['content']: content_format = api_result['result']['format'] if content_format in ['clob', 'blob']: api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format) elif content_format == 'binary': base64_payload = api_result['result']['content'] if isinstance(base64_payload, bytes): base64_payload = base64_payload.decode('utf-8') api_result['result']['content'] = BytesIO(b64decode(base64_payload)) return FrozenDict(api_result) def _is_api_error(self, api_result: Dict) -> bool: if self.scrape_config.method == 'HEAD': if 'X-Reject-Reason' in self.response.headers: return True return False if api_result is None: return True return 'error_id' in api_result def upstream_result_into_response(self, _class=Response) -> Optional[Response]: if _class != Response: raise RuntimeError('only Response from requests package is supported at the moment') if self.result is None: return None if self.response.status_code != 200: return None response = Response() response.status_code = self.scrape_result['status_code'] response.reason = self.scrape_result['reason'] if self.scrape_result['content']: if isinstance(self.scrape_result['content'], BytesIO): response._content = self.scrape_result['content'].getvalue() elif isinstance(self.scrape_result['content'], bytes): response._content = self.scrape_result['content'] elif isinstance(self.scrape_result['content'], str): response._content = self.scrape_result['content'].encode('utf-8') else: response._content = None response.headers.update(self.scrape_result['response_headers']) response.url = self.scrape_result['url'] response.request = Request( method=self.config['method'], 
url=self.config['url'], headers=self.scrape_result['request_headers'], data=self.config['body'] if self.config['body'] else None ) if 'set-cookie' in response.headers: for raw_cookie in response.headers['set-cookie']: for name, cookie in SimpleCookie(raw_cookie).items(): expires = cookie.get('expires') if expires == '': expires = None if expires: try: expires = parse(expires).timestamp() except ValueError: expires = None if type(expires) == str: if '.' in expires: expires = float(expires) else: expires = int(expires) response.cookies.set_cookie(Cookie( version=cookie.get('version') if cookie.get('version') else None, name=name, value=cookie.value, path=cookie.get('path', ''), expires=expires, comment=cookie.get('comment'), domain=cookie.get('domain', ''), secure=cookie.get('secure'), port=None, port_specified=False, domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '', domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False, path_specified=cookie.get('path') != '' and cookie.get('path') is not None, discard=False, comment_url=None, rest={ 'httponly': cookie.get('httponly'), 'samesite': cookie.get('samesite'), 'max-age': cookie.get('max-age') } )) return response def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None): file_content = content or self.scrape_result['content'] file_path = None file_extension = None if name: name_parts = name.split('.') if len(name_parts) > 1: file_extension = name_parts[-1] if not file: if file_extension is None: try: mime_type = self.scrape_result['response_headers']['content-type'] except KeyError: mime_type = 'application/octet-stream' if ';' in mime_type: mime_type = mime_type.split(';')[0] file_extension = '.' 
+ mime_type.split('/')[1] if not name: name = self.config['url'].split('/')[-1] if name.find(file_extension) == -1: name += file_extension file_path = path + '/' + name if path is not None else name if file_path == file_extension: url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-') if url[-1] == '-': url = url[:-1] url += file_extension file_path = url file = open(file_path, 'wb') if isinstance(file_content, str): file_content = BytesIO(file_content.encode('utf-8')) elif isinstance(file_content, bytes): file_content = BytesIO(file_content) file_content.seek(0) with file as f: shutil.copyfileobj(file_content, f, length=131072) logger.info('file %s created' % file_path) def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError): super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class) if self.result['result']['status'] == 'DONE' and self.scrape_success is False: error = ErrorFactory.create(api_response=self) if error: if isinstance(error, UpstreamHttpError): if raise_on_upstream_error is True: raise error else: raise errorAncestors
Class variables
var large_object_handler : Callablevar scrape_config : ScrapeConfig
Instance variables
prop config : Dict | None-
Expand source code
@property
def config(self) -> Optional[Dict]:
    """Scrape configuration echoed back by the API; None when there is no scrape result."""
    return None if self.scrape_result is None else self.result['config']
Expand source code
@property
def content(self) -> str:
    """Scraped page body; empty string when no scrape result is available."""
    return '' if self.scrape_result is None else self.scrape_result['content']
Expand source code
@property
def context(self) -> Optional[Dict]:
    """Scrape context metadata from the API payload; None without a scrape result."""
    return None if self.scrape_result is None else self.result['context']
Expand source code
@property
def error(self) -> Optional[Dict]:
    """Error payload of a failed scrape; None when there is no result or the scrape succeeded."""
    if self.scrape_result is None:
        return None
    if self.scrape_success is False:
        return self.scrape_result['error']
    # successful scrape -> no error payload (implicit None, as in the original)
Expand source code
@property
def scrape_result(self) -> Optional[Dict]:
    """The 'result' section of the API payload, or None when absent."""
    return self.result.get('result')
Expand source code
@property
def scrape_success(self) -> bool:
    """Whether the scrape itself succeeded; False when there is no scrape result."""
    result = self.scrape_result
    return result['success'] if result else False
Expand source code
@cached_property
def selector(self) -> 'Selector':
    """Parsel Selector over the text content; computed once and cached."""
    if self.scrape_result['format'] != 'text':
        raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")
    try:
        from parsel import Selector
    except ImportError as e:
        logger.error('You must install parsel or scrapy package to enable this feature')
        raise e
    return Selector(text=self.content)
Expand source code
@cached_property
def soup(self) -> 'BeautifulSoup':
    """BeautifulSoup document over the text content; computed once and cached."""
    if self.scrape_result['format'] != 'text':
        raise ContentError("Unable to cast into beautiful soup, the format of data is binary - must be text content")
    try:
        from bs4 import BeautifulSoup
    except ImportError:
        logger.error('You must install scrapfly[parser] to enable this feature')
        # mirrors the original behaviour: log and fall through, yielding None
        return None
    return BeautifulSoup(self.content, "lxml")
Expand source code
@property
def success(self) -> bool:
    """
    Success means Scrapfly api reply correctly to the call, but the scrape can be unsuccessful if the upstream reply with error status code
    """
    # Fixed: the original chained comparison `200 >= status_code <= 299` evaluated
    # as `status_code <= 200 and status_code <= 299`, i.e. it accepted 1xx codes
    # and rejected every 2xx above 200. The intended check is the 2xx range.
    return 200 <= self.response.status_code <= 299
prop upstream_status_code : int | None-
Expand source code
@property
def upstream_status_code(self) -> Optional[int]:
    """HTTP status code returned by the scraped website, when reported."""
    if self.scrape_result is None:
        return None
    # dict.get covers both the present and the missing-key case of the original
    return self.scrape_result.get('status_code')
Methods
def handle_api_result(self, api_result: Dict) ‑> FrozenDict | None-
Expand source code
# handle_api_result (pdoc listing, flattened): normalizes a raw api_result dict.
# Error payloads are frozen untouched; list-typed config headers are coerced to {};
# request/response headers are wrapped in CaseInsensitiveDict; 'clob'/'blob' content
# is resolved via large_object_handler; 'binary' content is base64-decoded into a
# BytesIO. Returns an immutable FrozenDict of the mutated payload.
def handle_api_result(self, api_result: Dict) -> Optional[FrozenDict]: if self._is_api_error(api_result=api_result) is True: return FrozenDict(api_result) try: if isinstance(api_result['config']['headers'], list): api_result['config']['headers'] = {} except TypeError: logger.info(api_result) raise with suppress(KeyError): api_result['result']['request_headers'] = CaseInsensitiveDict(api_result['result']['request_headers']) api_result['result']['response_headers'] = CaseInsensitiveDict(api_result['result']['response_headers']) if self.large_object_handler is not None and api_result['result']['content']: content_format = api_result['result']['format'] if content_format in ['clob', 'blob']: api_result['result']['content'], api_result['result']['format'] = self.large_object_handler(callback_url=api_result['result']['content'], format=content_format) elif content_format == 'binary': base64_payload = api_result['result']['content'] if isinstance(base64_payload, bytes): base64_payload = base64_payload.decode('utf-8') api_result['result']['content'] = BytesIO(b64decode(base64_payload)) return FrozenDict(api_result) def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ApiHttpClientError)-
Expand source code
# raise_for_result (pdoc listing, flattened): extends the base-class check; when
# the API job status is DONE but the scrape failed, builds a typed error via
# ErrorFactory. An UpstreamHttpError is raised only when raise_on_upstream_error
# is True; any other error type is always raised.
def raise_for_result(self, raise_on_upstream_error=True, error_class=ApiHttpClientError): super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class) if self.result['result']['status'] == 'DONE' and self.scrape_success is False: error = ErrorFactory.create(api_response=self) if error: if isinstance(error, UpstreamHttpError): if raise_on_upstream_error is True: raise error else: raise error def sink(self,
path: str | None = None,
name: str | None = None,
file: typing.TextIO | _io.BytesIO | None = None,
content: str | bytes | None = None)-
Expand source code
# sink (pdoc listing, flattened): writes scrape content to disk. The file
# extension is taken from an explicit name or derived from the response
# content-type; the target path is built from `path`/`name`, falling back to a
# sanitized form of the scraped URL; content is streamed with shutil.copyfileobj.
# NOTE(review): `mime_type.split('/')[1]` would raise IndexError for a
# content-type without a '/' subtype — confirm inputs upstream.
def sink(self, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None, content: Optional[Union[str, bytes]] = None): file_content = content or self.scrape_result['content'] file_path = None file_extension = None if name: name_parts = name.split('.') if len(name_parts) > 1: file_extension = name_parts[-1] if not file: if file_extension is None: try: mime_type = self.scrape_result['response_headers']['content-type'] except KeyError: mime_type = 'application/octet-stream' if ';' in mime_type: mime_type = mime_type.split(';')[0] file_extension = '.' + mime_type.split('/')[1] if not name: name = self.config['url'].split('/')[-1] if name.find(file_extension) == -1: name += file_extension file_path = path + '/' + name if path is not None else name if file_path == file_extension: url = re.sub(r'(https|http)?://', '', self.config['url']).replace('/', '-') if url[-1] == '-': url = url[:-1] url += file_extension file_path = url file = open(file_path, 'wb') if isinstance(file_content, str): file_content = BytesIO(file_content.encode('utf-8')) elif isinstance(file_content, bytes): file_content = BytesIO(file_content) file_content.seek(0) with file as f: shutil.copyfileobj(file_content, f, length=131072) logger.info('file %s created' % file_path) def upstream_result_into_response(self) ‑> requests.models.Response | None-
Expand source code
# upstream_result_into_response (pdoc listing, flattened): rebuilds a
# requests.Response from the scrape result — status, reason, body (BytesIO/bytes/str
# all handled), headers, URL — and reconstructs the cookie jar from any set-cookie
# headers, parsing `expires` into a numeric timestamp.
# NOTE(review): `if type(expires) == str` runs after `expires` has already been
# converted to a float timestamp (or None), so the str branch looks unreachable —
# confirm intent.
def upstream_result_into_response(self, _class=Response) -> Optional[Response]: if _class != Response: raise RuntimeError('only Response from requests package is supported at the moment') if self.result is None: return None if self.response.status_code != 200: return None response = Response() response.status_code = self.scrape_result['status_code'] response.reason = self.scrape_result['reason'] if self.scrape_result['content']: if isinstance(self.scrape_result['content'], BytesIO): response._content = self.scrape_result['content'].getvalue() elif isinstance(self.scrape_result['content'], bytes): response._content = self.scrape_result['content'] elif isinstance(self.scrape_result['content'], str): response._content = self.scrape_result['content'].encode('utf-8') else: response._content = None response.headers.update(self.scrape_result['response_headers']) response.url = self.scrape_result['url'] response.request = Request( method=self.config['method'], url=self.config['url'], headers=self.scrape_result['request_headers'], data=self.config['body'] if self.config['body'] else None ) if 'set-cookie' in response.headers: for raw_cookie in response.headers['set-cookie']: for name, cookie in SimpleCookie(raw_cookie).items(): expires = cookie.get('expires') if expires == '': expires = None if expires: try: expires = parse(expires).timestamp() except ValueError: expires = None if type(expires) == str: if '.' 
in expires: expires = float(expires) else: expires = int(expires) response.cookies.set_cookie(Cookie( version=cookie.get('version') if cookie.get('version') else None, name=name, value=cookie.value, path=cookie.get('path', ''), expires=expires, comment=cookie.get('comment'), domain=cookie.get('domain', ''), secure=cookie.get('secure'), port=None, port_specified=False, domain_specified=cookie.get('domain') is not None and cookie.get('domain') != '', domain_initial_dot=bool(cookie.get('domain').startswith('.')) if cookie.get('domain') is not None else False, path_specified=cookie.get('path') != '' and cookie.get('path') is not None, discard=False, comment_url=None, rest={ 'httponly': cookie.get('httponly'), 'samesite': cookie.get('samesite'), 'max-age': cookie.get('max-age') } )) return response
Inherited members
class ScrapeConfig (url: str,
retry: bool = True,
method: str = 'GET',
country: str | None = None,
render_js: bool = False,
cache: bool = False,
cache_clear: bool = False,
ssl: bool = False,
dns: bool = False,
asp: bool = False,
debug: bool = False,
raise_on_upstream_error: bool = True,
cache_ttl: int | None = None,
proxy_pool: str | None = None,
session: str | None = None,
tags: List[str] | Set[str] | None = None,
format: Format | None = None,
format_options: List[FormatOption] | None = None,
extraction_template: str | None = None,
extraction_ephemeral_template: Dict | None = None,
extraction_prompt: str | None = None,
extraction_model: str | None = None,
correlation_id: str | None = None,
cookies: requests.structures.CaseInsensitiveDict | None = None,
body: str | None = None,
data: Dict | None = None,
headers: requests.structures.CaseInsensitiveDict | Dict[str, str] | None = None,
js: str = None,
rendering_wait: int = None,
rendering_stage: Literal['complete', 'domcontentloaded'] = 'complete',
wait_for_selector: str | None = None,
screenshots: Dict | None = None,
screenshot_flags: List[ScreenshotFlag] | None = None,
session_sticky_proxy: bool | None = None,
webhook: str | None = None,
timeout: int | None = None,
js_scenario: List | None = None,
extract: Dict | None = None,
os: str | None = None,
lang: List[str] | None = None,
auto_scroll: bool | None = None,
cost_budget: int | None = None)-
Expand source code
# -- ScrapeConfig (pdoc source listing; whitespace flattened by the doc renderer) --
# Full scrape-request configuration: constructor normalizes cookies into the
# cookie header and encodes body/data per content-type; to_api_params serializes
# for the API; from_exported_config / to_dict / from_dict handle round-tripping.
# NOTE(review): the class-level defaults `format: Optional[Format] = None,` and
# `screenshot_flags: Optional[List[ScreenshotFlag]] = None,` end with a trailing
# comma, so the class attribute is a one-element tuple rather than None — confirm
# and fix upstream (instance attributes set in __init__ mask this at runtime).
# NOTE(review): class attribute `session_sticky_proxy` defaults to True while the
# to_api_params comment says 'false by default' — verify the intended default.
class ScrapeConfig(BaseApiConfig): PUBLIC_DATACENTER_POOL = 'public_datacenter_pool' PUBLIC_RESIDENTIAL_POOL = 'public_residential_pool' url: str retry: bool = True method: str = 'GET' country: Optional[str] = None render_js: bool = False cache: bool = False cache_clear:bool = False ssl:bool = False dns:bool = False asp:bool = False debug: bool = False raise_on_upstream_error:bool = True cache_ttl:Optional[int] = None proxy_pool:Optional[str] = None session: Optional[str] = None tags: Optional[List[str]] = None format: Optional[Format] = None, # raw(unchanged) format_options: Optional[List[FormatOption]] extraction_template: Optional[str] = None # a saved template name extraction_ephemeral_template: Optional[Dict] # ephemeraly declared json template extraction_prompt: Optional[str] = None extraction_model: Optional[str] = None correlation_id: Optional[str] = None cookies: Optional[CaseInsensitiveDict] = None body: Optional[str] = None data: Optional[Dict] = None headers: Optional[CaseInsensitiveDict] = None js: str = None rendering_wait: int = None rendering_stage: Literal["complete", "domcontentloaded"] = "complete" wait_for_selector: Optional[str] = None session_sticky_proxy:bool = True screenshots:Optional[Dict]=None screenshot_flags: Optional[List[ScreenshotFlag]] = None, webhook:Optional[str]=None timeout:Optional[int]=None # in milliseconds js_scenario: Dict = None extract: Dict = None lang:Optional[List[str]] = None os:Optional[str] = None auto_scroll:Optional[bool] = None cost_budget:Optional[int] = None def __init__( self, url: str, retry: bool = True, method: str = 'GET', country: Optional[str] = None, render_js: bool = False, cache: bool = False, cache_clear:bool = False, ssl:bool = False, dns:bool = False, asp:bool = False, debug: bool = False, raise_on_upstream_error:bool = True, cache_ttl:Optional[int] = None, proxy_pool:Optional[str] = None, session: Optional[str] = None, tags: Optional[Union[List[str], Set[str]]] = None, format: Optional[Format] = 
None, # raw(unchanged) format_options: Optional[List[FormatOption]] = None, # raw(unchanged) extraction_template: Optional[str] = None, # a saved template name extraction_ephemeral_template: Optional[Dict] = None, # ephemeraly declared json template extraction_prompt: Optional[str] = None, extraction_model: Optional[str] = None, correlation_id: Optional[str] = None, cookies: Optional[CaseInsensitiveDict] = None, body: Optional[str] = None, data: Optional[Dict] = None, headers: Optional[Union[CaseInsensitiveDict, Dict[str, str]]] = None, js: str = None, rendering_wait: int = None, rendering_stage: Literal["complete", "domcontentloaded"] = "complete", wait_for_selector: Optional[str] = None, screenshots:Optional[Dict]=None, screenshot_flags: Optional[List[ScreenshotFlag]] = None, session_sticky_proxy:Optional[bool] = None, webhook:Optional[str] = None, timeout:Optional[int] = None, # in milliseconds js_scenario:Optional[List] = None, extract:Optional[Dict] = None, os:Optional[str] = None, lang:Optional[List[str]] = None, auto_scroll:Optional[bool] = None, cost_budget:Optional[int] = None ): assert(type(url) is str) if isinstance(tags, List): tags = set(tags) cookies = cookies or {} headers = headers or {} self.cookies = CaseInsensitiveDict(cookies) self.headers = CaseInsensitiveDict(headers) self.url = url self.retry = retry self.method = method self.country = country self.session_sticky_proxy = session_sticky_proxy self.render_js = render_js self.cache = cache self.cache_clear = cache_clear self.asp = asp self.webhook = webhook self.session = session self.debug = debug self.cache_ttl = cache_ttl self.proxy_pool = proxy_pool self.tags = tags or set() self.format = format self.format_options = format_options self.extraction_template = extraction_template self.extraction_ephemeral_template = extraction_ephemeral_template self.extraction_prompt = extraction_prompt self.extraction_model = extraction_model self.correlation_id = correlation_id self.wait_for_selector = 
wait_for_selector self.body = body self.data = data self.js = js self.rendering_wait = rendering_wait self.rendering_stage = rendering_stage self.raise_on_upstream_error = raise_on_upstream_error self.screenshots = screenshots self.screenshot_flags = screenshot_flags self.key = None self.dns = dns self.ssl = ssl self.js_scenario = js_scenario self.timeout = timeout self.extract = extract self.lang = lang self.os = os self.auto_scroll = auto_scroll self.cost_budget = cost_budget if cookies: _cookies = [] for name, value in cookies.items(): _cookies.append(name + '=' + value) if 'cookie' in self.headers: if self.headers['cookie'][-1] != ';': self.headers['cookie'] += ';' else: self.headers['cookie'] = '' self.headers['cookie'] += '; '.join(_cookies) if self.body and self.data: raise ScrapeConfigError('You cannot pass both parameters body and data. You must choose') if method in ['POST', 'PUT', 'PATCH']: if self.body is None and self.data is not None: if 'content-type' not in self.headers: self.headers['content-type'] = 'application/x-www-form-urlencoded' self.body = urlencode(data) else: if self.headers['content-type'].find('application/json') != -1: self.body = json.dumps(data) elif self.headers['content-type'].find('application/x-www-form-urlencoded') != -1: self.body = urlencode(data) else: raise ScrapeConfigError('Content-Type "%s" not supported, use body parameter to pass pre encoded body according to your content type' % self.headers['content-type']) elif self.body is None and self.data is None: self.headers['content-type'] = 'text/plain' def to_api_params(self, key:str) -> Dict: params = { 'key': self.key or key, 'url': self.url } if self.country is not None: params['country'] = self.country for name, value in self.headers.items(): params['headers[%s]' % name] = value if self.webhook is not None: params['webhook_name'] = self.webhook if self.timeout is not None: params['timeout'] = self.timeout if self.extract is not None: params['extract'] = 
base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8') if self.cost_budget is not None: params['cost_budget'] = self.cost_budget if self.render_js is True: params['render_js'] = self._bool_to_http(self.render_js) if self.wait_for_selector is not None: params['wait_for_selector'] = self.wait_for_selector if self.js: params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8') if self.js_scenario: params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8') if self.rendering_wait: params['rendering_wait'] = self.rendering_wait if self.rendering_stage: params['rendering_stage'] = self.rendering_stage if self.screenshots is not None: for name, element in self.screenshots.items(): params['screenshots[%s]' % name] = element if self.screenshot_flags is not None: self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags] params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags) else: if self.screenshot_flags is not None: logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled') if self.auto_scroll is True: params['auto_scroll'] = self._bool_to_http(self.auto_scroll) else: if self.wait_for_selector is not None: logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled') if self.screenshots: logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled') if self.js_scenario: logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled') if self.js: logging.warning('Params "js" is ignored. Works only if render_js is enabled') if self.rendering_wait: logging.warning('Params "rendering_wait" is ignored. 
Works only if render_js is enabled') if self.asp is True: params['asp'] = self._bool_to_http(self.asp) if self.retry is False: params['retry'] = self._bool_to_http(self.retry) if self.cache is True: params['cache'] = self._bool_to_http(self.cache) if self.cache_clear is True: params['cache_clear'] = self._bool_to_http(self.cache_clear) if self.cache_ttl is not None: params['cache_ttl'] = self.cache_ttl else: if self.cache_clear is True: logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled') if self.cache_ttl is not None: logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled') if self.dns is True: params['dns'] = self._bool_to_http(self.dns) if self.ssl is True: params['ssl'] = self._bool_to_http(self.ssl) if self.tags: params['tags'] = ','.join(self.tags) if self.format: params['format'] = Format(self.format).value if self.format_options: params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options) if self.extraction_template and self.extraction_ephemeral_template: raise ScrapeConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. 
You must choose') if self.extraction_template: params['extraction_template'] = self.extraction_template if self.extraction_ephemeral_template: self.extraction_ephemeral_template = json.dumps(self.extraction_ephemeral_template) params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(self.extraction_ephemeral_template.encode('utf-8')).decode('utf-8') if self.extraction_prompt: params['extraction_prompt'] = quote_plus(self.extraction_prompt) if self.extraction_model: params['extraction_model'] = self.extraction_model if self.correlation_id: params['correlation_id'] = self.correlation_id if self.session: params['session'] = self.session if self.session_sticky_proxy is True: # false by default params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy) else: if self.session_sticky_proxy: logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled') if self.debug is True: params['debug'] = self._bool_to_http(self.debug) if self.proxy_pool is not None: params['proxy_pool'] = self.proxy_pool if self.lang is not None: params['lang'] = ','.join(self.lang) if self.os is not None: params['os'] = self.os return params @staticmethod def from_exported_config(config:str) -> 'ScrapeConfig': try: from msgpack import loads as msgpack_loads except ImportError as e: print('You must install msgpack package - run: pip install "scrapfly-sdk[seepdup] or pip install msgpack') raise data = msgpack_loads(base64.b64decode(config)) headers = {} for name, value in data['headers'].items(): if isinstance(value, Iterable): headers[name] = '; '.join(value) else: headers[name] = value return ScrapeConfig( url=data['url'], retry=data['retry'], headers=headers, session=data['session'], session_sticky_proxy=data['session_sticky_proxy'], cache=data['cache'], cache_ttl=data['cache_ttl'], cache_clear=data['cache_clear'], render_js=data['render_js'], method=data['method'], asp=data['asp'], body=data['body'], ssl=data['ssl'], dns=data['dns'], 
country=data['country'], debug=data['debug'], correlation_id=data['correlation_id'], tags=data['tags'], format=data['format'], js=data['js'], rendering_wait=data['rendering_wait'], screenshots=data['screenshots'] or {}, screenshot_flags=data['screenshot_flags'], proxy_pool=data['proxy_pool'], auto_scroll=data['auto_scroll'], cost_budget=data['cost_budget'] ) def to_dict(self) -> Dict: """ Export the ScrapeConfig instance to a plain dictionary. Useful for JSON-serialization or other external storage. """ return { 'url': self.url, 'retry': self.retry, 'method': self.method, 'country': self.country, 'render_js': self.render_js, 'cache': self.cache, 'cache_clear': self.cache_clear, 'ssl': self.ssl, 'dns': self.dns, 'asp': self.asp, 'debug': self.debug, 'raise_on_upstream_error': self.raise_on_upstream_error, 'cache_ttl': self.cache_ttl, 'proxy_pool': self.proxy_pool, 'session': self.session, 'tags': list(self.tags), 'format': Format(self.format).value if self.format else None, 'format_options': [FormatOption(option).value for option in self.format_options] if self.format_options else None, 'extraction_template': self.extraction_template, 'extraction_ephemeral_template': self.extraction_ephemeral_template, 'extraction_prompt': self.extraction_prompt, 'extraction_model': self.extraction_model, 'correlation_id': self.correlation_id, 'cookies': CaseInsensitiveDict(self.cookies), 'body': self.body, 'data': None if self.body else self.data, 'headers': CaseInsensitiveDict(self.headers), 'js': self.js, 'rendering_wait': self.rendering_wait, 'wait_for_selector': self.wait_for_selector, 'session_sticky_proxy': self.session_sticky_proxy, 'screenshots': self.screenshots, 'screenshot_flags': [ScreenshotFlag(flag).value for flag in self.screenshot_flags] if self.screenshot_flags else None, 'webhook': self.webhook, 'timeout': self.timeout, 'js_scenario': self.js_scenario, 'extract': self.extract, 'lang': self.lang, 'os': self.os, 'auto_scroll': self.auto_scroll, 'cost_budget': 
self.cost_budget, } @staticmethod def from_dict(scrape_config_dict: Dict) -> 'ScrapeConfig': """Create a ScrapeConfig instance from a dictionary.""" url = scrape_config_dict.get('url', None) retry = scrape_config_dict.get('retry', False) method = scrape_config_dict.get('method', 'GET') country = scrape_config_dict.get('country', None) render_js = scrape_config_dict.get('render_js', False) cache = scrape_config_dict.get('cache', False) cache_clear = scrape_config_dict.get('cache_clear', False) ssl = scrape_config_dict.get('ssl', False) dns = scrape_config_dict.get('dns', False) asp = scrape_config_dict.get('asp', False) debug = scrape_config_dict.get('debug', False) raise_on_upstream_error = scrape_config_dict.get('raise_on_upstream_error', True) cache_ttl = scrape_config_dict.get('cache_ttl', None) proxy_pool = scrape_config_dict.get('proxy_pool', None) session = scrape_config_dict.get('session', None) tags = scrape_config_dict.get('tags', []) format = scrape_config_dict.get('format', None) format = Format(format) if format else None format_options = scrape_config_dict.get('format_options', None) format_options = [FormatOption(option) for option in format_options] if format_options else None extraction_template = scrape_config_dict.get('extraction_template', None) extraction_ephemeral_template = scrape_config_dict.get('extraction_ephemeral_template', None) extraction_prompt = scrape_config_dict.get('extraction_prompt', None) extraction_model = scrape_config_dict.get('extraction_model', None) correlation_id = scrape_config_dict.get('correlation_id', None) cookies = scrape_config_dict.get('cookies', {}) body = scrape_config_dict.get('body', None) data = scrape_config_dict.get('data', None) headers = scrape_config_dict.get('headers', {}) js = scrape_config_dict.get('js', None) rendering_wait = scrape_config_dict.get('rendering_wait', None) wait_for_selector = scrape_config_dict.get('wait_for_selector', None) screenshots = scrape_config_dict.get('screenshots', []) 
screenshot_flags = scrape_config_dict.get('screenshot_flags', []) screenshot_flags = [ScreenshotFlag(flag) for flag in screenshot_flags] if screenshot_flags else None session_sticky_proxy = scrape_config_dict.get('session_sticky_proxy', False) webhook = scrape_config_dict.get('webhook', None) timeout = scrape_config_dict.get('timeout', None) js_scenario = scrape_config_dict.get('js_scenario', None) extract = scrape_config_dict.get('extract', None) os = scrape_config_dict.get('os', None) lang = scrape_config_dict.get('lang', None) auto_scroll = scrape_config_dict.get('auto_scroll', None) cost_budget = scrape_config_dict.get('cost_budget', None) return ScrapeConfig( url=url, retry=retry, method=method, country=country, render_js=render_js, cache=cache, cache_clear=cache_clear, ssl=ssl, dns=dns, asp=asp, debug=debug, raise_on_upstream_error=raise_on_upstream_error, cache_ttl=cache_ttl, proxy_pool=proxy_pool, session=session, tags=tags, format=format, format_options=format_options, extraction_template=extraction_template, extraction_ephemeral_template=extraction_ephemeral_template, extraction_prompt=extraction_prompt, extraction_model=extraction_model, correlation_id=correlation_id, cookies=cookies, body=body, data=data, headers=headers, js=js, rendering_wait=rendering_wait, wait_for_selector=wait_for_selector, screenshots=screenshots, screenshot_flags=screenshot_flags, session_sticky_proxy=session_sticky_proxy, webhook=webhook, timeout=timeout, js_scenario=js_scenario, extract=extract, os=os, lang=lang, auto_scroll=auto_scroll, cost_budget=cost_budget, )Ancestors
Class variables
var PUBLIC_DATACENTER_POOLvar PUBLIC_RESIDENTIAL_POOLvar asp : boolvar auto_scroll : bool | Nonevar body : str | Nonevar cache : boolvar cache_clear : boolvar cache_ttl : int | Nonevar correlation_id : str | Nonevar cost_budget : int | Nonevar country : str | Nonevar data : Dict | Nonevar debug : boolvar dns : boolvar extract : Dictvar extraction_ephemeral_template : Dict | Nonevar extraction_model : str | Nonevar extraction_prompt : str | Nonevar extraction_template : str | Nonevar format : Format | Nonevar format_options : List[FormatOption] | Nonevar headers : requests.structures.CaseInsensitiveDict | Nonevar js : strvar js_scenario : Dictvar lang : List[str] | Nonevar method : strvar os : str | Nonevar proxy_pool : str | Nonevar raise_on_upstream_error : boolvar render_js : boolvar rendering_stage : Literal['complete', 'domcontentloaded']var rendering_wait : intvar retry : boolvar screenshot_flags : List[ScreenshotFlag] | Nonevar screenshots : Dict | Nonevar session : str | Nonevar session_sticky_proxy : boolvar ssl : boolvar timeout : int | Nonevar url : strvar wait_for_selector : str | Nonevar webhook : str | None
Static methods
def from_dict(scrape_config_dict: Dict) ‑> ScrapeConfig-
Expand source code
@staticmethod def from_dict(scrape_config_dict: Dict) -> 'ScrapeConfig': """Create a ScrapeConfig instance from a dictionary.""" url = scrape_config_dict.get('url', None) retry = scrape_config_dict.get('retry', False) method = scrape_config_dict.get('method', 'GET') country = scrape_config_dict.get('country', None) render_js = scrape_config_dict.get('render_js', False) cache = scrape_config_dict.get('cache', False) cache_clear = scrape_config_dict.get('cache_clear', False) ssl = scrape_config_dict.get('ssl', False) dns = scrape_config_dict.get('dns', False) asp = scrape_config_dict.get('asp', False) debug = scrape_config_dict.get('debug', False) raise_on_upstream_error = scrape_config_dict.get('raise_on_upstream_error', True) cache_ttl = scrape_config_dict.get('cache_ttl', None) proxy_pool = scrape_config_dict.get('proxy_pool', None) session = scrape_config_dict.get('session', None) tags = scrape_config_dict.get('tags', []) format = scrape_config_dict.get('format', None) format = Format(format) if format else None format_options = scrape_config_dict.get('format_options', None) format_options = [FormatOption(option) for option in format_options] if format_options else None extraction_template = scrape_config_dict.get('extraction_template', None) extraction_ephemeral_template = scrape_config_dict.get('extraction_ephemeral_template', None) extraction_prompt = scrape_config_dict.get('extraction_prompt', None) extraction_model = scrape_config_dict.get('extraction_model', None) correlation_id = scrape_config_dict.get('correlation_id', None) cookies = scrape_config_dict.get('cookies', {}) body = scrape_config_dict.get('body', None) data = scrape_config_dict.get('data', None) headers = scrape_config_dict.get('headers', {}) js = scrape_config_dict.get('js', None) rendering_wait = scrape_config_dict.get('rendering_wait', None) wait_for_selector = scrape_config_dict.get('wait_for_selector', None) screenshots = scrape_config_dict.get('screenshots', []) screenshot_flags = 
scrape_config_dict.get('screenshot_flags', []) screenshot_flags = [ScreenshotFlag(flag) for flag in screenshot_flags] if screenshot_flags else None session_sticky_proxy = scrape_config_dict.get('session_sticky_proxy', False) webhook = scrape_config_dict.get('webhook', None) timeout = scrape_config_dict.get('timeout', None) js_scenario = scrape_config_dict.get('js_scenario', None) extract = scrape_config_dict.get('extract', None) os = scrape_config_dict.get('os', None) lang = scrape_config_dict.get('lang', None) auto_scroll = scrape_config_dict.get('auto_scroll', None) cost_budget = scrape_config_dict.get('cost_budget', None) return ScrapeConfig( url=url, retry=retry, method=method, country=country, render_js=render_js, cache=cache, cache_clear=cache_clear, ssl=ssl, dns=dns, asp=asp, debug=debug, raise_on_upstream_error=raise_on_upstream_error, cache_ttl=cache_ttl, proxy_pool=proxy_pool, session=session, tags=tags, format=format, format_options=format_options, extraction_template=extraction_template, extraction_ephemeral_template=extraction_ephemeral_template, extraction_prompt=extraction_prompt, extraction_model=extraction_model, correlation_id=correlation_id, cookies=cookies, body=body, data=data, headers=headers, js=js, rendering_wait=rendering_wait, wait_for_selector=wait_for_selector, screenshots=screenshots, screenshot_flags=screenshot_flags, session_sticky_proxy=session_sticky_proxy, webhook=webhook, timeout=timeout, js_scenario=js_scenario, extract=extract, os=os, lang=lang, auto_scroll=auto_scroll, cost_budget=cost_budget, )Create a ScrapeConfig instance from a dictionary.
def from_exported_config(config: str) ‑> ScrapeConfig-
Expand source code
@staticmethod def from_exported_config(config:str) -> 'ScrapeConfig': try: from msgpack import loads as msgpack_loads except ImportError as e: print('You must install msgpack package - run: pip install "scrapfly-sdk[seepdup] or pip install msgpack') raise data = msgpack_loads(base64.b64decode(config)) headers = {} for name, value in data['headers'].items(): if isinstance(value, Iterable): headers[name] = '; '.join(value) else: headers[name] = value return ScrapeConfig( url=data['url'], retry=data['retry'], headers=headers, session=data['session'], session_sticky_proxy=data['session_sticky_proxy'], cache=data['cache'], cache_ttl=data['cache_ttl'], cache_clear=data['cache_clear'], render_js=data['render_js'], method=data['method'], asp=data['asp'], body=data['body'], ssl=data['ssl'], dns=data['dns'], country=data['country'], debug=data['debug'], correlation_id=data['correlation_id'], tags=data['tags'], format=data['format'], js=data['js'], rendering_wait=data['rendering_wait'], screenshots=data['screenshots'] or {}, screenshot_flags=data['screenshot_flags'], proxy_pool=data['proxy_pool'], auto_scroll=data['auto_scroll'], cost_budget=data['cost_budget'] )
Methods
def to_api_params(self, key: str) ‑> Dict-
Expand source code
def to_api_params(self, key:str) -> Dict:
    """Serialize this config into the flat query-parameter dict expected by the /scrape endpoint.

    Parameters that only make sense alongside an enabling feature
    (render_js, cache, session, screenshots) are emitted only when that
    feature is on; otherwise a warning is logged and the value is dropped.

    :param key: API key used as fallback when self.key is not set
    :return: dict of query parameters for the Scrape API
    """
    params = {
        'key': self.key or key,
        'url': self.url
    }
    if self.country is not None:
        params['country'] = self.country
    # Headers are flattened into indexed keys: headers[<name>]=<value>
    for name, value in self.headers.items():
        params['headers[%s]' % name] = value
    if self.webhook is not None:
        params['webhook_name'] = self.webhook
    if self.timeout is not None:
        params['timeout'] = self.timeout
    if self.extract is not None:
        # JSON payloads travel as URL-safe base64 to survive query-string encoding
        params['extract'] = base64.urlsafe_b64encode(json.dumps(self.extract).encode('utf-8')).decode('utf-8')
    if self.cost_budget is not None:
        params['cost_budget'] = self.cost_budget
    if self.render_js is True:
        params['render_js'] = self._bool_to_http(self.render_js)
        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector
        if self.js:
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')
        if self.js_scenario:
            params['js_scenario'] = base64.urlsafe_b64encode(json.dumps(self.js_scenario).encode('utf-8')).decode('utf-8')
        if self.rendering_wait:
            params['rendering_wait'] = self.rendering_wait
        if self.rendering_stage:
            params['rendering_stage'] = self.rendering_stage
        if self.screenshots is not None:
            for name, element in self.screenshots.items():
                params['screenshots[%s]' % name] = element
            if self.screenshot_flags is not None:
                # NOTE(review): this normalization mutates self.screenshot_flags in
                # place — calling to_api_params twice re-wraps already-converted
                # flags (ScreenshotFlag is idempotent on its own values, but the
                # side effect on the config object looks unintended).
                self.screenshot_flags = [ScreenshotFlag(flag) for flag in self.screenshot_flags]
                params["screenshot_flags"] = ",".join(flag.value for flag in self.screenshot_flags)
        else:
            if self.screenshot_flags is not None:
                logging.warning('Params "screenshot_flags" is ignored. Works only if screenshots is enabled')
        if self.auto_scroll is True:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)
    else:
        # render_js disabled: warn about every browser-only option that was set
        if self.wait_for_selector is not None:
            logging.warning('Params "wait_for_selector" is ignored. Works only if render_js is enabled')
        if self.screenshots:
            logging.warning('Params "screenshots" is ignored. Works only if render_js is enabled')
        if self.js_scenario:
            logging.warning('Params "js_scenario" is ignored. Works only if render_js is enabled')
        if self.js:
            logging.warning('Params "js" is ignored. Works only if render_js is enabled')
        if self.rendering_wait:
            logging.warning('Params "rendering_wait" is ignored. Works only if render_js is enabled')
    if self.asp is True:
        params['asp'] = self._bool_to_http(self.asp)
    if self.retry is False:
        # retry defaults to on server-side; only the opt-out needs to be sent
        params['retry'] = self._bool_to_http(self.retry)
    if self.cache is True:
        params['cache'] = self._bool_to_http(self.cache)
        if self.cache_clear is True:
            params['cache_clear'] = self._bool_to_http(self.cache_clear)
        if self.cache_ttl is not None:
            params['cache_ttl'] = self.cache_ttl
    else:
        if self.cache_clear is True:
            logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')
        if self.cache_ttl is not None:
            logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')
    if self.dns is True:
        params['dns'] = self._bool_to_http(self.dns)
    if self.ssl is True:
        params['ssl'] = self._bool_to_http(self.ssl)
    if self.tags:
        params['tags'] = ','.join(self.tags)
    if self.format:
        params['format'] = Format(self.format).value
        if self.format_options:
            # Options are appended to the format value: "<format>:<opt1>,<opt2>"
            params['format'] += ':' + ','.join(FormatOption(option).value for option in self.format_options)
    if self.extraction_template and self.extraction_ephemeral_template:
        raise ScrapeConfigError('You cannot pass both parameters extraction_template and extraction_ephemeral_template. You must choose')
    if self.extraction_template:
        params['extraction_template'] = self.extraction_template
    if self.extraction_ephemeral_template:
        # NOTE(review): mutates self.extraction_ephemeral_template (dict -> JSON
        # string) — a second call would json.dumps the string again and produce a
        # double-encoded template. Consider using a local variable instead.
        self.extraction_ephemeral_template = json.dumps(self.extraction_ephemeral_template)
        params['extraction_template'] = 'ephemeral:' + urlsafe_b64encode(self.extraction_ephemeral_template.encode('utf-8')).decode('utf-8')
    if self.extraction_prompt:
        params['extraction_prompt'] = quote_plus(self.extraction_prompt)
    if self.extraction_model:
        params['extraction_model'] = self.extraction_model
    if self.correlation_id:
        params['correlation_id'] = self.correlation_id
    if self.session:
        params['session'] = self.session
        if self.session_sticky_proxy is True: # false by default
            params['session_sticky_proxy'] = self._bool_to_http(self.session_sticky_proxy)
    else:
        if self.session_sticky_proxy:
            logging.warning('Params "session_sticky_proxy" is ignored. Works only if session is enabled')
    if self.debug is True:
        params['debug'] = self._bool_to_http(self.debug)
    if self.proxy_pool is not None:
        params['proxy_pool'] = self.proxy_pool
    if self.lang is not None:
        params['lang'] = ','.join(self.lang)
    if self.os is not None:
        params['os'] = self.os
    return params
def to_dict(self) ‑> Dict-
Expand source code
def to_dict(self) -> Dict: """ Export the ScrapeConfig instance to a plain dictionary. Useful for JSON-serialization or other external storage. """ return { 'url': self.url, 'retry': self.retry, 'method': self.method, 'country': self.country, 'render_js': self.render_js, 'cache': self.cache, 'cache_clear': self.cache_clear, 'ssl': self.ssl, 'dns': self.dns, 'asp': self.asp, 'debug': self.debug, 'raise_on_upstream_error': self.raise_on_upstream_error, 'cache_ttl': self.cache_ttl, 'proxy_pool': self.proxy_pool, 'session': self.session, 'tags': list(self.tags), 'format': Format(self.format).value if self.format else None, 'format_options': [FormatOption(option).value for option in self.format_options] if self.format_options else None, 'extraction_template': self.extraction_template, 'extraction_ephemeral_template': self.extraction_ephemeral_template, 'extraction_prompt': self.extraction_prompt, 'extraction_model': self.extraction_model, 'correlation_id': self.correlation_id, 'cookies': CaseInsensitiveDict(self.cookies), 'body': self.body, 'data': None if self.body else self.data, 'headers': CaseInsensitiveDict(self.headers), 'js': self.js, 'rendering_wait': self.rendering_wait, 'wait_for_selector': self.wait_for_selector, 'session_sticky_proxy': self.session_sticky_proxy, 'screenshots': self.screenshots, 'screenshot_flags': [ScreenshotFlag(flag).value for flag in self.screenshot_flags] if self.screenshot_flags else None, 'webhook': self.webhook, 'timeout': self.timeout, 'js_scenario': self.js_scenario, 'extract': self.extract, 'lang': self.lang, 'os': self.os, 'auto_scroll': self.auto_scroll, 'cost_budget': self.cost_budget, }Export the ScrapeConfig instance to a plain dictionary. Useful for JSON-serialization or other external storage.
class ScraperAPI-
Expand source code
class ScraperAPI: MONITORING_DATA_FORMAT_STRUCTURED = 'structured' MONITORING_DATA_FORMAT_PROMETHEUS = 'prometheus' MONITORING_PERIOD_SUBSCRIPTION = 'subscription' MONITORING_PERIOD_LAST_7D = 'last7d' MONITORING_PERIOD_LAST_24H = 'last24h' MONITORING_PERIOD_LAST_1H = 'last1h' MONITORING_PERIOD_LAST_5m = 'last5m' MONITORING_ACCOUNT_AGGREGATION = 'account' MONITORING_PROJECT_AGGREGATION = 'project' MONITORING_TARGET_AGGREGATION = 'target'Class variables
var MONITORING_ACCOUNT_AGGREGATIONvar MONITORING_DATA_FORMAT_PROMETHEUSvar MONITORING_DATA_FORMAT_STRUCTUREDvar MONITORING_PERIOD_LAST_1Hvar MONITORING_PERIOD_LAST_24Hvar MONITORING_PERIOD_LAST_5mvar MONITORING_PERIOD_LAST_7Dvar MONITORING_PERIOD_SUBSCRIPTIONvar MONITORING_PROJECT_AGGREGATIONvar MONITORING_TARGET_AGGREGATION
class ScrapflyAspError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyAspError(ScraperAPIError): passCommon base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyClient (key: str,
host: str = 'https://api.scrapfly.io',
verify=True,
debug: bool = False,
max_concurrency: int = 1,
connect_timeout: int = 30,
web_scraping_api_read_timeout: int = 160,
extraction_api_read_timeout: int = 35,
screenshot_api_read_timeout: int = 60,
read_timeout: int = 30,
default_read_timeout: int = 30,
reporter: Callable | None = None,
**kwargs)-
Expand source code
class ScrapflyClient: HOST = 'https://api.scrapfly.io' DEFAULT_CONNECT_TIMEOUT = 30 DEFAULT_READ_TIMEOUT = 30 DEFAULT_WEBSCRAPING_API_READ_TIMEOUT = 160 # 155 real DEFAULT_SCREENSHOT_API_READ_TIMEOUT = 60 # 30 real DEFAULT_EXTRACTION_API_READ_TIMEOUT = 35 # 30 real DEFAULT_CRAWLER_API_READ_TIMEOUT = 30 host:str key:str max_concurrency:int verify:bool debug:bool distributed_mode:bool connect_timeout:int web_scraping_api_read_timeout:int screenshot_api_read_timeout:int extraction_api_read_timeout:int monitoring_api_read_timeout:int default_read_timeout:int brotli: bool reporter:Reporter version:str # @deprecated read_timeout:int CONCURRENCY_AUTO = 'auto' # retrieve the allowed concurrency from your account DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S' def __init__( self, key: str, host: str = HOST, verify=True, debug: bool = False, max_concurrency:int=1, connect_timeout:int = DEFAULT_CONNECT_TIMEOUT, web_scraping_api_read_timeout: int = DEFAULT_WEBSCRAPING_API_READ_TIMEOUT, extraction_api_read_timeout: int = DEFAULT_EXTRACTION_API_READ_TIMEOUT, screenshot_api_read_timeout: int = DEFAULT_SCREENSHOT_API_READ_TIMEOUT, # @deprecated read_timeout:int = DEFAULT_READ_TIMEOUT, default_read_timeout:int = DEFAULT_READ_TIMEOUT, reporter:Optional[Callable]=None, **kwargs ): if host[-1] == '/': # remove last '/' if exists host = host[:-1] if 'distributed_mode' in kwargs: warnings.warn("distributed mode is deprecated and will be remove the next version -" " user should handle themself the session name based on the concurrency", DeprecationWarning, stacklevel=2 ) if 'brotli' in kwargs: warnings.warn("brotli arg is deprecated and will be remove the next version - " "brotli is disabled by default", DeprecationWarning, stacklevel=2 ) self.version = __version__ self.host = host self.key = key self.verify = verify self.debug = debug self.connect_timeout = connect_timeout self.web_scraping_api_read_timeout = web_scraping_api_read_timeout self.screenshot_api_read_timeout = 
screenshot_api_read_timeout self.extraction_api_read_timeout = extraction_api_read_timeout self.monitoring_api_read_timeout = default_read_timeout self.default_read_timeout = default_read_timeout # @deprecated self.read_timeout = default_read_timeout self.max_concurrency = max_concurrency self.body_handler = ResponseBodyHandler(use_brotli=False) self.async_executor = ThreadPoolExecutor() self.http_session = None if not self.verify and not self.HOST.endswith('.local'): urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) if self.debug is True: http.client.HTTPConnection.debuglevel = 5 if reporter is None: from .reporter import NoopReporter reporter = NoopReporter() self.reporter = Reporter(reporter) @property def ua(self) -> str: return 'ScrapflySDK/%s (Python %s, %s, %s)' % ( self.version, platform.python_version(), platform.uname().system, platform.uname().machine ) @cached_property def _http_handler(self): return partial(self.http_session.request if self.http_session else requests.request) @property def http(self): return self._http_handler def _scrape_request(self, scrape_config:ScrapeConfig): return { 'method': scrape_config.method, 'url': self.host + '/scrape', 'data': scrape_config.body, 'verify': self.verify, 'timeout': (self.connect_timeout, self.web_scraping_api_read_timeout), 'headers': { 'content-type': scrape_config.headers['content-type'] if scrape_config.method in ['POST', 'PUT', 'PATCH'] else self.body_handler.content_type, 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, 'params': scrape_config.to_api_params(key=self.key) } def _screenshot_request(self, screenshot_config:ScreenshotConfig): return { 'method': 'GET', 'url': self.host + '/screenshot', 'timeout': (self.connect_timeout, self.screenshot_api_read_timeout), 'headers': { 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, 'params': 
screenshot_config.to_api_params(key=self.key) } def _extraction_request(self, extraction_config:ExtractionConfig): headers = { 'content-type': extraction_config.content_type, 'accept-encoding': self.body_handler.content_encoding, 'content-encoding': extraction_config.document_compression_format if extraction_config.document_compression_format else None, 'accept': self.body_handler.accept, 'user-agent': self.ua } if extraction_config.document_compression_format: headers['content-encoding'] = extraction_config.document_compression_format.value return { 'method': 'POST', 'url': self.host + '/extraction', 'data': extraction_config.body, 'timeout': (self.connect_timeout, self.extraction_api_read_timeout), 'headers': headers, 'params': extraction_config.to_api_params(key=self.key) } def account(self) -> Union[str, Dict]: response = self._http_handler( method='GET', url=self.host + '/account', params={'key': self.key}, verify=self.verify, headers={ 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, ) response.raise_for_status() if self.body_handler.support(response.headers): return self.body_handler(response.content, response.headers['content-type']) return response.content.decode('utf-8') def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None): params = {'key': self.key, 'format': format} if period is not None: params['period'] = period if aggregation is not None: params['aggregation'] = ','.join(aggregation) response = self._http_handler( method='GET', url=self.host + '/scrape/monitoring/metrics', params=params, timeout=(self.connect_timeout, self.monitoring_api_read_timeout), verify=self.verify, headers={ 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, ) response.raise_for_status() if 
self.body_handler.support(response.headers): return self.body_handler(response.content, response.headers['content-type']) return response.content.decode('utf-8') def get_monitoring_target_metrics( self, domain:str, group_subdomain:bool=False, period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H, start:Optional[datetime.datetime]=None, end:Optional[datetime.datetime]=None, ): params = { 'key': self.key, 'domain': domain, 'group_subdomain': group_subdomain } if (start is not None and end is None) or (start is None and end is not None): raise ValueError('You must provide both start and end date') if start is not None and end is not None: params['start'] = start.strftime(self.DATETIME_FORMAT) params['end'] = end.strftime(self.DATETIME_FORMAT) period = None params['period'] = period response = self._http_handler( method='GET', url=self.host + '/scrape/monitoring/metrics/target', timeout=(self.connect_timeout, self.monitoring_api_read_timeout), params=params, verify=self.verify, headers={ 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, ) response.raise_for_status() if self.body_handler.support(response.headers): return self.body_handler(response.content, response.headers['content-type']) return response.content.decode('utf-8') def resilient_scrape( self, scrape_config:ScrapeConfig, retry_on_errors:Set[Exception]={ScrapflyError}, retry_on_status_code:Optional[List[int]]=None, tries: int = 5, delay: int = 20, ) -> ScrapeApiResponse: assert retry_on_errors is not None, 'Retry on error is None' assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()' @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay) def inner() -> ScrapeApiResponse: try: return self.scrape(scrape_config=scrape_config) except (UpstreamHttpClientError, UpstreamHttpServerError) as e: if retry_on_status_code is not None and e.api_response: if 
e.api_response.upstream_status_code in retry_on_status_code: raise e else: return e.api_response raise e return inner() def open(self): if self.http_session is None: self.http_session = Session() self.http_session.verify = self.verify self.http_session.timeout = (self.connect_timeout, self.default_read_timeout) self.http_session.params['key'] = self.key self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding self.http_session.headers['accept'] = self.body_handler.accept self.http_session.headers['user-agent'] = self.ua def close(self): self.http_session.close() self.http_session = None def __enter__(self) -> 'ScrapflyClient': self.open() return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse: if loop is None: loop = asyncio.get_running_loop() return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config) async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None): if concurrency is None: concurrency = self.max_concurrency elif concurrency == self.CONCURRENCY_AUTO: concurrency = self.account()['subscription']['max_concurrency'] loop = asyncio.get_running_loop() processing_tasks = [] results = [] processed_tasks = 0 expected_tasks = len(scrape_configs) def scrape_done_callback(task:Task): nonlocal processed_tasks try: if task.cancelled() is True: return error = task.exception() if error is not None: results.append(error) else: results.append(task.result()) finally: processing_tasks.remove(task) processed_tasks += 1 while scrape_configs or results or processing_tasks: logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks))) if scrape_configs: if len(processing_tasks) < concurrency: # @todo handle backpressure for _ in range(0, concurrency - len(processing_tasks)): try: scrape_config = scrape_configs.pop() 
except: break scrape_config.raise_on_upstream_error = False task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop)) processing_tasks.append(task) task.add_done_callback(scrape_done_callback) for _ in results: result = results.pop() yield result await asyncio.sleep(.5) logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks))) @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def scrape(self, scrape_config:ScrapeConfig, no_raise:bool=False) -> ScrapeApiResponse: """ Scrape a website :param scrape_config: ScrapeConfig :param no_raise: bool - if True, do not raise exception on error while the api response is a ScrapflyError for seamless integration :return: ScrapeApiResponse If you use no_raise=True, make sure to check the api_response.scrape_result.error attribute to handle the error. If the error is not none, you will get the following structure for example 'error': { 'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED', 'message': 'The ASP shield failed to solve the challenge against the anti scrapping protection - heuristic_engine bypass failed, please retry in few seconds', 'retryable': False, 'http_code': 422, 'links': { 'Checkout ASP documentation': 'https://scrapfly.io/docs/scrape-api/anti-scraping-protection#maximize_success_rate', 'Related Error Doc': 'https://scrapfly.io/docs/scrape-api/error/ERR::ASP::SHIELD_PROTECTION_FAILED' } } """ try: logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url)) request_data = self._scrape_request(scrape_config=scrape_config) response = self._http_handler(**request_data) scrape_api_response = self._handle_response(response=response, scrape_config=scrape_config) self.reporter.report(scrape_api_response=scrape_api_response) return scrape_api_response except BaseException as e: self.reporter.report(error=e) if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None: return e.api_response raise e async 
def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse: if loop is None: loop = asyncio.get_running_loop() return await loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config) @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def screenshot(self, screenshot_config:ScreenshotConfig, no_raise:bool=False) -> ScreenshotApiResponse: """ Take a screenshot :param screenshot_config: ScrapeConfig :param no_raise: bool - if True, do not raise exception on error while the screenshot api response is a ScrapflyError for seamless integration :return: str If you use no_raise=True, make sure to check the screenshot_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example 'error': { 'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT', 'message': 'For some reason we were unable to take the screenshot', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/screenshot-api/error/ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT' } } """ try: logger.debug('--> %s Screenshoting' % (screenshot_config.url)) request_data = self._screenshot_request(screenshot_config=screenshot_config) response = self._http_handler(**request_data) screenshot_api_response = self._handle_screenshot_response(response=response, screenshot_config=screenshot_config) return screenshot_api_response except BaseException as e: self.reporter.report(error=e) if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None: return e.api_response raise e async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse: if loop is None: loop = asyncio.get_running_loop() return await loop.run_in_executor(self.async_executor, self.extract, extraction_config) @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def 
extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse: """ Extract structured data from text content :param extraction_config: ExtractionConfig :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration :return: str If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example 'error': { 'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED', 'message': 'The content type of the response is not supported for extraction', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED' } } """ try: logger.debug('--> %s Extracting data from' % (extraction_config.content_type)) request_data = self._extraction_request(extraction_config=extraction_config) response = self._http_handler(**request_data) extraction_api_response = self._handle_extraction_response(response=response, extraction_config=extraction_config) return extraction_api_response except BaseException as e: self.reporter.report(error=e) if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None: return e.api_response raise e def _handle_response(self, response:Response, scrape_config:ScrapeConfig) -> ScrapeApiResponse: try: api_response = self._handle_api_response( response=response, scrape_config=scrape_config, raise_on_upstream_error=scrape_config.raise_on_upstream_error ) if scrape_config.method == 'HEAD': logger.debug('<-- [%s %s] %s | %ss' % ( api_response.response.status_code, api_response.response.reason, api_response.response.request.url, 0 )) else: logger.debug('<-- [%s %s] %s | %ss' % ( api_response.result['result']['status_code'], api_response.result['result']['reason'], api_response.result['config']['url'], api_response.result['result']['duration']) ) 
logger.debug('Log url: %s' % api_response.result['result']['log_url']) return api_response except UpstreamHttpError as e: logger.critical(e.api_response.error_message) raise except HttpError as e: if e.api_response is not None: logger.critical(e.api_response.error_message) else: logger.critical(e.message) raise except ScrapflyError as e: logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url)) raise def _handle_screenshot_response(self, response:Response, screenshot_config:ScreenshotConfig) -> ScreenshotApiResponse: try: api_response = self._handle_screenshot_api_response( response=response, screenshot_config=screenshot_config, raise_on_upstream_error=screenshot_config.raise_on_upstream_error ) return api_response except UpstreamHttpError as e: logger.critical(e.api_response.error_message) raise except HttpError as e: if e.api_response is not None: logger.critical(e.api_response.error_message) else: logger.critical(e.message) raise except ScrapflyError as e: logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url)) raise def _handle_extraction_response(self, response:Response, extraction_config:ExtractionConfig) -> ExtractionApiResponse: try: api_response = self._handle_extraction_api_response( response=response, extraction_config=extraction_config, raise_on_upstream_error=extraction_config.raise_on_upstream_error ) return api_response except UpstreamHttpError as e: logger.critical(e.api_response.error_message) raise except HttpError as e: if e.api_response is not None: logger.critical(e.api_response.error_message) else: logger.critical(e.message) raise except ScrapflyError as e: logger.critical('<-- %s | Docs: %s' % (str(e), e.documentation_url)) raise def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None): """ Save a screenshot from a screenshot API response :param api_response: ScreenshotApiResponse :param name: str - name of the screenshot to save as :param path: Optional[str] """ if 
screenshot_api_response.screenshot_success is not True: raise RuntimeError('Screenshot was not successful') if not screenshot_api_response.image: raise RuntimeError('Screenshot binary does not exist') content = screenshot_api_response.image extension_name = screenshot_api_response.metadata['extension_name'] if path: os.makedirs(path, exist_ok=True) file_path = os.path.join(path, f'{name}.{extension_name}') else: file_path = f'{name}.{extension_name}' if isinstance(content, bytes): content = BytesIO(content) with open(file_path, 'wb') as f: shutil.copyfileobj(content, f, length=131072) def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None): """ Save a screenshot from a scrape result :param api_response: ScrapeApiResponse :param name: str - name of the screenshot given in the scrape config :param path: Optional[str] """ if not api_response.scrape_result['screenshots']: raise RuntimeError('Screenshot %s do no exists' % name) try: api_response.scrape_result['screenshots'][name] except KeyError: raise RuntimeError('Screenshot %s do no exists' % name) screenshot_response = self._http_handler( method='GET', url=api_response.scrape_result['screenshots'][name]['url'], params={'key': self.key}, verify=self.verify ) screenshot_response.raise_for_status() if not name.endswith('.jpg'): name += '.jpg' api_response.sink(path=path, name=name, content=screenshot_response.content) def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str: scrape_result = api_response.result['result'] scrape_config = api_response.result['config'] file_content = content or scrape_result['content'] file_path = None file_extension = None if name: name_parts = name.split('.') if len(name_parts) > 1: file_extension = name_parts[-1] if not file: if file_extension is None: try: mime_type = 
scrape_result['response_headers']['content-type'] except KeyError: mime_type = 'application/octet-stream' if ';' in mime_type: mime_type = mime_type.split(';')[0] file_extension = '.' + mime_type.split('/')[1] if not name: name = scrape_config['url'].split('/')[-1] if name.find(file_extension) == -1: name += file_extension file_path = path + '/' + name if path else name if file_path == file_extension: url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-') if url[-1] == '-': url = url[:-1] url += file_extension file_path = url file = open(file_path, 'wb') if isinstance(file_content, str): file_content = BytesIO(file_content.encode('utf-8')) elif isinstance(file_content, bytes): file_content = BytesIO(file_content) file_content.seek(0) with file as f: shutil.copyfileobj(file_content, f, length=131072) logger.info('file %s created' % file_path) return file_path def _handle_scrape_large_objects( self, callback_url:str, format: Literal['clob', 'blob'] ) -> Tuple[Union[BytesIO, str], str]: if format not in ['clob', 'blob']: raise ContentError('Large objects handle can handles format format [blob, clob], given: %s' % format) response = self._http_handler(**{ 'method': 'GET', 'url': callback_url, 'verify': self.verify, 'timeout': (self.connect_timeout, self.default_read_timeout), 'headers': { 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, 'params': {'key': self.key} }) if self.body_handler.support(headers=response.headers): content = self.body_handler(content=response.content, content_type=response.headers['content-type']) else: content = response.content if format == 'clob': return content.decode('utf-8'), 'text' return BytesIO(content), 'binary' def _handle_api_response( self, response: Response, scrape_config:ScrapeConfig, raise_on_upstream_error: Optional[bool] = True ) -> ScrapeApiResponse: if scrape_config.method == 'HEAD': body = None else: if 
self.body_handler.support(headers=response.headers): body = self.body_handler(content=response.content, content_type=response.headers['content-type']) else: body = response.content.decode('utf-8') api_response:ScrapeApiResponse = ScrapeApiResponse( response=response, request=response.request, api_result=body, scrape_config=scrape_config, large_object_handler=self._handle_scrape_large_objects ) api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error) return api_response def _handle_screenshot_api_response( self, response: Response, screenshot_config:ScreenshotConfig, raise_on_upstream_error: Optional[bool] = True ) -> ScreenshotApiResponse: if self.body_handler.support(headers=response.headers): body = self.body_handler(content=response.content, content_type=response.headers['content-type']) else: body = {'result': response.content} api_response:ScreenshotApiResponse = ScreenshotApiResponse( response=response, request=response.request, api_result=body, screenshot_config=screenshot_config ) api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error) return api_response def _handle_extraction_api_response( self, response: Response, extraction_config:ExtractionConfig, raise_on_upstream_error: Optional[bool] = True ) -> ExtractionApiResponse: if self.body_handler.support(headers=response.headers): body = self.body_handler(content=response.content, content_type=response.headers['content-type']) else: body = response.content.decode('utf-8') api_response:ExtractionApiResponse = ExtractionApiResponse( response=response, request=response.request, api_result=body, extraction_config=extraction_config ) api_response.raise_for_result(raise_on_upstream_error=raise_on_upstream_error) return api_response @backoff.on_exception(backoff.expo, exception=ConnectionError, max_tries=5) def start_crawl(self, crawler_config: CrawlerConfig) -> CrawlerStartResponse: """ Start a crawler job :param crawler_config: CrawlerConfig :return: 
CrawlerStartResponse with UUID and initial status Example: ```python from scrapfly import ScrapflyClient, CrawlerConfig client = ScrapflyClient(key='YOUR_API_KEY') config = CrawlerConfig( url='https://example.com', page_limit=100, max_depth=3 ) response = client.start_crawl(config) print(f"Crawler started: {response.uuid}") ``` """ # Get crawler config params (without key) body_params = crawler_config.to_api_params() # API key must be passed as query parameter, not in body query_params = {'key': self.key} timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT) url = f'{self.host}/crawl' logger.debug(f"Crawler API POST {url}?key=***") logger.debug(f"Crawler API body: {body_params}") response = self._http_handler( method='POST', url=url, params=query_params, # key as query param json=body_params, # config in body timeout=timeout, headers={'User-Agent': self.ua}, verify=self.verify ) if response.status_code not in (200, 201): # Log error details for debugging try: error_detail = response.json() except: error_detail = response.text logger.debug(f"Crawler API error ({response.status_code}): {error_detail}") self._handle_crawler_error_response(response) result = response.json() return CrawlerStartResponse(result) @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def get_crawl_status(self, uuid: str) -> CrawlerStatusResponse: """ Get crawler job status :param uuid: Crawler job UUID :return: CrawlerStatusResponse with progress information Example: ```python status = client.get_crawl_status(uuid) print(f"Status: {status.status}") print(f"Progress: {status.progress_pct:.1f}%") print(f"Crawled: {status.urls_crawled}/{status.urls_discovered}") if status.is_complete: print("Crawl completed!") ``` """ timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT) response = self._http_handler( method='GET', url=f'{self.host}/crawl/{uuid}/status', params={'key': self.key}, # key as query param (already correct) timeout=timeout, 
headers={'User-Agent': self.ua}, verify=self.verify ) if response.status_code != 200: self._handle_crawler_error_response(response) result = response.json() return CrawlerStatusResponse(result) def cancel_crawl(self, crawl_uuid: str) -> bool: """ Cancel a running crawler job :param crawl_uuid: Crawler job UUID to cancel :return: True if cancelled successfully Example: ```python # Start a crawl crawl = client.start_crawl(config) # Cancel it client.cancel_crawl(crawl.uuid) ``` """ timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT) response = self._http_handler( method='DELETE', url=f'{self.host}/crawl/{crawl_uuid}', params={'key': self.key}, timeout=timeout, headers={'User-Agent': self.ua}, verify=self.verify ) if response.status_code not in (200, 204): self._handle_crawler_error_response(response) return True @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def get_crawl_artifact( self, uuid: str, artifact_type: str = 'warc' ) -> CrawlerArtifactResponse: """ Download crawler job artifact :param uuid: Crawler job UUID :param artifact_type: Artifact type ('warc' or 'har') :return: CrawlerArtifactResponse with WARC data and parsing utilities Example: ```python # Wait for crawl to complete while True: status = client.get_crawl_status(uuid) if status.is_complete: break time.sleep(5) # Download artifact artifact = client.get_crawl_artifact(uuid) # Easy mode: get all pages pages = artifact.get_pages() for page in pages: print(f"{page['url']}: {page['status_code']}") # Memory-efficient: iterate for record in artifact.iter_responses(): process(record.content) # Save to file artifact.save('crawl.warc.gz') ``` """ timeout = (self.connect_timeout, 300) # 5 minutes for large downloads response = self._http_handler( method='GET', url=f'{self.host}/crawl/{uuid}/artifact', params={ 'key': self.key, 'type': artifact_type }, timeout=timeout, headers={'User-Agent': self.ua}, verify=self.verify ) if response.status_code != 200: 
self._handle_crawler_error_response(response) return CrawlerArtifactResponse(response.content, artifact_type=artifact_type) @backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def get_crawl_contents( self, uuid: str, format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html' ) -> Dict[str, Any]: """ Get crawl contents in a specific format Retrieves extracted content from crawled pages in the format(s) specified in your crawl configuration (via content_formats parameter). :param uuid: Crawler job UUID :param format: Content format - 'html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata' :return: Dictionary with format {"contents": {url: content, ...}, "links": {...}} Example: ```python # Get all content in markdown format result = client.get_crawl_contents(uuid, format='markdown') contents = result['contents'] # Access specific URL for url, content in contents.items(): print(f"{url}: {len(content)} chars") ``` """ timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT) params = { 'key': self.key, 'format': format } response = self._http_handler( method='GET', url=f'{self.host}/crawl/{uuid}/contents', params=params, timeout=timeout, headers={'User-Agent': self.ua}, verify=self.verify ) if response.status_code != 200: self._handle_crawler_error_response(response) return response.json() def _handle_crawler_error_response(self, response: Response): """Handle error responses from Crawler API""" try: error_data = response.json() error_msg = error_data.get('message', 'Unknown error') error_code = error_data.get('code', 'ERR::CRAWLER::UNKNOWN') except Exception: error_msg = response.text error_code = 'ERR::CRAWLER::UNKNOWN' raise HttpError( message=f"Crawler API error ({response.status_code}): {error_msg}", code=error_code, http_status_code=response.status_code, request=response.request, response=response )Class variables
var CONCURRENCY_AUTO
var DATETIME_FORMAT
var DEFAULT_CONNECT_TIMEOUT
var DEFAULT_CRAWLER_API_READ_TIMEOUT
var DEFAULT_EXTRACTION_API_READ_TIMEOUT
var DEFAULT_READ_TIMEOUT
var DEFAULT_SCREENSHOT_API_READ_TIMEOUT
var DEFAULT_WEBSCRAPING_API_READ_TIMEOUT
var HOST
var brotli : bool
var connect_timeout : int
var debug : bool
var default_read_timeout : int
var distributed_mode : bool
var extraction_api_read_timeout : int
var host : str
var key : str
var max_concurrency : int
var monitoring_api_read_timeout : int
var read_timeout : int
var reporter : scrapfly.reporter.Reporter
var screenshot_api_read_timeout : int
var verify : bool
var version : str
var web_scraping_api_read_timeout : int
Instance variables
prop http-
Expand source code
@property def http(self): return self._http_handler prop ua : str-
Expand source code
@property def ua(self) -> str: return 'ScrapflySDK/%s (Python %s, %s, %s)' % ( self.version, platform.python_version(), platform.uname().system, platform.uname().machine )
Methods
def account(self) ‑> str | Dict-
Expand source code
def account(self) -> Union[str, Dict]: response = self._http_handler( method='GET', url=self.host + '/account', params={'key': self.key}, verify=self.verify, headers={ 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, ) response.raise_for_status() if self.body_handler.support(response.headers): return self.body_handler(response.content, response.headers['content-type']) return response.content.decode('utf-8') async def async_extraction(self,
extraction_config: ExtractionConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ExtractionApiResponse-
Expand source code
async def async_extraction(self, extraction_config:ExtractionConfig, loop:Optional[AbstractEventLoop]=None) -> ExtractionApiResponse: if loop is None: loop = asyncio.get_running_loop() return await loop.run_in_executor(self.async_executor, self.extract, extraction_config) async def async_scrape(self,
scrape_config: ScrapeConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ScrapeApiResponse-
Expand source code
async def async_scrape(self, scrape_config:ScrapeConfig, loop:Optional[AbstractEventLoop]=None) -> ScrapeApiResponse: if loop is None: loop = asyncio.get_running_loop() return await loop.run_in_executor(self.async_executor, self.scrape, scrape_config) async def async_screenshot(self,
screenshot_config: ScreenshotConfig,
loop: asyncio.events.AbstractEventLoop | None = None) ‑> ScreenshotApiResponse-
Expand source code
async def async_screenshot(self, screenshot_config:ScreenshotConfig, loop:Optional[AbstractEventLoop]=None) -> ScreenshotApiResponse: if loop is None: loop = asyncio.get_running_loop() return await loop.run_in_executor(self.async_executor, self.screenshot, screenshot_config) def cancel_crawl(self, crawl_uuid: str) ‑> bool-
Expand source code
def cancel_crawl(self, crawl_uuid: str) -> bool: """ Cancel a running crawler job :param crawl_uuid: Crawler job UUID to cancel :return: True if cancelled successfully Example: ```python # Start a crawl crawl = client.start_crawl(config) # Cancel it client.cancel_crawl(crawl.uuid) ``` """ timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT) response = self._http_handler( method='DELETE', url=f'{self.host}/crawl/{crawl_uuid}', params={'key': self.key}, timeout=timeout, headers={'User-Agent': self.ua}, verify=self.verify ) if response.status_code not in (200, 204): self._handle_crawler_error_response(response) return TrueCancel a running crawler job
:param crawl_uuid: Crawler job UUID to cancel :return: True if cancelled successfully
Example
# Start a crawl crawl = client.start_crawl(config) # Cancel it client.cancel_crawl(crawl.uuid) def close(self)-
Expand source code
def close(self): self.http_session.close() self.http_session = None async def concurrent_scrape(self,
scrape_configs: List[ScrapeConfig],
concurrency: int | None = None)-
Expand source code
async def concurrent_scrape(self, scrape_configs:List[ScrapeConfig], concurrency:Optional[int]=None): if concurrency is None: concurrency = self.max_concurrency elif concurrency == self.CONCURRENCY_AUTO: concurrency = self.account()['subscription']['max_concurrency'] loop = asyncio.get_running_loop() processing_tasks = [] results = [] processed_tasks = 0 expected_tasks = len(scrape_configs) def scrape_done_callback(task:Task): nonlocal processed_tasks try: if task.cancelled() is True: return error = task.exception() if error is not None: results.append(error) else: results.append(task.result()) finally: processing_tasks.remove(task) processed_tasks += 1 while scrape_configs or results or processing_tasks: logger.info("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks))) if scrape_configs: if len(processing_tasks) < concurrency: # @todo handle backpressure for _ in range(0, concurrency - len(processing_tasks)): try: scrape_config = scrape_configs.pop() except: break scrape_config.raise_on_upstream_error = False task = loop.create_task(self.async_scrape(scrape_config=scrape_config, loop=loop)) processing_tasks.append(task) task.add_done_callback(scrape_done_callback) for _ in results: result = results.pop() yield result await asyncio.sleep(.5) logger.debug("Scrape %d/%d - %d running" % (processed_tasks, expected_tasks, len(processing_tasks))) def extract(self,
extraction_config: ExtractionConfig,
no_raise: bool = False) ‑> ExtractionApiResponse-
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def extract(self, extraction_config:ExtractionConfig, no_raise:bool=False) -> ExtractionApiResponse: """ Extract structured data from text content :param extraction_config: ExtractionConfig :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration :return: str If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error. If the error is not none, you will get the following structure for example 'error': { 'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED', 'message': 'The content type of the response is not supported for extraction', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED' } } """ try: logger.debug('--> %s Extracting data from' % (extraction_config.content_type)) request_data = self._extraction_request(extraction_config=extraction_config) response = self._http_handler(**request_data) extraction_api_response = self._handle_extraction_response(response=response, extraction_config=extraction_config) return extraction_api_response except BaseException as e: self.reporter.report(error=e) if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None: return e.api_response raise eExtract structured data from text content :param extraction_config: ExtractionConfig :param no_raise: bool - if True, do not raise exception on error while the extraction api response is a ScrapflyError for seamless integration :return: str
If you use no_raise=True, make sure to check the extraction_api_response.error attribute to handle the error. If the error is not None, you will get the following structure, for example:
'error': { 'code': 'ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED', 'message': 'The content type of the response is not supported for extraction', 'http_code': 422, 'links': { 'Checkout the related doc: https://scrapfly.io/docs/extraction-api/error/ERR::EXTRACTION::CONTENT_TYPE_NOT_SUPPORTED' } }
def get_crawl_artifact(self, uuid: str, artifact_type: str = 'warc') ‑> CrawlerArtifactResponse-
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def get_crawl_artifact( self, uuid: str, artifact_type: str = 'warc' ) -> CrawlerArtifactResponse: """ Download crawler job artifact :param uuid: Crawler job UUID :param artifact_type: Artifact type ('warc' or 'har') :return: CrawlerArtifactResponse with WARC data and parsing utilities Example: ```python # Wait for crawl to complete while True: status = client.get_crawl_status(uuid) if status.is_complete: break time.sleep(5) # Download artifact artifact = client.get_crawl_artifact(uuid) # Easy mode: get all pages pages = artifact.get_pages() for page in pages: print(f"{page['url']}: {page['status_code']}") # Memory-efficient: iterate for record in artifact.iter_responses(): process(record.content) # Save to file artifact.save('crawl.warc.gz') ``` """ timeout = (self.connect_timeout, 300) # 5 minutes for large downloads response = self._http_handler( method='GET', url=f'{self.host}/crawl/{uuid}/artifact', params={ 'key': self.key, 'type': artifact_type }, timeout=timeout, headers={'User-Agent': self.ua}, verify=self.verify ) if response.status_code != 200: self._handle_crawler_error_response(response) return CrawlerArtifactResponse(response.content, artifact_type=artifact_type)Download crawler job artifact
:param uuid: Crawler job UUID :param artifact_type: Artifact type ('warc' or 'har') :return: CrawlerArtifactResponse with WARC data and parsing utilities
Example
# Wait for crawl to complete while True: status = client.get_crawl_status(uuid) if status.is_complete: break time.sleep(5) # Download artifact artifact = client.get_crawl_artifact(uuid) # Easy mode: get all pages pages = artifact.get_pages() for page in pages: print(f"{page['url']}: {page['status_code']}") # Memory-efficient: iterate for record in artifact.iter_responses(): process(record.content) # Save to file artifact.save('crawl.warc.gz') def get_crawl_contents(self,
uuid: str,
format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html') ‑> Dict[str, Any]-
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def get_crawl_contents( self, uuid: str, format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html' ) -> Dict[str, Any]: """ Get crawl contents in a specific format Retrieves extracted content from crawled pages in the format(s) specified in your crawl configuration (via content_formats parameter). :param uuid: Crawler job UUID :param format: Content format - 'html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata' :return: Dictionary with format {"contents": {url: content, ...}, "links": {...}} Example: ```python # Get all content in markdown format result = client.get_crawl_contents(uuid, format='markdown') contents = result['contents'] # Access specific URL for url, content in contents.items(): print(f"{url}: {len(content)} chars") ``` """ timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT) params = { 'key': self.key, 'format': format } response = self._http_handler( method='GET', url=f'{self.host}/crawl/{uuid}/contents', params=params, timeout=timeout, headers={'User-Agent': self.ua}, verify=self.verify ) if response.status_code != 200: self._handle_crawler_error_response(response) return response.json()Get crawl contents in a specific format
Retrieves extracted content from crawled pages in the format(s) specified in your crawl configuration (via content_formats parameter).
:param uuid: Crawler job UUID :param format: Content format - 'html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata' :return: Dictionary with format {"contents": {url: content, …}, "links": {…}}
Example
# Get all content in markdown format result = client.get_crawl_contents(uuid, format='markdown') contents = result['contents'] # Access specific URL for url, content in contents.items(): print(f"{url}: {len(content)} chars") def get_crawl_status(self, uuid: str) ‑> CrawlerStatusResponse-
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5) def get_crawl_status(self, uuid: str) -> CrawlerStatusResponse: """ Get crawler job status :param uuid: Crawler job UUID :return: CrawlerStatusResponse with progress information Example: ```python status = client.get_crawl_status(uuid) print(f"Status: {status.status}") print(f"Progress: {status.progress_pct:.1f}%") print(f"Crawled: {status.urls_crawled}/{status.urls_discovered}") if status.is_complete: print("Crawl completed!") ``` """ timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT) response = self._http_handler( method='GET', url=f'{self.host}/crawl/{uuid}/status', params={'key': self.key}, # key as query param (already correct) timeout=timeout, headers={'User-Agent': self.ua}, verify=self.verify ) if response.status_code != 200: self._handle_crawler_error_response(response) result = response.json() return CrawlerStatusResponse(result)Get crawler job status
:param uuid: Crawler job UUID :return: CrawlerStatusResponse with progress information
Example
status = client.get_crawl_status(uuid) print(f"Status: {status.status}") print(f"Progress: {status.progress_pct:.1f}%") print(f"Crawled: {status.urls_crawled}/{status.urls_discovered}") if status.is_complete: print("Crawl completed!") def get_monitoring_metrics(self,
format: str = 'structured',
period: str | None = None,
aggregation: List[Literal['account', 'project', 'target']] | None = None)-
Expand source code
def get_monitoring_metrics(self, format:str=ScraperAPI.MONITORING_DATA_FORMAT_STRUCTURED, period:Optional[str]=None, aggregation:Optional[List[MonitoringAggregation]]=None): params = {'key': self.key, 'format': format} if period is not None: params['period'] = period if aggregation is not None: params['aggregation'] = ','.join(aggregation) response = self._http_handler( method='GET', url=self.host + '/scrape/monitoring/metrics', params=params, timeout=(self.connect_timeout, self.monitoring_api_read_timeout), verify=self.verify, headers={ 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, ) response.raise_for_status() if self.body_handler.support(response.headers): return self.body_handler(response.content, response.headers['content-type']) return response.content.decode('utf-8') def get_monitoring_target_metrics(self,
domain: str,
group_subdomain: bool = False,
period: Literal['subscription', 'last7d', 'last24h', 'last1h', 'last5m'] | None = 'last24h',
start: datetime.datetime | None = None,
end: datetime.datetime | None = None)-
Expand source code
def get_monitoring_target_metrics( self, domain:str, group_subdomain:bool=False, period:Optional[MonitoringTargetPeriod]=ScraperAPI.MONITORING_PERIOD_LAST_24H, start:Optional[datetime.datetime]=None, end:Optional[datetime.datetime]=None, ): params = { 'key': self.key, 'domain': domain, 'group_subdomain': group_subdomain } if (start is not None and end is None) or (start is None and end is not None): raise ValueError('You must provide both start and end date') if start is not None and end is not None: params['start'] = start.strftime(self.DATETIME_FORMAT) params['end'] = end.strftime(self.DATETIME_FORMAT) period = None params['period'] = period response = self._http_handler( method='GET', url=self.host + '/scrape/monitoring/metrics/target', timeout=(self.connect_timeout, self.monitoring_api_read_timeout), params=params, verify=self.verify, headers={ 'accept-encoding': self.body_handler.content_encoding, 'accept': self.body_handler.accept, 'user-agent': self.ua }, ) response.raise_for_status() if self.body_handler.support(response.headers): return self.body_handler(response.content, response.headers['content-type']) return response.content.decode('utf-8') def open(self)-
Expand source code
def open(self): if self.http_session is None: self.http_session = Session() self.http_session.verify = self.verify self.http_session.timeout = (self.connect_timeout, self.default_read_timeout) self.http_session.params['key'] = self.key self.http_session.headers['accept-encoding'] = self.body_handler.content_encoding self.http_session.headers['accept'] = self.body_handler.accept self.http_session.headers['user-agent'] = self.ua def resilient_scrape(self,
scrape_config: ScrapeConfig,
retry_on_errors: Set[Exception] = {<class 'scrapfly.errors.ScrapflyError'>},
retry_on_status_code: List[int] | None = None,
tries: int = 5,
delay: int = 20) ‑> ScrapeApiResponse-
Expand source code
def resilient_scrape( self, scrape_config:ScrapeConfig, retry_on_errors:Set[Exception]={ScrapflyError}, retry_on_status_code:Optional[List[int]]=None, tries: int = 5, delay: int = 20, ) -> ScrapeApiResponse: assert retry_on_errors is not None, 'Retry on error is None' assert isinstance(retry_on_errors, set), 'retry_on_errors is not a set()' @backoff.on_exception(backoff.expo, exception=tuple(retry_on_errors), max_tries=tries, max_time=delay) def inner() -> ScrapeApiResponse: try: return self.scrape(scrape_config=scrape_config) except (UpstreamHttpClientError, UpstreamHttpServerError) as e: if retry_on_status_code is not None and e.api_response: if e.api_response.upstream_status_code in retry_on_status_code: raise e else: return e.api_response raise e return inner() def save_scrape_screenshot(self,
api_response: ScrapeApiResponse,
name: str,
path: str | None = None)-
Expand source code
def save_scrape_screenshot(self, api_response:ScrapeApiResponse, name:str, path:Optional[str]=None): """ Save a screenshot from a scrape result :param api_response: ScrapeApiResponse :param name: str - name of the screenshot given in the scrape config :param path: Optional[str] """ if not api_response.scrape_result['screenshots']: raise RuntimeError('Screenshot %s do no exists' % name) try: api_response.scrape_result['screenshots'][name] except KeyError: raise RuntimeError('Screenshot %s do no exists' % name) screenshot_response = self._http_handler( method='GET', url=api_response.scrape_result['screenshots'][name]['url'], params={'key': self.key}, verify=self.verify ) screenshot_response.raise_for_status() if not name.endswith('.jpg'): name += '.jpg' api_response.sink(path=path, name=name, content=screenshot_response.content)Save a screenshot from a scrape result :param api_response: ScrapeApiResponse :param name: str - name of the screenshot given in the scrape config :param path: Optional[str]
def save_screenshot(self,
screenshot_api_response: ScreenshotApiResponse,
name: str,
path: str | None = None)-
Expand source code
def save_screenshot(self, screenshot_api_response:ScreenshotApiResponse, name:str, path:Optional[str]=None): """ Save a screenshot from a screenshot API response :param api_response: ScreenshotApiResponse :param name: str - name of the screenshot to save as :param path: Optional[str] """ if screenshot_api_response.screenshot_success is not True: raise RuntimeError('Screenshot was not successful') if not screenshot_api_response.image: raise RuntimeError('Screenshot binary does not exist') content = screenshot_api_response.image extension_name = screenshot_api_response.metadata['extension_name'] if path: os.makedirs(path, exist_ok=True) file_path = os.path.join(path, f'{name}.{extension_name}') else: file_path = f'{name}.{extension_name}' if isinstance(content, bytes): content = BytesIO(content) with open(file_path, 'wb') as f: shutil.copyfileobj(content, f, length=131072)Save a screenshot from a screenshot API response :param api_response: ScreenshotApiResponse :param name: str - name of the screenshot to save as :param path: Optional[str]
def scrape(self,
scrape_config: ScrapeConfig,
no_raise: bool = False) ‑> ScrapeApiResponse-
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def scrape(self, scrape_config: ScrapeConfig, no_raise: bool = False) -> ScrapeApiResponse:
    """
    Scrape a website

    :param scrape_config: ScrapeConfig
    :param no_raise: bool - when True, a ScrapflyError that carries an
        api_response is returned instead of raised, for seamless integration
    :return: ScrapeApiResponse

    With no_raise=True, always check api_response.scrape_result's error
    attribute; when not None it looks like:

        'error': {
            'code': 'ERR::ASP::SHIELD_PROTECTION_FAILED',
            'message': '...',
            'retryable': False,
            'http_code': 422,
            'links': {...}
        }
    """
    try:
        logger.debug('--> %s Scrapping %s' % (scrape_config.method, scrape_config.url))
        request_data = self._scrape_request(scrape_config=scrape_config)
        api_response = self._handle_response(
            response=self._http_handler(**request_data),
            scrape_config=scrape_config,
        )
        self.reporter.report(scrape_api_response=api_response)
        return api_response
    except BaseException as e:
        self.reporter.report(error=e)
        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response
        raise e
def screenshot(self,
screenshot_config: ScreenshotConfig,
no_raise: bool = False) ‑> ScreenshotApiResponse-
Expand source code
@backoff.on_exception(backoff.expo, exception=NetworkError, max_tries=5)
def screenshot(self, screenshot_config: ScreenshotConfig, no_raise: bool = False) -> ScreenshotApiResponse:
    """
    Take a screenshot

    :param screenshot_config: ScreenshotConfig
    :param no_raise: bool - when True, a ScrapflyError that carries an
        api_response is returned instead of raised, for seamless integration
    :return: ScreenshotApiResponse

    With no_raise=True, always check the response's error attribute; when not
    None it looks like:

        'error': {
            'code': 'ERR::SCREENSHOT::UNABLE_TO_TAKE_SCREENSHOT',
            'message': '...',
            'http_code': 422,
            'links': {...}
        }
    """
    try:
        logger.debug('--> %s Screenshoting' % (screenshot_config.url))
        request_data = self._screenshot_request(screenshot_config=screenshot_config)
        raw_response = self._http_handler(**request_data)
        return self._handle_screenshot_response(response=raw_response, screenshot_config=screenshot_config)
    except BaseException as e:
        self.reporter.report(error=e)
        if no_raise and isinstance(e, ScrapflyError) and e.api_response is not None:
            return e.api_response
        raise e
def sink(self,
api_response: ScrapeApiResponse,
content: str | bytes | None = None,
path: str | None = None,
name: str | None = None,
file: TextIO | _io.BytesIO | None = None) ‑> str -
Expand source code
def sink(self, api_response:ScrapeApiResponse, content:Optional[Union[str, bytes]]=None, path: Optional[str] = None, name: Optional[str] = None, file: Optional[Union[TextIO, BytesIO]] = None) -> str: scrape_result = api_response.result['result'] scrape_config = api_response.result['config'] file_content = content or scrape_result['content'] file_path = None file_extension = None if name: name_parts = name.split('.') if len(name_parts) > 1: file_extension = name_parts[-1] if not file: if file_extension is None: try: mime_type = scrape_result['response_headers']['content-type'] except KeyError: mime_type = 'application/octet-stream' if ';' in mime_type: mime_type = mime_type.split(';')[0] file_extension = '.' + mime_type.split('/')[1] if not name: name = scrape_config['url'].split('/')[-1] if name.find(file_extension) == -1: name += file_extension file_path = path + '/' + name if path else name if file_path == file_extension: url = re.sub(r'(https|http)?://', '', api_response.config['url']).replace('/', '-') if url[-1] == '-': url = url[:-1] url += file_extension file_path = url file = open(file_path, 'wb') if isinstance(file_content, str): file_content = BytesIO(file_content.encode('utf-8')) elif isinstance(file_content, bytes): file_content = BytesIO(file_content) file_content.seek(0) with file as f: shutil.copyfileobj(file_content, f, length=131072) logger.info('file %s created' % file_path) return file_path def start_crawl(self,
crawler_config: CrawlerConfig) ‑> CrawlerStartResponse-
Expand source code
@backoff.on_exception(backoff.expo, exception=ConnectionError, max_tries=5)
def start_crawl(self, crawler_config: CrawlerConfig) -> CrawlerStartResponse:
    """
    Start a crawler job

    :param crawler_config: CrawlerConfig
    :return: CrawlerStartResponse with UUID and initial status

    Example:
    ```python
    from scrapfly import ScrapflyClient, CrawlerConfig

    client = ScrapflyClient(key='YOUR_API_KEY')
    config = CrawlerConfig(
        url='https://example.com',
        page_limit=100,
        max_depth=3
    )
    response = client.start_crawl(config)
    print(f"Crawler started: {response.uuid}")
    ```
    """
    # NOTE(review): retries on builtin ConnectionError while the other crawler
    # endpoints retry on NetworkError — confirm which is intended
    # Get crawler config params (without key)
    body_params = crawler_config.to_api_params()

    # API key must be passed as query parameter, not in body
    query_params = {'key': self.key}

    timeout = (self.connect_timeout, self.DEFAULT_CRAWLER_API_READ_TIMEOUT)
    url = f'{self.host}/crawl'

    logger.debug(f"Crawler API POST {url}?key=***")
    logger.debug(f"Crawler API body: {body_params}")

    response = self._http_handler(
        method='POST',
        url=url,
        params=query_params,  # key as query param
        json=body_params,     # config in body
        timeout=timeout,
        headers={'User-Agent': self.ua},
        verify=self.verify
    )

    if response.status_code not in (200, 201):
        # Log error details for debugging; narrow except replaces the
        # original bare `except:` which also swallowed KeyboardInterrupt
        try:
            error_detail = response.json()
        except Exception:
            error_detail = response.text
        logger.debug(f"Crawler API error ({response.status_code}): {error_detail}")
        self._handle_crawler_error_response(response)

    result = response.json()
    return CrawlerStartResponse(result)
class ScrapflyCrawlerError (message: str,
code: str,
http_status_code: int,
resource: str | None = None,
is_retryable: bool = False,
retry_delay: int | None = None,
retry_times: int | None = None,
documentation_url: str | None = None,
api_response: ForwardRef('ApiResponse') | None = None)-
Expand source code
class ScrapflyCrawlerError(CrawlerError): """Exception raised when a crawler job fails or is cancelled""" passException raised when a crawler job fails or is cancelled
Ancestors
- CrawlerError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyError (message: str,
code: str,
http_status_code: int,
resource: str | None = None,
is_retryable: bool = False,
retry_delay: int | None = None,
retry_times: int | None = None,
documentation_url: str | None = None,
api_response: ForwardRef('ApiResponse') | None = None)-
Expand source code
class ScrapflyError(Exception): KIND_HTTP_BAD_RESPONSE = 'HTTP_BAD_RESPONSE' KIND_SCRAPFLY_ERROR = 'SCRAPFLY_ERROR' RESOURCE_PROXY = 'PROXY' RESOURCE_THROTTLE = 'THROTTLE' RESOURCE_SCRAPE = 'SCRAPE' RESOURCE_ASP = 'ASP' RESOURCE_SCHEDULE = 'SCHEDULE' RESOURCE_WEBHOOK = 'WEBHOOK' RESOURCE_SESSION = 'SESSION' def __init__( self, message: str, code: str, http_status_code: int, resource: Optional[str]=None, is_retryable: bool = False, retry_delay: Optional[int] = None, retry_times: Optional[int] = None, documentation_url: Optional[str] = None, api_response: Optional['ApiResponse'] = None ): self.message = message self.code = code self.retry_delay = retry_delay self.retry_times = retry_times self.resource = resource self.is_retryable = is_retryable self.documentation_url = documentation_url self.api_response = api_response self.http_status_code = http_status_code super().__init__(self.message, str(self.code)) def __str__(self): message = self.message if self.documentation_url is not None: message += '. Learn more: %s' % self.documentation_url return messageCommon base class for all non-exit exceptions.
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses
- CrawlerError
- scrapfly.errors.ExtraUsageForbidden
- scrapfly.errors.HttpError
Class variables
var KIND_HTTP_BAD_RESPONSEvar KIND_SCRAPFLY_ERRORvar RESOURCE_ASPvar RESOURCE_PROXYvar RESOURCE_SCHEDULEvar RESOURCE_SCRAPEvar RESOURCE_SESSIONvar RESOURCE_THROTTLEvar RESOURCE_WEBHOOK
class ScrapflyProxyError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyProxyError(ScraperAPIError): passCommon base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyScheduleError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyScheduleError(ScraperAPIError): passCommon base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyScrapeError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyScrapeError(ScraperAPIError): passCommon base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflySessionError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflySessionError(ScraperAPIError): passCommon base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyThrottleError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyThrottleError(ScraperAPIError): passCommon base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScrapflyWebhookError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScrapflyWebhookError(ScraperAPIError): passCommon base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.ScraperAPIError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScreenshotAPIError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class ScreenshotAPIError(HttpError): passCommon base class for all non-exit exceptions.
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class ScreenshotApiResponse (request: requests.models.Request,
response: requests.models.Response,
screenshot_config: ScreenshotConfig,
api_result: bytes | None = None)-
Expand source code
class ScreenshotApiResponse(ApiResponse): def __init__(self, request: Request, response: Response, screenshot_config: ScreenshotConfig, api_result: Optional[bytes] = None): super().__init__(request, response) self.screenshot_config = screenshot_config self.result = self.handle_api_result(api_result) @property def image(self) -> Optional[str]: binary = self.result.get('result', None) if binary is None: return '' return binary @property def metadata(self) -> Optional[Dict]: if not self.image: return {} content_type = self.response.headers.get('content-type') extension_name = content_type[content_type.find('/') + 1:].split(';')[0] return { 'extension_name': extension_name, 'upstream-status-code': self.response.headers.get('X-Scrapfly-Upstream-Http-Code'), 'upstream-url': self.response.headers.get('X-Scrapfly-Upstream-Url') } @property def screenshot_success(self) -> bool: if not self.image: return False return True @property def error(self) -> Optional[Dict]: if self.image: return None if self.screenshot_success is False: return self.result def _is_api_error(self, api_result: Dict) -> bool: if api_result is None: return True return 'error_id' in api_result def handle_api_result(self, api_result: bytes) -> FrozenDict: if self._is_api_error(api_result=api_result) is True: return FrozenDict(api_result) return api_result def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError): super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)Ancestors
Instance variables
prop error : Dict | None-
Expand source code
@property def error(self) -> Optional[Dict]: if self.image: return None if self.screenshot_success is False: return self.result prop image : str | None-
Expand source code
@property def image(self) -> Optional[str]: binary = self.result.get('result', None) if binary is None: return '' return binary prop metadata : Dict | None-
Expand source code
@property def metadata(self) -> Optional[Dict]: if not self.image: return {} content_type = self.response.headers.get('content-type') extension_name = content_type[content_type.find('/') + 1:].split(';')[0] return { 'extension_name': extension_name, 'upstream-status-code': self.response.headers.get('X-Scrapfly-Upstream-Http-Code'), 'upstream-url': self.response.headers.get('X-Scrapfly-Upstream-Url') } prop screenshot_success : bool-
Expand source code
@property def screenshot_success(self) -> bool: if not self.image: return False return True
Methods
def handle_api_result(self, api_result: bytes) ‑> FrozenDict-
Expand source code
def handle_api_result(self, api_result: bytes) -> FrozenDict: if self._is_api_error(api_result=api_result) is True: return FrozenDict(api_result) return api_result def raise_for_result(self,
raise_on_upstream_error=True,
error_class=scrapfly.errors.ScreenshotAPIError)-
Expand source code
def raise_for_result(self, raise_on_upstream_error=True, error_class=ScreenshotAPIError): super().raise_for_result(raise_on_upstream_error=raise_on_upstream_error, error_class=error_class)
Inherited members
class ScreenshotConfig (url: str,
format: Format | None = None,
capture: str | None = None,
resolution: str | None = None,
country: str | None = None,
timeout: int | None = None,
rendering_wait: int | None = None,
wait_for_selector: str | None = None,
options: List[Options] | None = None,
auto_scroll: bool | None = None,
js: str | None = None,
cache: bool | None = None,
cache_ttl: bool | None = None,
cache_clear: bool | None = None,
vision_deficiency: VisionDeficiency | None = None,
webhook: str | None = None,
raise_on_upstream_error: bool = True)-
Expand source code
class ScreenshotConfig(BaseApiConfig):
    """Declarative configuration for a Screenshot API call.

    Converted to query parameters with :meth:`to_api_params` and
    round-trippable through plain dicts via :meth:`to_dict` / :meth:`from_dict`.
    """

    url: str
    format: Optional[Format] = None
    capture: Optional[str] = None
    resolution: Optional[str] = None
    country: Optional[str] = None
    timeout: Optional[int] = None  # in milliseconds
    rendering_wait: Optional[int] = None  # in milliseconds
    wait_for_selector: Optional[str] = None
    options: Optional[List[Options]] = None
    auto_scroll: Optional[bool] = None
    js: Optional[str] = None
    cache: Optional[bool] = None
    cache_ttl: Optional[bool] = None  # NOTE(review): annotated bool but reads like a TTL value — confirm intended type
    cache_clear: Optional[bool] = None
    # annotation added for consistency with the peers above; set in __init__
    vision_deficiency: Optional[VisionDeficiency]
    webhook: Optional[str] = None
    raise_on_upstream_error: bool = True

    def __init__(
        self,
        url: str,
        format: Optional[Format] = None,
        capture: Optional[str] = None,
        resolution: Optional[str] = None,
        country: Optional[str] = None,
        timeout: Optional[int] = None,  # in milliseconds
        rendering_wait: Optional[int] = None,  # in milliseconds
        wait_for_selector: Optional[str] = None,
        options: Optional[List[Options]] = None,
        auto_scroll: Optional[bool] = None,
        js: Optional[str] = None,
        cache: Optional[bool] = None,
        cache_ttl: Optional[bool] = None,
        cache_clear: Optional[bool] = None,
        vision_deficiency: Optional[VisionDeficiency] = None,
        webhook: Optional[str] = None,
        raise_on_upstream_error: bool = True
    ):
        assert(type(url) is str)

        self.url = url
        # per-request API key override; falls back to the client key in to_api_params
        self.key = None
        self.format = format
        self.capture = capture
        self.resolution = resolution
        self.country = country
        self.timeout = timeout
        self.rendering_wait = rendering_wait
        self.wait_for_selector = wait_for_selector
        # normalize raw flags into Options enum members
        self.options = [Options(flag) for flag in options] if options else None
        self.auto_scroll = auto_scroll
        self.js = js
        self.cache = cache
        self.cache_ttl = cache_ttl
        self.cache_clear = cache_clear
        self.vision_deficiency = vision_deficiency
        self.webhook = webhook
        self.raise_on_upstream_error = raise_on_upstream_error

    def to_api_params(self, key: str) -> Dict:
        """Convert the config into Screenshot API query parameters.

        :param key: client API key, used unless one was set on the instance
        """
        params = {
            'key': self.key or key,
            'url': self.url
        }

        if self.format:
            params['format'] = Format(self.format).value

        if self.capture:
            params['capture'] = self.capture

        if self.resolution:
            params['resolution'] = self.resolution

        if self.country is not None:
            params['country'] = self.country

        if self.timeout is not None:
            params['timeout'] = self.timeout

        if self.rendering_wait is not None:
            params['rendering_wait'] = self.rendering_wait

        if self.wait_for_selector is not None:
            params['wait_for_selector'] = self.wait_for_selector

        if self.options is not None:
            params["options"] = ",".join(flag.value for flag in self.options)

        if self.auto_scroll is not None:
            params['auto_scroll'] = self._bool_to_http(self.auto_scroll)

        if self.js:
            # JS snippets travel urlsafe-base64 encoded
            params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8')

        if self.cache is not None:
            params['cache'] = self._bool_to_http(self.cache)

            if self.cache_ttl is not None:
                params['cache_ttl'] = self._bool_to_http(self.cache_ttl)

            if self.cache_clear is not None:
                params['cache_clear'] = self._bool_to_http(self.cache_clear)
        else:
            # cache_ttl / cache_clear only take effect when cache is set
            if self.cache_ttl is not None:
                logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled')

            if self.cache_clear is not None:
                logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled')

        if self.vision_deficiency is not None:
            params['vision_deficiency'] = self.vision_deficiency.value

        if self.webhook is not None:
            # note the API parameter name differs from the attribute
            params['webhook_name'] = self.webhook

        return params

    def to_dict(self) -> Dict:
        """
        Export the ScreenshotConfig instance to a plain dictionary.
        """
        return {
            'url': self.url,
            'format': Format(self.format).value if self.format else None,
            'capture': self.capture,
            'resolution': self.resolution,
            'country': self.country,
            'timeout': self.timeout,
            'rendering_wait': self.rendering_wait,
            'wait_for_selector': self.wait_for_selector,
            'options': [Options(option).value for option in self.options] if self.options else None,
            'auto_scroll': self.auto_scroll,
            'js': self.js,
            'cache': self.cache,
            'cache_ttl': self.cache_ttl,
            'cache_clear': self.cache_clear,
            'vision_deficiency': self.vision_deficiency.value if self.vision_deficiency else None,
            'webhook': self.webhook,
            'raise_on_upstream_error': self.raise_on_upstream_error
        }

    @staticmethod
    def from_dict(screenshot_config_dict: Dict) -> 'ScreenshotConfig':
        """Create a ScreenshotConfig instance from a dictionary."""
        url = screenshot_config_dict.get('url', None)
        format = screenshot_config_dict.get('format', None)
        format = Format(format) if format else None
        capture = screenshot_config_dict.get('capture', None)
        resolution = screenshot_config_dict.get('resolution', None)
        country = screenshot_config_dict.get('country', None)
        timeout = screenshot_config_dict.get('timeout', None)
        rendering_wait = screenshot_config_dict.get('rendering_wait', None)
        wait_for_selector = screenshot_config_dict.get('wait_for_selector', None)
        options = screenshot_config_dict.get('options', None)
        options = [Options(option) for option in options] if options else None
        auto_scroll = screenshot_config_dict.get('auto_scroll', None)
        js = screenshot_config_dict.get('js', None)
        cache = screenshot_config_dict.get('cache', None)
        cache_ttl = screenshot_config_dict.get('cache_ttl', None)
        cache_clear = screenshot_config_dict.get('cache_clear', None)
        vision_deficiency = screenshot_config_dict.get('vision_deficiency', None)
        webhook = screenshot_config_dict.get('webhook', None)
        raise_on_upstream_error = screenshot_config_dict.get('raise_on_upstream_error', True)

        return ScreenshotConfig(
            url=url,
            format=format,
            capture=capture,
            resolution=resolution,
            country=country,
            timeout=timeout,
            rendering_wait=rendering_wait,
            wait_for_selector=wait_for_selector,
            options=options,
            auto_scroll=auto_scroll,
            js=js,
            cache=cache,
            cache_ttl=cache_ttl,
            cache_clear=cache_clear,
            vision_deficiency=vision_deficiency,
            webhook=webhook,
            raise_on_upstream_error=raise_on_upstream_error
        )
Class variables
var auto_scroll : bool | Nonevar cache : bool | Nonevar cache_clear : bool | Nonevar cache_ttl : bool | Nonevar capture : str | Nonevar country : str | Nonevar format : Format | Nonevar js : str | Nonevar options : List[Options] | Nonevar raise_on_upstream_error : boolvar rendering_wait : int | Nonevar resolution : str | Nonevar timeout : int | Nonevar url : strvar wait_for_selector : str | Nonevar webhook : str | None
Static methods
def from_dict(screenshot_config_dict: Dict) ‑> ScreenshotConfig-
Expand source code
@staticmethod def from_dict(screenshot_config_dict: Dict) -> 'ScreenshotConfig': """Create a ScreenshotConfig instance from a dictionary.""" url = screenshot_config_dict.get('url', None) format = screenshot_config_dict.get('format', None) format = Format(format) if format else None capture = screenshot_config_dict.get('capture', None) resolution = screenshot_config_dict.get('resolution', None) country = screenshot_config_dict.get('country', None) timeout = screenshot_config_dict.get('timeout', None) rendering_wait = screenshot_config_dict.get('rendering_wait', None) wait_for_selector = screenshot_config_dict.get('wait_for_selector', None) options = screenshot_config_dict.get('options', None) options = [Options(option) for option in options] if options else None auto_scroll = screenshot_config_dict.get('auto_scroll', None) js = screenshot_config_dict.get('js', None) cache = screenshot_config_dict.get('cache', None) cache_ttl = screenshot_config_dict.get('cache_ttl', None) cache_clear = screenshot_config_dict.get('cache_clear', None) vision_deficiency = screenshot_config_dict.get('vision_deficiency', None) webhook = screenshot_config_dict.get('webhook', None) raise_on_upstream_error = screenshot_config_dict.get('raise_on_upstream_error', True) return ScreenshotConfig( url=url, format=format, capture=capture, resolution=resolution, country=country, timeout=timeout, rendering_wait=rendering_wait, wait_for_selector=wait_for_selector, options=options, auto_scroll=auto_scroll, js=js, cache=cache, cache_ttl=cache_ttl, cache_clear=cache_clear, vision_deficiency=vision_deficiency, webhook=webhook, raise_on_upstream_error=raise_on_upstream_error )Create a ScreenshotConfig instance from a dictionary.
Methods
def to_api_params(self, key: str) ‑> Dict-
Expand source code
def to_api_params(self, key:str) -> Dict: params = { 'key': self.key or key, 'url': self.url } if self.format: params['format'] = Format(self.format).value if self.capture: params['capture'] = self.capture if self.resolution: params['resolution'] = self.resolution if self.country is not None: params['country'] = self.country if self.timeout is not None: params['timeout'] = self.timeout if self.rendering_wait is not None: params['rendering_wait'] = self.rendering_wait if self.wait_for_selector is not None: params['wait_for_selector'] = self.wait_for_selector if self.options is not None: params["options"] = ",".join(flag.value for flag in self.options) if self.auto_scroll is not None: params['auto_scroll'] = self._bool_to_http(self.auto_scroll) if self.js: params['js'] = base64.urlsafe_b64encode(self.js.encode('utf-8')).decode('utf-8') if self.cache is not None: params['cache'] = self._bool_to_http(self.cache) if self.cache_ttl is not None: params['cache_ttl'] = self._bool_to_http(self.cache_ttl) if self.cache_clear is not None: params['cache_clear'] = self._bool_to_http(self.cache_clear) else: if self.cache_ttl is not None: logging.warning('Params "cache_ttl" is ignored. Works only if cache is enabled') if self.cache_clear is not None: logging.warning('Params "cache_clear" is ignored. Works only if cache is enabled') if self.vision_deficiency is not None: params['vision_deficiency'] = self.vision_deficiency.value if self.webhook is not None: params['webhook_name'] = self.webhook return params def to_dict(self) ‑> Dict-
Expand source code
def to_dict(self) -> Dict: """ Export the ScreenshotConfig instance to a plain dictionary. """ return { 'url': self.url, 'format': Format(self.format).value if self.format else None, 'capture': self.capture, 'resolution': self.resolution, 'country': self.country, 'timeout': self.timeout, 'rendering_wait': self.rendering_wait, 'wait_for_selector': self.wait_for_selector, 'options': [Options(option).value for option in self.options] if self.options else None, 'auto_scroll': self.auto_scroll, 'js': self.js, 'cache': self.cache, 'cache_ttl': self.cache_ttl, 'cache_clear': self.cache_clear, 'vision_deficiency': self.vision_deficiency.value if self.vision_deficiency else None, 'webhook': self.webhook, 'raise_on_upstream_error': self.raise_on_upstream_error }Export the ScreenshotConfig instance to a plain dictionary.
class UpstreamHttpClientError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class UpstreamHttpClientError(UpstreamHttpError):
    # Marker subclass for HTTP errors attributed to the upstream (scraped)
    # website; presumably the 4xx client-error class — TODO confirm against
    # scrapfly.errors where this hierarchy is defined.
    pass
Ancestors
- scrapfly.errors.UpstreamHttpError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
class UpstreamHttpError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class UpstreamHttpError(HttpError):
    # Base marker for HTTP errors raised by the upstream (target) website,
    # as opposed to errors from the Scrapfly API itself — presumed from the
    # scrapfly.errors hierarchy; no behavior of its own.
    pass
Ancestors
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
Subclasses
class UpstreamHttpServerError (request: requests.models.Request,
response: requests.models.Response | None = None,
**kwargs)-
Expand source code
class UpstreamHttpServerError(UpstreamHttpClientError):
    # NOTE(review): this *server*-error type inherits from the *client*-error
    # type, so `except UpstreamHttpClientError` also catches server errors.
    # Looks intentional (one catch-all for upstream HTTP failures) but is
    # surprising — confirm the hierarchy in scrapfly.errors.
    pass
Ancestors
- UpstreamHttpClientError
- scrapfly.errors.UpstreamHttpError
- scrapfly.errors.HttpError
- ScrapflyError
- builtins.Exception
- builtins.BaseException
class WarcParser (warc_data: bytes | BinaryIO) -
Expand source code
class WarcParser:
    """
    Parser for WARC files with automatic decompression.

    Provides methods to iterate through WARC records and extract page data.

    Example:
        ```python
        # From bytes
        parser = WarcParser(warc_bytes)

        # Iterate all records
        for record in parser.iter_records():
            print(f"{record.url}: {record.status_code}")

        # Get only HTTP responses
        for record in parser.iter_responses():
            print(f"Page: {record.url}")
            html = record.content.decode('utf-8')

        # Get all pages as simple dicts
        pages = parser.get_pages()
        for page in pages:
            print(f"{page['url']}: {page['status_code']}")
        ```
    """

    def __init__(self, warc_data: Union[bytes, BinaryIO]):
        """
        Initialize WARC parser.

        Args:
            warc_data: WARC data as bytes or file-like object. Gzipped
                *bytes* are decompressed transparently; a file-like object
                is used as-is and is NOT decompressed.
        """
        if isinstance(warc_data, bytes):
            # gzip magic number — decompress transparently when present
            if warc_data[:2] == b'\x1f\x8b':
                try:
                    warc_data = gzip.decompress(warc_data)
                except Exception:
                    pass  # not actually gzipped (or corrupt): parse the raw bytes
            self._data = BytesIO(warc_data)
        else:
            self._data = warc_data

    def iter_records(self) -> Iterator[WarcRecord]:
        """
        Iterate through all WARC records.

        Yields:
            WarcRecord: Each record in the WARC file
        """
        # Always parse from the start, even after a previous pass
        self._data.seek(0)
        while True:
            # Every record begins with a version line, e.g. b'WARC/1.0'
            version_line = self._read_line()
            if not version_line or not version_line.startswith(b'WARC/'):
                break  # EOF or stream not at a record boundary

            warc_headers = self._read_headers()
            if not warc_headers:
                break

            # Content-Length gives the exact byte size of the content block
            content_length = int(warc_headers.get('Content-Length', 0))
            content_block = self._data.read(content_length)

            # Skip the two blank lines that terminate each record
            self._read_line()
            self._read_line()

            record = self._parse_record(warc_headers, content_block)
            if record:
                yield record

    def iter_responses(self) -> Iterator[WarcRecord]:
        """
        Iterate through HTTP response records only.

        Filters out non-response records (requests, metadata, etc.)

        Yields:
            WarcRecord: HTTP response records only
        """
        for record in self.iter_records():
            if record.record_type == 'response' and record.status_code:
                yield record

    def get_pages(self) -> List[Dict]:
        """
        Get all crawled pages as simple dictionaries.

        This is the easiest way to access crawl results without dealing
        with WARC format details.

        Returns:
            List of dicts with keys: url, status_code, headers, content
        """
        pages = []
        for record in self.iter_responses():
            pages.append({
                'url': record.url,
                'status_code': record.status_code,
                'headers': record.headers,
                'content': record.content
            })
        return pages

    def _read_line(self) -> bytes:
        """Read a single line, stripped of trailing CR/LF."""
        line = self._data.readline()
        return line.rstrip(b'\r\n')

    def _read_headers(self) -> Dict[str, str]:
        """Read 'Key: value' header lines until the first empty line."""
        headers = {}
        while True:
            line = self._read_line()
            if not line:
                break
            if b':' in line:
                key, value = line.split(b':', 1)
                headers[key.decode('utf-8').strip()] = value.decode('utf-8').strip()
        return headers

    def _parse_record(self, warc_headers: Dict[str, str], content_block: bytes) -> Optional[WarcRecord]:
        """Build a WarcRecord from parsed WARC headers and raw content block.

        Returns None for record types other than response/request/metadata/warcinfo.
        """
        record_type = warc_headers.get('WARC-Type', '')
        url = warc_headers.get('WARC-Target-URI', '')

        if record_type == 'response':
            # Response records embed a full HTTP response: split it into
            # headers/body and extract the status code.
            http_headers, body = self._parse_http_response(content_block)
            status_code = self._extract_status_code(content_block)
            return WarcRecord(
                record_type=record_type,
                url=url,
                headers=http_headers,
                content=body,
                status_code=status_code,
                warc_headers=warc_headers
            )
        elif record_type in ['request', 'metadata', 'warcinfo']:
            # Other known record types — keep their raw content untouched
            return WarcRecord(
                record_type=record_type,
                url=url,
                headers={},
                content=content_block,
                status_code=None,
                warc_headers=warc_headers
            )
        # Unknown record types are dropped
        return None

    def _parse_http_response(self, content_block: bytes) -> tuple:
        """Split a raw HTTP response into (headers dict, body bytes)."""
        try:
            # Headers end at the first blank line (CRLF preferred, LF fallback)
            parts = content_block.split(b'\r\n\r\n', 1)
            if len(parts) < 2:
                parts = content_block.split(b'\n\n', 1)
            if len(parts) == 2:
                header_section, body = parts
            else:
                header_section, body = content_block, b''

            headers = {}
            lines = header_section.split(b'\r\n') if b'\r\n' in header_section else header_section.split(b'\n')
            # lines[0] is the status line — skip it
            for line in lines[1:]:
                if b':' in line:
                    key, value = line.split(b':', 1)
                    headers[key.decode('utf-8', errors='ignore').strip()] = value.decode('utf-8', errors='ignore').strip()
            return headers, body
        except Exception:
            # Malformed response: surface the whole block as body
            return {}, content_block

    def _extract_status_code(self, content_block: bytes) -> Optional[int]:
        """Extract the HTTP status code from a raw response block.

        Fix: the pattern now also matches versions without a minor part
        (e.g. 'HTTP/2 200'), which the previous 'HTTP/\\d.\\d' regex missed.
        """
        try:
            # Status line, e.g. b'HTTP/1.1 200 OK' or b'HTTP/2 200'
            first_line = content_block.split(b'\r\n', 1)[0] if b'\r\n' in content_block else content_block.split(b'\n', 1)[0]
            match = re.match(rb'HTTP/\d+(?:\.\d+)? (\d+)', first_line)
            if match:
                return int(match.group(1))
        except Exception:
            pass
        return None
Provides methods to iterate through WARC records and extract page data.
Example
# From bytes parser = WarcParser(warc_bytes) # Iterate all records for record in parser.iter_records(): print(f"{record.url}: {record.status_code}") # Get only HTTP responses for record in parser.iter_responses(): print(f"Page: {record.url}") html = record.content.decode('utf-8') # Get all pages as simple dicts pages = parser.get_pages() for page in pages: print(f"{page['url']}: {page['status_code']}")Initialize WARC parser
Args
warc_data- WARC data as bytes or file-like object (supports both gzip-compressed and uncompressed)
Methods
def get_pages(self) ‑> List[Dict]-
Expand source code
def get_pages(self) -> List[Dict]: """ Get all crawled pages as simple dictionaries This is the easiest way to access crawl results without dealing with WARC format details. Returns: List of dicts with keys: url, status_code, headers, content Example: ```python pages = parser.get_pages() for page in pages: print(f"{page['url']}: {len(page['content'])} bytes") html = page['content'].decode('utf-8') ``` """ pages = [] for record in self.iter_responses(): pages.append({ 'url': record.url, 'status_code': record.status_code, 'headers': record.headers, 'content': record.content }) return pagesGet all crawled pages as simple dictionaries
This is the easiest way to access crawl results without dealing with WARC format details.
Returns
List of dicts with keys: url, status_code, headers, content
Example
pages = parser.get_pages() for page in pages: print(f"{page['url']}: {len(page['content'])} bytes") html = page['content'].decode('utf-8') def iter_records(self) ‑> Iterator[WarcRecord]-
Expand source code
def iter_records(self) -> Iterator[WarcRecord]:
    """
    Iterate through all WARC records

    Yields:
        WarcRecord: Each record in the WARC file
    """
    # Rewind so every call parses from the beginning of the stream
    self._data.seek(0)
    while True:
        # Read WARC version line (each record starts with e.g. b'WARC/1.0')
        version_line = self._read_line()
        if not version_line or not version_line.startswith(b'WARC/'):
            break  # EOF or stream no longer positioned at a record boundary
        # Read WARC headers (up to the first blank line)
        warc_headers = self._read_headers()
        if not warc_headers:
            break
        # Get content length — exact byte size of the content block that follows
        content_length = int(warc_headers.get('Content-Length', 0))
        # Read content block
        content_block = self._data.read(content_length)
        # Skip trailing newlines (two blank lines terminate each record)
        self._read_line()
        self._read_line()
        # Parse the record; unknown record types come back as None and are skipped
        record = self._parse_record(warc_headers, content_block)
        if record:
            yield record
Expand source code
def iter_responses(self) -> Iterator[WarcRecord]: """ Iterate through HTTP response records only Filters out non-response records (requests, metadata, etc.) Yields: WarcRecord: HTTP response records only """ for record in self.iter_records(): if record.record_type == 'response' and record.status_code: yield recordIterate through HTTP response records only
Filters out non-response records (requests, metadata, etc.)
Yields
WarcRecord- HTTP response records only
class WarcRecord (record_type: str,
url: str,
headers: Dict[str, str],
content: bytes,
status_code: int | None,
warc_headers: Dict[str, str])-
Expand source code
@dataclass
class WarcRecord:
    """
    A single record from a WARC file.

    A WARC file contains multiple records, each representing a captured
    HTTP transaction or a piece of metadata.
    """

    record_type: str              # record kind: response, request, metadata, ...
    url: str                      # URL the record is associated with
    headers: Dict[str, str]       # HTTP headers (response records)
    content: bytes                # response body / raw record content
    status_code: Optional[int]    # HTTP status code (response records only)
    warc_headers: Dict[str, str]  # WARC-specific headers of this record

    def __repr__(self) -> str:
        return "WarcRecord(type={}, url={}, status={})".format(
            self.record_type, self.url, self.status_code
        )
A WARC file contains multiple records, each representing a captured HTTP transaction or metadata.
Instance variables
var content : bytesvar headers : Dict[str, str]var record_type : strvar status_code : int | Nonevar url : strvar warc_headers : Dict[str, str]