
prefect

Package-level reference for Prefect on PyPI — install variants, version policy, cloud-vs-OSS extras, and alternatives.


What it is

prefect is a Python framework for orchestrating data pipelines and arbitrary workflows. Jeremiah Lowin created it in 2018 to address pain points of Airflow-era orchestration. It provides Python-native @flow and @task decorators, a scheduler, a UI, and a control plane that can be either self-hosted or Prefect Cloud.

Reach for Prefect when you want orchestration that feels like writing plain Python with retries, caching, and observability layered in. Reach for Dagster when you want first-class asset materialization and a stronger data-lineage model.

Install

bash
pip install prefect

Output: pip's usual download and install log; exits 0 on success. Installs the core engine, CLI, scheduler, and an embedded server.

bash
uv add prefect

Output: dependency resolved + added to pyproject.toml

bash
poetry add prefect

Output: updated lockfile + virtualenv install

bash
pip install "prefect[aws]"

Output: installs Prefect plus prefect-aws (S3 blocks, ECS workers, Secrets Manager).

bash
prefect cloud login

Output: OAuth-style browser flow; stores an API key under ~/.prefect/profiles.toml.

Versioning & Python support

  • Current stable line is the 3.x series (released late 2024). 3.0 reworked the engine for synchronous-by-default execution and dropped several deprecated 2.x surfaces.
  • The 2.x series (2022–2024) was itself a complete rewrite from the 1.x "Prefect Core" era. The 1.x → 2.x migration broke API compatibility entirely; the 2.x → 3.x migration is smaller but still breaks integrations.
  • Supports Python 3.9+ on the 3.x line. Older 2.x supports 3.8.
  • Pin at least to the major version (prefect>=3,<4), and pin to a minor if you script against the CLI. Minor releases routinely add or rename CLI subcommands, and integration packages (prefect-aws, prefect-gcp) follow their own release cadence.

Package metadata

  • Maintainer: Prefect Technologies, Inc.
  • Project home: github.com/PrefectHQ/prefect
  • Docs: docs.prefect.io
  • PyPI: pypi.org/project/prefect
  • License: Apache-2.0 (open-source core); Prefect Cloud is a paid managed control plane
  • Governance: company-led open source — Prefect Technologies, Inc.
  • First released: 2018
  • Downloads: millions per month — widely used in data engineering and ML ops stacks.

Optional dependencies & extras

Prefect's integrations live in sibling packages (prefect-<provider>) that are also exposed as extras on the main prefect distribution.

  • prefect[aws] → installs prefect-aws — S3 blocks, ECS workers, AWS Secrets Manager.
  • prefect[gcp] → installs prefect-gcp — GCS, Vertex AI, Cloud Run workers, Secret Manager.
  • prefect[azure] → installs prefect-azure — Blob storage, Container Instances.
  • prefect[kubernetes] → installs prefect-kubernetes — Kubernetes workers and jobs.
  • prefect[docker] → installs prefect-docker — Docker workers and containers.
  • prefect[dask] → installs prefect-dask: DaskTaskRunner for parallelism via Dask.
  • prefect[ray] → installs prefect-ray: RayTaskRunner for distributed Ray execution.
  • prefect[dbt] → installs prefect-dbt — dbt Core invocation as Prefect tasks.
  • prefect[snowflake] / prefect[databricks]: first-party data-warehouse integrations. BigQuery support ships inside prefect-gcp rather than as a separate extra.
  • prefect[redis] / prefect[slack] / prefect[email] — operational integrations.
  • prefect[dev] — test and lint dependencies; for contributing only.

Core required dependencies pulled in by the base install include pydantic, sqlalchemy (for the embedded metadata DB), httpx, uvicorn, griffe, croniter, and click.

Alternatives

  • dagster: asset-centric orchestration with strong lineage and a richer UI. Use when you think in software-defined assets, not flows of tasks.
  • airflow (apache-airflow): the classical DAG orchestrator. Battle-tested but heavyweight; configuration-driven; weaker Python-native ergonomics.
  • temporal (temporalio): durable workflow engine for general-purpose Python (not data-specific). Use when you need workflow durability beyond data pipelines.
  • kedro: opinionated data-pipeline framework with strong conventions. Pairs well with Prefect or Dagster for execution.
  • metaflow: Netflix's workflow framework; data-science focused, runs on AWS Step Functions or Argo.
  • luigi: old-school dependency-graph orchestrator from Spotify. Legacy choice.
  • argo workflows: Kubernetes-native YAML DAGs. Use when everything already lives on K8s.

Common gotchas

  1. 1.x → 2.x → 3.x API breaks. Each major release reworked the engine. Flow definitions, deployment YAML, and the storage block API all changed between versions. There is no rolling-upgrade path; plan a one-shot migration per major.
  2. Agent vs worker model changed in 2.16 / 3.0. Prefect 1.x and early 2.x used "agents" polling work queues. Modern Prefect uses workers bound to specific work pools of a particular infrastructure type (process, Docker, Kubernetes, ECS, etc.). Old tutorials referencing prefect agent start are obsolete.
  3. Cloud auth via API key vs CLI login. prefect cloud login writes a profile interactively; PREFECT_API_KEY + PREFECT_API_URL env vars are the CI-friendly path. Mixing the two (profile present but env vars overriding) is a frequent source of "authenticated but no workspace" errors.
  4. prefect[*] extras do not auto-update. pip install -U prefect upgrades the core but not prefect-aws/prefect-gcp unless you also pass them. Pinning all integration packages in pyproject.toml is the safe pattern.
  5. Embedded server uses SQLite by default. Fine for local development; not suitable for production self-hosted deployments. Switch to PostgreSQL via PREFECT_API_DATABASE_CONNECTION_URL before any team starts depending on the server.
  6. Flow run state is mutable from the API. The Cloud UI / API can mark a run as Cancelled, Failed, or Completed even if the worker process is still running. Workers detect this via heartbeats and shut the run down — but a long compute step holding the GIL can outlive the cancellation by minutes.
  7. prefect deploy writes prefect.yaml. First-time deploys generate this file in the project root. Subsequent prefect deploy runs amend it. Hand-editing the same file while running interactive prefect deploy causes lost changes.
  8. Block API renamed several times. "Blocks" (typed configuration objects: S3Bucket, GitHubRepository, etc.) are stored in the API and referenced by slug. Renaming a block does not rewrite references in deployment YAML — re-deploy after any block rename.
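Gotcha 3 is easier to debug once you can see which source wins for each setting. The stdlib sketch below assumes the precedence described there: environment variables override the active profile. It takes the profile as a plain dict rather than parsing ~/.prefect/profiles.toml. effective_settings, overridden, and the example URLs are hypothetical.

```python
import os
from typing import Mapping, Optional

# The two settings gotcha 3 is about.
WATCHED = ("PREFECT_API_URL", "PREFECT_API_KEY")


def effective_settings(profile: Mapping[str, str],
                       env: Mapping[str, str] = os.environ) -> dict[str, tuple[Optional[str], str]]:
    """Return {setting: (value, source)}; an environment variable beats the profile."""
    result: dict[str, tuple[Optional[str], str]] = {}
    for name in WATCHED:
        if name in env:
            result[name] = (env[name], "env")
        elif name in profile:
            result[name] = (profile[name], "profile")
        else:
            result[name] = (None, "unset")
    return result


def overridden(profile: Mapping[str, str], env: Mapping[str, str] = os.environ) -> list[str]:
    """Settings where an env var silently shadows a *different* profile value."""
    return [n for n in WATCHED if n in env and n in profile and env[n] != profile[n]]
```

A non-empty overridden(...) list is the "authenticated but no workspace" smell: the key comes from one place and the URL from another.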

Real-world recipes

Patterns from production Prefect 3.x. The flow/task model is Python-native — these recipes show how to push it past "decorated script" into a robust pipeline.

ETL flow with task-level retries and result caching

The core decorator pattern: @flow for orchestrated entry points, @task for cacheable units. Setting cache_key_fn=task_input_hash skips re-running tasks whose inputs are unchanged — invaluable for partial-failure recovery.

python
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(retries=3, retry_delay_seconds=[10, 30, 60],
      cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract(source: str) -> list[dict]:
    import httpx
    resp = httpx.get(source, timeout=30)
    resp.raise_for_status()  # HTTP errors must raise, or the retries never fire
    return resp.json()

@task
def transform(records: list[dict]) -> list[dict]:
    return [{"id": r["id"], "name": r["name"].title()} for r in records]

@task(retries=2)
def load(records: list[dict], table: str) -> int:
    # ... INSERT into table ...
    return len(records)

@flow(name="user-sync", log_prints=True)
def user_sync(source: str = "https://api.example.com/users",
              table: str = "users"):
    raw = extract(source)
    clean = transform(raw)
    return load(clean, table)

if __name__ == "__main__":
    user_sync()

Output: the flow runs sequentially; re-running within the hour skips extract (cache hit) and proceeds directly to transform. Retry on extract uses exponential-ish backoff (10 → 30 → 60 s).
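The two decorator options above can be mimicked in plain Python to show what they do conceptually. The first is an initial attempt plus one retry per entry in a delay list. The second is a result cache keyed by a hash of the inputs, with an expiry. This is a sketch, not Prefect's implementation. _CACHE, input_hash, and run_task are hypothetical names, and Prefect persists cached results to result storage rather than to an in-process dict.

```python
import hashlib
import json
import time
from typing import Any, Callable, Optional, Sequence

# Hypothetical in-memory cache: key -> (stored_at, value).
_CACHE: dict[str, tuple[float, Any]] = {}


def input_hash(fn: Callable[..., Any], args: tuple, kwargs: dict) -> str:
    """Hash the function's qualified name plus its inputs."""
    payload = json.dumps([fn.__qualname__, args, kwargs], sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


def run_task(fn: Callable[..., Any], *args: Any,
             retry_delays: Sequence[float] = (),
             cache_ttl: Optional[float] = None,
             sleep: Callable[[float], None] = time.sleep,
             **kwargs: Any) -> Any:
    """Run fn with a per-attempt delay schedule and optional input-hash caching."""
    key = input_hash(fn, args, kwargs)
    if cache_ttl is not None and key in _CACHE:
        stored_at, value = _CACHE[key]
        if time.monotonic() - stored_at < cache_ttl:
            return value  # cache hit: the body is skipped entirely

    # One initial attempt, then one retry per entry in retry_delays.
    for delay in [*retry_delays, None]:
        try:
            value = fn(*args, **kwargs)
            break
        except Exception:
            if delay is None:
                raise  # retries exhausted: surface the last error
            sleep(delay)

    if cache_ttl is not None:
        _CACHE[key] = (time.monotonic(), value)
    return value
```

With retry_delays=[10, 30, 60] a task that fails twice sleeps 10 s, then 30 s, and succeeds on the third attempt. An identical call inside cache_ttl returns the stored value without running the body.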

Parameterised deployment with a cron schedule

A deployment binds a flow to infrastructure (work pool) and an optional schedule. The Python API for deployments lets you check the deployment definition into git alongside the flow code.

python
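from pathlib import Path

from prefect.client.schemas.schedules import CronSchedule
from user_sync import user_sync

if __name__ == "__main__":
    # deploy() must know where workers load code from; a local path works
    # for a process pool whose worker runs on this same host.
    user_sync.from_source(
        source=str(Path(__file__).parent),
        entrypoint="user_sync.py:user_sync",
    ).deploy(
        name="hourly-sync",
        work_pool_name="default-process-pool",
        parameters={"source": "https://api.example.com/users",
                    "table": "users"},
        schedules=[CronSchedule(cron="0 * * * *", timezone="UTC")],
        tags=["etl", "alice-team"],
    )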

Output: python deploy.py registers the deployment with the API; a worker bound to default-process-pool picks up runs on the hour.
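The cron string "0 * * * *" means minute 0 of every hour. The sketch below computes the next fire time for that one expression only. It is not a general cron parser; a library such as croniter handles the general case. next_hourly_run is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def next_hourly_run(now: datetime) -> datetime:
    """Next fire time for "0 * * * *" in UTC (this single expression only)."""
    if now.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime")
    now_utc = now.astimezone(timezone.utc)
    floored = now_utc.replace(minute=0, second=0, microsecond=0)
    # Cron fires strictly after `now`, so an exact top-of-hour rolls forward.
    return floored + timedelta(hours=1)
```

For example, 10:15 UTC maps to 11:00 UTC, and 23:00 UTC rolls over to 00:00 the next day.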

Conditional branching on task results

Calling a task directly inside a flow runs it and returns its value, so ordinary Python control flow works on the result. task.submit() instead returns a PrefectFuture, which you materialise with .result() before branching. The pattern below branches on the value returned by check_inventory.

python
from prefect import flow, task

@task
def check_inventory(item: str) -> int:
    return 42  # stub

@task
def restock(item: str, count: int) -> None:
    print(f"restocking {count} of {item}")

@task
def notify_overstock(item: str, count: int) -> None:
    print(f"overstock {item}={count}")

@flow
def inventory_check(item: str):
    count = check_inventory(item)
    if count < 10:
        restock(item, 10 - count)
    elif count > 100:
        notify_overstock(item, count)
    else:
        print(f"{item} ok")

Output: Prefect logs each task transition; the conditional executes inside the flow body, so the run graph shows exactly which branch fired.
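If you call check_inventory.submit(item) instead, each call returns a future and you branch on future.result(). The sketch below shows that submit-then-branch shape. The standard library's concurrent.futures stands in for PrefectFuture, so it runs without a Prefect server. STOCK and decide are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stub data standing in for a real inventory lookup.
STOCK = {"widget": 4, "gadget": 250}


def check_inventory(item: str) -> int:
    return STOCK.get(item, 42)


def decide(item: str, count: int) -> str:
    """Same three-way branch as the flow above, returned as a string."""
    if count < 10:
        return f"restock {10 - count} of {item}"
    if count > 100:
        return f"overstock {item}={count}"
    return f"{item} ok"


def inventory_check(items: list[str]) -> list[str]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Analogous to check_inventory.submit(item): start every lookup without waiting.
        futures = {item: pool.submit(check_inventory, item) for item in items}
        # Analogous to future.result(): block until the value exists, then branch on it.
        return [decide(item, fut.result()) for item, fut in futures.items()]
```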

Fan-out with task.map

task.map(iterable) submits one run per item and returns a list of futures. For embarrassingly-parallel work, this is the cleanest concurrency primitive in Prefect.

python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def fetch_one(url: str) -> dict:
    import httpx
    return httpx.get(url, timeout=30).json()

@flow(task_runner=ThreadPoolTaskRunner())
def fetch_many(urls: list[str]) -> list[dict]:
    futures = fetch_one.map(urls)
    return [f.result() for f in futures]

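Output: URLs are fetched concurrently up to the task runner's concurrency cap. A failure on one item doesn't abort its siblings. However, .result() re-raises that item's exception, so the list comprehension fails the flow unless you handle per-item errors.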
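Because .result() re-raises, a flow that wants partial success has to collect per-item errors itself. This sketch uses concurrent.futures in place of Prefect's futures; iterating over the futures from fetch_one.map(urls) would look much the same. The stubbed fetch_one, which fails on purpose, is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_one(url: str) -> dict:
    # Hypothetical stub instead of an HTTP call; any URL containing "bad" fails.
    if "bad" in url:
        raise ValueError(f"cannot fetch {url}")
    return {"url": url, "ok": True}


def fetch_many(urls: list[str]) -> tuple[list[dict], dict[str, str]]:
    results: list[dict] = []
    errors: dict[str, str] = {}
    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = [pool.submit(fetch_one, u) for u in urls]  # fan out, like .map()
        for url, fut in zip(urls, futures):
            try:
                results.append(fut.result())  # re-raises this item's exception
            except Exception as exc:
                errors[url] = str(exc)        # record it; siblings are unaffected
    return results, errors
```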

Block-based external configuration

Blocks are typed configuration objects stored in the Prefect API — secrets, S3 connections, Slack webhooks. Define once via block.save("slug"), then reference by slug from any flow.

python
from prefect.blocks.system import Secret

# One-time setup (e.g. in a setup script)
Secret(value="sk-...").save("openai-api-key", overwrite=True)

# Usage in flows
from prefect import flow

@flow
def call_openai():
    api_key = Secret.load("openai-api-key").get()
    # ... use api_key ...

Output: the secret never lives in flow source; rotation is a single API call. Blocks are versioned and audit-logged in Prefect Cloud.
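The save/load contract can be illustrated without an API server. The class below is a hypothetical in-memory stand-in, not Prefect's Block implementation. It stores field values under a name and rebuilds a typed object on load. It also refuses to overwrite an existing name unless overwrite=True, which is the situation the overwrite=True flag in the setup script above guards against.

```python
from dataclasses import asdict, dataclass
from typing import ClassVar


@dataclass
class SecretConfig:
    """Hypothetical in-memory stand-in for a typed block such as Secret."""
    value: str
    _store: ClassVar[dict[str, dict]] = {}  # name -> saved field values

    def save(self, name: str, overwrite: bool = False) -> None:
        if name in self._store and not overwrite:
            raise ValueError(f"{name!r} already exists; pass overwrite=True to replace it")
        self._store[name] = asdict(self)

    @classmethod
    def load(cls, name: str) -> "SecretConfig":
        # Rebuild a typed object from the stored fields.
        return cls(**cls._store[name])

    def get(self) -> str:
        return self.value
```

Rotation is then a single save(..., overwrite=True). Every load(name) after that sees the new value without any flow code changing.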

Production deployment

Prefect's deployment story has three pieces: where the orchestrator lives (Cloud vs self-hosted server), where workers run, and where flow code is fetched from. Pick each independently.

Cloud vs self-hosted server

  • Prefect Cloud. When: small teams, no DB to operate, audit/RBAC matter. Trade-off: you pay for the managed plane, but there is no DB to run, and SSO, automations, and the work-pools UI are included.
  • Self-hosted server. When: air-gapped, cost-sensitive, or regulatory data-residency requirements. Trade-off: run prefect server start (or the Helm chart); you own Postgres and the uvicorn API process.

Cloud auth uses prefect cloud login (interactive) or PREFECT_API_KEY + PREFECT_API_URL env vars (CI-friendly). Self-hosted uses PREFECT_API_URL=http://prefect-server:4200/api only — no key needed if behind a private network.

bash
# Self-hosted server (dev / on-prem)
prefect server start
# Bind to all interfaces and use Postgres
PREFECT_API_DATABASE_CONNECTION_URL=postgresql+asyncpg://prefect@db/prefect \
PREFECT_SERVER_API_HOST=0.0.0.0 \
prefect server start

Output: API on http://0.0.0.0:4200/api; UI on http://0.0.0.0:4200. Switch to Postgres before any team workload — SQLite locks under concurrent flow runs.

Work pools and workers

A work pool is a typed slot (process, docker, kubernetes, ecs, cloud-run) that holds queued flow runs. A worker is a long-running process bound to one work pool that polls for runs and launches them on the matching infrastructure.

bash
# Create a process pool (workers run flows in subprocess on the worker host)
prefect work-pool create --type process default-process-pool

# Create a Kubernetes pool
prefect work-pool create --type kubernetes k8s-pool

# Start a worker for the process pool
prefect worker start --pool default-process-pool

# Start a Kubernetes worker (typically runs IN the cluster)
prefect worker start --pool k8s-pool

Output: the worker polls the pool every few seconds; when a queued run appears, the worker fetches the flow code, runs it on the appropriate infra, and reports state back to the server.
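The pool/worker relationship can be modeled as a queue drained by a polling loop. This is a hypothetical toy, not Prefect's worker implementation. WorkPool, Worker, and the 10-second default interval are assumptions of the sketch. A real worker also launches real infrastructure and reports state back to the server.

```python
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class WorkPool:
    name: str
    infra_type: str                               # e.g. "process", "kubernetes"
    queue: deque = field(default_factory=deque)   # queued flow-run IDs


@dataclass
class Worker:
    pool: WorkPool                                # a worker serves exactly one pool
    launched: list = field(default_factory=list)

    def poll_once(self) -> int:
        """Drain the pool's queue; returns how many runs were launched."""
        count = 0
        while self.pool.queue:
            run_id = self.pool.queue.popleft()
            # Stand-in for "start a subprocess / pod / container for this run".
            self.launched.append((self.pool.infra_type, run_id))
            count += 1
        return count

    def run(self, polls: int, interval: float = 10.0,
            sleep: Callable[[float], None] = time.sleep) -> None:
        """Poll a fixed number of times, sleeping between polls."""
        for _ in range(polls):
            self.poll_once()
            sleep(interval)
```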

The pre-3.0 "agent" model is obsolete — old tutorials saying prefect agent start need to be re-read against the worker model.

Deployments: flow.deploy() vs prefect.yaml

Two equivalent definition styles:

python
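# Python API: concise, lives next to the flow code
from prefect import flow
from prefect.client.schemas.schedules import CronSchedule

@flow
def my_flow(): ...

if __name__ == "__main__":
    my_flow.from_source(
        source="https://github.com/alicedev/repo.git",
        entrypoint="flows/my_flow.py:my_flow",
    ).deploy(
        name="prod",
        work_pool_name="k8s-pool",
        image="ghcr.io/alicedev/flows:latest",
        build=False,  # image is pre-built; don't build or push from here
        push=False,
        schedules=[CronSchedule(cron="0 6 * * *", timezone="UTC")],
    )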
yaml
# prefect.yaml — declarative, lives in repo root
deployments:
  - name: prod
    entrypoint: flows/my_flow.py:my_flow
    work_pool:
      name: k8s-pool
      job_variables:
        image: ghcr.io/alicedev/flows:latest
    schedules:
      - cron: "0 6 * * *"
        timezone: UTC

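Output: prefect deploy reads prefect.yaml and registers every deployment it lists. The two forms are close but not identical. This YAML assumes the flow code is baked into the image (there is no pull step), while from_source() clones the repo at run time. The YAML version composes better with GitOps pipelines.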

Containerised workers

Each work pool type has an associated base job template that defines the container, env, volumes, etc. Override per-deployment via job_variables:

python
my_flow.deploy(
    name="prod",
    work_pool_name="k8s-pool",
    image="ghcr.io/alicedev/flows:v1.2.3",
    job_variables={
        "env": {"DATABASE_URL": "{{ prefect.blocks.secret.db-url }}"},
        "image_pull_policy": "Always",
        "service_account_name": "prefect-flows",
    },
)

Output: the worker passes the variables into the pod spec; secrets are pulled from the block store at run time.
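The override order can be sketched as template rendering. Pool-level defaults come from the base job template's variables, and per-deployment job_variables are applied on top. The structure below is a simplified assumption loosely modeled on base job templates: a job_configuration with {{ name }} placeholders plus a variables schema holding defaults. It skips partial-string interpolation and references like {{ prefect.blocks.secret.db-url }}. render and job_config are hypothetical.

```python
import re
from typing import Any

# Matches a string that is exactly one "{{ name }}" placeholder.
PLACEHOLDER = re.compile(r"^\{\{\s*(\w+)\s*\}\}$")


def render(node: Any, values: dict[str, Any]) -> Any:
    """Recursively swap whole-string placeholders for their values."""
    if isinstance(node, dict):
        return {k: render(v, values) for k, v in node.items()}
    if isinstance(node, list):
        return [render(v, values) for v in node]
    if isinstance(node, str):
        match = PLACEHOLDER.match(node)
        if match:
            return values.get(match.group(1))
    return node


def job_config(base_template: dict, job_variables: dict) -> dict:
    """Pool defaults first, then per-deployment job_variables override them."""
    props = base_template["variables"]["properties"]
    defaults = {name: spec.get("default") for name, spec in props.items()}
    return render(base_template["job_configuration"], {**defaults, **job_variables})
```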

Version migration guide

Prefect's three majors (1.x, 2.x, 3.x) are essentially three different products. Migrations are full rewrites of flow code, not API-compatible upgrades.

1.x → 2.x (the big one, 2022)

Prefect 1.x ("Prefect Core") was task-graph-first: you built a Flow() object explicitly, registered with a backend (Cloud or Server), and ran via a Dask-style execution model. 2.x replaced this with decorators and a brand-new engine.

python
# Prefect 1.x — explicit Flow construction
from prefect import Flow, task

@task
def extract(): return [1, 2, 3]

@task
def transform(xs): return [x * 2 for x in xs]

with Flow("etl") as flow:
    raw = extract()
    out = transform(raw)

flow.register("default")
python
# Prefect 2.x / 3.x — decorator-first
from prefect import flow, task

@task
def extract(): return [1, 2, 3]

@task
def transform(xs): return [x * 2 for x in xs]

@flow(name="etl")
def etl():
    raw = extract()
    return transform(raw)

etl()

Output: same data flow; entirely different APIs. There is no rolling upgrade.

2.x → 3.x (late 2024)

Prefect 3.0 reworked the engine for synchronous-by-default execution, removed Block.load_secret_name shims, retired several deprecated kwargs on flow.deploy(), and changed work-pool storage conventions.

Common patches needed:

  • Deployment storage: RemoteFileSystem blocks deprecated in favour of from_source() on the flow.
  • prefect agent — gone; use prefect worker against a work pool.
  • Flow.run_config — removed; configure via work_pool job templates instead.
  • PrefectFuture.result() raises by default when the underlying task failed; pass raise_on_failure=False to get the error back without raising.
  • prefect.context: for reading run/task metadata inside a task body, prefer prefect.runtime over reaching into the run context directly.
python
# 2.x: reaching into the run context for task run info
from prefect.context import get_run_context
task_id = get_run_context().task_run.id

# 3.x
from prefect.runtime import task_run
task_id = task_run.id

Output: equivalent values; the import path moves.

Integration package drift

Each integration package (prefect-aws, prefect-gcp, prefect-kubernetes) has its own release cadence and a compatible prefect core range. Pin all of them together in pyproject.toml:

toml
[project]
dependencies = [
  "prefect>=3.0,<4",
  "prefect-aws>=0.5,<1",
  "prefect-kubernetes>=0.5,<1",
]

Running pip install -U prefect without bumping the integration packages can break imports. The integration packages check the core version in their __init__.py, so the error surfaces at flow-load time, not install time.
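One way to surface drift at process start, rather than at flow-load time, is to check installed versions up front. This is a stdlib-only sketch. installed_versions and pin_problems are hypothetical helpers. They check only that each package is present and that prefect matches the core major pin, not each integration's own declared compatible range.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_versions(dists: list[str]) -> dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    found: dict[str, Optional[str]] = {}
    for name in dists:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


def pin_problems(versions: dict[str, Optional[str]], core_major: int = 3) -> list[str]:
    """Human-readable problems; an empty list means the pins hold."""
    problems = [f"{name} is not installed" for name, v in versions.items() if v is None]
    core = versions.get("prefect")
    if core is not None and int(core.split(".")[0]) != core_major:
        problems.append(f"prefect {core} is outside >={core_major},<{core_major + 1}")
    return problems
```

For example, call pin_problems(installed_versions(["prefect", "prefect-aws", "prefect-kubernetes"])) in a startup hook and exit if the list is non-empty.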

Performance tuning

Prefect's overhead is per-task-transition, not per-line-of-Python. Optimise by reducing the number of orchestrated boundaries — promote intra-loop work into the task body rather than wrapping every operation in its own @task.
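A common way to cut orchestrated boundaries is to hand each task a chunk of records rather than one record. The batching helper below is plain Python. It is similar to itertools.batched in Python 3.12+, which yields tuples instead of lists. With Prefect, you could map a hypothetical process_batch task over list(batched(records, 1000)), which turns 2,500 per-record task runs into 3.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    # islice pulls the next `size` items; an empty chunk means the input is exhausted.
    while chunk := list(islice(it, size)):
        yield chunk
```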

Concurrency limits

Limits prevent fan-out from overwhelming external systems. They apply to task tags globally across the workspace.

bash
prefect concurrency-limit create database-writes 10
prefect concurrency-limit create third-party-api 3
prefect concurrency-limit ls

Output: tasks tagged database-writes cap at 10 concurrent runs across the workspace; the rest queue.
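In Prefect, the tag is attached with @task(tags=["database-writes"]), and the server enforces the limit across every worker. The in-process semaphore sketch below only illustrates the slot semantics inside one Python process. LIMITS and occupy are hypothetical.

```python
import threading
from contextlib import contextmanager
from typing import Iterator

# Hypothetical tag -> limit table mirroring the two CLI commands above.
LIMITS = {"database-writes": 10, "third-party-api": 3}
_SLOTS = {tag: threading.BoundedSemaphore(n) for tag, n in LIMITS.items()}


@contextmanager
def occupy(tags: list[str]) -> Iterator[None]:
    """Hold one slot per limited tag for the duration of the block."""
    # Acquire in sorted order so two multi-tag tasks can't deadlock each other.
    wanted = [_SLOTS[t] for t in sorted(set(tags)) if t in _SLOTS]
    acquired = []
    try:
        for sem in wanted:
            sem.acquire()  # blocks (the task "queues") while the tag is at its limit
            acquired.append(sem)
        yield
    finally:
        for sem in reversed(acquired):
            sem.release()
```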

Task runner selection

python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner
from prefect_dask import DaskTaskRunner   # needs prefect[dask]
from prefect_ray import RayTaskRunner     # needs prefect[ray]

@flow                                                     # 3.x default: ThreadPoolTaskRunner
def f1(): ...

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))   # I/O-bound threads, explicit cap
def f2(): ...

@flow(task_runner=DaskTaskRunner())                       # CPU-bound, distributed via Dask
def f3(): ...

@flow(task_runner=RayTaskRunner())                        # distributed via Ray
def f4(): ...

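Output: the runner controls how task.submit / task.map futures are scheduled. In 3.x, ThreadPoolTaskRunner is the default and takes over the role of 2.x's asyncio-based ConcurrentTaskRunner. Threads suit I/O-bound work, while Dask and Ray pay off once tasks are CPU-bound or need to span machines.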

Result storage

Prefect persists a task's result when persist_result=True is set or when the task uses caching. By default those results go to local disk under ~/.prefect/storage/. For multi-host workers or large results, switch to remote storage:

python
from prefect import task
from prefect_aws.s3 import S3Bucket  # 3.x: S3 storage lives in prefect-aws

s3 = S3Bucket.load("results-bucket")

@task(persist_result=True, result_storage=s3, result_serializer="json")
def big_task(): ...

Output: results land in S3; multiple workers can read each other's results without sharing a filesystem.

For very large results, don't persist them at all — return a path/URI and let downstream tasks read it directly. Prefect's serialisation overhead is real on multi-GB pandas frames.
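In the return-a-location pattern, the producer writes to storage both tasks can reach and returns a short string. Only that string goes through result persistence. This sketch assumes a shared local filesystem; on multi-host workers the location would typically be an object-store URI instead. produce_large and consume are hypothetical, and in a flow they would be two separate tasks.

```python
import json
from pathlib import Path


def produce_large(rows: int, out_dir: Path) -> str:
    """Write the payload to shared storage and return only its location."""
    path = out_dir / "payload.jsonl"
    with path.open("w", encoding="utf-8") as fh:
        for i in range(rows):
            fh.write(json.dumps({"id": i, "value": i * i}) + "\n")
    return str(path)  # a short string is all the orchestrator has to persist


def consume(location: str) -> int:
    """Stream the payload from its location instead of receiving it as a result."""
    total = 0
    with open(location, encoding="utf-8") as fh:
        for line in fh:
            total += json.loads(line)["value"]
    return total
```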

Logging hygiene

log_prints=True on a flow/task captures print() and routes through Prefect's logger. Without it, print() writes go to the worker's stdout and never surface in the UI. For high-volume flows, switch to structured logging via get_run_logger():

python
from prefect import flow, get_run_logger

@flow
def f():
    log = get_run_logger()
    log.info("starting", extra={"item_count": 42})

Output: the log line is attached to the flow run; the UI displays it filtered by run, and Cloud's API exposes structured search.

Testing strategies

Prefect's testing story is "run the flow in-process, assert on the return value or state". @flow and @task decorators are transparent in unit tests — the function still behaves as Python.

python
from prefect.testing.utilities import prefect_test_harness
import pytest

@pytest.fixture(autouse=True, scope="session")
def prefect_test_db():
    """Spin up an isolated SQLite DB for the test session."""
    with prefect_test_harness():
        yield

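def test_extract_returns_list(monkeypatch):
    import httpx
    from user_sync import extract
    # Stub the network call so the unit test stays offline.
    fake = httpx.Response(200, json=[{"id": 1, "name": "alice"}],
                          request=httpx.Request("GET", "https://example.com/users"))
    monkeypatch.setattr(httpx, "get", lambda *args, **kwargs: fake)
    # .fn is the undecorated function: no retries, caching, or state tracking
    result = extract.fn("https://example.com/users")
    assert isinstance(result, list)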

def test_flow_e2e(monkeypatch):
    from user_sync import user_sync
    monkeypatch.setattr("user_sync.extract.fn", lambda src: [{"id": 1, "name": "alice"}])
    state = user_sync(return_state=True)
    assert state.is_completed()
    assert state.result() == 1

Output: pytest -q runs the flow against an ephemeral Prefect DB; no Cloud account needed. task.fn(...) is the standard escape hatch for unit-testing a task body without the orchestrator wrapper.

Key patterns:

  • prefect_test_harness(): a context manager (wrapped in a session-scoped fixture above) that swaps in a temporary SQLite DB so test runs don't leak state into your dev workspace.
  • task.fn(...) — call the raw function under a task without orchestration.
  • return_state=True — makes flow(...) return the State object instead of the result. Assert on state.is_completed() / state.is_failed() for control-flow tests.
  • monkeypatch on .fn — mock external APIs at the task level. The mock survives across retries.

See also