Systemy wieloagentowe: orkiestracja i komunikacja
I systemy wieloagentowe to jeden z najbardziej wybuchowych trendów w AI w 2025 roku. Gartner odnotował wzrost zapytań o „systemy wieloagentowe” o 1445%. I kwartał 2024 r. i II kwartał 2025 r. Powód jest prosty: wiele realnych problemów jest zbyt skomplikowanych dla pojedynczego agenta AI. Agent badawczy, który musi zbierać dane, analizować je, napisanie raportu i przesłanie go e-mailem nie jest pojedynczym problemem: są to cztery problemy specjalistów wymagających różnych umiejętności.
W tym artykule budujemy prawdziwe systemy wieloagentowe: od podstawowej architektury (orkiestrator + wyspecjalizowani agenci) do zaawansowanych wzorców takich jak Zareagować (Rozumowanie + działanie), Zaplanuj i wykonaj, Przełożony-Pracownik e LangGraf do orkiestracji stanowy. Każda sekcja zawiera działający kod Pythona i wzorce przetestowane na rzeczywistych problemach.
Czego się nauczysz
- Architektura i wzorce systemów wieloagentowych
- Wzorzec ReAct: Rozumowanie + Działanie z wywołaniem narzędzia
- Zaplanuj i wykonaj: oddziel planowanie od wykonania
- Wzór nadzorcy: orkiestrator koordynujący wyspecjalizowanych agentów
- LangGraph do orkiestracji stanowej z wykresem kontrolnym
- Komunikacja i koordynacja pomiędzy agentami
- Obsługa błędów i powrót do pracy w systemach wieloagentowych
- Monitorowanie i obserwowalność potoków wieloagentowych
1. Architektura systemów wieloagentowych
System wieloagentowy składa się z wielu agenci (LLM + zestaw narzędzi) którzy współpracują, aby osiągnąć wspólny cel. Kluczem do sukcesu jest rozdzielenie obowiązków: każdy agent specjalizuje się w konkretną domenę i wie, kiedy delegować innym agentom.
TOPOLOGIE MULTI-AGENT:
1. NETWORK (fully connected):
Ogni agente può chiamare qualsiasi altro agente.
Pro: massima flessibilità
Con: difficile da controllare, rischio di loop
A ◄──► B
▲ ▲
└──► C ◄┘
2. SUPERVISOR (star topology):
Un agente centrale orchestra tutti gli altri.
Pro: controllo centralizzato, facile da debuggare
Con: single point of failure, bottleneck
SUPERVISOR
├──► Agent A (ricerca)
├──► Agent B (analisi)
└──► Agent C (report)
3. HIERARCHICAL:
Supervisori multipli organizzati in gerarchia.
Pro: scalabilità, separazione chiara delle responsabilità
Con: latenza aumentata, complessità coordinazione
Manager
├──► Sub-manager A
│ ├──► Worker A1
│ └──► Worker A2
└──► Sub-manager B
├──► Worker B1
└──► Worker B2
4. PIPELINE (sequential):
Ogni agente processa l'output del precedente.
Pro: semplice, deterministico, facile da debuggare
Con: rigido, nessuna retroazione
Input ──► A ──► B ──► C ──► Output
SCELTA DELLA TOPOLOGIA:
- Task ben definiti, ordine chiaro → Pipeline
- Task con routing dinamico → Supervisor
- Problemi complessi con sottoproblemi → Hierarchical
- Ricerca esplorativa → Network (con guardrail)
2. Wzór reakcji: rozumowanie + działanie
Zareagować (Rozumowanie + działanie) to podstawowy wzorzec dla agentów AI z dostępem do narzędzi. Agent zmienia się pomiędzy Myśli (rozważanie, co zrobić), Działanie (wykonanie narzędzia) e Obserwacja (interpretacja wynik), powtarzając cykl aż do uzyskania ostatecznej odpowiedzi.
from langchain.agents import create_react_agent, AgentExecutor
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain import hub
from typing import Optional
import requests
import json
# ============================================================
# DEFINIZIONE DEI TOOL
# ============================================================
@tool
def search_web(query: str) -> str:
"""Cerca informazioni sul web. Usa per trovare dati aggiornati o notizie."""
# Integrazione con Tavily, SerpAPI o simili
# Per semplicità, simuliamo una risposta
return f"Risultati ricerca per '{query}': [risultati simulati per demo]"
@tool
def analyze_data(data: str, analysis_type: str) -> str:
"""
Analizza dati numerici o testuali.
analysis_type può essere: 'summary', 'trend', 'anomaly', 'comparison'
"""
return f"Analisi {analysis_type} completata: [analisi simulata]"
@tool
def generate_report(title: str, sections: str) -> str:
"""
Genera un report strutturato.
sections: JSON con le sezioni del report
"""
return f"Report '{title}' generato con sezioni: {sections}"
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""Invia un'email. Usa per comunicare risultati all'utente finale."""
return f"Email inviata a {to} con oggetto '{subject}'"
@tool
def query_database(query: str, database: str = "main") -> str:
"""
Esegue query su database interni.
database: 'main' per il DB principale, 'analytics' per il DW
"""
return f"Query su {database}: [risultati simulati]"
# ============================================================
# CREAZIONE AGENTE REACT
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# Tool set disponibili all'agente
tools = [search_web, analyze_data, generate_report, send_email, query_database]
# Prompt ReAct standard (da LangChain Hub)
react_prompt = hub.pull("hwchase17/react")
# Il prompt include: instructions per pensiero/azione/osservazione
# con schema: "Thought: ...\nAction: tool_name\nAction Input: ...\nObservation: ..."
# Crea l'agente ReAct
agent = create_react_agent(
llm=llm,
tools=tools,
prompt=react_prompt
)
# AgentExecutor: gestisce il loop thought-action-observation
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True, # Mostra ogni step del reasoning
max_iterations=10, # Previeni loop infiniti
early_stopping_method="generate", # Genera risposta se troppi step
handle_parsing_errors=True # Gestisci errori di parsing gracefully
)
# Esempio: task complesso che richiede più step
task = """
Analizza le performance di vendita Q4 2024 dal database analytics,
cerca notizie recenti sul settore, genera un report con raccomandazioni
e invialo a manager@company.com
"""
result = agent_executor.invoke({"input": task})
print(f"\nRisposta finale:\n{result['output']}")
# Output tipico del reasoning ReAct:
# Thought: Devo prima ottenere i dati di vendita dal database
# Action: query_database
# Action Input: {"query": "SELECT * FROM sales WHERE quarter='Q4' AND year=2024"}
# Observation: [dati vendite]
# Thought: Ora cerco notizie recenti sul settore
# Action: search_web
# Action Input: "notizie settore vendite Q4 2024"
# Observation: [risultati]
# Thought: Ho tutti i dati, genero il report
# Action: generate_report
# ...
3. Schemat „zaplanuj i wykonaj”.
Wzór Zaplanuj i wykonaj oddziel planowanie od wykonania: najpierw LLM tworzy szczegółowy plan niezbędnych kroków, a następnie agenci wykonują każdy krok sekwencyjnie (lub równolegle). W porównaniu do ReAct pozwala na wizję bardziej całościowe podejście do problemu i lepsze zarządzanie złożonością.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import List, Dict, Any
import json
class PlanAndExecuteAgent:
"""
Agente Plan-and-Execute:
1. PLANNER: crea un piano step-by-step
2. EXECUTOR: esegue ogni step con i tool disponibili
3. RESPONDER: sintetizza i risultati in una risposta finale
"""
def __init__(self, llm, tools_dict: Dict[str, callable]):
self.llm = llm
self.tools = tools_dict
# PLANNER: LLM ottimizzato per pianificazione
self.planner_chain = (
ChatPromptTemplate.from_template("""
Sei un pianificatore esperto. Crea un piano dettagliato step-by-step per completare
il seguente task.
Tool disponibili: {available_tools}
Task: {task}
Crea un piano come lista JSON di steps. Ogni step deve avere:
- "step_id": numero progressivo (1, 2, 3...)
- "description": descrizione dell'azione
- "tool": nome del tool da usare (o "llm" per ragionamento puro)
- "depends_on": lista di step_id che devono completarsi prima (per parallelismo)
Rispondi SOLO con il JSON, nient'altro.""")
| llm
| StrOutputParser()
)
# EXECUTOR: esegue un singolo step
self.executor_chain = (
ChatPromptTemplate.from_template("""
Esegui questo step del piano:
Step: {step}
Risultati steps precedenti: {previous_results}
Se devi usare un tool, fornisci l'input necessario in modo preciso.
Descrivi brevemente cosa hai fatto e il risultato ottenuto.""")
| llm
| StrOutputParser()
)
# RESPONDER: sintesi finale
self.responder_chain = (
ChatPromptTemplate.from_template("""
Task originale: {task}
Piano eseguito con questi risultati:
{step_results}
Sintetizza i risultati in una risposta completa e strutturata per l'utente.""")
| llm
| StrOutputParser()
)
def _parse_plan(self, plan_json: str) -> List[Dict]:
"""Parse del piano JSON"""
try:
# Rimuovi eventuali backtick o prefissi markdown
clean = plan_json.strip().strip('`').strip()
if clean.startswith('json'):
clean = clean[4:].strip()
return json.loads(clean)
except json.JSONDecodeError:
# Fallback: crea un piano semplice
return [{"step_id": 1, "description": "Esegui il task", "tool": "llm", "depends_on": []}]
def _execute_step(self, step: Dict, previous_results: Dict) -> str:
"""Esegui un singolo step del piano"""
tool_name = step.get("tool", "llm")
if tool_name != "llm" and tool_name in self.tools:
# Usa il tool specifico
try:
tool_input = step.get("tool_input", step["description"])
result = self.tools[tool_name](tool_input)
return f"Tool {tool_name} eseguito: {result}"
except Exception as e:
return f"Errore nel tool {tool_name}: {str(e)}"
else:
# Ragionamento puro con LLM
return self.executor_chain.invoke({
"step": step["description"],
"previous_results": json.dumps(previous_results, ensure_ascii=False)
})
def run(self, task: str) -> dict:
"""Esegui il task con Plan-and-Execute"""
print(f"\nTask: {task}\n")
# STEP 1: Pianificazione
available_tools = list(self.tools.keys())
plan_json = self.planner_chain.invoke({
"task": task,
"available_tools": ", ".join(available_tools)
})
plan = self._parse_plan(plan_json)
print(f"Piano creato: {len(plan)} steps")
# STEP 2: Esecuzione sequenziale degli step
step_results = {}
completed_steps = set()
# Ordina per dipendenze (topological sort semplificato)
for step in plan:
step_id = step["step_id"]
depends_on = step.get("depends_on", [])
# Aspetta che i prerequisiti siano completati
while not all(str(d) in completed_steps for d in depends_on):
import time; time.sleep(0.1) # In pratica usa async
print(f" Esecuzione step {step_id}: {step['description'][:60]}...")
result = self._execute_step(step, step_results)
step_results[str(step_id)] = result
completed_steps.add(str(step_id))
# STEP 3: Sintesi finale
final_response = self.responder_chain.invoke({
"task": task,
"step_results": json.dumps(step_results, ensure_ascii=False, indent=2)
})
return {
"plan": plan,
"step_results": step_results,
"final_response": final_response
}
# Utilizzo
tools_dict = {
"search_web": lambda q: f"Risultati ricerca: {q}",
"analyze_data": lambda d: f"Analisi completata per: {d}",
"generate_report": lambda t: f"Report generato: {t}",
}
agent = PlanAndExecuteAgent(llm=llm, tools_dict=tools_dict)
result = agent.run("Analizza i dati di vendita 2024 e prepara un report esecutivo")
print(result["final_response"])
4. LangGraph: Stateful Orchestration with graphs
LangGraf to biblioteka LangChain do budowy systemów wieloagentowych stanowe w oparciu o skierowane grafy acykliczne (DAG) i grafy z cyklami. Pozwala zdefiniować jawnie kontroluj przepływ między agentami, zarządzaj stanem współdzielonym i wdrożyć pętle informacji zwrotnej. Jest to najlepszy wybór dla złożonych systemów wieloagentowych w produkcji.
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolExecutor
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from typing import TypedDict, Annotated, List, Literal
import operator
import json
# ============================================================
# STATO CONDIVISO DEL SISTEMA
# ============================================================
class AgentState(TypedDict):
"""Stato condiviso tra tutti gli agenti nel grafo"""
messages: Annotated[List, operator.add] # Lista messaggi (append-only)
task: str # Task originale
plan: List[str] # Piano di esecuzione
current_step: int # Step corrente
research_results: str # Output agente ricerca
analysis_results: str # Output agente analisi
final_report: str # Report finale
next_agent: str # Prossimo agente da chiamare
error_count: int # Contatore errori
# ============================================================
# AGENTI SPECIALIZZATI
# ============================================================
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
def supervisor_agent(state: AgentState) -> AgentState:
"""
Agente supervisor: decide quale agente chiamare.
Analizza lo stato corrente e decide il prossimo passo.
"""
system_msg = SystemMessage(content="""Sei un supervisor AI che coordina un team di agenti.
Devi decidere quale agente chiamare in base allo stato corrente del task.
Agenti disponibili:
- research: raccoglie informazioni e dati
- analysis: analizza i dati raccolti
- report_writer: scrive il report finale
- FINISH: il task è completato
Rispondi con UN SOLO nome tra quelli disponibili.""")
messages = [system_msg, HumanMessage(content=f"""
Task: {state['task']}
Research completata: {'Si' if state['research_results'] else 'No'}
Analisi completata: {'Si' if state['analysis_results'] else 'No'}
Report scritto: {'Si' if state['final_report'] else 'No'}
Quale agente deve agire ora?""")]
response = llm.invoke(messages)
next_agent = response.content.strip().lower()
# Valida la risposta
valid_agents = ["research", "analysis", "report_writer", "finish"]
if next_agent not in valid_agents:
next_agent = "research" # Fallback sicuro
return {"next_agent": next_agent, "messages": [response]}
def research_agent(state: AgentState) -> AgentState:
"""Agente specializzato in ricerca e raccolta dati"""
messages = [
SystemMessage(content="Sei un ricercatore AI. Raccogli dati rilevanti per il task."),
HumanMessage(content=f"Raccogli informazioni per: {state['task']}")
]
response = llm.invoke(messages)
return {
"research_results": response.content,
"messages": [response]
}
def analysis_agent(state: AgentState) -> AgentState:
"""Agente specializzato in analisi dei dati"""
messages = [
SystemMessage(content="Sei un analista AI. Analizza i dati raccolti."),
HumanMessage(content=f"""
Dati da analizzare:
{state['research_results']}
Task originale: {state['task']}
Fornisci un'analisi strutturata.""")
]
response = llm.invoke(messages)
return {
"analysis_results": response.content,
"messages": [response]
}
def report_writer_agent(state: AgentState) -> AgentState:
"""Agente specializzato nella scrittura di report"""
messages = [
SystemMessage(content="Sei un writer AI specializzato in report tecnici."),
HumanMessage(content=f"""
Task: {state['task']}
Ricerca: {state['research_results']}
Analisi: {state['analysis_results']}
Scrivi un report professionale e completo.""")
]
response = llm.invoke(messages)
return {
"final_report": response.content,
"messages": [response]
}
# ============================================================
# ROUTING FUNCTION
# ============================================================
def route_to_agent(state: AgentState) -> Literal["research", "analysis", "report_writer", END]:
"""Funzione di routing: decide il nodo successivo nel grafo"""
next_agent = state.get("next_agent", "research")
if next_agent == "finish" or state.get("error_count", 0) > 5:
return END
return next_agent
# ============================================================
# COSTRUZIONE DEL GRAFO
# ============================================================
# Crea il grafo stateful
workflow = StateGraph(AgentState)
# Aggiungi nodi (agenti)
workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("research", research_agent)
workflow.add_node("analysis", analysis_agent)
workflow.add_node("report_writer", report_writer_agent)
# Definisci entry point
workflow.set_entry_point("supervisor")
# Definisci edges condizionali
workflow.add_conditional_edges(
"supervisor",
route_to_agent,
{
"research": "research",
"analysis": "analysis",
"report_writer": "report_writer",
END: END
}
)
# Tutti gli agenti tornano al supervisor
workflow.add_edge("research", "supervisor")
workflow.add_edge("analysis", "supervisor")
workflow.add_edge("report_writer", "supervisor")
# Compila il grafo
app = workflow.compile()
# ============================================================
# ESECUZIONE
# ============================================================
initial_state = {
"task": "Analizza le tendenze del mercato AI nel 2025 e prepara un report esecutivo",
"messages": [],
"plan": [],
"current_step": 0,
"research_results": "",
"analysis_results": "",
"final_report": "",
"next_agent": "",
"error_count": 0
}
# Esegui il grafo
config = {"recursion_limit": 20}
final_state = app.invoke(initial_state, config=config)
print("\n=== REPORT FINALE ===")
print(final_state["final_report"])
5. Komunikacja i koordynacja pomiędzy agentami
Skuteczna komunikacja pomiędzy agentami jest kluczowa. Istnieją trzy główne wzorce: przekazywanie wiadomości (agenci wymieniają ustrukturyzowane wiadomości), stan wspólny (globalny stan współdzielony jak w LangGraph) e tablica (wspólna baza danych, w której agenci piszą i czytają).
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from datetime import datetime
import uuid
from enum import Enum
class MessageType(Enum):
TASK = "task" # Assegna un task a un agente
RESULT = "result" # Restituisce il risultato di un task
ERROR = "error" # Segnala un errore
STATUS = "status" # Aggiornamento di stato
QUERY = "query" # Richiesta di informazioni
RESPONSE = "response" # Risposta a una query
@dataclass
class AgentMessage:
"""Messaggio strutturato per comunicazione inter-agent"""
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
from_agent: str = ""
to_agent: str = ""
message_type: MessageType = MessageType.TASK
content: Any = None
metadata: Dict = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.now)
correlation_id: Optional[str] = None # Per tracciare conversazioni
def is_response_to(self, original: 'AgentMessage') -> bool:
return self.correlation_id == original.message_id
def create_response(self, content: Any, message_type: MessageType = MessageType.RESULT):
"""Crea una risposta a questo messaggio"""
return AgentMessage(
from_agent=self.to_agent,
to_agent=self.from_agent,
message_type=message_type,
content=content,
correlation_id=self.message_id
)
class MessageBus:
"""Bus di messaggi per comunicazione asincrona tra agenti"""
def __init__(self):
self.queues: Dict[str, List[AgentMessage]] = {}
self.history: List[AgentMessage] = []
def register_agent(self, agent_id: str):
"""Registra un agente nel bus"""
self.queues[agent_id] = []
def send(self, message: AgentMessage):
"""Invia un messaggio a un agente"""
if message.to_agent not in self.queues:
self.queues[message.to_agent] = []
self.queues[message.to_agent].append(message)
self.history.append(message)
def receive(self, agent_id: str) -> Optional[AgentMessage]:
"""Ricevi il primo messaggio dalla coda di un agente"""
if agent_id in self.queues and self.queues[agent_id]:
return self.queues[agent_id].pop(0)
return None
def broadcast(self, message: AgentMessage, agents: List[str]):
"""Invia lo stesso messaggio a più agenti"""
for agent_id in agents:
msg_copy = AgentMessage(
from_agent=message.from_agent,
to_agent=agent_id,
message_type=message.message_type,
content=message.content,
metadata=message.metadata,
correlation_id=message.message_id
)
self.send(msg_copy)
def get_conversation(self, correlation_id: str) -> List[AgentMessage]:
"""Ottieni tutti i messaggi di una conversazione"""
return [m for m in self.history if
m.message_id == correlation_id or
m.correlation_id == correlation_id]
# Esempio utilizzo
bus = MessageBus()
bus.register_agent("supervisor")
bus.register_agent("research")
bus.register_agent("analysis")
# Supervisor assegna task alla ricerca
task_msg = AgentMessage(
from_agent="supervisor",
to_agent="research",
message_type=MessageType.TASK,
content="Ricerca tendenze AI 2025",
metadata={"priority": "high", "deadline": "2025-03-01"}
)
bus.send(task_msg)
# Research riceve e risponde
received = bus.receive("research")
if received:
result_msg = received.create_response(
content="Trovate 50 fonti rilevanti sulle tendenze AI 2025",
message_type=MessageType.RESULT
)
bus.send(result_msg)
6. Zarządzanie błędami i monitorowanie
Systemy wieloagentowe wprowadzają nowe wektory awarii: zablokowani agenci pętle, błędy komunikacji, zniekształcone wyjścia, nadmierne opóźnienia. To fundamentalne wdrożyć wyłącznik automatyczny, przekroczenie limitu czasu i specjalne monitorowanie.
import asyncio
import time
from functools import wraps
from typing import Callable, TypeVar
from enum import Enum
T = TypeVar('T')
class CircuitState(Enum):
CLOSED = "closed" # Funziona normalmente
OPEN = "open" # Bloccato (troppi errori)
HALF_OPEN = "half_open" # Test: prova una chiamata
class CircuitBreaker:
"""
Circuit breaker per agenti AI.
Previene cascade failure quando un agente fallisce ripetutamente.
"""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
half_open_max_calls: int = 1
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.half_open_max_calls = half_open_max_calls
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = 0
self.half_open_calls = 0
def __call__(self, func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs):
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
self.half_open_calls = 0
else:
raise Exception(
f"Circuit OPEN: agente non disponibile. "
f"Retry in {self.recovery_timeout - (time.time() - self.last_failure_time):.0f}s"
)
if self.state == CircuitState.HALF_OPEN:
if self.half_open_calls >= self.half_open_max_calls:
raise Exception("Circuit HALF_OPEN: attendere recovery")
self.half_open_calls += 1
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
return wrapper
def _on_success(self):
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
class MultiAgentMonitor:
"""Monitoring per sistemi multi-agent"""
def __init__(self):
self.agent_stats: Dict[str, Dict] = {}
self.task_history: List[Dict] = []
def record_agent_call(
self,
agent_id: str,
task: str,
success: bool,
duration_ms: float,
tokens_used: int = 0
):
"""Registra una chiamata a un agente"""
if agent_id not in self.agent_stats:
self.agent_stats[agent_id] = {
"total_calls": 0, "successes": 0, "failures": 0,
"total_tokens": 0, "total_duration_ms": 0
}
stats = self.agent_stats[agent_id]
stats["total_calls"] += 1
stats["successes" if success else "failures"] += 1
stats["total_tokens"] += tokens_used
stats["total_duration_ms"] += duration_ms
def get_system_health(self) -> dict:
"""Ritorna lo stato di salute del sistema"""
results = {}
for agent_id, stats in self.agent_stats.items():
total = stats["total_calls"]
success_rate = stats["successes"] / total if total > 0 else 0
avg_duration = stats["total_duration_ms"] / total if total > 0 else 0
results[agent_id] = {
"success_rate": success_rate,
"avg_duration_ms": avg_duration,
"total_calls": total,
"status": "healthy" if success_rate > 0.9 else
"degraded" if success_rate > 0.7 else "critical"
}
return results
7. Najlepsze praktyki i antywzorce
Najlepsze praktyki dotyczące wielu agentów
- Zasada pojedynczej odpowiedzialności: każdy agent musi mieć jasno określoną i ograniczoną rolę. Agenta, który robi „wszystko”, trudno jest debugować i ulepszać.
- Limit czasu dla każdego agenta: żaden agent nie powinien mieć możliwości blokowania systemu w nieskończoność. Ustaw realistyczne limity czasu (10-30 sekund dla agentów LLM).
- Stan jawny i możliwy do serializacji: użyj TypedDict lub Pydantic, aby zdefiniować stan. Musi umożliwiać serializację (JSON) na potrzeby rejestrowania, debugowania i odzyskiwania.
- Wyłącznik automatyczny dla każdego agenta: zapobiegać awariom kaskady. Jeśli agent wielokrotnie zawodzi, wyłącznik automatyczny przerywa połączenia i umożliwia przywrócenie działania.
- Śledź każdą decyzję: rejestruj każdą decyzję przełożonego z wyraźnym uzasadnieniem. Debugowanie systemów wieloagentowych bez śladów jest prawie niemożliwe.
Anty-wzorce, których należy unikać
- Agenci, którzy są zbyt ogólnikowi: agent „uniwersalny asystent” nie jest bardziej skuteczny niż pojedynczy LLM, ale jest znacznie droższy i trudniejszy do kontrolowania.
- Brak limitu iteracji: bez max_iterations system ReAct może działać w nieskończonej pętli, zużywając tokeny i pieniądze. Zawsze wyznaczaj limit.
- Udostępniony zmienny stan globalny: dzielenie się zmiennym stanem między agentami wprowadza warunki wyścigu. Używaj niezmiennych komunikatów lub stanu zarządzanego centralnie.
- Ślepe zaufanie wynikom agenta: agent może wytwarzać zniekształcone lub halucynacyjne wyniki. Zawsze sprawdzaj wynik przed przekazaniem go następnemu agentowi.
Wnioski
Systemy wieloagentowe reprezentują przyszłość sztucznej inteligencji stosowanej do złożonych problemów. Zbadaliśmy podstawowe wzorce (ReAct, Plan-and-Execute, Supervisor), LangGraph do orkiestracji stanowej, wzorców komunikacji między agentami i zarządzanie błędami na produkcji.
Kluczowe punkty:
- Wybierz właściwą topologię: rurociąg dla przepływów liniowych, nadzorca dla dynamicznego routingu
- ReAct doskonale nadaje się do zadań eksploracyjnych za pomocą narzędzi; Planuj i wykonuj dla ustrukturyzowanych zadań
- LangGraph to najlepszy wybór dla złożonych systemów w produkcji
- Każdy agent musi mieć jedną, dobrze zdefiniowaną rolę
- Wyłączniki automatyczne i limity czasu są niezbędne w produkcji
- Śledź każdą decyzję przełożonego w celu skutecznego debugowania
W następnym artykule omówimy Szybka inżynieria w produkcji: szablon, wersjonowanie i systematyczne testowanie w celu zapewnienia jakości i spójności z biegiem czasu.
Seria trwa
- Artykuł 7: Zarządzanie oknem kontekstowym
- Artykuł 8: Systemy wieloagentowe (obecny)
- Artykuł 9: Szybka inżynieria w produkcji
- Artykuł 10: Grafy wiedzy dotyczące sztucznej inteligencji
Dowiedz się więcej dzięki: MLOps: udostępnianie modeli e LangChain dla RAG.







