Skip to main content

When to Use This

  • Uploading many files in a batch
  • Building production pipelines
  • Handling failures gracefully
  • Processing webhook callbacks

Async Upload Pattern

For large files or many uploads, use callbacks instead of waiting:
import videodb

conn = videodb.connect()
coll = conn.get_collection()

# Fire-and-forget uploads
for url in video_urls:
    coll.upload(
        url=url,
        callback_url="https://your-backend.com/webhooks/upload"
    )

Webhook Handler

Handle upload callbacks in your backend:
Python
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/webhooks/upload")
async def handle_upload(request: Request):
    event = await request.json()

    if event["success"]:
        video_id = event["data"]["id"]
        # Trigger next step (indexing, processing, etc.)
        await start_indexing(video_id)
    else:
        # Handle failure
        await log_failure(event["message"])

    return {"status": "ok"}

Callback Payloads

Success:
{
  "success": true,
  "data": {
    "id": "m-xxx",
    "collection_id": "c-xxx",
    "name": "video.mp4",
    "length": "191.14",
    "stream_url": "https://stream.videodb.io/...",
    "player_url": "https://console.videodb.io/player?..."
  }
}
Failure:
{
  "success": false,
  "message": "Download failed."
}

Batch Upload with Tracking

Track multiple uploads and wait for all to complete:
import asyncio
from collections import defaultdict

# Track pending uploads
pending = defaultdict(asyncio.Event)

async def upload_batch(urls):
    for url in urls:
        upload_id = generate_id()
        pending[upload_id] = asyncio.Event()

        coll.upload(
            url=url,
            callback_url=f"https://backend.com/webhooks?id={upload_id}"
        )

    # Wait for all callbacks
    await asyncio.gather(*[e.wait() for e in pending.values()])

# In webhook handler
@app.post("/webhooks")
async def handle(request: Request, id: str):
    event = await request.json()
    if id in pending:
        pending[id].set()
    return {"status": "ok"}

Retry Pattern

Handle transient failures with exponential backoff:
import time
from functools import wraps

def retry_upload(max_attempts=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_upload(max_attempts=3)
def upload_with_retry(coll, url):
    return coll.upload(url=url)

Indexing Callbacks

Chain indexing after upload completes:
# Upload callback triggers indexing
@app.post("/webhooks/upload")
async def handle_upload(request: Request):
    event = await request.json()

    if event["success"]:
        video_id = event["data"]["id"]
        coll = conn.get_collection(event["data"]["collection_id"])
        video = coll.get_video(video_id)

        # Trigger async indexing
        video.index_spoken_words(
            callback_url="https://backend.com/webhooks/index"
        )

    return {"status": "ok"}

# Index callback
@app.post("/webhooks/index")
async def handle_index(request: Request):
    event = await request.json()

    if event["success"]:
        # Video is now searchable
        await notify_ready(event["data"]["id"])

    return {"status": "ok"}

Error Handling

Common error responses and how to handle them:
ErrorCauseAction
Download failedURL inaccessibleVerify URL, check permissions
Invalid media typeWrong MediaType enumMatch MediaType to file
Something went wrongCorrupted fileRe-encode source file

Next Steps