Skip to main content
Production systems need reliable webhook handling. This guide covers idempotency, retry logic, and queue patterns.

Quick Example

from flask import Flask, request, jsonify
import hashlib

app = Flask(__name__)
processed_events = set()  # Use Redis in production

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json

    # Generate idempotency key
    idempotency_key = hashlib.sha256(
        f"{payload['event_id']}:{payload['timestamp']}".encode()
    ).hexdigest()

    # Skip if already processed
    if idempotency_key in processed_events:
        return jsonify({"status": "duplicate"}), 200

    # Process the alert
    process_alert(payload)

    # Mark as processed
    processed_events.add(idempotency_key)

    return jsonify({"status": "ok"}), 200

Idempotency

Webhooks may be delivered multiple times. Always implement idempotency.

Generate Idempotency Keys

Use a combination of unique fields:
# Option 1: Event ID + timestamp
key = f"{payload['event_id']}:{payload['timestamp']}"

# Option 2: Hash of entire payload
key = hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()

Store Processed Keys

import redis

r = redis.Redis()

def is_duplicate(key: str) -> bool:
    """Check if we've already processed this event"""
    # Set with 24h expiry
    return not r.set(f"webhook:{key}", "1", nx=True, ex=86400)

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json
    key = generate_key(payload)

    if is_duplicate(key):
        return jsonify({"status": "duplicate"}), 200

    process_alert(payload)
    return jsonify({"status": "ok"}), 200

Respond Quickly

Return 200 immediately, then process asynchronously:
from celery import Celery
from flask import Flask, request, jsonify

app = Flask(__name__)
celery = Celery('tasks', broker='redis://localhost:6379/0')

@celery.task
def process_alert_async(payload):
    """Heavy processing happens here"""
    # Notify team
    send_slack_notification(payload)
    # Store in database
    save_to_db(payload)
    # Trigger downstream actions
    trigger_automation(payload)

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json

    # Acknowledge immediately
    process_alert_async.delay(payload)

    return jsonify({"status": "queued"}), 200

Queue Patterns

Basic Queue Architecture

VideoDB → Webhook Endpoint → Queue → Worker → Actions

         Return 200 fast

Priority Queues

Handle critical alerts first:
from celery import Celery

celery = Celery('tasks', broker='redis://localhost:6379/0')

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json
    label = payload.get("label", "")

    # High priority for safety alerts
    if label in ["fall_detected", "intrusion_detected", "fire_detected"]:
        process_alert_async.apply_async(args=[payload], priority=0)
    else:
        process_alert_async.apply_async(args=[payload], priority=5)

    return jsonify({"status": "queued"}), 200

Error Handling

Retry Failed Processing

from celery import Celery
from celery.exceptions import MaxRetriesExceededError

celery = Celery('tasks', broker='redis://localhost:6379/0')

@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def process_alert_async(self, payload):
    try:
        # Process the alert
        send_notification(payload)
        save_to_db(payload)
    except Exception as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))

Dead Letter Queue

Handle permanently failed jobs:
@celery.task(bind=True, max_retries=3)
def process_alert_async(self, payload):
    try:
        process(payload)
    except MaxRetriesExceededError:
        # Move to dead letter queue
        save_to_dead_letter(payload)
        alert_ops_team(payload)
    except Exception as e:
        raise self.retry(exc=e)

Monitoring

Track Webhook Health

from prometheus_client import Counter, Histogram

webhook_received = Counter(
    'webhook_received_total',
    'Total webhooks received',
    ['event_label']
)

webhook_latency = Histogram(
    'webhook_processing_seconds',
    'Time to process webhook'
)

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json

    webhook_received.labels(event_label=payload.get("label")).inc()

    with webhook_latency.time():
        process_alert(payload)

    return jsonify({"status": "ok"}), 200

Alert on Issues

# Alert if processing takes too long
if processing_time > 5.0:
    send_ops_alert("Webhook processing slow")

# Alert if queue is backing up
queue_size = celery.control.inspect().active()
if queue_size > 1000:
    send_ops_alert("Alert queue backing up")

Checklist

Before going to production:
  • Idempotency keys stored in Redis/database
  • Webhook returns 200 within 500ms
  • Async processing with queue (Celery, Bull, etc.)
  • Retry logic with exponential backoff
  • Dead letter queue for failed jobs
  • Monitoring and alerting
  • Key expiry to prevent memory growth

Next Steps