cheat sheet

asyncio

Write concurrent Python code with asyncio. Covers coroutines, asyncio.run, gather, create_task, timeouts, queues, and avoiding the blocking-call pitfall.

asyncio — Async I/O

What it is

asyncio is Python's built-in library for writing single-threaded concurrent code using async/await syntax. Instead of threads or processes, it uses a cooperative event loop: a coroutine yields control when it waits for I/O (await), allowing other coroutines to run in the meantime.

It is the foundation for httpx's async mode, FastAPI, and most of the async Python ecosystem.

Quick example

python
import asyncio

async def greet(name: str, delay: float) -> str:
    await asyncio.sleep(delay)      # non-blocking sleep
    msg = f"Hello, {name}!"
    print(msg)
    return msg

asyncio.run(greet("Alice", 0.1))   # entry point for async code

Output:

text
Hello, Alice!

When / why to use it

  • I/O-bound work: many simultaneous network requests, database queries, or file reads.
  • Building async web servers (FastAPI, aiohttp, Starlette) or async CLI tools.
  • When threads add too much overhead or complexity for your use case.

Asyncio does not help CPU-bound code (use multiprocessing or concurrent.futures.ProcessPoolExecutor for that).

Common pitfalls

Calling blocking functions inside async code freezes the event loop. Anything that blocks a thread — time.sleep(), synchronous requests.get(), open() on a slow NFS mount — stops all coroutines until it returns. Use async equivalents (asyncio.sleep, httpx.AsyncClient, aiofiles) or run blocking calls in a thread pool: await asyncio.to_thread(blocking_fn, arg).

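Forgetting await — calling a coroutine function without await returns a coroutine object, not a result, and the body never runs. Python reports a RuntimeWarning saying the coroutine was never awaited, and type checkers such as mypy or pyright can flag the misuse when return types are annotated.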

asyncio.run() is the correct entry point for scripts. Never call it inside another async function — use await inside async functions, and reserve asyncio.run() for the outermost call.
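The blocking-call pitfall is easy to see with a stopwatch. The sketch below is only an illustration (the helper names blocking_nap, cooperative_nap and timed are made up for this example): three coroutines that call time.sleep run one after another, while three that await asyncio.sleep overlap.

```python
import asyncio
import time

async def blocking_nap() -> None:
    time.sleep(0.1)            # blocks the whole event loop

async def cooperative_nap() -> None:
    await asyncio.sleep(0.1)   # yields to the loop while waiting

async def timed(coro_fn, count: int = 3) -> float:
    """Run `count` copies of coro_fn under gather and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(coro_fn() for _ in range(count)))
    return time.perf_counter() - start

async def main() -> None:
    print(f"time.sleep:    {await timed(blocking_nap):.2f}s")
    print(f"asyncio.sleep: {await timed(cooperative_nap):.2f}s")

asyncio.run(main())
```

On an idle machine the first figure should be close to 0.3 s and the second close to 0.1 s; exact timings vary.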

Concurrent execution with gather

asyncio.gather() runs coroutines concurrently and returns all results when the last one finishes.

python
import asyncio
import time

async def fetch(n: int, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"result-{n}"

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        fetch(1, 0.3),
        fetch(2, 0.2),
        fetch(3, 0.1),
    )
    elapsed = time.perf_counter() - start
    print(results)
    print(f"Done in {elapsed:.2f}s  (sequential would take ~0.60s)")

asyncio.run(main())

Output:

text
['result-1', 'result-2', 'result-3']
Done in 0.30s  (sequential would take ~0.60s)

create_task — fire and forget

create_task schedules a coroutine as a background task without awaiting it immediately.

python
import asyncio

async def background_job(name: str):
    await asyncio.sleep(0.1)
    print(f"[bg] {name} done")

async def main():
    task = asyncio.create_task(background_job("cleanup"))
    print("Main work happening...")
    await asyncio.sleep(0.05)
    print("More main work...")
    await task          # wait for it before exiting

asyncio.run(main())

Output:

text
Main work happening...
More main work...
[bg] cleanup done

The event loop keeps only weak references to tasks, so a task you never store can be garbage-collected before it finishes, and any task still pending when asyncio.run() returns is cancelled. Always hold a reference and eventually await or cancel the task.
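One common way to hold those references is a module-level set that each task removes itself from when it completes. This is a minimal sketch; spawn, job and background_tasks are illustrative names, not asyncio APIs.

```python
import asyncio

background_tasks = set()   # strong references keep tasks alive

def spawn(coro) -> asyncio.Task:
    """Schedule coro and keep a reference until it finishes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task

async def job(n: int, log: list) -> None:
    await asyncio.sleep(0.01 * n)
    log.append(n)

async def main() -> list:
    log: list = []
    for n in (3, 1, 2):
        spawn(job(n, log))
    # Wait for whatever is still pending before leaving main().
    await asyncio.gather(*background_tasks)
    return log

print(asyncio.run(main()))   # [1, 2, 3]
```

The done callback keeps the set from growing without bound in a long-running program.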

Timeout

asyncio.wait_for() wraps a coroutine with a deadline and raises asyncio.TimeoutError if it doesn't complete in time. Use it to guard any I/O operation that could hang indefinitely — network calls, database queries, or anything waiting on an external resource.

python
import asyncio

async def slow_op():
    await asyncio.sleep(5)
    return "done"

async def main():
    try:
        result = await asyncio.wait_for(slow_op(), timeout=1.0)
    except asyncio.TimeoutError:
        print("Timed out after 1s")

asyncio.run(main())

Output:

text
Timed out after 1s

Run blocking code in a thread

asyncio.to_thread() (Python 3.9+) offloads a synchronous function to a thread pool so it doesn't block the event loop. Use it whenever you must call a blocking library (time.sleep, synchronous file I/O, a sync database driver) without rewriting it as async code. It does not speed up CPU-intensive pure-Python work, because the threads still share the GIL; use a process pool for that.

python
import asyncio
import time

def blocking_work(n: int) -> int:
    time.sleep(0.1)     # simulates a blocking I/O call
    return n * n

async def main():
    # Each call runs in a worker thread, so the five sleeps overlap
    results = await asyncio.gather(*[
        asyncio.to_thread(blocking_work, i) for i in range(5)
    ])
    print(results)

asyncio.run(main())

Output:

text
[0, 1, 4, 9, 16]

Producer / consumer with Queue

asyncio.Queue is an async-aware FIFO queue for passing items between coroutines on one event loop. It is not thread-safe (use queue.Queue between threads). It is the standard pattern for decoupling work generation from work processing: consumers await get(), and producers await put(), which suspends only when the queue has reached its maxsize, giving natural backpressure.

python
import asyncio

async def producer(queue: asyncio.Queue, items: list):
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.05)
    await queue.put(None)   # sentinel

async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"  Consumed: {item}")
        queue.task_done()

async def main():
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    await asyncio.gather(
        producer(queue, ["a", "b", "c"]),
        consumer(queue),
    )

asyncio.run(main())

Output:

text
Produced: a
  Consumed: a
Produced: b
  Consumed: b
Produced: c
  Consumed: c
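A sentinel is one way to stop consumers; another is to pair task_done() with queue.join() and cancel the workers once the queue is drained. The following is a sketch under that assumption, with worker and the "multiply by 10" work chosen purely for illustration.

```python
import asyncio

async def worker(queue: asyncio.Queue, results: list) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)       # stand-in for real work
            results.append(item * 10)
        finally:
            queue.task_done()               # one task_done() per get()

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(2)]

    for n in range(5):
        queue.put_nowait(n)

    await queue.join()                      # returns once every item is marked done
    for w in workers:
        w.cancel()                          # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)

print(asyncio.run(main()))   # [0, 10, 20, 30, 40]
```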

The event loop — what actually runs

The event loop is the heart of asyncio: a single-threaded scheduler that runs coroutines until they hit an await on something that isn't yet ready, then parks them and picks up another. When the awaited operation completes (a socket becomes readable, a timer fires, a subprocess exits), the loop reschedules the parked coroutine. Everything else — tasks, gather, queues — is built on top of this one loop.

python
import asyncio

async def main():
    loop = asyncio.get_running_loop()
    print("loop:", type(loop).__name__)
    print("running:", loop.is_running())

asyncio.run(main())

Output (on Linux or macOS; Windows shows ProactorEventLoop):

text
loop: _UnixSelectorEventLoop
running: True

asyncio.run() creates a new loop, runs the coroutine to completion, and closes the loop. You should rarely touch the loop object directly in modern code; asyncio.get_running_loop() is the right call inside an async function and only works while a loop is active. The older asyncio.get_event_loop() historically created a loop on demand when none was running, a common source of "Event loop is closed" surprises; that implicit creation is deprecated, and newer Python versions raise RuntimeError instead.

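On Linux and macOS, uvloop (a libuv-backed drop-in replacement for the stdlib loop) can be considerably faster; its authors report 2–4× in their benchmarks. pip install uvloop, then call asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) before asyncio.run(), or use uvloop.run(main()) on uvloop 0.18 and later. Uvicorn, the server commonly used to run FastAPI, picks uvloop up automatically when it is installed; other frameworks generally need it enabled explicitly.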

Coroutines, async def, and await

A function defined with async def is a coroutine function; calling it returns a coroutine object without running the body. The body runs only when the coroutine is awaited, passed to asyncio.run(), or wrapped in a task. await expression evaluates the expression (which must be an awaitable — a coroutine, task, or future) and suspends the current coroutine until the result is ready.

python
import asyncio

async def double(x: int) -> int:
    await asyncio.sleep(0)   # yield control once
    return x * 2

coro = double(21)            # NOT executed yet
print(type(coro).__name__)

result = asyncio.run(coro)   # now it runs
print(result)

Output:

text
coroutine
42

Three things can be awaited in asyncio:

| Awaitable | Created by | Notes |
|---|---|---|
| Coroutine | async def call | One-shot; awaiting it twice raises RuntimeError. |
| Task | asyncio.create_task(coro) | A coroutine scheduled to run on the loop; awaitable, cancellable. |
| Future | low-level (loop.create_future()) | Result placeholder; usually wrapped by libraries. |
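Coroutines and tasks have examples throughout this page, so here is a small sketch of the third kind, a bare Future. It assumes a callback-style source that delivers a value later; loop.call_later stands in for that source.

```python
import asyncio

async def main() -> str:
    loop = asyncio.get_running_loop()
    fut = loop.create_future()              # empty placeholder, no result yet

    # Pretend a callback-based API delivers the value 50 ms from now.
    loop.call_later(0.05, fut.set_result, "payload")

    print("done before await:", fut.done())   # False
    value = await fut                          # suspends until set_result runs
    print("done after await:", fut.done())    # True
    return value

print(asyncio.run(main()))   # payload
```

Application code rarely creates futures directly; this mainly helps when bridging callback-based code into async/await.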

Tasks — create_task, gather, as_completed, wait

A task is a coroutine that the event loop has scheduled to run concurrently with whatever else is running. asyncio.create_task(coro) returns immediately with a Task object; the coroutine starts executing the next time the loop runs. Tasks are how you achieve real concurrency in asyncio — without create_task (or gather, which uses it under the hood), await coro() simply runs one coroutine to completion before starting the next.

python
import asyncio
import time

async def work(label: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{label} done"

async def main():
    start = time.perf_counter()
    t1 = asyncio.create_task(work("A", 0.3))
    t2 = asyncio.create_task(work("B", 0.2))
    t3 = asyncio.create_task(work("C", 0.1))
    results = [await t1, await t2, await t3]
    print(results, f"in {time.perf_counter()-start:.2f}s")

asyncio.run(main())

Output:

text
['A done', 'B done', 'C done'] in 0.30s

as_completed — process results in finish order

asyncio.as_completed(awaitables) returns an iterator of awaitables; awaiting each one gives the next result in completion order, not submission order. This is useful when you want to start handling early results without waiting for the slowest.

python
import asyncio

async def task(n: int) -> int:
    await asyncio.sleep(0.3 - n * 0.1)
    return n

async def main():
    coros = [task(i) for i in range(3)]
    for coro in asyncio.as_completed(coros):
        result = await coro
        print(f"finished: {result}")

asyncio.run(main())

Output:

text
finished: 2
finished: 1
finished: 0

wait — first-completed or partial timeout

asyncio.wait(tasks, return_when=...) is the lower-level cousin of gather. It returns two sets, done and pending, and supports FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED modes plus an optional timeout. Unlike gather, it does not raise on task exceptions or on timeout; you must inspect each task yourself. Since Python 3.11 it accepts only tasks or futures, so wrap coroutines with create_task first.

python
import asyncio

async def task(n: int, delay: float):
    await asyncio.sleep(delay)
    return n

async def main():
    tasks = [asyncio.create_task(task(i, 0.1 + i * 0.1)) for i in range(3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for t in done:
        print("done:", t.result())
    for t in pending:
        t.cancel()

asyncio.run(main())

Output:

text
done: 0
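The heading also mentions partial timeouts. A minimal sketch of that use, assuming three illustrative jobs of which one is far too slow: wait() returns after the timeout without raising, and whatever has not finished comes back in pending.

```python
import asyncio

async def job(n: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return n

async def main():
    tasks = [asyncio.create_task(job(i, d)) for i, d in enumerate((0.01, 0.02, 1.0))]
    # wait() does not raise on timeout; unfinished tasks come back as pending.
    done, pending = await asyncio.wait(tasks, timeout=0.2)
    finished = sorted(t.result() for t in done)
    for t in pending:
        t.cancel()                          # wait() leaves them running otherwise
    await asyncio.gather(*pending, return_exceptions=True)  # let cancellations settle
    return finished, len(pending)

print(asyncio.run(main()))   # ([0, 1], 1)
```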

gather exception handling

gather(..., return_exceptions=False) (the default) propagates the first exception to the caller immediately, but it does not cancel the other awaitables; they keep running in the background. With return_exceptions=True it returns exceptions as values, letting you collect partial results.

python
import asyncio

async def good(n): return n
async def bad(): raise ValueError("nope")

async def main():
    results = await asyncio.gather(good(1), bad(), good(2), return_exceptions=True)
    for r in results:
        print(type(r).__name__, r)

asyncio.run(main())

Output:

text
int 1
ValueError nope
int 2
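With return_exceptions=True the result list mixes values and exception objects, so callers usually split it. One possible way to do that is sketched below; parse and the sample inputs are made up for the example.

```python
import asyncio

async def parse(text: str) -> int:
    await asyncio.sleep(0)
    return int(text)                 # raises ValueError for non-numeric input

async def main():
    inputs = ["1", "x", "3"]
    results = await asyncio.gather(*(parse(s) for s in inputs), return_exceptions=True)
    # Results keep the order of the inputs, so zip pairs each failure with its source.
    ok = [r for r in results if not isinstance(r, BaseException)]
    failed = [(s, type(r).__name__) for s, r in zip(inputs, results)
              if isinstance(r, BaseException)]
    return ok, failed

print(asyncio.run(main()))   # ([1, 3], [('x', 'ValueError')])
```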

TaskGroup (3.11+) — structured concurrency

asyncio.TaskGroup is the modern, structured-concurrency replacement for ad-hoc create_task + manual cleanup. Tasks spawned with tg.create_task() are guaranteed to complete (or be cancelled together) before the async with block exits. If any task raises, all sibling tasks are cancelled and the errors are surfaced as an ExceptionGroup. Prefer TaskGroup over gather for any non-trivial concurrent workflow on Python 3.11+.

python
import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name}-result"

async def main():
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(fetch("alpha", 0.1))
        t2 = tg.create_task(fetch("beta",  0.2))
        t3 = tg.create_task(fetch("gamma", 0.05))
    # All tasks have completed at this point
    print([t1.result(), t2.result(), t3.result()])

asyncio.run(main())

Output:

text
['alpha-result', 'beta-result', 'gamma-result']

If one task fails, the group cancels the rest and re-raises every error as an ExceptionGroup:

python
import asyncio

async def fail(): raise ValueError("boom")
async def slow(): await asyncio.sleep(5)

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fail())
            tg.create_task(slow())
    except* ValueError as eg:
        for e in eg.exceptions:
            print("caught:", e)

asyncio.run(main())

Output:

text
caught: boom

The except* syntax (3.11+) matches one or more exceptions inside an ExceptionGroup.

Cancellation and CancelledError

Cancellation is cooperative: calling task.cancel() schedules a CancelledError to be raised at the task's next await. The task can catch it, run cleanup, and re-raise, or (rarely, and only for very good reason) suppress it. Never silently swallow CancelledError — doing so breaks the cancellation contract and can leave the event loop unable to shut down cleanly.

python
import asyncio

async def cancellable():
    try:
        print("started")
        await asyncio.sleep(5)
        print("never reached")
    except asyncio.CancelledError:
        print("cleaning up")
        raise           # always re-raise!

async def main():
    task = asyncio.create_task(cancellable())
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("confirmed cancelled")

asyncio.run(main())

Output:

text
started
cleaning up
confirmed cancelled

Shielding from cancellation

asyncio.shield(awaitable) protects a coroutine from being cancelled by an outer cancel. Useful for "must-finish" cleanup work (e.g. flushing a buffer, closing a transaction) that should complete even if the caller times out.

python
import asyncio

async def critical_flush():
    await asyncio.sleep(0.2)
    return "flushed"

async def main():
    # create_task needs a coroutine; shield() returns a Future, so it is applied at the await below
    task = asyncio.create_task(critical_flush())
    try:
        await asyncio.wait_for(asyncio.shield(task), timeout=0.05)
    except asyncio.TimeoutError:
        print("outer timed out, but shielded task continues")
    print("final:", await task)

asyncio.run(main())

Output:

text
outer timed out, but shielded task continues
final: flushed

asyncio.timeout — context-manager timeouts (3.11+)

async with asyncio.timeout(seconds): is the modern replacement for wait_for. It applies the deadline to everything inside the block, not a single awaitable, and integrates cleanly with TaskGroup. Outside the block, the deadline is automatically cleared.

python
import asyncio

async def main():
    try:
        async with asyncio.timeout(0.1):
            await asyncio.sleep(0.05)   # fine
            await asyncio.sleep(0.5)    # raises
    except TimeoutError:
        print("block exceeded 100ms")

asyncio.run(main())

Output:

text
block exceeded 100ms

Use asyncio.timeout_at(deadline) to set an absolute deadline (loop time, not wall clock) instead of a relative one.

Async iterators and generators

An async iterator implements __aiter__ and __anext__; you consume it with async for. An async generator is the same thing built from async def + yield. They let you stream values from an awaitable source (network, database cursor, paginated API) without buffering the whole result in memory.

python
import asyncio

async def ticks(n: int):
    for i in range(n):
        await asyncio.sleep(0.05)
        yield i

async def main():
    async for value in ticks(3):
        print("got", value)

asyncio.run(main())

Output:

text
got 0
got 1
got 2

Async comprehensions work too:

python
result = [v async for v in ticks(3)]

Don't iterate a regular generator with async for; you'll get TypeError: 'async for' requires an object with __aiter__ method. Use a plain for with ordinary iterables and async for only with async ones.
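For comparison with the generator form, here is a sketch of the same protocol written by hand as a class. Countdown is a hypothetical example type; the sleep stands in for awaiting real I/O.

```python
import asyncio

class Countdown:
    """Async iterator that yields start, start-1, ..., 1."""

    def __init__(self, start: int, delay: float = 0.01) -> None:
        self.current = start
        self.delay = delay

    def __aiter__(self):
        return self                      # the object is its own iterator

    async def __anext__(self) -> int:
        if self.current <= 0:
            raise StopAsyncIteration     # ends the async for loop
        await asyncio.sleep(self.delay)  # stand-in for awaiting real I/O
        value = self.current
        self.current -= 1
        return value

async def main() -> list:
    return [n async for n in Countdown(3)]

print(asyncio.run(main()))   # [3, 2, 1]
```

The async generator version is shorter; the class form is mostly useful when the iterator needs extra state or methods.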

Async context managers

A class with __aenter__ and __aexit__ is an async context manager, consumed with async with. Library objects like aiohttp.ClientSession, httpx.AsyncClient, and an asyncpg pool (asyncpg.create_pool()) are async context managers; entering them with async with ensures setup and teardown run even when the body raises.

python
import asyncio

class AsyncResource:
    async def __aenter__(self):
        await asyncio.sleep(0.05)
        print("acquired")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0.05)
        print("released")

async def main():
    async with AsyncResource() as r:
        print("using resource")

asyncio.run(main())

Output:

text
acquired
using resource
released

The @contextlib.asynccontextmanager decorator turns an async generator into the same shape with less boilerplate:

python
from contextlib import asynccontextmanager

@asynccontextmanager
async def resource():
    print("setup")
    try:
        yield "the value"
    finally:
        print("teardown")
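A self-contained usage sketch of the decorator follows. The events list and the "db" name are assumptions added so the order of setup, body and teardown can be inspected.

```python
import asyncio
from contextlib import asynccontextmanager

events = []

@asynccontextmanager
async def resource(name: str):
    events.append(f"setup {name}")
    await asyncio.sleep(0)               # setup may await
    try:
        yield name.upper()               # value bound by "as"
    finally:
        await asyncio.sleep(0)           # teardown may await too
        events.append(f"teardown {name}")

async def main() -> list:
    events.clear()
    async with resource("db") as handle:
        events.append(f"using {handle}")
    return list(events)

print(asyncio.run(main()))   # ['setup db', 'using DB', 'teardown db']
```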

Synchronisation primitives — Lock, Semaphore, Event, Condition

asyncio re-implements the standard locking primitives as async-aware versions. The key difference from threading: these primitives are not thread-safe — they coordinate coroutines on a single event loop, not threads.

Lock — mutual exclusion across coroutines

asyncio.Lock ensures only one coroutine holds the lock at a time. Useful when multiple coroutines mutate shared state (a counter, a list, a connection cache).

python
import asyncio

lock = asyncio.Lock()
counter = 0

async def increment(n: int):
    global counter
    async with lock:
        local = counter
        await asyncio.sleep(0)   # simulate context switch
        counter = local + n

async def main():
    await asyncio.gather(*[increment(1) for _ in range(100)])
    print("counter:", counter)

asyncio.run(main())

Output:

text
counter: 100

Semaphore — bound concurrency (rate limiting)

asyncio.Semaphore(N) lets at most N coroutines hold the semaphore at once. This is the canonical way to cap concurrent outbound HTTP requests, database connections, or any rate-limited resource.

python
import asyncio

sem = asyncio.Semaphore(3)

async def request(n: int):
    async with sem:
        print(f"request {n} running (active ≤ 3)")
        await asyncio.sleep(0.05)

async def main():
    await asyncio.gather(*[request(i) for i in range(10)])

asyncio.run(main())

Output:

text
request 0 running (active ≤ 3)
request 1 running (active ≤ 3)
request 2 running (active ≤ 3)
request 3 running (active ≤ 3)
...

Event — broadcast a one-shot signal

asyncio.Event is a simple flag: coroutines await event.wait() to block until someone calls event.set(), which wakes every waiter at once. The flag stays set until clear() is called. Useful for coordinating start/stop signals across many tasks.

python
import asyncio

started = asyncio.Event()

async def worker(name: str):
    print(f"{name} waiting…")
    await started.wait()
    print(f"{name} go!")

async def main():
    workers = [asyncio.create_task(worker(f"w{i}")) for i in range(3)]
    await asyncio.sleep(0.1)
    started.set()
    await asyncio.gather(*workers)

asyncio.run(main())

Output:

text
w0 waiting…
w1 waiting…
w2 waiting…
w0 go!
w1 go!
w2 go!
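The section heading lists Condition as well. A minimal sketch, assuming a shared list guarded by the condition: waiters use wait_for with a predicate, and the producer changes the state and notifies while holding the lock. The consumer and producer names are illustrative.

```python
import asyncio

async def consumer(cond: asyncio.Condition, items: list, received: list) -> None:
    async with cond:
        # wait_for releases the lock while waiting and re-checks the predicate after each notify
        await cond.wait_for(lambda: len(items) > 0)
        received.append(items.pop(0))

async def producer(cond: asyncio.Condition, items: list) -> None:
    await asyncio.sleep(0.01)
    async with cond:
        items.extend(["a", "b"])
        cond.notify_all()                # wake every waiter; each re-acquires the lock in turn

async def main() -> list:
    cond = asyncio.Condition()
    items: list = []
    received: list = []
    await asyncio.gather(
        consumer(cond, items, received),
        consumer(cond, items, received),
        producer(cond, items),
    )
    return received

print(asyncio.run(main()))   # ['a', 'b']
```

For plain hand-off of items an asyncio.Queue is usually simpler; Condition fits cases where waiters depend on arbitrary shared state.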

Async subprocess and streams

asyncio.create_subprocess_exec is the async cousin of subprocess.Popen — fire dozens of commands without blocking the loop. Each process's stdin/stdout/stderr is an async stream you can read line-by-line or in chunks. See subprocess for the blocking equivalent.

python
import asyncio

async def run(*args: str) -> str:
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )
    out, _ = await proc.communicate()
    return out.decode().rstrip()

async def main():
    versions = await asyncio.gather(
        run("python3", "--version"),
        run("git", "--version"),
        run("uname", "-s"),
    )
    for v in versions:
        print(v)

asyncio.run(main())

Output:

text
Python 3.12.4
git version 2.43.0
Darwin
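communicate() collects all output at once. To read a process's stdout line by line as it arrives, the stream can be iterated with async for. The sketch below spawns the current Python interpreter (sys.executable) with a tiny inline script, which is an assumption made so the example runs without external commands.

```python
import asyncio
import sys

CHILD_CODE = "for i in range(3): print('line', i, flush=True)"

async def stream_lines() -> list:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD_CODE,
        stdout=asyncio.subprocess.PIPE,
    )
    lines = []
    assert proc.stdout is not None          # set because stdout=PIPE was requested
    async for raw in proc.stdout:           # StreamReader yields one line at a time
        lines.append(raw.decode().rstrip())
    await proc.wait()                       # reap the process
    return lines

print(asyncio.run(stream_lines()))   # ['line 0', 'line 1', 'line 2']
```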

Streaming TCP with asyncio.open_connection

asyncio.open_connection(host, port) returns a (StreamReader, StreamWriter) pair — high-level wrappers around an asyncio transport. Use them to talk plain TCP without writing a protocol class.

python
import asyncio

async def fetch_http(host: str, path: str) -> str:
    reader, writer = await asyncio.open_connection(host, 80)
    writer.write(f"GET {path} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode())
    await writer.drain()
    body = await reader.read()
    writer.close()
    await writer.wait_closed()
    return body.decode(errors="replace")

async def main():
    response = await fetch_http("example.com", "/")
    print(response.splitlines()[0])

asyncio.run(main())

Output:

text
HTTP/1.0 200 OK
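The same reader/writer pair appears on the server side via asyncio.start_server. Below is a sketch of a local round trip that needs no external network; the upper-casing handler and the use of port 0 (letting the OS pick a free port) are choices made for this example.

```python
import asyncio

async def handle_echo(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    data = await reader.readline()
    writer.write(data.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main() -> str:
    server = await asyncio.start_server(handle_echo, "127.0.0.1", 0)  # port 0: OS picks
    port = server.sockets[0].getsockname()[1]
    async with server:                      # closes the server on exit
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(b"hello\n")
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply.decode().strip()

print(asyncio.run(main()))   # HELLO
```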

Integration with async libraries

asyncio is the foundation, but real applications use higher-level libraries on top of it. Each plays the same role as a sync equivalent, with async/await calls instead of blocking ones.

| Sync | Async | Use case |
|---|---|---|
| requests | httpx (AsyncClient) | HTTP client |
| requests | aiohttp.ClientSession | HTTP client (older, mature) |
| psycopg2 | asyncpg | PostgreSQL driver |
| pymysql | aiomysql | MySQL driver |
| redis-py | redis.asyncio | Redis client |
| boto3 | aioboto3 | AWS SDK |
| open() | aiofiles | File I/O (use sparingly; disks are usually fast) |
| time.sleep | asyncio.sleep | Delay |
| subprocess.run | asyncio.create_subprocess_exec | Spawn process |

httpx.AsyncClient example

python
import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str) -> int:
    r = await client.get(url, timeout=10)
    return r.status_code

async def main():
    urls = ["https://httpbin.org/get", "https://example.com", "https://python.org"]
    async with httpx.AsyncClient() as client:
        codes = await asyncio.gather(*(fetch(client, u) for u in urls))
    for u, c in zip(urls, codes):
        print(f"{c} {u}")

asyncio.run(main())

Output:

text
200 https://httpbin.org/get
200 https://example.com
200 https://python.org

asyncpg example

python
import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect("postgresql://alice@localhost/mydb")
    rows = await conn.fetch("SELECT id, name FROM users LIMIT 3")
    for row in rows:
        print(dict(row))
    await conn.close()

asyncio.run(main())

Output:

text
{'id': 1, 'name': 'Alice'}
{'id': 2, 'name': 'Bob'}
{'id': 3, 'name': 'Carol'}

asyncio vs threads vs concurrent.futures

Pick the right model for the job — they solve different problems and don't compose cleanly.

| Model | Good for | Bad for | Notes |
|---|---|---|---|
| asyncio | Many I/O-bound tasks, network-heavy apps | CPU-bound work; blocking libraries | Single-threaded, cooperative. Needs async libraries. |
| threading / ThreadPoolExecutor | I/O-bound with blocking libraries | CPU-bound (GIL) | Pre-emptive; works with any sync code. |
| multiprocessing / ProcessPoolExecutor | CPU-bound | Anything that's not pickleable | Real parallelism; serialization overhead. |
| asyncio.to_thread | Calling one blocking function from async | Heavy CPU work | Wraps ThreadPoolExecutor for async callers. |

When async code does need CPU-bound work, loop.run_in_executor can hand it to a process pool:

python
import asyncio
import concurrent.futures
import time

def cpu_work(n: int) -> int:
    total = 0
    for i in range(n):
        total += i * i
    return total

async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        start = time.perf_counter()
        results = await asyncio.gather(*(
            loop.run_in_executor(pool, cpu_work, 1_000_000)
            for _ in range(4)
        ))
        print(f"{len(results)} results in {time.perf_counter()-start:.2f}s")

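# The guard matters wherever worker processes re-import this module (e.g. the spawn start method on macOS and Windows)
if __name__ == "__main__":
    asyncio.run(main())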

Output:

text
4 results in 0.18s

Anti-patterns and pitfalls

Sync-over-async — calling asyncio.run(coro()) deep inside a sync function inside a running loop will raise RuntimeError: asyncio.run() cannot be called from a running event loop. If you need to bridge, restructure your code to be async all the way, or use asyncio.run_coroutine_threadsafe(coro, loop) from a different thread.

Blocking the loop with sync I/O — requests.get, time.sleep, a psycopg2 cursor.execute, even slow open() calls stop every coroutine on the loop. Use the async equivalent or wrap with asyncio.to_thread. Symptoms: heartbeats stop firing, websockets disconnect, throughput collapses.

CPU-bound work in async code — a 200 ms NumPy call blocks the loop for 200 ms. Hand it off to a ProcessPoolExecutor or move it out of the async hot path.

Floating tasks — asyncio.create_task(coro()) without storing the result leaves only the event loop's weak reference, so the task can be garbage-collected mid-run. Always keep a reference (in a list, set, or TaskGroup) until the task finishes.

Swallowing CancelledError — before Python 3.8, CancelledError was a subclass of Exception, so except Exception: swallowed it. From 3.8 on it derives from BaseException, so the danger is a bare except: or except BaseException:. If you catch it for cleanup, re-raise it: except asyncio.CancelledError: ... raise.

Don't use loop.run_until_complete in modern code. Use asyncio.run for scripts and asyncio.TaskGroup/asyncio.gather inside async functions.

Debug mode is your friend. asyncio.run(main(), debug=True) logs callbacks that run longer than 100 ms and coroutines that were never awaited, among other checks. Setting PYTHONASYNCIODEBUG=1 in the environment enables the same mode without code changes.
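For the sync-over-async case above, one way to bridge is to run the event loop in its own thread and submit coroutines to it with asyncio.run_coroutine_threadsafe. This is a sketch under that assumption; start_background_loop, sync_caller and compute are hypothetical names.

```python
import asyncio
import threading

async def compute(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2

def start_background_loop() -> asyncio.AbstractEventLoop:
    """Start an event loop in a daemon thread and return it."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop

def sync_caller(loop: asyncio.AbstractEventLoop) -> int:
    # Plain synchronous code: hand the coroutine to the loop's thread and block
    # this thread (not the loop) until the result arrives.
    future = asyncio.run_coroutine_threadsafe(compute(21), loop)
    return future.result(timeout=1)

loop = start_background_loop()
print(sync_caller(loop))                  # 42
loop.call_soon_threadsafe(loop.stop)      # ask the loop to stop from outside its thread
```

The returned object is a concurrent.futures.Future, so result() blocks the calling thread; never call it from the loop's own thread.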

Real-world recipes

Bounded fan-out — fetch N URLs with a concurrency cap

The single most common asyncio pattern: hit a few hundred URLs concurrently, but no more than 10 at a time. A Semaphore enforces the cap; gather collects every result.

python
import asyncio
import httpx

async def fetch_one(client: httpx.AsyncClient, sem: asyncio.Semaphore, url: str):
    async with sem:
        try:
            r = await client.get(url, timeout=10)
            return url, r.status_code
        except Exception as e:
            return url, f"error: {e}"

async def fetch_all(urls: list[str], concurrency: int = 10):
    sem = asyncio.Semaphore(concurrency)
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(*(fetch_one(client, sem, u) for u in urls))

async def main():
    urls = [f"https://httpbin.org/status/{c}" for c in (200, 200, 404, 500, 200)]
    for url, status in await fetch_all(urls, concurrency=3):
        print(status, url)

asyncio.run(main())

Output:

text
200 https://httpbin.org/status/200
200 https://httpbin.org/status/200
404 https://httpbin.org/status/404
500 https://httpbin.org/status/500
200 https://httpbin.org/status/200

Retry with exponential back-off

A reusable with_retry() wrapper that catches asyncio.TimeoutError and ConnectionError (extend the tuple for your own exception types), sleeps with exponential back-off plus a little random jitter, and re-raises on the last attempt.

python
import asyncio
import random

async def with_retry(coro_fn, *args, attempts=5, base=0.1, max_delay=2.0):
    for n in range(attempts):
        try:
            return await coro_fn(*args)
        except (asyncio.TimeoutError, ConnectionError) as e:
            if n == attempts - 1:
                raise
            delay = min(base * (2 ** n), max_delay) + random.random() * 0.05
            print(f"attempt {n+1} failed ({e}); retrying in {delay:.2f}s")
            await asyncio.sleep(delay)

async def flaky(): 
    if random.random() < 0.7:
        raise ConnectionError("network blip")
    return "ok"

async def main():
    random.seed(7)
    result = await with_retry(flaky, attempts=5, base=0.05)
    print("result:", result)

asyncio.run(main())

Output (illustrative; the number of retries and the exact delays depend on the random draws):

text
attempt 1 failed (network blip); retrying in 0.05s
attempt 2 failed (network blip); retrying in 0.13s
result: ok

Heartbeat task alongside main work

Spawn a background "heartbeat" that ticks every second for liveness, run the real work concurrently, and cancel the heartbeat cleanly when the work is done.

python
import asyncio

async def heartbeat(interval: float = 1.0):
    try:
        n = 0
        while True:
            await asyncio.sleep(interval)
            n += 1
            print(f"  ♥ tick {n}")
    except asyncio.CancelledError:
        print("  ♥ stopped")
        raise

async def real_work():
    await asyncio.sleep(2.5)
    return "work complete"

async def main():
    hb = asyncio.create_task(heartbeat(1.0))
    try:
        print(await real_work())
    finally:
        hb.cancel()
        try:
            await hb
        except asyncio.CancelledError:
            pass

asyncio.run(main())

Output:

text
  ♥ tick 1
  ♥ tick 2
work complete
  ♥ stopped

Producer / consumer pool with TaskGroup

A TaskGroup-managed pool: one producer, N consumers, all coordinated through an asyncio.Queue. The producer enqueues one sentinel per consumer so each consumer exits on its own; the TaskGroup waits for all of them and, if any task fails, cancels the rest.

python
import asyncio
import random

SENTINEL = object()

async def producer(queue: asyncio.Queue, items: int, workers: int):
    for i in range(items):
        await queue.put(i)
        await asyncio.sleep(random.uniform(0.0, 0.02))
    for _ in range(workers):
        await queue.put(SENTINEL)

async def consumer(name: str, queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is SENTINEL:
            queue.task_done()
            return
        await asyncio.sleep(0.05)            # simulate work
        print(f"{name} processed {item}")
        queue.task_done()

async def main():
    queue: asyncio.Queue = asyncio.Queue(maxsize=4)
    workers = 3
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, items=6, workers=workers))
        for i in range(workers):
            tg.create_task(consumer(f"c{i}", queue))

asyncio.run(main())

Output:

text
c0 processed 0
c1 processed 1
c2 processed 2
c0 processed 3
c1 processed 4
c2 processed 5

Periodic task with cancellation

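Run a function every N seconds until cancelled, the asyncio version of setInterval. The except Exception handler keeps one failed call from ending the loop, while CancelledError (a BaseException since Python 3.8) still propagates so that task.cancel() stops it.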

python
import asyncio

async def every(interval: float, fn):
    while True:
        await asyncio.sleep(interval)
        try:
            await fn()
        except Exception as e:
            print(f"periodic failed: {e}")

async def ping():
    print("ping")

async def main():
    task = asyncio.create_task(every(0.1, ping))
    await asyncio.sleep(0.35)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())

Output:

text
ping
ping
ping

Quick reference

| Pattern | Code |
|---|---|
| Entry point | asyncio.run(main()) |
| Sleep | await asyncio.sleep(seconds) |
| Run concurrently | await asyncio.gather(coro1(), coro2()) |
| Structured concurrency (3.11+) | async with asyncio.TaskGroup() as tg: tg.create_task(coro()) |
| Background task | t = asyncio.create_task(coro()) |
| Timeout (single awaitable) | await asyncio.wait_for(coro(), timeout=5) |
| Timeout (block, 3.11+) | async with asyncio.timeout(5): |
| Shield from cancellation | await asyncio.shield(coro()) |
| Run blocking fn | await asyncio.to_thread(fn, arg) |
| Run CPU work | await loop.run_in_executor(pool, fn, arg) |
| Current event loop | asyncio.get_running_loop() |
| Queue | asyncio.Queue(maxsize=N) |
| Semaphore (limit concurrency) | asyncio.Semaphore(10) |
| Lock | lock = asyncio.Lock(), then async with lock: |
| Event | asyncio.Event(), .set(), await .wait() |
| Async iterator | async for x in stream: |
| Async context manager | async with resource() as r: |
| As completed | for c in asyncio.as_completed(coros): await c |
| First done | await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) |
| Subprocess | await asyncio.create_subprocess_exec(*args) |
| Open TCP | await asyncio.open_connection(host, port) |

See also

  • subprocess — blocking process spawning; async equivalent is create_subprocess_exec
  • httpx — async HTTP client built on asyncio
  • fastapi — async web framework; endpoints can be written as coroutines
  • tqdm — tqdm.asyncio.tqdm.gather for progress bars on async work