cheat sheet
AutoGen
Build multi-agent AI systems with Microsoft AutoGen. Covers agents, group chats, code execution, tool registration, async runtimes, and LLM configuration.
AutoGen — Multi-Agent Conversations
What it is
AutoGen is Microsoft Research's framework for building systems where multiple AI agents converse with each other to complete tasks. Each agent has a role (assistant, user proxy, code executor) and communicates by sending messages. The framework handles the conversation loop, tool calling, code execution sandboxing, and LLM configuration. AutoGen is particularly strong at tasks that benefit from a "review and iterate" pattern — one agent proposes a solution, another critiques or tests it.
Install
# Quote the extras so shells such as zsh do not treat [ ] as a glob pattern
pip install autogen-agentchat "autogen-ext[openai]"
# For Anthropic support
pip install "autogen-ext[anthropic]"
Output: (none — exits 0 on success)
AutoGen underwent a major refactor in v0.4 (autogen-agentchat). The older v0.2 API (pyautogen) still works but is in maintenance mode. This page covers v0.4.
Quick example
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient
import os
model_client = OpenAIChatCompletionClient(
    model="gpt-4o-mini",
    api_key=os.environ["OPENAI_API_KEY"],
)
agent = AssistantAgent(
    name="assistant",
    model_client=model_client,
    system_message="You are a helpful coding assistant.",
)
async def main():
    await Console(agent.run_stream(task="Write a Python function that checks if a number is prime."))
asyncio.run(main())
Output:
---------- assistant ----------
def is_prime(n: int) -> bool:
    if n < 2:
        return False
    for i in range(2, int(n**0.5) + 1):
        if n % i == 0:
            return False
    return True
When / why to use it
- Tasks that benefit from multiple agents reviewing, criticising, or iterating on each other's output.
- Code generation pipelines where one agent writes code and another executes and validates it.
- Complex multi-step workflows: research → draft → review → revise.
- Systems where you want a human-in-the-loop checkpoint before executing code.
- Multi-agent debate or consensus-building over a question.
Common pitfalls
- Infinite loops: a team with neither a termination condition nor max_turns keeps exchanging messages indefinitely. Always set termination_condition or max_turns on group chats and multi-agent runs.
- Code execution safety: CodeExecutorAgent with a LocalCommandLineCodeExecutor runs code directly on your machine. Use DockerCommandLineCodeExecutor for any untrusted or LLM-generated code.
- LLM API costs: group chats with many agents can generate dozens of API calls per task. Monitor token usage via model_client.total_usage() or your LLM provider's dashboard.
- Console(agent.run_stream(...)) is the easiest way to print streaming output during development. Remove it in production and process the TaskResult directly.
- Assign each agent a distinct, narrow system_message. Vague system prompts cause agents to overlap roles and produce unfocused conversations.
Richer example — writer and critic agents
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient
import os
client = OpenAIChatCompletionClient(model="gpt-4o-mini", api_key=os.environ["OPENAI_API_KEY"])
writer = AssistantAgent(
    name="writer",
    model_client=client,
    system_message=(
        "You are a technical blog writer. Write clear, concise technical content. "
        "Incorporate feedback from the critic in each revision."
    ),
)
critic = AssistantAgent(
    name="critic",
    model_client=client,
    system_message=(
        "You are a technical editor. Review the writer's content for accuracy, "
        "clarity, and completeness. Provide specific improvement suggestions. "
        "When satisfied, say 'APPROVED'."
    ),
)
termination = MaxMessageTermination(max_messages=6)
team = RoundRobinGroupChat([writer, critic], termination_condition=termination)
async def main():
    result = await team.run(task="Write a 150-word blog intro explaining what a transformer neural network is.")
    print(result.messages[-1].content)
asyncio.run(main())
Output:
Transformers are a class of neural network architectures that revolutionised
natural language processing when introduced in the landmark 2017 paper
"Attention Is All You Need." Unlike earlier recurrent models that processed
text sequentially, transformers use self-attention to process all tokens
simultaneously, capturing long-range dependencies with remarkable efficiency.
[APPROVED]
Agent types
| Agent class | Role |
|---|---|
| AssistantAgent | LLM-powered agent that generates responses and can call tools |
| UserProxyAgent | Stands in for a human user; collects input through input_func and relays it to the team |
| CodeExecutorAgent | Extracts code blocks from other agents' messages and runs them with the configured executor |
| SocietyOfMindAgent | Wraps a team of agents as a single agent |
LLM model client configuration
AutoGen separates the model client from the agent. Swap models by changing the client object.
from autogen_ext.models.openai import OpenAIChatCompletionClient, AzureOpenAIChatCompletionClient
from autogen_ext.models.anthropic import AnthropicChatCompletionClient
import os
# OpenAI
client = OpenAIChatCompletionClient(
    model="gpt-4o",
    api_key=os.environ["OPENAI_API_KEY"],
    temperature=0.3,
    max_tokens=1024,
)
# Azure OpenAI
client = AzureOpenAIChatCompletionClient(
    azure_deployment="gpt-4o",
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_key=os.environ["AZURE_OPENAI_KEY"],
    api_version="2024-06-01",
    model="gpt-4o",
)
# Anthropic
client = AnthropicChatCompletionClient(
    model="claude-sonnet-4-6",
    api_key=os.environ["ANTHROPIC_API_KEY"],
)
Tools — registering functions
Tools are standard Python functions with type annotations and a docstring. Pass them in the agent's tools= list. The model decides when to call them.
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_core.tools import FunctionTool
from autogen_ext.models.openai import OpenAIChatCompletionClient
import os
def search_docs(query: str) -> str:
    """Search internal documentation and return relevant excerpts."""
    # Stub: replace with real search
    return f"Found: Documentation for '{query}': Use `configure()` to initialise."
def get_version(package: str) -> str:
    """Return the installed version of a Python package."""
    import importlib.metadata
    try:
        return importlib.metadata.version(package)
    except importlib.metadata.PackageNotFoundError:
        return f"{package} not installed"
client = OpenAIChatCompletionClient(model="gpt-4o-mini", api_key=os.environ["OPENAI_API_KEY"])
agent = AssistantAgent(
    name="helper",
    model_client=client,
    tools=[
        FunctionTool(search_docs, description="Search internal documentation"),
        FunctionTool(get_version, description="Get installed package version"),
    ],
    # Have the model summarise the tool results; without this the final
    # message is the raw tool output rather than a natural-language answer.
    reflect_on_tool_use=True,
)
async def main():
    result = await agent.run(task="What version of polars is installed? Also look up how to configure it.")
    print(result.messages[-1].content)
asyncio.run(main())
Output:
polars 1.9.0 is installed. According to the documentation: Use `configure()` to initialise.
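The type annotations and docstring matter because a tool wrapper can turn them into the description the model sees. The following is a library-free sketch of that idea using only the standard library. `describe_tool`, `_TYPE_NAMES`, and the output shape are hypothetical names chosen for illustration and are not how FunctionTool is implemented.

```python
import inspect
from typing import Any, Callable, get_type_hints

# Hypothetical mapping from Python types to JSON-schema-style type names.
_TYPE_NAMES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def describe_tool(func: Callable[..., Any]) -> dict[str, Any]:
    """Build a simple schema-like description from a function's signature."""
    hints = get_type_hints(func)
    params: dict[str, str] = {}
    required: list[str] = []
    for name, param in inspect.signature(func).parameters.items():
        # Unannotated or unmapped parameters fall back to "string" in this sketch.
        params[name] = _TYPE_NAMES.get(hints.get(name), "string")
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func) or "",
        "parameters": params,
        "required": required,
    }


def get_version(package: str, fallback: str = "unknown") -> str:
    """Return the installed version of a Python package."""
    return fallback


print(describe_tool(get_version))
```

A function without annotations or a docstring would produce an almost empty description here, which is a reasonable intuition for why poorly documented tools tend to be called incorrectly.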
Code execution — CodeExecutorAgent
CodeExecutorAgent pairs with an AssistantAgent to form a write-then-execute loop. The assistant writes code blocks; the executor runs them and returns stdout/stderr back to the assistant.
import asyncio
import tempfile
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.code_executors.local import LocalCommandLineCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient
import os
client = OpenAIChatCompletionClient(model="gpt-4o-mini", api_key=os.environ["OPENAI_API_KEY"])
coder = AssistantAgent(
    name="coder",
    model_client=client,
    system_message=(
        "You are a Python coding assistant. Write complete, runnable Python code. "
        "Once the code is verified working, say 'TASK_COMPLETE'."
    ),
)
async def main():
    # Keep the temporary directory alive for the whole run
    with tempfile.TemporaryDirectory() as work_dir:
        executor = LocalCommandLineCodeExecutor(work_dir=work_dir)
        runner = CodeExecutorAgent(name="runner", code_executor=executor)
        termination = TextMentionTermination("TASK_COMPLETE")
        team = RoundRobinGroupChat([coder, runner], termination_condition=termination)
        result = await team.run(
            task="Write and run Python code that computes the first 10 Fibonacci numbers."
        )
        for msg in result.messages:
            print(f"[{msg.source}] {msg.content[:120]}")
asyncio.run(main())
Output:
[coder] ```python
fibs = [0, 1]
for _ in range(8):
    fibs.append(fibs[-1] + fibs[-2])
print(fibs)
[runner] [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
[coder] The code ran successfully: [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]. TASK_COMPLETE
Group chats — RoundRobinGroupChat and SelectorGroupChat
RoundRobinGroupChat passes the conversation to each agent in order. SelectorGroupChat uses an LLM to select the next speaker based on context, which is useful when different agents should speak at different points.
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.conditions import MaxMessageTermination
# client: a model client created as in the earlier examples
planner = AssistantAgent("planner", model_client=client,
    system_message="You plan solutions and break tasks into steps.")
researcher = AssistantAgent("researcher", model_client=client,
    system_message="You research information and provide facts.")
reviewer = AssistantAgent("reviewer", model_client=client,
    system_message="You review solutions for correctness and completeness.")
team = SelectorGroupChat(
    [planner, researcher, reviewer],
    model_client=client,  # LLM selects next speaker
    termination_condition=MaxMessageTermination(10),
)
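The difference between the two speaker-selection strategies can be sketched without any LLM. This is a minimal illustration under stated assumptions, not AutoGen's implementation: `round_robin`, `selector`, and `plan_then_alternate` are hypothetical names, and the chooser is a plain function standing in for the model call that a selector-style team would make.

```python
from itertools import cycle
from typing import Callable, Sequence


def round_robin(names: Sequence[str], max_turns: int) -> list[str]:
    """Speaker order when agents simply take turns in list order."""
    order = cycle(names)
    return [next(order) for _ in range(max_turns)]


def selector(
    names: Sequence[str],
    choose: Callable[[list[str], Sequence[str]], str],
    max_turns: int,
) -> list[str]:
    """Speaker order when a chooser picks the next speaker from the history."""
    history: list[str] = []
    for _ in range(max_turns):
        history.append(choose(history, names))
    return history


def plan_then_alternate(history: list[str], names: Sequence[str]) -> str:
    """Hypothetical policy: planner opens, then researcher and reviewer alternate."""
    if not history:
        return "planner"
    return "reviewer" if history[-1] == "researcher" else "researcher"


agents = ["planner", "researcher", "reviewer"]
print(round_robin(agents, 5))
print(selector(agents, plan_then_alternate, 5))
```

In the round-robin order the planner speaks again on turn four whether or not it has anything to add. The selector order skips it, at the price of one extra decision per turn.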
Termination conditions
from autogen_agentchat.conditions import (
    MaxMessageTermination,
    TextMentionTermination,
    StopMessageTermination,
    TimeoutTermination,
)
# Stop after 10 messages
MaxMessageTermination(max_messages=10)
# Stop when any agent says "DONE"
TextMentionTermination("DONE")
# Stop after 60 seconds
TimeoutTermination(timeout_seconds=60)
# Combine with OR
MaxMessageTermination(10) | TextMentionTermination("DONE")
# Combine with AND
MaxMessageTermination(20) & TextMentionTermination("APPROVED")
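How `|` and `&` combine stop rules can be shown with a small stand-alone model. This is a simplified, stateless sketch that treats each condition as a predicate over the whole message list. `Condition`, `max_messages`, and `text_mention` are hypothetical stand-ins, and the library's own classes may track state differently.

```python
from dataclasses import dataclass
from typing import Callable

Messages = list[str]


@dataclass(frozen=True)
class Condition:
    """A stop rule: returns True once the conversation should end."""

    check: Callable[[Messages], bool]

    def __call__(self, messages: Messages) -> bool:
        return self.check(messages)

    def __or__(self, other: "Condition") -> "Condition":
        # OR: stop as soon as either rule is satisfied.
        return Condition(lambda m: self(m) or other(m))

    def __and__(self, other: "Condition") -> "Condition":
        # AND: stop only once both rules are satisfied.
        return Condition(lambda m: self(m) and other(m))


def max_messages(n: int) -> Condition:
    return Condition(lambda m: len(m) >= n)


def text_mention(text: str) -> Condition:
    return Condition(lambda m: any(text in msg for msg in m))


chat = ["draft v1", "needs work", "draft v2", "APPROVED"]
either = max_messages(10) | text_mention("APPROVED")
both = max_messages(10) & text_mention("APPROVED")
print(either(chat), both(chat))
```

The OR form is the usual safety net: a keyword ends the run early and the message cap bounds it if the keyword never appears. The AND form keeps going after the keyword until the second rule is also met, so pair it with a cap if cost matters.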
Async runtime and cancellation
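AutoGen v0.4 is async-first: run() is a coroutine and run_stream() is an async generator. To stop a run early, pass a CancellationToken as cancellation_token= and call its cancel() method, or bound the run with a timeout as shown here.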
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
import os
client = OpenAIChatCompletionClient(model="gpt-4o-mini", api_key=os.environ["OPENAI_API_KEY"])
agent = AssistantAgent("assistant", model_client=client)
async def main():
    try:
        result = await asyncio.wait_for(
            agent.run(task="Summarise the history of computing in detail."),
            timeout=30.0,
        )
        print(result.messages[-1].content[:200])
    except asyncio.TimeoutError:
        print("Agent timed out after 30s")
asyncio.run(main())
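What the timeout does to the wrapped coroutine is easy to miss: `asyncio.wait_for` cancels the inner task before raising. The sketch below uses a hypothetical `slow_agent_run` stand-in (no LLM involved) to show the cancellation reaching the coroutine, which is the place to release resources.

```python
import asyncio


async def slow_agent_run(log: list[str]) -> str:
    """Stand-in for a long agent run (hypothetical; no LLM involved)."""
    try:
        await asyncio.sleep(5)
        return "finished"
    except asyncio.CancelledError:
        # wait_for cancels the inner task on timeout; clean up, then re-raise.
        log.append("cancelled")
        raise


async def run_with_timeout(seconds: float) -> tuple[str, list[str]]:
    """Run the stand-in under a timeout and report what happened."""
    log: list[str] = []
    try:
        outcome = await asyncio.wait_for(slow_agent_run(log), timeout=seconds)
    except asyncio.TimeoutError:
        outcome = "timed out"
    return outcome, log


print(asyncio.run(run_with_timeout(0.05)))
```

Re-raising `CancelledError` after cleanup matters: swallowing it would make the task look as if it had completed normally.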
Usage tracking
# After a run, retrieve token usage
usage = model_client.total_usage()
print(f"Prompt tokens: {usage.prompt_tokens}")
print(f"Completion tokens: {usage.completion_tokens}")
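Token counts become easier to act on once converted to money. The arithmetic is tokens divided by one million, times the price per million tokens, summed over prompt and completion. In this sketch `Usage` is a hypothetical stand-in carrying the two fields read above, and the prices are placeholders rather than real rates.

```python
from dataclasses import dataclass


@dataclass
class Usage:
    """Stand-in with the two token counts printed in the snippet above."""

    prompt_tokens: int
    completion_tokens: int


def estimate_cost_usd(
    usage: Usage,
    prompt_price_per_m: float,
    completion_price_per_m: float,
) -> float:
    """Cost = tokens / 1,000,000 * price per million, summed over both kinds."""
    prompt_cost = usage.prompt_tokens / 1_000_000 * prompt_price_per_m
    completion_cost = usage.completion_tokens / 1_000_000 * completion_price_per_m
    return round(prompt_cost + completion_cost, 6)


# Placeholder prices, NOT real rates: look up your provider's current pricing.
usage = Usage(prompt_tokens=12_000, completion_tokens=3_000)
print(estimate_cost_usd(usage, prompt_price_per_m=1.0, completion_price_per_m=4.0))
```

Completion tokens are usually priced higher than prompt tokens, so keeping the two rates separate gives a more honest estimate than a single blended figure.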
Docker code executor — sandboxed execution
DockerCommandLineCodeExecutor runs untrusted code inside an isolated container. Use this for any code an LLM produces in production: LocalCommandLineCodeExecutor runs with the same privileges as the calling user, so generated code can read or modify anything that user can.
import asyncio
import tempfile
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient
import os
client = OpenAIChatCompletionClient(model="gpt-4o-mini", api_key=os.environ["OPENAI_API_KEY"])
async def main():
    # work_dir is a host directory that the executor mounts into the container
    with tempfile.TemporaryDirectory() as work_dir:
        async with DockerCommandLineCodeExecutor(
            image="python:3.12-slim",
            work_dir=work_dir,
            timeout=60,
            auto_remove=True,
        ) as executor:
            coder = AssistantAgent("coder", model_client=client,
                system_message="Write Python. When verified working, say TASK_COMPLETE.")
            runner = CodeExecutorAgent("runner", code_executor=executor)
            team = RoundRobinGroupChat(
                [coder, runner],
                termination_condition=TextMentionTermination("TASK_COMPLETE"),
            )
            result = await team.run(task="Compute mean of [4, 7, 2, 9, 5] using statistics.")
            print(result.messages[-1].content)
asyncio.run(main())
Output:
The mean of the list is 5.4. TASK_COMPLETE
docker pull python:3.12-slim
python autogen_docker_example.py
Output:
3.12-slim: Pulling from library/python
Digest: sha256:9ce5...
Status: Downloaded newer image for python:3.12-slim
The mean of the list is 5.4. TASK_COMPLETE
The Docker executor needs Docker Desktop or Docker Engine running. On Apple Silicon, pull the linux/arm64 variant to avoid emulation slowdowns.
Jupyter executor — stateful kernels
DockerJupyterCodeExecutor runs code in a persistent Jupyter kernel inside a container started by DockerJupyterServer. State (imports, variables) carries between code blocks, which is useful for multi-step data analysis tasks.
import asyncio
from autogen_ext.code_executors.docker_jupyter import DockerJupyterCodeExecutor, DockerJupyterServer
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
# client: a model client created as in the earlier examples
async def main():
    # Assumption: recent autogen-ext releases expose this executor in the
    # docker_jupyter module and connect it to a DockerJupyterServer; check the
    # API reference for your installed version.
    async with DockerJupyterServer() as jupyter_server:
        async with DockerJupyterCodeExecutor(jupyter_server=jupyter_server, timeout=120) as executor:
            analyst = AssistantAgent("analyst", model_client=client,
                system_message="You are a data analyst. Use pandas. Reply DONE when finished.")
            runner = CodeExecutorAgent("runner", code_executor=executor)
            team = RoundRobinGroupChat([analyst, runner],
                termination_condition=TextMentionTermination("DONE"))
            await team.run(task=(
                "Load https://raw.githubusercontent.com/datasets/airport-codes/master/data/airport-codes.csv "
                "into df, then report how many airports have type='large_airport' grouped by iso_country."
            ))
asyncio.run(main())
UserProxyAgent — human in the loop
UserProxyAgent waits for human input at each turn, letting you review or steer the conversation. Use it for approval gates before executing risky code.
import asyncio
from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.ui import Console
# client: a model client created as in the earlier examples
assistant = AssistantAgent("assistant", model_client=client,
    system_message="Propose shell commands. Wait for human approval. Say DONE when complete.")
user = UserProxyAgent(
    "user",
    input_func=input,  # blocks for stdin; swap for websocket/queue in prod
)
team = RoundRobinGroupChat([assistant, user],
    termination_condition=TextMentionTermination("DONE"))
async def main():
    # Console prints each message as it arrives
    await Console(team.run_stream(task="Help me clean up old Docker images on my machine."))
asyncio.run(main())
Output:
---------- assistant ----------
I suggest running: `docker image prune -a --filter "until=720h"`. This removes
images not used in 30 days. Approve? (y/n)
---------- user ----------
y
---------- assistant ----------
Acknowledged. The command will free unreferenced images. DONE.
Messages and message types
Agents exchange typed message objects. Use the right type so the UI and serialiser render output correctly.
| Message class | When to use |
|---|---|
| TextMessage | Standard agent reply |
| MultiModalMessage | Text + image content |
| ToolCallRequestEvent | Agent requests a tool call |
| ToolCallExecutionEvent | Tool returned a value |
| StopMessage | Programmatic stop signal |
| HandoffMessage | Hand the floor to a named agent |
from autogen_agentchat.messages import MultiModalMessage
from autogen_core import CancellationToken, Image
from PIL import Image as PILImage
# Multimodal: send an image plus a prompt
img = Image.from_pil(PILImage.open("chart.png"))
msg = MultiModalMessage(content=["What's the trend in this chart?", img], source="user")
# on_messages expects a CancellationToken rather than None
response = await agent.on_messages([msg], cancellation_token=CancellationToken())
print(response.chat_message.content)
Memory — persisting context across runs
A Memory provider stores per-agent state (conversations, facts) that persists between runs. Common implementations: ListMemory (in-process), Mem0 (managed), and custom vector-backed memories.
from autogen_core.memory import ListMemory, MemoryContent, MemoryMimeType
from autogen_agentchat.agents import AssistantAgent
memory = ListMemory()
await memory.add(MemoryContent(content="User prefers Python 3.12.", mime_type=MemoryMimeType.TEXT))
await memory.add(MemoryContent(content="User's project name is jockey.", mime_type=MemoryMimeType.TEXT))
agent = AssistantAgent(
    name="assistant",
    model_client=client,
    memory=[memory],  # injected into system prompt context
    system_message="You are a helpful assistant. Use stored memory when relevant.",
)
result = await agent.run(task="What Python version should I target?")
print(result.messages[-1].content)
Output:
Target Python 3.12 based on your stated preference.
Handoff — explicit delegation between agents
Instead of a generic group chat, HandoffMessage lets one agent pass control to a specific colleague. Useful for triage-style flows where a router hands off to a specialist.
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import HandoffTermination, MaxMessageTermination
triage = AssistantAgent(
    "triage",
    model_client=client,
    system_message="Route the user's question. Hand off to 'billing' or 'tech'.",
    handoffs=["billing", "tech"],
)
billing = AssistantAgent("billing", model_client=client,
    system_message="Answer billing questions only. Hand back to 'triage' when done.",
    handoffs=["triage"])
tech = AssistantAgent("tech", model_client=client,
    system_message="Answer technical questions only. Hand back to 'triage' when done.",
    handoffs=["triage"])
team = Swarm(
    [triage, billing, tech],
    termination_condition=HandoffTermination(target="user") | MaxMessageTermination(10),
)
await team.run(task="My credit card was charged twice for the last invoice.")
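Because each handoff names its target as a string, a typo or a missing team member only shows up at run time. A small pre-flight check can catch that earlier. The helper below is a hypothetical sketch and not part of AutoGen: it takes a plain mapping of agent name to handoff targets and treats "user" as an allowed external target.

```python
def missing_handoff_targets(
    handoffs: dict[str, list[str]],
    external: frozenset[str] = frozenset({"user"}),
) -> dict[str, list[str]]:
    """Return, per agent, the handoff targets that are neither team members nor external."""
    known = set(handoffs) | external
    problems: dict[str, list[str]] = {}
    for agent, targets in handoffs.items():
        unknown = [t for t in targets if t not in known]
        if unknown:
            problems[agent] = unknown
    return problems


team = {
    "triage": ["billing", "tech"],
    "billing": ["triage"],
    "tech": ["triage"],
}
print(missing_handoff_targets(team))  # every target is on the team

team["triage"].append("legal")  # 'legal' is not on the team
print(missing_handoff_targets(team))
```

Running a check like this before constructing the team turns a mid-conversation failure into an immediate, readable error.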
Streaming inner events
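run_stream yields incremental events: model token deltas, tool call requests, tool returns, and finally the TaskResult. Token deltas are only emitted when the agent is created with model_client_stream=True. Use this to drive a UI or to log fine-grained activity.
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.messages import TextMessage, ModelClientStreamingChunkEvent
# client: a model client created as in the earlier examples
agent = AssistantAgent("assistant", model_client=client, model_client_stream=True)
async def main():
    async for event in agent.run_stream(task="Explain Big-O notation briefly."):
        if isinstance(event, ModelClientStreamingChunkEvent):
            print(event.content, end="", flush=True)
        elif isinstance(event, TextMessage):
            print(f"\n[{event.source} complete]")
asyncio.run(main())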
Output:
Big-O describes the asymptotic upper bound on running time as input size grows...
[assistant complete]
State persistence — saving conversation context
import json
state = await agent.save_state()
with open("agent_state.json", "w") as f:
    json.dump(state, f)
# Later: restore
with open("agent_state.json") as f:
    state = json.load(f)
await agent.load_state(state)
Team state works the same way:
team_state = await team.save_state()
# ... process restarted ...
await team.load_state(team_state)
result = await team.run(task="Continue the prior task.")
Distributed runtime — multi-host agents
For large-scale deployments, autogen-core ships with a gRPC-based distributed runtime. Agents register with a host service and exchange messages across processes or machines.
import asyncio
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntimeHost
async def main():
    # host.py: run on a dedicated machine; workers connect to this address
    host = GrpcWorkerAgentRuntimeHost(address="0.0.0.0:50051")
    host.start()
    print("Host listening on :50051")
    await host.stop_when_signal()
asyncio.run(main())
python host.py &
python worker_a.py &
python worker_b.py
Output:
Host listening on :50051
[worker_a] registered as worker-a
[worker_b] registered as worker-b
Structured output — Pydantic schemas
For tasks that need structured data (such as field extraction), constrain the model output to a Pydantic schema. The agent's final message then carries an instance of that class as its content.
from pydantic import BaseModel
from autogen_agentchat.agents import AssistantAgent
class Invoice(BaseModel):
    invoice_id: str
    amount_usd: float
    due_date: str
agent = AssistantAgent(
    "extractor",
    model_client=client,
    output_content_type=Invoice,  # constrains structured output
    system_message="Extract the invoice fields from the user's message.",
)
result = await agent.run(task=(
    "Invoice INV-1029 for $342.10 is due on 2026-06-01. Please record it."
))
parsed = result.messages[-1].content  # Invoice instance
print(parsed.invoice_id, parsed.amount_usd, parsed.due_date)
Output:
INV-1029 342.1 2026-06-01
Real-world recipes
Recipe: write-test-fix loop
A coder writes a function and its tests, and a runner executes them with pytest and returns the output. The loop continues until the tests pass or the message cap is reached.
import asyncio
from autogen_agentchat.agents import AssistantAgent, CodeExecutorAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination, MaxMessageTermination
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
# client: a model client created as in the earlier examples
async def main():
    async with DockerCommandLineCodeExecutor(image="python:3.12-slim") as executor:
        coder = AssistantAgent("coder", model_client=client, system_message=(
            "Write Python code and unit tests. The container has no pytest preinstalled, "
            "so install it in a shell block first. When tests pass, say PASS."
        ))
        runner = CodeExecutorAgent("runner", code_executor=executor)
        team = RoundRobinGroupChat([coder, runner],
            termination_condition=TextMentionTermination("PASS") | MaxMessageTermination(12))
        await team.run(task=(
            "Implement is_palindrome(s: str) -> bool, then write 5 pytest tests "
            "and run them with `pytest -q test_pal.py`."
        ))
asyncio.run(main())
Output:
[coder] (writes is_palindrome + pytest cases)
[runner] 5 passed in 0.04s
[coder] All tests pass. PASS
Recipe: research → draft → review pipeline
# Assumes client, FunctionTool, SelectorGroupChat, and the termination classes are
# imported as in earlier sections; web_search is your own search function (not shown).
researcher = AssistantAgent("researcher", model_client=client,
    system_message="Find current information using the web_search tool. Summarise key facts.",
    tools=[FunctionTool(web_search, description="Search the web for current info")])
writer = AssistantAgent("writer", model_client=client,
    system_message="Take research notes and produce a 200-word draft. Cite sources.")
reviewer = AssistantAgent("reviewer", model_client=client,
    system_message="Review for accuracy and clarity. Reply APPROVED only when satisfied.")
team = SelectorGroupChat(
    [researcher, writer, reviewer],
    model_client=client,
    termination_condition=TextMentionTermination("APPROVED") | MaxMessageTermination(12),
)
await team.run(task="Write an explainer on the difference between LoRA and QLoRA fine-tuning.")
Recipe: triage with handoff and human approval
from autogen_agentchat.teams import Swarm
from autogen_agentchat.conditions import HandoffTermination
router = AssistantAgent("router", model_client=client,
    handoffs=["refund_agent", "tech_support", "user"],
    system_message="Route to refund_agent, tech_support, or hand back to user.")
refund = AssistantAgent("refund_agent", model_client=client,
    handoffs=["user"],
    system_message="Propose a refund amount, then hand off to user for approval.")
# Specialist for technical issues
tech_support = AssistantAgent("tech_support", model_client=client,
    handoffs=["user"],
    system_message="Diagnose technical problems, then hand off to user.")
team = Swarm(
    [router, refund, tech_support],
    termination_condition=HandoffTermination(target="user"),
)
await team.run(task="My subscription was charged twice. Please refund the duplicate.")
Recipe: parallel agent fan-out
Run N agents independently against the same task and collect the answers side by side so you can pick the best.
import asyncio
from autogen_agentchat.agents import AssistantAgent
async def run_one(name: str, persona: str, task: str) -> str:
    # Agent names must be valid Python identifiers, so the persona text is passed separately
    agent = AssistantAgent(name, model_client=client,
        system_message=f"Answer as a {persona}. Be concise.")
    r = await agent.run(task=task)
    return r.messages[-1].content
async def main():
    task = "Explain why memoization helps a slow recursive Fibonacci."
    personas = {
        "child": "five-year-old",
        "undergrad": "undergrad",
        "engineer": "compiler engineer",
    }
    answers = await asyncio.gather(*[
        run_one(name, persona, task) for name, persona in personas.items()
    ])
    for name, ans in zip(personas, answers):
        print(f"\n[{name}]\n{ans}")
asyncio.run(main())
Output:
[child]
Imagine you do the same homework over and over. Memoization saves the answer the first time...
[undergrad]
Naive fib(n) recomputes overlapping subproblems exponentially. Memoization stores f(k)...
[engineer]
The recursion tree of fib(n) has O(phi^n) nodes; memoization converts it into O(n) DP via cache.
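The selection step after the fan-out can be sketched separately from the agents. In the example below the workers are hypothetical stubs (a real worker would await agent.run()), and `brevity_score` is a toy rule chosen only to keep the sketch runnable offline. In practice the scorer might be a rubric, a test suite, or a judge model.

```python
import asyncio
from typing import Awaitable, Callable


async def fan_out_and_pick(
    workers: dict[str, Callable[[str], Awaitable[str]]],
    task: str,
    score: Callable[[str], float],
) -> tuple[str, str]:
    """Run every worker on the same task concurrently; return the top-scoring (name, answer)."""
    names = list(workers)
    answers = await asyncio.gather(*(workers[n](task) for n in names))
    return max(zip(names, answers), key=lambda pair: score(pair[1]))


def make_stub(reply: str) -> Callable[[str], Awaitable[str]]:
    """Hypothetical stand-in for an agent call."""

    async def worker(task: str) -> str:
        await asyncio.sleep(0)  # yield control as a network call would
        return reply

    return worker


def brevity_score(answer: str) -> float:
    """Toy scoring rule: shorter answers win."""
    return -len(answer)


workers = {
    "child": make_stub("It remembers answers so it never redoes them."),
    "undergrad": make_stub("Caching overlapping subproblems cuts exponential work to linear."),
    "engineer": make_stub("Cache f(k): O(n)."),
}
print(asyncio.run(fan_out_and_pick(workers, "Why does memoization help?", brevity_score)))
```

`asyncio.gather` returns results in the order the awaitables were passed, which is what makes zipping names with answers safe here.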
Performance and reliability tips
- Always set both max_turns and a textual termination signal; agents can loop indefinitely on ambiguous tasks.
- For code execution, prefer DockerCommandLineCodeExecutor in production. The local executor has the same privileges as the calling user.
- SelectorGroupChat makes an extra LLM call per turn to pick the next speaker. For deterministic flows use RoundRobinGroupChat to save tokens.
- Stream output to users with run_stream; long agent turns block for minutes otherwise.
- Track model_client.total_usage() after each run; surface costs in your app so spending stays visible.
- Pin autogen-agentchat==0.4.* in requirements.txt; the 0.2/0.4 split means accidental upgrades break imports.
Quick reference
| Task | Code |
|---|---|
| OpenAI client | OpenAIChatCompletionClient(model="gpt-4o", api_key=...) |
| Anthropic client | AnthropicChatCompletionClient(model="claude-sonnet-4-6", api_key=...) |
| Azure client | AzureOpenAIChatCompletionClient(azure_deployment=..., api_version=...) |
| Assistant agent | AssistantAgent("name", model_client=client, system_message="...") |
| Code executor | CodeExecutorAgent("runner", code_executor=LocalCommandLineCodeExecutor(...)) |
| Docker executor | DockerCommandLineCodeExecutor(image="python:3.12-slim", timeout=60) |
| Jupyter executor | DockerJupyterCodeExecutor(jupyter_server=server) with server from DockerJupyterServer() |
| User proxy | UserProxyAgent("user", input_func=input) |
| Round-robin team | RoundRobinGroupChat([a1, a2], termination_condition=...) |
| Selector team | SelectorGroupChat([a1, a2, a3], model_client=client, ...) |
| Swarm + handoff | Swarm([a, b], termination_condition=HandoffTermination(target="user")) |
| Run (await) | await agent.run(task="...") |
| Stream output | await Console(agent.run_stream(task="...")) |
| Stream events | async for ev in agent.run_stream(task="..."): |
| Stop on text | TextMentionTermination("DONE") |
| Stop on turns | MaxMessageTermination(10) |
| Stop on time | TimeoutTermination(60) |
| Stop on handoff | HandoffTermination(target="user") |
| Combine stop conds | Cond1() \| Cond2() (OR) or Cond1() & Cond2() (AND) |
| Add tool | tools=[FunctionTool(fn, description="...")] |
| Memory | memory=[ListMemory()] |
| Structured output | output_content_type=PydanticModel |
| Save state | state = await agent.save_state() |
| Restore state | await agent.load_state(state) |
| Token usage | model_client.total_usage() |