cheat sheet

prefect

Build, schedule, and observe Python workflows with Prefect. Covers flows, tasks, retries, schedules, deployments, caching, concurrency, and Prefect Cloud.

prefect — Workflow Orchestration

What it is

Prefect is a Python workflow orchestration framework where you decorate ordinary Python functions with @flow and @task to get scheduling, retries, caching, logging, observability, and a UI — all with minimal boilerplate. Unlike Dagster's asset-centric model, Prefect is function-centric: you write imperative Python that calls tasks, and Prefect tracks the execution graph at runtime. It is the most approachable orchestrator for Python developers who want to "add orchestration" to existing scripts with minimal refactoring.

Install

bash
pip install prefect

Output: pip prints download and install progress and exits 0 on success.

Quick example

python
from prefect import flow, task

@task
def fetch_data(url: str) -> list[dict]:
    import httpx
    return httpx.get(url).json()

@task
def process(records: list[dict]) -> list[dict]:
    return [r for r in records if r.get("active")]

@flow(name="My Pipeline")
def pipeline(url: str) -> list[dict]:
    raw = fetch_data(url)
    return process(raw)

if __name__ == "__main__":
    # The endpoint must return a JSON array of objects; none of these has "active".
    result = pipeline("https://jsonplaceholder.typicode.com/users")
    print(result)

Output:

text
15:42:01.234 | INFO    | prefect.engine - Created flow run 'nimble-falcon' for flow 'My Pipeline'
15:42:01.401 | INFO    | Task run 'fetch_data' - Finished in state Completed()
15:42:01.412 | INFO    | Task run 'process' - Finished in state Completed()
15:42:01.420 | INFO    | Flow run 'nimble-falcon' - Finished in state Completed()
[]

When / why to use it

  • Adding retries, logging, and scheduling to existing Python scripts with two decorators.
  • ETL pipelines that need caching (skip re-running tasks whose inputs haven't changed).
  • Data workflows that run on a cron or are triggered by an event.
  • Teams that want a UI (local or Prefect Cloud) to monitor runs without setting up Airflow.
  • Concurrent fan-out: submit dozens of tasks concurrently, wait for them with wait(), and read values with .result().

Common pitfalls

Mutating state across tasks — tasks should be pure functions. Passing mutable objects between tasks and modifying them in place breaks Prefect's state tracking and caching. Return new objects instead.
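
To make the mutation pitfall concrete, here is a minimal plain-Python sketch (no Prefect import, so it runs anywhere). In a real flow both functions would carry the @task decorator; the function names and the sample record are hypothetical.

```python
def normalise_in_place(records: list[dict]) -> list[dict]:
    # Anti-pattern: mutates the caller's objects, so the upstream result changes too.
    for r in records:
        r["name"] = r["name"].strip().lower()
    return records


def normalise(records: list[dict]) -> list[dict]:
    # Preferred: build new dicts and leave the input untouched.
    return [{**r, "name": r["name"].strip().lower()} for r in records]


raw = [{"name": "  Ada "}]
clean = normalise(raw)
print(raw)    # input is unchanged
print(clean)  # new list holding the cleaned value
```

Because normalise never touches its argument, the value recorded for the upstream task still matches what downstream tasks actually received.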

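@task functions called outside a @flow: behaviour depends on the Prefect version. In 2.x this raises a RuntimeError unless you call task.fn(...); in 3.x the task runs and is tracked as a standalone task run, but it is not part of any flow run, so flow-level retries, hooks, and parameters do not apply. Call tasks from within a flow when you want them orchestrated together.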

Large return values: when results are persisted, Prefect serialises task return values to result storage (local disk by default, or a configured storage block). Returning a 2 GB DataFrame from a task causes slow serialisation and potential OOM. Write the data to files or external storage and return a path/URI instead.
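
A minimal sketch of the return-a-path pattern, written in plain Python so it runs without a Prefect server. The function names and the rows.json filename are hypothetical, and a temporary directory stands in for storage that every worker can reach (an object-store URI, for example).

```python
import json
import tempfile
from pathlib import Path


def extract_to_file(rows: list[dict], out_dir: Path) -> str:
    # Write the bulky payload to disk and hand back only its location.
    path = out_dir / "rows.json"
    path.write_text(json.dumps(rows))
    return str(path)


def count_rows(path: str) -> int:
    # The downstream step reloads the data from the path it was given.
    return len(json.loads(Path(path).read_text()))


with tempfile.TemporaryDirectory() as tmp:
    location = extract_to_file([{"id": i} for i in range(1000)], Path(tmp))
    n_rows = count_rows(location)

print(n_rows)
```

Only the short path string crosses the task boundary, so the result stays small however large the payload grows.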

task.submit() submits a task asynchronously and returns a PrefectFuture. Collect results with .result(). This is the idiomatic way to parallelise tasks in a flow.

Use flow.serve() to run a long-lived deployment locally without Prefect Cloud or a separate worker: pipeline.serve(name="local-deploy", cron="0 6 * * *").

Retries and timeouts

Prefect handles retries and timeouts at both the task and flow level with decorator parameters.

python
from prefect import flow, task

@task(
    retries=3,
    retry_delay_seconds=10,
    timeout_seconds=30,
    log_prints=True,
)
def unreliable_api_call(endpoint: str) -> dict:
    import random
    if random.random() < 0.5:
        raise ConnectionError("Simulated network failure")
    print(f"Called {endpoint} successfully")
    return {"status": "ok"}

@flow(
    retries=1,
    timeout_seconds=120,
    log_prints=True,
)
def robust_pipeline():
    result = unreliable_api_call("https://api.example.com/data")
    print(f"Result: {result}")

robust_pipeline()

Output (with retry):

text
INFO | Task run 'unreliable_api_call' - Retrying in 10.0 seconds...
INFO | Task run 'unreliable_api_call' - Called https://api.example.com/data successfully
INFO | Task run 'unreliable_api_call' - Finished in state Completed()
INFO | Result: {'status': 'ok'}
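
retry_delay_seconds also accepts a list of per-attempt delays, which allows backoff instead of a fixed wait. The helper below is a plain-Python sketch of how such a list could be built; backoff_delays, its defaults, and the cap are hypothetical and not part of Prefect (Prefect ships its own exponential_backoff helper in prefect.tasks, whose exact behaviour is worth checking for your version).

```python
def backoff_delays(
    retries: int, base_seconds: float = 10.0, cap_seconds: float = 300.0
) -> list[float]:
    # Double the delay on each attempt, never exceeding the cap.
    return [min(base_seconds * 2**attempt, cap_seconds) for attempt in range(retries)]


delays = backoff_delays(retries=3)
print(delays)  # [10.0, 20.0, 40.0]
```

The resulting list could then be passed as @task(retries=3, retry_delay_seconds=delays).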

Caching — skip re-running unchanged tasks

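The cache_key_fn parameter tells Prefect when two task calls are equivalent so that the cached result can be reused. task_input_hash builds the cache key from the task's identity (its key and code) and all input arguments, so the same task called with the same inputs hits the cache until cache_expiration elapses.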

python
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import time

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    log_prints=True,
)
def expensive_query(sql: str) -> list[dict]:
    print(f"Running query: {sql}")
    time.sleep(2)  # simulate slow DB call
    return [{"result": 42}]

@flow
def pipeline():
    # First call — runs the query
    r1 = expensive_query("SELECT * FROM big_table")
    # Second call with same input — returns cached result instantly
    r2 = expensive_query("SELECT * FROM big_table")
    print(r1, r2)

pipeline()

Output:

text
INFO | Running query: SELECT * FROM big_table   ← runs once
INFO | [{'result': 42}] [{'result': 42}]
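
The idea behind an input-hash cache key can be sketched in plain Python: serialise the task identity and its arguments deterministically, then hash the result. This is an illustration only; input_cache_key is a hypothetical helper, and Prefect's task_input_hash uses its own serialisation.

```python
import hashlib
import json


def input_cache_key(task_name: str, **arguments) -> str:
    # sort_keys makes the key independent of keyword-argument order.
    payload = json.dumps({"task": task_name, "args": arguments}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


k1 = input_cache_key("expensive_query", sql="SELECT * FROM big_table")
k2 = input_cache_key("expensive_query", sql="SELECT * FROM big_table")
k3 = input_cache_key("expensive_query", sql="SELECT 1")
print(k1 == k2)  # True: same inputs, same key, cached result reused
print(k1 == k3)  # False: different input, the task runs again
```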

Parallel task submission

task.submit() returns a PrefectFuture immediately without blocking. Collect all futures at the end with wait() or iterate to call .result().

python
from prefect import flow, task
from prefect.futures import wait
import time

@task(log_prints=True)
def process_item(item: int) -> int:
    time.sleep(0.5)
    return item ** 2

@flow
def parallel_pipeline(items: list[int]) -> list[int]:
    futures = [process_item.submit(item) for item in items]
    wait(futures)
    return [f.result() for f in futures]

result = parallel_pipeline(list(range(10)))
print(result)

Output:

text
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
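
The submit-then-collect shape mirrors the standard library's concurrent.futures API, which may help if that module is already familiar. The sketch below uses ThreadPoolExecutor purely as an analogy (it is not Prefect code) and shows that reading futures in submission order preserves input order even when work finishes out of order. square_slowly is a hypothetical stand-in for a task.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def square_slowly(item: int) -> int:
    # Later items sleep less, so they tend to finish first.
    time.sleep(0.05 * (3 - item))
    return item**2


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(square_slowly, i) for i in range(4)]
    wait(futures)  # block until every future is done
    squares = [f.result() for f in futures]

print(squares)  # [0, 1, 4, 9]
```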

Subflows — flows calling flows

Flows can call other flows. The sub-flow runs as a nested tracked unit with its own run ID and logs.

python
from prefect import flow, task

@task
def validate(data: list) -> bool:
    return len(data) > 0

@flow(name="transform")
def transform_flow(data: list) -> list:
    if validate(data):
        return [x * 2 for x in data]
    return []

@flow(name="main")
def main_flow():
    raw = [1, 2, 3, 4, 5]
    result = transform_flow(raw)   # sub-flow call
    print(f"Transformed: {result}")

main_flow()

Output:

text
INFO | Created flow run 'crimson-hawk' for flow 'main'
INFO | Created subflow run for flow 'transform'
INFO | Transformed: [2, 4, 6, 8, 10]

Schedules

Attach a cron or interval schedule to a flow at serve() or deploy() time.

python
from prefect import flow
from prefect.client.schemas.schedules import CronSchedule
from datetime import timedelta  # used by the interval example

@flow(log_prints=True)
def daily_report():
    print("Generating daily report...")

# Local serve: runs the flow on schedule in the foreground process
if __name__ == "__main__":
    daily_report.serve(
        name="daily-report",
        # 08:00 daily in New York time. serve() takes no timezone argument of
        # its own (as of Prefect 3.x), so the zone is set on the schedule object.
        schedules=[CronSchedule(cron="0 8 * * *", timezone="America/New_York")],
    )

For interval scheduling:

python
daily_report.serve(
    name="every-6h",
    interval=timedelta(hours=6),
)

Deployments — production scheduling

A deployment registers a flow with the Prefect server so it can be scheduled, triggered via API, or run by a worker.

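python
# deploy.py
from pathlib import Path
from prefect import flow

@flow
def my_pipeline(env: str = "prod"):
    print(f"Running in {env}")

if __name__ == "__main__":
    # A process work pool must know where the flow code lives; without an
    # image, deploy() needs a source such as a local path or a git URL.
    flow.from_source(
        source=str(Path(__file__).parent),
        entrypoint="deploy.py:my_pipeline",
    ).deploy(
        name="prod-pipeline",
        work_pool_name="local-process-pool",
        cron="0 6 * * *",
        parameters={"env": "prod"},
    )

bash
# Create a local work pool and register the deployment
prefect work-pool create --type process local-process-pool
python deploy.py
# Start the worker last: it stays in the foreground and polls the pool
prefect worker start --pool local-process-pool

Output: deploy.py prints a confirmation for the new deployment; prefect worker start keeps running until interrupted.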

Logging and state hooks

python
from prefect import flow, task
from prefect.states import State

@task(log_prints=True)
def my_task():
    print("task running")
    return 42

def on_failure(flow, flow_run, state):
    print(f"Flow {flow.name} failed: {state.message}")

def on_completion(flow, flow_run, state):
    print(f"Flow {flow.name} completed successfully")

@flow(
    on_failure=[on_failure],
    on_completion=[on_completion],
    log_prints=True,
)
def instrumented_flow():
    result = my_task()
    print(f"Result: {result}")

instrumented_flow()

Concurrency limits

Use the concurrency context manager, backed by a named global concurrency limit, to cap how many tasks run a guarded section simultaneously and prevent resource exhaustion.

python
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def api_call(endpoint: str) -> dict:
    with concurrency("external-api", occupy=1):
        import httpx
        return httpx.get(endpoint).json()

@flow
def batch_requests(endpoints: list[str]) -> list[dict]:
    futures = [api_call.submit(ep) for ep in endpoints]
    return [f.result() for f in futures]

Create the global concurrency limit once (the concurrency() context manager reads global limits, not tag-based ones):

bash
prefect gcl create external-api --limit 5

Output: a short confirmation that the limit was created.
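
Conceptually, a concurrency limit behaves like a counting semaphore shared by all callers. The plain-Python sketch below is an analogy, not Prefect's implementation: it caps in-flight work at a hypothetical LIMIT of 2 inside one process and records the peak, whereas a Prefect limit lives on the server and is enforced across processes and machines.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

LIMIT = 2  # hypothetical cap, standing in for the server-side limit
slots = threading.Semaphore(LIMIT)
lock = threading.Lock()
stats = {"in_flight": 0, "peak": 0}


def guarded_call(i: int) -> int:
    with slots:  # blocks while LIMIT calls are already inside
        with lock:
            stats["in_flight"] += 1
            stats["peak"] = max(stats["peak"], stats["in_flight"])
        time.sleep(0.02)  # simulate the external request
        with lock:
            stats["in_flight"] -= 1
    return i


with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(guarded_call, range(8)))

print(stats["peak"])  # never exceeds LIMIT
```

Eight threads compete, yet the recorded peak stays at or below the cap, which is the guarantee the limit is meant to provide.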

Starting the UI

bash
prefect server start      # local server + UI at http://localhost:4200

Output: a startup banner with the API and UI addresses; the process stays in the foreground until you stop it with Ctrl+C.

Real-world recipes

End-to-end Prefect flows that exercise the most common production patterns: parameterised deployments, conditional branching, and recover-from-state retries.

1. Parameterised ETL with retries and caching

python
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(retries=3, retry_delay_seconds=10, cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=6))
def extract(source: str) -> list[dict]:
    import httpx
    r = httpx.get(source, timeout=10)
    r.raise_for_status()
    return r.json()

@task
def transform(rows: list[dict], min_score: int) -> list[dict]:
    return [r for r in rows if r.get("score", 0) >= min_score]

@task(retries=2)
def load(rows: list[dict], dest: str) -> int:
    import json, pathlib
    pathlib.Path(dest).write_text(json.dumps(rows, indent=2))
    return len(rows)

@flow(name="etl", log_prints=True)
def etl(source: str, dest: str = "out.json", min_score: int = 50) -> int:
    raw = extract(source)
    cleaned = transform(raw, min_score)
    return load(cleaned, dest)

if __name__ == "__main__":
    print(etl("https://api.example.com/items", min_score=75))

Output:

text
INFO | Task run 'extract' - Finished in state Completed()
INFO | Task run 'transform' - Finished in state Completed()
INFO | Task run 'load' - Finished in state Completed()
INFO | Flow run 'amber-falcon' - Finished in state Completed()
142

2. Conditional branching with allow_failure

python
from prefect import flow, task, allow_failure

@task
def primary_source() -> dict:
    raise RuntimeError("primary down")

@task
def fallback_source() -> dict:
    return {"src": "fallback", "rows": [1, 2, 3]}

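@task
def merge(primary: object, fallback: dict) -> dict:
    # allow_failure passes the upstream exception object when primary fails,
    # and exceptions are truthy, so test the type instead of truthiness.
    return fallback if isinstance(primary, Exception) else primary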

@flow(log_prints=True)
def resilient_fetch():
    p_future = primary_source.submit()
    f_future = fallback_source.submit()
    result = merge(allow_failure(p_future), f_future)
    print(result)

resilient_fetch()

Output:

text
INFO | Task run 'primary_source' - Finished in state Failed(...)
INFO | Task run 'fallback_source' - Finished in state Completed()
INFO | Task run 'merge' - Finished in state Completed()
{'src': 'fallback', 'rows': [1, 2, 3]}
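
According to the Prefect documentation for allow_failure, a failed upstream reaches the downstream task as its exception object rather than as an empty value (worth verifying for your version). The plain-Python sketch below shows why a merge step should test the type: exception instances are truthy. pick_source is a hypothetical name.

```python
def pick_source(primary: object, fallback: dict) -> dict:
    # Fall back only when the primary input is an exception object.
    return fallback if isinstance(primary, Exception) else primary


failure = RuntimeError("primary down")
fallback = {"src": "fallback", "rows": [1, 2, 3]}

print(bool(failure))  # True: a truthiness test would keep the error
print(pick_source(failure, fallback))  # {'src': 'fallback', 'rows': [1, 2, 3]}
print(pick_source({"src": "primary"}, fallback))  # {'src': 'primary'}
```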

3. Concurrency-limited batch with rate-limiting

python
from prefect import flow, task
from prefect.concurrency.sync import rate_limit, concurrency

@task
def call_third_party(endpoint: str) -> dict:
    # Soft concurrency cap (max 5 in-flight)
    with concurrency("third-party", occupy=1):
        # Hard rate limit (10 requests / second)
        rate_limit("third-party-rps", occupy=1)
        import httpx
        return httpx.get(endpoint, timeout=5).json()

@flow
def fanout(endpoints: list[str]):
    futures = [call_third_party.submit(e) for e in endpoints]
    return [f.result() for f in futures]

Create the limits once via CLI (both are global concurrency limits):

bash
prefect gcl create third-party --limit 5
prefect gcl create third-party-rps --limit 10 --slot-decay-per-second 10

Output: a short confirmation for each limit created.
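
One way to picture slot decay is as a bucket whose occupied slots drain at a fixed rate instead of being released by the caller. The toy model below is a plain-Python sketch under that assumption, not Prefect's implementation: DecayingSlots is a hypothetical class, time is passed in explicitly to keep it deterministic, and it returns False where the real rate_limit would block.

```python
class DecayingSlots:
    """Toy model: occupied slots drain at a fixed rate instead of being released."""

    def __init__(self, limit: int, decay_per_second: float) -> None:
        self.limit = limit
        self.decay_per_second = decay_per_second
        self.occupied = 0.0
        self.last_seen = 0.0

    def try_acquire(self, now: float) -> bool:
        # Drain slots in proportion to the time since the previous call.
        elapsed = now - self.last_seen
        self.occupied = max(0.0, self.occupied - elapsed * self.decay_per_second)
        self.last_seen = now
        if self.occupied + 1 <= self.limit:
            self.occupied += 1
            return True
        return False


slots = DecayingSlots(limit=10, decay_per_second=10)
burst = sum(slots.try_acquire(now=0.0) for _ in range(12))
later = sum(slots.try_acquire(now=0.5) for _ in range(12))
print(burst, later)  # 10 5
```

With a limit of 10 and a decay of 10 per second, the model admits a burst of 10 requests and then roughly 10 more per second, which matches the comment in the recipe.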

4. Map fan-out with .map()

python
from prefect import flow, task

@task
def process(item: int) -> int:
    return item * item

@flow
def map_pipeline(items: list[int]) -> list[int]:
    futures = process.map(items)
    return [f.result() for f in futures]

print(map_pipeline(list(range(10))))

Output:

text
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

5. Recovering from a failed state

python
from prefect import flow, task
from prefect.states import Failed, Completed

@task
def maybe_fail(x: int):
    if x < 0:
        return Failed(message=f"negative: {x}")
    return Completed(message=f"ok: {x}", data=x * 2)

@flow(log_prints=True)
def safe_flow(values: list[int]):
    for v in values:
        state = maybe_fail(v, return_state=True)
        if state.is_failed():
            print(f"SKIPPED: {state.message}")
            continue
        print(f"OK: {state.result()}")

safe_flow([1, -2, 3, -4])

Output:

text
OK: 2
SKIPPED: negative: -2
OK: 6
SKIPPED: negative: -4

Production deployment

In production Prefect runs three logical components: the server (Prefect Cloud or self-hosted), one or more workers polling a work pool, and your code stored where workers can fetch it.

python
# deploy.py
from prefect import flow
from prefect.docker import DockerImage

@flow
def hourly_report(env: str = "prod"):
    print(f"running in {env}")

if __name__ == "__main__":
    hourly_report.deploy(
        name="hourly-report-prod",
        work_pool_name="k8s-prod",
        image=DockerImage(
            name="myorg/hourly-report",
            tag="2026.05.30",
            dockerfile="Dockerfile",
        ),
        cron="0 * * * *",
        parameters={"env": "prod"},
        tags=["prod", "hourly"],
    )

Output: image build and push logs, then a confirmation that the deployment was created (exits 0 on success).

Worker pool topology:

bash
# 1. Create the pool (one-time)
prefect work-pool create --type kubernetes k8s-prod

# 2. Start a worker in its own terminal or as a service (one or many;
#    each polls the pool and picks up scheduled runs)
prefect worker start --pool k8s-prod

# 3. Deploy and trigger from another shell
python deploy.py
prefect deployment run "hourly-report/hourly-report-prod"

Output: prefect worker start runs until stopped; prefect deployment run prints details of the flow run it created.

Work-pool type | Use when
process | Local development, single-node services
docker | Each run in an isolated container, single host
kubernetes | Each run as a K8s Job; production default
ecs / cloud-run / vertex | Serverless executors on AWS / GCP
prefect-managed | Prefect Cloud runs the worker for you

Deploy checklist:

  1. Pin the Prefect version across server and workers. Version skew can break the REST API contract between client and server.
  2. Use Prefect Cloud or self-host Postgres for the API. SQLite is fine for dev only.
  3. Store flow code in a version-controlled image, not on the worker filesystem.
  4. Set flow-level retries separately from task-level retries: a flow-level retry reruns the flow function from the top (tasks rerun unless their results are cached or persisted), while a task-level retry reruns only that task.
  5. Use prefect-aws / prefect-gcp blocks to mount secrets — never bake them into the image.

Testing patterns

Prefect makes flows and tasks unit-testable by treating them as regular Python objects. The prefect.testing module also provides utilities for asserting on run state.

python
import pytest
from prefect.testing.utilities import prefect_test_harness
from my_flows import etl, transform

# Harness points Prefect at a temporary SQLite-backed API for the test session
@pytest.fixture(autouse=True, scope="session")
def prefect_db():
    with prefect_test_harness():
        yield

# 1. Pure-function task — call without the @task wrapper
def test_transform_filters_low_scores():
    result = transform.fn([{"score": 10}, {"score": 80}], min_score=50)
    assert result == [{"score": 80}]

# 2. Flow run end-to-end against the test API
#    (hits the network; the endpoint must return a JSON array of objects)
def test_etl_loads_results(tmp_path):
    out = tmp_path / "out.json"
    n = etl("https://jsonplaceholder.typicode.com/users", dest=str(out), min_score=0)
    assert n >= 0
    assert out.exists()

# 3. Assert state from a flow run
def test_etl_handles_bad_url():
    state = etl("https://no-such-host.example", return_state=True)
    assert state.is_failed()

Output: pytest prints its usual summary and exits 0 when all three tests pass.

Patterns:

  • task.fn(...) invokes the wrapped function without any Prefect orchestration — fastest for pure-logic tests.
  • prefect_test_harness() spins up an ephemeral SQLite-backed API; use as a session-scoped fixture.
  • return_state=True on a flow call returns the final State so you can assert on .is_failed() / .message.
  • Mock external IO with respx (httpx) or responses (requests) rather than hitting real services.
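
One way to keep tests off the network without a mocking library is to inject the HTTP call as a parameter. The sketch below is plain Python with hypothetical names (fetch_rows, fake_get_json); in a Prefect project the function body would sit behind @task and be exercised through .fn.

```python
from typing import Callable


def fetch_rows(url: str, get_json: Callable[[str], list[dict]]) -> list[dict]:
    # The HTTP call is a parameter, so tests can pass a stub instead.
    return [r for r in get_json(url) if r.get("score", 0) >= 50]


def fake_get_json(url: str) -> list[dict]:
    # Canned response standing in for the real service.
    return [{"score": 10}, {"score": 80}]


rows = fetch_rows("https://api.example.com/items", get_json=fake_get_json)
print(rows)  # [{'score': 80}]
```

Production code would pass a function that performs the real request, while tests pass the stub and stay fast and deterministic.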

Migration from older Prefect versions

Prefect has had two major breaking transitions: 1.x → 2.x (2022, total rewrite) and 2.x → 3.x (2024, API streamlining). Most production code today is 2.x or 3.x.

Concept | Prefect 1.x | Prefect 2.x | Prefect 3.x
Define a workflow | Flow(...) context manager | @flow decorator | @flow decorator
Define a task | Task subclass | @task decorator | @task decorator
Run locally | flow.run() | flow() call | flow() call
Schedule | Schedule(clocks=[...]) | flow.serve(cron="...") | flow.serve(cron="...")
Deploy | flow.register(project_name=...) | Deployment.build_from_flow(...) | flow.deploy(name=..., work_pool_name=...)
Agent | prefect agent start | prefect agent start -q queue | prefect worker start --pool POOL
Storage | Built-in storage classes | Blocks | Blocks (still)
Result storage | Result subclasses | Per-flow result_storage | Per-flow result_storage

Migration tips 2.x → 3.x:

  • prefect agent is now prefect worker. Agents poll queues, workers poll work pools.
  • Deployment.build_from_flow() is gone — use flow.deploy(...) with a work-pool name.
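  • get_client() is importable from prefect (or prefect.client.orchestration) and returns an async client by default; use it with async with and await its methods.
  • Results are not persisted by default in 3.x. When persistence is enabled, they go to the local filesystem unless result_storage is set; for large results use S3Bucket / GCSBucket blocks.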

Ecosystem integrations

Prefect ships first-party integrations as separate pip packages. Each provides Blocks (configured resources) and Tasks (pre-built operations).

Package | Purpose
prefect-aws | S3, Lambda, ECS work pool, Secrets Manager
prefect-gcp | GCS, BigQuery, Cloud Run, Vertex AI
prefect-kubernetes | K8s job runner, secret blocks
prefect-dbt | Run dbt models as tasks with manifest-based lineage
prefect-snowflake | Connector block + query tasks
prefect-dask | Dask task runner for parallel execution
prefect-ray | Ray task runner for parallel execution
prefect-slack | Slack notifications on flow state
prefect-email | SMTP email notifications

python
# Example: use prefect-dask for parallel task execution
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
))
def parallel_pipeline():
    ...

Troubleshooting common errors

Symptom | Cause | Fix
RuntimeError: Tasks cannot be run outside of a flow (Prefect 2.x) | Calling a @task outside a @flow | Wrap the call in a flow, or call task.fn(...) to bypass orchestration
Worker logs "no flow runs available" indefinitely | Wrong work-pool name in deployment | Run prefect deployment ls and confirm the deployment's work pool matches the worker's --pool
Task hangs at "submitted" forever | Worker died or storage block unreachable | Restart worker; check block credentials
PrefectHTTPStatusError: 422 Unprocessable Entity | Deployment payload schema mismatch | Pin server + client to same minor version
Cron schedule never fires | Timezone string typo, or daemon not running | Verify timezone="America/New_York"; check server logs
Concurrency limit not enforced | Limit not created on server, or wrong name | prefect gcl ls; recreate if missing
OSError: [Errno 28] No space left on device | Local results storage filled | Configure result_storage block to S3/GCS for large outputs
Subflow status "Crashed" with no traceback | Worker SIGKILLed (OOM) | Increase worker memory; reduce task return-value size

Performance tuning

Prefect's overhead per task is roughly 50–100ms; for fast tasks this is the dominant cost. Tune by raising task granularity or using task.map() for fan-out.

python
from itertools import islice
from prefect import flow, task

# BAD: high overhead, one task per row
@task
def process_row(row):
    return row * 2

@flow
def slow(rows):
    return [process_row(r) for r in rows]   # 10,000 tasks × 80ms overhead = 800s

# GOOD: batch into chunks of 100
@task
def process_batch(batch):
    return [r * 2 for r in batch]

@flow
def fast(rows):
    futures = []
    it = iter(rows)
    while batch := list(islice(it, 100)):
        futures.append(process_batch.submit(batch))
    return [r for f in futures for r in f.result()]

Output: (none — exits 0 on success)

Tuning checklist:

  1. Batch task granularity. Aim for tasks that take > 1 second each. Sub-second tasks waste orchestration overhead.
  2. Use DaskTaskRunner or RayTaskRunner for genuinely parallel CPU work — the default runner is concurrent but not parallel for pure Python.
  3. Cap concurrency for external APIs via concurrency and rate_limit — uncapped fan-out gets you rate-limited or banned.
  4. Persist results to object storage for tasks whose outputs are > 10 MB; the default local-filesystem result storage is not shared between workers and can fill the disk.
  5. Set log_prints=True thoughtfully: every captured print becomes a log record that must be shipped to the API.
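
The arithmetic behind the batching advice can be checked with a rough plain-Python estimate. The 80 ms figure is the per-task overhead assumed in the BAD/GOOD example, not a measured value, and chunked is a hypothetical helper.

```python
from itertools import islice
from typing import Iterable, Iterator

OVERHEAD_S = 0.08  # assumed orchestration overhead per task run


def chunked(items: Iterable[int], size: int) -> Iterator[list[int]]:
    # Yield consecutive lists of at most `size` items.
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch


rows = range(10_000)
per_row_overhead = len(rows) * OVERHEAD_S
batches = list(chunked(rows, 100))
batched_overhead = len(batches) * OVERHEAD_S

print(f"{per_row_overhead:.0f}s vs {batched_overhead:.0f}s")  # 800s vs 8s
```

Under that assumption, batching 10,000 rows into 100 tasks cuts orchestration overhead by a factor of 100 while the useful work stays the same.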

Patterns & idioms

  • Parameterised deployments per environment. Deploy the same flow twice with different parameters={"env": "prod"} and parameters={"env": "staging"}.
  • Pure tasks over stateful classes. Tasks should be functions; classes-with-state break caching and serialisation.
  • Subflows for reusable sequences. Extract a 5-task fragment into a @flow and call it from multiple parents.
  • State hooks for alerting. on_failure=[notify_slack] is cleaner than a try/except wrapping the flow body.
  • Blocks for shared configuration. S3Bucket.load("prod-data") is reusable across flows and stored centrally on the Prefect server or Prefect Cloud rather than hard-coded in each flow.
  • return_state=True when callers need to inspect failure programmatically.

When NOT to use this

Prefect is a great default but is overkill in several common situations.

  • One-shot scripts. A 10-line script run by cron does not need Prefect. The decorators add no value without the UI, retries, or scheduling.
  • Sub-second event processing. Prefect's per-task overhead is 50–100ms; for sub-second event handling use Kafka, Faust, or Celery.
  • Strict asset-lineage workflows. Dagster is more opinionated about typed asset lineage; for lineage-first pipelines, Dagster's model is cleaner.
  • Long-running stateful workflows. Temporal models pause/resume and durable timers natively; Prefect can do this but is not optimised for hour-to-day-long timeouts.
  • Pipelines that are 90% SQL. dbt is leaner; wrap it later with prefect-dbt if observability becomes a need.

Quick reference

Task | Code
Define flow | @flow def my_flow(): ...
Define task | @task def my_task(): ...
Call task (blocking) | result = my_task(arg)
Submit task (async) | future = my_task.submit(arg)
Wait for futures | wait(futures)
Get future result | future.result()
Retries | @task(retries=3, retry_delay_seconds=10)
Timeout | @task(timeout_seconds=30)
Cache | @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
Log prints | @task(log_prints=True)
Cron schedule | flow.serve(name="x", cron="0 6 * * *")
Interval schedule | flow.serve(name="x", interval=timedelta(hours=6))
Deploy to worker | flow.deploy(name="x", work_pool_name="pool", cron="...")
Concurrency limit | with concurrency("key", occupy=1):
Start UI | prefect server start