cheat sheet
Claude API
Complete Python SDK reference for the Anthropic Claude API — messages, streaming, vision, extended thinking, prompt caching, batch processing, and token counting.
Claude API — Python
What it is
The Anthropic API is Anthropic's REST API for programmatically integrating Claude models into applications, with official SDKs for Python and TypeScript. The Python SDK (anthropic) wraps the API with typed request/response objects and supports synchronous and streaming message creation, tool use, prompt caching, extended thinking, and batch processing. Reach for it when you need to embed Claude into backend services, pipelines, or automation scripts rather than using the Claude.ai web interface.
Install
pip install anthropic
Output:
Successfully installed anthropic-0.49.0
Basic message
The minimum viable call: instantiate an Anthropic() client (which reads ANTHROPIC_API_KEY from the environment), call client.messages.create() with a model, a token budget, and a messages list, then read response.content[0].text. Use this as the foundation for every more complex pattern on this page.
import anthropic
client = anthropic.Anthropic() # reads ANTHROPIC_API_KEY from env
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=1024,
messages=[{"role": "user", "content": "Explain what a Python decorator is."}]
)
print(response.content[0].text)
print(response.usage)
Output:
A Python decorator is a function that takes another function as input and returns a
modified version of it, allowing you to add behavior before or after the original
function runs without changing its source code. The @syntax is shorthand for
function = decorator(function).
Usage(input_tokens=15, output_tokens=62, cache_creation_input_tokens=0, cache_read_input_tokens=0)
Response object
The Message object returned by client.messages.create() contains the generated content as a list of typed blocks (usually a single TextBlock), plus metadata about why generation stopped (stop_reason) and token usage. Always check stop_reason — if it is "max_tokens" the output was truncated and you may need to increase max_tokens or continue from where it left off.
print(response.id) # "msg_01XVn..."
print(response.model) # "claude-opus-4-7-20251001"
print(response.stop_reason) # "end_turn" | "max_tokens" | "tool_use" | "stop_sequence"
print(response.usage.input_tokens)
print(response.usage.output_tokens)
# Content blocks
for block in response.content:
    print(block.type) # "text"
    print(block.text)
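The stop_reason check described above can be wrapped in a small helper. This is a minimal sketch, not part of the SDK: extract_text is a hypothetical name, and the SimpleNamespace object is only a stand-in for a real Message so the example runs offline.

```python
from types import SimpleNamespace

def extract_text(response) -> tuple[str, bool]:
    """Join all text blocks and report whether the output was cut off."""
    text = "".join(b.text for b in response.content if b.type == "text")
    truncated = response.stop_reason == "max_tokens"
    return text, truncated

# Stand-in for a Message returned by client.messages.create() (assumption).
fake_response = SimpleNamespace(
    content=[
        SimpleNamespace(type="text", text="Hello"),
        SimpleNamespace(type="text", text=", world"),
    ],
    stop_reason="max_tokens",
)

text, truncated = extract_text(fake_response)
print(text)
print("truncated:", truncated)
```

With a real response, a True flag is the signal to raise max_tokens or continue the generation.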
Multi-turn conversation
The API is stateless — you maintain conversation history by building the messages list yourself, alternating user and assistant turns. Append each assistant response verbatim before adding the next user message; the model uses this history to maintain context and avoid repeating itself.
messages = [
{"role": "user", "content": "What is 2 + 2?"},
{"role": "assistant", "content": "4"},
{"role": "user", "content": "Multiply that by 10."}
]
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=256,
messages=messages
)
print(response.content[0].text)
Output:
40
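Because the API is stateless, the history bookkeeping is easy to factor out. The sketch below uses a hypothetical Conversation class and a fake_send stand-in so it runs without network access; in real code the send callable would wrap client.messages.create().

```python
from typing import Callable

class Conversation:
    """Keeps the running messages list for a stateless chat API."""

    def __init__(self, send: Callable[[list], str]):
        self.send = send      # callable: messages list -> assistant text
        self.messages = []    # alternating user / assistant turns

    def ask(self, user_text: str) -> str:
        self.messages.append({"role": "user", "content": user_text})
        reply = self.send(self.messages)
        # Append the assistant reply so the next call sees it.
        self.messages.append({"role": "assistant", "content": reply})
        return reply

def fake_send(messages: list) -> str:
    """Stand-in for a real API call (assumption for offline use)."""
    return f"reply to: {messages[-1]['content']}"

chat = Conversation(fake_send)
chat.ask("What is 2 + 2?")
chat.ask("Multiply that by 10.")
print([m["role"] for m in chat.messages])
```

Injecting the send function keeps the history logic testable independently of the network call.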
System prompt
The system parameter sets persistent, session-wide instructions that apply to every turn — persona, output format, domain constraints, tone. It is passed as a top-level field (not inside messages) and is the right place for everything that should not change between user turns.
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=512,
system="You are a concise technical documentation writer. Reply in bullet points only.",
messages=[{"role": "user", "content": "How does TCP handle packet loss?"}]
)
print(response.content[0].text)
Output:
- Sender sets a retransmission timer when a segment is sent
- If ACK not received before timeout, the segment is retransmitted
- Receiver uses sequence numbers to detect duplicates and reorder out-of-order segments
- Duplicate ACKs (3 in a row) trigger fast retransmit before the timer expires
- Congestion window is reduced on loss to slow the send rate
Streaming
Use client.messages.stream() to receive tokens as they arrive.
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Count to 5 slowly."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()
    # Get full message after stream completes
    message = stream.get_final_message()
print(f"\nTotal tokens: {message.usage.input_tokens + message.usage.output_tokens}")
Output:
One... two... three... four... five.
Total tokens: 31
Low-level streaming events
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=256,
    messages=[{"role": "user", "content": "Hi"}]
) as stream:
    for event in stream:
        print(event.type)
Output:
message_start
content_block_start
content_block_delta
content_block_delta
content_block_stop
message_delta
message_stop
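To rebuild the text from low-level events, one approach is to concatenate the text_delta payloads carried by content_block_delta events. The sketch below assumes each such event exposes delta.type and delta.text, and it uses SimpleNamespace stand-ins instead of a live stream; collect_text is a hypothetical helper.

```python
from types import SimpleNamespace as NS

def collect_text(events) -> str:
    """Concatenate the text carried by content_block_delta events."""
    parts = []
    for event in events:
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            parts.append(event.delta.text)
    return "".join(parts)

# Stand-in event sequence mirroring the order shown above (assumption).
fake_events = [
    NS(type="message_start"),
    NS(type="content_block_start"),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="Hel")),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="lo!")),
    NS(type="content_block_stop"),
    NS(type="message_delta"),
    NS(type="message_stop"),
]

print(collect_text(fake_events))
```

For plain text output, stream.text_stream already does this for you; the event loop is mainly useful when you need to react to other event types as well.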
Vision — image input
Pass images as content blocks alongside text in the messages list. Images can be supplied as base64-encoded data (for local files or programmatically generated images) or as a publicly accessible URL. Claude can reason over diagrams, screenshots, charts, and photos in the same turn as text.
import base64
with open("chart.png", "rb") as f:
    image_data = base64.standard_b64encode(f.read()).decode("utf-8")
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=1024,
messages=[{
"role": "user",
"content": [
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/png",
"data": image_data,
}
},
{"type": "text", "text": "What trend does this chart show?"}
]
}]
)
print(response.content[0].text)
Output:
The chart shows a steady upward trend in monthly active users from Q1 2024 through
Q4 2024, with growth accelerating in Q3. The steepest increase occurs between
August and October.
Image from URL
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=512,
messages=[{
"role": "user",
"content": [
{
"type": "image",
"source": {"type": "url", "url": "https://example.com/diagram.png"}
},
{"type": "text", "text": "Describe this diagram."}
]
}]
)
Supported image formats: image/jpeg, image/png, image/gif, image/webp. Max 5 MB per image. Up to 100 images per API request (the 20-image limit applies to claude.ai).
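A pre-flight check can catch unsupported formats or oversized files before a request is sent. This is a sketch under assumptions: image_block is a hypothetical helper, and it applies the 5 MB limit to the raw bytes, which may not be exactly how the API measures size.

```python
import base64

ALLOWED_MEDIA_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
MAX_IMAGE_BYTES = 5 * 1024 * 1024  # 5 MB, checked on raw bytes (assumption)

def image_block(data: bytes, media_type: str) -> dict:
    """Validate an image and wrap it as a base64 image content block."""
    if media_type not in ALLOWED_MEDIA_TYPES:
        raise ValueError(f"unsupported media type: {media_type}")
    if len(data) > MAX_IMAGE_BYTES:
        raise ValueError(f"image is {len(data)} bytes, over the 5 MB limit")
    return {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": media_type,
            "data": base64.standard_b64encode(data).decode("utf-8"),
        },
    }

block = image_block(b"\x89PNG fake bytes", "image/png")
print(block["source"]["media_type"])
```

The returned dict can be placed directly in the content list of a user message.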
Extended thinking
Extended thinking lets Claude reason privately before responding. Useful for multi-step math, logic, and code review.
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=16000,
thinking={
"type": "enabled",
"budget_tokens": 10000
},
messages=[{
"role": "user",
"content": "A snail crawls 2 cm forward, then 1 cm back each day. Starting at 0 cm, how many days to reach 10 cm?"
}]
)
for block in response.content:
    if block.type == "thinking":
        print(f"[Thinking: {len(block.thinking)} chars]")
    else:
        print(block.text)
Output:
[Thinking: 1843 chars]
The snail reaches 10 cm on day 9.
It nets +1 cm per day, so it ends days 1–8 at 1, 2, 3, 4, 5, 6, 7, 8 cm respectively.
On day 9 it crawls forward 2 cm from 8 cm and touches 10 cm before the backward
crawl, so day 9 is the answer.
Extended thinking requires temperature=1 (the default). budget_tokens must be ≥ 1024. Billed for thinking tokens.
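These constraints can be checked locally before calling the API. The sketch below uses a hypothetical thinking_params helper; the rule that budget_tokens must stay below max_tokens is an assumption taken from the API documentation rather than from this page.

```python
def thinking_params(max_tokens: int, budget_tokens: int) -> dict:
    """Build max_tokens and thinking kwargs after basic sanity checks."""
    if budget_tokens < 1024:
        raise ValueError("budget_tokens must be at least 1024")
    if budget_tokens >= max_tokens:
        # Assumption: the thinking budget has to fit inside max_tokens.
        raise ValueError("budget_tokens must be smaller than max_tokens")
    return {
        "max_tokens": max_tokens,
        "thinking": {"type": "enabled", "budget_tokens": budget_tokens},
    }

params = thinking_params(max_tokens=16000, budget_tokens=10000)
print(params)
```

The result can be splatted into a call, for example client.messages.create(model=..., messages=..., **params).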
Prompt caching
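Cache large, reused context to cut input cost and latency: cache reads are billed at about 10% of the base input price, while cache writes cost about 25% more than uncached input. The default TTL is 5 minutes and is refreshed each time the cache is hit.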
large_docs = "... 50,000 tokens of documentation ..."
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=1024,
system=[
{"type": "text", "text": "You are a helpful assistant."},
{
"type": "text",
"text": large_docs,
"cache_control": {"type": "ephemeral"}, # mark for caching
}
],
messages=[{"role": "user", "content": "What is the retry policy described in the docs?"}]
)
print(response.usage)
Output (first call — writes cache):
Usage(input_tokens=150, output_tokens=82, cache_creation_input_tokens=50000, cache_read_input_tokens=0)
Output (subsequent calls within 5 min):
Usage(input_tokens=150, output_tokens=82, cache_creation_input_tokens=0, cache_read_input_tokens=50000)
Place cache_control on the last content block you want included in the cache prefix. Up to 4 cache breakpoints per request. Cache the longest, most stable content (docs, tool definitions, system prompt) rather than volatile per-request content.
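To confirm that caching is working, you can compute the share of input tokens served from cache using the usage fields shown above. cache_hit_ratio is a hypothetical helper, and the SimpleNamespace objects stand in for real Usage objects.

```python
from types import SimpleNamespace

def cache_hit_ratio(usage) -> float:
    """Fraction of all input tokens that were read from the cache."""
    cached = usage.cache_read_input_tokens or 0
    written = usage.cache_creation_input_tokens or 0
    total = usage.input_tokens + cached + written
    return cached / total if total else 0.0

# Stand-ins mirroring the two Usage outputs above (assumption).
first_call = SimpleNamespace(
    input_tokens=150, cache_creation_input_tokens=50000, cache_read_input_tokens=0
)
later_call = SimpleNamespace(
    input_tokens=150, cache_creation_input_tokens=0, cache_read_input_tokens=50000
)

print(f"{cache_hit_ratio(first_call):.3f}")
print(f"{cache_hit_ratio(later_call):.3f}")
```

A ratio that stays near zero across repeated calls usually means the prefix is changing between requests or falls below the minimum cacheable length.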
Token counting
Count tokens before sending to avoid hitting limits or to estimate cost.
count = client.messages.count_tokens(
model="claude-opus-4-7",
messages=[{"role": "user", "content": "Explain quantum entanglement in plain English."}]
)
print(count.input_tokens)
Output:
12
# Count tokens including tools and system prompt
count = client.messages.count_tokens(
model="claude-opus-4-7",
system="You are a helpful assistant.",
tools=tools,
messages=messages
)
print(f"Estimated input tokens: {count.input_tokens:,}")
Output:
Estimated input tokens: 3,842
Batch processing
Process thousands of prompts asynchronously at 50% of the standard price. Most batches finish well within 24 hours; a batch that has not completed after 24 hours expires.
documents = ["Summary of doc 1...", "Summary of doc 2...", "Summary of doc 3..."]
batch = client.messages.batches.create(
requests=[
{
"custom_id": f"doc-{i}",
"params": {
"model": "claude-haiku-4-5",
"max_tokens": 200,
"messages": [{"role": "user", "content": f"Summarize in one sentence: {doc}"}]
}
}
for i, doc in enumerate(documents)
]
)
print(batch.id)
print(batch.processing_status)
Output:
msgbatch_01XVnKzQp...
in_progress
Poll and retrieve results
import time
# Poll until done
while True:
    batch = client.messages.batches.retrieve(batch.id)
    if batch.processing_status == "ended":
        break
    time.sleep(60)
# Stream results
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        print(f"{result.custom_id}: {result.result.message.content[0].text}")
    elif result.result.type == "errored":
        print(f"{result.custom_id}: ERROR — {result.result.error}")
Output:
doc-0: The document describes a three-tier caching strategy for web services.
doc-1: The document outlines the company's Q3 financial results showing 18% revenue growth.
doc-2: The document explains how to configure database connection pooling in SQLAlchemy.
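For large corpora it can help to build the request list separately and split it into several batches. The sketch below is one way to do that: build_batch_requests and its chunk_size default are hypothetical, so check the current per-batch limits before choosing a real value.

```python
def build_batch_requests(docs, model, max_tokens=200, chunk_size=10_000):
    """Build batch request dicts with unique custom_ids, split into chunks."""
    requests = [
        {
            "custom_id": f"doc-{i}",
            "params": {
                "model": model,
                "max_tokens": max_tokens,
                "messages": [
                    {"role": "user", "content": f"Summarize in one sentence: {doc}"}
                ],
            },
        }
        for i, doc in enumerate(docs)
    ]
    # Each chunk would be submitted as its own batch.
    return [requests[i:i + chunk_size] for i in range(0, len(requests), chunk_size)]

chunks = build_batch_requests(
    ["doc a", "doc b", "doc c", "doc d", "doc e"],
    model="claude-haiku-4-5",
    chunk_size=2,
)
print([len(chunk) for chunk in chunks])
```

Each chunk can then be passed as the requests argument of client.messages.batches.create(), and the custom_id values let you match results back to documents.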
Stop sequences
stop_sequences is a list of strings that, if generated, cause Claude to halt immediately; the matched string is not included in the output. Use stop sequences to enforce boundaries in structured generation, such as stopping before a closing delimiter so you can inject content, or halting after a single JSON object.
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=512,
stop_sequences=["```", "---"],
messages=[{"role": "user", "content": "Write a Python hello world."}]
)
print(response.stop_reason) # "stop_sequence"
print(response.stop_sequence) # "```"
Error handling
The SDK raises typed exceptions so you can handle transient errors (rate limits, network blips) differently from permanent ones (bad API key, malformed request). RateLimitError means you should back off and retry. APIStatusError is the base class for every HTTP error response and carries the status code and message, so catch it after the more specific subclasses. Connection failures that never produce a response raise APIConnectionError instead.
import anthropic
client = anthropic.Anthropic()
try:
    response = client.messages.create(
        model="claude-opus-4-7",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    )
except anthropic.AuthenticationError:
    print("Invalid API key")
except anthropic.RateLimitError as e:
    print(f"Rate limited — retry after: {e.response.headers.get('retry-after')}")
except anthropic.APIStatusError as e:
    print(f"API error {e.status_code}: {e.message}")
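If you need application-level retries on top of the SDK's own, a backoff wrapper along these lines is one option. This is a sketch: TransientError is a hypothetical stand-in for exceptions such as anthropic.RateLimitError, and call_with_backoff is not an SDK function.

```python
import random
import time

class TransientError(Exception):
    """Hypothetical stand-in for a retryable error such as a rate limit."""
    def __init__(self, retry_after=None):
        super().__init__("transient failure")
        self.retry_after = retry_after  # seconds suggested by the server, if any

def call_with_backoff(fn, max_attempts=4, base_delay=0.5, sleep=time.sleep):
    """Call fn, retrying on TransientError with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except TransientError as exc:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error
            delay = exc.retry_after if exc.retry_after is not None else base_delay * 2 ** attempt
            sleep(delay + random.uniform(0, 0.1))

# Demo: fail twice, then succeed; record delays instead of sleeping.
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TransientError()
    return "ok"

delays = []
print(call_with_backoff(flaky, sleep=delays.append), len(delays))
```

Passing sleep as a parameter keeps the wrapper testable without real waiting; in production you would leave it at the time.sleep default.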
Async client
AsyncAnthropic is the async counterpart to the synchronous client — use it in async def functions with await to avoid blocking an event loop. It exposes the same API surface (messages.create, messages.stream, etc.) and is the right choice when embedding Claude calls in FastAPI handlers, async scripts, or any asyncio-based application.
import asyncio
import anthropic
async def main():
    async with anthropic.AsyncAnthropic() as client:
        response = await client.messages.create(
            model="claude-opus-4-7",
            max_tokens=1024,
            messages=[{"role": "user", "content": "What is async/await?"}]
        )
        print(response.content[0].text)
asyncio.run(main())
Output:
Async/await is Python syntax for writing asynchronous code that looks synchronous.
`async def` marks a coroutine function; `await` suspends it until a result is ready
without blocking the event loop, letting other tasks run in the meantime.
Model reference
| Model | Context | Best for |
|---|---|---|
| claude-opus-4-7 | 200K | Complex reasoning, code, analysis |
| claude-sonnet-4-6 | 200K | Balanced quality and speed |
| claude-haiku-4-5 | 200K | Fast, low-cost, high-volume tasks |
Use claude-haiku-4-5 for batch jobs, classification, and summarization where cost matters. Use claude-opus-4-7 for agentic tasks, code generation, and complex reasoning. claude-sonnet-4-6 is a good default for interactive applications.
Environment setup
# Set API key (preferred — never hardcode)
export ANTHROPIC_API_KEY="sk-ant-api03-…REDACTED…"
# Optional: custom base URL (e.g. proxy)
export ANTHROPIC_BASE_URL="https://my-proxy.example.com"
Output: (none — exits 0 on success)
# Or pass explicitly
client = anthropic.Anthropic(api_key="sk-ant-api03-…REDACTED…")
# Or use a different base URL
client = anthropic.Anthropic(
api_key="sk-ant-api03-…REDACTED…",
base_url="https://my-proxy.example.com"
)
Retries and timeouts
The SDK retries transient failures (network blips, 408, 409, 429, 5xx) up to two times by default with exponential backoff. Override max_retries on the client to be more aggressive or to disable retries entirely; override timeout to bound how long each attempt may take (retries can still extend the total wall-clock time). Per-call overrides via .with_options() are useful for one-off slow operations (e.g. extended thinking with a long budget) without changing the client-wide default.
import httpx
import anthropic
client = anthropic.Anthropic(
max_retries=5,
timeout=httpx.Timeout(60.0, connect=5.0, read=60.0, write=10.0),
)
# Per-call override — bumps timeout for one slow call
response = client.with_options(timeout=180.0).messages.create(
model="claude-opus-4-7",
max_tokens=8000,
messages=[{"role": "user", "content": "Write a long essay."}],
)
print(response.usage.output_tokens)
Output:
3942
Inspecting raw HTTP responses
Sometimes you need the raw Response to inspect headers (e.g. rate-limit budget) or to dump the body for debugging. Use .with_raw_response.create() to get an APIResponse wrapping the underlying httpx.Response; call .parse() to return the typed Message object as well.
raw = client.messages.with_raw_response.create(
model="claude-opus-4-7",
max_tokens=128,
messages=[{"role": "user", "content": "ping"}],
)
print(raw.http_response.status_code)
print(raw.http_response.headers.get("anthropic-ratelimit-requests-remaining"))
print(raw.http_response.headers.get("anthropic-ratelimit-tokens-remaining"))
message = raw.parse() # typed Message object
print(message.content[0].text)
Output:
200
49
399200
pong
Rate-limit headers (anthropic-ratelimit-*-remaining, anthropic-ratelimit-*-reset) are the most accurate way to back off: they let you slow down before a RateLimitError occurs, instead of reacting to its retry-after header afterwards.
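One way to act on these headers is to compute how long to wait until the request budget resets. The sketch assumes the reset header holds an RFC 3339 timestamp; seconds_until_reset is a hypothetical helper, and a plain dict stands in for the response headers.

```python
from datetime import datetime, timezone
from typing import Optional

def seconds_until_reset(headers: dict, now: Optional[datetime] = None) -> float:
    """Seconds until the request budget resets, or 0.0 if unknown or past."""
    reset = headers.get("anthropic-ratelimit-requests-reset")
    if not reset:
        return 0.0
    # fromisoformat on older Pythons does not accept a trailing "Z".
    reset_at = datetime.fromisoformat(reset.replace("Z", "+00:00"))
    now = now or datetime.now(timezone.utc)
    return max(0.0, (reset_at - now).total_seconds())

fake_headers = {
    "anthropic-ratelimit-requests-remaining": "0",
    "anthropic-ratelimit-requests-reset": "2025-01-01T00:00:30Z",
}
fixed_now = datetime(2025, 1, 1, tzinfo=timezone.utc)
print(seconds_until_reset(fake_headers, now=fixed_now))
```

With a real call you would pass raw.http_response.headers and sleep for the returned duration only when the remaining count is at or near zero.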
Logging and debugging
Enable verbose SDK logs to see every HTTP request, retry, and response.
export ANTHROPIC_LOG=debug
python my_app.py
Output:
DEBUG anthropic._base_client:Request options: {'method': 'post', 'url': '/v1/messages', ...}
DEBUG anthropic._base_client:HTTP Request: POST https://api.anthropic.com/v1/messages "200 OK"
DEBUG anthropic._base_client:HTTP Response: 200 ... 1.42s
Or programmatically through Python's logging module:
import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("anthropic").setLevel(logging.DEBUG)
# Now every request is logged
JSON-mode output
Claude does not have a hard "JSON mode" toggle — instead, you ask for JSON in the prompt and parse the result, or you use tool use with a single forced tool for guaranteed structured output. The prompt-only approach is simpler; tool-forced extraction is more reliable when the schema is strict.
import json
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=512,
system="Reply with a single JSON object and nothing else. No prose, no code fences.",
messages=[{
"role": "user",
"content": "Extract name and age: 'Alice Dev is 34 years old.'"
}],
)
data = json.loads(response.content[0].text)
print(data["name"], data["age"])
Output:
Alice Dev 34
For strict schemas, prefer tool_choice={"type": "tool", "name": "extract"} with a properties schema: Claude must then call the tool, and its input closely follows your input_schema. Still validate the result before trusting it. See the tool-use page.
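With the prompt-only approach, replies occasionally arrive wrapped in a code fence despite the instruction. A tolerant parser such as the sketch below can absorb that; parse_json_reply is a hypothetical helper, not an SDK function.

```python
import json
import re

def parse_json_reply(text: str) -> dict:
    """Parse a JSON reply, tolerating an optional surrounding code fence."""
    cleaned = text.strip()
    fenced = re.fullmatch(r"```(?:json)?\s*(.*?)\s*```", cleaned, flags=re.DOTALL)
    if fenced:
        cleaned = fenced.group(1)
    return json.loads(cleaned)  # raises json.JSONDecodeError on bad input

plain = '{"name": "Alice Dev", "age": 34}'
wrapped = "```json\n" + plain + "\n```"
print(parse_json_reply(plain) == parse_json_reply(wrapped))
```

Catching json.JSONDecodeError around the call gives you a natural place to retry or fall back to tool-forced extraction.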
PDF input
Send a PDF as a document content block alongside text. Claude reads the PDF visually (preserving layout, tables, figures) up to 100 pages per document and 32 MB per file.
import base64
with open("report.pdf", "rb") as f:
    pdf_data = base64.standard_b64encode(f.read()).decode("utf-8")
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=2048,
messages=[{
"role": "user",
"content": [
{
"type": "document",
"source": {
"type": "base64",
"media_type": "application/pdf",
"data": pdf_data,
},
"cache_control": {"type": "ephemeral"},
},
{"type": "text", "text": "Summarise the key findings in 5 bullet points."}
]
}],
)
print(response.content[0].text)
Output:
- Quarterly revenue rose 18% YoY, driven by enterprise SaaS.
- Customer churn fell to 2.1% from 3.4% a year earlier.
- Operating margin expanded 280 bps to 24.7%.
- Free cash flow conversion was 96% of net income.
- Guidance raised for the second half on improving pipeline.
Reference a PDF by URL with {"type": "url", "url": "https://…/report.pdf"} instead of base64. URL sources do not count against your upload size but must be publicly accessible. For large or reused PDFs, the Files API is a better fit: upload once, reference many times.
Citations
Citations attach machine-verifiable references to each part of Claude's response, anchored to character ranges or page numbers in the document. Enable per-document with citations: { enabled: true }. Claude returns citation blocks inside text blocks so you can render exact source spans in your UI.
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=1024,
messages=[{
"role": "user",
"content": [
{
"type": "document",
"source": {"type": "text", "media_type": "text/plain", "data": "The capital of France is Paris. Berlin is the capital of Germany."},
"title": "Capitals",
"citations": {"enabled": True},
},
{"type": "text", "text": "What is the capital of France?"}
]
}],
)
for block in response.content:
    if block.type != "text":
        continue
    print(block.text)
    for cite in block.citations or []:
        print(f" -> {cite.cited_text!r} ({cite.start_char_index}-{cite.end_char_index})")
Output:
The capital of France is Paris.
-> 'The capital of France is Paris.' (0-31)
Files API integration
Once a document is uploaded via the Files API, reference it by file_id in any subsequent message — no base64 re-upload. This is the cleanest pattern for RAG over a fixed corpus and for very large PDFs you reuse across many calls.
uploaded = client.beta.files.upload(file=open("manual.pdf", "rb"))
response = client.beta.messages.create(
model="claude-opus-4-7",
max_tokens=2048,
messages=[{
"role": "user",
"content": [
{"type": "document", "source": {"type": "file", "file_id": uploaded.id}},
{"type": "text", "text": "What does section 4.2 say about reset behaviour?"}
]
}],
extra_headers={"anthropic-beta": "files-api-2025-04-14"},
)
print(response.content[0].text)
Output:
Section 4.2 states that a soft reset clears the I/O queue but preserves the
device's serial number and calibration tables, while a hard reset clears
both and reverts firmware to factory defaults.
See the full Files API reference for upload, listing, and lifecycle.
Concurrency patterns
Async batching with asyncio.gather is a simple way to fan out independent calls concurrently. Cap concurrency with a Semaphore to stay under per-minute token and request limits; without one, a burst of even a few dozen simultaneous calls can trigger RateLimitError, depending on your account's rate-limit tier.
import asyncio
import anthropic
client = anthropic.AsyncAnthropic()
sem = asyncio.Semaphore(10) # max 10 in-flight calls
async def summarise(text: str) -> str:
    async with sem:
        resp = await client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=200,
            messages=[{"role": "user", "content": f"Summarise: {text}"}],
        )
    return resp.content[0].text
async def main():
    docs = [f"Document {i} content..." for i in range(100)]
    results = await asyncio.gather(*(summarise(d) for d in docs))
    print(f"Completed {len(results)} summaries")
asyncio.run(main())
Output:
Completed 100 summaries
For >1000 prompts that can wait, the Batch API is 50% cheaper than this fan-out pattern. Use async fan-out only when results must be available in seconds, not hours.
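The effect of the semaphore can be checked offline with a fake worker that records how many calls are in flight at once. In this sketch run_bounded and fake_worker are hypothetical names, and the sleep stands in for a network call.

```python
import asyncio

async def run_bounded(items, worker, limit: int = 10):
    """Run worker over items concurrently, at most `limit` at a time."""
    sem = asyncio.Semaphore(limit)

    async def guarded(item):
        async with sem:
            return await worker(item)

    # gather preserves input order in its results.
    return await asyncio.gather(*(guarded(item) for item in items))

stats = {"in_flight": 0, "peak": 0}

async def fake_worker(n: int) -> int:
    """Stand-in for an API call; tracks peak concurrency."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)
    stats["in_flight"] -= 1
    return n * 2

results = asyncio.run(run_bounded(range(25), fake_worker, limit=5))
print(len(results), "results; peak concurrency", stats["peak"])
```

Creating the semaphore inside the coroutine ties it to the running event loop, which avoids surprises when the helper is reused across several asyncio.run calls.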
FastAPI streaming endpoint
Stream Claude's response over a FastAPI StreamingResponse to forward tokens to a browser as they arrive. In the browser, fetch with a ReadableStream can consume the chunks in order, giving the same "typing" UX as Claude.ai with no extra infrastructure. The EventSource API is not a drop-in fit for this endpoint, because it only issues GET requests and expects a text/event-stream response.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic
app = FastAPI()
client = anthropic.AsyncAnthropic()
class ChatRequest(BaseModel):
    message: str
@app.post("/chat")
async def chat(req: ChatRequest):
    async def token_stream():
        async with client.messages.stream(
            model="claude-opus-4-7",
            max_tokens=1024,
            messages=[{"role": "user", "content": req.message}],
        ) as stream:
            async for text in stream.text_stream:
                yield text
    return StreamingResponse(token_stream(), media_type="text/plain")
Test it from the command line:
curl -N -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"message": "Count to three."}'
Output:
One. Two. Three.
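If you would rather serve Server-Sent Events (for example to consume the stream with EventSource from a GET endpoint), each chunk needs SSE framing and the response needs media_type="text/event-stream". The sketch below shows only the framing step; sse_event is a hypothetical helper.

```python
from typing import Optional

def sse_event(data: str, event: Optional[str] = None) -> str:
    """Frame a text chunk as one Server-Sent Events message."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    # Every line of the payload needs its own "data:" prefix.
    lines.extend(f"data: {line}" for line in data.split("\n"))
    # A blank line terminates the message.
    return "\n".join(lines) + "\n\n"

print(repr(sse_event("One.")))
print(repr(sse_event("a\nb", event="token")))
```

Inside a streaming generator you would yield sse_event(text) instead of the raw text.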
Pydantic models from responses
Combine forced tool use with Pydantic models to get typed extraction. Forcing the tool makes Claude return a tool_use block whose input follows input_schema; Pydantic then validates and converts it, catching any field that does not conform.
from pydantic import BaseModel, Field
import anthropic
class Contact(BaseModel):
    name: str
    email: str = Field(pattern=r".+@.+\..+")
    role: str | None = None
tools = [{
"name": "extract_contact",
"description": "Extract a single contact from text.",
"input_schema": Contact.model_json_schema(),
}]
client = anthropic.Anthropic()
response = client.messages.create(
model="claude-opus-4-7",
max_tokens=512,
tools=tools,
tool_choice={"type": "tool", "name": "extract_contact"},
messages=[{
"role": "user",
"content": "Alice Dev (alice@example.com) is our staff engineer."
}],
)
tool_use = next(b for b in response.content if b.type == "tool_use")
contact = Contact.model_validate(tool_use.input)
print(contact)
Output:
name='Alice Dev' email='alice@example.com' role='staff engineer'
Bedrock and Vertex variants
The SDK also ships clients for Claude on AWS Bedrock and Google Vertex AI — same API surface, different auth. Useful when your data residency, billing, or VPC requirements force you off the first-party Anthropic API.
# AWS Bedrock
from anthropic import AnthropicBedrock
bedrock = AnthropicBedrock(
aws_region="us-east-1",
)
resp = bedrock.messages.create(
model="anthropic.claude-opus-4-7-v1:0",
max_tokens=256,
messages=[{"role": "user", "content": "Hello from Bedrock."}],
)
print(resp.content[0].text)
# Google Vertex AI
from anthropic import AnthropicVertex
vertex = AnthropicVertex(
region="us-central1",
project_id="my-gcp-project",
)
resp = vertex.messages.create(
model="claude-opus-4-7@20251001",
max_tokens=256,
messages=[{"role": "user", "content": "Hello from Vertex."}],
)
print(resp.content[0].text)
Bedrock and Vertex use AWS and GCP credentials respectively; ANTHROPIC_API_KEY is not consulted. Some features (Batch API, Files API beta) may lag the first-party API on these platforms; check the SDK release notes before relying on them.
Common pitfalls
| Pitfall | Symptom | Fix |
|---|---|---|
| Forgetting to append assistant turn | Model loses context on next call | After tool use or any reply, append {"role": "assistant", "content": response.content} verbatim |
| Re-uploading the same PDF every call | Slow, expensive | Use Files API or prompt caching with cache_control |
| temperature ≠ 1 with thinking | BadRequestError | Drop temperature (default 1) or set it to 1 explicitly |
| Streaming without with block | Connection leaks | Always use with client.messages.stream(...) as stream: |
| Hardcoding sk-ant-… in source | Key leaks via git history | Use ANTHROPIC_API_KEY env var; never commit keys |
| max_tokens too low | stop_reason == "max_tokens" mid-thought | Increase budget; check stop_reason after every call |
| Synchronous client in async code | Event loop blocks | Use AsyncAnthropic inside async def |
| Caching small prompts | No latency win, wasted writes | Only cache prefixes ≥ 1024 tokens (Sonnet/Opus) or ≥ 2048 (Haiku) |
Common recipes
Continuation past max_tokens
def complete_with_continuation(prompt: str, total_budget: int = 16000) -> str:
    """Keep calling until stop_reason != 'max_tokens'."""
    parts = []
    messages = [{"role": "user", "content": prompt}]
    remaining = total_budget
    while remaining > 0:
        chunk_budget = min(4096, remaining)
        response = client.messages.create(
            model="claude-opus-4-7",
            max_tokens=chunk_budget,
            messages=messages,
        )
        text = "".join(b.text for b in response.content if b.type == "text")
        parts.append(text)
        if response.stop_reason != "max_tokens":
            break
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": "continue"})
        remaining -= response.usage.output_tokens
    return "".join(parts)
Cost estimator
# Illustrative prices in $ per 1M tokens; check the current pricing page before relying on them.
PRICES = {
    "claude-opus-4-7": {"in": 15.0, "out": 75.0},
    "claude-sonnet-4-6": {"in": 3.0, "out": 15.0},
    "claude-haiku-4-5": {"in": 0.80, "out": 4.0},
}
def estimate_cost(model: str, in_tok: int, out_tok: int) -> float:
    """Return the estimated cost in dollars for one call."""
    p = PRICES[model]
    return (in_tok * p["in"] + out_tok * p["out"]) / 1_000_000
usage = response.usage
print(f"${estimate_cost('claude-opus-4-7', usage.input_tokens, usage.output_tokens):.4f}")
Output:
$0.0048
Conversation memory trimmer
def trim_history(messages: list, max_input_tokens: int = 100_000) -> list:
    """Drop oldest user/assistant pairs until under the cap."""
    while True:
        count = client.messages.count_tokens(
            model="claude-opus-4-7",
            messages=messages,
        ).input_tokens
        if count <= max_input_tokens or len(messages) <= 2:
            return messages
        messages = messages[2:] # drop oldest user + assistant turn
See also
- TypeScript SDK — same API surface, Node/Deno/Bun.
- Streaming — full SSE event reference.
- Tool use — function calling and agentic loops.
- Batch API — bulk processing at 50% cost.
- Prompt caching — TTL, breakpoints, cost.
- Files API — upload-once, reference-many.