cheat sheet

sqlalchemy

Connect to databases, write queries, and define ORM models with SQLAlchemy 2.0. Covers the engine, sessions, Core queries, ORM declarative models, relationships, session lifecycle, migrations with Alembic, and connection pool configuration.

sqlalchemy — SQL Toolkit & ORM

What it is

SQLAlchemy provides two layers:

  • Core — SQL expression language that constructs queries as Python objects.
  • ORM — maps Python classes to tables; Session is the unit of work.

SQLAlchemy 2.0 unified the API and requires explicit with Session(engine) as s: patterns. It supports SQLite, PostgreSQL, MySQL, Oracle, and more.

Install

bash
pip install sqlalchemy
# Database-specific drivers (install alongside SQLAlchemy)
pip install psycopg2-binary   # PostgreSQL
pip install pymysql           # MySQL
# SQLite is built into Python — no extra driver needed

Output: (none — exits 0 on success)

Quick example — Core

python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite+pysqlite:///:memory:", echo=False)

with engine.connect() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(text("INSERT INTO users VALUES (1, 'Alice'), (2, 'Bob')"))
    conn.commit()
    rows = conn.execute(text("SELECT * FROM users")).fetchall()
    for row in rows:
        print(row)

Output:

text
(1, 'Alice')
(2, 'Bob')

When / why to use it

  • You want a Pythonic, database-agnostic query interface with good migration tooling (Alembic).
  • Building a web app with dynamic queries and relationship traversal.
  • When you need both Core (low-level control) and ORM (convenience) in the same project.

Common pitfalls

N+1 query problem — lazy-loading relationships fires one SQL query per ORM object when you iterate. Fix with eager loading: select(User).options(selectinload(User.posts)).

Session is not thread-safe — one session per request (web) or per thread. Never share a Session across threads.

text() wrapper required — SQLAlchemy 2.0 requires text() around raw SQL strings. Bare strings will raise ObjectNotExecutableError.

Richer example — ORM with 2.0-style declarations

python
from sqlalchemy import create_engine, String, ForeignKey, select
from sqlalchemy.orm import (
    DeclarativeBase, Mapped, mapped_column, relationship, Session
)

class Base(DeclarativeBase):
    pass

class Department(Base):
    __tablename__ = "department"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))
    employees: Mapped[list["Employee"]] = relationship(back_populates="department")

class Employee(Base):
    __tablename__ = "employee"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    dept_id: Mapped[int] = mapped_column(ForeignKey("department.id"))
    department: Mapped["Department"] = relationship(back_populates="employees")

engine = create_engine("sqlite+pysqlite:///:memory:")
Base.metadata.create_all(engine)

with Session(engine) as session:
    eng = Department(id=1, name="Engineering")
    session.add_all([
        eng,
        Employee(id=1, name="Alice", dept_id=1),
        Employee(id=2, name="Bob", dept_id=1),
    ])
    session.commit()

    stmt = select(Employee).where(Employee.name.startswith("A"))
    for emp in session.scalars(stmt):
        print(f"{emp.name}{emp.department.name}")

Output:

text
Alice → Engineering

Connection strings

SQLAlchemy uses a URL of the form dialect+driver://user:password@host:port/database to identify both the database backend and the DBAPI driver. The dialect determines which SQL dialect SQLAlchemy generates; the driver selects the underlying Python library (e.g. psycopg2 vs asyncpg for PostgreSQL). Omitting the driver picks the default for that dialect.

python
# SQLite (file)
engine = create_engine("sqlite:///app.db")
# SQLite (in-memory)
engine = create_engine("sqlite+pysqlite:///:memory:")
# PostgreSQL
engine = create_engine("postgresql+psycopg2://user:pass@localhost/dbname")
# MySQL
engine = create_engine("mysql+pymysql://user:pass@localhost/dbname")

Quick reference

TaskCode
Create enginecreate_engine(url, echo=True)
Run raw SQLconn.execute(text("SELECT 1"))
Insert ORMsession.add(obj) / session.add_all([...])
Commitsession.commit()
Query allsession.scalars(select(Model)).all()
Filter.where(Model.col == val)
Updatesession.execute(update(Model).where(...).values(...))
Deletesession.execute(delete(Model).where(...))
Get by PKsession.get(Model, pk_value)

