cheat sheet
prefect
Package-level reference for Prefect on PyPI — install variants, version policy, cloud-vs-OSS extras, and alternatives.
What it is
prefect is a Python framework for orchestrating data pipelines and arbitrary workflows. Created by Jeremiah Lowin in 2018 in response to Airflow-era pain points, it provides Python-native @flow and @task decorators, a scheduler, a UI, and a control plane that can be either self-hosted or Prefect Cloud.
Reach for Prefect when you want orchestration that feels like writing plain Python with retries, caching, and observability layered in. Reach for Dagster when you want first-class asset materialization and a stronger data-lineage model.
Install
pip install prefect
Output: (none — exits 0 on success). Installs the core engine, CLI, scheduler, and an embedded server.
uv add prefect
Output: dependency resolved + added to pyproject.toml
poetry add prefect
Output: updated lockfile + virtualenv install
pip install "prefect[aws]"
Output: installs Prefect plus prefect-aws (S3 blocks, ECS workers, Secrets Manager).
prefect cloud login
Output: OAuth-style browser flow; stores an API key under ~/.prefect/profiles.toml.
Versioning & Python support
- Current stable line is the 3.x series (released late 2024). 3.0 reworked the engine for synchronous-by-default execution and dropped several deprecated 2.x surfaces.
- The 2.x series (2022–2024) was itself a complete rewrite from the 1.x "Prefect Core" era. The 1.x → 2.x migration broke API compatibility entirely; the 2.x → 3.x migration is smaller but still breaks integrations.
- Supports Python 3.9+ on the 3.x line. Older 2.x supports 3.8.
- Pin at least the major version (prefect>=3,<4), and consider pinning the minor in production: minor releases routinely add or rename CLI subcommands, and integration packages (prefect-aws, prefect-gcp) follow their own release cadence.
Package metadata
- Maintainer: Prefect Technologies, Inc.
- Project home: github.com/PrefectHQ/prefect
- Docs: docs.prefect.io
- PyPI: pypi.org/project/prefect
- License: Apache-2.0 (open-source core); Prefect Cloud is a paid managed control plane
- Governance: company-led open source — Prefect Technologies, Inc.
- First released: 2018
- Downloads: millions per month — widely used in data engineering and ML ops stacks.
Optional dependencies & extras
Prefect's integrations live in sibling packages (prefect-<provider>) that are also exposed as extras on the main prefect distribution.
- prefect[aws] → installs prefect-aws: S3 blocks, ECS workers, AWS Secrets Manager.
- prefect[gcp] → installs prefect-gcp: GCS, Vertex AI, Cloud Run workers, Secret Manager.
- prefect[azure] → installs prefect-azure: Blob storage, Container Instances.
- prefect[kubernetes] → installs prefect-kubernetes: Kubernetes workers and jobs.
- prefect[docker] → installs prefect-docker: Docker workers and containers.
- prefect[dask] → installs prefect-dask: DaskTaskRunner for parallelism via Dask.
- prefect[ray] → installs prefect-ray: RayTaskRunner for distributed Ray execution.
- prefect[dbt] → installs prefect-dbt: dbt Core invocation as Prefect tasks.
- prefect[snowflake] / prefect[databricks] / prefect[bigquery]: first-party data-warehouse integrations.
- prefect[redis] / prefect[slack] / prefect[email]: operational integrations.
- prefect[dev]: test and lint dependencies; for contributing only.
Core required dependencies pulled in by the base install include pydantic, sqlalchemy (for the embedded metadata DB), httpx, uvicorn, griffe, croniter, and click.
Alternatives
| Package | Trade-off |
|---|---|
| dagster | Asset-centric orchestration with strong lineage and a richer UI. Use when you think in software-defined assets, not flows of tasks. |
| airflow (apache-airflow) | The classical DAG orchestrator. Battle-tested but heavyweight; configuration-driven; weaker Python-native ergonomics. |
| temporal (temporalio) | Durable workflow engine for general-purpose Python (not data-specific). Use when you need workflow durability beyond data pipelines. |
| kedro | Opinionated data-pipeline framework with strong conventions. Pairs well with Prefect or Dagster for execution. |
| metaflow | Netflix's workflow framework: data-science focused, runs on AWS Step Functions or Argo. |
| luigi | Old-school dependency-graph orchestrator from Spotify. Legacy choice. |
| argo workflows | Kubernetes-native YAML DAGs. Use when everything already lives on K8s. |
Common gotchas
- 1.x → 2.x → 3.x API breaks. Each major release reworked the engine. Flow definitions, deployment YAML, and the storage block API all changed between versions. There is no rolling-upgrade path; plan a one-shot migration per major.
- Agent vs worker model changed in 2.16 / 3.0. Prefect 1.x and early 2.x used "agents" polling work queues. Modern Prefect uses workers bound to specific work pools of a particular infrastructure type (process, Docker, Kubernetes, ECS, etc.). Old tutorials referencing prefect agent start are obsolete.
- Cloud auth via API key vs CLI login. prefect cloud login writes a profile interactively; PREFECT_API_KEY + PREFECT_API_URL env vars are the CI-friendly path. Mixing the two (profile present but env vars overriding) is a frequent source of "authenticated but no workspace" errors.
- prefect[*] extras do not auto-update. pip install -U prefect upgrades the core but not prefect-aws / prefect-gcp unless you also pass them. Pinning all integration packages in pyproject.toml is the safe pattern.
- Embedded server uses SQLite by default. Fine for local development; not suitable for production self-hosted deployments. Switch to PostgreSQL via PREFECT_API_DATABASE_CONNECTION_URL before any team starts depending on the server.
- Flow run state is mutable from the API. The Cloud UI / API can mark a run as Cancelled, Failed, or Completed even if the worker process is still running. Workers detect this via heartbeats and shut the run down, but a long compute step holding the GIL can outlive the cancellation by minutes.
- prefect deploy writes prefect.yaml. First-time deploys generate this file in the project root. Subsequent prefect deploy runs amend it. Hand-editing the same file while running interactive prefect deploy causes lost changes.
- Block API renamed several times. "Blocks" (typed configuration objects: S3Bucket, GitHubRepository, etc.) are stored in the API and referenced by slug. Renaming a block does not rewrite references in deployment YAML, so re-deploy after any block rename.
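The Cloud auth gotcha above can be caught early with a pre-flight check in CI. The following is a minimal sketch using only the standard library; the function name check_prefect_env and the warning texts are hypothetical, and the substring test for api.prefect.cloud is an assumption about how a Cloud URL looks, not something Prefect itself exposes.

```python
import os
from typing import Mapping, Optional


def check_prefect_env(env: Optional[Mapping[str, str]] = None) -> list:
    """Return warnings about a half-configured Prefect environment.

    Hypothetical helper: it only inspects the two variables named in the
    text and does not read any profile file.
    """
    env = os.environ if env is None else env
    key = env.get("PREFECT_API_KEY", "")
    url = env.get("PREFECT_API_URL", "")
    warnings = []
    if key and not url:
        warnings.append("PREFECT_API_KEY is set but PREFECT_API_URL is not")
    # Assumption: Cloud URLs contain this host name.
    if url and not key and "api.prefect.cloud" in url:
        warnings.append("PREFECT_API_URL points at Prefect Cloud but no PREFECT_API_KEY is set")
    return warnings


if __name__ == "__main__":
    for line in check_prefect_env():
        print("warning:", line)
```

Running this as the first CI step makes a missing variable fail loudly before any flow code is imported.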
Real-world recipes
Patterns from production Prefect 3.x. The flow/task model is Python-native — these recipes show how to push it past "decorated script" into a robust pipeline.
ETL flow with task-level retries and result caching
The core decorator pattern: @flow for orchestrated entry points, @task for cacheable units. Setting cache_key_fn=task_input_hash skips re-running tasks whose inputs are unchanged — invaluable for partial-failure recovery.
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(retries=3, retry_delay_seconds=[10, 30, 60],
      cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract(source: str) -> list[dict]:
    import httpx
    return httpx.get(source, timeout=30).json()

@task
def transform(records: list[dict]) -> list[dict]:
    return [{"id": r["id"], "name": r["name"].title()} for r in records]

@task(retries=2)
def load(records: list[dict], table: str) -> int:
    # ... INSERT into table ...
    return len(records)

@flow(name="user-sync", log_prints=True)
def user_sync(source: str = "https://api.example.com/users",
              table: str = "users"):
    raw = extract(source)
    clean = transform(raw)
    return load(clean, table)

if __name__ == "__main__":
    user_sync()
Output: the flow runs sequentially; re-running within the hour skips extract (cache hit) and proceeds directly to transform. Retries on extract wait 10 s, then 30 s, then 60 s: an explicit delay list rather than computed backoff.
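To make the caching behaviour concrete, here is a rough stdlib-only sketch of the idea behind an input-hash cache key with an expiration window. It is not Prefect's task_input_hash implementation; input_hash and ExpiringCache are hypothetical names, and the JSON-based hashing assumes the inputs are JSON-serialisable.

```python
import hashlib
import json
import time


def input_hash(fn_name: str, kwargs: dict) -> str:
    """Derive a stable key from a function name and its keyword inputs."""
    payload = json.dumps({"fn": fn_name, "kwargs": kwargs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


class ExpiringCache:
    """Keep computed values for ttl_seconds, then recompute on next access."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # key -> (stored_at, value)

    def get_or_compute(self, key, compute):
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self._ttl:
            return hit[1]  # cache hit: skip the expensive call
        value = compute()
        self._entries[key] = (now, value)
        return value


if __name__ == "__main__":
    cache = ExpiringCache(ttl_seconds=3600)
    key = input_hash("extract", {"source": "https://api.example.com/users"})
    first = cache.get_or_compute(key, lambda: [{"id": 1}])
    second = cache.get_or_compute(key, lambda: [{"id": 2}])  # not called
    print(first == second)
```

The key depends only on the inputs, so a changed source URL produces a new key and forces a fresh run.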
Parameterised deployment with a cron schedule
A deployment binds a flow to infrastructure (work pool) and an optional schedule. The Python API for deployments lets you check the deployment definition into git alongside the flow code.
from prefect.client.schemas.schedules import CronSchedule
from user_sync import user_sync

if __name__ == "__main__":
    user_sync.deploy(
        name="hourly-sync",
        work_pool_name="default-process-pool",
        parameters={"source": "https://api.example.com/users",
                    "table": "users"},
        schedules=[CronSchedule(cron="0 * * * *", timezone="UTC")],
        tags=["etl", "alice-team"],
    )
Output: python deploy.py registers the deployment with the API; a worker bound to default-process-pool picks up runs on the hour.
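As a sanity check on what "0 * * * *" means, the sketch below computes the next few fire times by hand with the standard library. It handles only this one expression (top of every hour) and is not a general cron parser; next_hourly_runs is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def next_hourly_runs(now: datetime, count: int = 3) -> list:
    """Next `count` fire times for the cron expression "0 * * * *"."""
    # Truncate to the current hour, then step forward one hour at a time.
    first = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return [first + timedelta(hours=i) for i in range(count)]


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 10, 15, tzinfo=timezone.utc)
    for run in next_hourly_runs(now):
        print(run.isoformat())
```

Passing a timezone-aware datetime keeps the result aligned with the timezone="UTC" setting used in the deployment.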
Conditional branching with task futures
Calling a task directly inside a flow blocks and returns its result; task.submit() instead returns a PrefectFuture that resolves when you call .result(). The pattern below calls the task directly, so ordinary Python if/elif can branch on the returned value.
from prefect import flow, task

@task
def check_inventory(item: str) -> int:
    return 42  # stub

@task
def restock(item: str, count: int) -> None:
    print(f"restocking {count} of {item}")

@task
def notify_overstock(item: str, count: int) -> None:
    print(f"overstock {item}={count}")

@flow
def inventory_check(item: str):
    count = check_inventory(item)
    if count < 10:
        restock(item, 10 - count)
    elif count > 100:
        notify_overstock(item, count)
    else:
        print(f"{item} ok")
Output: Prefect logs each task transition; the conditional executes inside the flow body, so the run graph shows exactly which branch fired.
Fan-out with task.map
task.map(iterable) submits one run per item and returns a list of futures. For embarrassingly-parallel work, this is the cleanest concurrency primitive in Prefect.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def fetch_one(url: str) -> dict:
    import httpx
    return httpx.get(url, timeout=30).json()

# ThreadPoolTaskRunner is the 3.x name; ConcurrentTaskRunner remains as an alias
@flow(task_runner=ThreadPoolTaskRunner())
def fetch_many(urls: list[str]) -> list[dict]:
    futures = fetch_one.map(urls)
    return [f.result() for f in futures]
Output: all URLs run concurrently up to the task runner's default concurrency cap; a failure on one item does not cancel its siblings, although f.result() re-raises that failure when the results are collected.
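The fan-out pattern, including per-item failure isolation, can be illustrated without Prefect using concurrent.futures. This is a sketch of the general technique, not of task.map internals; fan_out is a hypothetical helper that collects results and errors separately instead of raising on the first failure.

```python
from concurrent.futures import ThreadPoolExecutor


def fan_out(fn, items, max_workers=4):
    """Run fn over items concurrently; return (results, errors) keyed by index."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit everything first so one failure cannot block later items.
        futures = {i: pool.submit(fn, item) for i, item in enumerate(items)}
        for i, future in futures.items():
            try:
                results[i] = future.result()
            except Exception as exc:  # keep the error, carry on with siblings
                errors[i] = exc
    return results, errors


if __name__ == "__main__":
    ok, failed = fan_out(int, ["1", "not-a-number", "3"])
    print(ok, {i: type(e).__name__ for i, e in failed.items()})
```

Collecting errors by index makes it straightforward to retry only the failed items afterwards.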
Block-based external configuration
Blocks are typed configuration objects stored in the Prefect API — secrets, S3 connections, Slack webhooks. Define once via block.save("slug"), then reference by slug from any flow.
from prefect.blocks.system import Secret

# One-time setup (e.g. in a setup script)
Secret(value="sk-...").save("openai-api-key", overwrite=True)

# Usage in flows
from prefect import flow

@flow
def call_openai():
    api_key = Secret.load("openai-api-key").get()
    # ... use api_key ...
Output: the secret never lives in flow source; rotation is a single API call. Blocks are versioned and audit-logged in Prefect Cloud.
Production deployment
Prefect's deployment story has three pieces: where the orchestrator lives (Cloud vs self-hosted server), where workers run, and where flow code is fetched from. Pick each independently.
Cloud vs self-hosted server
| Choice | When | Trade-off |
|---|---|---|
| Prefect Cloud | Small teams, no DB to operate, audit/RBAC matter | $$ for managed plane; no DB ops; SSO, automations, work pools UI included. |
| Self-hosted server | Air-gapped, cost-sensitive, regulatory data residency | Run prefect server start (or Helm chart); you own Postgres + uvicorn. |
Cloud auth uses prefect cloud login (interactive) or PREFECT_API_KEY + PREFECT_API_URL env vars (CI-friendly). Self-hosted uses PREFECT_API_URL=http://prefect-server:4200/api only — no key needed if behind a private network.
# Self-hosted server (dev / on-prem)
prefect server start
# Bind to all interfaces and use Postgres
PREFECT_API_DATABASE_CONNECTION_URL=postgresql+asyncpg://prefect@db/prefect \
PREFECT_SERVER_API_HOST=0.0.0.0 \
prefect server start
Output: API on http://0.0.0.0:4200/api; UI on http://0.0.0.0:4200. Switch to Postgres before any team workload — SQLite locks under concurrent flow runs.
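When the Postgres user has a password, special characters in it must be percent-encoded before they go into the connection URL. The helper below is a small sketch of that step; build_connection_url is a hypothetical name, and the default port 5432 is the usual PostgreSQL default rather than anything stated above.

```python
from urllib.parse import quote


def build_connection_url(user: str, password: str, host: str,
                         database: str, port: int = 5432) -> str:
    """Build a postgresql+asyncpg URL with credentials percent-encoded."""
    # safe="" forces characters such as "@" and "/" to be encoded too.
    user_enc = quote(user, safe="")
    password_enc = quote(password, safe="")
    return f"postgresql+asyncpg://{user_enc}:{password_enc}@{host}:{port}/{database}"


if __name__ == "__main__":
    print(build_connection_url("prefect", "p@ss/word", "db", "prefect"))
```

The resulting string is what would be exported as PREFECT_API_DATABASE_CONNECTION_URL before starting the server.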
Work pools and workers
A work pool is a typed slot (process, docker, kubernetes, ecs, cloud-run) that holds queued flow runs. A worker is a long-running process bound to one work pool that polls for runs and launches them on the matching infrastructure.
# Create a process pool (workers run flows in subprocess on the worker host)
prefect work-pool create --type process default-process-pool
# Create a Kubernetes pool
prefect work-pool create --type kubernetes k8s-pool
# Start a worker for the process pool
prefect worker start --pool default-process-pool
# Start a Kubernetes worker (typically runs IN the cluster)
prefect worker start --pool k8s-pool
Output: the worker polls the pool every few seconds; when a queued run appears, the worker fetches the flow code, runs it on the appropriate infra, and reports state back to the server.
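The pool/worker relationship can be pictured with a toy in-memory model. This sketch is only an illustration of the polling idea: SketchWorkPool and run_worker_once are hypothetical names, runs execute in-process rather than on separate infrastructure, and the state strings merely mimic the names used in the text.

```python
import queue


class SketchWorkPool:
    """Toy stand-in for a work pool: a typed queue of pending flow runs."""

    def __init__(self, name: str, pool_type: str):
        self.name = name
        self.pool_type = pool_type
        self._runs = queue.Queue()

    def enqueue(self, flow_fn, **params):
        self._runs.put((flow_fn, params))

    def poll(self):
        """Return the next queued run, or None when the pool is empty."""
        try:
            return self._runs.get_nowait()
        except queue.Empty:
            return None


def run_worker_once(pool: SketchWorkPool) -> list:
    """Drain the pool once and report a (state, detail) pair per run."""
    states = []
    while (item := pool.poll()) is not None:
        flow_fn, params = item
        try:
            states.append(("Completed", flow_fn(**params)))
        except Exception as exc:
            states.append(("Failed", repr(exc)))
    return states


if __name__ == "__main__":
    pool = SketchWorkPool("default-process-pool", "process")
    pool.enqueue(lambda table: f"synced {table}", table="users")
    print(run_worker_once(pool))
```

A real worker repeats the poll on an interval and hands each run to the pool's infrastructure type instead of calling the function directly.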
The pre-3.0 "agent" model is obsolete — old tutorials saying prefect agent start need to be re-read against the worker model.
Deployments: flow.deploy() vs prefect.yaml
Two equivalent definition styles:
# Python API: concise, lives next to the flow code
from prefect import flow

@flow
def my_flow(): ...

if __name__ == "__main__":
    my_flow.from_source(
        source="https://github.com/alicedev/repo.git",
        entrypoint="flows/my_flow.py:my_flow",
    ).deploy(
        name="prod",
        work_pool_name="k8s-pool",
        image="ghcr.io/alicedev/flows:latest",
    )
# prefect.yaml: declarative, lives in repo root
deployments:
  - name: prod
    entrypoint: flows/my_flow.py:my_flow
    work_pool:
      name: k8s-pool
      job_variables:
        image: ghcr.io/alicedev/flows:latest
    schedule:
      cron: "0 6 * * *"
      timezone: UTC
Output: prefect deploy reads prefect.yaml and registers everything; the Python form is equivalent but the YAML version composes better with GitOps pipelines.
Containerised workers
Each work pool type has an associated base job template that defines the container, env, volumes, etc. Override per-deployment via job_variables:
my_flow.deploy(
    name="prod",
    work_pool_name="k8s-pool",
    image="ghcr.io/alicedev/flows:v1.2.3",
    job_variables={
        "env": {"DATABASE_URL": "{{ prefect.blocks.secret.db-url }}"},
        "image_pull_policy": "Always",
        "service_account_name": "prefect-flows",
    },
)
Output: the worker passes the variables into the pod spec; secrets are pulled from the block store at run time.
Version migration guide
Prefect's three majors (1.x, 2.x, 3.x) are essentially three different products. The 1.x → 2.x migration is a full rewrite of flow code, not an API-compatible upgrade; 2.x → 3.x is smaller but still requires code changes.
1.x → 2.x (the big one — 2022)
Prefect 1.x ("Prefect Core") was task-graph-first: you built a Flow() object explicitly, registered with a backend (Cloud or Server), and ran via a Dask-style execution model. 2.x replaced this with decorators and a brand-new engine.
# Prefect 1.x: explicit Flow construction
from prefect import Flow, task

@task
def extract(): return [1, 2, 3]

@task
def transform(xs): return [x * 2 for x in xs]

with Flow("etl") as flow:
    raw = extract()
    out = transform(raw)

flow.register("default")

# Prefect 2.x / 3.x: decorator-first
from prefect import flow, task

@task
def extract(): return [1, 2, 3]

@task
def transform(xs): return [x * 2 for x in xs]

@flow(name="etl")
def etl():
    raw = extract()
    return transform(raw)

etl()
Output: same data flow; entirely different APIs. There is no rolling upgrade.
2.x → 3.x (late 2024)
Prefect 3.0 reworked the engine for synchronous-by-default execution, removed Block.load_secret_name shims, retired several deprecated kwargs on flow.deploy(), and changed work-pool storage conventions.
Common patches needed:
- Deployment storage: RemoteFileSystem blocks deprecated in favour of from_source() on the flow.
- prefect agent: gone; use prefect worker against a work pool.
- Flow.run_config: removed; configure via work_pool job templates instead.
- PrefectFuture.result(): semantics tightened. In 3.x, calling .result() on a failed future raises by default; pass raise_on_failure=False to recover the old behaviour.
- prefect.context: replaced by prefect.runtime for accessing run/task metadata inside a task body.
# 2.x — accessing task run info
from prefect import context
task_id = context.task_run_id
# 3.x
from prefect.runtime import task_run
task_id = task_run.id
Output: equivalent values; the import path moves.
Integration package drift
Each integration package (prefect-aws, prefect-gcp, prefect-kubernetes) has its own release cadence and its own compatible range of prefect core versions. Pin all of them together in pyproject.toml:
[project]
dependencies = [
"prefect>=3.0,<4",
"prefect-aws>=0.5,<1",
"prefect-kubernetes>=0.5,<1",
]
Running pip install -U prefect without bumping the integration packages can break imports — the package guards __init__.py against incompatible core versions, but the error surfaces at flow-load time, not install time.
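Because the incompatibility only surfaces at flow-load time, it can help to verify the pins in CI before deploying. The sketch below checks a mapping of installed versions against lower/upper bounds using plain tuple comparison; check_pins and its helpers are hypothetical names, and the parser assumes simple dotted releases such as "3.1.4" with no pre-release suffixes.

```python
def parse_version(text: str) -> tuple:
    """Parse a plain dotted release like "3.1.4" into a tuple of ints."""
    return tuple(int(part) for part in text.split("."))


def within(version: str, lower: str, upper: str) -> bool:
    """True when lower <= version < upper."""
    return parse_version(lower) <= parse_version(version) < parse_version(upper)


def check_pins(installed: dict, pins: dict) -> list:
    """Compare installed versions with (lower, upper) bounds per package."""
    problems = []
    for name, (lower, upper) in pins.items():
        found = installed.get(name)
        if found is None:
            problems.append(f"{name}: not installed")
        elif not within(found, lower, upper):
            problems.append(f"{name}: {found} outside >={lower},<{upper}")
    return problems


if __name__ == "__main__":
    pins = {"prefect": ("3.0", "4"), "prefect-aws": ("0.5", "1")}
    print(check_pins({"prefect": "3.1.4", "prefect-aws": "0.4.9"}, pins))
```

In a real project the installed mapping could be built with importlib.metadata.version for each package name; a non-empty result would then fail the build.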
Performance tuning
Prefect's overhead is per-task-transition, not per-line-of-Python. Optimise by reducing the number of orchestrated boundaries — promote intra-loop work into the task body rather than wrapping every operation in its own @task.
Concurrency limits
Limits prevent fan-out from overwhelming external systems. They apply to task tags globally across the workspace.
prefect concurrency-limit create database-writes 10
prefect concurrency-limit create third-party-api 3
prefect concurrency-limit ls
Output: tasks tagged database-writes cap at 10 concurrent runs across the workspace; the rest queue.
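The effect of a tag-based limit can be approximated locally with a semaphore per tag. This is a single-process sketch of the idea only: Prefect enforces its limits through the API across workers, whereas the hypothetical TagLimiter below just caps threads inside one interpreter and records the peak concurrency it observed.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class TagLimiter:
    """Cap concurrent calls per tag and record the observed peak."""

    def __init__(self, limits: dict):
        self._slots = {tag: threading.BoundedSemaphore(n) for tag, n in limits.items()}
        self._lock = threading.Lock()
        self._active = {tag: 0 for tag in limits}
        self.peak = {tag: 0 for tag in limits}

    def run(self, tag: str, fn, *args):
        with self._slots[tag]:  # blocks while the tag is at its limit
            with self._lock:
                self._active[tag] += 1
                self.peak[tag] = max(self.peak[tag], self._active[tag])
            try:
                return fn(*args)
            finally:
                with self._lock:
                    self._active[tag] -= 1


def slow_call(i: int) -> int:
    time.sleep(0.01)  # stand-in for a rate-limited external API
    return i


if __name__ == "__main__":
    limiter = TagLimiter({"third-party-api": 3})
    with ThreadPoolExecutor(max_workers=10) as pool:
        out = list(pool.map(lambda i: limiter.run("third-party-api", slow_call, i), range(20)))
    print(len(out), limiter.peak)
```

Ten threads compete for three slots here, so the recorded peak never exceeds the configured limit even though all twenty calls complete.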
Task runner selection
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
from prefect_dask import DaskTaskRunner  # requires prefect-dask
from prefect_ray import RayTaskRunner    # requires prefect-ray

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))  # I/O-bound threads
def f1(): ...

@flow(task_runner=DaskTaskRunner())  # CPU-bound, distributed via Dask
def f2(): ...

@flow(task_runner=RayTaskRunner())  # CPU-bound, distributed via Ray
def f3(): ...
Output: the runner controls how task.submit / task.map futures are scheduled; for I/O-bound work, ThreadPoolTaskRunner is usually sufficient, while Dask or Ray suit CPU-bound work that must spread across processes or machines. In 3.x, ConcurrentTaskRunner is kept as a backwards-compatible alias of ThreadPoolTaskRunner.
Result storage
When result persistence is enabled, Prefect serialises task results to local disk under ~/.prefect/storage/ by default. For multi-host workers or large results, switch to remote storage:
from prefect import flow, task
from prefect_aws import S3Bucket  # from the prefect-aws integration package

s3 = S3Bucket.load("results-bucket")

@task(persist_result=True, result_storage=s3, result_serializer="json")
def big_task(): ...
Output: results land in S3; multiple workers can read each other's results without sharing a filesystem.
For very large results, don't persist them at all — return a path/URI and let downstream tasks read it directly. Prefect's serialisation overhead is real on multi-GB pandas frames.
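The "return a path, not the payload" advice looks roughly like this in plain Python. The sketch uses a local temporary directory in place of object storage, and produce_large_result / consume are hypothetical stand-ins for an upstream and a downstream task.

```python
import json
import tempfile
from pathlib import Path


def produce_large_result(rows: list, directory: str) -> str:
    """Write the rows to disk and return only the file path."""
    path = Path(directory) / "rows.json"
    path.write_text(json.dumps(rows))
    return str(path)  # small string crosses the task boundary, not the data


def consume(path: str) -> int:
    """Downstream step: read the data back from the shared location."""
    rows = json.loads(Path(path).read_text())
    return len(rows)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        location = produce_large_result([{"id": i} for i in range(1000)], workdir)
        print(consume(location))
```

With multiple hosts the directory would need to be a location every worker can reach, such as an object-store URI.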
Logging hygiene
log_prints=True on a flow/task captures print() and routes through Prefect's logger. Without it, print() writes go to the worker's stdout and never surface in the UI. For high-volume flows, switch to structured logging via get_run_logger():
from prefect import flow, get_run_logger

@flow
def f():
    log = get_run_logger()
    log.info("starting", extra={"item_count": 42})
Output: the log line is attached to the flow run; the UI displays it filtered by run, and Cloud's API exposes structured search.
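For readers unfamiliar with the extra argument, the sketch below shows what it does in the standard logging module: each key becomes an attribute on the emitted LogRecord. It assumes the Prefect run logger follows standard logging behaviour here; ListHandler and the logger name pipeline.sketch are hypothetical.

```python
import logging


class ListHandler(logging.Handler):
    """Collect emitted records in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


log = logging.getLogger("pipeline.sketch")
log.setLevel(logging.INFO)
log.propagate = False  # keep the demo output out of the root logger
handler = ListHandler()
log.addHandler(handler)

# Keys passed via `extra` are attached to the record as attributes.
log.info("starting", extra={"item_count": 42})
record = handler.records[0]

if __name__ == "__main__":
    print(record.getMessage(), record.item_count)
```

Structured fields attached this way can be picked up by any handler or formatter that knows to look for them.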
Testing strategies
Prefect's testing story is "run the flow in-process, assert on the return value or state". @flow and @task decorators are transparent in unit tests — the function still behaves as Python.
from prefect.testing.utilities import prefect_test_harness
import pytest

@pytest.fixture(autouse=True, scope="session")
def prefect_test_db():
    """Spin up an isolated SQLite DB for the test session."""
    with prefect_test_harness():
        yield

def test_extract_returns_list():
    from user_sync import extract
    # .fn is the undecorated function: calling it bypasses orchestration,
    # so it behaves as plain Python (note: this still makes a real HTTP call)
    result = extract.fn("https://example.com/users")
    assert isinstance(result, list)

def test_flow_e2e(monkeypatch):
    from user_sync import user_sync
    monkeypatch.setattr("user_sync.extract.fn", lambda src: [{"id": 1, "name": "alice"}])
    state = user_sync(return_state=True)
    assert state.is_completed()
    assert state.result() == 1
Output: pytest -q runs the flow against an ephemeral Prefect DB; no Cloud account needed. task.fn(...) is the standard escape hatch for unit-testing a task body without the orchestrator wrapper.
Key patterns:
- prefect_test_harness(): fixture that swaps in a temporary SQLite DB so test runs don't leak state into your dev workspace.
- task.fn(...): call the raw function under a task without orchestration.
- return_state=True: makes flow(...) return the State object instead of the result. Assert on state.is_completed() / state.is_failed() for control-flow tests.
- monkeypatch on .fn: mock external APIs at the task level. The mock survives across retries.
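Why patching .fn works, and why the mock survives retries, is easier to see with a toy wrapper. The class below is not Prefect's Task; SketchTask and sketch_task are hypothetical names for a minimal decorator that stores the raw function on .fn and looks it up again on every attempt.

```python
class SketchTask:
    """Toy task wrapper: keeps the raw function on .fn and retries on error."""

    def __init__(self, fn, retries: int = 0):
        self.fn = fn
        self.retries = retries

    def __call__(self, *args, **kwargs):
        last_exc = None
        for _ in range(self.retries + 1):
            try:
                # self.fn is read on each attempt, so a patched .fn is
                # used for the first try and for every retry.
                return self.fn(*args, **kwargs)
            except Exception as exc:
                last_exc = exc
        raise last_exc


def sketch_task(retries: int = 0):
    def wrap(fn):
        return SketchTask(fn, retries)
    return wrap


@sketch_task(retries=2)
def extract(source: str):
    raise ConnectionError("network unavailable in tests")


if __name__ == "__main__":
    extract.fn = lambda source: [{"id": 1, "name": "alice"}]  # the "mock"
    print(extract("https://example.com/users"))
```

In a pytest suite the assignment to extract.fn would go through monkeypatch.setattr so that the original function is restored after the test.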
See also
- Python: prefect — flows, tasks, deployments, scheduling
- Packages: pip-dagster — the asset-oriented alternative