Module scrapfly.batch

Streaming multipart/mixed parser for the POST /scrape/batch endpoint.

The API emits one part per scrape result as each scrape completes; the client must consume parts as they arrive (not after the whole response lands) to get the end-to-end streaming benefit.

Design notes: - Pure-Python parser (no new deps). Reuses requests streaming iter_content. - Works uniformly for JSON and msgpack part bodies — the negotiated part content-type is surfaced to the caller in the yielded tuple. - Does NOT perform decompression itself — requests already handles Content-Encoding gzip/zstd at the envelope level when stream=True is set with decode_content=True (default on requests Response).

Functions

def decode_part_body(headers: Dict[str, str], body: bytes, body_handler)
Expand source code
def decode_part_body(
    headers: Dict[str, str],
    body: bytes,
    body_handler,
):
    """
    Decode one part body according to its Content-Type header.
    Delegates to the existing ResponseBodyHandler for msgpack/json
    symmetry with single /scrape responses.
    """

    content_type = headers.get("content-type", "application/json")

    # body_handler.__call__ takes (content, content_type) and returns
    # a parsed dict. It handles both JSON and msgpack.
    return body_handler(content=body, content_type=content_type)

Decode one part body according to its Content-Type header. Delegates to the existing ResponseBodyHandler for msgpack/json symmetry with single /scrape responses.

def iter_batch_parts(response) ‑> Iterator[Tuple[Dict[str, str], bytes]]
Expand source code
def iter_batch_parts(
    response,  # requests.Response — duck-typed to avoid circular imports
) -> Iterator[Tuple[Dict[str, str], bytes]]:
    """
    Iterate (part_headers, part_body) tuples from a streaming
    multipart/mixed response. The per-part Content-Type is in
    `part_headers['content-type']` (lowercased key), and the
    correlation_id is in `part_headers['x-scrapfly-correlation-id']`.

    The caller is responsible for decoding `part_body` based on the
    part's Content-Type (JSON vs msgpack).

    Raises ValueError if the outer Content-Type is not multipart/mixed
    or if the boundary parameter is missing.
    """

    envelope_ct = response.headers.get("Content-Type", "")
    mime, params = _parse_content_type(envelope_ct)

    if mime != "multipart/mixed":
        raise ValueError(
            f"scrape_batch: expected Content-Type multipart/mixed, got {envelope_ct!r}"
        )

    boundary_str = params.get("boundary")

    if not boundary_str:
        raise ValueError(
            f"scrape_batch: Content-Type multipart/mixed is missing boundary parameter: {envelope_ct!r}"
        )

    boundary = boundary_str.encode("ascii")

    chunks = response.iter_content(chunk_size=8 * 1024)
    reader = _BufferedMultipartReader(chunks, boundary)

    # Skip anything before the first --boundary.
    reader.discard_prefix()

    while True:
        # After each --boundary we expect either CRLF (more parts) or
        # `--` (terminator). RFC 2046 mandates CRLF; any server
        # deviating from that is broken — return cleanly rather than
        # try to guess a framing variant.
        suffix = reader.read_exact(2)

        if suffix == b"--":
            # Final boundary. Drain CRLF and any epilogue.
            return

        if suffix != _CRLF:
            return

        # Read headers up to the blank line.
        header_block = reader.read_until(_CRLF + _CRLF)
        headers: Dict[str, str] = {}

        for line in header_block.split(_CRLF):
            if not line or b":" not in line:
                continue

            k, _, v = line.partition(b":")
            headers[k.decode("ascii", errors="replace").strip().lower()] = (
                v.decode("ascii", errors="replace").strip()
            )

        # Body framing: prefer Content-Length (we always emit it
        # server-side), fall back to boundary-delimited scan.
        cl_raw = headers.get("content-length")
        body: bytes

        if cl_raw and cl_raw.isdigit():
            body = reader.read_exact(int(cl_raw))
        else:
            # Read until next boundary marker. The "\r\n--<boundary>"
            # sequence is the canonical delimiter per RFC 2046.
            body = reader.read_until(_CRLF + b"--" + boundary)

        yield headers, body

        # If we used Content-Length, we still need to consume the
        # trailing "\r\n--<boundary>" that starts the next boundary.
        if cl_raw and cl_raw.isdigit():
            reader.read_until(_CRLF + b"--" + boundary)

Iterate (part_headers, part_body) tuples from a streaming multipart/mixed response. The per-part Content-Type is in part_headers['content-type'] (lowercased key), and the correlation_id is in part_headers['x-scrapfly-correlation-id'].

The caller is responsible for decoding part_body based on the part's Content-Type (JSON vs msgpack).

Raises ValueError if the outer Content-Type is not multipart/mixed or if the boundary parameter is missing.