Real-time patterns

Streaming deep-dive

SSE wire format, Python generator pipelines, TypeScript async iteration, and resilient mid-stream error recovery — all in one reference.

SSE wire format

Server-Sent Events are a unidirectional stream from server to client over a single long-lived HTTP connection. The wire format is line-delimited UTF-8 text. Each message consists of one or more field: value lines, each terminated by \n (the spec also accepts \r\n or a lone \r), with a blank line marking the end of the message.

id: 42
event: token
data: {"chunk":"Hello"}

id: 43
event: token
data: {"chunk":" world"}

id: 44
event: done
data: [DONE]

  • id — opaque string the client echoes back on reconnect via Last-Event-ID.
  • event — the event type; EventSource dispatches the message to listeners registered for that name. Defaults to message.
  • data — payload; multiple data: lines are concatenated with \n.
  • retry — milliseconds the client should wait before reconnecting.
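To make the field rules concrete, here is a minimal encoder sketch. It assumes plain str payloads. The helper name format_sse is hypothetical and not part of any library. Splitting the payload on line breaks and emitting one data: line per piece is what lets the client's \n-concatenation rebuild the original text.

```python
import re
from typing import Optional


def format_sse(
    data: str,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
    retry: Optional[int] = None,
) -> str:
    """Encode one SSE message. Hypothetical helper, not a library API."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    if retry is not None:
        lines.append(f"retry: {retry}")  # reconnection delay in milliseconds
    # One data: line per payload line (split on CRLF, CR or LF, the only
    # line terminators SSE recognizes); the client rejoins them with "\n".
    for part in re.split(r"\r\n|\r|\n", data):
        lines.append(f"data: {part}")
    # The final "\n\n" ends the last field line and adds the blank line
    # that terminates the message.
    return "\n".join(lines) + "\n\n"


print(format_sse('{"chunk":"Hello"}', event="token", event_id="42"), end="")
print(format_sse("line one\nline two"), end="")
```

The id, event and retry values are assumed to be single-line strings. Only data can span lines, because it is the only field the client concatenates.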

Python iterator pipeline

Python's iterator protocol maps naturally to SSE. A generator yields chunks, and the framework writes each yielded string to the response as-is, so the generator itself must produce complete, correctly framed SSE messages. Below is a FastAPI endpoint that streams tokens from an LLM and stops cleanly when the client disconnects mid-stream.

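import asyncio
import json

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()


async def token_generator(prompt: str, request: Request):
    # llm_stream() stands in for your model client; it is assumed to be a
    # non-blocking iterator of text tokens.
    for i, token in enumerate(llm_stream(prompt)):
        if await request.is_disconnected():
            return  # client dropped: stop generating, nobody is listening
        # JSON-encode the token so embedded newlines cannot break the frame.
        yield f"id: {i}\nevent: token\ndata: {json.dumps({'chunk': token})}\n\n"
        await asyncio.sleep(0)  # give the event loop a turn between chunks
    # Reached only on normal completion: tell the client the stream is over.
    yield "event: done\ndata: [DONE]\n\n"


@app.post("/stream")
async def stream(prompt: str, request: Request):
    return StreamingResponse(
        token_generator(prompt, request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-store",
            "X-Accel-Buffering": "no",  # ask nginx not to buffer the response
        },
    )

The request.is_disconnected() check prevents wasted work when the client drops. The terminal done event is yielded only after the loop finishes normally. A yield inside a finally block would run while the generator is being closed after a disconnect, and Python raises RuntimeError when an async generator yields in response to GeneratorExit. Reserve finally for releasing upstream resources.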
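Because each stage is just an async generator, the pipeline can be composed and tested without a web server. The sketch below is minimal and assumes a hypothetical fake_llm_stream in place of a real model client. One stage produces tokens, the next frames them as SSE, and asyncio.run drives the result.

```python
import asyncio
import json
from typing import AsyncIterator, List


async def fake_llm_stream(prompt: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for a model client: one token per word."""
    for word in prompt.split():
        await asyncio.sleep(0)  # pretend to wait on the network
        yield word


async def to_sse(tokens: AsyncIterator[str]) -> AsyncIterator[str]:
    """Frame each token as an SSE message, then emit the terminal done event."""
    i = 0
    async for token in tokens:
        # json.dumps escapes newlines inside a token so the frame stays intact.
        yield f"id: {i}\nevent: token\ndata: {json.dumps({'chunk': token})}\n\n"
        i += 1
    yield "event: done\ndata: [DONE]\n\n"


async def collect(prompt: str) -> List[str]:
    # Iterate the pipeline with async for, as a streaming response would.
    return [frame async for frame in to_sse(fake_llm_stream(prompt))]


frames = asyncio.run(collect("Hello world"))
for frame in frames:
    print(frame, end="")
```

Keeping framing in its own stage means the endpoint only wires stages together, and each stage can be unit-tested with plain asyncio.run.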

TypeScript async iterable

On the client, EventSource is the simplest consumer, but it only issues GET requests and cannot set custom request headers. For full control (custom headers, POST bodies, abort signals), use fetch() with a readable stream and an async iterator adapter.

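async function* sseIterator(
  url: string,
  body: string,
  signal?: AbortSignal,
): AsyncGenerator<string, void, undefined> {
  const res = await fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body,
    signal,
  });

  if (!res.ok) throw new Error(`HTTP ${res.status}`);
  if (!res.body) throw new Error("No readable stream");

  const reader = res.body
    .pipeThrough(new TextDecoderStream())
    .getReader();

  let buffer = "";
  let dataLines: string[] = []; // data: lines of the event being assembled
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += value;

      const lines = buffer.split("\n");
      buffer = lines.pop() ?? ""; // keep the trailing partial line for later

      for (let line of lines) {
        if (line.endsWith("\r")) line = line.slice(0, -1); // tolerate CRLF
        if (line === "") {
          // A blank line ends the event: dispatch its joined data, if any.
          if (dataLines.length === 0) continue;
          const payload = dataLines.join("\n");
          dataLines = [];
          if (payload === "[DONE]") return;
          yield payload;
        } else if (line.startsWith("data:")) {
          // The space after "data:" is optional on the wire.
          const field = line.slice(5);
          dataLines.push(field.startsWith(" ") ? field.slice(1) : field);
        }
        // id:, event:, retry: and ":" comment lines are ignored here.
      }
    }
  } finally {
    // cancel() rather than releaseLock(): an early return or a consumer
    // `break` must also close the underlying response body.
    await reader.cancel().catch(() => {});
  }
}

The buffer carries any partial line over to the next chunk. Data lines accumulate until the blank line that ends the event, so multi-line payloads arrive joined with \n. The [DONE] sentinel terminates the generator. The caller can pass an AbortSignal to cancel mid-stream. Because the finally block cancels the reader, breaking out of a for await loop also closes the connection.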
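Chunk boundaries are where incremental parsers usually break, so it helps to exercise the logic away from the network. Here is a minimal Python sketch of the same buffering idea. It assumes the input arrives as already-decoded text chunks split at arbitrary points. The function name parse_sse is hypothetical, and like the adapter above it only interprets data: lines.

```python
from typing import Iterable, Iterator, List


def parse_sse(chunks: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each complete event, stopping at [DONE]."""
    buffer = ""
    data_lines: List[str] = []
    for chunk in chunks:
        buffer += chunk
        *lines, buffer = buffer.split("\n")  # last piece may be a partial line
        for line in lines:
            if line.endswith("\r"):
                line = line[:-1]  # tolerate CRLF line endings
            if line == "":
                # Blank line: dispatch the event assembled so far, if any.
                if data_lines:
                    payload = "\n".join(data_lines)
                    data_lines = []
                    if payload == "[DONE]":
                        return
                    yield payload
            elif line.startswith("data:"):
                value = line[5:]
                # The single space after the colon is optional.
                data_lines.append(value[1:] if value.startswith(" ") else value)
            # Other fields (id, event, retry) and ":" comments are ignored here.


# Chunk boundaries deliberately fall mid-line and mid-event.
chunks = ['data: {"chunk":"He', 'llo"}\n\ndata: a\nda', "ta: b\n\n", "data: [DONE]\n\n"]
print(list(parse_sse(chunks)))
```

The two data: lines of the second event come out as a single payload, "a\nb", even though the second line was split across chunks.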

Mid-stream error handling

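Errors after the first byte are tricky. The 200 status line and headers have already been sent, so the server cannot change the status. The usual pattern is to emit a named error event carrying structured JSON and then end the stream. The client must listen for that event explicitly. EventSource also fires error, as a plain Event with no data, when the connection itself fails.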

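# Server (Python). pipeline() and RateLimitError are placeholders for your
# own upstream client and its rate-limit exception.
import json

async def guarded_stream(prompt: str):
    try:
        async for token in pipeline(prompt):
            yield f"data: {token}\n\n"
    except RateLimitError as e:
        error = {"code": "rate_limited", "retry_after": e.retry_after}
        yield f"event: error\ndata: {json.dumps(error)}\n\n"
    except Exception:
        # Never leak internal details; send a generic code instead.
        yield f"event: error\ndata: {json.dumps({'code': 'internal'})}\n\n"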
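// Client (TypeScript)
function connect(url: string): void {
  const source = new EventSource(url);
  source.addEventListener("error", (e: Event) => {
    // Connection failures also fire "error", as a plain Event without data;
    // leave those to EventSource's own reconnection logic.
    if (!(e instanceof MessageEvent)) return;
    source.close(); // the server reported an error: this stream is finished
    const payload = JSON.parse(e.data);
    if (payload.code === "rate_limited") {
      // retry_after is in seconds; reconnect once the window has passed.
      setTimeout(() => connect(url), payload.retry_after * 1000);
    }
  });
}

connect("/api/stream");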
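The server half can also be written as a reusable pipeline stage. This minimal sketch assumes a hypothetical RateLimitError and a hypothetical flaky_pipeline standing in for a real upstream client. The stage forwards JSON-encoded tokens until the upstream fails, then emits one structured error event and ends the stream.

```python
import asyncio
import json
from typing import AsyncIterator, List


class RateLimitError(Exception):
    """Hypothetical upstream error carrying a retry delay in seconds."""

    def __init__(self, retry_after: int) -> None:
        super().__init__(f"rate limited, retry after {retry_after}s")
        self.retry_after = retry_after


async def flaky_pipeline(prompt: str) -> AsyncIterator[str]:
    """Hypothetical upstream that fails after two tokens."""
    yield "Hello"
    yield " world"
    raise RateLimitError(retry_after=30)


async def with_error_events(tokens: AsyncIterator[str]) -> AsyncIterator[str]:
    """Forward tokens as SSE frames; turn a mid-stream failure into an error event."""
    try:
        async for token in tokens:
            yield f"data: {json.dumps(token)}\n\n"
    except RateLimitError as e:
        payload = {"code": "rate_limited", "retry_after": e.retry_after}
        yield f"event: error\ndata: {json.dumps(payload)}\n\n"
    except Exception:
        # Keep internal details out of the stream; send a generic code.
        yield f"event: error\ndata: {json.dumps({'code': 'internal'})}\n\n"


async def collect() -> List[str]:
    return [frame async for frame in with_error_events(flaky_pipeline("hi"))]


frames = asyncio.run(collect())
for frame in frames:
    print(frame, end="")
```

The tokens already sent stay delivered. The client sees them followed by a single error event, and the stream then ends normally from the transport's point of view.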

Production note

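Always set a server-side timeout on the generator, because a stalled upstream should not hold the connection open indefinitely. asyncio.wait_for takes an awaitable, not an iterator. In FastAPI, apply a per-chunk deadline by awaiting each step as asyncio.wait_for(iterator.__anext__(), timeout). On the client, use AbortSignal.timeout as a last resort. It caps the total request duration, including the streamed body, rather than the gap between chunks.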
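Here is a minimal sketch of that per-chunk deadline, assuming Python 3.8+ asyncio. Each call to the iterator's __anext__() is wrapped in asyncio.wait_for, so a stalled upstream raises asyncio.TimeoutError instead of holding the connection. The names with_chunk_timeout and stalled_upstream are hypothetical.

```python
import asyncio
from typing import AsyncIterator, List


async def with_chunk_timeout(
    source: AsyncIterator[str], seconds: float
) -> AsyncIterator[str]:
    """Re-yield items from source, failing if any single item takes too long."""
    iterator = source.__aiter__()
    while True:
        try:
            # The deadline applies to each chunk, not to the whole stream.
            item = await asyncio.wait_for(iterator.__anext__(), timeout=seconds)
        except StopAsyncIteration:
            return  # upstream finished normally
        yield item


async def stalled_upstream() -> AsyncIterator[str]:
    """Hypothetical upstream that sends one chunk and then hangs."""
    yield "first"
    await asyncio.sleep(3600)
    yield "never sent"


async def main() -> List[str]:
    received: List[str] = []
    try:
        async for chunk in with_chunk_timeout(stalled_upstream(), seconds=0.1):
            received.append(chunk)
    except asyncio.TimeoutError:
        received.append("timed out")
    return received


result = asyncio.run(main())
print(result)
```

In an endpoint, the except branch is a natural place to emit the structured error event before ending the stream. On timeout, wait_for cancels the pending step, which also stops the stalled upstream generator.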