cheat sheet
loguru
Add structured, colorized logging to Python apps with loguru. Covers sinks, log levels, file rotation, retention, exception catching, and context binding.
loguru — Structured Logging
What it is
Loguru is a structured logging library that replaces Python's standard logging module with a simpler, zero-configuration API. It provides:
- Colorized output with level labels out of the box
- File sinks with automatic rotation, retention, and compression
- logger.catch() decorator for automatic exception catching
- Context binding with logger.bind()
- Structured JSON output via serialize=True
Install
pip install loguru
Output: pip prints its download and install progress and exits 0 on success.
Quick example
from loguru import logger
logger.debug("Starting up")
logger.info("Server running on port {port}", port=8080)
logger.warning("Low memory: {mb} MB remaining", mb=128)
logger.error("Connection refused to {host}", host="db.internal")
Output:
2026-04-25 14:30:01.234 | DEBUG | __main__:<module>:3 - Starting up
2026-04-25 14:30:01.234 | INFO | __main__:<module>:4 - Server running on port 8080
2026-04-25 14:30:01.234 | WARNING | __main__:<module>:5 - Low memory: 128 MB remaining
2026-04-25 14:30:01.234 | ERROR | __main__:<module>:6 - Connection refused to db.internal
In the terminal, each level is colorized. With the default settings DEBUG is blue, INFO is bold in the terminal's default color, WARNING is yellow, and ERROR is red. The output includes module, function, and line number automatically.
When / why to use it over logging
| Feature | logging | loguru |
|---|---|---|
| Zero-config setup | ❌ (handlers, formatters) | ✅ |
| Colorized output | ❌ | ✅ |
| f-string-style messages | ❌ | ✅ |
| File rotation | Manual setup | logger.add("app.log", rotation="10 MB") |
| Exception context | Manual | logger.catch() / logger.exception() |
| Structured JSON | Manual | logger.add(..., serialize=True) |
Common pitfalls
- Thread-safety: loguru is thread-safe by default. With multiprocessing, set enqueue=True on your file sink, for example logger.add("app.log", enqueue=True), to avoid write conflicts.
- logger.remove() before reconfiguring: loguru starts with a default stderr sink (id 0). If you add your own stderr sink without removing the default, you get duplicate output. Call logger.remove() first.
- Use logger.info("value={x}", x=val) (deferred formatting) rather than logger.info(f"value={val}"). Deferred formatting skips string interpolation entirely when no sink accepts that level, which matters for performance in hot paths.
Richer example — file sink with rotation
import sys
from loguru import logger
# Remove default stderr sink and add custom ones
logger.remove()
# Concise stderr: only INFO and above
logger.add(
sys.stderr,
level="INFO",
format="<green>{time:HH:mm:ss}</green> | <level>{level:<8}</level> | {message}",
colorize=True,
)
# Verbose file: DEBUG and above, with rotation
logger.add(
"logs/app.log",
level="DEBUG",
rotation="10 MB", # new file after 10 MB
retention="7 days", # delete logs older than 7 days
compression="zip", # compress rotated files
format="{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {name}:{line} | {message}",
)
logger.debug("Debug detail — goes to file only")
logger.info("Server started")
logger.warning("Disk usage above 80%")
Output (stderr only):
14:30:01 | INFO | Server started
14:30:01 | WARNING | Disk usage above 80%
Exception catching
The @logger.catch decorator wraps a function so that any unhandled exception is logged with a full, colorized traceback instead of crashing silently. Apply it to top-level handlers or background tasks where you want errors recorded but execution to continue.
from loguru import logger

@logger.catch
def divide(a: int, b: int) -> float:
    return a / b

result = divide(10, 0)  # won't crash the program; logs the full traceback, result is None
Output:
2026-04-25 14:30:01.234 | ERROR | __main__:<module>:7 - An error has been caught in function '<module>', process 'MainProcess' (1234), thread 'MainThread' (5678):
Traceback (most recent call last):
...
ZeroDivisionError: division by zero
Context binding
logger.bind(**kwargs) returns a new logger instance with extra key/value pairs attached to every log record it emits. Use it to tag all log lines within a request, task, or user session with a correlation ID — making it easy to filter logs for a specific context.
from loguru import logger
def handle_request(request_id: str, user_id: int):
    log = logger.bind(request_id=request_id, user_id=user_id)
    log.info("Request received")
    log.info("Processing complete")

handle_request("req-abc123", user_id=42)
Output:
2026-04-25 14:30:01 | INFO | __main__:handle_request:4 - Request received
2026-04-25 14:30:01 | INFO | __main__:handle_request:5 - Processing complete
JSON / structured output
Passing serialize=True to logger.add() switches that sink to emit one JSON object per log line instead of formatted text. This is the standard way to feed loguru into log aggregators (Datadog, Loki, CloudWatch) that expect structured, machine-parseable logs.
import sys
from loguru import logger
logger.remove()
logger.add(sys.stdout, serialize=True) # every line is a JSON object
logger.info("User logged in", user_id=42, action="login")
Output:
{"text": "2026-04-25 14:30:01.234 | INFO | __main__:<module>:5 - User logged in", "record": {"elapsed": ..., "exception": null, "extra": {"user_id": 42, "action": "login"}, "file": ..., "function": "<module>", "level": {"icon": "ℹ️", "name": "INFO", "no": 20}, "line": 5, "message": "User logged in", ...}}
Log levels
| Level | logger.X() | Default value |
|---|---|---|
| TRACE | logger.trace() | 5 |
| DEBUG | logger.debug() | 10 |
| INFO | logger.info() | 20 |
| SUCCESS | logger.success() | 25 |
| WARNING | logger.warning() | 30 |
| ERROR | logger.error() | 40 |
| CRITICAL | logger.critical() | 50 |
logger.success() is a loguru-specific level (between INFO and WARNING) useful for marking successful completion of significant operations.
loguru vs stdlib logging
The stdlib logging module is venerable, exhaustively configurable, and notoriously painful to set up. Loguru is the deliberate counterpoint: a single global logger object that works without configuration and exposes 90% of stdlib's power through a much smaller API. The trade-off:
| Concern | stdlib logging | loguru |
|---|---|---|
| Mental model | Hierarchy of named loggers; each picks up handlers from ancestors | One global logger; handlers added/removed by id |
| Setup | logger = logging.getLogger(__name__), handlers, formatters, levels — at least 10 lines | from loguru import logger — zero lines |
| Formatting | %-style strings via Formatter | f-string-like inline placeholders + colour markup |
| Rotation | logging.handlers.RotatingFileHandler + manual size logic | rotation="10 MB" |
| Retention | Manual (no built-in cleanup) | retention="7 days" |
| Compression | Manual | compression="zip" |
| Async / multiproc safety | Manual QueueHandler setup | enqueue=True |
| Exception traceback | logger.exception() + manual extra= | @logger.catch or logger.opt(exception=True) |
| JSON output | Manual custom formatter | serialize=True |
| Context propagation | Manual Filter + extra= | logger.bind() / logger.contextualize() |
| Ecosystem | Universal — every Python lib uses it | Loguru-only; bridge with InterceptHandler |
The pragmatic choice:
- New apps: start with loguru. The API is dramatically smaller and the features (rotation, retention, structured output) work out of the box.
- Libraries: use stdlib logging exclusively. Library authors must not impose loguru on their consumers; expose a stdlib logger and let the app decide.
- Existing apps with stdlib already wired: bridge stdlib into loguru with an InterceptHandler (see recipe below) so all log records flow through loguru's handlers without rewriting every logging.getLogger(__name__) call.
The single global logger
Loguru exposes one logger imported once and used everywhere. No getLogger(__name__) ritual, no parent/child propagation rules. Every module imports the same object; configuration applied in main.py takes effect across the program. The trade-off is that you cannot route different modules' logs to different sinks via the logger object itself — instead, configure handlers to filter by record fields (record["name"], record["module"], record["extra"]).
# app.py
import sys
from loguru import logger

# Configure at startup; affects every module
logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add("logs/app.log", level="DEBUG", rotation="10 MB")

# Modules import the same logger
import myapp.api
import myapp.workers

# myapp/api.py
from loguru import logger

def handle(request):
    logger.info("Handling {method} {path}", method=request.method, path=request.path)

# myapp/workers.py
from loguru import logger

def process(job):
    logger.debug("Processing job {id}", id=job.id)
A single logger.remove() then logger.add(...) block in app.py reconfigures the entire app. To route worker logs separately, filter on the dotted module name in record["name"] (record["module"] holds only the file's short name, such as workers):
logger.add(
    "logs/workers.log",
    filter=lambda record: record["name"].startswith("myapp.workers"),
)
add and remove — handler lifecycle
logger.add(sink, **opts) registers a sink (stderr, a file path, a callable, a stream) and returns an integer id. logger.remove(id) removes that specific sink; logger.remove() (no args) removes all sinks including the default. The default stderr sink has id 0 — calling add(sys.stderr, ...) without first removing it produces duplicate output.
import sys
from loguru import logger
# Step 1: remove the default stderr sink (id 0)
logger.remove()
# Step 2: add custom sinks; each returns an id
stderr_id = logger.add(sys.stderr, level="INFO", format="<green>{time:HH:mm:ss}</green> | {message}")
file_id = logger.add("logs/app.log", level="DEBUG", rotation="10 MB")
debug_id = logger.add("logs/debug.log", level="TRACE")
# Step 3: remove a specific sink later
logger.remove(debug_id)
# Step 4: full reset
logger.remove()
Sink types
| Sink | Example | Behaviour |
|---|---|---|
| File path (str/Path) | "logs/app.log" | Write to file; supports rotation/retention/compression |
| Stream | sys.stderr | Write to file-like object |
| Callable | lambda msg: send_to_slack(msg) | Invoked for each formatted record |
| Coroutine | async def consume(msg): ... | Scheduled as a task on the running event loop; await logger.complete() to flush pending messages |
| logging.Handler | a stdlib Handler instance | Bridge: loguru records become stdlib LogRecords |
# Send errors to Slack
from loguru import logger

def slack_sink(message):
    # level="ERROR" on the sink already drops lower levels and keeps CRITICAL,
    # so no extra check on the level name is needed here.
    post_to_slack(str(message))  # post_to_slack is a placeholder for your own function

logger.add(slack_sink, level="ERROR")

# Async sink for an HTTP API
import httpx

async def http_sink(message):
    async with httpx.AsyncClient() as c:
        await c.post("https://logs.example.com/", content=str(message))

logger.add(http_sink, level="WARNING")
Rotation, retention, and compression
Loguru's file sink handles log rotation, old-file cleanup, and compression in one line each. Each argument accepts multiple forms — sizes, durations, schedules, or callables. This is the single biggest ergonomic win over stdlib's RotatingFileHandler/TimedRotatingFileHandler.
Rotation — when to start a new file
| Form | Example | Meaning |
|---|---|---|
| Size | rotation="10 MB" | Rotate when current file exceeds 10 MB |
| Time | rotation="00:00" | Rotate at midnight |
| Period | rotation="1 week" | Rotate every week |
| Callable | rotation=lambda msg, file: file.tell() > 1e7 | Custom predicate |
logger.add("app.log", rotation="100 MB")
logger.add("app.log", rotation="1 day")
logger.add("app.log", rotation="monday at 12:00")
logger.add("app.log", rotation="00:00") # nightly
Retention — when to delete old files
| Form | Example | Meaning |
|---|---|---|
| Count | retention=10 | Keep newest 10 rotated files |
| Duration | retention="7 days" | Delete files older than 7 days |
| Callable | retention=lambda files: [os.remove(f) for f in files if old(f)] | Custom cleanup; the callable receives the list of log files and must delete them itself |
logger.add("app.log", rotation="10 MB", retention=5)
logger.add("app.log", rotation="1 day", retention="30 days")
Compression — how to store rotated files
| Form | Example | Meaning |
|---|---|---|
| Extension | compression="zip" | One of gz, bz2, xz, lzma, tar, tar.gz, tar.bz2, tar.xz, zip |
| Callable | compression=lambda f: subprocess.run(...) | Custom |
logger.add("app.log", rotation="100 MB", retention="30 days", compression="gz")
Full production-grade config:
logger.add(
"logs/app.{time:YYYY-MM-DD}.log",
level="INFO",
rotation="00:00", # roll daily at midnight
retention="30 days", # keep a month
compression="gz", # gzip rotated files
enqueue=True, # safe for multiprocessing
backtrace=True, # include extended traceback context
diagnose=False, # do NOT include variable values in tracebacks for prod
serialize=False, # human-readable; switch to True for log aggregators
)
Format strings and colours
Loguru formats use {field} placeholders against the record dict, plus optional <color> and <level> markup that's stripped automatically for non-TTY sinks. Available colours: <red>, <green>, <yellow>, <blue>, <magenta>, <cyan>, <white>, plus <dim>, <bold>, <italic>, <underline>. <level> is a special tag that picks up the current record's level colour.
logger.add(
sys.stderr,
format=(
"<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> "
"| <level>{level: <8}</level> "
"| <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> "
"- <level>{message}</level>"
),
colorize=True,
)
Record fields
| Field | Meaning |
|---|---|
| {time} | datetime of the record; supports {time:YYYY-MM-DD HH:mm:ss} |
| {level} | Level name ({level: <8} pads to 8 chars) |
| {name} | Module name (e.g. myapp.api) |
| {function} | Calling function name |
| {file} | File path / name |
| {line} | Line number |
| {module} | Module short name |
| {message} | The formatted message |
| {extra} | Dict of bound extra fields |
| {exception} | Formatted traceback (if any) |
| {process} / {thread} | Process and thread identifiers |
| {elapsed} | Time since program start |
Programmatic vs string formatting
logger.info("user={user}", user=u) is lazy: loguru only interpolates if the record is actually emitted (i.e. the level isn't filtered out). logger.info(f"user={u}") interpolates eagerly — a meaningful difference in hot loops with disabled debug logging.
# Lazy — preferred
logger.debug("payload={payload}", payload=request.body)
# Eager — runs str(request.body) even if debug is off
logger.debug(f"payload={request.body}")
bind and contextualize — structured logging
logger.bind(**kwargs) returns a new logger instance with extra fields attached to every record it emits. logger.contextualize(**kwargs) is a context manager that attaches fields temporarily — fields disappear when the block exits. Use bind for long-lived loggers (per-request, per-worker); use contextualize for transient contexts (one operation, one block).
# bind: returns a new logger object
def handle_request(request):
    log = logger.bind(request_id=request.id, user_id=request.user_id)
    log.info("Received")
    log.debug("Processing")
    log.info("Done")

# contextualize: temporary, scope-limited
def transfer_funds(src, dst, amount):
    with logger.contextualize(operation="transfer", amount=amount):
        logger.info("Debiting {src}", src=src)
        logger.info("Crediting {dst}", dst=dst)
    # Outside the block: operation and amount no longer attached
    logger.info("Transfer complete")
contextualize is the right tool for request-scoped fields in async frameworks because it interacts correctly with contextvars — the context propagates across await boundaries.
# FastAPI: attach a request_id to every log in this request's call chain
from uuid import uuid4
from fastapi import Request
from loguru import logger

async def request_id_middleware(request: Request, call_next):
    rid = request.headers.get("X-Request-Id") or str(uuid4())
    with logger.contextualize(request_id=rid):
        return await call_next(request)
JSON / structured output
Pass serialize=True to a sink and every record becomes a single JSON object. This is the format log aggregators (Datadog, Loki, Splunk, CloudWatch, Elastic) consume natively. Fields bound via bind/contextualize end up under record.extra in the output.
import sys
from loguru import logger
logger.remove()
logger.add(sys.stdout, serialize=True, level="INFO")
with logger.contextualize(request_id="req-abc123", user_id=42):
logger.info("User action", action="login")
Output:
{"text": "...", "record": {"elapsed": {...}, "exception": null, "extra": {"request_id": "req-abc123", "user_id": 42, "action": "login"}, "file": {...}, "function": "<module>", "level": {"icon": "ℹ️", "name": "INFO", "no": 20}, "line": 10, "message": "User action", "module": "app", "name": "__main__", "process": {...}, "thread": {...}, "time": {...}}}
For a leaner format, write a custom serializer:
import json

def lean_serializer(record):
    return json.dumps({
        "ts": record["time"].isoformat(),
        "level": record["level"].name,
        "msg": record["message"],
        **record["extra"],
    })

def sink(message):
    record = message.record
    print(lean_serializer(record))

logger.add(sink, level="INFO")
Output:
{"ts": "2026-05-25T14:30:01.234567+00:00", "level": "INFO", "msg": "User action", "request_id": "req-abc123", "user_id": 42, "action": "login"}
Exception catching
Three escape hatches for capturing exceptions:
@logger.catch — decorator
Wraps a function so any unhandled exception is logged with a full traceback and (optionally) re-raised or swallowed.
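@logger.catch                       # logs + swallows
def background_job(payload):
    process(payload)

@logger.catch(reraise=True)         # logs + re-raises
def critical_path():
    do_something_risky()

# By default only Exception subclasses are caught, so KeyboardInterrupt already
# passes through; exclude matters once you widen the caught type.
@logger.catch(BaseException, exclude=(KeyboardInterrupt,))
def long_running():
    while True:
        tick()

def parametrised_job(job_id):
    # Bind first, then use catch() as a context manager so the record carries job_id.
    # The message template is formatted with the log record, hence {record[extra][job_id]}.
    with logger.bind(job_id=job_id).catch(message="Job {record[extra][job_id]} failed", level="ERROR"):
        risky_work()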
logger.opt — fine-grained control
logger.opt() returns a logger configured for the next call only. Use exception=True to add the current exception's traceback, lazy=True to defer formatting, depth=N to walk up the call stack for {name}/{function}/{line} fields.
try:
    risky()
except ValueError as e:
    logger.opt(exception=True).error("risky failed: {msg}", msg=e)
    # equivalent to logger.exception() in stdlib

# Inside a logging helper, point caller fields one frame up
def my_log(msg):
    logger.opt(depth=1).info(msg)
Backtrace and diagnose
Every sink accepts backtrace (extend the traceback with the frames above the point where the exception was caught) and diagnose (show variable values at each frame). Both default to True. Keep them on in dev; in production set diagnose=False explicitly, because variable values may leak secrets into logs.
logger.add("dev.log", backtrace=True, diagnose=True) # rich, dev only
logger.add("prod.log", backtrace=True, diagnose=False) # safe for prod
Async safety and multiprocessing
Loguru is thread-safe by default — internal locks serialise writes. For multiprocessing or async workloads, pass enqueue=True to the sink. This routes records through a multiprocessing.Queue, which a background thread drains. The cost is a small queue overhead; the benefit is correctness when multiple processes (or async tasks logging from different event loops) write to the same file.
import multiprocessing
from loguru import logger

def worker(name):
    # Runs in each child process
    logger.bind(worker=name).info("Working")

if __name__ == "__main__":
    # Multiprocessing-safe file sink
    logger.add("app.log", rotation="10 MB", enqueue=True)
    with multiprocessing.Pool(4) as pool:
        pool.map(worker, ["a", "b", "c", "d"])
With the fork start method, child processes inherit the configured logger and its queue, so the code above works as is. With spawn (the default on Windows and macOS), each child starts with a fresh default logger and does not inherit the enqueue=True sink; pass the configured logger to the children explicitly, for example through a Pool initializer.
Bridging stdlib logging into loguru
Most Python libraries log via stdlib. To route those records through loguru's handlers (so they get the same rotation, JSON output, colours, etc.), add an InterceptHandler to the stdlib root logger.
import logging
from loguru import logger

class InterceptHandler(logging.Handler):
    def emit(self, record: logging.LogRecord) -> None:
        # Map stdlib level number to loguru level name
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno
        # Walk up the stack to find the caller (skip logging internals)
        frame, depth = logging.currentframe(), 2
        while frame and frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1
        logger.opt(depth=depth, exception=record.exc_info).log(
            level, record.getMessage()
        )

# Install once at app startup
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
Every record emitted through stdlib logging, such as logging.getLogger("uvicorn").info(...) or the debug output of urllib3 underneath requests, now flows through loguru's sinks: same format, same rotation, same JSON output.
Framework integration
FastAPI
FastAPI uses stdlib logging. Bridge to loguru via InterceptHandler and use contextualize middleware to attach request IDs.
from fastapi import FastAPI, Request
from loguru import logger
from uuid import uuid4

app = FastAPI()

@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    request_id = request.headers.get("X-Request-Id") or str(uuid4())
    with logger.contextualize(request_id=request_id, path=request.url.path):
        logger.info("Request started")
        response = await call_next(request)
        logger.info("Request done", status=response.status_code)
        response.headers["X-Request-Id"] = request_id
        return response
Add the InterceptHandler from the previous section to capture uvicorn.access and uvicorn.error logs through loguru.
Django
Add the InterceptHandler in settings.py:
# settings.py
LOGGING = {
"version": 1,
"disable_existing_loggers": False,
"handlers": {
"loguru": {
"class": "myapp.logging.InterceptHandler",
},
},
"root": {"handlers": ["loguru"], "level": "INFO"},
}
Then in apps.py or wsgi.py:
import sys
from loguru import logger

logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add("logs/django.log", rotation="50 MB", retention="30 days", serialize=True)
Celery
Celery emits via stdlib. Use the same InterceptHandler pattern and add per-task context:
from celery.signals import task_prerun, task_postrun
from loguru import logger

@task_prerun.connect
def task_start(sender, task_id, task, **kwargs):
    logger.bind(task_id=task_id, task_name=task.name).info("Task starting")

@task_postrun.connect
def task_end(sender, task_id, task, state, **kwargs):
    logger.bind(task_id=task_id, state=state).info("Task done")
Log levels
| Level | Method | Default value | Use for |
|---|---|---|---|
| TRACE | logger.trace() | 5 | Extreme verbosity, function-entry traces |
| DEBUG | logger.debug() | 10 | Dev troubleshooting |
| INFO | logger.info() | 20 | Normal operational events |
| SUCCESS | logger.success() | 25 | Successful completion of significant operations |
| WARNING | logger.warning() | 30 | Recoverable issues |
| ERROR | logger.error() | 40 | Errors a user might see |
| CRITICAL | logger.critical() | 50 | Service down, data loss |
Custom levels
logger.level("AUDIT", no=33, color="<yellow>", icon="*")
logger.log("AUDIT", "User {u} accessed admin panel", u="alice")
Common pitfalls
- Duplicate output from the default sink: loguru ships with stderr sink id 0. Adding your own stderr without logger.remove() produces double-printed lines. Always remove first.
- diagnose=True leaks secrets: variable values are printed at every frame in tracebacks. Fine in dev, dangerous in production logs. Set it to False in prod sinks.
- enqueue=True on every sink: turns every log call into a cross-thread message. Use only for shared files in multiprocess workloads; single-process file sinks don't need it.
- Eager f-string formatting in hot paths: logger.debug(f"x={value}") runs str(value) even when debug is off. Use logger.debug("x={x}", x=value) to defer formatting.
- Library code using loguru: libraries must not impose a logging library on their consumers. Use stdlib logging in libraries; let the app bridge to loguru if it wants.
- Long-lived bind chains: logger.bind(req=...).bind(user=...).bind(op=...) works but each .bind() creates a new logger object. Combine: logger.bind(req=..., user=..., op=...).
- Mixing extra= and bind: extra= is the stdlib argument. In loguru a keyword argument named extra is stored as a nested extra key inside record["extra"] rather than merged into it, which makes format strings brittle. Use bind() or plain keyword arguments instead.
- rotation="0 sec" doesn't disable rotation: it rotates every record. To disable, omit the argument entirely.
- serialize=True outputs a giant JSON: the default serializer includes every record field. Define a custom sink (see structured-logging section) for a lean payload.
- logger.catch swallows by default: useful for fire-and-forget but dangerous for code that should fail loudly. Pass reraise=True for critical paths.
- Caplog (pytest) doesn't see loguru records: pytest's caplog reads stdlib logging. To assert on loguru output, add a list sink in a fixture (see recipe below).
- Async sinks need an event loop: a coroutine sink declared before a loop exists can raise. Add the sink inside asyncio.run() or after loop = asyncio.new_event_loop().
Real-world recipes
Bootstrap loguru in a new project
A single configuration block at startup covers 90% of production needs: colourised stderr for humans, rotated JSON files for log aggregators.
# myapp/logging_setup.py
import sys
from loguru import logger

def setup_logging(level: str = "INFO", json_logs: bool = False):
    logger.remove()
    # Human-readable stderr
    logger.add(
        sys.stderr,
        level=level,
        format=(
            "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
            "<level>{level: <8}</level> | "
            "<cyan>{name}:{line}</cyan> - <level>{message}</level>"
        ),
        colorize=True,
        backtrace=True,
        diagnose=False,
    )
    # Rotating file (JSON for log aggregators when json_logs=True)
    logger.add(
        "logs/app.{time:YYYY-MM-DD}.log",
        level="DEBUG",
        rotation="00:00",
        retention="30 days",
        compression="gz",
        enqueue=True,
        backtrace=True,
        diagnose=False,
        serialize=json_logs,
    )

# main.py
from myapp.logging_setup import setup_logging
from loguru import logger

setup_logging(level="INFO", json_logs=False)
logger.info("Service starting on port {port}", port=8080)
Output:
2026-05-25 14:30:01 | INFO | __main__:5 - Service starting on port 8080
Bridge stdlib loggers into loguru
The InterceptHandler recipe applied at startup so library logs (uvicorn, requests, sqlalchemy) flow through loguru's pipeline. See the bridging section above for the handler class.
import logging
from loguru import logger
from myapp.logging_setup import InterceptHandler
# Replace stdlib handlers
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
# Silence noisy libraries
for noisy in ["urllib3", "asyncio", "watchdog"]:
    logging.getLogger(noisy).setLevel(logging.WARNING)
Per-request context in async code
Use contextualize so the request id propagates across await boundaries via contextvars. Every log inside the request handler — including ones emitted by deep helpers — carries the id.
from fastapi import FastAPI, Request
from loguru import logger
from uuid import uuid4

app = FastAPI()

@app.middleware("http")
async def attach_id(request: Request, call_next):
    rid = request.headers.get("X-Request-Id") or str(uuid4())
    with logger.contextualize(request_id=rid, path=str(request.url.path)):
        logger.info("Request begin")
        response = await call_next(request)
        logger.bind(status=response.status_code).info("Request end")
        return response
Custom JSON for a log aggregator
Most log aggregators want a flat JSON record. Replace loguru's default verbose serializer with a slim one.
import json
import sys
from loguru import logger

def serialize(record):
    return json.dumps({
        "ts": record["time"].isoformat(),
        "level": record["level"].name,
        "logger": record["name"],
        "msg": record["message"],
        "file": f"{record['file'].name}:{record['line']}",
        **record["extra"],
        **({"exc": str(record["exception"])} if record["exception"] else {}),
    })

def sink(message):
    print(serialize(message.record), file=sys.stdout, flush=True)

logger.remove()
logger.add(sink, level="INFO")
logger.bind(service="api", env="prod").info("Hello {who}", who="world")
Output:
{"ts": "2026-05-25T14:30:01.234567+00:00", "level": "INFO", "logger": "__main__", "msg": "Hello world", "file": "app.py:12", "service": "api", "env": "prod"}
Test loguru output in pytest
caplog reads stdlib records and won't see loguru. Capture loguru records via a list sink fixture.
# conftest.py
import pytest
from loguru import logger

@pytest.fixture
def caplog_loguru():
    records = []
    sink_id = logger.add(records.append, level="DEBUG", format="{message}")
    yield records
    logger.remove(sink_id)

# test_logging.py
def test_logs_warning(caplog_loguru):
    from myapp.api import warning_path
    warning_path()
    assert any("retry" in str(r) for r in caplog_loguru)
Per-module log levels
Filter records by record["name"] to route different modules to different levels.
logger.add(sys.stderr, level="INFO")
logger.add(
"logs/api.log",
level="DEBUG",
filter=lambda r: r["name"].startswith("myapp.api"),
)
logger.add(
"logs/workers.log",
level="INFO",
filter=lambda r: r["name"].startswith("myapp.workers"),
)
Disable logging in tests
Tests should not pollute output with WARNING/ERROR from negative-path tests. Disable loguru globally in test setup or use a quiet sink.
# conftest.py
import sys
import pytest
from loguru import logger

@pytest.fixture(autouse=True)
def silence_loguru():
    logger.remove()
    yield
    # restore for any downstream tests that want it
    logger.add(sys.stderr, level="WARNING")
Catch exceptions in a background task
Background tasks must not crash silently. Wrap the risky call with logger.catch to log the traceback and (optionally) re-raise.
from loguru import logger
import asyncio

async def background_worker(queue: asyncio.Queue):
    while True:
        job = await queue.get()
        # Catch per job so one failure does not end the loop
        with logger.catch(message="Job failed"):
            await process(job)

# From inside a running event loop:
asyncio.create_task(background_worker(jobs))
If process raises, the traceback is logged at ERROR level with full context, and the worker moves on to the next job. Decorating the whole coroutine with @logger.catch would also log the error, but the while loop would end with it. Pass reraise=True to crash hard instead.
Rotate logs hourly with gzip compression and 7-day retention
A common production pattern. Hourly rotation keeps individual files small enough to grep; gzip often cuts disk usage by roughly 10x for repetitive text logs; 7-day retention is enough for triage without filling the disk.
logger.add(
"logs/api.{time:YYYY-MM-DD-HH}.log",
rotation="1 hour",
retention="7 days",
compression="gz",
level="INFO",
enqueue=True,
serialize=True,
)
After a week the directory looks like:
logs/
├── api.2026-05-25-14.log # current hour, plaintext
├── api.2026-05-25-13.log.gz
├── api.2026-05-25-12.log.gz
├── ...
└── api.2026-05-18-15.log.gz # oldest still retained
Send errors to Slack while logging everything to disk
A callable sink filtered by level posts to Slack; a separate file sink keeps the full DEBUG record.
import sys
import httpx
from loguru import logger

SLACK_WEBHOOK = "https://hooks.slack.com/services/T000/B000/XXX"

def slack_sink(message):
    if message.record["level"].no >= logger.level("ERROR").no:
        httpx.post(SLACK_WEBHOOK, json={"text": str(message)})

logger.remove()
logger.add(sys.stderr, level="INFO")
logger.add("logs/app.log", level="DEBUG", rotation="100 MB", retention="14 days")
logger.add(slack_sink, level="ERROR")
One-shot debug logging in production
When troubleshooting a specific issue in prod, drop a temporary verbose sink. Use a context manager so it auto-removes.
from contextlib import contextmanager
from loguru import logger

@contextmanager
def verbose_logging(path: str = "logs/debug-trace.log"):
    sink_id = logger.add(path, level="TRACE", backtrace=True, diagnose=True)
    try:
        yield
    finally:
        logger.remove(sink_id)

# In a triage script: re-route everything to a forensic file
with verbose_logging():
    run_problem_workflow()
See also
- sections/python/logging: stdlib logging, the alternative; bridged from above.
- sections/python/pytest: testing tip; capture loguru records via a list sink fixture instead of caplog.
- sections/python/fastapi: context-binding middleware pattern.
- sections/python/rich: alternative for terminal output (tables, prompts, progress) without the logging semantics.
- sections/python/pdb: pair with loguru's @logger.catch(reraise=True) for post-mortem debugging on crashes.