Relationships

relationship() declares the ORM-level link between two mapped classes, while the underlying ForeignKey column is the actual database constraint. back_populates keeps both sides of a bidirectional relationship in sync in Python; lazy loading is the default (a SELECT fires when you first access the attribute), but this causes N+1 queries in loops — override with lazy="selectin" or use joinedload/selectinload at query time.

python
from sqlalchemy import ForeignKey, String, Integer
from sqlalchemy.orm import relationship, DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Author(Base):
    __tablename__ = "authors"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100))
    books: Mapped[list["Book"]] = relationship("Book", back_populates="author")

class Book(Base):
    __tablename__ = "books"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    author_id: Mapped[int] = mapped_column(ForeignKey("authors.id"))
    author: Mapped["Author"] = relationship("Author", back_populates="books")
python
# Loading strategies
from sqlalchemy.orm import joinedload, selectinload

# joinedload — SQL JOIN (good for single FK, one row per parent)
authors = session.execute(
    select(Author).options(joinedload(Author.books))
).unique().scalars().all()

# selectinload — separate IN query (better for collections)
authors = session.execute(
    select(Author).options(selectinload(Author.books))
).scalars().all()

for author in authors:
    print(f"{author.name}: {[b.title for b in author.books]}")

Output:

text
Alice Smith: ['Django Mastery', 'Python Tricks']
Bob Jones: ['SQLAlchemy in Action']

Session lifecycle

The Session implements the Unit of Work pattern: it tracks all ORM objects loaded or added within its scope and flushes them to the database as a single batch before each query or on explicit flush(). Calling commit() persists the transaction and expires all loaded objects so they reload fresh values on next access; rollback() discards all pending changes back to the last commit.

python
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Add new record
    author = Author(name="Alice Smith")
    session.add(author)
    session.flush()          # writes to DB but does NOT commit; author.id is now set
    print(f"Author id after flush: {author.id}")

    book = Book(title="Django Mastery", author_id=author.id)
    session.add(book)
    session.commit()         # commits everything; begins new transaction
    print(f"Book id: {book.id}")

Output:

text
Author id after flush: 1
Book id: 1
python
# Rollback on error
with Session(engine) as session:
    try:
        session.add(Author(name="Duplicate"))
        session.add(Author(name="Duplicate"))   # unique constraint violation
        session.commit()
    except Exception as e:
        session.rollback()
        print(f"Rolled back: {e}")

Output:

text
Rolled back: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "authors_name_key"
python
# expire_on_commit=False to access attributes after commit
session = Session(engine, expire_on_commit=False)

SQLAlchemy 2.0 select() style

SQLAlchemy 2.0 replaced the legacy session.query(Model) API with select(Model) passed to session.execute(), aligning ORM and Core into a single query interface. Use session.scalars(stmt) as a shortcut when you want ORM objects directly (equivalent to session.execute(stmt).scalars()); use session.execute(stmt) when you need row tuples or column-level results.

python
from sqlalchemy import select, text

with Session(engine) as session:
    # Select all
    stmt = select(Author)
    authors = session.execute(stmt).scalars().all()
    for a in authors:
        print(a.name)

Output:

text
Alice Smith
Bob Jones
Carol White
python
    # Filter, order, limit
    stmt = (
        select(Book)
        .where(Book.title.like("%Python%"))
        .order_by(Book.title)
        .limit(5)
    )
    books = session.execute(stmt).scalars().all()
    for b in books:
        print(f"{b.title}{b.author.name}")

Output:

text
Python Tricks — Alice Smith
python
    # Count and aggregate
    from sqlalchemy import func

    stmt = select(
        Author.name,
        func.count(Book.id).label("book_count")
    ).join(Book, isouter=True).group_by(Author.name).order_by(func.count(Book.id).desc())

    for name, count in session.execute(stmt):
        print(f"{name}: {count} books")

Output:

