Production systems need reliable webhook handling. This guide covers idempotency, retry logic, and queue patterns.
Quick Example
from flask import Flask, request, jsonify
import hashlib

app = Flask(__name__)
processed_events = set()  # Use Redis in production

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json
    # Generate idempotency key
    idempotency_key = hashlib.sha256(
        f"{payload['event_id']}:{payload['timestamp']}".encode()
    ).hexdigest()
    # Skip if already processed
    if idempotency_key in processed_events:
        return jsonify({"status": "duplicate"}), 200
    # Process the alert
    process_alert(payload)
    # Mark as processed
    processed_events.add(idempotency_key)
    return jsonify({"status": "ok"}), 200
Idempotency
Webhooks may be delivered more than once, for example when a delivery is retried. Make the handler idempotent so that a repeated delivery has no additional effect.
Generate Idempotency Keys
Use a combination of unique fields:
import hashlib
import json

# Option 1: Event ID + timestamp
key = f"{payload['event_id']}:{payload['timestamp']}"
# Option 2: Hash of entire payload (sort_keys makes the hash independent of key order)
key = hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
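The two options can be wrapped in one helper so that every handler derives keys the same way. This is a minimal sketch, assuming payloads carry the `event_id` and `timestamp` fields used in the examples; `make_idempotency_key` is a hypothetical helper name, not part of any SDK.

```python
import hashlib
import json


def make_idempotency_key(payload: dict) -> str:
    """Derive a stable key for a webhook payload (hypothetical helper)."""
    if "event_id" in payload and "timestamp" in payload:
        # Option 1: event ID + timestamp (field names assumed from the examples)
        raw = f"{payload['event_id']}:{payload['timestamp']}"
    else:
        # Option 2: canonical JSON, so key order does not change the hash
        raw = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(raw.encode()).hexdigest()


# Same content, different key order: both produce the same key
a = make_idempotency_key({"label": "fall_detected", "confidence": 0.9})
b = make_idempotency_key({"confidence": 0.9, "label": "fall_detected"})
```

Hashing the whole payload treats any difference in content as a new event, so it only deduplicates redeliveries whose content is identical.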
Store Processed Keys
import redis

r = redis.Redis()

def generate_key(payload: dict) -> str:
    """Build the idempotency key from event ID + timestamp"""
    return f"{payload['event_id']}:{payload['timestamp']}"

def is_duplicate(key: str) -> bool:
    """Check if we've already processed this event"""
    # SET NX only succeeds for the first delivery; the key expires after 24h
    return not r.set(f"webhook:{key}", "1", nx=True, ex=86400)

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json
    key = generate_key(payload)
    if is_duplicate(key):
        return jsonify({"status": "duplicate"}), 200
    process_alert(payload)
    return jsonify({"status": "ok"}), 200
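Because the key is recorded before `process_alert` runs, a failure during processing leaves the key in place and a redelivery would be skipped as a duplicate. One possible way to handle this is to release the key when processing fails. The sketch below illustrates the idea with an in-memory stand-in for Redis `SET NX EX`; `TTLKeyStore`, `claim`, `release`, and `handle_once` are hypothetical names used only for illustration.

```python
import time


class TTLKeyStore:
    """In-memory stand-in for Redis SET NX EX (illustration only)."""

    def __init__(self, ttl_seconds: float = 86400, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._expires = {}  # key -> expiry time

    def claim(self, key: str) -> bool:
        """Return True if the key was free and is now claimed."""
        now = self._clock()
        expiry = self._expires.get(key)
        if expiry is not None and expiry > now:
            return False  # already claimed and not yet expired
        self._expires[key] = now + self._ttl
        return True

    def release(self, key: str) -> None:
        """Forget a key so a redelivery can be processed again."""
        self._expires.pop(key, None)


def handle_once(store: TTLKeyStore, key: str, process) -> str:
    """Run process() at most once per key; free the key if it fails."""
    if not store.claim(key):
        return "duplicate"
    try:
        process()
    except Exception:
        store.release(key)  # let the sender's retry through
        raise
    return "ok"
```

With the Redis-backed version, the release step would correspond to deleting the `webhook:{key}` entry, for example with `r.delete(...)`.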
Respond Quickly
Return 200 immediately, then process asynchronously:
from celery import Celery
from flask import Flask, request, jsonify

app = Flask(__name__)
celery = Celery('tasks', broker='redis://localhost:6379/0')

@celery.task
def process_alert_async(payload):
    """Heavy processing happens here"""
    # Notify team
    send_slack_notification(payload)
    # Store in database
    save_to_db(payload)
    # Trigger downstream actions
    trigger_automation(payload)

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json
    # Acknowledge immediately
    process_alert_async.delay(payload)
    return jsonify({"status": "queued"}), 200
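The acknowledge-then-process pattern can be tried without a broker. The following is a minimal sketch using only the standard library, with an in-process queue and a background thread standing in for Celery; the `evt_1` payload and the `processed` list are made up for the demo. An in-process queue loses its jobs when the server restarts, which is one reason to prefer a broker-backed queue in production.

```python
import queue
import threading

jobs = queue.Queue()
processed = []


def worker() -> None:
    """Drain the queue in the background."""
    while True:
        payload = jobs.get()
        try:
            processed.append(payload["event_id"])  # stand-in for heavy work
        finally:
            jobs.task_done()


def handle_alert(payload: dict) -> tuple:
    """Enqueue and acknowledge without waiting for processing."""
    jobs.put(payload)
    return {"status": "queued"}, 200


threading.Thread(target=worker, daemon=True).start()
response = handle_alert({"event_id": "evt_1"})
jobs.join()  # only for the demo: wait until the worker has caught up
```

The handler returns as soon as the payload is on the queue, so its response time does not depend on how long the work takes.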
Queue Patterns
Basic Queue Architecture
VideoDB → Webhook Endpoint → Queue → Worker → Actions
                 ↓
          Return 200 fast
Priority Queues
Handle critical alerts first:
from celery import Celery

celery = Celery('tasks', broker='redis://localhost:6379/0')

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json
    label = payload.get("label", "")
    # High priority for safety alerts
    # (with the Redis broker, 0 is the highest priority)
    if label in ["fall_detected", "intrusion_detected", "fire_detected"]:
        process_alert_async.apply_async(args=[payload], priority=0)
    else:
        process_alert_async.apply_async(args=[payload], priority=5)
    return jsonify({"status": "queued"}), 200
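The ordering effect of priorities can be seen in isolation with the standard library's `queue.PriorityQueue`, where a lower number is served first. This is a sketch of the idea rather than of Celery's internals; the `motion_detected` and `person_detected` labels are hypothetical.

```python
import itertools
import queue

CRITICAL_LABELS = {"fall_detected", "intrusion_detected", "fire_detected"}

pq = queue.PriorityQueue()
_order = itertools.count()  # tie-breaker keeps FIFO order within a priority


def enqueue(payload: dict) -> None:
    """Queue safety alerts at priority 0 and everything else at 5."""
    priority = 0 if payload.get("label") in CRITICAL_LABELS else 5
    pq.put((priority, next(_order), payload))


for label in ["motion_detected", "fire_detected", "person_detected", "fall_detected"]:
    enqueue({"label": label})

# Drain the queue: critical alerts come out first, in arrival order
drained = []
while not pq.empty():
    _, _, payload = pq.get()
    drained.append(payload["label"])
```

The counter in the middle of each tuple prevents Python from comparing the payload dictionaries when two entries share a priority.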
Error Handling
Retry Failed Processing
from celery import Celery
from celery.exceptions import MaxRetriesExceededError

celery = Celery('tasks', broker='redis://localhost:6379/0')

@celery.task(bind=True, max_retries=3, default_retry_delay=60)
def process_alert_async(self, payload):
    try:
        # Process the alert
        send_notification(payload)
        save_to_db(payload)
    except Exception as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
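With `max_retries=3`, the `60 * (2 ** self.request.retries)` formula yields delays of 60, 120, and 240 seconds. The sketch below reproduces that schedule as a plain function; the `cap` and `jitter` parameters are additions for illustration and are not part of the task above.

```python
import random


def backoff_delay(retries: int, base: float = 60.0, cap: float = 3600.0,
                  jitter: bool = False) -> float:
    """Delay in seconds before retry number `retries` (0-based)."""
    delay = min(cap, base * (2 ** retries))
    if jitter:
        # Full jitter: pick uniformly in [0, delay] to spread out retries
        delay = random.uniform(0, delay)
    return delay


schedule = [backoff_delay(n) for n in range(3)]  # [60.0, 120.0, 240.0]
```

A cap keeps the delay bounded if the retry limit is raised later, and jitter helps when many tasks fail at the same moment and would otherwise all retry together.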
Dead Letter Queue
Handle permanently failed jobs:
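@celery.task(bind=True, max_retries=3)
def process_alert_async(self, payload):
    try:
        process(payload)
    except Exception as e:
        # self.retry(exc=e) re-raises e once retries run out, so check the
        # retry count here instead of catching MaxRetriesExceededError
        if self.request.retries >= self.max_retries:
            # Move to dead letter queue
            save_to_dead_letter(payload)
            alert_ops_team(payload)
            return
        raise self.retry(exc=e)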
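The control flow behind a dead letter queue can be shown without Celery. This is a minimal sketch under stated assumptions: `run_with_retries`, `dead_letters`, and `always_fails` are hypothetical names, the dead letter store is a plain list, and the delays between attempts are omitted.

```python
dead_letters = []


def run_with_retries(payload: dict, process, max_retries: int = 3) -> bool:
    """Call process(payload); park the payload after the last failed retry."""
    last_error = None
    for _attempt in range(max_retries + 1):  # first try + max_retries retries
        try:
            process(payload)
            return True
        except Exception as exc:
            last_error = exc
    # Every attempt failed: keep the payload and the error for inspection
    dead_letters.append({"payload": payload, "error": repr(last_error)})
    return False


attempts = []


def always_fails(payload: dict) -> None:
    attempts.append(payload["event_id"])
    raise ConnectionError("downstream unavailable")


ok = run_with_retries({"event_id": "evt_9"}, always_fails)
```

Storing the error next to the payload makes it easier to decide later whether a parked job can be replayed.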
Monitoring
Track Webhook Health
from prometheus_client import Counter, Histogram

webhook_received = Counter(
    'webhook_received_total',
    'Total webhooks received',
    ['event_label']
)
webhook_latency = Histogram(
    'webhook_processing_seconds',
    'Time to process webhook'
)

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json
    webhook_received.labels(event_label=payload.get("label")).inc()
    with webhook_latency.time():
        process_alert(payload)
    return jsonify({"status": "ok"}), 200
Alert on Issues
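# Alert if processing takes too long
if processing_time > 5.0:
    send_ops_alert("Webhook processing slow")
# Alert if queue is backing up.
# inspect().active() lists the tasks currently running on each worker, not
# the backlog. With the Redis broker, pending tasks for the default queue
# sit in the Redis list named "celery" (r is a redis.Redis() client; queues
# that use priorities may be split across additional keys).
queue_size = r.llen("celery")
if queue_size > 1000:
    send_ops_alert("Alert queue backing up")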
Checklist
Before going to production:
- Idempotency keys stored in Redis/database
- Webhook returns 200 within 500ms
- Async processing with queue (Celery, Bull, etc.)
- Retry logic with exponential backoff
- Dead letter queue for failed jobs
- Monitoring and alerting
- Key expiry to prevent memory growth
Next Steps
- Event Detection Patterns: Create effective detection rules
- Alerts and Callbacks: Wire events to delivery channels