cheat sheet

dagster

Build, schedule, and observe data pipelines as software-defined assets with Dagster. Covers assets, jobs, schedules, sensors, resources, partitions, and the Dagster UI.

dagster — Modern Data Orchestration

What it is

Dagster is a data orchestration platform centred on software-defined assets: persistent data objects (database tables, files, ML models) declared in Python together with the function that produces them. Instead of thinking in terms of "tasks that run," you declare "what data should exist and how to produce it." Dagster derives the execution graph from asset dependencies, supports partitioning, and ships a full observability UI. It is a popular alternative to Airflow for Python-first teams.
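To make "derives the execution graph from asset dependencies" concrete, here is a toy re-implementation in plain Python (standard library only, no Dagster). It is a sketch of the idea, not Dagster's internals: each function's dependencies are read from its parameter names with inspect, ordered with graphlib.TopologicalSorter, and results are passed downstream by name. The helpers build_graph and run_all are hypothetical.

```python
import inspect
from graphlib import TopologicalSorter


def raw_data() -> list:
    return [{"name": "Alice", "score": 92}, {"name": "Bob", "score": 78}]


def high_scorers(raw_data: list) -> list:
    return [r for r in raw_data if r["score"] > 80]


def build_graph(funcs) -> dict:
    """Map each function name to the names of the functions its parameters refer to."""
    names = {f.__name__ for f in funcs}
    return {
        f.__name__: {p for p in inspect.signature(f).parameters if p in names}
        for f in funcs
    }


def run_all(funcs) -> dict:
    """Call each function once, upstream first, passing earlier results in by parameter name."""
    by_name = {f.__name__: f for f in funcs}
    results = {}
    for name in TopologicalSorter(build_graph(funcs)).static_order():
        params = inspect.signature(by_name[name]).parameters
        # Every parameter is assumed to name another function in `funcs`.
        results[name] = by_name[name](**{p: results[p] for p in params})
    return results


# The input order does not matter; the graph decides what runs first.
print(run_all([high_scorers, raw_data])["high_scorers"])
```

The same rule explains a common bug: rename a parameter so it no longer matches an upstream name and the edge silently disappears from the graph.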

Install

bash
pip install dagster dagster-webserver

Quick example

python
from dagster import asset, Definitions, materialize

@asset
def raw_data() -> list[dict]:
    """Simulated CSV load."""
    return [
        {"name": "Alice", "score": 92},
        {"name": "Bob",   "score": 78},
        {"name": "Carol", "score": 85},
    ]

@asset
def high_scorers(raw_data: list[dict]) -> list[dict]:
    """Filter to scores above 80."""
    return [r for r in raw_data if r["score"] > 80]

defs = Definitions(assets=[raw_data, high_scorers])

# Materialise both assets locally
result = materialize([raw_data, high_scorers])
print(result.output_for_node("high_scorers"))

Output:

text
[{'name': 'Alice', 'score': 92}, {'name': 'Carol', 'score': 85}]

When / why to use it

  • Data pipelines where lineage, reproducibility, and observability matter — asset graph makes dependencies explicit.
  • ETL / ELT pipelines over databases, files, or APIs.
  • ML feature pipelines where intermediate artefacts need versioning and re-use.
  • Teams migrating from Airflow who want stronger typing and a better local dev story.
  • Partitioned pipelines: daily, weekly, or by-category processing with automatic backfill.

Common pitfalls

Asset outputs must be serialisable: Dagster hands each return value to an I/O manager, and the default I/O manager pickles it to the local filesystem. For large DataFrames, configure a Parquet or Arrow-based I/O manager instead.
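A cheap pre-flight check for this pitfall is to round-trip a sample output through pickle yourself. The sketch below uses only the standard library; the helper name is_picklable is hypothetical, and passing it only approximates what the default I/O manager needs (it says nothing about size or disk space).

```python
import pickle


def is_picklable(value) -> bool:
    """True if `value` survives pickle.dumps/pickle.loads, roughly what the default I/O manager does."""
    try:
        pickle.loads(pickle.dumps(value))
    except Exception:  # PicklingError, TypeError, AttributeError, ...
        return False
    return True


print(is_picklable([{"name": "Alice", "score": 92}]))  # plain data pickles fine
print(is_picklable(x for x in range(3)))                # generators cannot be pickled
```

Open connections, generators, and lambdas are the usual offenders; return materialised data (a list, a DataFrame) rather than a lazy handle.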

@op vs @asset: @op is the lower-level "task" primitive (the closest match to an Airflow task). @asset is the higher-level "declare what exists" primitive. Prefer @asset for new pipelines; reach for @op for complex fan-out/fan-in graphs that don't map to named data artefacts.

Definitions must be importable at module level — the dagster dev server and scheduler import your code to discover assets. Avoid side effects in module-level code; keep heavy initialisation inside functions or resources.

dagster dev launches the local UI at http://localhost:3000. Use it during development to visualise the asset graph, trigger materialisations, and inspect logs — it is significantly better than reading logs from stdout.

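Return Output(value, metadata={...}) (or call context.add_output_metadata) to attach metadata such as row count, schema, or file size to an asset's output. The UI displays this metadata alongside each materialisation event, which gives you observability for free. (AssetIn is for mapping inputs, not for metadata.)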

Software-defined assets

Assets are the core Dagster primitive. Each @asset function declares one data object and optionally depends on others by naming them as parameters.

python
import pandas as pd
from dagster import asset, AssetIn, Output, MetadataValue

@asset(group_name="raw")
def raw_orders() -> pd.DataFrame:
    """Load orders from CSV."""
    df = pd.read_csv("orders.csv")
    return df

@asset(group_name="processed", ins={"raw_orders": AssetIn()})
def cleaned_orders(raw_orders: pd.DataFrame) -> Output[pd.DataFrame]:
    """Remove invalid rows and normalise column names."""
    df = raw_orders.copy()
    # Normalise names first so the dropna subset below refers to existing columns
    df.columns = df.columns.str.lower().str.replace(" ", "_")
    df = df.dropna(subset=["order_id", "amount"])
    return Output(
        df,
        metadata={
            "row_count": MetadataValue.int(len(df)),
            "columns": MetadataValue.text(str(list(df.columns))),
        },
    )

@asset(group_name="processed")
def revenue_by_region(cleaned_orders: pd.DataFrame) -> pd.DataFrame:
    """Aggregate revenue per region."""
    return cleaned_orders.groupby("region")["amount"].sum().reset_index()

Resources — injectable clients and connections

Resources are reusable configured objects (database connections, API clients, file system handles) that are injected into asset and op functions. Define them once and share across all assets.

python
import pandas as pd
from dagster import asset, Definitions, EnvVar
from dagster_duckdb import DuckDBResource

@asset
def sales_summary(duckdb: DuckDBResource) -> pd.DataFrame:
    """Query the DuckDB warehouse."""
    with duckdb.get_connection() as conn:
        return conn.execute(
            "SELECT region, SUM(amount) AS total FROM sales GROUP BY region"
        ).df()

defs = Definitions(
    assets=[sales_summary],
    resources={
        "duckdb": DuckDBResource(database=EnvVar("DUCKDB_PATH")),
    },
)
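The point of EnvVar in the example above is that the definitions module stores the variable's name, not its value; as we understand it, the value is read when the resource is initialised for a run, not at import time. The stdlib sketch below imitates that deferral. EnvRef is a hypothetical stand-in, not Dagster's class.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class EnvRef:
    """Hypothetical stand-in for dagster.EnvVar: holds a variable name, resolved on demand."""
    name: str

    def resolve(self) -> str:
        # Read at use time, so importing the module never needs the secret to exist
        return os.environ[self.name]


db_path = EnvRef("DUCKDB_PATH")   # safe to create even if the variable is unset
os.environ["DUCKDB_PATH"] = "/tmp/warehouse.duckdb"
print(db_path.resolve())
```

Deferring the lookup is also what keeps secrets out of the definitions module and lets the same code run against different environments.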

Jobs — targeted execution graphs

A job selects a subset of assets (or ops) and their configuration for a single run. Use jobs to run different asset subsets on different schedules or with different configs.

python
from dagster import job, op, In

@op
def fetch_data() -> list[dict]:
    return [{"id": 1, "value": 42}, {"id": 2, "value": 7}]

@op(ins={"data": In()})
def filter_data(data: list[dict]) -> list[dict]:
    return [d for d in data if d["value"] > 10]

@op(ins={"data": In()})
def save_data(data: list[dict]) -> None:
    print(f"Saving {len(data)} records")

@job
def my_pipeline():
    save_data(filter_data(fetch_data()))

For asset-based jobs:

python
from dagster import define_asset_job, AssetSelection

# Run all assets in the "processed" group
processed_job = define_asset_job("processed_job", AssetSelection.groups("processed"))

Schedules

A schedule triggers a job on a cron expression.

python
from dagster import ScheduleDefinition, define_asset_job, AssetSelection, Definitions

daily_job = define_asset_job("daily_pipeline", AssetSelection.all())

daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule="0 6 * * *",   # 6 AM UTC daily
    execution_timezone="UTC",
)

defs = Definitions(
    assets=[raw_orders, cleaned_orders, revenue_by_region],
    jobs=[daily_job],
    schedules=[daily_schedule],
)
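For a daily expression like "0 6 * * *", the next fire time is today at 06:00 if that is still ahead, otherwise tomorrow. The sketch below shows that arithmetic with the standard library. next_daily_tick is a hypothetical helper that handles only the "minute hour * * *" shape and works in UTC to match execution_timezone="UTC" above; a timezone with daylight-saving shifts would need zoneinfo and more care.

```python
from datetime import datetime, timedelta, timezone


def next_daily_tick(now: datetime, hour: int, minute: int = 0) -> datetime:
    """Next firing of a 'minute hour * * *' cron strictly after `now` (UTC datetimes)."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:  # today's slot already passed (or is right now)
        candidate += timedelta(days=1)
    return candidate


now = datetime(2026, 1, 15, 7, 30, tzinfo=timezone.utc)
print(next_daily_tick(now, hour=6))  # 2026-01-16 06:00:00+00:00
```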

Sensors — event-driven triggers

A sensor polls an external condition (new file, queue message, API event) and triggers a run when the condition is met.

python
import os
from dagster import sensor, RunRequest, SensorEvaluationContext, define_asset_job, AssetSelection, Definitions

ingest_job = define_asset_job("ingest_job", AssetSelection.all())

@sensor(job=ingest_job, minimum_interval_seconds=30)
def new_file_sensor(context: SensorEvaluationContext):
    """Trigger a run when a new CSV appears in the inbox directory."""
    known_files = set(context.cursor.split(",")) if context.cursor else set()
    inbox_files = {f for f in os.listdir("./inbox") if f.endswith(".csv")}
    new_files = inbox_files - known_files

    for filename in new_files:
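        # run_key deduplicates: a key this sensor has already requested is skipped.
        # An asset job has no top-level "filename" config field, so pass it as a tag.
        yield RunRequest(run_key=filename, tags={"filename": filename})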

    context.update_cursor(",".join(inbox_files))
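The cursor bookkeeping in the sensor above is easiest to test when pulled out into a pure function. In this sketch, diff_against_cursor is a hypothetical helper the sensor body could call with context.cursor and the current directory listing; it returns the files to request plus the next cursor. Sorting keeps both the run order and the stored cursor deterministic.

```python
from typing import Optional


def diff_against_cursor(cursor: Optional[str], current: set) -> tuple:
    """Return (new file names in sorted order, cursor string to store for the next tick)."""
    seen = set(cursor.split(",")) - {""} if cursor else set()
    new_files = sorted(current - seen)
    return new_files, ",".join(sorted(current))


new, cursor = diff_against_cursor(None, {"b.csv", "a.csv"})
print(new, cursor)  # ['a.csv', 'b.csv'] a.csv,b.csv
new, cursor = diff_against_cursor(cursor, {"a.csv", "b.csv", "c.csv"})
print(new)          # ['c.csv']
```

One design caveat: the cursor holds every file currently in the inbox, so it grows with the directory; for large inboxes, storing the newest modification time instead keeps it bounded.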

Partitions — time and category slicing

Partitions let you process data in named slices (dates, categories) and backfill or re-run individual partitions independently.

python
from dagster import DailyPartitionsDefinition, asset, Definitions

daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(partitions_def=daily_partitions, group_name="raw")
def daily_events(context) -> list[dict]:
    """Load events for the given partition date."""
    partition_date = context.partition_key   # "2026-01-15"
    # Load data for this specific date
    return [{"date": partition_date, "events": 1000}]

@asset(partitions_def=daily_partitions, group_name="processed")
def daily_summary(daily_events: list[dict]) -> dict:
    return {"total": sum(e["events"] for e in daily_events)}
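To see which keys a daily partitions definition produces, the sketch below enumerates one ISO-date key per day from the start date. It assumes, as we understand Dagster's default to be, that the current, still-open day is not yet a partition. daily_partition_keys is a hypothetical helper, not part of Dagster's API.

```python
from datetime import date, timedelta


def daily_partition_keys(start: date, today: date) -> list:
    """ISO-date keys for each complete day from `start` up to, but not including, `today`."""
    keys = []
    day = start
    while day < today:
        keys.append(day.isoformat())
        day += timedelta(days=1)
    return keys


print(daily_partition_keys(date(2026, 1, 1), date(2026, 1, 4)))
# ['2026-01-01', '2026-01-02', '2026-01-03']
```

Each key is what context.partition_key returns inside the asset, which is why daily_events can use it directly as a date string.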

Materialise a single partition:

bash
dagster asset materialize --select daily_events --partition 2026-01-15

Config — parameterising runs

python
from dagster import asset, Config, Definitions

class FilterConfig(Config):
    min_score: int = 80
    region: str = "East"

@asset
def filtered_sales(config: FilterConfig) -> list[dict]:
    """Return sales filtered by config parameters."""
    data = [
        {"region": "East", "score": 92},
        {"region": "West", "score": 78},
        {"region": "East", "score": 85},
    ]
    return [
        d for d in data
        if d["score"] >= config.min_score and d["region"] == config.region
    ]

defs = Definitions(assets=[filtered_sales])

Launch with config:

python
from dagster import materialize

result = materialize(
    [filtered_sales],
    run_config={"ops": {"filtered_sales": {"config": {"min_score": 85, "region": "East"}}}},
)
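The run_config nesting is ops, then the asset name, then config; any field you omit falls back to the class default. The stdlib sketch below mimics that lookup with a dataclass standing in for the Config subclass. FilterSettings and config_for are hypothetical, and Dagster's real (pydantic-based) validation reports errors differently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FilterSettings:
    """Hypothetical stand-in for the FilterConfig class above, with the same defaults."""
    min_score: int = 80
    region: str = "East"


def config_for(run_config: dict, asset_name: str) -> FilterSettings:
    """Pull run_config["ops"][asset_name]["config"] (if present) and fill gaps with defaults."""
    raw = run_config.get("ops", {}).get(asset_name, {}).get("config", {})
    # Unknown keys raise TypeError here, loosely like Dagster rejecting unexpected fields
    return FilterSettings(**raw)


run_config = {"ops": {"filtered_sales": {"config": {"min_score": 85}}}}
print(config_for(run_config, "filtered_sales"))
# FilterSettings(min_score=85, region='East')
```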

Testing assets

python
import pandas as pd
from dagster import materialize
from my_pipeline import raw_orders, cleaned_orders

def test_cleaned_orders():
    sample = pd.DataFrame({
        "Order_ID": [1, None, 3],
        "Amount":   [100.0, 200.0, None],
        "Region":   ["East", "West", "East"],
    })
    # cleaned_orders declares no context parameter, so call it with its input only
    result = cleaned_orders(raw_orders=sample)
    # Rows with a null Order_ID or Amount are dropped, leaving one row
    assert len(result.value) == 1
    assert "order_id" in result.value.columns   # normalised to lowercase

def test_pipeline_end_to_end():
    # raw_orders reads orders.csv from the working directory
    result = materialize([raw_orders, cleaned_orders])
    assert result.success
    df = result.output_for_node("cleaned_orders")
    assert len(df) > 0

Launching the UI

bash
dagster dev            # starts UI at http://localhost:3000

The command runs in the foreground (serving the UI and a local daemon) until you stop it with Ctrl-C.

The UI shows the full asset graph, per-asset materialisation history, run logs, schedules, sensors, and partition status.

Real-world recipes

A handful of end-to-end pipelines that show how assets, schedules, sensors, partitions, and resources compose in practice.

1. Daily-partitioned ETL with backfill

python
import pandas as pd
from dagster import (
    asset, DailyPartitionsDefinition, Definitions,
    build_schedule_from_partitioned_job,
    define_asset_job, AssetSelection, MetadataValue, Output,
)

daily = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(partitions_def=daily, group_name="raw")
def raw_events(context) -> Output[pd.DataFrame]:
    """Load events for the given partition date from object storage."""
    date = context.partition_key
    df = pd.read_parquet(f"s3://bucket/events/dt={date}/")
    return Output(df, metadata={"rows": MetadataValue.int(len(df)),
                                "date": MetadataValue.text(date)})

@asset(partitions_def=daily, group_name="curated")
def daily_revenue(raw_events: pd.DataFrame) -> Output[pd.DataFrame]:
    out = raw_events.groupby("country")["amount"].sum().reset_index()
    return Output(out, metadata={"countries": MetadataValue.int(len(out))})

etl_job = define_asset_job(
    "etl_daily",
    AssetSelection.keys("raw_events", "daily_revenue"),
    partitions_def=daily,
)

defs = Definitions(
    assets=[raw_events, daily_revenue],
    jobs=[etl_job],
    # A plain ScheduleDefinition would launch runs without a partition key;
    # this helper targets the latest complete day on each 04:00 tick.
    schedules=[build_schedule_from_partitioned_job(etl_job, hour_of_day=4)],
)

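Backfill a date range from the UI: select the assets, click Materialize, and choose a partition range; Dagster launches it as a backfill and tracks status per partition. Command-line support for partition ranges varies between versions, so check dagster asset materialize --help before scripting it.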

2. Sensor-driven ingest pipeline

python
import os
from dagster import (
    asset, sensor, AssetExecutionContext, Config, RunRequest,
    SensorEvaluationContext, define_asset_job, AssetSelection, Definitions,
)

class IngestConfig(Config):
    path: str   # file name supplied by the sensor

@asset
def ingest_file(context: AssetExecutionContext, config: IngestConfig) -> dict:
    context.log.info(f"Ingesting {config.path}")
    return {"path": config.path, "ok": True}

ingest_job = define_asset_job("ingest_job", AssetSelection.keys("ingest_file"))

@sensor(job=ingest_job, minimum_interval_seconds=15)
def inbox_sensor(context: SensorEvaluationContext):
    seen = set((context.cursor or "").split(",")) - {""}
    current = {f for f in os.listdir("./inbox") if f.endswith(".jsonl")}
    for new in sorted(current - seen):
        yield RunRequest(
            run_key=new,
            # Asset config lives under "ops", keyed by the asset name
            run_config={"ops": {"ingest_file": {"config": {"path": new}}}},
            tags={"source": "inbox"},
        )
    context.update_cursor(",".join(sorted(current)))

defs = Definitions(assets=[ingest_file], jobs=[ingest_job], sensors=[inbox_sensor])

3. Asset checks — built-in data quality

python
import pandas as pd
from dagster import asset, asset_check, AssetCheckResult, Definitions

@asset
def orders() -> pd.DataFrame:
    return pd.DataFrame({"id": [1, 2, 3], "amount": [10.0, 20.0, 30.0]})

@asset_check(asset=orders)
def orders_amount_positive(orders: pd.DataFrame) -> AssetCheckResult:
    # Convert to int so `bad == 0` is a plain bool rather than numpy.bool_
    bad = int((orders["amount"] <= 0).sum())
    return AssetCheckResult(passed=bad == 0, metadata={"bad_rows": bad})

@asset_check(asset=orders)
def orders_no_duplicates(orders: pd.DataFrame) -> AssetCheckResult:
    dups = int(orders["id"].duplicated().sum())
    return AssetCheckResult(passed=dups == 0, metadata={"dup_count": dups})

# Checks must be registered alongside the asset to run and show up in the UI
defs = Definitions(
    assets=[orders],
    asset_checks=[orders_amount_positive, orders_no_duplicates],
)
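One way to keep checks like these unit-testable without Dagster or pandas is to put the rule in a plain function and let the @asset_check wrapper only convert its result. The sketch below is a hypothetical refactor: CheckOutcome stands in for AssetCheckResult, and rows are plain dicts.

```python
from typing import NamedTuple


class CheckOutcome(NamedTuple):
    """Hypothetical stand-in for AssetCheckResult: a pass/fail flag plus metadata."""
    passed: bool
    metadata: dict


def amount_positive(rows: list) -> CheckOutcome:
    bad = sum(1 for r in rows if r["amount"] <= 0)
    return CheckOutcome(passed=bad == 0, metadata={"bad_rows": bad})


def no_duplicate_ids(rows: list) -> CheckOutcome:
    # Same count as pandas' duplicated().sum(): rows repeating an earlier id
    ids = [r["id"] for r in rows]
    dups = len(ids) - len(set(ids))
    return CheckOutcome(passed=dups == 0, metadata={"dup_count": dups})


rows = [{"id": 1, "amount": 10.0}, {"id": 2, "amount": -5.0}, {"id": 2, "amount": 30.0}]
print(amount_positive(rows))   # CheckOutcome(passed=False, metadata={'bad_rows': 1})
print(no_duplicate_ids(rows))  # CheckOutcome(passed=False, metadata={'dup_count': 1})
```

Inside Dagster, the wrapper would call amount_positive(orders.to_dict("records")) and return AssetCheckResult(passed=outcome.passed, metadata=outcome.metadata).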

4. Multi-partition asset (date × region)

python
from dagster import (
    asset, MultiPartitionsDefinition, StaticPartitionsDefinition,
    DailyPartitionsDefinition,
)

regions = StaticPartitionsDefinition(["us", "eu", "apac"])
date_part = DailyPartitionsDefinition(start_date="2026-01-01")

mp = MultiPartitionsDefinition({"date": date_part, "region": regions})

@asset(partitions_def=mp)
def regional_events(context):
    keys = context.partition_key.keys_by_dimension
    return {"date": keys["date"], "region": keys["region"], "events": 100}
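A multi-partitions definition is the cross product of its dimensions, so partition counts multiply: a year of daily keys across the three regions above is 365 × 3 = 1,095 partitions. The stdlib sketch below enumerates the cells. multi_partition_keys is hypothetical; Dagster represents each cell with its own MultiPartitionKey type, while this sketch uses plain dicts.

```python
from itertools import product


def multi_partition_keys(dimensions: dict) -> list:
    """One dict per combination of per-dimension keys, dimensions taken in sorted name order."""
    names = sorted(dimensions)
    return [dict(zip(names, combo)) for combo in product(*(dimensions[n] for n in names))]


cells = multi_partition_keys({
    "date": ["2026-01-01", "2026-01-02"],
    "region": ["us", "eu", "apac"],
})
print(len(cells))  # 6
print(cells[0])    # {'date': '2026-01-01', 'region': 'us'}
```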

Production deployment

A production deployment typically runs the webserver (UI + GraphQL) and a single daemon (schedules, sensors, run queue) per Dagster instance, plus a gRPC server for each code location. The run launcher configured in dagster.yaml decides where each run executes: a local subprocess, a Docker container, or a Kubernetes Job.

python
# definitions.py: the entrypoint module
from dagster import Definitions, ScheduleDefinition, define_asset_job, AssetSelection
from my_project.assets import raw_events, daily_revenue
from my_project.resources import duckdb_resource

defs = Definitions(
    assets=[raw_events, daily_revenue],
    resources={"duckdb": duckdb_resource},
    jobs=[define_asset_job("nightly", AssetSelection.all())],
    schedules=[ScheduleDefinition(
        job_name="nightly",
        cron_schedule="0 2 * * *",
        execution_timezone="UTC",
    )],
)

Run the processes (typical systemd / Docker layout):

bash
# Webserver (UI + GraphQL)
dagster-webserver -m my_project.definitions --host 0.0.0.0 --port 3000

# Daemon (schedules, sensors, run queue); run exactly one per instance
dagster-daemon run -m my_project.definitions

# Optional: serve the code location over gRPC, then point the webserver
# and daemon at it through workspace.yaml instead of -m
dagster api grpc -m my_project.definitions --host 0.0.0.0 --port 4000

dagster.yaml, the instance config that the webserver and daemon share (abbreviated; a real Kubernetes run launcher usually needs more settings than shown here):

yaml
storage:
  postgres:
    postgres_db:
      hostname: db.internal
      username: dagster
      password:
        env: PG_PASSWORD
      db_name: dagster
      port: 5432

run_launcher:
  module: dagster_k8s
  class: K8sRunLauncher
  config:
    service_account_name: dagster-runner
    job_namespace: dagster

scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 10

Deploy-checklist:

  1. Pin dagster and dagster-webserver to the same version. They are co-released, and version skew between them can break the UI's GraphQL contract.
  2. Use Postgres for run, event-log, and schedule storage in production, not SQLite. The default SQLite storage is meant for local development and copes poorly with concurrent writers from several processes.
  3. Run the daemon as a singleton. Multiple daemons fight over schedule ticks.
  4. After each deploy, confirm the daemon is healthy (the UI shows daemon heartbeats); schedules and sensors do not tick while it is down.
  5. Set max_concurrent_runs on the QueuedRunCoordinator, as in the config above. Without a cap, a pile-up of scheduled runs can overwhelm your warehouse.
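Checklist item 1 is easy to enforce in CI. The sketch below compares installed versions with importlib.metadata; versions_match is a hypothetical helper, and its lookup parameter exists only so it can be tested without the packages installed. It assumes the listed packages share one version number, which holds for dagster and dagster-webserver but not for most integration libraries (those use their own numbering).

```python
from importlib.metadata import PackageNotFoundError, version


def versions_match(packages=("dagster", "dagster-webserver"), lookup=version) -> bool:
    """True only if every package is installed and all report the same version string."""
    found = set()
    for name in packages:
        try:
            found.add(lookup(name))
        except PackageNotFoundError:
            return False  # a missing package counts as a mismatch
    return len(found) == 1
```

In a CI step, call versions_match() after installing requirements and fail the build when it returns False.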

Testing patterns

Dagster's testability is a major reason teams pick it. Assets and ops can be called as plain Python functions, materialize() runs a set of assets in-process, and jobs expose execute_in_process(); none of these needs the daemon or webserver.

python
import duckdb
import pandas as pd
from dagster import materialize
from dagster_duckdb import DuckDBResource

from my_project.assets import raw_orders, cleaned_orders, revenue_by_region, sales_summary

# 1. Unit-test an asset function directly (it takes no context, so pass only its input)
def test_cleaned_orders_drops_nulls():
    raw = pd.DataFrame({"Order_ID": [1, None, 3], "Amount": [10.0, 20.0, None]})
    out = cleaned_orders(raw_orders=raw)
    assert len(out.value) == 1
    assert "order_id" in out.value.columns

# 2. End-to-end materialise: runs the upstream deps in-process
def test_pipeline_materialize_in_process(tmp_path, monkeypatch):
    (tmp_path / "orders.csv").write_text(
        "Order_ID,Amount,Region\n1,100,East\n2,,East\n"
    )
    monkeypatch.chdir(tmp_path)   # raw_orders reads "orders.csv" from the working directory
    result = materialize([raw_orders, cleaned_orders, revenue_by_region])
    assert result.success
    df = result.output_for_node("revenue_by_region")
    assert df["amount"].sum() == 100

# 3. Override a resource: point sales_summary at a throwaway DuckDB file
def test_sales_summary_with_temp_db(tmp_path):
    db_path = str(tmp_path / "test.duckdb")
    conn = duckdb.connect(db_path)
    conn.execute("CREATE TABLE sales AS SELECT 'East' AS region, 100 AS amount")
    conn.close()
    result = materialize(
        [sales_summary], resources={"duckdb": DuckDBResource(database=db_path)}
    )
    assert result.success

Patterns:

  • build_asset_context() mocks the asset run context for unit tests — no daemon required.
  • materialize([...]) materialises a subgraph in-process; ideal for pytest.
  • Resource overrides let you swap real I/O managers for in-memory fakes without changing asset code.
  • Asset checks can be tested independently via execute_in_process on the underlying job.
  • Run with pytest -m "not slow" and tag long materialisations with @pytest.mark.slow.

Migration from older patterns

Dagster has had two major API shifts: solids and pipelines became ops, graphs, and jobs, and later the module-level Definitions object replaced @repository. Most production code today uses the Definitions pattern; if you maintain @repository-era code, migrate.

Pattern | Year introduced | Status
--- | --- | ---
@repository decorator | 2020 | Legacy; still importable, but superseded by Definitions
@asset decorator | 2022 | Current; use as default
Definitions(...) module-level object | 2023 | Current; replaces @repository
Asset graphs (@graph_asset) | 2024 | Current; for op-level fan-out within an asset
Asset checks (@asset_check) | 2024 | Current; replaces ad-hoc validation ops

Repository → Definitions:

python
# OLD: @repository (legacy)
from dagster import repository

@repository
def my_repo():
    return [my_asset, my_job, my_schedule]

# NEW: module-level Definitions
from dagster import Definitions
defs = Definitions(
    assets=[my_asset],
    jobs=[my_job],
    schedules=[my_schedule],
)

Migrating from Airflow:

Airflow concept | Dagster equivalent
--- | ---
DAG | Job (@job or define_asset_job); one code location's Definitions can hold many
Operator / Task | @op (or @asset for data-centric work)
XCom push/pull | Asset return values + I/O managers
dag.schedule_interval="0 6 * * *" | ScheduleDefinition(cron_schedule="0 6 * * *")
PythonOperator | Plain @op / @asset function
BranchPythonOperator | Conditional yield Output from a multi-output op
SubDAG | @graph (nested graphs) or @graph_asset
Connections | Resource classes
Variables | EnvVar or resource config

Ecosystem integrations

Dagster ships a large library of first-party integrations. Each is a separate pip package and exposes resources, assets, or I/O managers.

Package | Purpose
--- | ---
dagster-dbt | Surfaces dbt models as Dagster assets; auto-derives lineage
dagster-airbyte | Triggers Airbyte syncs; models connections as assets
dagster-snowflake | Snowflake resource; Pandas/Polars I/O managers ship in companion packages such as dagster-snowflake-pandas
dagster-databricks | Launches Databricks jobs and steps from Dagster
dagster-aws | S3 I/O manager, EMR ops, ECS run launcher
dagster-duckdb | DuckDB resource; Pandas/Polars I/O managers ship in companion packages such as dagster-duckdb-pandas
dagster-pandas | Adds Pandas DataFrame type checks
dagster-polars | Polars-native I/O managers (Parquet, Delta)
dagster-k8s | Per-run Kubernetes Job launcher
dagster-openai / dagster-anthropic | LLM resources, optional cost tracking

Troubleshooting common errors

Error | Cause | Fix
--- | --- | ---
Could not find Definitions object | Daemon/webserver can't import your module | Check that -m my.module names a real importable path with a top-level defs
Missing or unexpected edge in the asset graph | Parameter name doesn't match the upstream asset's name | Make parameter names match upstream asset names exactly, or map them with ins={"param": AssetIn("asset_name")}
RunWorker exited unexpectedly | OOM or unhandled exception in the launcher | Inspect the run's event log and the worker/pod logs; bump K8s job memory; wrap heavy ops in try/except
Schedules never fire | Daemon not running, schedule left in its default stopped state, or clock skew | Start dagster-daemon run; turn the schedule on in the UI (or set default_status=DefaultScheduleStatus.RUNNING); verify execution_timezone
Sensor stuck in evaluating | Sensor function blocks > minimum_interval_seconds | Move work off the sensor; RunRequest should fire fast
DagsterInvalidConfigError at startup | Resource/asset config schema mismatch | Run dagster definitions validate before deploying
UI shows asset as "Materializing..." forever | Run worker died without reporting | Terminate the run from the UI; enable run_monitoring in dagster.yaml so dead workers are detected automatically
PostgresEventLogStorage: relation does not exist | Database not migrated | dagster instance migrate

Performance tuning

Dagster's overhead grows with asset count and run rate. For large deployments tune the run coordinator, executor, and daemon cadence.

python
# definitions.py
from dagster import Definitions, multiprocess_executor, in_process_executor

defs = Definitions(
    assets=[...],
    executor=multiprocess_executor.configured({
        "max_concurrent": 8,        # cap on parallel ops within a single run
    }),
)

Performance checklist:

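  1. Sensor cadence: keep minimum_interval_seconds at the 30s default or higher unless you truly need faster polling. Every evaluation costs daemon time, and short intervals across many sensors add up.
  2. Schedule sprawl: one schedule per asset means one tick and one run per asset each period. Group assets into a single define_asset_job with an AssetSelection so one tick launches one run.
  3. I/O manager choice: the default pickle-based I/O manager is fine for small Python objects. For large DataFrames (roughly 100 MB and up), switch to a columnar I/O manager such as dagster-polars's PolarsParquetIOManager or a DuckDB/Snowflake I/O manager.
  4. In-process executor for tiny graphs: executor=in_process_executor skips multiprocessing overhead for ops that complete in < 1s.
  5. Metadata database: every webserver, daemon, and run process opens its own connections to the storage database, so size Postgres max_connections for your peak run concurrency.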

Patterns & idioms

  • Group assets by data layer. group_name="raw", group_name="staging", group_name="marts" makes the UI lineage view legible.
  • Asset key prefixes for multi-environment. AssetKey(["prod", "users"]) vs AssetKey(["staging", "users"]) lets one definitions module serve multiple environments.
  • Use freshness checks for SLA monitoring. A freshness check (for example one built with build_last_update_freshness_checks) fails when an asset hasn't been materialised within the allowed window.
  • Code locations as deploy boundaries. One code location per team or service; deploys are independent.
  • Sensors emit RunRequest, never run work themselves. A slow evaluation delays the daemon's other sensor ticks and can hit the sensor evaluation timeout.
  • Resource configuration via EnvVar instead of hard-coded secrets — Dagster Cloud and the local daemon both honour env-var indirection.

When NOT to use this

Dagster is excellent for pipeline orchestration but is over-engineered for many scripts. Skip it when:

  • A single cron-triggered script suffices. A systemd timer or GitHub Actions cron does not need Dagster.
  • You don't track lineage. Dagster's value is asset lineage and observability; without those, running a webserver, a daemon, and a metadata database is dead weight.
  • Sub-second event processing. Dagster's tick cadence is seconds at minimum. Use Kafka Streams, Faust, or a queue worker.
  • You need a workflow service to back a user-facing API. Temporal or Prefect lean further into request-driven orchestration.
  • The pipeline is exclusively SQL. dbt is leaner; you can still wrap it with dagster-dbt later if observability becomes a need.

Quick reference

Task | Code
--- | ---
Define asset | @asset def my_asset(...) -> T:
Asset dependency | parameter name matches upstream asset name
Asset with metadata | return Output(value, metadata={"rows": MetadataValue.int(n)})
Group assets | @asset(group_name="raw")
Resource | @asset def fn(db: MyResource) + Definitions(resources={"db": MyResource(...)})
Define job | define_asset_job("name", AssetSelection.groups("g"))
Schedule | ScheduleDefinition(job=job, cron_schedule="0 6 * * *")
Sensor | @sensor(job=job) def s(context): yield RunRequest(run_key=...)
Partition | @asset(partitions_def=DailyPartitionsDefinition(...))
Config | class C(Config): field: type + @asset def fn(config: C)
Materialise locally | materialize([asset1, asset2])
Run UI | dagster dev
Run job | dagster job execute -j job_name