text
Alice Smith: 2 books
Bob Jones: 1 books
Carol White: 0 books
python
    # Upsert (insert or update)
    from sqlalchemy.dialects.postgresql import insert

    stmt = insert(Author).values(name="Diana").on_conflict_do_update(
        index_elements=["name"], set_={"name": "Diana"}
    )
    session.execute(stmt)
    session.commit()

Transactions and context managers

with Session(engine) as session: ensures the session is closed on exit but does not auto-commit — you must call session.commit() explicitly. with session.begin(): wraps the block in a transaction that auto-commits on success and auto-rolls back on exception. Nest session.begin_nested() inside to create a SAVEPOINT, letting you recover from inner failures without aborting the outer transaction.

python
# Nested transactions with savepoints
with Session(engine) as session:
    with session.begin():
        author = Author(name="Eve")
        session.add(author)

        try:
            with session.begin_nested():   # SAVEPOINT
                session.add(Book(title="Bad Book", author_id=999))   # FK violation
        except Exception:
            pass   # savepoint rolled back, outer tx continues

        session.add(Book(title="Good Book", author_id=author.id))
    # outer begin() auto-commits here

Alembic migrations

Alembic is SQLAlchemy's companion migration tool: env.py connects Alembic to your engine and Base.metadata so --autogenerate can diff models against the live schema to produce upgrade/downgrade functions. Each migration is a versioned Python file; run alembic upgrade head to apply all pending migrations and alembic downgrade -1 to roll back one step.

bash
# Initialize Alembic
alembic init migrations

Output:

text
  Creating directory /project/migrations ...  done
  Creating directory /project/migrations/versions ...  done
  Generating /project/migrations/env.py ...  done
  Generating /project/alembic.ini ...  done
bash
# Generate a migration from model changes
alembic revision --autogenerate -m "add books table"

Output:

text
INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO  [alembic.autogenerate.compare] Detected added table 'books'
  Generating /project/migrations/versions/3a4b5c6d_add_books_table.py ...  done
bash
# Apply all pending migrations
alembic upgrade head

Output:

text
INFO  [alembic.runtime.migration] Running upgrade  -> 3a4b5c6d, add books table
bash
# Rollback one migration
alembic downgrade -1

Output:

text
INFO  [alembic.runtime.migration] Running downgrade 3a4b5c6d -> , add books table
bash
# Show migration history
alembic history --verbose

Output:

text
Rev: 3a4b5c6d (head)
Parent: <base>
Path: migrations/versions/3a4b5c6d_add_books_table.py
...add books table...

Connection pool configuration

SQLAlchemy maintains a pool of persistent database connections to avoid the overhead of creating a new connection on every request. pool_size sets the number of connections kept open; max_overflow allows extra connections under load (they are closed when released); pool_pre_ping=True sends a lightweight ping before each checkout to detect stale connections, preventing errors from connections dropped by the database or a proxy.

python
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://user:pass@localhost/mydb",
    pool_size=10,          # number of persistent connections
    max_overflow=5,        # extra connections when pool is full
    pool_timeout=30,       # seconds to wait for a connection
    pool_recycle=1800,     # recycle connections after 30 min (avoid stale)
    pool_pre_ping=True,    # test connection health before use
)
python
# Inspect pool status
from sqlalchemy import event

@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, conn_record, conn_proxy):
    print(f"Connection checked out: pool size={engine.pool.size()}, checked_out={engine.pool.checkedout()}")

Output:

text
Connection checked out: pool size=10, checked_out=1

Core vs ORM — when to use which

SQLAlchemy ships two complementary APIs that share the same engine, connection, and SQL expression layer. Core treats tables as Table objects and queries as Select/Insert/Update constructs — there are no Python model classes, just SQL built from primitives. ORM maps Python classes to tables and tracks objects in a Session (the Unit of Work). The two interoperate freely — you can drop down from ORM to Core within one session.execute() call, and a single connection serves both.

ConcernCoreORM
Mental modelSQL-builderObject-relational mapping
ReturnsRow tuplesMapped instances
Identity trackingNonePer-session identity map
Change trackingManualUnit-of-work autoflush
Bulk performanceExcellentGood with session.execute(insert(...))
Ad-hoc DDL/ETLIdealAwkward
Web app with FK traversalPossibleIdiomatic
python
# Core — pure SQL builder; rows are tuples
from sqlalchemy import Table, Column, Integer, String, MetaData, select, insert

