Introduction: From Prototype to Production
In the previous articles we explored individual frameworks for building AI agents: LangChain and LangGraph for graph-based workflows, CrewAI for structured teams, and AutoGen for multi-agent conversation. Now we face the most complex challenge: how to orchestrate multi-agent systems in production, where reliability, scalability, and observability are not optional but fundamental requirements.
Moving from a prototype with two or three agents to a production-ready system with dozens of coordinated agents requires a significant architectural leap. Orchestration patterns are now mature enough to be standardized, and choosing the right architecture determines the success or failure of the entire project. A common mistake is underestimating the complexity: recent studies report that a naive "bag of agents" approach can amplify errors up to 17-fold, with each agent compounding the others' mistakes instead of correcting them.
In this article we will analyze the 5 standard orchestration patterns, the three main architectures (Hub-and-Spoke, Peer-to-Peer, Message Queue), distributed state management strategies, fault tolerance, and monitoring. The goal is to provide a practical guide for designing robust and maintainable multi-agent systems.
What You Will Learn in This Article
- The 5 standard orchestration patterns: Sequential, Concurrent, Group Chat, Handoff, Plan-First
- Hub-and-Spoke Architecture: when and how to use a central coordinator
- Peer-to-Peer Architecture: direct communication between agents
- Message Queue Architecture: decoupling and resilience with RabbitMQ, Kafka, Pulsar
- Distributed state management: event sourcing, CQRS, distributed transactions
- Fault tolerance: circuit breakers, timeouts, retry policies, graceful degradation
- Monitoring and observability: distributed tracing, metrics, and specialized dashboards
- Decision matrix: how to choose the right architecture for your use case
The 5 Standard Orchestration Patterns
Multi-agent orchestration is built on five fundamental patterns, each optimized for a specific type of workload. In complex systems, these patterns combine: a workflow may use the Sequential pattern for the main flow, Concurrent to parallelize sub-tasks, and Handoff to delegate specialized decisions.
The 5 Orchestration Patterns
| Pattern | Flow | Pros | Cons | Use Case |
|---|---|---|---|---|
| Sequential | A → B → C | Simple, predictable | Slow, no parallelism | ETL pipeline, content workflow |
| Concurrent | A, B, C in parallel | Fast, efficient | Synchronization complexity | Multi-source analysis, search |
| Group Chat | Collaborative thread | Flexible, emergent | Unpredictable, expensive | Brainstorming, iterative review |
| Handoff | Dynamic delegation | Adaptive, specialized | Complex routing | Customer support, triage |
| Plan-First | Plan → Execute | Strategic, optimized | Initial overhead | Complex tasks, system design |
Pattern 1: Sequential
In the Sequential pattern, agents operate in a linear chain. The output of Agent A becomes the input of Agent B, whose output becomes the input of Agent C. Each agent enriches, transforms, or validates the predecessor's result. It is the simplest pattern to implement and debug.
Think of it as a manufacturing assembly line: each station performs a specific operation and passes the workpiece to the next. The pipeline architecture ensures that each stage focuses on a single responsibility, making individual agents simpler and more reliable. However, the total execution time is the sum of all agent processing times, with no opportunity for parallelism.
# Sequential Pattern: Content Creation Pipeline
from typing import List, Dict
class SequentialPipeline:
def __init__(self, agents: List):
self.agents = agents
async def execute(self, initial_input: str) -> Dict:
current_output = initial_input
results = []
for agent in self.agents:
result = await agent.process(current_output)
results.append({
"agent": agent.name,
"input": current_output,
"output": result
})
current_output = result
return {
"final_output": current_output,
"pipeline_trace": results
}
# Usage: Researcher -> Writer -> Editor -> Publisher
pipeline = SequentialPipeline([
ResearchAgent("Researcher"),
WriterAgent("Writer"),
EditorAgent("Editor"),
PublisherAgent("Publisher")
])
result = await pipeline.execute(
"Write an article on Kubernetes best practices"
)
The main advantage of the Sequential pattern is complete traceability: for each phase you know exactly the input, output, and the responsible agent. The primary disadvantage is that total time is the sum of all agent times, with no possibility of parallelism. This pattern works best for workflows where each step genuinely depends on the previous step's output.
When to Use Sequential
- Content creation pipelines (research, write, edit, publish)
- ETL data transformation workflows
- Document processing chains (extract, classify, summarize, store)
- Compliance checks where each validation depends on the previous one
- Any workflow where order matters and steps are inherently dependent
Pattern 2: Concurrent (Fan-Out / Fan-In)
In the Concurrent pattern, multiple agents work in parallel on independent tasks. The results are then aggregated by a dedicated component. This pattern is ideal when sub-tasks have no dependencies between them and speed is critical. It follows the classic fan-out / fan-in architecture: a dispatcher fans out tasks to multiple agents, and an aggregator fans in the results.
import asyncio
from typing import List, Dict
class ConcurrentOrchestrator:
def __init__(self, agents: List, aggregator):
self.agents = agents
self.aggregator = aggregator
async def execute(self, task: str) -> Dict:
# Parallel execution of all agents
tasks = [agent.process(task) for agent in self.agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Error filtering and aggregation
successful = []
errors = []
for agent, result in zip(self.agents, results):
if isinstance(result, Exception):
errors.append({"agent": agent.name, "error": str(result)})
else:
successful.append({"agent": agent.name, "result": result})
# Result aggregation
final = await self.aggregator.aggregate(successful)
return {
"final_output": final,
"successful_agents": len(successful),
"failed_agents": len(errors),
"errors": errors
}
# Usage: 3 agents analyze the same topic from different angles
orchestrator = ConcurrentOrchestrator(
agents=[
TechnicalAnalyst("TechAnalyst"),
MarketAnalyst("MarketAnalyst"),
RiskAnalyst("RiskAnalyst")
],
aggregator=SynthesisAgent("Synthesizer")
)
result = await orchestrator.execute(
"Evaluate the adoption of Rust in enterprise backend systems"
)
The Concurrent pattern dramatically reduces wall-clock time. If three agents each take 10 seconds, sequential execution takes 30 seconds while concurrent execution takes only 10 seconds plus aggregation overhead. The trade-off is increased complexity in error handling: what happens when one agent fails but others succeed? The implementation above handles this gracefully by separating successful results from errors and producing a partial result when possible.
Pattern 3: Group Chat
The Group Chat pattern creates a collaborative thread where agents converse freely, as we saw with AutoGen. A moderator (either LLM-based or rule-based) selects which agent speaks next based on the conversation context. It is the most flexible pattern but also the least predictable. The quality of the result depends heavily on role definition and termination criteria.
Group Chat excels in scenarios requiring iterative refinement, where agents build on each other's contributions. For instance, in a code review system, a Coder writes code, a Reviewer critiques it, and a Tester verifies it. The conversation continues until the Reviewer approves the code. The emergent behavior of this pattern can produce surprisingly creative solutions, but it can also lead to infinite loops or topic drift if not properly constrained.
from typing import Dict

# Group Chat with an LLM-based moderator that selects the next speaker
class GroupChatOrchestrator:
def __init__(self, agents: Dict[str, 'Agent'], moderator_llm):
self.agents = agents
self.moderator = moderator_llm
self.history = []
async def run(self, initial_message: str, max_rounds: int = 15):
self.history.append({"role": "user", "content": initial_message})
for round_num in range(max_rounds):
# Moderator selects the next speaker
next_speaker = await self._select_speaker()
if next_speaker == "TERMINATE":
break
agent = self.agents[next_speaker]
response = await agent.respond(self.history)
self.history.append({
"role": next_speaker,
"content": response,
"round": round_num
})
# Check termination conditions
if self._should_terminate(response):
break
return {
"history": self.history,
"rounds": round_num + 1,
"final_response": self.history[-1]["content"]
}
async def _select_speaker(self) -> str:
"""LLM-based speaker selection"""
agent_descriptions = "\n".join([
f"- {name}: {agent.description}"
for name, agent in self.agents.items()
])
prompt = f"""Given this conversation and these agents:
{agent_descriptions}
Who should speak next? Reply with just the agent name,
or TERMINATE if the task is complete."""
return await self.moderator.predict(prompt, self.history)
def _should_terminate(self, response: str) -> bool:
return "APPROVED" in response or "TASK_COMPLETE" in response
Group Chat Pitfalls
Group Chat is the most expensive and unpredictable pattern. Each round sends the full, growing conversation history to the moderator and to the selected agent, so token consumption grows quadratically with conversation length. A 15-round conversation with the orchestrator above produces at least 30 LLM calls (one speaker selection plus one agent response per round) over ever-longer contexts. Always set max_rounds and implement cost tracking, and watch for conversation loops where agents repeat the same arguments without making progress.
Pattern 4: Handoff (OpenAI Swarm Style)
In the Handoff pattern, a routing agent analyzes the task and delegates it to the most appropriate specialist agent. It is the dominant pattern in AI customer support systems, where a triage agent classifies the request and routes it to the right department. This pattern was popularized by OpenAI's Swarm framework, where agents explicitly transfer control to one another based on expertise boundaries.
The key insight of Handoff is that agents know their own limitations. When a billing agent encounters a technical issue, it does not attempt to solve it. Instead, it transfers control (along with relevant context) to the technical support agent. This produces more accurate results because each query is handled by the most qualified specialist.
class HandoffRouter:
def __init__(self, specialists: Dict[str, 'Agent']):
self.specialists = specialists
self.router_llm = RouterLLM()
async def route(self, task: str) -> Dict:
# The LLM determines which specialist should handle the task
classification = await self.router_llm.classify(task)
specialist_name = classification["specialist"]
confidence = classification["confidence"]
if confidence < 0.7:
# Low confidence: escalate to a human
return {
"status": "escalated",
"reason": "Low confidence routing",
"confidence": confidence
}
specialist = self.specialists.get(specialist_name)
if not specialist:
return {"status": "error", "reason": f"No specialist: {specialist_name}"}
result = await specialist.handle(task)
return {
"status": "completed",
"specialist": specialist_name,
"confidence": confidence,
"result": result
}
# Configuration for customer support
router = HandoffRouter({
"billing": BillingAgent("BillingSpecialist"),
"technical": TechnicalAgent("TechSupport"),
"sales": SalesAgent("SalesSpecialist"),
"general": GeneralAgent("GeneralSupport")
})
# Multi-hop handoff: agent A -> agent B -> agent C
class ChainableAgent:
def __init__(self, name: str, can_handoff_to: List[str]):
self.name = name
self.can_handoff_to = can_handoff_to
async def handle(self, task: str, context: Dict) -> Dict:
result = await self.process(task, context)
if result.get("needs_handoff"):
target = result["handoff_target"]
if target in self.can_handoff_to:
return {
"status": "handoff",
"target": target,
"context": {**context, **result.get("handoff_context", {})}
}
return {"status": "completed", "result": result}
Pattern 5: Plan-First
In the Plan-First pattern, a planner agent analyzes the complex task, decomposes it into sub-tasks, defines dependencies and execution order, and then an orchestrator executes the plan. This pattern is essential for complex tasks that require a strategy before action. The planner produces a Directed Acyclic Graph (DAG) of tasks, which the executor processes respecting dependency constraints.
Plan-First is particularly effective for open-ended tasks where the solution path is not known in advance. A user might ask "Build a web scraper for real estate listings" and the planner would decompose this into: (1) research target website structure, (2) design data schema, (3) implement scraper logic, (4) add error handling, (5) write tests, (6) create documentation. Steps 1 and 2 can run concurrently, but steps 3-6 depend on their predecessors.
class PlanFirstOrchestrator:
def __init__(self, planner, executor_pool: Dict[str, 'Agent']):
self.planner = planner
self.executor_pool = executor_pool
async def execute(self, complex_task: str) -> Dict:
# Phase 1: Planning
plan = await self.planner.create_plan(complex_task)
# plan = [
# {"step": 1, "agent": "researcher", "task": "...", "deps": []},
# {"step": 2, "agent": "analyst", "task": "...", "deps": [1]},
# {"step": 3, "agent": "writer", "task": "...", "deps": [1, 2]}
# ]
# Phase 2: Execution respecting dependencies
completed = {}
for step in self._topological_sort(plan):
# Wait for dependencies
dep_results = {
d: completed[d] for d in step["deps"]
}
agent = self.executor_pool[step["agent"]]
result = await agent.process(
task=step["task"],
context=dep_results
)
completed[step["step"]] = result
return {"plan": plan, "results": completed}
def _topological_sort(self, plan):
"""Sort steps respecting dependencies (Kahn's algorithm)"""
in_degree = {s["step"]: len(s["deps"]) for s in plan}
queue = [s for s in plan if in_degree[s["step"]] == 0]
sorted_steps = []
while queue:
step = queue.pop(0)
sorted_steps.append(step)
for s in plan:
if step["step"] in s["deps"]:
in_degree[s["step"]] -= 1
if in_degree[s["step"]] == 0:
queue.append(s)
return sorted_steps
Plan-First with Dynamic Replanning
A more advanced variant of Plan-First introduces dynamic replanning. After each step completes, the planner re-evaluates the remaining plan based on intermediate results. If a research step reveals unexpected complexity, the planner can add new steps or reassign tasks to different specialist agents. This makes the system adaptive while retaining the strategic advantage of planning ahead.
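The replanning loop described above can be sketched as follows. The planner's `create_plan` and `revise_plan` interface is hypothetical, not tied to any specific framework:

```python
import asyncio
from typing import Dict

class ReplanningOrchestrator:
    """Plan-First with dynamic replanning: the plan is revised after every step."""
    def __init__(self, planner, executor_pool: Dict[str, object]):
        self.planner = planner
        self.executor_pool = executor_pool

    async def execute(self, task: str) -> Dict:
        plan = await self.planner.create_plan(task)
        completed: Dict[int, object] = {}
        while plan:
            step = plan.pop(0)
            agent = self.executor_pool[step["agent"]]
            result = await agent.process(step["task"], context=completed)
            completed[step["step"]] = result
            # Re-evaluate the remaining plan in light of intermediate results
            plan = await self.planner.revise_plan(
                remaining=plan, completed=completed
            )
        return {"results": completed}
```

After each step the planner sees every completed result and can rewrite the tail of the plan, which is what makes this variant adaptive while keeping the strategic up-front plan.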
Hub-and-Spoke Architecture
The Hub-and-Spoke architecture features a central coordinator (the hub) that directs a group of worker agents (the spokes). Every communication passes through the hub: agents do not communicate directly with each other. This is the most common architectural model in production multi-agent systems because it balances simplicity with control.
Hub-and-Spoke Architecture:
+------------------+
| ORCHESTRATOR |
| (Hub) |
+--------+---------+
|
+---------+-------+-------+---------+
| | | |
+-----v----+ +--v------+ +----v-----+ +-v--------+
| Agent A | | Agent B | | Agent C | | Agent D |
| (Search) | |(Analyze)| | (Write)  | | (Review) |
+----------+ +---------+ +----------+ +----------+
(Spoke) (Spoke) (Spoke) (Spoke)
Advantages of Hub-and-Spoke
- Centralized control: the hub has full visibility over the state of all agents and can make informed decisions about routing and priorities.
- Simple monitoring: all communications pass through a single point, making logging, tracing, and auditing trivial.
- Centralized error handling: the hub can implement retry, fallback, and circuit breaker logic consistently for all agents.
- Easy evolution: adding or removing worker agents is straightforward; just register them with the hub without modifying other agents.
Disadvantages of Hub-and-Spoke
- Single Point of Failure: if the hub fails, the entire system stops. This requires high availability (HA) for the hub, which increases complexity.
- Bottleneck: the hub becomes a bottleneck when the number of agents or the frequency of communications increases significantly.
- Limited scalability: scaling beyond a certain number of agents requires hub sharding or migration to a different architecture.
- Additional latency: every inter-agent communication adds a hop through the hub, increasing end-to-end latency.
from typing import Any, Dict, List

class HubOrchestrator:
    def __init__(self):
        self.agents: Dict[str, 'Agent'] = {}
        self.task_queue: asyncio.Queue = asyncio.Queue()
        self.results: Dict[str, Any] = {}
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}
def register_agent(self, name: str, agent: 'Agent'):
self.agents[name] = agent
self.circuit_breakers[name] = CircuitBreaker(
failure_threshold=3,
recovery_timeout=60
)
async def dispatch(self, agent_name: str, task: Dict) -> Dict:
cb = self.circuit_breakers[agent_name]
if cb.is_open():
return {"status": "circuit_open", "agent": agent_name}
try:
result = await asyncio.wait_for(
self.agents[agent_name].process(task),
timeout=30.0
)
cb.record_success()
return result
except asyncio.TimeoutError:
cb.record_failure()
return {"status": "timeout", "agent": agent_name}
except Exception as e:
cb.record_failure()
return {"status": "error", "agent": agent_name, "error": str(e)}
async def run_workflow(self, workflow: List[Dict]) -> Dict:
"""Execute a workflow defined as a list of steps"""
results = {}
for step in workflow:
agent_name = step["agent"]
task = {**step["task"], "context": results}
result = await self.dispatch(agent_name, task)
results[step["id"]] = result
if result.get("status") in ("circuit_open", "timeout", "error"):
# Attempt fallback if available
if step.get("fallback_agent"):
result = await self.dispatch(step["fallback_agent"], task)
results[step["id"]] = result
return results
Peer-to-Peer Architecture
In the Peer-to-Peer (P2P) architecture, agents communicate directly with each other without a central coordinator. Every agent is autonomous and can discover and interact with any other agent in the system. This model draws inspiration from classic distributed systems such as gossip protocols and consensus algorithms.
Peer-to-Peer Architecture:
+----------+ direct messages +----------+
| Agent A | <---------------------> | Agent B |
+-----+----+ +----+-----+
| |
| +----------+ |
+-------->| Agent C |<-------------+
| +----+-----+ |
| | |
| +----v-----+ |
+-------->| Agent D |<-------------+
+----------+
Advantages of Peer-to-Peer
- No Single Point of Failure: if one agent fails, others continue functioning and can redistribute the workload.
- Horizontal scalability: adding new agents requires no architectural changes; simply connect them to the network.
- Low latency: direct communication between agents eliminates the hop through a central coordinator.
- Natural resilience: the system automatically adapts to failures, re-routing communications through functioning agents.
Disadvantages of Peer-to-Peer
- High complexity: discovery, routing, and consensus logic is distributed across every agent, making the system harder to develop and maintain.
- Consistency challenges: maintaining coherent state across autonomous agents is a fundamental distributed systems problem (CAP theorem).
- Complex debugging: without a central observation point, tracing communication flows and diagnosing problems is significantly harder.
- Communication overhead: every agent must maintain connections with many other agents, increasing network resource consumption.
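A minimal in-process sketch of the P2P model, assuming hypothetical `PeerAgent` semantics (direct links and per-agent inboxes). A real deployment would replace the in-memory links with sockets or gRPC plus a discovery protocol:

```python
import asyncio
from typing import Dict

class PeerAgent:
    """Minimal peer: direct messaging, no central coordinator."""
    def __init__(self, name: str):
        self.name = name
        self.peers: Dict[str, "PeerAgent"] = {}
        self.inbox: asyncio.Queue = asyncio.Queue()

    def connect(self, other: "PeerAgent"):
        # Symmetric link: both peers learn about each other
        self.peers[other.name] = other
        other.peers[self.name] = self

    async def send(self, peer_name: str, message: Dict):
        # Deliver directly into the target peer's inbox (no hub hop)
        await self.peers[peer_name].inbox.put({"from": self.name, **message})

    async def receive(self) -> Dict:
        return await self.inbox.get()
```

Even this toy version shows where the P2P complexity lives: every agent must maintain its own peer table, and there is no single place to observe the message flow.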
Message Queue Architecture
The Message Queue architecture introduces a message broker (such as RabbitMQ, Apache Kafka, or Apache Pulsar) as an intermediary between agents. Agents publish messages to specific topics or queues, and other agents subscribe to receive them. This model implements an event-driven architecture that offers superior decoupling, resilience, and scalability.
Message Queue Architecture:
+----------+ +----------+
| Agent A |---publish---> +----------------+ -->| Agent C |
+----------+ | | +----------+
| MESSAGE BROKER |
+----------+ | (Kafka/RabbitMQ| +----------+
| Agent B |---publish---> | /Pulsar) | -->| Agent D |
+----------+ +----------------+ +----------+
|
+------v-------+
| Dead Letter |
| Queue (DLQ) |
+--------------+
Topics / Queues:
- tasks.research (Agent A publishes, Agent C consumes)
- tasks.analysis (Agent A publishes, Agent D consumes)
- results.research (Agent C publishes, Agent B consumes)
- errors.global (all publish, monitoring consumes)
Advantages of Message Queue
- Total decoupling: agents do not need to know each other directly. They communicate through topics, allowing agents to be added or removed with no impact on the rest of the system.
- Native resilience: messages persist in the queue even if the destination agent is temporarily offline. When it comes back, it processes the accumulated messages.
- Independent scalability: each agent type can be scaled independently. If analysis is the bottleneck, add more analysis agent instances.
- Replay and audit: with Kafka, the entire message history is available for replay, debugging, and compliance auditing.
Disadvantages of Message Queue
- Operational complexity: managing a Kafka or RabbitMQ cluster in production requires significant DevOps expertise and dedicated infrastructure.
- Additional latency: message serialization, persistence, and deserialization introduce latency compared to direct communication.
- Eventual consistency: the system is inherently eventually consistent, which can be problematic for workflows requiring strong consistency.
- Infrastructure cost: the message broker is an additional component that requires resources, monitoring, and maintenance.
import asyncio
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
import json
class KafkaAgentBus:
def __init__(self, bootstrap_servers: str):
self.servers = bootstrap_servers
self.producer = None
self.consumers = {}
async def start(self):
self.producer = AIOKafkaProducer(
bootstrap_servers=self.servers,
value_serializer=lambda v: json.dumps(v).encode()
)
await self.producer.start()
async def publish(self, topic: str, message: Dict):
await self.producer.send_and_wait(topic, message)
async def subscribe(self, topic: str, handler):
consumer = AIOKafkaConsumer(
topic,
bootstrap_servers=self.servers,
value_deserializer=lambda v: json.loads(v.decode()),
group_id=f"agent-group-{topic}"
)
await consumer.start()
async for msg in consumer:
try:
await handler(msg.value)
except Exception as e:
# Publish to Dead Letter Queue
await self.publish("errors.dlq", {
"original_topic": topic,
"message": msg.value,
"error": str(e)
})
# Usage
bus = KafkaAgentBus("localhost:9092")
await bus.start()
# Agent A publishes a research task
await bus.publish("tasks.research", {
"task_id": "t-001",
"query": "Microservices best practices 2026",
"priority": "high"
})
# Agent C consumes and processes
await bus.subscribe("tasks.research", research_agent.handle)
Distributed State Management
In a multi-agent system, shared state is the hardest problem to manage. Each agent may hold a partial or stale view of the global state, leading to inconsistent decisions. There are three main approaches to managing state in a distributed fashion.
Event Sourcing
With Event Sourcing, state is not stored as a current snapshot but as an immutable sequence of events. The current state is obtained by replaying all events from the beginning. This approach provides a complete audit trail and the ability to reconstruct state at any point in time. It is particularly valuable for multi-agent systems because it allows you to understand exactly how the system arrived at its current state.
from datetime import datetime
from typing import List, Dict
class EventStore:
def __init__(self):
self.events: List[Dict] = []
def append(self, event: Dict):
event["timestamp"] = datetime.utcnow().isoformat()
event["sequence"] = len(self.events)
self.events.append(event)
def get_state(self, entity_id: str) -> Dict:
"""Reconstruct state by applying all events"""
state = {}
for event in self.events:
if event.get("entity_id") == entity_id:
state = self._apply_event(state, event)
return state
def _apply_event(self, state: Dict, event: Dict) -> Dict:
event_type = event["type"]
if event_type == "TaskCreated":
state["status"] = "created"
state["data"] = event["data"]
elif event_type == "TaskAssigned":
state["assigned_to"] = event["agent"]
state["status"] = "assigned"
elif event_type == "TaskCompleted":
state["status"] = "completed"
state["result"] = event["result"]
return state
def get_state_at(self, entity_id: str, point_in_time: str) -> Dict:
"""Reconstruct state at a specific point in time"""
state = {}
for event in self.events:
if event.get("entity_id") == entity_id:
if event["timestamp"] <= point_in_time:
state = self._apply_event(state, event)
return state
CQRS (Command Query Responsibility Segregation)
CQRS separates write operations (commands) from read operations (queries). Commands modify state through the event store, while queries read from optimized projections. This allows reads and writes to be scaled independently and provides read models optimized for each specific type of query.
- Command side: validates commands, generates events, maintains consistency
- Query side: reads materialized projections optimized for specific queries
- Projections: denormalized views asynchronously updated from events
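These three pieces can be sketched in a few lines. The `TaskCommandHandler` and `TaskListProjection` names and the event shapes are illustrative, not from any specific CQRS library:

```python
from typing import Dict, List

class TaskCommandHandler:
    """Command side: validates commands and emits events."""
    def __init__(self, events: List[Dict]):
        self.events = events

    def create_task(self, task_id: str, data: str):
        if any(e["entity_id"] == task_id for e in self.events):
            raise ValueError(f"Task {task_id} already exists")
        self.events.append(
            {"type": "TaskCreated", "entity_id": task_id, "data": data}
        )

class TaskListProjection:
    """Query side: a denormalized read model rebuilt from events."""
    def __init__(self):
        self.by_status: Dict[str, List[str]] = {}

    def apply(self, event: Dict):
        if event["type"] == "TaskCreated":
            self.by_status.setdefault("created", []).append(event["entity_id"])

# Wiring: commands append events; the projection consumes them
# (in production, asynchronously via the message bus)
events: List[Dict] = []
handler = TaskCommandHandler(events)
projection = TaskListProjection()
handler.create_task("t-001", "research Kafka")
for event in events:
    projection.apply(event)
```

The command handler never reads the projection and the projection never writes events, which is precisely the separation that lets the two sides scale independently.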
Checkpointing and Recovery
Checkpointing is the practice of periodically saving the complete state of a multi-agent workflow so that it can be resumed from the last checkpoint in case of failure. This is critical for long-running workflows that might take minutes or hours to complete. Without checkpointing, a failure at step 9 of 10 would require re-executing all previous steps from scratch.
import json
import hashlib
from datetime import datetime
from pathlib import Path
class CheckpointManager:
def __init__(self, checkpoint_dir: str = "./checkpoints"):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(exist_ok=True)
def save(self, workflow_id: str, step: int, state: Dict):
"""Save a checkpoint for a specific workflow step"""
checkpoint = {
"workflow_id": workflow_id,
"step": step,
"state": state,
"timestamp": datetime.utcnow().isoformat(),
"checksum": self._compute_checksum(state)
}
path = self.checkpoint_dir / f"{workflow_id}_step_{step}.json"
path.write_text(json.dumps(checkpoint, indent=2))
def load_latest(self, workflow_id: str) -> Dict:
"""Load the most recent valid checkpoint"""
checkpoints = sorted(
self.checkpoint_dir.glob(f"{workflow_id}_step_*.json"),
key=lambda p: int(p.stem.split("_step_")[1]),
reverse=True
)
for cp_path in checkpoints:
cp = json.loads(cp_path.read_text())
if self._verify_checksum(cp):
return cp
return None
def _compute_checksum(self, state: Dict) -> str:
return hashlib.sha256(
json.dumps(state, sort_keys=True).encode()
).hexdigest()
def _verify_checksum(self, checkpoint: Dict) -> bool:
expected = checkpoint.get("checksum")
actual = self._compute_checksum(checkpoint["state"])
return expected == actual
Distributed Transactions: Saga Pattern
When an operation involves multiple agents and must be atomic, the Saga Pattern manages the distributed transaction as a sequence of local transactions, each with its own compensating action in case of failure. If step 3 fails, the saga rolls back steps 2 and 1 by executing their compensating actions in reverse order.
import logging
from dataclasses import dataclass
from typing import Callable, List

logger = logging.getLogger(__name__)

@dataclass
class SagaStep:
    execute: Callable    # the local transaction
    compensate: Callable # the undo action for that transaction
    name: str

class SagaOrchestrator:
    def __init__(self):
        self.steps: List[SagaStep] = []

    def add_step(self, execute_fn, compensate_fn, name: str):
        self.steps.append(SagaStep(execute_fn, compensate_fn, name))
async def execute(self) -> Dict:
completed = []
try:
for step in self.steps:
result = await step.execute()
completed.append((step, result))
return {"status": "success", "steps": len(completed)}
except Exception as e:
# Compensate in reverse order
for step, _ in reversed(completed):
try:
await step.compensate()
except Exception as comp_error:
# Log compensation error, do not propagate
logger.error(f"Compensation failed: {comp_error}")
return {"status": "rolled_back", "error": str(e)}
# Usage: multi-agent transaction
saga = SagaOrchestrator()
saga.add_step(
execute_fn=lambda: research_agent.analyze(data),
compensate_fn=lambda: research_agent.cleanup(),
name="research"
)
saga.add_step(
execute_fn=lambda: writer_agent.generate(analysis),
compensate_fn=lambda: writer_agent.discard_draft(),
name="writing"
)
saga.add_step(
execute_fn=lambda: publisher_agent.publish(article),
compensate_fn=lambda: publisher_agent.unpublish(),
name="publishing"
)
Fault Tolerance
In a production multi-agent system, failures are inevitable: LLMs return errors, network calls time out, agents produce malformed output, costs exceed the budget. A robust system must handle all of these scenarios gracefully, without letting errors cascade. The key principle is to design for failure.
Circuit Breaker Pattern
The Circuit Breaker monitors an agent's failures and, after a configurable number of consecutive errors, "opens the circuit" preventing further calls for a cooldown period. This prevents the cascade effect where a failed agent causes the slowdown or failure of the entire system. The circuit breaker has three states: Closed (normal operation), Open (rejecting calls), and Half-Open (testing with a single call to see if the agent has recovered).
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Circuit open, rejecting calls
HALF_OPEN = "half_open" # Testing with a single call
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_threshold = failure_threshold
self.recovery_timeout = timedelta(seconds=recovery_timeout)
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = None
self.success_count = 0
def is_open(self) -> bool:
if self.state == CircuitState.OPEN:
if datetime.utcnow() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
return False
return True
return False
    def record_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
        # Any success resets the consecutive-failure count
        self.failure_count = 0
        self.success_count += 1
def record_failure(self):
self.failure_count += 1
self.last_failure_time = datetime.utcnow()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
Retry Policies with Exponential Backoff
Retry policies define how and when to retry a failed operation. Exponential backoff prevents overloading the service with overly frequent attempts, progressively increasing the interval between retries. Adding jitter (random variation) prevents the "thundering herd" problem where multiple clients retry at exactly the same time.
import asyncio
import logging
import random

logger = logging.getLogger(__name__)
async def retry_with_backoff(
fn,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True
):
for attempt in range(max_retries + 1):
try:
return await fn()
except Exception as e:
if attempt == max_retries:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
if jitter:
delay *= (0.5 + random.random())
logger.warning(
f"Attempt {attempt + 1}/{max_retries + 1} failed: {e}. "
f"Retrying in {delay:.1f}s"
)
await asyncio.sleep(delay)
# Usage with a fallback agent
async def resilient_dispatch(primary_agent, fallback_agent, task):
try:
return await retry_with_backoff(
lambda: primary_agent.process(task),
max_retries=3
)
except Exception:
logger.warning("Primary agent exhausted retries, using fallback")
return await fallback_agent.process(task)
Graceful Degradation
Graceful degradation ensures the system always produces a useful result, even when some components fail. Instead of returning a generic error, the system degrades output quality while maintaining functionality.
- Fallback agents: if the primary agent fails, a simpler backup agent (e.g., rule-based) takes its place
- Partial results: if 3 out of 5 agents complete the task, the system returns the partial result with a disclaimer
- Result caching: previous similar responses are returned as a fallback
- Default values: for non-critical fields, predefined values are used when the responsible agent does not respond
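The partial-results strategy above can be sketched with `asyncio.gather(..., return_exceptions=True)`, which lets successful agents complete even when siblings fail. The `gather_with_degradation` helper and its result fields are illustrative names, not a library API:

```python
import asyncio

async def gather_with_degradation(agent_fns, task, min_results=1):
    # Run all agents concurrently; individual failures come back as
    # exception objects instead of aborting the whole gather
    results = await asyncio.gather(
        *(fn(task) for fn in agent_fns), return_exceptions=True
    )
    ok = [r for r in results if not isinstance(r, Exception)]
    if len(ok) < min_results:
        raise RuntimeError("too few agents succeeded")
    partial = len(ok) < len(agent_fns)
    return {
        "results": ok,
        "partial": partial,
        "disclaimer": (
            f"{len(ok)}/{len(agent_fns)} agents completed; "
            "results may be incomplete"
        ) if partial else None,
    }

async def demo():
    async def good(task):
        return "summary"

    async def bad(task):
        raise TimeoutError("agent timed out")

    # 2 of 3 agents succeed: return the partial result with a disclaimer
    return await gather_with_degradation([good, good, bad], {"q": "x"})

out = asyncio.run(demo())
print(out["partial"], out["disclaimer"])
```

The `min_results` threshold is the knob that decides when degradation becomes failure: below it, returning a result would be misleading rather than useful.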
Production Failure Modes to Prepare For
- LLM rate limiting: API providers throttle requests; implement queuing and backpressure
- Token budget exhaustion: agents consuming more tokens than expected; implement per-agent token limits
- Infinite loops: agents bouncing tasks back and forth; implement max iteration counts and loop detection
- Poison messages: inputs that consistently cause agent failures; route to Dead Letter Queue after N retries
- Cascading failures: one agent's failure causing others to fail; use circuit breakers and bulkheads
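For the infinite-loop failure mode, a hard iteration cap combined with repeated-state counting is often enough. A minimal sketch, where `LoopGuard` is a hypothetical helper rather than a framework feature:

```python
from collections import Counter

class LoopGuard:
    # Hypothetical helper: stop handoff chains that revisit the same
    # (agent, task) pair too often, or exceed a hard iteration cap
    def __init__(self, max_iterations=20, max_repeats=3):
        self.max_iterations = max_iterations
        self.max_repeats = max_repeats
        self.iterations = 0
        self.seen = Counter()

    def check(self, agent_name, task_id):
        self.iterations += 1
        if self.iterations > self.max_iterations:
            raise RuntimeError("max iterations exceeded")
        key = (agent_name, task_id)
        self.seen[key] += 1
        if self.seen[key] > self.max_repeats:
            raise RuntimeError(
                f"loop detected: {key} visited {self.seen[key]} times"
            )

guard = LoopGuard(max_iterations=10, max_repeats=2)
guard.check("router", "t1")
guard.check("billing", "t1")
guard.check("router", "t1")      # second visit: still allowed
try:
    guard.check("router", "t1")  # third visit trips the guard
except RuntimeError as e:
    print(e)
```

Calling `check` before every handoff turns a silent ping-pong between two agents into an explicit, alertable error instead of a runaway token bill.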
Monitoring and Observability
A multi-agent system without monitoring is like driving blind. Observability is the ability to understand the internal state of the system by observing its outputs. For multi-agent systems, three pillars are required: logging, metrics, and distributed tracing.
Distributed Tracing
Distributed tracing follows a request through all the agents that process it, creating an end-to-end visualization of the flow. Tools like Jaeger or Zipkin allow you to see where latency accumulates and where errors occur. Each agent creates a "span" within a larger trace, allowing you to pinpoint exactly which agent caused a delay or failure.
from typing import Dict

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider

trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer("multi-agent-system")

class TracedAgent:
    def __init__(self, agent, name: str):
        self.agent = agent
        self.name = name

    async def process(self, task: Dict) -> Dict:
        with tracer.start_as_current_span(
            f"agent.{self.name}.process",
            attributes={
                "agent.name": self.name,
                "agent.task_type": task.get("type", "unknown"),
                "agent.task_id": task.get("id", "")
            }
        ) as span:
            try:
                result = await self.agent.process(task)
                span.set_attribute("agent.status", "success")
                span.set_attribute("agent.tokens_used", result.get("tokens", 0))
                return result
            except Exception as e:
                span.set_status(trace.StatusCode.ERROR, str(e))
                span.record_exception(e)
                raise
Key Metrics for Multi-Agent Systems
Beyond standard infrastructure metrics (CPU, memory, network), multi-agent systems require specific metrics to monitor system health and performance.
Essential Metrics
| Metric | Description | Target | Alert |
|---|---|---|---|
| Agent Latency (p95) | 95th percentile response time | < 5s per agent | > 15s |
| Error Rate | % of failed tasks per agent | < 2% | > 10% |
| Token Cost / Task | Average token cost per task | Budget-dependent | > 2x average |
| Loop Detection Rate | % of conversations with loops | < 1% | > 5% |
| Handoff Success Rate | % of successful delegations | > 95% | < 85% |
| End-to-End Latency | Total time from input to result | < 30s | > 60s |
| Circuit Breaker Opens | Number of circuit breaker openings / hour | 0 | > 3 / hour |
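As a minimal sketch of how the first two metrics in the table can be computed in-process (in production you would export them to Prometheus/Grafana rather than keep them in memory; the `AgentMetrics` class and its method names are illustrative):

```python
import math

class AgentMetrics:
    # Minimal in-process recorder for agent latency p95 and error rate
    def __init__(self):
        self.latencies = {}   # agent -> list of seconds
        self.errors = {}      # agent -> failed task count
        self.tasks = {}       # agent -> total task count

    def record(self, agent, latency_s, failed=False):
        self.latencies.setdefault(agent, []).append(latency_s)
        self.tasks[agent] = self.tasks.get(agent, 0) + 1
        if failed:
            self.errors[agent] = self.errors.get(agent, 0) + 1

    def p95_latency(self, agent):
        # Nearest-rank percentile: the smallest sample >= 95% of the data
        samples = sorted(self.latencies[agent])
        idx = min(len(samples) - 1, math.ceil(0.95 * len(samples)) - 1)
        return samples[idx]

    def error_rate(self, agent):
        return self.errors.get(agent, 0) / self.tasks[agent]

m = AgentMetrics()
for latency in [1.0, 1.2, 0.9, 4.8, 1.1]:
    m.record("researcher", latency)
m.record("researcher", 2.0, failed=True)
print(round(m.error_rate("researcher"), 3))  # -> 0.167
```

Alerting then reduces to comparing these values against the thresholds in the table, for example firing when `error_rate` exceeds 0.10 or `p95_latency` exceeds 15 seconds.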
Specialized Monitoring Platforms
For monitoring multi-agent systems, generic tools like Grafana and Prometheus are combined with specialized platforms:
- LangSmith: LangChain's platform for tracing, debugging, and evaluating LLM applications. It offers call chain visualization, cost analysis, and A/B testing.
- Langfuse: an open source alternative to LangSmith focused on observability and analytics for AI applications.
- Weights & Biases: for experiment tracking, prompt versioning, and performance monitoring over time.
- Prometheus + Grafana: the classic stack for infrastructure and custom metrics, with configurable alerting.
- Jaeger / Zipkin: for end-to-end distributed tracing across the various agents in the system.
Choosing the Right Pattern: Decision Matrix
The choice of architecture depends on multiple factors. There is no universal solution: every project has different constraints and priorities. The following tables provide a practical guide for making the right decision.
Pattern Selection by Task Type
| Task Type | Recommended Pattern | Why |
|---|---|---|
| Linear data processing | Sequential | Steps depend on each other, order matters |
| Multi-source research | Concurrent | Independent queries can run in parallel |
| Creative brainstorming | Group Chat | Emergent ideas from free discussion |
| Customer support routing | Handoff | Specialist selection based on query type |
| Complex project planning | Plan-First | Requires decomposition before execution |
| Content creation pipeline | Sequential + Concurrent | Sequential main flow with parallel sub-tasks |
Architecture Decision Matrix
| Factor | Hub-and-Spoke | Peer-to-Peer | Message Queue |
|---|---|---|---|
| Team Size | 2-10 agents | 5-50 agents | 10-100+ agents |
| Required Latency | Medium (< 10s) | Low (< 5s) | Tolerant (< 30s) |
| Fault Tolerance | Medium (HA hub) | High (native) | Very high (broker) |
| Operational Complexity | Low | High | Medium-High |
| Infrastructure Cost | Low | Medium | High |
| Debugging | Easy | Difficult | Medium |
| Scalability | Limited | Good | Excellent |
| Ideal Use Case | MVP, small teams | Distributed systems | Enterprise, high reliability |
Practical Advice
For most projects, start with Hub-and-Spoke. It is simple, easy to debug, and sufficient for systems with fewer than 10 agents. When scalability becomes a problem, migrate to Message Queue. The Peer-to-Peer architecture is rarely necessary for AI multi-agent systems and should only be considered for specific use cases where latency is the critical factor and the number of agents is very large. Remember: premature optimization of architecture is just as dangerous as premature code optimization. Start simple, measure, and evolve based on real data.
Conclusions
Multi-agent orchestration is a discipline that requires expertise in both software architecture and AI. The patterns we analyzed (Sequential, Concurrent, Group Chat, Handoff, Plan-First) are fundamental building blocks that combine to construct complex systems. The three architectures (Hub-and-Spoke, Peer-to-Peer, Message Queue) offer different trade-offs between simplicity, scalability, and resilience.
The key takeaway is: do not underestimate the complexity. A production multi-agent system is not simply "more agents collaborating." It is a distributed system with all the classic problems of distributed systems (consistency, fault tolerance, observability), amplified by the non-determinism of LLMs. Investing in fault tolerance, monitoring, and state management from the start is not over-engineering: it is survival.
In the next article we will explore memory for AI agents: how to equip agents with short-term and long-term memory, retrieval patterns (RAG), embeddings and vector databases, and how memory influences the reasoning and planning capabilities of agents.