cheat sheet
asyncio
Write concurrent Python code with asyncio. Covers coroutines, asyncio.run, gather, create_task, timeouts, queues, and avoiding the blocking-call pitfall.
asyncio — Async I/O
What it is
asyncio is Python's built-in library for writing single-threaded concurrent code using async/await syntax. Instead of threads or processes, it uses a cooperative event loop: a coroutine yields control when it waits for I/O (await), allowing other coroutines to run in the meantime.
It is the foundation for httpx async mode, FastAPI, and every other async Python library.
Quick example
import asyncio
async def greet(name: str, delay: float) -> str:
await asyncio.sleep(delay) # non-blocking sleep
msg = f"Hello, {name}!"
print(msg)
return msg
asyncio.run(greet("Alice", 0.1)) # entry point for async code
Output:
Hello, Alice!
When / why to use it
- I/O-bound work: many simultaneous network requests, database queries, or file reads.
- Building async web servers (FastAPI, aiohttp, Starlette) or async CLI tools.
- When threads add too much overhead or complexity for your use case.
Asyncio does not help CPU-bound code (use multiprocessing or concurrent.futures.ProcessPoolExecutor for that).
Common pitfalls
Calling blocking functions inside async code freezes the event loop. Anything that blocks a thread —
time.sleep(), synchronousrequests.get(),open()on a slow NFS mount — stops all coroutines until it returns. Use async equivalents (asyncio.sleep,httpx.AsyncClient,aiofiles) or run blocking calls in a thread pool:await asyncio.to_thread(blocking_fn, arg).
Forgetting
await— calling a coroutine withoutawaitreturns a coroutine object, not a result, and does nothing. mypy/pyright will catch this if you annotate return types.
asyncio.run()is the correct entry point for scripts. Never call it inside another async function — useawaitinside async functions, and reserveasyncio.run()for the outermost call.
Concurrent execution with gather
asyncio.gather() runs coroutines concurrently and returns all results when the last one finishes.
import asyncio
import time
async def fetch(n: int, delay: float) -> str:
await asyncio.sleep(delay)
return f"result-{n}"
async def main():
start = time.perf_counter()
results = await asyncio.gather(
fetch(1, 0.3),
fetch(2, 0.2),
fetch(3, 0.1),
)
elapsed = time.perf_counter() - start
print(results)
print(f"Done in {elapsed:.2f}s (sequential would take ~0.60s)")
asyncio.run(main())
Output:
['result-1', 'result-2', 'result-3']
Done in 0.30s (sequential would take ~0.60s)
create_task — fire and forget
create_task schedules a coroutine as a background task without awaiting it immediately.
import asyncio
async def background_job(name: str):
await asyncio.sleep(0.1)
print(f"[bg] {name} done")
async def main():
task = asyncio.create_task(background_job("cleanup"))
print("Main work happening...")
await asyncio.sleep(0.05)
print("More main work...")
await task # wait for it before exiting
asyncio.run(main())
Output:
Main work happening...
More main work...
[bg] cleanup done
If you create a task but never await it (or add it to a gather), the event loop may cancel it silently when the surrounding coroutine finishes. Always hold a reference and eventually await or cancel tasks.
Timeout
asyncio.wait_for() wraps a coroutine with a deadline and raises asyncio.TimeoutError if it doesn't complete in time. Use it to guard any I/O operation that could hang indefinitely — network calls, database queries, or anything waiting on an external resource.
import asyncio
async def slow_op():
await asyncio.sleep(5)
return "done"
async def main():
try:
result = await asyncio.wait_for(slow_op(), timeout=1.0)
except asyncio.TimeoutError:
print("Timed out after 1s")
asyncio.run(main())
Output:
Timed out after 1s
Run blocking code in a thread
asyncio.to_thread() (Python 3.9+) offloads a synchronous function to a thread pool so it doesn't block the event loop. Use it whenever you must call a blocking library — time.sleep, synchronous file I/O, CPU-intensive work — without rewriting it as async code.
import asyncio
import time
def cpu_or_blocking_work(n: int) -> int:
time.sleep(0.1) # simulates blocking I/O or CPU work
return n * n
async def main():
# asyncio.to_thread wraps a sync function so it doesn't block the loop
results = await asyncio.gather(*[
asyncio.to_thread(cpu_or_blocking_work, i) for i in range(5)
])
print(results)
asyncio.run(main())
Output:
[0, 1, 4, 9, 16]
Producer / consumer with Queue
asyncio.Queue is a thread-safe, async-aware FIFO queue for passing items between coroutines. It is the standard pattern for decoupling work generation from work processing — producers put() items without blocking and consumers get() them, naturally rate-limiting if the queue reaches its maxsize.
import asyncio
async def producer(queue: asyncio.Queue, items: list):
for item in items:
await queue.put(item)
print(f"Produced: {item}")
await asyncio.sleep(0.05)
await queue.put(None) # sentinel
async def consumer(queue: asyncio.Queue):
while True:
item = await queue.get()
if item is None:
break
print(f" Consumed: {item}")
queue.task_done()
async def main():
queue: asyncio.Queue = asyncio.Queue(maxsize=2)
await asyncio.gather(
producer(queue, ["a", "b", "c"]),
consumer(queue),
)
asyncio.run(main())
Output:
Produced: a
Consumed: a
Produced: b
Consumed: b
Produced: c
Consumed: c
The event loop — what actually runs
The event loop is the heart of asyncio: a single-threaded scheduler that runs coroutines until they hit an await on something that isn't yet ready, then parks them and picks up another. When the awaited operation completes (a socket becomes readable, a timer fires, a subprocess exits), the loop reschedules the parked coroutine. Everything else — tasks, gather, queues — is built on top of this one loop.
import asyncio
async def main():
loop = asyncio.get_running_loop()
print("loop:", type(loop).__name__)
print("running:", loop.is_running())
asyncio.run(main())
Output:
loop: _UnixSelectorEventLoop
running: True
asyncio.run() creates a new loop, runs the coroutine to completion, and closes the loop. You should rarely touch the loop object directly in modern code — asyncio.get_running_loop() is the right call inside an async function and only works while a loop is active. The deprecated asyncio.get_event_loop() will fall back to creating one on demand, which is a common source of "Event loop is closed" surprises.
On Linux,
uvloop(a libuv-backed drop-in replacement) is 2–4× faster than the stdlib loop.pip install uvloopthenasyncio.set_event_loop_policy(uvloop.EventLoopPolicy())beforeasyncio.run(). FastAPI and aiohttp pick it up automatically when installed.
Coroutines, async def, and await
A function defined with async def is a coroutine function; calling it returns a coroutine object without running the body. The body runs only when the coroutine is awaited, passed to asyncio.run(), or wrapped in a task. await expression evaluates the expression (which must be an awaitable — a coroutine, task, or future) and suspends the current coroutine until the result is ready.
import asyncio
async def double(x: int) -> int:
await asyncio.sleep(0) # yield control once
return x * 2
coro = double(21) # NOT executed yet
print(type(coro).__name__)
result = asyncio.run(coro) # now it runs
print(result)
Output:
coroutine
42
Three things can be awaited in asyncio:
| Awaitable | Created by | Notes |
|---|---|---|
| Coroutine | async def call | One-shot; awaiting it twice raises RuntimeError. |
| Task | asyncio.create_task(coro) | A coroutine scheduled to run on the loop; awaitable, cancellable. |
| Future | low-level (loop.create_future()) | Result placeholder; usually wrapped by libraries. |
Tasks — create_task, gather, as_completed, wait
A task is a coroutine that the event loop has scheduled to run concurrently with whatever else is running. asyncio.create_task(coro) returns immediately with a Task object; the coroutine starts executing the next time the loop runs. Tasks are how you achieve real concurrency in asyncio — without create_task (or gather, which uses it under the hood), await coro() simply runs one coroutine to completion before starting the next.
import asyncio
import time
async def work(label: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{label} done"
async def main():
start = time.perf_counter()
t1 = asyncio.create_task(work("A", 0.3))
t2 = asyncio.create_task(work("B", 0.2))
t3 = asyncio.create_task(work("C", 0.1))
results = [await t1, await t2, await t3]
print(results, f"in {time.perf_counter()-start:.2f}s")
asyncio.run(main())
Output:
['A done', 'B done', 'C done'] in 0.30s
as_completed — process results in finish order
asyncio.as_completed(awaitables) yields each awaitable as it finishes — useful when you want to start handling early results without waiting for the slowest. The yield order is non-deterministic and follows actual completion order.
import asyncio
async def task(n: int) -> int:
await asyncio.sleep(0.3 - n * 0.1)
return n
async def main():
coros = [task(i) for i in range(3)]
for coro in asyncio.as_completed(coros):
result = await coro
print(f"finished: {result}")
asyncio.run(main())
Output:
finished: 2
finished: 1
finished: 0
wait — first-completed or partial timeout
asyncio.wait(tasks, return_when=...) is the lower-level cousin of gather. It returns two sets — done and pending — and supports FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED modes. Unlike gather, it does not raise on task exceptions; you must inspect each task yourself.
import asyncio
async def task(n: int, delay: float):
await asyncio.sleep(delay)
return n
async def main():
tasks = [asyncio.create_task(task(i, 0.1 + i * 0.1)) for i in range(3)]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for t in done:
print("done:", t.result())
for t in pending:
t.cancel()
asyncio.run(main())
Output:
done: 0
gather exception handling
gather(..., return_exceptions=False) (the default) raises the first exception and cancels remaining tasks. With return_exceptions=True it returns exceptions as values, letting you collect partial results.
import asyncio
async def good(n): return n
async def bad(): raise ValueError("nope")
async def main():
results = await asyncio.gather(good(1), bad(), good(2), return_exceptions=True)
for r in results:
print(type(r).__name__, r)
asyncio.run(main())
Output:
int 1
ValueError nope
int 2
TaskGroup (3.11+) — structured concurrency
asyncio.TaskGroup is the modern, structured-concurrency replacement for ad-hoc create_task + manual cleanup. Tasks spawned with tg.create_task() are guaranteed to complete (or be cancelled together) before the async with block exits. If any task raises, all sibling tasks are cancelled and the errors are surfaced as an ExceptionGroup. Prefer TaskGroup over gather for any non-trivial concurrent workflow on Python 3.11+.
import asyncio
async def fetch(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{name}-result"
async def main():
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch("alpha", 0.1))
t2 = tg.create_task(fetch("beta", 0.2))
t3 = tg.create_task(fetch("gamma", 0.05))
# All tasks have completed at this point
print([t1.result(), t2.result(), t3.result()])
asyncio.run(main())
Output:
['alpha-result', 'beta-result', 'gamma-result']
If one task fails, the group cancels the rest and re-raises every error as an ExceptionGroup:
import asyncio
async def fail(): raise ValueError("boom")
async def slow(): await asyncio.sleep(5)
async def main():
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(fail())
tg.create_task(slow())
except* ValueError as eg:
for e in eg.exceptions:
print("caught:", e)
asyncio.run(main())
Output:
caught: boom
The except* syntax (3.11+) matches one or more exceptions inside an ExceptionGroup.
Cancellation and CancelledError
Cancellation is cooperative: calling task.cancel() schedules a CancelledError to be raised at the task's next await. The task can catch it, run cleanup, and re-raise, or (rarely, and only for very good reason) suppress it. Never silently swallow CancelledError — doing so breaks the cancellation contract and can leave the event loop unable to shut down cleanly.
import asyncio
async def cancellable():
try:
print("started")
await asyncio.sleep(5)
print("never reached")
except asyncio.CancelledError:
print("cleaning up")
raise # always re-raise!
async def main():
task = asyncio.create_task(cancellable())
await asyncio.sleep(0.1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("confirmed cancelled")
asyncio.run(main())
Output:
started
cleaning up
confirmed cancelled
Shielding from cancellation
asyncio.shield(awaitable) protects a coroutine from being cancelled by an outer cancel. Useful for "must-finish" cleanup work (e.g. flushing a buffer, closing a transaction) that should complete even if the caller times out.
import asyncio
async def critical_flush():
await asyncio.sleep(0.2)
return "flushed"
async def main():
task = asyncio.create_task(asyncio.shield(critical_flush()))
try:
await asyncio.wait_for(asyncio.shield(task), timeout=0.05)
except asyncio.TimeoutError:
print("outer timed out, but shielded task continues")
print("final:", await task)
asyncio.run(main())
Output:
outer timed out, but shielded task continues
final: flushed
asyncio.timeout — context-manager timeouts (3.11+)
async with asyncio.timeout(seconds): is the modern replacement for wait_for. It applies the deadline to everything inside the block, not a single awaitable, and integrates cleanly with TaskGroup. Outside the block, the deadline is automatically cleared.
import asyncio
async def main():
try:
async with asyncio.timeout(0.1):
await asyncio.sleep(0.05) # fine
await asyncio.sleep(0.5) # raises
except TimeoutError:
print("block exceeded 100ms")
asyncio.run(main())
Output:
block exceeded 100ms
Use asyncio.timeout_at(deadline) to set an absolute deadline (loop time, not wall clock) instead of a relative one.
Async iterators and generators
An async iterator implements __aiter__ and __anext__; you consume it with async for. An async generator is the same thing built from async def + yield. They let you stream values from an awaitable source (network, database cursor, paginated API) without buffering the whole result in memory.
import asyncio
async def ticks(n: int):
for i in range(n):
await asyncio.sleep(0.05)
yield i
async def main():
async for value in ticks(3):
print("got", value)
asyncio.run(main())
Output:
got 0
got 1
got 2
Async comprehensions work too:
result = [v async for v in ticks(3)]
Don't call a regular generator from inside
async for— you'll getTypeError: 'async for' requires an object with __aiter__ method. Use a syncforfor ordinary iterables,async foronly for async ones.
Async context managers
A class with __aenter__ and __aexit__ is an async context manager, consumed with async with. Library objects like aiohttp.ClientSession, httpx.AsyncClient, and asyncpg.connect() are async context managers — you must enter them with async with to set up and tear down properly.
import asyncio
class AsyncResource:
async def __aenter__(self):
await asyncio.sleep(0.05)
print("acquired")
return self
async def __aexit__(self, exc_type, exc, tb):
await asyncio.sleep(0.05)
print("released")
async def main():
async with AsyncResource() as r:
print("using resource")
asyncio.run(main())
Output:
acquired
using resource
released
The @contextlib.asynccontextmanager decorator turns an async generator into the same shape with less boilerplate:
from contextlib import asynccontextmanager
@asynccontextmanager
async def resource():
print("setup")
try:
yield "the value"
finally:
print("teardown")
Synchronisation primitives — Lock, Semaphore, Event, Condition
asyncio re-implements the standard locking primitives as async-aware versions. The key difference from threading: these primitives are not thread-safe — they coordinate coroutines on a single event loop, not threads.
Lock — mutual exclusion across coroutines
asyncio.Lock ensures only one coroutine holds the lock at a time. Useful when multiple coroutines mutate shared state (a counter, a list, a connection cache).
import asyncio
lock = asyncio.Lock()
counter = 0
async def increment(n: int):
global counter
async with lock:
local = counter
await asyncio.sleep(0) # simulate context switch
counter = local + n
async def main():
await asyncio.gather(*[increment(1) for _ in range(100)])
print("counter:", counter)
asyncio.run(main())
Output:
counter: 100
Semaphore — bound concurrency (rate limiting)
asyncio.Semaphore(N) lets at most N coroutines hold the semaphore at once. This is the canonical way to cap concurrent outbound HTTP requests, database connections, or any rate-limited resource.
import asyncio
sem = asyncio.Semaphore(3)
async def request(n: int):
async with sem:
print(f"request {n} running (active ≤ 3)")
await asyncio.sleep(0.05)
async def main():
await asyncio.gather(*[request(i) for i in range(10)])
asyncio.run(main())
Output:
request 0 running (active ≤ 3)
request 1 running (active ≤ 3)
request 2 running (active ≤ 3)
request 3 running (active ≤ 3)
...
Event — broadcast a one-shot signal
asyncio.Event is a one-shot flag: coroutines await event.wait() to block until someone calls event.set(). Useful for coordinating start/stop signals across many tasks.
import asyncio
started = asyncio.Event()
async def worker(name: str):
print(f"{name} waiting…")
await started.wait()
print(f"{name} go!")
async def main():
workers = [asyncio.create_task(worker(f"w{i}")) for i in range(3)]
await asyncio.sleep(0.1)
started.set()
await asyncio.gather(*workers)
asyncio.run(main())
Output:
w0 waiting…
w1 waiting…
w2 waiting…
w0 go!
w1 go!
w2 go!
Async subprocess and streams
asyncio.create_subprocess_exec is the async cousin of subprocess.Popen — fire dozens of commands without blocking the loop. Each process's stdin/stdout/stderr is an async stream you can read line-by-line or in chunks. See subprocess for the blocking equivalent.
import asyncio
async def run(*args: str) -> str:
proc = await asyncio.create_subprocess_exec(
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
)
out, _ = await proc.communicate()
return out.decode().rstrip()
async def main():
versions = await asyncio.gather(
run("python3", "--version"),
run("git", "--version"),
run("uname", "-s"),
)
for v in versions:
print(v)
asyncio.run(main())
Output:
Python 3.12.4
git version 2.43.0
Darwin
Streaming TCP with asyncio.open_connection
asyncio.open_connection(host, port) returns a (StreamReader, StreamWriter) pair — high-level wrappers around an asyncio transport. Use them to talk plain TCP without writing a protocol class.
import asyncio
async def fetch_http(host: str, path: str) -> str:
reader, writer = await asyncio.open_connection(host, 80)
writer.write(f"GET {path} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode())
await writer.drain()
body = await reader.read()
writer.close()
await writer.wait_closed()
return body.decode(errors="replace")
async def main():
response = await fetch_http("example.com", "/")
print(response.splitlines()[0])
asyncio.run(main())
Output:
HTTP/1.0 200 OK
Integration with async libraries
asyncio is the foundation, but real applications use higher-level libraries on top of it. Each plays the same role as a sync equivalent, with async/await calls instead of blocking ones.
| Sync | Async | Use case |
|---|---|---|
requests | httpx (AsyncClient) | HTTP client |
requests | aiohttp.ClientSession | HTTP client (older, mature) |
psycopg2 | asyncpg | PostgreSQL driver |
pymysql | aiomysql | MySQL driver |
redis-py | redis.asyncio | Redis client |
boto3 | aioboto3 | AWS SDK |
open() | aiofiles | File I/O (use sparingly — disks are usually fast) |
time.sleep | asyncio.sleep | Delay |
subprocess.run | asyncio.create_subprocess_exec | Spawn process |
httpx.AsyncClient example
import asyncio
import httpx
async def fetch(client: httpx.AsyncClient, url: str) -> int:
r = await client.get(url, timeout=10)
return r.status_code
async def main():
urls = ["https://httpbin.org/get", "https://example.com", "https://python.org"]
async with httpx.AsyncClient() as client:
codes = await asyncio.gather(*(fetch(client, u) for u in urls))
for u, c in zip(urls, codes):
print(f"{c} {u}")
asyncio.run(main())
Output:
200 https://httpbin.org/get
200 https://example.com
200 https://python.org
asyncpg example
import asyncio
import asyncpg
async def main():
conn = await asyncpg.connect("postgresql://alice@localhost/mydb")
rows = await conn.fetch("SELECT id, name FROM users LIMIT 3")
for row in rows:
print(dict(row))
await conn.close()
asyncio.run(main())
Output:
{'id': 1, 'name': 'Alice'}
{'id': 2, 'name': 'Bob'}
{'id': 3, 'name': 'Carol'}
asyncio vs threads vs concurrent.futures
Pick the right model for the job — they solve different problems and don't compose cleanly.
| Model | Good for | Bad for | Notes |
|---|---|---|---|
asyncio | Many I/O-bound tasks, network-heavy apps | CPU-bound work; blocking libraries | Single-threaded, cooperative. Needs async libraries. |
threading / ThreadPoolExecutor | I/O-bound with blocking libraries | CPU-bound (GIL) | Pre-emptive; works with any sync code. |
multiprocessing / ProcessPoolExecutor | CPU-bound | Anything that's not pickleable | Real parallelism; serialization overhead. |
asyncio.to_thread | Calling one blocking function from async | Heavy CPU work | Wraps ThreadPoolExecutor for async callers. |
import asyncio
import concurrent.futures
import time
def cpu_work(n: int) -> int:
total = 0
for i in range(n):
total += i * i
return total
async def main():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
start = time.perf_counter()
results = await asyncio.gather(*(
loop.run_in_executor(pool, cpu_work, 1_000_000)
for _ in range(4)
))
print(f"{len(results)} results in {time.perf_counter()-start:.2f}s")
asyncio.run(main())
Output:
4 results in 0.18s
Anti-patterns and pitfalls
Sync-over-async — calling
asyncio.run(coro())deep inside a sync function inside a running loop will raiseRuntimeError: asyncio.run() cannot be called from a running event loop. If you need to bridge, restructure your code to be async all the way, or useasyncio.run_coroutine_threadsafe(coro, loop)from a different thread.
Blocking the loop with sync I/O —
requests.get,time.sleep,psycopg2.execute, even slowopen()calls stop every coroutine on the loop. Use the async equivalent or wrap withasyncio.to_thread. Symptoms: heartbeats stop firing, websockets disconnect, throughput collapses.
CPU-bound work in async code — a 200 ms NumPy call blocks the loop for 200 ms. Hand it off to a
ProcessPoolExecutoror move it out of the async hot path.
Floating tasks —
asyncio.create_task(coro())without storing the task lets the loop garbage-collect it mid-run. Always keep a reference (in a list, set, orTaskGroup) until the task finishes.
Swallowing
CancelledError—except Exception:catchesCancelledErrorin 3.7 only (in 3.8+ it inherits fromBaseException, so a bareexcept:is the danger). Either re-raise it explicitly or useexcept asyncio.CancelledError: raise.
Don't use
loop.run_until_completein modern code. Useasyncio.runfor scripts andasyncio.TaskGroup/asyncio.gatherinside async functions.
Debug mode is your friend.
asyncio.run(main(), debug=True)enables slow-callback warnings (callbacks longer than 100 ms) and logs unhandled exceptions on every task. Also setPYTHONASYNCIODEBUG=1.
Real-world recipes
Bounded fan-out — fetch N URLs with a concurrency cap
The single most common asyncio pattern: hit a few hundred URLs concurrently, but no more than 10 at a time. A Semaphore enforces the cap; gather collects every result.
import asyncio
import httpx
async def fetch_one(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str):
async with sem:
try:
r = await client.get(url, timeout=10)
return url, r.status_code
except Exception as e:
return url, f"error: {e}"
async def fetch_all(urls: list[str], concurrency: int = 10):
sem = asyncio.Semaphore(concurrency)
async with httpx.AsyncClient() as client:
return await asyncio.gather(*(fetch_one(client, sem, u) for u in urls))
async def main():
urls = [f"https://httpbin.org/status/{c}" for c in (200, 200, 404, 500, 200)]
for url, status in await fetch_all(urls, concurrency=3):
print(status, url)
asyncio.run(main())
Output:
200 https://httpbin.org/status/200
200 https://httpbin.org/status/200
404 https://httpbin.org/status/404
500 https://httpbin.org/status/500
200 https://httpbin.org/status/200
Retry with exponential back-off
A reusable retry() wrapper that catches asyncio.TimeoutError and any custom exception list, sleeps with back-off, and re-raises on the last attempt.
import asyncio
import random
async def with_retry(coro_fn, *args, attempts=5, base=0.1, max_delay=2.0):
for n in range(attempts):
try:
return await coro_fn(*args)
except (asyncio.TimeoutError, ConnectionError) as e:
if n == attempts - 1:
raise
delay = min(base * (2 ** n), max_delay) + random.random() * 0.05
print(f"attempt {n+1} failed ({e}); retrying in {delay:.2f}s")
await asyncio.sleep(delay)
async def flaky():
if random.random() < 0.7:
raise ConnectionError("network blip")
return "ok"
async def main():
random.seed(7)
result = await with_retry(flaky, attempts=5, base=0.05)
print("result:", result)
asyncio.run(main())
Output:
attempt 1 failed (network blip); retrying in 0.05s
attempt 2 failed (network blip); retrying in 0.13s
result: ok
Heartbeat task alongside main work
Spawn a background "heartbeat" that ticks every second for liveness, run the real work in parallel, and cancel the heartbeat cleanly when the work is done.
import asyncio
import time
async def heartbeat(interval: float = 1.0):
try:
n = 0
while True:
await asyncio.sleep(interval)
n += 1
print(f" ♥ tick {n}")
except asyncio.CancelledError:
print(" ♥ stopped")
raise
async def real_work():
await asyncio.sleep(2.5)
return "work complete"
async def main():
hb = asyncio.create_task(heartbeat(1.0))
try:
print(await real_work())
finally:
hb.cancel()
try:
await hb
except asyncio.CancelledError:
pass
asyncio.run(main())
Output:
♥ tick 1
♥ tick 2
work complete
♥ stopped
Producer / consumer pool with TaskGroup
A TaskGroup-managed pool: one producer, N consumers, all coordinated through an asyncio.Queue. The TaskGroup guarantees that every consumer is cancelled and joined when the producer signals completion.
import asyncio
import random
SENTINEL = object()
async def producer(queue: asyncio.Queue, items: int, workers: int):
for i in range(items):
await queue.put(i)
await asyncio.sleep(random.uniform(0.0, 0.02))
for _ in range(workers):
await queue.put(SENTINEL)
async def consumer(name: str, queue: asyncio.Queue):
while True:
item = await queue.get()
if item is SENTINEL:
queue.task_done()
return
await asyncio.sleep(0.05) # simulate work
print(f"{name} processed {item}")
queue.task_done()
async def main():
queue: asyncio.Queue = asyncio.Queue(maxsize=4)
workers = 3
async with asyncio.TaskGroup() as tg:
tg.create_task(producer(queue, items=6, workers=workers))
for i in range(workers):
tg.create_task(consumer(f"c{i}", queue))
asyncio.run(main())
Output:
c0 processed 0
c1 processed 1
c2 processed 2
c0 processed 3
c1 processed 4
c2 processed 5
Periodic task with cancellation
Run a function every N seconds until cancelled — the asyncio version of setInterval. Wrap in try/finally so cleanup runs even if the loop is shut down hard.
import asyncio
async def every(interval: float, fn):
while True:
await asyncio.sleep(interval)
try:
await fn()
except Exception as e:
print(f"periodic failed: {e}")
async def ping():
print("ping")
async def main():
task = asyncio.create_task(every(0.1, ping))
await asyncio.sleep(0.35)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
asyncio.run(main())
Output:
ping
ping
ping
Quick reference
| Pattern | Code |
|---|---|
| Entry point | asyncio.run(main()) |
| Sleep | await asyncio.sleep(seconds) |
| Run concurrently | await asyncio.gather(coro1(), coro2()) |
| Structured concurrency (3.11+) | async with asyncio.TaskGroup() as tg: tg.create_task(coro()) |
| Background task | t = asyncio.create_task(coro()) |
| Timeout (single awaitable) | await asyncio.wait_for(coro(), timeout=5) |
| Timeout (block, 3.11+) | async with asyncio.timeout(5): |
| Shield from cancellation | await asyncio.shield(coro()) |
| Run blocking fn | await asyncio.to_thread(fn, arg) |
| Run CPU work | await loop.run_in_executor(pool, fn, arg) |
| Current event loop | asyncio.get_running_loop() |
| Queue | asyncio.Queue(maxsize=N) |
| Semaphore (limit concurrency) | asyncio.Semaphore(10) |
| Lock | async with asyncio.Lock(): |
| Event | asyncio.Event(), .set(), await .wait() |
| Async iterator | async for x in stream: |
| Async context manager | async with resource() as r: |
| As completed | for c in asyncio.as_completed(coros): await c |
| First done | await asyncio.wait(coros, return_when=FIRST_COMPLETED) |
| Subprocess | await asyncio.create_subprocess_exec(*args) |
| Open TCP | await asyncio.open_connection(host, port) |
See also
- subprocess — blocking process spawning; async equivalent is
create_subprocess_exec - httpx — async HTTP client built on asyncio
- fastapi — async web framework; every endpoint is a coroutine
- tqdm —
tqdm.asyncio.tqdm.gatherfor progress bars on async work