cheat sheet

SQLAlchemy

Package-level reference for SQLAlchemy on PyPI — install variants, dialect drivers, version policy, extras, and alternatives.

SQLAlchemy

What it is

SQLAlchemy is a comprehensive SQL toolkit and Object-Relational Mapper for Python, created by Mike Bayer in 2006. It provides both a low-level Core API (an expression language and connection pool over DB-API) and a higher-level ORM that maps Python classes onto relational tables.

Reach for SQLAlchemy when you need a battle-tested abstraction over multiple SQL dialects with first-class transaction, pooling, and migration support. Reach for a lighter wrapper like sqlmodel or peewee when you want a smaller surface area, or for a query builder like databases when you only need async execution without an ORM.

Install

bash
pip install sqlalchemy

Output: (none — exits 0 on success). The base package supports SQLite out of the box via the stdlib sqlite3 driver.

bash
uv add sqlalchemy

Output: dependency resolved + added to pyproject.toml

bash
poetry add sqlalchemy

Output: updated lockfile + virtualenv install

bash
pip install "sqlalchemy[asyncio,postgresql_psycopg]"

Output: installs SQLAlchemy plus greenlet (async support) and psycopg[binary] (Postgres driver).

Versioning & Python support

  • Current stable line is the 2.x series. 2.0 (January 2023) was a major rewrite of the API surface — the legacy Query interface was deprecated in favour of select() + Session.execute(). The 1.4 line was the long-running bridge release; running with SQLALCHEMY_WARN_20=1 on 1.4 surfaces every call site that needs porting.
  • Supports Python 3.7+ on the 2.x line. Older 1.4 releases support 3.6.
  • Strict semver — minor releases (2.1, 2.2) are additive, majors are breaking. Pin upper-bounds (sqlalchemy>=2,<3) if you depend on dialect internals.
  • The 1.x → 2.x ABI break was the largest in the project's history. Many third-party tools (Alembic, Flask-SQLAlchemy, SQLModel) needed coordinated upgrades.

Package metadata

  • Maintainer: Mike Bayer (zzzeek) and the SQLAlchemy core team
  • Project home: github.com/sqlalchemy/sqlalchemy
  • Docs: docs.sqlalchemy.org
  • PyPI: pypi.org/project/sqlalchemy
  • License: MIT
  • Governance: independent open-source project; commercial sponsorship via Tidelift
  • First released: 2006
  • Downloads: tens of millions per month — a transitive dependency of Flask-SQLAlchemy, SQLModel, Alembic, Airflow, Dagster, Prefect, and most of the Python data ecosystem.

Optional dependencies & extras

SQLAlchemy splits its database driver dependencies into per-dialect extras. The base install ships only SQLite support.

  • sqlalchemy[asyncio]greenlet plus the async engine machinery. Required for create_async_engine() and AsyncSession.
  • sqlalchemy[postgresql]psycopg2 (synchronous, classic).
  • sqlalchemy[postgresql_psycopg]psycopg v3 (synchronous + async).
  • sqlalchemy[postgresql_asyncpg]asyncpg (async-only).
  • sqlalchemy[postgresql_pg8000] — pure-Python Postgres driver (no C extension).
  • sqlalchemy[mysql]mysqlclient (the C-based driver). Requires libmysqlclient headers at build time.
  • sqlalchemy[pymysql]PyMySQL, the pure-Python MySQL driver.
  • sqlalchemy[mariadb_connector] — official MariaDB Connector/Python.
  • sqlalchemy[oracle]cx_Oracle (now oracledb).
  • sqlalchemy[mssql]pyodbc for SQL Server.
  • sqlalchemy[aiosqlite] — async SQLite driver.
  • sqlalchemy[mypy] — legacy mypy plugin. 2.0's native typing makes this rarely needed.

Combine extras with commas: pip install "sqlalchemy[asyncio,postgresql_asyncpg]".

Alternatives

