cheat sheet

httpx

Make sync and async HTTP requests with httpx. Covers GET/POST, async usage, HTTP/2, streaming, and how it compares to requests.

httpx — Modern HTTP Client

What it is

HTTPX is a full-featured HTTP client for Python by Tom Christie that supports both synchronous and asynchronous usage, HTTP/1.1 and HTTP/2, connection pooling, and a requests-compatible API surface. It is the go-to replacement for requests when you need async/await support (e.g., inside FastAPI or asyncio services) or HTTP/2 without changing how you write request code. The sync API is a near drop-in for requests (a few defaults, such as timeouts and redirects, differ); the async API pairs naturally with httpx.AsyncClient and await.

Install

bash
pip install httpx
# HTTP/2 support (optional)
pip install "httpx[http2]"

Output: (none — exits 0 on success)

Quick example — synchronous

python
import httpx

resp = httpx.get("https://httpbin.org/get", params={"tool": "httpx"})
resp.raise_for_status()
print(resp.status_code)
print(resp.json()["args"])

Output:

text
200
{'tool': 'httpx'}

Async example

python
import asyncio
import httpx

async def fetch_all(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient(timeout=10) as client:
        tasks = [client.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        return [r.json() for r in responses]

urls = [
    "https://httpbin.org/get?n=1",
    "https://httpbin.org/get?n=2",
    "https://httpbin.org/get?n=3",
]
results = asyncio.run(fetch_all(urls))
print([r["args"] for r in results])

Output:

text
[{'n': '1'}, {'n': '2'}, {'n': '3'}]

When / why to use it over requests

| Situation | Use |
| --- | --- |
| You need async/await | httpx |
| You want HTTP/2 | httpx |
| Type annotations matter to you | httpx (fully typed) |
| Simple sync-only scripts | Either (requests has wider ecosystem) |
| Existing requests codebase | Keep requests unless you have a reason to migrate |

Common pitfalls

AsyncClient needs an explicit close. Creating httpx.AsyncClient() and never closing it leaks the connection pool. Prefer async with httpx.AsyncClient() as client:, or call await client.aclose() yourself when the client has to outlive a single block.

Default timeout is 5 seconds — unlike requests, httpx has a default timeout. You can increase or disable it: timeout=httpx.Timeout(30.0) or timeout=None.

httpx raises httpx.HTTPStatusError (not requests.HTTPError) when you call raise_for_status(). Handle imports accordingly if porting from requests.

Richer example — POST with auth and retry

python
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5))
def post_event(payload: dict) -> dict:
    with httpx.Client(timeout=10) as client:
        resp = client.post(
            "https://httpbin.org/post",
            json=payload,
            headers={"Authorization": "Bearer my-token"},
        )
        resp.raise_for_status()
        return resp.json()["json"]

result = post_event({"event": "page_view", "user": "alice"})
print(result)

Output:

text
{'event': 'page_view', 'user': 'alice'}

tenacity is a separate retry library (pip install tenacity). httpx has no status-aware retry policy of its own, unlike requests with HTTPAdapter and urllib3's Retry; its transport can only retry failed connections.

Sync vs async — choosing a client

httpx.Client (sync) and httpx.AsyncClient expose nearly identical APIs and accept the same configuration (timeouts, limits, transports, event hooks), but each client instance owns its own connection pool. The choice is determined by the surrounding code, not the request itself: if you're inside an asyncio event loop (FastAPI, Litestar, an asyncio.run script) use AsyncClient; otherwise use Client. Reuse the client across calls, because building it for each request defeats connection pooling and TLS session reuse.

python
import httpx

# Sync — for CLIs, scripts, sync Flask/Django views
with httpx.Client(base_url="https://api.example.com", timeout=10) as client:
    r = client.get("/users", params={"page": 1})

# Async — for FastAPI, Litestar, asyncio code
import asyncio
async def fetch():
    async with httpx.AsyncClient(base_url="https://api.example.com", timeout=10) as client:
        r = await client.get("/users", params={"page": 1})
        return r.json()
asyncio.run(fetch())

# One-off top-level calls — convenient, but create-and-throw-away the client
httpx.get("https://httpbin.org/get")          # no connection reuse
httpx.post("https://httpbin.org/post", json={"k": "v"})

Inside FastAPI, create the AsyncClient once in a lifespan handler and stash it on app.state.http. The single shared client reuses connections across every request and shuts down cleanly on app exit.

HTTP/2

HTTP/2 multiplexes many requests over one TCP connection — a big win for chatty APIs. httpx supports HTTP/2 when you install the [http2] extra and pass http2=True to the client. The protocol is opportunistic: if the server doesn't advertise HTTP/2 via ALPN, the connection falls back to HTTP/1.1 transparently.

python
# pip install "httpx[http2]"
import httpx

with httpx.Client(http2=True, timeout=10) as client:
    r = client.get("https://nghttp2.org/httpbin/get")
    print(r.http_version)         # "HTTP/2"
    print(r.headers)

python
import asyncio

import httpx

# Async + HTTP/2 fan-out: requests to the same host share one multiplexed TLS connection
async def fan_out(urls: list[str]) -> list[dict]:
    async with httpx.AsyncClient(http2=True, timeout=10) as client:
        rs = await asyncio.gather(*(client.get(u) for u in urls))
        return [r.json() for r in rs]

Timeouts — granular control

httpx.Timeout separates four phases: connect, read, write, and pool (how long to wait for a free connection from the pool). The default is Timeout(5.0) for all four. You can set a single value, pass None to disable a phase, or compose phase-specific timeouts.

python
import httpx

# Single value — applies to all four phases
client = httpx.Client(timeout=10)

# Detailed per-phase timeout
client = httpx.Client(timeout=httpx.Timeout(
    connect=5.0,       # TCP + TLS handshake
    read=30.0,         # between socket reads
    write=10.0,        # between socket writes (large uploads)
    pool=5.0,          # waiting for a free pool slot
))

# Per-request override
r = client.get(url, timeout=httpx.Timeout(60))

# Disable entirely (NOT recommended)
client = httpx.Client(timeout=None)

The default 5 s timeout catches almost everyone migrating from requests off-guard. A long-running upload or a polling endpoint will raise httpx.ReadTimeout after 5 s unless you raise the limit.

Connection pooling and limits

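httpx.Limits controls how many connections the client may open in total (max_connections), how many idle keep-alive connections it retains (max_keepalive_connections), and how long those idle connections stay alive (keepalive_expiry). The limits apply to the client as a whole; there is no per-host setting. The defaults (100 connections, 20 keep-alive, 5 s expiry) are fine for most services; raise them for high-throughput fan-out, lower them when you're hitting a small upstream that wants gentler clients.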

python
import httpx

limits = httpx.Limits(
    max_keepalive_connections=20,    # idle connections kept in the pool
    max_connections=100,             # hard ceiling per client
    keepalive_expiry=10.0,           # seconds before idle conns are reaped
)
client = httpx.Client(limits=limits, timeout=10)

# The underlying transport is reachable, but only through a private attribute
transport = client._transport      # httpx.HTTPTransport
# (Use transport-level introspection sparingly; it's not public API.)

Retries — via transport, hooks, or tenacity

Unlike requests, httpx does not ship a high-level retry policy. You have three options, ordered by capability:

  1. Transport-level retries for connection-level errors only (HTTPTransport(retries=N)). Does not retry HTTP statuses.
  2. An event hook that reacts to a response, for example pausing on 429 to honor Retry-After. A hook cannot re-send the request, so the call site still has to issue the retry.
  3. The tenacity library wrapping the call site: most expressive, easy to test.

python
import httpx

# 1. Transport-level: retries failed connects, NOT non-2xx responses
transport = httpx.HTTPTransport(retries=3)
client = httpx.Client(transport=transport, timeout=10)

# 2. Hook-based: pause on 429 so the caller's next attempt honors Retry-After.
#    The hook only delays; it does not re-send the request.
import time

def pause_on_429(response: httpx.Response):
    if response.status_code == 429:
        # Assumes Retry-After holds seconds; it can also be an HTTP-date
        time.sleep(float(response.headers.get("Retry-After", 1)))

client = httpx.Client(event_hooks={"response": [pause_on_429]}, timeout=10)

# 3. tenacity — full-featured retry policy (pip install tenacity)
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=0.5, min=1, max=30),
    # Note: HTTPStatusError covers 4xx as well, so this also retries e.g. a 404
    retry=retry_if_exception_type((httpx.TimeoutException, httpx.HTTPStatusError)),
    reraise=True,
)
def fetch(url: str) -> dict:
    with httpx.Client(timeout=10) as client:
        r = client.get(url)
        r.raise_for_status()
        return r.json()

Event hooks

Hooks run synchronously (or asynchronously, with AsyncClient) before each request and after each response. They're the right place for logging, metrics, distributed tracing, and centralised error handling — every call through the client gets the behavior without changes at the call site.

python
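import logging

import httpx

log = logging.getLogger(__name__)

def log_request(request: httpx.Request):
    log.info("→ %s %s", request.method, request.url)

def log_response(response: httpx.Response):
    request = response.request
    # Response hooks fire before the body is read, and .elapsed is only
    # available once the response has been read or closed, so read it first.
    response.read()
    log.info("← %s %s -> %d %dms",
             request.method, request.url, response.status_code,
             int(response.elapsed.total_seconds() * 1000))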

def raise_on_4xx_5xx(response: httpx.Response):
    response.raise_for_status()

client = httpx.Client(
    event_hooks={
        "request": [log_request],
        "response": [log_response, raise_on_4xx_5xx],
    },
)

python
# Async hooks for AsyncClient must be async functions
async def alog_response(response: httpx.Response):
    log.info("← %d %s", response.status_code, response.request.url)

client = httpx.AsyncClient(event_hooks={"response": [alog_response]})

File uploads and multipart

Pass files= as a dict or a list of (field_name, value) tuples, where each value is either an open file object or a (filename, fileobj, content_type) tuple. httpx streams the upload from the file object, so even multi-gigabyte files don't exhaust memory.

python
import httpx

with httpx.Client() as client:
    # Single file
    with open("photo.jpg", "rb") as f:
        r = client.post(
            "https://api.example.com/uploads",
            files={"photo": ("photo.jpg", f, "image/jpeg")},
            data={"caption": "Sunset"},
        )

    # Multiple files under one field name; the with block closes both handles
    with open("a.jpg", "rb") as a, open("b.jpg", "rb") as b:
        files = [
            ("photos", ("a.jpg", a, "image/jpeg")),
            ("photos", ("b.jpg", b, "image/jpeg")),
        ]
        r = client.post("https://api.example.com/album", files=files)

Streaming responses

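client.stream(method, url) returns a context manager that yields a streaming Response whose body is not consumed up front. Iterate with iter_bytes() (content-decoded bytes), iter_raw() (bytes as received, before decompression), iter_text() (decoded text), or iter_lines() (line by line); responses from AsyncClient use the a-prefixed equivalents such as aiter_bytes() and aiter_lines().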

python
import json

import httpx

# Sync streaming download
with httpx.Client() as client:
    with client.stream("GET", "https://example.com/large.zip") as resp:
        resp.raise_for_status()
        with open("large.zip", "wb") as f:
            for chunk in resp.iter_bytes(chunk_size=64 * 1024):
                f.write(chunk)

# Async streaming — NDJSON event source
async def consume_events():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", "https://example.com/events") as r:
            r.raise_for_status()
            async for line in r.aiter_lines():
                if line:
                    yield json.loads(line)

Inside a stream(...) context, resp.text, resp.content, and resp.json() are not available — the body hasn't been read yet. Either iterate it manually or call resp.read() first.

OAuth2 and Bearer authentication

Pass auth=... to either the client or a single request. httpx.BasicAuth and httpx.DigestAuth ship in the box; for OAuth2 / Bearer tokens, subclass httpx.Auth — your auth_flow(request) generator can read the response, decide whether to refresh, yield a new request, and so on.

python
import httpx

# Built-ins
auth = httpx.BasicAuth("alicedev", "pw")
auth = httpx.DigestAuth("alicedev", "pw")

# Simple Bearer token
class BearerAuth(httpx.Auth):
    def __init__(self, token: str):
        self.token = token
    def auth_flow(self, request):
        request.headers["Authorization"] = f"Bearer {self.token}"
        yield request

# OAuth2 — refresh on 401, retry the original request
class OAuth2Auth(httpx.Auth):
    requires_response_body = True

    def __init__(self, access_token: str, refresh_token: str, token_url: str):
        self.access_token = access_token
        self.refresh_token = refresh_token
        self.token_url = token_url

    def auth_flow(self, request):
        request.headers["Authorization"] = f"Bearer {self.access_token}"
        response = yield request
        if response.status_code == 401:
            # Refresh and retry once
            refresh_req = httpx.Request("POST", self.token_url, data={
                "grant_type": "refresh_token",
                "refresh_token": self.refresh_token,
            })
            refresh_resp = yield refresh_req
            tokens = refresh_resp.json()
            self.access_token = tokens["access_token"]
            request.headers["Authorization"] = f"Bearer {self.access_token}"
            yield request

with httpx.Client(auth=OAuth2Auth(access, refresh, "https://auth.example.com/token")) as c:
    r = c.get("https://api.example.com/me")

TLS, mTLS, and proxies

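httpx uses certifi's CA bundle by default. Override per client with verify= (a CA bundle path, an ssl.SSLContext, or False for testing only), supply client certificates with cert=, and route traffic through a proxy with proxy=, mounts=, or the HTTPS_PROXY, HTTP_PROXY, and NO_PROXY environment variables. The older proxies= dict was deprecated in httpx 0.26 and removed in 0.28.

python
import httpx, ssl

# Custom CA bundle
client = httpx.Client(verify="/etc/ssl/certs/internal-ca.pem")

# Mutual TLS
client = httpx.Client(cert=("client.crt", "client.key"))

# One proxy for all traffic
client = httpx.Client(proxy="http://proxy.internal:3128")

# Per-scheme routing: mount a proxying transport on each URL pattern
client = httpx.Client(mounts={
    "http://":  httpx.HTTPTransport(proxy="http://proxy.internal:3128"),
    "https://": httpx.HTTPTransport(proxy="http://proxy.internal:3128"),
})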

# Fine-grained SSL context (e.g. min TLS version, custom ciphers)
ctx = ssl.create_default_context()
ctx.minimum_version = ssl.TLSVersion.TLSv1_2
client = httpx.Client(verify=ctx)

Testing with MockTransport

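MockTransport swaps the network for a callback that returns canned responses. Its sibling ASGITransport drives an ASGI app in-process, which is how FastAPI, Litestar, and Starlette apps are commonly tested: no port binding and no running server. ASGITransport is async-only, so pair it with AsyncClient. (The older app= client shortcut was removed in httpx 0.28.)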

python
import httpx

def handler(request: httpx.Request) -> httpx.Response:
    if request.url.path == "/users/1":
        return httpx.Response(200, json={"id": 1, "name": "Alice Dev"})
    return httpx.Response(404, json={"error": "not found"})

transport = httpx.MockTransport(handler)
with httpx.Client(transport=transport, base_url="https://api.example.com") as client:
    r = client.get("/users/1")
    assert r.json()["name"] == "Alice Dev"

    r = client.get("/users/999")
    assert r.status_code == 404

python
# Drive an ASGI app in-process. ASGITransport is async-only, so it needs AsyncClient.
import asyncio

import httpx
from fastapi import FastAPI

app = FastAPI()

@app.get("/health")
def health():
    return {"ok": True}

async def check_health() -> None:
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://testserver") as client:
        r = await client.get("/health")
        assert r.json() == {"ok": True}

asyncio.run(check_health())

httpx vs requests — side by side

The sync httpx API is a near drop-in for requests, with largely the same kwargs and a very similar Response object, plus a handful of meaningful differences. Use this table when porting a requests codebase or reviewing PRs that mix the two libraries.

| Behavior | requests | httpx |
| --- | --- | --- |
| Default timeout | None (hangs forever) | 5 s on all phases |
| Redirects | Followed by default | Not followed unless follow_redirects=True |
| Async support | No | Yes (AsyncClient) |
| HTTP/2 | No | Yes ([http2] extra) |
| Type hints | Partial, runtime checks | Full, strictly typed |
| Built-in retries | urllib3.Retry via adapters | None; use transport retries for connect-level, tenacity for app-level |
| Exception from raise_for_status() | requests.HTTPError | httpx.HTTPStatusError |
| Stream API | r = requests.get(url, stream=True); r.iter_content() | with client.stream(...) as r: r.iter_bytes() |
| Test injection | responses / requests-mock libraries | Built-in MockTransport, ASGITransport |
| Cookie persistence | requests.Session | httpx.Client (cookies persist by default) |
| Proxies kwarg | proxies={"http": ..., "https": ...} | proxy="..." or mounts={"https://": ...}; releases before 0.28 also accepted proxies={"http://": ..., "https://": ...} (keys are URL patterns ending in ://) |
| Connection pool tuning | HTTPAdapter(pool_*) | httpx.Limits(max_connections=..., max_keepalive_connections=...) |
| URL building | Manual path concatenation | base_url="..." is merged with relative request URLs |

Cross-link: see requests for the sync-only ancestor library and asyncio for the concurrency runtime AsyncClient runs on.

python
# Porting a requests snippet to httpx
# Before
import requests
with requests.Session() as s:
    s.headers["Authorization"] = "Bearer t"
    r = s.get("https://api.example.com/me", timeout=10)
    r.raise_for_status()
    print(r.json())

# After (same shape; headers move to the constructor, and httpx only follows redirects with follow_redirects=True)
import httpx
with httpx.Client(headers={"Authorization": "Bearer t"}, timeout=10) as s:
    r = s.get("https://api.example.com/me")
    r.raise_for_status()
    print(r.json())

Exception hierarchy

httpx raises subclasses of httpx.HTTPError. The most useful split is between transport errors (TransportError and its children — ConnectError, ConnectTimeout, ReadTimeout, …) and status errors (HTTPStatusError from raise_for_status()).

python
import httpx

try:
    r = client.get("https://api.example.com/x", timeout=5)
    r.raise_for_status()
except httpx.ConnectTimeout:
    log.warning("Could not connect — network or DNS issue")
except httpx.ReadTimeout:
    log.warning("Server stalled mid-response")
except httpx.HTTPStatusError as e:
    log.error("HTTP %d: %s", e.response.status_code, e.response.text[:200])
except httpx.RequestError as e:
    log.exception("Request failed: %s", e)

| Exception | Raised on |
| --- | --- |
| RequestError | Base for any failure while issuing a request |
| TransportError | Base for connection-level failures |
| NetworkError | Base for ConnectError, ReadError, WriteError, and CloseError |
| ConnectError | TCP / TLS handshake failed |
| ConnectTimeout | Could not connect in time |
| ReadTimeout | No data within read timeout |
| WriteTimeout | Could not write within write timeout |
| PoolTimeout | No free pool slot within pool timeout |
| HTTPStatusError | Raised by raise_for_status() for 4xx/5xx |
| DecodingError | Bad content encoding |
| TooManyRedirects | Redirect chain exceeded max_redirects |

Common pitfalls (extended)

Calling r.json() inside client.stream(...) — the body hasn't been buffered yet. Either iterate the stream manually or call r.read() first to load the whole body into memory.

Reusing an AsyncClient across asyncio.run() calls: an AsyncClient is bound to the event loop it was created on. Create it inside lifespan (FastAPI) or an async with block, not at module scope.

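Using requests-style proxy keys: httpx matches proxies and mounts by URL pattern, so the key is "https://" (scheme plus ://), not "https". {"https://": "..."} works; the requests form {"https": "..."} does not, and current releases reject it with a ValueError.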

Use client.build_request(...) to construct a request without sending it — handy for signing (HMAC) where you need to compute the signature off the canonical request before transmission.

Real-world recipes

Concurrent fan-out with bounded parallelism

asyncio.gather runs everything at once — fine for 10 calls, deadly for 10 000. Bound concurrency with a semaphore so you don't melt the upstream.

python
import asyncio, httpx

async def fetch_all(urls: list[str], *, concurrency: int = 20) -> list[dict]:
    sem = asyncio.Semaphore(concurrency)
    async with httpx.AsyncClient(timeout=10, http2=True) as client:
        async def fetch(url: str) -> dict:
            async with sem:
                r = await client.get(url)
                r.raise_for_status()
                return r.json()
        return await asyncio.gather(*(fetch(u) for u in urls))

urls = [f"https://api.example.com/items/{i}" for i in range(1000)]
results = asyncio.run(fetch_all(urls, concurrency=50))

A reusable API client class

python
import os

import httpx

class GitHubClient:
    def __init__(self, token: str, *, timeout: float = 10.0):
        self._client = httpx.Client(
            base_url="https://api.github.com",
            headers={
                "Accept": "application/vnd.github+json",
                "Authorization": f"Bearer {token}",
                "X-GitHub-Api-Version": "2022-11-28",
            },
            timeout=timeout,
            event_hooks={"response": [self._raise_for_status]},
        )

    @staticmethod
    def _raise_for_status(r: httpx.Response):
        r.raise_for_status()

    def close(self) -> None:
        self._client.close()

    def __enter__(self):
        return self
    def __exit__(self, *exc):
        self.close()

    def list_repos(self, user: str, *, per_page: int = 100) -> list[dict]:
        return self._client.get(f"/users/{user}/repos", params={"per_page": per_page}).json()

with GitHubClient(token=os.environ["GITHUB_TOKEN"]) as gh:
    for r in gh.list_repos("python"):
        print(r["full_name"])

Server-Sent Events (SSE) client

python
import httpx

async def sse_consume(url: str):
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url, headers={"Accept": "text/event-stream"}) as r:
            event = {}
            async for line in r.aiter_lines():
                if line == "":
                    if event:
                        yield event
                        event = {}
                    continue
                if line.startswith(":"):
                    continue
                k, _, v = line.partition(":")
                event[k.strip()] = v.lstrip()

Quick reference

| Task | Code |
| --- | --- |
| One-off GET | httpx.get(url, timeout=10).json() |
| Sync client | with httpx.Client(base_url=..., timeout=10) as c: ... |
| Async client | async with httpx.AsyncClient(timeout=10) as c: ... |
| POST JSON | c.post(url, json={...}) |
| POST form | c.post(url, data={...}) |
| Upload file | c.post(url, files={"f": open("a.png", "rb")}) |
| Query params | c.get(url, params={"q": "x"}) |
| Headers | c.get(url, headers={"Authorization": "Bearer t"}) |
| Cookies | c.get(url, cookies={"k": "v"}) |
| Timeout | httpx.Timeout(5, read=30) |
| Pool limits | httpx.Limits(max_connections=100, max_keepalive_connections=20) |
| HTTP/2 | httpx.Client(http2=True) (after pip install "httpx[http2]") |
| Stream download | with c.stream("GET", url) as r: r.iter_bytes() |
| Stream lines | for line in r.iter_lines(): ... |
| Async stream | async for chunk in r.aiter_bytes(): ... |
| Event hooks | httpx.Client(event_hooks={"response": [fn]}) |
| Bearer auth | httpx.Client(auth=BearerAuth(token)) (custom httpx.Auth) |
| mTLS cert | httpx.Client(cert=("client.crt", "client.key")) |
| Custom CA | httpx.Client(verify="/path/ca.pem") |
| Proxy | httpx.Client(proxy="http://...") (proxies={"https://": "http://..."} before 0.28) |
| Mock transport | httpx.Client(transport=httpx.MockTransport(handler)) |
| ASGI app driver | httpx.AsyncClient(transport=httpx.ASGITransport(app=app)) |
| Raise on 4xx/5xx | resp.raise_for_status() |
| Response | .status_code / .text / .content / .json() / .http_version |