Introduzione: Costruire un Sistema di Ricerca Autonoma
In questo tredicesimo articolo della serie sugli agenti AI, passiamo dalla teoria alla pratica costruendo un sistema completo: un Autonomous Research Assistant composto da tre agenti specializzati che collaborano per produrre report di ricerca strutturati e verificati. Questo case study integra tutti i concetti esplorati nei dodici articoli precedenti: orchestrazione multi-agente, memoria condivisa, tool calling avanzato, testing, sicurezza, FinOps e deployment.
Il problema che affrontiamo è concreto e diffuso nel mondo enterprise: un analista deve raccogliere informazioni da fonti multiple sul web, verificarne l'attendibilita, estrarre insight rilevanti e produrre un report strutturato. Questo processo, eseguito manualmente, richiede ore di lavoro. Con un sistema multi-agente ben progettato, possiamo ridurre il tempo a pochi minuti mantenendo un livello di qualità elevato e tracciabile.
Il nostro sistema si basa su un'architettura a tre agenti coordinati tramite i pattern Sequential e Handoff che abbiamo analizzato negli articoli sull'orchestrazione. Ogni agente ha un ruolo preciso, strumenti dedicati e un contratto di input/output ben definito. La comunicazione avviene attraverso uno stato condiviso persistente, e il sistema integra RAG (Retrieval-Augmented Generation) per arricchire l'analisi con documenti interni.
Cosa Imparerai in Questo Articolo
- Come progettare un'architettura multi-agente per la ricerca autonoma con pattern Sequential e Handoff
- Implementazione completa di tre agenti specializzati: Researcher, Analyst e Editor
- Integrazione di RAG con Pinecone per arricchire l'analisi con documenti interni
- Gestione della memoria condivisa tra agenti con knowledge graph
- Strategie di error handling e fallback per sistemi multi-agente
- Deployment con Docker Compose e FastAPI per esporre il sistema via API
- Metriche di performance, accuratezza e costo del sistema in produzione
Architettura del Sistema
L'architettura del nostro Research Assistant si basa su tre agenti disposti in una pipeline sequenziale. Ogni agente riceve l'output del precedente, lo elabora con i propri tool specializzati e produce un output strutturato per il successivo. Tra gli agenti, uno stato condiviso mantiene il contesto completo della ricerca, permettendo a ogni agente di accedere alle informazioni raccolte dalle fasi precedenti.
Flusso del Sistema Multi-Agente
| Fase | Agente | Input | Output | Tools |
|---|---|---|---|---|
| 1 | Researcher | Query utente + parametri | Fonti validate con summary | Web Search, URL Scraper, Source Validator |
| 2 | Analyst | Fonti validate | Findings con confidence score | Cross-Reference, Key Extractor, Contradiction Detector |
| 3 | Editor | Findings strutturati | Report finale formattato | Template Engine, Citation Formatter, Export Generator |
Il Grafo di Orchestrazione
Il sistema utilizza LangGraph per l'orchestrazione. Il grafo principale definisce il flusso sequenziale tra i tre agenti, con edge condizionali che gestiscono i casi di errore e le richieste di chiarimento. L'Analyst, ad esempio, può richiedere al Researcher di cercare fonti aggiuntive se i dati raccolti sono insufficienti o contraddittori.
+------------------+
| User Query |
+--------+---------+
|
v
+--------+---------+
| RESEARCHER |
| (Web Search, |
| URL Scraping, |
| Validation) |
+--------+---------+
|
Fonti validate
|
v
+--------+---------+
+---->| ANALYST |
| | (Cross-Ref, |
| | Extraction, |
| | Fact-Check) |
| +--------+---------+
| |
Richiesta| Findings + Score
fonti | |
aggiuntive v
| +--------+---------+
+-----| ROUTER |
| (Score > 0.7?) |
+--------+---------+
|
Score OK
|
v
+--------+---------+
| EDITOR |
| (Template, |
| Citations, |
| Export) |
+--------+---------+
|
v
+--------+---------+
| Report Finale |
+------------------+
Definizione dello Stato Condiviso
Lo stato condiviso è il cuore della comunicazione tra agenti. Definisce la struttura dei dati che
fluiscono attraverso il grafo, garantendo type safety e tracciabilita. Utilizziamo un
TypedDict per definire esplicitamente ogni campo dello stato.
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import add_messages
from langchain_core.messages import BaseMessage
class Source(TypedDict):
url: str
title: str
content: str
credibility_score: float
domain: str
extraction_date: str
class Finding(TypedDict):
claim: str
evidence: List[str]
source_urls: List[str]
confidence: float
category: str
contradictions: Optional[List[str]]
class ResearchState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
query: str
max_sources: int
sources: List[Source]
findings: List[Finding]
overall_confidence: float
report: Optional[str]
report_format: str
errors: List[str]
iteration_count: int
needs_more_sources: bool
Agent 1: Il Researcher
Il Researcher è l'agente responsabile della raccolta delle fonti. Riceve la query dell'utente, la scompone in sotto-query di ricerca, esegue ricerche sul web tramite la Tavily API, scarica e analizza il contenuto delle pagine trovate e valida la credibilita di ciascuna fonte. L'output è una lista di fonti validate con un summary del contenuto rilevante.
Definizione dei Tool
Il Researcher dispone di tre tool specializzati: uno per la ricerca web, uno per lo scraping dei contenuti delle pagine e uno per la validazione della credibilita delle fonti.
from langchain_core.tools import tool
from tavily import TavilyClient
import httpx
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import json
tavily_client = TavilyClient(api_key="tvly-...")
@tool
def web_search(query: str, max_results: int = 5) -> str:
"""Cerca informazioni sul web per una query specifica.
Args:
query: La query di ricerca da eseguire
max_results: Numero massimo di risultati (default: 5)
Returns:
JSON con i risultati della ricerca inclusi URL, titolo e snippet
"""
response = tavily_client.search(
query=query,
max_results=max_results,
search_depth="advanced",
include_raw_content=True,
include_domains=["arxiv.org", "github.com", "medium.com",
"techcrunch.com", "reuters.com"]
)
results = []
for r in response.get("results", []):
results.append({
"url": r["url"],
"title": r["title"],
"snippet": r["content"][:500],
"raw_content": r.get("raw_content", "")[:2000],
"score": r.get("score", 0.0)
})
return json.dumps(results, indent=2)
@tool
def scrape_url(url: str) -> str:
"""Scarica e analizza il contenuto di una pagina web.
Args:
url: L'URL della pagina da analizzare
Returns:
Il testo estratto dalla pagina (max 3000 caratteri)
"""
try:
headers = {"User-Agent": "ResearchBot/1.0"}
response = httpx.get(url, headers=headers, timeout=15.0,
follow_redirects=True)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
for tag in soup(["script", "style", "nav", "footer", "header"]):
tag.decompose()
text = soup.get_text(separator="\n", strip=True)
return text[:3000]
except Exception as e:
return f"Errore durante lo scraping di {url}: {str(e)}"
@tool
def validate_source(url: str, title: str, content_snippet: str) -> str:
"""Valida la credibilita di una fonte basandosi su dominio e contenuto.
Args:
url: L'URL della fonte
title: Il titolo della pagina
content_snippet: Un estratto del contenuto
Returns:
JSON con il punteggio di credibilita e la motivazione
"""
domain = urlparse(url).netloc.lower()
high_credibility = ["arxiv.org", "nature.com", "science.org",
"ieee.org", "acm.org", "gov", ".edu"]
medium_credibility = ["github.com", "medium.com", "techcrunch.com",
"reuters.com", "bloomberg.com"]
score = 0.5
reasons = []
for hc in high_credibility:
if hc in domain:
score = 0.9
reasons.append(f"Dominio accademico/istituzionale: {domain}")
break
else:
for mc in medium_credibility:
if mc in domain:
score = 0.7
reasons.append(f"Dominio tecnico riconosciuto: {domain}")
break
if len(content_snippet) > 200:
score += 0.05
reasons.append("Contenuto sostanziale presente")
if title and len(title) > 10:
score += 0.02
reasons.append("Titolo descrittivo presente")
score = min(score, 1.0)
return json.dumps({
"url": url,
"credibility_score": round(score, 2),
"reasons": reasons,
"domain": domain
})
Definizione dell'Agente Researcher
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
researcher_llm = ChatOpenAI(model="gpt-4o", temperature=0.1)
researcher_system_prompt = """Sei un Research Agent specializzato nella
raccolta di fonti attendibili dal web.
OBIETTIVO: Data una query di ricerca, trova e valida fonti di alta qualità.
PROCESSO:
1. Analizza la query e identifica 2-3 sotto-query specifiche
2. Per ogni sotto-query, usa web_search per trovare risultati
3. Per i risultati più promettenti, usa scrape_url per ottenere il
contenuto completo
4. Usa validate_source per verificare la credibilita di ogni fonte
5. Restituisci SOLO le fonti con credibility_score >= 0.6
OUTPUT: Produci un JSON con la lista delle fonti validate, ciascuna con:
- url, title, content (summary del contenuto rilevante)
- credibility_score, domain, extraction_date
NON inventare informazioni. Se non trovi fonti sufficienti, segnalalo."""
researcher_tools = [web_search, scrape_url, validate_source]
researcher_agent = create_react_agent(
model=researcher_llm,
tools=researcher_tools,
prompt=researcher_system_prompt
)
Agent 2: L'Analyst
L'Analyst riceve le fonti validate dal Researcher e le analizza in profondità. Il suo obiettivo è estrarre i key findings, verificare la coerenza tra le fonti tramite cross-referencing e identificare eventuali contraddizioni. L'output è una lista di findings strutturati, ciascuno con un confidence score basato sul numero e sulla qualità delle fonti che lo supportano.
Tool dell'Analyst
from collections import Counter
import re
@tool
def cross_reference_check(claim: str, sources_json: str) -> str:
"""Verifica un'affermazione incrociando multiple fonti.
Args:
claim: L'affermazione da verificare
sources_json: JSON con le fonti da analizzare
Returns:
JSON con il risultato della verifica è il numero di fonti
che supportano, contraddicono o non menzionano l'affermazione
"""
sources = json.loads(sources_json)
supporting = []
contradicting = []
neutral = []
claim_keywords = set(claim.lower().split())
for source in sources:
content = source.get("content", "").lower()
keyword_matches = sum(1 for kw in claim_keywords
if kw in content)
match_ratio = keyword_matches / max(len(claim_keywords), 1)
if match_ratio > 0.6:
supporting.append(source["url"])
elif match_ratio > 0.3:
neutral.append(source["url"])
else:
contradicting.append(source["url"])
total = len(sources)
confidence = len(supporting) / max(total, 1)
return json.dumps({
"claim": claim,
"supporting_sources": supporting,
"contradicting_sources": contradicting,
"neutral_sources": neutral,
"confidence": round(confidence, 2),
"verdict": "confirmed" if confidence > 0.6
else "uncertain" if confidence > 0.3
else "unverified"
})
@tool
def extract_key_findings(content: str, topic: str) -> str:
"""Estrae i finding principali da un testo rispetto a un topic.
Args:
content: Il testo da analizzare
topic: L'argomento di riferimento per l'estrazione
Returns:
JSON con i finding estratti e la loro rilevanza
"""
sentences = re.split(r'[.!?]+', content)
topic_keywords = set(topic.lower().split())
findings = []
for sentence in sentences:
sentence = sentence.strip()
if len(sentence) < 20:
continue
words = set(sentence.lower().split())
relevance = len(words.intersection(topic_keywords))
relevance = relevance / max(len(topic_keywords), 1)
if relevance > 0.3:
findings.append({
"text": sentence[:200],
"relevance_score": round(relevance, 2)
})
findings.sort(key=lambda x: x["relevance_score"], reverse=True)
return json.dumps(findings[:10])
@tool
def detect_contradictions(findings_json: str) -> str:
"""Identifica contraddizioni tra i findings raccolti.
Args:
findings_json: JSON con i findings da analizzare
Returns:
JSON con le coppie di findings potenzialmente in contraddizione
"""
findings = json.loads(findings_json)
contradictions = []
negation_words = {"not", "no", "never", "non", "without",
"unlike", "contrary", "however", "but",
"decrease", "decline", "drop", "reduce"}
for i, f1 in enumerate(findings):
for f2 in findings[i+1:]:
text1 = set(f1.get("text", "").lower().split())
text2 = set(f2.get("text", "").lower().split())
overlap = text1.intersection(text2)
neg_in_1 = bool(text1.intersection(negation_words))
neg_in_2 = bool(text2.intersection(negation_words))
if len(overlap) > 3 and neg_in_1 != neg_in_2:
contradictions.append({
"finding_1": f1.get("text", "")[:100],
"finding_2": f2.get("text", "")[:100],
"shared_keywords": list(overlap)[:5],
"severity": "high" if len(overlap) > 5 else "medium"
})
return json.dumps(contradictions)
Definizione dell'Agente Analyst
analyst_llm = ChatOpenAI(model="gpt-4o", temperature=0.0)
analyst_system_prompt = """Sei un Analyst Agent specializzato nel fact-checking
e nell'estrazione di insight da fonti multiple.
OBIETTIVO: Analizzare le fonti fornite dal Researcher, estrarre i finding
principali e verificarne la coerenza.
PROCESSO:
1. Per ogni fonte, usa extract_key_findings per estrarre i punti chiave
2. Usa cross_reference_check per verificare ogni finding chiave
3. Usa detect_contradictions per identificare incoerenze
4. Assegna un confidence score complessivo ai findings
OUTPUT: Produci un JSON con:
- findings: lista di finding con claim, evidence, source_urls, confidence
- overall_confidence: media pesata dei confidence score
- contradictions: lista di contraddizioni trovate
- recommendation: "proceed" se confidence > 0.7, "needs_more_sources" altrimenti
Sii critico e rigoroso. Non dare per scontato nulla."""
analyst_tools = [cross_reference_check, extract_key_findings,
detect_contradictions]
analyst_agent = create_react_agent(
model=analyst_llm,
tools=analyst_tools,
prompt=analyst_system_prompt
)
Agent 3: L'Editor
L'Editor è l'agente responsabile della produzione del report finale. Riceve i findings strutturati dall'Analyst, li organizza in una struttura logica, formatta le citazioni secondo standard accademici e genera il report nel formato richiesto dall'utente (Markdown, HTML o JSON). L'Editor non inventa contenuti: si limita a strutturare, contestualizzare e rendere leggibili le informazioni raccolte e verificate dagli agenti precedenti.
Tool dell'Editor
from datetime import datetime
@tool
def apply_report_template(findings_json: str, query: str,
template_type: str = "executive") -> str:
"""Applica un template di report ai findings strutturati.
Args:
findings_json: JSON con i findings da formattare
query: La query originale dell'utente
template_type: Tipo di template (executive, technical, brief)
Returns:
Report formattato in Markdown
"""
findings = json.loads(findings_json)
date = datetime.now().strftime("%Y-%m-%d")
if template_type == "executive":
sections = [
f"# Research Report: {query}",
f"*Generated on {date}*\n",
"## Executive Summary\n",
"## Key Findings\n",
"## Detailed Analysis\n",
"## Sources and References\n",
"## Methodology\n",
"---",
"*This report was generated by an AI Research Assistant.*"
]
elif template_type == "technical":
sections = [
f"# Technical Analysis: {query}",
f"*Report Date: {date}*\n",
"## Abstract\n",
"## Data Sources\n",
"## Analysis\n",
"## Results\n",
"## Limitations\n",
"## References\n"
]
else:
sections = [
f"# Brief: {query}",
f"*{date}*\n",
"## Summary\n",
"## Key Points\n",
"## Sources\n"
]
return "\n".join(sections)
@tool
def format_citations(sources_json: str,
style: str = "apa") -> str:
"""Formatta le citazioni delle fonti secondo uno standard accademico.
Args:
sources_json: JSON con le fonti da citare
style: Stile di citazione (apa, chicago, ieee)
Returns:
Lista di citazioni formattate
"""
sources = json.loads(sources_json)
citations = []
for i, source in enumerate(sources, 1):
title = source.get("title", "Untitled")
url = source.get("url", "")
domain = source.get("domain", "")
date = source.get("extraction_date",
datetime.now().strftime("%Y-%m-%d"))
if style == "apa":
citation = (f"[{i}] {domain}. ({date}). "
f"*{title}*. Retrieved from {url}")
elif style == "ieee":
citation = (f"[{i}] \"{title},\" {domain}, "
f"{date}. [Online]. Available: {url}")
else:
citation = f"[{i}] {title}. {url} ({date})"
citations.append(citation)
return "\n".join(citations)
@tool
def generate_export(report_markdown: str,
output_format: str = "markdown") -> str:
"""Genera l'export finale del report nel formato richiesto.
Args:
report_markdown: Il report in formato Markdown
output_format: Formato di output (markdown, html, json)
Returns:
Il report nel formato specificato
"""
if output_format == "markdown":
return report_markdown
elif output_format == "html":
lines = report_markdown.split("\n")
html_lines = []
for line in lines:
if line.startswith("# "):
html_lines.append(f"<h1>{line[2:]}</h1>")
elif line.startswith("## "):
html_lines.append(f"<h2>{line[3:]}</h2>")
elif line.startswith("- "):
html_lines.append(f"<li>{line[2:]}</li>")
elif line.strip():
html_lines.append(f"<p>{line}</p>")
return "\n".join(html_lines)
elif output_format == "json":
return json.dumps({"report": report_markdown,
"format": output_format,
"generated_at": datetime.now().isoformat()})
return report_markdown
Definizione dell'Agente Editor
editor_llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
editor_system_prompt = """Sei un Editor Agent specializzato nella
produzione di report professionali e ben strutturati.
OBIETTIVO: Trasformare i findings dell'Analyst in un report leggibile
e citato correttamente.
PROCESSO:
1. Usa apply_report_template per creare la struttura del report
2. Popola ogni sezione con i findings pertinenti
3. Usa format_citations per generare la bibliografia
4. Usa generate_export per produrre il formato finale
REGOLE DI STILE:
- Scrivi in modo chiaro e professionale
- Ogni affermazione deve avere una citazione [N]
- Evidenzia il livello di confidenza per ogni finding
- Segnala esplicitamente le aree di incertezza
- Non inventare MAI dati o citazioni non presenti nei findings
OUTPUT: Il report completo nel formato richiesto."""
editor_tools = [apply_report_template, format_citations,
generate_export]
editor_agent = create_react_agent(
model=editor_llm,
tools=editor_tools,
prompt=editor_system_prompt
)
Orchestrazione con LangGraph
Ora assembliamo i tre agenti in un grafo LangGraph. Il grafo definisce il flusso sequenziale con un nodo router tra l'Analyst e l'Editor che verifica se il confidence score complessivo è sufficiente per procedere alla generazione del report, o se servono fonti aggiuntive.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
def run_researcher(state: ResearchState) -> dict:
"""Nodo Researcher: raccoglie e valida le fonti."""
query = state["query"]
max_sources = state.get("max_sources", 5)
result = researcher_agent.invoke({
"messages": [("human",
f"Cerca fonti per: {query}. "
f"Trova almeno {max_sources} fonti attendibili.")]
})
last_message = result["messages"][-1].content
try:
sources = json.loads(last_message)
except json.JSONDecodeError:
sources = []
return {
"sources": sources,
"iteration_count": state.get("iteration_count", 0) + 1,
"messages": result["messages"]
}
def run_analyst(state: ResearchState) -> dict:
"""Nodo Analyst: analizza le fonti e produce findings."""
sources = state["sources"]
result = analyst_agent.invoke({
"messages": [("human",
f"Analizza queste fonti e produci findings strutturati:\n"
f"{json.dumps(sources, indent=2)}")]
})
last_message = result["messages"][-1].content
try:
analysis = json.loads(last_message)
findings = analysis.get("findings", [])
confidence = analysis.get("overall_confidence", 0.0)
needs_more = analysis.get("recommendation") == "needs_more_sources"
except json.JSONDecodeError:
findings = []
confidence = 0.0
needs_more = True
return {
"findings": findings,
"overall_confidence": confidence,
"needs_more_sources": needs_more,
"messages": result["messages"]
}
def run_editor(state: ResearchState) -> dict:
"""Nodo Editor: genera il report finale."""
findings = state["findings"]
query = state["query"]
report_format = state.get("report_format", "markdown")
result = editor_agent.invoke({
"messages": [("human",
f"Genera un report per la query '{query}' "
f"basato su questi findings:\n"
f"{json.dumps(findings, indent=2)}\n"
f"Formato richiesto: {report_format}")]
})
report = result["messages"][-1].content
return {
"report": report,
"messages": result["messages"]
}
def should_continue_or_edit(state: ResearchState) -> str:
"""Router: decide se procedere all'Editor o tornare al Researcher."""
if (state.get("needs_more_sources", False)
and state.get("iteration_count", 0) < 3):
return "researcher"
return "editor"
# Costruzione del grafo
graph = StateGraph(ResearchState)
graph.add_node("researcher", run_researcher)
graph.add_node("analyst", run_analyst)
graph.add_node("editor", run_editor)
graph.add_edge(START, "researcher")
graph.add_edge("researcher", "analyst")
graph.add_conditional_edges(
"analyst",
should_continue_or_edit,
{"researcher": "researcher", "editor": "editor"}
)
graph.add_edge("editor", END)
memory = MemorySaver()
research_app = graph.compile(checkpointer=memory)
Memory Integration
Un aspetto cruciale del nostro sistema è la gestione della memoria condivisa. I tre agenti operano sullo stesso stato, ma abbiamo anche bisogno di una memoria a lungo termine che permetta al sistema di imparare dalle ricerche precedenti, evitare di rivisitare fonti già analizzate e costruire progressivamente un knowledge graph del dominio.
Knowledge Graph Condiviso
Implementiamo un semplice knowledge graph che traccia le entità estratte dalle ricerche, le relazioni tra di esse e i fatti verificati. Questa struttura viene consultata dall'Analyst per arricchire le analisi con contesto storico e dall'Editor per inserire riferimenti incrociati.
from typing import Dict, Set, Tuple
import sqlite3
class KnowledgeGraph:
"""Knowledge graph persistente per il Research Assistant."""
def __init__(self, db_path: str = "research_kg.db"):
self.conn = sqlite3.connect(db_path)
self._create_tables()
def _create_tables(self):
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS entities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE,
entity_type TEXT,
first_seen TEXT,
mention_count INTEGER DEFAULT 1
);
CREATE TABLE IF NOT EXISTS relations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subject_id INTEGER REFERENCES entities(id),
predicate TEXT,
object_id INTEGER REFERENCES entities(id),
confidence REAL,
source_url TEXT
);
CREATE TABLE IF NOT EXISTS facts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
statement TEXT,
confidence REAL,
verified_by INTEGER DEFAULT 0,
source_urls TEXT,
created_at TEXT
);
""")
self.conn.commit()
def add_entity(self, name: str, entity_type: str):
self.conn.execute("""
INSERT INTO entities (name, entity_type, first_seen)
VALUES (?, ?, datetime('now'))
ON CONFLICT(name) DO UPDATE
SET mention_count = mention_count + 1
""", (name, entity_type))
self.conn.commit()
def add_relation(self, subject: str, predicate: str,
obj: str, confidence: float,
source_url: str = ""):
self.add_entity(subject, "auto")
self.add_entity(obj, "auto")
sub_id = self.conn.execute(
"SELECT id FROM entities WHERE name = ?",
(subject,)).fetchone()[0]
obj_id = self.conn.execute(
"SELECT id FROM entities WHERE name = ?",
(obj,)).fetchone()[0]
self.conn.execute("""
INSERT INTO relations
(subject_id, predicate, object_id, confidence, source_url)
VALUES (?, ?, ?, ?, ?)
""", (sub_id, predicate, obj_id, confidence, source_url))
self.conn.commit()
def query_entity(self, name: str) -> Dict:
entity = self.conn.execute(
"SELECT * FROM entities WHERE name LIKE ?",
(f"%{name}%",)).fetchone()
if not entity:
return {}
relations = self.conn.execute("""
SELECT e2.name, r.predicate, r.confidence
FROM relations r
JOIN entities e2 ON r.object_id = e2.id
WHERE r.subject_id = ?
""", (entity[0],)).fetchall()
return {
"name": entity[1],
"type": entity[2],
"mentions": entity[4],
"relations": [{"target": r[0], "predicate": r[1],
"confidence": r[2]} for r in relations]
}
RAG Integration
Il nostro Research Assistant integra un sistema RAG (Retrieval-Augmented Generation) che permette all'Analyst di consultare documenti interni e report precedenti archiviati in un vector database. Utilizziamo Pinecone come vector store e OpenAI per la generazione degli embedding.
Pipeline di Retrieval
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = PineconeVectorStore(
index_name="research-assistant",
embedding=embeddings,
namespace="documents"
)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", ". ", " "]
)
def index_document(content: str, metadata: dict):
"""Indicizza un documento nel vector store."""
chunks = text_splitter.split_text(content)
documents = [
Document(page_content=chunk, metadata={
**metadata,
"chunk_index": i,
"total_chunks": len(chunks)
})
for i, chunk in enumerate(chunks)
]
vectorstore.add_documents(documents)
def retrieve_relevant(query: str, top_k: int = 5) -> list:
"""Recupera i documenti più rilevanti per una query."""
results = vectorstore.similarity_search_with_score(
query, k=top_k
)
return [
{
"content": doc.page_content,
"metadata": doc.metadata,
"similarity_score": round(float(score), 3)
}
for doc, score in results
]
@tool
def search_internal_knowledge(query: str,
max_results: int = 5) -> str:
"""Cerca nei documenti interni e nei report precedenti.
Args:
query: La query di ricerca
max_results: Numero massimo di risultati
Returns:
JSON con i documenti rilevanti trovati nel knowledge base
"""
results = retrieve_relevant(query, top_k=max_results)
return json.dumps(results, indent=2)
Error Handling e Fallback
In un sistema multi-agente, la gestione degli errori è un aspetto critico. Ogni agente può fallire per motivi diversi: timeout di rete, API non disponibili, contenuti non parsabili, limiti di rate. Il nostro sistema implementa fallback strategies a tre livelli: tool-level, agent-level e system-level.
Strategia di Fallback a Tre Livelli
- Tool-level: ogni tool gestisce internamente i propri errori con try/except, restituendo messaggi di errore strutturati invece di propagare eccezioni. Se una fonte non e raggiungibile, il tool restituisce un errore con il codice HTTP e un suggerimento.
- Agent-level: se un agente non riesce a completare il proprio task, il router può re-indirizzare il flusso. Il Researcher cerca fonti alternative se le prime non sono disponibili. L'Analyst richiede fonti aggiuntive se trova troppe contraddizioni.
- System-level: il grafo ha un limite massimo di iterazioni (3 cicli Researcher-Analyst). Se dopo 3 iterazioni il confidence score resta sotto 0.5, il sistema genera comunque un report con un avviso esplicito sulla bassa affidabilità dei risultati e suggerisce un intervento manuale.
from langgraph.errors import NodeInterrupt
def run_researcher_with_fallback(state: ResearchState) -> dict:
"""Researcher con gestione errori e fallback."""
max_retries = 2
errors = list(state.get("errors", []))
for attempt in range(max_retries):
try:
result = run_researcher(state)
if not result.get("sources"):
errors.append(
f"Attempt {attempt+1}: No sources found"
)
continue
result["errors"] = errors
return result
except Exception as e:
errors.append(
f"Attempt {attempt+1}: {type(e).__name__}: {str(e)}"
)
return {
"sources": [],
"errors": errors,
"iteration_count": state.get("iteration_count", 0) + 1,
"messages": state.get("messages", [])
}
def manual_intervention_check(state: ResearchState) -> str:
"""Verifica se serve intervento manuale."""
iteration = state.get("iteration_count", 0)
confidence = state.get("overall_confidence", 0.0)
errors = state.get("errors", [])
if iteration >= 3 and confidence < 0.5:
raise NodeInterrupt(
f"Sistema bloccato dopo {iteration} iterazioni. "
f"Confidence: {confidence:.2f}. "
f"Errori: {len(errors)}. "
f"Intervento manuale richiesto."
)
if len(errors) > 5:
raise NodeInterrupt(
f"Troppi errori accumulati ({len(errors)}). "
f"Verificare la connettivita e i limiti API."
)
return "continue"
Deployment
Per rendere il nostro Research Assistant utilizzabile in produzione, lo impacchettiamo in un container Docker e lo esponiamo come servizio REST tramite FastAPI. Questa architettura permette di scalare orizzontalmente, integrare il sistema con applicazioni esistenti e monitorare le performance in tempo reale.
FastAPI Wrapper
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from uuid import uuid4
import asyncio
app = FastAPI(title="Research Assistant API", version="1.0.0")
class ResearchRequest(BaseModel):
query: str
max_sources: int = 5
report_format: str = "markdown"
class ResearchResponse(BaseModel):
job_id: str
status: str
report: str | None = None
confidence: float | None = None
sources_count: int | None = None
errors: list[str] = []
jobs: dict[str, ResearchResponse] = {}
async def execute_research(job_id: str, request: ResearchRequest):
"""Esegue la ricerca in background."""
try:
config = {"configurable": {"thread_id": job_id}}
initial_state = {
"query": request.query,
"max_sources": request.max_sources,
"report_format": request.report_format,
"messages": [],
"sources": [],
"findings": [],
"overall_confidence": 0.0,
"report": None,
"errors": [],
"iteration_count": 0,
"needs_more_sources": False
}
result = await asyncio.to_thread(
research_app.invoke, initial_state, config
)
jobs[job_id] = ResearchResponse(
job_id=job_id,
status="completed",
report=result.get("report"),
confidence=result.get("overall_confidence"),
sources_count=len(result.get("sources", [])),
errors=result.get("errors", [])
)
except Exception as e:
jobs[job_id] = ResearchResponse(
job_id=job_id,
status="failed",
errors=[str(e)]
)
@app.post("/research", response_model=ResearchResponse)
async def start_research(request: ResearchRequest,
background_tasks: BackgroundTasks):
job_id = str(uuid4())
jobs[job_id] = ResearchResponse(
job_id=job_id, status="processing")
background_tasks.add_task(execute_research, job_id, request)
return jobs[job_id]
@app.get("/research/{job_id}", response_model=ResearchResponse)
async def get_research_status(job_id: str):
if job_id not in jobs:
raise HTTPException(status_code=404,
detail="Job not found")
return jobs[job_id]
Docker Compose
version: "3.9"
services:
research-api:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=