PackageTrade-off
sqlmodelPydantic + SQLAlchemy fusion. Single class declares both the API model and the table. Smaller surface, less raw power.
peeweeLightweight ORM, simpler API, smaller install. Good for hobby projects; less suited to complex schemas.
tortoise-ormAsync-first ORM with Django-style models. Use when async + Django familiarity matter.
ponyGenerator-based query syntax. Niche but elegant for some patterns.
databasesAsync query builder without an ORM. Use when you want async SQL but no object mapping.
duckdb (with sqlglot)Embedded analytical SQL with no ORM. Different use case — OLAP rather than OLTP.
datasetSchemaless wrapper over SQLAlchemy. Use for one-off scripts; not for typed application code.

Common gotchas

  1. 1.x vs 2.x query style. 2.0 made Session.execute(select(...)) the canonical query interface. The classic session.query(Model).filter(...) API is deprecated and emits warnings under SQLALCHEMY_WARN_20=1. Plan a migration before pinning to 2.x.
  2. Engine vs session lifecycle. Create one engine per process (it holds the connection pool); create many Session objects (one per unit of work, typically per request or per task). Sharing a session across threads or coroutines causes silent data corruption.
  3. Transaction scope is implicit on Session. Sessions begin a transaction on first use and hold it until commit() or rollback(). Forgetting either leaves the connection in the pool with an open transaction — eventually blocks writes. Use the with Session(...) as session: context manager.
  4. Async sessions need greenlet. The async engine spawns synchronous DB-API calls inside a greenlet; if greenlet is missing (e.g. on Alpine ARM builds without wheels), async support silently degrades.
  5. Session.execute() returns Result, not rows. Call .scalars().all() for a list of model instances, .scalar_one() for exactly-one, or .all() for tuple rows. New users frequently forget the .scalars() step.
  6. Connection pool defaults. QueuePool size is 5 with max_overflow=10. High-concurrency workers must raise these explicitly via create_engine(..., pool_size=20, max_overflow=40) — otherwise requests stall waiting for a connection.
  7. Driver versions matter for Postgres async. asyncpg and psycopg v3 each have their own DSN format and TLS handling. Mixing the wrong driver string (postgresql+asyncpg:// vs postgresql+psycopg://) yields confusing parse errors.
  8. expire_on_commit=True is the default. After session.commit(), all loaded objects are expired and trigger a re-fetch on next attribute access — surprising in API handlers. Set expire_on_commit=False for read-after-commit patterns.

Real-world recipes

Patterns that come up repeatedly once an application graduates beyond select(User).where(User.id == 1). Each is a minimal skeleton — bolt onto a real schema and grow.

Async ORM with AsyncSession

The 2.x async stack uses create_async_engine + async_sessionmaker + AsyncSession. The greenlet dep is required (pulled in by sqlalchemy[asyncio]) so synchronous DB-API calls can run inside an awaiting task.

python
import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import (
    AsyncSession, async_sessionmaker, create_async_engine
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase): pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]

engine = create_async_engine(
    "postgresql+asyncpg://alice@localhost/notes",
    pool_size=20, max_overflow=40, pool_pre_ping=True,
)
Session = async_sessionmaker(engine, expire_on_commit=False)

async def main():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    async with Session() as session:
        session.add(User(name="alice"))
        await session.commit()
        result = await session.execute(select(User))
        for u in result.scalars():
            print(u.id, u.name)

asyncio.run(main())

Output: 1 alice — the async engine multiplexes connections across many tasks without blocking the event loop.

Many-to-many with an extra-data association table

For relationships that carry their own data (e.g. role-on-team with a joined_at timestamp), declare the association as a model — not a bare Table(). This gives the ORM a real handle on the relationship and lets you query/order by association columns.

python
from datetime import datetime
from sqlalchemy import ForeignKey, select
from sqlalchemy.orm import (
    DeclarativeBase, Mapped, mapped_column, relationship
)

class Base(DeclarativeBase): pass

