When to Use This
- Uploading many files in a batch
- Building production pipelines
- Handling failures gracefully
- Processing webhook callbacks
Async Upload Pattern
For large files or many uploads, pass a callback_url so your code does not wait for each upload to finish:
import videodb

conn = videodb.connect()
coll = conn.get_collection()

# Fire-and-forget uploads
for url in video_urls:
    coll.upload(
        url=url,
        callback_url="https://your-backend.com/webhooks/upload"
    )
Webhook Handler
Handle upload callbacks in your backend:
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/webhooks/upload")
async def handle_upload(request: Request):
    event = await request.json()
    if event["success"]:
        video_id = event["data"]["id"]
        # Trigger next step (indexing, processing, etc.)
        await start_indexing(video_id)
    else:
        # Handle failure
        await log_failure(event["message"])
    return {"status": "ok"}
Callback Payloads
Success:
{
  "success": true,
  "data": {
    "id": "m-xxx",
    "collection_id": "c-xxx",
    "name": "video.mp4",
    "length": "191.14",
    "stream_url": "https://stream.videodb.io/...",
    "player_url": "https://console.videodb.io/player?..."
  }
}
Failure:
{
  "success": false,
  "message": "Download failed."
}
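The two payload shapes above can be normalized into one typed result before any business logic runs. The following is a minimal sketch using only the standard library. `UploadResult` and `parse_callback` are hypothetical names, not part of the VideoDB SDK, and the sketch assumes only the fields shown in the sample payloads.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class UploadResult:
    """Hypothetical container for a parsed upload callback."""
    ok: bool
    video_id: Optional[str] = None
    collection_id: Optional[str] = None
    length: Optional[float] = None
    error: Optional[str] = None


def parse_callback(event: dict) -> UploadResult:
    """Turn a callback payload (shaped like the samples above) into an UploadResult."""
    if event.get("success"):
        data = event.get("data", {})
        # "length" is a string in the sample payload, so convert it to a float.
        raw_length = data.get("length")
        return UploadResult(
            ok=True,
            video_id=data.get("id"),
            collection_id=data.get("collection_id"),
            length=float(raw_length) if raw_length is not None else None,
        )
    # Failure payloads carry only a human-readable message.
    return UploadResult(ok=False, error=event.get("message", "unknown error"))
```

Using `.get()` rather than direct indexing keeps the handler from raising on a payload that is missing a field, which matters if the webhook endpoint is ever hit with unexpected input.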
Batch Upload with Tracking
Track multiple uploads and wait for all to complete:
import asyncio
import uuid

# Map upload_id -> Event that is set when that upload's callback arrives
pending = {}

async def upload_batch(urls):
    events = []
    for url in urls:
        upload_id = uuid.uuid4().hex
        pending[upload_id] = asyncio.Event()
        events.append(pending[upload_id])
        coll.upload(
            url=url,
            callback_url=f"https://backend.com/webhooks?id={upload_id}"
        )
    # Wait for the callbacks that belong to this batch only
    await asyncio.gather(*(e.wait() for e in events))

# In webhook handler (must run in the same process and event loop as upload_batch)
@app.post("/webhooks")
async def handle(request: Request, id: str):
    event = await request.json()
    # pop() so completed uploads do not accumulate in the dict
    waiter = pending.pop(id, None)
    if waiter is not None:
        waiter.set()
    return {"status": "ok"}
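Waiting on every callback with no upper bound means a single lost webhook stalls the whole batch. One possible guard, sketched below with plain `asyncio`, is to cap the wait and report which uploads never called back. `wait_with_timeout` is a hypothetical helper, and it assumes a dict that maps upload ids to `asyncio.Event` objects, as in the tracking code above.

```python
import asyncio


async def wait_with_timeout(events: dict, timeout: float) -> list:
    """Wait up to `timeout` seconds for all events; return ids that are still unset."""
    try:
        await asyncio.wait_for(
            asyncio.gather(*(ev.wait() for ev in events.values())),
            timeout,
        )
    except asyncio.TimeoutError:
        # Some callbacks did not arrive in time; fall through and report them.
        pass
    return [upload_id for upload_id, ev in events.items() if not ev.is_set()]
```

The returned ids can then be retried or logged as failures instead of leaving the batch waiting indefinitely.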
Retry Pattern
Handle transient failures with exponential backoff:
import time
from functools import wraps

def retry_upload(max_attempts=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    # Out of attempts: re-raise the last error
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_upload(max_attempts=3)
def upload_with_retry(coll, url):
    return coll.upload(url=url)
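To see how long the decorator above can block, it helps to list the delays it sleeps between attempts. The sketch below recomputes the same `base_delay * (2 ** attempt)` schedule. `backoff_delays` is a hypothetical helper added only for illustration.

```python
def backoff_delays(max_attempts=3, base_delay=1):
    """Delays (in seconds) slept between attempts; no sleep follows the final attempt."""
    return [base_delay * (2 ** attempt) for attempt in range(max_attempts - 1)]


print(backoff_delays())        # [1, 2]
print(backoff_delays(5, 0.5))  # [0.5, 1.0, 2.0, 4.0]
```

With the defaults, a call that fails every time waits 3 seconds in total before the last exception is raised. The total grows quickly as `max_attempts` increases.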
Indexing Callbacks
Chain indexing after upload completes:
# Upload callback triggers indexing
@app.post("/webhooks/upload")
async def handle_upload(request: Request):
    event = await request.json()
    if event["success"]:
        video_id = event["data"]["id"]
        coll = conn.get_collection(event["data"]["collection_id"])
        video = coll.get_video(video_id)
        # Trigger async indexing
        video.index_spoken_words(
            callback_url="https://backend.com/webhooks/index"
        )
    return {"status": "ok"}

# Index callback
@app.post("/webhooks/index")
async def handle_index(request: Request):
    event = await request.json()
    if event["success"]:
        # Video is now searchable
        await notify_ready(event["data"]["id"])
    return {"status": "ok"}
Error Handling
Common error responses and how to handle them:
| Error | Cause | Action |
|---|---|---|
| Download failed | URL inaccessible | Verify URL, check permissions |
| Invalid media type | Wrong MediaType enum | Match MediaType to file |
| Something went wrong | Corrupted file | Re-encode source file |
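A failure handler can turn the table above into a lookup, so each logged failure carries a suggested next step. This is only a sketch: `ERROR_ACTIONS` and `suggest_action` are hypothetical names, and matching on the message text assumes the callback `message` uses the wording shown in the table, apart from case and a trailing period.

```python
# Suggested actions keyed by normalized error message (taken from the table above).
ERROR_ACTIONS = {
    "download failed": "Verify URL, check permissions",
    "invalid media type": "Match MediaType to file",
    "something went wrong": "Re-encode source file",
}


def suggest_action(message: str) -> str:
    """Return the suggested action for a callback error message."""
    # Normalize so "Download failed." and "download failed" match the same key.
    key = message.strip().rstrip(".").lower()
    return ERROR_ACTIONS.get(key, "Unrecognized error: log the message and inspect manually")
```

For example, `suggest_action("Download failed.")` returns `"Verify URL, check permissions"`. Any message not in the table falls back to the generic advice.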
Next Steps
- Upload Video: upload methods reference
- Live Streams: ingest from RTSP sources