Sistemi Multi-Agent: Orchestrazione e Comunicazione
I sistemi multi-agent sono uno dei trend più esplosivi nell'AI del 2025. Gartner ha registrato un aumento del 1.445% nelle query su "multi-agent systems" tra il Q1 2024 e il Q2 2025. La ragione è semplice: molti problemi reali sono troppo complessi per un singolo agente AI. Un agente di ricerca che deve raccogliere dati, analizzarli, scrivere un report e inviarlo via email non è un problema singolo: sono quattro problemi specializzati che richiedono competenze diverse.
In questo articolo costruiamo sistemi multi-agent reali: dall'architettura di base (orchestratore + agenti specializzati) ai pattern avanzati come ReAct (Reasoning + Acting), Plan-and-Execute, Supervisor-Worker e LangGraph per orchestrazione stateful. Ogni sezione include codice Python funzionante e pattern testati su problemi reali.
Cosa Imparerai
- Architettura e pattern dei sistemi multi-agent
- ReAct pattern: Reasoning + Acting con tool calling
- Plan-and-Execute: pianificazione separata dall'esecuzione
- Supervisor pattern: orchestratore che coordina agenti specializzati
- LangGraph per orchestrazione stateful con grafo di controllo
- Comunicazione e coordinazione tra agenti
- Gestione degli errori e fallback in sistemi multi-agent
- Monitoring e observability per pipeline multi-agent
1. Architettura dei Sistemi Multi-Agent
Un sistema multi-agent è composto da più agenti (LLM + set di tool) che collaborano per raggiungere un obiettivo comune. La chiave del successo è la separazione delle responsabilità: ogni agente è specializzato in un dominio specifico e sa quando delegare ad altri agenti.
TOPOLOGIE MULTI-AGENT:
1. NETWORK (fully connected):
Ogni agente può chiamare qualsiasi altro agente.
Pro: massima flessibilità
Con: difficile da controllare, rischio di loop
A ◄──► B
▲ ▲
└──► C ◄┘
2. SUPERVISOR (star topology):
Un agente centrale orchestra tutti gli altri.
Pro: controllo centralizzato, facile da debuggare
Con: single point of failure, bottleneck
SUPERVISOR
├──► Agent A (ricerca)
├──► Agent B (analisi)
└──► Agent C (report)
3. HIERARCHICAL:
Supervisori multipli organizzati in gerarchia.
Pro: scalabilità, separazione chiara delle responsabilità
Con: latenza aumentata, complessità coordinazione
Manager
├──► Sub-manager A
│ ├──► Worker A1
│ └──► Worker A2
└──► Sub-manager B
├──► Worker B1
└──► Worker B2
4. PIPELINE (sequential):
Ogni agente processa l'output del precedente.
Pro: semplice, deterministico, facile da debuggare
Con: rigido, nessuna retroazione
Input ──► A ──► B ──► C ──► Output
SCELTA DELLA TOPOLOGIA:
- Task ben definiti, ordine chiaro → Pipeline
- Task con routing dinamico → Supervisor
- Problemi complessi con sottoproblemi → Hierarchical
- Ricerca esplorativa → Network (con guardrail)
2. ReAct Pattern: Reasoning + Acting
ReAct (Reasoning + Acting) è il pattern fondamentale per agenti AI con accesso a tool. L'agente alterna tra Thought (ragionamento su cosa fare), Action (esecuzione di un tool) e Observation (interpretazione del risultato), ripetendo il ciclo fino a raggiungere la risposta finale.
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain import hub
from typing import Optional
import requests
import json
# ============================================================
# DEFINIZIONE DEI TOOL
# ============================================================
@tool
def search_web(query: str) -> str:
"""Cerca informazioni sul web. Usa per trovare dati aggiornati o notizie."""
# Integrazione con Tavily, SerpAPI o simili
# Per semplicità, simuliamo una risposta
return f"Risultati ricerca per '{query}': [risultati simulati per demo]"
@tool
def analyze_data(data: str, analysis_type: str) -> str:
"""
Analizza dati numerici o testuali.
analysis_type può essere: 'summary', 'trend', 'anomaly', 'comparison'
"""
return f"Analisi {analysis_type} completata: [analisi simulata]"
@tool
def generate_report(title: str, sections: str) -> str:
"""
Genera un report strutturato.
sections: JSON con le sezioni del report
"""
return f"Report '{title}' generato con sezioni: {sections}"
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""Invia un'email. Usa per comunicare risultati all'utente finale."""
return f"Email inviata a {to} con oggetto '{subject}'"
@tool
def query_database(query: str, database: str = "main") -> str:
"""
Esegue query su database interni.
database: 'main' per il DB principale, 'analytics' per il DW
"""
return f"Query su {database}: [risultati simulati]"
# ============================================================
# CREAZIONE AGENTE REACT
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Tool set disponibili all'agente
tools = [search_web, analyze_data, generate_report, send_email, query_database]
# Prompt ReAct standard (da LangChain Hub)
react_prompt = hub.pull("hwchase17/react")
# Il prompt include: instructions per pensiero/azione/osservazione
# con schema: "Thought: ...\nAction: tool_name\nAction Input: ...\nObservation: ..."
# Crea l'agente ReAct
agent = create_react_agent(
llm=llm,
tools=tools,
prompt=react_prompt
)
# AgentExecutor: gestisce il loop thought-action-observation
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True, # Mostra ogni step del reasoning
max_iterations=10, # Previeni loop infiniti
early_stopping_method="generate", # Genera risposta se troppi step
handle_parsing_errors=True # Gestisci errori di parsing gracefully
)
# Esempio: task complesso che richiede più step
task = """
Analizza le performance di vendita Q4 2024 dal database analytics,
cerca notizie recenti sul settore, genera un report con raccomandazioni
e invialo a manager@company.com
"""
result = agent_executor.invoke({"input": task})
print(f"\nRisposta finale:\n{result['output']}")
# Output tipico del reasoning ReAct:
# Thought: Devo prima ottenere i dati di vendita dal database
# Action: query_database
# Action Input: {"query": "SELECT * FROM sales WHERE quarter='Q4' AND year=2024"}
# Observation: [dati vendite]
# Thought: Ora cerco notizie recenti sul settore
# Action: search_web
# Action Input: "notizie settore vendite Q4 2024"
# Observation: [risultati]
# Thought: Ho tutti i dati, genero il report
# Action: generate_report
# ...
3. Plan-and-Execute Pattern
Il pattern Plan-and-Execute separa la pianificazione dall'esecuzione: prima un LLM crea un piano dettagliato dei passi necessari, poi gli agenti eseguono ogni passo in sequenza (o in parallelo). Rispetto a ReAct, permette una visione più olistica del problema e una migliore gestione della complessità.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import List, Dict, Any
import json
class PlanAndExecuteAgent:
"""
Agente Plan-and-Execute:
1. PLANNER: crea un piano step-by-step
2. EXECUTOR: esegue ogni step con i tool disponibili
3. RESPONDER: sintetizza i risultati in una risposta finale
"""
def __init__(self, llm, tools_dict: Dict[str, callable]):
self.llm = llm
self.tools = tools_dict
# PLANNER: LLM ottimizzato per pianificazione
self.planner_chain = (
ChatPromptTemplate.from_template("""
Sei un pianificatore esperto. Crea un piano dettagliato step-by-step per completare
il seguente task.
Tool disponibili: {available_tools}
Task: {task}
Crea un piano come lista JSON di steps. Ogni step deve avere:
- "step_id": numero progressivo (1, 2, 3...)
- "description": descrizione dell'azione
- "tool": nome del tool da usare (o "llm" per ragionamento puro)
- "depends_on": lista di step_id che devono completarsi prima (per parallelismo)
Rispondi SOLO con il JSON, nient'altro.""")
| llm
| StrOutputParser()
)
# EXECUTOR: esegue un singolo step
self.executor_chain = (
ChatPromptTemplate.from_template("""
Esegui questo step del piano:
Step: {step}
Risultati steps precedenti: {previous_results}
Se devi usare un tool, fornisci l'input necessario in modo preciso.
Descrivi brevemente cosa hai fatto e il risultato ottenuto.""")
| llm
| StrOutputParser()
)
# RESPONDER: sintesi finale
self.responder_chain = (
ChatPromptTemplate.from_template("""
Task originale: {task}
Piano eseguito con questi risultati:
{step_results}
Sintetizza i risultati in una risposta completa e strutturata per l'utente.""")
| llm
| StrOutputParser()
)
def _parse_plan(self, plan_json: str) -> List[Dict]:
"""Parse del piano JSON"""
try:
# Rimuovi eventuali backtick o prefissi markdown
clean = plan_json.strip().strip('`').strip()
if clean.startswith('json'):
clean = clean[4:].strip()
return json.loads(clean)
except json.JSONDecodeError:
# Fallback: crea un piano semplice
return [{"step_id": 1, "description": "Esegui il task", "tool": "llm", "depends_on": []}]
def _execute_step(self, step: Dict, previous_results: Dict) -> str:
"""Esegui un singolo step del piano"""
tool_name = step.get("tool", "llm")
if tool_name != "llm" and tool_name in self.tools:
# Usa il tool specifico
try:
tool_input = step.get("tool_input", step["description"])
result = self.tools[tool_name](tool_input)
return f"Tool {tool_name} eseguito: {result}"
except Exception as e:
return f"Errore nel tool {tool_name}: {str(e)}"
else:
# Ragionamento puro con LLM
return self.executor_chain.invoke({
"step": step["description"],
"previous_results": json.dumps(previous_results, ensure_ascii=False)
})
def run(self, task: str) -> dict:
"""Esegui il task con Plan-and-Execute"""
print(f"\nTask: {task}\n")
# STEP 1: Pianificazione
available_tools = list(self.tools.keys())
plan_json = self.planner_chain.invoke({
"task": task,
"available_tools": ", ".join(available_tools)
})
plan = self._parse_plan(plan_json)
print(f"Piano creato: {len(plan)} steps")
# STEP 2: Esecuzione sequenziale degli step
step_results = {}
completed_steps = set()
# Ordina per dipendenze (topological sort semplificato)
for step in plan:
step_id = step["step_id"]
depends_on = step.get("depends_on", [])
# Aspetta che i prerequisiti siano completati
while not all(str(d) in completed_steps for d in depends_on):
import time; time.sleep(0.1) # In pratica usa async
print(f" Esecuzione step {step_id}: {step['description'][:60]}...")
result = self._execute_step(step, step_results)
step_results[str(step_id)] = result
completed_steps.add(str(step_id))
# STEP 3: Sintesi finale
final_response = self.responder_chain.invoke({
"task": task,
"step_results": json.dumps(step_results, ensure_ascii=False, indent=2)
})
return {
"plan": plan,
"step_results": step_results,
"final_response": final_response
}
# Utilizzo
tools_dict = {
"search_web": lambda q: f"Risultati ricerca: {q}",
"analyze_data": lambda d: f"Analisi completata per: {d}",
"generate_report": lambda t: f"Report generato: {t}",
}
agent = PlanAndExecuteAgent(llm=llm, tools_dict=tools_dict)
result = agent.run("Analizza i dati di vendita 2024 e prepara un report esecutivo")
print(result["final_response"])
4. LangGraph: Orchestrazione Stateful con Grafi
LangGraph è la libreria di LangChain per costruire sistemi multi-agent stateful basati su grafi aciclici diretti (DAG) e grafi con cicli. Permette di definire esplicitamente il flusso di controllo tra agenti, gestire lo stato condiviso e implementare cicli di feedback. È la scelta migliore per sistemi multi-agent complessi in produzione.
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, List, Literal
import operator
import json
# ============================================================
# STATO CONDIVISO DEL SISTEMA
# ============================================================
class AgentState(TypedDict):
"""Stato condiviso tra tutti gli agenti nel grafo"""
messages: Annotated[List, operator.add] # Lista messaggi (append-only)
task: str # Task originale
plan: List[str] # Piano di esecuzione
current_step: int # Step corrente
research_results: str # Output agente ricerca
analysis_results: str # Output agente analisi
final_report: str # Report finale
next_agent: str # Prossimo agente da chiamare
error_count: int # Contatore errori
# ============================================================
# AGENTI SPECIALIZZATI
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def supervisor_agent(state: AgentState) -> AgentState:
"""
Agente supervisor: decide quale agente chiamare.
Analizza lo stato corrente e decide il prossimo passo.
"""
system_msg = SystemMessage(content="""Sei un supervisor AI che coordina un team di agenti.
Devi decidere quale agente chiamare in base allo stato corrente del task.
Agenti disponibili:
- research: raccoglie informazioni e dati
- analysis: analizza i dati raccolti
- report_writer: scrive il report finale
- FINISH: il task è completato
Rispondi con UN SOLO nome tra quelli disponibili.""")
messages = [system_msg, HumanMessage(content=f"""
Task: {state['task']}
Research completata: {'Si' if state['research_results'] else 'No'}
Analisi completata: {'Si' if state['analysis_results'] else 'No'}
Report scritto: {'Si' if state['final_report'] else 'No'}
Quale agente deve agire ora?""")]
response = llm.invoke(messages)
next_agent = response.content.strip().lower()
# Valida la risposta
valid_agents = ["research", "analysis", "report_writer", "finish"]
if next_agent not in valid_agents:
next_agent = "research" # Fallback sicuro
return {"next_agent": next_agent, "messages": [response]}
def research_agent(state: AgentState) -> AgentState:
"""Agente specializzato in ricerca e raccolta dati"""
messages = [
SystemMessage(content="Sei un ricercatore AI. Raccogli dati rilevanti per il task."),
HumanMessage(content=f"Raccogli informazioni per: {state['task']}")
]
response = llm.invoke(messages)
return {
"research_results": response.content,
"messages": [response]
}
def analysis_agent(state: AgentState) -> AgentState:
"""Agente specializzato in analisi dei dati"""
messages = [
SystemMessage(content="Sei un analista AI. Analizza i dati raccolti."),
HumanMessage(content=f"""
Dati da analizzare:
{state['research_results']}
Task originale: {state['task']}
Fornisci un'analisi strutturata.""")
]
response = llm.invoke(messages)
return {
"analysis_results": response.content,
"messages": [response]
}
def report_writer_agent(state: AgentState) -> AgentState:
"""Agente specializzato nella scrittura di report"""
messages = [
SystemMessage(content="Sei un writer AI specializzato in report tecnici."),
HumanMessage(content=f"""
Task: {state['task']}
Ricerca: {state['research_results']}
Analisi: {state['analysis_results']}
Scrivi un report professionale e completo.""")
]
response = llm.invoke(messages)
return {
"final_report": response.content,
"messages": [response]
}
# ============================================================
# ROUTING FUNCTION
# ============================================================
def route_to_agent(state: AgentState) -> Literal["research", "analysis", "report_writer", END]:
"""Funzione di routing: decide il nodo successivo nel grafo"""
next_agent = state.get("next_agent", "research")
if next_agent == "finish" or state.get("error_count", 0) > 5:
return END
return next_agent
# ============================================================
# COSTRUZIONE DEL GRAFO
# ============================================================
# Crea il grafo stateful
workflow = StateGraph(AgentState)
# Aggiungi nodi (agenti)
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("research", research_agent)
workflow.add_node("analysis", analysis_agent)
workflow.add_node("report_writer", report_writer_agent)
# Definisci entry point
workflow.set_entry_point("supervisor")
# Definisci edges condizionali
workflow.add_conditional_edges(
"supervisor",
route_to_agent,
{
"research": "research",
"analysis": "analysis",
"report_writer": "report_writer",
END: END
}
)
# Tutti gli agenti tornano al supervisor
workflow.add_edge("research", "supervisor")
workflow.add_edge("analysis", "supervisor")
workflow.add_edge("report_writer", "supervisor")
# Compila il grafo
app = workflow.compile()
# ============================================================
# ESECUZIONE
# ============================================================
initial_state = {
"task": "Analizza le tendenze del mercato AI nel 2025 e prepara un report esecutivo",
"messages": [],
"plan": [],
"current_step": 0,
"research_results": "",
"analysis_results": "",
"final_report": "",
"next_agent": "",
"error_count": 0
}
# Esegui il grafo
config = {"recursion_limit": 20}
final_state = app.invoke(initial_state, config=config)
print("\n=== REPORT FINALE ===")
print(final_state["final_report"])
5. Comunicazione e Coordinazione tra Agenti
La comunicazione efficace tra agenti è cruciale. Esistono tre principali pattern: message passing (agenti si scambiano messaggi strutturati), shared state (stato condiviso globale come in LangGraph) e blackboard (database condiviso in cui gli agenti scrivono e leggono).
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from datetime import datetime
import uuid
from enum import Enum
class MessageType(Enum):
TASK = "task" # Assegna un task a un agente
RESULT = "result" # Restituisce il risultato di un task
ERROR = "error" # Segnala un errore
STATUS = "status" # Aggiornamento di stato
QUERY = "query" # Richiesta di informazioni
RESPONSE = "response" # Risposta a una query
@dataclass
class AgentMessage:
"""Messaggio strutturato per comunicazione inter-agent"""
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
from_agent: str = ""
to_agent: str = ""
message_type: MessageType = MessageType.TASK
content: Any = None
metadata: Dict = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
correlation_id: Optional[str] = None # Per tracciare conversazioni
def is_response_to(self, original: 'AgentMessage') -> bool:
return self.correlation_id == original.message_id
def create_response(self, content: Any, message_type: MessageType = MessageType.RESULT):
"""Crea una risposta a questo messaggio"""
return AgentMessage(
from_agent=self.to_agent,
to_agent=self.from_agent,
message_type=message_type,
content=content,
correlation_id=self.message_id
)
class MessageBus:
"""Bus di messaggi per comunicazione asincrona tra agenti"""
def __init__(self):
self.queues: Dict[str, List[AgentMessage]] = {}
self.history: List[AgentMessage] = []
def register_agent(self, agent_id: str):
"""Registra un agente nel bus"""
self.queues[agent_id] = []
def send(self, message: AgentMessage):
"""Invia un messaggio a un agente"""
if message.to_agent not in self.queues:
self.queues[message.to_agent] = []
self.queues[message.to_agent].append(message)
self.history.append(message)
def receive(self, agent_id: str) -> Optional[AgentMessage]:
"""Ricevi il primo messaggio dalla coda di un agente"""
if agent_id in self.queues and self.queues[agent_id]:
return self.queues[agent_id].pop(0)
return None
def broadcast(self, message: AgentMessage, agents: List[str]):
"""Invia lo stesso messaggio a più agenti"""
for agent_id in agents:
msg_copy = AgentMessage(
from_agent=message.from_agent,
to_agent=agent_id,
message_type=message.message_type,
content=message.content,
metadata=message.metadata,
correlation_id=message.message_id
)
self.send(msg_copy)
def get_conversation(self, correlation_id: str) -> List[AgentMessage]:
"""Ottieni tutti i messaggi di una conversazione"""
return [m for m in self.history if
m.message_id == correlation_id or
m.correlation_id == correlation_id]
# Esempio utilizzo
bus = MessageBus()
bus.register_agent("supervisor")
bus.register_agent("research")
bus.register_agent("analysis")
# Supervisor assegna task alla ricerca
task_msg = AgentMessage(
from_agent="supervisor",
to_agent="research",
message_type=MessageType.TASK,
content="Ricerca tendenze AI 2025",
metadata={"priority": "high", "deadline": "2025-03-01"}
)
bus.send(task_msg)
# Research riceve e risponde
received = bus.receive("research")
if received:
result_msg = received.create_response(
content="Trovate 50 fonti rilevanti sulle tendenze AI 2025",
message_type=MessageType.RESULT
)
bus.send(result_msg)
6. Gestione degli Errori e Monitoring
I sistemi multi-agent introducono nuovi vettori di fallimento: agenti bloccati in loop, errori di comunicazione, output malformati, latenze eccessive. E' fondamentale implementare circuit breaker, timeout e monitoring specifico.
import asyncio
import time
from functools import wraps
from typing import Callable, TypeVar
from enum import Enum
T = TypeVar('T')
class CircuitState(Enum):
CLOSED = "closed" # Funziona normalmente
OPEN = "open" # Bloccato (troppi errori)
HALF_OPEN = "half_open" # Test: prova una chiamata
class CircuitBreaker:
"""
Circuit breaker per agenti AI.
Previene cascade failure quando un agente fallisce ripetutamente.
"""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
half_open_max_calls: int = 1
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = 0
self.half_open_calls = 0
def __call__(self, func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
else:
raise Exception(
f"Circuit OPEN: agente non disponibile. "
f"Retry in {self.recovery_timeout - (time.time() - self.last_failure_time):.0f}s"
)
if self.state == CircuitState.HALF_OPEN:
if self.half_open_calls >= self.half_open_max_calls:
raise Exception("Circuit HALF_OPEN: attendere recovery")
self.half_open_calls += 1
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
return wrapper
def _on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
class MultiAgentMonitor:
"""Monitoring per sistemi multi-agent"""
def __init__(self):
self.agent_stats: Dict[str, Dict] = {}
self.task_history: List[Dict] = []
def record_agent_call(
self,
agent_id: str,
task: str,
success: bool,
duration_ms: float,
tokens_used: int = 0
):
"""Registra una chiamata a un agente"""
if agent_id not in self.agent_stats:
self.agent_stats[agent_id] = {
"total_calls": 0, "successes": 0, "failures": 0,
"total_tokens": 0, "total_duration_ms": 0
}
stats = self.agent_stats[agent_id]
stats["total_calls"] += 1
stats["successes" if success else "failures"] += 1
stats["total_tokens"] += tokens_used
stats["total_duration_ms"] += duration_ms
def get_system_health(self) -> dict:
"""Ritorna lo stato di salute del sistema"""
results = {}
for agent_id, stats in self.agent_stats.items():
total = stats["total_calls"]
success_rate = stats["successes"] / total if total > 0 else 0
avg_duration = stats["total_duration_ms"] / total if total > 0 else 0
results[agent_id] = {
"success_rate": success_rate,
"avg_duration_ms": avg_duration,
"total_calls": total,
"status": "healthy" if success_rate > 0.9 else
"degraded" if success_rate > 0.7 else "critical"
}
return results
7. Best Practices e Anti-Pattern
Best Practices Multi-Agent
- Principio di singola responsabilità: ogni agente deve avere un ruolo chiaramente definito e limitato. Un agente che fa "tutto" è difficile da debuggare e migliorare.
- Timeout su ogni agente: nessun agente deve poter bloccare il sistema indefinitamente. Imposta timeout realistici (10-30 secondi per agenti LLM).
- Stato esplicito e serializzabile: usa TypedDict o Pydantic per definire lo stato. Deve essere serializzabile (JSON) per logging, debugging e recovery.
- Circuit breaker per ogni agente: previeni cascade failure. Se un agente fallisce ripetutamente, il circuit breaker ferma le chiamate e permette il recovery.
- Traccia ogni decisione: logga ogni decisione del supervisor con reasoning esplicito. Il debugging di sistemi multi-agent senza trace è quasi impossibile.
Anti-Pattern da Evitare
- Agenti troppo generici: un agente "assistente universale" non è più efficace di un LLM singolo, ma è molto più costoso e difficile da controllare.
- Nessun limite di iterazione: senza max_iterations, un sistema ReAct può girare in loop infinito, consumando token e soldi. Imposta sempre un limite.
- Stato globale mutabile condiviso: la condivisione di stato mutabile tra agenti introduce race condition. Usa messaggi immutabili o stato gestito centralmente.
- Fidarsi ciecamente dell'output di un agente: un agente può produrre output malformati o con allucinazioni. Valida sempre l'output prima di passarlo al prossimo agente.
Conclusioni
I sistemi multi-agent rappresentano il futuro dell'AI applicata a problemi complessi. Abbiamo esplorato i pattern fondamentali (ReAct, Plan-and-Execute, Supervisor), LangGraph per orchestrazione stateful, pattern di comunicazione tra agenti e gestione degli errori in produzione.
I punti chiave:
- Scegli la topologia giusta: pipeline per flussi lineari, supervisor per routing dinamico
- ReAct è eccellente per task esplorativi con tool; Plan-and-Execute per task strutturati
- LangGraph è la scelta migliore per sistemi complessi in produzione
- Ogni agente deve avere un ruolo singolo, ben definito
- Circuit breaker e timeout sono indispensabili in produzione
- Traccia ogni decisione del supervisor per debugging efficace
Nel prossimo articolo esploreremo il Prompt Engineering in Produzione: template, versioning e testing sistematico per garantire qualità e consistenza nel tempo.
Continua la Serie
- Articolo 7: Context Window Management
- Articolo 8: Multi-Agent Systems (corrente)
- Articolo 9: Prompt Engineering in Produzione
- Articolo 10: Knowledge Graphs per AI
Approfondisci con: MLOps: Model Serving e LangChain per RAG.







