RAG cu PostgreSQL: de la document la răspuns
Ți-ai dorit vreodată ca sistemul tău AI să poată răspunde la întrebări pe baza documentelor? specific companiei tale, fără a fi nevoie să antrenezi un model personalizat? Răspunsul da suna Recuperare-Augmented Generation (RAG), și este una dintre cele mai frumoase arhitecturi tehnologii puternice și practice ale AI moderne. Și PostgreSQL, cu pgvector, este unul dintre cele mai bune instrumente pentru a o implementa.
RAG combină două capabilități complementare: căutare semantică (găsește-l documentele cele mai relevante pentru o cerere) cu generarea limbajului natural (produceți un răspuns coerent pe baza acelor documente). Rezultatul este un sistem care răspunde cu cunoștințe actualizate despre datele dvs., nu cu cunoștințele generale ale unui model pre-antrenat.
În acest articol vom construi o conductă RAG completă de la capăt la capăt: de la ingerarea documentelor la interogarea cu răspuns generat de GPT-4, totul pe PostgreSQL. Fără baze de date suplimentare, nu există servicii externe pentru magazinul de vectori.
Prezentare generală a seriei
| # | Articol | Concentrează-te |
|---|---|---|
| 1 | pgvector | Instalare, operatori, indexare |
| 2 | Înglobări în profunzime | Modele, distante, generatie |
| 3 | Sunteți aici - RAG cu PostgreSQL | Conductă RAG de la capăt la capăt |
| 4 | Căutare de similaritate | Algoritmi si optimizare |
| 5 | HNSW și IVFFlat | Strategii avansate de indexare |
| 6 | RAG în producție | Scalabilitate și performanță |
Ce vei învăța
- Arhitectura completă a unui sistem RAG: componente și flux de date
- Conducta de asimilare a documentelor: încărcare, parsare, fragmentare
- Strategia de stocare în PostgreSQL cu pgvector
- Recuperare: de la interogare la selecția celor mai relevante bucăți
- Generație: cum se construiește promptul și cum se integrează cu GPT-4
- Căutare hibridă: combinați căutarea vectorială și căutarea în text complet PostgreSQL
- Evaluarea calității RAG: metrici și instrumente
Arhitectura RAG: Cum funcționează
Un sistem RAG are două faze principale care funcționează în momente diferite:
Faza 1: Ingestie (offline)
Se întâmplă o dată (sau periodic pe măsură ce documentele se schimbă). Procesul este:
- Încărca: Încărcați documente din sistemul de fișiere, URL, baza de date, API
- Analiza: Extrageți text din PDF, DOCX, HTML, Markdown
- Bucăți: Împărțiți textul în fragmente de dimensiuni optime
- Încorporați: Generați un vector de încorporare pentru fiecare bucată
- Magazin: Salvați bucată + încorporare + metadate în PostgreSQL
Faza 2: Recuperare + Generare (online, pentru fiecare interogare)
- Întrebări: Utilizatorul pune o întrebare în limbaj natural
- Încorporați interogări: Transformați întrebarea într-un vector folosind același model
- Căutare: Găsiți cele mai asemănătoare k bucăți în PostgreSQL
- Context: Asamblați bucățile găsite ca context
- Genera: Trimiteți întrebarea + context la LLM pentru a obține răspunsul
## Flusso RAG Visualizzato
INGESTION (offline):
Documento PDF
|
v
[Parser] -> Testo grezzo
|
v
[Chunker] -> ["chunk 1", "chunk 2", ..., "chunk N"]
|
v
[Embedding Model] -> [[0.023, -0.841, ...], [0.891, 0.234, ...], ...]
|
v
[PostgreSQL + pgvector] -> Memorizzazione permanente
QUERY (online):
Domanda utente: "Come funziona l'indicizzazione HNSW?"
|
v
[Embedding Model] -> [0.045, -0.823, ...] (query vector)
|
v
[PostgreSQL ANN Search] -> Top 5 chunk più simili
|
v
[Prompt Builder] -> "Usa questo contesto: [chunk1, chunk2, ...] Domanda: ..."
|
v
[GPT-4 / Claude] -> "L'indicizzazione HNSW (Hierarchical Navigable Small World) ..."
|
v
Risposta all'utente
Configurarea proiectului
Dependențe
# requirements.txt
openai>=1.12.0
psycopg2-binary>=2.9.9
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.20
pypdf>=3.17.0
python-dotenv>=1.0.0
tiktoken>=0.5.0
# Installazione
pip install -r requirements.txt
Configurarea bazei de date
-- Setup iniziale PostgreSQL
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm; -- per full-text search
-- Schema completo per RAG
CREATE TABLE IF NOT EXISTS rag_documents (
id BIGSERIAL PRIMARY KEY,
-- Informazioni sorgente
source_path TEXT NOT NULL,
source_type TEXT NOT NULL CHECK (source_type IN ('pdf', 'txt', 'md', 'html', 'docx')),
source_hash TEXT NOT NULL, -- hash MD5 del file originale
-- Chunk info
chunk_index INTEGER NOT NULL,
chunk_total INTEGER,
-- Contenuto
title TEXT,
content TEXT NOT NULL,
content_length INTEGER GENERATED ALWAYS AS (length(content)) STORED,
-- Embedding
embedding_model TEXT NOT NULL DEFAULT 'text-embedding-3-small',
embedding vector(1536),
-- Metadata
metadata JSONB DEFAULT '{}',
tags TEXT[] DEFAULT '{}',
-- Timestamps
ingested_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (source_path, chunk_index, source_hash)
);
-- Indice HNSW per vector search veloce
CREATE INDEX idx_rag_embedding_hnsw
ON rag_documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Indice GIN per full-text search
CREATE INDEX idx_rag_content_fts
ON rag_documents
USING gin (to_tsvector('english', content));
-- Indice per filtri comuni
CREATE INDEX idx_rag_source_type ON rag_documents (source_type);
CREATE INDEX idx_rag_tags ON rag_documents USING gin (tags);
CREATE INDEX idx_rag_metadata ON rag_documents USING gin (metadata);
Conducta de absorbție a documentelor
Structura proiectului Python
rag_system/
├── config.py # Configurazione DB, API keys, parametri
├── ingestion/
│ ├── __init__.py
│ ├── loaders.py # Caricamento documenti da varie sorgenti
│ ├── parsers.py # Parsing PDF, DOCX, HTML, Markdown
│ ├── chunkers.py # Strategie di chunking
│ └── pipeline.py # Pipeline ingestion orchestrator
├── retrieval/
│ ├── __init__.py
│ ├── embedder.py # Generazione embeddings
│ └── searcher.py # Vector search e hybrid search
├── generation/
│ ├── __init__.py
│ ├── prompts.py # Template prompts
│ └── generator.py # Integrazione LLM
├── rag.py # Classe principale RAGSystem
└── main.py # Entry point
config.py
import os
from dataclasses import dataclass
from dotenv import load_dotenv
load_dotenv()
@dataclass
class Config:
# Database
db_host: str = os.getenv("DB_HOST", "localhost")
db_port: int = int(os.getenv("DB_PORT", "5432"))
db_name: str = os.getenv("DB_NAME", "ragdb")
db_user: str = os.getenv("DB_USER", "postgres")
db_password: str = os.getenv("DB_PASSWORD", "")
# OpenAI
openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
embedding_model: str = "text-embedding-3-small"
embedding_dim: int = 1536
chat_model: str = "gpt-4o-mini" # cost-effective default
# Chunking
chunk_size: int = 800
chunk_overlap: int = 150
min_chunk_size: int = 100
# Retrieval
top_k: int = 5
similarity_threshold: float = 0.65 # minimum cosine similarity
# Generation
max_context_tokens: int = 8000
temperature: float = 0.1 # low temperature for factual answers
def get_db_url(self) -> str:
return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"
config = Config()
ingestion/loaders.py - Încărcare cu mai multe surse
import hashlib
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
import requests
from bs4 import BeautifulSoup
@dataclass
class RawDocument:
content: str
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
def load_text_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
return RawDocument(
content=content,
source_path=path,
source_type="txt",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=p.stem
)
def load_markdown_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
# Estrai titolo dal frontmatter o dalla prima riga H1
title = None
for line in content.split("\n"):
if line.startswith("# "):
title = line[2:].strip()
break
return RawDocument(
content=content,
source_path=path,
source_type="md",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_pdf_file(path: str) -> RawDocument:
from pypdf import PdfReader
reader = PdfReader(path)
pages = []
for page in reader.pages:
pages.append(page.extract_text())
content = "\n\n".join(pages)
return RawDocument(
content=content,
source_path=path,
source_type="pdf",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=Path(path).stem,
metadata={"pages": len(reader.pages)}
)
def load_url(url: str) -> RawDocument:
response = requests.get(url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Rimuovi script, style, nav
for tag in soup(["script", "style", "nav", "header", "footer"]):
tag.decompose()
content = soup.get_text(separator="\n", strip=True)
title = soup.title.string if soup.title else url
return RawDocument(
content=content,
source_path=url,
source_type="html",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_document(source: str) -> RawDocument:
"""Smart loader che sceglie il parser corretto."""
if source.startswith("http"):
return load_url(source)
p = Path(source)
loaders = {
".txt": load_text_file,
".md": load_markdown_file,
".pdf": load_pdf_file,
}
loader = loaders.get(p.suffix.lower())
if not loader:
raise ValueError(f"Tipo file non supportato: {p.suffix}")
return loader(source)
ingestion/chunkers.py - Intelligent Chunking
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dataclasses import dataclass
from typing import Optional
@dataclass
class TextChunk:
content: str
chunk_index: int
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class SmartChunker:
"""
Chunker che adatta la strategia al tipo di documento.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Separatori per testo generico
self._text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " "],
length_function=len
)
# Separatori per markdown (rispetta la struttura)
self._md_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["## ", "# ", "\n\n", "\n", ". "],
length_function=len
)
def chunk(self, doc) -> list[TextChunk]:
"""Chunk un documento, scegliendo la strategia giusta."""
if doc.source_type == "md":
raw_chunks = self._md_splitter.split_text(doc.content)
else:
raw_chunks = self._text_splitter.split_text(doc.content)
# Filtra chunk troppo piccoli
raw_chunks = [c for c in raw_chunks if len(c.strip()) > 100]
return [
TextChunk(
content=chunk.strip(),
chunk_index=i,
source_path=doc.source_path,
source_type=doc.source_type,
source_hash=doc.source_hash,
title=doc.title,
metadata={
**doc.metadata,
"chunk_total": len(raw_chunks),
"char_count": len(chunk)
}
)
for i, chunk in enumerate(raw_chunks)
]
ingestion/pipeline.py - Orchestrator principal
import psycopg2
from psycopg2.extras import execute_values
import json
import time
from .loaders import load_document
from .chunkers import SmartChunker
class IngestionPipeline:
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.chunker = SmartChunker(
chunk_size=config.chunk_size,
chunk_overlap=config.chunk_overlap
)
self.conn = psycopg2.connect(config.get_db_url())
def is_already_ingested(self, source_path: str, source_hash: str) -> bool:
"""Controlla se il documento e già nel DB con lo stesso hash (non cambiato)."""
with self.conn.cursor() as cur:
cur.execute(
"SELECT COUNT(*) FROM rag_documents WHERE source_path = %s AND source_hash = %s",
(source_path, source_hash)
)
return cur.fetchone()[0] > 0
def ingest(self, source: str, tags: list[str] = None, force: bool = False) -> dict:
"""
Processa un documento e lo inserisce in PostgreSQL.
Ritorna statistiche sull'operazione.
"""
tags = tags or []
start_time = time.time()
# 1. Carica documento
doc = load_document(source)
print(f"Caricato: {source} ({len(doc.content)} chars, hash: {doc.source_hash[:8]})")
# 2. Controlla se già presente (incrementale update)
if not force and self.is_already_ingested(source, doc.source_hash):
print(f" Saltato: documento non modificato")
return {"skipped": True, "source": source}
# 3. Chunking
chunks = self.chunker.chunk(doc)
print(f" Chunking: {len(chunks)} chunk creati")
# 4. Elimina versione precedente (se esiste)
with self.conn.cursor() as cur:
cur.execute("DELETE FROM rag_documents WHERE source_path = %s", (source,))
# 5. Genera embeddings in batch
texts = [c.content for c in chunks]
embeddings = self.embedder.embed_batch(texts)
print(f" Embeddings generati: {len(embeddings)} vettori dim {len(embeddings[0])}")
# 6. Inserisci in PostgreSQL
rows = [
(
c.source_path,
c.source_type,
c.source_hash,
c.chunk_index,
len(chunks), # chunk_total
c.title,
c.content,
self.config.embedding_model,
embeddings[i],
json.dumps(c.metadata),
tags
)
for i, c in enumerate(chunks)
]
with self.conn.cursor() as cur:
execute_values(cur, """
INSERT INTO rag_documents
(source_path, source_type, source_hash, chunk_index, chunk_total,
title, content, embedding_model, embedding, metadata, tags)
VALUES %s
ON CONFLICT (source_path, chunk_index, source_hash) DO UPDATE SET
content = EXCLUDED.content,
embedding = EXCLUDED.embedding,
updated_at = NOW()
""", rows, template="(%s,%s,%s,%s,%s,%s,%s,%s,%s::vector,%s::jsonb,%s::text[])")
self.conn.commit()
elapsed = time.time() - start_time
stats = {
"source": source,
"chunks": len(chunks),
"embeddings": len(embeddings),
"elapsed_sec": round(elapsed, 2)
}
print(f" Completato in {elapsed:.1f}s - {stats}")
return stats
def ingest_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Ingesta tutti i documenti in una directory."""
from pathlib import Path
extensions = extensions or [".txt", ".md", ".pdf"]
results = []
for path in Path(directory).rglob("*"):
if path.suffix.lower() in extensions:
result = self.ingest(str(path))
results.append(result)
return results
Recuperare: Găsirea bucăților potrivite
retrieval/searcher.py
import psycopg2
from dataclasses import dataclass
from typing import Optional
@dataclass
class SearchResult:
id: int
source_path: str
source_type: str
chunk_index: int
title: Optional[str]
content: str
similarity: float
metadata: dict
class HybridSearcher:
"""
Combina vector search (semantica) con full-text search (keyword).
Reciprocal Rank Fusion per merging dei risultati.
"""
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.conn = psycopg2.connect(config.get_db_url())
def vector_search(self, query: str, top_k: int = 10,
source_type: Optional[str] = None,
tags: Optional[list[str]] = None) -> list[SearchResult]:
"""Ricerca semantica con filtri opzionali."""
query_embedding = self.embedder.embed_single(query)
threshold = 1 - self.config.similarity_threshold # converti a cosine distance
# Costruisci query dinamica con filtri opzionali
filters = ["embedding <=> %s::vector < %s"]
params = [query_embedding, threshold]
if source_type:
filters.append("source_type = %s")
params.append(source_type)
if tags:
filters.append("tags && %s::text[]") -- overlap: almeno un tag in comune
params.append(tags)
where_clause = " AND ".join(filters)
with self.conn.cursor() as cur:
cur.execute(f"""
SELECT
id, source_path, source_type, chunk_index, title, content,
1 - (embedding <=> %s::vector) AS similarity,
metadata
FROM rag_documents
WHERE {where_clause}
ORDER BY embedding <=> %s::vector
LIMIT %s
""", [query_embedding] + params + [query_embedding, top_k])
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(r[6], 4), metadata=r[7]
)
for r in rows
]
def fulltext_search(self, query: str, top_k: int = 10) -> list[SearchResult]:
"""Full-text search con ts_rank per ranking."""
with self.conn.cursor() as cur:
cur.execute("""
SELECT
id, source_path, source_type, chunk_index, title, content,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS rank,
metadata
FROM rag_documents
WHERE to_tsvector('english', content) @@
plainto_tsquery('english', %s)
ORDER BY rank DESC
LIMIT %s
""", (query, query, top_k))
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(float(r[6]), 4), metadata=r[7]
)
for r in rows
]
def hybrid_search(self, query: str, top_k: int = 5,
vector_weight: float = 0.7) -> list[SearchResult]:
"""
Reciprocal Rank Fusion (RRF) per combinare vector e full-text search.
RRF Score = sum(1 / (k + rank)) per ogni lista di risultati.
"""
k_rrf = 60 # costante RRF standard
# Ottieni entrambi i risultati
vector_results = self.vector_search(query, top_k=top_k * 2)
fts_results = self.fulltext_search(query, top_k=top_k * 2)
# Calcola RRF scores
scores = {}
all_results = {}
for rank, result in enumerate(vector_results):
scores[result.id] = scores.get(result.id, 0) + vector_weight / (k_rrf + rank + 1)
all_results[result.id] = result
fts_weight = 1 - vector_weight
for rank, result in enumerate(fts_results):
scores[result.id] = scores.get(result.id, 0) + fts_weight / (k_rrf + rank + 1)
all_results[result.id] = result
# Ordina per RRF score e prendi top_k
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
final_results = [all_results[id] for id in sorted_ids[:top_k]]
# Aggiorna similarity con RRF score normalizzato
max_score = scores[sorted_ids[0]] if sorted_ids else 1
for result in final_results:
result.similarity = round(scores[result.id] / max_score, 4)
return final_results
Generație: de la context la răspuns
generație/prompts.py
from string import Template
# System prompt che definisce il comportamento dell'AI
RAG_SYSTEM_PROMPT = """Sei un assistente AI preciso e utile. Rispondi alle domande
basandoti ESCLUSIVAMENTE sui documenti di contesto forniti.
Regole:
1. Usa SOLO le informazioni presenti nel contesto. Non inventare.
2. Se la risposta non e nel contesto, dillo chiaramente.
3. Cita le sorgenti usando [Fonte: nome_file, chunk X] dopo ogni affermazione.
4. Mantieni un tono professionale e conciso.
5. Struttura la risposta in modo chiaro con paragrafi o bullet points se appropriato.
"""
def build_rag_prompt(query: str, context_chunks: list, include_sources: bool = True) -> str:
"""
Costruisce il prompt per l'LLM con il contesto recuperato.
Args:
query: La domanda dell'utente
context_chunks: Lista di SearchResult
include_sources: Se includere le informazioni sulla sorgente
Returns:
Il prompt formattato per l'LLM
"""
if not context_chunks:
return f"Domanda: {query}\n\nNota: Non ho trovato documenti rilevanti nel knowledge base."
# Costruisci il contesto con numerazione e sorgente
context_parts = []
for i, chunk in enumerate(context_chunks, 1):
source_info = f"[Fonte: {chunk.source_path}, chunk {chunk.chunk_index}]" if include_sources else ""
context_parts.append(f"--- Documento {i} {source_info} ---\n{chunk.content}")
context_text = "\n\n".join(context_parts)
return f"""Contesto dai documenti:
{context_text}
---
Domanda dell'utente: {query}
Rispondi basandoti sul contesto fornito."""
generație/generator.py
from openai import OpenAI
from dataclasses import dataclass
from typing import Optional
import tiktoken
from .prompts import RAG_SYSTEM_PROMPT, build_rag_prompt
@dataclass
class RAGResponse:
answer: str
sources: list[dict]
model: str
total_tokens: int
prompt_tokens: int
completion_tokens: int
class RAGGenerator:
def __init__(self, config):
self.config = config
self.client = OpenAI(api_key=config.openai_api_key)
self.tokenizer = tiktoken.encoding_for_model("gpt-4o")
def count_tokens(self, text: str) -> int:
return len(self.tokenizer.encode(text))
def truncate_context(self, chunks: list, max_tokens: int) -> list:
"""
Tronca il contesto per non superare il limite di token.
Mantieni i chunk più rilevanti (gia ordinati per similarità).
"""
selected = []
used_tokens = 0
for chunk in chunks:
chunk_tokens = self.count_tokens(chunk.content)
if used_tokens + chunk_tokens > max_tokens:
break
selected.append(chunk)
used_tokens += chunk_tokens
return selected
def generate(self, query: str, context_chunks: list,
stream: bool = False) -> RAGResponse:
"""
Genera una risposta RAG.
Args:
query: La domanda dell'utente
context_chunks: Chunk recuperati da PostgreSQL
stream: Se True, usa streaming (non implementato qui per semplicità)
"""
# Tronca il contesto se necessario
max_context_tokens = self.config.max_context_tokens
truncated_chunks = self.truncate_context(context_chunks, max_context_tokens)
if len(truncated_chunks) < len(context_chunks):
print(f" Contesto troncato: {len(context_chunks)} -> {len(truncated_chunks)} chunk")
# Costruisci il prompt
user_prompt = build_rag_prompt(query, truncated_chunks)
# Chiama l'LLM
response = self.client.chat.completions.create(
model=self.config.chat_model,
messages=[
{"role": "system", "content": RAG_SYSTEM_PROMPT},
{"role": "user", "content": user_prompt}
],
temperature=self.config.temperature,
max_tokens=1500
)
answer = response.choices[0].message.content
usage = response.usage
# Prepara le sorgenti per la risposta
sources = [
{
"source": chunk.source_path,
"chunk_index": chunk.chunk_index,
"similarity": chunk.similarity,
"excerpt": chunk.content[:200] + "..."
}
for chunk in truncated_chunks
]
return RAGResponse(
answer=answer,
sources=sources,
model=self.config.chat_model,
total_tokens=usage.total_tokens,
prompt_tokens=usage.prompt_tokens,
completion_tokens=usage.completion_tokens
)
Sistemul RAG complet
rag.py - Clasa principală
from config import config, Config
from ingestion.pipeline import IngestionPipeline
from retrieval.searcher import HybridSearcher
from generation.generator import RAGGenerator
class EmbeddingService:
"""Wrapper per generazione embeddings OpenAI."""
def __init__(self, cfg: Config):
from openai import OpenAI
self.client = OpenAI(api_key=cfg.openai_api_key)
self.model = cfg.embedding_model
def embed_single(self, text: str) -> list[float]:
resp = self.client.embeddings.create(
input=[text.replace("\n", " ")],
model=self.model
)
return resp.data[0].embedding
def embed_batch(self, texts: list[str]) -> list[list[float]]:
cleaned = [t.replace("\n", " ").strip() for t in texts]
resp = self.client.embeddings.create(input=cleaned, model=self.model)
return [item.embedding for item in resp.data]
class RAGSystem:
"""
Sistema RAG completo: ingestion + retrieval + generation.
"""
def __init__(self, cfg: Config = None):
self.config = cfg or config
self.embedder = EmbeddingService(self.config)
self.ingestion = IngestionPipeline(self.config, self.embedder)
self.searcher = HybridSearcher(self.config, self.embedder)
self.generator = RAGGenerator(self.config)
def add_document(self, source: str, tags: list[str] = None) -> dict:
"""Aggiunge un documento al knowledge base."""
return self.ingestion.ingest(source, tags=tags)
def add_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Aggiunge tutti i documenti di una directory."""
return self.ingestion.ingest_directory(directory, extensions)
def ask(self, question: str, use_hybrid: bool = True,
source_type: str = None) -> dict:
"""
Pone una domanda al sistema RAG.
Returns:
dict con answer, sources, usage
"""
# 1. Retrieval
if use_hybrid:
chunks = self.searcher.hybrid_search(question, top_k=self.config.top_k)
else:
chunks = self.searcher.vector_search(
question, top_k=self.config.top_k, source_type=source_type
)
if not chunks:
return {
"answer": "Non ho trovato informazioni rilevanti per rispondere a questa domanda.",
"sources": [],
"retrieval": {"chunks_found": 0}
}
# 2. Generation
response = self.generator.generate(question, chunks)
return {
"answer": response.answer,
"sources": response.sources,
"retrieval": {
"chunks_found": len(chunks),
"top_similarity": chunks[0].similarity if chunks else 0
},
"usage": {
"model": response.model,
"total_tokens": response.total_tokens
}
}
main.py - Utilizarea sistemului
from rag import RAGSystem
# Inizializza il sistema
rag = RAGSystem()
# --- INGESTION ---
print("=== Aggiungendo documenti al knowledge base ===")
# Aggiungi singoli file
rag.add_document("docs/postgresql_guide.pdf", tags=["postgresql", "database"])
rag.add_document("docs/pgvector_tutorial.md", tags=["pgvector", "vector-search"])
rag.add_document("https://www.postgresql.org/docs/current/", tags=["official-docs"])
# Aggiungi una directory intera
stats = rag.add_directory("docs/", extensions=[".md", ".txt", ".pdf"])
print(f"Ingestati {len(stats)} documenti")
# --- QUERY ---
print("\n=== Interrogando il sistema ===")
questions = [
"Come si installa pgvector su PostgreSQL 16?",
"Qual e la differenza tra HNSW e IVFFlat?",
"Come si ottimizza la memoria per il vector search?",
]
for q in questions:
print(f"\nDomanda: {q}")
print("-" * 60)
result = rag.ask(q)
print(f"Risposta:\n{result['answer']}")
print(f"\nSorgenti utilizzate ({len(result['sources'])}):")
for src in result["sources"]:
print(f" - {src['source']} [similarità: {src['similarity']}]")
print(f"\nToken usati: {result['usage']['total_tokens']}")
Căutare hibridă: PostgreSQL Full-Text + Vector
Unul dintre marile puncte forte ale PostgreSQL pentru RAG este că puteți combina căutarea într-o singură interogare semantică (vector) cu căutare clasică full-text. Acest lucru este deosebit de util pentru interogări care conțin termeni tehnici preciși (nume proprii, acronime, versiuni de software) care Numai căutarea semantică poate să nu capteze perfect:
-- Hybrid search in SQL puro: vettore + full-text in una query
WITH vector_search AS (
SELECT id, content, source_path, chunk_index,
1 - (embedding <=> %s::vector) AS vector_score,
ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS vector_rank
FROM rag_documents
ORDER BY embedding <=> %s::vector
LIMIT 20
),
fts_search AS (
SELECT id, content, source_path, chunk_index,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS fts_score,
ROW_NUMBER() OVER (
ORDER BY ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) DESC
) AS fts_rank
FROM rag_documents
WHERE to_tsvector('english', content) @@ plainto_tsquery('english', %s)
LIMIT 20
),
-- Reciprocal Rank Fusion
rrf AS (
SELECT
COALESCE(v.id, f.id) AS id,
COALESCE(v.content, f.content) AS content,
COALESCE(v.source_path, f.source_path) AS source_path,
-- RRF score: 0.7 * vector_weight + 0.3 * fts_weight
COALESCE(0.7 / (60 + v.vector_rank), 0) +
COALESCE(0.3 / (60 + f.fts_rank), 0) AS rrf_score
FROM vector_search v
FULL OUTER JOIN fts_search f ON v.id = f.id
)
SELECT id, content, source_path, rrf_score
FROM rrf
ORDER BY rrf_score DESC
LIMIT 5;
Evaluarea calității RAG
Cum măsori dacă sistemul tău RAG funcționează bine? Principalele valori sunt:
| Metric | Ce măsoară | Ţintă | Cum se calculează |
|---|---|---|---|
| Recall@K | Documentele potrivite se găsesc în primele K rezultate | > 0,70 | Set de teste cu adevărul de bază |
| Precizie@K | Rezultatele găsite sunt într-adevăr relevante | > 0,60 | Adnotare manuală |
| Răspunde fidelitate | Răspunsul este susținut de contextul preluat | > 0,80 | cadru RAGAS |
| Relevanța răspunsului | Răspunsul răspunde la întrebarea pusă | > 0,75 | cadru RAGAS |
| P95 latență | Timp de răspuns la percentila 95 | < 3s | Monitorizare in productie |
# Valutazione con RAGAS
# pip install ragas
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_recall
from datasets import Dataset
# Prepara il dataset di test
test_data = {
"question": [
"Come si crea un indice HNSW in pgvector?",
"Qual e il limite di dimensioni per i vettori in pgvector?",
],
"answer": [
# Risposte generate dal tuo sistema RAG
rag.ask("Come si crea un indice HNSW in pgvector?")["answer"],
rag.ask("Qual e il limite di dimensioni per i vettori in pgvector?")["answer"],
],
"contexts": [
# I chunk recuperati per ciascuna domanda
[c["excerpt"] for c in rag.ask("...")["sources"]],
[c["excerpt"] for c in rag.ask("...")["sources"]],
],
"ground_truth": [
"CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64)",
"Il limite e 16000 dimensioni per vettori di tipo vector in pgvector 0.7+",
]
}
dataset = Dataset.from_dict(test_data)
results = evaluate(dataset, metrics=[faithfulness, answer_relevancy, context_recall])
print(results)
Strategii avansate de fragmentare
Calitatea fragmentării este unul dintre cei mai importanți factori pentru calitatea RAG. O strategie fragmentarea prost calibrată poate degrada performanța chiar și cu cel mai bun model de încorporare. Iată strategii avansate pentru cazuri de utilizare specifice:
Chunking cu Semantic Overlap
from langchain.text_splitter import RecursiveCharacterTextSplitter
import re
class SemanticChunker:
"""
Chunker che preserva la coerenza semantica dei paragrafi.
A differenza del semplice chunking per caratteri, questo
rispetta i confini di frase e paragrafo.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def split_by_sentences(self, text: str) -> list[str]:
"""Divide il testo in frasi usando regex."""
# Pattern per fine frase: ., !, ? seguiti da spazio e maiuscola
sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
return [s.strip() for s in sentences if s.strip()]
def create_chunks_with_context(self, text: str) -> list[str]:
"""
Crea chunk con context overlap:
ogni chunk include le ultime N parole del chunk precedente.
"""
sentences = self.split_by_sentences(text)
chunks = []
current_chunk = []
current_size = 0
for sentence in sentences:
sentence_size = len(sentence)
# Se la frase corrente supera da sola il chunk_size, spezzala
if sentence_size > self.chunk_size:
if current_chunk:
chunks.append(" ".join(current_chunk))
# Mantieni overlap: ultime N parole
words = " ".join(current_chunk).split()
overlap_words = words[-30:] # ~150 caratteri overlap
current_chunk = [" ".join(overlap_words)]
current_size = len(" ".join(overlap_words))
# Spezza la frase lunga
splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap
)
for sub in splitter.split_text(sentence):
chunks.append(sub)
continue
# Aggiungi frase al chunk corrente
if current_size + sentence_size + 1 > self.chunk_size and current_chunk:
chunks.append(" ".join(current_chunk))
# Overlap: ultime 30 parole del chunk precedente
words = " ".join(current_chunk).split()
overlap_words = words[-30:]
current_chunk = [" ".join(overlap_words), sentence]
current_size = len(" ".join(current_chunk))
else:
current_chunk.append(sentence)
current_size += sentence_size + 1
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
Diviziunea în funcție de structura documentului (pe bază de antet)
import re
from typing import Generator
def chunk_by_headers(markdown_text: str, max_chunk_size: int = 800) -> Generator:
"""
Chunking che rispetta la struttura gerarchica dei documenti Markdown.
Ogni sezione H2/H3 diventa un contesto separato, preservando il titolo
come header del chunk (fondamentale per la qualità dell'embedding).
"""
# Regex per trovare header Markdown (H1-H4)
header_pattern = re.compile(r'^(#{1,4})\s+(.+)






