Streaming responses from the Anthropic API — server-sent events, event types, async iteration in Python and TypeScript, partial tool input, error handling, and retry strategies.
Claude API — Streaming
What it is
Streaming returns Claude's response one token (or one event) at a time over a single HTTP connection using Server-Sent Events (SSE). Instead of waiting for the entire reply to be generated before the request resolves, your client receives message_start, a series of content_block_delta events, and a final message_stop — giving you "typewriter" output, partial JSON for tools, and low time-to-first-token even for very long responses. Reach for streaming any time the user is waiting on the response in real time (chat UIs, IDE assistants, voice agents) or when you need to act on a partial result (early-exit on a stop sequence, progressive UI).
When to stream
| Use case | Stream? |
|---|---|
| Interactive chat UI | Yes — UX needs incremental tokens |
| IDE / coding assistant | Yes — show code as it's written |
| Voice agent (TTS pipeline) | Yes — speak as text arrives |
| Long-form generation (>2000 tokens) | Yes — avoid 60s+ wait |
| Background batch job | No — use Batch API at 50% cost |
| Strict JSON extraction | Optional — non-streaming is simpler |
| Tool-only deterministic output | Optional — text deltas don't help |
Event types
Streaming emits a sequence of typed events. Each maps to an SSE event: field over the wire; the SDKs surface them as typed objects.
| Event | When | Payload |
|---|---|---|
| message_start | Once at start | message shell with empty content and initial usage |
| content_block_start | Once per content block | index, content_block (type=text/tool_use/thinking) |
| content_block_delta | Many (the actual stream) | index, delta (text_delta / input_json_delta / thinking_delta / signature_delta) |
| content_block_stop | Once per content block | index |
| message_delta | Near the end | delta (stop_reason, stop_sequence), updated usage |
| message_stop | Once at end | {} |
| ping | Keepalive (~30s) | {} (ignore) |
| error | On API error mid-stream | error object (surface to caller) |
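To make the event sequence concrete, here is a minimal offline sketch that folds already-decoded events into one message dict. The function name `accumulate` and the sample `EVENTS` list are illustrative assumptions, not SDK APIs, and the sketch assumes blocks arrive in index order and only merges text and thinking deltas.

```python
from typing import Any


def accumulate(events: list[dict[str, Any]]) -> dict[str, Any]:
    """Fold decoded stream events into a single message dict (hypothetical helper)."""
    message: dict[str, Any] = {}
    for event in events:
        kind = event["type"]
        if kind == "message_start":
            # Copy the shell so the input events are not mutated.
            shell = event["message"]
            message = {**shell, "content": [], "usage": dict(shell.get("usage", {}))}
        elif kind == "content_block_start":
            # Assumption: blocks start in index order, so index == list position.
            message["content"].append(dict(event["content_block"]))
        elif kind == "content_block_delta":
            block = message["content"][event["index"]]
            delta = event["delta"]
            if delta["type"] == "text_delta":
                block["text"] = block.get("text", "") + delta["text"]
            elif delta["type"] == "thinking_delta":
                block["thinking"] = block.get("thinking", "") + delta["thinking"]
            # input_json_delta and signature_delta are left out of this sketch.
        elif kind == "message_delta":
            message.update(event["delta"])  # stop_reason, stop_sequence
            message["usage"].update(event.get("usage", {}))
        elif kind == "error":
            raise RuntimeError(event["error"])
        # ping, content_block_stop and message_stop carry nothing to merge.
    return message


EVENTS = [
    {"type": "message_start", "message": {"role": "assistant", "content": [],
                                          "stop_reason": None,
                                          "usage": {"input_tokens": 12, "output_tokens": 1}}},
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "ping"},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " world"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": None},
     "usage": {"output_tokens": 24}},
    {"type": "message_stop"},
]

result = accumulate(EVENTS)
print(result["content"][0]["text"])  # Hello world
print(result["stop_reason"])         # end_turn
```

The SDKs do this bookkeeping for you; a hand-rolled version like this is mainly useful when consuming the raw SSE feed directly.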
Python — high-level
client.messages.stream(...) returns a context manager whose text_stream yields chunk strings. The simplest pattern: iterate text_stream, print/append, then read .get_final_message() after the loop for usage and stop reason.
import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Write a haiku about streams."}],
) as stream:
    # text_stream yields only the text deltas, in order
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()

    # The accumulated message carries usage and the stop reason
    final = stream.get_final_message()
    print(f"stop_reason: {final.stop_reason}")
    print(f"tokens out: {final.usage.output_tokens}")
Output:
Bits flow without pause—
A whisper between two hosts,
Words arriving live.
stop_reason: end_turn
tokens out: 26
Python — low-level events
Iterate raw events when you need to render thinking blocks, surface tool input as it streams, or implement custom UI for content_block transitions.
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=512,
    messages=[{"role": "user", "content": "Hi"}],
) as stream:
    for event in stream:
        if event.type == "message_start":
            print(f"[start] model={event.message.model}")
        elif event.type == "content_block_start":
            print(f"[block_start] type={event.content_block.type}")
        elif event.type == "content_block_delta":
            if event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
        elif event.type == "content_block_stop":
            print(f"\n[block_stop] index={event.index}")
        elif event.type == "message_delta":
            print(f"[message_delta] stop={event.delta.stop_reason} usage={event.usage}")
        elif event.type == "message_stop":
            print("[stop]")
Output:
[start] model=claude-opus-4-7-20251001
[block_start] type=text
Hello! How can I help you today?
[block_stop] index=0
[message_delta] stop=end_turn usage=Usage(output_tokens=11)
[stop]
Python — async streaming
For FastAPI, async workers, or any asyncio context, use AsyncAnthropic and async for.
import asyncio

import anthropic


async def main() -> None:
    client = anthropic.AsyncAnthropic()
    async with client.messages.stream(
        model="claude-opus-4-7",
        max_tokens=512,
        messages=[{"role": "user", "content": "Count to 3."}],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)
    print()


asyncio.run(main())
Output:
One. Two. Three.
TypeScript — high-level
client.messages.stream(...) returns a MessageStream. Subscribe to its text event for incremental text (for await over the stream yields raw events, not strings), then await .finalMessage() for the assembled result.
import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic();

const stream = client.messages.stream({
  model: "claude-opus-4-7",
  max_tokens: 1024,
  messages: [{ role: "user", content: "Write a limerick about TypeScript." }],
});

// The "text" event fires once per text delta
stream.on("text", (text) => process.stdout.write(text));

// Resolves once the stream has ended
const final = await stream.finalMessage();
process.stdout.write("\n");
console.log("stop_reason:", final.stop_reason);
console.log("tokens out:", final.usage.output_tokens);
Output:
There once was a coder named Sue,
Whose types were both strict and quite true—
With `infer` and `extends`,
She tamed all the trends,
And shipped without errors in view.
stop_reason: end_turn
tokens out: 56
TypeScript — low-level events
Iterate the raw event stream by for await-ing the stream and inspecting event.type.
for await (const event of stream) {
  switch (event.type) {
    case "message_start":
      console.log(`[start] ${event.message.model}`);
      break;
    case "content_block_delta":
      if (event.delta.type === "text_delta") process.stdout.write(event.delta.text);
      if (event.delta.type === "input_json_delta") process.stdout.write(event.delta.partial_json);
      break;
    case "message_delta":
      console.log(`[stop_reason] ${event.delta.stop_reason}`);
      break;
  }
}
TypeScript — event callbacks
The SDK exposes a Node-style emitter API for callback-driven code.
const stream = client.messages
.stream({ model: "claude-opus-4-7", max_tokens: 256, messages: [{ role: "user", content: "Hi" }] })
.on("text", (text) => process.stdout.write(text))
.on("error", (err) => console.error(err))
.on("end", () => console.log("\n[done]"));
await stream.finalMessage();
Output:
Hello! How can I help today?
[done]
Raw HTTP / SSE wire format
Below the SDK, the API sends UTF-8 SSE frames. Useful when you must reimplement streaming in a language without an official SDK or when piping through a proxy.
event: message_start
data: {"type":"message_start","message":{"id":"msg_01ABC...","type":"message","role":"assistant","content":[],"model":"claude-opus-4-7-20251001","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":12,"output_tokens":1}}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":24}}

event: message_stop
data: {"type":"message_stop"}
Open a stream with curl to see the raw frames:
curl -N https://api.anthropic.com/v1/messages \
  -H "Content-Type: application/json" \
  -H "x-api-key: $ANTHROPIC_API_KEY" \
  -H "anthropic-version: 2023-06-01" \
  -d '{
    "model": "claude-opus-4-7",
    "max_tokens": 64,
    "stream": true,
    "messages": [{"role": "user", "content": "Hi"}]
  }'
Output:
event: message_start
data: {"type":"message_start","message":{...}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello!"}}

...
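The frame layout shown above can be exercised without a network call. The following is a small sketch, not an official parser: `parse_sse` and the `RAW` sample are hypothetical names, and the function covers only the event and data fields plus comment lines, which is the subset this API's frames use.

```python
import json
from typing import Iterator


def parse_sse(raw: str) -> Iterator[tuple[str, dict]]:
    """Yield (event_name, payload) pairs from raw SSE text (illustrative helper)."""
    event_name = "message"  # SSE default when a frame has no event: field
    data_lines: list[str] = []
    for line in raw.splitlines() + [""]:  # the extra "" flushes the last frame
        if line == "":
            # A blank line terminates the current frame.
            if data_lines:
                yield event_name, json.loads("\n".join(data_lines))
            event_name, data_lines = "message", []
        elif line.startswith(":"):
            continue  # comment line, ignored
        else:
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # a single leading space is optional
            if field == "event":
                event_name = value
            elif field == "data":
                data_lines.append(value)


RAW = (
    'event: content_block_delta\n'
    'data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}\n'
    '\n'
    'event: content_block_delta\n'
    'data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}\n'
    '\n'
    'event: message_stop\n'
    'data: {"type":"message_stop"}\n'
)

frames = list(parse_sse(RAW))
text = "".join(p["delta"]["text"] for name, p in frames if name == "content_block_delta")
print(text)  # Hello world
```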
Streaming tool input
When a tool call is emitted, its input JSON arrives across multiple input_json_delta events. The SDK assembles the final input for you on stream.get_final_message(); iterate deltas only if you want to render a partial command in the UI before it completes.
buffer: list[str] = []

with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=1024,
    tools=[{
        "name": "search",
        "description": "Search the docs.",
        "input_schema": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]},
    }],
    messages=[{"role": "user", "content": "Find anything about retries."}],
) as stream:
    for event in stream:
        if event.type == "content_block_start" and event.content_block.type == "tool_use":
            print(f"\n[tool call: {event.content_block.name}]")
        elif event.type == "content_block_delta" and event.delta.type == "input_json_delta":
            # Fragments are not valid JSON on their own; keep them for display only
            buffer.append(event.delta.partial_json)
            print(event.delta.partial_json, end="", flush=True)

    # The SDK has already assembled and parsed the complete input
    final = stream.get_final_message()
    tool_use = next(b for b in final.content if b.type == "tool_use")
    print(f"\n[final input: {tool_use.input}]")
Output:
[tool call: search]
{"query":"retries"}
[final input: {'query': 'retries'}]
Partial JSON is just a prefix of the eventual valid JSON, so do not try to json.loads it until content_block_stop arrives. Buffer the deltas, then parse once.
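As a rough illustration of the buffer-then-parse rule, the sketch below collects input_json_delta fragments per block index and parses only when content_block_stop arrives. `collect_tool_inputs` and the sample `EVENTS` are hypothetical and operate on plain dicts rather than SDK objects.

```python
import json
from collections import defaultdict


def collect_tool_inputs(events: list[dict]) -> dict[int, dict]:
    """Return {block index: parsed tool input}, parsing only at content_block_stop."""
    buffers: dict[int, list[str]] = defaultdict(list)
    parsed: dict[int, dict] = {}
    for event in events:
        if event["type"] == "content_block_delta" and event["delta"]["type"] == "input_json_delta":
            buffers[event["index"]].append(event["delta"]["partial_json"])
        elif event["type"] == "content_block_stop" and event["index"] in buffers:
            raw = "".join(buffers.pop(event["index"]))
            # An empty buffer is treated as an empty input object in this sketch.
            parsed[event["index"]] = json.loads(raw) if raw else {}
    return parsed


EVENTS = [
    {"type": "content_block_start", "index": 1,
     "content_block": {"type": "tool_use", "name": "search", "input": {}}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": '{"que'}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": 'ry":"ret'}},
    {"type": "content_block_delta", "index": 1,
     "delta": {"type": "input_json_delta", "partial_json": 'ries"}'}},
    {"type": "content_block_stop", "index": 1},
]

tool_inputs = collect_tool_inputs(EVENTS)
print(tool_inputs)  # {1: {'query': 'retries'}}
```

Keying the buffers by index keeps fragments from different blocks apart if a response contains more than one tool call.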
Streaming thinking blocks
When extended thinking is enabled, you receive thinking_delta events while the model reasons, followed by a signature_delta event just before the thinking block's content_block_stop. The SDK groups them into a thinking block on the message returned by get_final_message().
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=8000,
    thinking={"type": "enabled", "budget_tokens": 5000},
    messages=[{"role": "user", "content": "Why is the sky blue?"}],
) as stream:
    in_thinking = False
    for event in stream:
        if event.type == "content_block_start":
            # Print a label whenever a new block begins
            in_thinking = event.content_block.type == "thinking"
            if in_thinking:
                print("\n[thinking...]")
            elif event.content_block.type == "text":
                print("\n[answer]\n")
        elif event.type == "content_block_delta":
            if event.delta.type == "thinking_delta":
                print(event.delta.thinking, end="", flush=True)
            elif event.delta.type == "text_delta":
                print(event.delta.text, end="", flush=True)
Error handling mid-stream
If the API errors mid-stream (overloaded, rate limit, content policy), it sends an error event and closes the connection. The SDK raises a typed exception on the next iteration; outside the SDK, parse the event and surface a friendly message.
import anthropic

try:
    with client.messages.stream(
        model="claude-opus-4-7",
        max_tokens=1024,
        messages=[{"role": "user", "content": "..."}],
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
except anthropic.OverloadedError:
    print("\n[server overloaded — retry with backoff]")
except anthropic.RateLimitError as e:
    retry_after = e.response.headers.get("retry-after", "60")
    print(f"\n[rate limited — retry in {retry_after}s]")
except anthropic.APIStatusError as e:
    # Catch-all for any other non-2xx response; keep it last
    print(f"\n[error {e.status_code}: {e.message}]")
Retry pattern
A robust streaming client retries on transient failures (5xx, rate-limit) with exponential backoff and gives up on permanent ones (4xx auth, bad request).
import time

import anthropic


def stream_with_retry(messages: list, max_attempts: int = 4) -> str:
    delay = 1.0
    for attempt in range(max_attempts):
        try:
            chunks: list[str] = []  # reset per attempt so partial output is discarded
            with client.messages.stream(
                model="claude-opus-4-7",
                max_tokens=2048,
                messages=messages,
            ) as stream:
                for text in stream.text_stream:
                    chunks.append(text)
            return "".join(chunks)
        except (anthropic.RateLimitError, anthropic.OverloadedError, anthropic.APIConnectionError) as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the caller see the error
            print(f"[transient error: {exc}; retrying in {delay:.1f}s]")
            time.sleep(delay)
            delay *= 2  # exponential backoff
    return ""
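The fixed doubling above can be varied with jitter so that many clients do not retry in lockstep. The sketch below is one possible delay policy, not part of the SDK: `next_delay` and its parameters are hypothetical, jitter is an addition to the pattern described here, and it assumes any retry-after value is a plain number of seconds.

```python
import random
from typing import Optional


def next_delay(attempt: int, retry_after: Optional[str] = None, base: float = 1.0,
               cap: float = 30.0, rng: Optional[random.Random] = None) -> float:
    """Seconds to sleep before retry number `attempt` (0-based), with full jitter."""
    rng = rng or random.Random()
    ceiling = min(cap, base * 2 ** attempt)  # 1s, 2s, 4s, ... up to `cap`
    delay = rng.uniform(0, ceiling)
    if retry_after is not None:
        try:
            # Never retry sooner than the server asked (assumes seconds, not a date).
            delay = max(delay, float(retry_after))
        except ValueError:
            pass  # header was not a plain number; keep the jittered delay
    return delay


rng = random.Random(0)  # seeded only so the demo is repeatable
for attempt in range(4):
    print(f"attempt {attempt}: sleep {next_delay(attempt, rng=rng):.2f}s")
```

A retry loop could call this in place of the fixed delay, passing the retry-after header when the caught exception exposes one.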
Browser — fetch + ReadableStream
If you proxy streaming through your own backend (the recommended pattern — never expose ANTHROPIC_API_KEY to the browser), the browser consumes a text/event-stream or plain-text stream via fetch.
const response = await fetch("/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ message: "Hello" }),
});
if (!response.body) throw new Error("no stream");

const reader = response.body.getReader();
const decoder = new TextDecoder();
let done = false;
while (!done) {
  const { value, done: streamDone } = await reader.read();
  done = streamDone;
  // { stream: true } keeps multi-byte characters intact when they are split across chunks
  if (value) document.getElementById("out")!.textContent += decoder.decode(value, { stream: true });
}
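If your backend re-emits proper SSE (text/event-stream) from a GET endpoint, the EventSource API is an alternative that auto-reconnects on disconnect. EventSource only issues GET requests and cannot send a body, so POST routes such as /chat still need the fetch pattern.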
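The same chunk-boundary concern applies on a Python backend that relays bytes to the browser. As a minimal sketch (the names `decode_stream` and `CHUNKS` are illustrative), the standard library's incremental decoder holds back an incomplete multi-byte sequence until the rest arrives, whereas decoding each chunk on its own produces replacement characters.

```python
import codecs
from typing import Iterable, Iterator


def decode_stream(chunks: Iterable[bytes]) -> Iterator[str]:
    """Decode UTF-8 byte chunks without breaking characters split across chunks."""
    decoder = codecs.getincrementaldecoder("utf-8")()
    for chunk in chunks:
        text = decoder.decode(chunk)  # incomplete trailing bytes stay buffered
        if text:
            yield text
    tail = decoder.decode(b"", final=True)  # raises if the stream ended mid-character
    if tail:
        yield tail


data = "héllo 🌊".encode("utf-8")
# Split inside the 2-byte "é" and inside the 4-byte emoji.
CHUNKS = [data[:2], data[2:8], data[8:]]

good = "".join(decode_stream(CHUNKS))
naive = "".join(c.decode("utf-8", errors="replace") for c in CHUNKS)
print(good)
print(naive != good)  # True: per-chunk decoding corrupted the text
```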
Custom SSE parser
When you cannot use a Claude SDK (Go, Rust, Elixir, embedded), parse SSE yourself. The format: each event is a sequence of field: value lines, terminated by a blank line.
import json
import os

import httpx


def stream_messages(api_key: str, body: dict):
    """Yield (event_name, payload) pairs from the Messages API SSE stream."""
    with httpx.stream(
        "POST",
        "https://api.anthropic.com/v1/messages",
        headers={
            "x-api-key": api_key,
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        },
        json={**body, "stream": True},
        timeout=None,
    ) as r:
        r.raise_for_status()
        event_name = None
        for line in r.iter_lines():
            if not line:  # a blank line ends the current event
                event_name = None
                continue
            if line.startswith("event: "):
                event_name = line[7:]
            elif line.startswith("data: "):
                payload = json.loads(line[6:])
                yield event_name, payload


for name, data in stream_messages(
    os.environ["ANTHROPIC_API_KEY"],  # read the key from the environment instead of hardcoding it
    {"model": "claude-opus-4-7", "max_tokens": 64, "messages": [{"role": "user", "content": "Hi"}]},
):
    print(name, data.get("delta", {}).get("text", ""))
Output:
message_start
content_block_start
content_block_delta Hello
content_block_delta !
content_block_delta How can I help?
content_block_stop
message_delta
message_stop
Stop sequences interrupt streams
A stop_sequences match terminates the stream as soon as the sequence appears in the output — useful for "emit only one JSON object" or "stop before the closing fence". The matched sequence itself is not included in the deltas.
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=2048,
    stop_sequences=["</answer>"],
    messages=[{"role": "user", "content": "Reply with <answer>YES</answer> or <answer>NO</answer>."}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    final = stream.get_final_message()
    print(f"\n[stop_reason={final.stop_reason}, stop_sequence={final.stop_sequence}]")
Output:
<answer>YES
[stop_reason=stop_sequence, stop_sequence=</answer>]
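Because the matched sequence is dropped from the output, downstream parsing that expects a closing tag needs it restored first. The helper below is a hypothetical sketch of that step; `extract_answer` is not an SDK function, and it takes the streamed text plus the stop_reason and stop_sequence values read from the final message.

```python
import re
from typing import Optional


def extract_answer(text: str, stop_reason: Optional[str],
                   stop_sequence: Optional[str]) -> Optional[str]:
    """Re-append the stripped stop sequence, then pull out the <answer> body."""
    if stop_reason == "stop_sequence" and stop_sequence:
        text += stop_sequence  # the API omitted it from the streamed text
    match = re.search(r"<answer>(.*?)</answer>", text, re.DOTALL)
    return match.group(1).strip() if match else None


print(extract_answer("<answer>YES", "stop_sequence", "</answer>"))  # YES
print(extract_answer("<answer>NO", "max_tokens", None))             # None
```

Returning None when the tag never closed makes a truncated reply (for example, stop_reason of max_tokens) distinguishable from a complete one.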
Common pitfalls
| Pitfall | Symptom | Fix |
|---|---|---|
| Not using with / closing the stream | Sockets leak, hung connections | with client.messages.stream(...) as s: (Python) or finalise with await stream.finalMessage() (TS) |
| Parsing partial JSON mid-stream | JSONDecodeError | Buffer input_json_delta until content_block_stop, then parse |
| Reading text_stream and events | Double-iteration error | Pick one: text or events |
| Forwarding bytes to browser without decoding | Mojibake on multi-byte UTF-8 | Use TextDecoder with { stream: true } (browser) / decode chunks consistently |
| Treating ping as content | Spurious empty deltas | Filter event.type == "ping" |
| Ignoring message_delta.usage | Cannot estimate cost from stream | Read input_tokens from message_start and the cumulative output_tokens from message_delta |
| stream=true with count_tokens | API rejects | count_tokens is non-stream only |
Common recipes
First-token latency timer
import time

start = time.perf_counter()
first_token_at = None

with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Hi"}],
) as stream:
    for text in stream.text_stream:
        if first_token_at is None:
            # Record the moment the first text chunk arrives
            first_token_at = time.perf_counter()
            print(f"[first token in {(first_token_at - start) * 1000:.0f} ms]")
        print(text, end="", flush=True)

print(f"\n[total {(time.perf_counter() - start) * 1000:.0f} ms]")
Output:
[first token in 412 ms]
Hello! How can I help?
[total 1284 ms]
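The same measurement can be factored into a reusable wrapper around any iterator of text chunks, which also lets it be tried without an API call. This is a sketch under that assumption: `timed` and `fake_stream` are hypothetical names, and the fake generator merely stands in for stream.text_stream.

```python
import time
from typing import Iterable, Iterator


def timed(chunks: Iterable[str], stats: dict[str, float]) -> Iterator[str]:
    """Pass chunks through unchanged while recording first-chunk and total latency."""
    start = time.perf_counter()
    for chunk in chunks:
        if "first_token_s" not in stats:
            stats["first_token_s"] = time.perf_counter() - start
        yield chunk
    # Only reached when the source is exhausted, not when the consumer breaks early.
    stats["total_s"] = time.perf_counter() - start


def fake_stream() -> Iterator[str]:
    """Stand-in for a real text stream: three chunks with a small delay each."""
    for chunk in ["Hel", "lo", "!"]:
        time.sleep(0.01)
        yield chunk


stats: dict[str, float] = {}
text = "".join(timed(fake_stream(), stats))
print(text)
print(f"first token {stats['first_token_s'] * 1000:.0f} ms, total {stats['total_s'] * 1000:.0f} ms")
```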
Cancel a stream early
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=4096,
    messages=[{"role": "user", "content": "Write a long essay."}],
) as stream:
    out: list[str] = []
    chars = 0
    for text in stream.text_stream:
        out.append(text)
        chars += len(text)  # running count avoids re-joining the list on every chunk
        if chars > 200:
            stream.close()  # closes the underlying HTTP connection
            print("[cancelled after 200 chars]")
            break
TypeScript — abort signal
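const controller = new AbortController();
setTimeout(() => controller.abort(), 2000); // give up after 2s

try {
  const stream = client.messages.stream(
    { model: "claude-opus-4-7", max_tokens: 2048, messages: [{ role: "user", content: "..." }] },
    { signal: controller.signal },
  );
  // Iterating the stream yields raw events, so pick out the text deltas
  for await (const event of stream) {
    if (event.type === "content_block_delta" && event.delta.type === "text_delta") {
      process.stdout.write(event.delta.text);
    }
  }
} catch (err: any) {
  // The SDK surfaces a user abort as APIUserAbortError; the name check covers a raw fetch abort
  if (err instanceof Anthropic.APIUserAbortError || err?.name === "AbortError") console.log("\n[aborted]");
  else throw err;
}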
Replay a stream from logs
Streaming events can be appended to a file and replayed later for debugging without burning more tokens:
import json
import pathlib

# The with block closes the log even if the stream raises
with pathlib.Path("stream.jsonl").open("w") as log:
    with client.messages.stream(
        model="claude-opus-4-7",
        max_tokens=512,
        messages=[{"role": "user", "content": "Test"}],
    ) as stream:
        for event in stream:
            # One JSON object per line (JSONL)
            log.write(json.dumps(event.model_dump()) + "\n")
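For the replay half, a minimal sketch is to read the JSONL file back and re-emit only the text deltas. `replay_text` and the `SAMPLE` events are hypothetical; the demo writes its own small log to a temporary directory so it runs without a prior recording, and it assumes each line has the same shape as the wire events shown earlier.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterator


def replay_text(path: Path) -> Iterator[str]:
    """Yield text deltas from a JSONL event log, skipping every other event type."""
    with path.open(encoding="utf-8") as log:
        for line in log:
            if not line.strip():
                continue  # tolerate blank lines
            event = json.loads(line)
            delta = event.get("delta") or {}
            if event.get("type") == "content_block_delta" and delta.get("type") == "text_delta":
                yield delta["text"]


SAMPLE = [
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " world"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_stop"},
]

with tempfile.TemporaryDirectory() as tmp:
    log_path = Path(tmp) / "stream.jsonl"
    log_path.write_text("\n".join(json.dumps(e) for e in SAMPLE) + "\n", encoding="utf-8")
    replayed = "".join(replay_text(log_path))

print(replayed)  # Hello world
```

Filtering on content_block_delta alone avoids double-counting if the log also contains any higher-level events the SDK adds alongside the raw ones.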
See also
- Python SDK — non-streaming basics and async.
- TypeScript SDK — streaming on Node, Deno, Workers.
- Tool use — streaming partial tool input.
- Prompt caching — works identically with streaming.
- Batch API — the alternative when streaming is not needed.