metadata = MetaData()
users = Table(
    "users", metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
    Column("email", String(120), unique=True),
)
metadata.create_all(engine)

with engine.begin() as conn:                       # begin() auto-commits
    conn.execute(insert(users), [
        {"name": "Alice", "email": "alice@example.com"},
        {"name": "Bob",   "email": "bob@example.com"},
    ])
    for row in conn.execute(select(users).where(users.c.name.startswith("A"))):
        print(row.name, row.email)

Output:

text
Alice alice@example.com

Reach for Core when you're scripting ETL, doing bulk operations, or writing a one-off migration helper. Reach for the ORM when you're building a real application with relationships and business logic that benefits from typed objects.

Engine, connection, and session — the three-layer model

These are the three layers every SQLAlchemy program builds on. The engine is a connection factory plus a pool — created once per database URL, per process. A connection is a single DBAPI connection checked out of the pool; it has a transactional context and is what actually issues SQL. A session wraps one or more connections with ORM identity tracking, change detection, and lazy loading.

python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# 1. Engine — one per database URL, share across the app
engine = create_engine("sqlite:///app.db", echo=False, future=True)

# 2. Connection — checked out from the engine's pool
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
    conn.commit()           # explicit commit needed when using connect()

# Same result via engine.begin() — auto-commits on success, auto-rolls back on error
with engine.begin() as conn:
    conn.execute(text("SELECT 1"))

# 3. Session — tracks ORM objects, manages transactions
with Session(engine) as session:
    session.execute(text("SELECT 1"))
    session.commit()

Output: (none — exits 0 on success)

In long-running apps you don't construct Session directly. Use sessionmaker(engine) to bind a default engine and produce sessions via SessionLocal(). FastAPI / Flask dependency injection patterns rely on this.

python
from sqlalchemy.orm import sessionmaker

SessionLocal = sessionmaker(bind=engine, expire_on_commit=False, autoflush=True)

def get_session():
    with SessionLocal() as session:
        yield session            # FastAPI/Starlette dependency

Set expire_on_commit=False in web frameworks. After commit() SQLAlchemy expires all loaded objects by default, so attribute access re-fetches from the DB — usually wrong when you've already serialized the response.

Declarative models in depth

SQLAlchemy 2.0's declarative API uses DeclarativeBase plus type-annotated Mapped[T] columns. Annotations drive the column type inference (int → Integer, str → String, datetime → DateTime), and mapped_column(...) overrides the default whenever you need a constraint, index, or different type.

python
from datetime import datetime
from sqlalchemy import String, ForeignKey, Index, UniqueConstraint, CheckConstraint
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    __table_args__ = (
        UniqueConstraint("email", name="uq_users_email"),
        Index("ix_users_active_name", "is_active", "name"),
        CheckConstraint("age >= 0", name="ck_users_age_nonneg"),
    )
    id:        Mapped[int]      = mapped_column(primary_key=True)
    name:      Mapped[str]      = mapped_column(String(100), index=True)
    email:     Mapped[str]      = mapped_column(String(255))
    age:       Mapped[int | None] = mapped_column(default=None)
    is_active: Mapped[bool]     = mapped_column(default=True)
    created:   Mapped[datetime] = mapped_column(default=datetime.utcnow)

A few details worth knowing:

  • Mapped[int | None] (or Optional[int]) marks the column NULLable. Mapped[int] is NOT NULL.
  • default=... sets a Python-side default; server_default=text("CURRENT_TIMESTAMP") sets a database-side default.
  • __table_args__ is the catch-all for constraints, indexes, and PostgreSQL-specific options (postgresql_partition_by="RANGE (created)").

Type-annotation maps for custom types

For domain-specific Python types (UUID, decimal money, JSON), register a type_annotation_map so SQLAlchemy picks the right column type automatically.

python
from decimal import Decimal
from uuid import UUID
from sqlalchemy import Numeric
from sqlalchemy.dialects.postgresql import UUID as PG_UUID
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    type_annotation_map = {
        Decimal: Numeric(12, 2),
        UUID:    PG_UUID(as_uuid=True),
    }