class Membership(Base):
    __tablename__ = "memberships"
    user_id: Mapped[int]  = mapped_column(ForeignKey("users.id"), primary_key=True)
    team_id: Mapped[int]  = mapped_column(ForeignKey("teams.id"), primary_key=True)
    role:    Mapped[str]
    joined_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    user: Mapped["User"] = relationship(back_populates="memberships")
    team: Mapped["Team"] = relationship(back_populates="memberships")

class User(Base):
    __tablename__ = "users"
    id: Mapped[int]  = mapped_column(primary_key=True)
    name: Mapped[str]
    memberships: Mapped[list[Membership]] = relationship(back_populates="user")

class Team(Base):
    __tablename__ = "teams"
    id: Mapped[int]  = mapped_column(primary_key=True)
    name: Mapped[str]
    memberships: Mapped[list[Membership]] = relationship(back_populates="team")

Output: queries can filter on the association: select(User).join(Membership).where(Membership.role == "admin").

Hybrid property — Python + SQL semantics in one method

A @hybrid_property is a single method that works both as a Python attribute on a loaded instance and as a SQL expression in where(...) / order_by(...). The classic use case is a computed field that should be queryable.

python
from sqlalchemy import func
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase): pass

class Person(Base):
    __tablename__ = "people"
    id: Mapped[int]  = mapped_column(primary_key=True)
    first: Mapped[str]
    last:  Mapped[str]

    @hybrid_property
    def full_name(self) -> str:
        return f"{self.first} {self.last}"

    @full_name.expression
    def full_name(cls):
        return func.concat(cls.first, " ", cls.last)

Output: session.execute(select(Person).where(Person.full_name.contains("alice"))).scalars().all() runs the SQL form; person.full_name runs the Python form.

Batch insert / update via insert(...).values([...])

For ETL-style bulk loads, never session.add() in a loop — every call sends a separate INSERT round-trip. Use Core's insert(...).values([dict, ...]) to issue a single multi-row INSERT, optionally with ON CONFLICT DO UPDATE for upsert semantics.

python
from sqlalchemy import insert
from sqlalchemy.dialects.postgresql import insert as pg_insert

# Plain bulk insert
session.execute(
    insert(User),
    [{"name": "alice"}, {"name": "bob"}, {"name": "carol"}],
)
session.commit()

# Upsert on Postgres — ON CONFLICT DO UPDATE
stmt = pg_insert(User).values([
    {"id": 1, "name": "alice (renamed)"},
    {"id": 2, "name": "bob"},
])
stmt = stmt.on_conflict_do_update(
    index_elements=["id"], set_={"name": stmt.excluded.name},
)
session.execute(stmt)
session.commit()

Output: one INSERT statement on the wire instead of N; RETURNING id can be appended on Postgres/SQLite to recover generated PKs.

Eager-loading strategies for the N+1 trap

The textbook ORM pitfall: iterating users and accessing user.posts triggers one query per user. Fix with selectinload, joinedload, or subqueryload depending on cardinality.

python
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload

# selectinload — best default for one-to-many. Two queries total.
result = session.execute(
    select(User).options(selectinload(User.posts))
).scalars().all()

# joinedload — best for many-to-one or one-to-one. One query with JOIN.
result = session.execute(
    select(Post).options(joinedload(Post.author))
).scalars().all()

# Nested chains — load posts AND each post's comments
result = session.execute(
    select(User).options(
        selectinload(User.posts).selectinload(Post.comments)
    )
).scalars().all()

Output: depending on strategy, either 1 (joinedload) or 2 (selectinload) queries cover the whole graph instead of 1 + N + N×M.

Production deployment

A SQLAlchemy application is just Python; the production concerns are the same as any database client — pool sizing, transaction discipline, observability. The non-obvious bits are below.

Pool sizing

QueuePool defaults to pool_size=5, max_overflow=10 — i.e. up to 15 concurrent connections. Web apps with worker counts ≥ pool size will block. The standard rule:

ini
connections_per_engine = (web_workers × max_concurrent_db_calls_per_worker) + slack

