cheat sheet
sqlalchemy
Connect to databases, write queries, and define ORM models with SQLAlchemy 2.0. Covers the engine, sessions, Core queries, ORM declarative models, relationships, session lifecycle, migrations with Alembic, and connection pool configuration.
sqlalchemy — SQL Toolkit & ORM
What it is
SQLAlchemy provides two layers:
- Core — SQL expression language that constructs queries as Python objects.
- ORM — maps Python classes to tables;
Sessionis the unit of work.
SQLAlchemy 2.0 unified the API and requires explicit with Session(engine) as s: patterns. It supports SQLite, PostgreSQL, MySQL, Oracle, and more.
Install
pip install sqlalchemy
# Database-specific drivers (install alongside SQLAlchemy)
pip install psycopg2-binary # PostgreSQL
pip install pymysql # MySQL
# SQLite is built into Python — no extra driver needed
Output: (none — exits 0 on success)
Quick example — Core
from sqlalchemy import create_engine, text
engine = create_engine("sqlite+pysqlite:///:memory:", echo=False)
with engine.connect() as conn:
conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))
conn.execute(text("INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob')"))
conn.commit()
rows = conn.execute(text("SELECT * FROM users")).fetchall()
for row in rows:
print(row)
Output:
(1, 'Alice')
(2, 'Bob')
When / why to use it
- You want a Pythonic, database-agnostic query interface with good migration tooling (Alembic).
- Building a web app with dynamic queries and relationship traversal.
- When you need both Core (low-level control) and ORM (convenience) in the same project.
Common pitfalls
N+1 query problem — lazy-loading relationships fires one SQL query per ORM object when you iterate. Fix with eager loading:
select(User).options(selectinload(User.posts)).
Sessionis not thread-safe — one session per request (web) or per thread. Never share aSessionacross threads.
text()wrapper required — SQLAlchemy 2.0 requirestext()around raw SQL strings. Bare strings will raiseObjectNotExecutableError.
Richer example — ORM with 2.0-style declarations
from sqlalchemy import create_engine, String, ForeignKey, select
from sqlalchemy.orm import (
DeclarativeBase, Mapped, mapped_column, relationship, Session
)
class Base(DeclarativeBase):
pass
class Department(Base):
__tablename__ = "department"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50))
employees: Mapped[list["Employee"]] = relationship(back_populates="department")
class Employee(Base):
__tablename__ = "employee"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
dept_id: Mapped[int] = mapped_column(ForeignKey("department.id"))
department: Mapped["Department"] = relationship(back_populates="employees")
engine = create_engine("sqlite+pysqlite:///:memory:")
Base.metadata.create_all(engine)
with Session(engine) as session:
eng = Department(id=1, name="Engineering")
session.add_all([
eng,
Employee(id=1, name="Alice", dept_id=1),
Employee(id=2, name="Bob", dept_id=1),
])
session.commit()
stmt = select(Employee).where(Employee.name.startswith("A"))
for emp in session.scalars(stmt):
print(f"{emp.name} → {emp.department.name}")
Output:
Alice → Engineering
Connection strings
SQLAlchemy uses a URL of the form dialect+driver://user:password@host:port/database to identify both the database backend and the DBAPI driver. The dialect determines which SQL dialect SQLAlchemy generates; the driver selects the underlying Python library (e.g. psycopg2 vs asyncpg for PostgreSQL). Omitting the driver picks the default for that dialect.
# SQLite (file)
engine = create_engine("sqlite:///app.db")
# SQLite (in-memory)
engine = create_engine("sqlite+pysqlite:///:memory:")
# PostgreSQL
engine = create_engine("postgresql+psycopg2://user:pass@localhost/dbname")
# MySQL
engine = create_engine("mysql+pymysql://user:pass@localhost/dbname")
Quick reference
| Task | Code |
|---|---|
| Create engine | create_engine(url, echo=True) |
| Run raw SQL | conn.execute(text("SELECT 1")) |
| Insert ORM | session.add(obj) / session.add_all([...]) |
| Commit | session.commit() |
| Query all | session.scalars(select(Model)).all() |
| Filter | .where(Model.col == val) |
| Update | session.execute(update(Model).where(...).values(...)) |
| Delete | session.execute(delete(Model).where(...)) |
| Get by PK | session.get(Model, pk_value) |
Relationships
relationship() declares the ORM-level link between two mapped classes, while the underlying ForeignKey column is the actual database constraint. back_populates keeps both sides of a bidirectional relationship in sync in Python; lazy loading is the default (a SELECT fires when you first access the attribute), but this causes N+1 queries in loops — override with lazy="selectin" or use joinedload/selectinload at query time.
from sqlalchemy import ForeignKey, String, Integer
from sqlalchemy.orm import relationship, DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class Author(Base):
__tablename__ = "authors"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100))
books: Mapped[list["Book"]] = relationship("Book", back_populates="author")
class Book(Base):
__tablename__ = "books"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
author_id: Mapped[int] = mapped_column(ForeignKey("authors.id"))
author: Mapped["Author"] = relationship("Author", back_populates="books")
# Loading strategies
from sqlalchemy.orm import joinedload, selectinload
# joinedload — SQL JOIN (good for single FK, one row per parent)
authors = session.execute(
select(Author).options(joinedload(Author.books))
).unique().scalars().all()
# selectinload — separate IN query (better for collections)
authors = session.execute(
select(Author).options(selectinload(Author.books))
).scalars().all()
for author in authors:
print(f"{author.name}: {[b.title for b in author.books]}")
Output:
Alice Smith: ['Django Mastery', 'Python Tricks']
Bob Jones: ['SQLAlchemy in Action']
Session lifecycle
The Session implements the Unit of Work pattern: it tracks all ORM objects loaded or added within its scope and flushes them to the database as a single batch before each query or on explicit flush(). Calling commit() persists the transaction and expires all loaded objects so they reload fresh values on next access; rollback() discards all pending changes back to the last commit.
from sqlalchemy.orm import Session
with Session(engine) as session:
# Add new record
author = Author(name="Alice Smith")
session.add(author)
session.flush() # writes to DB but does NOT commit; author.id is now set
print(f"Author id after flush: {author.id}")
book = Book(title="Django Mastery", author_id=author.id)
session.add(book)
session.commit() # commits everything; begins new transaction
print(f"Book id: {book.id}")
Output:
Author id after flush: 1
Book id: 1
# Rollback on error
with Session(engine) as session:
try:
session.add(Author(name="Duplicate"))
session.add(Author(name="Duplicate")) # unique constraint violation
session.commit()
except Exception as e:
session.rollback()
print(f"Rolled back: {e}")
Output:
Rolled back: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "authors_name_key"
# expire_on_commit=False to access attributes after commit
session = Session(engine, expire_on_commit=False)
SQLAlchemy 2.0 select() style
SQLAlchemy 2.0 replaced the legacy session.query(Model) API with select(Model) passed to session.execute(), aligning ORM and Core into a single query interface. Use session.scalars(stmt) as a shortcut when you want ORM objects directly (equivalent to session.execute(stmt).scalars()); use session.execute(stmt) when you need row tuples or column-level results.
from sqlalchemy import select, text
with Session(engine) as session:
# Select all
stmt = select(Author)
authors = session.execute(stmt).scalars().all()
for a in authors:
print(a.name)
Output:
Alice Smith
Bob Jones
Carol White
# Filter, order, limit
stmt = (
select(Book)
.where(Book.title.like("%Python%"))
.order_by(Book.title)
.limit(5)
)
books = session.execute(stmt).scalars().all()
for b in books:
print(f"{b.title} — {b.author.name}")
Output:
Python Tricks — Alice Smith
# Count and aggregate
from sqlalchemy import func
stmt = select(
Author.name,
func.count(Book.id).label("book_count")
).join(Book, isouter=True).group_by(Author.name).order_by(func.count(Book.id).desc())
for name, count in session.execute(stmt):
print(f"{name}: {count} books")
Output:
Alice Smith: 2 books
Bob Jones: 1 books
Carol White: 0 books
# Upsert (insert or update)
from sqlalchemy.dialects.postgresql import insert
stmt = insert(Author).values(name="Diana").on_conflict_do_update(
index_elements=["name"], set_={"name": "Diana"}
)
session.execute(stmt)
session.commit()
Transactions and context managers
with Session(engine) as session: ensures the session is closed on exit but does not auto-commit — you must call session.commit() explicitly. with session.begin(): wraps the block in a transaction that auto-commits on success and auto-rolls back on exception. Nest session.begin_nested() inside to create a SAVEPOINT, letting you recover from inner failures without aborting the outer transaction.
# Nested transactions with savepoints
with Session(engine) as session:
with session.begin():
author = Author(name="Eve")
session.add(author)
try:
with session.begin_nested(): # SAVEPOINT
session.add(Book(title="Bad Book", author_id=999)) # FK violation
except Exception:
pass # savepoint rolled back, outer tx continues
session.add(Book(title="Good Book", author_id=author.id))
# outer begin() auto-commits here
Alembic migrations
Alembic is SQLAlchemy's companion migration tool: env.py connects Alembic to your engine and Base.metadata so --autogenerate can diff models against the live schema to produce upgrade/downgrade functions. Each migration is a versioned Python file; run alembic upgrade head to apply all pending migrations and alembic downgrade -1 to roll back one step.
# Initialize Alembic
alembic init migrations
Output:
Creating directory /project/migrations ... done
Creating directory /project/migrations/versions ... done
Generating /project/migrations/env.py ... done
Generating /project/alembic.ini ... done
# Generate a migration from model changes
alembic revision --autogenerate -m "add books table"
Output:
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.autogenerate.compare] Detected added table 'books'
Generating /project/migrations/versions/3a4b5c6d_add_books_table.py ... done
# Apply all pending migrations
alembic upgrade head
Output:
INFO [alembic.runtime.migration] Running upgrade -> 3a4b5c6d, add books table
# Rollback one migration
alembic downgrade -1
Output:
INFO [alembic.runtime.migration] Running downgrade 3a4b5c6d -> , add books table
# Show migration history
alembic history --verbose
Output:
Rev: 3a4b5c6d (head)
Parent: <base>
Path: migrations/versions/3a4b5c6d_add_books_table.py
...add books table...
Connection pool configuration
SQLAlchemy maintains a pool of persistent database connections to avoid the overhead of creating a new connection on every request. pool_size sets the number of connections kept open; max_overflow allows extra connections under load (they are closed when released); pool_pre_ping=True sends a lightweight ping before each checkout to detect stale connections, preventing errors from connections dropped by the database or a proxy.
from sqlalchemy import create_engine
engine = create_engine(
"postgresql+psycopg2://user:pass@localhost/mydb",
pool_size=10, # number of persistent connections
max_overflow=5, # extra connections when pool is full
pool_timeout=30, # seconds to wait for a connection
pool_recycle=1800, # recycle connections after 30 min (avoid stale)
pool_pre_ping=True, # test connection health before use
)
# Inspect pool status
from sqlalchemy import event
@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, conn_record, conn_proxy):
print(f"Connection checked out: pool size={engine.pool.size()}, checked_out={engine.pool.checkedout()}")
Output:
Connection checked out: pool size=10, checked_out=1
Core vs ORM — when to use which
SQLAlchemy ships two complementary APIs that share the same engine, connection, and SQL expression layer. Core treats tables as Table objects and queries as Select/Insert/Update constructs — there are no Python model classes, just SQL built from primitives. ORM maps Python classes to tables and tracks objects in a Session (the Unit of Work). The two interoperate freely — you can drop down from ORM to Core within one session.execute() call, and a single connection serves both.
| Concern | Core | ORM |
|---|---|---|
| Mental model | SQL-builder | Object-relational mapping |
| Returns | Row tuples | Mapped instances |
| Identity tracking | None | Per-session identity map |
| Change tracking | Manual | Unit-of-work autoflush |
| Bulk performance | Excellent | Good with session.execute(insert(...)) |
| Ad-hoc DDL/ETL | Ideal | Awkward |
| Web app with FK traversal | Possible | Idiomatic |
# Core — pure SQL builder; rows are tuples
from sqlalchemy import Table, Column, Integer, String, MetaData, select, insert
metadata = MetaData()
users = Table(
"users", metadata,
Column("id", Integer, primary_key=True),
Column("name", String(50)),
Column("email", String(120), unique=True),
)
metadata.create_all(engine)
with engine.begin() as conn: # begin() auto-commits
conn.execute(insert(users), [
{"name": "Alice", "email": "alice@example.com"},
{"name": "Bob", "email": "bob@example.com"},
])
for row in conn.execute(select(users).where(users.c.name.startswith("A"))):
print(row.name, row.email)
Output:
Alice alice@example.com
Reach for Core when you're scripting ETL, doing bulk operations, or writing a one-off migration helper. Reach for the ORM when you're building a real application with relationships and business logic that benefits from typed objects.
Engine, connection, and session — the three-layer model
These are the three layers every SQLAlchemy program builds on. The engine is a connection factory plus a pool — created once per database URL, per process. A connection is a single DBAPI connection checked out of the pool; it has a transactional context and is what actually issues SQL. A session wraps one or more connections with ORM identity tracking, change detection, and lazy loading.
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
# 1. Engine — one per database URL, share across the app
engine = create_engine("sqlite:///app.db", echo=False, future=True)
# 2. Connection — checked out from the engine's pool
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
conn.commit() # explicit commit needed when using connect()
# Same result via engine.begin() — auto-commits on success, auto-rolls back on error
with engine.begin() as conn:
conn.execute(text("SELECT 1"))
# 3. Session — tracks ORM objects, manages transactions
with Session(engine) as session:
session.execute(text("SELECT 1"))
session.commit()
Output: (none — exits 0 on success)
sessionmaker — the recommended factory
In long-running apps you don't construct Session directly. Use sessionmaker(engine) to bind a default engine and produce sessions via SessionLocal(). FastAPI / Flask dependency injection patterns rely on this.
from sqlalchemy.orm import sessionmaker
SessionLocal = sessionmaker(bind=engine, expire_on_commit=False, autoflush=True)
def get_session():
with SessionLocal() as session:
yield session # FastAPI/Starlette dependency
Set
expire_on_commit=Falsein web frameworks. Aftercommit()SQLAlchemy expires all loaded objects by default, so attribute access re-fetches from the DB — usually wrong when you've already serialized the response.
Declarative models in depth
SQLAlchemy 2.0's declarative API uses DeclarativeBase plus type-annotated Mapped[T] columns. Annotations drive the column type inference (int → Integer, str → String, datetime → DateTime), and mapped_column(...) overrides the default whenever you need a constraint, index, or different type.
from datetime import datetime
from sqlalchemy import String, ForeignKey, Index, UniqueConstraint, CheckConstraint
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
__table_args__ = (
UniqueConstraint("email", name="uq_users_email"),
Index("ix_users_active_name", "is_active", "name"),
CheckConstraint("age >= 0", name="ck_users_age_nonneg"),
)
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100), index=True)
email: Mapped[str] = mapped_column(String(255))
age: Mapped[int | None] = mapped_column(default=None)
is_active: Mapped[bool] = mapped_column(default=True)
created: Mapped[datetime] = mapped_column(default=datetime.utcnow)
A few details worth knowing:
Mapped[int | None](orOptional[int]) marks the column NULLable.Mapped[int]is NOT NULL.default=...sets a Python-side default;server_default=text("CURRENT_TIMESTAMP")sets a database-side default.__table_args__is the catch-all for constraints, indexes, and PostgreSQL-specific options (postgresql_partition_by="RANGE (created)").
Type-annotation maps for custom types
For domain-specific Python types (UUID, decimal money, JSON), register a type_annotation_map so SQLAlchemy picks the right column type automatically.
from decimal import Decimal
from uuid import UUID
from sqlalchemy import Numeric
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
type_annotation_map = {
Decimal: Numeric(12, 2),
UUID: PG_UUID(as_uuid=True),
}
Any Mapped[Decimal] field on a subclass now uses Numeric(12, 2) without an explicit mapped_column() argument.
Many-to-many with association tables
A pure many-to-many relationship uses a separate Table object as the link, declared once and reused on both sides via secondary=.
from sqlalchemy import Table, Column, ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase): pass
post_tag = Table(
"post_tag", Base.metadata,
Column("post_id", ForeignKey("post.id"), primary_key=True),
Column("tag_id", ForeignKey("tag.id"), primary_key=True),
)
class Post(Base):
__tablename__ = "post"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
tags: Mapped[list["Tag"]] = relationship(secondary=post_tag, back_populates="posts")
class Tag(Base):
__tablename__ = "tag"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
posts: Mapped[list[Post]] = relationship(secondary=post_tag, back_populates="tags")
with Session(engine) as s:
py = Tag(name="python")
web = Tag(name="web")
p = Post(title="FastAPI tour", tags=[py, web])
s.add(p)
s.commit()
s.refresh(p)
print(p.title, [t.name for t in p.tags])
Output:
FastAPI tour ['python', 'web']
Association object pattern
When the link itself needs columns (e.g. created_at, added_by), the plain Table isn't enough — promote the link to a full mapped class. This is the association object pattern.
from datetime import datetime
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase): pass
class PostTag(Base):
__tablename__ = "post_tag"
post_id: Mapped[int] = mapped_column(ForeignKey("post.id"), primary_key=True)
tag_id: Mapped[int] = mapped_column(ForeignKey("tag.id"), primary_key=True)
added_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
added_by: Mapped[str | None] = mapped_column(String(80))
post: Mapped["Post"] = relationship(back_populates="tag_links")
tag: Mapped["Tag"] = relationship(back_populates="post_links")
class Post(Base):
__tablename__ = "post"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
tag_links: Mapped[list[PostTag]] = relationship(back_populates="post")
class Tag(Base):
__tablename__ = "tag"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
post_links: Mapped[list[PostTag]] = relationship(back_populates="tag")
with Session(engine) as s:
py = Tag(name="python")
p = Post(title="async/await", tag_links=[
PostTag(tag=py, added_by="alice"),
])
s.add(p)
s.commit()
for link in p.tag_links:
print(f"{link.post.title} ← {link.tag.name} (by {link.added_by})")
Output:
async/await ← python (by alice)
Querying — select() patterns in depth
Every SQLAlchemy 2.0 query is built from select(...) then handed to session.execute() or session.scalars(). The patterns below cover joins, filtering, aggregation, subqueries, CTEs, and window functions — the day-to-day vocabulary of any non-trivial application.
Joins — explicit ON vs FK auto-detection
from sqlalchemy import select
with Session(engine) as s:
# Implicit join via relationship
stmt = select(Book.title, Author.name).join(Book.author)
# Explicit JOIN ... ON
stmt = select(Book, Author).join(Author, Book.author_id == Author.id)
# LEFT OUTER JOIN
stmt = select(Author, Book).outerjoin(Book, Book.author_id == Author.id)
# Multi-step join
stmt = (
select(Author.name, Book.title, Tag.name)
.join(Book, Book.author_id == Author.id)
.join(Book.tags)
)
Filtering — combining conditions
from sqlalchemy import select, and_, or_, not_
stmt = (
select(Book)
.where(
Book.title.ilike("%python%"), # ILIKE
Book.published_at.between("2020-01-01", "2025-12-31"),
or_(Book.is_featured == True, Book.rating >= 4.5), # noqa: E712
)
.where(not_(Book.is_deleted))
)
Multiple
.where(...)clauses are AND'd together — the most readable form. Useor_(...)only when you need disjunction.
Aggregation — func.count, func.sum, GROUP BY
from sqlalchemy import select, func
stmt = (
select(
Author.name,
func.count(Book.id).label("book_count"),
func.coalesce(func.sum(Book.pages), 0).label("total_pages"),
)
.join(Book, Book.author_id == Author.id, isouter=True)
.group_by(Author.id, Author.name)
.having(func.count(Book.id) > 0)
.order_by(func.count(Book.id).desc())
)
with Session(engine) as s:
for name, n, pages in s.execute(stmt):
print(f"{name:<20} {n:>3d} books {pages:>6d} pages")
Output:
Alice Smith 12 2840 pages
Bob Jones 5 1100 pages
Scalar subqueries
from sqlalchemy import select, func
avg_pages = select(func.avg(Book.pages)).scalar_subquery()
stmt = select(Book.title, (Book.pages - avg_pages).label("delta")).order_by("delta")
CTEs (common table expressions)
A CTE (WITH recent AS (...)) is the cleanest way to compose a query that references the same intermediate result twice.
from sqlalchemy import select, func
recent = (
select(Book.id, Book.author_id, Book.pages)
.where(Book.published_at >= "2024-01-01")
.cte("recent")
)
stmt = (
select(Author.name, func.sum(recent.c.pages).label("pages_2024"))
.join(recent, recent.c.author_id == Author.id)
.group_by(Author.id, Author.name)
)
Window functions
from sqlalchemy import select, func
stmt = select(
Book.title,
Book.pages,
func.rank().over(
partition_by=Book.author_id,
order_by=Book.pages.desc(),
).label("rank_in_author"),
)
Eager vs lazy loading — selectinload vs joinedload
Lazy loading (the default) fires one SQL query per attribute access on each parent — the classic N+1 problem that turns a 100-row page into 101 round-trips. Eager loading fetches related rows upfront via either a SQL JOIN (joinedload) or a separate IN (...) query (selectinload). The choice depends on the cardinality of the relationship.
| Strategy | Generates | Best for | Watch out for |
|---|---|---|---|
selectinload(Model.rel) | Second SELECT … WHERE id IN (…) | One-to-many collections | Two round-trips instead of one |
joinedload(Model.rel) | Single SELECT … LEFT OUTER JOIN … | Many-to-one, one-to-one | Row duplication for collections (use .unique()) |
subqueryload(Model.rel) | SELECT … FROM (subquery) | Legacy; rarely used now | Slower than selectinload |
raiseload(Model.rel) | Raises on access | Catching accidental lazy loads in tests | Strict — every access must be eager |
Default (lazy="select") | One query per access | Quick scripts | N+1 in loops |
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload, contains_eager, raiseload
# Best for collections — two queries instead of N+1
authors = session.scalars(
select(Author).options(selectinload(Author.books))
).all()
# Best for many-to-one (single related row per parent)
books = session.scalars(
select(Book).options(joinedload(Book.author))
).all()
# Chained — load Author -> Books -> Tags
authors = session.scalars(
select(Author).options(
selectinload(Author.books).selectinload(Book.tags)
)
).all()
# raiseload — fail fast on accidental lazy access
authors = session.scalars(
select(Author).options(raiseload(Author.books))
).all()
# authors[0].books # raises sqlalchemy.exc.InvalidRequestError
Default to
selectinloadfor most relationship loads. Usejoinedloadonly for many-to-one where the parent has exactly one related row, or you need to filter / sort by a joined column.
Async support with AsyncSession
SQLAlchemy 2.0 ships first-class async support via create_async_engine and AsyncSession. The API mirrors the sync version with await prefixes on I/O methods. Use this when you're building async apps with FastAPI, Starlette, or any other asyncio-based framework — pair it with asyncio.
pip install "sqlalchemy[asyncio]"
pip install asyncpg # async PostgreSQL driver
pip install aiosqlite # async SQLite driver
Output: (none — exits 0 on success)
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase): pass
class Item(Base):
__tablename__ = "item"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
# Async URL form — note the `+aiosqlite` driver suffix
async_engine = create_async_engine("sqlite+aiosqlite:///app.db", echo=False)
AsyncSessionLocal = async_sessionmaker(async_engine, expire_on_commit=False)
async def main():
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async with AsyncSessionLocal() as session:
session.add_all([Item(name="alpha"), Item(name="beta")])
await session.commit()
result = await session.scalars(select(Item).order_by(Item.name))
for item in result:
print(item.id, item.name)
asyncio.run(main())
Output:
1 alpha
2 beta
Eager loading is mandatory in async
In sync sessions, lazy-loaded attributes fire a synchronous SQL query on access. In async sessions, lazy loads raise because they would block the event loop. Either eager-load with selectinload/joinedload, or call await session.refresh(obj, ["rel"]) to populate the attribute explicitly.
from sqlalchemy.orm import selectinload
async with AsyncSessionLocal() as session:
result = await session.scalars(
select(Author).options(selectinload(Author.books))
)
for author in result:
print(author.name, [b.title for b in author.books])
FastAPI dependency for async sessions
from fastapi import FastAPI, Depends
from typing import AsyncIterator
async def get_session() -> AsyncIterator[AsyncSession]:
async with AsyncSessionLocal() as session:
yield session
app = FastAPI()
@app.get("/items")
async def list_items(session: AsyncSession = Depends(get_session)):
items = (await session.scalars(select(Item))).all()
return [{"id": i.id, "name": i.name} for i in items]
Inspection API — schema introspection at runtime
The inspect() function returns an Inspector for an engine (or a Mapper for an ORM class). Use it to enumerate tables, columns, indexes, and foreign keys at runtime — essential for migration tooling, admin dashboards, or any code that needs to reason about the live schema.
from sqlalchemy import inspect, create_engine
engine = create_engine("sqlite:///app.db")
insp = inspect(engine)
print("schemas:", insp.get_schema_names())
print("tables:", insp.get_table_names())
for col in insp.get_columns("users"):
print(f" {col['name']:<20} {col['type']!s:<20} nullable={col['nullable']}")
for fk in insp.get_foreign_keys("orders"):
print("FK:", fk["constrained_columns"], "→", fk["referred_table"], fk["referred_columns"])
for idx in insp.get_indexes("users"):
print("IDX:", idx["name"], idx["column_names"], "unique" if idx["unique"] else "")
Output:
schemas: ['main']
tables: ['users', 'orders', 'items']
id INTEGER nullable=False
name VARCHAR(100) nullable=False
email VARCHAR(255) nullable=False
FK: ['user_id'] → users ['id']
IDX: ix_users_active_name ['is_active', 'name']
The mapper-level inspector is what frameworks like Flask-Admin use to generate forms:
from sqlalchemy import inspect
mapper = inspect(User)
for col in mapper.columns:
print(col.name, col.type, col.nullable)
for rel in mapper.relationships:
print("REL:", rel.key, "→", rel.mapper.class_.__name__)
SQLAlchemy vs SQLModel
SQLAlchemy is the foundation; SQLModel sits on top, gluing SQLAlchemy ORM and Pydantic v2 together so one class serves as both table and validation model. The trade-off is convenience vs flexibility.
| Concern | SQLAlchemy 2.0 | SQLModel |
|---|---|---|
| Declares | Class + Mapped[T] + mapped_column(...) | Class + Field(...) (typed via Pydantic) |
| Validation | None at the ORM layer | Pydantic validation on every assignment |
| API layer | Separate Pydantic models | Same class via table=True |
| Async | First-class (AsyncSession) | Via SQLAlchemy under the hood |
| Inspection | Full inspect() API | Same (inherits) |
| Composite primary keys, custom types | Native | Limited — drop down to SQLAlchemy |
| Author | SQLAlchemy team | FastAPI author |
| Best fit | Library code, complex schemas, ETL | FastAPI apps, prototype-to-prod, single-class projects |
Rule of thumb: pick SQLModel when your data model is straightforward and your API layer is FastAPI; pick SQLAlchemy when you need composite keys, custom column types, complex inheritance, or fine-grained control over the query layer.
Real-world recipes
Bulk insert with insert(...).values([...])
The fastest way to insert N rows is one INSERT statement with many value tuples. session.add_all([...]) works but autoflushes — for >1000 rows this becomes the bottleneck. Use session.execute(insert(Model), [...]) for raw speed.
from sqlalchemy import insert
with Session(engine) as session, session.begin():
session.execute(
insert(User),
[
{"name": f"user-{i}", "email": f"user{i}@example.com"}
for i in range(10_000)
],
)
print("inserted 10,000 users")
Output:
inserted 10,000 users
Upsert (PostgreSQL ON CONFLICT)
PostgreSQL's INSERT ... ON CONFLICT ... DO UPDATE is the canonical upsert. SQLAlchemy exposes it via postgresql.insert(). SQLite has the same syntax via sqlite.insert(). MySQL uses ON DUPLICATE KEY UPDATE via mysql.insert().
from sqlalchemy.dialects.postgresql import insert
stmt = insert(User).values(
name="alice", email="alice@example.com", age=30
)
stmt = stmt.on_conflict_do_update(
index_elements=["email"],
set_={"name": stmt.excluded.name, "age": stmt.excluded.age},
)
with Session(engine) as s:
s.execute(stmt)
s.commit()
Streaming a huge result set
When a query returns millions of rows, fetching them all at once blows up memory. stream_results=True (PostgreSQL: server-side cursor) lets you iterate row-by-row. Combine with yield_per(N) for per-batch cleanup.
from sqlalchemy import select
with engine.connect().execution_options(stream_results=True) as conn:
for row in conn.execute(select(User)).yield_per(1000):
# process one row at a time
...
For ORM:
with Session(engine) as s:
stmt = select(User).execution_options(stream_results=True, yield_per=500)
for user in s.scalars(stmt):
...
Repository pattern with typed methods
A small wrapper class hides session plumbing behind named methods. Great for testability — tests inject a fake session, code stays readable.
from typing import Sequence
from sqlalchemy import select
from sqlalchemy.orm import Session
class UserRepo:
def __init__(self, session: Session):
self.s = session
def get(self, user_id: int) -> User | None:
return self.s.get(User, user_id)
def by_email(self, email: str) -> User | None:
return self.s.scalar(select(User).where(User.email == email))
def search(self, name_like: str, limit: int = 20) -> Sequence[User]:
stmt = select(User).where(User.name.ilike(f"%{name_like}%")).limit(limit)
return self.s.scalars(stmt).all()
def create(self, **kwargs) -> User:
u = User(**kwargs)
self.s.add(u)
self.s.flush()
return u
with Session(engine) as s:
repo = UserRepo(s)
user = repo.create(name="Alice", email="alice@example.com")
s.commit()
print(repo.by_email("alice@example.com").name)
Output:
Alice
Auditing with SQLAlchemy events
event.listens_for(Session, "before_flush") fires once per flush, before the SQL is sent. Use it for cross-cutting concerns like setting updated_at, enforcing immutability, or writing an audit-trail row.
from sqlalchemy import event
from sqlalchemy.orm import Session as SASession
from datetime import datetime
@event.listens_for(SASession, "before_flush")
def stamp_updated(session, flush_context, instances):
for obj in session.dirty:
if hasattr(obj, "updated_at"):
obj.updated_at = datetime.utcnow()
Test fixture — rollback after every test
The fastest test isolation strategy is to wrap each test in a transaction and rollback at the end. The DB state is identical between tests, no truncate or recreate needed.
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
@pytest.fixture(scope="session")
def engine():
eng = create_engine("sqlite:///test.db")
Base.metadata.create_all(eng)
yield eng
Base.metadata.drop_all(eng)
@pytest.fixture
def session(engine):
conn = engine.connect()
txn = conn.begin()
session = Session(bind=conn)
yield session
session.close()
txn.rollback()
conn.close()
See also
- SQLModel — single-class ORM + Pydantic for FastAPI projects
- FastAPI — uses SQLAlchemy sessions via dependency injection
- Pydantic — separate API-layer validation when not using SQLModel
- DuckDB — alternative for analytical queries; SQLAlchemy has a dialect for it
- Polars — DataFrame library; complements SQLAlchemy for analytics