Module scrapfly.crawler
Scrapfly Crawler API
This package contains all components for the Crawler API:
- Crawl management (Crawl class)
- Configuration (CrawlerConfig)
- Response types (CrawlerStartResponse, CrawlerStatusResponse, CrawlerArtifactResponse)
- Artifact parsing (WARC, HAR)
- Webhook handling
Sub-modules
scrapfly.crawler.crawl
Crawl Object - High-level abstraction for Crawler API …
scrapfly.crawler.crawl_content
CrawlContent - Response object for crawled URLs …
scrapfly.crawler.crawler_config
Crawler API Configuration …
scrapfly.crawler.crawler_response
Crawler API Response Classes …
scrapfly.crawler.crawler_webhook
Crawler API Webhook Models …
scrapfly.crawler.har_utils
HAR (HTTP Archive) Format Utilities …
scrapfly.crawler.warc_utils
WARC Parsing Utilities …
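For orientation, here is a minimal end-to-end sketch of the typical workflow, assembled from the Crawl and CrawlerConfig examples documented below; the API key and target URL are placeholders.

from scrapfly import ScrapflyClient, CrawlerConfig, Crawl

# Placeholders: substitute your own API key and start URL
client = ScrapflyClient(key='your-key')
config = CrawlerConfig(url='https://example.com', page_limit=10)

# Start the crawl, block until it finishes, then read the results
crawl = Crawl(client, config).crawl()
crawl.wait(max_wait=300)

for page in crawl.warc().get_pages():
    print(f"{page['url']}: {page['status_code']}")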
Functions
def parse_warc(warc_data: bytes | BinaryIO) ‑> WarcParser
Convenience function to create a WARC parser
Args
warc_data- WARC data as bytes or file-like object
Returns
WarcParser- Parser instance
Example
from scrapfly import parse_warc

# Quick way to get all pages
pages = parse_warc(warc_bytes).get_pages()
for page in pages:
    print(f"{page['url']}: {page['status_code']}")

def webhook_from_payload(payload: Dict[str, Any],
signing_secrets: Tuple[str, ...] | None = None,
signature: str | None = None) ‑> CrawlerLifecycleWebhook | CrawlerUrlVisitedWebhook | CrawlerUrlSkippedWebhook | CrawlerUrlDiscoveredWebhook | CrawlerUrlFailedWebhook
Parse a raw crawler webhook envelope into a typed dataclass.
The envelope shape is {"event": <name>, "payload": {...}}. This function inspects event and returns the corresponding typed dataclass — one of CrawlerWebhook.
Args
payload- The full webhook body as a dict (i.e. what you get from request.json).
signing_secrets- Optional tuple of signing secrets for signature verification. Pass each secret as it appears in the webhook dashboard (UTF-8 string, not hex-encoded).
signature- Optional webhook signature header value (X-Scrapfly-Webhook-Signature).
Returns
A typed webhook instance matching the event.
Raises
KeyError- If the envelope is missing required fields.
ValueError- If event is not one of the known crawler events.
WebhookSignatureMissMatch- If signature verification fails.
Example
>>> from flask import Flask, request
>>> from scrapfly import webhook_from_payload, CrawlerLifecycleWebhook
>>> app = Flask(__name__)
>>> @app.route('/webhook', methods=['POST'])
... def handle_webhook():
...     wh = webhook_from_payload(
...         request.json,
...         signing_secrets=('YOUR-WEBHOOK-SIGNING-SECRET',),
...         signature=request.headers.get('X-Scrapfly-Webhook-Signature'),
...     )
...     if isinstance(wh, CrawlerLifecycleWebhook) and wh.event == 'crawler_finished':
...         print(f"Crawl {wh.crawler_uuid} finished — "
...               f"{wh.state.urls_visited} URLs visited")
...     return '', 200
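The example above only handles the happy path; the Raises section lists the failure modes. Below is a framework-agnostic sketch that adds error handling, assuming WebhookSignatureMissMatch is importable from scrapfly.errors (as the signature-verification path suggests); the handle_crawler_event name and the body/signature parameters are illustrative.

from typing import Optional

from scrapfly import webhook_from_payload, CrawlerLifecycleWebhook
from scrapfly.errors import WebhookSignatureMissMatch  # assumed import path

def handle_crawler_event(body: dict, signature: Optional[str]) -> int:
    try:
        wh = webhook_from_payload(
            body,
            signing_secrets=('YOUR-WEBHOOK-SIGNING-SECRET',),
            signature=signature,
        )
    except WebhookSignatureMissMatch:
        return 401  # signature did not verify against any configured secret
    except (KeyError, ValueError):
        return 400  # malformed envelope or unknown crawler event
    if isinstance(wh, CrawlerLifecycleWebhook):
        print(f"{wh.event}: crawl {wh.crawler_uuid}")
    return 200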
Classes
class Crawl (client: ScrapflyClient,
config: CrawlerConfig)-
High-level abstraction for managing a crawler job
The Crawl object maintains the state of a crawler job and provides convenient methods for managing its lifecycle.
Example
from scrapfly import ScrapflyClient, CrawlerConfig, Crawl

client = ScrapflyClient(key='your-key')
config = CrawlerConfig(url='https://example.com', page_limit=10)

# Create and start crawl
crawl = Crawl(client, config)
crawl.crawl()  # Start the crawler

# Wait for completion
crawl.wait()

# Get results
pages = crawl.warc().get_pages()
for page in pages:
    print(f"{page['url']}: {page['status_code']}")

# Or read specific URLs
html = crawl.read('https://example.com/page1', format='html')

Initialize a Crawl object
Args
client- ScrapflyClient instance
config- CrawlerConfig with crawler settings
Instance variables
prop started : bool-
Check if the crawler has been started
prop uuid : str | None-
Get the crawler job UUID (None if not started)
Methods
def cancel(self) ‑> bool-
Cancel the running crawler job
Returns
True if cancelled successfully
Raises
ScrapflyCrawlerError- If crawler not started yet
Example
# Start a crawl
crawl = Crawl(client, config).crawl()

# Cancel it
crawl.cancel()

def crawl(self) ‑> Crawl-
Start the crawler job
Returns
Self for method chaining
Raises
ScrapflyCrawlerError- If crawler already started
Example
crawl = Crawl(client, config)
crawl.crawl()  # Start crawling

def har(self) ‑> CrawlerArtifactResponse-
Download the crawler artifact in HAR (HTTP Archive) format
Returns
CrawlerArtifactResponse with parsed HAR data
Raises
ScrapflyCrawlerError- If crawler not started yet
Example
# Get HAR artifact
artifact = crawl.har()

# Get all pages
pages = artifact.get_pages()

# Iterate through HAR entries
for entry in artifact.iter_responses():
    print(f"{entry.url}: {entry.status_code}")
    print(f"Timing: {entry.time}ms")

def read(self,
url: str,
format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html') ‑> CrawlContent | None-
Read content from a specific URL in the crawl results
Args
url- The URL to retrieve content for
format- Content format - 'html', 'markdown', 'text', 'clean_html', 'json', 'extracted_data', 'page_metadata'
Returns
CrawlContent object with content and metadata, or None if URL not found
Example
# Get HTML content for a specific URL
content = crawl.read('https://example.com/page1')
if content:
    print(f"URL: {content.url}")
    print(f"Status: {content.status_code}")
    print(f"Duration: {content.duration}s")
    print(content.content)

# Get markdown content
content = crawl.read('https://example.com/page1', format='markdown')
if content:
    print(content.content)

# Check if URL was crawled
if crawl.read('https://example.com/missing') is None:
    print("URL not found in crawl results")

def read_batch(self,
urls: List[str],
formats: List[Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata']] = None) ‑> Dict[str, Dict[str, str]]-
Retrieve content for multiple URLs in a single batch request
This is more efficient than calling read() multiple times as it retrieves all content in a single API call. Maximum 100 URLs per request.
Args
urls- List of URLs to retrieve (max 100)
formats- List of content formats to retrieve (e.g., ['markdown', 'text']) If None, defaults to ['html']
Returns
Dictionary mapping URLs to their content in requested formats:

{
    'https://example.com/page1': {
        'markdown': '# Page 1…',
        'text': 'Page 1…'
    },
    'https://example.com/page2': {
        'markdown': '# Page 2…',
        'text': 'Page 2…'
    }
}
Example
# Get markdown and text for multiple URLs
urls = ['https://example.com/page1', 'https://example.com/page2']
contents = crawl.read_batch(urls, formats=['markdown', 'text'])

for url, formats in contents.items():
    markdown = formats.get('markdown', '')
    text = formats.get('text', '')
    print(f"{url}: {len(markdown)} chars markdown, {len(text)} chars text")

Raises
ValueError- If more than 100 URLs are provided
ScrapflyCrawlerError- If crawler not started or request fails
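Since read_batch() rejects more than 100 URLs per call, larger URL lists need to be split into chunks. A small sketch of that pattern; the chunk size of 100 comes from the documented limit, the urls list and format choice are illustrative.

# Fetch markdown for an arbitrary number of URLs, 100 at a time
all_contents = {}
for i in range(0, len(urls), 100):
    chunk = urls[i:i + 100]
    all_contents.update(crawl.read_batch(chunk, formats=['markdown']))

for url, formats in all_contents.items():
    print(f"{url}: {len(formats.get('markdown', ''))} chars")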
def read_iter(self,
pattern: str,
format: Literal['html', 'clean_html', 'markdown', 'json', 'text', 'extracted_data', 'page_metadata'] = 'html') ‑> Iterator[CrawlContent]-
Iterate through URLs matching a pattern and yield their content
Supports wildcard patterns using * and ? for flexible URL matching.
Args
pattern- URL pattern with wildcards (* matches any characters, ? matches one). Examples: "/products?page=*", "https://example.com/*/detail", "*/product/*"
format- Content format to retrieve
Yields
CrawlContent objects for each matching URL
Example
# Get all product pages in markdown
for content in crawl.read_iter(pattern="*/products?page=*", format="markdown"):
    print(f"{content.url}: {len(content.content)} chars")
    print(f"Duration: {content.duration}s")

# Get all detail pages
for content in crawl.read_iter(pattern="*/detail/*"):
    process(content.content)

# Pattern matching examples:
# "/products?page=*" matches /products?page=1, /products?page=2, etc.
# "*/product/*" matches any URL with /product/ in the path
# "https://example.com/page?" matches https://example.com/page1, page2, etc.

def stats(self) ‑> Dict[str, Any]-
Get comprehensive statistics about the crawl
Returns
Dictionary with crawl statistics
Example
stats = crawl.stats()
print(f"URLs extracted: {stats['urls_extracted']}")
print(f"URLs visited: {stats['urls_visited']}")
print(f"Crawl rate: {stats['crawl_rate']:.1f}%")
print(f"Total size: {stats['total_size_kb']:.2f} KB")

def status(self, refresh: bool = True) ‑> CrawlerStatusResponse-
Get current crawler status
Args
refresh- If True, fetch fresh status from API. If False, return cached status.
Returns
CrawlerStatusResponse with current status
Raises
ScrapflyCrawlerError- If crawler not started yet
Example
status = crawl.status()
print(f"Progress: {status.progress_pct}%")
print(f"URLs visited: {status.state.urls_visited}")

def urls(self,
status: Literal['visited', 'pending', 'failed'] | None = None,
page: int = 1,
per_page: int = 100) ‑> CrawlerUrlsResponse-
List the crawled URLs (paginated, optionally filtered by status).
NEW in 0.8.28 — convenience wrapper around ScrapflyClient.get_crawl_urls that pre-fills the crawler UUID.
Args
status- Filter by URL status — 'visited', 'pending', or 'failed'. When None, the server defaults to 'visited'.
page- 1-based page number (default 1)
per_page- Page size (default 100, max 1000)
Returns
CrawlerUrlsResponse with the URL records, total count and pagination metadata.
Raises
ScrapflyCrawlerError- if the crawler has not been started yet.
Example
crawl = Crawl(client, config).crawl().wait()
for entry in crawl.urls(status='visited'):
    print(f"{entry.url} ({entry.status})")

def wait(self,
poll_interval: int = 5,
max_wait: int | None = None,
verbose: bool = False,
allow_cancelled: bool = False) ‑> Crawl-
Wait for crawler to complete
Polls the status endpoint until the crawler finishes.
Args
poll_interval- Seconds between status checks (default: 5)
max_wait- Maximum seconds to wait (None = wait forever)
verbose- If True, print progress updates
allow_cancelled- If True, return normally when the crawler reaches CANCELLED instead of raising. Useful for the cancel-then-wait pattern where the caller already knows they triggered the cancellation. Defaults to False (raises ScrapflyCrawlerError with code='CANCELLED' on user_cancelled), preserving prior behavior for callers that observe external cancellations.
Returns
Self for method chaining
Raises
ScrapflyCrawlerError- If crawler not started, failed, or timed out. Also raised on cancellation when allow_cancelled=False.
Example
# Wait with progress updates
crawl.crawl().wait(verbose=True)

# Wait with timeout
crawl.crawl().wait(max_wait=300)  # 5 minutes max

# Cancel from the same call site, then wait without re-raising
crawl.cancel()
crawl.wait(allow_cancelled=True)

def warc(self, artifact_type: str = 'warc') ‑> CrawlerArtifactResponse-
Download the crawler artifact (WARC file)
Args
artifact_type- Type of artifact to download (default: 'warc')
Returns
CrawlerArtifactResponse with parsed WARC data
Raises
ScrapflyCrawlerError- If crawler not started yet
Example
# Get WARC artifact
artifact = crawl.warc()

# Get all pages
pages = artifact.get_pages()

# Iterate through responses
for record in artifact.iter_responses():
    print(record.url)
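If you want to keep the raw artifact rather than only parsing it in memory, the CrawlerArtifactResponse documented further below exposes a save() helper and an artifact_data property. A short sketch combining them with warc() and har(); the file names are placeholders.

# Download the WARC artifact once and persist it for later analysis
artifact = crawl.warc()
artifact.save('crawl_results.warc.gz')
print(f"WARC size: {len(artifact.artifact_data)} bytes")

# The HAR variant of the same crawl
har_artifact = crawl.har()
har_artifact.save('crawl_results.har')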
class CrawlContent (url: str,
content: str,
status_code: int,
headers: Dict[str, str] | None = None,
duration: float | None = None,
log_id: str | None = None,
country: str | None = None,
crawl_uuid: str | None = None)-
Response object for a single crawled URL
Provides access to content and metadata for a crawled page. Similar to ScrapeApiResponse but for crawler results.
Attributes
url- The crawled URL (mandatory)
content- Page content in requested format (mandatory)
status_code- HTTP response status code (mandatory)
headers- HTTP response headers (optional)
duration- Request duration in seconds (optional)
log_id- Scrape log ID for debugging (optional)
log_url- URL to view scrape logs (optional)
country- Country the request was made from (optional)
Example
# Get content for a URL
content = crawl.read('https://example.com', format='markdown')
print(f"URL: {content.url}")
print(f"Status: {content.status_code}")
print(f"Duration: {content.duration}s")
print(f"Content: {content.content}")

# Access metadata
if content.log_url:
    print(f"View logs: {content.log_url}")

Initialize CrawlContent
Args
url- The crawled URL
content- Page content in requested format
status_code- HTTP response status code
headers- HTTP response headers
duration- Request duration in seconds
log_id- Scrape log ID
country- Country the request was made from
crawl_uuid- Crawl job UUID
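The success, error and log_url properties listed below make it easy to separate good pages from failed ones. A small sketch built on Crawl.read_iter() from above; the URL pattern and the process() call are illustrative.

failed = []
for content in crawl.read_iter(pattern='https://example.com/*'):
    if content.error:  # 4xx/5xx status code
        failed.append(content)
    elif content.success:  # 2xx status code
        process(content.content)

for content in failed:
    # log_url is only populated when the crawler recorded a log id
    print(f"{content.url} failed with {content.status_code}: {content.log_url}")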
Instance variables
prop error : bool-
Check if the request resulted in an error (4xx/5xx status code)
prop log_url : str | None-
Get URL to view scrape logs
Returns
Log URL if log_id is available, None otherwise
prop success : bool-
Expand source code
@property def success(self) -> bool: """Check if the request was successful (2xx status code)""" return 200 <= self.status_code < 300Check if the request was successful (2xx status code)
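As a quick illustration of these flags, here is a minimal sketch that branches on success/error instead of raw status codes; crawl.read() is assumed to return a CrawlContent, as in the class example above.
# Minimal sketch: branch on the convenience flags rather than raw status codes.
# `crawl` is assumed to be an existing Crawl object, as in the class example above.
content = crawl.read('https://example.com', format='markdown')
if content.success:  # 2xx
    print(f"{content.url}: {len(content)} characters of markdown")
elif content.error:  # 4xx/5xx
    print(f"{content.url} failed with HTTP {content.status_code}")
    if content.log_url:
        print(f"Inspect the scrape log: {content.log_url}")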
class CrawlerArtifactResponse (artifact_data: bytes, artifact_type: str = 'warc')-
Expand source code
class CrawlerArtifactResponse: """ Response from downloading crawler artifacts Returned by ScrapflyClient.get_crawl_artifact() method. Provides high-level access to crawl results with automatic WARC/HAR parsing. Users don't need to understand WARC or HAR format to use this class. Example: ```python # Get WARC artifact (default) artifact = client.get_crawl_artifact(uuid) # Get HAR artifact artifact = client.get_crawl_artifact(uuid, artifact_type='har') # Easy mode: get all pages as dicts pages = artifact.get_pages() for page in pages: print(f"{page['url']}: {page['status_code']}") html = page['content'].decode('utf-8') # Memory-efficient: iterate one page at a time for record in artifact.iter_responses(): print(f"{record.url}: {record.status_code}") process(record.content) # Save to file artifact.save('crawl_results.warc.gz') ``` """ def __init__(self, artifact_data: bytes, artifact_type: str = 'warc'): """ Initialize from artifact data Args: artifact_data: Raw artifact file bytes artifact_type: Type of artifact ('warc' or 'har') """ self._artifact_data = artifact_data self._artifact_type = artifact_type self._warc_parser: Optional[WarcParser] = None self._har_parser: Optional[HarArchive] = None @property def artifact_type(self) -> str: """Get artifact type ('warc' or 'har')""" return self._artifact_type @property def artifact_data(self) -> bytes: """Get raw artifact data (for advanced users)""" return self._artifact_data @property def warc_data(self) -> bytes: """Get raw WARC data (deprecated, use artifact_data)""" return self._artifact_data @property def parser(self) -> Union[WarcParser, HarArchive]: """Get artifact parser instance (lazy-loaded)""" if self._artifact_type == 'har': if self._har_parser is None: self._har_parser = HarArchive(self._artifact_data) return self._har_parser else: if self._warc_parser is None: self._warc_parser = parse_warc(self._artifact_data) return self._warc_parser def iter_records(self) -> Iterator[Union[WarcRecord, HarEntry]]: """ Iterate through all records For WARC: iterates through all WARC records For HAR: iterates through all HAR entries Yields: WarcRecord or HarEntry: Each record in the artifact """ if self._artifact_type == 'har': return self.parser.iter_entries() else: return self.parser.iter_records() def iter_responses(self) -> Iterator[Union[WarcRecord, HarEntry]]: """ Iterate through HTTP response records only This is more memory-efficient than get_pages() for large crawls. For WARC: iterates through response records For HAR: iterates through all entries (HAR only contains responses) Yields: WarcRecord or HarEntry: HTTP response records with url, status_code, headers, content """ if self._artifact_type == 'har': return self.parser.iter_entries() else: return self.parser.iter_responses() def get_pages(self) -> List[Dict]: """ Get all crawled pages as simple dictionaries This is the easiest way to access crawl results. Works with both WARC and HAR formats. 
Returns: List of dicts with keys: url, status_code, headers, content Example: ```python pages = artifact.get_pages() for page in pages: print(f"{page['url']}: {len(page['content'])} bytes") html = page['content'].decode('utf-8') ``` """ if self._artifact_type == 'har': # Convert HAR entries to page dicts pages = [] for entry in self.parser.iter_entries(): pages.append({ 'url': entry.url, 'status_code': entry.status_code, 'headers': entry.response_headers, 'content': entry.content }) return pages else: return self.parser.get_pages() @property def total_pages(self) -> int: """Get total number of pages in the artifact""" return len(self.get_pages()) def save(self, filepath: str): """ Save WARC data to file Args: filepath: Path to save the WARC file Example: ```python artifact.save('crawl_results.warc.gz') ``` """ with open(filepath, 'wb') as f: f.write(self.warc_data) def __repr__(self): return f"CrawlerArtifactResponse(size={len(self.warc_data)} bytes)"Response from downloading crawler artifacts
Returned by ScrapflyClient.get_crawl_artifact() method.
Provides high-level access to crawl results with automatic WARC/HAR parsing. Users don't need to understand WARC or HAR format to use this class.
Example
# Get WARC artifact (default)
artifact = client.get_crawl_artifact(uuid)

# Get HAR artifact
artifact = client.get_crawl_artifact(uuid, artifact_type='har')

# Easy mode: get all pages as dicts
pages = artifact.get_pages()
for page in pages:
    print(f"{page['url']}: {page['status_code']}")
    html = page['content'].decode('utf-8')

# Memory-efficient: iterate one page at a time
for record in artifact.iter_responses():
    print(f"{record.url}: {record.status_code}")
    process(record.content)

# Save to file
artifact.save('crawl_results.warc.gz')
Initialize from artifact data
Args
artifact_data- Raw artifact file bytes
artifact_type- Type of artifact ('warc' or 'har')
Instance variables
prop artifact_data : bytes-
Expand source code
@property def artifact_data(self) -> bytes: """Get raw artifact data (for advanced users)""" return self._artifact_dataGet raw artifact data (for advanced users)
prop artifact_type : str-
Expand source code
@property def artifact_type(self) -> str: """Get artifact type ('warc' or 'har')""" return self._artifact_typeGet artifact type ('warc' or 'har')
prop parser : WarcParser | HarArchive-
Expand source code
@property def parser(self) -> Union[WarcParser, HarArchive]: """Get artifact parser instance (lazy-loaded)""" if self._artifact_type == 'har': if self._har_parser is None: self._har_parser = HarArchive(self._artifact_data) return self._har_parser else: if self._warc_parser is None: self._warc_parser = parse_warc(self._artifact_data) return self._warc_parserGet artifact parser instance (lazy-loaded)
prop total_pages : int-
Expand source code
@property def total_pages(self) -> int: """Get total number of pages in the artifact""" return len(self.get_pages())Get total number of pages in the artifact
prop warc_data : bytes-
Expand source code
@property def warc_data(self) -> bytes: """Get raw WARC data (deprecated, use artifact_data)""" return self._artifact_dataGet raw WARC data (deprecated, use artifact_data)
Methods
def get_pages(self) ‑> List[Dict]-
Expand source code
def get_pages(self) -> List[Dict]: """ Get all crawled pages as simple dictionaries This is the easiest way to access crawl results. Works with both WARC and HAR formats. Returns: List of dicts with keys: url, status_code, headers, content Example: ```python pages = artifact.get_pages() for page in pages: print(f"{page['url']}: {len(page['content'])} bytes") html = page['content'].decode('utf-8') ``` """ if self._artifact_type == 'har': # Convert HAR entries to page dicts pages = [] for entry in self.parser.iter_entries(): pages.append({ 'url': entry.url, 'status_code': entry.status_code, 'headers': entry.response_headers, 'content': entry.content }) return pages else: return self.parser.get_pages()Get all crawled pages as simple dictionaries
This is the easiest way to access crawl results. Works with both WARC and HAR formats.
Returns
List of dicts with keys: url, status_code, headers, content
Example
pages = artifact.get_pages()
for page in pages:
    print(f"{page['url']}: {len(page['content'])} bytes")
    html = page['content'].decode('utf-8')
def iter_records(self) ‑> Iterator[WarcRecord | HarEntry]-
Expand source code
def iter_records(self) -> Iterator[Union[WarcRecord, HarEntry]]: """ Iterate through all records For WARC: iterates through all WARC records For HAR: iterates through all HAR entries Yields: WarcRecord or HarEntry: Each record in the artifact """ if self._artifact_type == 'har': return self.parser.iter_entries() else: return self.parser.iter_records()Iterate through all records
For WARC: iterates through all WARC records For HAR: iterates through all HAR entries
Yields
WarcRecord or HarEntry- Each record in the artifact
def iter_responses(self) ‑> Iterator[WarcRecord | HarEntry]-
Expand source code
def iter_responses(self) -> Iterator[Union[WarcRecord, HarEntry]]: """ Iterate through HTTP response records only This is more memory-efficient than get_pages() for large crawls. For WARC: iterates through response records For HAR: iterates through all entries (HAR only contains responses) Yields: WarcRecord or HarEntry: HTTP response records with url, status_code, headers, content """ if self._artifact_type == 'har': return self.parser.iter_entries() else: return self.parser.iter_responses()Iterate through HTTP response records only
This is more memory-efficient than get_pages() for large crawls.
For WARC: iterates through response records For HAR: iterates through all entries (HAR only contains responses)
Yields
WarcRecord or HarEntry- HTTP response records with url, status_code, headers, content
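For large crawls, this is the shape of a streaming loop; a sketch assuming artifact is a CrawlerArtifactResponse and process() is a hypothetical handler of your own.
# Sketch: stream responses one at a time instead of materialising every page.
# `artifact` is assumed to come from client.get_crawl_artifact(uuid);
# `process()` is a hypothetical handler you would supply.
def process(url: str, body: bytes) -> None:
    print(f"{url}: {len(body)} bytes")

for record in artifact.iter_responses():
    # Both WarcRecord and HarEntry expose url, status_code, headers and content.
    if 200 <= record.status_code < 300:
        process(record.url, record.content)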
def save(self, filepath: str)-
Expand source code
def save(self, filepath: str): """ Save WARC data to file Args: filepath: Path to save the WARC file Example: ```python artifact.save('crawl_results.warc.gz') ``` """ with open(filepath, 'wb') as f: f.write(self.warc_data)Save WARC data to file
Args
filepath- Path to save the WARC file
Example
artifact.save('crawl_results.warc.gz')
class CrawlerConfig (url: str,
page_limit: int | None = None,
max_depth: int | None = None,
max_duration: int | None = None,
exclude_paths: List[str] | None = None,
include_only_paths: List[str] | None = None,
ignore_base_path_restriction: bool = False,
follow_external_links: bool = False,
allowed_external_domains: List[str] | None = None,
follow_internal_subdomains: bool | None = None,
allowed_internal_subdomains: List[str] | None = None,
headers: Dict[str, str] | None = None,
delay: int | None = None,
user_agent: str | None = None,
max_concurrency: int | None = None,
rendering_delay: int | None = None,
use_sitemaps: bool = False,
respect_robots_txt: bool | None = None,
ignore_no_follow: bool = False,
cache: bool = False,
cache_ttl: int | None = None,
cache_clear: bool = False,
content_formats: List[Literal['html', 'markdown', 'text', 'clean_html']] | None = None,
extraction_rules: Dict | None = None,
asp: bool = False,
proxy_pool: str | None = None,
country: str | None = None,
webhook_name: str | None = None,
webhook_events: List[str] | None = None,
max_api_credit: int | None = None)-
Expand source code
class CrawlerConfig(BaseApiConfig): """ Configuration for Scrapfly Crawler API The Crawler API performs recursive website crawling with advanced configuration, content extraction, and artifact storage. Example: ```python from scrapfly import ScrapflyClient, CrawlerConfig client = ScrapflyClient(key='YOUR_API_KEY') config = CrawlerConfig( url='https://example.com', page_limit=100, max_depth=3, content_formats=['markdown', 'html'] ) # Start crawl start_response = client.start_crawl(config) uuid = start_response.uuid # Poll status status = client.get_crawl_status(uuid) # Get results when complete if status.is_complete: artifact = client.get_crawl_artifact(uuid) pages = artifact.get_pages() ``` """ WEBHOOK_CRAWLER_STARTED = 'crawler_started' WEBHOOK_CRAWLER_URL_VISITED = 'crawler_url_visited' WEBHOOK_CRAWLER_URL_SKIPPED = 'crawler_url_skipped' WEBHOOK_CRAWLER_URL_DISCOVERED = 'crawler_url_discovered' WEBHOOK_CRAWLER_URL_FAILED = 'crawler_url_failed' WEBHOOK_CRAWLER_STOPPED = 'crawler_stopped' WEBHOOK_CRAWLER_CANCELLED = 'crawler_cancelled' WEBHOOK_CRAWLER_FINISHED = 'crawler_finished' ALL_WEBHOOK_EVENTS = [ WEBHOOK_CRAWLER_STARTED, WEBHOOK_CRAWLER_URL_VISITED, WEBHOOK_CRAWLER_URL_SKIPPED, WEBHOOK_CRAWLER_URL_DISCOVERED, WEBHOOK_CRAWLER_URL_FAILED, WEBHOOK_CRAWLER_STOPPED, WEBHOOK_CRAWLER_CANCELLED, WEBHOOK_CRAWLER_FINISHED, ] def __init__( self, url: str, # Crawl limits page_limit: Optional[int] = None, max_depth: Optional[int] = None, max_duration: Optional[int] = None, # Path filtering (mutually exclusive) exclude_paths: Optional[List[str]] = None, include_only_paths: Optional[List[str]] = None, # Advanced crawl options ignore_base_path_restriction: bool = False, follow_external_links: bool = False, allowed_external_domains: Optional[List[str]] = None, # Subdomain control (NEW — added in 0.8.28 to match the documented public API). # Server-side default for follow_internal_subdomains is True; we leave the # field unset by default so the server applies its own default. follow_internal_subdomains: Optional[bool] = None, allowed_internal_subdomains: Optional[List[str]] = None, # Request configuration headers: Optional[Dict[str, str]] = None, delay: Optional[int] = None, user_agent: Optional[str] = None, max_concurrency: Optional[int] = None, rendering_delay: Optional[int] = None, # Crawl strategy options use_sitemaps: bool = False, # respect_robots_txt: server default is True. Leave unset (None) so the # server applies its own default rather than forcing False on every request. 
respect_robots_txt: Optional[bool] = None, ignore_no_follow: bool = False, # Cache options cache: bool = False, cache_ttl: Optional[int] = None, cache_clear: bool = False, # Content extraction content_formats: Optional[List[Literal['html', 'markdown', 'text', 'clean_html']]] = None, extraction_rules: Optional[Dict] = None, # Web scraping features asp: bool = False, proxy_pool: Optional[str] = None, country: Optional[str] = None, # Webhook integration webhook_name: Optional[str] = None, webhook_events: Optional[List[str]] = None, # Cost control max_api_credit: Optional[int] = None ): """ Initialize a CrawlerConfig Args: url: Starting URL for the crawl (required) page_limit: Maximum number of pages to crawl max_depth: Maximum crawl depth from starting URL max_duration: Maximum crawl duration in seconds exclude_paths: List of path patterns to exclude (mutually exclusive with include_only_paths) include_only_paths: List of path patterns to include only (mutually exclusive with exclude_paths) ignore_base_path_restriction: Allow crawling outside the base path follow_external_links: Follow links to external domains allowed_external_domains: List of external domains allowed when follow_external_links is True headers: Custom HTTP headers for requests delay: Delay between requests in milliseconds user_agent: Custom user agent string max_concurrency: Maximum concurrent requests rendering_delay: Delay for JavaScript rendering in milliseconds use_sitemaps: Use sitemap.xml to discover URLs respect_robots_txt: Respect robots.txt rules ignore_no_follow: Ignore rel="nofollow" attributes cache: Enable caching cache_ttl: Cache time-to-live in seconds cache_clear: Clear cache before crawling content_formats: List of content formats to extract ('html', 'markdown', 'text', 'clean_html') extraction_rules: Custom extraction rules asp: Enable Anti-Scraping Protection bypass proxy_pool: Proxy pool to use (e.g., 'public_residential_pool') country: Target country for geo-located content webhook_name: Webhook name for event notifications webhook_events: List of webhook events to trigger max_api_credit: Maximum API credits to spend on this crawl """ if exclude_paths and include_only_paths: raise ValueError("exclude_paths and include_only_paths are mutually exclusive") params = { 'url': url, } # Add optional parameters if page_limit is not None: params['page_limit'] = page_limit if max_depth is not None: params['max_depth'] = max_depth if max_duration is not None: params['max_duration'] = max_duration # Path filtering if exclude_paths: params['exclude_paths'] = exclude_paths if include_only_paths: params['include_only_paths'] = include_only_paths # Advanced options if ignore_base_path_restriction: params['ignore_base_path_restriction'] = True if follow_external_links: params['follow_external_links'] = True if allowed_external_domains: params['allowed_external_domains'] = allowed_external_domains # Subdomain control (NEW). Both fields are tri-state: None means # "unset" (server default applies); explicit True/False / list overrides. 
if follow_internal_subdomains is not None: params['follow_internal_subdomains'] = follow_internal_subdomains if allowed_internal_subdomains: params['allowed_internal_subdomains'] = allowed_internal_subdomains # Request configuration if headers: params['headers'] = headers if delay is not None: params['delay'] = delay if user_agent: params['user_agent'] = user_agent if max_concurrency is not None: params['max_concurrency'] = max_concurrency if rendering_delay is not None: params['rendering_delay'] = rendering_delay # Crawl strategy if use_sitemaps: params['use_sitemaps'] = True # Tri-state: None = let server default win (default True). Explicit # True/False overrides. if respect_robots_txt is not None: params['respect_robots_txt'] = respect_robots_txt if ignore_no_follow: params['ignore_no_follow'] = True # Cache if cache: params['cache'] = True if cache_ttl is not None: params['cache_ttl'] = cache_ttl if cache_clear: params['cache_clear'] = True # Content extraction if content_formats: params['content_formats'] = content_formats if extraction_rules: params['extraction_rules'] = extraction_rules # Web scraping features if asp: params['asp'] = True if proxy_pool: params['proxy_pool'] = proxy_pool if country: params['country'] = country # Webhooks if webhook_name: params['webhook_name'] = webhook_name if webhook_events: assert all( event in self.ALL_WEBHOOK_EVENTS for event in webhook_events ), f"Invalid webhook events. Valid events are: {self.ALL_WEBHOOK_EVENTS}" params['webhook_events'] = webhook_events # Cost control if max_api_credit is not None: params['max_api_credit'] = max_api_credit self._params = params def to_api_params(self, key: Optional[str] = None) -> Dict: """ Convert config to API parameters :param key: API key (optional, can be added by client) :return: Dictionary of API parameters """ params = self._params.copy() if key: params['key'] = key return paramsConfiguration for Scrapfly Crawler API
The Crawler API performs recursive website crawling with advanced configuration, content extraction, and artifact storage.
Example
from scrapfly import ScrapflyClient, CrawlerConfig

client = ScrapflyClient(key='YOUR_API_KEY')

config = CrawlerConfig(
    url='https://example.com',
    page_limit=100,
    max_depth=3,
    content_formats=['markdown', 'html']
)

# Start crawl
start_response = client.start_crawl(config)
uuid = start_response.uuid

# Poll status
status = client.get_crawl_status(uuid)

# Get results when complete
if status.is_complete:
    artifact = client.get_crawl_artifact(uuid)
    pages = artifact.get_pages()
Initialize a CrawlerConfig
Args
url- Starting URL for the crawl (required)
page_limit- Maximum number of pages to crawl
max_depth- Maximum crawl depth from starting URL
max_duration- Maximum crawl duration in seconds
exclude_paths- List of path patterns to exclude (mutually exclusive with include_only_paths)
include_only_paths- List of path patterns to include only (mutually exclusive with exclude_paths)
ignore_base_path_restriction- Allow crawling outside the base path
follow_external_links- Follow links to external domains
allowed_external_domains- List of external domains allowed when follow_external_links is True
headers- Custom HTTP headers for requests
delay- Delay between requests in milliseconds
user_agent- Custom user agent string
max_concurrency- Maximum concurrent requests
rendering_delay- Delay for JavaScript rendering in milliseconds
use_sitemaps- Use sitemap.xml to discover URLs
respect_robots_txt- Respect robots.txt rules
ignore_no_follow- Ignore rel="nofollow" attributes
cache- Enable caching
cache_ttl- Cache time-to-live in seconds
cache_clear- Clear cache before crawling
content_formats- List of content formats to extract ('html', 'markdown', 'text', 'clean_html')
extraction_rules- Custom extraction rules
asp- Enable Anti-Scraping Protection bypass
proxy_pool- Proxy pool to use (e.g., 'public_residential_pool')
country- Target country for geo-located content
webhook_name- Webhook name for event notifications
webhook_events- List of webhook events to trigger
max_api_credit- Maximum API credits to spend on this crawl
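As a further sketch, a configuration combining path filtering, sitemap discovery, webhook notifications and a credit cap; the '/docs/*' pattern and the webhook name are illustrative placeholders, not documented values.
from scrapfly import CrawlerConfig

# Sketch: crawl only the documentation section, discover URLs from sitemaps,
# and get notified on start/finish. 'my-crawler-hook' and the '/docs/*' pattern
# are placeholders for your own webhook name and path rule.
config = CrawlerConfig(
    url='https://example.com/docs',
    page_limit=500,
    include_only_paths=['/docs/*'],
    use_sitemaps=True,
    content_formats=['markdown'],
    webhook_name='my-crawler-hook',
    webhook_events=[
        CrawlerConfig.WEBHOOK_CRAWLER_STARTED,
        CrawlerConfig.WEBHOOK_CRAWLER_FINISHED,
    ],
    max_api_credit=10_000,
)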
Ancestors
Class variables
var ALL_WEBHOOK_EVENTS
var WEBHOOK_CRAWLER_CANCELLED
var WEBHOOK_CRAWLER_FINISHED
var WEBHOOK_CRAWLER_STARTED
var WEBHOOK_CRAWLER_STOPPED
var WEBHOOK_CRAWLER_URL_DISCOVERED
var WEBHOOK_CRAWLER_URL_FAILED
var WEBHOOK_CRAWLER_URL_SKIPPED
var WEBHOOK_CRAWLER_URL_VISITED
Methods
def to_api_params(self, key: str | None = None) ‑> Dict-
Expand source code
def to_api_params(self, key: Optional[str] = None) -> Dict: """ Convert config to API parameters :param key: API key (optional, can be added by client) :return: Dictionary of API parameters """ params = self._params.copy() if key: params['key'] = key return paramsConvert config to API parameters
:param key: API key (optional, can be added by client)
:return: Dictionary of API parameters
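A small sketch of what the serialised parameters look like; the API key is a placeholder and only explicitly-set options appear in the output.
from scrapfly import CrawlerConfig

config = CrawlerConfig(url='https://example.com', page_limit=50, asp=True)
# Unset tri-state options (e.g. respect_robots_txt) are omitted so the
# server-side defaults apply.
params = config.to_api_params(key='YOUR_API_KEY')  # placeholder key
# -> {'url': 'https://example.com', 'page_limit': 50, 'asp': True, 'key': 'YOUR_API_KEY'}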
class CrawlerLifecycleWebhook (event: str,
crawler_uuid: str,
project: str,
env: str,
action: str,
state: CrawlerState,
seed_url: str,
status_link: str)-
Expand source code
@dataclass class CrawlerLifecycleWebhook(CrawlerWebhookBase): """ Payload for the 4 lifecycle events: ``crawler_started``, ``crawler_stopped``, ``crawler_cancelled``, ``crawler_finished``. These events all carry the same fields: the seed URL, the common base (crawler_uuid / project / env / action / state), and a ``links.status`` URL pointing at the crawl status endpoint. Disambiguate by inspecting ``self.event`` (use :class:`CrawlerWebhookEvent`). Attributes: seed_url: The root URL the crawl was started from. status_link: URL to fetch the live crawler status. """ seed_url: str status_link: str @classmethod def from_payload(cls, event: str, payload: Dict[str, Any]) -> 'CrawlerLifecycleWebhook': base = cls._parse_base(event, payload) return cls( **base, seed_url=payload['seed_url'], status_link=payload['links']['status'], )Payload for the 4 lifecycle events:
crawler_started, crawler_stopped, crawler_cancelled, crawler_finished. These events all carry the same fields: the seed URL, the common base (crawler_uuid / project / env / action / state), and a links.status URL pointing at the crawl status endpoint. Disambiguate by inspecting self.event (use CrawlerWebhookEvent).
Attributes
seed_url- The root URL the crawl was started from.
status_link- URL to fetch the live crawler status.
Ancestors
Static methods
def from_payload(event: str, payload: Dict[str, Any]) ‑> CrawlerLifecycleWebhook
Instance variables
var seed_url : str
var status_link : str
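Since all four lifecycle events share one shape, handlers usually branch on the event name; a sketch assuming the envelope was already parsed with webhook_from_payload.
from scrapfly import CrawlerLifecycleWebhook

def handle_lifecycle(wh: CrawlerLifecycleWebhook) -> None:
    # All four lifecycle events carry the same fields; disambiguate on wh.event.
    if wh.event == 'crawler_finished':
        print(f"Crawl {wh.crawler_uuid} finished: "
              f"{wh.state.urls_visited} visited, {wh.state.urls_failed} failed")
    elif wh.event in ('crawler_stopped', 'crawler_cancelled'):
        print(f"Crawl {wh.crawler_uuid} ended early ({wh.event}); status: {wh.status_link}")
    else:  # crawler_started
        print(f"Crawl of {wh.seed_url} started")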
class CrawlerScrapeResult (status_code: int,
country: str,
log_uuid: str,
log_url: str,
content: Dict[str, Any])-
Expand source code
@dataclass class CrawlerScrapeResult: """ The ``scrape`` sub-object of a ``crawler_url_visited`` payload. Attributes: status_code: HTTP status code returned by the target URL. country: 2-letter country code of the proxy that performed the scrape. log_uuid: ULID of the scrape log (used to fetch the full log later). log_url: Human-browseable dashboard URL for the log. content: Map of requested content format (``html``, ``text``, ``markdown``, ``clean_html``, ``json``, etc.) to the actual rendered string. The keys depend on what the caller requested in ``content_formats``. """ status_code: int country: str log_uuid: str log_url: str content: Dict[str, Any] @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'CrawlerScrapeResult': return cls( status_code=data['status_code'], country=data['country'], log_uuid=data['log_uuid'], log_url=data['log_url'], content=data['content'], )The
scrape sub-object of a crawler_url_visited payload.
Attributes
status_code- HTTP status code returned by the target URL.
country- 2-letter country code of the proxy that performed the scrape.
log_uuid- ULID of the scrape log (used to fetch the full log later).
log_url- Human-browseable dashboard URL for the log.
content- Map of requested content format (html, text, markdown, clean_html, json, etc.) to the actual rendered string. The keys depend on what the caller requested in content_formats.
Static methods
def from_dict(data: Dict[str, Any]) ‑> CrawlerScrapeResult
Instance variables
var content : Dict[str, Any]
var country : str
var log_url : str
var log_uuid : str
var status_code : int
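A sketch of reading the rendered content back out; the result is assumed to come from a crawler_url_visited webhook's scrape sub-object, and which keys exist depends on the content_formats that were requested.
# Sketch: `result` is assumed to be a CrawlerScrapeResult taken from the
# `scrape` sub-object of a crawler_url_visited payload.
def summarise(result) -> None:
    print(f"HTTP {result.status_code} via {result.country}, log: {result.log_url}")
    # Keys mirror the requested content_formats; 'markdown' is only present
    # if it was asked for.
    markdown = result.content.get('markdown')
    if markdown is not None:
        print(markdown[:200])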
class CrawlerStartResponse (response_data: Dict[str, Any])-
Expand source code
class CrawlerStartResponse: """ Response from starting a crawler job Returned by ScrapflyClient.start_crawl() method. Strict parsing: ``uuid`` and ``status`` are part of the documented contract and are required. A missing field raises ``KeyError`` so the caller knows immediately that the API contract changed. Attributes: uuid: Unique identifier for the crawler job status: Initial status (typically 'PENDING') """ def __init__(self, response_data: Dict[str, Any]): """ Initialize from API response Args: response_data: Raw API response dictionary """ self._data = response_data # API canonical name is `crawler_uuid`; we accept `uuid` only as a # legacy fallback, in case an older server emits the short form. if 'crawler_uuid' in response_data: self.uuid = response_data['crawler_uuid'] elif 'uuid' in response_data: self.uuid = response_data['uuid'] else: raise KeyError( "CrawlerStartResponse: required field 'crawler_uuid' (or legacy 'uuid') is missing" ) self.status = response_data['status'] assert isinstance(self.uuid, str) and self.uuid, ( f"CrawlerStartResponse: uuid must be a non-empty string, got {self.uuid!r}" ) assert isinstance(self.status, str) and self.status, ( f"CrawlerStartResponse: status must be a non-empty string, got {self.status!r}" ) def __repr__(self): return f"CrawlerStartResponse(uuid={self.uuid}, status={self.status})"Response from starting a crawler job
Returned by ScrapflyClient.start_crawl() method.
Strict parsing:
uuid and status are part of the documented contract and are required. A missing field raises KeyError so the caller knows immediately that the API contract changed.
Attributes
uuid- Unique identifier for the crawler job
status- Initial status (typically 'PENDING')
Initialize from API response
Args
response_data- Raw API response dictionary
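A short sketch of starting a crawl and keeping the returned UUID for later polling; the API key is a placeholder.
from scrapfly import ScrapflyClient, CrawlerConfig

client = ScrapflyClient(key='YOUR_API_KEY')  # placeholder key
config = CrawlerConfig(url='https://example.com', page_limit=100)

start = client.start_crawl(config)
print(f"Crawl {start.uuid} queued with status {start.status}")  # typically 'PENDING'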
class CrawlerState (state: Dict[str, Any])-
Expand source code
class CrawlerState: """ Nested ``state`` block of a crawler status response. Field names match the wire format emitted by the scrape-engine (``apps/scrapfly/scrape-engine/scrape_engine/crawler/config.py``), which is the single source of truth. Go and TypeScript SDKs expose the same names on their ``status.state`` object. Attributes: urls_visited: Number of URLs successfully crawled. urls_extracted: Total URLs discovered (seed + links + sitemaps). urls_to_crawl: Derived as ``urls_extracted - urls_skipped`` server-side. urls_failed: URLs that failed to crawl. urls_skipped: URLs skipped (filtered by exclude rules, robots.txt, etc.). api_credit_used: Total API credits consumed by this crawl. duration: Elapsed time in seconds. start_time: Unix epoch seconds when the first worker picked up the job, or ``None`` while the job is still in ``PENDING``. stop_time: Unix epoch seconds when the crawler reached a terminal state, or ``None`` while still running. stop_reason: Reason for stop (``page_limit``, ``max_duration``, etc.), or ``None`` while still running. """ __slots__ = ( 'urls_visited', 'urls_extracted', 'urls_to_crawl', 'urls_failed', 'urls_skipped', 'api_credit_used', 'duration', 'start_time', 'stop_time', 'stop_reason', ) def __init__(self, state: Dict[str, Any]): assert isinstance(state, dict), ( f"CrawlerState: expected dict, got {type(state).__name__}" ) self.urls_visited: int = state['urls_visited'] self.urls_extracted: int = state['urls_extracted'] self.urls_to_crawl: int = state['urls_to_crawl'] self.urls_failed: int = state['urls_failed'] self.urls_skipped: int = state['urls_skipped'] self.api_credit_used = state['api_credit_used'] self.duration = state['duration'] # Nullable during PENDING — before a worker has picked up the job. self.start_time: Optional[int] = state.get('start_time') self.stop_time: Optional[int] = state.get('stop_time') self.stop_reason: Optional[str] = state.get('stop_reason') def __repr__(self): return ( f"CrawlerState(visited={self.urls_visited}, extracted={self.urls_extracted}, " f"to_crawl={self.urls_to_crawl}, failed={self.urls_failed}, " f"skipped={self.urls_skipped})" )Nested
state block of a crawler status response.
Field names match the wire format emitted by the scrape-engine (apps/scrapfly/scrape-engine/scrape_engine/crawler/config.py), which is the single source of truth. The Go and TypeScript SDKs expose the same names on their status.state object.
Attributes
urls_visited- Number of URLs successfully crawled.
urls_extracted- Total URLs discovered (seed + links + sitemaps).
urls_to_crawl- Derived as urls_extracted - urls_skipped server-side.
urls_failed- URLs that failed to crawl.
urls_skipped- URLs skipped (filtered by exclude rules, robots.txt, etc.).
api_credit_used- Total API credits consumed by this crawl.
duration- Elapsed time in seconds.
start_time- Unix epoch seconds when the first worker picked up the job, or None while the job is still in PENDING.
stop_time- Unix epoch seconds when the crawler reached a terminal state, or None while still running.
stop_reason- Reason for stop (page_limit, max_duration, etc.), or None while still running.
Instance variables
var api_credit_used- Total API credits consumed by this crawl.
var duration- Elapsed time in seconds.
var start_time- Unix epoch seconds when the first worker picked up the job, or None while the job is still in PENDING.
var stop_reason- Reason for stop (page_limit, max_duration, etc.), or None while still running.
var stop_time- Unix epoch seconds when the crawler reached a terminal state, or None while still running.
var urls_extracted- Total URLs discovered (seed + links + sitemaps).
var urls_failed- URLs that failed to crawl.
var urls_skipped- URLs skipped (filtered by exclude rules, robots.txt, etc.).
var urls_to_crawl- Derived as urls_extracted - urls_skipped server-side.
var urls_visited- Number of URLs successfully crawled.
class CrawlerStatusResponse (response_data: Dict[str, Any])-
Expand source code
class CrawlerStatusResponse: """ Response from checking crawler job status. Returned by :py:meth:`ScrapflyClient.get_crawl_status`. Provides real-time progress tracking for crawler jobs. **Field names match the wire format.** The scrape-engine is the source of truth; the Go and TypeScript SDKs expose identical names. Access state counters via the nested ``state`` attribute: >>> status.state.urls_visited 12 >>> status.state.urls_extracted 34 Attributes: uuid: Crawler job UUID. status: Current status (``PENDING``, ``RUNNING``, ``DONE``, ``CANCELLED``). is_success: Whether the crawler job completed successfully (``None`` while running). is_finished: Whether the crawler job has finished (regardless of success/failure). state: :class:`CrawlerState` — all the per-crawl counters and timings. """ # Status constants STATUS_PENDING = 'PENDING' STATUS_RUNNING = 'RUNNING' STATUS_DONE = 'DONE' STATUS_CANCELLED = 'CANCELLED' def __init__(self, response_data: Dict[str, Any]): """ Initialize from API response. Strict parsing: required fields (``crawler_uuid``, ``status``, ``is_success``, ``is_finished``, and the documented ``state.*`` metrics) are read with direct access so missing keys raise ``KeyError`` at parse time. This catches API contract drift loud and early. Args: response_data: Raw API response dictionary. """ self._data = response_data # Identification — accept legacy `uuid` only as fallback. if 'crawler_uuid' in response_data: self.uuid = response_data['crawler_uuid'] elif 'uuid' in response_data: self.uuid = response_data['uuid'] else: raise KeyError( "CrawlerStatusResponse: required field 'crawler_uuid' (or legacy 'uuid') is missing" ) self.status = response_data['status'] # `is_success` may legitimately be `null` while still running. self.is_success = response_data['is_success'] self.is_finished = response_data['is_finished'] assert isinstance(self.uuid, str) and self.uuid, ( f"CrawlerStatusResponse: uuid must be a non-empty string, got {self.uuid!r}" ) assert isinstance(self.status, str) and self.status, ( f"CrawlerStatusResponse: status must be a non-empty string, got {self.status!r}" ) assert isinstance(self.is_finished, bool), ( f"CrawlerStatusResponse: is_finished must be bool, got {type(self.is_finished).__name__}" ) assert self.is_success is None or isinstance(self.is_success, bool), ( f"CrawlerStatusResponse: is_success must be bool or None, got {type(self.is_success).__name__}" ) # Nested state — canonical shape matching Go / TS SDKs. self.state = CrawlerState(response_data['state']) @property def is_complete(self) -> bool: """Whether the crawler reached DONE with is_success=True.""" return self.status == self.STATUS_DONE and self.is_success is True @property def is_running(self) -> bool: """Whether the crawler is currently PENDING or RUNNING.""" return self.status in (self.STATUS_PENDING, self.STATUS_RUNNING) @property def is_failed(self) -> bool: """Whether the crawler reached DONE with is_success=False.""" return self.status == self.STATUS_DONE and self.is_success is False @property def is_cancelled(self) -> bool: """Whether the crawler was cancelled.""" return self.status == self.STATUS_CANCELLED @property def progress_pct(self) -> float: """ Visited/extracted ratio as a percentage (0-100). Returns 0.0 when no URLs have been extracted yet. 
""" if self.state.urls_extracted == 0: return 0.0 return (self.state.urls_visited / self.state.urls_extracted) * 100 def __repr__(self): return (f"CrawlerStatusResponse(uuid={self.uuid}, status={self.status}, " f"progress={self.progress_pct:.1f}%, " f"visited={self.state.urls_visited}/{self.state.urls_extracted})")Response from checking crawler job status.
Returned by ScrapflyClient.get_crawl_status(). Provides real-time progress tracking for crawler jobs.
Field names match the wire format. The scrape-engine is the source of truth; the Go and TypeScript SDKs expose identical names. Access state counters via the nested state attribute:
>>> status.state.urls_visited
12
>>> status.state.urls_extracted
34
Attributes
uuid- Crawler job UUID.
status- Current status (PENDING, RUNNING, DONE, CANCELLED).
is_success- Whether the crawler job completed successfully (None while running).
is_finished- Whether the crawler job has finished (regardless of success/failure).
state- CrawlerState: all the per-crawl counters and timings.
Initialize from API response.
Strict parsing: required fields (crawler_uuid, status, is_success, is_finished, and the documented state.* metrics) are read with direct access so missing keys raise KeyError at parse time. This catches API contract drift loud and early.
Args
response_data- Raw API response dictionary.
Class variables
var STATUS_CANCELLED
var STATUS_DONE
var STATUS_PENDING
var STATUS_RUNNING
Instance variables
prop is_cancelled : bool-
Expand source code
@property def is_cancelled(self) -> bool: """Whether the crawler was cancelled.""" return self.status == self.STATUS_CANCELLEDWhether the crawler was cancelled.
prop is_complete : bool-
Expand source code
@property def is_complete(self) -> bool: """Whether the crawler reached DONE with is_success=True.""" return self.status == self.STATUS_DONE and self.is_success is TrueWhether the crawler reached DONE with is_success=True.
prop is_failed : bool-
Expand source code
@property def is_failed(self) -> bool: """Whether the crawler reached DONE with is_success=False.""" return self.status == self.STATUS_DONE and self.is_success is FalseWhether the crawler reached DONE with is_success=False.
prop is_running : bool-
Expand source code
@property def is_running(self) -> bool: """Whether the crawler is currently PENDING or RUNNING.""" return self.status in (self.STATUS_PENDING, self.STATUS_RUNNING)Whether the crawler is currently PENDING or RUNNING.
prop progress_pct : float-
Expand source code
@property def progress_pct(self) -> float: """ Visited/extracted ratio as a percentage (0-100). Returns 0.0 when no URLs have been extracted yet. """ if self.state.urls_extracted == 0: return 0.0 return (self.state.urls_visited / self.state.urls_extracted) * 100Visited/extracted ratio as a percentage (0-100).
Returns 0.0 when no URLs have been extracted yet.
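For orientation, a minimal polling sketch; it assumes a configured ScrapflyClient, an existing crawl UUID, and a single-argument get_crawl_status call, with an arbitrary sleep interval:

```python
import time

from scrapfly import ScrapflyClient

client = ScrapflyClient(key='YOUR-API-KEY')   # assumed client construction
crawl_uuid = 'YOUR-CRAWL-UUID'                # UUID returned when the crawl was started

while True:
    status = client.get_crawl_status(crawl_uuid)
    print(f"{status.status}: {status.progress_pct:.1f}% "
          f"({status.state.urls_visited}/{status.state.urls_extracted} URLs visited)")
    if status.is_finished:
        break
    time.sleep(5)  # arbitrary polling interval

if status.is_complete:
    print("Crawl completed successfully")
elif status.is_failed or status.is_cancelled:
    print("Crawl did not complete")
```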
class CrawlerUrlDiscoveredWebhook (event: str,
crawler_uuid: str,
project: str,
env: str,
action: str,
state: CrawlerState,
origin: str,
discovered_urls: List[str])-
Expand source code
@dataclass class CrawlerUrlDiscoveredWebhook(CrawlerWebhookBase): """ Payload for the ``crawler_url_discovered`` event. Emitted when the crawler extracts one or more new URLs from a source. Attributes: origin: How the URLs were discovered (e.g. ``"navigation"``, ``"sitemap"``). discovered_urls: The newly-discovered URLs as a list. """ origin: str discovered_urls: List[str] @classmethod def from_payload(cls, event: str, payload: Dict[str, Any]) -> 'CrawlerUrlDiscoveredWebhook': base = cls._parse_base(event, payload) return cls( **base, origin=payload['origin'], discovered_urls=payload['discovered_urls'], )Payload for the
crawler_url_discovered event.

Emitted when the crawler extracts one or more new URLs from a source.
Attributes
origin- How the URLs were discovered (e.g. "navigation", "sitemap").
discovered_urls- The newly-discovered URLs as a list.
Ancestors
Static methods
def from_payload(event: str, payload: Dict[str, Any]) ‑> CrawlerUrlDiscoveredWebhook
Instance variables
var discovered_urls : List[str]
var origin : str
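As a quick illustration, a hedged sketch of reacting to this event inside a webhook endpoint; the top-level import of CrawlerUrlDiscoveredWebhook is assumed to mirror CrawlerLifecycleWebhook:

```python
from scrapfly import webhook_from_payload, CrawlerUrlDiscoveredWebhook

def handle_discovered(body: dict) -> None:
    wh = webhook_from_payload(body)
    if isinstance(wh, CrawlerUrlDiscoveredWebhook):
        print(f"{wh.crawler_uuid}: {len(wh.discovered_urls)} new URL(s) via {wh.origin}")
        for url in wh.discovered_urls:
            print(f"  discovered: {url}")
```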
class CrawlerUrlEntry (url: str, status: str, reason: str | None = None)-
Expand source code
class CrawlerUrlEntry: """ Single URL entry from ``GET /crawl/{uuid}/urls``. The endpoint streams one record per line as ``text/plain``. For ``visited`` and ``pending`` URLs each line is just the URL; for ``failed`` or ``skipped`` URLs the line is ``url,reason``. Streaming text is used because this endpoint is expected to scale to millions of records per job — JSON is not a suitable wire format at that volume. Attributes: url: The crawled URL status: The filter status used by the caller (``visited``, ``pending``, ``failed`` or ``skipped``). Echoed from the request parameter so downstream code can disambiguate mixed buffers. reason: Only set for ``failed`` / ``skipped`` URLs; ``None`` otherwise. """ __slots__ = ('url', 'status', 'reason') def __init__(self, url: str, status: str, reason: Optional[str] = None): assert isinstance(url, str) and url, ( f"CrawlerUrlEntry: url must be a non-empty string, got {url!r}" ) assert isinstance(status, str) and status, ( f"CrawlerUrlEntry: status must be a non-empty string, got {status!r}" ) self.url = url self.status = status self.reason = reason def __repr__(self): if self.reason is not None: return f"CrawlerUrlEntry(url={self.url!r}, status={self.status!r}, reason={self.reason!r})" return f"CrawlerUrlEntry(url={self.url!r}, status={self.status!r})"Single URL entry from
GET /crawl/{uuid}/urls.

The endpoint streams one record per line as text/plain. For visited and pending URLs each line is just the URL; for failed or skipped URLs the line is url,reason. Streaming text is used because this endpoint is expected to scale to millions of records per job — JSON is not a suitable wire format at that volume.

Attributes
url- The crawled URL
status- The filter status used by the caller (visited, pending, failed or skipped). Echoed from the request parameter so downstream code can disambiguate mixed buffers.
reason- Only set for failed / skipped URLs; None otherwise.
Instance variables
var reason
var status
var url
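To make the two line shapes concrete, a small illustrative sketch; the values are made up and the import path is an assumption based on the sub-module layout:

```python
from scrapfly.crawler.crawler_response import CrawlerUrlEntry  # import path assumed

# visited / pending lines carry only the URL
visited = CrawlerUrlEntry(url='https://example.com/', status='visited')

# failed / skipped lines carry url,reason
failed = CrawlerUrlEntry(
    url='https://example.com/broken',
    status='failed',
    reason='ERR::SCRAPE::NETWORK_ERROR',  # example error code
)

print(visited)        # CrawlerUrlEntry(url='https://example.com/', status='visited')
print(failed.reason)  # ERR::SCRAPE::NETWORK_ERROR
```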
class CrawlerUrlFailedWebhook (event: str,
crawler_uuid: str,
project: str,
env: str,
action: str,
state: CrawlerState,
url: str,
error: str,
scrape_config: Dict[str, Any],
log_link: str | None,
scrape_link: str)-
Expand source code
@dataclass class CrawlerUrlFailedWebhook(CrawlerWebhookBase): """ Payload for the ``crawler_url_failed`` event. Emitted when a URL cannot be crawled (network error, scrape error, blocked, etc.). Attributes: url: The URL that failed. error: The scrapfly error code (e.g. ``ERR::SCRAPE::NETWORK_ERROR``). scrape_config: The scrape config that was used for the failed attempt. log_link: URL to the full scrape log for this failure. Can be ``None`` — the scrape-engine emits ``null`` when no log was recorded (e.g. the failure happened before the request was ever executed). See ``scrape_engine/crawler/webhook_manager.py::dispatch_url_failed`` line 57. scrape_link: URL that re-runs the same scrape as a one-off. Always present on the wire (non-nullable). See line 58 of the engine. """ url: str error: str scrape_config: Dict[str, Any] log_link: Optional[str] scrape_link: str @classmethod def from_payload(cls, event: str, payload: Dict[str, Any]) -> 'CrawlerUrlFailedWebhook': base = cls._parse_base(event, payload) return cls( **base, url=payload['url'], error=payload['error'], scrape_config=payload['scrape_config'], log_link=payload['links'].get('log'), scrape_link=payload['links']['scrape'], )Payload for the
crawler_url_failed event.

Emitted when a URL cannot be crawled (network error, scrape error, blocked, etc.).
Attributes
url- The URL that failed.
error- The scrapfly error code (e.g. ERR::SCRAPE::NETWORK_ERROR).
scrape_config- The scrape config that was used for the failed attempt.
log_link- URL to the full scrape log for this failure. Can be None — the scrape-engine emits null when no log was recorded (e.g. the failure happened before the request was ever executed). See scrape_engine/crawler/webhook_manager.py::dispatch_url_failed line 57.
scrape_link- URL that re-runs the same scrape as a one-off. Always present on the wire (non-nullable). See line 58 of the engine.
Ancestors
Static methods
def from_payload(event: str, payload: Dict[str, Any]) ‑> CrawlerUrlFailedWebhook
Instance variables
var error : str
var log_link : str | None
var scrape_config : Dict[str, Any]
var scrape_link : str
var url : str
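A brief sketch of acting on this event, assuming the same webhook-endpoint pattern and top-level imports as the other examples; only the attributes listed above are used:

```python
from scrapfly import webhook_from_payload, CrawlerUrlFailedWebhook

def handle_failed(body: dict) -> None:
    wh = webhook_from_payload(body)
    if isinstance(wh, CrawlerUrlFailedWebhook):
        print(f"FAILED {wh.url}: {wh.error}")
        if wh.log_link:  # may be None when no log was recorded
            print(f"  log: {wh.log_link}")
        print(f"  re-run as one-off scrape: {wh.scrape_link}")
```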
class CrawlerUrlSkippedWebhook (event: str,
crawler_uuid: str,
project: str,
env: str,
action: str,
state: CrawlerState,
urls: Dict[str, str])-
Expand source code
@dataclass class CrawlerUrlSkippedWebhook(CrawlerWebhookBase): """ Payload for the ``crawler_url_skipped`` event. Emitted in a single batch when the crawler decides to skip a set of URLs (e.g. when reaching ``page_limit`` with discovered-but-unvisited URLs still in the queue). Attributes: urls: Mapping from URL to the reason it was skipped (e.g. ``"page_limit"``, ``"excluded"``, ``"robots_txt"``). """ urls: Dict[str, str] @classmethod def from_payload(cls, event: str, payload: Dict[str, Any]) -> 'CrawlerUrlSkippedWebhook': base = cls._parse_base(event, payload) return cls(**base, urls=payload['urls'])Payload for the
crawler_url_skipped event.

Emitted in a single batch when the crawler decides to skip a set of URLs (e.g. when reaching page_limit with discovered-but-unvisited URLs still in the queue).

Attributes
urls- Mapping from URL to the reason it was skipped (e.g. "page_limit", "excluded", "robots_txt").
Ancestors
Static methods
def from_payload(event: str, payload: Dict[str, Any]) ‑> CrawlerUrlSkippedWebhook
Instance variables
var urls : Dict[str, str]
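A matching sketch for this event, under the same import assumptions:

```python
from scrapfly import webhook_from_payload, CrawlerUrlSkippedWebhook

def handle_skipped(body: dict) -> None:
    wh = webhook_from_payload(body)
    if isinstance(wh, CrawlerUrlSkippedWebhook):
        for url, reason in wh.urls.items():
            print(f"SKIPPED {url} ({reason})")
```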
class CrawlerUrlVisitedWebhook (event: str,
crawler_uuid: str,
project: str,
env: str,
action: str,
state: CrawlerState,
url: str,
scrape: CrawlerScrapeResult)-
Expand source code
@dataclass class CrawlerUrlVisitedWebhook(CrawlerWebhookBase): """ Payload for the ``crawler_url_visited`` event. Emitted after each URL has been successfully scraped. Attributes: url: The URL that was just visited. scrape: Scrape result details (status code, country, log link, content). """ url: str scrape: CrawlerScrapeResult @classmethod def from_payload(cls, event: str, payload: Dict[str, Any]) -> 'CrawlerUrlVisitedWebhook': base = cls._parse_base(event, payload) return cls( **base, url=payload['url'], scrape=CrawlerScrapeResult.from_dict(payload['scrape']), )Payload for the
crawler_url_visited event.

Emitted after each URL has been successfully scraped.
Attributes
url- The URL that was just visited.
scrape- Scrape result details (status code, country, log link, content).
Ancestors
Static methods
def from_payload(event: str, payload: Dict[str, Any]) ‑> CrawlerUrlVisitedWebhook
Instance variables
var scrape : CrawlerScrapeResult
var url : str
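And a sketch for the visited event; only the documented url and state fields are accessed, since the exact CrawlerScrapeResult field names are not shown here:

```python
from scrapfly import webhook_from_payload, CrawlerUrlVisitedWebhook

def handle_visited(body: dict) -> None:
    wh = webhook_from_payload(body)
    if isinstance(wh, CrawlerUrlVisitedWebhook):
        print(f"VISITED {wh.url} "
              f"({wh.state.urls_visited}/{wh.state.urls_extracted} extracted URLs visited so far)")
        # wh.scrape is the CrawlerScrapeResult (status code, country, log link,
        # content); consult that class for its exact field names.
```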
class CrawlerUrlsResponse (urls: List[ForwardRef('CrawlerUrlEntry')],
page: int,
per_page: int)-
Expand source code
class CrawlerUrlsResponse: """ Response from ``GET /crawl/{crawler_uuid}/urls``. The server returns a streaming ``text/plain`` body with one record per line. This class parses that stream into a materialised ``List`` of :class:`CrawlerUrlEntry` records for caller convenience. Pagination: the wire protocol carries no global ``total``. ``page`` and ``per_page`` are echoes of the caller's request parameters — request further pages by incrementing ``page`` until the response has no records. Attributes: urls: List of :class:`CrawlerUrlEntry` records on this page page: 1-based page number (echoed from the request) per_page: Page size (echoed from the request) """ __slots__ = ('urls', 'page', 'per_page') def __init__(self, urls: List['CrawlerUrlEntry'], page: int, per_page: int): self.urls = urls self.page = page self.per_page = per_page @classmethod def from_text( cls, body: str, status_hint: str, page: int, per_page: int, ) -> 'CrawlerUrlsResponse': """ Parse the raw text body returned by ``GET /crawl/{uuid}/urls``. - Empty lines are ignored (trailing newlines, blank records). - For ``visited`` / ``pending`` status each line is one URL. - For ``failed`` / ``skipped`` status each line is ``url,reason``. - When the caller passed no ``status`` filter, the server defaults to ``visited``; the caller is expected to pass that as ``status_hint`` so every parsed record gets the right status tag. Args: body: Raw response body text. status_hint: The status filter the caller used. page: Caller-provided page (echoed on the response object). per_page: Caller-provided per_page (echoed on the response object). """ entries: List[CrawlerUrlEntry] = [] for raw_line in body.splitlines(): line = raw_line.strip() if not line: continue if status_hint in ('visited', 'pending'): entries.append(CrawlerUrlEntry(url=line, status=status_hint)) else: # `url,reason` — split on the first comma only. URLs never # contain an unencoded comma in the path/query, so this is # unambiguous. comma_idx = line.find(',') if comma_idx == -1: entries.append(CrawlerUrlEntry(url=line, status=status_hint)) else: entries.append( CrawlerUrlEntry( url=line[:comma_idx], status=status_hint, reason=line[comma_idx + 1:] or None, ) ) return cls(entries, page, per_page) def __len__(self) -> int: return len(self.urls) def __iter__(self) -> Iterator[CrawlerUrlEntry]: return iter(self.urls) def __repr__(self): return ( f"CrawlerUrlsResponse(page={self.page}, per_page={self.per_page}, " f"urls={len(self.urls)})" )Response from
GET /crawl/{crawler_uuid}/urls.

The server returns a streaming text/plain body with one record per line. This class parses that stream into a materialised List of CrawlerUrlEntry records for caller convenience.

Pagination: the wire protocol carries no global total. page and per_page are echoes of the caller's request parameters — request further pages by incrementing page until the response has no records.

Attributes
urls- List of CrawlerUrlEntry records on this page
page- 1-based page number (echoed from the request)
per_page- Page size (echoed from the request)
Static methods
def from_text(body: str, status_hint: str, page: int, per_page: int) ‑> CrawlerUrlsResponse-
Parse the raw text body returned by
GET /crawl/{uuid}/urls.

- Empty lines are ignored (trailing newlines, blank records).
- For visited / pending status each line is one URL.
- For failed / skipped status each line is url,reason.
- When the caller passed no status filter, the server defaults to visited; the caller is expected to pass that as status_hint so every parsed record gets the right status tag.
Args
body- Raw response body text.
status_hint- The status filter the caller used.
page- Caller-provided page (echoed on the response object).
per_page- Caller-provided per_page (echoed on the response object).
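Putting from_text together with the echoed pagination, a hedged end-to-end sketch; the raw HTTP call, host, and query parameter names are assumptions, while the parsing and the increment-until-empty loop follow this class:

```python
import requests  # any HTTP client works; shown for illustration only

from scrapfly.crawler.crawler_response import CrawlerUrlsResponse  # import path assumed

def iter_failed_urls(api_host: str, crawler_uuid: str, api_key: str):
    """Yield CrawlerUrlEntry records across every page of the failed-URL listing."""
    page, per_page = 1, 1000  # arbitrary page size
    while True:
        resp = requests.get(
            f"{api_host}/crawl/{crawler_uuid}/urls",            # endpoint path from the docs above
            params={'key': api_key, 'status': 'failed',          # query parameter names assumed
                    'page': page, 'per_page': per_page},
        )
        resp.raise_for_status()
        parsed = CrawlerUrlsResponse.from_text(
            resp.text, status_hint='failed', page=page, per_page=per_page,
        )
        if len(parsed) == 0:  # an empty page means we are past the last record
            return
        yield from parsed
        page += 1
```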
Instance variables
var page
var per_page
var urls
class CrawlerWebhookBase (event: str,
crawler_uuid: str,
project: str,
env: str,
action: str,
state: CrawlerState)-
Expand source code
@dataclass class CrawlerWebhookBase: """ Common fields carried by every crawler webhook payload. Attributes: event: The wire event name (``crawler_started``, etc.). crawler_uuid: The crawler job UUID. project: Project slug the crawler belongs to. env: Environment (``LIVE`` or ``TEST``). action: Short action tag emitted by the scrape-engine (``started``, ``visited``, ``skipped``, ``url_discovery``, ``failed``, ``stopped``, ``cancelled``, ``finished``). state: Nested state counters at the moment the webhook was emitted. """ event: str crawler_uuid: str project: str env: str action: str state: CrawlerState @staticmethod def _parse_base(event: str, payload: Dict[str, Any]) -> Dict[str, Any]: """ Extract the 5 fields every webhook carries. Used by subclass ``from_payload()`` factories. """ return { 'event': event, 'crawler_uuid': payload['crawler_uuid'], 'project': payload['project'], 'env': payload['env'], 'action': payload['action'], 'state': CrawlerState(payload['state']), }Common fields carried by every crawler webhook payload.
Attributes
event- The wire event name (crawler_started, etc.).
crawler_uuid- The crawler job UUID.
project- Project slug the crawler belongs to.
env- Environment (LIVE or TEST).
action- Short action tag emitted by the scrape-engine (started, visited, skipped, url_discovery, failed, stopped, cancelled, finished).
state- Nested state counters at the moment the webhook was emitted.
Subclasses
- CrawlerLifecycleWebhook
- CrawlerUrlDiscoveredWebhook
- CrawlerUrlFailedWebhook
- CrawlerUrlSkippedWebhook
- CrawlerUrlVisitedWebhook
Instance variables
var action : str
var crawler_uuid : str
var env : str
var event : str
var project : str
var state : CrawlerState
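Because every event shares these fields, a handler can log any crawler webhook against the base class alone; a minimal sketch under the same import assumptions as above:

```python
from scrapfly import webhook_from_payload

def log_crawler_webhook(body: dict) -> None:
    wh = webhook_from_payload(body)  # any of the CrawlerWebhookBase subclasses
    print(f"[{wh.env}] {wh.event} action={wh.action} "
          f"crawler={wh.crawler_uuid} project={wh.project} "
          f"visited={wh.state.urls_visited}")
```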
class CrawlerWebhookEvent (value, names=None, *, module=None, qualname=None, type=None, start=1)-
Expand source code
class CrawlerWebhookEvent(str, Enum): """ Crawler webhook event names. These MUST stay in sync with ``apps/scrapfly/scrape-engine/scrape_engine/scrape_engine/crawler/webhook_manager.py`` class ``WebhookEvents``. The scrape-engine is the source of truth. """ CRAWLER_STARTED = 'crawler_started' CRAWLER_STOPPED = 'crawler_stopped' CRAWLER_CANCELLED = 'crawler_cancelled' CRAWLER_FINISHED = 'crawler_finished' CRAWLER_URL_VISITED = 'crawler_url_visited' CRAWLER_URL_SKIPPED = 'crawler_url_skipped' CRAWLER_URL_DISCOVERED = 'crawler_url_discovered' CRAWLER_URL_FAILED = 'crawler_url_failed'Crawler webhook event names.
These MUST stay in sync with
apps/scrapfly/scrape-engine/scrape_engine/scrape_engine/crawler/webhook_manager.py class WebhookEvents. The scrape-engine is the source of truth.

Ancestors
- builtins.str
- enum.Enum
Class variables
var CRAWLER_CANCELLED
var CRAWLER_FINISHED
var CRAWLER_STARTED
var CRAWLER_STOPPED
var CRAWLER_URL_DISCOVERED
var CRAWLER_URL_FAILED
var CRAWLER_URL_SKIPPED
var CRAWLER_URL_VISITED
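Since this is a str-based Enum, event strings from the wire compare equal to its members; a small sketch (the import path for the enum is an assumption):

```python
from scrapfly import webhook_from_payload
from scrapfly.crawler.crawler_webhook import CrawlerWebhookEvent  # import path assumed

def is_finished_event(body: dict) -> bool:
    wh = webhook_from_payload(body)
    # str-based Enum: members compare equal to their wire value ('crawler_finished')
    return wh.event == CrawlerWebhookEvent.CRAWLER_FINISHED
```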
class HarArchive (har_data: bytes)-
Expand source code
class HarArchive: """Parser and accessor for HAR (HTTP Archive) format data""" def __init__(self, har_data: bytes): """ Initialize HAR archive from bytes Args: har_data: HAR file content as bytes (JSON format, may be gzipped) """ # Decompress if gzipped if isinstance(har_data, bytes): if har_data[:2] == b'\x1f\x8b': # gzip magic number har_data = gzip.decompress(har_data) har_data = har_data.decode('utf-8') # Parse the special format: {"log":{...,"entries":[]}}{"entry1"}{"entry2"}... # First object is HAR log structure, subsequent objects are individual entries objects = [] decoder = json.JSONDecoder() idx = 0 while idx < len(har_data): har_data_stripped = har_data[idx:].lstrip() if not har_data_stripped: break try: obj, end_idx = decoder.raw_decode(har_data_stripped) objects.append(obj) idx += len(har_data[idx:]) - len(har_data_stripped) + end_idx except json.JSONDecodeError: break # First object should be the HAR log structure if objects and 'log' in objects[0]: self._data = objects[0] self._log = self._data.get('log', {}) # Remaining objects are the entries self._entries = objects[1:] if len(objects) > 1 else [] else: # Fallback: standard HAR format self._data = json.loads(har_data) if isinstance(har_data, str) else {} self._log = self._data.get('log', {}) self._entries = self._log.get('entries', []) @property def version(self) -> str: """Get HAR version""" return self._log.get('version', '') @property def creator(self) -> Dict[str, Any]: """Get creator information""" return self._log.get('creator', {}) @property def pages(self) -> List[Dict[str, Any]]: """Get pages list""" return self._log.get('pages', []) def get_entries(self) -> List[HarEntry]: """ Get all entries as list Returns: List of HarEntry objects """ return [HarEntry(entry) for entry in self._entries] def iter_entries(self) -> Iterator[HarEntry]: """ Iterate through all HAR entries Yields: HarEntry objects """ for entry in self._entries: yield HarEntry(entry) def get_urls(self) -> List[str]: """ Get all URLs in the archive Returns: List of unique URLs """ urls = [] for entry in self._entries: url = entry.get('request', {}).get('url', '') if url and url not in urls: urls.append(url) return urls def find_by_url(self, url: str) -> Optional[HarEntry]: """ Find entry by exact URL match Args: url: URL to search for Returns: First matching HarEntry or None """ for entry in self.iter_entries(): if entry.url == url: return entry return None def filter_by_status(self, status_code: int) -> List[HarEntry]: """ Filter entries by status code Args: status_code: HTTP status code to filter by Returns: List of matching HarEntry objects """ return [entry for entry in self.iter_entries() if entry.status_code == status_code] def filter_by_content_type(self, content_type: str) -> List[HarEntry]: """ Filter entries by content type (substring match) Args: content_type: Content type to filter by (e.g., 'text/html') Returns: List of matching HarEntry objects """ return [entry for entry in self.iter_entries() if content_type.lower() in entry.content_type.lower()] def __len__(self) -> int: """Get number of entries""" return len(self._entries) def __repr__(self) -> str: return f"<HarArchive {len(self._entries)} entries>"Parser and accessor for HAR (HTTP Archive) format data
Initialize HAR archive from bytes
Args
har_data- HAR file content as bytes (JSON format, may be gzipped)
Instance variables
prop creator : Dict[str, Any]-
Expand source code
@property def creator(self) -> Dict[str, Any]: """Get creator information""" return self._log.get('creator', {})Get creator information
prop pages : List[Dict[str, Any]]-
Expand source code
@property def pages(self) -> List[Dict[str, Any]]: """Get pages list""" return self._log.get('pages', [])Get pages list
prop version : str-
Expand source code
@property def version(self) -> str: """Get HAR version""" return self._log.get('version', '')Get HAR version
Methods
def filter_by_content_type(self, content_type: str) ‑> List[HarEntry]-
Expand source code
def filter_by_content_type(self, content_type: str) -> List[HarEntry]: """ Filter entries by content type (substring match) Args: content_type: Content type to filter by (e.g., 'text/html') Returns: List of matching HarEntry objects """ return [entry for entry in self.iter_entries() if content_type.lower() in entry.content_type.lower()]Filter entries by content type (substring match)
Args
content_type- Content type to filter by (e.g., 'text/html')
Returns
List of matching HarEntry objects
def filter_by_status(self, status_code: int) ‑> List[HarEntry]-
Expand source code
def filter_by_status(self, status_code: int) -> List[HarEntry]: """ Filter entries by status code Args: status_code: HTTP status code to filter by Returns: List of matching HarEntry objects """ return [entry for entry in self.iter_entries() if entry.status_code == status_code]Filter entries by status code
Args
status_code- HTTP status code to filter by
Returns
List of matching HarEntry objects
def find_by_url(self, url: str) ‑> HarEntry | None-
Expand source code
def find_by_url(self, url: str) -> Optional[HarEntry]: """ Find entry by exact URL match Args: url: URL to search for Returns: First matching HarEntry or None """ for entry in self.iter_entries(): if entry.url == url: return entry return NoneFind entry by exact URL match
Args
url- URL to search for
Returns
First matching HarEntry or None
def get_entries(self) ‑> List[HarEntry]-
Expand source code
def get_entries(self) -> List[HarEntry]: """ Get all entries as list Returns: List of HarEntry objects """ return [HarEntry(entry) for entry in self._entries]Get all entries as list
Returns
List of HarEntry objects
def get_urls(self) ‑> List[str]-
Expand source code
def get_urls(self) -> List[str]: """ Get all URLs in the archive Returns: List of unique URLs """ urls = [] for entry in self._entries: url = entry.get('request', {}).get('url', '') if url and url not in urls: urls.append(url) return urlsGet all URLs in the archive
Returns
List of unique URLs
def iter_entries(self) ‑> Iterator[HarEntry]-
Expand source code
def iter_entries(self) -> Iterator[HarEntry]: """ Iterate through all HAR entries Yields: HarEntry objects """ for entry in self._entries: yield HarEntry(entry)Iterate through all HAR entries
Yields
HarEntry objects
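A short usage sketch built only from the methods documented above; reading the artifact from a local file and the import path are assumptions:

```python
from pathlib import Path

from scrapfly.crawler.har_utils import HarArchive  # import path assumed from the sub-module list

har_bytes = Path('crawl.har').read_bytes()  # hypothetical artifact file on disk
archive = HarArchive(har_bytes)

print(archive.version, len(archive), 'entries')

# Unique URLs captured in the archive
for url in archive.get_urls():
    print(url)

# Only HTML responses
for entry in archive.filter_by_content_type('text/html'):
    print(entry.url, entry.status_code)

# Exact lookup of a single URL
entry = archive.find_by_url('https://example.com/')
if entry is not None:
    html = entry.content.decode('utf-8', errors='replace')
```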
class HarEntry (entry_data: Dict[str, Any])-
Expand source code
class HarEntry: """Represents a single HAR entry (HTTP request/response pair)""" def __init__(self, entry_data: Dict[str, Any]): """ Initialize from HAR entry dict Args: entry_data: HAR entry dictionary """ self._data = entry_data self._request = entry_data.get('request', {}) self._response = entry_data.get('response', {}) @property def url(self) -> str: """Get request URL""" return self._request.get('url', '') @property def method(self) -> str: """Get HTTP method""" return self._request.get('method', 'GET') @property def status_code(self) -> int: """Get response status code""" # Handle case where response doesn't exist or status is missing if not self._response: return 0 status = self._response.get('status') if status is None: return 0 # Ensure it's an int (HAR data might have status as string) try: return int(status) except (ValueError, TypeError): return 0 @property def status_text(self) -> str: """Get response status text""" return self._response.get('statusText', '') @property def request_headers(self) -> Dict[str, str]: """Get request headers as dict""" headers = {} for header in self._request.get('headers', []): headers[header['name']] = header['value'] return headers @property def response_headers(self) -> Dict[str, str]: """Get response headers as dict""" headers = {} for header in self._response.get('headers', []): headers[header['name']] = header['value'] return headers @property def content(self) -> bytes: """Get response content as bytes""" content_data = self._response.get('content', {}) text = content_data.get('text', '') # Handle base64 encoding if present encoding = content_data.get('encoding', '') if encoding == 'base64': import base64 return base64.b64decode(text) # Return as UTF-8 bytes if isinstance(text, str): return text.encode('utf-8') return text @property def content_type(self) -> str: """Get response content type""" return self._response.get('content', {}).get('mimeType', '') @property def content_size(self) -> int: """Get response content size""" return self._response.get('content', {}).get('size', 0) @property def started_datetime(self) -> str: """Get when request was started (ISO 8601 format)""" return self._data.get('startedDateTime', '') @property def time(self) -> float: """Get total elapsed time in milliseconds""" return self._data.get('time', 0.0) @property def timings(self) -> Dict[str, float]: """Get detailed timing information""" return self._data.get('timings', {}) def __repr__(self) -> str: return f"<HarEntry {self.method} {self.url} [{self.status_code}]>"Represents a single HAR entry (HTTP request/response pair)
Initialize from HAR entry dict
Args
entry_data- HAR entry dictionary
Instance variables
prop content : bytes-
Expand source code
@property def content(self) -> bytes: """Get response content as bytes""" content_data = self._response.get('content', {}) text = content_data.get('text', '') # Handle base64 encoding if present encoding = content_data.get('encoding', '') if encoding == 'base64': import base64 return base64.b64decode(text) # Return as UTF-8 bytes if isinstance(text, str): return text.encode('utf-8') return textGet response content as bytes
prop content_size : int-
Expand source code
@property def content_size(self) -> int: """Get response content size""" return self._response.get('content', {}).get('size', 0)Get response content size
prop content_type : str-
Expand source code
@property def content_type(self) -> str: """Get response content type""" return self._response.get('content', {}).get('mimeType', '')Get response content type
prop method : str-
Expand source code
@property def method(self) -> str: """Get HTTP method""" return self._request.get('method', 'GET')Get HTTP method
prop request_headers : Dict[str, str]-
Expand source code
@property def request_headers(self) -> Dict[str, str]: """Get request headers as dict""" headers = {} for header in self._request.get('headers', []): headers[header['name']] = header['value'] return headersGet request headers as dict
prop response_headers : Dict[str, str]-
Expand source code
@property def response_headers(self) -> Dict[str, str]: """Get response headers as dict""" headers = {} for header in self._response.get('headers', []): headers[header['name']] = header['value'] return headersGet response headers as dict
prop started_datetime : str-
Expand source code
@property def started_datetime(self) -> str: """Get when request was started (ISO 8601 format)""" return self._data.get('startedDateTime', '')Get when request was started (ISO 8601 format)
prop status_code : int-
Expand source code
@property def status_code(self) -> int: """Get response status code""" # Handle case where response doesn't exist or status is missing if not self._response: return 0 status = self._response.get('status') if status is None: return 0 # Ensure it's an int (HAR data might have status as string) try: return int(status) except (ValueError, TypeError): return 0Get response status code
prop status_text : str-
Expand source code
@property def status_text(self) -> str: """Get response status text""" return self._response.get('statusText', '')Get response status text
prop time : float-
Expand source code
@property def time(self) -> float: """Get total elapsed time in milliseconds""" return self._data.get('time', 0.0)Get total elapsed time in milliseconds
prop timings : Dict[str, float]-
Expand source code
@property def timings(self) -> Dict[str, float]: """Get detailed timing information""" return self._data.get('timings', {})Get detailed timing information
prop url : str-
Expand source code
@property def url(self) -> str: """Get request URL""" return self._request.get('url', '')Get request URL
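Where the body is textual, the content bytes can be decoded with the mime type as a hint; a simplified sketch that assumes UTF-8 and an assumed import path:

```python
from typing import Optional

from scrapfly.crawler.har_utils import HarEntry  # import path assumed

def entry_text(entry: HarEntry) -> Optional[str]:
    """Return the body as text for textual mime types, else None."""
    # .content already base64-decodes bodies when the HAR marks them as such
    if entry.content_type.startswith('text/') or 'json' in entry.content_type:
        return entry.content.decode('utf-8', errors='replace')  # charset assumed UTF-8
    return None
```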
class WarcParser (warc_data: bytes | BinaryIO)-
Expand source code
class WarcParser: """ Parser for WARC files with automatic decompression Provides methods to iterate through WARC records and extract page data. Example: ```python # From bytes parser = WarcParser(warc_bytes) # Iterate all records for record in parser.iter_records(): print(f"{record.url}: {record.status_code}") # Get only HTTP responses for record in parser.iter_responses(): print(f"Page: {record.url}") html = record.content.decode('utf-8') # Get all pages as simple dicts pages = parser.get_pages() for page in pages: print(f"{page['url']}: {page['status_code']}") ``` """ def __init__(self, warc_data: Union[bytes, BinaryIO]): """ Initialize WARC parser Args: warc_data: WARC data as bytes or file-like object (supports both gzip-compressed and uncompressed) """ if isinstance(warc_data, bytes): # Try to decompress if gzipped if warc_data[:2] == b'\x1f\x8b': # gzip magic number try: warc_data = gzip.decompress(warc_data) except Exception: pass # Not gzipped or decompression failed self._data = BytesIO(warc_data) else: self._data = warc_data def iter_records(self) -> Iterator[WarcRecord]: """ Iterate through all WARC records Yields: WarcRecord: Each record in the WARC file """ self._data.seek(0) while True: # Read WARC version line version_line = self._read_line() if not version_line or not version_line.startswith(b'WARC/'): break # Read WARC headers warc_headers = self._read_headers() if not warc_headers: break # Get content length content_length = int(warc_headers.get('Content-Length', 0)) # Read content block content_block = self._data.read(content_length) # Skip trailing newlines self._read_line() self._read_line() # Parse the record record = self._parse_record(warc_headers, content_block) if record: yield record def iter_responses(self) -> Iterator[WarcRecord]: """ Iterate through HTTP response records only Filters out non-response records (requests, metadata, etc.) Yields: WarcRecord: HTTP response records only """ for record in self.iter_records(): if record.record_type == 'response' and record.status_code: yield record def get_pages(self) -> List[Dict]: """ Get all crawled pages as simple dictionaries This is the easiest way to access crawl results without dealing with WARC format details. 
Returns: List of dicts with keys: url, status_code, headers, content Example: ```python pages = parser.get_pages() for page in pages: print(f"{page['url']}: {len(page['content'])} bytes") html = page['content'].decode('utf-8') ``` """ pages = [] for record in self.iter_responses(): pages.append({ 'url': record.url, 'status_code': record.status_code, 'headers': record.headers, 'content': record.content }) return pages def _read_line(self) -> bytes: """Read a single line from the WARC file""" line = self._data.readline() return line.rstrip(b'\r\n') def _read_headers(self) -> Dict[str, str]: """Read headers until empty line""" headers = {} while True: line = self._read_line() if not line: break # Parse header line if b':' in line: key, value = line.split(b':', 1) headers[key.decode('utf-8').strip()] = value.decode('utf-8').strip() return headers def _parse_record(self, warc_headers: Dict[str, str], content_block: bytes) -> Optional[WarcRecord]: """Parse a WARC record from headers and content""" record_type = warc_headers.get('WARC-Type', '') url = warc_headers.get('WARC-Target-URI', '') if record_type == 'response': # Parse HTTP response http_headers, body = self._parse_http_response(content_block) status_code = self._extract_status_code(content_block) return WarcRecord( record_type=record_type, url=url, headers=http_headers, content=body, status_code=status_code, warc_headers=warc_headers ) elif record_type in ['request', 'metadata', 'warcinfo']: # Other record types - store raw content return WarcRecord( record_type=record_type, url=url, headers={}, content=content_block, status_code=None, warc_headers=warc_headers ) return None def _parse_http_response(self, content_block: bytes) -> tuple: """Parse HTTP response into headers and body""" try: # Split on double newline (end of headers) parts = content_block.split(b'\r\n\r\n', 1) if len(parts) < 2: parts = content_block.split(b'\n\n', 1) if len(parts) == 2: header_section, body = parts else: header_section, body = content_block, b'' # Parse headers headers = {} lines = header_section.split(b'\r\n') if b'\r\n' in header_section else header_section.split(b'\n') # Skip status line for line in lines[1:]: if b':' in line: key, value = line.split(b':', 1) headers[key.decode('utf-8', errors='ignore').strip()] = value.decode('utf-8', errors='ignore').strip() return headers, body except Exception: return {}, content_block def _extract_status_code(self, content_block: bytes) -> Optional[int]: """Extract HTTP status code from response""" try: # Look for HTTP status line (e.g., "HTTP/1.1 200 OK") first_line = content_block.split(b'\r\n', 1)[0] if b'\r\n' in content_block else content_block.split(b'\n', 1)[0] match = re.match(rb'HTTP/\d\.\d (\d+)', first_line) if match: return int(match.group(1)) except Exception: pass return NoneParser for WARC files with automatic decompression
Provides methods to iterate through WARC records and extract page data.
Example
# From bytes
parser = WarcParser(warc_bytes)

# Iterate all records
for record in parser.iter_records():
    print(f"{record.url}: {record.status_code}")

# Get only HTTP responses
for record in parser.iter_responses():
    print(f"Page: {record.url}")
    html = record.content.decode('utf-8')

# Get all pages as simple dicts
pages = parser.get_pages()
for page in pages:
    print(f"{page['url']}: {page['status_code']}")

Initialize WARC parser
Args
warc_data- WARC data as bytes or file-like object (supports both gzip-compressed and uncompressed)
Methods
def get_pages(self) ‑> List[Dict]-
Expand source code
def get_pages(self) -> List[Dict]: """ Get all crawled pages as simple dictionaries This is the easiest way to access crawl results without dealing with WARC format details. Returns: List of dicts with keys: url, status_code, headers, content Example: ```python pages = parser.get_pages() for page in pages: print(f"{page['url']}: {len(page['content'])} bytes") html = page['content'].decode('utf-8') ``` """ pages = [] for record in self.iter_responses(): pages.append({ 'url': record.url, 'status_code': record.status_code, 'headers': record.headers, 'content': record.content }) return pagesGet all crawled pages as simple dictionaries
This is the easiest way to access crawl results without dealing with WARC format details.
Returns
List of dicts with keys: url, status_code, headers, content
Example
pages = parser.get_pages()
for page in pages:
    print(f"{page['url']}: {len(page['content'])} bytes")
    html = page['content'].decode('utf-8')

def iter_records(self) ‑> Iterator[WarcRecord]-
Expand source code
def iter_records(self) -> Iterator[WarcRecord]: """ Iterate through all WARC records Yields: WarcRecord: Each record in the WARC file """ self._data.seek(0) while True: # Read WARC version line version_line = self._read_line() if not version_line or not version_line.startswith(b'WARC/'): break # Read WARC headers warc_headers = self._read_headers() if not warc_headers: break # Get content length content_length = int(warc_headers.get('Content-Length', 0)) # Read content block content_block = self._data.read(content_length) # Skip trailing newlines self._read_line() self._read_line() # Parse the record record = self._parse_record(warc_headers, content_block) if record: yield record def iter_responses(self) ‑> Iterator[WarcRecord]-
Expand source code
def iter_responses(self) -> Iterator[WarcRecord]: """ Iterate through HTTP response records only Filters out non-response records (requests, metadata, etc.) Yields: WarcRecord: HTTP response records only """ for record in self.iter_records(): if record.record_type == 'response' and record.status_code: yield recordIterate through HTTP response records only
Filters out non-response records (requests, metadata, etc.)
Yields
WarcRecord- HTTP response records only
class WarcRecord (record_type: str,
url: str,
headers: Dict[str, str],
content: bytes,
status_code: int | None,
warc_headers: Dict[str, str])-
Expand source code
@dataclass class WarcRecord: """ Represents a single WARC record A WARC file contains multiple records, each representing a captured HTTP transaction or metadata. """ record_type: str # Type of record (response, request, metadata, etc.) url: str # Associated URL headers: Dict[str, str] # HTTP headers content: bytes # Response body/content status_code: Optional[int] # HTTP status code (for response records) warc_headers: Dict[str, str] # WARC-specific headers def __repr__(self): return f"WarcRecord(type={self.record_type}, url={self.url}, status={self.status_code})"Represents a single WARC record
A WARC file contains multiple records, each representing a captured HTTP transaction or metadata.
Instance variables
var content : bytes
var headers : Dict[str, str]
var record_type : str
var status_code : int | None
var url : str
var warc_headers : Dict[str, str]