For a gunicorn -w 4 deployment with each worker handling ~4 concurrent DB calls, set pool_size=16, max_overflow=8 and configure Postgres max_connections to comfortably exceed pool_size × num_app_instances.

python
engine = create_engine(
    "postgresql+psycopg://app@db.internal/app",
    pool_size=16,
    max_overflow=8,
    pool_recycle=1800,      # drop connections older than 30 min
    pool_pre_ping=True,     # cheap SELECT 1 before reuse — survives DB restarts
    pool_timeout=10,        # block at most 10s waiting for a free conn
)

Output: the engine maintains a warm pool, recycles long-lived connections to dodge stale TCP state, and pings before use so a DB restart doesn't surface as a wave of OperationalError.

Transactional patterns

The 2.x canonical pattern is with Session(engine) as session: with session.begin(): …. The inner begin() opens a transaction, commits on clean exit, rolls back on exception. Repeated commits without expire_on_commit=False re-fetch every object — usually unwanted in API handlers.

python
from sqlalchemy.orm import sessionmaker

SessionLocal = sessionmaker(engine, expire_on_commit=False)

def update_user(user_id: int, **updates):
    with SessionLocal() as session, session.begin():
        user = session.get(User, user_id)
        for k, v in updates.items():
            setattr(user, k, v)
        # commit happens implicitly on `with session.begin()` exit
    # session.close() happens here

# In FastAPI: a Depends() that yields a session per request
def get_db():
    with SessionLocal() as session:
        yield session

Output: transaction scope matches request lifecycle; rollback on uncaught exceptions is automatic.

Observability — logging, slow query traces

Set echo="debug" only in development. For production, attach a query-execution event listener that logs to your structured logger and flags slow queries.

python
import logging
import time
from sqlalchemy import event

slow_log = logging.getLogger("db.slow")

@event.listens_for(engine, "before_cursor_execute")
def _start(conn, cursor, statement, params, context, executemany):
    context._t0 = time.perf_counter()

@event.listens_for(engine, "after_cursor_execute")
def _end(conn, cursor, statement, params, context, executemany):
    dt = time.perf_counter() - context._t0
    if dt > 0.5:
        slow_log.warning("slow_query duration=%.3fs sql=%s", dt, statement[:200])

Output: any query slower than 500 ms lands in the slow-query log with a snippet of the SQL; pair with Postgres auto_explain for full plans.

Health checks

A SELECT 1 against the pool both verifies DB reachability and exercises pool_pre_ping. Wire as your container's readiness probe.

python
from sqlalchemy import text

def healthcheck() -> bool:
    try:
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        return True
    except Exception:
        return False

Output: the engine reports healthy iff a connection round-trip succeeds.

Database migration strategies

SQLAlchemy ships zero migration tooling — Base.metadata.create_all() is for dev only. The standard companion is Alembic (also by Mike Bayer), which compares your model metadata against the live DB and writes Python migration scripts.

Alembic project layout

bash
pip install alembic
alembic init alembic
# Edit alembic.ini: set sqlalchemy.url = postgresql://...
# Edit alembic/env.py: target_metadata = Base.metadata

Output: an alembic/ directory with env.py, script.py.mako, and an empty versions/ folder.

Autogenerate + review (the safe loop)

Alembic's autogenerate compares the model metadata to the DB schema and writes a migration. Always review before applying — autogenerate misses constraint renames, custom types, and server-side defaults.

bash
# Generate a candidate migration
alembic revision --autogenerate -m "add notes table"

# Inspect — REQUIRED. Edit alembic/versions/<rev>_add_notes_table.py.

# Apply
alembic upgrade head

# Roll back the most recent
alembic downgrade -1

Output: alembic upgrade head runs every pending revision in order; alembic history shows the chain.

Data migrations vs schema migrations

Schema migrations change DDL; data migrations rewrite rows. Mixing them in one script is tempting but fragile — a failed data step leaves the DDL half-applied. The safer pattern is two separate revisions, with the data revision marked op.run_python()-style.