Any Mapped[Decimal] field on a subclass now uses Numeric(12, 2) without an explicit mapped_column() argument.

Many-to-many with association tables

A pure many-to-many relationship uses a separate Table object as the link, declared once and reused on both sides via secondary=.

python
from sqlalchemy import Table, Column, ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

class Base(DeclarativeBase): pass

post_tag = Table(
    "post_tag", Base.metadata,
    Column("post_id", ForeignKey("post.id"), primary_key=True),
    Column("tag_id",  ForeignKey("tag.id"),  primary_key=True),
)

class Post(Base):
    __tablename__ = "post"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    tags: Mapped[list["Tag"]] = relationship(secondary=post_tag, back_populates="posts")

class Tag(Base):
    __tablename__ = "tag"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
    posts: Mapped[list[Post]] = relationship(secondary=post_tag, back_populates="tags")
python
with Session(engine) as s:
    py  = Tag(name="python")
    web = Tag(name="web")
    p = Post(title="FastAPI tour", tags=[py, web])
    s.add(p)
    s.commit()
    s.refresh(p)
    print(p.title, [t.name for t in p.tags])

Output:

text
FastAPI tour ['python', 'web']

Association object pattern

When the link itself needs columns (e.g. created_at, added_by), the plain Table isn't enough — promote the link to a full mapped class. This is the association object pattern.

python
from datetime import datetime
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship

class Base(DeclarativeBase): pass

class PostTag(Base):
    __tablename__ = "post_tag"
    post_id: Mapped[int] = mapped_column(ForeignKey("post.id"), primary_key=True)
    tag_id:  Mapped[int] = mapped_column(ForeignKey("tag.id"),  primary_key=True)
    added_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    added_by: Mapped[str | None] = mapped_column(String(80))

    post: Mapped["Post"] = relationship(back_populates="tag_links")
    tag:  Mapped["Tag"]  = relationship(back_populates="post_links")

class Post(Base):
    __tablename__ = "post"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    tag_links: Mapped[list[PostTag]] = relationship(back_populates="post")

class Tag(Base):
    __tablename__ = "tag"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True)
    post_links: Mapped[list[PostTag]] = relationship(back_populates="tag")
python
with Session(engine) as s:
    py = Tag(name="python")
    p  = Post(title="async/await", tag_links=[
        PostTag(tag=py, added_by="alice"),
    ])
    s.add(p)
    s.commit()
    for link in p.tag_links:
        print(f"{link.post.title}{link.tag.name} (by {link.added_by})")

Output:

text
async/await ← python (by alice)

Querying — select() patterns in depth

Every SQLAlchemy 2.0 query is built from select(...) then handed to session.execute() or session.scalars(). The patterns below cover joins, filtering, aggregation, subqueries, CTEs, and window functions — the day-to-day vocabulary of any non-trivial application.

Joins — explicit ON vs FK auto-detection

python
from sqlalchemy import select

with Session(engine) as s:
    # Implicit join via relationship
    stmt = select(Book.title, Author.name).join(Book.author)

    # Explicit JOIN ... ON
    stmt = select(Book, Author).join(Author, Book.author_id == Author.id)

    # LEFT OUTER JOIN
    stmt = select(Author, Book).outerjoin(Book, Book.author_id == Author.id)

    # Multi-step join
    stmt = (
        select(Author.name, Book.title, Tag.name)
        .join(Book, Book.author_id == Author.id)
        .join(Book.tags)
    )

Filtering — combining conditions

python
from sqlalchemy import select, and_, or_, not_

stmt = (
    select(Book)
    .where(
        Book.title.ilike("%python%"),               # ILIKE
        Book.published_at.between("2020-01-01", "2025-12-31"),
        or_(Book.is_featured == True, Book.rating >= 4.5),  # noqa: E712
    )
    .where(not_(Book.is_deleted))
)

Multiple .where(...) clauses are AND'd together — the most readable form. Use or_(...) only when you need disjunction.

Aggregation — func.count, func.sum, GROUP BY

python
from sqlalchemy import select, func

