Introduction: Building an Autonomous Research System
In this thirteenth article of the AI Agents series, we move from theory to practice by building a complete system: an Autonomous Research Assistant composed of three specialized agents that collaborate to produce structured and verified research reports. This case study integrates every concept explored in the previous twelve articles: multi-agent orchestration, shared memory, advanced tool calling, testing, security, FinOps and deployment.
The problem we tackle is concrete and widespread in the enterprise world: an analyst needs to collect information from multiple web sources, verify their reliability, extract relevant insights, and produce a structured report. When performed manually, this process requires hours of work. With a well-designed multi-agent system, we can reduce the time to minutes while maintaining a high and traceable level of quality.
Our system is based on a three-agent architecture coordinated through the Sequential and Handoff patterns we analyzed in the orchestration articles. Each agent has a precise role, dedicated tools, and a well-defined input/output contract. Communication occurs through a shared persistent state, and the system integrates RAG (Retrieval-Augmented Generation) to enrich the analysis with internal documents.
What You Will Learn in This Article
- How to design a multi-agent architecture for autonomous research using Sequential and Handoff patterns
- Complete implementation of three specialized agents: Researcher, Analyst and Editor
- RAG integration with Pinecone to enrich analysis with internal documents
- Shared memory management between agents using a knowledge graph
- Error handling and fallback strategies for multi-agent systems
- Deployment with Docker Compose and FastAPI to expose the system via API
- Performance, accuracy and cost metrics of the system in production
System Architecture
The architecture of our Research Assistant is based on three agents arranged in a sequential pipeline. Each agent receives the output of the previous one, processes it with its own specialized tools, and produces structured output for the next one. Between agents, a shared state maintains the complete research context, allowing each agent to access information collected from previous phases.
Multi-Agent System Flow
| Phase | Agent | Input | Output | Tools |
|---|---|---|---|---|
| 1 | Researcher | User query + parameters | Validated sources with summary | Web Search, URL Scraper, Source Validator |
| 2 | Analyst | Validated sources | Findings with confidence score | Cross-Reference, Key Extractor, Contradiction Detector |
| 3 | Editor | Structured findings | Formatted final report | Template Engine, Citation Formatter, Export Generator |
The Orchestration Graph
The system uses LangGraph for orchestration. The main graph defines the sequential flow between the three agents, with conditional edges that handle error cases and clarification requests. The Analyst, for instance, can request the Researcher to search for additional sources if the collected data is insufficient or contradictory.
            +------------------+
            |    User Query    |
            +--------+---------+
                     |
                     v
            +--------+---------+
            |    RESEARCHER    |
            |   (Web Search,   |
            |   URL Scraping,  |
            |    Validation)   |
            +--------+---------+
                     |
             Validated sources
                     |
                     v
            +--------+---------+
   +------->|     ANALYST      |
   |        |   (Cross-Ref,    |
   |        |   Extraction,    |
   |        |   Fact-Check)    |
   |        +--------+---------+
   |                 |
Request for   Findings + Score
more sources         |
   |                 v
   |        +--------+---------+
   +--------|      ROUTER      |
            |  (Score > 0.7?)  |
            +--------+---------+
                     |
                 Score OK
                     |
                     v
            +--------+---------+
            |      EDITOR      |
            |    (Template,    |
            |    Citations,    |
            |     Export)      |
            +--------+---------+
                     |
                     v
            +------------------+
            |   Final Report   |
            +------------------+
Shared State Definition
The shared state is the heart of inter-agent communication. It defines the data structure that
flows through the graph, ensuring type safety and traceability. We use a TypedDict
to explicitly define each field in the state.
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import add_messages
from langchain_core.messages import BaseMessage

class Source(TypedDict):
    url: str
    title: str
    content: str
    credibility_score: float
    domain: str
    extraction_date: str

class Finding(TypedDict):
    claim: str
    evidence: List[str]
    source_urls: List[str]
    confidence: float
    category: str
    contradictions: Optional[List[str]]

class ResearchState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    query: str
    max_sources: int
    sources: List[Source]
    findings: List[Finding]
    overall_confidence: float
    report: Optional[str]
    report_format: str
    errors: List[str]
    iteration_count: int
    needs_more_sources: bool
Agent 1: The Researcher
The Researcher is the agent responsible for source collection. It receives the user query, decomposes it into sub-queries, executes web searches through the Tavily API, downloads and analyzes the content of found pages, and validates the credibility of each source. The output is a list of validated sources with a summary of the relevant content.
Tool Definitions
The Researcher has three specialized tools: one for web search, one for page content scraping, and one for source credibility validation.
from langchain_core.tools import tool
from tavily import TavilyClient
import httpx
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import json
import os

# Read the API key from the environment instead of hardcoding it
tavily_client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])

@tool
def web_search(query: str, max_results: int = 5) -> str:
    """Search the web for information about a specific query.

    Args:
        query: The search query to execute
        max_results: Maximum number of results (default: 5)

    Returns:
        JSON with search results including URL, title and snippet
    """
    response = tavily_client.search(
        query=query,
        max_results=max_results,
        search_depth="advanced",
        include_raw_content=True,
        include_domains=["arxiv.org", "github.com", "medium.com",
                         "techcrunch.com", "reuters.com"]
    )
    results = []
    for r in response.get("results", []):
        results.append({
            "url": r["url"],
            "title": r["title"],
            "snippet": r["content"][:500],
            "raw_content": r.get("raw_content", "")[:2000],
            "score": r.get("score", 0.0)
        })
    return json.dumps(results, indent=2)

@tool
def scrape_url(url: str) -> str:
    """Download and analyze the content of a web page.

    Args:
        url: The URL of the page to analyze

    Returns:
        Extracted text from the page (max 3000 characters)
    """
    try:
        headers = {"User-Agent": "ResearchBot/1.0"}
        response = httpx.get(url, headers=headers, timeout=15.0,
                             follow_redirects=True)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        for tag in soup(["script", "style", "nav", "footer", "header"]):
            tag.decompose()
        text = soup.get_text(separator="\n", strip=True)
        return text[:3000]
    except Exception as e:
        return f"Error scraping {url}: {str(e)}"

@tool
def validate_source(url: str, title: str, content_snippet: str) -> str:
    """Validate the credibility of a source based on domain and content.

    Args:
        url: The source URL
        title: The page title
        content_snippet: An excerpt from the content

    Returns:
        JSON with credibility score and reasoning
    """
    domain = urlparse(url).netloc.lower()
    # Suffix matching (".gov", ".edu") avoids accidental substring
    # hits on unrelated domains
    high_credibility = ["arxiv.org", "nature.com", "science.org",
                        "ieee.org", "acm.org", ".gov", ".edu"]
    medium_credibility = ["github.com", "medium.com", "techcrunch.com",
                          "reuters.com", "bloomberg.com"]
    score = 0.5
    reasons = []
    for hc in high_credibility:
        if domain.endswith(hc):
            score = 0.9
            reasons.append(f"Academic/institutional domain: {domain}")
            break
    else:
        for mc in medium_credibility:
            if domain.endswith(mc):
                score = 0.7
                reasons.append(f"Recognized technical domain: {domain}")
                break
    if len(content_snippet) > 200:
        score += 0.05
        reasons.append("Substantial content present")
    if title and len(title) > 10:
        score += 0.02
        reasons.append("Descriptive title present")
    score = min(score, 1.0)
    return json.dumps({
        "url": url,
        "credibility_score": round(score, 2),
        "reasons": reasons,
        "domain": domain
    })
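The core of the credibility heuristic is extracting the host with `urlparse` and suffix-matching it against tiered allow-lists; suffix matching with `str.endswith` avoids accidental substring hits on unrelated domain names. A minimal, self-contained sketch (the `score_domain` helper and the shortened lists are illustrative, not part of the tool above):

```python
from urllib.parse import urlparse

# Illustrative, shortened tiers; str.endswith accepts a tuple of suffixes
HIGH = (".gov", ".edu", "arxiv.org", "nature.com")
MEDIUM = ("github.com", "reuters.com", "medium.com")

def score_domain(url: str) -> float:
    """Return a base credibility score for a URL's domain."""
    domain = urlparse(url).netloc.lower()
    if domain.endswith(HIGH):
        return 0.9
    if domain.endswith(MEDIUM):
        return 0.7
    return 0.5  # unknown domain: neutral prior

print(score_domain("https://arxiv.org/abs/2310.01234"))  # 0.9
print(score_domain("https://example.com/post"))          # 0.5
```

Note that suffix matching also covers subdomains: `www.nasa.gov` ends with `.gov` and scores 0.9 without a dedicated entry.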
Researcher Agent Definition
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

researcher_llm = ChatOpenAI(model="gpt-4o", temperature=0.1)

researcher_system_prompt = """You are a Research Agent specialized in
collecting reliable sources from the web.

OBJECTIVE: Given a research query, find and validate high-quality sources.

PROCESS:
1. Analyze the query and identify 2-3 specific sub-queries
2. For each sub-query, use web_search to find results
3. For the most promising results, use scrape_url to get full content
4. Use validate_source to verify credibility of each source
5. Return ONLY sources with credibility_score >= 0.6

OUTPUT: Produce a JSON with the list of validated sources, each with:
- url, title, content (summary of relevant content)
- credibility_score, domain, extraction_date

Do NOT invent information. If you cannot find enough sources, report it."""

researcher_tools = [web_search, scrape_url, validate_source]

researcher_agent = create_react_agent(
    model=researcher_llm,
    tools=researcher_tools,
    prompt=researcher_system_prompt
)
Agent 2: The Analyst
The Analyst receives the validated sources from the Researcher and analyzes them in depth. Its objective is to extract key findings, verify consistency across sources through cross-referencing, and identify potential contradictions. The output is a list of structured findings, each with a confidence score based on the number and quality of supporting sources.
Analyst Tools
import re

@tool
def cross_reference_check(claim: str, sources_json: str) -> str:
    """Verify a claim by cross-referencing multiple sources.

    Args:
        claim: The claim to verify
        sources_json: JSON with sources to analyze

    Returns:
        JSON with verification result and counts of sources that
        support the claim, discuss it without confirming it, or
        do not cover it at all
    """
    sources = json.loads(sources_json)
    supporting = []
    neutral = []
    not_covering = []
    claim_keywords = set(claim.lower().split())
    for source in sources:
        content = source.get("content", "").lower()
        keyword_matches = sum(1 for kw in claim_keywords
                              if kw in content)
        match_ratio = keyword_matches / max(len(claim_keywords), 1)
        if match_ratio > 0.6:
            supporting.append(source["url"])
        elif match_ratio > 0.3:
            # Partial overlap: the source touches the topic without
            # clearly confirming the claim
            neutral.append(source["url"])
        else:
            # Low overlap means the source does not cover the claim.
            # Keyword matching alone cannot prove a contradiction;
            # real conflicts are caught by detect_contradictions.
            not_covering.append(source["url"])
    total = len(sources)
    confidence = len(supporting) / max(total, 1)
    return json.dumps({
        "claim": claim,
        "supporting_sources": supporting,
        "neutral_sources": neutral,
        "not_covering_sources": not_covering,
        "confidence": round(confidence, 2),
        "verdict": "confirmed" if confidence > 0.6
                   else "uncertain" if confidence > 0.3
                   else "unverified"
    })

@tool
def extract_key_findings(content: str, topic: str) -> str:
    """Extract main findings from text relative to a topic.

    Args:
        content: The text to analyze
        topic: The reference topic for extraction

    Returns:
        JSON with extracted findings and their relevance
    """
    sentences = re.split(r'[.!?]+', content)
    topic_keywords = set(topic.lower().split())
    findings = []
    for sentence in sentences:
        sentence = sentence.strip()
        if len(sentence) < 20:
            continue
        words = set(sentence.lower().split())
        relevance = len(words.intersection(topic_keywords))
        relevance = relevance / max(len(topic_keywords), 1)
        if relevance > 0.3:
            findings.append({
                "text": sentence[:200],
                "relevance_score": round(relevance, 2)
            })
    findings.sort(key=lambda x: x["relevance_score"], reverse=True)
    return json.dumps(findings[:10])

@tool
def detect_contradictions(findings_json: str) -> str:
    """Identify contradictions among collected findings.

    Args:
        findings_json: JSON with findings to analyze

    Returns:
        JSON with pairs of potentially contradictory findings
    """
    findings = json.loads(findings_json)
    contradictions = []
    negation_words = {"not", "no", "never", "without",
                      "unlike", "contrary", "however", "but",
                      "decrease", "decline", "drop", "reduce"}
    for i, f1 in enumerate(findings):
        for f2 in findings[i+1:]:
            text1 = set(f1.get("text", "").lower().split())
            text2 = set(f2.get("text", "").lower().split())
            overlap = text1.intersection(text2)
            neg_in_1 = bool(text1.intersection(negation_words))
            neg_in_2 = bool(text2.intersection(negation_words))
            # Same subject matter, but only one side negated:
            # flag as a potential contradiction
            if len(overlap) > 3 and neg_in_1 != neg_in_2:
                contradictions.append({
                    "finding_1": f1.get("text", "")[:100],
                    "finding_2": f2.get("text", "")[:100],
                    "shared_keywords": list(overlap)[:5],
                    "severity": "high" if len(overlap) > 5 else "medium"
                })
    return json.dumps(contradictions)
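The negation-asymmetry rule is easiest to see on a toy pair of findings: two sentences that share several keywords while only one contains a negation word are flagged as a likely conflict. A condensed, standalone restatement of that rule (the `contradicts` helper and the shortened negation set are illustrative):

```python
# Shortened negation list, for illustration only
NEGATIONS = {"not", "no", "never", "without", "decrease", "decline"}

def contradicts(a: str, b: str, min_overlap: int = 3) -> bool:
    """Flag a pair as contradictory when the sentences share several
    keywords but only one of the two contains a negation word."""
    w1, w2 = set(a.lower().split()), set(b.lower().split())
    overlap = w1 & w2
    neg1, neg2 = bool(w1 & NEGATIONS), bool(w2 & NEGATIONS)
    return len(overlap) > min_overlap and neg1 != neg2

f1 = "gpu prices will increase sharply in the next quarter"
f2 = "gpu prices will not increase sharply in the next quarter"
print(contradicts(f1, f2))  # True: same keywords, asymmetric negation
```

The heuristic is deliberately cheap; it produces candidate pairs for the Analyst LLM to inspect rather than a definitive verdict.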
Analyst Agent Definition
analyst_llm = ChatOpenAI(model="gpt-4o", temperature=0.0)

analyst_system_prompt = """You are an Analyst Agent specialized in
fact-checking and extracting insights from multiple sources.

OBJECTIVE: Analyze the sources provided by the Researcher, extract
key findings and verify their consistency.

PROCESS:
1. For each source, use extract_key_findings to extract key points
2. Use cross_reference_check to verify each key finding
3. Use detect_contradictions to identify inconsistencies
4. Assign an overall confidence score to findings

OUTPUT: Produce a JSON with:
- findings: list of findings with claim, evidence, source_urls, confidence
- overall_confidence: weighted average of confidence scores
- contradictions: list of contradictions found
- recommendation: "proceed" if confidence > 0.7, "needs_more_sources" otherwise

Be critical and rigorous. Do not take anything for granted."""

analyst_tools = [cross_reference_check, extract_key_findings,
                 detect_contradictions]

analyst_agent = create_react_agent(
    model=analyst_llm,
    tools=analyst_tools,
    prompt=analyst_system_prompt
)
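The prompt asks the Analyst for a "weighted average of confidence scores" but leaves the weighting open. One reasonable sketch weights each finding by its evidence count, so well-sourced findings dominate the aggregate (the `overall_confidence` helper is a hypothetical illustration, not part of the agent):

```python
def overall_confidence(findings: list[dict]) -> float:
    """Weighted average of per-finding confidence, weighting each
    finding by how many sources back it (more evidence, more weight)."""
    if not findings:
        return 0.0
    weights = [max(len(f.get("source_urls", [])), 1) for f in findings]
    total = sum(weights)
    return round(
        sum(f["confidence"] * w for f, w in zip(findings, weights)) / total,
        2,
    )

findings = [
    {"claim": "A", "confidence": 0.8, "source_urls": ["u1", "u2", "u3"]},
    {"claim": "B", "confidence": 0.4, "source_urls": ["u4"]},
]
print(overall_confidence(findings))  # (0.8*3 + 0.4*1) / 4 = 0.7
```

Other schemes (weighting by source credibility_score, for example) fit the same interface.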
Agent 3: The Editor
The Editor is the agent responsible for producing the final report. It receives the structured findings from the Analyst, organizes them into a logical structure, formats citations according to academic standards, and generates the report in the format requested by the user (Markdown, HTML or JSON). The Editor does not invent content: it structures, contextualizes, and presents in readable form the information collected and verified by the preceding agents.
Editor Tools
from datetime import datetime

@tool
def apply_report_template(findings_json: str, query: str,
                          template_type: str = "executive") -> str:
    """Apply a report template to structured findings.

    Args:
        findings_json: JSON with findings to format
        query: The original user query
        template_type: Template type (executive, technical, brief)

    Returns:
        Formatted report in Markdown
    """
    findings = json.loads(findings_json)
    date = datetime.now().strftime("%Y-%m-%d")
    if template_type == "executive":
        sections = [
            f"# Research Report: {query}",
            f"*Generated on {date}*\n",
            "## Executive Summary\n",
            "## Key Findings\n",
            "## Detailed Analysis\n",
            "## Sources and References\n",
            "## Methodology\n",
            "---",
            "*This report was generated by an AI Research Assistant.*"
        ]
    elif template_type == "technical":
        sections = [
            f"# Technical Analysis: {query}",
            f"*Report Date: {date}*\n",
            "## Abstract\n",
            "## Data Sources\n",
            "## Analysis\n",
            "## Results\n",
            "## Limitations\n",
            "## References\n"
        ]
    else:
        sections = [
            f"# Brief: {query}",
            f"*{date}*\n",
            "## Summary\n",
            "## Key Points\n",
            "## Sources\n"
        ]
    return "\n".join(sections)

@tool
def format_citations(sources_json: str,
                     style: str = "apa") -> str:
    """Format source citations according to an academic standard.

    Args:
        sources_json: JSON with sources to cite
        style: Citation style (apa, ieee; anything else falls back
            to a plain format)

    Returns:
        List of formatted citations
    """
    sources = json.loads(sources_json)
    citations = []
    for i, source in enumerate(sources, 1):
        title = source.get("title", "Untitled")
        url = source.get("url", "")
        domain = source.get("domain", "")
        date = source.get("extraction_date",
                          datetime.now().strftime("%Y-%m-%d"))
        if style == "apa":
            citation = (f"[{i}] {domain}. ({date}). "
                        f"*{title}*. Retrieved from {url}")
        elif style == "ieee":
            citation = (f"[{i}] \"{title},\" {domain}, "
                        f"{date}. [Online]. Available: {url}")
        else:
            citation = f"[{i}] {title}. {url} ({date})"
        citations.append(citation)
    return "\n".join(citations)

@tool
def generate_export(report_markdown: str,
                    output_format: str = "markdown") -> str:
    """Generate the final export of the report in the requested format.

    Args:
        report_markdown: The report in Markdown format
        output_format: Output format (markdown, html, json)

    Returns:
        The report in the specified format
    """
    if output_format == "markdown":
        return report_markdown
    elif output_format == "html":
        html_lines = []
        in_list = False
        for line in report_markdown.split("\n"):
            if line.startswith("- "):
                # Open a <ul> at the first item of each bullet run
                if not in_list:
                    html_lines.append("<ul>")
                    in_list = True
                html_lines.append(f"<li>{line[2:]}</li>")
                continue
            if in_list:
                html_lines.append("</ul>")
                in_list = False
            if line.startswith("# "):
                html_lines.append(f"<h1>{line[2:]}</h1>")
            elif line.startswith("## "):
                html_lines.append(f"<h2>{line[3:]}</h2>")
            elif line.strip():
                html_lines.append(f"<p>{line}</p>")
        if in_list:
            html_lines.append("</ul>")
        return "\n".join(html_lines)
    elif output_format == "json":
        return json.dumps({"report": report_markdown,
                           "format": output_format,
                           "generated_at": datetime.now().isoformat()})
    return report_markdown
Editor Agent Definition
editor_llm = ChatOpenAI(model="gpt-4o", temperature=0.3)

editor_system_prompt = """You are an Editor Agent specialized in
producing professional and well-structured reports.

OBJECTIVE: Transform the Analyst's findings into a readable and
properly cited report.

PROCESS:
1. Use apply_report_template to create the report structure
2. Populate each section with relevant findings
3. Use format_citations to generate the bibliography
4. Use generate_export to produce the final format

STYLE RULES:
- Write clearly and professionally
- Every claim must have a citation [N]
- Highlight the confidence level for each finding
- Explicitly flag areas of uncertainty
- NEVER invent data or citations not present in the findings

OUTPUT: The complete report in the requested format."""

editor_tools = [apply_report_template, format_citations,
                generate_export]

editor_agent = create_react_agent(
    model=editor_llm,
    tools=editor_tools,
    prompt=editor_system_prompt
)
Orchestration with LangGraph
Now we assemble the three agents into a LangGraph graph. The graph defines the sequential flow with a router node between the Analyst and the Editor that checks whether the overall confidence score is sufficient to proceed with report generation, or whether additional sources are needed.
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver

def run_researcher(state: ResearchState) -> dict:
    """Researcher node: collects and validates sources."""
    query = state["query"]
    max_sources = state.get("max_sources", 5)
    result = researcher_agent.invoke({
        "messages": [("human",
                      f"Search for sources about: {query}. "
                      f"Find at least {max_sources} reliable sources.")]
    })
    last_message = result["messages"][-1].content
    try:
        sources = json.loads(last_message)
    except json.JSONDecodeError:
        sources = []
    return {
        "sources": sources,
        "iteration_count": state.get("iteration_count", 0) + 1,
        "messages": result["messages"]
    }

def run_analyst(state: ResearchState) -> dict:
    """Analyst node: analyzes sources and produces findings."""
    sources = state["sources"]
    result = analyst_agent.invoke({
        "messages": [("human",
                      f"Analyze these sources and produce structured findings:\n"
                      f"{json.dumps(sources, indent=2)}")]
    })
    last_message = result["messages"][-1].content
    try:
        analysis = json.loads(last_message)
        findings = analysis.get("findings", [])
        confidence = analysis.get("overall_confidence", 0.0)
        needs_more = analysis.get("recommendation") == "needs_more_sources"
    except json.JSONDecodeError:
        findings = []
        confidence = 0.0
        needs_more = True
    return {
        "findings": findings,
        "overall_confidence": confidence,
        "needs_more_sources": needs_more,
        "messages": result["messages"]
    }

def run_editor(state: ResearchState) -> dict:
    """Editor node: generates the final report."""
    findings = state["findings"]
    query = state["query"]
    report_format = state.get("report_format", "markdown")
    result = editor_agent.invoke({
        "messages": [("human",
                      f"Generate a report for the query '{query}' "
                      f"based on these findings:\n"
                      f"{json.dumps(findings, indent=2)}\n"
                      f"Required format: {report_format}")]
    })
    report = result["messages"][-1].content
    return {
        "report": report,
        "messages": result["messages"]
    }

def should_continue_or_edit(state: ResearchState) -> str:
    """Router: decides whether to proceed to Editor or return to Researcher."""
    if (state.get("needs_more_sources", False)
            and state.get("iteration_count", 0) < 3):
        return "researcher"
    return "editor"

# Build the graph
graph = StateGraph(ResearchState)
graph.add_node("researcher", run_researcher)
graph.add_node("analyst", run_analyst)
graph.add_node("editor", run_editor)

graph.add_edge(START, "researcher")
graph.add_edge("researcher", "analyst")
graph.add_conditional_edges(
    "analyst",
    should_continue_or_edit,
    {"researcher": "researcher", "editor": "editor"}
)
graph.add_edge("editor", END)

memory = MemorySaver()
research_app = graph.compile(checkpointer=memory)
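Because the conditional edge is a plain function of the state, the loop guard can be sanity-checked without invoking any model. A standalone restatement of the same decision rule (the `route` helper mirrors should_continue_or_edit for self-containment):

```python
def route(state: dict) -> str:
    # Mirrors should_continue_or_edit: loop back to the Researcher
    # only while the Analyst wants more sources AND the iteration
    # cap (3) has not been reached.
    if (state.get("needs_more_sources", False)
            and state.get("iteration_count", 0) < 3):
        return "researcher"
    return "editor"

# First pass with thin evidence: loop back for more sources.
print(route({"needs_more_sources": True, "iteration_count": 1}))  # researcher
# Cap reached: hand off to the Editor even with thin evidence,
# so the pipeline always terminates with a (flagged) report.
print(route({"needs_more_sources": True, "iteration_count": 3}))  # editor
```

The cap is what guarantees termination: without it, a persistently low-confidence query would bounce between Researcher and Analyst forever.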
Memory Integration
A crucial aspect of our system is shared memory management. The three agents operate on the same state, but we also need long-term memory that allows the system to learn from previous searches, avoid revisiting already-analyzed sources, and progressively build a domain knowledge graph.
Shared Knowledge Graph
We implement a simple knowledge graph that tracks entities extracted from searches, relationships between them, and verified facts. This structure is consulted by the Analyst to enrich analyses with historical context and by the Editor to insert cross-references.
from typing import Dict
import sqlite3

class KnowledgeGraph:
    """Persistent knowledge graph for the Research Assistant."""

    def __init__(self, db_path: str = "research_kg.db"):
        self.conn = sqlite3.connect(db_path)
        self._create_tables()

    def _create_tables(self):
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT UNIQUE,
                entity_type TEXT,
                first_seen TEXT,
                mention_count INTEGER DEFAULT 1
            );
            CREATE TABLE IF NOT EXISTS relations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                subject_id INTEGER REFERENCES entities(id),
                predicate TEXT,
                object_id INTEGER REFERENCES entities(id),
                confidence REAL,
                source_url TEXT
            );
            CREATE TABLE IF NOT EXISTS facts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                statement TEXT,
                confidence REAL,
                verified_by INTEGER DEFAULT 0,
                source_urls TEXT,
                created_at TEXT
            );
        """)
        self.conn.commit()

    def add_entity(self, name: str, entity_type: str):
        self.conn.execute("""
            INSERT INTO entities (name, entity_type, first_seen)
            VALUES (?, ?, datetime('now'))
            ON CONFLICT(name) DO UPDATE
                SET mention_count = mention_count + 1
        """, (name, entity_type))
        self.conn.commit()

    def add_relation(self, subject: str, predicate: str,
                     obj: str, confidence: float,
                     source_url: str = ""):
        self.add_entity(subject, "auto")
        self.add_entity(obj, "auto")
        sub_id = self.conn.execute(
            "SELECT id FROM entities WHERE name = ?",
            (subject,)).fetchone()[0]
        obj_id = self.conn.execute(
            "SELECT id FROM entities WHERE name = ?",
            (obj,)).fetchone()[0]
        self.conn.execute("""
            INSERT INTO relations
                (subject_id, predicate, object_id, confidence, source_url)
            VALUES (?, ?, ?, ?, ?)
        """, (sub_id, predicate, obj_id, confidence, source_url))
        self.conn.commit()

    def query_entity(self, name: str) -> Dict:
        entity = self.conn.execute(
            "SELECT * FROM entities WHERE name LIKE ?",
            (f"%{name}%",)).fetchone()
        if not entity:
            return {}
        relations = self.conn.execute("""
            SELECT e2.name, r.predicate, r.confidence
            FROM relations r
            JOIN entities e2 ON r.object_id = e2.id
            WHERE r.subject_id = ?
        """, (entity[0],)).fetchall()
        return {
            "name": entity[1],
            "type": entity[2],
            "mentions": entity[4],
            "relations": [{"target": r[0], "predicate": r[1],
                           "confidence": r[2]} for r in relations]
        }
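The part of the schema most worth verifying is the mention_count upsert: re-adding a known entity must bump its counter rather than raise on the UNIQUE constraint. A self-contained check of that pattern against an in-memory database (SQLite's `ON CONFLICT ... DO UPDATE` requires SQLite 3.24+, which ships with modern Python):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE entities (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT UNIQUE,
        entity_type TEXT,
        first_seen TEXT,
        mention_count INTEGER DEFAULT 1
    )
""")

def add_entity(name: str, entity_type: str) -> None:
    # Same upsert as KnowledgeGraph.add_entity: insert on first
    # sight, increment mention_count on every later mention.
    conn.execute("""
        INSERT INTO entities (name, entity_type, first_seen)
        VALUES (?, ?, datetime('now'))
        ON CONFLICT(name) DO UPDATE
            SET mention_count = mention_count + 1
    """, (name, entity_type))

add_entity("LangGraph", "framework")
add_entity("LangGraph", "framework")
count = conn.execute(
    "SELECT mention_count FROM entities WHERE name = ?",
    ("LangGraph",)).fetchone()[0]
print(count)  # 2
```

The upsert keeps entity ingestion idempotent across research runs, which is exactly what lets the graph grow monotonically without deduplication passes.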
RAG Integration
Our Research Assistant integrates a RAG (Retrieval-Augmented Generation) system that allows the Analyst to consult internal documents and previous reports stored in a vector database. We use Pinecone as the vector store and OpenAI for embedding generation.
Retrieval Pipeline
from langchain_openai import OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

vectorstore = PineconeVectorStore(
    index_name="research-assistant",
    embedding=embeddings,
    namespace="documents"
)

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ". ", " "]
)

def index_document(content: str, metadata: dict):
    """Index a document in the vector store."""
    chunks = text_splitter.split_text(content)
    documents = [
        Document(page_content=chunk, metadata={
            **metadata,
            "chunk_index": i,
            "total_chunks": len(chunks)
        })
        for i, chunk in enumerate(chunks)
    ]
    vectorstore.add_documents(documents)

def retrieve_relevant(query: str, top_k: int = 5) -> list:
    """Retrieve the most relevant documents for a query."""
    results = vectorstore.similarity_search_with_score(
        query, k=top_k
    )
    return [
        {
            "content": doc.page_content,
            "metadata": doc.metadata,
            "similarity_score": round(float(score), 3)
        }
        for doc, score in results
    ]

@tool
def search_internal_knowledge(query: str,
                              max_results: int = 5) -> str:
    """Search internal documents and previous reports.

    Args:
        query: The search query
        max_results: Maximum number of results

    Returns:
        JSON with relevant documents found in the knowledge base
    """
    results = retrieve_relevant(query, top_k=max_results)
    return json.dumps(results, indent=2)
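The chunk_overlap setting matters because a sentence split across two chunks would otherwise be unretrievable as a unit. The sliding-window idea can be illustrated with a simplified fixed-size splitter (a deliberate simplification: RecursiveCharacterTextSplitter additionally prefers natural separators like paragraph breaks; the `chunk_text` helper is illustrative):

```python
def chunk_text(text: str, chunk_size: int = 1000,
               overlap: int = 200) -> list[str]:
    """Fixed-size chunks whose tail is repeated at the head of the
    next chunk, so no span shorter than `overlap` is ever split.
    Requires chunk_size > overlap (otherwise the window never advances)."""
    step = chunk_size - overlap
    return [text[i:i + chunk_size]
            for i in range(0, max(len(text) - overlap, 1), step)]

text = "".join(str(i % 10) for i in range(2500))
chunks = chunk_text(text, chunk_size=1000, overlap=200)
print(len(chunks))                          # 3 chunks: [0:1000], [800:1800], [1600:2500]
print(chunks[0][-200:] == chunks[1][:200])  # True: 200 shared characters
```

Each consecutive pair of chunks shares exactly 200 characters, so any passage up to that length is guaranteed to appear whole in at least one chunk.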
Error Handling and Fallback
In a multi-agent system, error handling is a critical aspect. Each agent can fail for different reasons: network timeouts, unavailable APIs, unparseable content, rate limits. Our system implements fallback strategies at three levels: tool-level, agent-level, and system-level.
Three-Level Fallback Strategy
- Tool-level: each tool internally handles its own errors with try/except, returning structured error messages instead of propagating exceptions. If a source is unreachable, the tool returns an error with the HTTP code and a suggestion.
- Agent-level: if an agent cannot complete its task, the router can redirect the flow. The Researcher searches for alternative sources if the first ones are unavailable. The Analyst requests additional sources if it finds too many contradictions.
- System-level: the graph has a maximum iteration limit (3 Researcher-Analyst cycles). If after 3 iterations the confidence score remains below 0.5, the system still generates a report with an explicit warning about low reliability and suggests manual intervention.
from langgraph.errors import NodeInterrupt

def run_researcher_with_fallback(state: ResearchState) -> dict:
    """Researcher with error handling and fallback."""
    max_retries = 2
    errors = list(state.get("errors", []))
    for attempt in range(max_retries):
        try:
            result = run_researcher(state)
            if not result.get("sources"):
                errors.append(
                    f"Attempt {attempt+1}: No sources found"
                )
                continue
            result["errors"] = errors
            return result
        except Exception as e:
            errors.append(
                f"Attempt {attempt+1}: {type(e).__name__}: {str(e)}"
            )
    return {
        "sources": [],
        "errors": errors,
        "iteration_count": state.get("iteration_count", 0) + 1,
        "messages": state.get("messages", [])
    }

def manual_intervention_check(state: ResearchState) -> str:
    """Check if manual intervention is needed."""
    iteration = state.get("iteration_count", 0)
    confidence = state.get("overall_confidence", 0.0)
    errors = state.get("errors", [])
    if iteration >= 3 and confidence < 0.5:
        raise NodeInterrupt(
            f"System stuck after {iteration} iterations. "
            f"Confidence: {confidence:.2f}. "
            f"Errors: {len(errors)}. "
            f"Manual intervention required."
        )
    if len(errors) > 5:
        raise NodeInterrupt(
            f"Too many accumulated errors ({len(errors)}). "
            f"Check connectivity and API limits."
        )
    return "continue"
Deployment
To make our Research Assistant usable in production, we package it in a Docker container and expose it as a REST service through FastAPI. This architecture enables horizontal scaling, integration with existing applications, and real-time performance monitoring.
FastAPI Wrapper
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from uuid import uuid4
import asyncio

app = FastAPI(title="Research Assistant API", version="1.0.0")

class ResearchRequest(BaseModel):
    query: str
    max_sources: int = 5
    report_format: str = "markdown"

class ResearchResponse(BaseModel):
    job_id: str
    status: str
    report: str | None = None
    confidence: float | None = None
    sources_count: int | None = None
    errors: list[str] = []

jobs: dict[str, ResearchResponse] = {}

async def execute_research(job_id: str, request: ResearchRequest):
    """Execute research in background."""
    try:
        config = {"configurable": {"thread_id": job_id}}
        initial_state = {
            "query": request.query,
            "max_sources": request.max_sources,
            "report_format": request.report_format,
            "messages": [],
            "sources": [],
            "findings": [],
            "overall_confidence": 0.0,
            "report": None,
            "errors": [],
            "iteration_count": 0,
            "needs_more_sources": False
        }
        result = await asyncio.to_thread(
            research_app.invoke, initial_state, config
        )
        jobs[job_id] = ResearchResponse(
            job_id=job_id,
            status="completed",
            report=result.get("report"),
            confidence=result.get("overall_confidence"),
            sources_count=len(result.get("sources", [])),
            errors=result.get("errors", [])
        )
    except Exception as e:
        jobs[job_id] = ResearchResponse(
            job_id=job_id,
            status="failed",
            errors=[str(e)]
        )

@app.post("/research", response_model=ResearchResponse)
async def start_research(request: ResearchRequest,
                         background_tasks: BackgroundTasks):
    job_id = str(uuid4())
    jobs[job_id] = ResearchResponse(
        job_id=job_id, status="processing")
    background_tasks.add_task(execute_research, job_id, request)
    return jobs[job_id]

@app.get("/research/{job_id}", response_model=ResearchResponse)
async def get_research_status(job_id: str):
    if job_id not in jobs:
        raise HTTPException(status_code=404,
                            detail="Job not found")
    return jobs[job_id]
Docker Compose
version: "3.9"

services:
  research-api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - TAVILY_API_KEY=${TAVILY_API_KEY}
      - PINECONE_API_KEY=${PINECONE_API_KEY}