cheat sheet
dagster
Build, schedule, and observe data pipelines as software-defined assets with Dagster. Covers assets, jobs, schedules, sensors, resources, partitions, and the Dagster UI.
dagster — Modern Data Orchestration
What it is
Dagster is a data orchestration platform centred on software-defined assets: persistent data objects (tables, files, trained models) whose production logic you declare in Python alongside the asset itself. Instead of thinking in terms of "tasks that run," you declare "what data should exist and how to produce it." Dagster derives the execution graph from asset dependencies, supports partitioning, and ships a full observability UI. It is often adopted as an alternative to Airflow by Python-first teams.
Install
pip install dagster dagster-webserver
Output: pip prints its install log and exits 0 on success.
Quick example
from dagster import asset, Definitions, materialize
@asset
def raw_data() -> list[dict]:
    """Simulated CSV load."""
    return [
        {"name": "Alice", "score": 92},
        {"name": "Bob", "score": 78},
        {"name": "Carol", "score": 85},
    ]
@asset
def high_scorers(raw_data: list[dict]) -> list[dict]:
    """Filter to scores above 80."""
    return [r for r in raw_data if r["score"] > 80]
defs = Definitions(assets=[raw_data, high_scorers])
# Materialise both assets locally
result = materialize([raw_data, high_scorers])
print(result.output_for_node("high_scorers"))
Output:
[{'name': 'Alice', 'score': 92}, {'name': 'Carol', 'score': 85}]
When / why to use it
- Data pipelines where lineage, reproducibility, and observability matter — asset graph makes dependencies explicit.
- ETL / ELT pipelines over databases, files, or APIs.
- ML feature pipelines where intermediate artefacts need versioning and re-use.
- Teams migrating from Airflow who want stronger typing and a better local dev story.
- Partitioned pipelines: daily, weekly, or by-category processing with automatic backfill.
Common pitfalls
- Asset function return type must be serialisable. Dagster stores asset outputs through an I/O manager, and the default I/O manager uses pickle. For large DataFrames, configure a Parquet or Arrow I/O manager instead.
- @op vs @asset. @op is the lower-level "task" primitive (Dagster's Airflow equivalent); @asset is the higher-level "declare what exists" primitive. Prefer @asset for new pipelines, and use @op for complex fan-out/fan-in graphs that don't map to named data artefacts.
- Definitions must be importable at module level. The dagster dev server and scheduler import your code to discover assets. Avoid side effects in module-level code; keep heavy initialisation inside functions or resources.
- dagster dev launches the local UI at http://localhost:3000. Use it during development to visualise the asset graph, trigger materialisations, and inspect logs; it is far easier than reading logs from stdout.
- Use Output and MetadataValue to attach metadata (row count, schema, file size) to asset outputs. The UI displays this metadata alongside the materialisation event. AssetIn only configures how an input is loaded; it does not attach metadata.
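The first pitfall can be checked before a value ever reaches an I/O manager. The sketch below uses only the standard library; the helper name is_picklable is made up for illustration, and it assumes the default pickle-based I/O manager described above.

```python
import pickle


def is_picklable(value: object) -> bool:
    """Return True if pickle can serialise the value."""
    try:
        pickle.dumps(value)
        return True
    except Exception:  # pickle raises several different exception types
        return False


rows = [{"name": "Alice", "score": 92}]
lazy_rows = (r for r in rows)  # generators cannot be pickled

print(is_picklable(rows))       # True
print(is_picklable(lazy_rows))  # False
```

An asset that returns a generator, an open file handle, or a live database connection would hit the same failure at storage time, so return concrete values such as lists or DataFrames.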
Software-defined assets
Assets are the core Dagster primitive. Each @asset function declares one data object and optionally depends on others by naming them as parameters.
import pandas as pd
from dagster import asset, AssetIn, Output, MetadataValue
@asset(group_name="raw")
def raw_orders() -> pd.DataFrame:
    """Load orders from CSV."""
    df = pd.read_csv("orders.csv")
    return df
@asset(group_name="processed", ins={"raw_orders": AssetIn()})
def cleaned_orders(raw_orders: pd.DataFrame) -> Output[pd.DataFrame]:
    """Normalise column names, then remove invalid rows."""
    df = raw_orders.copy()
    # Normalise first so the dropna subset also matches headers such as "Order_ID"
    df.columns = df.columns.str.lower().str.replace(" ", "_")
    df = df.dropna(subset=["order_id", "amount"])
    return Output(
        df,
        metadata={
            "row_count": MetadataValue.int(len(df)),
            "columns": MetadataValue.text(str(list(df.columns))),
        },
    )
@asset(group_name="processed")
def revenue_by_region(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """Aggregate revenue per region."""
    return cleaned_orders.groupby("region")["amount"].sum().reset_index()
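To make the "parameters name upstream assets" rule concrete, here is a standard-library sketch of how a dependency graph can be derived from function signatures and run in order. It is an illustration of the idea only, not Dagster's implementation; run_in_dependency_order and the toy functions are hypothetical.

```python
import inspect
from graphlib import TopologicalSorter


def raw_orders():
    return [{"region": "East", "amount": 100.0}, {"region": "West", "amount": None}]


def cleaned_orders(raw_orders):
    return [r for r in raw_orders if r["amount"] is not None]


def revenue_by_region(cleaned_orders):
    totals = {}
    for row in cleaned_orders:
        totals[row["region"]] = totals.get(row["region"], 0.0) + row["amount"]
    return totals


def run_in_dependency_order(functions):
    """Treat each parameter name as an upstream function name and run upstream first."""
    by_name = {fn.__name__: fn for fn in functions}
    graph = {name: set(inspect.signature(fn).parameters) for name, fn in by_name.items()}
    results = {}
    for name in TopologicalSorter(graph).static_order():
        results[name] = by_name[name](**{dep: results[dep] for dep in graph[name]})
    return results


# The list order does not matter; the graph decides execution order.
results = run_in_dependency_order([revenue_by_region, cleaned_orders, raw_orders])
print(results["revenue_by_region"])  # {'East': 100.0}
```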
Resources — injectable clients and connections
Resources are reusable configured objects (database connections, API clients, file system handles) that are injected into asset and op functions. Define them once and share across all assets.
import pandas as pd
from dagster import asset, Definitions, EnvVar
from dagster_duckdb import DuckDBResource
@asset
def sales_summary(duckdb: DuckDBResource) -> pd.DataFrame:
    """Query the DuckDB warehouse."""
    with duckdb.get_connection() as conn:
        return conn.execute(
            "SELECT region, SUM(amount) AS total FROM sales GROUP BY region"
        ).df()
defs = Definitions(
    assets=[sales_summary],
    resources={
        "duckdb": DuckDBResource(database=EnvVar("DUCKDB_PATH")),
    },
)
Jobs — targeted execution graphs
A job selects a subset of assets (or ops) and their configuration for a single run. Use jobs to run different asset subsets on different schedules or with different configs.
from dagster import job, op, In, Out
@op
def fetch_data() -> list[dict]:
    return [{"id": 1, "value": 42}, {"id": 2, "value": 7}]
@op(ins={"data": In()})
def filter_data(data: list[dict]) -> list[dict]:
    return [d for d in data if d["value"] > 10]
@op(ins={"data": In()})
def save_data(data: list[dict]) -> None:
    print(f"Saving {len(data)} records")
@job
def my_pipeline():
    save_data(filter_data(fetch_data()))
For asset-based jobs:
from dagster import define_asset_job, AssetSelection
# Run all assets in the "processed" group
processed_job = define_asset_job("processed_job", AssetSelection.groups("processed"))
Schedules
A schedule triggers a job on a cron expression.
from dagster import ScheduleDefinition, define_asset_job, AssetSelection, Definitions
daily_job = define_asset_job("daily_pipeline", AssetSelection.all())
daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule="0 6 * * *",  # 6 AM UTC daily
    execution_timezone="UTC",
)
defs = Definitions(
    assets=[raw_orders, cleaned_orders, revenue_by_region],
    jobs=[daily_job],
    schedules=[daily_schedule],
)
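To reason about when a "0 6 * * *" schedule fires next, the following standard-library sketch computes the next daily tick. It handles only a fixed daily hour and minute, not general cron syntax, and next_daily_tick is a hypothetical helper rather than a Dagster function.

```python
from datetime import datetime, timedelta, timezone


def next_daily_tick(now: datetime, hour: int = 6, minute: int = 0) -> datetime:
    """Next occurrence of hour:minute strictly after `now`, in now's timezone."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


before = datetime(2026, 1, 15, 5, 30, tzinfo=timezone.utc)
at_tick = datetime(2026, 1, 15, 6, 0, tzinfo=timezone.utc)

print(next_daily_tick(before))   # 2026-01-15 06:00:00+00:00
print(next_daily_tick(at_tick))  # 2026-01-16 06:00:00+00:00
```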
Sensors — event-driven triggers
A sensor polls an external condition (new file, queue message, API event) and triggers a run when the condition is met.
import os
from dagster import sensor, RunRequest, SensorEvaluationContext, define_asset_job, AssetSelection, Definitions
ingest_job = define_asset_job("ingest_job", AssetSelection.all())
@sensor(job=ingest_job, minimum_interval_seconds=30)
def new_file_sensor(context: SensorEvaluationContext):
    """Trigger a run when a new CSV appears in the inbox directory."""
    known_files = set(context.cursor.split(",")) if context.cursor else set()
    inbox_files = {f for f in os.listdir("./inbox") if f.endswith(".csv")}
    new_files = inbox_files - known_files
    for filename in sorted(new_files):
        # Run config must follow the job's schema; a bare {"filename": ...} is rejected.
        # Assumes the job contains an asset named ingest_file whose Config has a filename field.
        yield RunRequest(
            run_key=filename,
            run_config={"ops": {"ingest_file": {"config": {"filename": filename}}}},
        )
    context.update_cursor(",".join(sorted(inbox_files)))
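The cursor logic in the sensor is plain set arithmetic, so it can be pulled into a pure function and tested without Dagster. A minimal sketch, assuming a comma-separated cursor as above; diff_inbox is a hypothetical helper name.

```python
def diff_inbox(cursor, listing):
    """Return (new CSV files, updated cursor) for one sensor evaluation."""
    known = set(cursor.split(",")) if cursor else set()
    csv_files = {f for f in listing if f.endswith(".csv")}
    new_files = sorted(csv_files - known)
    new_cursor = ",".join(sorted(csv_files))
    return new_files, new_cursor


# First evaluation: no cursor yet, so every CSV is new
first_new, cursor = diff_inbox(None, ["a.csv", "notes.txt"])
print(first_new, cursor)   # ['a.csv'] a.csv

# Second evaluation: only the file added since the last tick is reported
second_new, cursor = diff_inbox(cursor, ["a.csv", "b.csv"])
print(second_new, cursor)  # ['b.csv'] a.csv,b.csv
```

A comma-separated cursor breaks if a filename contains a comma; serialising the set with json.dumps is one way around that.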
Partitions — time and category slicing
Partitions let you process data in named slices (dates, categories) and backfill or re-run individual partitions independently.
from dagster import DailyPartitionsDefinition, asset, Definitions
daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")
@asset(partitions_def=daily_partitions, group_name="raw")
def daily_events(context) -> list[dict]:
    """Load events for the given partition date."""
    partition_date = context.partition_key  # "2026-01-15"
    # Load data for this specific date
    return [{"date": partition_date, "events": 1000}]
@asset(partitions_def=daily_partitions, group_name="processed")
def daily_summary(daily_events: list[dict]) -> dict:
    return {"total": sum(e["events"] for e in daily_events)}
Materialise a single partition:
dagster asset materialize --select daily_events --partition 2026-01-15
Output: run logs stream to the terminal; the command exits 0 when the materialisation succeeds.
Config — parameterising runs
from dagster import asset, Config, Definitions
class FilterConfig(Config):
    min_score: int = 80
    region: str = "East"
@asset
def filtered_sales(config: FilterConfig) -> list[dict]:
    """Return sales filtered by config parameters."""
    data = [
        {"region": "East", "score": 92},
        {"region": "West", "score": 78},
        {"region": "East", "score": 85},
    ]
    return [
        d for d in data
        if d["score"] >= config.min_score and d["region"] == config.region
    ]
defs = Definitions(assets=[filtered_sales])
Launch with config:
from dagster import materialize
result = materialize(
    [filtered_sales],
    run_config={"ops": {"filtered_sales": {"config": {"min_score": 85, "region": "East"}}}},
)
Testing assets
from dagster import materialize
from my_pipeline import raw_orders, cleaned_orders
def test_cleaned_orders():
    import pandas as pd
    sample = pd.DataFrame({
        "Order_ID": [1, None, 3],
        "Amount": [100.0, 200.0, None],
        "Region": ["East", "West", "East"],
    })
    # cleaned_orders declares no context parameter, so invoke it with its inputs only
    result = cleaned_orders(raw_orders=sample)
    # None in Order_ID and Amount should be dropped
    assert len(result.value) == 1
    assert "order_id" in result.value.columns  # normalised to lowercase
def test_pipeline_end_to_end():
    # Reads orders.csv from the current working directory
    result = materialize([raw_orders, cleaned_orders])
    assert result.success
    df = result.output_for_node("cleaned_orders")
    assert len(df) > 0
Launching the UI
dagster dev # starts UI at http://localhost:3000
Output: the server keeps running and streams logs until interrupted (Ctrl+C).
The UI shows the full asset graph, per-asset materialisation history, run logs, schedules, sensors, and partition status.
Real-world recipes
A handful of end-to-end pipelines that show how assets, schedules, sensors, partitions, and resources compose in practice.
1. Daily-partitioned ETL with backfill
import pandas as pd
from dagster import (
    asset, DailyPartitionsDefinition, Definitions,
    build_schedule_from_partitioned_job,
    define_asset_job, AssetSelection, MetadataValue, Output,
)
daily = DailyPartitionsDefinition(start_date="2026-01-01")
@asset(partitions_def=daily, group_name="raw")
def raw_events(context) -> Output[pd.DataFrame]:
    """Load events for the given partition date from object storage."""
    date = context.partition_key
    df = pd.read_parquet(f"s3://bucket/events/dt={date}/")
    return Output(df, metadata={"rows": MetadataValue.int(len(df)),
                                "date": MetadataValue.text(date)})
@asset(partitions_def=daily, group_name="curated")
def daily_revenue(raw_events: pd.DataFrame) -> Output[pd.DataFrame]:
    out = raw_events.groupby("country")["amount"].sum().reset_index()
    return Output(out, metadata={"countries": MetadataValue.int(len(out))})
etl_job = define_asset_job(
    "etl_daily",
    AssetSelection.keys("raw_events", "daily_revenue"),
    partitions_def=daily,
)
defs = Definitions(
    assets=[raw_events, daily_revenue],
    jobs=[etl_job],
    # A partitioned job needs a partition key for every run; this helper
    # requests the latest completed daily partition at 04:00.
    schedules=[build_schedule_from_partitioned_job(etl_job, hour_of_day=4)],
)
Output: (none — exits 0 on success)
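Backfill a date range from the CLI (add -m or -f to point at your definitions module if the project has no default code location):
dagster job backfill -j etl_daily --from 2026-01-01 --to 2026-01-31
Output: the command asks for confirmation (skip with --noprompt) and then launches one run per partition.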
2. Sensor-driven ingest pipeline
import os
from dagster import (
    asset, sensor, Config, RunRequest, SensorEvaluationContext,
    define_asset_job, AssetSelection, Definitions,
)
class IngestConfig(Config):
    path: str
@asset
def ingest_file(context, config: IngestConfig):
    context.log.info(f"Ingesting {config.path}")
    return {"path": config.path, "ok": True}
ingest_job = define_asset_job("ingest_job", AssetSelection.keys("ingest_file"))
@sensor(job=ingest_job, minimum_interval_seconds=15)
def inbox_sensor(context: SensorEvaluationContext):
    seen = set((context.cursor or "").split(",")) - {""}
    current = {f for f in os.listdir("./inbox") if f.endswith(".jsonl")}
    for new in sorted(current - seen):
        yield RunRequest(
            run_key=new,
            # Per-asset config lives under "ops" and is validated against IngestConfig
            run_config={"ops": {"ingest_file": {"config": {"path": new}}}},
            tags={"source": "inbox"},
        )
    context.update_cursor(",".join(sorted(current)))
defs = Definitions(assets=[ingest_file], jobs=[ingest_job], sensors=[inbox_sensor])
3. Asset checks — built-in data quality
from dagster import asset, asset_check, AssetCheckResult
import pandas as pd
@asset
def orders() -> pd.DataFrame:
    return pd.DataFrame({"id": [1, 2, 3], "amount": [10.0, 20.0, 30.0]})
@asset_check(asset=orders)
def orders_amount_positive(orders: pd.DataFrame) -> AssetCheckResult:
    bad = int((orders["amount"] <= 0).sum())
    return AssetCheckResult(
        passed=bad == 0,  # plain Python bool, not a NumPy bool
        metadata={"bad_rows": bad},
    )
@asset_check(asset=orders)
def orders_no_duplicates(orders: pd.DataFrame) -> AssetCheckResult:
    dups = int(orders["id"].duplicated().sum())
    return AssetCheckResult(passed=dups == 0, metadata={"dup_count": dups})
4. Multi-partition asset (date × region)
from dagster import (
    asset, MultiPartitionsDefinition, StaticPartitionsDefinition,
    DailyPartitionsDefinition,
)
regions = StaticPartitionsDefinition(["us", "eu", "apac"])
date_part = DailyPartitionsDefinition(start_date="2026-01-01")
mp = MultiPartitionsDefinition({"date": date_part, "region": regions})
@asset(partitions_def=mp)
def regional_events(context):
    keys = context.partition_key.keys_by_dimension
    return {"date": keys["date"], "region": keys["region"], "events": 100}
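Multi-partitioning multiplies the number of slices, which matters when sizing a backfill. The standard-library sketch below enumerates the date × region combinations for January; daily_keys is a hypothetical helper and the dictionaries only mirror the shape of keys_by_dimension.

```python
from datetime import date, timedelta
from itertools import product


def daily_keys(start: date, end: date) -> list:
    """ISO date strings from start to end, inclusive."""
    days = (end - start).days + 1
    return [(start + timedelta(days=i)).isoformat() for i in range(days)]


regions = ["us", "eu", "apac"]
dates = daily_keys(date(2026, 1, 1), date(2026, 1, 31))

# One entry per (date, region) pair, i.e. one run per pair in a full backfill
combos = [{"date": d, "region": r} for d, r in product(dates, regions)]

print(len(combos))  # 93
print(combos[0])    # {'date': '2026-01-01', 'region': 'us'}
```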
Production deployment
A production Dagster deployment runs three kinds of long-lived process: the webserver (UI + GraphQL), the daemon (schedules, sensors, run queue), and a code location server for each code location. Individual runs are started by the run launcher (typically Kubernetes, Docker, or a local process).
# definitions.py: the entrypoint module
from dagster import Definitions, ScheduleDefinition, define_asset_job, AssetSelection
from my_project.assets import raw_events, daily_revenue
from my_project.resources import duckdb_resource
defs = Definitions(
    assets=[raw_events, daily_revenue],
    resources={"duckdb": duckdb_resource},
    jobs=[define_asset_job("nightly", AssetSelection.all())],
    schedules=[ScheduleDefinition(
        job_name="nightly",
        cron_schedule="0 2 * * *",
        execution_timezone="UTC",
    )],
)
Output: (none — exits 0 on success)
Run the three processes (typical systemd / Docker layout):
# Webserver (UI)
dagster-webserver -m my_project.definitions --host 0.0.0.0 --port 3000
# Daemon (schedules + sensors + run queue)
dagster-daemon run -m my_project.definitions
# Optional: dedicated code location (gRPC) server, referenced from workspace.yaml
dagster api grpc -m my_project.definitions --host 0.0.0.0 --port 4000
Output: each process stays in the foreground and streams logs until it is stopped.
dagster.yaml: the instance config that webserver and daemon share:
storage:
  postgres:
    postgres_db:
      hostname: db.internal
      username: dagster
      password:
        env: PG_PASSWORD
      db_name: dagster
      port: 5432
run_launcher:
  module: dagster_k8s
  class: K8sRunLauncher
  config:
    service_account_name: dagster-runner
    job_namespace: dagster
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 10
Deploy checklist:
- Pin dagster and dagster-webserver to the same version. They are co-released, and version skew breaks the GraphQL contract.
- Use Postgres for run storage in production, never SQLite. The default SQLite store does not survive concurrent writes.
- Run the daemon as a singleton. Multiple daemons fight over schedule ticks.
- Give the daemon ≥ 30s startup time before opening the webserver to traffic; schedules register on daemon boot.
- Set max_concurrent_runs on the run coordinator (the key shown in dagster.yaml). Without it, schedule pile-up can DoS your warehouse.
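The version-pinning item can be enforced in a deploy script. This is a sketch using importlib.metadata from the standard library; the helper names are hypothetical and the version strings in the demo are illustrative, not a recommendation.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each package name to its installed version, or None if it is absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def has_version_skew(versions):
    """True when the installed packages do not all share one version."""
    present = {v for v in versions.values() if v is not None}
    return len(present) > 1


print(has_version_skew({"dagster": "1.7.0", "dagster-webserver": "1.7.0"}))  # False
print(has_version_skew({"dagster": "1.7.0", "dagster-webserver": "1.6.9"}))  # True

# In a real deploy script:
# has_version_skew(installed_versions(["dagster", "dagster-webserver"]))
```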
Testing patterns
Dagster's testability is a major reason teams pick it. Every primitive — asset, op, resource, job — can be invoked directly without spinning up the daemon.
import pandas as pd
import pytest
from dagster import asset, materialize, ResourceParam
from my_project.assets import raw_orders, cleaned_orders, revenue_by_region
# 1. Unit-test an asset function directly
def test_cleaned_orders_drops_nulls():
    raw = pd.DataFrame({"Order_ID": [1, None, 3], "Amount": [10.0, 20.0, None]})
    # cleaned_orders takes no context parameter, so pass its inputs only
    out = cleaned_orders(raw_orders=raw)
    assert len(out.value) == 1
    assert "order_id" in out.value.columns
# 2. End-to-end materialise: runs all upstream deps in-process
def test_pipeline_materialize_in_memory(tmp_path, monkeypatch):
    (tmp_path / "orders.csv").write_text(
        "Order_ID,Amount,Region\n1,100,East\n2,,East\n"
    )
    # raw_orders reads "orders.csv" relative to the working directory
    monkeypatch.chdir(tmp_path)
    result = materialize([raw_orders, cleaned_orders, revenue_by_region])
    assert result.success
    df = result.output_for_node("revenue_by_region")
    assert df["amount"].sum() == 100
# 3. Override resources for testing
def test_with_mock_db():
    class MockDB:
        def query(self, sql):
            return [{"value": 42}]
    # Illustrative asset that actually requires the "db" resource
    @asset
    def answer(db: ResourceParam[MockDB]) -> int:
        return db.query("SELECT 42")[0]["value"]
    result = materialize([answer], resources={"db": MockDB()})
    assert result.success
    assert result.output_for_node("answer") == 42
Output: (none — exits 0 on success)
Patterns:
- build_asset_context() mocks the asset run context for unit tests; no daemon required.
- materialize([...]) materialises a subgraph in-process; ideal for pytest.
- Resource overrides let you swap real I/O managers for in-memory fakes without changing asset code.
- Asset checks can be tested independently via execute_in_process on the underlying job.
- Run with pytest -m "not slow" and tag long materialisations with @pytest.mark.slow.
Migration from older patterns
Dagster has had two major API shifts. Most production code today uses the Definitions module pattern; if you maintain pre-1.0 code, migrate.
| Pattern | Year introduced | Status |
|---|---|---|
| @repository decorator | 2020 | Legacy; migrate to Definitions |
| @asset decorator | 2022 | Current; use as default |
| Definitions(...) module-level object | 2023 | Current; replaces @repository |
| Asset graphs (@graph_asset) | 2024 | Current; for op-level fan-out within an asset |
| Asset checks (@asset_check) | 2024 | Current; replaces ad-hoc validation ops |
Repository → Definitions:
# OLD: @repository (legacy)
from dagster import repository
@repository
def my_repo():
    return [my_asset, my_job, my_schedule]
# NEW: Definitions
from dagster import Definitions
defs = Definitions(
    assets=[my_asset],
    jobs=[my_job],
    schedules=[my_schedule],
)
Migrating from Airflow:
| Airflow concept | Dagster equivalent |
|---|---|
| DAG | Definitions (one per code location) |
| Operator / Task | @op (or @asset for data-centric) |
| XCom push/pull | Asset return values + I/O managers |
| dag.schedule_interval="0 6 * * *" | ScheduleDefinition(cron_schedule="0 6 * * *") |
| PythonOperator | plain @op / @asset function |
| BranchPythonOperator | conditional yield Output from a multi-output op |
| SubDAG | @graph_asset or nested job |
| Connections | Resource classes |
| Variables | EnvVar or resource config |
Ecosystem integrations
Dagster ships a large library of first-party integrations. Each is a separate pip package and exposes resources, assets, or I/O managers.
| Package | Purpose |
|---|---|
| dagster-dbt | Surfaces dbt models as Dagster assets; auto-derives lineage |
| dagster-airbyte | Triggers Airbyte syncs; models connections as assets |
| dagster-snowflake | I/O manager: Pandas/Polars ↔ Snowflake tables |
| dagster-databricks | Run Databricks jobs as Dagster ops; mount notebooks |
| dagster-aws | S3 I/O manager, EMR ops, ECS task launcher |
| dagster-duckdb | DuckDB resource + Pandas/Polars I/O manager |
| dagster-pandas | Adds Pandas DataFrame type checks |
| dagster-polars | Polars-native I/O managers (Parquet, Delta) |
| dagster-k8s | Per-run Kubernetes Job launcher |
| dagster-openai / dagster-anthropic | LLM resources, optional cost tracking |
Troubleshooting common errors
| Error | Cause | Fix |
|---|---|---|
| Could not find Definitions object | Daemon/webserver can't import your module | Check -m my.module matches a real importable path with a top-level defs |
| Asset has no upstream dependencies (warning) | Asset name mismatch with parameter name | Parameter names must match the upstream asset's name exactly |
| RunWorker exited unexpectedly | OOM or unhandled exception in the launcher | Inspect dagster.log; bump K8s job memory; wrap heavy ops in try/except |
| Schedules never fire | Daemon not running, or daemon clock skew | dagster-daemon run; verify execution_timezone |
| Sensor stuck in evaluating | Sensor function blocks > minimum_interval_seconds | Move work off the sensor; RunRequest should fire fast |
| DagsterInvalidConfigError at startup | Resource/asset config schema mismatch | Run dagster definitions validate before deploying |
| UI shows asset as "Materializing..." forever | Worker died without reporting | Inspect logs; terminate the stale run from the UI or remove it with dagster run delete <run_id> |
| PostgresEventLogStorage: relation does not exist | Database not migrated | dagster instance migrate |
Performance tuning
Dagster's overhead grows with asset count and run rate. For large deployments tune the run coordinator, executor, and daemon cadence.
# definitions.py
from dagster import Definitions, multiprocess_executor, in_process_executor
defs = Definitions(
    assets=[...],
    executor=multiprocess_executor.configured({
        "max_concurrent": 8,  # cap on parallel ops within a single run
    }),
)
Performance checklist:
- Sensor cadence: raise minimum_interval_seconds to ≥30s. Sub-second sensors thrash the daemon.
- Schedule explosions: if you have N partitioned assets × M schedules, that's N×M ticks. Use define_asset_job with AssetSelection to batch.
- I/O manager choice: the default pickle I/O manager is fine for small Python objects. For DataFrames > 100 MB, swap to a Parquet-based I/O manager or S3PickleIOManager (from dagster-aws).
- In-process executor for tiny graphs: executor=in_process_executor skips multiprocessing overhead for ops that complete in < 1s.
- Postgres connection pool: default is 5 connections; raise to ≥20 if your daemon is contending.
Patterns & idioms
- Group assets by data layer. group_name="raw", group_name="staging", group_name="marts" makes the UI lineage view legible.
- Asset key prefixes for multi-environment. AssetKey(["prod", "users"]) vs AssetKey(["staging", "users"]) lets one definitions module serve multiple environments.
- Use freshness_checks for SLA monitoring. Dagster fails the check when an asset isn't materialised within a window.
- Code locations as deploy boundaries. One code location per team or service; deploys are independent.
- Sensors emit RunRequest, never run work themselves. A sensor that blocks > 30s blocks all other sensors in the daemon.
- Resource configuration via EnvVar instead of hard-coded secrets. Dagster Cloud and the local daemon both honour env-var indirection.
When NOT to use this
Dagster is excellent for pipeline orchestration but is over-engineered for many scripts. Skip it when:
- A single cron-triggered script suffices. A systemd timer or GitHub Actions cron does not need Dagster.
- You don't track lineage. Dagster's value is asset lineage and observability; without those, the cost of running its webserver, daemon, and code servers is dead weight.
- Sub-second event processing. Dagster's tick cadence is seconds at minimum. Use Kafka Streams, Faust, or a queue worker.
- You need a workflow service to back a user-facing API. Temporal or Prefect lean further into request-driven orchestration.
- The pipeline is exclusively SQL. dbt is leaner; you can still wrap it with dagster-dbt later if observability becomes a need.
Quick reference
| Task | Code |
|---|---|
| Define asset | @asset def my_asset(...) -> T: |
| Asset dependency | parameter name matches upstream asset name |
| Asset with metadata | return Output(value, metadata={"rows": MetadataValue.int(n)}) |
| Group assets | @asset(group_name="raw") |
| Resource | @asset def fn(db: MyResource) + Definitions(resources={"db": MyResource(...)}) |
| Define job | define_asset_job("name", AssetSelection.groups("g")) |
| Schedule | ScheduleDefinition(job=job, cron_schedule="0 6 * * *") |
| Sensor | @sensor(job=job) def s(context): yield RunRequest(run_key=...) |
| Partition | @asset(partitions_def=DailyPartitionsDefinition(...)) |
| Config | class C(Config): field: type → @asset def fn(config: C) |
| Materialise locally | materialize([asset1, asset2]) |
| Run UI | dagster dev |
| Run job | dagster job execute -j job_name |