stmt = (
    select(
        Author.name,
        func.count(Book.id).label("book_count"),
        func.coalesce(func.sum(Book.pages), 0).label("total_pages"),
    )
    .join(Book, Book.author_id == Author.id, isouter=True)
    .group_by(Author.id, Author.name)
    .having(func.count(Book.id) > 0)
    .order_by(func.count(Book.id).desc())
)

with Session(engine) as s:
    for name, n, pages in s.execute(stmt):
        print(f"{name:<20} {n:>3d} books  {pages:>6d} pages")

Output:

text
Alice Smith            12   2840 pages
Bob Jones               5   1100 pages

Scalar subqueries

python
from sqlalchemy import select, func

avg_pages = select(func.avg(Book.pages)).scalar_subquery()
stmt = select(Book.title, (Book.pages - avg_pages).label("delta")).order_by("delta")

CTEs (common table expressions)

A CTE (WITH recent AS (...)) is the cleanest way to compose a query that references the same intermediate result twice.

python
from sqlalchemy import select, func

recent = (
    select(Book.id, Book.author_id, Book.pages)
    .where(Book.published_at >= "2024-01-01")
    .cte("recent")
)

stmt = (
    select(Author.name, func.sum(recent.c.pages).label("pages_2024"))
    .join(recent, recent.c.author_id == Author.id)
    .group_by(Author.id, Author.name)
)

Window functions

python
from sqlalchemy import select, func

stmt = select(
    Book.title,
    Book.pages,
    func.rank().over(
        partition_by=Book.author_id,
        order_by=Book.pages.desc(),
    ).label("rank_in_author"),
)

Eager vs lazy loading — selectinload vs joinedload

Lazy loading (the default) fires one SQL query per attribute access on each parent — the classic N+1 problem that turns a 100-row page into 101 round-trips. Eager loading fetches related rows upfront via either a SQL JOIN (joinedload) or a separate IN (...) query (selectinload). The choice depends on the cardinality of the relationship.

StrategyGeneratesBest forWatch out for
selectinload(Model.rel)Second SELECT … WHERE id IN (…)One-to-many collectionsTwo round-trips instead of one
joinedload(Model.rel)Single SELECT … LEFT OUTER JOIN …Many-to-one, one-to-oneRow duplication for collections (use .unique())
subqueryload(Model.rel)SELECT … FROM (subquery)Legacy; rarely used nowSlower than selectinload
raiseload(Model.rel)Raises on accessCatching accidental lazy loads in testsStrict — every access must be eager
Default (lazy="select")One query per accessQuick scriptsN+1 in loops
python
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload, contains_eager, raiseload

# Best for collections — two queries instead of N+1
authors = session.scalars(
    select(Author).options(selectinload(Author.books))
).all()

# Best for many-to-one (single related row per parent)
books = session.scalars(
    select(Book).options(joinedload(Book.author))
).all()

# Chained — load Author -> Books -> Tags
authors = session.scalars(
    select(Author).options(
        selectinload(Author.books).selectinload(Book.tags)
    )
).all()

# raiseload — fail fast on accidental lazy access
authors = session.scalars(
    select(Author).options(raiseload(Author.books))
).all()
# authors[0].books  # raises sqlalchemy.exc.InvalidRequestError

Default to selectinload for most relationship loads. Use joinedload only for many-to-one where the parent has exactly one related row, or you need to filter / sort by a joined column.

Async support with AsyncSession

SQLAlchemy 2.0 ships first-class async support via create_async_engine and AsyncSession. The API mirrors the sync version with await prefixes on I/O methods. Use this when you're building async apps with FastAPI, Starlette, or any other asyncio-based framework — pair it with asyncio.

bash
pip install "sqlalchemy[asyncio]"
pip install asyncpg          # async PostgreSQL driver
pip install aiosqlite        # async SQLite driver

Output: (none — exits 0 on success)

python
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase): pass

class Item(Base):
    __tablename__ = "item"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str]

# Async URL form — note the `+aiosqlite` driver suffix
async_engine = create_async_engine("sqlite+aiosqlite:///app.db", echo=False)
AsyncSessionLocal = async_sessionmaker(async_engine, expire_on_commit=False)

