Streaming responses from the Anthropic API — server-sent events, event types, async iteration in Python and TypeScript, partial tool input, error handling, and retry strategies.

Claude API — Streaming

What it is

Streaming delivers Claude's response incrementally over a single HTTP connection using Server-Sent Events (SSE); each delta may carry one or several tokens. Instead of waiting for the entire reply before the request resolves, your client receives message_start, a series of content_block_delta events, and a final message_stop. That gives you "typewriter" output, partial JSON for tool calls, and low time-to-first-token even for very long responses. Reach for streaming whenever the user is waiting on the response in real time (chat UIs, IDE assistants, voice agents) or when you need to act on a partial result (early exit on a stop sequence, progressive UI).

When to stream

| Use case | Stream? |
| --- | --- |
| Interactive chat UI | Yes: UX needs incremental tokens |
| IDE / coding assistant | Yes: show code as it's written |
| Voice agent (TTS pipeline) | Yes: speak as text arrives |
| Long-form generation (>2000 tokens) | Yes: avoid 60s+ wait |
| Background batch job | No: use Batch API at 50% cost |
| Strict JSON extraction | Optional: non-streaming is simpler |
| Tool-only deterministic output | Optional: text deltas don't help |

Event types

Streaming emits a sequence of typed events. Each maps to an SSE event: field over the wire; the SDKs surface them as typed objects.

| Event | When | Payload |
| --- | --- | --- |
| message_start | Once at start | message shell with empty content and initial usage |
| content_block_start | Once per content block | index, content_block (type=text/tool_use/thinking) |
| content_block_delta | Many (the actual stream) | index, delta (text_delta / input_json_delta / thinking_delta / signature_delta) |
| content_block_stop | Once per content block | index |
| message_delta | Near the end | delta (stop_reason, stop_sequence), updated usage |
| message_stop | Once at end | {} |
| ping | Keepalive (~30s) | {} (ignore) |
| error | On API error mid-stream | error object (surface to caller) |
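To make the event ordering concrete, here is a minimal sketch that folds a list of already-parsed events into one message. The helper name assemble and the sample events are illustrative, it handles text blocks only, and it is not how the SDK does it internally.

```python
from typing import Any


def assemble(events: list[dict[str, Any]]) -> dict[str, Any]:
    """Fold parsed stream events into one message dict (text blocks only)."""
    message: dict[str, Any] = {}
    for event in events:
        kind = event["type"]
        if kind == "message_start":
            # Copy the shell so the caller's event data is not mutated.
            shell = event["message"]
            message = {**shell, "content": [], "usage": dict(shell["usage"])}
        elif kind == "content_block_start":
            # Assumes blocks arrive in index order, so append == insert at index.
            message["content"].append(dict(event["content_block"]))
        elif kind == "content_block_delta":
            delta = event["delta"]
            if delta["type"] == "text_delta":
                message["content"][event["index"]]["text"] += delta["text"]
        elif kind == "message_delta":
            message.update(event["delta"])           # stop_reason, stop_sequence
            message["usage"].update(event["usage"])  # cumulative output_tokens
        # ping, content_block_stop and message_stop carry nothing to merge.
    return message


events = [
    {"type": "message_start", "message": {"role": "assistant", "content": [], "stop_reason": None,
                                          "usage": {"input_tokens": 12, "output_tokens": 1}}},
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "ping"},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " world"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": None},
     "usage": {"output_tokens": 24}},
    {"type": "message_stop"},
]

final = assemble(events)
print(final["content"][0]["text"], final["stop_reason"], final["usage"])
```

Note how the usage from message_start supplies input_tokens while message_delta overwrites output_tokens with the final count.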

Python — high-level

client.messages.stream(...) returns a context manager whose text_stream yields chunk strings. The simplest pattern: iterate text_stream, print/append, then read .get_final_message() after the loop for usage and stop reason.

python
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about streams."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()

    final = stream.get_final_message()
    print(f"stop_reason: {final.stop_reason}")
    print(f"tokens out: {final.usage.output_tokens}")

Output:

text
Bits flow without pause—
A whisper between two hosts,
Words arriving live.

stop_reason: end_turn
tokens out: 26

Python — low-level events

Iterate raw events when you need to render thinking blocks, surface tool input as it streams, or implement custom UI for content_block transitions.

python
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=512,
    messages=[{"role": "user", "content": "Hi"}],
) as stream:
    for event in stream:
        if event.type == "message_start":
            print(f"[start] model={event.message.model}")
        elif event.type == "content_block_start":
            print(f"[block_start] type={event.content_block.type}")
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "content_block_stop":
            print(f"\n[block_stop] index={event.index}")
        elif event.type == "message_delta":
            print(f"[message_delta] stop={event.delta.stop_reason} usage={event.usage}")
        elif event.type == "message_stop":
            print("[stop]")

Output:

text
[start] model=claude-opus-4-7-20251001
[block_start] type=text
Hello! How can I help you today?
[block_stop] index=0
[message_delta] stop=end_turn usage=Usage(output_tokens=11)
[stop]

Python — async streaming

For FastAPI, async workers, or any asyncio context, use AsyncAnthropic and async for.

python
import asyncio
import anthropic

async def main() -> None:
    client = anthropic.AsyncAnthropic()
    async with client.messages.stream(
        model="claude-opus-4-7",
        max_tokens=512,
        messages=[{"role": "user", "content": "Count to 3."}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
        print()

asyncio.run(main())

Output:

text
One. Two. Three.

TypeScript — high-level

client.messages.stream(...) returns a MessageStream. Register an on("text") callback for incremental text, then await .finalMessage() for the assembled result. Iterating the stream with for await yields raw event objects, not strings.

typescript
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

const stream = client.messages.stream({
  model: "claude-opus-4-7",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Write a limerick about TypeScript." }],
});

// "text" fires once per text delta with the new chunk as a string.
stream.on("text", (text) => process.stdout.write(text));

// Resolves once the stream has ended and the message is fully assembled.
const final = await stream.finalMessage();
process.stdout.write("\n");
console.log("stop_reason:", final.stop_reason);
console.log("tokens out:", final.usage.output_tokens);

Output:

text
There once was a coder named Sue,
Whose types were both strict and quite true—
With `infer` and `extends`,
She tamed all the trends,
And shipped without errors in view.

stop_reason: end_turn
tokens out: 56

TypeScript — low-level events

Iterate the raw event stream by for await-ing the stream and inspecting event.type.

typescript
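// A fresh stream: each MessageStream can only be consumed once.
const stream = client.messages.stream({
  model: "claude-opus-4-7",
  max_tokens: 256,
  messages: [{ role: "user", content: "Hi" }],
});

for await (const event of stream) {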
  switch (event.type) {
    case "message_start":
      console.log(`[start] ${event.message.model}`);
      break;
    case "content_block_delta":
      if (event.delta.type === "text_delta") process.stdout.write(event.delta.text);
      if (event.delta.type === "input_json_delta") process.stdout.write(event.delta.partial_json);
      break;
    case "message_delta":
      console.log(`[stop_reason] ${event.delta.stop_reason}`);
      break;
  }
}

TypeScript — event callbacks

The SDK exposes a Node-style emitter API for callback-driven code.

typescript
const stream = client.messages
  .stream({ model: "claude-opus-4-7", max_tokens: 256, messages: [{ role: "user", content: "Hi" }] })
  .on("text", (text) => process.stdout.write(text))
  .on("error", (err) => console.error(err))
  .on("end", () => console.log("\n[done]"));

await stream.finalMessage();

Output:

text
Hello! How can I help today?
[done]

Raw HTTP / SSE wire format

Below the SDK, the API sends UTF-8 SSE frames. Useful when you must reimplement streaming in a language without an official SDK or when piping through a proxy.

text
event: message_start
data: {"type":"message_start","message":{"id":"msg_01ABC...","type":"message","role":"assistant","content":[],"model":"claude-opus-4-7-20251001","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":12,"output_tokens":1}}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":24}}

event: message_stop
data: {"type":"message_stop"}

Open a stream with curl to see the raw frames:

bash
curl -N https://api.anthropic.com/v1/messages \
    -H "Content-Type: application/json" \
    -H "x-api-key: $ANTHROPIC_API_KEY" \
    -H "anthropic-version: 2023-06-01" \
    -d '{
      "model": "claude-opus-4-7",
      "max_tokens": 64,
      "stream": true,
      "messages": [{"role": "user", "content": "Hi"}]
    }'

Output:

text
event: message_start
data: {"type":"message_start","message":{...}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello!"}}

...

Streaming tool input

When a tool call is emitted, its input JSON arrives across multiple input_json_delta events. The SDK assembles the final input for you on stream.get_final_message(); iterate deltas only if you want to render a partial command in the UI before it completes.

python
buffer: list[str] = []

with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=1024,
    tools=[{
        "name": "search",
        "description": "Search the docs.",
        "input_schema": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
    }],
    messages=[{"role": "user", "content": "Find anything about retries."}],
) as stream:
    for event in stream:
        if event.type == "content_block_start" and event.content_block.type == "tool_use":
            print(f"\n[tool call: {event.content_block.name}]")
        elif event.type == "content_block_delta" and event.delta.type == "input_json_delta":
            buffer.append(event.delta.partial_json)
            print(event.delta.partial_json, end="", flush=True)

    final = stream.get_final_message()
    tool_use = next(b for b in final.content if b.type == "tool_use")
    print(f"\n[final input: {tool_use.input}]")

Output:

text
[tool call: search]
{"query":"retries"}
[final input: {'query': 'retries'}]

Partial JSON is just a prefix of the eventual valid JSON — do not try to json.loads it until content_block_stop arrives. Buffer the deltas, then parse once.
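The buffer-then-parse rule can be sketched without the SDK. The fragments below are made up for illustration, and collect_tool_input is a hypothetical helper, not an SDK function.

```python
import json


def collect_tool_input(events: list[dict]) -> dict:
    """Join input_json_delta fragments and parse once the block has stopped."""
    fragments: list[str] = []
    for event in events:
        if event["type"] == "content_block_delta" and event["delta"]["type"] == "input_json_delta":
            fragments.append(event["delta"]["partial_json"])
        elif event["type"] == "content_block_stop":
            # Assumption: an empty buffer is treated as an empty object.
            return json.loads("".join(fragments) or "{}")
    raise ValueError("stream ended before content_block_stop")


events = [
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "input_json_delta", "partial_json": '{"que'}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "input_json_delta", "partial_json": 'ry": "ret'}},
    {"type": "content_block_delta", "index": 0,
     "delta": {"type": "input_json_delta", "partial_json": 'ries"}'}},
    {"type": "content_block_stop", "index": 0},
]

# Parsing a prefix fails, which is why the fragments are buffered first.
try:
    json.loads('{"que')
except json.JSONDecodeError:
    print("prefix is not valid JSON yet")

tool_input = collect_tool_input(events)
print(tool_input)
```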

Streaming thinking blocks

When extended thinking is enabled, the reasoning arrives as thinking_delta events inside a thinking block, followed by a single signature_delta event just before that block's content_block_stop. The SDK assembles them into a thinking block on the message returned by get_final_message().

python
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=8000,
    thinking={"type": "enabled", "budget_tokens": 5000},
    messages=[{"role": "user", "content": "Why is the sky blue?"}],
) as stream:
    in_thinking = False
    for event in stream:
        if event.type == "content_block_start":
            in_thinking = event.content_block.type == "thinking"
            if in_thinking:
                print("\n[thinking...]")
            elif event.content_block.type == "text":
                print("\n[answer]\n")
        elif event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                print(event.delta.thinking, end="", flush=True)
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)

Error handling mid-stream

If the API errors mid-stream (overloaded, rate limit, content policy), it sends an error event and closes the connection. The SDK raises a typed exception on the next iteration; outside the SDK, parse the event and surface a friendly message.

python
import anthropic

try:
    with client.messages.stream(
        model="claude-opus-4-7",
        max_tokens=1024,
        messages=[{"role": "user", "content": "..."}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.RateLimitError as e:
    retry_after = e.response.headers.get("retry-after", "60")
    print(f"\n[rate limited: retry in {retry_after}s]")
except anthropic.APIStatusError as e:
    # 529 is the API's "overloaded" status. Checking the code avoids relying on
    # a dedicated exception class being exported by the installed SDK version.
    if e.status_code == 529:
        print("\n[server overloaded: retry with backoff]")
    else:
        print(f"\n[error {e.status_code}: {e.message}]")
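For the non-SDK case, a parsed error event can be turned into a user-facing message along these lines. This is a sketch: describe_error is a hypothetical helper, and the set of error types treated as retryable is a policy choice made here for illustration, not something the API dictates.

```python
# Assumption: these error types are treated as transient for illustration.
RETRYABLE = {"overloaded_error", "rate_limit_error", "api_error"}


def describe_error(payload: dict) -> tuple[str, bool]:
    """Return (user-facing message, retryable?) for a parsed SSE error event."""
    error = payload.get("error", {})
    kind = error.get("type", "unknown_error")
    detail = error.get("message", "no details provided")
    retryable = kind in RETRYABLE
    hint = "please try again shortly" if retryable else "the request needs to be changed"
    return f"{kind}: {detail} ({hint})", retryable


payload = {"type": "error", "error": {"type": "overloaded_error", "message": "Overloaded"}}
message, retryable = describe_error(payload)
print(message, retryable)
```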

Retry pattern

A robust streaming client retries on transient failures (5xx, rate-limit) with exponential backoff and gives up on permanent ones (4xx auth, bad request).

python
import time
import anthropic

def stream_with_retry(messages: list, max_attempts: int = 4) -> str:
    delay = 1.0
    for attempt in range(max_attempts):
        try:
            chunks: list[str] = []
            with client.messages.stream(
                model="claude-opus-4-7",
                max_tokens=2048,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    chunks.append(text)
            return "".join(chunks)
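        except (anthropic.APIStatusError, anthropic.APIConnectionError) as exc:
            # Retry 429, 5xx (529 = overloaded) and connection failures;
            # every other 4xx is permanent and is re-raised immediately.
            status = getattr(exc, "status_code", None)  # None for connection errors
            transient = status is None or status == 429 or status >= 500
            if not transient or attempt == max_attempts - 1:
                raise
            print(f"[transient error: {exc}; retrying in {delay:.1f}s]")
            time.sleep(delay)
            delay *= 2
    return ""  # only reached when max_attempts <= 0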
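Plain doubling can make many clients retry at the same instants. One common refinement is to cap the delay and add random jitter. The following is a minimal sketch of capped exponential backoff with "full jitter"; the function name and the default values are hypothetical.

```python
import random
from typing import Optional


def backoff_delays(
    max_attempts: int,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> list[float]:
    """Delays to sleep between attempts: capped exponential with full jitter."""
    rng = rng or random.Random()
    delays: list[float] = []
    for attempt in range(max_attempts - 1):  # no sleep after the last attempt
        ceiling = min(cap, base * 2 ** attempt)
        delays.append(rng.uniform(0, ceiling))  # anywhere between 0 and the ceiling
    return delays


# A seeded generator makes the schedule reproducible in tests.
delays = backoff_delays(max_attempts=4, rng=random.Random(0))
print([round(d, 2) for d in delays])
```

Each value could replace the fixed delay passed to time.sleep in a retry loop.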

Browser — fetch + ReadableStream

If you proxy streaming through your own backend (the recommended pattern — never expose ANTHROPIC_API_KEY to the browser), the browser consumes a text/event-stream or plain-text stream via fetch.

typescript
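const response = await fetch("/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message: "Hello" }),
});

if (!response.ok || !response.body) throw new Error(`no stream (HTTP ${response.status})`);

const out = document.getElementById("out")!;
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  // stream: true buffers a multi-byte character that is split across chunks.
  out.textContent += decoder.decode(value, { stream: true });
}
out.textContent += decoder.decode(); // flush any bytes still buffered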

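If your backend re-emits proper SSE (text/event-stream), the EventSource API is an alternative that reconnects automatically after a disconnect. Note that EventSource only issues GET requests and cannot send a request body, so the prompt would have to reach the backend some other way (for example, a query parameter or a prior request).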
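The chunk-boundary concern is not specific to browsers: a Python backend relaying bytes can also see a multi-byte UTF-8 character split across two chunks. A small sketch using the standard library's incremental decoder; the chunk boundaries are contrived and decode_chunks is a hypothetical helper.

```python
import codecs


def decode_chunks(chunks: list[bytes]) -> str:
    """Decode byte chunks whose boundaries may split a multi-byte character."""
    decoder = codecs.getincrementaldecoder("utf-8")()
    parts = [decoder.decode(chunk) for chunk in chunks]
    # Flush at the end; this raises if an incomplete character is left over.
    parts.append(decoder.decode(b"", final=True))
    return "".join(parts)


data = "café ☕".encode("utf-8")
chunks = [data[:4], data[4:7], data[7:]]  # splits both "é" and "☕" mid-character

# Decoding each chunk on its own fails on the first split character.
try:
    chunks[0].decode("utf-8")
except UnicodeDecodeError:
    print("naive per-chunk decode fails")

print(decode_chunks(chunks))
```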

Custom SSE parser

When you cannot use a Claude SDK (Go, Rust, Elixir, embedded), parse SSE yourself. The format: each event is a sequence of field: value lines, terminated by a blank line.

python
import json
import os

import httpx

def stream_messages(api_key: str, body: dict):
    """Yield (event name, parsed payload) pairs from the streaming endpoint."""
    with httpx.stream(
        "POST",
        "https://api.anthropic.com/v1/messages",
        headers={
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        },
        json={**body, "stream": True},
        timeout=None,  # a stream can stay open longer than the default timeout
    ) as r:
        r.raise_for_status()
        event_name = None
        for line in r.iter_lines():
            if not line:
                event_name = None  # a blank line ends the current event
                continue
            if line.startswith("event: "):
                event_name = line[7:]
            elif line.startswith("data: "):
                payload = json.loads(line[6:])
                yield event_name, payload

for name, data in stream_messages(
    os.environ["ANTHROPIC_API_KEY"],  # read the key from the environment; never hard-code it
    {"model": "claude-opus-4-7", "max_tokens": 64, "messages": [{"role": "user", "content": "Hi"}]},
):
    print(name, data.get("delta", {}).get("text", ""))

Output:

text
message_start 
content_block_start 
content_block_delta Hello
content_block_delta !
content_block_delta  How can I help?
content_block_stop 
message_delta 
message_stop 
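The parser above relies on the single-line data frames shown in the wire format. A slightly fuller sketch of the SSE framing rules (comment lines, multi-line data, an optional space after the colon) can be exercised offline against a captured transcript. parse_sse is a hypothetical helper and the transcript is hand-written.

```python
import json
from typing import Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[tuple[str, dict]]:
    """Yield (event name, JSON payload) pairs from SSE lines."""
    event_name = "message"  # SSE default when no event: field is sent
    data_lines: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the buffered event, if there is one.
            if data_lines:
                yield event_name, json.loads("\n".join(data_lines))
            event_name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, sometimes used as a keepalive
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # the format strips exactly one leading space
            if field == "event":
                event_name = value
            elif field == "data":
                data_lines.append(value)


transcript = """\
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

: keepalive comment

event: message_stop
data: {"type":"message_stop"}

"""

parsed = list(parse_sse(transcript.splitlines()))
for name, payload in parsed:
    print(name, payload.get("delta", {}).get("text", ""))
```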

Stop sequences interrupt streams

A stop_sequences match terminates the stream as soon as the sequence appears in the output — useful for "emit only one JSON object" or "stop before the closing fence". The matched sequence itself is not included in the deltas.

python
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=2048,
    stop_sequences=["</answer>"],
    messages=[{"role": "user", "content": "Reply with <answer>YES</answer> or <answer>NO</answer>."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    final = stream.get_final_message()
    print(f"\n[stop_reason={final.stop_reason}, stop_sequence={final.stop_sequence}]")

Output:

text
<answer>YES
[stop_reason=stop_sequence, stop_sequence=</answer>]
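Because the closing tag never arrives, downstream code only has to strip the opening one. A tiny sketch of that post-processing step; extract_answer is a hypothetical helper and the tag name is taken from the prompt above.

```python
def extract_answer(text: str, open_tag: str = "<answer>") -> str:
    """Return the text after the opening tag; the stop sequence is never emitted."""
    _, found, rest = text.partition(open_tag)
    if not found:
        raise ValueError(f"{open_tag} not found in {text!r}")
    return rest.strip()


answer = extract_answer("<answer>YES")
print(answer)
```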

Common pitfalls

| Pitfall | Symptom | Fix |
| --- | --- | --- |
| Not using with / closing the stream | Sockets leak, hung connections | with client.messages.stream(...) as s: (Python) or finalise with await stream.finalMessage() (TS) |
| Parsing partial JSON mid-stream | JSONDecodeError | Buffer input_json_delta until content_block_stop, then parse |
| Reading text_stream and events | Double-iteration error | Pick one: text or events |
| Forwarding bytes to browser without decoding | Mojibake on multi-byte UTF-8 | Use TextDecoder (browser) / decode chunks consistently |
| Treating ping as content | Spurious empty deltas | Filter event.type == "ping" |
| Ignoring message_delta.usage | Cannot estimate cost from stream | Read the cumulative output_tokens from message_delta; message_start carries input_tokens and only an initial output count |
| stream=true with count_tokens | API rejects | count_tokens is non-stream only |

Common recipes

First-token latency timer

python
import time

start = time.perf_counter()
first_token_at = None

with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hi"}],
) as stream:
    for text in stream.text_stream:
        if first_token_at is None:
            first_token_at = time.perf_counter()
            print(f"[first token in {(first_token_at - start) * 1000:.0f} ms]")
        print(text, end="", flush=True)
    print(f"\n[total {(time.perf_counter() - start) * 1000:.0f} ms]")

Output:

text
[first token in 412 ms]
Hello! How can I help?
[total 1284 ms]

Cancel a stream early

python
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Write a long essay."}],
) as stream:
    out: list[str] = []
    for text in stream.text_stream:
        out.append(text)
        if len("".join(out)) > 200:
            stream.close()        # closes the underlying HTTP connection
            print("[cancelled at 200 chars]")
            break

TypeScript — abort signal

typescript
const controller = new AbortController();
setTimeout(() => controller.abort(), 2000);    // give up after 2s

try {
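  const stream = client.messages.stream(
    { model: "claude-opus-4-7", max_tokens: 2048, messages: [{ role: "user", content: "..." }] },
    { signal: controller.signal },
  );
  // Iterating a MessageStream yields events, so pick out the text deltas.
  for await (const event of stream) {
    if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
      process.stdout.write(event.delta.text);
    }
  }
} catch (err: any) {
  // The SDK reports aborts as APIUserAbortError; the name check covers a raw fetch AbortError.
  if (err instanceof Anthropic.APIUserAbortError || err?.name === "AbortError") console.log("\n[aborted]");
  else throw err;
}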

Replay a stream from logs

Streaming events can be appended to a file and replayed later for debugging without burning more tokens:

python
import json
import pathlib

# Open the log in a with-block so the file is closed even if the stream raises.
with pathlib.Path("stream.jsonl").open("w") as log:
    with client.messages.stream(
        model="claude-opus-4-7",
        max_tokens=512,
        messages=[{"role": "user", "content": "Test"}],
    ) as stream:
        for event in stream:
            # One JSON object per line keeps the log easy to replay.
            log.write(json.dumps(event.model_dump()) + "\n")
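The replay half might look like the sketch below. It assumes the one-JSON-object-per-line layout written above; replay is a hypothetical helper, and the sample log is hand-written so the example runs without calling the API.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterator


def replay(path: Path) -> Iterator[dict]:
    """Yield logged events in their original order, one JSON object per line."""
    with path.open() as log:
        for line in log:
            if line.strip():  # tolerate a trailing blank line
                yield json.loads(line)


# Stand-in log with hand-written events.
sample = [
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Te"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "st"}},
    {"type": "message_stop"},
]
log_path = Path(tempfile.mkdtemp()) / "stream.jsonl"
log_path.write_text("".join(json.dumps(event) + "\n" for event in sample))

# Rebuild the text exactly as a live consumer would have seen it.
text = "".join(
    event["delta"]["text"]
    for event in replay(log_path)
    if event["type"] == "content_block_delta" and event["delta"]["type"] == "text_delta"
)
print(text)
```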

See also