Scaling Agents: The Definitive Open-Source Guide — From 1 Agent to 100 Agents, 1 Tool to 100 Tools, Managing Context
7th March 2026
The Scaling Problem Space: Three Walls
| Scale | Wall 1: Tools | Wall 2: Agents | Wall 3: Context |
|---|---|---|---|
| Trivial | 1 tool | 1 agent | 4K tokens |
| Fine | 5 tools | 3 agents | 32K tokens |
| Danger zone | 15 tools — model confused | 10 agents — coordination hard | 128K tokens — risky |
| Broken | 50 tools — wrong tool selection | 50 agents — chaos | 500K+ tokens — overflow |
| Impossible without architecture | 100 tools | 100 agents | Multi-agent × long context |
| Solution | Tool routing & namespacing | Hierarchy & graphs | Summarization & scoping |
Tool Wall: Every tool’s JSON schema is sent to the model with each request. At 15+ tools, models start selecting wrong tools. At 50+, token usage explodes and accuracy plummets.
Agent Wall: N agents with full connectivity = N×(N-1)/2 communication links. 10 agents = 45 links. 100 agents = 4,950 links. Without hierarchy, coordination becomes impossible.
Context Wall: LLMs have finite context windows. As conversations grow, older information gets pushed out. Cross-agent context propagation multiplies the problem.
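A quick back-of-the-envelope model makes the walls concrete. The sketch below uses illustrative numbers (not framework code) to estimate per-agent context load as tools and turns grow; all constants are assumptions for demonstration:

```python
def estimate_context_tokens(num_tools: int, num_turns: int,
                            system_tokens: int = 1000,
                            tokens_per_tool: int = 200,
                            tokens_per_turn: int = 1500) -> int:
    """Rough per-agent context estimate: system prompt + tool schemas + history."""
    return system_tokens + num_tools * tokens_per_tool + num_turns * tokens_per_turn

# 10 tools and 20 turns already blow past a 32K budget for one agent
print(estimate_context_tokens(10, 20))  # 33000
```

Multiply that per-agent figure by the number of agents sharing context and the Context Wall arrives quickly.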
Single Agent: Getting Started
LangGraph — Single Agent
# LangGraph uses StateGraph for agent construction
# Source: langgraph/graph/state.py - StateGraph class
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI

tools = [...]  # your tool functions go here
model = ChatOpenAI(model="gpt-4o").bind_tools(tools)
tool_node = ToolNode(tools)

class AgentState(MessagesState):
    pass

def agent_node(state: AgentState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

def should_continue(state: AgentState):
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return END

# Build graph
graph = StateGraph(AgentState)
graph.add_node("agent", agent_node)
graph.add_node("tools", tool_node)
graph.add_edge(START, "agent")
graph.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
graph.add_edge("tools", "agent")
app = graph.compile()
LangGraph Architecture (from source):
- StateGraph compiles to a Pregel-style execution engine (modeled on Google’s Pregel graph-processing framework)
- State is managed through channels (LastValue, BinaryOperatorAggregate)
- Each node reads/writes to channels
- Supports checkpointing via BaseCheckpointSaver (memory, SQLite, Postgres)
AutoGen — Single Agent
# AutoGen uses ChatAgent protocol
# Source: autogen-agentchat/src/autogen_agentchat/base
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

model_client = OpenAIChatCompletionClient(model="gpt-4o")
agent = AssistantAgent(
    name="assistant",
    model_client=model_client,
    tools=[my_tool],
    system_message="You are a helpful assistant.",
)
result = await agent.run(task="What is 2+2?")  # inside an async function
AutoGen Architecture (from source):
- Two-layer design: autogen-core (message passing runtime) + autogen-agentchat (high-level API)
- Core uses AgentRuntime with pub/sub messaging (SingleThreadedAgentRuntime, GrpcWorkerAgentRuntimeHost)
- AgentChat provides ChatAgent protocol that wraps core agents
CrewAI — Single Agent
# CrewAI uses Agent + Task + Crew trinity
# Source: crewai/crew.py, crewai/task.py
from crewai import Agent, Task, Crew, Process

agent = Agent(
    role="Researcher",
    goal="Find accurate information",
    backstory="Expert researcher with 10 years experience",
    tools=[search_tool],
    llm="gpt-4o",
)
task = Task(
    description="Research the latest AI trends",
    expected_output="A comprehensive report",
    agent=agent,
)
crew = Crew(
    agents=[agent],
    tasks=[task],
    process=Process.sequential,
)
result = crew.kickoff()
CrewAI Architecture (from source):
- Crew orchestrates Agents executing Tasks
- Process enum: sequential or hierarchical
- Flow class for complex DAG workflows with @start, @listen, @router decorators
- Built-in evaluation via AgentEvaluator with metric categories: TOOL_SELECTION, REASONING, SEMANTIC_QUALITY, GOAL
Strands Agents — Single Agent
# Strands uses a simple Agent class
# Source: strands-agents/src/strands/agent/agent.py
from strands import Agent, tool

@tool
def calculate(expression: str) -> str:
    """Calculate a mathematical expression."""
    return str(eval(expression))  # demo only: eval is unsafe on untrusted input

agent = Agent(
    model="us.amazon.nova-pro-v1:0",
    system_prompt="You are a math assistant.",
    tools=[calculate],
)
result = agent("What is 15 * 23?")
Strands Architecture (from source):
- Minimal core: Agent with tools, model, and conversation manager
- Tools via @tool decorator or MCP servers
- Conversation management: SlidingWindowConversationManager, SummarizingConversationManager
- Multi-agent: Swarm (dynamic handoffs) and Graph (deterministic DAG)
- Session persistence via SessionManager hook system
Agno — Single Agent
# Agno uses Agent with rich configuration
# Source: agno/agent/agent.py
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.duckduckgo import DuckDuckGoTools

agent = Agent(
    model=OpenAIChat(id="gpt-4o"),
    tools=[DuckDuckGoTools()],
    instructions=["Always verify facts"],
    markdown=True,
    show_tool_calls=True,
)
agent.print_response("Latest news on AI agents")
Agno Architecture (from source):
- Agent dataclass with extensive configuration (200+ params)
- Team class with TeamMode: coordinate, route, broadcast, tasks
- Workflow for complex multi-step processes with Step definitions
- Built-in memory via MemoryManager, storage via BaseDb, knowledge via KnowledgeProtocol
DSPy — Single Module/Agent
# DSPy uses Modules with Signatures
# Source: dspy/predict/react.py
import dspy

lm = dspy.LM("openai/gpt-4o")
dspy.configure(lm=lm)

def get_weather(city: str) -> str:
    """Get weather for a city."""
    return f"Sunny in {city}"

react = dspy.ReAct(
    signature="question -> answer",
    tools=[get_weather],
    max_iters=20,
)
result = react(question="What's the weather in Tokyo?")
DSPy Architecture (from source):
- Module base class with composable Predict, ChainOfThought, ReAct
- Signature defines typed input/output contracts
- Tool wraps callables with name, description, args schema
- Unique: Optimizers (MIPROv2, BootstrapFewShot) that automatically tune prompts
- Evaluate module for systematic evaluation with metrics
Multi-Agent Patterns (From Source Code)
Pattern Taxonomy
┌─────────────────┬──────────────────┬────────────────┬──────────────────────┐
│ SEQUENTIAL │ FAN-OUT/IN │ HIERARCHICAL │ DYNAMIC/SWARM │
│ A → B → C │ Aggregator │ Manager │ (self-organizing) │
│ │ ↗ ↑ ↖ │ / | \ │ │
│ • LangGraph │ A B C │ A B C │ • Strands Swarm │
│ • CrewAI seq │ │ │ • AutoGen Swarm │
│ • AgentScope │ • AgentScope │ • CrewAI hier │ • Agno tasks mode │
│ Sequential │ Fanout │ • Agno coord │ │
│ Pipeline │ • LangGraph │ • AutoGen │ │
│ │ parallel │ Selector │ │
├─────────────────┼──────────────────┼────────────────┼──────────────────────┤
│ GRAPH/DAG │ ROUTER │ NESTED TEAMS │ DISTRIBUTED │
│ │ │ │ │
│ • LangGraph │ • Agno route │ │ │
│ • Strands Graph │ • AutoGen │ • AutoGen │ • AutoGen gRPC │
│ • AutoGen Graph │ Selector │ nested teams │ Runtime │
│ │ • Strands │ • Agno nested │ • Mastra server │
│ │ (conditional) │ │ adapters │
└─────────────────┴──────────────────┴────────────────┴──────────────────────┘
LangGraph — Multi-Agent via Subgraphs
# LangGraph scales via subgraphs (graphs within graphs)
# Source: langgraph/graph/state.py - StateGraph supports nesting
# Each team is its own StateGraph
research_team = StateGraph(ResearchState)
research_team.add_node("searcher", search_agent)
research_team.add_node("analyst", analysis_agent)
research_compiled = research_team.compile()
writing_team = StateGraph(WritingState)
writing_team.add_node("writer", write_agent)
writing_compiled = writing_team.compile()
# Orchestrator graph uses teams as nodes
orchestrator = StateGraph(OrchestratorState)
orchestrator.add_node("research", research_compiled) # Subgraph!
orchestrator.add_node("writing", writing_compiled) # Subgraph!
orchestrator.add_edge("research", "writing")
app = orchestrator.compile(
    checkpointer=PostgresSaver(conn),  # Persistence
)
LangGraph Scaling Mechanisms (from source):
- Subgraphs: StateGraph can contain other compiled StateGraphs
- Namespaced State: Child graphs have isolated state via NS_SEP
- Send API: Dynamically spawn parallel branches with Send("node", payload)
- Checkpointing: PostgresSaver, SqliteSaver, MemorySaver for state persistence
- Store: BaseStore (Postgres-backed) for cross-thread shared memory
- RetryPolicy: Built-in retry with configurable backoff
- CachePolicy: Cache node results to avoid recomputation
- Command: Command(goto=..., update=...) for dynamic graph navigation
AutoGen — Multi-Agent Teams
# AutoGen provides multiple team patterns
# Source: autogen-agentchat/teams/_group_chat/
from autogen_agentchat.teams import (
    RoundRobinGroupChat,  # Agents take turns
    SelectorGroupChat,    # LLM selects next speaker
    Swarm,                # Tool-based handoffs
    GraphFlow,            # DAG-based execution
    DiGraphBuilder,       # Builds the graph for GraphFlow
)
from autogen_agentchat.conditions import (
    MaxMessageTermination,
    TextMentionTermination,
)

# SelectorGroupChat - LLM picks next speaker
team = SelectorGroupChat(
    participants=[agent_a, agent_b, agent_c],
    model_client=selector_model,
    termination_condition=MaxMessageTermination(10),
    selector_prompt="Select the most appropriate agent...",
    allow_repeated_speaker=False,
)

# GraphFlow - DAG with conditional edges
builder = DiGraphBuilder()
builder.add_node(agent_a).add_node(agent_b).add_node(agent_c)
builder.add_edge(agent_a, agent_b)
builder.add_edge(agent_b, agent_a,
                 condition=lambda msg: "loop" in msg.to_model_text())
builder.add_edge(agent_b, agent_c,
                 condition=lambda msg: "done" in msg.to_model_text())
team = GraphFlow(
    participants=builder.get_participants(),
    graph=builder.build(),
    termination_condition=MaxMessageTermination(20),
)

# NESTED TEAMS - Teams as participants in other teams
inner_team = RoundRobinGroupChat(
    participants=[assistant, code_executor],
    termination_condition=TextMentionTermination("TERMINATE"),
)
outer_team = RoundRobinGroupChat(
    participants=[inner_team, reviewer],  # Team as participant!
    termination_condition=TextMentionTermination("TERMINATE"),
)
AutoGen Scaling Mechanisms (from source):
- Team Nesting: Any Team implements ChatAgent, so teams compose into teams
- AgentRuntime: Message-passing runtime (SingleThreadedAgentRuntime or GrpcWorkerAgentRuntimeHost)
- Distributed Runtime: GrpcWorkerAgentRuntimeHost for cross-process agents
- SelectorGroupChat: LLM-based dynamic routing with selector_func and candidate_func overrides
- GraphFlow: DiGraphBuilder with conditional edges, activation groups, loop support
- Termination Conditions: MaxMessageTermination, TextMentionTermination, composable with | and &
- State Management: TeamState for pause/resume
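The `|` / `&` composition of termination conditions can be mimicked in plain Python to see the idea; the classes below are a toy re-implementation for illustration, not AutoGen's actual API:

```python
class Termination:
    """Toy termination condition supporting | (either) and & (both) composition."""
    def __init__(self, predicate):
        self.predicate = predicate
    def __call__(self, messages) -> bool:
        return self.predicate(messages)
    def __or__(self, other):
        return Termination(lambda m: self(m) or other(m))
    def __and__(self, other):
        return Termination(lambda m: self(m) and other(m))

max_messages = lambda n: Termination(lambda m: len(m) >= n)
mentions = lambda text: Termination(lambda m: any(text in msg for msg in m))

# Stop when 10 messages are reached OR someone says TERMINATE
stop = max_messages(10) | mentions("TERMINATE")
print(stop(["ok", "TERMINATE"]))  # True
print(stop(["ok"] * 3))           # False
```

Composing small predicates this way keeps each stopping rule testable in isolation.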
CrewAI — Crews, Flows, and Hierarchical Process
# CrewAI multi-agent via Process and Flow
# Source: crewai/crew.py, crewai/flow/flow.py
from crewai import Crew, Process
from crewai.flow.flow import Flow, start, listen, router

# HIERARCHICAL PROCESS - Manager agent coordinates
crew = Crew(
    agents=[researcher, writer, editor],
    tasks=[research_task, write_task, edit_task],
    process=Process.hierarchical,
    manager_llm="gpt-4o",
    memory=True,
)

# FLOW - Complex DAG with @start, @listen, @router decorators
class ContentPipeline(Flow):
    @start()
    def research(self):
        crew = Crew(agents=[researcher], tasks=[research_task])
        return crew.kickoff()

    @listen(research)
    def write(self, research_result):
        crew = Crew(agents=[writer], tasks=[write_task])
        return crew.kickoff(inputs={"research": research_result})

    @router(write)
    def quality_check(self, write_result):
        if quality_score(write_result) > 0.8:
            return "publish"
        return "revise"

    @listen("publish")
    def publish(self, content):
        return publish_content(content)

    @listen("revise")
    def revise(self, content):
        return self.write(content)

pipeline = ContentPipeline()
result = pipeline.kickoff()
CrewAI Scaling Mechanisms (from source):
- Process Types: sequential (chain) and hierarchical (manager delegates)
- Flow Engine: Full DAG workflow with @start, @listen, @router decorators
- Memory System: MemoryScope with path-based scoping (/agent/1, /crew/shared)
- Evaluation: AgentEvaluator with ToolSelectionEvaluator, ReasoningMetrics, GoalMetrics, SemanticQualityMetrics
- Knowledge: BaseKnowledgeSource with RAG integration
Strands Agents — Graph and Swarm
# Strands provides two multi-agent patterns
# Source: strands/multiagent/graph.py, strands/multiagent/swarm.py
from strands import Agent
from strands.multiagent.graph import GraphBuilder
# GRAPH PATTERN - Deterministic DAG
builder = GraphBuilder()
math_node = builder.add_node(math_agent)
analysis_node = builder.add_node(analysis_agent)
summary_node = builder.add_node(summary_agent)
builder.add_edge(math_node, analysis_node)
builder.add_edge(analysis_node, summary_node)
builder.set_entry_point(math_node.node_id)
# Safety controls
builder.set_max_node_executions(50)
builder.set_execution_timeout(300.0)
builder.set_node_timeout(60.0)
builder.reset_on_revisit(True)
graph = builder.build()
result = graph("Calculate 15 * 23, analyze the result, then summarize")
# SWARM PATTERN - Dynamic agent handoffs
from strands.multiagent.swarm import Swarm
swarm = Swarm(
    nodes=[sales_agent, support_agent, billing_agent],
    entry_point=sales_agent,
    max_handoffs=20,
    max_iterations=20,
    execution_timeout=900.0,
    node_timeout=300.0,
    repetitive_handoff_detection_window=5,
    repetitive_handoff_min_unique_agents=3,
)
result = swarm("I need help with my bill")
# NESTED: Graph can contain Swarm as a node (and vice versa)
builder = GraphBuilder()
builder.add_node(swarm) # Swarm as a graph node!
builder.add_node(summary_agent)
builder.add_edge("default_swarm", summary_agent)
Strands Scaling Mechanisms (from source):
- GraphBuilder: Fluent API for DAG construction with add_node, add_edge, set_entry_point
- GraphState: Tracks completed_nodes, failed_nodes, execution_order, accumulated_usage
- Conditional Edges: GraphEdge.condition is Callable[[GraphState], bool]
- SwarmState: Tracks current_node, shared_context, node_history, handoff_node
- SharedContext: JSON-serializable key-value store shared across swarm agents
- Repetitive Handoff Detection: Monitors recent agent history for stuck loops
- Composability: MultiAgentBase can be nested — Graph in Graph, Swarm in Graph
- A2A Protocol: strands/multiagent/a2a/ for inter-service agent communication
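The repetitive-handoff check can be sketched as a sliding-window uniqueness test. The function below is an illustrative guess at the logic, reusing the window and min-unique parameters named above; it is not Strands' actual implementation:

```python
def is_stuck(handoff_history: list, window: int = 5, min_unique: int = 3) -> bool:
    """Flag a ping-pong loop: the last `window` handoffs involve too few agents."""
    if len(handoff_history) < window:
        return False
    recent = handoff_history[-window:]
    return len(set(recent)) < min_unique

print(is_stuck(["a", "b", "a", "b", "a"]))  # True: only 2 unique agents in window
print(is_stuck(["a", "b", "c", "d", "e"]))  # False: 5 unique agents
```

The same pattern generalizes to any handoff-based system: record the sequence of agent names and alert when diversity inside the window drops.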
Agno — Teams with Multiple Modes
# Agno provides Team with configurable execution modes
# Source: agno/team/team.py, agno/team/mode.py
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.team import Team, TeamMode

# COORDINATE MODE - Leader picks members, crafts tasks, synthesizes
team = Team(
    name="Research Team",
    mode=TeamMode.coordinate,
    members=[researcher, writer, editor],
    model=OpenAIChat(id="gpt-4o"),
    instructions=["Produce high-quality research reports"],
    max_iterations=10,
)

# ROUTE MODE - Leader routes to single specialist
team = Team(
    name="Support Router",
    mode=TeamMode.route,
    members=[billing_agent, tech_agent, general_agent],
    respond_directly=True,
)

# BROADCAST MODE - Same task to all members simultaneously
team = Team(
    name="Consensus Team",
    mode=TeamMode.broadcast,
    members=[analyst_1, analyst_2, analyst_3],
    delegate_to_all_members=True,
)

# TASKS MODE - Autonomous task decomposition and execution
team = Team(
    name="Project Team",
    mode=TeamMode.tasks,
    members=[designer, developer, tester],
    max_iterations=20,
    share_member_interactions=True,
)

# NESTED TEAMS
inner_team = Team(members=[agent_a, agent_b], mode=TeamMode.coordinate)
outer_team = Team(
    members=[inner_team, agent_c],  # Team as member!
    mode=TeamMode.coordinate,
)
Tool Scaling: 1 → 100 Tools
The Tool Scaling Problem
Every framework sends tool schemas to the LLM with each request. More tools means more tokens consumed and worse selection accuracy. The figures below are rough, model-dependent estimates:
| Tools | Token Cost | Selection Accuracy |
|---|---|---|
| 1-5 | ~500 tokens | ~98% |
| 5-15 | ~2K tokens | ~90% |
| 15-30 | ~5K tokens | ~75% (danger zone) |
| 30-50 | ~10K tokens | ~50% (effectively broken) |
| 50+ | ~20K+ tokens | ~30% (effectively unusable) |
Strategy 1: Agent Specialization (All Frameworks)
# Instead of 1 agent with 30 tools, use 3 agents with 10 each
finance_agent = Agent(tools=[stock_price, portfolio, dividends, earnings, ...]) # 8 tools
data_agent = Agent(tools=[sql_query, csv_read, json_parse, aggregate, ...]) # 7 tools
email_agent = Agent(tools=[send_email, read_inbox, search_mail, ...]) # 5 tools
# Router decides which specialist handles the request
swarm = Swarm(
    nodes=[finance_agent, data_agent, email_agent],
    entry_point=finance_agent,
)
Strategy 2: Dynamic Tool Loading
# Strands - MCP servers load tools on-demand
agent = Agent(
    tools=[
        "mcp://localhost:3000/finance-tools",
        "mcp://localhost:3001/data-tools",
    ]
)

# LangGraph - Tools bound per-node, not per-graph
def research_node(state):
    model_with_tools = model.bind_tools([search, wiki_lookup])  # Only these tools
    return {"messages": [model_with_tools.invoke(state["messages"])]}

def analysis_node(state):
    model_with_tools = model.bind_tools([calculator, chart])  # Different tools
    return {"messages": [model_with_tools.invoke(state["messages"])]}
Strategy 3: Tool Namespacing and Categorization
# Agno - Toolkit grouping
from agno.tools import Toolkit
class FinanceToolkit(Toolkit):
    def __init__(self):
        super().__init__(name="finance")
        self.register(self.get_stock_price)
        self.register(self.get_portfolio)
        self.register(self.get_dividends)

    def get_stock_price(self, symbol: str) -> str: ...
    def get_portfolio(self, user_id: str) -> str: ...
    def get_dividends(self, symbol: str) -> str: ...

agent = Agent(tools=[FinanceToolkit()])
Tool Scaling Architecture
┌────────────────────────────────────────────────────────────────────┐
│ TOOL SCALING ARCHITECTURE │
├────────────────────────────────────────────────────────────────────┤
│ │
│ Layer 1: TOOL REGISTRY (per agent, max 10-15) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Finance │ │Data │ │Email │ │File │ │
│ │Agent │ │Agent │ │Agent │ │Agent │ │
│ │8 tools │ │7 tools │ │5 tools │ │6 tools │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ Layer 2: ROUTING (agent selection) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Router Agent (0 tools, just routes) │ │
│ │ "I need stock data" → Finance Agent │ │
│ │ "Send an email" → Email Agent │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Layer 3: MCP SERVERS (external tool providers) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │Finance │ │Database │ │Cloud │ │
│ │MCP Server│ │MCP Server│ │MCP Server│ │
│ │20 tools │ │15 tools │ │25 tools │ │
│ └──────────┘ └──────────┘ └──────────┘ │
└────────────────────────────────────────────────────────────────────┘
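Layer 2's router can be as simple as keyword matching over agent descriptions before escalating to an LLM classifier. A minimal keyword sketch, with hypothetical agent names and keyword lists:

```python
# Hypothetical mapping of specialist agents to trigger keywords
AGENT_KEYWORDS = {
    "finance_agent": ["stock", "portfolio", "dividend", "earnings"],
    "email_agent": ["email", "inbox", "send", "mail"],
    "data_agent": ["sql", "csv", "query", "aggregate"],
}

def route(request: str, default: str = "general_agent") -> str:
    """Pick the first specialist whose keywords match the request."""
    text = request.lower()
    for agent, keywords in AGENT_KEYWORDS.items():
        if any(k in text for k in keywords):
            return agent
    return default

print(route("I need stock data"))  # finance_agent
print(route("Send an email"))      # email_agent
```

A keyword pre-router like this handles the obvious cases for free and reserves LLM-based routing (and its token cost) for ambiguous requests.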
Agent Scaling: 1 → 100 Agents
The Scaling Progression
| Agents | Pattern & Framework Recommendation |
|---|---|
| 1 | Single agent with tools — any framework works |
| 2-5 | Strands: Swarm with handoff tools / AutoGen: SelectorGroupChat / LangGraph: Conditional edges / Agno: Team(mode=route) / CrewAI: Hierarchical process |
| 10-25 | Nested teams/subgraphs with department structure — LangGraph: Subgraphs / AutoGen: Nested teams / Strands: Graph containing Swarms / Agno: Nested Teams / CrewAI: Flow with multiple Crews |
| 25-50 | Multi-level subgraph nesting — AutoGen: Distributed runtime (GrpcWorkerAgentRuntime) / Strands: A2A protocol / CrewAI: Flow orchestrating multiple Crews |
| 50-100 | Distributed microservices with A2A/MCP protocols — AutoGen: GrpcWorkerAgentRuntimeHost / Strands: A2A server/client / Agno: RemoteTeam / Mastra: Server adapters |
Why Hierarchy Works
FLAT (100 agents, full connectivity):
Links = 100 × 99 / 2 = 4,950 ← UNMANAGEABLE
HIERARCHICAL (100 agents in 5 departments of 20):
Each agent only knows ~20 others max
Total effective links per agent: ~20 (not 99)
3-LEVEL HIERARCHY (5 divisions × 4 teams × 5 agents):
Each agent sees at most: 4 peers + 1 lead = 5 agents (not 99!)
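The arithmetic above can be verified in a few lines; `flat_links` is the N×(N-1)/2 formula and `hierarchy_fanout` counts what one agent actually sees (peers plus one lead). Both are illustrative helpers, not framework code:

```python
def flat_links(n: int) -> int:
    """Communication links in a fully connected group of n agents."""
    return n * (n - 1) // 2

def hierarchy_fanout(team_size: int) -> int:
    """Agents visible to one member: its peers plus one team lead."""
    return (team_size - 1) + 1

print(flat_links(10))       # 45
print(flat_links(100))      # 4950
print(hierarchy_fanout(5))  # 5: four peers plus one lead
```

Hierarchy turns quadratic growth in coordination links into roughly constant per-agent fan-out.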
Implementation: Nested Hierarchy
# AutoGen — Nested Teams (cleanest nesting support)
researcher = AssistantAgent("researcher", model_client=client)
writer = AssistantAgent("writer", model_client=client)
code_gen = AssistantAgent("code_gen", model_client=client)
code_review = AssistantAgent("code_review", model_client=client)
designer = AssistantAgent("designer", model_client=client)
qa_tester = AssistantAgent("qa_tester", model_client=client)
# Level 2: Department teams
content_team = SelectorGroupChat(
    participants=[researcher, writer],
    model_client=selector_client,
    termination_condition=MaxMessageTermination(6),
)
engineering_team = RoundRobinGroupChat(
    participants=[code_gen, code_review],
    termination_condition=MaxMessageTermination(6),
)
design_team = RoundRobinGroupChat(
    participants=[designer, qa_tester],
    termination_condition=MaxMessageTermination(4),
)

# Level 3: Organization (teams as participants)
organization = SelectorGroupChat(
    participants=[content_team, engineering_team, design_team],
    model_client=executive_client,
    termination_condition=MaxMessageTermination(20),
)
result = await organization.run(task="Build a landing page")  # inside an async function
# Strands — Nested Graph + Swarm
# Department as Swarm
support_swarm = Swarm(
    nodes=[billing_agent, tech_agent, general_agent],
    entry_point=general_agent,
    max_handoffs=10,
)

# Organization as Graph containing Swarms
org_graph = GraphBuilder()
intake_node = org_graph.add_node(intake_agent)
support_node = org_graph.add_node(support_swarm)  # Swarm as node!
escalation_node = org_graph.add_node(escalation_agent)
org_graph.add_edge(intake_node, support_node)
org_graph.add_edge(support_node, escalation_node,
                   condition=lambda state: "escalation" in str(state.results))
org_graph.set_entry_point(intake_node.node_id)
org = org_graph.build()
Distributed Agent Scaling
# AutoGen — Distributed via gRPC Runtime
# Host process
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntimeHost
host = GrpcWorkerAgentRuntimeHost(address="localhost:50051")
host.start()
# Worker process 1
from autogen_ext.runtimes.grpc import GrpcWorkerAgentRuntime
runtime = GrpcWorkerAgentRuntime(host_address="localhost:50051")
runtime.start()
await Writer.register(runtime, "writer", lambda: Writer(...))
# Worker process 2
runtime = GrpcWorkerAgentRuntime(host_address="localhost:50051")
runtime.start()
await Editor.register(runtime, "editor", lambda: Editor(...))
Context Window Management
The Context Problem
┌──────────────────────────────────────────────────────────────┐
│ CONTEXT WINDOW CONSUMPTION │
├──────────────────────────────────────────────────────────────┤
│ │
│ System prompt: ~500-2000 tokens │
│ Tool schemas (10 tools): ~2000 tokens │
│ Conversation history: GROWS UNBOUNDED │
│ Tool results: 500-10000 tokens EACH │
│ Cross-agent context: MULTIPLIED per agent │
│ │
│ Example: 5 agents × 10 tools × 20 turns │
│ ≈ 34,000 tokens/agent × 5 = 170,000 ← OVER context limits │
│ │
└──────────────────────────────────────────────────────────────┘
Strands: Conversation Managers
# 1. SLIDING WINDOW - Drop old messages
from strands.agent.conversation_manager import SlidingWindowConversationManager
agent = Agent(
    conversation_manager=SlidingWindowConversationManager(
        window_size=20,
    ),
)

# 2. SUMMARIZING - Compress old context into summary
from strands.agent.conversation_manager import SummarizingConversationManager
agent = Agent(
    conversation_manager=SummarizingConversationManager(
        summary_ratio=0.3,
        summary_agent=Agent(model="us.amazon.nova-lite-v1:0"),
    ),
)

# 3. NULL - No management (for short tasks)
from strands.agent.conversation_manager import NullConversationManager
agent = Agent(conversation_manager=NullConversationManager())
LangGraph: State Reducers and Message Trimming
from langgraph.graph import MessagesState
from langchain_core.messages import trim_messages
class AgentState(MessagesState):
    summary: str

def summarize_conversation(state):
    messages = state["messages"]
    if len(messages) > 20:
        summary = llm.invoke(f"Summarize: {messages[:10]}")
        # NOTE: MessagesState appends returned messages by default; in
        # production, emit RemoveMessage entries to actually drop the
        # summarized messages from state.
        return {
            "messages": messages[10:],
            "summary": summary.content,
        }
    return state

# Or use built-in trimming
trimmed = trim_messages(
    messages,
    max_tokens=4000,
    strategy="last",
    token_counter=ChatOpenAI(model="gpt-4o"),
)
AutoGen: Model Context
from autogen_core.model_context import (
    UnboundedChatCompletionContext,
    BufferedChatCompletionContext,
)

agent = AssistantAgent(
    "assistant",
    model_client=client,
    model_context=BufferedChatCompletionContext(buffer_size=20),
)
Cross-Agent Context Strategies
┌─────────────────────────────────────────────────────────────────────┐
│ CONTEXT ISOLATION vs SHARING STRATEGIES │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ FULL ISOLATION (Graph pattern) │
│ Agent A ──output only──> Agent B ──output only──> Agent C │
│ Each agent has own context. Only final output passed forward. │
│ │
│ SHARED CONTEXT (Swarm pattern) │
│ SharedContext: {"agent_a": {"key": "value"}} │
│ All agents read/write to shared JSON key-value store. │
│ │
│ SCOPED SHARING (CrewAI Memory pattern) │
│ /global/ ← All agents can read │
│ /department/eng/ ← Only engineering agents │
│ /agent/coder/ ← Only the coder agent │
│ │
│ BROADCAST (AgentScope MsgHub) │
│ All messages visible to all participants (like a chat room) │
│ Simple but O(N²) context growth │
└─────────────────────────────────────────────────────────────────────┘
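The scoped-sharing idea can be modeled as a path-prefixed key-value store; the class below is a minimal sketch of the concept, not CrewAI's actual MemoryScope implementation:

```python
class ScopedMemory:
    """Path-scoped KV store: an agent may read only paths under its granted scopes."""
    def __init__(self):
        self._store = {}

    def write(self, path: str, value: str) -> None:
        self._store[path] = value

    def read(self, path: str, granted_scopes: list):
        if any(path.startswith(scope) for scope in granted_scopes):
            return self._store.get(path)
        raise PermissionError(f"no access to {path}")

mem = ScopedMemory()
mem.write("/global/project", "landing page")
mem.write("/agent/coder/notes", "use React")

coder_scopes = ["/global/", "/agent/coder/"]
print(mem.read("/global/project", coder_scopes))      # landing page
print(mem.read("/agent/coder/notes", coder_scopes))   # use React
# mem.read("/agent/coder/notes", ["/global/"]) would raise PermissionError
```

Prefix checks keep the access rule trivial to audit: an agent's visibility is exactly the set of path prefixes it was granted.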
Memory Systems Across Frameworks
| Layer | Framework Support |
|---|---|
| Layer 1: Working Memory (conversation history) | All frameworks: messages list / Strands: SummarizingConversationManager / AutoGen: BufferedChatCompletionContext / Agno: CompressionManager / AgentScope: InMemoryMemory, RedisMemory / LangGraph: trim_messages |
| Layer 2: Shared Team Memory | Strands: SharedContext (JSON KV in Swarm) / LangGraph: Graph state (channels with reducers) / AutoGen: Shared message history in group chats / CrewAI: MemoryScope (/crew/shared/) / Agno: share_member_interactions |
| Layer 3: Session Persistence | LangGraph: PostgresSaver, SqliteSaver / Strands: SessionManager hook system / AutoGen: TeamState for pause/resume / AgentScope: SQLAlchemy-backed WorkingMemory |
| Layer 4: Long-Term Knowledge (Vector DB / RAG) | Agno: 20+ vector DB integrations (pgvector, pinecone, qdrant, etc.) / CrewAI: Knowledge with BaseKnowledgeSource + RAG / LangGraph: BaseStore with embedding search / AgentScope: LongTermMemory via Mem0 / Strands: MCP-based knowledge tools |
Verification & Testing Scaled Agents
CrewAI: Built-in Evaluation
from crewai.evaluation import AgentEvaluator
from crewai.evaluation.metrics import (
    ToolSelectionEvaluator,
    ReasoningMetrics,
    GoalMetrics,
    SemanticQualityMetrics,
)

evaluator = AgentEvaluator(
    agents=[agent1, agent2],
    evaluators=[
        ToolSelectionEvaluator(),
        ReasoningMetrics(),
        GoalMetrics(),
        SemanticQualityMetrics(),
    ],
)

# A/B testing with ExperimentRunner
from crewai.experimental.evaluation.experiment import ExperimentRunner
runner = ExperimentRunner(
    crew=crew,
    iterations=10,
    inputs_list=[{"topic": "AI"}, {"topic": "ML"}, ...],
)
results = runner.run()
DSPy: Systematic Optimization and Evaluation
import dspy

def accuracy_metric(example, prediction, trace=None):
    return example.answer == prediction.answer

# Evaluate module
evaluator = dspy.Evaluate(
    devset=dev_examples,
    metric=accuracy_metric,
    num_threads=4,
)
score = evaluator(my_module)

# Auto-tune prompts for better performance
from dspy.teleprompt import MIPROv2

optimizer = MIPROv2(
    metric=accuracy_metric,
    auto="medium",
)
optimized_module = optimizer.compile(
    my_module,
    trainset=train_examples,
)
Verification Checklist for Scaled Systems
┌─────────────────────────────────────────────────────────────────────┐
│ SCALED AGENT VERIFICATION CHECKLIST │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 1. TOOL SELECTION ACCURACY │
│ □ Each agent selects correct tool >90% of the time │
│ □ No agent has more than 15 tools │
│ □ Tool descriptions are unambiguous │
│ │
│ 2. ROUTING ACCURACY │
│ □ Correct agent selected >90% of the time │
│ □ No infinite handoff loops (repetitive_handoff_detection) │
│ │
│ 3. CONTEXT INTEGRITY │
│ □ Critical information survives summarization │
│ □ Cross-agent handoffs don't lose key data │
│ □ Context window never exceeds model limit │
│ │
│ 4. SAFETY & BOUNDS │
│ □ max_handoffs prevents infinite loops │
│ □ execution_timeout prevents runaway │
│ □ max_turns prevents infinite chat │
│ □ max_iterations prevents infinite loops │
│ │
│ 5. COST & LATENCY │
│ □ Total tokens per request within budget │
│ □ P95 latency acceptable for use case │
│ □ Cheap models used for routing/summarization │
│ │
│ 6. STATE PERSISTENCE │
│ □ Sessions survive process restarts │
│ □ Interrupted workflows resume correctly │
│ □ State doesn't grow unbounded │
│ │
│ 7. OUTPUT QUALITY │
│ □ Output meets stated goal (CrewAI GoalMetrics) │
│ □ Output is semantically coherent (SemanticQualityMetrics) │
│ □ Reasoning is logical (ReasoningMetrics) │
└─────────────────────────────────────────────────────────────────────┘
Testing Strategy by Scale
# LEVEL 1: Unit Test Individual Agents
def test_single_agent_tool_selection():
    agent = Agent(tools=[search, calculate])
    result = agent("What is 2+2?")
    assert "calculate" in str(result.tool_calls)

# LEVEL 2: Integration Test Agent Pairs
def test_handoff():
    swarm = Swarm(
        nodes=[router_agent, specialist_agent],
        max_handoffs=3,
    )
    result = swarm("billing question")
    assert "specialist" in str(result.node_history)

# LEVEL 3: System Test Full Pipeline
def test_full_pipeline_e2e():
    graph = build_full_pipeline()
    test_cases = [
        {"input": "Research AI trends", "expected_nodes": 3},
        {"input": "Debug this code", "expected_nodes": 2},
    ]
    for tc in test_cases:
        result = graph(tc["input"])
        assert len(result.execution_order) == tc["expected_nodes"]

# LEVEL 4: Load Test
async def test_concurrent_requests():
    graph = build_production_graph()
    tasks = [graph.invoke_async(f"Request {i}") for i in range(50)]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    success_rate = sum(1 for r in results if not isinstance(r, Exception)) / len(results)
    assert success_rate > 0.95
Observability & Monitoring
| Framework | Observability Features |
|---|---|
| LangGraph | LangSmith integration (traces, runs) / Checkpoint inspection (get_state, get_state_history) / Stream events per node |
| AutoGen | OpenTelemetry tracing / Agent registry / Message-level logging |
| CrewAI | Event bus (crewai_event_bus) with typed events / TraceCollectionListener for OpenTelemetry / AgentEvaluator with 4 metric categories / ExperimentRunner for A/B testing |
| Strands | Hook system (Before/After events for all ops) / OpenTelemetry tracing (get_tracer) / GraphState/SwarmState with execution metrics |
| Agno | OpenTelemetry tracing / RunMetrics, SessionMetrics tracking / Registry for agent discovery |
| AgentScope | OpenTelemetry tracing module / Trace extractors and converters / Token counters per model provider |
| DSPy | dspy.inspect_history() for prompt debugging / Evaluate module with metrics / Cost tracking, latency monitoring |
| Langfuse | Universal observability layer / Prompt versioning, generations tracking / Works with ALL frameworks above |
Key Metrics to Track
metrics = {
    # Per-request metrics
    "total_tokens": 0,
    "total_cost_usd": 0.0,
    "latency_ms": 0,
    "num_agents_invoked": 0,
    "num_tool_calls": 0,
    "num_handoffs": 0,
    # Quality metrics
    "goal_achieved": True,
    "tool_accuracy": 0.95,
    # Safety metrics
    "loop_detected": False,
    "context_overflow": False,
    "max_depth_reached": False,
}
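A lightweight guard can evaluate these metrics after each request and feed alerting; the helper and its thresholds below are hypothetical, shown only to illustrate the pattern:

```python
def check_budgets(metrics: dict, max_tokens: int = 100_000,
                  max_cost_usd: float = 1.0) -> list:
    """Return the list of violated budgets for this request."""
    violations = []
    if metrics["total_tokens"] > max_tokens:
        violations.append("token_budget")
    if metrics["total_cost_usd"] > max_cost_usd:
        violations.append("cost_budget")
    if metrics["loop_detected"] or metrics["max_depth_reached"]:
        violations.append("safety")
    return violations

m = {"total_tokens": 150_000, "total_cost_usd": 0.4,
     "loop_detected": False, "max_depth_reached": False}
print(check_budgets(m))  # ['token_budget']
```

Running a check like this per request turns the raw metrics dict into actionable alerts instead of dashboard-only data.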
Framework Comparison Matrix
| Feature | LangGraph | AutoGen | CrewAI | Strands | Agno | AgentScope | DSPy |
|---|---|---|---|---|---|---|---|
| Single Agent | StateGraph | Assistant Agent | Agent+Task | Agent | Agent | AgentBase | Module |
| Multi-Agent | Subgraphs | Teams | Crew+Flow | Graph+Swarm | Team | Pipeline+MsgHub | Compose modules |
| DAG/Graph | Native | GraphFlow | Flow | GraphBuilder | - | Seq+Fanout | - |
| Dynamic Routing | Conditional edges | SelectorGroupChat | Hierarchical | Swarm | TeamMode.route | MsgHub | - |
| Distributed | LangGraph Platform | gRPC Runtime | - | A2A Protocol | RemoteTeam | - | - |
| Checkpointing | Native (Postgres/SQLite) | TeamState | FlowPersistence | SessionManager | SessionManager | SQLAlchemy | - |
| Evaluation | Manual | Manual | Built-in 4 metrics | Hooks | BaseEval | Tuner | Native Evaluate+Optimize |
| Safety Guards | Retry+Cache | Termination Cond | Guardrails | Max handoffs+timeout | Guardrails | - | max_iters |
| Languages | Python+JS | Python+.NET | Python | Python | Python | Python | Python |
| Best For | Complex stateful workflows | Enterprise distributed | Quick teams w/eval | AWS/prod agents | Full-stack agents | Research+distrib | Prompt optim+eval |
Step-by-Step Scaling Playbook
Phase 1: Single Agent (Week 1)
- Pick your framework based on needs
- Build single agent with 3-5 tools
- Test tool selection accuracy (should be >95%)
- Add conversation management (sliding window or summarizing)
- Add session persistence
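The "sliding window" conversation management from step 4 can be sketched framework-agnostically. This is a minimal illustration (the `sliding_window` helper is hypothetical): keep the system prompt pinned and drop the oldest turns once the history exceeds a budget.

```python
def sliding_window(messages: list[dict], max_messages: int = 6) -> list[dict]:
    """Keep the system prompt plus the most recent turns."""
    if len(messages) <= max_messages:
        return messages
    system = [m for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    # Fill the remaining budget with the newest non-system messages
    return system + rest[-(max_messages - len(system)):]

# Usage: 1 system prompt + 10 user turns, trimmed to 6 messages total
history = [{"role": "system", "content": "You are a researcher."}]
history += [{"role": "user", "content": f"msg {i}"} for i in range(10)]
trimmed = sliding_window(history, max_messages=6)
```

Most frameworks ship an equivalent (e.g. Strands' sliding-window conversation managers); the point is that the policy is simple enough to own yourself if needed.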
Phase 2: Agent Pair (Week 2-3)
- Identify a natural split (Research → Analysis, Generate → Review, Route → Specialist)
- Build the pair: Sequential (Graph/Pipeline) or Router + Specialist (Swarm/Selector)
- Define the handoff contract with structured output (Pydantic models)
- Test: correct agent selected, context transfers completely, no infinite loops
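A handoff contract from step 3 might look like the following sketch (the `ResearchHandoff` model and its fields are illustrative, not from any framework). The receiving agent validates the payload instead of trusting free text, so a malformed handoff fails loudly at the boundary.

```python
from pydantic import BaseModel, Field

class ResearchHandoff(BaseModel):
    # The contract the Research agent emits and the Analysis agent accepts
    topic: str
    findings: list[str] = Field(min_length=1)      # at least one finding required
    confidence: float = Field(ge=0.0, le=1.0)      # bounded score
    open_questions: list[str] = []

# Usage: the receiving agent validates rather than parsing prose
payload = {
    "topic": "vector DBs",
    "findings": ["pgvector handles the current workload"],
    "confidence": 0.8,
}
handoff = ResearchHandoff.model_validate(payload)
```

Passing `{"confidence": 1.5}` or an empty `findings` list raises a `ValidationError`, which is exactly the failure mode you want: at the handoff, not three agents downstream.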
Phase 3: Small Team (Week 4-6)
- Organize: 3-4 specialists with 5-8 tools each + 1 router/coordinator
- Choose pattern: Known workflow → Graph / Dynamic routing → Swarm / Both → Swarm with conditional edges
- Add safety: max_handoffs=15, execution_timeout=300, repetitive_handoff_detection_window=5
- Add shared context (Strands SharedContext / LangGraph state channels / CrewAI Memory scopes)
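The safety limits above (max handoffs plus a repetitive-handoff detection window) can be implemented framework-agnostically. This sketch (the `make_handoff_guard` helper is hypothetical, modeled on Strands' `max_handoffs` and `repetitive_handoff_detection_window` parameters) trips when either the total handoff count is exceeded or the recent window shows two agents ping-ponging.

```python
from collections import deque

def make_handoff_guard(max_handoffs: int = 15, repeat_window: int = 5):
    """Return a callable that records each handoff and raises on abuse."""
    history = deque(maxlen=repeat_window)
    count = 0

    def record(agent_name: str) -> None:
        nonlocal count
        count += 1
        if count > max_handoffs:
            raise RuntimeError("max_handoffs exceeded")
        history.append(agent_name)
        # Ping-pong detection: the full window holds only 1-2 distinct agents
        if len(history) == repeat_window and len(set(history)) <= 2:
            raise RuntimeError("repetitive handoff loop detected")

    return record

# Usage: the 4th handoff exceeds max_handoffs=3 and trips the guard
guard = make_handoff_guard(max_handoffs=3, repeat_window=5)
guard("router"); guard("writer"); guard("router")
try:
    guard("writer")
    tripped = False
except RuntimeError:
    tripped = True
```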
Phase 4: Department Structure (Week 7-10)
Orchestrator (1 agent)
├── Research Team (5 agents)
│ ├── Web Researcher
│ ├── Paper Analyst
│ ├── Data Collector
│ ├── Fact Checker
│ └── Summarizer
├── Engineering Team (5 agents)
│ ├── Architect
│ ├── Frontend Dev
│ ├── Backend Dev
│ ├── Code Reviewer
│ └── DevOps
└── QA Team (3 agents)
├── Test Designer
├── Test Runner
└── Bug Reporter
Each team is self-contained, with its own shared context, conversation management, and safety limits, and exposes a single interface to the orchestrator. Cross-team communication happens via structured outputs only.
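The "single interface" rule can be sketched as a thin team wrapper (the `ResearchTeam` and `TeamResult` classes are illustrative, not a framework API): internals stay private, and only a structured result crosses the boundary to the orchestrator.

```python
from dataclasses import dataclass, field

@dataclass
class TeamResult:
    # Structured output: the only thing that crosses the team boundary
    summary: str
    artifacts: dict = field(default_factory=dict)

class ResearchTeam:
    """Self-contained team: internal agents, shared context, and safety
    limits are private. The orchestrator only ever calls run()."""

    def __init__(self):
        self._shared_context: dict = {}  # team-local, never exposed

    def run(self, task: str) -> TeamResult:
        # Internally this would fan out to Web Researcher, Fact Checker, etc.
        self._shared_context["task"] = task
        return TeamResult(summary=f"Findings for: {task}")

# Usage: the orchestrator sees one method, one typed result
result = ResearchTeam().run("compare vector DBs")
```

Because the orchestrator depends only on `run()` and `TeamResult`, a team can later be swapped from an in-process Graph to a remote service (Phase 5) without touching the orchestrator.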
Phase 5: Organization Scale (Week 11+)
┌─────────────┐
│ Orchestrator │ (API Gateway)
│ Service │
└──────┬───────┘
│ A2A/gRPC/REST
┌──────┼──────┬───────────┐
│ │ │ │
▼ ▼ ▼ ▼
┌─────┐┌─────┐┌─────┐ ┌─────┐
│Team ││Team ││Team │ │Team │
│Svc 1││Svc 2││Svc 3│ │Svc 4│
└─────┘└─────┘└─────┘ └─────┘
Move to a distributed architecture: AutoGen GrpcWorkerAgentRuntimeHost, Strands A2A protocol, or Agno RemoteTeam. Each team runs as an independent service. Add observability (Langfuse, OpenTelemetry) plus rate limiting and cost controls.
Production Architecture Patterns
The “Golden Path” Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ PRODUCTION MULTI-AGENT SYSTEM │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────┐ │
│ │ API Gateway │ ← Rate limiting, auth │
│ └────────────┬─────────────┘ │
│ │ │
│ ┌────────────┼────────────┬────────────────┐ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Research│ │Execute │ │Support │ │Analysis│ ← Each is a │
│ │Team │ │Team │ │Team │ ... │Team │ self-contained │
│ │3-5 agt │ │3-5 agt │ │3-5 agt │ │3-5 agt │ Graph or Swarm │
│ │8-12 │ │8-12 │ │8-12 │ │8-12 │ │
│ │tools │ │tools │ │tools │ │tools │ │
│ └────────┘ └────────┘ └────────┘ └────────┘ │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ Shared Infrastructure │ │
│ │ ┌─────────┐ ┌──────────┐ ┌───────────┐ │ │
│ │ │Session │ │Memory/ │ │Observabil-│ │ │
│ │ │Store │ │Knowledge │ │ity (OTel) │ │ │
│ │ │(Postgres│ │(VectorDB)│ │(Langfuse) │ │ │
│ │ └─────────┘ └──────────┘ └───────────┘ │ │
│ └──────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
Cost Optimization Rules
# RULE 1: Use cheap models for routing/summarization (GPT-4o-mini, Nova Lite)
# RULE 2: Cache tool results aggressively
# RULE 3: Don't run all agents — route and run the 3-5 that matter
# RULE 4: Structured output reduces token waste
from pydantic import BaseModel

class ResearchResult(BaseModel):
    summary: str
    key_findings: list[str]
    confidence: float

agent = Agent(output_type=ResearchResult)  # No rambling
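Rule 2 (aggressive tool-result caching) is cheap to implement yourself. This is a minimal sketch (the `cached_tool` decorator and `web_search` tool are hypothetical): memoize a tool by a hash of its JSON-serialized keyword arguments so repeated identical calls never hit the expensive backend twice.

```python
import functools
import hashlib
import json

def cached_tool(fn):
    """Memoize a tool by its JSON-serialized keyword arguments (Rule 2)."""
    cache: dict = {}

    @functools.wraps(fn)
    def wrapper(**kwargs):
        key = hashlib.sha256(
            json.dumps(kwargs, sort_keys=True).encode()
        ).hexdigest()
        if key not in cache:
            cache[key] = fn(**kwargs)  # only pay for the first call
        return cache[key]

    wrapper.cache = cache
    return wrapper

# Usage: the second identical call is served from cache
calls = []

@cached_tool
def web_search(query: str) -> str:
    calls.append(query)  # stands in for an expensive API call
    return f"results for {query}"

web_search(query="agents")
web_search(query="agents")
```

In production, swap the in-memory dict for Redis or a disk cache and add a TTL, since stale tool results are their own failure mode.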
The 8 Golden Rules of Agent Scaling
┌─────────────────────────────────────────────────────────────────────┐
│ 8 GOLDEN RULES │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ 1. NO AGENT SEES MORE THAN 15 TOOLS │
│ Split into specialists. Use routing. │
│ │
│ 2. EVERY AGENT HAS CONVERSATION MANAGEMENT │
│ Sliding window or summarization. Never unbounded. │
│ │
│ 3. TOOL RESULTS SHOULD BE < 2K TOKENS │
│ Truncate or summarize large results. │
│ │
│ 4. EVERY GRAPH HAS TIMEOUT + MAX NODE LIMIT │
│ Strands: execution_timeout + max_node_executions │
│ AutoGen: termination_condition + max_turns │
│ CrewAI: max_iterations │
│ │
│ 5. EVERY SWARM HAS MAX_HANDOFFS + LOOP DETECTION │
│ Strands: max_handoffs + repetitive_handoff_detection_window │
│ AutoGen: max_turns + termination_condition │
│ │
│ 6. USE STRUCTURED OUTPUT FOR AGENT-TO-AGENT DATA │
│ Pydantic models, not free-text. │
│ │
│ 7. USE CHEAP MODELS FOR INFRASTRUCTURE │
│ Routing, summarization, classification → GPT-4o-mini / Nova │
│ Actual expert work → GPT-4o / Claude │
│ │
│ 8. HIERARCHY BEATS FLAT NETWORKS — ALWAYS │
│ 5 teams of 20 > 100 agents in a flat swarm. │
│ Each team: self-contained, own context, own safety limits. │
│ │
└─────────────────────────────────────────────────────────────────────┘
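Rule 3 (tool results under ~2K tokens) needs only a small guard on every tool's return value. This sketch (the `clamp_tool_result` helper is hypothetical) uses the common rough heuristic of about 4 characters per token for English text; swap in a real tokenizer if you need precision.

```python
def clamp_tool_result(text: str, max_tokens: int = 2000) -> str:
    """Rule 3: keep a tool's output under roughly max_tokens.
    Assumes ~4 characters per token, a rough English-text heuristic."""
    max_chars = max_tokens * 4
    if len(text) <= max_chars:
        return text
    # Keep the head and note how much was dropped
    head = text[: max_chars - 60]
    return head + f"\n...[truncated {len(text) - len(head)} chars]"

# Usage: short results pass through, huge ones get clamped
short = clamp_tool_result("short")
clamped = clamp_tool_result("x" * 20000, max_tokens=1000)
```

For results where the tail matters (logs, stack traces), keep the head and tail and drop the middle instead; for prose, summarizing with a cheap model (Rule 7) preserves more signal than truncation.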
Quick Reference: Which Framework for What
| “I need...” | Use This |
|---|---|
| Complex stateful workflows with checkpointing | LangGraph |
| Quick team setup with built-in evaluation | CrewAI |
| Enterprise distributed system | AutoGen |
| AWS-native production system | Strands |
| Full-stack Python with UI + DB + vectors | Agno |
| Prompt optimization at scale | DSPy |
| Research/prototyping multi-agent | AgentScope |
| TypeScript/JavaScript agents | Mastra |
| Observability for any framework | Langfuse |
| Cross-framework agent communication | A2A Protocol |
Generated from source code analysis of 8 open-source frameworks. Last updated: March 7, 2026.