cheat sheet
httpx
Make sync and async HTTP requests with httpx. Covers GET/POST, async usage, HTTP/2, streaming, and how it compares to requests.
httpx — Modern HTTP Client
What it is
HTTPX is a full-featured HTTP client for Python by Tom Christie that supports both synchronous and asynchronous usage, HTTP/1.1 and HTTP/2, connection pooling, and a requests-compatible API surface. It is the go-to replacement for requests when you need async/await support (e.g., inside FastAPI or asyncio services) or HTTP/2 without changing how you write request code. The sync API is a true drop-in for requests; the async API pairs naturally with httpx.AsyncClient and await.
Install
pip install httpx
# HTTP/2 support (optional)
pip install "httpx[http2]"
Output: (none — exits 0 on success)
Quick example — synchronous
import httpx
resp = httpx.get("https://httpbin.org/get", params={"tool": "httpx"})
resp.raise_for_status()
print(resp.status_code)
print(resp.json()["args"])
Output:
200
{'tool': 'httpx'}
Async example
import asyncio
import httpx
async def fetch_all(urls: list[str]) -> list[dict]:
async with httpx.AsyncClient(timeout=10) as client:
tasks = [client.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
return [r.json() for r in responses]
urls = [
"https://httpbin.org/get?n=1",
"https://httpbin.org/get?n=2",
"https://httpbin.org/get?n=3",
]
results = asyncio.run(fetch_all(urls))
print([r["args"] for r in results])
Output:
[{'n': '1'}, {'n': '2'}, {'n': '3'}]
When / why to use it over requests
| Situation | Use |
|---|---|
You need async/await | httpx |
| You want HTTP/2 | httpx |
| Type annotations matter to you | httpx (fully typed) |
| Simple sync-only scripts | Either (requests has wider ecosystem) |
Existing requests codebase | Keep requests unless you have a reason to migrate |
Common pitfalls
AsyncClientmust be used as a context manager — creatinghttpx.AsyncClient()withoutasync withleaks the connection pool. Always useasync with httpx.AsyncClient() as client:.
Default timeout is 5 seconds — unlike
requests,httpxhas a default timeout. You can increase or disable it:timeout=httpx.Timeout(30.0)ortimeout=None.
httpxraiseshttpx.HTTPStatusError(notrequests.HTTPError) when you callraise_for_status(). Handle imports accordingly if porting fromrequests.
Richer example — POST with auth and retry
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5))
def post_event(payload: dict) -> dict:
with httpx.Client(timeout=10) as client:
resp = client.post(
"https://httpbin.org/post",
json=payload,
headers={"Authorization": "Bearer my-token"},
)
resp.raise_for_status()
return resp.json()["json"]
result = post_event({"event": "page_view", "user": "alice"})
print(result)
Output:
{'event': 'page_view', 'user': 'alice'}
tenacityis a separate retry library (pip install tenacity).httpxdoes not include built-in retry logic the wayrequests+HTTPAdapterdoes.
Sync vs async — choosing a client
httpx.Client (sync) and httpx.AsyncClient are nearly identical APIs that share the same underlying connection pool, transport, and event hooks. The choice is determined by the surrounding code, not the request itself: if you're inside an asyncio event loop (FastAPI, Litestar, an asyncio.run script) use AsyncClient; otherwise use Client. Reuse the client across calls — building it for each request defeats connection pooling and TLS session reuse.
import httpx
# Sync — for CLIs, scripts, sync Flask/Django views
with httpx.Client(base_url="https://api.example.com", timeout=10) as client:
r = client.get("/users", params={"page": 1})
# Async — for FastAPI, Litestar, asyncio code
import asyncio
async def fetch():
async with httpx.AsyncClient(base_url="https://api.example.com", timeout=10) as client:
r = await client.get("/users", params={"page": 1})
return r.json()
asyncio.run(fetch())
# One-off top-level calls — convenient, but create-and-throw-away the client
httpx.get("https://httpbin.org/get") # no connection reuse
httpx.post("https://httpbin.org/post", json={"k": "v"})
Inside FastAPI, create the
AsyncClientonce in a lifespan handler and stash it onapp.state.http. The single shared client reuses connections across every request and shuts down cleanly on app exit.
HTTP/2
HTTP/2 multiplexes many requests over one TCP connection — a big win for chatty APIs. httpx supports HTTP/2 when you install the [http2] extra and pass http2=True to the client. The protocol is opportunistic: if the server doesn't advertise HTTP/2 via ALPN, the connection falls back to HTTP/1.1 transparently.
# pip install "httpx[http2]"
import httpx
with httpx.Client(http2=True, timeout=10) as client:
r = client.get("https://nghttp2.org/httpbin/get")
print(r.http_version) # "HTTP/2"
print(r.headers)
# Async + HTTP/2 fan-out — multiplexed over one TLS connection
async def fan_out(urls: list[str]) -> list[dict]:
async with httpx.AsyncClient(http2=True, timeout=10) as client:
rs = await asyncio.gather(*(client.get(u) for u in urls))
return [r.json() for r in rs]
Timeouts — granular control
httpx.Timeout separates four phases: connect, read, write, and pool (how long to wait for a free connection from the pool). The default is Timeout(5.0) for all four. You can set a single value, pass None to disable a phase, or compose phase-specific timeouts.
import httpx
# Single value — applies to all four phases
client = httpx.Client(timeout=10)
# Detailed per-phase timeout
client = httpx.Client(timeout=httpx.Timeout(
connect=5.0, # TCP + TLS handshake
read=30.0, # between socket reads
write=10.0, # between socket writes (large uploads)
pool=5.0, # waiting for a free pool slot
))
# Per-request override
r = client.get(url, timeout=httpx.Timeout(60))
# Disable entirely (NOT recommended)
client = httpx.Client(timeout=None)
The default 5 s timeout catches almost everyone migrating from
requestsoff-guard. A long-running upload or a polling endpoint will raisehttpx.ReadTimeoutafter 5 s unless you raise the limit.
Connection pooling and limits
httpx.Limits controls how many connections the client keeps open in total, per host, and how long idle connections stay alive. The defaults (100 max, 20 keepalive, 5 s keepalive) are fine for most services; raise them for high-throughput fan-out, lower them when you're hitting a small upstream that wants gentler clients.
import httpx
limits = httpx.Limits(
max_keepalive_connections=20, # idle connections kept in the pool
max_connections=100, # hard ceiling per client
keepalive_expiry=10.0, # seconds before idle conns are reaped
)
client = httpx.Client(limits=limits, timeout=10)
# Inspect the pool while the client is open
import httpcore
transport = client._transport # httpx.HTTPTransport
# (Use transport-level introspection sparingly — it's not public API.)
Retries — via transport, hooks, or tenacity
Unlike requests, httpx does not ship a high-level retry policy. You have three options, ordered by capability:
- Transport-level retries for connection-level errors only (
HTTPTransport(retries=N)). Does not retry HTTP statuses. - An event hook for application-level retries — good for honoring
Retry-After. - The
tenacitylibrary wrapping the call site — most expressive, easy to test.
import httpx
# 1. Transport-level — retries failed connects, NOT non-2xx responses
transport = httpx.HTTPTransport(retries=3)
client = httpx.Client(transport=transport, timeout=10)
# 2. Hook-based — handle 429/503 with Retry-After
import time
def retry_on_429(response: httpx.Response):
if response.status_code == 429:
time.sleep(float(response.headers.get("Retry-After", 1)))
client = httpx.Client(event_hooks={"response": [retry_on_429]}, timeout=10)
# 3. tenacity — full-featured retry policy (pip install tenacity)
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=0.5, min=1, max=30),
retry=retry_if_exception_type((httpx.TimeoutException, httpx.HTTPStatusError)),
reraise=True,
)
def fetch(url: str) -> dict:
with httpx.Client(timeout=10) as client:
r = client.get(url)
r.raise_for_status()
return r.json()
Event hooks
Hooks run synchronously (or asynchronously, with AsyncClient) before each request and after each response. They're the right place for logging, metrics, distributed tracing, and centralised error handling — every call through the client gets the behavior without changes at the call site.
import logging
log = logging.getLogger(__name__)
def log_request(request: httpx.Request):
log.info("→ %s %s", request.method, request.url)
def log_response(response: httpx.Response):
request = response.request
log.info("← %s %s -> %d %dms",
request.method, request.url, response.status_code,
int(response.elapsed.total_seconds() * 1000))
def raise_on_4xx_5xx(response: httpx.Response):
response.raise_for_status()
client = httpx.Client(
event_hooks={
"request": [log_request],
"response": [log_response, raise_on_4xx_5xx],
},
)
# Async hooks for AsyncClient must be async functions
async def alog_response(response: httpx.Response):
log.info("← %d %s", response.status_code, response.request.url)
client = httpx.AsyncClient(event_hooks={"response": [alog_response]})
File uploads and multipart
Use files= with a dict, list of tuples, or a tuple of (filename, fileobj, content_type). httpx streams the upload from the file object so even multi-gigabyte files don't exhaust memory.
with httpx.Client() as client:
# Single file
with open("photo.jpg", "rb") as f:
r = client.post(
"https://api.example.com/uploads",
files={"photo": ("photo.jpg", f, "image/jpeg")},
data={"caption": "Sunset"},
)
# Multiple files
files = [
("photos", ("a.jpg", open("a.jpg", "rb"), "image/jpeg")),
("photos", ("b.jpg", open("b.jpg", "rb"), "image/jpeg")),
]
r = client.post("https://api.example.com/album", files=files)
Streaming responses
client.stream(method, url) returns a context manager that yields a streaming Response — its body is not consumed up front. Iterate with aiter_bytes() / iter_bytes() (raw), iter_text() (decoded), iter_lines() (line by line), or aiter_lines() for async.
# Sync streaming download
with httpx.Client() as client:
with client.stream("GET", "https://example.com/large.zip") as resp:
resp.raise_for_status()
with open("large.zip", "wb") as f:
for chunk in resp.iter_bytes(chunk_size=64 * 1024):
f.write(chunk)
# Async streaming — NDJSON event source
async def consume_events():
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream("GET", "https://example.com/events") as r:
r.raise_for_status()
async for line in r.aiter_lines():
if line:
yield json.loads(line)
Inside a
stream(...)context,resp.text,resp.content, andresp.json()are not available — the body hasn't been read yet. Either iterate it manually or callresp.read()first.
OAuth2 and Bearer authentication
Pass auth=... to either the client or a single request. httpx.BasicAuth and httpx.DigestAuth ship in the box; for OAuth2 / Bearer tokens, subclass httpx.Auth — your auth_flow(request) generator can read the response, decide whether to refresh, yield a new request, and so on.
import httpx
# Built-ins
auth = httpx.BasicAuth("alicedev", "pw")
auth = httpx.DigestAuth("alicedev", "pw")
# Simple Bearer token
class BearerAuth(httpx.Auth):
def __init__(self, token: str):
self.token = token
def auth_flow(self, request):
request.headers["Authorization"] = f"Bearer {self.token}"
yield request
# OAuth2 — refresh on 401, retry the original request
class OAuth2Auth(httpx.Auth):
requires_response_body = True
def __init__(self, access_token: str, refresh_token: str, token_url: str):
self.access_token = access_token
self.refresh_token = refresh_token
self.token_url = token_url
def auth_flow(self, request):
request.headers["Authorization"] = f"Bearer {self.access_token}"
response = yield request
if response.status_code == 401:
# Refresh and retry once
refresh_req = httpx.Request("POST", self.token_url, data={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
})
refresh_resp = yield refresh_req
tokens = refresh_resp.json()
self.access_token = tokens["access_token"]
request.headers["Authorization"] = f"Bearer {self.access_token}"
yield request
with httpx.Client(auth=OAuth2Auth(access, refresh, "https://auth.example.com/token")) as c:
r = c.get("https://api.example.com/me")
TLS, mTLS, and proxies
httpx uses certifi's CA bundle by default. Override per client with verify= (path to a CA bundle or False for testing only), supply client certificates with cert=, and set proxies with proxies= or environment variables (HTTPS_PROXY, HTTP_PROXY, NO_PROXY).
import httpx, ssl
# Custom CA bundle
client = httpx.Client(verify="/etc/ssl/certs/internal-ca.pem")
# Mutual TLS
client = httpx.Client(cert=("client.crt", "client.key"))
# Per-protocol proxy
client = httpx.Client(proxies={
"http://": "http://proxy.internal:3128",
"https://": "http://proxy.internal:3128",
})
# Fine-grained SSL context (e.g. min TLS version, custom ciphers)
ctx = ssl.create_default_context()
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
client = httpx.Client(verify=ctx)
Testing with MockTransport
MockTransport swaps the network for a callback that returns canned responses. Combined with app= (an ASGI app), httpx is the standard test client for FastAPI, Litestar, and Starlette — no port binding, no event loop wrangling.
import httpx
def handler(request: httpx.Request) -> httpx.Response:
if request.url.path == "/users/1":
return httpx.Response(200, json={"id": 1, "name": "Alice Dev"})
return httpx.Response(404, json={"error": "not found"})
transport = httpx.MockTransport(handler)
with httpx.Client(transport=transport, base_url="https://api.example.com") as client:
r = client.get("/users/1")
assert r.json()["name"] == "Alice Dev"
r = client.get("/users/999")
assert r.status_code == 404
# Drive an ASGI app in-process — the test client used by FastAPI/Litestar
from fastapi import FastAPI
app = FastAPI()
@app.get("/health")
def health():
return {"ok": True}
transport = httpx.ASGITransport(app=app)
with httpx.Client(transport=transport, base_url="http://testserver") as client:
assert client.get("/health").json() == {"ok": True}
httpx vs requests — side by side
The sync httpx API is a near drop-in for requests — the same kwargs, the same Response object — with a handful of meaningful differences. Use this table when porting a requests codebase or reviewing PRs that mix the two libraries.
| Behavior | requests | httpx |
|---|---|---|
| Default timeout | None (hangs forever) | 5 s on all phases |
| Async support | No | Yes (AsyncClient) |
| HTTP/2 | No | Yes ([http2] extra) |
| Type hints | Partial, runtime checks | Full, strictly typed |
| Built-in retries | urllib3.Retry via adapters | None — use transport retries for connect-level, tenacity for app-level |
Response on raise_for_status() | requests.HTTPError | httpx.HTTPStatusError |
| Stream API | r = requests.get(url, stream=True); r.iter_content() | with client.stream(...) as r: r.iter_bytes() |
| Test injection | responses / requests-mock libraries | Built-in MockTransport, ASGITransport |
| Cookies persistence | requests.Session | httpx.Client (default cookies persist) |
| Proxies kwarg | proxies={"http": ..., "https": ...} | proxies={"http://": ..., "https://": ...} (trailing slash, scheme keys) |
| Connection pool tuning | HTTPAdapter(pool_*) | httpx.Limits(max_connections=..., max_keepalive_connections=...) |
| URL building | Path concatenation | base_url="..." joins respect RFC 3986 |
Cross-link: see requests for the sync-only ancestor library and asyncio for the concurrency runtime AsyncClient runs on.
# Porting a requests snippet to httpx
# Before
import requests
with requests.Session() as s:
s.headers["Authorization"] = "Bearer t"
r = s.get("https://api.example.com/me", timeout=10)
r.raise_for_status()
print(r.json())
# After (identical API except imports + timeout default)
import httpx
with httpx.Client(headers={"Authorization": "Bearer t"}, timeout=10) as s:
r = s.get("https://api.example.com/me")
r.raise_for_status()
print(r.json())
Exception hierarchy
httpx raises subclasses of httpx.HTTPError. The most useful split is between transport errors (TransportError and its children — ConnectError, ConnectTimeout, ReadTimeout, …) and status errors (HTTPStatusError from raise_for_status()).
import httpx
try:
r = client.get("https://api.example.com/x", timeout=5)
r.raise_for_status()
except httpx.ConnectTimeout:
log.warning("Could not connect — network or DNS issue")
except httpx.ReadTimeout:
log.warning("Server stalled mid-response")
except httpx.HTTPStatusError as e:
log.error("HTTP %d: %s", e.response.status_code, e.response.text[:200])
except httpx.RequestError as e:
log.exception("Transport error: %s", e)
| Exception | Raised on |
|---|---|
RequestError | Base for transport errors |
TransportError | Base for connection-level failures |
ConnectError | TCP / TLS handshake failed |
ConnectTimeout | Could not connect in time |
ReadTimeout | No data within read timeout |
WriteTimeout | Could not write within write timeout |
PoolTimeout | No free pool slot within pool timeout |
NetworkError | OS-level network failure |
HTTPStatusError | Raised by raise_for_status() for 4xx/5xx |
DecodingError | Bad content encoding |
TooManyRedirects | Redirect chain exceeded max_redirects |
Common pitfalls (extended)
Calling
r.json()insideclient.stream(...)— the body hasn't been buffered yet. Either iterate the stream manually or callr.read()first to load the whole body into memory.
Reusing an
AsyncClientacrossasyncio.run()calls —AsyncClientis bound to the event loop it was created on. Create it insidelifespan(FastAPI) orasync withblock, not at module scope.
Forgetting
proxiesuses scheme-keys with trailing slash —{"https://": "..."}works;{"https": "..."}(therequestsform) is silently ignored.
Use
client.build_request(...)to construct a request without sending it — handy for signing (HMAC) where you need to compute the signature off the canonical request before transmission.
Real-world recipes
Concurrent fan-out with bounded parallelism
asyncio.gather runs everything at once — fine for 10 calls, deadly for 10 000. Bound concurrency with a semaphore so you don't melt the upstream.
import asyncio, httpx
async def fetch_all(urls: list[str], *, concurrency: int = 20) -> list[dict]:
sem = asyncio.Semaphore(concurrency)
async with httpx.AsyncClient(timeout=10, http2=True) as client:
async def fetch(url: str) -> dict:
async with sem:
r = await client.get(url)
r.raise_for_status()
return r.json()
return await asyncio.gather(*(fetch(u) for u in urls))
urls = [f"https://api.example.com/items/{i}" for i in range(1000)]
results = asyncio.run(fetch_all(urls, concurrency=50))
A reusable API client class
class GitHubClient:
def __init__(self, token: str, *, timeout: float = 10.0):
self._client = httpx.Client(
base_url="https://api.github.com",
headers={
"Accept": "application/vnd.github+json",
"Authorization": f"Bearer {token}",
"X-GitHub-Api-Version": "2022-11-28",
},
timeout=timeout,
event_hooks={"response": [self._raise_for_status]},
)
@staticmethod
def _raise_for_status(r: httpx.Response):
r.raise_for_status()
def close(self) -> None:
self._client.close()
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
def list_repos(self, user: str, *, per_page: int = 100) -> list[dict]:
return self._client.get(f"/users/{user}/repos", params={"per_page": per_page}).json()
with GitHubClient(token=os.environ["GITHUB_TOKEN"]) as gh:
for r in gh.list_repos("python"):
print(r["full_name"])
Server-Sent Events (SSE) client
async def sse_consume(url: str):
async with httpx.AsyncClient(timeout=None) as client:
async with client.stream("GET", url, headers={"Accept": "text/event-stream"}) as r:
event = {}
async for line in r.aiter_lines():
if line == "":
if event:
yield event
event = {}
continue
if line.startswith(":"):
continue
k, _, v = line.partition(":")
event[k.strip()] = v.lstrip()
Quick reference
| Task | Code |
|---|---|
| One-off GET | httpx.get(url, timeout=10).json() |
| Sync client | with httpx.Client(base_url=..., timeout=10) as c: ... |
| Async client | async with httpx.AsyncClient(timeout=10) as c: ... |
| POST JSON | c.post(url, json={...}) |
| POST form | c.post(url, data={...}) |
| Upload file | c.post(url, files={"f": open("a.png", "rb")}) |
| Query params | c.get(url, params={"q": "x"}) |
| Headers | c.get(url, headers={"Auth": "Bearer t"}) |
| Cookies | c.get(url, cookies={"k": "v"}) |
| Timeout | httpx.Timeout(connect=5, read=30) |
| Pool limits | httpx.Limits(max_connections=100, max_keepalive_connections=20) |
| HTTP/2 | httpx.Client(http2=True) (after pip install "httpx[http2]") |
| Stream download | with c.stream("GET", url) as r: r.iter_bytes() |
| Stream lines | for line in r.iter_lines(): ... |
| Async stream | async for chunk in r.aiter_bytes(): ... |
| Event hooks | httpx.Client(event_hooks={"response": [fn]}) |
| Bearer auth | httpx.Client(auth=BearerAuth(token)) (custom httpx.Auth) |
| mTLS cert | httpx.Client(cert=("client.crt", "client.key")) |
| Custom CA | httpx.Client(verify="/path/ca.pem") |
| Proxy | httpx.Client(proxies={"https://": "http://..."}) |
| Mock transport | httpx.Client(transport=httpx.MockTransport(handler)) |
| ASGI app driver | httpx.AsyncClient(transport=httpx.ASGITransport(app=app)) |
| Raise on 4xx/5xx | resp.raise_for_status() |
| Response | .status_code / .text / .content / .json() / .http_version |