Complete Python SDK reference for the Anthropic Claude API — messages, streaming, vision, extended thinking, prompt caching, batch processing, and token counting.

Claude API — Python

What it is

The Anthropic API is Anthropic's REST API for programmatically integrating Claude models into applications, with official SDKs for Python and TypeScript. The Python SDK (anthropic) wraps the API with typed request/response objects and supports synchronous and streaming message creation, tool use, prompt caching, extended thinking, and batch processing. Reach for it when you need to embed Claude into backend services, pipelines, or automation scripts rather than using the Claude.ai web interface.

Install

bash
pip install anthropic

Output:

text
Successfully installed anthropic-0.49.0

Basic message

The minimum viable call: instantiate an Anthropic() client (which reads ANTHROPIC_API_KEY from the environment), call client.messages.create() with a model, a token budget, and a messages list, then read response.content[0].text. Use this as the foundation for every more complex pattern on this page.

python
import anthropic

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from env

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Explain what a Python decorator is."}]
)

print(response.content[0].text)
print(response.usage)

Output:

text
A Python decorator is a function that takes another function as input and returns a
modified version of it, allowing you to add behavior before or after the original
function runs without changing its source code. The @syntax is shorthand for
function = decorator(function).

Usage(input_tokens=15, output_tokens=62, cache_creation_input_tokens=0, cache_read_input_tokens=0)

Response object

The Message object returned by client.messages.create() contains the generated content as a list of typed blocks (usually a single TextBlock), plus metadata about why generation stopped (stop_reason) and token usage. Always check stop_reason — if it is "max_tokens" the output was truncated and you may need to increase max_tokens or continue from where it left off.

python
print(response.id)            # "msg_01XVn..."
print(response.model)         # "claude-opus-4-7-20251001"
print(response.stop_reason)   # "end_turn" | "max_tokens" | "tool_use" | "stop_sequence"
print(response.usage.input_tokens)
print(response.usage.output_tokens)

# Content blocks
for block in response.content:
    print(block.type)    # "text"
    print(block.text)
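
The stop_reason check described above can be folded into a small helper. This is a minimal sketch, not part of the SDK: `extract_text` is a hypothetical name, and a `SimpleNamespace` stands in for a real Message so the snippet runs without an API call.

```python
from types import SimpleNamespace


def extract_text(message) -> str:
    """Join all text blocks; refuse to return silently truncated output."""
    if message.stop_reason == "max_tokens":
        raise RuntimeError("response truncated: raise max_tokens or continue the turn")
    return "".join(block.text for block in message.content if block.type == "text")


# Stand-in for a real Message object (assumption: only the fields used above).
fake_message = SimpleNamespace(
    stop_reason="end_turn",
    content=[
        SimpleNamespace(type="text", text="Hello, "),
        SimpleNamespace(type="text", text="world."),
    ],
)

print(extract_text(fake_message))  # Hello, world.
```

With a live client, pass the object returned by client.messages.create() instead of the stand-in.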

Multi-turn conversation

The API is stateless — you maintain conversation history by building the messages list yourself, alternating user and assistant turns. Append each assistant response verbatim before adding the next user message; the model uses this history to maintain context and avoid repeating itself.

python
messages = [
    {"role": "user", "content": "What is 2 + 2?"},
    {"role": "assistant", "content": "4"},
    {"role": "user", "content": "Multiply that by 10."}
]

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=256,
    messages=messages
)
print(response.content[0].text)

Output:

text
40
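
Because the history is just a list you own, the append-then-call loop is easy to wrap. The sketch below assumes a hypothetical `Conversation` class and a `send` callable that you supply; in real use `send` would call client.messages.create() and return the reply text. A fake `send` keeps the example runnable offline.

```python
from typing import Callable, Dict, List


class Conversation:
    """Keeps the alternating user/assistant history for a stateless API."""

    def __init__(self, send: Callable[[List[Dict]], str]):
        self._send = send            # hypothetical hook: takes history, returns reply text
        self.messages: List[Dict] = []

    def ask(self, user_text: str) -> str:
        self.messages.append({"role": "user", "content": user_text})
        reply = self._send(self.messages)
        # Append the assistant turn so the next call sees the full context.
        self.messages.append({"role": "assistant", "content": reply})
        return reply


def fake_send(messages: List[Dict]) -> str:
    """Offline stand-in: reports how many user turns it has seen."""
    return f"reply #{sum(m['role'] == 'user' for m in messages)}"


chat = Conversation(fake_send)
chat.ask("What is 2 + 2?")
chat.ask("Multiply that by 10.")
print([m["role"] for m in chat.messages])
# ['user', 'assistant', 'user', 'assistant']
```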

System prompt

The system parameter sets persistent, session-wide instructions that apply to every turn — persona, output format, domain constraints, tone. It is passed as a top-level field (not inside messages) and is the right place for everything that should not change between user turns.

python
response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=512,
    system="You are a concise technical documentation writer. Reply in bullet points only.",
    messages=[{"role": "user", "content": "How does TCP handle packet loss?"}]
)
print(response.content[0].text)

Output:

text
- Sender sets a retransmission timer when a segment is sent
- If ACK not received before timeout, the segment is retransmitted
- Receiver uses sequence numbers to detect duplicates and reorder out-of-order segments
- Duplicate ACKs (3 in a row) trigger fast retransmit before the timer expires
- Congestion window is reduced on loss to slow the send rate

Streaming

Use client.messages.stream() to receive tokens as they arrive.

python
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Count to 5 slowly."}]
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
    print()

    # Read the accumulated message while the stream is still open
    message = stream.get_final_message()

print(f"\nTotal tokens: {message.usage.input_tokens + message.usage.output_tokens}")

Output:

text
One... two... three... four... five.

Total tokens: 31

Low-level streaming events

python
with client.messages.stream(
    model="claude-opus-4-7",
    max_tokens=256,
    messages=[{"role": "user", "content": "Hi"}]
) as stream:
    for event in stream:
        print(event.type)

Output:

text
message_start
content_block_start
content_block_delta
content_block_delta
content_block_stop
message_delta
message_stop
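
To rebuild the reply text from the event sequence above, only the content_block_delta events matter. A minimal sketch, assuming each such event carries a `delta` with `type == "text_delta"` and a `text` field; the stub events and the `collect_text` name are illustrative, not SDK objects.

```python
from types import SimpleNamespace as NS


def collect_text(events) -> str:
    """Concatenate the text carried by content_block_delta events."""
    parts = []
    for event in events:
        if event.type == "content_block_delta" and event.delta.type == "text_delta":
            parts.append(event.delta.text)
    return "".join(parts)


# Stub events mirroring the order printed above (assumed shapes).
events = [
    NS(type="message_start"),
    NS(type="content_block_start"),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="Hel")),
    NS(type="content_block_delta", delta=NS(type="text_delta", text="lo!")),
    NS(type="content_block_stop"),
    NS(type="message_delta"),
    NS(type="message_stop"),
]

print(collect_text(events))  # Hello!
```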

Vision — image input

Pass images as content blocks alongside text in the messages list. Images can be supplied as base64-encoded data (for local files or programmatically generated images) or as a publicly accessible URL. Claude can reason over diagrams, screenshots, charts, and photos in the same turn as text.

python
import base64

with open("chart.png", "rb") as f:
    image_data = base64.standard_b64encode(f.read()).decode("utf-8")

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": "image/png",
                    "data": image_data,
                }
            },
            {"type": "text", "text": "What trend does this chart show?"}
        ]
    }]
)
print(response.content[0].text)

Output:

text
The chart shows a steady upward trend in monthly active users from Q1 2024 through
Q4 2024, with growth accelerating in Q3. The steepest increase occurs between
August and October.

Image from URL

python
response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=512,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "image",
                "source": {"type": "url", "url": "https://example.com/diagram.png"}
            },
            {"type": "text", "text": "Describe this diagram."}
        ]
    }]
)

Supported image formats: image/jpeg, image/png, image/gif, image/webp. Max 5 MB per image. Up to 20 images per request.
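
The format and size limits above can be checked before a request is built. This is a sketch under assumptions: `image_block` is a hypothetical helper, and it applies the 5 MB limit to the raw bytes, which may not be exactly how the API measures size.

```python
import base64

ALLOWED_MEDIA_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}
MAX_IMAGE_BYTES = 5 * 1024 * 1024  # assumption: limit checked on raw bytes


def image_block(data: bytes, media_type: str) -> dict:
    """Build a base64 image content block, rejecting unsupported input early."""
    if media_type not in ALLOWED_MEDIA_TYPES:
        raise ValueError(f"unsupported media type: {media_type}")
    if len(data) > MAX_IMAGE_BYTES:
        raise ValueError(f"image is {len(data)} bytes; limit is {MAX_IMAGE_BYTES}")
    return {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": media_type,
            "data": base64.standard_b64encode(data).decode("ascii"),
        },
    }


block = image_block(b"\x89PNG\r\n\x1a\n", "image/png")  # PNG signature only, for demo
print(block["source"]["media_type"])  # image/png
```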

Extended thinking

Extended thinking lets Claude reason privately before responding. Useful for multi-step math, logic, and code review.

python
response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{
        "role": "user",
        "content": "A snail crawls 2 cm forward, then 1 cm back each day. Starting 0 cm, how many days to reach 10 cm?"
    }]
)

for block in response.content:
    if block.type == "thinking":
        print(f"[Thinking: {len(block.thinking)} chars]")
    else:
        print(block.text)

Output:

text
[Thinking: 1843 chars]
The snail reaches 10 cm on day 10.

On days 1–9 it ends each day at 1, 2, 3, 4, 5, 6, 7, 8, 9 cm respectively
(net +1 cm/day). On day 10 it crawls forward 2 cm from 9 cm, hitting 11 cm —
which passes 10 cm before the backward crawl, so day 10 is the answer.

Extended thinking requires temperature=1 (the default), and budget_tokens must be at least 1024. Thinking tokens are billed as output tokens.
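
A small guard can catch a bad budget before the request is sent. This is a sketch, not SDK behaviour: `thinking_params` is a hypothetical helper, and the second check (budget smaller than max_tokens) is an assumption not stated on this page.

```python
def thinking_params(max_tokens: int, budget_tokens: int) -> dict:
    """Return keyword arguments for a thinking-enabled request, or raise early."""
    if budget_tokens < 1024:
        raise ValueError("budget_tokens must be at least 1024")
    if budget_tokens >= max_tokens:
        # Assumption: the thinking budget has to fit inside max_tokens.
        raise ValueError("budget_tokens must be smaller than max_tokens")
    return {
        "max_tokens": max_tokens,
        "thinking": {"type": "enabled", "budget_tokens": budget_tokens},
    }


params = thinking_params(max_tokens=16000, budget_tokens=10000)
print(params["thinking"])
# {'type': 'enabled', 'budget_tokens': 10000}
```

The result can be splatted into the call, for example client.messages.create(model=..., messages=..., **params).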

Prompt caching

Cache large, reused context to cut latency and input-token cost by up to 90% on cache hits. The default TTL is 5 minutes, refreshed each time the cached prefix is read.

python
large_docs = "... 50,000 tokens of documentation ..."

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=1024,
    system=[
        {"type": "text", "text": "You are a helpful assistant."},
        {
            "type": "text",
            "text": large_docs,
            "cache_control": {"type": "ephemeral"},   # mark for caching
        }
    ],
    messages=[{"role": "user", "content": "What is the retry policy described in the docs?"}]
)
print(response.usage)

Output (first call — writes cache):

text
Usage(input_tokens=150, output_tokens=82, cache_creation_input_tokens=50000, cache_read_input_tokens=0)

Output (subsequent calls within 5 min):

text
Usage(input_tokens=150, output_tokens=82, cache_creation_input_tokens=0, cache_read_input_tokens=50000)

Place cache_control on the last content block you want included in the cache prefix. Up to 4 cache breakpoints per request. Cache the longest, most stable content (docs, tool definitions, system prompt) rather than volatile per-request content.
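
The placement rule and the usage fields shown above can be sketched as two helpers. Both names (`cached_system`, `cache_hit_ratio`) are hypothetical, and plain dicts stand in for the SDK's Usage object so the example runs offline.

```python
def cached_system(instructions: str, reference_text: str) -> list:
    """System blocks with the cache breakpoint on the last, stable block."""
    return [
        {"type": "text", "text": instructions},
        {
            "type": "text",
            "text": reference_text,
            "cache_control": {"type": "ephemeral"},  # everything up to here is cached
        },
    ]


def cache_hit_ratio(usage: dict) -> float:
    """Share of prompt tokens served from cache for one response."""
    read = usage["cache_read_input_tokens"]
    total = usage["input_tokens"] + usage["cache_creation_input_tokens"] + read
    return read / total if total else 0.0


# Usage figures copied from the two outputs above.
first = {"input_tokens": 150, "cache_creation_input_tokens": 50000, "cache_read_input_tokens": 0}
later = {"input_tokens": 150, "cache_creation_input_tokens": 0, "cache_read_input_tokens": 50000}

print(cache_hit_ratio(first), round(cache_hit_ratio(later), 3))  # 0.0 0.997
```

A ratio that stays near zero across repeated calls suggests the prefix is changing between requests or expiring before reuse.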

Token counting

Count tokens before sending to avoid hitting limits or to estimate cost.

python
count = client.messages.count_tokens(
    model="claude-opus-4-7",
    messages=[{"role": "user", "content": "Explain quantum entanglement in plain English."}]
)
print(count.input_tokens)

Output:

text
12

python
# Count tokens including tools and system prompt
count = client.messages.count_tokens(
    model="claude-opus-4-7",
    system="You are a helpful assistant.",
    tools=tools,
    messages=messages
)
print(f"Estimated input tokens: {count.input_tokens:,}")

Output:

text
Estimated input tokens: 3,842

Batch processing

Process thousands of prompts asynchronously at 50% of the standard price. Processing can take up to 24 hours, so poll for completion instead of blocking on the create call.

python
documents = ["Summary of doc 1...", "Summary of doc 2...", "Summary of doc 3..."]

batch = client.messages.batches.create(
    requests=[
        {
            "custom_id": f"doc-{i}",
            "params": {
                "model": "claude-haiku-4-5",
                "max_tokens": 200,
                "messages": [{"role": "user", "content": f"Summarize in one sentence: {doc}"}]
            }
        }
        for i, doc in enumerate(documents)
    ]
)

print(batch.id)
print(batch.processing_status)

Output:

text
msgbatch_01XVnKzQp...
in_progress

Poll and retrieve results

python
import time

# Poll until done
while True:
    batch = client.messages.batches.retrieve(batch.id)
    if batch.processing_status == "ended":
        break
    time.sleep(60)

# Stream results
for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        print(f"{result.custom_id}: {result.result.message.content[0].text}")
    elif result.result.type == "errored":
        print(f"{result.custom_id}: ERROR — {result.result.error}")

Output:

text
doc-0: The document describes a three-tier caching strategy for web services.
doc-1: The document outlines the company's Q3 financial results showing 18% revenue growth.
doc-2: The document explains how to configure database connection pooling in SQLAlchemy.
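
Before acting on a finished batch it helps to tally outcomes and collect the ids that need a retry. A minimal sketch with stub results; `tally` and `failed_ids` are hypothetical helpers, and the `canceled` and `expired` result types are included as an assumption alongside the two handled above.

```python
from collections import Counter
from types import SimpleNamespace as NS


def tally(results) -> Counter:
    """Count batch results by outcome type."""
    return Counter(r.result.type for r in results)


def failed_ids(results) -> list:
    """custom_id of every request that did not succeed."""
    return [r.custom_id for r in results if r.result.type != "succeeded"]


# Stub results shaped like the objects iterated above (assumed fields only).
results = [
    NS(custom_id="doc-0", result=NS(type="succeeded")),
    NS(custom_id="doc-1", result=NS(type="errored")),
    NS(custom_id="doc-2", result=NS(type="succeeded")),
    NS(custom_id="doc-3", result=NS(type="expired")),
]

print(dict(tally(results)))   # {'succeeded': 2, 'errored': 1, 'expired': 1}
print(failed_ids(results))    # ['doc-1', 'doc-3']
```

With the real API, materialise the results into a list first, since a streamed iterator can only be consumed once.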

Stop sequences

stop_sequences is a list of strings that, if generated, cause Claude to halt immediately; the matched string is not included in the output. Use stop sequences to enforce boundaries in structured generation, such as stopping before a closing delimiter so you can inject content, or halting after a single JSON object.

python
response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=512,
    stop_sequences=["```", "---"],
    messages=[{"role": "user", "content": "Write a Python hello world."}]
)
print(response.stop_reason)   # "stop_sequence"
print(response.stop_sequence) # "```"
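
The semantics described above (stop at the earliest match, exclude the match itself) can be illustrated locally. This is a pure-Python simulation for intuition, not how the API is implemented; `apply_stop_sequences` is a hypothetical name.

```python
FENCE = "`" * 3  # three backticks, built indirectly to keep this example readable


def apply_stop_sequences(text: str, stop_sequences: list):
    """Cut text at the earliest stop sequence; return (kept_text, matched_or_None)."""
    cut, matched = len(text), None
    for seq in stop_sequences:
        index = text.find(seq)
        if index != -1 and index < cut:
            cut, matched = index, seq
    return text[:cut], matched


kept, hit = apply_stop_sequences('print("hello")\n---\nnotes', [FENCE, "---"])
print(repr(kept), hit)  # 'print("hello")\n' ---
```

The second return value plays the role of response.stop_sequence: it is None when nothing matched.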

Error handling

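The SDK raises typed exceptions so you can handle transient errors (rate limits, network blips) differently from permanent ones (bad API key, malformed request). RateLimitError means you should back off and retry. APIStatusError is the base class for every non-2xx HTTP response and carries the status code and message, so catch it after the more specific subclasses. Failures that never produce an HTTP response, such as a refused connection or a timeout, raise APIConnectionError.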

python
import anthropic

client = anthropic.Anthropic()

try:
    response = client.messages.create(
        model="claude-opus-4-7",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello"}]
    )
except anthropic.AuthenticationError:
    print("Invalid API key")
except anthropic.RateLimitError as e:
    print(f"Rate limited — retry after: {e.response.headers.get('retry-after')}")
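except anthropic.APIStatusError as e:
    # Base class for the remaining non-2xx responses; keep it after the subclasses
    print(f"API error {e.status_code}: {e.message}")
except anthropic.APIConnectionError as e:
    # No HTTP response at all (DNS failure, refused connection, timeout)
    print(f"Connection failed: {e.__cause__}")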
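
For application-level retries on top of whatever the SDK does itself, a backoff wrapper is a common pattern. The sketch below is generic and hypothetical (`call_with_backoff` is not an SDK function); a fake exception stands in for RateLimitError so it runs offline, and the delays and jitter are illustrative choices.

```python
import random
import time


def call_with_backoff(fn, retryable=(Exception,), max_attempts=4,
                      base_delay=0.5, sleep=time.sleep):
    """Call fn(); on a retryable error wait base_delay * 2**n (plus jitter) and retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retryable:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay + random.uniform(0, delay / 2))


class FakeRateLimit(Exception):
    """Stand-in for anthropic.RateLimitError in this offline demo."""


calls = {"n": 0}


def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeRateLimit("429")
    return "ok"


# sleep is stubbed out so the demo finishes instantly
print(call_with_backoff(flaky, retryable=(FakeRateLimit,), sleep=lambda s: None))  # ok
```

In real code, pass retryable=(anthropic.RateLimitError, anthropic.APIConnectionError) and leave permanent errors such as AuthenticationError out of the tuple.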

Async client

AsyncAnthropic is the async counterpart to the synchronous client — use it in async def functions with await to avoid blocking an event loop. It exposes the same API surface (messages.create, messages.stream, etc.) and is the right choice when embedding Claude calls in FastAPI handlers, async scripts, or any asyncio-based application.

python
import asyncio
import anthropic

async def main():
    async with anthropic.AsyncAnthropic() as client:
        response = await client.messages.create(
            model="claude-opus-4-7",
            max_tokens=1024,
            messages=[{"role": "user", "content": "What is async/await?"}]
        )
        print(response.content[0].text)

asyncio.run(main())

Output:

text
Async/await is Python syntax for writing asynchronous code that looks synchronous.
`async def` marks a coroutine function; `await` suspends it until a result is ready
without blocking the event loop, letting other tasks run in the meantime.

Model reference

| Model | Context | Best for |
| --- | --- | --- |
| claude-opus-4-7 | 200K | Complex reasoning, code, analysis |
| claude-sonnet-4-6 | 200K | Balanced quality and speed |
| claude-haiku-4-5 | 200K | Fast, low-cost, high-volume tasks |

Use claude-haiku-4-5 for batch jobs, classification, and summarization where cost matters. Use claude-opus-4-7 for agentic tasks, code generation, and complex reasoning. claude-sonnet-4-6 is a good default for interactive applications.

Environment setup

bash
# Set API key (preferred — never hardcode)
export ANTHROPIC_API_KEY="sk-ant-api03-…REDACTED…"

# Optional: custom base URL (e.g. proxy)
export ANTHROPIC_BASE_URL="https://my-proxy.example.com"

Output: (none — exits 0 on success)

python
# Or pass explicitly
client = anthropic.Anthropic(api_key="sk-ant-api03-…REDACTED…")

# Or use a different base URL
client = anthropic.Anthropic(
    api_key="sk-ant-api03-…REDACTED…",
    base_url="https://my-proxy.example.com"
)

Retries and timeouts

The SDK retries transient failures (network blips, 408, 409, 429, 5xx) up to two times by default with exponential backoff. Override max_retries on the client to be more aggressive or to disable retries entirely (max_retries=0); override timeout to bound how long each attempt may take. A timed-out attempt is itself retried, so worst-case wall-clock time is roughly timeout × (max_retries + 1) plus backoff. Per-call overrides via .with_options() are useful for one-off slow operations (e.g. extended thinking with a long budget) without changing the client-wide default.

python
import httpx
import anthropic

client = anthropic.Anthropic(
    max_retries=5,
    timeout=httpx.Timeout(60.0, connect=5.0, read=60.0, write=10.0),
)

# Per-call override — bumps timeout for one slow call
response = client.with_options(timeout=180.0).messages.create(
    model="claude-opus-4-7",
    max_tokens=8000,
    messages=[{"role": "user", "content": "Write a long essay."}],
)
print(response.usage.output_tokens)

Output:

text
3942

Inspecting raw HTTP responses

Sometimes you need the raw Response to inspect headers (e.g. rate-limit budget) or to dump the body for debugging. Use .with_raw_response.create() to get an APIResponse wrapping the underlying httpx.Response; call .parse() to return the typed Message object as well.

python
raw = client.messages.with_raw_response.create(
    model="claude-opus-4-7",
    max_tokens=128,
    messages=[{"role": "user", "content": "ping"}],
)

print(raw.http_response.status_code)
print(raw.http_response.headers.get("anthropic-ratelimit-requests-remaining"))
print(raw.http_response.headers.get("anthropic-ratelimit-tokens-remaining"))

message = raw.parse()       # typed Message object
print(message.content[0].text)

Output:

text
200
49
399200
pong

Rate-limit headers (anthropic-ratelimit-*-remaining, anthropic-ratelimit-*-reset) are the most accurate way to back off, much better than sleeping for a fixed guess after a RateLimitError.
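
Turning a reset header into a sleep duration might look like the sketch below. It assumes the anthropic-ratelimit-*-reset value is an RFC 3339 timestamp such as 2025-01-01T00:00:30Z; verify the format against a real response before relying on it. `seconds_until_reset` is a hypothetical helper.

```python
from datetime import datetime, timezone


def seconds_until_reset(headers, kind="requests", now=None) -> float:
    """Seconds to wait until the given rate-limit bucket resets (0.0 if unknown)."""
    raw = headers.get(f"anthropic-ratelimit-{kind}-reset")
    if raw is None:
        return 0.0
    # Assumption: RFC 3339 timestamp; normalise a trailing Z for older Pythons.
    reset_at = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    now = now or datetime.now(timezone.utc)
    return max(0.0, (reset_at - now).total_seconds())


headers = {"anthropic-ratelimit-requests-reset": "2025-01-01T00:00:30Z"}
fixed_now = datetime(2025, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
print(seconds_until_reset(headers, now=fixed_now))  # 30.0
```

With the raw response above, raw.http_response.headers can be passed in directly, since it supports .get().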

Logging and debugging

Enable verbose SDK logs to see every HTTP request, retry, and response.

bash
export ANTHROPIC_LOG=debug
python my_app.py

Output:

text
DEBUG anthropic._base_client:Request options: {'method': 'post', 'url': '/v1/messages', ...}
DEBUG anthropic._base_client:HTTP Request: POST https://api.anthropic.com/v1/messages "200 OK"
DEBUG anthropic._base_client:HTTP Response: 200 ... 1.42s

Or programmatically through Python's logging module:

python
import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("anthropic").setLevel(logging.DEBUG)

# Now every request is logged

JSON-mode output

Claude does not have a hard "JSON mode" toggle — instead, you ask for JSON in the prompt and parse the result, or you use tool use with a single forced tool for guaranteed structured output. The prompt-only approach is simpler; tool-forced extraction is more reliable when the schema is strict.

python
import json

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=512,
    system="Reply with a single JSON object and nothing else. No prose, no code fences.",
    messages=[{
        "role": "user",
        "content": "Extract name and age: 'Alice Dev is 34 years old.'"
    }],
)

data = json.loads(response.content[0].text)
print(data["name"], data["age"])

Output:

text
Alice Dev 34

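For strict schemas, prefer tool_choice={"type": "tool", "name": "extract"} with an input_schema that lists the expected properties. Forcing the tool makes Claude answer with a tool_use block whose input is already a parsed JSON object; validate it against your schema anyway rather than assuming every constraint holds. See the tool-use page.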
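
With the prompt-only approach, the reply occasionally arrives wrapped in prose or a code fence despite the instruction. A defensive parse can be sketched as follows; `parse_json_reply` is a hypothetical helper and handles only a single top-level object.

```python
import json

FENCE = "`" * 3  # three backticks, built indirectly to keep this example readable


def parse_json_reply(text: str) -> dict:
    """Parse the first {...} span in a reply, ignoring surrounding prose or fences."""
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end < start:
        raise ValueError("no JSON object found in reply")
    return json.loads(text[start:end + 1])


# A reply that ignored the "no code fences" instruction.
reply = FENCE + 'json\n{"name": "Alice Dev", "age": 34}\n' + FENCE
data = parse_json_reply(reply)
print(data["name"], data["age"])  # Alice Dev 34
```

json.loads still raises json.JSONDecodeError on malformed content, so callers should be ready to retry or fall back.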

PDF input

Send a PDF as a document content block alongside text. Claude reads the PDF visually (preserving layout, tables, figures) up to 100 pages per document and 32 MB per file.

python
import base64

with open("report.pdf", "rb") as f:
    pdf_data = base64.standard_b64encode(f.read()).decode("utf-8")

response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=2048,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "document",
                "source": {
                    "type": "base64",
                    "media_type": "application/pdf",
                    "data": pdf_data,
                },
                "cache_control": {"type": "ephemeral"},
            },
            {"type": "text", "text": "Summarise the key findings in 5 bullet points."}
        ]
    }],
)
print(response.content[0].text)

Output:

text
- Quarterly revenue rose 18% YoY, driven by enterprise SaaS.
- Customer churn fell to 2.1% from 3.4% a year earlier.
- Operating margin expanded 280 bps to 24.7%.
- Free cash flow conversion was 96% of net income.
- Guidance raised for the second half on improving pipeline.

Reference a PDF by URL with {"type": "url", "url": "https://…/report.pdf"} instead of base64. URL sources do not count against your upload size but must be publicly accessible. For large or reused PDFs, the Files API is a better fit — upload once, reference many times.

Citations

Citations attach machine-verifiable references to each part of Claude's response, anchored to character ranges or page numbers in the document. Enable them per document by setting "citations": {"enabled": True} on the document block. Claude returns the citations inside text blocks so you can render exact source spans in your UI.

python
response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=1024,
    messages=[{
        "role": "user",
        "content": [
            {
                "type": "document",
                "source": {"type": "text", "media_type": "text/plain", "data": "The capital of France is Paris. Berlin is the capital of Germany."},
                "title": "Capitals",
                "citations": {"enabled": True},
            },
            {"type": "text", "text": "What is the capital of France?"}
        ]
    }],
)

for block in response.content:
    if block.type != "text":
        continue
    print(block.text)
    for cite in block.citations or []:
        print(f"  -> {cite.cited_text!r} ({cite.start_char_index}-{cite.end_char_index})")

Output:

text
The capital of France is Paris.
  -> 'The capital of France is Paris.' (0-31)

Files API integration

Once a document is uploaded via the Files API, reference it by file_id in any subsequent message — no base64 re-upload. This is the cleanest pattern for RAG over a fixed corpus and for very large PDFs you reuse across many calls.

python
with open("manual.pdf", "rb") as f:   # close the handle once the upload finishes
    uploaded = client.beta.files.upload(file=f)

response = client.beta.messages.create(
    model="claude-opus-4-7",
    max_tokens=2048,
    messages=[{
        "role": "user",
        "content": [
            {"type": "document", "source": {"type": "file", "file_id": uploaded.id}},
            {"type": "text", "text": "What does section 4.2 say about reset behaviour?"}
        ]
    }],
    extra_headers={"anthropic-beta": "files-api-2025-04-14"},
)
print(response.content[0].text)

Output:

text
Section 4.2 states that a soft reset clears the I/O queue but preserves the
device's serial number and calibration tables, while a hard reset clears
both and reverts firmware to factory defaults.

See the full Files API reference for upload, listing, and lifecycle.

Concurrency patterns

Async fan-out with asyncio.gather is the fastest way to run independent calls in parallel. Cap concurrency with a Semaphore to stay under per-minute token and request limits; without one, a burst of even 50 simultaneous calls can trigger RateLimitError, depending on your account's limits.

python
import asyncio
import anthropic

client = anthropic.AsyncAnthropic()
sem = asyncio.Semaphore(10)   # max 10 in-flight calls

async def summarise(text: str) -> str:
    async with sem:
        resp = await client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=200,
            messages=[{"role": "user", "content": f"Summarise: {text}"}],
        )
        return resp.content[0].text

async def main():
    docs = [f"Document {i} content..." for i in range(100)]
    results = await asyncio.gather(*(summarise(d) for d in docs))
    print(f"Completed {len(results)} summaries")

asyncio.run(main())

Output:

text
Completed 100 summaries

For >1000 prompts that can wait, the Batch API is 50% cheaper than this fan-out pattern. Use async fan-out only when results must be available in seconds, not hours.
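
To check that a semaphore really bounds the number of in-flight calls, the pattern can be exercised offline with a fake worker. `run_bounded` and `fake_call` are hypothetical names; the sleep stands in for network latency.

```python
import asyncio


async def run_bounded(items, worker, limit: int):
    """Run worker(item) for every item with at most `limit` running at once."""
    sem = asyncio.Semaphore(limit)  # created inside the running loop

    async def guarded(item):
        async with sem:
            return await worker(item)

    return await asyncio.gather(*(guarded(item) for item in items))


in_flight = 0
peak = 0


async def fake_call(n: int) -> int:
    """Stand-in for an API call that records peak concurrency."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    return n * 2


results = asyncio.run(run_bounded(range(25), fake_call, limit=5))
print(len(results), peak)  # 25 5
```

gather preserves input order, so results[i] corresponds to the i-th item even though calls finish out of order.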

FastAPI streaming endpoint

Stream Claude's response over a FastAPI StreamingResponse to forward tokens to a browser as they arrive. In the browser, fetch with a ReadableStream can consume the chunks in order, giving the same "typing" UX as Claude.ai with no extra infrastructure. The EventSource API is not a fit for this endpoint as written, because it only issues GET requests and expects a text/event-stream body.

python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic

app = FastAPI()
client = anthropic.AsyncAnthropic()

class ChatRequest(BaseModel):
    message: str

@app.post("/chat")
async def chat(req: ChatRequest):
    async def token_stream():
        async with client.messages.stream(
            model="claude-opus-4-7",
            max_tokens=1024,
            messages=[{"role": "user", "content": req.message}],
        ) as stream:
            async for text in stream.text_stream:
                yield text
    return StreamingResponse(token_stream(), media_type="text/plain")

Test it from the command line:

bash
curl -N -X POST http://localhost:8000/chat \
    -H "Content-Type: application/json" \
    -d '{"message": "Count to three."}'

Output:

text
One. Two. Three.
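
If a client expects Server-Sent Events rather than plain text, each chunk needs SSE framing. A minimal sketch of that framing, independent of FastAPI; `sse_event` and `sse_stream` are hypothetical helpers and the final "done" event name is an arbitrary choice.

```python
from typing import Iterable, Iterator, Optional


def sse_event(data: str, event: Optional[str] = None) -> str:
    """Frame one SSE message: optional event name, one data: line per text line."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    lines.extend(f"data: {line}" for line in data.split("\n"))
    return "\n".join(lines) + "\n\n"  # a blank line terminates the message


def sse_stream(chunks: Iterable[str]) -> Iterator[str]:
    """Wrap text chunks as SSE messages and finish with a 'done' event."""
    for chunk in chunks:
        yield sse_event(chunk)
    yield sse_event("", event="done")


for frame in sse_stream(["One. ", "Two.\nThree."]):
    print(repr(frame))
# 'data: One. \n\n'
# 'data: Two.\ndata: Three.\n\n'
# 'event: done\ndata: \n\n'
```

In a handler like the one above, the generator would yield sse_event(text) per chunk and the response would use media_type="text/event-stream".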

Pydantic models from responses

Combine forced tool use with Pydantic models to get typed extraction. The forced tool makes Claude return a tool_use block shaped by input_schema; Pydantic then validates and converts the input, catching anything that does not conform.

python
from pydantic import BaseModel, Field
import anthropic, json

class Contact(BaseModel):
    name: str
    email: str = Field(pattern=r".+@.+\..+")
    role: str | None = None

tools = [{
    "name": "extract_contact",
    "description": "Extract a single contact from text.",
    "input_schema": Contact.model_json_schema(),
}]

client = anthropic.Anthropic()
response = client.messages.create(
    model="claude-opus-4-7",
    max_tokens=512,
    tools=tools,
    tool_choice={"type": "tool", "name": "extract_contact"},
    messages=[{
        "role": "user",
        "content": "Alice Dev (alice@example.com) is our staff engineer."
    }],
)

tool_use = next(b for b in response.content if b.type == "tool_use")
contact = Contact.model_validate(tool_use.input)
print(contact)

Output:

text
name='Alice Dev' email='alice@example.com' role='staff engineer'

Bedrock and Vertex variants

The SDK also ships clients for Claude on AWS Bedrock and Google Vertex AI — same API surface, different auth. Useful when your data residency, billing, or VPC requirements force you off the first-party Anthropic API.

python
# AWS Bedrock
from anthropic import AnthropicBedrock

bedrock = AnthropicBedrock(
    aws_region="us-east-1",
)
resp = bedrock.messages.create(
    model="anthropic.claude-opus-4-7-v1:0",
    max_tokens=256,
    messages=[{"role": "user", "content": "Hello from Bedrock."}],
)
print(resp.content[0].text)

python
# Google Vertex AI
from anthropic import AnthropicVertex

vertex = AnthropicVertex(
    region="us-central1",
    project_id="my-gcp-project",
)
resp = vertex.messages.create(
    model="claude-opus-4-7@20251001",
    max_tokens=256,
    messages=[{"role": "user", "content": "Hello from Vertex."}],
)
print(resp.content[0].text)

Bedrock and Vertex use AWS / GCP credentials respectively — ANTHROPIC_API_KEY is not consulted. Some features (Batch API, Files API beta) may lag the first-party API on these platforms; check the SDK release notes before relying on them.

Common pitfalls

| Pitfall | Symptom | Fix |
| --- | --- | --- |
| Forgetting to append assistant turn | Model loses context on next call | After tool use or any reply, append {"role": "assistant", "content": response.content} verbatim |
| Re-uploading the same PDF every call | Slow, expensive | Use Files API or prompt caching with cache_control |
| temperature ≠ 1 with thinking | BadRequestError | Drop temperature (default 1) or set it to 1 explicitly |
| Streaming without with block | Connection leaks | Always use with client.messages.stream(...) as stream: |
| Hardcoding sk-ant-… in source | Key leaks via git history | Use ANTHROPIC_API_KEY env var; never commit keys |
| max_tokens too low | stop_reason == "max_tokens" mid-thought | Increase budget; check stop_reason after every call |
| Synchronous client in async code | Event loop blocks | Use AsyncAnthropic inside async def |
| Caching small prompts | No latency win, wasted writes | Only cache prefixes ≥ 1024 tokens (Sonnet/Opus) or ≥ 2048 (Haiku) |

Common recipes

Continuation past max_tokens

python
def complete_with_continuation(prompt: str, total_budget: int = 16000) -> str:
    """Keep calling until stop_reason != 'max_tokens'."""
    parts = []
    messages = [{"role": "user", "content": prompt}]
    remaining = total_budget
    while remaining > 0:
        chunk_budget = min(4096, remaining)
        response = client.messages.create(
            model="claude-opus-4-7",
            max_tokens=chunk_budget,
            messages=messages,
        )
        text = "".join(b.text for b in response.content if b.type == "text")
        parts.append(text)
        if response.stop_reason != "max_tokens":
            break
        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": "continue"})
        remaining -= response.usage.output_tokens
    return "".join(parts)

Cost estimator

python
# Illustrative figures only; confirm against the current pricing page before use.
PRICES = {
    "claude-opus-4-7":   {"in": 15.0, "out": 75.0},   # $/1M tokens
    "claude-sonnet-4-6": {"in":  3.0, "out": 15.0},
    "claude-haiku-4-5":  {"in":  0.80, "out": 4.0},
}

def estimate_cost(model: str, in_tok: int, out_tok: int) -> float:
    p = PRICES[model]
    return (in_tok * p["in"] + out_tok * p["out"]) / 1_000_000

usage = response.usage
print(f"${estimate_cost('claude-opus-4-7', usage.input_tokens, usage.output_tokens):.4f}")

Output:

text
$0.0048
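
The estimator above ignores cached tokens. A variant that accounts for them could look like this sketch. The two multipliers are assumptions (cache writes priced above base input, cache reads at a tenth of it) and should be checked against current pricing; `estimate_cost_with_cache` is a hypothetical helper that takes usage as a plain dict.

```python
CACHE_WRITE_MULTIPLIER = 1.25  # assumption: cache writes cost 25% more than base input
CACHE_READ_MULTIPLIER = 0.10   # assumption: cache reads cost 10% of base input


def estimate_cost_with_cache(price_in: float, price_out: float, usage: dict) -> float:
    """Dollar cost of one response; prices are $ per 1M tokens."""
    weighted_input = (
        usage["input_tokens"]
        + usage["cache_creation_input_tokens"] * CACHE_WRITE_MULTIPLIER
        + usage["cache_read_input_tokens"] * CACHE_READ_MULTIPLIER
    )
    return (weighted_input * price_in + usage["output_tokens"] * price_out) / 1_000_000


# Usage figures from the prompt-caching outputs earlier on this page.
write_call = {"input_tokens": 150, "output_tokens": 82,
              "cache_creation_input_tokens": 50000, "cache_read_input_tokens": 0}
read_call = {"input_tokens": 150, "output_tokens": 82,
             "cache_creation_input_tokens": 0, "cache_read_input_tokens": 50000}

print(f"{estimate_cost_with_cache(15.0, 75.0, write_call):.4f}")  # 0.9459
print(f"{estimate_cost_with_cache(15.0, 75.0, read_call):.4f}")   # 0.0834
```

Under these assumed multipliers the first call costs slightly more than an uncached one, and every hit afterwards is roughly a tenth of the input cost.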

Conversation memory trimmer

python
def trim_history(messages: list, max_input_tokens: int = 100_000) -> list:
    """Drop oldest user/assistant pairs until under the cap."""
    while True:
        count = client.messages.count_tokens(
            model="claude-opus-4-7",
            messages=messages,
        ).input_tokens
        if count <= max_input_tokens or len(messages) <= 2:
            return messages
        messages = messages[2:]    # drop oldest user + assistant turn