python
# alembic/versions/202604010001_backfill_user_status.py
from alembic import op
from sqlalchemy import table, column, String

def upgrade():
    users = table("users", column("status", String))
    op.execute(users.update().where(users.c.status == None).values(status="active"))

def downgrade():
    pass  # data migrations rarely reverse cleanly

Output: the migration runs the backfill query inside the same transaction Alembic opens for the revision; a crash mid-migration rolls back atomically on Postgres.

Branching and merging migration history

Long-lived branches generate parallel revisions whose down_revision both point to the same ancestor. Alembic detects this on alembic upgrade head and refuses. Fix by merging:

bash
alembic merge -m "merge heads" <rev_a> <rev_b>

Output: a no-op merge revision unifying the two heads — apply with alembic upgrade head afterward.

Deployment patterns

For zero-downtime deployments:

  1. Expand: add new columns/tables as nullable / with defaults. New code reads & writes both old and new.
  2. Migrate: backfill data; old code unaffected.
  3. Contract: drop old columns once all instances run the new code.

Never do expand+contract in one migration on a live system — old code crashes mid-deploy.

Version migration guide

The 1.x → 2.x jump (January 2023) was the largest break in SQLAlchemy's history. The default query API changed from Session.query(...) to Session.execute(select(...)), transaction semantics tightened, and Mapped[] typing became canonical.

1.x2.x query API

python
# 1.x — legacy Query interface (deprecated in 2.x)
users = session.query(User).filter(User.active == True).all()
user  = session.query(User).get(123)
count = session.query(User).filter_by(active=True).count()
python
# 2.x — select() + Session.execute() — canonical
from sqlalchemy import select, func
users = session.execute(
    select(User).where(User.active.is_(True))
).scalars().all()
user  = session.get(User, 123)
count = session.execute(
    select(func.count()).select_from(User).where(User.active.is_(True))
).scalar_one()

Output: the new style produces a Result object — call .scalars().all() for model instances, .scalar_one() for a guaranteed single row, .all() for tuple rows.

Mapped[] typing replaces untyped Column

python
# 1.x — untyped Column
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id   = Column(Integer, primary_key=True)
    name = Column(String)
python
# 2.x — typed Mapped + mapped_column
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase): pass

class User(Base):
    __tablename__ = "users"
    id:   Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]

Output: mypy and IDE refactors understand the model; the runtime behaviour is identical.

Transaction semantics

python
# 1.x — implicit begin, autocommit by Session
session.add(user); session.commit()

# 2.x — explicit `with session.begin():` recommended;
# `autocommit` mode removed entirely from Session
with Session(engine) as session, session.begin():
    session.add(user)
# commit on exit; rollback on exception

Output: transaction scope is now a with block — leaks of "session left in pending state" are caught at type-check time.

Migration tooling

Run the 1.4 bridge release with the warning flag enabled to find every legacy call site:

bash
SQLALCHEMY_WARN_20=1 python -W error::DeprecationWarning app.py

Output: every query(), lazy="select" default, and autocommit=True surfaces as a hard error — fix all, then pin sqlalchemy>=2.

Performance tuning

Most SQLAlchemy performance work is not about the ORM internals — it's about the SQL the ORM emits. Profile the SQL first.

SymptomLikely causeFix
N+1 queries when iterating relationshipsLazy-loading default (lazy="select")selectinload/joinedload in the .options(...) of the select.
Long INSERT loopsOne session.add() per row → N INSERTssession.execute(insert(Model), [...]) for batched insert; ~50× faster.
High DB connection countToo many create_engine callsOne engine per process; share via dependency injection.
pool_timeout errors at peakPool too small for worker concurrencyRaise pool_size + max_overflow; check Postgres max_connections.
Session.commit() slowexpire_on_commit=True re-fetches every loaded object on next accessSession(..., expire_on_commit=False) for API handlers.
Slow ORM materialisation on large resultORM unit-of-work overhead per rowUse Core: session.execute(select(User.id, User.name)).all() — returns raw tuples.
Hot path spends time in compile_statementRepeated compilation of the same SQLCache compiled statements via select(...).execution_options(compiled_cache=cache_dict).
Slow count() on large tablesSELECT COUNT(*) scans the table on PostgresUse approximate counts (pg_class.reltuples) for dashboards; reserve exact counts for billable surfaces.

