Build, schedule, and observe Python workflows with Prefect. Covers flows, tasks, retries, schedules, deployments, caching, concurrency, and Prefect Cloud.
prefect — Workflow Orchestration
What it is
Prefect is a Python workflow orchestration framework where you decorate ordinary Python functions with @flow and @task to get scheduling, retries, caching, logging, observability, and a UI — all with minimal boilerplate. Unlike Dagster's asset-centric model, Prefect is function-centric: you write imperative Python that calls tasks, and Prefect tracks the execution graph at runtime. It is the most approachable orchestrator for Python developers who want to "add orchestration" to existing scripts with minimal refactoring.
Install
pip install prefect
Output: (none — exits 0 on success)
Quick example
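```python
from prefect import flow, task

@task
def fetch_data(url: str) -> list[dict]:
    import httpx

    response = httpx.get(url)
    response.raise_for_status()
    return response.json()  # the endpoint must return a JSON array of objects

@task
def process(records: list[dict]) -> list[dict]:
    # Keep only records flagged as active
    return [r for r in records if r.get("active")]

@flow(name="My Pipeline")
def pipeline(url: str) -> list[dict]:
    raw = fetch_data(url)
    return process(raw)

if __name__ == "__main__":
    # JSONPlaceholder returns a list of user objects; none has an "active" key, so the result is []
    result = pipeline("https://jsonplaceholder.typicode.com/users")
    print(result)
```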
Output:
15:42:01.234 | INFO | prefect.engine - Created flow run 'nimble-falcon' for flow 'My Pipeline'
15:42:01.401 | INFO | Task run 'fetch_data' - Finished in state Completed()
15:42:01.412 | INFO | Task run 'process' - Finished in state Completed()
15:42:01.420 | INFO | Flow run 'nimble-falcon' - Finished in state Completed()
[]
When / why to use it
- Adding retries, logging, and scheduling to existing Python scripts with two decorators.
- ETL pipelines that need caching (skip re-running tasks whose inputs haven't changed).
- Data workflows that run on a cron or are triggered by an event.
- Teams that want a UI (local or Prefect Cloud) to monitor runs without setting up Airflow.
- Concurrent fan-out: submit dozens of tasks in parallel and collect results with wait().
Common pitfalls
Mutating state across tasks — tasks should be pure functions. Passing mutable objects between tasks and modifying them in place breaks Prefect's state tracking and caching. Return new objects instead.
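`@task` functions called outside a `@flow`: in Prefect 2.x this raises a RuntimeError; Prefect 3.x runs the call as a standalone task run that is not part of any flow's graph. Call tasks from within a flow, or use `task.fn(...)` to run the plain function with no orchestration at all.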
Large return values — Prefect serialises task results to its result storage (local or cloud). Returning a 2 GB DataFrame from a task causes slow serialisation and potential OOM. Use files or external storage and return a path/URI instead.
`task.submit()` submits a task asynchronously and returns a `PrefectFuture`; collect its result with `.result()`. This is the idiomatic way to parallelise tasks in a flow.
Use `flow.serve()` to run a long-lived deployment locally without Prefect Cloud or a separate worker: `pipeline.serve(name="local-deploy", cron="0 6 * * *")`.
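The large-return-values pitfall can be sketched in plain Python. The producer writes rows to a file and returns only the path, and the consumer streams that file. The function names write_rows and count_active are hypothetical. In a real flow they would be @task functions, and the path would point at storage every worker can reach (for example object storage) rather than a local temp directory.

```python
import json
import tempfile
from pathlib import Path

def write_rows(rows: list[dict], out_dir: Path) -> str:
    """Hypothetical producer: persist rows as JSON Lines and return only the path."""
    path = out_dir / "rows.jsonl"
    with path.open("w", encoding="utf-8") as fh:
        for row in rows:
            fh.write(json.dumps(row) + "\n")
    return str(path)  # a short string is cheap to serialise as a task result

def count_active(path: str) -> int:
    """Hypothetical consumer: stream the file instead of receiving the whole list."""
    with open(path, encoding="utf-8") as fh:
        return sum(1 for line in fh if json.loads(line).get("active"))

if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        rows = [{"id": i, "active": i % 2 == 0} for i in range(1000)]
        p = write_rows(rows, Path(tmp))
        print(count_active(p))  # → 500
```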
Retries and timeouts
Prefect handles retries and timeouts at both the task and flow level with decorator parameters.
```python
import random

from prefect import flow, task

@task(
    retries=3,
    retry_delay_seconds=10,
    timeout_seconds=30,
    log_prints=True,
)
def unreliable_api_call(endpoint: str) -> dict:
    # Fail half the time to exercise the retry logic
    if random.random() < 0.5:
        raise ConnectionError("Simulated network failure")
    print(f"Called {endpoint} successfully")
    return {"status": "ok"}

@flow(
    retries=1,
    timeout_seconds=120,
    log_prints=True,
)
def robust_pipeline():
    result = unreliable_api_call("https://api.example.com/data")
    print(f"Result: {result}")

robust_pipeline()
```
Output (with retry):
INFO | Task run 'unreliable_api_call' - Retrying in 10.0 seconds...
INFO | Task run 'unreliable_api_call' - Called https://api.example.com/data successfully
INFO | Task run 'unreliable_api_call' - Finished in state Completed()
INFO | Result: {'status': 'ok'}
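Here is a plain-Python model of what retries and retry_delay_seconds mean; it is not Prefect's implementation. With retries=3, the task gets one initial attempt plus up to three retries, so a task that keeps failing is called four times before it ends in a Failed state. run_with_retries is a hypothetical helper.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def run_with_retries(fn: Callable[[], T], retries: int, retry_delay_seconds: float) -> T:
    """Hypothetical model: one initial attempt plus up to `retries` retries."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= retries:
                raise  # retries exhausted: the run ends Failed
            attempt += 1
            time.sleep(retry_delay_seconds)  # wait before the next attempt

if __name__ == "__main__":
    calls = {"n": 0}

    def flaky() -> str:
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("Simulated network failure")
        return "ok"

    print(run_with_retries(flaky, retries=3, retry_delay_seconds=0), calls["n"])  # → ok 3
```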
Caching — skip re-running unchanged tasks
The cache_key_fn parameter tells Prefect when two task calls are equivalent and the cached result can be reused. task_input_hash uses the function name and all input arguments as the cache key.
```python
import time
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    log_prints=True,
)
def expensive_query(sql: str) -> list[dict]:
    print(f"Running query: {sql}")
    time.sleep(2)  # simulate slow DB call
    return [{"result": 42}]

@flow(log_prints=True)
def pipeline():
    # First call: runs the query
    r1 = expensive_query("SELECT * FROM big_table")
    # Second call with the same input: returns the cached result instantly
    r2 = expensive_query("SELECT * FROM big_table")
    print(r1, r2)

pipeline()
```
Output:
INFO | Running query: SELECT * FROM big_table ← runs once
INFO | [{'result': 42}] [{'result': 42}]
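The idea behind task_input_hash plus cache_expiration can be sketched without Prefect. Derive a key from the function identity and its arguments, then reuse the stored result while it is younger than the expiration. input_hash and ResultCache are hypothetical names. Prefect computes its real key differently and keeps results in its result storage, not in a dict.

```python
import hashlib
import json
import time
from datetime import timedelta
from typing import Any, Callable

def input_hash(fn: Callable, *args: Any, **kwargs: Any) -> str:
    """Hypothetical stand-in for task_input_hash: function identity + arguments."""
    payload = json.dumps([fn.__qualname__, args, kwargs], sort_keys=True, default=repr)
    return hashlib.sha256(payload.encode()).hexdigest()

class ResultCache:
    def __init__(self, expiration: timedelta, clock: Callable[[], float] = time.monotonic):
        self.expiration = expiration.total_seconds()
        self.clock = clock
        self._store: dict[str, tuple[float, Any]] = {}

    def call(self, fn: Callable, *args: Any, **kwargs: Any) -> Any:
        key = input_hash(fn, *args, **kwargs)
        hit = self._store.get(key)
        if hit is not None and self.clock() - hit[0] < self.expiration:
            return hit[1]  # cache hit: fn is not executed
        result = fn(*args, **kwargs)
        self._store[key] = (self.clock(), result)  # remember when it was computed
        return result

if __name__ == "__main__":
    runs = []

    def expensive_query(sql: str) -> list[dict]:
        runs.append(sql)
        return [{"result": 42}]

    cache = ResultCache(timedelta(hours=1))
    cache.call(expensive_query, "SELECT * FROM big_table")
    cache.call(expensive_query, "SELECT * FROM big_table")
    print(len(runs))  # → 1
```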
Parallel task submission
task.submit() returns a PrefectFuture immediately without blocking. Collect all futures at the end with wait() or iterate to call .result().
```python
import time

from prefect import flow, task
from prefect.futures import wait

@task(log_prints=True)
def process_item(item: int) -> int:
    time.sleep(0.5)
    return item ** 2

@flow
def parallel_pipeline(items: list[int]) -> list[int]:
    futures = [process_item.submit(item) for item in items]
    wait(futures)  # block until every task run has finished
    return [f.result() for f in futures]

result = parallel_pipeline(list(range(10)))
print(result)
```
Output:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
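The submit/wait/result pattern mirrors Python's own concurrent.futures API, and Prefect's default task runner also runs submitted tasks concurrently on threads. The standard-library analogue below shows the same semantics without Prefect: submit() returns at once, wait() blocks until all futures finish, and results are read back in submission order.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def process_item(item: int) -> int:
    time.sleep(0.05)  # stand-in for I/O-bound work
    return item ** 2

def parallel_pipeline(items: list[int]) -> list[int]:
    with ThreadPoolExecutor(max_workers=8) as pool:
        # submit() returns a Future immediately, like task.submit()
        futures = [pool.submit(process_item, item) for item in items]
        wait(futures)  # block until every future is done
        # Results come back in submission order, not completion order
        return [f.result() for f in futures]

if __name__ == "__main__":
    print(parallel_pipeline(list(range(10))))  # → [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```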
Subflows — flows calling flows
Flows can call other flows. The sub-flow runs as a nested tracked unit with its own run ID and logs.
```python
from prefect import flow, task

@task
def validate(data: list) -> bool:
    return len(data) > 0

@flow(name="transform")
def transform_flow(data: list) -> list:
    if validate(data):
        return [x * 2 for x in data]
    return []

@flow(name="main", log_prints=True)
def main_flow():
    raw = [1, 2, 3, 4, 5]
    result = transform_flow(raw)  # sub-flow call
    print(f"Transformed: {result}")

main_flow()
```
Output:
INFO | Created flow run 'crimson-hawk' for flow 'main'
INFO | Created subflow run for flow 'transform'
INFO | Transformed: [2, 4, 6, 8, 10]
Schedules
Attach a cron or interval schedule to a flow at serve() or deploy() time.
```python
from datetime import timedelta  # used by the interval variant below

from prefect import flow
from prefect.client.schemas.schedules import CronSchedule

@flow(log_prints=True)
def daily_report():
    print("Generating daily report...")

# Local serve: runs the flow on schedule in the foreground process
if __name__ == "__main__":
    daily_report.serve(
        name="daily-report",
        # 8 AM every day, evaluated in New York time rather than UTC
        schedules=[CronSchedule(cron="0 8 * * *", timezone="America/New_York")],
    )
```
For interval scheduling:
```python
daily_report.serve(
    name="every-6h",
    interval=timedelta(hours=6),
)
```
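An interval schedule fires at evenly spaced times counted from an anchor. The sketch below computes the next few run times for interval=timedelta(hours=6). next_runs is a hypothetical helper. The anchor of midnight UTC on 2024-01-01 is an assumption for the example, not Prefect's default.

```python
from datetime import datetime, timedelta, timezone

def next_runs(anchor: datetime, interval: timedelta, after: datetime, count: int) -> list[datetime]:
    """Hypothetical model: run times are anchor + k * interval for k = 0, 1, 2, ..."""
    if after < anchor:
        k = 0
    else:
        # Smallest k such that anchor + k * interval is strictly after `after`
        k = (after - anchor) // interval + 1
    return [anchor + (k + i) * interval for i in range(count)]

if __name__ == "__main__":
    anchor = datetime(2024, 1, 1, tzinfo=timezone.utc)
    now = datetime(2024, 1, 1, 13, 30, tzinfo=timezone.utc)
    for t in next_runs(anchor, timedelta(hours=6), now, 3):
        print(t.isoformat())
```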
Deployments — production scheduling
A deployment registers a flow with the Prefect server so it can be scheduled, triggered via API, or run by a worker.
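```python
# deploy.py
from pathlib import Path

from prefect import flow

@flow(log_prints=True)
def my_pipeline(env: str = "prod"):
    print(f"Running in {env}")

if __name__ == "__main__":
    # A process work pool runs code from the local filesystem, so point the
    # deployment at this directory via from_source instead of building an image.
    my_pipeline.from_source(
        source=str(Path(__file__).parent),
        entrypoint="deploy.py:my_pipeline",
    ).deploy(
        name="prod-pipeline",
        work_pool_name="local-process-pool",
        cron="0 6 * * *",
        parameters={"env": "prod"},
    )
```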
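```bash
# 1. Create a local process work pool (one-time)
prefect work-pool create --type process local-process-pool
# 2. Start a worker; it blocks, so leave it running in its own terminal
prefect worker start --pool local-process-pool
# 3. In a second terminal, register the deployment
python deploy.py
```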
Output: (none — exits 0 on success)
Logging and state hooks
```python
from prefect import flow, task

@task(log_prints=True)
def my_task():
    print("task running")
    return 42

# Flow state hooks receive the flow object, the flow run, and the final state
def on_failure(flow, flow_run, state):
    print(f"Flow {flow.name} failed: {state.message}")

def on_completion(flow, flow_run, state):
    print(f"Flow {flow.name} completed successfully")

@flow(
    on_failure=[on_failure],
    on_completion=[on_completion],
    log_prints=True,
)
def instrumented_flow():
    result = my_task()
    print(f"Result: {result}")

instrumented_flow()
```
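The hook contract can be modelled in a few lines. The model runs the body, records a final state, then calls every on_failure or on_completion hook with that state. FinalState and run_with_hooks are hypothetical stand-ins, not Prefect classes. Unlike a real flow call, this model always returns the state instead of re-raising the error, much like calling a flow with return_state=True.

```python
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class FinalState:
    """Hypothetical, minimal stand-in for a Prefect State."""
    name: str  # "Completed" or "Failed"
    message: str = ""
    result: Any = None

    def is_failed(self) -> bool:
        return self.name == "Failed"

Hook = Callable[[str, FinalState], None]

def run_with_hooks(fn: Callable[[], Any], on_completion: list[Hook], on_failure: list[Hook]) -> FinalState:
    try:
        state = FinalState("Completed", result=fn())
    except Exception as exc:
        state = FinalState("Failed", message=str(exc))
    # Hooks run only after the final state is known
    for hook in (on_failure if state.is_failed() else on_completion):
        hook(fn.__name__, state)
    return state

if __name__ == "__main__":
    alerts: list[str] = []

    def notify(name: str, state: FinalState) -> None:
        alerts.append(f"{name} failed: {state.message}")

    def flaky() -> int:
        raise RuntimeError("database unreachable")

    final = run_with_hooks(flaky, on_completion=[], on_failure=[notify])
    print(final.name, alerts)  # → Failed ['flaky failed: database unreachable']
```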
Concurrency limits
Use the concurrency context manager to cap how many tasks can hold a slot of a named global concurrency limit at the same time, preventing resource exhaustion.
```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def api_call(endpoint: str) -> dict:
    # Each call holds one slot of the "external-api" limit while it runs
    with concurrency("external-api", occupy=1):
        import httpx

        return httpx.get(endpoint).json()

@flow
def batch_requests(endpoints: list[str]) -> list[dict]:
    futures = [api_call.submit(ep) for ep in endpoints]
    return [f.result() for f in futures]
```
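Create the global concurrency limit once. The concurrency context manager uses global limits, which are managed with prefect gcl, not the tag-based limits managed with prefect concurrency-limit:
prefect gcl create external-api --limit 5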
Output: (none — exits 0 on success)
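A limit with 5 slots behaves like a counting semaphore shared by every caller using the same name. Each holder occupies a slot, and callers block while no slot is free. The in-process model below (SlotLimit is hypothetical) records the peak number of concurrent holders. Prefect itself tracks these slots on the server, so its limits also hold across processes and machines; this model only works inside one process.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class SlotLimit:
    """Hypothetical in-process model of a concurrency limit with a fixed number of slots."""

    def __init__(self, limit: int):
        self._sem = threading.BoundedSemaphore(limit)
        self._lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def __enter__(self):
        self._sem.acquire()  # blocks while all slots are occupied
        with self._lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        return self

    def __exit__(self, *exc):
        with self._lock:
            self.active -= 1
        self._sem.release()
        return False  # do not swallow exceptions

def api_call(limit: SlotLimit, endpoint: str) -> str:
    with limit:  # plays the role of concurrency("external-api", occupy=1)
        time.sleep(0.02)  # stand-in for the HTTP request
        return endpoint

if __name__ == "__main__":
    limit = SlotLimit(5)
    with ThreadPoolExecutor(max_workers=20) as pool:
        results = list(pool.map(lambda ep: api_call(limit, ep), [f"/item/{i}" for i in range(40)]))
    print(len(results), limit.peak)  # peak never exceeds 5
```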
Starting the UI
prefect server start # local server + UI at http://localhost:4200
Output: (none — exits 0 on success)
Real-world recipes
End-to-end Prefect flows that exercise common production patterns: a parameterised ETL with retries and caching, fallback branching with allow_failure, rate-limited fan-out, .map() fan-out, and recovering from failed states.
1. Parameterised ETL with retries and caching
```python
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(retries=3, retry_delay_seconds=10, cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=6))
def extract(source: str) -> list[dict]:
    import httpx

    r = httpx.get(source, timeout=10)
    r.raise_for_status()
    return r.json()

@task
def transform(rows: list[dict], min_score: int) -> list[dict]:
    return [r for r in rows if r.get("score", 0) >= min_score]

@task(retries=2)
def load(rows: list[dict], dest: str) -> int:
    import json, pathlib

    pathlib.Path(dest).write_text(json.dumps(rows, indent=2))
    return len(rows)

@flow(name="etl", log_prints=True)
def etl(source: str, dest: str = "out.json", min_score: int = 50) -> int:
    raw = extract(source)
    cleaned = transform(raw, min_score)
    return load(cleaned, dest)

if __name__ == "__main__":
    print(etl("https://api.example.com/items", min_score=75))
```
Output:
INFO | Task run 'extract' - Finished in state Completed()
INFO | Task run 'transform' - Finished in state Completed()
INFO | Task run 'load' - Finished in state Completed()
INFO | Flow run 'amber-falcon' - Finished in state Completed()
142
2. Conditional branching with allow_failure
```python
from prefect import allow_failure, flow, task

@task
def primary_source() -> dict:
    raise RuntimeError("primary down")

@task
def fallback_source() -> dict:
    return {"src": "fallback", "rows": [1, 2, 3]}

@task
def merge(primary, fallback: dict) -> dict:
    # With allow_failure, a failed upstream arrives as its exception, not as None
    if isinstance(primary, BaseException):
        return fallback
    return primary

@flow(log_prints=True)
def resilient_fetch():
    p_future = primary_source.submit()
    f_future = fallback_source.submit()
    # allow_failure lets merge run even though primary_source failed
    result = merge(allow_failure(p_future), f_future)
    print(result)

resilient_fetch()
```
Output:
INFO | Task run 'primary_source' - Finished in state Failed(...)
INFO | Task run 'fallback_source' - Finished in state Completed()
INFO | Task run 'merge' - Finished in state Completed()
{'src': 'fallback', 'rows': [1, 2, 3]}
3. Concurrency-limited batch with rate-limiting
```python
from prefect import flow, task
from prefect.concurrency.sync import concurrency, rate_limit

@task
def call_third_party(endpoint: str) -> dict:
    # Concurrency cap: at most 5 calls in flight (size set when the limit is created)
    with concurrency("third-party", occupy=1):
        # Rate limit: about 10 requests/second via a slot-decaying limit
        rate_limit("third-party-rps", occupy=1)
        import httpx

        return httpx.get(endpoint, timeout=5).json()

@flow
def fanout(endpoints: list[str]):
    futures = [call_third_party.submit(e) for e in endpoints]
    return [f.result() for f in futures]
```
Create the limits once via CLI:
prefect gcl create third-party --limit 5
prefect gcl create third-party-rps --limit 10 --slot-decay-per-second 10
Output: (none — exits 0 on success)
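A limit created with --slot-decay-per-second acts like a leaky bucket. Each rate_limit() call occupies a slot, and occupied slots drain away at the decay rate instead of being released explicitly. Assuming linear decay, limit=10 with a decay of 10 per second allows a burst of 10 requests and then roughly 10 per second. DecayingSlots is a hypothetical model with an injectable clock; unlike rate_limit(), it refuses a request when full instead of waiting.

```python
import time
from typing import Callable

class DecayingSlots:
    """Hypothetical leaky-bucket model of a limit whose occupied slots decay over time."""

    def __init__(self, limit: int, decay_per_second: float, clock: Callable[[], float] = time.monotonic):
        self.limit = limit
        self.decay = decay_per_second
        self.clock = clock
        self.occupied = 0.0
        self.last = clock()

    def _drain(self) -> None:
        now = self.clock()
        # Occupied slots leak away continuously at `decay` slots per second
        self.occupied = max(0.0, self.occupied - (now - self.last) * self.decay)
        self.last = now

    def try_acquire(self, occupy: int = 1) -> bool:
        self._drain()
        if self.occupied + occupy <= self.limit:
            self.occupied += occupy
            return True
        return False  # the real rate_limit() call would wait and retry instead

if __name__ == "__main__":
    t = [0.0]
    bucket = DecayingSlots(limit=10, decay_per_second=10, clock=lambda: t[0])
    burst = sum(bucket.try_acquire() for _ in range(15))
    print(burst)  # → 10
```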
4. Map fan-out with .map()
```python
from prefect import flow, task

@task
def process(item: int) -> int:
    return item * item

@flow
def map_pipeline(items: list[int]) -> list[int]:
    # .map() submits one task run per element and returns a list of futures
    futures = process.map(items)
    return [f.result() for f in futures]

print(map_pipeline(list(range(10))))
```
Output:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
5. Recovering from a failed state
```python
from prefect import flow, task
from prefect.states import Completed, Failed

@task
def maybe_fail(x: int):
    # Returning a State object sets the task run's final state explicitly
    if x < 0:
        return Failed(message=f"negative: {x}")
    return Completed(message=f"ok: {x}", data=x * 2)

@flow(log_prints=True)
def safe_flow(values: list[int]):
    for v in values:
        # return_state=True hands back the State instead of raising on failure
        state = maybe_fail(v, return_state=True)
        if state.is_failed():
            print(f"SKIPPED: {state.message}")
            continue
        print(f"OK: {state.result()}")

safe_flow([1, -2, 3, -4])
```
Output:
OK: 2
SKIPPED: negative: -2
OK: 6
SKIPPED: negative: -4
Production deployment
In production Prefect runs three logical components: the server (Prefect Cloud or self-hosted), one or more workers polling a work pool, and your code stored where workers can fetch it.
```python
# deploy.py
from prefect import flow
from prefect.docker import DockerImage

@flow
def hourly_report(env: str = "prod"):
    print(f"running in {env}")

if __name__ == "__main__":
    hourly_report.deploy(
        name="hourly-report-prod",
        work_pool_name="k8s-prod",
        image=DockerImage(
            name="myorg/hourly-report",
            tag="2026.05.30",
            dockerfile="Dockerfile",
        ),
        cron="0 * * * *",
        parameters={"env": "prod"},
        tags=["prod", "hourly"],
    )
```
Output: (none — exits 0 on success)
Worker pool topology:
```bash
# 1. Create the pool (one-time); Kubernetes workers need the prefect-kubernetes package
prefect work-pool create --type kubernetes k8s-prod
# 2. Start one or more workers; each blocks, and workers polling the same pool share its runs
prefect worker start --pool k8s-prod
# 3. Deploy and trigger (from another terminal)
python deploy.py
prefect deployment run "hourly-report/hourly-report-prod"
```
Output: (none — exits 0 on success)
| Work-pool type | Use when |
|---|---|
| process | Local development, single-node services |
| docker | Each run in an isolated container on a single host |
| kubernetes | Each run as a Kubernetes Job; production default |
| ecs / cloud-run / vertex | Serverless executors on AWS (ECS) or GCP (Cloud Run, Vertex AI) |
| prefect-managed | Prefect Cloud runs the worker for you |
Deploy checklist:
- Pin the Prefect version across server and workers. Version skew can break the REST API contract between client and server.
- Use Prefect Cloud or a self-hosted server backed by Postgres. SQLite is fine for dev only.
- Store flow code in a version-controlled image, not on the worker filesystem.
- Set flow-level `retries` separately from task-level retries: a flow-level retry re-runs the whole flow; a task-level retry re-runs only that task.
- Use `prefect-aws` / `prefect-gcp` blocks to load secrets at run time; never bake them into the image.
Testing patterns
Prefect makes flows and tasks unit-testable by treating them as regular Python objects. The prefect.testing module also provides utilities for asserting on run state.
```python
import pytest
from prefect.testing.utilities import prefect_test_harness

from my_flows import etl, transform  # module containing the recipe-1 ETL flow

# Harness sets up a temporary Prefect API for the whole test session
@pytest.fixture(autouse=True, scope="session")
def prefect_db():
    with prefect_test_harness():
        yield

# 1. Pure-function task: call the undecorated function via .fn
def test_transform_filters_low_scores():
    result = transform.fn([{"score": 10}, {"score": 80}], min_score=50)
    assert result == [{"score": 80}]

# 2. Flow run end-to-end against the test API
#    (needs network; the endpoint must return a JSON array of objects)
def test_etl_loads_results(tmp_path):
    out = tmp_path / "out.json"
    n = etl("https://jsonplaceholder.typicode.com/users", dest=str(out), min_score=0)
    assert n >= 0
    assert out.exists()

# 3. Assert on the final state of a failing flow run
#    (extract retries 3 times with a 10 s delay, so this test takes about 30 s)
def test_etl_handles_bad_url():
    state = etl("https://no-such-host.example", return_state=True)
    assert state.is_failed()
```
Output: (none — exits 0 on success)
Patterns:
- `task.fn(...)` invokes the wrapped function without any Prefect orchestration; fastest for pure-logic tests.
- `prefect_test_harness()` spins up an ephemeral SQLite-backed API; use it as a session-scoped fixture.
- `return_state=True` on a flow call returns the final `State`, so you can assert on `.is_failed()` / `.message`.
- Mock external IO with `respx` (httpx) or `responses` (requests) rather than hitting real services.
Migration from older Prefect versions
Prefect has had two major breaking transitions: 1.x → 2.x (2022, total rewrite) and 2.x → 3.x (2024, API streamlining). Most production code today is 2.x or 3.x.
| Concept | Prefect 1.x | Prefect 2.x | Prefect 3.x |
|---|---|---|---|
| Define a workflow | Flow(...) context manager | @flow decorator | @flow decorator |
| Define a task | Task subclass | @task decorator | @task decorator |
| Run locally | flow.run() | flow() call | flow() call |
| Schedule | Schedule(clocks=[...]) | flow.serve(cron="...") | flow.serve(cron="...") |
| Deploy | flow.register(project_name=...) | Deployment.build_from_flow(...) | flow.deploy(name=..., work_pool_name=...) |
| Agent | prefect agent start | prefect agent start -q queue | prefect worker start --pool POOL |
| Storage | Built-in storage classes | Blocks | Blocks (still) |
| Result storage | Result subclasses | Per-flow result_storage | Per-flow result_storage |
Migration tips 2.x → 3.x:
- `prefect agent` is now `prefect worker`. Agents polled work queues; workers poll work pools.
- `Deployment.build_from_flow()` is gone; use `flow.deploy(...)` with a work-pool name.
- Re-test client code built on `get_client()` against 3.x; the client is async-first and is used as `async with get_client() as client: ...`.
- Persisted results are written to local storage, not to the Prefect API, unless you configure result storage; for large results use `S3Bucket` / `GCSBucket` blocks.
Ecosystem integrations
Prefect ships first-party integrations as separate pip packages. Each provides Blocks (configured resources) and Tasks (pre-built operations).
| Package | Purpose |
|---|---|
| prefect-aws | S3, Lambda, ECS work pool, Secrets Manager |
| prefect-gcp | GCS, BigQuery, Cloud Run, Vertex AI |
| prefect-kubernetes | K8s job runner, secret blocks |
| prefect-dbt | Run dbt models as tasks with manifest-based lineage |
| prefect-snowflake | Connector block + query tasks |
| prefect-dask | Dask task runner for parallel execution |
| prefect-ray | Ray task runner for parallel execution |
| prefect-slack | Slack notifications on flow state |
| prefect-email | SMTP email notifications |
# Example: use prefect-dask for parallel task execution
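```python
# Example: use prefect-dask for parallel task execution
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
))
def parallel_pipeline():
    ...  # tasks submitted here with .submit() run on the Dask cluster
```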
Troubleshooting common errors
| Symptom | Cause | Fix |
|---|---|---|
| RuntimeError: Tasks cannot be run outside of a flow (Prefect 2.x) | Calling a @task outside a @flow | Wrap the call in a flow, or call task.fn(...) to bypass orchestration |
| Worker logs "no flow runs available" indefinitely | Wrong work-pool name in deployment | prefect deployment inspect "FLOW/DEPLOYMENT" and confirm its work_pool_name matches the worker's --pool |
| Task hangs at "submitted" forever | Worker died or storage block unreachable | Restart worker; check block credentials |
| PrefectHTTPStatusError: 422 Unprocessable Entity | Deployment payload schema mismatch | Pin server + client to same minor version |
| Cron schedule never fires | Timezone string typo, or daemon not running | Verify timezone="America/New_York"; check server logs |
| Concurrency limit not enforced | Limit not created on server, or wrong name | prefect gcl ls for global limits (concurrency context manager) or prefect concurrency-limit ls for tag-based limits; recreate if missing |
| OSError: [Errno 28] No space left on device | Local result storage filled | Configure a result_storage block on S3/GCS for large outputs |
| Subflow status "Crashed" with no traceback | Worker SIGKILLed (OOM) | Increase worker memory; reduce task return-value size |
Performance tuning
Prefect's overhead per task is roughly 50–100ms; for fast tasks this is the dominant cost. Tune by raising task granularity or using task.map() for fan-out.
```python
from itertools import islice

from prefect import flow, task

# BAD: high overhead, one task per row
@task
def process_row(row):
    return row * 2

@flow
def slow(rows):
    return [process_row(r) for r in rows]  # 10,000 tasks × 80 ms overhead = 800 s

# GOOD: batch into chunks of 100
@task
def process_batch(batch):
    return [r * 2 for r in batch]

@flow
def fast(rows):
    futures = []
    it = iter(rows)
    # islice pulls up to 100 rows at a time; the loop stops on an empty batch
    while batch := list(islice(it, 100)):
        futures.append(process_batch.submit(batch))
    return [r for f in futures for r in f.result()]
```
Output: (none — exits 0 on success)
Tuning checklist:
- Batch task granularity. Aim for tasks that take > 1 second each. Sub-second tasks waste orchestration overhead.
- Use `DaskTaskRunner` or `RayTaskRunner` for genuinely parallel CPU work; the default runner is concurrent but not parallel for pure-Python code.
- Cap concurrency for external APIs via `concurrency` and `rate_limit`; uncapped fan-out gets you rate-limited or banned.
- Persist results to object storage for tasks whose outputs are > 10 MB rather than relying on default result storage, which is not designed for large blobs.
- Set `log_prints=True` thoughtfully: every captured print becomes a log record shipped to the API.
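The batching loop in the GOOD example generalises into a small helper, and the overhead arithmetic from the BAD example becomes a one-line estimate. chunked and orchestration_overhead_seconds are hypothetical helpers. The 80 ms per-task figure is the rough number used above, not a measured constant.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def chunked(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most `size` items (the last one may be shorter)."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while batch := list(islice(it, size)):
        yield batch

def orchestration_overhead_seconds(n_rows: int, batch_size: int, per_task_overhead_s: float = 0.08) -> float:
    """Rough estimate: one task per batch, each paying a fixed overhead (80 ms assumed)."""
    n_tasks = -(-n_rows // batch_size)  # ceiling division
    return n_tasks * per_task_overhead_s

if __name__ == "__main__":
    print([len(b) for b in chunked(range(250), 100)])  # → [100, 100, 50]
    print(orchestration_overhead_seconds(10_000, 1))    # one task per row
    print(orchestration_overhead_seconds(10_000, 100))  # batches of 100
```

Raising the batch size from 1 to 100 cuts the estimated overhead a hundredfold, which is the point of the "tasks > 1 second" rule of thumb.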
Patterns & idioms
- Parameterised deployments per environment. Deploy the same flow twice with different `parameters={"env": "prod"}` and `parameters={"env": "staging"}`.
- Pure tasks over stateful classes. Tasks should be functions; classes-with-state break caching and serialisation.
- Subflows for reusable sequences. Extract a 5-task fragment into a `@flow` and call it from multiple parents.
- State hooks for alerting. `on_failure=[notify_slack]` is cleaner than a try/except wrapping the flow body.
- Blocks for shared configuration. `S3Bucket.load("prod-data")` is reusable across flows and stored centrally on the Prefect server or Cloud workspace.
- `return_state=True` when callers need to inspect failure programmatically.
When NOT to use this
Prefect is a great default but is overkill in several common situations.
- One-shot scripts. A 10-line script run by cron does not need Prefect. The decorators add no value without the UI, retries, or scheduling.
- Sub-second event processing. Prefect's per-task overhead is 50–100ms; for sub-second event handling use Kafka, Faust, or Celery.
- Strict asset-lineage workflows. Dagster is more opinionated about typed asset lineage; for lineage-first pipelines, Dagster's model is cleaner.
- Long-running stateful workflows. Temporal models pause/resume and durable timers natively; Prefect can do this but is not optimised for hour-to-day-long timeouts.
- Pipelines that are 90% SQL. dbt is leaner; wrap it later with prefect-dbt if observability becomes a need.
Quick reference
| Task | Code |
|---|---|
| Define flow | @flow def my_flow(): ... |
| Define task | @task def my_task(): ... |
| Call task (blocking) | result = my_task(arg) |
| Submit task (async) | future = my_task.submit(arg) |
| Wait for futures | wait(futures) |
| Get future result | future.result() |
| Retries | @task(retries=3, retry_delay_seconds=10) |
| Timeout | @task(timeout_seconds=30) |
| Cache | @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1)) |
| Log prints | @task(log_prints=True) |
| Cron schedule | flow.serve(name="x", cron="0 6 * * *") |
| Interval schedule | flow.serve(name="x", interval=timedelta(hours=6)) |
| Deploy to worker | flow.deploy(name="x", work_pool_name="pool", cron="...") |
| Concurrency limit | with concurrency("key", occupy=1): |
| Start UI | prefect server start |