PostgreSQL ile RAG: Belgeden Yanıta
Yapay zeka sisteminizin soruları belgelere dayalı olarak yanıtlayabilmesini hiç istediniz mi? Firmanıza özel, özel bir model yetiştirmenize gerek kalmadan mı? Cevap evet çağrı Alma-Artırılmış Nesil (RAG)ve en güzel mimarilerden biridir Modern yapay zekanın güçlü ve pratik teknolojileri. Ve pgvector ile PostgreSQL en iyi araçlardan biridir bunu uygulamak için.
RAG iki tamamlayıcı yeteneği birleştirir: anlamsal arama (beni bul başvuruyla en alakalı belgeler) doğal dil üretimi (bu belgelere dayanarak tutarlı bir yanıt üretin). Sonuç, yanıt veren bir sistemdir önceden eğitilmiş bir modelin genel bilgisiyle değil, verilerinizin güncel bilgisiyle.
Bu makalede, belge alımından itibaren eksiksiz bir uçtan uca RAG işlem hattı oluşturacağız tamamı PostgreSQL'de olmak üzere GPT-4 tarafından oluşturulan yanıtla birlikte sorguya. Ek veri tabanı yok, vektör mağazası için harici hizmet yok.
Seriye Genel Bakış
| # | Öğe | Odak |
|---|---|---|
| 1 | pgvektör | Kurulum, operatörler, indeksleme |
| 2 | Derinlikteki Gömmeler | Modeller, mesafeler, nesil |
| 3 | Buradasınız - PostgreSQL ile RAG | Uçtan uca RAG boru hattı |
| 4 | Benzerlik Arama | Algoritmalar ve optimizasyon |
| 5 | HNSW ve IVFFlat | Gelişmiş indeksleme stratejileri |
| 6 | Üretimde RAG | Ölçeklenebilirlik ve performans |
Ne Öğreneceksiniz
- Bir RAG sisteminin eksiksiz mimarisi: bileşenler ve veri akışı
- Belge alma ardışık düzeni: yükleme, ayrıştırma, parçalama
- PostgreSQL'de pgvector ile depolama stratejisi
- Alma: sorgudan en alakalı parçaların seçimine kadar
- Nesil: İstemin nasıl oluşturulacağı ve GPT-4 ile nasıl entegre edileceği
- Hibrit arama: vektör aramayı ve PostgreSQL tam metin aramasını birleştirin
- RAG kalite değerlendirmesi: ölçümler ve araçlar
RAG Mimarisi: Nasıl Çalışır?
Bir RAG sisteminin farklı zamanlarda çalışan iki ana aşaması vardır:
Aşama 1: Besleme (çevrimdışı)
Bir kez (veya belgeler değiştikçe periyodik olarak) gerçekleşir. Süreç:
- Yük: Belgeleri dosya sisteminden, URL'den, veritabanından, API'den yükleyin
- Ayrıştırma: PDF, DOCX, HTML, Markdown'dan metin çıkarma
- Parçalar: Metni en uygun boyuttaki parçalara bölme
- Yerleştirme: Her parça için bir gömme vektörü oluşturun
- Mağaza: PostgreSQL'de yığın + gömme + meta verileri kaydedin
Aşama 2: Alma + Oluşturma (çevrimiçi, her sorgu için)
- Sorgular: Kullanıcı doğal dilde bir soru sorar
- Sorguları yerleştirme: Aynı modeli kullanarak soruyu bir vektöre dönüştürün
- Aramak: PostgreSQL'de en benzer k parçayı bulun
- Bağlam: Bulunan parçaları bağlam olarak birleştirin
- Oluştur: Cevabı almak için soruyu + bağlamı Yüksek Lisans'a gönderin
## Flusso RAG Visualizzato
INGESTION (offline):
Documento PDF
|
v
[Parser] -> Testo grezzo
|
v
[Chunker] -> ["chunk 1", "chunk 2", ..., "chunk N"]
|
v
[Embedding Model] -> [[0.023, -0.841, ...], [0.891, 0.234, ...], ...]
|
v
[PostgreSQL + pgvector] -> Memorizzazione permanente
QUERY (online):
Domanda utente: "Come funziona l'indicizzazione HNSW?"
|
v
[Embedding Model] -> [0.045, -0.823, ...] (query vector)
|
v
[PostgreSQL ANN Search] -> Top 5 chunk più simili
|
v
[Prompt Builder] -> "Usa questo contesto: [chunk1, chunk2, ...] Domanda: ..."
|
v
[GPT-4 / Claude] -> "L'indicizzazione HNSW (Hierarchical Navigable Small World) ..."
|
v
Risposta all'utente
Proje Kurulumu
Bağımlılıklar
# requirements.txt
openai>=1.12.0
psycopg2-binary>=2.9.9
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.20
pypdf>=3.17.0
python-dotenv>=1.0.0
tiktoken>=0.5.0
# Installazione
pip install -r requirements.txt
Veritabanı Yapılandırması
-- Setup iniziale PostgreSQL
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm; -- per full-text search
-- Schema completo per RAG
CREATE TABLE IF NOT EXISTS rag_documents (
id BIGSERIAL PRIMARY KEY,
-- Informazioni sorgente
source_path TEXT NOT NULL,
source_type TEXT NOT NULL CHECK (source_type IN ('pdf', 'txt', 'md', 'html', 'docx')),
source_hash TEXT NOT NULL, -- hash MD5 del file originale
-- Chunk info
chunk_index INTEGER NOT NULL,
chunk_total INTEGER,
-- Contenuto
title TEXT,
content TEXT NOT NULL,
content_length INTEGER GENERATED ALWAYS AS (length(content)) STORED,
-- Embedding
embedding_model TEXT NOT NULL DEFAULT 'text-embedding-3-small',
embedding vector(1536),
-- Metadata
metadata JSONB DEFAULT '{}',
tags TEXT[] DEFAULT '{}',
-- Timestamps
ingested_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (source_path, chunk_index, source_hash)
);
-- Indice HNSW per vector search veloce
CREATE INDEX idx_rag_embedding_hnsw
ON rag_documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Indice GIN per full-text search
CREATE INDEX idx_rag_content_fts
ON rag_documents
USING gin (to_tsvector('english', content));
-- Indice per filtri comuni
CREATE INDEX idx_rag_source_type ON rag_documents (source_type);
CREATE INDEX idx_rag_tags ON rag_documents USING gin (tags);
CREATE INDEX idx_rag_metadata ON rag_documents USING gin (metadata);
Belge Besleme İşlem Hattı
Python Proje Yapısı
rag_system/
├── config.py # Configurazione DB, API keys, parametri
├── ingestion/
│ ├── __init__.py
│ ├── loaders.py # Caricamento documenti da varie sorgenti
│ ├── parsers.py # Parsing PDF, DOCX, HTML, Markdown
│ ├── chunkers.py # Strategie di chunking
│ └── pipeline.py # Pipeline ingestion orchestrator
├── retrieval/
│ ├── __init__.py
│ ├── embedder.py # Generazione embeddings
│ └── searcher.py # Vector search e hybrid search
├── generation/
│ ├── __init__.py
│ ├── prompts.py # Template prompts
│ └── generator.py # Integrazione LLM
├── rag.py # Classe principale RAGSystem
└── main.py # Entry point
yapılandırma.py
import os
from dataclasses import dataclass
from dotenv import load_dotenv
load_dotenv()
@dataclass
class Config:
# Database
db_host: str = os.getenv("DB_HOST", "localhost")
db_port: int = int(os.getenv("DB_PORT", "5432"))
db_name: str = os.getenv("DB_NAME", "ragdb")
db_user: str = os.getenv("DB_USER", "postgres")
db_password: str = os.getenv("DB_PASSWORD", "")
# OpenAI
openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
embedding_model: str = "text-embedding-3-small"
embedding_dim: int = 1536
chat_model: str = "gpt-4o-mini" # cost-effective default
# Chunking
chunk_size: int = 800
chunk_overlap: int = 150
min_chunk_size: int = 100
# Retrieval
top_k: int = 5
similarity_threshold: float = 0.65 # minimum cosine similarity
# Generation
max_context_tokens: int = 8000
temperature: float = 0.1 # low temperature for factual answers
def get_db_url(self) -> str:
return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"
config = Config()
ingestion/loaders.py - Çok Kaynaklı Yükleme
import hashlib
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
import requests
from bs4 import BeautifulSoup
@dataclass
class RawDocument:
content: str
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
def load_text_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
return RawDocument(
content=content,
source_path=path,
source_type="txt",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=p.stem
)
def load_markdown_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
# Estrai titolo dal frontmatter o dalla prima riga H1
title = None
for line in content.split("\n"):
if line.startswith("# "):
title = line[2:].strip()
break
return RawDocument(
content=content,
source_path=path,
source_type="md",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_pdf_file(path: str) -> RawDocument:
from pypdf import PdfReader
reader = PdfReader(path)
pages = []
for page in reader.pages:
pages.append(page.extract_text())
content = "\n\n".join(pages)
return RawDocument(
content=content,
source_path=path,
source_type="pdf",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=Path(path).stem,
metadata={"pages": len(reader.pages)}
)
def load_url(url: str) -> RawDocument:
response = requests.get(url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Rimuovi script, style, nav
for tag in soup(["script", "style", "nav", "header", "footer"]):
tag.decompose()
content = soup.get_text(separator="\n", strip=True)
title = soup.title.string if soup.title else url
return RawDocument(
content=content,
source_path=url,
source_type="html",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_document(source: str) -> RawDocument:
"""Smart loader che sceglie il parser corretto."""
if source.startswith("http"):
return load_url(source)
p = Path(source)
loaders = {
".txt": load_text_file,
".md": load_markdown_file,
".pdf": load_pdf_file,
}
loader = loaders.get(p.suffix.lower())
if not loader:
raise ValueError(f"Tipo file non supportato: {p.suffix}")
return loader(source)
alımı/chunkers.py - Akıllı Parçalama
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dataclasses import dataclass
from typing import Optional
@dataclass
class TextChunk:
content: str
chunk_index: int
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: dict = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class SmartChunker:
"""
Chunker che adatta la strategia al tipo di documento.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Separatori per testo generico
self._text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " "],
length_function=len
)
# Separatori per markdown (rispetta la struttura)
self._md_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["## ", "# ", "\n\n", "\n", ". "],
length_function=len
)
def chunk(self, doc) -> list[TextChunk]:
"""Chunk un documento, scegliendo la strategia giusta."""
if doc.source_type == "md":
raw_chunks = self._md_splitter.split_text(doc.content)
else:
raw_chunks = self._text_splitter.split_text(doc.content)
# Filtra chunk troppo piccoli
raw_chunks = [c for c in raw_chunks if len(c.strip()) > 100]
return [
TextChunk(
content=chunk.strip(),
chunk_index=i,
source_path=doc.source_path,
source_type=doc.source_type,
source_hash=doc.source_hash,
title=doc.title,
metadata={
**doc.metadata,
"chunk_total": len(raw_chunks),
"char_count": len(chunk)
}
)
for i, chunk in enumerate(raw_chunks)
]
ingestion/pipeline.py - Ana Düzenleyici
import psycopg2
from psycopg2.extras import execute_values
import json
import time
from .loaders import load_document
from .chunkers import SmartChunker
class IngestionPipeline:
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.chunker = SmartChunker(
chunk_size=config.chunk_size,
chunk_overlap=config.chunk_overlap
)
self.conn = psycopg2.connect(config.get_db_url())
def is_already_ingested(self, source_path: str, source_hash: str) -> bool:
"""Controlla se il documento e già nel DB con lo stesso hash (non cambiato)."""
with self.conn.cursor() as cur:
cur.execute(
"SELECT COUNT(*) FROM rag_documents WHERE source_path = %s AND source_hash = %s",
(source_path, source_hash)
)
return cur.fetchone()[0] > 0
def ingest(self, source: str, tags: list[str] = None, force: bool = False) -> dict:
"""
Processa un documento e lo inserisce in PostgreSQL.
Ritorna statistiche sull'operazione.
"""
tags = tags or []
start_time = time.time()
# 1. Carica documento
doc = load_document(source)
print(f"Caricato: {source} ({len(doc.content)} chars, hash: {doc.source_hash[:8]})")
# 2. Controlla se già presente (incrementale update)
if not force and self.is_already_ingested(source, doc.source_hash):
print(f" Saltato: documento non modificato")
return {"skipped": True, "source": source}
# 3. Chunking
chunks = self.chunker.chunk(doc)
print(f" Chunking: {len(chunks)} chunk creati")
# 4. Elimina versione precedente (se esiste)
with self.conn.cursor() as cur:
cur.execute("DELETE FROM rag_documents WHERE source_path = %s", (source,))
# 5. Genera embeddings in batch
texts = [c.content for c in chunks]
embeddings = self.embedder.embed_batch(texts)
print(f" Embeddings generati: {len(embeddings)} vettori dim {len(embeddings[0])}")
# 6. Inserisci in PostgreSQL
rows = [
(
c.source_path,
c.source_type,
c.source_hash,
c.chunk_index,
len(chunks), # chunk_total
c.title,
c.content,
self.config.embedding_model,
embeddings[i],
json.dumps(c.metadata),
tags
)
for i, c in enumerate(chunks)
]
with self.conn.cursor() as cur:
execute_values(cur, """
INSERT INTO rag_documents
(source_path, source_type, source_hash, chunk_index, chunk_total,
title, content, embedding_model, embedding, metadata, tags)
VALUES %s
ON CONFLICT (source_path, chunk_index, source_hash) DO UPDATE SET
content = EXCLUDED.content,
embedding = EXCLUDED.embedding,
updated_at = NOW()
""", rows, template="(%s,%s,%s,%s,%s,%s,%s,%s,%s::vector,%s::jsonb,%s::text[])")
self.conn.commit()
elapsed = time.time() - start_time
stats = {
"source": source,
"chunks": len(chunks),
"embeddings": len(embeddings),
"elapsed_sec": round(elapsed, 2)
}
print(f" Completato in {elapsed:.1f}s - {stats}")
return stats
def ingest_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Ingesta tutti i documenti in una directory."""
from pathlib import Path
extensions = extensions or [".txt", ".md", ".pdf"]
results = []
for path in Path(directory).rglob("*"):
if path.suffix.lower() in extensions:
result = self.ingest(str(path))
results.append(result)
return results
Geri Alma: Doğru Parçaları Bulmak
alma/searcher.py
import psycopg2
from dataclasses import dataclass
from typing import Optional
@dataclass
class SearchResult:
id: int
source_path: str
source_type: str
chunk_index: int
title: Optional[str]
content: str
similarity: float
metadata: dict
class HybridSearcher:
"""
Combina vector search (semantica) con full-text search (keyword).
Reciprocal Rank Fusion per merging dei risultati.
"""
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.conn = psycopg2.connect(config.get_db_url())
def vector_search(self, query: str, top_k: int = 10,
source_type: Optional[str] = None,
tags: Optional[list[str]] = None) -> list[SearchResult]:
"""Ricerca semantica con filtri opzionali."""
query_embedding = self.embedder.embed_single(query)
threshold = 1 - self.config.similarity_threshold # converti a cosine distance
# Costruisci query dinamica con filtri opzionali
filters = ["embedding <=> %s::vector < %s"]
params = [query_embedding, threshold]
if source_type:
filters.append("source_type = %s")
params.append(source_type)
if tags:
filters.append("tags && %s::text[]") -- overlap: almeno un tag in comune
params.append(tags)
where_clause = " AND ".join(filters)
with self.conn.cursor() as cur:
cur.execute(f"""
SELECT
id, source_path, source_type, chunk_index, title, content,
1 - (embedding <=> %s::vector) AS similarity,
metadata
FROM rag_documents
WHERE {where_clause}
ORDER BY embedding <=> %s::vector
LIMIT %s
""", [query_embedding] + params + [query_embedding, top_k])
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(r[6], 4), metadata=r[7]
)
for r in rows
]
def fulltext_search(self, query: str, top_k: int = 10) -> list[SearchResult]:
"""Full-text search con ts_rank per ranking."""
with self.conn.cursor() as cur:
cur.execute("""
SELECT
id, source_path, source_type, chunk_index, title, content,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS rank,
metadata
FROM rag_documents
WHERE to_tsvector('english', content) @@
plainto_tsquery('english', %s)
ORDER BY rank DESC
LIMIT %s
""", (query, query, top_k))
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(float(r[6]), 4), metadata=r[7]
)
for r in rows
]
def hybrid_search(self, query: str, top_k: int = 5,
vector_weight: float = 0.7) -> list[SearchResult]:
"""
Reciprocal Rank Fusion (RRF) per combinare vector e full-text search.
RRF Score = sum(1 / (k + rank)) per ogni lista di risultati.
"""
k_rrf = 60 # costante RRF standard
# Ottieni entrambi i risultati
vector_results = self.vector_search(query, top_k=top_k * 2)
fts_results = self.fulltext_search(query, top_k=top_k * 2)
# Calcola RRF scores
scores = {}
all_results = {}
for rank, result in enumerate(vector_results):
scores[result.id] = scores.get(result.id, 0) + vector_weight / (k_rrf + rank + 1)
all_results[result.id] = result
fts_weight = 1 - vector_weight
for rank, result in enumerate(fts_results):
scores[result.id] = scores.get(result.id, 0) + fts_weight / (k_rrf + rank + 1)
all_results[result.id] = result
# Ordina per RRF score e prendi top_k
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
final_results = [all_results[id] for id in sorted_ids[:top_k]]
# Aggiorna similarity con RRF score normalizzato
max_score = scores[sorted_ids[0]] if sorted_ids else 1
for result in final_results:
result.similarity = round(scores[result.id] / max_score, 4)
return final_results
Nesil: Bağlamdan Tepkiye
nesil/prompts.py
from string import Template
# System prompt che definisce il comportamento dell'AI
RAG_SYSTEM_PROMPT = """Sei un assistente AI preciso e utile. Rispondi alle domande
basandoti ESCLUSIVAMENTE sui documenti di contesto forniti.
Regole:
1. Usa SOLO le informazioni presenti nel contesto. Non inventare.
2. Se la risposta non e nel contesto, dillo chiaramente.
3. Cita le sorgenti usando [Fonte: nome_file, chunk X] dopo ogni affermazione.
4. Mantieni un tono professionale e conciso.
5. Struttura la risposta in modo chiaro con paragrafi o bullet points se appropriato.
"""
def build_rag_prompt(query: str, context_chunks: list, include_sources: bool = True) -> str:
"""
Costruisce il prompt per l'LLM con il contesto recuperato.
Args:
query: La domanda dell'utente
context_chunks: Lista di SearchResult
include_sources: Se includere le informazioni sulla sorgente
Returns:
Il prompt formattato per l'LLM
"""
if not context_chunks:
return f"Domanda: {query}\n\nNota: Non ho trovato documenti rilevanti nel knowledge base."
# Costruisci il contesto con numerazione e sorgente
context_parts = []
for i, chunk in enumerate(context_chunks, 1):
source_info = f"[Fonte: {chunk.source_path}, chunk {chunk.chunk_index}]" if include_sources else ""
context_parts.append(f"--- Documento {i} {source_info} ---\n{chunk.content}")
context_text = "\n\n".join(context_parts)
return f"""Contesto dai documenti:
{context_text}
---
Domanda dell'utente: {query}
Rispondi basandoti sul contesto fornito."""
nesil/generator.py
from openai import OpenAI
from dataclasses import dataclass
from typing import Optional
import tiktoken
from .prompts import RAG_SYSTEM_PROMPT, build_rag_prompt
@dataclass
class RAGResponse:
answer: str
sources: list[dict]
model: str
total_tokens: int
prompt_tokens: int
completion_tokens: int
class RAGGenerator:
def __init__(self, config):
self.config = config
self.client = OpenAI(api_key=config.openai_api_key)
self.tokenizer = tiktoken.encoding_for_model("gpt-4o")
def count_tokens(self, text: str) -> int:
return len(self.tokenizer.encode(text))
def truncate_context(self, chunks: list, max_tokens: int) -> list:
"""
Tronca il contesto per non superare il limite di token.
Mantieni i chunk più rilevanti (gia ordinati per similarità).
"""
selected = []
used_tokens = 0
for chunk in chunks:
chunk_tokens = self.count_tokens(chunk.content)
if used_tokens + chunk_tokens > max_tokens:
break
selected.append(chunk)
used_tokens += chunk_tokens
return selected
def generate(self, query: str, context_chunks: list,
stream: bool = False) -> RAGResponse:
"""
Genera una risposta RAG.
Args:
query: La domanda dell'utente
context_chunks: Chunk recuperati da PostgreSQL
stream: Se True, usa streaming (non implementato qui per semplicità)
"""
# Tronca il contesto se necessario
max_context_tokens = self.config.max_context_tokens
truncated_chunks = self.truncate_context(context_chunks, max_context_tokens)
if len(truncated_chunks) < len(context_chunks):
print(f" Contesto troncato: {len(context_chunks)} -> {len(truncated_chunks)} chunk")
# Costruisci il prompt
user_prompt = build_rag_prompt(query, truncated_chunks)
# Chiama l'LLM
response = self.client.chat.completions.create(
model=self.config.chat_model,
messages=[
{"role": "system", "content": RAG_SYSTEM_PROMPT},
{"role": "user", "content": user_prompt}
],
temperature=self.config.temperature,
max_tokens=1500
)
answer = response.choices[0].message.content
usage = response.usage
# Prepara le sorgenti per la risposta
sources = [
{
"source": chunk.source_path,
"chunk_index": chunk.chunk_index,
"similarity": chunk.similarity,
"excerpt": chunk.content[:200] + "..."
}
for chunk in truncated_chunks
]
return RAGResponse(
answer=answer,
sources=sources,
model=self.config.chat_model,
total_tokens=usage.total_tokens,
prompt_tokens=usage.prompt_tokens,
completion_tokens=usage.completion_tokens
)
Eksiksiz RAG Sistemi
rag.py - Ana Sınıf
from config import config, Config
from ingestion.pipeline import IngestionPipeline
from retrieval.searcher import HybridSearcher
from generation.generator import RAGGenerator
class EmbeddingService:
"""Wrapper per generazione embeddings OpenAI."""
def __init__(self, cfg: Config):
from openai import OpenAI
self.client = OpenAI(api_key=cfg.openai_api_key)
self.model = cfg.embedding_model
def embed_single(self, text: str) -> list[float]:
resp = self.client.embeddings.create(
input=[text.replace("\n", " ")],
model=self.model
)
return resp.data[0].embedding
def embed_batch(self, texts: list[str]) -> list[list[float]]:
cleaned = [t.replace("\n", " ").strip() for t in texts]
resp = self.client.embeddings.create(input=cleaned, model=self.model)
return [item.embedding for item in resp.data]
class RAGSystem:
"""
Sistema RAG completo: ingestion + retrieval + generation.
"""
def __init__(self, cfg: Config = None):
self.config = cfg or config
self.embedder = EmbeddingService(self.config)
self.ingestion = IngestionPipeline(self.config, self.embedder)
self.searcher = HybridSearcher(self.config, self.embedder)
self.generator = RAGGenerator(self.config)
def add_document(self, source: str, tags: list[str] = None) -> dict:
"""Aggiunge un documento al knowledge base."""
return self.ingestion.ingest(source, tags=tags)
def add_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Aggiunge tutti i documenti di una directory."""
return self.ingestion.ingest_directory(directory, extensions)
def ask(self, question: str, use_hybrid: bool = True,
source_type: str = None) -> dict:
"""
Pone una domanda al sistema RAG.
Returns:
dict con answer, sources, usage
"""
# 1. Retrieval
if use_hybrid:
chunks = self.searcher.hybrid_search(question, top_k=self.config.top_k)
else:
chunks = self.searcher.vector_search(
question, top_k=self.config.top_k, source_type=source_type
)
if not chunks:
return {
"answer": "Non ho trovato informazioni rilevanti per rispondere a questa domanda.",
"sources": [],
"retrieval": {"chunks_found": 0}
}
# 2. Generation
response = self.generator.generate(question, chunks)
return {
"answer": response.answer,
"sources": response.sources,
"retrieval": {
"chunks_found": len(chunks),
"top_similarity": chunks[0].similarity if chunks else 0
},
"usage": {
"model": response.model,
"total_tokens": response.total_tokens
}
}
main.py - Sistem Kullanımı
from rag import RAGSystem
# Inizializza il sistema
rag = RAGSystem()
# --- INGESTION ---
print("=== Aggiungendo documenti al knowledge base ===")
# Aggiungi singoli file
rag.add_document("docs/postgresql_guide.pdf", tags=["postgresql", "database"])
rag.add_document("docs/pgvector_tutorial.md", tags=["pgvector", "vector-search"])
rag.add_document("https://www.postgresql.org/docs/current/", tags=["official-docs"])
# Aggiungi una directory intera
stats = rag.add_directory("docs/", extensions=[".md", ".txt", ".pdf"])
print(f"Ingestati {len(stats)} documenti")
# --- QUERY ---
print("\n=== Interrogando il sistema ===")
questions = [
"Come si installa pgvector su PostgreSQL 16?",
"Qual e la differenza tra HNSW e IVFFlat?",
"Come si ottimizza la memoria per il vector search?",
]
for q in questions:
print(f"\nDomanda: {q}")
print("-" * 60)
result = rag.ask(q)
print(f"Risposta:\n{result['answer']}")
print(f"\nSorgenti utilizzate ({len(result['sources'])}):")
for src in result["sources"]:
print(f" - {src['source']} [similarità: {src['similarity']}]")
print(f"\nToken usati: {result['usage']['total_tokens']}")
Hibrit Arama: PostgreSQL Tam Metin + Vektör
RAG için PostgreSQL'in en güçlü yönlerinden biri, aramayı tek bir sorguda birleştirebilmenizdir klasik tam metin aramasıyla anlambilim (vektör). Bu özellikle aşağıdakiler için faydalıdır: kesin teknik terimler (özel adlar, kısaltmalar, yazılım sürümleri) içeren sorgular Semantik arama tek başına aşağıdakileri mükemmel şekilde yakalayamayabilir:
-- Hybrid search in SQL puro: vettore + full-text in una query
WITH vector_search AS (
SELECT id, content, source_path, chunk_index,
1 - (embedding <=> %s::vector) AS vector_score,
ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS vector_rank
FROM rag_documents
ORDER BY embedding <=> %s::vector
LIMIT 20
),
fts_search AS (
SELECT id, content, source_path, chunk_index,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS fts_score,
ROW_NUMBER() OVER (
ORDER BY ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) DESC
) AS fts_rank
FROM rag_documents
WHERE to_tsvector('english', content) @@ plainto_tsquery('english', %s)
LIMIT 20
),
-- Reciprocal Rank Fusion
rrf AS (
SELECT
COALESCE(v.id, f.id) AS id,
COALESCE(v.content, f.content) AS content,
COALESCE(v.source_path, f.source_path) AS source_path,
-- RRF score: 0.7 * vector_weight + 0.3 * fts_weight
COALESCE(0.7 / (60 + v.vector_rank), 0) +
COALESCE(0.3 / (60 + f.fts_rank), 0) AS rrf_score
FROM vector_search v
FULL OUTER JOIN fts_search f ON v.id = f.id
)
SELECT id, content, source_path, rrf_score
FROM rrf
ORDER BY rrf_score DESC
LIMIT 5;
RAG kalite değerlendirmesi
RAG sisteminizin iyi çalışıp çalışmadığını nasıl ölçersiniz? Ana metrikler şunlardır:
| Metrik | Neyi ölçer | Hedef | Nasıl hesaplanır |
|---|---|---|---|
| Geri Çağırma@K | Doğru belgeler en üstteki K sonuçlarında bulunur | > 0,70 | Temel gerçekleri içeren test seti |
| Hassas @K | Bulunan sonuçlar gerçekten alakalı | > 0,60 | Manuel açıklama |
| Cevap Sadakati | Yanıt, alınan bağlam tarafından destekleniyor | > 0,80 | RAGAS çerçevesi |
| Cevap Alaka Düzeyi | Cevap sorulan soruya cevap veriyor | > 0,75 | RAGAS çerçevesi |
| P95 gecikmesi | 95. yüzdelik dilimde yanıt süresi | < 3s | Üretimde izleme |
# Valutazione con RAGAS
# pip install ragas
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_recall
from datasets import Dataset
# Prepara il dataset di test
test_data = {
"question": [
"Come si crea un indice HNSW in pgvector?",
"Qual e il limite di dimensioni per i vettori in pgvector?",
],
"answer": [
# Risposte generate dal tuo sistema RAG
rag.ask("Come si crea un indice HNSW in pgvector?")["answer"],
rag.ask("Qual e il limite di dimensioni per i vettori in pgvector?")["answer"],
],
"contexts": [
# I chunk recuperati per ciascuna domanda
[c["excerpt"] for c in rag.ask("...")["sources"]],
[c["excerpt"] for c in rag.ask("...")["sources"]],
],
"ground_truth": [
"CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64)",
"Il limite e 16000 dimensioni per vettori di tipo vector in pgvector 0.7+",
]
}
dataset = Dataset.from_dict(test_data)
results = evaluate(dataset, metrics=[faithfulness, answer_relevancy, context_recall])
print(results)
Gelişmiş Parçalama Stratejileri
Parçalama kalitesi RAG kalitesi için en önemli faktörlerden biridir. Bir strateji kötü kalibre edilmiş parçalama, en iyi yerleştirme modelinde bile performansı düşürebilir. Belirli kullanım örneklerine yönelik gelişmiş stratejiler şunlardır:
Anlamsal Örtüşme ile Parçalama
from langchain.text_splitter import RecursiveCharacterTextSplitter
import re
class SemanticChunker:
"""
Chunker che preserva la coerenza semantica dei paragrafi.
A differenza del semplice chunking per caratteri, questo
rispetta i confini di frase e paragrafo.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
def split_by_sentences(self, text: str) -> list[str]:
"""Divide il testo in frasi usando regex."""
# Pattern per fine frase: ., !, ? seguiti da spazio e maiuscola
sentences = re.split(r'(?<=[.!?])\s+(?=[A-Z])', text)
return [s.strip() for s in sentences if s.strip()]
def create_chunks_with_context(self, text: str) -> list[str]:
"""
Crea chunk con context overlap:
ogni chunk include le ultime N parole del chunk precedente.
"""
sentences = self.split_by_sentences(text)
chunks = []
current_chunk = []
current_size = 0
for sentence in sentences:
sentence_size = len(sentence)
# Se la frase corrente supera da sola il chunk_size, spezzala
if sentence_size > self.chunk_size:
if current_chunk:
chunks.append(" ".join(current_chunk))
# Mantieni overlap: ultime N parole
words = " ".join(current_chunk).split()
overlap_words = words[-30:] # ~150 caratteri overlap
current_chunk = [" ".join(overlap_words)]
current_size = len(" ".join(overlap_words))
# Spezza la frase lunga
splitter = RecursiveCharacterTextSplitter(
chunk_size=self.chunk_size,
chunk_overlap=self.chunk_overlap
)
for sub in splitter.split_text(sentence):
chunks.append(sub)
continue
# Aggiungi frase al chunk corrente
if current_size + sentence_size + 1 > self.chunk_size and current_chunk:
chunks.append(" ".join(current_chunk))
# Overlap: ultime 30 parole del chunk precedente
words = " ".join(current_chunk).split()
overlap_words = words[-30:]
current_chunk = [" ".join(overlap_words), sentence]
current_size = len(" ".join(current_chunk))
else:
current_chunk.append(sentence)
current_size += sentence_size + 1
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
Belge Yapısına Göre Parçalama (Başlık Tabanlı)
import re
from typing import Generator
def chunk_by_headers(markdown_text: str, max_chunk_size: int = 800) -> Generator:
"""
Chunking che rispetta la struttura gerarchica dei documenti Markdown.
Ogni sezione H2/H3 diventa un contesto separato, preservando il titolo
come header del chunk (fondamentale per la qualità dell'embedding).
"""
# Regex per trovare header Markdown (H1-H4)
header_pattern = re.compile(r'^(#{1,4})\s+(.+)