async def main():
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSessionLocal() as session:
        session.add_all([Item(name="alpha"), Item(name="beta")])
        await session.commit()

        result = await session.scalars(select(Item).order_by(Item.name))
        for item in result:
            print(item.id, item.name)

asyncio.run(main())

Output:

text
1 alpha
2 beta

Eager loading is mandatory in async

In sync sessions, lazy-loaded attributes fire a synchronous SQL query on access. In async sessions, lazy loads raise because they would block the event loop. Either eager-load with selectinload/joinedload, or call await session.refresh(obj, ["rel"]) to populate the attribute explicitly.

python
from sqlalchemy.orm import selectinload

async with AsyncSessionLocal() as session:
    result = await session.scalars(
        select(Author).options(selectinload(Author.books))
    )
    for author in result:
        print(author.name, [b.title for b in author.books])

FastAPI dependency for async sessions

python
from fastapi import FastAPI, Depends
from typing import AsyncIterator

async def get_session() -> AsyncIterator[AsyncSession]:
    async with AsyncSessionLocal() as session:
        yield session

app = FastAPI()

@app.get("/items")
async def list_items(session: AsyncSession = Depends(get_session)):
    items = (await session.scalars(select(Item))).all()
    return [{"id": i.id, "name": i.name} for i in items]

Inspection API — schema introspection at runtime

The inspect() function returns an Inspector for an engine (or a Mapper for an ORM class). Use it to enumerate tables, columns, indexes, and foreign keys at runtime — essential for migration tooling, admin dashboards, or any code that needs to reason about the live schema.

python
from sqlalchemy import inspect, create_engine

engine = create_engine("sqlite:///app.db")
insp = inspect(engine)

print("schemas:", insp.get_schema_names())
print("tables:", insp.get_table_names())

for col in insp.get_columns("users"):
    print(f"  {col['name']:<20} {col['type']!s:<20} nullable={col['nullable']}")

for fk in insp.get_foreign_keys("orders"):
    print("FK:", fk["constrained_columns"], "→", fk["referred_table"], fk["referred_columns"])

for idx in insp.get_indexes("users"):
    print("IDX:", idx["name"], idx["column_names"], "unique" if idx["unique"] else "")

Output:

text
schemas: ['main']
tables: ['users', 'orders', 'items']
  id                   INTEGER             nullable=False
  name                 VARCHAR(100)        nullable=False
  email                VARCHAR(255)        nullable=False
FK: ['user_id'] → users ['id']
IDX: ix_users_active_name ['is_active', 'name']

The mapper-level inspector is what frameworks like Flask-Admin use to generate forms:

python
from sqlalchemy import inspect

mapper = inspect(User)
for col in mapper.columns:
    print(col.name, col.type, col.nullable)
for rel in mapper.relationships:
    print("REL:", rel.key, "→", rel.mapper.class_.__name__)

SQLAlchemy vs SQLModel

SQLAlchemy is the foundation; SQLModel sits on top, gluing SQLAlchemy ORM and Pydantic v2 together so one class serves as both table and validation model. The trade-off is convenience vs flexibility.

ConcernSQLAlchemy 2.0SQLModel
DeclaresClass + Mapped[T] + mapped_column(...)Class + Field(...) (typed via Pydantic)
ValidationNone at the ORM layerPydantic validation on every assignment
API layerSeparate Pydantic modelsSame class via table=True
AsyncFirst-class (AsyncSession)Via SQLAlchemy under the hood
InspectionFull inspect() APISame (inherits)
Composite primary keys, custom typesNativeLimited — drop down to SQLAlchemy
AuthorSQLAlchemy teamFastAPI author
Best fitLibrary code, complex schemas, ETLFastAPI apps, prototype-to-prod, single-class projects

Rule of thumb: pick SQLModel when your data model is straightforward and your API layer is FastAPI; pick SQLAlchemy when you need composite keys, custom column types, complex inheritance, or fine-grained control over the query layer.

Real-world recipes

Bulk insert with insert(...).values([...])

The fastest way to insert N rows is one INSERT statement with many value tuples. session.add_all([...]) works but autoflushes — for >1000 rows this becomes the bottleneck. Use session.execute(insert(Model), [...]) for raw speed.

