Sisteme multi-agenți: orchestrare și comunicare
I sisteme multi-agent sunt una dintre cele mai explozive tendințe în IA în 2025. Gartner a raportat o creștere cu 1.445% a interogărilor despre „sisteme multi-agente” între Q1 2024 și Q2 2025. Motivul este simplu: multe probleme reale sunt prea complexe pentru un singur agent AI. Un agent de cercetare care trebuie să colecteze date, să le analizeze, scrierea unui raport și trimiterea prin e-mail nu este o singură problemă: sunt patru probleme specialişti care necesită competenţe diferite.
În acest articol construim sisteme reale multi-agent: din arhitectura de bază (orchestrator + agenți specializați) până la modele avansate precum Reacţiona (Raționament + Acțiune), Planificați și executați, Supraveghetor-Lucrător e LangGraph pentru orchestrare cu stare. Fiecare secțiune include cod Python funcțional și modele testate pe probleme reale.
Ce vei învăța
- Arhitectura și modelele sistemelor multi-agent
- Model ReAct: Raționare + Acționare cu apelarea instrumentului
- Planificare și execuție: Separați planificarea de execuție
- Model de supraveghetor: orchestrator care coordonează agenți specializați
- LangGraph pentru orchestrare cu stare cu grafic de control
- Comunicarea si coordonarea intre agenti
- Gestionarea erorilor și alternativă în sistemele cu mai mulți agenți
- Monitorizare și observabilitate pentru conducte multi-agenți
1. Arhitectura sistemelor multi-agenți
Un sistem multi-agent este compus din mai mulți agenţi (LLM + set de scule) care colaborează pentru atingerea unui scop comun. Cheia succesului este separarea responsabilitatilor: fiecare agent este specializat în a domeniu specific și știe când să delege altor agenți.
TOPOLOGIE MULTI-AGENT:
1. NETWORK (fully connected):
Ogni agente può chiamare qualsiasi altro agente.
Pro: massima flessibilità
Con: difficile da controllare, rischio di loop
A ◄──► B
▲ ▲
└──► C ◄┘
2. SUPERVISOR (star topology):
Un agente centrale orchestra tutti gli altri.
Pro: controllo centralizzato, facile da debuggare
Con: single point of failure, bottleneck
SUPERVISOR
├──► Agent A (ricerca)
├──► Agent B (analisi)
└──► Agent C (report)
3. HIERARCHICAL:
Supervisori multipli organizzati in gerarchia.
Pro: scalabilità, separazione chiara delle responsabilità
Con: latenza aumentata, complessità coordinazione
Manager
├──► Sub-manager A
│ ├──► Worker A1
│ └──► Worker A2
└──► Sub-manager B
├──► Worker B1
└──► Worker B2
4. PIPELINE (sequential):
Ogni agente processa l'output del precedente.
Pro: semplice, deterministico, facile da debuggare
Con: rigido, nessuna retroazione
Input ──► A ──► B ──► C ──► Output
SCELTA DELLA TOPOLOGIA:
- Task ben definiti, ordine chiaro → Pipeline
- Task con routing dinamico → Supervisor
- Problemi complessi con sottoproblemi → Hierarchical
- Ricerca esplorativa → Network (con guardrail)
2. ReAct Pattern: Raționament + Acțiune
Reacţiona (Raționare + Acțiune) este modelul fundamental pentru agenții AI cu acces la instrumente. Agentul alternează între Gânduri (raționarea cu privire la ceea ce trebuie făcut), Acţiune (execuția unui instrument) e Observare (interpretarea lui rezultat), repetând ciclul până când se ajunge la răspunsul final.
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain import hub
from typing import Optional
import requests
import json
# ============================================================
# DEFINIZIONE DEI TOOL
# ============================================================
@tool
def search_web(query: str) -> str:
"""Cerca informazioni sul web. Usa per trovare dati aggiornati o notizie."""
# Integrazione con Tavily, SerpAPI o simili
# Per semplicità, simuliamo una risposta
return f"Risultati ricerca per '{query}': [risultati simulati per demo]"
@tool
def analyze_data(data: str, analysis_type: str) -> str:
"""
Analizza dati numerici o testuali.
analysis_type può essere: 'summary', 'trend', 'anomaly', 'comparison'
"""
return f"Analisi {analysis_type} completata: [analisi simulata]"
@tool
def generate_report(title: str, sections: str) -> str:
"""
Genera un report strutturato.
sections: JSON con le sezioni del report
"""
return f"Report '{title}' generato con sezioni: {sections}"
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""Invia un'email. Usa per comunicare risultati all'utente finale."""
return f"Email inviata a {to} con oggetto '{subject}'"
@tool
def query_database(query: str, database: str = "main") -> str:
"""
Esegue query su database interni.
database: 'main' per il DB principale, 'analytics' per il DW
"""
return f"Query su {database}: [risultati simulati]"
# ============================================================
# CREAZIONE AGENTE REACT
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Tool set disponibili all'agente
tools = [search_web, analyze_data, generate_report, send_email, query_database]
# Prompt ReAct standard (da LangChain Hub)
react_prompt = hub.pull("hwchase17/react")
# Il prompt include: instructions per pensiero/azione/osservazione
# con schema: "Thought: ...\nAction: tool_name\nAction Input: ...\nObservation: ..."
# Crea l'agente ReAct
agent = create_react_agent(
llm=llm,
tools=tools,
prompt=react_prompt
)
# AgentExecutor: gestisce il loop thought-action-observation
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True, # Mostra ogni step del reasoning
max_iterations=10, # Previeni loop infiniti
early_stopping_method="generate", # Genera risposta se troppi step
handle_parsing_errors=True # Gestisci errori di parsing gracefully
)
# Esempio: task complesso che richiede più step
task = """
Analizza le performance di vendita Q4 2024 dal database analytics,
cerca notizie recenti sul settore, genera un report con raccomandazioni
e invialo a manager@company.com
"""
result = agent_executor.invoke({"input": task})
print(f"\nRisposta finale:\n{result['output']}")
# Output tipico del reasoning ReAct:
# Thought: Devo prima ottenere i dati di vendita dal database
# Action: query_database
# Action Input: {"query": "SELECT * FROM sales WHERE quarter='Q4' AND year=2024"}
# Observation: [dati vendite]
# Thought: Ora cerco notizie recenti sul settore
# Action: search_web
# Action Input: "notizie settore vendite Q4 2024"
# Observation: [risultati]
# Thought: Ho tutti i dati, genero il report
# Action: generate_report
# ...
3. Model de planificare și execuție
Modelul Planificați și executați planificare separată de execuție: mai întâi un LLM creează un plan detaliat al pașilor necesari, apoi agenții execută fiecare pas secvenţial (sau în paralel). În comparație cu ReAct, permite vederea abordare mai holistică a problemei și un management mai bun al complexității.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import List, Dict, Any
import json
class PlanAndExecuteAgent:
"""
Agente Plan-and-Execute:
1. PLANNER: crea un piano step-by-step
2. EXECUTOR: esegue ogni step con i tool disponibili
3. RESPONDER: sintetizza i risultati in una risposta finale
"""
def __init__(self, llm, tools_dict: Dict[str, callable]):
self.llm = llm
self.tools = tools_dict
# PLANNER: LLM ottimizzato per pianificazione
self.planner_chain = (
ChatPromptTemplate.from_template("""
Sei un pianificatore esperto. Crea un piano dettagliato step-by-step per completare
il seguente task.
Tool disponibili: {available_tools}
Task: {task}
Crea un piano come lista JSON di steps. Ogni step deve avere:
- "step_id": numero progressivo (1, 2, 3...)
- "description": descrizione dell'azione
- "tool": nome del tool da usare (o "llm" per ragionamento puro)
- "depends_on": lista di step_id che devono completarsi prima (per parallelismo)
Rispondi SOLO con il JSON, nient'altro.""")
| llm
| StrOutputParser()
)
# EXECUTOR: esegue un singolo step
self.executor_chain = (
ChatPromptTemplate.from_template("""
Esegui questo step del piano:
Step: {step}
Risultati steps precedenti: {previous_results}
Se devi usare un tool, fornisci l'input necessario in modo preciso.
Descrivi brevemente cosa hai fatto e il risultato ottenuto.""")
| llm
| StrOutputParser()
)
# RESPONDER: sintesi finale
self.responder_chain = (
ChatPromptTemplate.from_template("""
Task originale: {task}
Piano eseguito con questi risultati:
{step_results}
Sintetizza i risultati in una risposta completa e strutturata per l'utente.""")
| llm
| StrOutputParser()
)
def _parse_plan(self, plan_json: str) -> List[Dict]:
"""Parse del piano JSON"""
try:
# Rimuovi eventuali backtick o prefissi markdown
clean = plan_json.strip().strip('`').strip()
if clean.startswith('json'):
clean = clean[4:].strip()
return json.loads(clean)
except json.JSONDecodeError:
# Fallback: crea un piano semplice
return [{"step_id": 1, "description": "Esegui il task", "tool": "llm", "depends_on": []}]
def _execute_step(self, step: Dict, previous_results: Dict) -> str:
"""Esegui un singolo step del piano"""
tool_name = step.get("tool", "llm")
if tool_name != "llm" and tool_name in self.tools:
# Usa il tool specifico
try:
tool_input = step.get("tool_input", step["description"])
result = self.tools[tool_name](tool_input)
return f"Tool {tool_name} eseguito: {result}"
except Exception as e:
return f"Errore nel tool {tool_name}: {str(e)}"
else:
# Ragionamento puro con LLM
return self.executor_chain.invoke({
"step": step["description"],
"previous_results": json.dumps(previous_results, ensure_ascii=False)
})
def run(self, task: str) -> dict:
"""Esegui il task con Plan-and-Execute"""
print(f"\nTask: {task}\n")
# STEP 1: Pianificazione
available_tools = list(self.tools.keys())
plan_json = self.planner_chain.invoke({
"task": task,
"available_tools": ", ".join(available_tools)
})
plan = self._parse_plan(plan_json)
print(f"Piano creato: {len(plan)} steps")
# STEP 2: Esecuzione sequenziale degli step
step_results = {}
completed_steps = set()
# Ordina per dipendenze (topological sort semplificato)
for step in plan:
step_id = step["step_id"]
depends_on = step.get("depends_on", [])
# Aspetta che i prerequisiti siano completati
while not all(str(d) in completed_steps for d in depends_on):
import time; time.sleep(0.1) # In pratica usa async
print(f" Esecuzione step {step_id}: {step['description'][:60]}...")
result = self._execute_step(step, step_results)
step_results[str(step_id)] = result
completed_steps.add(str(step_id))
# STEP 3: Sintesi finale
final_response = self.responder_chain.invoke({
"task": task,
"step_results": json.dumps(step_results, ensure_ascii=False, indent=2)
})
return {
"plan": plan,
"step_results": step_results,
"final_response": final_response
}
# Utilizzo
tools_dict = {
"search_web": lambda q: f"Risultati ricerca: {q}",
"analyze_data": lambda d: f"Analisi completata per: {d}",
"generate_report": lambda t: f"Report generato: {t}",
}
agent = PlanAndExecuteAgent(llm=llm, tools_dict=tools_dict)
result = agent.run("Analizza i dati di vendita 2024 e prepara un report esecutivo")
print(result["final_response"])
4. LangGraph: orchestrare cu graffi
LangGraph este biblioteca LangChain pentru construirea de sisteme multi-agent stateful bazat pe grafice aciclice direcționate (DAG) și grafice cu cicluri. Vă permite să definiți controlați în mod explicit fluxul dintre agenți, gestionați starea partajată și implementați bucle de feedback. Este cea mai bună alegere pentru sisteme complexe multi-agent in productie.
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, List, Literal
import operator
import json
# ============================================================
# STATO CONDIVISO DEL SISTEMA
# ============================================================
class AgentState(TypedDict):
"""Stato condiviso tra tutti gli agenti nel grafo"""
messages: Annotated[List, operator.add] # Lista messaggi (append-only)
task: str # Task originale
plan: List[str] # Piano di esecuzione
current_step: int # Step corrente
research_results: str # Output agente ricerca
analysis_results: str # Output agente analisi
final_report: str # Report finale
next_agent: str # Prossimo agente da chiamare
error_count: int # Contatore errori
# ============================================================
# AGENTI SPECIALIZZATI
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def supervisor_agent(state: AgentState) -> AgentState:
"""
Agente supervisor: decide quale agente chiamare.
Analizza lo stato corrente e decide il prossimo passo.
"""
system_msg = SystemMessage(content="""Sei un supervisor AI che coordina un team di agenti.
Devi decidere quale agente chiamare in base allo stato corrente del task.
Agenti disponibili:
- research: raccoglie informazioni e dati
- analysis: analizza i dati raccolti
- report_writer: scrive il report finale
- FINISH: il task è completato
Rispondi con UN SOLO nome tra quelli disponibili.""")
messages = [system_msg, HumanMessage(content=f"""
Task: {state['task']}
Research completata: {'Si' if state['research_results'] else 'No'}
Analisi completata: {'Si' if state['analysis_results'] else 'No'}
Report scritto: {'Si' if state['final_report'] else 'No'}
Quale agente deve agire ora?""")]
response = llm.invoke(messages)
next_agent = response.content.strip().lower()
# Valida la risposta
valid_agents = ["research", "analysis", "report_writer", "finish"]
if next_agent not in valid_agents:
next_agent = "research" # Fallback sicuro
return {"next_agent": next_agent, "messages": [response]}
def research_agent(state: AgentState) -> AgentState:
"""Agente specializzato in ricerca e raccolta dati"""
messages = [
SystemMessage(content="Sei un ricercatore AI. Raccogli dati rilevanti per il task."),
HumanMessage(content=f"Raccogli informazioni per: {state['task']}")
]
response = llm.invoke(messages)
return {
"research_results": response.content,
"messages": [response]
}
def analysis_agent(state: AgentState) -> AgentState:
"""Agente specializzato in analisi dei dati"""
messages = [
SystemMessage(content="Sei un analista AI. Analizza i dati raccolti."),
HumanMessage(content=f"""
Dati da analizzare:
{state['research_results']}
Task originale: {state['task']}
Fornisci un'analisi strutturata.""")
]
response = llm.invoke(messages)
return {
"analysis_results": response.content,
"messages": [response]
}
def report_writer_agent(state: AgentState) -> AgentState:
"""Agente specializzato nella scrittura di report"""
messages = [
SystemMessage(content="Sei un writer AI specializzato in report tecnici."),
HumanMessage(content=f"""
Task: {state['task']}
Ricerca: {state['research_results']}
Analisi: {state['analysis_results']}
Scrivi un report professionale e completo.""")
]
response = llm.invoke(messages)
return {
"final_report": response.content,
"messages": [response]
}
# ============================================================
# ROUTING FUNCTION
# ============================================================
def route_to_agent(state: AgentState) -> Literal["research", "analysis", "report_writer", END]:
"""Funzione di routing: decide il nodo successivo nel grafo"""
next_agent = state.get("next_agent", "research")
if next_agent == "finish" or state.get("error_count", 0) > 5:
return END
return next_agent
# ============================================================
# COSTRUZIONE DEL GRAFO
# ============================================================
# Crea il grafo stateful
workflow = StateGraph(AgentState)
# Aggiungi nodi (agenti)
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("research", research_agent)
workflow.add_node("analysis", analysis_agent)
workflow.add_node("report_writer", report_writer_agent)
# Definisci entry point
workflow.set_entry_point("supervisor")
# Definisci edges condizionali
workflow.add_conditional_edges(
"supervisor",
route_to_agent,
{
"research": "research",
"analysis": "analysis",
"report_writer": "report_writer",
END: END
}
)
# Tutti gli agenti tornano al supervisor
workflow.add_edge("research", "supervisor")
workflow.add_edge("analysis", "supervisor")
workflow.add_edge("report_writer", "supervisor")
# Compila il grafo
app = workflow.compile()
# ============================================================
# ESECUZIONE
# ============================================================
initial_state = {
"task": "Analizza le tendenze del mercato AI nel 2025 e prepara un report esecutivo",
"messages": [],
"plan": [],
"current_step": 0,
"research_results": "",
"analysis_results": "",
"final_report": "",
"next_agent": "",
"error_count": 0
}
# Esegui il grafo
config = {"recursion_limit": 20}
final_state = app.invoke(initial_state, config=config)
print("\n=== REPORT FINALE ===")
print(final_state["final_report"])
5. Comunicare și coordonare între agenți
Comunicarea eficientă între agenți este crucială. Există trei modele principale: trecerea mesajului (agenții fac schimb de mesaje structurate), stare comună (starea globală partajată ca în LangGraph) e tablă (bază de date partajată în care agenții scriu și citesc).
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from datetime import datetime
import uuid
from enum import Enum
class MessageType(Enum):
TASK = "task" # Assegna un task a un agente
RESULT = "result" # Restituisce il risultato di un task
ERROR = "error" # Segnala un errore
STATUS = "status" # Aggiornamento di stato
QUERY = "query" # Richiesta di informazioni
RESPONSE = "response" # Risposta a una query
@dataclass
class AgentMessage:
"""Messaggio strutturato per comunicazione inter-agent"""
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
from_agent: str = ""
to_agent: str = ""
message_type: MessageType = MessageType.TASK
content: Any = None
metadata: Dict = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
correlation_id: Optional[str] = None # Per tracciare conversazioni
def is_response_to(self, original: 'AgentMessage') -> bool:
return self.correlation_id == original.message_id
def create_response(self, content: Any, message_type: MessageType = MessageType.RESULT):
"""Crea una risposta a questo messaggio"""
return AgentMessage(
from_agent=self.to_agent,
to_agent=self.from_agent,
message_type=message_type,
content=content,
correlation_id=self.message_id
)
class MessageBus:
"""Bus di messaggi per comunicazione asincrona tra agenti"""
def __init__(self):
self.queues: Dict[str, List[AgentMessage]] = {}
self.history: List[AgentMessage] = []
def register_agent(self, agent_id: str):
"""Registra un agente nel bus"""
self.queues[agent_id] = []
def send(self, message: AgentMessage):
"""Invia un messaggio a un agente"""
if message.to_agent not in self.queues:
self.queues[message.to_agent] = []
self.queues[message.to_agent].append(message)
self.history.append(message)
def receive(self, agent_id: str) -> Optional[AgentMessage]:
"""Ricevi il primo messaggio dalla coda di un agente"""
if agent_id in self.queues and self.queues[agent_id]:
return self.queues[agent_id].pop(0)
return None
def broadcast(self, message: AgentMessage, agents: List[str]):
"""Invia lo stesso messaggio a più agenti"""
for agent_id in agents:
msg_copy = AgentMessage(
from_agent=message.from_agent,
to_agent=agent_id,
message_type=message.message_type,
content=message.content,
metadata=message.metadata,
correlation_id=message.message_id
)
self.send(msg_copy)
def get_conversation(self, correlation_id: str) -> List[AgentMessage]:
"""Ottieni tutti i messaggi di una conversazione"""
return [m for m in self.history if
m.message_id == correlation_id or
m.correlation_id == correlation_id]
# Esempio utilizzo
bus = MessageBus()
bus.register_agent("supervisor")
bus.register_agent("research")
bus.register_agent("analysis")
# Supervisor assegna task alla ricerca
task_msg = AgentMessage(
from_agent="supervisor",
to_agent="research",
message_type=MessageType.TASK,
content="Ricerca tendenze AI 2025",
metadata={"priority": "high", "deadline": "2025-03-01"}
)
bus.send(task_msg)
# Research riceve e risponde
received = bus.receive("research")
if received:
result_msg = received.create_response(
content="Trovate 50 fonti rilevanti sulle tendenze AI 2025",
message_type=MessageType.RESULT
)
bus.send(result_msg)
6. Gestionarea și monitorizarea erorilor
Sistemele multi-agent introduc noi vectori de defecțiune: agenți blocați bucle, erori de comunicare, ieșiri defectuoase, latențe excesive. Este fundamental implementați întrerupător, timeout și monitorizare specifică.
import asyncio
import time
from functools import wraps
from typing import Callable, TypeVar
from enum import Enum
T = TypeVar('T')
class CircuitState(Enum):
CLOSED = "closed" # Funziona normalmente
OPEN = "open" # Bloccato (troppi errori)
HALF_OPEN = "half_open" # Test: prova una chiamata
class CircuitBreaker:
"""
Circuit breaker per agenti AI.
Previene cascade failure quando un agente fallisce ripetutamente.
"""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
half_open_max_calls: int = 1
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = 0
self.half_open_calls = 0
def __call__(self, func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
else:
raise Exception(
f"Circuit OPEN: agente non disponibile. "
f"Retry in {self.recovery_timeout - (time.time() - self.last_failure_time):.0f}s"
)
if self.state == CircuitState.HALF_OPEN:
if self.half_open_calls >= self.half_open_max_calls:
raise Exception("Circuit HALF_OPEN: attendere recovery")
self.half_open_calls += 1
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
return wrapper
def _on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
class MultiAgentMonitor:
"""Monitoring per sistemi multi-agent"""
def __init__(self):
self.agent_stats: Dict[str, Dict] = {}
self.task_history: List[Dict] = []
def record_agent_call(
self,
agent_id: str,
task: str,
success: bool,
duration_ms: float,
tokens_used: int = 0
):
"""Registra una chiamata a un agente"""
if agent_id not in self.agent_stats:
self.agent_stats[agent_id] = {
"total_calls": 0, "successes": 0, "failures": 0,
"total_tokens": 0, "total_duration_ms": 0
}
stats = self.agent_stats[agent_id]
stats["total_calls"] += 1
stats["successes" if success else "failures"] += 1
stats["total_tokens"] += tokens_used
stats["total_duration_ms"] += duration_ms
def get_system_health(self) -> dict:
"""Ritorna lo stato di salute del sistema"""
results = {}
for agent_id, stats in self.agent_stats.items():
total = stats["total_calls"]
success_rate = stats["successes"] / total if total > 0 else 0
avg_duration = stats["total_duration_ms"] / total if total > 0 else 0
results[agent_id] = {
"success_rate": success_rate,
"avg_duration_ms": avg_duration,
"total_calls": total,
"status": "healthy" if success_rate > 0.9 else
"degraded" if success_rate > 0.7 else "critical"
}
return results
7. Cele mai bune practici și anti-modele
Cele mai bune practici pentru mai mulți agenți
- Principiul responsabilitatii unice: fiecare agent trebuie să aibă un rol clar definit și limitat. Un agent care face „totul” este greu de depanat și îmbunătățit.
- Timeout pentru fiecare agent: niciun agent nu ar trebui să poată bloca sistemul la infinit. Setați intervale de timp realiste (10-30 de secunde pentru agenții LLM).
- Stare explicită și serializabilă: utilizați TypedDict sau Pydantic pentru a defini starea. Trebuie să fie serializabil (JSON) pentru înregistrare, depanare și recuperare.
- Întrerupător pentru fiecare agent: preveni defectarea cascadei. Dacă un agent eșuează în mod repetat, întrerupătorul de circuit oprește apelurile și permite recuperarea.
- Urmăriți fiecare decizie: înregistrați fiecare decizie a supervizorului cu un raționament explicit. Depanarea sistemelor multi-agent fără urme este aproape imposibilă.
Anti-modele de evitat
- Agenți prea generici: un agent „asistent universal” nu este mai eficient decât un singur LLM, dar este mult mai scump și mai greu de controlat.
- Fără limită de iterație: fără max_iterations, un sistem ReAct poate rula într-o buclă infinită, consumând jetoane și bani. Stabiliți întotdeauna o limită.
- Stare globală mutabilă partajată: partajarea stării mutabile între agenți introduce condiții de rasă. Utilizați mesaje imuabile sau stare gestionată central.
- Încredere oarbă în rezultatul unui agent: un agent poate produce rezultate malformate sau halucinatorii. Validați întotdeauna rezultatul înainte de a o transmite următorului agent.
Concluzii
Sistemele multi-agenți reprezintă viitorul AI aplicat problemelor complexe. Am explorat modelele fundamentale (React, Plan-and-Execute, Supervizor), LangGraph pentru orchestrare cu stare, modele de comunicare inter-agenți și managementul erorilor în producție.
Punctele cheie:
- Alegeți topologia potrivită: conductă pentru fluxuri liniare, supraveghetor pentru rutare dinamică
- ReAct este excelent pentru sarcini de explorare cu instrumente; Planificați și executați pentru sarcini structurate
- LangGraph este cea mai bună alegere pentru sistemele complexe în producție
- Fiecare agent trebuie să aibă un singur rol, bine definit
- Întreruptoarele și timeout-urile sunt esențiale în producție
- Urmăriți fiecare decizie a supervizorului pentru o depanare eficientă
În următorul articol vom explora Inginerie promptă în producție: șablon, versiunea și testarea sistematică pentru a asigura calitatea și consistența de-a lungul timpului.
Seria continuă
- Articolul 7: Gestionarea ferestrei de context
- Articolul 8: Sisteme cu mai mulți agenți (actual)
- Articolul 9: Inginerie promptă în producție
- Articolul 10: Grafice de cunoștințe pentru IA
Aflați mai multe cu: MLOps: Servire model e LangChain pentru RAG.







