Module scrapfly.webhook

Expand source code
from typing import Callable, Optional, Tuple
from enum import Enum

import flask
from flask import request, make_response
from scrapfly import ResponseBodyHandler
import logging as logger


class ResourceType(Enum):
    SCRAPE = 'scrape'
    PING = 'ping'


def create_server(signing_secrets:Tuple[str], callback:Callable, app:Optional[flask.Flask]=None) -> flask.Flask:

    if app is None:
        app = flask.Flask("Scrapfly Webhook Server")

    @app.route("/webhook", methods=["POST"])
    def webhook():
        headers = request.headers
        resource_type = headers.get('X-Scrapfly-Webhook-Resource-Type')

        if resource_type == ResourceType.SCRAPE.value or resource_type == ResourceType.PING.value:
            body_handler = ResponseBodyHandler(signing_secrets=signing_secrets)
            data = body_handler.read(
                content=request.data,
                content_encoding=headers.get('Content-Encoding'),
                content_type=headers.get('Content-Type'),
                signature=headers.get('X-Scrapfly-Webhook-Signature', None) # Can be none when ping during the webhook creation flow via "ping"
            )

            try:
                callback(data, resource_type, request)
                return make_response("", 200)
            except Exception as e:
                logger.error(e)
                return make_response("", 500)

        return make_response("Do not support resource type %s" % resource_type, 400)

    return app

Functions

def create_server(signing_secrets: Tuple[str], callback: Callable, app: Optional[flask.app.Flask] = None) ‑> flask.app.Flask
Expand source code
def create_server(signing_secrets:Tuple[str], callback:Callable, app:Optional[flask.Flask]=None) -> flask.Flask:

    if app is None:
        app = flask.Flask("Scrapfly Webhook Server")

    @app.route("/webhook", methods=["POST"])
    def webhook():
        headers = request.headers
        resource_type = headers.get('X-Scrapfly-Webhook-Resource-Type')

        if resource_type == ResourceType.SCRAPE.value or resource_type == ResourceType.PING.value:
            body_handler = ResponseBodyHandler(signing_secrets=signing_secrets)
            data = body_handler.read(
                content=request.data,
                content_encoding=headers.get('Content-Encoding'),
                content_type=headers.get('Content-Type'),
                signature=headers.get('X-Scrapfly-Webhook-Signature', None) # Can be none when ping during the webhook creation flow via "ping"
            )

            try:
                callback(data, resource_type, request)
                return make_response("", 200)
            except Exception as e:
                logger.error(e)
                return make_response("", 500)

        return make_response("Do not support resource type %s" % resource_type, 400)

    return app

Classes

class ResourceType (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class ResourceType(Enum):
    SCRAPE = 'scrape'
    PING = 'ping'

Ancestors

  • enum.Enum

Class variables

var PING
var SCRAPE