cheat sheet
SQLAlchemy
Package-level reference for SQLAlchemy on PyPI — install variants, dialect drivers, version policy, extras, and alternatives.
SQLAlchemy
What it is
SQLAlchemy is a comprehensive SQL toolkit and Object-Relational Mapper for Python, created by Mike Bayer in 2006. It provides both a low-level Core API (an expression language and connection pool over DB-API) and a higher-level ORM that maps Python classes onto relational tables.
Reach for SQLAlchemy when you need a battle-tested abstraction over multiple SQL dialects with first-class transaction, pooling, and migration support. Reach for a lighter wrapper like sqlmodel or peewee when you want a smaller surface area, or for a query builder like databases when you only need async execution without an ORM.
Install
pip install sqlalchemy
Output: (none — exits 0 on success). The base package supports SQLite out of the box via the stdlib sqlite3 driver.
uv add sqlalchemy
Output: dependency resolved + added to pyproject.toml
poetry add sqlalchemy
Output: updated lockfile + virtualenv install
pip install "sqlalchemy[asyncio,postgresql_psycopg]"
Output: installs SQLAlchemy plus greenlet (async support) and psycopg[binary] (Postgres driver).
Versioning & Python support
- Current stable line is the
2.xseries.2.0(January 2023) was a major rewrite of the API surface — the legacyQueryinterface was deprecated in favour ofselect()+Session.execute(). The1.4line was the long-running bridge release; running withSQLALCHEMY_WARN_20=1on1.4surfaces every call site that needs porting. - Supports Python 3.7+ on the
2.xline. Older1.4releases support 3.6. - Strict semver — minor releases (
2.1,2.2) are additive, majors are breaking. Pin upper-bounds (sqlalchemy>=2,<3) if you depend on dialect internals. - The 1.x → 2.x ABI break was the largest in the project's history. Many third-party tools (Alembic, Flask-SQLAlchemy, SQLModel) needed coordinated upgrades.
Package metadata
- Maintainer: Mike Bayer (
zzzeek) and the SQLAlchemy core team - Project home: github.com/sqlalchemy/sqlalchemy
- Docs: docs.sqlalchemy.org
- PyPI: pypi.org/project/sqlalchemy
- License: MIT
- Governance: independent open-source project; commercial sponsorship via Tidelift
- First released: 2006
- Downloads: tens of millions per month — a transitive dependency of Flask-SQLAlchemy, SQLModel, Alembic, Airflow, Dagster, Prefect, and most of the Python data ecosystem.
Optional dependencies & extras
SQLAlchemy splits its database driver dependencies into per-dialect extras. The base install ships only SQLite support.
sqlalchemy[asyncio]—greenletplus the async engine machinery. Required forcreate_async_engine()andAsyncSession.sqlalchemy[postgresql]—psycopg2(synchronous, classic).sqlalchemy[postgresql_psycopg]—psycopgv3 (synchronous + async).sqlalchemy[postgresql_asyncpg]—asyncpg(async-only).sqlalchemy[postgresql_pg8000]— pure-Python Postgres driver (no C extension).sqlalchemy[mysql]—mysqlclient(the C-based driver). Requires libmysqlclient headers at build time.sqlalchemy[pymysql]—PyMySQL, the pure-Python MySQL driver.sqlalchemy[mariadb_connector]— official MariaDB Connector/Python.sqlalchemy[oracle]—cx_Oracle(noworacledb).sqlalchemy[mssql]—pyodbcfor SQL Server.sqlalchemy[aiosqlite]— async SQLite driver.sqlalchemy[mypy]— legacy mypy plugin.2.0's native typing makes this rarely needed.
Combine extras with commas: pip install "sqlalchemy[asyncio,postgresql_asyncpg]".
Alternatives
| Package | Trade-off |
|---|---|
sqlmodel | Pydantic + SQLAlchemy fusion. Single class declares both the API model and the table. Smaller surface, less raw power. |
peewee | Lightweight ORM, simpler API, smaller install. Good for hobby projects; less suited to complex schemas. |
tortoise-orm | Async-first ORM with Django-style models. Use when async + Django familiarity matter. |
pony | Generator-based query syntax. Niche but elegant for some patterns. |
databases | Async query builder without an ORM. Use when you want async SQL but no object mapping. |
duckdb (with sqlglot) | Embedded analytical SQL with no ORM. Different use case — OLAP rather than OLTP. |
dataset | Schemaless wrapper over SQLAlchemy. Use for one-off scripts; not for typed application code. |
Common gotchas
- 1.x vs 2.x query style.
2.0madeSession.execute(select(...))the canonical query interface. The classicsession.query(Model).filter(...)API is deprecated and emits warnings underSQLALCHEMY_WARN_20=1. Plan a migration before pinning to2.x. - Engine vs session lifecycle. Create one
engineper process (it holds the connection pool); create manySessionobjects (one per unit of work, typically per request or per task). Sharing a session across threads or coroutines causes silent data corruption. - Transaction scope is implicit on
Session. Sessions begin a transaction on first use and hold it untilcommit()orrollback(). Forgetting either leaves the connection in the pool with an open transaction — eventually blocks writes. Use thewith Session(...) as session:context manager. - Async sessions need
greenlet. The async engine spawns synchronous DB-API calls inside a greenlet; ifgreenletis missing (e.g. on Alpine ARM builds without wheels), async support silently degrades. Session.execute()returnsResult, not rows. Call.scalars().all()for a list of model instances,.scalar_one()for exactly-one, or.all()for tuple rows. New users frequently forget the.scalars()step.- Connection pool defaults.
QueuePoolsize is 5 withmax_overflow=10. High-concurrency workers must raise these explicitly viacreate_engine(..., pool_size=20, max_overflow=40)— otherwise requests stall waiting for a connection. - Driver versions matter for Postgres async.
asyncpgandpsycopgv3 each have their own DSN format and TLS handling. Mixing the wrong driver string (postgresql+asyncpg://vspostgresql+psycopg://) yields confusing parse errors. expire_on_commit=Trueis the default. Aftersession.commit(), all loaded objects are expired and trigger a re-fetch on next attribute access — surprising in API handlers. Setexpire_on_commit=Falsefor read-after-commit patterns.
Real-world recipes
Patterns that come up repeatedly once an application graduates beyond select(User).where(User.id == 1). Each is a minimal skeleton — bolt onto a real schema and grow.
Async ORM with AsyncSession
The 2.x async stack uses create_async_engine + async_sessionmaker + AsyncSession. The greenlet dep is required (pulled in by sqlalchemy[asyncio]) so synchronous DB-API calls can run inside an awaiting task.
import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import (
AsyncSession, async_sessionmaker, create_async_engine
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase): pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
engine = create_async_engine(
"postgresql+asyncpg://alice@localhost/notes",
pool_size=20, max_overflow=40, pool_pre_ping=True,
)
Session = async_sessionmaker(engine, expire_on_commit=False)
async def main():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with Session() as session:
session.add(User(name="alice"))
await session.commit()
result = await session.execute(select(User))
for u in result.scalars():
print(u.id, u.name)
asyncio.run(main())
Output: 1 alice — the async engine multiplexes connections across many tasks without blocking the event loop.
Many-to-many with an extra-data association table
For relationships that carry their own data (e.g. role-on-team with a joined_at timestamp), declare the association as a model — not a bare Table(). This gives the ORM a real handle on the relationship and lets you query/order by association columns.
from datetime import datetime
from sqlalchemy import ForeignKey, select
from sqlalchemy.orm import (
DeclarativeBase, Mapped, mapped_column, relationship
)
class Base(DeclarativeBase): pass
class Membership(Base):
__tablename__ = "memberships"
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), primary_key=True)
team_id: Mapped[int] = mapped_column(ForeignKey("teams.id"), primary_key=True)
role: Mapped[str]
joined_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
user: Mapped["User"] = relationship(back_populates="memberships")
team: Mapped["Team"] = relationship(back_populates="memberships")
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
memberships: Mapped[list[Membership]] = relationship(back_populates="user")
class Team(Base):
__tablename__ = "teams"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
memberships: Mapped[list[Membership]] = relationship(back_populates="team")
Output: queries can filter on the association: select(User).join(Membership).where(Membership.role == "admin").
Hybrid property — Python + SQL semantics in one method
A @hybrid_property is a single method that works both as a Python attribute on a loaded instance and as a SQL expression in where(...) / order_by(...). The classic use case is a computed field that should be queryable.
from sqlalchemy import func
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase): pass
class Person(Base):
__tablename__ = "people"
id: Mapped[int] = mapped_column(primary_key=True)
first: Mapped[str]
last: Mapped[str]
@hybrid_property
def full_name(self) -> str:
return f"{self.first} {self.last}"
@full_name.expression
def full_name(cls):
return func.concat(cls.first, " ", cls.last)
Output: session.execute(select(Person).where(Person.full_name.contains("alice"))).scalars().all() runs the SQL form; person.full_name runs the Python form.
Batch insert / update via insert(...).values([...])
For ETL-style bulk loads, never session.add() in a loop — every call sends a separate INSERT round-trip. Use Core's insert(...).values([dict, ...]) to issue a single multi-row INSERT, optionally with ON CONFLICT DO UPDATE for upsert semantics.
from sqlalchemy import insert
from sqlalchemy.dialects.postgresql import insert as pg_insert
# Plain bulk insert
session.execute(
insert(User),
[{"name": "alice"}, {"name": "bob"}, {"name": "carol"}],
)
session.commit()
# Upsert on Postgres — ON CONFLICT DO UPDATE
stmt = pg_insert(User).values([
{"id": 1, "name": "alice (renamed)"},
{"id": 2, "name": "bob"},
])
stmt = stmt.on_conflict_do_update(
index_elements=["id"], set_={"name": stmt.excluded.name},
)
session.execute(stmt)
session.commit()
Output: one INSERT statement on the wire instead of N; RETURNING id can be appended on Postgres/SQLite to recover generated PKs.
Eager-loading strategies for the N+1 trap
The textbook ORM pitfall: iterating users and accessing user.posts triggers one query per user. Fix with selectinload, joinedload, or subqueryload depending on cardinality.
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload
# selectinload — best default for one-to-many. Two queries total.
result = session.execute(
select(User).options(selectinload(User.posts))
).scalars().all()
# joinedload — best for many-to-one or one-to-one. One query with JOIN.
result = session.execute(
select(Post).options(joinedload(Post.author))
).scalars().all()
# Nested chains — load posts AND each post's comments
result = session.execute(
select(User).options(
selectinload(User.posts).selectinload(Post.comments)
)
).scalars().all()
Output: depending on strategy, either 1 (joinedload) or 2 (selectinload) queries cover the whole graph instead of 1 + N + N×M.
Production deployment
A SQLAlchemy application is just Python; the production concerns are the same as any database client — pool sizing, transaction discipline, observability. The non-obvious bits are below.
Pool sizing
QueuePool defaults to pool_size=5, max_overflow=10 — i.e. up to 15 concurrent connections. Web apps with worker counts ≥ pool size will block. The standard rule:
connections_per_engine = (web_workers × max_concurrent_db_calls_per_worker) + slack
For a gunicorn -w 4 deployment with each worker handling ~4 concurrent DB calls, set pool_size=16, max_overflow=8 and configure Postgres max_connections to comfortably exceed pool_size × num_app_instances.
engine = create_engine(
"postgresql+psycopg://app@db.internal/app",
pool_size=16,
max_overflow=8,
pool_recycle=1800, # drop connections older than 30 min
pool_pre_ping=True, # cheap SELECT 1 before reuse — survives DB restarts
pool_timeout=10, # block at most 10s waiting for a free conn
)
Output: the engine maintains a warm pool, recycles long-lived connections to dodge stale TCP state, and pings before use so a DB restart doesn't surface as a wave of OperationalError.
Transactional patterns
The 2.x canonical pattern is with Session(engine) as session: with session.begin(): …. The inner begin() opens a transaction, commits on clean exit, rolls back on exception. Repeated commits without expire_on_commit=False re-fetch every object — usually unwanted in API handlers.
from sqlalchemy.orm import sessionmaker
SessionLocal = sessionmaker(engine, expire_on_commit=False)
def update_user(user_id: int, **updates):
with SessionLocal() as session, session.begin():
user = session.get(User, user_id)
for k, v in updates.items():
setattr(user, k, v)
# commit happens implicitly on `with session.begin()` exit
# session.close() happens here
# In FastAPI: a Depends() that yields a session per request
def get_db():
with SessionLocal() as session:
yield session
Output: transaction scope matches request lifecycle; rollback on uncaught exceptions is automatic.
Observability — logging, slow query traces
Set echo="debug" only in development. For production, attach a query-execution event listener that logs to your structured logger and flags slow queries.
import logging
import time
from sqlalchemy import event
slow_log = logging.getLogger("db.slow")
@event.listens_for(engine, "before_cursor_execute")
def _start(conn, cursor, statement, params, context, executemany):
context._t0 = time.perf_counter()
@event.listens_for(engine, "after_cursor_execute")
def _end(conn, cursor, statement, params, context, executemany):
dt = time.perf_counter() - context._t0
if dt > 0.5:
slow_log.warning("slow_query duration=%.3fs sql=%s", dt, statement[:200])
Output: any query slower than 500 ms lands in the slow-query log with a snippet of the SQL; pair with Postgres auto_explain for full plans.
Health checks
A SELECT 1 against the pool both verifies DB reachability and exercises pool_pre_ping. Wire as your container's readiness probe.
from sqlalchemy import text
def healthcheck() -> bool:
try:
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
return True
except Exception:
return False
Output: the engine reports healthy iff a connection round-trip succeeds.
Database migration strategies
SQLAlchemy ships zero migration tooling — Base.metadata.create_all() is for dev only. The standard companion is Alembic (also by Mike Bayer), which compares your model metadata against the live DB and writes Python migration scripts.
Alembic project layout
pip install alembic
alembic init alembic
# Edit alembic.ini: set sqlalchemy.url = postgresql://...
# Edit alembic/env.py: target_metadata = Base.metadata
Output: an alembic/ directory with env.py, script.py.mako, and an empty versions/ folder.
Autogenerate + review (the safe loop)
Alembic's autogenerate compares the model metadata to the DB schema and writes a migration. Always review before applying — autogenerate misses constraint renames, custom types, and server-side defaults.
# Generate a candidate migration
alembic revision --autogenerate -m "add notes table"
# Inspect — REQUIRED. Edit alembic/versions/<rev>_add_notes_table.py.
# Apply
alembic upgrade head
# Roll back the most recent
alembic downgrade -1
Output: alembic upgrade head runs every pending revision in order; alembic history shows the chain.
Data migrations vs schema migrations
Schema migrations change DDL; data migrations rewrite rows. Mixing them in one script is tempting but fragile — a failed data step leaves the DDL half-applied. The safer pattern is two separate revisions, with the data revision marked op.run_python()-style.
# alembic/versions/202604010001_backfill_user_status.py
from alembic import op
from sqlalchemy import table, column, String
def upgrade():
users = table("users", column("status", String))
op.execute(users.update().where(users.c.status == None).values(status="active"))
def downgrade():
pass # data migrations rarely reverse cleanly
Output: the migration runs the backfill query inside the same transaction Alembic opens for the revision; a crash mid-migration rolls back atomically on Postgres.
Branching and merging migration history
Long-lived branches generate parallel revisions whose down_revision both point to the same ancestor. Alembic detects this on alembic upgrade head and refuses. Fix by merging:
alembic merge -m "merge heads" <rev_a> <rev_b>
Output: a no-op merge revision unifying the two heads — apply with alembic upgrade head afterward.
Deployment patterns
For zero-downtime deployments:
- Expand: add new columns/tables as nullable / with defaults. New code reads & writes both old and new.
- Migrate: backfill data; old code unaffected.
- Contract: drop old columns once all instances run the new code.
Never do expand+contract in one migration on a live system — old code crashes mid-deploy.
Version migration guide
The 1.x → 2.x jump (January 2023) was the largest break in SQLAlchemy's history. The default query API changed from Session.query(...) to Session.execute(select(...)), transaction semantics tightened, and Mapped[] typing became canonical.
1.x → 2.x query API
# 1.x — legacy Query interface (deprecated in 2.x)
users = session.query(User).filter(User.active == True).all()
user = session.query(User).get(123)
count = session.query(User).filter_by(active=True).count()
# 2.x — select() + Session.execute() — canonical
from sqlalchemy import select, func
users = session.execute(
select(User).where(User.active.is_(True))
).scalars().all()
user = session.get(User, 123)
count = session.execute(
select(func.count()).select_from(User).where(User.active.is_(True))
).scalar_one()
Output: the new style produces a Result object — call .scalars().all() for model instances, .scalar_one() for a guaranteed single row, .all() for tuple rows.
Mapped[] typing replaces untyped Column
# 1.x — untyped Column
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String)
# 2.x — typed Mapped + mapped_column
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase): pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
Output: mypy and IDE refactors understand the model; the runtime behaviour is identical.
Transaction semantics
# 1.x — implicit begin, autocommit by Session
session.add(user); session.commit()
# 2.x — explicit `with session.begin():` recommended;
# `autocommit` mode removed entirely from Session
with Session(engine) as session, session.begin():
session.add(user)
# commit on exit; rollback on exception
Output: transaction scope is now a with block — leaks of "session left in pending state" are caught at type-check time.
Migration tooling
Run the 1.4 bridge release with the warning flag enabled to find every legacy call site:
SQLALCHEMY_WARN_20=1 python -W error::DeprecationWarning app.py
Output: every query(), lazy="select" default, and autocommit=True surfaces as a hard error — fix all, then pin sqlalchemy>=2.
Performance tuning
Most SQLAlchemy performance work is not about the ORM internals — it's about the SQL the ORM emits. Profile the SQL first.
| Symptom | Likely cause | Fix |
|---|---|---|
| N+1 queries when iterating relationships | Lazy-loading default (lazy="select") | selectinload/joinedload in the .options(...) of the select. |
| Long INSERT loops | One session.add() per row → N INSERTs | session.execute(insert(Model), [...]) for batched insert; ~50× faster. |
| High DB connection count | Too many create_engine calls | One engine per process; share via dependency injection. |
pool_timeout errors at peak | Pool too small for worker concurrency | Raise pool_size + max_overflow; check Postgres max_connections. |
Session.commit() slow | expire_on_commit=True re-fetches every loaded object on next access | Session(..., expire_on_commit=False) for API handlers. |
| Slow ORM materialisation on large result | ORM unit-of-work overhead per row | Use Core: session.execute(select(User.id, User.name)).all() — returns raw tuples. |
Hot path spends time in compile_statement | Repeated compilation of the same SQL | Cache compiled statements via select(...).execution_options(compiled_cache=cache_dict). |
Slow count() on large tables | SELECT COUNT(*) scans the table on Postgres | Use approximate counts (pg_class.reltuples) for dashboards; reserve exact counts for billable surfaces. |
Escape hatch — raw SQL
When the ORM gets in the way, drop to text() for a specific query. The Session still wraps it in the transaction; you give up auto-mapping but gain full control over SQL hints, CTEs, and dialect features.
from sqlalchemy import text
with Session(engine) as session:
rows = session.execute(
text("""
WITH recent AS (
SELECT user_id, MAX(created_at) AS last_seen
FROM events
WHERE created_at > :since
GROUP BY user_id
)
SELECT u.name, r.last_seen
FROM users u JOIN recent r ON r.user_id = u.id
ORDER BY r.last_seen DESC
LIMIT 50
"""),
{"since": "2026-01-01"},
).all()
Output: the ORM stays out of the way; bind parameters are still escaped properly.
Schema design & ORM patterns
The ORM choices that survive contact with production schemas. Each one trades a layer of magic for explicitness.
Declarative vs imperative mapping
Declarative is the standard — class body declares columns and relationships. Imperative mapping (registry.map_imperatively(User, users_table)) decouples the model class from the table, useful when the class is a dataclass owned by the domain layer and the table is owned by persistence.
from dataclasses import dataclass, field
from sqlalchemy import Column, Integer, String, Table, MetaData
from sqlalchemy.orm import registry
mapper_registry = registry()
metadata = MetaData()
users_table = Table(
"users", metadata,
Column("id", Integer, primary_key=True),
Column("name", String),
)
@dataclass
class User:
id: int = field(init=False)
name: str = ""
mapper_registry.map_imperatively(User, users_table)
Output: User is a plain dataclass — no SQLAlchemy base — but the registry maps it to the table. Tests that don't need a DB can construct User(name="alice") without touching SQLAlchemy.
Polymorphic inheritance
Joined-table inheritance is the safe default when subclasses have meaningfully different columns. Single-table is faster (one table, no join) but bloats with nullable columns. Concrete-table inheritance is rare — almost always a code smell.
class Vehicle(Base):
__tablename__ = "vehicles"
id: Mapped[int] = mapped_column(primary_key=True)
kind: Mapped[str]
__mapper_args__ = {"polymorphic_on": "kind", "polymorphic_identity": "vehicle"}
class Car(Vehicle):
__tablename__ = "cars"
id: Mapped[int] = mapped_column(ForeignKey("vehicles.id"), primary_key=True)
seats: Mapped[int]
__mapper_args__ = {"polymorphic_identity": "car"}
class Truck(Vehicle):
__tablename__ = "trucks"
id: Mapped[int] = mapped_column(ForeignKey("vehicles.id"), primary_key=True)
capacity: Mapped[int]
__mapper_args__ = {"polymorphic_identity": "truck"}
Output: select(Vehicle) returns instances of the right subclass automatically; select(Car) returns only cars.
Custom column types
When the DB type doesn't quite match the Python type — e.g. storing JSON-as-text with Pydantic validation on the way in — implement TypeDecorator:
from sqlalchemy import String
from sqlalchemy.types import TypeDecorator
import json
class JSONEncodedDict(TypeDecorator):
impl = String
cache_ok = True
def process_bind_param(self, value, dialect):
return json.dumps(value) if value is not None else None
def process_result_value(self, value, dialect):
return json.loads(value) if value is not None else None
class Note(Base):
__tablename__ = "notes"
id: Mapped[int] = mapped_column(primary_key=True)
meta: Mapped[dict] = mapped_column(JSONEncodedDict)
Output: note.meta = {"tags": ["urgent"]} survives a round-trip through the DB as serialized JSON; the application code never sees the string form.
See also
- Python: sqlalchemy — API tutorial, Core, ORM, migrations
- Packages: pip-sqlmodel — Pydantic-fused ORM on top of SQLAlchemy