
Production systems need reliable webhook handling. This guide covers idempotency, retry logic, and queue patterns.

Quick Example

from flask import Flask, request, jsonify
import hashlib

app = Flask(__name__)
processed_events = set()  # In-memory only; use Redis in production (see below)

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json

    # Generate an idempotency key from fields that are stable across redeliveries
    idempotency_key = hashlib.sha256(
        f"{payload['event_id']}:{payload['timestamp']}".encode()
    ).hexdigest()

    # Skip if already processed (note: this check-then-add is not atomic;
    # the Redis pattern below closes that race)
    if idempotency_key in processed_events:
        return jsonify({"status": "duplicate"}), 200

    # Process the alert (placeholder for your own logic)
    process_alert(payload)

    # Mark as processed
    processed_events.add(idempotency_key)

    return jsonify({"status": "ok"}), 200
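
To exercise the endpoint locally, you can replay the same event twice and watch the second delivery get deduplicated. The URL, port, and field values below are made up for illustration:

import requests

event = {"event_id": "evt_123", "timestamp": 1700000000, "label": "fall_detected"}

# Deliver the same event twice; the second should come back as a duplicate.
for _ in range(2):
    resp = requests.post("http://localhost:5000/webhooks/alerts", json=event)
    print(resp.status_code, resp.json())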

Idempotency

Webhook producers typically deliver at-least-once: if your endpoint times out or returns an error, the same event is resent, so duplicates are normal. Always implement idempotency so reprocessing a duplicate is harmless.

Generate Idempotency Keys

Use a combination of fields that are stable across redeliveries:
import hashlib
import json

# Option 1: Event ID + timestamp
key = f"{payload['event_id']}:{payload['timestamp']}"

# Option 2: Hash of the entire payload (only safe if retries resend an
# identical body)
key = hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
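
The snippets below call a generate_key helper. A minimal version based on option 1 might look like this (the helper name is ours; it assumes the payload carries event_id and timestamp, as in the quick example):

import hashlib

def generate_key(payload: dict) -> str:
    """Stable idempotency key for a webhook payload (option 1 above)."""
    return hashlib.sha256(
        f"{payload['event_id']}:{payload['timestamp']}".encode()
    ).hexdigest()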

Store Processed Keys

import redis
from flask import Flask, request, jsonify

app = Flask(__name__)
r = redis.Redis()

def is_duplicate(key: str) -> bool:
    """Check if we've already processed this event"""
    # SET NX succeeds only for the first writer; the 24h expiry (ex=86400)
    # keeps the key space bounded
    return not r.set(f"webhook:{key}", "1", nx=True, ex=86400)

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json
    key = generate_key(payload)  # helper defined above

    if is_duplicate(key):
        return jsonify({"status": "duplicate"}), 200

    process_alert(payload)
    return jsonify({"status": "ok"}), 200
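
This pattern is race-free: SET with nx=True is a single atomic Redis command, so when two deliveries of the same event arrive concurrently, exactly one claims the key and the other is reported as a duplicate. The expiry also covers the key-cleanup item in the checklist below.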

Respond Quickly

Return 200 immediately, then process asynchronously:
from celery import Celery
from flask import Flask, request, jsonify

app = Flask(__name__)
celery = Celery('tasks', broker='redis://localhost:6379/0')

@celery.task
def process_alert_async(payload):
    """Heavy processing happens here"""
    # Notify team
    send_slack_notification(payload)
    # Store in database
    save_to_db(payload)
    # Trigger downstream actions
    trigger_automation(payload)

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json

    # Enqueue for background processing, then acknowledge immediately
    process_alert_async.delay(payload)

    return jsonify({"status": "queued"}), 200
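
Nothing is consumed from the queue until a worker is running. Assuming the module above is saved as tasks.py, start one with: celery -A tasks worker --loglevel=info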

Queue Patterns

Basic Queue Architecture

VideoDB → Webhook Endpoint → Queue → Worker → Actions
                │
                └─ return 200 fast
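
The same shape works without Celery. A minimal sketch using a plain Redis list as the queue (the key name and the process_alert placeholder are ours):

import json
import redis

r = redis.Redis()

# Producer side: the webhook handler only enqueues the raw payload.
def enqueue(payload: dict) -> None:
    r.rpush("webhook:queue", json.dumps(payload))

# Consumer side: a separate worker process pops and handles one event at a time.
def worker_loop() -> None:
    while True:
        _key, raw = r.blpop("webhook:queue")  # blocks until an item arrives
        process_alert(json.loads(raw))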

Priority Queues

Handle critical alerts first:
from celery import Celery

celery = Celery('tasks', broker='redis://localhost:6379/0')

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json
    label = payload.get("label", "")

    # High priority for safety alerts (with the Redis broker used here,
    # lower numbers are consumed first)
    if label in ["fall_detected", "intrusion_detected", "fire_detected"]:
        process_alert_async.apply_async(args=[payload], priority=0)
    else:
        process_alert_async.apply_async(args=[payload], priority=5)

    return jsonify({"status": "queued"}), 200
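
Priority semantics are broker-specific: RabbitMQ treats higher numbers as more urgent and requires x-max-priority on the queue, while the Redis transport emulates priorities with separate lists. A broker-agnostic alternative is to route critical alerts to a dedicated queue with its own workers; the queue names here are illustrative:

# Variant of the branch above: route by queue instead of priority.
# Run a dedicated worker with: celery -A tasks worker -Q alerts.critical
if label in ["fall_detected", "intrusion_detected", "fire_detected"]:
    process_alert_async.apply_async(args=[payload], queue="alerts.critical")
else:
    process_alert_async.apply_async(args=[payload], queue="alerts.default")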

Error Handling

Retry Failed Processing

from celery import Celery
from celery.exceptions import MaxRetriesExceededError

celery = Celery('tasks', broker='redis://localhost:6379/0')

@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def process_alert_async(self, payload):
    try:
        # Process the alert
        send_notification(payload)
        save_to_db(payload)
    except Exception as e:
        # Retry with exponential backoff: 60s, 120s, 240s
        raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
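
Celery 4.2+ can express the same policy declaratively. A sketch of the equivalent configuration, with the same placeholder functions as above:

@celery.task(
    autoretry_for=(Exception,),
    max_retries=3,
    retry_backoff=60,    # exponential: 60s, 120s, 240s
    retry_jitter=True,   # randomize delays to avoid retry stampedes
)
def process_alert_async(payload):
    send_notification(payload)
    save_to_db(payload)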

Dead Letter Queue

Handle permanently failed jobs. Note that self.retry raises MaxRetriesExceededError once the retry budget is exhausted, so it must be caught around the retry call itself, not around the original processing:
@celery.task(bind=True, max_retries=3)
def process_alert_async(self, payload):
    try:
        process(payload)
    except Exception as e:
        try:
            raise self.retry(exc=e)
        except MaxRetriesExceededError:
            # Out of retries: move to the dead letter queue and page a human
            save_to_dead_letter(payload)
            alert_ops_team(payload)
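
save_to_dead_letter is left undefined above. One minimal sketch parks failures in a Redis list for later inspection and replay (the key name is an assumption):

import json
import time

import redis

r = redis.Redis()

def save_to_dead_letter(payload: dict) -> None:
    # Keep the payload plus a timestamp so failures can be inspected
    # and replayed later.
    r.rpush("webhook:dead_letter", json.dumps({
        "payload": payload,
        "failed_at": time.time(),
    }))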

Monitoring

Track Webhook Health

from prometheus_client import Counter, Histogram

webhook_received = Counter(
    'webhook_received_total',
    'Total webhooks received',
    ['event_label']
)

webhook_latency = Histogram(
    'webhook_processing_seconds',
    'Time to process webhook'
)

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json

    # Default the label so events without one don't break the metric
    webhook_received.labels(event_label=payload.get("label", "unknown")).inc()

    with webhook_latency.time():
        process_alert(payload)

    return jsonify({"status": "ok"}), 200
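
Prometheus needs an endpoint to scrape these from. prometheus_client bundles a minimal HTTP server; the port here is arbitrary:

from prometheus_client import start_http_server

# Serve /metrics on port 9100 alongside the Flask app.
start_http_server(9100)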

Alert on Issues

# Alert if processing takes too long (processing_time measured around
# process_alert, e.g. via the webhook_latency histogram above)
if processing_time > 5.0:
    send_ops_alert("Webhook processing slow")

# Alert if work is piling up. inspect().active() returns a
# {worker: [tasks]} mapping (or None if no workers reply),
# so count the tasks rather than comparing the dict itself.
active = celery.control.inspect().active() or {}
in_flight = sum(len(tasks) for tasks in active.values())
if in_flight > 1000:
    send_ops_alert("Alert queue backing up")
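
Note that inspect().active() only sees tasks workers have already picked up. With the Redis broker, the actual backlog is the length of the broker list; "celery" is the default queue name:

import redis

r = redis.Redis()

# Tasks enqueued but not yet consumed by any worker.
backlog = r.llen("celery")
if backlog > 1000:
    send_ops_alert(f"Alert queue backing up: {backlog} pending tasks")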

Checklist

Before going to production:
  • Idempotency keys stored in Redis/database
  • Webhook returns 200 within 500ms
  • Async processing with queue (Celery, Bull, etc.)
  • Retry logic with exponential backoff
  • Dead letter queue for failed jobs
  • Monitoring and alerting
  • Key expiry to prevent memory growth

Next Steps

Event Detection Patterns

Create effective detection rules

Alerts and Callbacks

Wire events to delivery channels