The Anthropic Message Batches API — create, poll, retrieve, and stream results at 50% off list price for asynchronous bulk processing of thousands of Claude requests.
Claude API — Message Batches
What it is
The Message Batches API runs up to 100,000 Claude requests as a single asynchronous job at 50% off the standard per-token rate. Submit a list of requests, poll for completion (most batches finish within an hour; requests still unprocessed after 24 hours expire), and stream back the results. Results arrive in arbitrary order, and each one is tagged with the custom_id you provided so you can correlate it with the originating prompt. Reach for it whenever the work is independent and not interactive: nightly classification, large-scale extraction, dataset enrichment, eval suites, document summarisation, retroactive re-scoring.
Pricing and limits
| Limit | Value |
|---|---|
| Max requests per batch | 100,000 |
| Max raw batch size | 256 MB |
| Discount | 50% off both input and output tokens |
| Processing window | 24 hours, after which unfinished requests expire (typically done in < 1 hour) |
| Result retention | 29 days from batch creation |
| Cancellation | Allowed while in_progress |
| Streaming | Not supported per-request (use Streaming for that) |
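Batches compose with prompt caching: the 50% batch discount stacks with the discount on cache reads (up to 90% off input tokens), so a cached input token can cost as little as 5% of list price. Cache hits inside a batch are best-effort, because requests are processed concurrently and in no fixed order, and output tokens only ever receive the 50% batch discount. For massive RAG re-scoring jobs with a long shared prefix, the effective input cost can drop to under 10% of list price.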
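The limits in the table above can be checked locally before submitting. The following is a minimal sketch, not part of the SDK: the helper name preflight is hypothetical, and it assumes that the JSON-serialised size of the request list is a reasonable approximation of the raw batch size and that 256 MB means 256 × 1024 × 1024 bytes.

```python
import json
from collections import Counter

MAX_REQUESTS = 100_000            # per-batch request cap from the limits table
MAX_BYTES = 256 * 1024 * 1024     # assumption: "256 MB" read as binary megabytes


def preflight(requests: list[dict]) -> list[str]:
    """Return a list of problems; an empty list means the batch looks submittable."""
    problems: list[str] = []
    if not requests:
        problems.append("batch is empty")
    if len(requests) > MAX_REQUESTS:
        problems.append(f"{len(requests)} requests exceeds the {MAX_REQUESTS} cap")
    # Assumption: serialised JSON size approximates what the API counts as batch size.
    size = sum(len(json.dumps(r).encode("utf-8")) for r in requests)
    if size > MAX_BYTES:
        problems.append(f"payload is {size} bytes, over the {MAX_BYTES} byte cap")
    # custom_id must be unique inside a batch.
    counts = Counter(r["custom_id"] for r in requests)
    dupes = sorted(cid for cid, n in counts.items() if n > 1)
    if dupes:
        problems.append(f"duplicate custom_id values: {dupes[:5]}")
    return problems
```

Call preflight(requests) just before client.messages.batches.create and refuse to submit if it returns anything. Leaving some headroom below the size cap is sensible, since the estimate ignores the outer request envelope.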
When to batch
| Use case | Batch? |
|---|---|
| Nightly classification of 50K rows | Yes |
| Dataset enrichment / labelling | Yes |
| Eval suite over 1000 prompts | Yes |
| Translation of a 10K-row CSV | Yes |
| Bulk PDF summarisation | Yes |
| User-facing chat | No — use streaming |
| Real-time agent loop | No — use synchronous |
| One-off ad-hoc query | No — not worth the orchestration |
Create a batch — Python
Build a list of requests, each with a unique custom_id and a params block that matches the body you would send to messages.create. The SDK accepts any model, including claude-haiku-4-5 for cost-sensitive jobs.
import anthropic
client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment
documents = [
    "Document 1: The quarterly report shows 18% revenue growth...",
    "Document 2: Customer churn fell to 2.1% from 3.4%...",
    "Document 3: Operating margin expanded by 280 bps...",
]
# One request per document; custom_id ties each result back to its source.
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": f"doc-{i:04d}",
            "params": {
                "model": "claude-haiku-4-5",
                "max_tokens": 200,
                "messages": [{"role": "user", "content": f"Summarise in one sentence: {doc}"}],
            },
        }
        for i, doc in enumerate(documents)
    ],
)
print(batch.id)
print(batch.processing_status)
print(batch.request_counts)
Output:
msgbatch_01XVnKzQpZ8mN7vF4LqJ2cR3
in_progress
RequestCounts(processing=3, succeeded=0, errored=0, canceled=0, expired=0)
Create a batch — TypeScript
The TypeScript SDK mirrors the Python surface. Use Anthropic.Messages.BatchCreateParams.Request for typed requests.
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic();
const documents = [
  "Document 1: Revenue grew 18% YoY ...",
  "Document 2: Churn fell to 2.1% ...",
  "Document 3: Margin expanded 280 bps ...",
];
// One request per document; custom_id ties each result back to its source.
const batch = await client.messages.batches.create({
  requests: documents.map((doc, i) => ({
    custom_id: `doc-${String(i).padStart(4, "0")}`,
    params: {
      model: "claude-haiku-4-5",
      max_tokens: 200,
      messages: [{ role: "user", content: `Summarise in one sentence: ${doc}` }],
    },
  })),
});
console.log(batch.id, batch.processing_status);
Output:
msgbatch_01XVnKzQpZ8mN7vF4LqJ2cR3 in_progress
Batch object
A MessageBatch describes the job. Key fields:
| Field | Type | Meaning |
|---|---|---|
| id | string | msgbatch_…; pass to retrieve and results |
| processing_status | in_progress / canceling / ended | Job state |
| request_counts | object | processing, succeeded, errored, canceled, expired |
| created_at | ISO 8601 | When the batch was submitted |
| ended_at | ISO 8601 or null | When processing finished (null while in progress) |
| expires_at | ISO 8601 | Job expires 24 h after created_at if not done |
| results_url | string or null | URL to download results once ended |
| cancel_initiated_at | ISO 8601 or null | Set after a cancel request |
processing_status == "ended" does not mean "all succeeded"; it means processing is over. Check request_counts.errored and request_counts.expired to see if anything failed.
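One way to make that distinction explicit is to fold the status and the counts into a single verdict. This is only a sketch: batch_verdict and the Counts stand-in are hypothetical names, and the verdict labels are arbitrary. Any object exposing the five request_counts fields would work in place of Counts.

```python
from dataclasses import dataclass


@dataclass
class Counts:
    """Stand-in carrying the same five fields as request_counts."""
    processing: int = 0
    succeeded: int = 0
    errored: int = 0
    canceled: int = 0
    expired: int = 0


def batch_verdict(processing_status: str, counts) -> str:
    """Classify a batch as pending, clean, partial, or failed."""
    if processing_status != "ended":
        return "pending"
    not_ok = counts.errored + counts.canceled + counts.expired
    if not_ok == 0:
        return "clean"      # ended and every request succeeded
    if counts.succeeded == 0:
        return "failed"     # ended and nothing succeeded
    return "partial"        # ended with a mix of outcomes
```

With the SDK this would be called as batch_verdict(batch.processing_status, batch.request_counts).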
Poll until done
The batch is asynchronous — poll retrieve(batch.id) until processing_status == "ended". Use 30–60 second sleeps for small batches; for very large jobs, 5+ minutes is fine.
import time
while True:
    # Re-fetch the batch; the object returned by create() does not update itself.
    batch = client.messages.batches.retrieve(batch.id)
    print(batch.processing_status, batch.request_counts)
    if batch.processing_status == "ended":
        break
    time.sleep(30)
print("done")
print(f"succeeded: {batch.request_counts.succeeded}")
print(f"errored: {batch.request_counts.errored}")
Output:
in_progress RequestCounts(processing=3, succeeded=0, errored=0, canceled=0, expired=0)
in_progress RequestCounts(processing=2, succeeded=1, errored=0, canceled=0, expired=0)
ended RequestCounts(processing=0, succeeded=3, errored=0, canceled=0, expired=0)
done
succeeded: 3
errored: 0
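For long-running jobs, a fixed 30-second sleep can be replaced with a growing delay and an overall deadline. The sketch below is independent of the SDK: poll_until_ended is a hypothetical helper that takes any zero-argument function returning the current status string, and the default delays and growth factor are illustrative choices rather than recommended values.

```python
import time
from typing import Callable


def poll_until_ended(
    fetch_status: Callable[[], str],
    *,
    initial_delay: float = 30.0,
    max_delay: float = 300.0,
    factor: float = 1.5,
    timeout: float = 24 * 3600,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call fetch_status() until it returns "ended"; return the number of polls made.

    Raises TimeoutError once the next sleep would push total waiting past `timeout`.
    """
    delay, waited, polls = initial_delay, 0.0, 0
    while True:
        polls += 1
        if fetch_status() == "ended":
            return polls
        if waited + delay > timeout:
            raise TimeoutError(f"batch not ended after {waited:.0f}s")
        sleep(delay)
        waited += delay
        delay = min(delay * factor, max_delay)  # grow the delay, capped at max_delay
```

With the client from the earlier examples, the call would look like poll_until_ended(lambda: client.messages.batches.retrieve(batch.id).processing_status). The injectable sleep parameter exists only to make the loop testable without waiting.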
Retrieve results
Once ended, stream results back with client.messages.batches.results(batch.id). Results are returned in arbitrary order — use the custom_id to correlate.
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        text = result.result.message.content[0].text
        print(f"{result.custom_id}: {text}")
    elif result.result.type == "errored":
        print(f"{result.custom_id}: ERROR: {result.result.error}")
    elif result.result.type == "canceled":
        print(f"{result.custom_id}: canceled")
    elif result.result.type == "expired":
        print(f"{result.custom_id}: expired (24h SLA missed)")
Output:
doc-0000: The quarter saw 18% revenue growth driven by enterprise SaaS.
doc-0002: Operating margin expanded 280 basis points YoY.
doc-0001: Customer churn fell from 3.4% to 2.1%.
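Because results come back in arbitrary order, downstream code that expects submission order has to restore it. A minimal sketch, assuming you kept the list of custom_id values you submitted; in_submission_order is a hypothetical helper, and a missing result is reported as None rather than raising.

```python
from typing import Iterable, Optional


def in_submission_order(
    submitted_ids: list[str],
    arrived: Iterable[tuple[str, str]],
) -> list[tuple[str, Optional[str]]]:
    """Re-order (custom_id, text) pairs to match the order the requests were submitted in."""
    by_id = dict(arrived)  # custom_id is unique within a batch, so a dict is safe
    return [(cid, by_id.get(cid)) for cid in submitted_ids]
```

The arrived argument can be built from the results loop above by collecting (result.custom_id, text) pairs for the succeeded entries.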
Result object structure
Each line of the results stream is one result entry (MessageBatchIndividualResponse in the Python SDK) with a custom_id and a result discriminated union.
# result.result.type values:
#
# "succeeded" → result.result.message (a full Message object)
# "errored"   → result.result.error (an error response; details in .error.type and .error.message)
# "canceled"  → no payload; the request was canceled before it was processed
# "expired"   → no payload; the batch reached its 24-hour expiry before the request was processed
for r in client.messages.batches.results(batch.id):
    msg = r.result.message if r.result.type == "succeeded" else None
    print(r.custom_id, r.result.type, msg.usage if msg else None)
Output:
doc-0000 succeeded Usage(input_tokens=42, output_tokens=18, cache_creation_input_tokens=0, cache_read_input_tokens=0)
doc-0001 succeeded Usage(input_tokens=41, output_tokens=14, cache_creation_input_tokens=0, cache_read_input_tokens=0)
doc-0002 succeeded Usage(input_tokens=41, output_tokens=12, cache_creation_input_tokens=0, cache_read_input_tokens=0)
TypeScript — poll and retrieve
let current = await client.messages.batches.retrieve(batch.id);
while (current.processing_status !== "ended") {
  await new Promise(r => setTimeout(r, 30_000));
  current = await client.messages.batches.retrieve(batch.id);
  console.log(current.processing_status, current.request_counts);
}
const results = await client.messages.batches.results(batch.id);
for await (const r of results) {
  if (r.result.type === "succeeded") {
    // Narrow the content block before reading .text.
    const first = r.result.message.content[0];
    if (first.type === "text") {
      console.log(`${r.custom_id}: ${first.text}`);
    }
  } else {
    console.log(`${r.custom_id}: ${r.result.type}`);
  }
}
Listing batches
Page over your batch history. Useful for ops dashboards or for resuming after a crash.
for batch in client.messages.batches.list(limit=20):
    print(batch.id, batch.processing_status, batch.request_counts.succeeded, batch.created_at)
Output:
msgbatch_01XVnKzQpZ8mN7vF4LqJ2cR3 ended 3 2026-05-25T14:02:11Z
msgbatch_01XVlpPzM5sR4gJyT3Hf2cQ1 ended 4983 2026-05-24T19:41:08Z
msgbatch_01XVjnNyL3qP4eK2T2Hd1cN9 ended 502 2026-05-23T08:11:32Z
Canceling a batch
While processing_status is in_progress, you can request cancellation. The API moves the batch to canceling and finishes any in-flight requests before stopping. Results that already succeeded remain retrievable; the remaining requests are marked canceled.
canceled = client.messages.batches.cancel(batch.id)
print(canceled.processing_status)
print(canceled.cancel_initiated_at)
Output:
canceling
2026-05-25T14:06:22Z
Cost comparison
Worked example for 10,000 prompts × 1500 input tokens × 200 output tokens on Claude Sonnet:
| Mode | Input cost | Output cost | Total |
|---|---|---|---|
| Synchronous | 10K × 1500 × $3/1M = $45 | 10K × 200 × $15/1M = $30 | $75 |
| Batch (50% off) | $22.50 | $15 | $37.50 |
| Batch + cache hit on system prompt (~80% of input tokens, reads billed at 10%) | ~$6.30 | $15 | ~$21.30 |
PRICES = {"claude-sonnet-4-6": {"in": 3.0, "out": 15.0}}  # USD per million tokens
def batch_cost(model: str, n: int, in_tok: int, out_tok: int, cache_hit_ratio: float = 0.0) -> float:
    p = PRICES[model]
    # Cached input tokens are billed at 10% of the input rate; cache-write surcharges are ignored here.
    eff_input = in_tok * (1 - cache_hit_ratio) + in_tok * cache_hit_ratio * 0.1
    # The trailing 0.5 applies the batch discount to both input and output.
    return n * (eff_input * p["in"] + out_tok * p["out"]) / 1_000_000 * 0.5
print(f"baseline: ${batch_cost('claude-sonnet-4-6', 10_000, 1500, 200):.2f}")
print(f"80% cached: ${batch_cost('claude-sonnet-4-6', 10_000, 1500, 200, 0.8):.2f}")
Output:
baseline: $37.50
80% cached: $21.30
Use cases
1. Bulk classification
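Classify a CSV of support tickets by category. The system prompt is reused across all 10,000 rows, which makes it a prime cache target. The prompt shown here is shortened for readability; a real prefix has to meet the model's minimum cacheable length before cache_control has any effect.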
import csv
with open("tickets.csv", newline="") as f:
    rows = list(csv.DictReader(f))
# Shared system prompt; the cache breakpoint sits on the last block of the reused prefix.
system = [
    {"type": "text", "text": "You categorise support tickets."},
    {
        "type": "text",
        "text": "Categories: billing, bug, feature_request, account, other. Reply with one word.",
        "cache_control": {"type": "ephemeral"},
    },
]
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": row["ticket_id"],
            "params": {
                "model": "claude-haiku-4-5",
                "max_tokens": 8,
                "system": system,
                "messages": [{"role": "user", "content": row["body"]}],
            },
        }
        for row in rows
    ],
)
print(f"submitted {batch.request_counts.processing} tickets")
Output:
submitted 10000 tickets
2. Eval suite
Run a frozen set of evaluation prompts against a new model release.
import json
# Load the frozen eval set; each entry needs an "id" and a "prompt".
with open("eval_prompts.json") as f:
    eval_set = json.load(f)
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": ev["id"],
            "params": {
                "model": "claude-opus-4-7",
                "max_tokens": 1024,
                "messages": [{"role": "user", "content": ev["prompt"]}],
            },
        }
        for ev in eval_set
    ],
)
3. Dataset re-scoring
Re-grade an existing dataset against a new rubric at the discounted batch rate instead of paying synchronous prices.
# dataset: an iterable of rows exposing .id, .q and .a, defined elsewhere.
batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": row.id,
            "params": {
                "model": "claude-sonnet-4-6",
                "max_tokens": 256,
                "system": "Grade the answer 1-5 against the rubric. Reply with JSON: {\"score\": N, \"reason\": \"...\"}.",
                "messages": [{"role": "user", "content": f"Question: {row.q}\nAnswer: {row.a}"}],
            },
        }
        for row in dataset
    ],
)
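The grader is asked to reply with JSON, but a reply can still arrive wrapped in extra prose or with an out-of-range score, so it is worth validating before storing. A minimal sketch under those assumptions: parse_grade is a hypothetical helper, and it treats anything it cannot validate as None instead of guessing.

```python
import json
from typing import Optional


def parse_grade(text: str) -> Optional[dict]:
    """Extract {"score": 1..5, "reason": str} from a model reply, or None if unusable."""
    # Take the outermost {...} span so surrounding prose is tolerated.
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        data = json.loads(text[start:end + 1])
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    score = data.get("score")
    # bool is a subclass of int in Python, so exclude it explicitly.
    if isinstance(score, bool) or not isinstance(score, int) or not 1 <= score <= 5:
        return None
    return {"score": score, "reason": str(data.get("reason", ""))}
```

Results for which parse_grade returns None are good candidates for a manual look or a retry.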
Error handling in results
Large batches often include a few errored results: a malformed prompt, a stray validation error, a transient overload. Handle them explicitly; do not assume every result succeeded.
succeeded: list[tuple[str, str]] = []
failed: list[tuple[str, str]] = []
for r in client.messages.batches.results(batch.id):
    if r.result.type == "succeeded":
        succeeded.append((r.custom_id, r.result.message.content[0].text))
    elif r.result.type == "errored":
        # The errored result wraps an error response; the details sit one level down.
        detail = r.result.error.error
        failed.append((r.custom_id, f"{detail.type}: {detail.message}"))
    else:
        # "canceled" and "expired" results carry no error payload.
        failed.append((r.custom_id, r.result.type))
print(f"ok: {len(succeeded)}; fail: {len(failed)}")
if failed:
    for cid, msg in failed[:5]:
        print(f" {cid}: {msg}")
Output:
ok: 9994; fail: 6
doc-1832: prompt content was longer than the model's context window
doc-4719: invalid_request_error: messages.0.content[0]: content cannot be empty
doc-6011: prompt content was longer than the model's context window
doc-8801: invalid_request_error: messages.0.content[0]: content cannot be empty
doc-9933: server_error: an internal error occurred
Resuming after a crash
The batch lives on Anthropic's side — your client crashing does not stop the job. Persist batch.id to your DB the moment it is created, then poll on restart.
import sqlite3
def submit_and_persist(requests: list, db: sqlite3.Connection) -> str:
    batch = client.messages.batches.create(requests=requests)
    # Record the ID immediately so a crash after this point cannot orphan the job.
    db.execute(
        "INSERT INTO batches (id, created_at, status) VALUES (?, ?, ?)",
        (batch.id, batch.created_at.isoformat(), "in_progress"),
    )
    db.commit()
    return batch.id
def resume_pending(db: sqlite3.Connection) -> list[str]:
    return [row[0] for row in db.execute("SELECT id FROM batches WHERE status = 'in_progress'")]
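The functions above assume a batches table already exists and that something flips the status once a batch is finished. A minimal sketch of both pieces follows; the schema, init_db, and mark_ended are assumptions made for illustration, chosen only to match the column names used in the INSERT and SELECT statements.

```python
import sqlite3

# Hypothetical schema matching the columns used by the INSERT and SELECT statements.
SCHEMA = """
CREATE TABLE IF NOT EXISTS batches (
    id         TEXT PRIMARY KEY,
    created_at TEXT NOT NULL,
    status     TEXT NOT NULL
)
"""


def init_db(path: str = ":memory:") -> sqlite3.Connection:
    """Open the database and create the batches table if it is missing."""
    db = sqlite3.connect(path)
    db.execute(SCHEMA)
    db.commit()
    return db


def mark_ended(db: sqlite3.Connection, batch_id: str) -> None:
    """Record that a batch has finished so it is no longer picked up on restart."""
    db.execute("UPDATE batches SET status = 'ended' WHERE id = ?", (batch_id,))
    db.commit()
```

On restart, poll each ID still marked in_progress and call mark_ended once its results have been processed, so a second crash does not cause double handling.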
Common pitfalls
| Pitfall | Symptom | Fix |
|---|---|---|
| Reusing custom_id within a batch | API error on create | custom_id must be unique inside a batch |
| Polling every second | Wasted requests, no speedup | 30–60 s is plenty; batches do not finish faster from polling harder |
| Treating ended as "all succeeded" | Silently dropped failures | Always inspect request_counts.errored and per-result types |
| Submitting interactive workloads | Slow UX, no streaming | Use synchronous + streaming for anything user-facing |
| Not persisting batch.id | Lose the result URL on crash | Save the ID before the function returns |
| Mixing models per request to "save cost" | Bookkeeping nightmare | Submit separate batches per model; easier to reason about |
| Ignoring expires_at | Batch silently expires after 24h | Monitor and resubmit anything not done |
| Forgetting cache breakpoints | Lose the stacked discount | Mark large reused prefixes with cache_control so they can be cached across the batch |
Common recipes
End-to-end runner
import time
import anthropic
client = anthropic.Anthropic()
def run_batch(requests: list, poll_seconds: int = 30) -> dict[str, str]:
    batch = client.messages.batches.create(requests=requests)
    print(f"submitted: {batch.id}")
    # Block until processing is over.
    while True:
        batch = client.messages.batches.retrieve(batch.id)
        if batch.processing_status == "ended":
            break
        time.sleep(poll_seconds)
    # Keep only the succeeded results, keyed by custom_id.
    out: dict[str, str] = {}
    for r in client.messages.batches.results(batch.id):
        if r.result.type == "succeeded":
            out[r.custom_id] = r.result.message.content[0].text
    return out
Async polling with progress
import asyncio
import anthropic
async def run_async(requests: list, poll_seconds: int = 15) -> dict[str, str]:
    client = anthropic.AsyncAnthropic()
    batch = await client.messages.batches.create(requests=requests)
    while True:
        batch = await client.messages.batches.retrieve(batch.id)
        rc = batch.request_counts
        total = rc.succeeded + rc.errored + rc.canceled + rc.expired + rc.processing
        print(f"\r{rc.succeeded}/{total} done", end="", flush=True)
        if batch.processing_status == "ended":
            print()
            break
        await asyncio.sleep(poll_seconds)
    out: dict[str, str] = {}
    # On the async client, results() is a coroutine: await it first, then iterate.
    async for r in await client.messages.batches.results(batch.id):
        if r.result.type == "succeeded":
            out[r.custom_id] = r.result.message.content[0].text
    return out
Chunking very large jobs
The 100K-request cap is per batch, and so is the 256 MB size cap. For larger jobs, split into multiple batches and merge results.
def chunked(seq: list, size: int):
    # Yield consecutive slices of at most `size` items.
    for i in range(0, len(seq), size):
        yield seq[i:i + size]
all_requests = [...]  # 500_000 requests
batch_ids: list[str] = []
for chunk in chunked(all_requests, 100_000):
    b = client.messages.batches.create(requests=chunk)
    batch_ids.append(b.id)
print(f"submitted {len(batch_ids)} batches")
Output:
submitted 5 batches
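Splitting by count alone can still exceed the size limit when individual requests are large. The sketch below splits on whichever limit is hit first. It is an approximation: chunk_by_limits is a hypothetical helper, and it assumes the JSON-serialised length of each request is close to what counts toward the 256 MB cap.

```python
import json
from typing import Iterable, Iterator


def chunk_by_limits(
    requests: Iterable[dict],
    max_count: int = 100_000,
    max_bytes: int = 256 * 1024 * 1024,
) -> Iterator[list[dict]]:
    """Yield chunks that stay within both the request-count and the byte-size limit."""
    chunk: list[dict] = []
    chunk_bytes = 0
    for req in requests:
        # Assumption: serialised JSON length approximates the request's share of the cap.
        size = len(json.dumps(req).encode("utf-8"))
        if size > max_bytes:
            raise ValueError(f"request {req.get('custom_id')!r} alone exceeds max_bytes")
        # Close the current chunk if adding this request would break either limit.
        if chunk and (len(chunk) >= max_count or chunk_bytes + size > max_bytes):
            yield chunk
            chunk, chunk_bytes = [], 0
        chunk.append(req)
        chunk_bytes += size
    if chunk:
        yield chunk
```

It drops into the loop above in place of chunked(all_requests, 100_000). Passing a max_bytes somewhat below the real cap leaves headroom for the envelope around the request list.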
Retry just the failures
After processing, build a fresh batch from the errored results — preserve the original custom_id so you can correlate retried answers.
def collect_failures(batch_id: str) -> list[str]:
    # Return the custom_id of every errored result in the batch.
    failed = []
    for r in client.messages.batches.results(batch_id):
        if r.result.type == "errored":
            failed.append(r.custom_id)
    return failed
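Turning those IDs into a fresh batch requires the original request bodies, since the results only carry the custom_id. A minimal sketch, assuming you still hold the list of requests you submitted; build_retry_requests is a hypothetical helper.

```python
def build_retry_requests(original: list[dict], failed_ids: list[str]) -> list[dict]:
    """Select the original requests whose custom_id appears in failed_ids, in that order."""
    by_id = {req["custom_id"]: req for req in original}
    missing = [cid for cid in failed_ids if cid not in by_id]
    if missing:
        # Fail loudly instead of silently retrying fewer requests than expected.
        raise KeyError(f"no original request for: {missing[:5]}")
    return [by_id[cid] for cid in failed_ids]
```

The returned list can be passed straight to client.messages.batches.create. Requests that failed validation will fail again unless they are corrected first, so it is worth separating those from transient failures before resubmitting.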
See also
- Python SDK — synchronous message API.
- TypeScript SDK — same in TS.
- Prompt caching — stack a 50% batch discount with cached prefixes.
- Streaming — the interactive counterpart.
- Files API — reference uploaded docs from batched requests.