Escape hatch — raw SQL

When the ORM gets in the way, drop to text() for a specific query. The Session still wraps it in the transaction; you give up auto-mapping but gain full control over SQL hints, CTEs, and dialect features.

python
from sqlalchemy import text

with Session(engine) as session:
    rows = session.execute(
        text("""
            WITH recent AS (
              SELECT user_id, MAX(created_at) AS last_seen
              FROM events
              WHERE created_at > :since
              GROUP BY user_id
            )
            SELECT u.name, r.last_seen
            FROM users u JOIN recent r ON r.user_id = u.id
            ORDER BY r.last_seen DESC
            LIMIT 50
        """),
        {"since": "2026-01-01"},
    ).all()

Output: the ORM stays out of the way; bind parameters are still escaped properly.

Schema design & ORM patterns

The ORM choices that survive contact with production schemas. Each one trades a layer of magic for explicitness.

Declarative vs imperative mapping

Declarative is the standard — class body declares columns and relationships. Imperative mapping (registry.map_imperatively(User, users_table)) decouples the model class from the table, useful when the class is a dataclass owned by the domain layer and the table is owned by persistence.

python
from dataclasses import dataclass, field
from sqlalchemy import Column, Integer, String, Table, MetaData
from sqlalchemy.orm import registry

mapper_registry = registry()
metadata = MetaData()

users_table = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
)

@dataclass
class User:
    id: int = field(init=False)
    name: str = ""

mapper_registry.map_imperatively(User, users_table)

Output: User is a plain dataclass — no SQLAlchemy base — but the registry maps it to the table. Tests that don't need a DB can construct User(name="alice") without touching SQLAlchemy.

Polymorphic inheritance

Joined-table inheritance is the safe default when subclasses have meaningfully different columns. Single-table is faster (one table, no join) but bloats with nullable columns. Concrete-table inheritance is rare — almost always a code smell.

python
class Vehicle(Base):
    __tablename__ = "vehicles"
    id: Mapped[int]  = mapped_column(primary_key=True)
    kind: Mapped[str]
    __mapper_args__ = {"polymorphic_on": "kind", "polymorphic_identity": "vehicle"}

class Car(Vehicle):
    __tablename__ = "cars"
    id:   Mapped[int] = mapped_column(ForeignKey("vehicles.id"), primary_key=True)
    seats: Mapped[int]
    __mapper_args__ = {"polymorphic_identity": "car"}

class Truck(Vehicle):
    __tablename__ = "trucks"
    id:       Mapped[int] = mapped_column(ForeignKey("vehicles.id"), primary_key=True)
    capacity: Mapped[int]
    __mapper_args__ = {"polymorphic_identity": "truck"}

Output: select(Vehicle) returns instances of the right subclass automatically; select(Car) returns only cars.

Custom column types

When the DB type doesn't quite match the Python type — e.g. storing JSON-as-text with Pydantic validation on the way in — implement TypeDecorator:

python
from sqlalchemy import String
from sqlalchemy.types import TypeDecorator
import json

class JSONEncodedDict(TypeDecorator):
    impl = String
    cache_ok = True

    def process_bind_param(self, value, dialect):
        return json.dumps(value) if value is not None else None

    def process_result_value(self, value, dialect):
        return json.loads(value) if value is not None else None

class Note(Base):
    __tablename__ = "notes"
    id:    Mapped[int] = mapped_column(primary_key=True)
    meta:  Mapped[dict] = mapped_column(JSONEncodedDict)

Output: note.meta = {"tags": ["urgent"]} survives a round-trip through the DB as serialized JSON; the application code never sees the string form.

See also