Capture sessions emit structured events in real time. Use webhooks for durable delivery and WebSockets for live UI updates.

Quick Example

import asyncio

import videodb


async def main():
    conn = videodb.connect()
    ws = conn.connect_websocket()
    await ws.connect()

    # Listen for events and route them by channel
    async for ev in ws.stream():
        channel = ev.get("channel")

        if channel == "transcript":
            print(f"TRANSCRIPT: {ev['data']['text']}")
        elif channel == "scene_index":
            print(f"SCENE: {ev['data']['text']}")
        elif channel == "audio_index":
            print(f"AUDIO: {ev['data']['text']}")


# await is only valid inside a coroutine, so run the listener with asyncio
asyncio.run(main())

Event Types

Transcript Events

Real-time speech-to-text from audio channels:
{
  "channel": "transcript",
  "rtstream_id": "rts-xxx",
  "rtstream_name": "mic:default",
  "data": {
    "text": "Let's schedule the meeting for Thursday",
    "is_final": true,
    "start": 1710000001234,
    "end": 1710000002345
  }
}
| Field | Description |
| --- | --- |
| text | Transcribed speech |
| is_final | true for final, false for interim |
| start / end | Timestamps (ms) |
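
A consumer that stores transcripts will usually want only the final results. The sketch below assumes interim events are later replaced by a final event for the same speech (the usual behaviour for streaming speech-to-text, not confirmed on this page) and that events arrive as plain dicts shaped like the payload above. `final_transcript` is a hypothetical helper, not part of the SDK.

```python
from typing import Iterable


def final_transcript(events: Iterable[dict]) -> str:
    """Join the text of final transcript events, skipping interim ones.

    Hypothetical helper: assumes each event is a dict shaped like the
    transcript payload shown above.
    """
    parts = []
    for ev in events:
        if ev.get("channel") != "transcript":
            continue  # ignore events from other channels
        data = ev.get("data", {})
        if data.get("is_final"):
            parts.append(data["text"])
    return " ".join(parts)


events = [
    {"channel": "transcript",
     "data": {"text": "Let's schedule", "is_final": False}},
    {"channel": "transcript",
     "data": {"text": "Let's schedule the meeting for Thursday", "is_final": True}},
]
print(final_transcript(events))
```

For a live caption UI the opposite choice may fit better: show interim text immediately and overwrite it when the final event arrives.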

Visual Index Events

Scene descriptions from screen capture:
{
  "channel": "visual_index",
  "rtstream_id": "rts-xxx",
  "rtstream_name": "display:1",
  "data": {
    "text": "User is viewing a Slack conversation with 3 unread messages",
    "start": 1710000012340,
    "end": 1710000018900
  }
}
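
The `start` and `end` values in these payloads look like Unix epoch timestamps in milliseconds. Under that assumption (the page only says "ms"), a small sketch for converting them to datetimes and computing how long a span an event covers. Both helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_datetime(ms: int) -> datetime:
    """Convert epoch milliseconds to an aware UTC datetime.

    Uses timedelta arithmetic rather than float division so the
    millisecond part is preserved exactly.
    """
    return EPOCH + timedelta(milliseconds=ms)


def event_duration_seconds(ev: dict) -> float:
    """Length of the time span an event covers, in seconds."""
    data = ev["data"]
    return (data["end"] - data["start"]) / 1000


event = {
    "channel": "visual_index",
    "data": {"text": "...", "start": 1710000012340, "end": 1710000018900},
}
print(ms_to_datetime(event["data"]["start"]).isoformat())
print(event_duration_seconds(event))
```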

Audio Index Events

Semantic understanding of audio:
{
  "channel": "audio_index",
  "rtstream_id": "rts-xxx",
  "rtstream_name": "mic:default",
  "data": {
    "text": "Discussion about scheduling a team meeting",
    "start": 1710000021500,
    "end": 1710000029200
  }
}

Alert Events

Custom detection rules firing:
{
  "channel": "alert",
  "rtstream_id": "rts-xxx",
  "data": {
    "label": "sensitive_content",
    "triggered": true,
    "confidence": 0.92,
    "start": 1710000045100,
    "end": 1710000047800
  }
}
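
Alert payloads carry both a `triggered` flag and a `confidence` score, so a consumer can decide locally which alerts are worth surfacing. A minimal sketch, assuming `confidence` is a 0 to 1 score; the `should_notify` helper and the 0.8 default threshold are illustrative choices, not values from the API.

```python
def should_notify(ev: dict, min_confidence: float = 0.8) -> bool:
    """Return True for triggered alert events at or above the threshold.

    Hypothetical helper: min_confidence is an application-level choice.
    """
    if ev.get("channel") != "alert":
        return False
    data = ev.get("data", {})
    # Missing confidence is treated as 0.0, so it never passes the threshold.
    return bool(data.get("triggered")) and data.get("confidence", 0.0) >= min_confidence


alert = {
    "channel": "alert",
    "rtstream_id": "rts-xxx",
    "data": {"label": "sensitive_content", "triggered": True, "confidence": 0.92},
}
print(should_notify(alert))
print(should_notify(alert, min_confidence=0.95))
```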

WebSocket Channels

| Channel | Source | Content |
| --- | --- | --- |
| capture_session | Session lifecycle | Status changes |
| transcript | start_transcript() | Speech-to-text |
| scene_index | index_visuals() | Visual analysis |
| audio_index | index_audio() | Audio analysis |
| alert | create_alert() | Alert notifications |
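
As the number of channels grows, an if/elif chain over `channel` becomes hard to maintain. One common alternative is a small registry that maps channel names to handler functions. The `ChannelRouter` class below is a hypothetical sketch that works on plain event dicts; it is not part of the SDK.

```python
from collections import defaultdict
from typing import Callable

Handler = Callable[[dict], None]


class ChannelRouter:
    """Hypothetical helper: route events to handlers by channel name."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, *channels: str) -> Callable[[Handler], Handler]:
        """Decorator that registers a handler for one or more channels."""
        def register(fn: Handler) -> Handler:
            for ch in channels:
                self._handlers[ch].append(fn)
            return fn
        return register

    def dispatch(self, ev: dict) -> int:
        """Call every handler for the event's channel; return how many ran."""
        handlers = self._handlers.get(ev.get("channel"), [])
        for fn in handlers:
            fn(ev)
        return len(handlers)


router = ChannelRouter()
seen = []


@router.on("transcript")
def on_transcript(ev: dict) -> None:
    seen.append(("transcript", ev["data"]["text"]))


# Registered under both names: the payload example on this page uses
# "visual_index", while the channel table lists "scene_index".
@router.on("scene_index", "visual_index")
def on_scene(ev: dict) -> None:
    seen.append(("scene", ev["data"]["text"]))


router.dispatch({"channel": "transcript", "data": {"text": "hello"}})
router.dispatch({"channel": "visual_index", "data": {"text": "a slide"}})
router.dispatch({"channel": "unknown", "data": {}})  # no handler, ignored
print(seen)
```

Inside the listener loop, the body of `async for ev in ws.stream():` then reduces to a single `router.dispatch(ev)` call.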

Connecting

conn = videodb.connect()
ws = conn.connect_websocket()
await ws.connect()  # must run inside an async function

# Pass ws.connection_id when starting AI operations.
# rtstream is an RTStream object, e.g. from conn.get_rtstream(rtstream_id).
rtstream.start_transcript(ws_connection_id=ws.connection_id)
rtstream.index_visuals(prompt="...", ws_connection_id=ws.connection_id)
rtstream.index_audio(prompt="...", ws_connection_id=ws.connection_id)
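
WebSocket delivery is best-effort, so a long-running listener generally needs to reconnect when the socket drops. The sketch below is a generic retry wrapper with exponential backoff. It assumes a dropped connection surfaces as `ConnectionError` (the SDK may raise something else) and takes a caller-supplied `open_stream` coroutine, so nothing here depends on SDK internals. The names `stream_with_reconnect` and `open_stream` are hypothetical.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def stream_with_reconnect(
    open_stream: Callable[[], Awaitable[AsyncIterator[dict]]],
    max_retries: int = 5,
    base_delay: float = 0.5,
) -> AsyncIterator[dict]:
    """Yield events from open_stream(), reopening it after a drop.

    Assumption: a lost connection raises ConnectionError. Events emitted
    while disconnected are not recovered by this wrapper.
    """
    attempt = 0
    while True:
        try:
            stream = await open_stream()
            async for ev in stream:
                attempt = 0  # a delivered event means the link is healthy
                yield ev
            return  # the stream ended cleanly
        except ConnectionError:
            attempt += 1
            if attempt > max_retries:
                raise
            # Exponential backoff: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


# Usage sketch with the calls shown above (conn from videodb.connect()):
#
#     async def open_stream():
#         ws = conn.connect_websocket()
#         await ws.connect()
#         return ws.stream()
#
#     async for ev in stream_with_reconnect(open_stream):
#         ...
```

A new socket may have a different `connection_id`, in which case AI operations would need to be started again with the new value; check the SDK behaviour before relying on this wrapper alone.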

Webhooks

Durable, at-least-once delivery for session lifecycle events.

Webhook Envelope

{
  "version": "2",
  "event": "capture_session.active",
  "timestamp": "2026-01-20T12:34:56Z",
  "capture_session_id": "cap-xxx",
  "end_user_id": "user_abc",
  "status": "active",
  "data": {}
}
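
Parsing the envelope into a typed object early keeps the rest of a webhook handler free of string-key lookups. A minimal sketch using only the fields shown above; the `WebhookEnvelope` class and `parse_envelope` function are hypothetical, and treating `version`, `timestamp`, and `end_user_id` as optional is an assumption made so that abbreviated payloads still parse.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class WebhookEnvelope:
    """Hypothetical typed view of the webhook envelope."""
    event: str
    capture_session_id: str
    status: str
    version: Optional[str] = None
    timestamp: Optional[datetime] = None
    end_user_id: Optional[str] = None
    data: dict = field(default_factory=dict)


def parse_envelope(payload: dict) -> WebhookEnvelope:
    """Build a WebhookEnvelope; raises KeyError if a required field is missing."""
    raw_ts = payload.get("timestamp")
    # Replace the trailing "Z" so fromisoformat() also works before Python 3.11.
    ts = datetime.fromisoformat(raw_ts.replace("Z", "+00:00")) if raw_ts else None
    return WebhookEnvelope(
        event=payload["event"],
        capture_session_id=payload["capture_session_id"],
        status=payload["status"],
        version=payload.get("version"),
        timestamp=ts,
        end_user_id=payload.get("end_user_id"),
        data=payload.get("data") or {},
    )


sample = {
    "version": "2",
    "event": "capture_session.active",
    "timestamp": "2026-01-20T12:34:56Z",
    "capture_session_id": "cap-xxx",
    "end_user_id": "user_abc",
    "status": "active",
    "data": {},
}
env = parse_envelope(sample)
print(env.event, env.timestamp)
```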

Session Lifecycle Events

| Event | Status | Key Data |
| --- | --- | --- |
| capture_session.created | created | |
| capture_session.starting | starting | |
| capture_session.active | active | rtstreams[] |
| capture_session.stopping | stopping | |
| capture_session.stopped | stopped | |
| capture_session.exported | exported | exported_video_id |
| capture_session.failed | failed | error object |
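
Because webhook delivery is at-least-once, a receiver may see a status twice, and retries can in principle arrive after a later event. One way to guard stored state is to apply an update only if it moves the session forward. The sketch below assumes the table lists statuses in the order a session passes through them and that `failed` can occur at any point and is terminal; neither assumption is confirmed on this page. `should_apply` is a hypothetical helper.

```python
from typing import Optional

# Assumed progression, taken from the order of the table above.
LIFECYCLE_ORDER = ["created", "starting", "active", "stopping", "stopped", "exported"]
RANK = {status: i for i, status in enumerate(LIFECYCLE_ORDER)}


def should_apply(current: Optional[str], incoming: str) -> bool:
    """Return True if `incoming` should replace the stored status `current`."""
    if current is None:
        return True  # first event seen for this session
    if current == "failed":
        return False  # assumed terminal: nothing follows a failure
    if incoming == "failed":
        return True  # assumed reachable from any non-failed status
    if incoming not in RANK:
        return False  # unknown status: leave stored state untouched
    return RANK[incoming] > RANK[current]


print(should_apply("starting", "active"))   # forward move
print(should_apply("active", "starting"))   # late retry of an older event
print(should_apply("active", "active"))     # duplicate delivery
```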

Key Webhook: capture_session.active

This event carries the session's rtstreams, so it is the point at which to start AI pipelines:
{
  "event": "capture_session.active",
  "capture_session_id": "cap-xxx",
  "status": "active",
  "data": {
    "rtstreams": [
      { "rtstream_id": "rts-1", "name": "mic:default", "media_types": ["audio"] },
      { "rtstream_id": "rts-2", "name": "system_audio:default", "media_types": ["audio"] },
      { "rtstream_id": "rts-3", "name": "display:1", "media_types": ["video"] }
    ]
  }
}
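A handler for this payload can start the matching pipelines on each stream:

def on_active_webhook(payload):
    # conn is the connection returned by videodb.connect()
    # Look up the capture session this event belongs to
    cap = conn.get_capture_session(payload["capture_session_id"])

    for rts_info in payload["data"]["rtstreams"]:
        rts_id = rts_info["rtstream_id"]

        # Audio streams: transcription plus semantic audio indexing
        if "audio" in rts_info["media_types"]:
            rtstream = conn.get_rtstream(rts_id)
            rtstream.start_transcript()
            rtstream.index_audio(prompt="Extract key decisions")

        # Video streams: visual scene indexing
        if "video" in rts_info["media_types"]:
            rtstream = conn.get_rtstream(rts_id)
            rtstream.index_visuals(prompt="Describe what user is doing")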

Delivery Semantics

| Method | Guarantee | How to handle |
| --- | --- | --- |
| WebSocket | Best-effort | Reconnect on disconnect |
| Webhook | At-least-once | Deduplicate by event ID |

Webhooks may deliver duplicates. Respond with a 2xx status quickly, process the event asynchronously, and make your handlers idempotent.
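
The advice above can be sketched as a receiver that acknowledges every delivery but enqueues each event only once. The envelope shown on this page has no explicit event ID field, so this sketch derives a deduplication key from `capture_session_id` and `event`, assuming each lifecycle event fires at most once per session. The `WebhookReceiver` class is hypothetical, and the in-memory set stands in for whatever persistent store a real deployment would use.

```python
import queue


class WebhookReceiver:
    """Hypothetical sketch: acknowledge fast, dedupe, process later."""

    def __init__(self) -> None:
        self._seen: set[tuple[str, str]] = set()
        self.pending: "queue.Queue[dict]" = queue.Queue()

    def receive(self, payload: dict) -> int:
        """Handle one delivery and return the HTTP status to respond with."""
        # Assumed key: one lifecycle event per session (no event ID in envelope).
        key = (payload["capture_session_id"], payload["event"])
        if key not in self._seen:
            self._seen.add(key)
            self.pending.put(payload)  # a background worker drains this queue
        # Acknowledge duplicates too, so the sender stops retrying.
        return 200


receiver = WebhookReceiver()
delivery = {"event": "capture_session.active", "capture_session_id": "cap-xxx"}
print(receiver.receive(delivery))   # first delivery: enqueued
print(receiver.receive(delivery))   # retry of the same event: acknowledged only
print(receiver.pending.qsize())
```

Keeping `receive` free of slow work means the HTTP response goes out quickly, while the worker that drains `pending` can take as long as it needs.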

Next Steps