Multi-Agent Systems: Orchestrace a komunikace
I multiagentní systémy jsou jedním z nejvýbušnějších trendů v AI v roce 2025. Gartner oznámil 1 445% nárůst dotazů na „multi-agentní systémy“ mezi 1. čtvrtletí 2024 a 2. čtvrtletí 2025. Důvod je jednoduchý: mnoho skutečných problémů je příliš složitých pro jednoho agenta AI. Výzkumný agent, který musí sbírat data, analyzovat je, napsat zprávu a odeslat ji e-mailem není jediný problém: jsou to čtyři problémy specialisty, kteří vyžadují různé dovednosti.
V tomto článku stavíme skutečné multiagentní systémy: od základní architektury (orchestrátor + specializovaní agenti) až po pokročilé vzory jako např Reagovat (uvažování + jednání), Plánujte a provádějte, Vedoucí-pracovník e LangGraph pro orchestraci stavový. Každá sekce obsahuje funkční kód Pythonu a vzory testované na skutečných problémech.
Co se naučíte
- Architektura a vzory multiagentních systémů
- Vzor ReAct: Uvažování + jednání s vyvoláním nástroje
- Plan-and-Execute: Oddělte plánování od provádění
- Vzor supervizora: orchestrátor, který koordinuje specializované agenty
- LangGraph pro stavovou orchestraci s řídicím grafem
- Komunikace a koordinace mezi agenty
- Ošetření chyb a nouzové řešení v multiagentních systémech
- Monitorování a pozorovatelnost pro multiagentní potrubí
1. Architektura multiagentních systémů
Multiagentní systém se skládá z více agenti (LLM + sada nářadí) kteří spolupracují na dosažení společného cíle. Klíčem k úspěchu je oddělení odpovědnosti: každý agent se specializuje na a konkrétní doménu a ví, kdy delegovat na jiné agenty.
TOPOLOGIE MULTI-AGENT:
1. NETWORK (fully connected):
Ogni agente può chiamare qualsiasi altro agente.
Pro: massima flessibilità
Con: difficile da controllare, rischio di loop
A ◄──► B
▲ ▲
└──► C ◄┘
2. SUPERVISOR (star topology):
Un agente centrale orchestra tutti gli altri.
Pro: controllo centralizzato, facile da debuggare
Con: single point of failure, bottleneck
SUPERVISOR
├──► Agent A (ricerca)
├──► Agent B (analisi)
└──► Agent C (report)
3. HIERARCHICAL:
Supervisori multipli organizzati in gerarchia.
Pro: scalabilità, separazione chiara delle responsabilità
Con: latenza aumentata, complessità coordinazione
Manager
├──► Sub-manager A
│ ├──► Worker A1
│ └──► Worker A2
└──► Sub-manager B
├──► Worker B1
└──► Worker B2
4. PIPELINE (sequential):
Ogni agente processa l'output del precedente.
Pro: semplice, deterministico, facile da debuggare
Con: rigido, nessuna retroazione
Input ──► A ──► B ──► C ──► Output
SCELTA DELLA TOPOLOGIA:
- Task ben definiti, ordine chiaro → Pipeline
- Task con routing dinamico → Supervisor
- Problemi complessi con sottoproblemi → Hierarchical
- Ricerca esplorativa → Network (con guardrail)
2. Vzor reakce: uvažování + jednání
Reagovat (Uvažování + jednání) je základní vzorec pro agenty AI s přístupem k nářadí. Agent se střídá Myšlenky (uvažování o tom, co dělat), Akce (provedení nástroje) e Pozorování (výklad výsledek), opakování cyklu, dokud není dosaženo konečné odpovědi.
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain import hub
from typing import Optional
import requests
import json
# ============================================================
# DEFINIZIONE DEI TOOL
# ============================================================
@tool
def search_web(query: str) -> str:
"""Cerca informazioni sul web. Usa per trovare dati aggiornati o notizie."""
# Integrazione con Tavily, SerpAPI o simili
# Per semplicità, simuliamo una risposta
return f"Risultati ricerca per '{query}': [risultati simulati per demo]"
@tool
def analyze_data(data: str, analysis_type: str) -> str:
"""
Analizza dati numerici o testuali.
analysis_type può essere: 'summary', 'trend', 'anomaly', 'comparison'
"""
return f"Analisi {analysis_type} completata: [analisi simulata]"
@tool
def generate_report(title: str, sections: str) -> str:
"""
Genera un report strutturato.
sections: JSON con le sezioni del report
"""
return f"Report '{title}' generato con sezioni: {sections}"
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""Invia un'email. Usa per comunicare risultati all'utente finale."""
return f"Email inviata a {to} con oggetto '{subject}'"
@tool
def query_database(query: str, database: str = "main") -> str:
"""
Esegue query su database interni.
database: 'main' per il DB principale, 'analytics' per il DW
"""
return f"Query su {database}: [risultati simulati]"
# ============================================================
# CREAZIONE AGENTE REACT
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Tool set disponibili all'agente
tools = [search_web, analyze_data, generate_report, send_email, query_database]
# Prompt ReAct standard (da LangChain Hub)
react_prompt = hub.pull("hwchase17/react")
# Il prompt include: instructions per pensiero/azione/osservazione
# con schema: "Thought: ...\nAction: tool_name\nAction Input: ...\nObservation: ..."
# Crea l'agente ReAct
agent = create_react_agent(
llm=llm,
tools=tools,
prompt=react_prompt
)
# AgentExecutor: gestisce il loop thought-action-observation
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True, # Mostra ogni step del reasoning
max_iterations=10, # Previeni loop infiniti
early_stopping_method="generate", # Genera risposta se troppi step
handle_parsing_errors=True # Gestisci errori di parsing gracefully
)
# Esempio: task complesso che richiede più step
task = """
Analizza le performance di vendita Q4 2024 dal database analytics,
cerca notizie recenti sul settore, genera un report con raccomandazioni
e invialo a manager@company.com
"""
result = agent_executor.invoke({"input": task})
print(f"\nRisposta finale:\n{result['output']}")
# Output tipico del reasoning ReAct:
# Thought: Devo prima ottenere i dati di vendita dal database
# Action: query_database
# Action Input: {"query": "SELECT * FROM sales WHERE quarter='Q4' AND year=2024"}
# Observation: [dati vendite]
# Thought: Ora cerco notizie recenti sul settore
# Action: search_web
# Action Input: "notizie settore vendite Q4 2024"
# Observation: [risultati]
# Thought: Ho tutti i dati, genero il report
# Action: generate_report
# ...
3. Vzor naplánujte a proveďte
Vzor Plánujte a provádějte oddělené plánování od realizace: nejprve LLM vytvoří podrobný plán nezbytných kroků, poté agenti provedou každý krok postupně (nebo paralelně). Ve srovnání s ReAct umožňuje vidění holistický přístup k problému a lepší řízení složitosti.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import List, Dict, Any
import json
class PlanAndExecuteAgent:
"""
Agente Plan-and-Execute:
1. PLANNER: crea un piano step-by-step
2. EXECUTOR: esegue ogni step con i tool disponibili
3. RESPONDER: sintetizza i risultati in una risposta finale
"""
def __init__(self, llm, tools_dict: Dict[str, callable]):
self.llm = llm
self.tools = tools_dict
# PLANNER: LLM ottimizzato per pianificazione
self.planner_chain = (
ChatPromptTemplate.from_template("""
Sei un pianificatore esperto. Crea un piano dettagliato step-by-step per completare
il seguente task.
Tool disponibili: {available_tools}
Task: {task}
Crea un piano come lista JSON di steps. Ogni step deve avere:
- "step_id": numero progressivo (1, 2, 3...)
- "description": descrizione dell'azione
- "tool": nome del tool da usare (o "llm" per ragionamento puro)
- "depends_on": lista di step_id che devono completarsi prima (per parallelismo)
Rispondi SOLO con il JSON, nient'altro.""")
| llm
| StrOutputParser()
)
# EXECUTOR: esegue un singolo step
self.executor_chain = (
ChatPromptTemplate.from_template("""
Esegui questo step del piano:
Step: {step}
Risultati steps precedenti: {previous_results}
Se devi usare un tool, fornisci l'input necessario in modo preciso.
Descrivi brevemente cosa hai fatto e il risultato ottenuto.""")
| llm
| StrOutputParser()
)
# RESPONDER: sintesi finale
self.responder_chain = (
ChatPromptTemplate.from_template("""
Task originale: {task}
Piano eseguito con questi risultati:
{step_results}
Sintetizza i risultati in una risposta completa e strutturata per l'utente.""")
| llm
| StrOutputParser()
)
def _parse_plan(self, plan_json: str) -> List[Dict]:
"""Parse del piano JSON"""
try:
# Rimuovi eventuali backtick o prefissi markdown
clean = plan_json.strip().strip('`').strip()
if clean.startswith('json'):
clean = clean[4:].strip()
return json.loads(clean)
except json.JSONDecodeError:
# Fallback: crea un piano semplice
return [{"step_id": 1, "description": "Esegui il task", "tool": "llm", "depends_on": []}]
def _execute_step(self, step: Dict, previous_results: Dict) -> str:
"""Esegui un singolo step del piano"""
tool_name = step.get("tool", "llm")
if tool_name != "llm" and tool_name in self.tools:
# Usa il tool specifico
try:
tool_input = step.get("tool_input", step["description"])
result = self.tools[tool_name](tool_input)
return f"Tool {tool_name} eseguito: {result}"
except Exception as e:
return f"Errore nel tool {tool_name}: {str(e)}"
else:
# Ragionamento puro con LLM
return self.executor_chain.invoke({
"step": step["description"],
"previous_results": json.dumps(previous_results, ensure_ascii=False)
})
def run(self, task: str) -> dict:
"""Esegui il task con Plan-and-Execute"""
print(f"\nTask: {task}\n")
# STEP 1: Pianificazione
available_tools = list(self.tools.keys())
plan_json = self.planner_chain.invoke({
"task": task,
"available_tools": ", ".join(available_tools)
})
plan = self._parse_plan(plan_json)
print(f"Piano creato: {len(plan)} steps")
# STEP 2: Esecuzione sequenziale degli step
step_results = {}
completed_steps = set()
# Ordina per dipendenze (topological sort semplificato)
for step in plan:
step_id = step["step_id"]
depends_on = step.get("depends_on", [])
# Aspetta che i prerequisiti siano completati
while not all(str(d) in completed_steps for d in depends_on):
import time; time.sleep(0.1) # In pratica usa async
print(f" Esecuzione step {step_id}: {step['description'][:60]}...")
result = self._execute_step(step, step_results)
step_results[str(step_id)] = result
completed_steps.add(str(step_id))
# STEP 3: Sintesi finale
final_response = self.responder_chain.invoke({
"task": task,
"step_results": json.dumps(step_results, ensure_ascii=False, indent=2)
})
return {
"plan": plan,
"step_results": step_results,
"final_response": final_response
}
# Utilizzo
tools_dict = {
"search_web": lambda q: f"Risultati ricerca: {q}",
"analyze_data": lambda d: f"Analisi completata per: {d}",
"generate_report": lambda t: f"Report generato: {t}",
}
agent = PlanAndExecuteAgent(llm=llm, tools_dict=tools_dict)
result = agent.run("Analizza i dati di vendita 2024 e prepara un report esecutivo")
print(result["final_response"])
4. LangGraph: Stateful Orchestrace s grafy
LangGraph je knihovna LangChain pro budování multi-agentních systémů stavové založené na směrovaných acyklických grafech (DAG) a grafech s cykly. Umožňuje definovat explicitně řídit tok mezi agenty, spravovat sdílený stav a implementovat zpětnovazební smyčky. Je to nejlepší volba pro komplexní multiagentní systémy ve výrobě.
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, List, Literal
import operator
import json
# ============================================================
# STATO CONDIVISO DEL SISTEMA
# ============================================================
class AgentState(TypedDict):
"""Stato condiviso tra tutti gli agenti nel grafo"""
messages: Annotated[List, operator.add] # Lista messaggi (append-only)
task: str # Task originale
plan: List[str] # Piano di esecuzione
current_step: int # Step corrente
research_results: str # Output agente ricerca
analysis_results: str # Output agente analisi
final_report: str # Report finale
next_agent: str # Prossimo agente da chiamare
error_count: int # Contatore errori
# ============================================================
# AGENTI SPECIALIZZATI
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def supervisor_agent(state: AgentState) -> AgentState:
"""
Agente supervisor: decide quale agente chiamare.
Analizza lo stato corrente e decide il prossimo passo.
"""
system_msg = SystemMessage(content="""Sei un supervisor AI che coordina un team di agenti.
Devi decidere quale agente chiamare in base allo stato corrente del task.
Agenti disponibili:
- research: raccoglie informazioni e dati
- analysis: analizza i dati raccolti
- report_writer: scrive il report finale
- FINISH: il task è completato
Rispondi con UN SOLO nome tra quelli disponibili.""")
messages = [system_msg, HumanMessage(content=f"""
Task: {state['task']}
Research completata: {'Si' if state['research_results'] else 'No'}
Analisi completata: {'Si' if state['analysis_results'] else 'No'}
Report scritto: {'Si' if state['final_report'] else 'No'}
Quale agente deve agire ora?""")]
response = llm.invoke(messages)
next_agent = response.content.strip().lower()
# Valida la risposta
valid_agents = ["research", "analysis", "report_writer", "finish"]
if next_agent not in valid_agents:
next_agent = "research" # Fallback sicuro
return {"next_agent": next_agent, "messages": [response]}
def research_agent(state: AgentState) -> AgentState:
"""Agente specializzato in ricerca e raccolta dati"""
messages = [
SystemMessage(content="Sei un ricercatore AI. Raccogli dati rilevanti per il task."),
HumanMessage(content=f"Raccogli informazioni per: {state['task']}")
]
response = llm.invoke(messages)
return {
"research_results": response.content,
"messages": [response]
}
def analysis_agent(state: AgentState) -> AgentState:
"""Agente specializzato in analisi dei dati"""
messages = [
SystemMessage(content="Sei un analista AI. Analizza i dati raccolti."),
HumanMessage(content=f"""
Dati da analizzare:
{state['research_results']}
Task originale: {state['task']}
Fornisci un'analisi strutturata.""")
]
response = llm.invoke(messages)
return {
"analysis_results": response.content,
"messages": [response]
}
def report_writer_agent(state: AgentState) -> AgentState:
"""Agente specializzato nella scrittura di report"""
messages = [
SystemMessage(content="Sei un writer AI specializzato in report tecnici."),
HumanMessage(content=f"""
Task: {state['task']}
Ricerca: {state['research_results']}
Analisi: {state['analysis_results']}
Scrivi un report professionale e completo.""")
]
response = llm.invoke(messages)
return {
"final_report": response.content,
"messages": [response]
}
# ============================================================
# ROUTING FUNCTION
# ============================================================
def route_to_agent(state: AgentState) -> Literal["research", "analysis", "report_writer", END]:
"""Funzione di routing: decide il nodo successivo nel grafo"""
next_agent = state.get("next_agent", "research")
if next_agent == "finish" or state.get("error_count", 0) > 5:
return END
return next_agent
# ============================================================
# COSTRUZIONE DEL GRAFO
# ============================================================
# Crea il grafo stateful
workflow = StateGraph(AgentState)
# Aggiungi nodi (agenti)
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("research", research_agent)
workflow.add_node("analysis", analysis_agent)
workflow.add_node("report_writer", report_writer_agent)
# Definisci entry point
workflow.set_entry_point("supervisor")
# Definisci edges condizionali
workflow.add_conditional_edges(
"supervisor",
route_to_agent,
{
"research": "research",
"analysis": "analysis",
"report_writer": "report_writer",
END: END
}
)
# Tutti gli agenti tornano al supervisor
workflow.add_edge("research", "supervisor")
workflow.add_edge("analysis", "supervisor")
workflow.add_edge("report_writer", "supervisor")
# Compila il grafo
app = workflow.compile()
# ============================================================
# ESECUZIONE
# ============================================================
initial_state = {
"task": "Analizza le tendenze del mercato AI nel 2025 e prepara un report esecutivo",
"messages": [],
"plan": [],
"current_step": 0,
"research_results": "",
"analysis_results": "",
"final_report": "",
"next_agent": "",
"error_count": 0
}
# Esegui il grafo
config = {"recursion_limit": 20}
final_state = app.invoke(initial_state, config=config)
print("\n=== REPORT FINALE ===")
print(final_state["final_report"])
5. Komunikace a koordinace mezi agenty
Efektivní komunikace mezi agenty je zásadní. Existují tři hlavní vzory: předávání zpráv (agenti si vyměňují strukturované zprávy), sdílený stát (globálně sdílený stav jako v LangGraph) e tabule (sdílená databáze, kam agenti píší a čtou).
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from datetime import datetime
import uuid
from enum import Enum
class MessageType(Enum):
TASK = "task" # Assegna un task a un agente
RESULT = "result" # Restituisce il risultato di un task
ERROR = "error" # Segnala un errore
STATUS = "status" # Aggiornamento di stato
QUERY = "query" # Richiesta di informazioni
RESPONSE = "response" # Risposta a una query
@dataclass
class AgentMessage:
"""Messaggio strutturato per comunicazione inter-agent"""
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
from_agent: str = ""
to_agent: str = ""
message_type: MessageType = MessageType.TASK
content: Any = None
metadata: Dict = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
correlation_id: Optional[str] = None # Per tracciare conversazioni
def is_response_to(self, original: 'AgentMessage') -> bool:
return self.correlation_id == original.message_id
def create_response(self, content: Any, message_type: MessageType = MessageType.RESULT):
"""Crea una risposta a questo messaggio"""
return AgentMessage(
from_agent=self.to_agent,
to_agent=self.from_agent,
message_type=message_type,
content=content,
correlation_id=self.message_id
)
class MessageBus:
"""Bus di messaggi per comunicazione asincrona tra agenti"""
def __init__(self):
self.queues: Dict[str, List[AgentMessage]] = {}
self.history: List[AgentMessage] = []
def register_agent(self, agent_id: str):
"""Registra un agente nel bus"""
self.queues[agent_id] = []
def send(self, message: AgentMessage):
"""Invia un messaggio a un agente"""
if message.to_agent not in self.queues:
self.queues[message.to_agent] = []
self.queues[message.to_agent].append(message)
self.history.append(message)
def receive(self, agent_id: str) -> Optional[AgentMessage]:
"""Ricevi il primo messaggio dalla coda di un agente"""
if agent_id in self.queues and self.queues[agent_id]:
return self.queues[agent_id].pop(0)
return None
def broadcast(self, message: AgentMessage, agents: List[str]):
"""Invia lo stesso messaggio a più agenti"""
for agent_id in agents:
msg_copy = AgentMessage(
from_agent=message.from_agent,
to_agent=agent_id,
message_type=message.message_type,
content=message.content,
metadata=message.metadata,
correlation_id=message.message_id
)
self.send(msg_copy)
def get_conversation(self, correlation_id: str) -> List[AgentMessage]:
"""Ottieni tutti i messaggi di una conversazione"""
return [m for m in self.history if
m.message_id == correlation_id or
m.correlation_id == correlation_id]
# Esempio utilizzo
bus = MessageBus()
bus.register_agent("supervisor")
bus.register_agent("research")
bus.register_agent("analysis")
# Supervisor assegna task alla ricerca
task_msg = AgentMessage(
from_agent="supervisor",
to_agent="research",
message_type=MessageType.TASK,
content="Ricerca tendenze AI 2025",
metadata={"priority": "high", "deadline": "2025-03-01"}
)
bus.send(task_msg)
# Research riceve e risponde
received = bus.receive("research")
if received:
result_msg = received.create_response(
content="Trovate 50 fonti rilevanti sulle tendenze AI 2025",
message_type=MessageType.RESULT
)
bus.send(result_msg)
6. Správa a monitorování chyb
Multiagentní systémy zavádějí nové vektory selhání: uvíznutí agenti smyčky, komunikační chyby, chybné výstupy, nadměrné latence. Je to zásadní implementovat jistič, timeout a specifické monitorování.
import asyncio
import time
from functools import wraps
from typing import Callable, TypeVar
from enum import Enum
T = TypeVar('T')
class CircuitState(Enum):
CLOSED = "closed" # Funziona normalmente
OPEN = "open" # Bloccato (troppi errori)
HALF_OPEN = "half_open" # Test: prova una chiamata
class CircuitBreaker:
"""
Circuit breaker per agenti AI.
Previene cascade failure quando un agente fallisce ripetutamente.
"""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
half_open_max_calls: int = 1
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = 0
self.half_open_calls = 0
def __call__(self, func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
else:
raise Exception(
f"Circuit OPEN: agente non disponibile. "
f"Retry in {self.recovery_timeout - (time.time() - self.last_failure_time):.0f}s"
)
if self.state == CircuitState.HALF_OPEN:
if self.half_open_calls >= self.half_open_max_calls:
raise Exception("Circuit HALF_OPEN: attendere recovery")
self.half_open_calls += 1
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
return wrapper
def _on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
class MultiAgentMonitor:
"""Monitoring per sistemi multi-agent"""
def __init__(self):
self.agent_stats: Dict[str, Dict] = {}
self.task_history: List[Dict] = []
def record_agent_call(
self,
agent_id: str,
task: str,
success: bool,
duration_ms: float,
tokens_used: int = 0
):
"""Registra una chiamata a un agente"""
if agent_id not in self.agent_stats:
self.agent_stats[agent_id] = {
"total_calls": 0, "successes": 0, "failures": 0,
"total_tokens": 0, "total_duration_ms": 0
}
stats = self.agent_stats[agent_id]
stats["total_calls"] += 1
stats["successes" if success else "failures"] += 1
stats["total_tokens"] += tokens_used
stats["total_duration_ms"] += duration_ms
def get_system_health(self) -> dict:
"""Ritorna lo stato di salute del sistema"""
results = {}
for agent_id, stats in self.agent_stats.items():
total = stats["total_calls"]
success_rate = stats["successes"] / total if total > 0 else 0
avg_duration = stats["total_duration_ms"] / total if total > 0 else 0
results[agent_id] = {
"success_rate": success_rate,
"avg_duration_ms": avg_duration,
"total_calls": total,
"status": "healthy" if success_rate > 0.9 else
"degraded" if success_rate > 0.7 else "critical"
}
return results
7. Nejlepší postupy a anti-vzorce
Doporučené postupy pro více agentů
- Princip jediné odpovědnosti: každý agent musí mít jasně definovanou a omezenou roli. Agent, který dělá „všechno“, je těžké odladit a vylepšit.
- Časový limit u každého agenta: žádný agent by neměl mít možnost blokovat systém donekonečna. Nastavte realistické časové limity (10–30 sekund pro agenty LLM).
- Explicitní a serializovatelný stav: k definování stavu použijte TypedDict nebo Pydantic. Musí být serializovatelný (JSON) pro protokolování, ladění a obnovu.
- Jistič pro každého agenta: zabránit selhání kaskády. Pokud agent opakovaně selže, jistič zastaví volání a umožní obnovu.
- Sledujte každé rozhodnutí: zaznamenat každé rozhodnutí nadřízeného s explicitním odůvodněním. Ladění multiagentních systémů beze stop je téměř nemožné.
Anti-vzory, kterým je třeba se vyhnout
- Agenti, kteří jsou příliš obecní: agent „univerzálního asistenta“ není o nic efektivnější než jeden LLM, ale je mnohem dražší a obtížněji se ovládá.
- Žádný iterační limit: bez max_iterations může systém ReAct běžet v nekonečné smyčce a spotřebovávat tokeny a peníze. Vždy si stanovte limit.
- Sdílený proměnlivý globální stav: sdílení proměnlivého stavu mezi agenty zavádí rasové podmínky. Používejte neměnné zprávy nebo centrálně spravovaný stav.
- Slepě důvěřovat výstupu agenta: agent může vyvolat deformovaný nebo halucinační výstup. Vždy ověřte výstup před jeho předáním dalšímu agentovi.
Závěry
Multiagentní systémy představují budoucnost AI aplikované na komplexní problémy. Prozkoumali jsme základní vzorce (ReAct, Plan-and-Execute, Supervisor), LangGraph pro stavovou orchestraci, vzorce komunikace mezi agenty a správa chyb ve výrobě.
Klíčové body:
- Zvolte správnou topologii: potrubí pro lineární toky, supervizor pro dynamické směrování
- ReAct je vynikající pro průzkumné úkoly s nástroji; Plan-and-Execute pro strukturované úkoly
- LangGraph je nejlepší volbou pro složité systémy ve výrobě
- Každý agent musí mít jednu, dobře definovanou roli
- Jističe a časové limity jsou ve výrobě zásadní
- Sledujte každé rozhodnutí nadřízeného pro efektivní ladění
V příštím článku prozkoumáme Rychlé inženýrství ve výrobě: šablony, verzování a systematické testování pro zajištění kvality a konzistence časem.
Série pokračuje
- Článek 7: Správa kontextových oken
- Článek 8: Multiagentní systémy (aktuální)
- Článek 9: Rychlé inženýrství ve výrobě
- Článek 10: Diagramy znalostí pro umělou inteligenci
Další informace: MLOps: Model Serving e LangChain pro RAG.







