RAG w produkcji: architektura, skalowanie i monitorowanie
Zbudowanie prototypu RAG, który działa lokalnie, jest stosunkowo proste. Wprowadź to produkcji, gdzie musi obsłużyć tysiące jednoczesnych zapytań, odpowiada w czasie krótszym niż dwa sekundy, utrzymanie wysokiej jakości w czasie i brak utraty danych, a coś zupełnie innego. Odległość pomiędzy „działa na moim laptopie” a „działa w produkcji dla 10 000 użytkowników” jest ogromny i wiele projektów RAG kończy się niepowodzeniem już na tym etapie.
W tym artykule zajmiemy się prawdziwymi wyzwaniami RAG w produkcji: skalowalna architektura, optymalne dzielenie na porcje, reranking, zarządzanie aktualizacjami do korpusu, monitorowanie za pomocą wskaźników specyficznych dla RAG i automatyczna ocena jakość dzięki frameworkom takim jak RAGAS. Tu nie chodzi o teorię: każda sekcja zawiera Wykonywalny kod i wzorce Pythona testowane na rzeczywistych systemach.
Czego się nauczysz
- Gotowa do produkcji architektura skalowalnego systemu RAG
- Zaawansowane strategie fragmentacji (rekursywne, semantyczne, okno zdaniowe)
- Zmiana rankingu potoku między koderami w celu poprawy dokładności
- Zarządzanie przyrostowymi aktualizacjami korpusu wektorowego
- Monitorowanie za pomocą wskaźników specyficznych dla RAG (wierność, trafność, przypominanie)
- Automatyczna ocena w ramach RAGAS
- Inteligentne buforowanie w celu optymalizacji opóźnień i kosztów
- Obsługa błędów i płynna degradacja w produkcji
Przegląd serii
| # | Przedmiot | Centrum |
|---|---|---|
| 1 | RAG wyjaśnione | Fundamenty i architektura |
| 2 | Osadzanie i wyszukiwanie semantyczne | BERT, SBERT, FAISS |
| 3 | Baza danych wektorowych | Qdrant, Pinecone, Milvus |
| 4 | Odzyskiwanie hybrydowe | BM25 + wyszukiwanie wektorów |
| 5 | RAG w produkcji (tutaj jesteś) | Skalowanie, monitorowanie, ewaluacja |
| 6 | LangChain dla RAG | Zaawansowane frameworki i wzorce |
| 7 | Zarządzanie oknem kontekstowym | Zoptymalizuj dane wejściowe LLM |
| 8 | Systemy wieloagentowe | Orkiestracja i koordynacja |
| 9 | Szybka inżynieria w produkcji | Szablon, wersjonowanie, testowanie |
| 10 | Grafy wiedzy dla sztucznej inteligencji | Ustrukturyzowana wiedza w LLM |
1. Architektura gotowego do produkcji systemu RAG
System RAG w produkcji nie jest prostym rurociągiem sekwencyjnym: jest to system dystrybuowane z wyspecjalizowanymi komponentami, każdy z wymaganiami skalowalności, różna tolerancja na błędy i monitorowanie. Zrozum całą architekturę i pierwszy krok do zbudowania czegoś, co wytrzyma produkcję.
ARCHITETTURA RAG PRODUCTION
┌─────────────────────────────────────────────────────────┐
│ API GATEWAY │
│ (Rate limiting, auth, routing) │
└──────────────────────┬──────────────────────────────────┘
│
┌──────────┴──────────┐
│ │
┌──────▼──────┐ ┌───────▼────────┐
│ QUERY │ │ INGESTION │
│ SERVICE │ │ SERVICE │
└──────┬──────┘ └───────┬────────┘
│ │
┌───────▼──────┐ ┌───────▼────────┐
│ RETRIEVAL │ │ DOCUMENT │
│ ENGINE │ │ PROCESSOR │
│ ┌─────────┐ │ │ ┌──────────┐ │
│ │Embedding│ │ │ │Chunking │ │
│ │Cache │ │ │ │Embedding │ │
│ └────┬────┘ │ │ │Indexing │ │
│ │ │ │ └──────────┘ │
│ ┌────▼────┐ │ └───────┬────────┘
│ │Vector │ │ │
│ │Search │ │ ┌───────▼────────┐
│ └────┬────┘ │ │ VECTOR DB │
│ │ │ │ (Qdrant/Pine) │
│ ┌────▼────┐ │ └────────────────┘
│ │Reranker │ │
│ └────┬────┘ │
└───────┼──────┘
│
┌───────▼──────┐ ┌────────────────┐
│ GENERATION │ │ CACHE │
│ SERVICE │◄──►│ (Redis/ │
│ (LLM) │ │ Semantic) │
└───────┬──────┘ └────────────────┘
│
┌───────▼──────┐ ┌────────────────┐
│ MONITORING │ │ EVALUATION │
│ SERVICE │ │ SERVICE │
│ (Prometheus)│ │ (RAGAS) │
└──────────────┘ └────────────────┘
1.1 Oddzielenie obaw: przetwarzanie a zapytanie
Podstawowym wzorcem w systemie produkcji RAG jest separacja pomiędzy plan pozyskiwania i plan zapytań. Te dwie ścieżki mają wymagania bardzo różne:
Pozyskiwanie a zapytanie: różne wymagania
| Rozmiar | Ścieżka spożycia | Ścieżka zapytania |
|---|---|---|
| Utajenie | Niekrytyczne (partia) | Krytyka (<2s s.95) |
| Przepustowość | Nisko-średni (dokumenty) | Wysoka (tysiące zapotrzebowań/s) |
| Procesor/GPU | Generowanie osadzania (GPU) | Osadzanie zapytań + reranking (GPU) |
| Błędy | Spróbuj ponownie z wycofaniem | Powrót pełen wdzięku |
| Ułuskowienie | Partia pozioma | Poziomy bezpaństwowiec |
2. Zaawansowane strategie dzielenia
Kawałki są prawdopodobnie najczęściej pomijaną zmienną w systemach RAG, a jednak tak jest ogromny wpływ na ostateczną jakość. Kawałek, który jest zbyt mały, traci kontekst; jeden zbyt duży wprowadza szum i wykracza poza kontekst osadzanego modelu.
2.1 Rekurencyjny rozdzielacz tekstu znakowego
Najskuteczniejszą metodą w większości przypadków użycia jest rekurencyjne dzielenie znaków: dzieli tekst stopniowo za pomocą separatorów delikatniejsze (akapity, zdania, słowa), aby zachować naturalną strukturę dokumentu.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Dict, Any
import re
class AdvancedChunker:
"""Chunker avanzato con metadati e strategie multiple"""
def __init__(
self,
chunk_size: int = 512,
chunk_overlap: int = 64,
strategy: str = "recursive"
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.strategy = strategy
self.recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""],
length_function=len,
is_separator_regex=False
)
def chunk_with_metadata(
self,
text: str,
doc_metadata: Dict[str, Any]
) -> List[Dict]:
"""Crea chunks con metadati completi per il retrieval"""
if self.strategy == "recursive":
chunks = self.recursive_splitter.split_text(text)
elif self.strategy == "semantic":
chunks = self._semantic_split(text)
elif self.strategy == "sentence_window":
chunks = self._sentence_window_split(text)
else:
raise ValueError(f"Strategia non supportata: {self.strategy}")
result = []
for i, chunk in enumerate(chunks):
chunk_meta = {
**doc_metadata,
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size": len(chunk),
"strategy": self.strategy,
# Snippet per il contesto nella risposta
"prev_chunk": chunks[i-1][:100] if i > 0 else None,
"next_chunk": chunks[i+1][:100] if i < len(chunks)-1 else None,
}
result.append({"text": chunk, "metadata": chunk_meta})
return result
def _sentence_window_split(
self,
text: str,
window_size: int = 3
) -> List[str]:
"""
Sentence Window: indicizza singole frasi ma recupera
le frasi vicine per mantenere il contesto.
Tecnica avanzata che migliora il recall mantenendo la precisione.
"""
# Split in frasi
sentences = re.split(r'(?<=[.!?])\s+', text)
sentences = [s.strip() for s in sentences if s.strip()]
chunks = []
for i, sentence in enumerate(sentences):
# Finestra di contesto: frase centrale + vicine
start = max(0, i - window_size // 2)
end = min(len(sentences), i + window_size // 2 + 1)
window = " ".join(sentences[start:end])
chunks.append(window)
return chunks
def _semantic_split(self, text: str) -> List[str]:
"""
Semantic split: divide in base ai cambi di topic
usando embeddings per individuare boundary semantici.
"""
from sentence_transformers import SentenceTransformer
import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2')
sentences = re.split(r'(?<=[.!?])\s+', text)
if len(sentences) < 3:
return [text]
# Calcola embeddings di ogni frase
embeddings = model.encode(sentences)
# Calcola similarità tra frasi consecutive
similarities = []
for i in range(len(embeddings) - 1):
sim = np.dot(embeddings[i], embeddings[i+1]) / (
np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i+1])
)
similarities.append(sim)
# Trova boundary semantici (bassa similarità = cambio di topic)
threshold = np.mean(similarities) - np.std(similarities)
boundaries = [0]
for i, sim in enumerate(similarities):
if sim < threshold:
boundaries.append(i + 1)
boundaries.append(len(sentences))
# Crea chunks dai segmenti
chunks = []
for i in range(len(boundaries) - 1):
segment = " ".join(sentences[boundaries[i]:boundaries[i+1]])
if len(segment) > self.chunk_size * 2:
# Chunk troppo grande: dividi ulteriormente
sub_chunks = self.recursive_splitter.split_text(segment)
chunks.extend(sub_chunks)
elif segment.strip():
chunks.append(segment)
return chunks
# Esempio di utilizzo
chunker = AdvancedChunker(chunk_size=512, chunk_overlap=64, strategy="recursive")
document = """
Cos'è il RAG?
Il Retrieval-Augmented Generation (RAG) è un'architettura che combina la ricerca su basi
di conoscenza con la generazione di testo degli LLM. Il principio fondamentale è semplice:
invece di lasciare che l'LLM risponda solo con la sua conoscenza interna (soggetta ad
allucinazioni), RAG prima recupera documenti rilevanti e poi li fornisce come contesto.
Come funziona il retrieval?
Il processo di retrieval avviene in due fasi. Prima, i documenti vengono convertiti in
embeddings vettoriali e memorizzati in un vector database. Poi, la query dell'utente viene
anch'essa convertita in un embedding e viene effettuata una ricerca di similarità per
trovare i documenti più rilevanti.
"""
chunks = chunker.chunk_with_metadata(
document,
doc_metadata={"source": "rag-intro.txt", "author": "AI Team"}
)
for chunk in chunks:
print(f"Chunk {chunk['metadata']['chunk_index']}: {len(chunk['text'])} chars")
print(f" {chunk['text'][:100]}...")
2.2 Porcjowanie rodzic-dziecko
Jedną z najskuteczniejszych strategii dla zaawansowanych systemów RAG jest Dzielenie się rodzic-dziecko: małe fragmenty (dzieci) są indeksowane dla precyzja pobierania, ale w celu zapewnienia zwracane są duże fragmenty (rodzic). wystarczający kontekst dla LLM.
from langchain.storage import InMemoryStore
from langchain.retrievers import ParentDocumentRetriever
from langchain.vectorstores import Qdrant
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Splitter per chunks grandi (parent) - per il contesto
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
# Splitter per chunks piccoli (child) - per l'indicizzazione
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=40)
# Storage: chunks grandi in memory store, chunks piccoli in vector store
vectorstore = Qdrant.from_documents(
[], # inizialmente vuoto
OpenAIEmbeddings(),
url="http://localhost:6333",
collection_name="rag_child_chunks"
)
docstore = InMemoryStore() # o Redis per persistenza
# Parent-Child Retriever
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=docstore,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
# Come funziona:
# 1. add_documents() divide i documenti in parent e child chunks
# 2. I child chunks vengono indicizzati nel vector store
# 3. I parent chunks vengono salvati nel docstore con un ID univoco
# 4. I child chunks hanno un metadato "doc_id" che punta al parent
# Retrieval:
# 1. Cerca i child chunks più simili alla query
# 2. Recupera i parent chunks corrispondenti
# Risultato: massima precisione (child) + massimo contesto (parent)
3. Zmiana rankingu: Popraw dokładność wyszukiwania
Il zmiana rankingu jest to technika znacząco poprawiająca jakość wyszukiwania poprzez zastosowanie do wyników drugiego, dokładniejszego (ale wolniejszego) modelu inicjały. Typowy przepływ to: wyszukaj 50–100 kandydatów za pomocą szybkiego wyszukiwania wektorowego, Następnie Zmień kolejność za pomocą precyzyjnego cross-enkodera, W końcu zdobądź najwyższe „k”..
3.1 Bi-enkoder kontra koder krzyżowy
Różnica między tymi dwoma podejściami jest zasadnicza:
- Bi-enkoder (odzyskiwanie): oddzielnie kodować zapytania i dokumenty do wektorów. Bardzo szybki w wyszukiwaniu, ponieważ dokumenty są wstępnie obliczane, ale mniej precyzyjny, ponieważ nie widzi zapytania podczas kodowania dokumentów.
- Cross-enkoder (reranking): łączy zapytanie i dokument i generuje wynik trafności. Znacznie bardziej precyzyjny, ale nie tak skalowalny jak pobieranie, ponieważ musi przetwarzać każdą parę zapytanie-dokument w czasie rzeczywistym.
from sentence_transformers import CrossEncoder
from typing import List, Tuple
import time
class RerankingRetriever:
"""Retriever con reranking a due stadi"""
def __init__(
self,
bi_encoder, # Sentence Transformer per retrieval veloce
vector_index, # FAISS o vector DB
cross_encoder_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
):
self.bi_encoder = bi_encoder
self.index = vector_index
self.cross_encoder = CrossEncoder(cross_encoder_name)
self.documents = []
def retrieve_and_rerank(
self,
query: str,
initial_k: int = 50,
final_k: int = 5
) -> List[Tuple[str, float]]:
"""
Pipeline completa:
1. Retrieval veloce con bi-encoder (top-50)
2. Reranking preciso con cross-encoder (top-5)
"""
t0 = time.time()
# STAGE 1: Retrieval veloce
query_emb = self.bi_encoder.encode([query], normalize_embeddings=True)
scores, indices = self.index.search(query_emb.astype('float32'), initial_k)
candidates = [
(self.documents[i], float(s))
for s, i in zip(scores[0], indices[0])
if i != -1
]
t1 = time.time()
print(f"Retrieval: {(t1-t0)*1000:.1f}ms - {len(candidates)} candidati")
# STAGE 2: Reranking preciso con cross-encoder
if not candidates:
return []
# Prepara coppie (query, documento) per il cross-encoder
cross_pairs = [(query, doc) for doc, _ in candidates]
cross_scores = self.cross_encoder.predict(cross_pairs)
# Combina e riordina
reranked = sorted(
zip([doc for doc, _ in candidates], cross_scores),
key=lambda x: x[1],
reverse=True
)
t2 = time.time()
print(f"Reranking: {(t2-t1)*1000:.1f}ms")
return reranked[:final_k]
def reciprocal_rank_fusion(
self,
results_list: List[List[Tuple[str, float]]],
k: int = 60
) -> List[Tuple[str, float]]:
"""
Reciprocal Rank Fusion: combina risultati da multiple strategie
di retrieval (es. dense + sparse) in un singolo ranking.
Formula: score = sum(1 / (k + rank_i)) per ogni lista i
"""
doc_scores = {}
for results in results_list:
for rank, (doc, _) in enumerate(results):
if doc not in doc_scores:
doc_scores[doc] = 0.0
doc_scores[doc] += 1.0 / (k + rank + 1)
return sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
Modele cross-enkodera do rerankingu
| Model | Prędkość | jakość | Zalecane użycie |
|---|---|---|---|
| koder krzyżowy/ms-marco-MiniLM-L-6-v2 | Wysoki | Dobry | Produkcja, opóźnienie ważne |
| koder krzyżowy/ms-marco-electra-base | Przeciętny | Doskonały | Dobra równowaga |
| BAAI/bge-reranker-large | Niski | Doskonały | Maksymalna jakość, niekrytyczne opóźnienia |
| Spójne API zmiany rankingu | Pszczoła | Doskonały | Prototypowanie, dostępny budżet |
4. Inteligentne buforowanie pod kątem opóźnień i kosztów
W produkcyjnym systemie RAG duży procent zapytań jest podobnych lub identyczne (np. FAQ, często zadawane pytania). The buforowanie semantyczne idzie dalej dokładna pamięć podręczna (uderzenie w pamięć podręczną tylko w przypadku identycznych zapytań) i ponowne wykorzystanie wyników na zapytanie semantycznie podobne, radykalnie zmniejszając koszty wnioskowania LLM.
import redis
import numpy as np
import json
import hashlib
from sentence_transformers import SentenceTransformer
from typing import Optional, Tuple
import time
class SemanticCache:
"""Cache semantica che riusa risposte per query simili"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
embedding_model: str = "all-MiniLM-L6-v2",
similarity_threshold: float = 0.95,
ttl_seconds: int = 3600 # 1 ora
):
self.redis = redis.from_url(redis_url)
self.model = SentenceTransformer(embedding_model)
self.threshold = similarity_threshold
self.ttl = ttl_seconds
def _get_cache_key(self, text: str) -> str:
return f"rag_cache:{hashlib.md5(text.encode()).hexdigest()}"
def get(self, query: str) -> Optional[Tuple[str, float]]:
"""
Cerca in cache:
1. Prima cerca match esatto (O(1))
2. Poi cerca match semantico (O(n) - ottimizzabile con FAISS)
Ritorna (risposta, similarity_score) o None
"""
# Cache esatto
exact_key = self._get_cache_key(query)
cached = self.redis.get(exact_key)
if cached:
data = json.loads(cached)
return data['response'], 1.0
# Cache semantico
query_emb = self.model.encode([query], normalize_embeddings=True)[0]
# Ottimizzazione: usa Redis SCAN per iterare sulle chiavi cached
# In produzione, usa un indice FAISS separato per il lookup semantico
best_score = 0.0
best_response = None
for key in self.redis.scan_iter("rag_cache:*"):
cached = self.redis.get(key)
if not cached:
continue
data = json.loads(cached)
cached_emb = np.array(data['embedding'])
sim = float(np.dot(query_emb, cached_emb))
if sim > best_score:
best_score = sim
best_response = data['response']
if best_score >= self.threshold:
return best_response, best_score
return None
def set(self, query: str, response: str):
"""Salva la risposta in cache con l'embedding della query"""
query_emb = self.model.encode([query], normalize_embeddings=True)[0]
key = self._get_cache_key(query)
data = {
'query': query,
'response': response,
'embedding': query_emb.tolist(),
'timestamp': time.time()
}
self.redis.setex(key, self.ttl, json.dumps(data))
def get_stats(self) -> dict:
"""Statistiche del cache"""
keys = list(self.redis.scan_iter("rag_cache:*"))
return {
'total_cached': len(keys),
'memory_bytes': sum(
self.redis.memory_usage(k) or 0 for k in keys[:100]
)
}
# Integrazione nel RAG pipeline
class CachedRAGPipeline:
def __init__(self, rag_pipeline, cache: SemanticCache):
self.rag = rag_pipeline
self.cache = cache
self.hits = 0
self.misses = 0
def query(self, question: str) -> dict:
# 1. Prova cache
cached = self.cache.get(question)
if cached:
response, sim = cached
self.hits += 1
return {
"answer": response,
"cached": True,
"similarity": sim,
"latency_ms": 1 # quasi istantaneo
}
# 2. Full RAG pipeline
t0 = time.time()
response = self.rag.generate(question)
latency = (time.time() - t0) * 1000
# 3. Salva in cache per future query simili
self.cache.set(question, response)
self.misses += 1
return {
"answer": response,
"cached": False,
"similarity": None,
"latency_ms": latency
}
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0
5. Monitorowanie i obserwowalność
System RAG w produkcji musi być widoczny na wszystkich poziomach. To nie wystarczy monitoruj opóźnienie HTTP: musisz mierzyć jakość pobierania, szybkość halucynacje, zadowolenie użytkownika i koszty zapytania.
5.1 Wskaźniki infrastruktury
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from functools import wraps
# Metriche infrastrutturali
rag_queries_total = Counter(
'rag_queries_total',
'Numero totale di query RAG',
['status', 'cached']
)
rag_query_duration = Histogram(
'rag_query_duration_seconds',
'Durata delle query RAG',
['component'],
buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
)
rag_retrieval_chunks = Histogram(
'rag_retrieval_chunks',
'Numero di chunks recuperati per query',
buckets=[1, 3, 5, 10, 20, 50]
)
rag_retrieval_score = Histogram(
'rag_retrieval_score',
'Score di rilevanza del top-1 chunk',
buckets=[0.1, 0.3, 0.5, 0.7, 0.8, 0.9, 0.95, 1.0]
)
rag_cache_hit_rate = Gauge(
'rag_cache_hit_rate',
'Tasso di hit del semantic cache'
)
rag_llm_tokens_total = Counter(
'rag_llm_tokens_total',
'Totale token LLM utilizzati',
['type'] # 'prompt' o 'completion'
)
# Metriche qualità (aggiornate da evaluation service)
rag_faithfulness_score = Gauge(
'rag_faithfulness_score',
'Score medio di faithfulness (risposta supportata da contesto)'
)
rag_answer_relevance = Gauge(
'rag_answer_relevance',
'Score medio di rilevanza della risposta'
)
# Decorator per timing automatico
def track_timing(component: str):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
try:
result = func(*args, **kwargs)
rag_queries_total.labels(status='success', cached='false').inc()
return result
except Exception as e:
rag_queries_total.labels(status='error', cached='false').inc()
raise
finally:
duration = time.time() - start
rag_query_duration.labels(component=component).observe(duration)
return wrapper
return decorator
# Esempio utilizzo
class MonitoredRAGService:
@track_timing('full_query')
def query(self, question: str) -> dict:
# Retrieval
with rag_query_duration.labels(component='retrieval').time():
chunks = self.retrieve(question)
rag_retrieval_chunks.observe(len(chunks))
if chunks:
rag_retrieval_score.observe(chunks[0][1]) # score top-1
# Generation
with rag_query_duration.labels(component='generation').time():
response = self.generate(question, chunks)
return response
5.2 Logowanie strukturalne dla RAG
import structlog
import uuid
from contextlib import contextmanager
# Configura structlog per output JSON
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_log_level,
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
class TracedRAGPipeline:
"""RAG pipeline con tracing distribuito"""
def query(self, question: str, user_id: str = None) -> dict:
trace_id = str(uuid.uuid4())
log = logger.bind(
trace_id=trace_id,
user_id=user_id,
question_hash=hashlib.md5(question.encode()).hexdigest()[:8]
)
log.info("rag_query_start", question_length=len(question))
# Retrieval
t0 = time.time()
try:
chunks = self.retrieve(question)
retrieval_time = time.time() - t0
log.info(
"rag_retrieval_complete",
chunks_retrieved=len(chunks),
top_score=chunks[0][1] if chunks else 0,
retrieval_ms=retrieval_time * 1000
)
except Exception as e:
log.error("rag_retrieval_error", error=str(e))
raise
# Generation
t1 = time.time()
try:
response = self.generate(question, chunks)
generation_time = time.time() - t1
log.info(
"rag_generation_complete",
response_length=len(response),
generation_ms=generation_time * 1000,
total_ms=(time.time() - t0) * 1000
)
except Exception as e:
log.error("rag_generation_error", error=str(e))
raise
return {
"answer": response,
"trace_id": trace_id,
"chunks_used": len(chunks),
"total_ms": (time.time() - t0) * 1000
}
6. Automatyczna ocena za pomocą RAGAS
RAGA (Ocena RAG) to najbardziej rozpowszechnione ramy oceny automatyczne systemy RAG. Mierzy cztery podstawowe wymiary jakości:
- Wierność: Czy odpowiedź jest obsługiwana przez pobrany kontekst? (przeciw halucynacji)
- Trafność odpowiedzi: Czy odpowiedź ma związek z pytaniem?
- Przypomnienie kontekstu: Czy pobrany kontekst zawiera niezbędne informacje?
- Dokładność kontekstu: Czy odzyskane fragmenty są istotne?
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_recall,
context_precision,
answer_correctness
)
from datasets import Dataset
from typing import List, Dict
import pandas as pd
class RAGEvaluator:
"""Valutazione automatica sistema RAG con RAGAS"""
def __init__(self, rag_pipeline):
self.rag = rag_pipeline
def create_evaluation_dataset(
self,
test_questions: List[str],
ground_truths: List[str]
) -> Dataset:
"""
Crea dataset di valutazione processando le domande con il RAG.
Ogni row contiene: domanda, risposta, contesti recuperati, verita.
"""
questions = []
answers = []
contexts = []
for question, gt in zip(test_questions, ground_truths):
# Recupera contesti
chunks = self.rag.retrieve(question, top_k=5)
context_list = [chunk for chunk, _ in chunks]
# Genera risposta
answer = self.rag.generate(question)
questions.append(question)
answers.append(answer)
contexts.append(context_list)
return Dataset.from_dict({
"question": questions,
"answer": answers,
"contexts": contexts,
"ground_truth": ground_truths
})
def evaluate_pipeline(
self,
test_questions: List[str],
ground_truths: List[str]
) -> pd.DataFrame:
"""
Valuta il pipeline RAG e ritorna un report dettagliato.
"""
dataset = self.create_evaluation_dataset(test_questions, ground_truths)
results = evaluate(
dataset,
metrics=[
faithfulness, # 0-1: risposta supportata da contesto?
answer_relevancy, # 0-1: risposta pertinente alla domanda?
context_recall, # 0-1: contesto copre la ground truth?
context_precision, # 0-1: contesto è tutto pertinente?
answer_correctness # 0-1: risposta corretta vs ground truth?
]
)
# Report
df = results.to_pandas()
print("\n=== RAGAS Evaluation Report ===")
print(f"Faithfulness: {df['faithfulness'].mean():.3f}")
print(f"Answer Relevancy: {df['answer_relevancy'].mean():.3f}")
print(f"Context Recall: {df['context_recall'].mean():.3f}")
print(f"Context Precision:{df['context_precision'].mean():.3f}")
print(f"Answer Correct.: {df['answer_correctness'].mean():.3f}")
# Identifica casi problematici
low_faith = df[df['faithfulness'] < 0.5]
if len(low_faith) > 0:
print(f"\nATTENZIONE: {len(low_faith)} domande con faithfulness bassa:")
for _, row in low_faith.iterrows():
print(f" Q: {row['question'][:80]}...")
return df
def continuous_evaluation(self, sample_rate: float = 0.05):
"""
Valutazione continua in produzione: campiona il 5% delle query
e le valuta automaticamente per rilevare degradazione.
"""
import random
def evaluate_sample(question: str, answer: str, contexts: List[str]):
if random.random() > sample_rate:
return
# Stima qualità senza ground truth usando LLM-as-judge
from openai import OpenAI
client = OpenAI()
judge_prompt = f"""Valuta la qualità di questa risposta RAG.
Domanda: {question}
Contesti recuperati: {" | ".join(contexts[:2])}
Risposta generata: {answer}
Valuta su scala 1-5:
1. La risposta è supportata dai contesti? (faithfulness)
2. La risposta è pertinente alla domanda? (relevance)
Rispondi SOLO con JSON: {"faithfulness": X, "relevance": X}"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": judge_prompt}],
temperature=0
)
try:
import json
scores = json.loads(response.choices[0].message.content)
# Aggiorna metriche Prometheus
rag_faithfulness_score.set(scores['faithfulness'] / 5.0)
rag_answer_relevance.set(scores['relevance'] / 5.0)
except:
pass
return evaluate_sample
# Test set di esempio
test_questions = [
"Cos'è il RAG e come funziona?",
"Qual è la differenza tra BERT e Sentence Transformers?",
"Come si sceglie un vector database per la produzione?"
]
ground_truths = [
"RAG (Retrieval-Augmented Generation) combina la ricerca su basi di conoscenza con la generazione LLM per ridurre le allucinazioni.",
"BERT produce embeddings contestuali per token, mentre Sentence Transformers è ottimizzato per produrre embeddings a livello di frase per la similarity search.",
"La scelta dipende da scala, requisiti di latenza, budget e se serve hosting gestito o self-hosted."
]
7. Zarządzanie aktualizacjami korpusu
System RAG w produkcji musi zarządzać korpusem, który zmienia się w czasie: nowymi dokumentami zostaną dodane, dokumenty nieaktualne zostaną usunięte, dokumenty istniejące zostaną dodane zaktualizowany. Z tym jest problem zarządzanie korpusem.
from qdrant_client import QdrantClient
from qdrant_client.models import (
PointStruct, UpdateStatus, Filter, FieldCondition, MatchValue
)
from sentence_transformers import SentenceTransformer
import hashlib
import time
from typing import List, Dict, Optional
class IncrementalRAGIndex:
"""Gestione aggiornamenti incrementali del corpus RAG"""
def __init__(
self,
collection_name: str = "rag_corpus",
embedding_model: str = "all-MiniLM-L6-v2"
):
self.client = QdrantClient(url="http://localhost:6333")
self.model = SentenceTransformer(embedding_model)
self.collection = collection_name
def _doc_hash(self, text: str) -> str:
"""Hash deterministico per deduplicazione"""
return hashlib.sha256(text.encode()).hexdigest()[:16]
def upsert_document(
self,
doc_id: str,
text: str,
metadata: Dict,
chunk_size: int = 512
) -> int:
"""
Upsert (insert o update) di un documento.
Se il documento è già presente e non modificato, skippa.
"""
# Calcola hash per deduplication
content_hash = self._doc_hash(text)
# Controlla se già presente e non modificato
existing = self.client.scroll(
collection_name=self.collection,
scroll_filter=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
),
limit=1
)
if existing[0] and existing[0][0].payload.get('content_hash') == content_hash:
return 0 # Nessun aggiornamento necessario
# Rimuovi vecchi chunks del documento
self.client.delete(
collection_name=self.collection,
points_selector=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
)
)
# Crea nuovi chunks
words = text.split()
chunk_words = chunk_size // 5 # ~5 chars/word
chunks = [
' '.join(words[i:i+chunk_words])
for i in range(0, len(words), chunk_words - 10) # 10 word overlap
if words[i:i+chunk_words]
]
if not chunks:
return 0
# Genera embeddings
embeddings = self.model.encode(chunks, normalize_embeddings=True)
# Crea points
points = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
point_id = int(hashlib.md5(f"{doc_id}_{i}".encode()).hexdigest()[:8], 16)
points.append(PointStruct(
id=point_id,
vector=embedding.tolist(),
payload={
**metadata,
"doc_id": doc_id,
"chunk_index": i,
"content_hash": content_hash,
"text": chunk,
"updated_at": time.time()
}
))
# Upsert in Qdrant
self.client.upsert(
collection_name=self.collection,
points=points
)
return len(chunks)
def delete_document(self, doc_id: str):
"""Rimuove tutti i chunks di un documento"""
self.client.delete(
collection_name=self.collection,
points_selector=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
)
)
def get_corpus_stats(self) -> Dict:
"""Statistiche sul corpus"""
info = self.client.get_collection(self.collection)
return {
"total_chunks": info.points_count,
"index_status": info.status,
"vectors_config": str(info.config.params.vectors)
}
8. Najlepsze praktyki i anty-wzorce w produkcji
Lista kontrolna gotowości do produkcji RAG
- Kawałki: użyj tokenów o wielkości porcji 400–600 z zakładką 10–15%; przetestuj różne strategie na swoim konkretnym korpusie
- Zmiana rankingu: zawsze wdrażaj koder krzyżowy dla zapytań, w których precyzja ma kluczowe znaczenie; akceptuje dodatkowe opóźnienia (100-300 ms)
- Buforowanie: semantyczna pamięć podręczna z progiem 0,92-0,97 zmniejsza koszty LLM o 30-60% w korpusie przypominającym często zadawane pytania
- Monitorowanie: śledź wierność, trafność odpowiedzi, opóźnienie pobierania, koszt tokena LLM na zapytanie
- Ocena: utrzymuj złoty zestaw testów (100–200 pytań zawierających podstawową prawdę) i oceniaj przy każdym wdrożeniu
- Rezerwy: jeśli wyszukiwanie nie znajdzie niczego istotnego (najwyższy wynik < 0,5), zadeklaruj „nie wiem” zamiast mieć halucynacje
- Wersjonowanie: wersja szablonów osadzania i ponowne indeksowanie po zmianie; utrzymuj indeksy równolegle podczas migracji
Anty-wzorce, których należy unikać w produkcji
- Brak monitorowania jakości: monitorowanie samych opóźnień i czasu pracy nie wystarczy. RAG może technicznie „działać”, ale dawać błędne odpowiedzi.
- Korpus nie zaktualizowany: system RAG oparty na nieaktualnej dokumentacji jest gorszy niż brak RAG: pewnie reaguje na nieprawidłowe informacje.
- Naprawiono górne-k: dostosować liczbę pobieranych fragmentów do długości zapytania. Złożone zapytania wymagają większego kontekstu.
- Ignorowanie opóźnienia osadzania: Generowanie osadzania zapytania zajmuje 10–50 ms. Pomnóż przez 1000 wymagań/s, a stanie się wąskim gardłem.
- LLM jako sędzia absolutny: model generatywny może „halucynować” wyrwany z kontekstu nawet w przypadku RAG. Wdrożyć poręcze dla reakcji.
Wnioski
Doprowadzenie systemu RAG do produkcji wymaga znacznie więcej niż tylko rurociągu sekwencyjny. Widzieliśmy, jak architektura gotowa do produkcji oddziela plany pozyskiwanie i zapytania, jak zaawansowane fragmentowanie bezpośrednio wpływa na jakość, w jaki sposób zmiana rankingu między koderami poprawia dokładność wyszukiwania i w jaki sposób monitorowanie za pomocą RAGAS umożliwia śledzenie jakości w czasie.
Kluczowe punkty:
- Zawsze oddzielaj ścieżkę pozyskiwania i zapytania — mają one radykalnie różne wymagania
- Zainwestuj w porcjowanie: jest to najbardziej wpływowa i często pomijana zmienna
- Zaimplementuj reranking między koderami w przypadkach użycia, w których dokładność ma kluczowe znaczenie
- Użyj buforowania semantycznego, aby zmniejszyć koszty i opóźnienia w przypadku powtarzających się zapytań
- Mierz wierność i trafność odpowiedzi, a nie tylko opóźnienia i czas pracy
- Utrzymuj złoty zestaw testowy i oceniaj przy każdym wdrożeniu za pomocą RAGAS
W następnym artykule będziemy badać LangChain dla RAG: ramy najpopularniejszy do budowania aplikacji LLM, z naciskiem na zaawansowane wzorce, takie jak konwersacyjne RAG, pobieranie wieloskokowe i wywoływanie narzędzi.
Seria trwa
- Artykuł 1: Wyjaśnienie RAG – podstawy
- Artykuł 2: Osadzania i wyszukiwanie semantyczne
- Artykuł 3: Baza danych wektorów - Qdrant vs Pinecone
- Artykuł 4: Odzyskiwanie hybrydowe: BM25 + wektor
- Artykuł 5: RAG w produkcji (bieżący)
- Artykuł 6: LangChain dla RAG
Dowiedz się więcej dzięki: pgvector dla RAG na PostgreSQLu e MLOps: udostępnianie modeli w Produkcji.







