
Scenario

Use Deepgram’s streaming transcription over WebSocket for live event coverage. As transcript segments arrive in real time, batch them every 30 seconds and send each batch to Mavera Generate for live-blogging content, turning a keynote into publishable blog snippets as it happens.

Flow: Deepgram WebSocket wss://api.deepgram.com/v1/listen?model=nova-3&encoding=linear16&sample_rate=16000 → real-time segments → batch every 30s → Mavera POST /generations → live blog posts
Streaming requires a live audio source. The example below reads from a WAV file to simulate one. In production, pipe from a microphone, RTMP feed, or SIP trunk, and match encoding and sample_rate to your source.

Code

import os, asyncio, json, time, requests, websockets

DG = os.environ["DEEPGRAM_API_KEY"]
MV = os.environ["MAVERA_API_KEY"]
MV_BASE = "https://app.mavera.io/api/v1"
MV_H = {"Authorization": f"Bearer {MV}", "Content-Type": "application/json"}

async def stream_and_blog():
    url = ("wss://api.deepgram.com/v1/listen?model=nova-3&encoding=linear16"
        "&sample_rate=16000&smart_format=true&punctuate=true&interim_results=false")
    buf, posts = [], []
    last_flush = time.time()
    retries, max_retries = 0, 5

    while retries < max_retries:
        try:
            # websockets < 14 passes headers via extra_headers; releases 14+
            # renamed the argument to additional_headers
            async with websockets.connect(url, extra_headers={"Authorization": f"Token {DG}"}) as ws:
                print("Connected to Deepgram streaming API")
                retries = 0  # reset the backoff counter after a successful connect

                async def send_audio():
                    with open("keynote-livestream.wav", "rb") as f:
                        f.read(44)  # skip the 44-byte WAV header; Deepgram expects raw PCM
                        while chunk := f.read(4096):
                            await ws.send(chunk)
                            # 4096 bytes of 16 kHz 16-bit mono is ~128 ms of audio,
                            # so ~100 ms sleeps approximate a real-time feed
                            await asyncio.sleep(0.1)
                    # signal end of audio so Deepgram flushes its final results
                    await ws.send(json.dumps({"type": "CloseStream"}))

                async def receive():
                    nonlocal last_flush
                    async for msg in ws:
                        d = json.loads(msg)
                        if d.get("type") != "Results":
                            continue  # skip Metadata and other control messages
                        text = d["channel"]["alternatives"][0].get("transcript", "").strip()
                        if not text or not d.get("is_final"):
                            continue  # keep only finalized, non-empty segments
                        buf.append(text)
                        print(f"  [{len(buf):3d}] {text[:80]}")
                        # every 30 seconds, turn the buffered transcript into a post
                        if time.time() - last_flush >= 30 and buf:
                            batch = " ".join(buf); buf.clear(); last_flush = time.time()
                            # requests.post blocks the event loop; acceptable for a
                            # demo, but use an async HTTP client in production
                            p = requests.post(f"{MV_BASE}/generations", headers=MV_H, json={
                                "prompt": f"Live blog. 80-120 words, present tense.\n\n{batch[:3000]}"
                            }).json()
                            blog = p.get("output") or p.get("content") or ""
                            posts.append(blog)
                            print(f"\n  LIVE BLOG #{len(posts)}:\n  {blog[:300]}\n")

                await asyncio.gather(send_audio(), receive())
                break
        except websockets.exceptions.ConnectionClosed as e:
            retries += 1
            wait = min(2 ** retries, 30)  # exponential backoff: 2s, 4s, ... capped at 30s
            print(f"Connection closed: {e}. Reconnecting in {wait}s ({retries}/{max_retries})...")
            await asyncio.sleep(wait)

    if buf:  # flush whatever arrived after the last 30-second window
        p = requests.post(f"{MV_BASE}/generations", headers=MV_H, json={
            "prompt": f"Live blog. Final segment:\n\n{' '.join(buf)[:3000]}"}).json()
        posts.append(p.get("output") or p.get("content") or "")
    print(f"\nLIVE BLOG COMPLETE: {len(posts)} posts")
    for i, p in enumerate(posts, 1): print(f"\n--- Post {i} ---\n{p[:500]}")

asyncio.run(stream_and_blog())

Example Output

Connected to Deepgram streaming API
  [  1] Welcome everyone to the 2026 product summit.
  [ 12] Let me show you the dashboard in action.

  LIVE BLOG #1:
  The 2026 Product Summit opens with CEO Maria Chen announcing three
  major platform updates. The headline: a new AI engine processing
  content 40% faster. "This isn't incremental," Chen says.

LIVE BLOG COMPLETE: 8 posts

Error Handling

Exponential backoff reconnection (2s → 30s max, 5 attempts). In production, persist the transcript buffer to disk between reconnections and send heartbeat pings every 10 seconds; a sketch of both follows.
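A minimal sketch of both, reusing the script's existing imports. Deepgram's streaming API accepts a {"type": "KeepAlive"} text frame as a heartbeat; the transcript-buffer.json path is a hypothetical choice for this example.

# Heartbeat: run alongside the other tasks, e.g.
#   await asyncio.gather(send_audio(), receive(), keepalive(ws))
async def keepalive(ws, interval=10):
    try:
        while True:
            await asyncio.sleep(interval)
            await ws.send(json.dumps({"type": "KeepAlive"}))
    except websockets.exceptions.ConnectionClosed:
        pass  # stream ended; stop pinging

# Persistence: call after each buf.append(...) so a reconnect can resume
def persist_buffer(buf, path="transcript-buffer.json"):
    with open(path, "w") as f:
        json.dump(buf, f)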
Match encoding and sample_rate to your source: linear16+16000 (telephony), linear16+44100 (broadcast), opus+48000 (WebRTC). Mismatches produce garbled transcripts.
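As one hedged example of a live source, send_audio() can be swapped for a microphone reader. This sketch assumes the third-party sounddevice package (pip install sounddevice), which is not otherwise part of this guide; it reuses the script's asyncio and json imports.

import sounddevice as sd  # assumption: third-party PortAudio bindings

async def send_mic_audio(ws, seconds=300):
    loop = asyncio.get_running_loop()
    q = asyncio.Queue()

    def on_audio(indata, frames, time_info, status):
        # runs on PortAudio's thread; hand raw PCM bytes to the event loop
        loop.call_soon_threadsafe(q.put_nowait, bytes(indata))

    # 16 kHz mono int16 matches encoding=linear16&sample_rate=16000 in the URL;
    # blocksize=1600 frames means one 100 ms chunk per callback
    with sd.RawInputStream(samplerate=16000, channels=1, dtype="int16",
                           blocksize=1600, callback=on_audio):
        deadline = loop.time() + seconds
        while loop.time() < deadline:
            await ws.send(await q.get())
    await ws.send(json.dumps({"type": "CloseStream"}))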
interim_results=false returns only finalized transcripts (higher accuracy, slight delay). Set it to true for lower latency; the code already filters on is_final, so interim duplicates never reach the buffer.
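If you do enable interim_results=true, a small change inside receive() can show interims as a rolling preview while still committing only finals; a sketch of the replacement branch:

if d.get("is_final"):
    buf.append(text)                       # committed, never revised
    print(f"  [{len(buf):3d}] {text[:80]}")
else:
    # interim hypothesis; Deepgram may revise it, so display without storing
    print(f"  ... {text[:80]}", end="\r")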
Each generation takes 2-5 seconds. For fast-moving events, increase the batch interval to 45-60 seconds so each post covers a meatier segment.