Streaming

With a standard API call, you wait until the entire response is generated before anything comes back. That’s fine for background jobs, but your users are staring at a blank screen the whole time. Streaming flips this around. You get tokens the moment they’re produced, so your users see the response being typed out in real time — just like ChatGPT. Under the hood, Mavera uses Server-Sent Events (SSE) to push each token as it’s ready.

Quick Example

Use client.responses.stream() and iterate over events instead of reading a single response.
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.environ["MAVERA_API_KEY"],
    base_url="https://app.mavera.io/api/v1",
)

with client.responses.stream(
    model="mavera-1",
    input="Explain API rate limiting in 3 sentences.",
    extra_body={"persona_id": os.environ.get("PERSONA_ID")},
) as stream:
    for event in stream:
        if event.type == "response.output_text.delta":
            print(event.delta, end="", flush=True)
That’s it. Same client, same model. Use client.responses.stream() and handle named events as they arrive.

How Streaming Works

When you stream, the API doesn’t wait to finish generating. Instead, it opens a long-lived HTTP connection and pushes Server-Sent Events — one per token (or small group of tokens). Each event has a type that tells you what happened. The final response.completed event signals the stream is done and includes usage data. The connection stays open until the model finishes or an error occurs. Your client reads events as they arrive, so there’s no polling.
Streaming doesn’t change what the model generates — you get the exact same output. It only changes when you receive it.
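Under the SDK, streaming is just an HTTP request with a stream flag. If you want to watch the raw SSE frames yourself, here is a minimal sketch with httpx; it assumes Mavera mirrors OpenAI's POST /responses endpoint and its "stream": true body flag, which the SDK normally sets for you:

import os

import httpx

with httpx.stream(
    "POST",
    "https://app.mavera.io/api/v1/responses",
    headers={"Authorization": f"Bearer {os.environ['MAVERA_API_KEY']}"},
    json={
        "model": "mavera-1",
        "input": "Explain API rate limiting in 3 sentences.",
        "persona_id": os.environ.get("PERSONA_ID"),
        "stream": True,  # assumption: same flag as the OpenAI Responses API
    },
    timeout=60,
) as response:
    for line in response.iter_lines():
        if line:  # SSE frames arrive as "event: <type>" and "data: <json>" lines
            print(line)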

Event Structure

Each SSE event is a named event with a type field. Here are the key events you’ll encounter:
Event Type                    Description
response.created              Response object created — streaming has started
response.output_item.added    A new output item (text, function call) has been added
response.output_text.delta    A text token — read it from event.delta
response.output_text.done     Text generation for the current item is complete
response.output_item.done     The current output item is fully complete
response.completed            Response is finished — includes full usage data
A typical stream looks like this:
// response.created
{"type": "response.created", "response": {"id": "resp_abc123", "status": "in_progress"}}

// response.output_text.delta events
{"type": "response.output_text.delta", "delta": "Rate"}
{"type": "response.output_text.delta", "delta": " limiting"}
{"type": "response.output_text.delta", "delta": " prevents"}

// response.completed
{"type": "response.completed", "response": {"id": "resp_abc123", "status": "completed", "usage": {"input_tokens": 18, "output_tokens": 42, "total_tokens": 60}}}
Token usage is included automatically in the response.completed event — no extra parameters needed.
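Tying the table to code, here is a loop that reacts to each of those event types (same request as the quick example):

with client.responses.stream(
    model="mavera-1",
    input="Explain API rate limiting in 3 sentences.",
    extra_body={"persona_id": os.environ.get("PERSONA_ID")},
) as stream:
    for event in stream:
        if event.type == "response.created":
            print("[stream started]")
        elif event.type == "response.output_text.delta":
            print(event.delta, end="", flush=True)  # a text token
        elif event.type == "response.output_text.done":
            print()  # text for the current item is complete
        elif event.type == "response.completed":
            usage = event.response.usage
            print(f"[finished: {usage.total_tokens} tokens]")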

Building a Chat UI

In a real application you need the full response text after streaming finishes — for storing in a database, passing to the next API call, or displaying in a conversation thread. Accumulate deltas as they arrive.
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.environ["MAVERA_API_KEY"],
    base_url="https://app.mavera.io/api/v1",
)

def stream_response(input_messages, persona_id):
    chunks = []

    with client.responses.stream(
        model="mavera-1",
        input=input_messages,
        extra_body={"persona_id": persona_id},
    ) as stream:
        for event in stream:
            if event.type == "response.output_text.delta":
                chunks.append(event.delta)
                print(event.delta, end="", flush=True)

            if event.type == "response.completed":
                usage = event.response.usage
                print(f"\n\n[Tokens: {usage.total_tokens}, Credits: {usage.credits_used}]")

    return "".join(chunks)

full_response = stream_response(
    input_messages=[{"role": "user", "content": "What drives Gen Z brand loyalty?"}],
    persona_id=os.environ.get("PERSONA_ID"),
)
For a web frontend, the same pattern applies — push each event.delta into your UI state and let your framework re-render. In React, that looks like appending to a useState string inside the loop.
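If the browser talks to your own backend rather than to Mavera directly, the usual bridge is a generator that re-emits deltas as Server-Sent Events. Below is a sketch using Flask; the framework and the /stream route are illustrative assumptions, and any server that accepts a generator response works the same way:

import json

from flask import Flask, Response

app = Flask(__name__)

@app.get("/stream")
def stream_endpoint():
    def generate():
        # Reuses the `client` configured above.
        with client.responses.stream(
            model="mavera-1",
            input=[{"role": "user", "content": "What drives Gen Z brand loyalty?"}],
            extra_body={"persona_id": os.environ.get("PERSONA_ID")},
        ) as s:
            for event in s:
                if event.type == "response.output_text.delta":
                    # json.dumps keeps newlines in a delta from breaking SSE framing.
                    yield f"data: {json.dumps(event.delta)}\n\n"

    return Response(generate(), mimetype="text/event-stream")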

Streaming with Structured Outputs

Structured outputs work with streaming. The JSON arrives token by token just like plain text. You won’t have valid JSON until the stream finishes, so accumulate everything, then parse once at the end.
import json

with client.responses.stream(
    model="mavera-1",
    input=[
        {"role": "user", "content": "Analyze this headline: 'AI Replaces 50% of Marketing Jobs'"}
    ],
    extra_body={
        "persona_id": os.environ.get("PERSONA_ID"),
        "text": {
            "format": {
                "type": "json_schema",
                "json_schema": {
                    "name": "headline_analysis",
                    "strict": True,
                    "schema": {
                        "type": "object",
                        "properties": {
                            "sentiment": {"type": "string", "enum": ["positive", "neutral", "negative"]},
                            "clickbait_score": {"type": "number"},
                            "key_claims": {"type": "array", "items": {"type": "string"}},
                            "suggested_revision": {"type": "string"}
                        },
                        "required": ["sentiment", "clickbait_score", "key_claims", "suggested_revision"]
                    }
                }
            }
        }
    },
) as stream:
    raw = []
    for event in stream:
        if event.type == "response.output_text.delta":
            raw.append(event.delta)
            print(event.delta, end="", flush=True)

result = json.loads("".join(raw))
print(f"\n\nSentiment: {result['sentiment']}")
print(f"Clickbait score: {result['clickbait_score']}")
You can show a live JSON preview while streaming by attempting a parse on each accumulated chunk (json.loads in Python, JSON.parse in JavaScript), though a strict parser only succeeds once the JSON is complete. Libraries like partial-json can parse incomplete JSON for real-time UI updates.
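Here is the strict-parser version of that loop using only the standard library; update_preview is a hypothetical UI callback, and the parse simply fails until the object closes:

accumulated = ""
for event in stream:
    if event.type == "response.output_text.delta":
        accumulated += event.delta
        try:
            update_preview(json.loads(accumulated))  # hypothetical UI callback
        except json.JSONDecodeError:
            pass  # JSON still incomplete; keep accumulating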

Streaming with Function Calling

When the model decides to call a tool, the function name and arguments stream in as events. You’ll receive response.function_call_arguments.delta events with argument fragments. Accumulate them the same way you accumulate text content.
tools = [{
    "type": "function",
    "name": "lookup_competitor",
    "description": "Look up competitor information by name",
    "parameters": {
        "type": "object",
        "properties": {
            "company": {"type": "string", "description": "Competitor company name"},
            "metrics": {
                "type": "array",
                "items": {"type": "string"},
                "description": "Metrics to retrieve"
            }
        },
        "required": ["company"]
    }
}]

with client.responses.stream(
    model="mavera-1",
    input=[{"role": "user", "content": "Compare our pricing to Acme Corp"}],
    extra_body={"persona_id": os.environ.get("PERSONA_ID"), "tools": tools},
) as stream:
    function_name = ""
    call_id = ""
    arguments = ""

    for event in stream:
        if event.type == "response.output_item.added":
            if hasattr(event, "item") and event.item.type == "function_call":
                function_name = event.item.name
                call_id = event.item.call_id

        if event.type == "response.function_call_arguments.delta":
            arguments += event.delta

        if event.type == "response.output_item.done":
            if hasattr(event, "item") and event.item.type == "function_call":
                print(f"Function: {function_name}")
                parsed_args = json.loads(arguments)
After you receive the full tool call, execute the function locally and send the result back in a follow-up request (streaming again if you want the final answer streamed too).
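Continuing from the loop above, a sketch of that round trip, assuming Mavera follows the OpenAI Responses convention of appending the model's function_call plus a matching function_call_output item to the next request's input (lookup_competitor_impl is a hypothetical local implementation):

def lookup_competitor_impl(company, metrics=None):
    # Hypothetical stand-in for your real data source.
    return {"company": company, "note": "pricing data goes here"}

result = lookup_competitor_impl(**parsed_args)

with client.responses.stream(
    model="mavera-1",
    input=[
        {"role": "user", "content": "Compare our pricing to Acme Corp"},
        # Echo the tool call the model made, then attach its output.
        {"type": "function_call", "call_id": call_id,
         "name": function_name, "arguments": arguments},
        {"type": "function_call_output", "call_id": call_id,
         "output": json.dumps(result)},
    ],
    extra_body={"persona_id": os.environ.get("PERSONA_ID"), "tools": tools},
) as stream:
    for event in stream:
        if event.type == "response.output_text.delta":
            print(event.delta, end="", flush=True)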

Error Handling

Streams can fail mid-way. A network hiccup, a server timeout, or a client disconnect can leave you with a partial response. Here’s how to handle the common cases.

Connection Drops and Timeouts

Wrap your stream in a try/except to handle broken connections gracefully. Decide whether to retry (if the request is idempotent) or surface the partial response to the user.
from openai import APIConnectionError, APITimeoutError

def stream_with_recovery(input_data, persona_id, max_retries=3):
    for attempt in range(max_retries):
        chunks = []
        try:
            with client.responses.stream(
                model="mavera-1",
                input=input_data,
                extra_body={"persona_id": persona_id},
            ) as stream:
                for event in stream:
                    if event.type == "response.output_text.delta":
                        chunks.append(event.delta)
                        print(event.delta, end="", flush=True)

            return "".join(chunks)

        except (APIConnectionError, APITimeoutError) as e:
            partial = "".join(chunks)
            print(f"\n\n[Connection lost after {len(partial)} chars. Attempt {attempt + 1}/{max_retries}]")

            if attempt == max_retries - 1:
                if partial:
                    print("[Returning partial response]")
                    return partial
                raise

        except Exception as e:
            print(f"\n[Stream error: {e}]")
            raise

Checklist

- Set a timeout. The OpenAI SDK lets you pass timeout (in seconds for Python, milliseconds for JS). Without one, a stalled connection can hang forever; 60 seconds is a reasonable default (see the sketch after this list).
- If the very first event errors out, you'll get an exception before any content arrives. Handle it like a non-streaming API error: retry or surface the error to the user.
- If the stream drops while returning JSON, you'll have invalid JSON. Don't try to parse it; surface a user-friendly error and retry the request.
- If you're rate-limited, the streaming request fails before any events are sent. You'll get a RateLimitError (Python) or a response with status 429 (JS). Handle this with exponential backoff, same as non-streaming.
- Streaming requests consume the same credits as standard requests. A dropped connection still costs credits for the tokens generated before the disconnect.
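For instance, a client-level timeout plus simple backoff, reusing stream_with_recovery from above (the 60-second timeout and five attempts are assumptions; tune them to your workload):

import time

from openai import OpenAI, RateLimitError

client = OpenAI(
    api_key=os.environ["MAVERA_API_KEY"],
    base_url="https://app.mavera.io/api/v1",
    timeout=60,  # seconds; a stalled stream raises APITimeoutError instead of hanging
)

for attempt in range(5):
    try:
        reply = stream_with_recovery(
            [{"role": "user", "content": "What drives Gen Z brand loyalty?"}],
            os.environ.get("PERSONA_ID"),
        )
        break
    except RateLimitError:
        if attempt == 4:
            raise
        time.sleep(2 ** attempt)  # 1s, 2s, 4s, 8s backoff between attempts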

When to Use Streaming

Streaming isn’t always the right choice. Here’s a quick decision guide:
Use Case                                 Streaming                                          Standard
Chat UIs and conversational apps         Yes — users see responses instantly                No — awkward delay
Long-form content (articles, reports)    Yes — show progress on long generations            Depends on context
Batch processing and pipelines           No — overhead of event handling isn’t worth it     Yes — simpler code
Structured outputs (JSON)                Either — stream for UX, standard for simplicity    Either
Function calling                         Either — stream to show “thinking” state           Either
Webhooks and async workflows             No — you need the full response in one payload     Yes
A good rule of thumb: if a human is watching, stream it. If a machine is consuming it, use standard.

See Also

Responses API

Full API reference for responses, including all parameters

Structured Outputs

JSON mode and JSON Schema for typed responses

Error Handling

Complete error codes and retry strategies

Rate Limits

Request limits, headers, and backoff patterns