cheat sheet
dagster
Package-level reference for Dagster on PyPI — install variants, the dagster-* plugin family, version policy, and alternatives.
What it is
dagster is a Python framework for building data applications around the concept of software-defined assets: declaratively-typed data artifacts (tables, models, files) whose dependencies, materialization logic, and metadata live together in code. Created by Nick Schrock (formerly of GraphQL/Facebook), it competes directly with Airflow and Prefect.
Reach for Dagster when the unit you care about is the asset that gets produced (a Snowflake table, a dbt model, a Parquet file), not the task that runs. Reach for Prefect when you'd rather think in terms of flows and ad-hoc Python orchestration without the asset abstraction.
Install
pip install dagster dagster-webserver
Output: pip's install log; exits 0 on success. dagster is the engine; dagster-webserver is the UI/API server. Both are typically installed together.
uv add dagster dagster-webserver
Output: dependencies resolved + added to pyproject.toml
poetry add dagster dagster-webserver
Output: updated lockfile + virtualenv install
dagster dev
Output: launches a local instance with the daemon + webserver on http://127.0.0.1:3000.
Versioning & Python support
- Current stable line is the 1.x series. Dagster moved from 0.x to 1.0 in August 2022. Confusingly, minor 1.x releases still ship breaking changes despite the "stable" label, so read each release note carefully before upgrading.
- Supports Python 3.9+ on recent releases. The dagster-* plugin family matches the core's Python floor.
- The 1.5 release (late 2023) introduced the Definitions module pattern, deprecating the older @repository decorator. Code on the old pattern still runs but emits warnings.
- Pin to a specific minor with dagster~=1.8.0, which allows 1.8.x but not 1.9 (plain dagster~=1.8 would accept any 1.x from 1.8 up). Upgrade in lockstep with every dagster-* plugin you use, because version drift between core and plugins produces obscure import errors.
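The difference between ~=1.8 and ~=1.8.0 comes from how the PEP 440 compatible-release operator works. The sketch below models it for plain numeric versions only (no pre-releases, epochs, or local tags). It is an illustration, not a replacement for the packaging library that pip uses, and the function names are hypothetical.

```python
def _release(version: str) -> tuple[int, ...]:
    """Parse a plain numeric release like '1.8.3' into (1, 8, 3)."""
    return tuple(int(part) for part in version.split("."))


def _pad(a: tuple[int, ...], b: tuple[int, ...]) -> tuple[tuple[int, ...], tuple[int, ...]]:
    """Pad the shorter tuple with zeros so that 1.8 compares equal to 1.8.0."""
    width = max(len(a), len(b))
    return a + (0,) * (width - len(a)), b + (0,) * (width - len(b))


def satisfies_compatible(version: str, spec: str) -> bool:
    """True if `version` satisfies `~=spec` (numeric releases only)."""
    lower = _release(spec)
    if len(lower) < 2:
        raise ValueError("~= needs at least two release components")
    # Drop the last component and bump the one before it: ~=1.8 -> <2, ~=1.8.0 -> <1.9
    upper = lower[:-2] + (lower[-2] + 1,)
    v = _release(version)
    v_lo, lo = _pad(v, lower)
    v_hi, hi = _pad(v, upper)
    return lo <= v_lo and v_hi < hi


if __name__ == "__main__":
    for spec in ("1.8", "1.8.0"):
        print(spec, satisfies_compatible("1.9.2", spec))
```

Under these rules, dagster 1.9.2 satisfies ~=1.8 but not ~=1.8.0. That is why the trailing .0 is what actually pins the minor version.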
Package metadata
- Maintainer: Dagster Labs (formerly Elementl)
- Project home: github.com/dagster-io/dagster
- Docs: docs.dagster.io
- PyPI: pypi.org/project/dagster
- License: Apache-2.0 (open-source core); Dagster+ is a paid managed control plane
- Governance: company-led open source — Dagster Labs
- First released: 2019
- Downloads: millions per month — heavy use in modern data-platform teams.
Optional dependencies & extras
Dagster ships a large plugin family. Every integration is a separate dagster-* PyPI package, not an extra on the main dagster distribution.
Install plugins explicitly: pip install dagster dagster-aws dagster-dbt dagster-snowflake.
Core orchestration tier:
- dagster-webserver: the web UI + GraphQL API. Required for the local dev experience and any self-hosted server.
- dagster-graphql: standalone GraphQL client for the API; pulled in by the webserver.
- dagster-postgres / dagster-mysql: persistent storage for run history, schedules, sensors. SQLite is the default; switch to one of these before any team deployment.
Cloud / infra:
- dagster-aws: S3, Redshift, ECS run launchers, Secrets Manager.
- dagster-gcp: GCS, BigQuery, Dataproc, Cloud Run.
- dagster-azure: Blob storage, ADLS, Azure resources.
- dagster-k8s: Kubernetes run launchers + executors.
- dagster-docker: Docker run launcher.
Compute / data tools:
- dagster-dbt: first-class dbt Core integration; loads each dbt model as a Dagster asset.
- dagster-airbyte / dagster-fivetran: load ingest connectors as assets.
- dagster-snowflake / dagster-duckdb (BigQuery lives in dagster-gcp / dagster-gcp-pandas): warehouse resources and I/O managers.
- dagster-pandas / dagster-polars / dagster-pyspark: dataframe I/O managers and type checks.
- dagster-mlflow: log MLflow experiments from within Dagster assets.
Cloud-only:
- dagster-cloud: Dagster+ (the managed service) CLI and agent. Only needed for Dagster+ deployments.
Alternatives
| Package | Trade-off |
|---|---|
| prefect | Flow/task model rather than asset-oriented. Lighter weight, less rigid; weaker lineage UI. |
| airflow (apache-airflow) | DAG/task model, configuration-heavy. Industry default for legacy data engineering teams. |
| kedro | Opinionated framework for data-science pipelines. Often paired with Dagster or Prefect for execution. |
| temporal (temporalio) | Durable workflows for arbitrary code, not data-focused. Use when you need workflow durability across services. |
| metaflow | Netflix's data-science framework. Heavy AWS lean. |
| argo workflows | Kubernetes-native YAML DAGs. Operations-focused, not asset-centric. |
| mage-ai | Newer, notebook-style data engineering. Less mature ecosystem. |
Common gotchas
- Definitions module pattern replaces @repository. Since 1.5, the canonical entry point is a single defs = Definitions(assets=[...], schedules=[...], resources={...}) object in a module (typically your_project/__init__.py). Old code using @repository still loads but is deprecated; new tooling assumes Definitions.
- Assets vs ops. "Ops" (the older abstraction) are units of computation; "assets" are the things that computation produces. Most modern Dagster code declares @asset directly and skips ops entirely. Mixing both styles in one project produces a confusing UI.
- dagster-dbt needs a current dbt manifest. The integration loads a dbt manifest (manifest.json) and exposes each model as an asset. pip install dagster-dbt alone is not enough: your dbt project must build its manifest (dbt parse) before Dagster can load assets, and a stale manifest produces ghost assets for models that no longer exist in dbt.
- dagster dev uses SQLite under $DAGSTER_HOME (or a temporary directory when DAGSTER_HOME is unset). Fine for development; not suitable for production. Switch to Postgres via the storage config in dagster.yaml before deploying.
- Plugin version drift breaks imports. Every dagster-* plugin pins a compatible core version range. Running pip install -U dagster without upgrading plugins (or vice versa) can yield an ImportError on the next start. Always upgrade dagster and every dagster-* package together.
- I/O managers are an extra step. Assets don't write to your warehouse by default: the returned value is handed to an I/O manager (Snowflake table, S3 file, etc.). New users frequently expect @asset to persist somewhere useful. Without an I/O manager configured, the default pickles each value to the local filesystem under $DAGSTER_HOME/storage/.
- Sensors and schedules need the daemon. dagster-webserver alone runs the UI but does not tick schedules. The dagster-daemon process (started automatically by dagster dev) is what evaluates schedules and sensors. Production deployments need to run both.
- materialize vs execute_in_process. materialize([...]) is for asset-based code; JobDefinition.execute_in_process is for op/job-based code. Calling the wrong one for your codebase yields confusing "no such asset/job" errors.
Real-world recipes
The Dagster recipes that show up across most modern data platforms. Each one stays grounded in the Definitions module pattern (canonical since 1.5).
Asset DAG with typed dependencies
The bread-and-butter Dagster idiom: @asset functions whose parameters reference upstream assets. Dagster derives the DAG automatically from parameter names matching asset keys.
from dagster import asset, Definitions
import pandas as pd

@asset
def raw_users() -> pd.DataFrame:
    """Pull users from the source system."""
    return pd.DataFrame([{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}])

@asset
def clean_users(raw_users: pd.DataFrame) -> pd.DataFrame:
    """Normalise names."""
    return raw_users.assign(name=raw_users["name"].str.title())

@asset
def user_count(clean_users: pd.DataFrame) -> int:
    """How many users do we have?"""
    return len(clean_users)

defs = Definitions(assets=[raw_users, clean_users, user_count])
Output: dagster dev shows a DAG with three nodes and two edges; clicking each shows the materialisation history and any logs.
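Dagster's own machinery is far richer, but the core idea of deriving the DAG from parameter names can be modelled with the standard library alone. This toy sketch reads each function's parameter names with inspect and orders execution with graphlib.TopologicalSorter. It assumes lists instead of DataFrames and no context or resource parameters, and the helper run_in_dependency_order is hypothetical. It is not how Dagster is implemented.

```python
import inspect
from graphlib import TopologicalSorter


# Plain functions standing in for the three assets above.
def raw_users():
    return [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]


def clean_users(raw_users):
    return [{**u, "name": u["name"].title()} for u in raw_users]


def user_count(clean_users):
    return len(clean_users)


def run_in_dependency_order(fns):
    """Toy model: every parameter name is read as the name of an upstream function."""
    by_name = {fn.__name__: fn for fn in fns}
    deps = {name: set(inspect.signature(fn).parameters) for name, fn in by_name.items()}
    values = {}
    for name in TopologicalSorter(deps).static_order():
        # Pass each upstream result in under the parameter that names it.
        values[name] = by_name[name](**{p: values[p] for p in deps[name]})
    return values


if __name__ == "__main__":
    # Input order does not matter; the parameter names decide the execution order.
    print(run_in_dependency_order([user_count, clean_users, raw_users]))
```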
Daily-partitioned asset
A partitioned asset is one logical asset that ships in slices keyed by partition (typically a date). Each partition materialises independently and can be backfilled out of order.
from dagster import AssetExecutionContext, DailyPartitionsDefinition, Definitions, asset

daily = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(partitions_def=daily)
def daily_events(context: AssetExecutionContext) -> int:
    date = context.partition_key  # e.g. "2026-01-15"
    # ... query the warehouse for events where dt = date ...
    return 42

defs = Definitions(assets=[daily_events])
Output: the UI shows a calendar of partitions; missing partitions render grey. Backfill from the UI or via dagster asset materialize --select daily_events --partition 2026-01-15 (plus -f or -m to point the CLI at your code).
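Daily partition keys are just ISO dates, which makes the calendar view easy to reason about. Below is a stdlib sketch of generating keys and finding the ones that were never materialised (the grey cells). The helpers are hypothetical. The exclusive end date is an assumption that roughly stands in for Dagster exposing only days that have finished.

```python
from datetime import date, timedelta


def daily_partition_keys(start: date, end: date) -> list[str]:
    """ISO date keys from start (inclusive) to end (exclusive), one per day."""
    days = (end - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days)]


def missing_partitions(keys: list[str], materialized: set[str]) -> list[str]:
    """Keys with no recorded materialisation: the ones the UI would render grey."""
    return [k for k in keys if k not in materialized]


if __name__ == "__main__":
    keys = daily_partition_keys(date(2026, 1, 1), date(2026, 1, 5))
    print(missing_partitions(keys, {"2026-01-02"}))
```

Each missing key is a natural argument for the --partition flag when backfilling one slice at a time.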
dbt integration
dagster-dbt exposes every dbt model as a first-class asset. The @dbt_assets decorator reads dbt's manifest.json and creates one Dagster asset per dbt model; a DagsterDbtTranslator controls how each dbt node maps to an asset key and metadata.
from pathlib import Path
from dagster import AssetExecutionContext, Definitions
from dagster_dbt import DbtCliResource, dbt_assets

DBT_PROJECT_DIR = Path(__file__).parent / "dbt_project"
# Built by `dbt parse`; must exist before Dagster imports this module.
DBT_MANIFEST = DBT_PROJECT_DIR / "target" / "manifest.json"

@dbt_assets(manifest=DBT_MANIFEST)
def dbt_models(context: AssetExecutionContext, dbt: DbtCliResource):
    # Run `dbt build` and stream each model/test result back as Dagster events.
    yield from dbt.cli(["build"], context=context).stream()

defs = Definitions(
    assets=[dbt_models],
    resources={"dbt": DbtCliResource(project_dir=str(DBT_PROJECT_DIR))},
)
Output: every dbt model becomes a Dagster asset with lineage, schedules, and asset checks. dbt build runs under Dagster's logging; the UI surfaces dbt test results inline.
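Stale manifests cause ghost assets, so it can help to check what manifest.json currently declares. Below is a stdlib sketch that assumes dbt's manifest layout: a top-level nodes map whose entries carry resource_type and name. The helper names are hypothetical. dagster-dbt may also map non-model nodes such as seeds and snapshots, so this lists a subset of the resulting assets.

```python
import json
from pathlib import Path


def dbt_model_names(manifest: dict) -> list[str]:
    """Sorted names of the dbt models declared in a parsed manifest.json."""
    return sorted(
        node["name"]
        for node in manifest.get("nodes", {}).values()
        if node.get("resource_type") == "model"
    )


def load_manifest(path: Path) -> dict:
    """Read target/manifest.json; raises FileNotFoundError if `dbt parse` hasn't run."""
    return json.loads(path.read_text())


if __name__ == "__main__":
    manifest_path = Path("dbt_project/target/manifest.json")  # same layout as the recipe
    if manifest_path.exists():
        print(dbt_model_names(load_manifest(manifest_path)))
    else:
        print("no manifest yet: run `dbt parse` in dbt_project first")
```

If a model appears in the Dagster UI but not in this list, the manifest Dagster loaded is out of date.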
Sensor — react to upstream changes
A sensor evaluates Python on a tick (default 30 s) and emits run requests. Use sensors for event-driven materialisation (file landed in S3, message in a queue, upstream warehouse table updated).
import os
from dagster import sensor, RunRequest, SensorEvaluationContext, Definitions

# daily_events is the partitioned asset from the recipe above. Files are assumed to be
# named by partition date (e.g. 2026-01-15.csv) so they sort in date order.
@sensor(target=daily_events)
def inbox_sensor(context: SensorEvaluationContext):
    last_seen = context.cursor or ""
    new = [f for f in sorted(os.listdir("/mnt/inbox")) if f > last_seen]
    if not new:
        return
    for f in new:
        # run_key dedupes: the same file never launches a second run.
        yield RunRequest(run_key=f, partition_key=f.split(".")[0])
    context.update_cursor(new[-1])

defs = Definitions(assets=[daily_events], sensors=[inbox_sensor])
Output: the daemon ticks the sensor every 30 s; new files emit one run per partition. The cursor persists across daemon restarts.
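The cursor logic is the part of a sensor worth unit-testing, and it is easy to pull out of the decorated function. Below is a stdlib sketch. Like the recipe, it assumes filenames sort in date order and start with the partition key. evaluate_tick is a hypothetical helper: instead of yielding RunRequest objects, it returns (run_key, partition_key) pairs plus the next cursor.

```python
from typing import List, Optional, Tuple


def evaluate_tick(
    filenames: List[str], cursor: Optional[str]
) -> Tuple[List[Tuple[str, str]], Optional[str]]:
    """Simulate one sensor tick.

    Returns ((run_key, partition_key) pairs, next cursor). As in the recipe, files are
    compared as strings and the cursor is the last filename handled.
    """
    last_seen = cursor or ""
    new = [f for f in sorted(filenames) if f > last_seen]
    if not new:
        return [], cursor  # nothing new: keep the existing cursor
    requests = [(f, f.split(".")[0]) for f in new]
    return requests, new[-1]


if __name__ == "__main__":
    inbox = ["2026-01-15.csv", "2026-01-14.csv"]
    requests, cursor = evaluate_tick(inbox, None)
    print(requests, cursor)
    # A later tick only sees files that sort after the cursor.
    inbox.append("2026-01-16.csv")
    print(evaluate_tick(inbox, cursor))
```

The decorated sensor can then be a thin wrapper that lists the directory, calls the helper, yields one RunRequest per pair, and stores the returned cursor.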
Asset checks — runtime data quality
Asset checks run after materialisation and produce pass/fail signals. Use them for runtime invariants — row counts, null-ratio limits, schema drift detection.
import pandas as pd
from dagster import asset, asset_check, AssetCheckResult, AssetCheckSeverity

@asset
def clean_users() -> pd.DataFrame:
    return pd.DataFrame([{"id": 1, "name": "alice"}])

@asset_check(asset=clean_users)
def no_null_names(clean_users: pd.DataFrame) -> AssetCheckResult:
    nulls = int(clean_users["name"].isna().sum())  # cast numpy int64 to a plain int
    return AssetCheckResult(
        passed=nulls == 0,
        severity=AssetCheckSeverity.ERROR,  # severity only matters when the check fails
        metadata={"null_count": nulls},
    )
Output: the UI shows the check next to the asset; failures alert downstream consumers and block dependent materialisations if configured.
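For the null-ratio limits mentioned above, the check body can live in a plain function that is testable without Dagster. Below is a minimal sketch. The name null_ratio_check is hypothetical, and the function assumes rows arrive as dicts. It treats a missing key, None, or a float NaN (pandas' usual missing marker) as null, and returns plain Python values for passed and metadata.

```python
import math
from typing import Any, Iterable, Mapping


def _is_null(value: Any) -> bool:
    """None and float NaN both count as null."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def null_ratio_check(rows: Iterable[Mapping[str, Any]], column: str, max_ratio: float) -> dict:
    """Return passed/metadata for a 'null ratio <= max_ratio' invariant on one column."""
    materialised = list(rows)
    nulls = sum(1 for row in materialised if _is_null(row.get(column)))
    ratio = nulls / len(materialised) if materialised else 0.0
    return {
        "passed": ratio <= max_ratio,
        "metadata": {"null_count": nulls, "null_ratio": ratio},
    }


if __name__ == "__main__":
    rows = [{"name": "alice"}, {"name": None}, {"name": "bob"}, {"name": "carol"}]
    print(null_ratio_check(rows, "name", max_ratio=0.1))
```

Inside an asset check, something like AssetCheckResult(**null_ratio_check(clean_users.to_dict("records"), "name", 0.0)) would apply it.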
Production deployment
A Dagster deployment is two long-running services (webserver and daemon), the run workers they launch, and shared storage. Run all of them to have a working orchestrator.
The three processes
| Process | Role | Production setup |
|---|---|---|
| dagster-webserver | UI + GraphQL API for humans and tools | Behind a reverse proxy with TLS; horizontally scalable. |
| dagster-daemon | Ticks schedules, evaluates sensors, runs the queued-run coordinator | Exactly one instance per deployment; Dagster does not coordinate multiple daemons. |
| Run worker | Spawned by the daemon to execute a single run | Process / Docker / Kubernetes / ECS, depending on the RunLauncher config. |
# Run all three locally (dev mode)
dagster dev
# Run individually in production
dagster-webserver -h 0.0.0.0 -p 3000 -w workspace.yaml
dagster-daemon run -w workspace.yaml
# Run workers spawn automatically based on dagster.yaml RunLauncher config
Output: the webserver and daemon both read workspace.yaml to discover code locations. Runs are launched by whichever RunLauncher is configured. The default, DefaultRunLauncher, starts each run as a new process on the host serving that run's code location.
Code locations
A code location is a Python module/package containing a Definitions object. The webserver and daemon load code locations into isolated subprocesses, so a syntax error in one location doesn't crash the whole deployment.
# workspace.yaml
load_from:
  - python_file:
      relative_path: my_pipelines/__init__.py
      working_directory: .
  - python_module:
      module_name: my_other_pipelines
      working_directory: .
  - grpc_server:
      host: pipelines.internal
      port: 4000
Output: the UI lists each entry as a separate code location with its own assets/jobs/schedules.
Postgres storage
dagster dev uses SQLite under $DAGSTER_HOME (or a temporary directory when DAGSTER_HOME is unset). For production, swap to Postgres via dagster.yaml:
# $DAGSTER_HOME/dagster.yaml
storage:
  postgres:
    postgres_db:
      username: dagster
      password:
        env: DAGSTER_PG_PASSWORD
      hostname: db.internal
      db_name: dagster
      port: 5432
run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 10
    tag_concurrency_limits:
      - key: "team"
        value: "alice-team"
        limit: 5
Output: run history, asset materialisations, schedules, and sensor cursors all persist in Postgres. The QueuedRunCoordinator caps concurrent runs.
Kubernetes deployment
The official Helm chart (dagster/dagster) provisions webserver + daemon + Postgres + each user code location as separate deployments. Each user code location runs as a gRPC server pod.
helm repo add dagster https://dagster-io.github.io/helm
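helm install dagster dagster/dagster \
  --set dagster-user-deployments.enabled=true \
  --set "dagster-user-deployments.deployments[0].name=alice-pipelines" \
  --set "dagster-user-deployments.deployments[0].image.repository=ghcr.io/alicedev/pipelines" \
  --set "dagster-user-deployments.deployments[0].image.tag=v1.0.0" \
  --set "dagster-user-deployments.deployments[0].dagsterApiGrpcArgs={--module-name,my_pipelines}" \
  --set "dagster-user-deployments.deployments[0].port=3030"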
Output: a full Dagster deployment on K8s; user pipelines run via the K8sRunLauncher, which spawns each run as its own pod.
Database migration strategies
Dagster doesn't manage schema for your data — it manages its own metadata (run history, event log, asset materialisations). When upgrading Dagster, the metadata schema may change.
Dagster metadata migrations
# After every dagster upgrade — run before starting the daemon/webserver
dagster instance migrate
Output: the migration runs DDL against the storage configured in $DAGSTER_HOME/dagster.yaml (SQLite or Postgres), and subsequent processes start cleanly. Skipping this step is a common cause of schema-mismatch errors after an upgrade.
Asset materialisation key versioning
When an asset's key changes (e.g. you rename it or move it under a key_prefix), Dagster loses the materialisation history for the old key:
from dagster import asset, AssetKey

@asset(key=AssetKey(["analytics", "users"]))  # new structured key
def users(): ...

# Old materialisations live under AssetKey("users") and are no longer linked.
# History is not carried across automatically; keep the old key if continuity matters.
For partitioned assets, never change the partition scheme (DailyPartitionsDefinition(start_date=...)) without backfilling — old partitions disappear from the UI.
Versioning IO managers
I/O managers store materialised values somewhere. Changing the I/O manager on an existing asset (e.g. from FilesystemIOManager to S3PickleIOManager) breaks reads of old materialisations — schedule a backfill or accept the gap.
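The reason a switch breaks old reads is easiest to see with a toy pickling I/O manager. The class below is hypothetical and much simpler than Dagster's FilesystemIOManager or S3PickleIOManager. It writes one pickle per asset key under a base directory, so a second manager pointed somewhere else has nothing to load until the asset is re-materialised.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Tuple


class ToyPickleIOManager:
    """Toy stand-in for a pickling I/O manager: one file per asset key under base_dir."""

    def __init__(self, base_dir: Path):
        self.base_dir = base_dir

    def _path(self, asset_key: Tuple[str, ...]) -> Path:
        return self.base_dir.joinpath(*asset_key)

    def handle_output(self, asset_key: Tuple[str, ...], value: Any) -> None:
        path = self._path(asset_key)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(pickle.dumps(value))

    def load_input(self, asset_key: Tuple[str, ...]) -> Any:
        return pickle.loads(self._path(asset_key).read_bytes())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        old = ToyPickleIOManager(Path(tmp) / "local")
        new = ToyPickleIOManager(Path(tmp) / "bucket")
        old.handle_output(("analytics", "users"), [1, 2, 3])
        print(old.load_input(("analytics", "users")))
        try:
            new.load_input(("analytics", "users"))  # nothing was ever written here
        except FileNotFoundError:
            print("new I/O manager has no copy: backfill before downstream reads")
```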
Version migration guide
The asset APIs predate Dagster's 0.x → 1.0 jump (August 2022). Within 1.x, the most important transition is @op/@graph → @asset (formalised in 1.5).
@op / @graph → @asset (post-1.5)
The pre-1.5 model declared computation units as @ops wired into @graphs. The modern model declares assets (the things produced); Dagster derives the graph automatically.
# Pre-1.5: ops + graphs
from dagster import op, graph

@op
def extract():
    return [1, 2, 3]

@op
def transform(xs):
    return [x * 2 for x in xs]

@graph
def etl():
    transform(extract())

etl_job = etl.to_job()

# 1.5+: assets
from dagster import asset, Definitions

@asset
def raw_xs() -> list[int]:
    return [1, 2, 3]

@asset
def doubled_xs(raw_xs: list[int]) -> list[int]:
    return [x * 2 for x in raw_xs]

defs = Definitions(assets=[raw_xs, doubled_xs])
Output: both styles still run; the asset style is the canonical 1.5+ pattern and gets all new UI features (asset lineage graph, materialisation history, asset checks).
@repository → Definitions
Pre-1.5 code declared a @repository; modern code exports a Definitions object as the top-level symbol.
# Pre-1.5
from dagster import repository

@repository
def repo():
    return [my_job, my_schedule]  # my_job / my_schedule defined elsewhere

# 1.5+
from dagster import Definitions

defs = Definitions(jobs=[my_job], schedules=[my_schedule])
Output: the Definitions object replaces the decorator; the webserver discovers it via workspace.yaml.
Plugin compatibility ranges
Each dagster-* plugin pins a tight compatible range on core. After every dagster upgrade:
pip install -U dagster dagster-webserver dagster-dbt dagster-aws dagster-snowflake
dagster instance migrate
Output: all plugins move together; the metadata schema migrates before any process starts. Drift between core and plugin produces import-time errors that look like API changes but are actually compatibility-pin mismatches.
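A quick way to catch drift before it surfaces as an ImportError is to list what is actually installed. This stdlib sketch uses importlib.metadata, and the helper names are hypothetical. The mismatch check rests on an assumption: core-family packages such as dagster-webserver and dagster-graphql carry the same 1.x version as dagster, while integration libraries (dagster-dbt, dagster-aws, ...) use their own 0.x numbers and are therefore skipped.

```python
from importlib.metadata import distributions
from typing import Dict, List


def installed_dagster_packages() -> Dict[str, str]:
    """Map every installed dagster / dagster-* distribution to its version."""
    found: Dict[str, str] = {}
    for dist in distributions():
        name = (dist.metadata.get("Name") or "").lower().replace("_", "-")
        if name == "dagster" or name.startswith("dagster-"):
            found[name] = dist.version
    return dict(sorted(found.items()))


def core_line_mismatches(packages: Dict[str, str]) -> List[str]:
    """Packages on a 1.x version whose major.minor differs from dagster's own."""
    core = packages.get("dagster")
    if core is None:
        return []
    core_line = core.split(".")[:2]
    return sorted(
        name
        for name, version in packages.items()
        if name != "dagster"
        and version.startswith("1.")  # assumption: 0.x integrations are versioned separately
        and version.split(".")[:2] != core_line
    )


if __name__ == "__main__":
    pkgs = installed_dagster_packages()
    for name, version in pkgs.items():
        print(f"{name}=={version}")
    print("core-line mismatches:", core_line_mismatches(pkgs) or "none")
```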
Removed surfaces to audit
- pipeline → job: the @pipeline decorator was removed in 1.0. Equivalent is @graph + .to_job() (or @job directly).
- solid → op: the @solid decorator was renamed to @op in 0.13. Old tutorials still show @solid.
- DagsterInstance.local_temp: removed; use DagsterInstance.ephemeral() for tests.
- Output.metadata_entries: replaced by Output.metadata (a dict, not a list of typed objects).
Performance tuning
Dagster's overhead is per-asset-materialisation, not per-line-of-Python. Optimise by reducing materialisation boundaries — small frequent materialisations of cheap assets cost more than a single materialisation of a fat asset.
Multi-asset for shared computation
When several assets share most of their computation, declare them as a @multi_asset so the body runs once and emits multiple outputs.
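from dagster import multi_asset, AssetOut

@multi_asset(outs={
    "users": AssetOut(),
    "orders": AssetOut(),
})
def users_and_orders():
    df = expensive_query()  # placeholder for the shared, costly query
    # One return value per out, in the same order as the outs dict.
    return df[df["type"] == "user"], df[df["type"] == "order"]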
Output: one body invocation, two materialisations recorded. The UI shows both assets as siblings with shared lineage.
Asset concurrency limits
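Tag-based concurrency limits on the QueuedRunCoordinator cap how many runs carrying a given run tag execute at once across the deployment. This is useful when a downstream system (warehouse, API) has its own bottleneck. The limits match run tags, not op tags, so attach the tag to the job that materialises the asset.
# dagster.yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "warehouse"
        value: "snowflake"
        limit: 4

from dagster import AssetSelection, asset, define_asset_job

@asset
def warehouse_export(): ...

# Every run of this job carries the run tag warehouse=snowflake.
export_job = define_asset_job(
    "export_job",
    selection=AssetSelection.assets(warehouse_export),
    tags={"warehouse": "snowflake"},
)
Output: at most 4 runs tagged warehouse=snowflake execute concurrently; the rest queue.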
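The queueing behaviour can be modelled as a pure function, which helps when choosing limits. This is a toy model, not Dagster's implementation, and its names and tag values are hypothetical. It assumes runs are plain tag dicts, the queue is scanned front to back, and a run blocked by a tag limit is skipped rather than holding up the runs behind it.

```python
from collections import Counter
from typing import Dict, List, Mapping, Sequence, Tuple

# (tag key, tag value) -> max concurrent runs carrying that tag
Limits = Dict[Tuple[str, str], int]


def _buckets(tags: Mapping[str, str], limits: Limits) -> List[Tuple[str, str]]:
    """The limited (key, value) pairs that this run's tags match."""
    return [kv for kv in limits if tags.get(kv[0]) == kv[1]]


def runs_to_launch(
    queued: Sequence[Mapping[str, str]],
    in_progress: Sequence[Mapping[str, str]],
    limits: Limits,
    max_concurrent_runs: int,
) -> List[int]:
    """Indices of queued runs (given as tag dicts) that may start now, in queue order."""
    active = Counter(kv for tags in in_progress for kv in _buckets(tags, limits))
    free_slots = max_concurrent_runs - len(in_progress)
    launch: List[int] = []
    for i, tags in enumerate(queued):
        if len(launch) >= free_slots:
            break  # global cap reached
        buckets = _buckets(tags, limits)
        if all(active[kv] < limits[kv] for kv in buckets):
            launch.append(i)
            active.update(buckets)  # the new run now counts against its tag limits
        # otherwise the run stays queued; later runs may still fit
    return launch


if __name__ == "__main__":
    limits = {("warehouse", "snowflake"): 4}
    busy = [{"warehouse": "snowflake"}] * 3
    queue = [{"warehouse": "snowflake"}, {"warehouse": "snowflake"}, {"team": "x"}]
    print(runs_to_launch(queue, busy, limits, max_concurrent_runs=10))
```

With three tagged runs already active and a limit of 4, only one more tagged run fits. The untagged run behind it still launches.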
IO manager selection for large data
Default I/O managers pickle the asset return value. For multi-GB pandas/polars frames, switch to a columnar I/O manager (dagster-snowflake-pandas, dagster-polars) or write to the warehouse directly and return a reference.
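import pandas as pd
from dagster import asset, AssetExecutionContext

# Assumes Definitions(resources={"snowflake_io_manager": ...}) registers a Snowflake I/O
# manager, e.g. SnowflakePandasIOManager from dagster-snowflake-pandas.
@asset(io_manager_key="snowflake_io_manager")
def large_table(context: AssetExecutionContext) -> pd.DataFrame:
    return build_huge_dataframe()  # placeholder; the I/O manager writes to Snowflake, no pickle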
Output: the I/O manager writes the frame to a Snowflake table instead of pickling it, and downstream assets load it back from that table.
Sensor cadence
Sensors default to a 30 s tick. For event-driven cases that need faster reaction (file landing in S3, message in a queue), lower the interval:
from dagster import sensor
@sensor(target=daily_events, minimum_interval_seconds=10)
def fast_sensor(context): ...
Output: the daemon evaluates the sensor at most every 10 s. minimum_interval_seconds is a floor, not a guarantee, so slow evaluations or a busy daemon stretch the actual gap.
Testing strategies
Dagster's testing primitives are materialize([...]) for assets and JobDefinition.execute_in_process() for jobs. Both run in-process against an ephemeral DagsterInstance.
from unittest.mock import MagicMock

from dagster import materialize
from my_pipelines.assets import raw_users, clean_users, user_count, daily_events

def test_asset_graph():
    result = materialize([raw_users, clean_users, user_count])
    assert result.success
    df = result.output_for_node("clean_users")
    assert df["name"].iloc[0] == "Alice"

def test_partitioned():
    result = materialize([daily_events], partition_key="2026-01-15")
    assert result.success
    assert result.output_for_node("daily_events") == 42

def test_with_mock_resource():
    # Assumes raw_users declares a "db" resource; the mock stands in for it.
    mock_db = MagicMock()
    mock_db.query.return_value = [{"id": 1, "name": "test"}]
    result = materialize([raw_users], resources={"db": mock_db})
    assert result.success
Output: all three tests run in-process against an ephemeral SQLite instance; no daemon, no webserver, no Postgres.
Key patterns:
- materialize([...]): the primary asset-test primitive. Returns an ExecuteInProcessResult with output_for_node, success, and all_events.
- partition_key="…": for partitioned assets, supply the partition under test.
- resources={...}: override resources with mocks for unit isolation.
- DagsterInstance.ephemeral(): explicit ephemeral instance when you need to assert on event log state across multiple materialisations.
See also
- Python: dagster — assets, jobs, schedules, sensors, resources
- Packages: pip-prefect — the flow/task-oriented alternative