
# Webhooks and Reliability

> Handle webhooks at scale with idempotency, retries, and queues

Production systems need reliable webhook handling. This guide covers idempotency, fast acknowledgement, queues, retries, and monitoring.

## Quick Example

<CodeGroup>
  ```python Python theme={null}
  from flask import Flask, request, jsonify
  import hashlib

  app = Flask(__name__)
  processed_events = set()  # In-memory for illustration only; use Redis in production

  def process_alert(payload):
      """Your alert handling logic goes here."""
      ...

  @app.route("/webhooks/alerts", methods=["POST"])
  def handle_alert():
      payload = request.json

      # Generate idempotency key
      idempotency_key = hashlib.sha256(
          f"{payload['event_id']}:{payload['timestamp']}".encode()
      ).hexdigest()

      # Skip if already processed
      if idempotency_key in processed_events:
          return jsonify({"status": "duplicate"}), 200

      # Process the alert
      process_alert(payload)

      # Mark as processed only after success, so a failed attempt can be retried
      processed_events.add(idempotency_key)

      return jsonify({"status": "ok"}), 200
  ```

  ```javascript Node.js theme={null}
  import express from 'express';
  import crypto from 'crypto';

  const app = express();
  app.use(express.json());

  const processedEvents = new Set();  // In-memory for illustration only; use Redis in production

  // Your alert handling logic goes here
  function processAlert(payload) {
      // ...
  }

  app.post("/webhooks/alerts", (req, res) => {
      const payload = req.body;

      // Generate idempotency key
      const idempotencyKey = crypto
          .createHash('sha256')
          .update(`${payload.event_id}:${payload.timestamp}`)
          .digest('hex');

      // Skip if already processed
      if (processedEvents.has(idempotencyKey)) {
          return res.json({ status: "duplicate" });
      }

      // Process the alert
      processAlert(payload);

      // Mark as processed only after success, so a failed attempt can be retried
      processedEvents.add(idempotencyKey);

      res.json({ status: "ok" });
  });
  ```
</CodeGroup>

***

## Idempotency

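Webhooks can be delivered more than once, for example after a network timeout. Always make your handler idempotent: processing the same event twice must have the same effect as processing it once.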

### Generate Idempotency Keys

Use a combination of unique fields:

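```python theme={null}
import hashlib
import json

# Option 1: Event ID + timestamp
key = f"{payload['event_id']}:{payload['timestamp']}"

# Option 2: Hash of the entire payload (sort_keys makes the JSON canonical).
# Only safe if the payload is byte-for-byte identical on every delivery attempt.
key = hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
```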
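
Option 2 only works if the same event always serializes to the same string. Passing `sort_keys=True` makes the JSON independent of key order, which this sketch checks. The `generate_key` helper is hypothetical: it prefers `event_id` and `timestamp` when both are present (the field names used on this page) and falls back to hashing the whole payload. The `score` field in the demo is made up for illustration.

```python theme={null}
import hashlib
import json


def generate_key(payload: dict) -> str:
    """Hypothetical helper: derive a stable idempotency key for a webhook payload."""
    if "event_id" in payload and "timestamp" in payload:
        # Option 1: the event's own unique fields
        raw = f"{payload['event_id']}:{payload['timestamp']}"
    else:
        # Option 2 fallback: canonical JSON, so key order does not change the hash
        raw = json.dumps(payload, sort_keys=True)
    return hashlib.sha256(raw.encode()).hexdigest()


# The same payload with its keys in a different order yields the same key
a = generate_key({"label": "fall_detected", "score": 0.92})
b = generate_key({"score": 0.92, "label": "fall_detected"})
print(a == b)  # True
```

If the payload carries anything that changes between delivery attempts (a delivery timestamp or attempt counter, say), the fallback produces a new key for every redelivery, so prefer the event's own identifiers whenever they exist.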

### Store Processed Keys

<CodeGroup>
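  ```python Python theme={null}
  import hashlib

  import redis
  from flask import Flask, request, jsonify

  app = Flask(__name__)
  r = redis.Redis()

  def generate_key(payload: dict) -> str:
      """Derive an idempotency key from the event's unique fields"""
      raw = f"{payload['event_id']}:{payload['timestamp']}"
      return hashlib.sha256(raw.encode()).hexdigest()

  def is_duplicate(key: str) -> bool:
      """Atomically claim the key; True means it was already claimed"""
      # SET NX succeeds only if the key is new; expire after 24h
      return not r.set(f"webhook:{key}", "1", nx=True, ex=86400)

  @app.route("/webhooks/alerts", methods=["POST"])
  def handle_alert():
      payload = request.json
      key = generate_key(payload)

      if is_duplicate(key):
          return jsonify({"status": "duplicate"}), 200

      try:
          process_alert(payload)
      except Exception:
          # Release the claim so a redelivery can retry this event
          r.delete(f"webhook:{key}")
          raise

      return jsonify({"status": "ok"}), 200
  ```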

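  ```javascript Node.js theme={null}
  import express from 'express';
  import crypto from 'crypto';
  import Redis from 'ioredis';

  const app = express();
  app.use(express.json());
  const redis = new Redis();

  function generateKey(payload) {
      return crypto
          .createHash('sha256')
          .update(`${payload.event_id}:${payload.timestamp}`)
          .digest('hex');
  }

  async function isDuplicate(key) {
      // SET with NX (only if not exists) and 24h expiry; returns null if the key exists
      const result = await redis.set(`webhook:${key}`, '1', 'NX', 'EX', 86400);
      return result === null;
  }

  app.post("/webhooks/alerts", async (req, res) => {
      const payload = req.body;
      const key = generateKey(payload);

      if (await isDuplicate(key)) {
          return res.json({ status: "duplicate" });
      }

      try {
          await processAlert(payload);
      } catch (err) {
          // Release the claim so a redelivery can retry this event
          await redis.del(`webhook:${key}`);
          return res.status(500).json({ status: "error" });
      }
      res.json({ status: "ok" });
  });
  ```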
</CodeGroup>
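
For local development or unit tests you may not want a Redis dependency. The sketch below is a hypothetical in-memory stand-in that mimics the `SET key value NX EX ttl` behaviour used above: the first caller claims a key, later callers see a duplicate until the key expires, and `release` undoes a claim after a failure. It lives inside one process, so it cannot replace Redis once you run more than one worker.

```python theme={null}
import threading
import time


class InMemoryIdempotencyStore:
    """Hypothetical dev/test stand-in for Redis SET NX EX. Single-process only."""

    def __init__(self, ttl_seconds: float = 86400, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock  # injectable so tests can control time
        self._expiry: dict[str, float] = {}
        self._lock = threading.Lock()

    def claim(self, key: str) -> bool:
        """Return True if this call claimed the key, False if it is a duplicate."""
        now = self.clock()
        with self._lock:
            # Drop expired keys so memory does not grow without bound
            for k in [k for k, exp in self._expiry.items() if exp <= now]:
                del self._expiry[k]
            if key in self._expiry:
                return False
            self._expiry[key] = now + self.ttl
            return True

    def release(self, key: str) -> None:
        """Forget a key (e.g. after processing failed) so a redelivery is retried."""
        with self._lock:
            self._expiry.pop(key, None)


fake_now = [0.0]
store = InMemoryIdempotencyStore(ttl_seconds=10, clock=lambda: fake_now[0])
print(store.claim("evt-1"))  # True: first delivery
print(store.claim("evt-1"))  # False: duplicate
fake_now[0] = 11.0
print(store.claim("evt-1"))  # True: the earlier claim expired
```

Expired keys are swept on every `claim`, which is linear in the number of stored keys; that is fine for tests but another reason to use Redis's built-in expiry in production.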

***

## Respond Quickly

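Return 200 as soon as the payload is queued, then do the slow work (notifications, database writes, automations) in a background worker. A handler that blocks on that work risks timing out, and a timed-out delivery may be retried, adding load and duplicates: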

<CodeGroup>
  ```python Python theme={null}
  from celery import Celery
  from flask import Flask, request, jsonify

  app = Flask(__name__)
  celery = Celery('tasks', broker='redis://localhost:6379/0')

  @celery.task
  def process_alert_async(payload):
      """Heavy processing happens here"""
      # Notify team
      send_slack_notification(payload)
      # Store in database
      save_to_db(payload)
      # Trigger downstream actions
      trigger_automation(payload)

  @app.route("/webhooks/alerts", methods=["POST"])
  def handle_alert():
      payload = request.json

      # Acknowledge immediately
      process_alert_async.delay(payload)

      return jsonify({"status": "queued"}), 200
  ```

  ```javascript Node.js theme={null}
  import Bull from 'bull';

  const alertQueue = new Bull('alerts', 'redis://localhost:6379');

  // Worker processes jobs asynchronously
  alertQueue.process(async (job) => {
      const payload = job.data;
      // Heavy processing
      await sendSlackNotification(payload);
      await saveToDb(payload);
      await triggerAutomation(payload);
  });

  app.post("/webhooks/alerts", async (req, res) => {
      const payload = req.body;

      // Acknowledge immediately, process async
      await alertQueue.add(payload);

      res.json({ status: "queued" });
  });
  ```
</CodeGroup>

***

## Queue Patterns

### Basic Queue Architecture

```
VideoDB → Webhook Endpoint → Queue → Worker → Actions
              ↓
         Return 200 fast
```
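
The same shape can be sketched with only the Python standard library, which makes the moving parts visible before you adopt Celery or Bull. The names are hypothetical and map onto the diagram: `handle_webhook` is the endpoint (it only enqueues and returns), `events` is the queue, `worker` is the worker thread, and `run_actions` stands in for the actions. The queue lives in memory, so anything still queued is lost if the process restarts; treat this as an illustration, not a production setup.

```python theme={null}
import queue
import threading

events: "queue.Queue[dict]" = queue.Queue()
handled: list[str] = []


def run_actions(payload: dict) -> None:
    """Hypothetical slow work: notifications, database writes, automations."""
    handled.append(payload["event_id"])


def worker() -> None:
    while True:
        payload = events.get()
        try:
            run_actions(payload)
        except Exception as exc:
            # A real system would retry here or route to a dead letter queue
            print(f"failed: {exc!r}")
        finally:
            events.task_done()


def handle_webhook(payload: dict) -> tuple[dict, int]:
    """Endpoint body: enqueue and acknowledge without doing the slow work."""
    events.put(payload)
    return {"status": "queued"}, 200


threading.Thread(target=worker, daemon=True).start()

print(handle_webhook({"event_id": "evt-1", "label": "fall_detected"}))
events.join()  # wait until the worker has drained the queue
print(handled)  # ['evt-1']
```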

### Priority Queues

Handle critical alerts first:

<CodeGroup>
  ```python Python theme={null}
  from celery import Celery

  celery = Celery('tasks', broker='redis://localhost:6379/0')

  @app.route("/webhooks/alerts", methods=["POST"])
  def handle_alert():
      payload = request.json
      label = payload.get("label", "")

      # High priority for safety alerts.
      # With the Redis broker, lower numbers mean higher priority (0 = highest).
      if label in ["fall_detected", "intrusion_detected", "fire_detected"]:
          process_alert_async.apply_async(args=[payload], priority=0)
      else:
          process_alert_async.apply_async(args=[payload], priority=5)

      return jsonify({"status": "queued"}), 200
  ```

  ```javascript Node.js theme={null}
  import Bull from 'bull';

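  // Give the high-priority queue its own workers (or more concurrency) so safety
  // alerts never wait behind normal ones. Bull's per-job `priority` option
  // (1 = highest) is an alternative when a single queue is enough.
  const highPriorityQueue = new Bull('high-priority-alerts');
  const normalQueue = new Bull('normal-alerts');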

  app.post("/webhooks/alerts", async (req, res) => {
      const payload = req.body;
      const label = payload.label || "";

      // High priority for safety alerts
      const safetyLabels = ["fall_detected", "intrusion_detected", "fire_detected"];

      if (safetyLabels.includes(label)) {
          await highPriorityQueue.add(payload);
      } else {
          await normalQueue.add(payload);
      }

      res.json({ status: "queued" });
  });
  ```
</CodeGroup>
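
The ordering both examples aim for can be seen without a broker using Python's `queue.PriorityQueue`. Lower numbers come out first, matching the Celery example's `0` for safety alerts. A monotonically increasing counter breaks ties, so alerts with the same priority keep their arrival order and the payload dicts are never compared. The safety labels come from the examples above; `person_detected` and `vehicle_detected` are made-up labels for the demo.

```python theme={null}
import itertools
import queue

SAFETY_LABELS = {"fall_detected", "intrusion_detected", "fire_detected"}

alerts: queue.PriorityQueue = queue.PriorityQueue()
arrival = itertools.count()  # tie-breaker: FIFO within the same priority


def enqueue(payload: dict) -> None:
    priority = 0 if payload.get("label", "") in SAFETY_LABELS else 5
    alerts.put((priority, next(arrival), payload))


for label in ["person_detected", "fire_detected", "vehicle_detected", "fall_detected"]:
    enqueue({"label": label})

order = []
while not alerts.empty():
    _, _, payload = alerts.get()
    order.append(payload["label"])
print(order)
# ['fire_detected', 'fall_detected', 'person_detected', 'vehicle_detected']
```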

***

## Error Handling

### Retry Failed Processing

<CodeGroup>
  ```python Python theme={null}
  from celery import Celery
  from celery.exceptions import MaxRetriesExceededError

  celery = Celery('tasks', broker='redis://localhost:6379/0')

  @celery.task(bind=True, max_retries=3, default_retry_delay=60)
  def process_alert_async(self, payload):
      try:
          # Process the alert
          send_notification(payload)
          save_to_db(payload)
      except Exception as e:
          # Retry with exponential backoff: 60s, 120s, then 240s.
          # The countdown here overrides default_retry_delay.
          raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
  ```

  ```javascript Node.js theme={null}
  import Bull from 'bull';

  const alertQueue = new Bull('alerts', {
      defaultJobOptions: {
          attempts: 3,  // total attempts, including the first
          backoff: {
              type: 'exponential',
              delay: 60000  // 1 minute base
          }
      }
  });

  alertQueue.process(async (job) => {
      // A thrown error or rejected promise triggers an automatic retry
      await sendNotification(job.data);
      await saveToDb(job.data);
  });

  // Fires after every failed attempt, not only the last one
  alertQueue.on('failed', (job, err) => {
      console.error(`Job ${job.id} failed (attempt ${job.attemptsMade}): ${err.message}`);
      // Once attempts are exhausted, send to a dead letter queue or alert the ops team
  });
</CodeGroup>
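
The Python task computes its countdown as `60 * 2 ** retries`. This small helper reproduces that schedule so you can see how long a failing alert waits before it gives up. The optional jitter is a common addition that the examples above do not use: it spreads retries out so a burst of failures does not retry at the same instant.

```python theme={null}
import random


def backoff_delay(retries: int, base: float = 60.0, jitter: bool = False) -> float:
    """Seconds to wait before the next retry, given how many retries already ran."""
    delay = base * (2 ** retries)
    if jitter:
        # "Full jitter": pick uniformly between 0 and the exponential delay
        delay = random.uniform(0, delay)
    return delay


# max_retries=3 means retries run with retries = 0, 1, 2
schedule = [backoff_delay(n) for n in range(3)]
print(schedule)       # [60.0, 120.0, 240.0]
print(sum(schedule))  # 420.0 seconds of waiting before the task gives up
```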

### Dead Letter Queue

Park jobs that still fail after all retries so they can be inspected and replayed instead of silently dropped:

<CodeGroup>
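  ```python Python theme={null}
  @celery.task(bind=True, max_retries=3)
  def process_alert_async(self, payload):
      try:
          process(payload)
      except Exception as e:
          if self.request.retries >= self.max_retries:
              # Retries exhausted: move to the dead letter queue instead of dropping the alert
              save_to_dead_letter(payload)
              alert_ops_team(payload)
              return
          # Check the count before calling retry(): once max_retries is exceeded,
          # self.retry(exc=e) re-raises e rather than MaxRetriesExceededError
          raise self.retry(exc=e)
  ```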

  ```javascript Node.js theme={null}
  const deadLetterQueue = new Bull('dead-letter');

  alertQueue.on('failed', async (job, err) => {
      if (job.attemptsMade >= job.opts.attempts) {
          // Move to dead letter queue
          await deadLetterQueue.add({
              originalPayload: job.data,
              error: err.message,
              failedAt: new Date()
          });
      }
  });
  ```
</CodeGroup>
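
The control flow behind a dead letter queue does not depend on Celery or Bull: try the job a bounded number of times and, if every attempt fails, park the payload together with its last error instead of dropping it. This synchronous sketch shows only that flow. `process_with_dlq` and the list-based `dead_letter` store are hypothetical, and a real queue would wait (with backoff) between attempts rather than retry immediately.

```python theme={null}
from datetime import datetime, timezone
from typing import Callable

dead_letter: list[dict] = []


def process_with_dlq(handler: Callable[[dict], None], payload: dict, max_attempts: int = 3) -> bool:
    """Run handler up to max_attempts times; dead-letter the payload if all fail."""
    last_error = None
    for _ in range(max_attempts):
        try:
            handler(payload)
            return True
        except Exception as exc:
            last_error = exc
            # A real queue would wait (backoff) before the next attempt
    dead_letter.append({
        "original_payload": payload,
        "error": repr(last_error),
        "attempts": max_attempts,
        "failed_at": datetime.now(timezone.utc).isoformat(),
    })
    return False


def always_fails(payload: dict) -> None:
    raise RuntimeError("downstream unavailable")


print(process_with_dlq(always_fails, {"event_id": "evt-9"}))  # False
print(dead_letter[0]["error"])  # RuntimeError('downstream unavailable')
```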

***

## Monitoring

### Track Webhook Health

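```python theme={null}
from flask import Flask, request, jsonify
from prometheus_client import Counter, Histogram, start_http_server

app = Flask(__name__)

webhook_received = Counter(
    'webhook_received_total',
    'Total webhooks received',
    ['event_label']
)

webhook_latency = Histogram(
    'webhook_processing_seconds',
    'Time to process webhook'
)

# Serve /metrics for Prometheus to scrape on a separate port (8000 is arbitrary)
start_http_server(8000)

@app.route("/webhooks/alerts", methods=["POST"])
def handle_alert():
    payload = request.json

    # Keep label values to a small, fixed set to avoid high metric cardinality
    webhook_received.labels(event_label=payload.get("label", "unknown")).inc()

    with webhook_latency.time():
        process_alert(payload)

    return jsonify({"status": "ok"}), 200
```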

### Alert on Issues

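```python theme={null}
import time

import redis

broker = redis.Redis.from_url("redis://localhost:6379/0")  # same Redis as the Celery broker

# Alert if processing takes too long
start = time.monotonic()
process_alert(payload)
processing_time = time.monotonic() - start
if processing_time > 5.0:
    send_ops_alert("Webhook processing slow")

# Alert if the queue is backing up. With the Redis broker, pending tasks for the
# default "celery" queue sit in a Redis list of that name (tasks sent with a
# non-default priority go to separate lists). Note that inspect().active() only
# lists tasks workers are currently executing, not the backlog.
queue_size = broker.llen("celery")
if queue_size > 1000:
    send_ops_alert("Alert queue backing up")
```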

***

## Checklist

Before going to production:

* Idempotency keys stored in Redis/database
* Webhook returns 200 within 500ms
* Async processing with queue (Celery, Bull, etc.)
* Retry logic with exponential backoff
* Dead letter queue for failed jobs
* Monitoring and alerting
* Key expiry to prevent memory growth

***

## Next Steps

<CardGroup cols={2}>
  <Card icon="image" title="Event Detection Patterns" href="/pages/act/live-action/event-detection-patterns">
    Create effective detection rules
  </Card>

  <Card icon="bell" title="Alerts and Callbacks" href="/pages/act/live-action/alerts-and-callbacks">
    Wire events to delivery channels
  </Card>
</CardGroup>