python
from sqlalchemy import insert

with Session(engine) as session, session.begin():
    session.execute(
        insert(User),
        [
            {"name": f"user-{i}", "email": f"user{i}@example.com"}
            for i in range(10_000)
        ],
    )
print("inserted 10,000 users")

Output:

text
inserted 10,000 users

Upsert (PostgreSQL ON CONFLICT)

PostgreSQL's INSERT ... ON CONFLICT ... DO UPDATE is the canonical upsert. SQLAlchemy exposes it via postgresql.insert(). SQLite has the same syntax via sqlite.insert(). MySQL uses ON DUPLICATE KEY UPDATE via mysql.insert().

python
from sqlalchemy.dialects.postgresql import insert

stmt = insert(User).values(
    name="alice", email="alice@example.com", age=30
)
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],
    set_={"name": stmt.excluded.name, "age": stmt.excluded.age},
)
with Session(engine) as s:
    s.execute(stmt)
    s.commit()

Streaming a huge result set

When a query returns millions of rows, fetching them all at once blows up memory. stream_results=True (PostgreSQL: server-side cursor) lets you iterate row-by-row. Combine with yield_per(N) for per-batch cleanup.

python
from sqlalchemy import select

with engine.connect().execution_options(stream_results=True) as conn:
    for row in conn.execute(select(User)).yield_per(1000):
        # process one row at a time
        ...

For ORM:

python
with Session(engine) as s:
    stmt = select(User).execution_options(stream_results=True, yield_per=500)
    for user in s.scalars(stmt):
        ...

Repository pattern with typed methods

A small wrapper class hides session plumbing behind named methods. Great for testability — tests inject a fake session, code stays readable.

python
from typing import Sequence
from sqlalchemy import select
from sqlalchemy.orm import Session

class UserRepo:
    def __init__(self, session: Session):
        self.s = session

    def get(self, user_id: int) -> User | None:
        return self.s.get(User, user_id)

    def by_email(self, email: str) -> User | None:
        return self.s.scalar(select(User).where(User.email == email))

    def search(self, name_like: str, limit: int = 20) -> Sequence[User]:
        stmt = select(User).where(User.name.ilike(f"%{name_like}%")).limit(limit)
        return self.s.scalars(stmt).all()

    def create(self, **kwargs) -> User:
        u = User(**kwargs)
        self.s.add(u)
        self.s.flush()
        return u

with Session(engine) as s:
    repo = UserRepo(s)
    user = repo.create(name="Alice", email="alice@example.com")
    s.commit()
    print(repo.by_email("alice@example.com").name)

Output:

text
Alice

Auditing with SQLAlchemy events

event.listens_for(Session, "before_flush") fires once per flush, before the SQL is sent. Use it for cross-cutting concerns like setting updated_at, enforcing immutability, or writing an audit-trail row.

python
from sqlalchemy import event
from sqlalchemy.orm import Session as SASession
from datetime import datetime

@event.listens_for(SASession, "before_flush")
def stamp_updated(session, flush_context, instances):
    for obj in session.dirty:
        if hasattr(obj, "updated_at"):
            obj.updated_at = datetime.utcnow()

Test fixture — rollback after every test

The fastest test isolation strategy is to wrap each test in a transaction and rollback at the end. The DB state is identical between tests, no truncate or recreate needed.

python
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

@pytest.fixture(scope="session")
def engine():
    eng = create_engine("sqlite:///test.db")
    Base.metadata.create_all(eng)
    yield eng
    Base.metadata.drop_all(eng)

@pytest.fixture
def session(engine):
    conn = engine.connect()
    txn = conn.begin()
    session = Session(bind=conn)
    yield session
    session.close()
    txn.rollback()
    conn.close()

See also

  • SQLModel — single-class ORM + Pydantic for FastAPI projects
  • FastAPI — uses SQLAlchemy sessions via dependency injection
  • Pydantic — separate API-layer validation when not using SQLModel
  • DuckDB — alternative for analytical queries; SQLAlchemy has a dialect for it
  • Polars — DataFrame library; complements SQLAlchemy for analytics