RAG con PostgreSQL: da Documento a Risposta
Hai mai desiderato che il tuo sistema AI potesse rispondere a domande basandosi su documenti specifici della tua azienda, senza dover addestrare un modello personalizzato? La risposta si chiama Retrieval-Augmented Generation (RAG), ed e una delle architetture più potenti e pratiche dell'AI moderna. E PostgreSQL, con pgvector, e uno degli strumenti migliori per implementarla.
RAG combina due capacità complementari: la ricerca semantica (trovare i documenti più rilevanti per una domanda) con la generazione di linguaggio naturale (produrre una risposta coerente basata su quei documenti). Il risultato e un sistema che risponde con la conoscenza aggiornata dei tuoi dati, non con le conoscenze generali di un modello pre-addestrato.
In questo articolo costruiremo un pipeline RAG completo end-to-end: dall'ingestion dei documenti alla query con risposta generata da GPT-4, tutto su PostgreSQL. Nessun database aggiuntivo, nessun servizio esterno per il vector store.
Panoramica della Serie
| # | Articolo | Focus |
|---|---|---|
| 1 | pgvector | Installazione, operatori, indexing |
| 2 | Embeddings in Profondità | Modelli, distanze, generazione |
| 3 | Sei qui - RAG con PostgreSQL | Pipeline RAG end-to-end |
| 4 | Similarity Search | Algoritmi e ottimizzazione |
| 5 | HNSW e IVFFlat | Strategie di indicizzazione avanzate |
| 6 | RAG in Produzione | Scalabilità e performance |
Cosa Imparerai
- L'architettura completa di un sistema RAG: componenti e flusso dati
- Document ingestion pipeline: caricamento, parsing, chunking
- Strategia di storage in PostgreSQL con pgvector
- Retrieval: dalla query alla selezione dei chunk più rilevanti
- Generation: come costruire il prompt e integrare con GPT-4
- Hybrid search: combinare vector search e full-text search PostgreSQL
- Valutazione della qualità RAG: metriche e strumenti
Architettura RAG: Come Funziona
Un sistema RAG ha due fasi principali che operano in momenti diversi:
Fase 1: Ingestion (offline)
Avviene una volta (o periodicamente quando i documenti cambiano). Il processo e:
- Load: Carica i documenti da file system, URL, database, API
- Parse: Estrai il testo da PDF, DOCX, HTML, Markdown
- Chunk: Dividi il testo in frammenti di dimensione ottimale
- Embed: Genera un vettore embedding per ogni chunk
- Store: Salva chunk + embedding + metadata in PostgreSQL
Fase 2: Retrieval + Generation (online, per ogni query)
- Query: L'utente pone una domanda in linguaggio naturale
- Embed query: Trasforma la domanda in un vettore usando lo stesso modello
- Search: Trova i k chunk più simili in PostgreSQL
- Context: Assembla i chunk trovati come contesto
- Generate: Invia domanda + contesto all'LLM per ottenere la risposta
## Flusso RAG Visualizzato
INGESTION (offline):
Documento PDF
|
v
[Parser] -> Testo grezzo
|
v
[Chunker] -> ["chunk 1", "chunk 2", ..., "chunk N"]
|
v
[Embedding Model] -> [[0.023, -0.841, ...], [0.891, 0.234, ...], ...]
|
v
[PostgreSQL + pgvector] -> Memorizzazione permanente
QUERY (online):
Domanda utente: "Come funziona l'indicizzazione HNSW?"
|
v
[Embedding Model] -> [0.045, -0.823, ...] (query vector)
|
v
[PostgreSQL ANN Search] -> Top 5 chunk più simili
|
v
[Prompt Builder] -> "Usa questo contesto: [chunk1, chunk2, ...] Domanda: ..."
|
v
[GPT-4 / Claude] -> "L'indicizzazione HNSW (Hierarchical Navigable Small World) ..."
|
v
Risposta all'utente
Setup del Progetto
Dipendenze
# requirements.txt
openai>=1.12.0
psycopg2-binary>=2.9.9
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.20
pypdf>=3.17.0
python-dotenv>=1.0.0
tiktoken>=0.5.0
# Installazione
pip install -r requirements.txt
Configurazione Database
-- Setup iniziale PostgreSQL
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm; -- per full-text search
-- Schema completo per RAG
CREATE TABLE IF NOT EXISTS rag_documents (
id BIGSERIAL PRIMARY KEY,
-- Informazioni sorgente
source_path TEXT NOT NULL,
source_type TEXT NOT NULL CHECK (source_type IN ('pdf', 'txt', 'md', 'html', 'docx')),
source_hash TEXT NOT NULL, -- hash MD5 del file originale
-- Chunk info
chunk_index INTEGER NOT NULL,
chunk_total INTEGER,
-- Contenuto
title TEXT,
content TEXT NOT NULL,
content_length INTEGER GENERATED ALWAYS AS (length(content)) STORED,
-- Embedding
embedding_model TEXT NOT NULL DEFAULT 'text-embedding-3-small',
embedding vector(1536),
-- Metadata
metadata JSONB DEFAULT '{}',
tags TEXT[] DEFAULT '{}',
-- Timestamps
ingested_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (source_path, chunk_index, source_hash)
);
-- Indice HNSW per vector search veloce
CREATE INDEX idx_rag_embedding_hnsw
ON rag_documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Indice GIN per full-text search
CREATE INDEX idx_rag_content_fts
ON rag_documents
USING gin (to_tsvector('english', content));
-- Indice per filtri comuni
CREATE INDEX idx_rag_source_type ON rag_documents (source_type);
CREATE INDEX idx_rag_tags ON rag_documents USING gin (tags);
CREATE INDEX idx_rag_metadata ON rag_documents USING gin (metadata);
Document Ingestion Pipeline
Struttura del Progetto Python
rag_system/
├── config.py # Configurazione DB, API keys, parametri
├── ingestion/
│ ├── __init__.py
│ ├── loaders.py # Caricamento documenti da varie sorgenti
│ ├── parsers.py # Parsing PDF, DOCX, HTML, Markdown
│ ├── chunkers.py # Strategie di chunking
│ └── pipeline.py # Pipeline ingestion orchestrator
├── retrieval/
│ ├── __init__.py
│ ├── embedder.py # Generazione embeddings
│ └── searcher.py # Vector search e hybrid search
├── generation/
│ ├── __init__.py
│ ├── prompts.py # Template prompts
│ └── generator.py # Integrazione LLM
├── rag.py # Classe principale RAGSystem
└── main.py # Entry point
config.py
import os
from dataclasses import dataclass
from dotenv import load_dotenv
load_dotenv()
@dataclass
class Config:
# Database
db_host: str = os.getenv("DB_HOST", "localhost")
db_port: int = int(os.getenv("DB_PORT", "5432"))
db_name: str = os.getenv("DB_NAME", "ragdb")
db_user: str = os.getenv("DB_USER", "postgres")
db_password: str = os.getenv("DB_PASSWORD", "")
# OpenAI
openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
embedding_model: str = "text-embedding-3-small"
embedding_dim: int = 1536
chat_model: str = "gpt-4o-mini" # cost-effective default
# Chunking
chunk_size: int = 800
chunk_overlap: int = 150
min_chunk_size: int = 100
# Retrieval
top_k: int = 5
similarity_threshold: float = 0.65 # minimum cosine similarity
# Generation
max_context_tokens: int = 8000
temperature: float = 0.1 # low temperature for factual answers
def get_db_url(self) -> str:
return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"
config = Config()
ingestion/loaders.py - Caricamento Multi-Sorgente
import hashlib
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
import requests
from bs4 import BeautifulSoup
@dataclass
class RawDocument:
content: str
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
def load_text_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
return RawDocument(
content=content,
source_path=path,
source_type="txt",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=p.stem
)
def load_markdown_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
# Estrai titolo dal frontmatter o dalla prima riga H1
title = None
for line in content.split("\n"):
if line.startswith("# "):
title = line[2:].strip()
break
return RawDocument(
content=content,
source_path=path,
source_type="md",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_pdf_file(path: str) -> RawDocument:
from pypdf import PdfReader
reader = PdfReader(path)
pages = []
for page in reader.pages:
pages.append(page.extract_text())
content = "\n\n".join(pages)
return RawDocument(
content=content,
source_path=path,
source_type="pdf",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=Path(path).stem,
metadata={"pages": len(reader.pages)}
)
def load_url(url: str) -> RawDocument:
response = requests.get(url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Rimuovi script, style, nav
for tag in soup(["script", "style", "nav", "header", "footer"]):
tag.decompose()
content = soup.get_text(separator="\n", strip=True)
title = soup.title.string if soup.title else url
return RawDocument(
content=content,
source_path=url,
source_type="html",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_document(source: str) -> RawDocument:
"""Smart loader che sceglie il parser corretto."""
if source.startswith("http"):
return load_url(source)
p = Path(source)
loaders = {
".txt": load_text_file,
".md": load_markdown_file,
".pdf": load_pdf_file,
}
loader = loaders.get(p.suffix.lower())
if not loader:
raise ValueError(f"Tipo file non supportato: {p.suffix}")
return loader(source)
ingestion/chunkers.py - Chunking Intelligente
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dataclasses import dataclass
from typing import Optional
@dataclass
class TextChunk:
content: str
chunk_index: int
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class SmartChunker:
"""
Chunker che adatta la strategia al tipo di documento.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Separatori per testo generico
self._text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " "],
length_function=len
)
# Separatori per markdown (rispetta la struttura)
self._md_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["## ", "# ", "\n\n", "\n", ". "],
length_function=len
)
def chunk(self, doc) -> list[TextChunk]:
"""Chunk un documento, scegliendo la strategia giusta."""
if doc.source_type == "md":
raw_chunks = self._md_splitter.split_text(doc.content)
else:
raw_chunks = self._text_splitter.split_text(doc.content)
# Filtra chunk troppo piccoli
raw_chunks = [c for c in raw_chunks if len(c.strip()) > 100]
return [
TextChunk(
content=chunk.strip(),
chunk_index=i,
source_path=doc.source_path,
source_type=doc.source_type,
source_hash=doc.source_hash,
title=doc.title,
metadata={
**doc.metadata,
"chunk_total": len(raw_chunks),
"char_count": len(chunk)
}
)
for i, chunk in enumerate(raw_chunks)
]
ingestion/pipeline.py - Orchestrator Principale
import psycopg2
from psycopg2.extras import execute_values
import json
import time
from .loaders import load_document
from .chunkers import SmartChunker
class IngestionPipeline:
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.chunker = SmartChunker(
chunk_size=config.chunk_size,
chunk_overlap=config.chunk_overlap
)
self.conn = psycopg2.connect(config.get_db_url())
def is_already_ingested(self, source_path: str, source_hash: str) -> bool:
"""Controlla se il documento e già nel DB con lo stesso hash (non cambiato)."""
with self.conn.cursor() as cur:
cur.execute(
"SELECT COUNT(*) FROM rag_documents WHERE source_path = %s AND source_hash = %s",
(source_path, source_hash)
)
return cur.fetchone()[0] > 0
def ingest(self, source: str, tags: list[str] = None, force: bool = False) -> dict:
"""
Processa un documento e lo inserisce in PostgreSQL.
Ritorna statistiche sull'operazione.
"""
tags = tags or []
start_time = time.time()
# 1. Carica documento
doc = load_document(source)
print(f"Caricato: {source} ({len(doc.content)} chars, hash: {doc.source_hash[:8]})")
# 2. Controlla se già presente (incrementale update)
if not force and self.is_already_ingested(source, doc.source_hash):
print(f" Saltato: documento non modificato")
return {"skipped": True, "source": source}
# 3. Chunking
chunks = self.chunker.chunk(doc)
print(f" Chunking: {len(chunks)} chunk creati")
# 4. Elimina versione precedente (se esiste)
with self.conn.cursor() as cur:
cur.execute("DELETE FROM rag_documents WHERE source_path = %s", (source,))
# 5. Genera embeddings in batch
texts = [c.content for c in chunks]
embeddings = self.embedder.embed_batch(texts)
print(f" Embeddings generati: {len(embeddings)} vettori dim {len(embeddings[0])}")
# 6. Inserisci in PostgreSQL
rows = [
(
c.source_path,
c.source_type,
c.source_hash,
c.chunk_index,
len(chunks), # chunk_total
c.title,
c.content,
self.config.embedding_model,
embeddings[i],
json.dumps(c.metadata),
tags
)
for i, c in enumerate(chunks)
]
with self.conn.cursor() as cur:
execute_values(cur, """
INSERT INTO rag_documents
(source_path, source_type, source_hash, chunk_index, chunk_total,
title, content, embedding_model, embedding, metadata, tags)
VALUES %s
ON CONFLICT (source_path, chunk_index, source_hash) DO UPDATE SET
content = EXCLUDED.content,
embedding = EXCLUDED.embedding,
updated_at = NOW()
""", rows, template="(%s,%s,%s,%s,%s,%s,%s,%s,%s::vector,%s::jsonb,%s::text[])")
self.conn.commit()
elapsed = time.time() - start_time
stats = {
"source": source,
"chunks": len(chunks),
"embeddings": len(embeddings),
"elapsed_sec": round(elapsed, 2)
}
print(f" Completato in {elapsed:.1f}s - {stats}")
return stats
def ingest_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Ingesta tutti i documenti in una directory."""
from pathlib import Path
extensions = extensions or [".txt", ".md", ".pdf"]
results = []
for path in Path(directory).rglob("*"):
if path.suffix.lower() in extensions:
result = self.ingest(str(path))
results.append(result)
return results
Retrieval: Trovare i Chunk Giusti
retrieval/searcher.py
import psycopg2
from dataclasses import dataclass
from typing import Optional
@dataclass
class SearchResult:
id: int
source_path: str
source_type: str
chunk_index: int
title: Optional[str]
content: str
similarity: float
metadata: dict
class HybridSearcher:
"""
Combina vector search (semantica) con full-text search (keyword).
Reciprocal Rank Fusion per merging dei risultati.
"""
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.conn = psycopg2.connect(config.get_db_url())
def vector_search(self, query: str, top_k: int = 10,
source_type: Optional[str] = None,
tags: Optional[list[str]] = None) -> list[SearchResult]:
"""Ricerca semantica con filtri opzionali."""
query_embedding = self.embedder.embed_single(query)
threshold = 1 - self.config.similarity_threshold # converti a cosine distance
# Costruisci query dinamica con filtri opzionali
filters = ["embedding <=> %s::vector < %s"]
params = [query_embedding, threshold]
if source_type:
filters.append("source_type = %s")
params.append(source_type)
if tags:
filters.append("tags && %s::text[]") -- overlap: almeno un tag in comune
params.append(tags)
where_clause = " AND ".join(filters)
with self.conn.cursor() as cur:
cur.execute(f"""
SELECT
id, source_path, source_type, chunk_index, title, content,
1 - (embedding <=> %s::vector) AS similarity,
metadata
FROM rag_documents
WHERE {where_clause}
ORDER BY embedding <=> %s::vector
LIMIT %s
""", [query_embedding] + params + [query_embedding, top_k])
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(r[6], 4), metadata=r[7]
)
for r in rows
]
def fulltext_search(self, query: str, top_k: int = 10) -> list[SearchResult]:
"""Full-text search con ts_rank per ranking."""
with self.conn.cursor() as cur:
cur.execute("""
SELECT
id, source_path, source_type, chunk_index, title, content,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS rank,
metadata
FROM rag_documents
WHERE to_tsvector('english', content) @@
plainto_tsquery('english', %s)
ORDER BY rank DESC
LIMIT %s
""", (query, query, top_k))
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(float(r[6]), 4), metadata=r[7]
)
for r in rows
]
def hybrid_search(self, query: str, top_k: int = 5,
vector_weight: float = 0.7) -> list[SearchResult]:
"""
Reciprocal Rank Fusion (RRF) per combinare vector e full-text search.
RRF Score = sum(1 / (k + rank)) per ogni lista di risultati.
"""
k_rrf = 60 # costante RRF standard
# Ottieni entrambi i risultati
vector_results = self.vector_search(query, top_k=top_k * 2)
fts_results = self.fulltext_search(query, top_k=top_k * 2)
# Calcola RRF scores
scores = {}
all_results = {}
for rank, result in enumerate(vector_results):
scores[result.id] = scores.get(result.id, 0) + vector_weight / (k_rrf + rank + 1)
all_results[result.id] = result
fts_weight = 1 - vector_weight
for rank, result in enumerate(fts_results):
scores[result.id] = scores.get(result.id, 0) + fts_weight / (k_rrf + rank + 1)
all_results[result.id] = result
# Ordina per RRF score e prendi top_k
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
final_results = [all_results[id] for id in sorted_ids[:top_k]]
# Aggiorna similarity con RRF score normalizzato
max_score = scores[sorted_ids[0]] if sorted_ids else 1
for result in final_results:
result.similarity = round(scores[result.id] / max_score, 4)
return final_results
Generation: Da Contesto a Risposta
generation/prompts.py
from string import Template
# System prompt che definisce il comportamento dell'AI
RAG_SYSTEM_PROMPT = """Sei un assistente AI preciso e utile. Rispondi alle domande
basandoti ESCLUSIVAMENTE sui documenti di contesto forniti.
Regole:
1. Usa SOLO le informazioni presenti nel contesto. Non inventare.
2. Se la risposta non e nel contesto, dillo chiaramente.
3. Cita le sorgenti usando [Fonte: nome_file, chunk X] dopo ogni affermazione.
4. Mantieni un tono professionale e conciso.
5. Struttura la risposta in modo chiaro con paragrafi o bullet points se appropriato.
"""
def build_rag_prompt(query: str, context_chunks: list, include_sources: bool = True) -> str:
"""
Costruisce il prompt per l'LLM con il contesto recuperato.
Args:
query: La domanda dell'utente
context_chunks: Lista di SearchResult
include_sources: Se includere le informazioni sulla sorgente
Returns:
Il prompt formattato per l'LLM
"""
if not context_chunks:
return f"Domanda: {query}\n\nNota: Non ho trovato documenti rilevanti nel knowledge base."
# Costruisci il contesto con numerazione e sorgente
context_parts = []
for i, chunk in enumerate(context_chunks, 1):
source_info = f"[Fonte: {chunk.source_path}, chunk {chunk.chunk_index}]" if include_sources else ""
context_parts.append(f"--- Documento {i} {source_info} ---\n{chunk.content}")
context_text = "\n\n".join(context_parts)
return f"""Contesto dai documenti:
{context_text}
---
Domanda dell'utente: {query}
Rispondi basandoti sul contesto fornito."""
generation/generator.py
from openai import OpenAI
from dataclasses import dataclass
from typing import Optional
import tiktoken
from .prompts import RAG_SYSTEM_PROMPT, build_rag_prompt
@dataclass
class RAGResponse:
answer: str
sources: list[dict]
model: str
total_tokens: int
prompt_tokens: int
completion_tokens: int
class RAGGenerator:
def __init__(self, config):
self.config = config
self.client = OpenAI(api_key=config.openai_api_key)
self.tokenizer = tiktoken.encoding_for_model("gpt-4o")
def count_tokens(self, text: str) -> int:
return len(self.tokenizer.encode(text))
def truncate_context(self, chunks: list, max_tokens: int) -> list:
"""
Tronca il contesto per non superare il limite di token.
Mantieni i chunk più rilevanti (gia ordinati per similarità).
"""
selected = []
used_tokens = 0
for chunk in chunks:
chunk_tokens = self.count_tokens(chunk.content)
if used_tokens + chunk_tokens > max_tokens:
break
selected.append(chunk)
used_tokens += chunk_tokens
return selected
def generate(self, query: str, context_chunks: list,
stream: bool = False) -> RAGResponse:
"""
Genera una risposta RAG.
Args:
query: La domanda dell'utente
context_chunks: Chunk recuperati da PostgreSQL
stream: Se True, usa streaming (non implementato qui per semplicità)
"""
# Tronca il contesto se necessario
max_context_tokens = self.config.max_context_tokens
truncated_chunks = self.truncate_context(context_chunks, max_context_tokens)
if len(truncated_chunks) < len(context_chunks):
print(f" Contesto troncato: {len(context_chunks)} -> {len(truncated_chunks)} chunk")
# Costruisci il prompt
user_prompt = build_rag_prompt(query, truncated_chunks)
# Chiama l'LLM
response = self.client.chat.completions.create(
model=self.config.chat_model,
messages=[
{"role": "system", "content": RAG_SYSTEM_PROMPT},
{"role": "user", "content": user_prompt}
],
temperature=self.config.temperature,
max_tokens=1500
)
answer = response.choices[0].message.content
usage = response.usage
# Prepara le sorgenti per la risposta
sources = [
{
"source": chunk.source_path,
"chunk_index": chunk.chunk_index,
"similarity": chunk.similarity,
"excerpt": chunk.content[:200] + "..."
}
for chunk in truncated_chunks
]
return RAGResponse(
answer=answer,
sources=sources,
model=self.config.chat_model,
total_tokens=usage.total_tokens,
prompt_tokens=usage.prompt_tokens,
completion_tokens=usage.completion_tokens
)
Il Sistema RAG Completo
rag.py - Classe Principale
from config import config, Config
from ingestion.pipeline import IngestionPipeline
from retrieval.searcher import HybridSearcher
from generation.generator import RAGGenerator
class EmbeddingService:
"""Wrapper per generazione embeddings OpenAI."""
def __init__(self, cfg: Config):
from openai import OpenAI
self.client = OpenAI(api_key=cfg.openai_api_key)
self.model = cfg.embedding_model
def embed_single(self, text: str) -> list[float]:
resp = self.client.embeddings.create(
input=[text.replace("\n", " ")],
model=self.model
)
return resp.data[0].embedding
def embed_batch(self, texts: list[str]) -> list[list[float]]:
cleaned = [t.replace("\n", " ").strip() for t in texts]
resp = self.client.embeddings.create(input=cleaned, model=self.model)
return [item.embedding for item in resp.data]
class RAGSystem:
"""
Sistema RAG completo: ingestion + retrieval + generation.
"""
def __init__(self, cfg: Config = None):
self.config = cfg or config
self.embedder = EmbeddingService(self.config)
self.ingestion = IngestionPipeline(self.config, self.embedder)
self.searcher = HybridSearcher(self.config, self.embedder)
self.generator = RAGGenerator(self.config)
def add_document(self, source: str, tags: list[str] = None) -> dict:
"""Aggiunge un documento al knowledge base."""
return self.ingestion.ingest(source, tags=tags)
def add_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Aggiunge tutti i documenti di una directory."""
return self.ingestion.ingest_directory(directory, extensions)
def ask(self, question: str, use_hybrid: bool = True,
source_type: str = None) -> dict:
"""
Pone una domanda al sistema RAG.
Returns:
dict con answer, sources, usage
"""
# 1. Retrieval
if use_hybrid:
chunks = self.searcher.hybrid_search(question, top_k=self.config.top_k)
else:
chunks = self.searcher.vector_search(
question, top_k=self.config.top_k, source_type=source_type
)
if not chunks:
return {
"answer": "Non ho trovato informazioni rilevanti per rispondere a questa domanda.",
"sources": [],
"retrieval": {"chunks_found": 0}
}
# 2. Generation
response = self.generator.generate(question, chunks)
return {
"answer": response.answer,
"sources": response.sources,
"retrieval": {
"chunks_found": len(chunks),
"top_similarity": chunks[0].similarity if chunks else 0
},
"usage": {
"model": response.model,
"total_tokens": response.total_tokens
}
}
main.py - Utilizzo del Sistema
from rag import RAGSystem
# Inizializza il sistema
rag = RAGSystem()
# --- INGESTION ---
print("=== Aggiungendo documenti al knowledge base ===")
# Aggiungi singoli file
rag.add_document("docs/postgresql_guide.pdf", tags=["postgresql", "database"])
rag.add_document("docs/pgvector_tutorial.md", tags=["pgvector", "vector-search"])
rag.add_document("https://www.postgresql.org/docs/current/", tags=["official-docs"])
# Aggiungi una directory intera
stats = rag.add_directory("docs/", extensions=[".md", ".txt", ".pdf"])
print(f"Ingestati {len(stats)} documenti")
# --- QUERY ---
print("\n=== Interrogando il sistema ===")
questions = [
"Come si installa pgvector su PostgreSQL 16?",
"Qual e la differenza tra HNSW e IVFFlat?",
"Come si ottimizza la memoria per il vector search?",
]
for q in questions:
print(f"\nDomanda: {q}")
print("-" * 60)
result = rag.ask(q)
print(f"Risposta:\n{result['answer']}")
print(f"\nSorgenti utilizzate ({len(result['sources'])}):")
for src in result["sources"]:
print(f" - {src['source']} [similarità: {src['similarity']}]")
print(f"\nToken usati: {result['usage']['total_tokens']}")
Hybrid Search: PostgreSQL Full-Text + Vector
Una delle grandi forze di PostgreSQL per RAG e che puoi combinare in una sola query la ricerca semantica (vector) con la ricerca full-text classica. Questo e particolarmente utile per query che contengono termini tecnici precisi (nomi propri, sigle, versioni software) che la ricerca semantica da sola potrebbe non catturare perfettamente:
-- Hybrid search in SQL puro: vettore + full-text in una query
WITH vector_search AS (
SELECT id, content, source_path, chunk_index,
1 - (embedding <=> %s::vector) AS vector_score,
ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS vector_rank
FROM rag_documents
ORDER BY embedding <=> %s::vector
LIMIT 20
),
fts_search AS (
SELECT id, content, source_path, chunk_index,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS fts_score,
ROW_NUMBER() OVER (
ORDER BY ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) DESC
) AS fts_rank
FROM rag_documents
WHERE to_tsvector('english', content) @@ plainto_tsquery('english', %s)
LIMIT 20
),
-- Reciprocal Rank Fusion
rrf AS (
SELECT
COALESCE(v.id, f.id) AS id,
COALESCE(v.content, f.content) AS content,
COALESCE(v.source_path, f.source_path) AS source_path,
-- RRF score: 0.7 * vector_weight + 0.3 * fts_weight
COALESCE(0.7 / (60 + v.vector_rank), 0) +
COALESCE(0.3 / (60 + f.fts_rank), 0) AS rrf_score
FROM vector_search v
FULL OUTER JOIN fts_search f ON v.id = f.id
)
SELECT id, content, source_path, rrf_score
FROM rrf
ORDER BY rrf_score DESC
LIMIT 5;
Valutazione della qualità RAG
Come misuri se il tuo sistema RAG funziona bene? Le metriche principali sono:
| Metrica | Cosa misura | Target | Come calcolarla |
|---|---|---|---|
| Recall@K | I documenti giusti vengono trovati nei top K risultati | > 0.70 | Test set con ground truth |
| Precision@K | I risultati trovati sono effettivamente rilevanti | > 0.60 | Annotazione manuale |
| Answer Faithfulness | La risposta e supportata dal contesto recuperato | > 0.80 | RAGAS framework |
| Answer Relevancy | La risposta risponde alla domanda posta | > 0.75 | RAGAS framework |
| Latenza P95 | Tempo risposta al 95mo percentile | < 3s | Monitoring in produzione |
# Valutazione con RAGAS
# pip install ragas
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_recall
from datasets import Dataset
# Prepara il dataset di test
test_data = {
"question": [
"Come si crea un indice HNSW in pgvector?",
"Qual e il limite di dimensioni per i vettori in pgvector?",
],
"answer": [
# Risposte generate dal tuo sistema RAG
rag.ask("Come si crea un indice HNSW in pgvector?")["answer"],
rag.ask("Qual e il limite di dimensioni per i vettori in pgvector?")["answer"],
],
"contexts": [
# I chunk recuperati per ciascuna domanda
[c["excerpt"] for c in rag.ask("...")["sources"]],
[c["excerpt"] for c in rag.ask("...")["sources"]],
],
"ground_truth": [
"CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64)",
"Il limite e 16000 dimensioni per vettori di tipo vector in pgvector 0.7+",
]
}
dataset = Dataset.from_dict(test_data)
results = evaluate(dataset, metrics=[faithfulness, answer_relevancy, context_recall])
print(results)
Strategie Avanzate di Chunking
La qualità del chunking e uno dei fattori più importanti per la qualità RAG. Una strategia di chunking mal calibrata può degradare le performance anche con il miglior modello di embedding. Ecco le strategie avanzate per casi d'uso specifici:
Chunking con Overlap Semantico
from langchain.text_splitter import RecursiveCharacterTextSplitter
import re
class SemanticChunker:
"""
Chunker che preserva la coerenza semantica dei paragrafi.
A differenza del semplice chunking per caratteri, questo
rispetta i confini di frase e paragrafo.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def split_by_sentences(self, text: str) -> list[str]:
"""Divide il testo in frasi usando regex."""
# Pattern per fine frase: ., !, ? seguiti da spazio e maiuscola
sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
return [s.strip() for s in sentences if s.strip()]
def create_chunks_with_context(self, text: str) -> list[str]:
"""
Crea chunk con context overlap:
ogni chunk include le ultime N parole del chunk precedente.
"""
sentences = self.split_by_sentences(text)
chunks = []
current_chunk = []
current_size = 0
for sentence in sentences:
sentence_size = len(sentence)
# Se la frase corrente supera da sola il chunk_size, spezzala
if sentence_size > self.chunk_size:
if current_chunk:
chunks.append(" ".join(current_chunk))
# Mantieni overlap: ultime N parole
words = " ".join(current_chunk).split()
overlap_words = words[-30:] # ~150 caratteri overlap
current_chunk = [" ".join(overlap_words)]
current_size = len(" ".join(overlap_words))
# Spezza la frase lunga
splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap
)
for sub in splitter.split_text(sentence):
chunks.append(sub)
continue
# Aggiungi frase al chunk corrente
if current_size + sentence_size + 1 > self.chunk_size and current_chunk:
chunks.append(" ".join(current_chunk))
# Overlap: ultime 30 parole del chunk precedente
words = " ".join(current_chunk).split()
overlap_words = words[-30:]
current_chunk = [" ".join(overlap_words), sentence]
current_size = len(" ".join(current_chunk))
else:
current_chunk.append(sentence)
current_size += sentence_size + 1
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
Chunking per Struttura del Documento (Header-Based)
import re
from typing import Generator
def chunk_by_headers(markdown_text: str, max_chunk_size: int = 800) -> Generator:
"""
Chunking che rispetta la struttura gerarchica dei documenti Markdown.
Ogni sezione H2/H3 diventa un contesto separato, preservando il titolo
come header del chunk (fondamentale per la qualità dell'embedding).
"""
# Regex per trovare header Markdown (H1-H4)
header_pattern = re.compile(r'^(#{1,4})\s+(.+)






