RAG ve výrobě: Architektura, škálování a monitorování
Vytvoření prototypu RAG, který funguje lokálně, je poměrně jednoduché. Přineste to výroba, kde musí zpracovávat tisíce současných dotazů, odpovídá za méně než dva sekund, udržení vysoké kvality v průběhu času a neztrácení dat a něco úplně jiného. Vzdálenost mezi „funguje na mém notebooku“ a „funguje ve výrobě pro 10 000 uživatelů“ je obrovský a mnoho projektů RAG selhává právě v tomto kroku.
V tomto článku se zabýváme skutečnými výzvami RAG ve výrobě: škálovatelná architektura, optimální chunking, reranking, správa aktualizací do korpusu, monitorování pomocí RAG-specifických metrik a automatické vyhodnocování kvalita s frameworky jako RAGAS. Nejde o teorii: každá sekce obsahuje Spustitelný Python kód a vzory testované na skutečných systémech.
Co se naučíte
- Architektura škálovatelného RAG systému připravená na výrobu
- Pokročilé strategie rozdělování (rekurzivní, sémantické, větné okno)
- Potrubí změny pořadí křížového kodéru pro zvýšení přesnosti
- Správa přírůstkových aktualizací vektorového korpusu
- Monitorování pomocí metrik specifických pro RAG (věrnost, relevance, odvolání)
- Automatické vyhodnocení s rámcem RAGAS
- Inteligentní ukládání do mezipaměti pro optimalizaci latence a nákladů
- Ošetření chyb a elegantní degradace ve výrobě
Přehled série
| # | Položka | Soustředit |
|---|---|---|
| 1 | RAG Vysvětleno | Základy a architektura |
| 2 | Vložení a sémantické vyhledávání | BERT, SBERT, FAISS |
| 3 | Vektorové databáze | Qdrant, Šiška, Milvus |
| 4 | Hybridní vyhledávání | BM25 + vektorové vyhledávání |
| 5 | RAG ve výrobě (jste zde) | Škálování, monitorování, vyhodnocování |
| 6 | LangChain pro RAG | Pokročilé rámce a vzory |
| 7 | Správa kontextových oken | Optimalizujte vstup LLM |
| 8 | Multiagentní systémy | Orchestrování a koordinace |
| 9 | Rychlé inženýrství ve výrobě | Šablona, verzování, testování |
| 10 | Znalostní grafy pro AI | Strukturované znalosti v LLM |
1. Architektura systému RAG připraveného na výrobu
Systém RAG ve výrobě není jednoduchým sekvenčním potrubím: je to systém distribuované se specializovanými komponentami, z nichž každá má požadavky na škálovatelnost, odlišná odolnost proti chybám a monitorování. Pochopte kompletní architekturu a první krok k vybudování něčeho, co obstojí ve výrobě.
ARCHITETTURA RAG PRODUCTION
┌─────────────────────────────────────────────────────────┐
│ API GATEWAY │
│ (Rate limiting, auth, routing) │
└──────────────────────┬──────────────────────────────────┘
│
┌──────────┴──────────┐
│ │
┌──────▼──────┐ ┌───────▼────────┐
│ QUERY │ │ INGESTION │
│ SERVICE │ │ SERVICE │
└──────┬──────┘ └───────┬────────┘
│ │
┌───────▼──────┐ ┌───────▼────────┐
│ RETRIEVAL │ │ DOCUMENT │
│ ENGINE │ │ PROCESSOR │
│ ┌─────────┐ │ │ ┌──────────┐ │
│ │Embedding│ │ │ │Chunking │ │
│ │Cache │ │ │ │Embedding │ │
│ └────┬────┘ │ │ │Indexing │ │
│ │ │ │ └──────────┘ │
│ ┌────▼────┐ │ └───────┬────────┘
│ │Vector │ │ │
│ │Search │ │ ┌───────▼────────┐
│ └────┬────┘ │ │ VECTOR DB │
│ │ │ │ (Qdrant/Pine) │
│ ┌────▼────┐ │ └────────────────┘
│ │Reranker │ │
│ └────┬────┘ │
└───────┼──────┘
│
┌───────▼──────┐ ┌────────────────┐
│ GENERATION │ │ CACHE │
│ SERVICE │◄──►│ (Redis/ │
│ (LLM) │ │ Semantic) │
└───────┬──────┘ └────────────────┘
│
┌───────▼──────┐ ┌────────────────┐
│ MONITORING │ │ EVALUATION │
│ SERVICE │ │ SERVICE │
│ (Prometheus)│ │ (RAGAS) │
└──────────────┘ └────────────────┘
1.1 Oddělení obav: Požití versus dotaz
Základním vzorem ve výrobním systému RAG je oddělení mezi plán příjmu a plán dotazů. Tyto dvě cesty mají své požadavky velmi odlišné:
Příjem vs. dotaz: různé požadavky
| Velikost | Cesta požití | QueryPath |
|---|---|---|
| Latence | Není kritické (dávka) | Kritika (<2s p95) |
| Propustnost | Nízká střední (dokumenty) | Vysoká (tisíce požadavků/s) |
| CPU/GPU | Generování vložení (GPU) | Dotazy na vkládání + přehodnocení (GPU) |
| Chyby | Zkuste to znovu s couvnutím | Záložní elegantní |
| Měřítko | Horizontální dávka | Horizontální bezstavové |
2. Pokročilé strategie dělení
Chunking je pravděpodobně nejvíce přehlíženou proměnnou v systémech RAG, přesto ano velký vliv na výslednou kvalitu. Příliš malý kus ztrácí kontext; jeden příliš velký vnáší šum a překračuje kontext modelu vkládání.
2.1 Rekurzivní rozdělovač textu znaků
Nejúčinnější metodou pro většinu případů použití je rekurzivní dělení znaků: postupně rozdělí text na oddělovače jemnější (odstavce, věty, slova), aby respektovala přirozenou strukturu dokumentu.
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Dict, Any
import re
class AdvancedChunker:
"""Chunker avanzato con metadati e strategie multiple"""
def __init__(
self,
chunk_size: int = 512,
chunk_overlap: int = 64,
strategy: str = "recursive"
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.strategy = strategy
self.recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""],
length_function=len,
is_separator_regex=False
)
def chunk_with_metadata(
self,
text: str,
doc_metadata: Dict[str, Any]
) -> List[Dict]:
"""Crea chunks con metadati completi per il retrieval"""
if self.strategy == "recursive":
chunks = self.recursive_splitter.split_text(text)
elif self.strategy == "semantic":
chunks = self._semantic_split(text)
elif self.strategy == "sentence_window":
chunks = self._sentence_window_split(text)
else:
raise ValueError(f"Strategia non supportata: {self.strategy}")
result = []
for i, chunk in enumerate(chunks):
chunk_meta = {
**doc_metadata,
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size": len(chunk),
"strategy": self.strategy,
# Snippet per il contesto nella risposta
"prev_chunk": chunks[i-1][:100] if i > 0 else None,
"next_chunk": chunks[i+1][:100] if i < len(chunks)-1 else None,
}
result.append({"text": chunk, "metadata": chunk_meta})
return result
def _sentence_window_split(
self,
text: str,
window_size: int = 3
) -> List[str]:
"""
Sentence Window: indicizza singole frasi ma recupera
le frasi vicine per mantenere il contesto.
Tecnica avanzata che migliora il recall mantenendo la precisione.
"""
# Split in frasi
sentences = re.split(r'(?<=[.!?])\s+', text)
sentences = [s.strip() for s in sentences if s.strip()]
chunks = []
for i, sentence in enumerate(sentences):
# Finestra di contesto: frase centrale + vicine
start = max(0, i - window_size // 2)
end = min(len(sentences), i + window_size // 2 + 1)
window = " ".join(sentences[start:end])
chunks.append(window)
return chunks
def _semantic_split(self, text: str) -> List[str]:
"""
Semantic split: divide in base ai cambi di topic
usando embeddings per individuare boundary semantici.
"""
from sentence_transformers import SentenceTransformer
import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2')
sentences = re.split(r'(?<=[.!?])\s+', text)
if len(sentences) < 3:
return [text]
# Calcola embeddings di ogni frase
embeddings = model.encode(sentences)
# Calcola similarità tra frasi consecutive
similarities = []
for i in range(len(embeddings) - 1):
sim = np.dot(embeddings[i], embeddings[i+1]) / (
np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i+1])
)
similarities.append(sim)
# Trova boundary semantici (bassa similarità = cambio di topic)
threshold = np.mean(similarities) - np.std(similarities)
boundaries = [0]
for i, sim in enumerate(similarities):
if sim < threshold:
boundaries.append(i + 1)
boundaries.append(len(sentences))
# Crea chunks dai segmenti
chunks = []
for i in range(len(boundaries) - 1):
segment = " ".join(sentences[boundaries[i]:boundaries[i+1]])
if len(segment) > self.chunk_size * 2:
# Chunk troppo grande: dividi ulteriormente
sub_chunks = self.recursive_splitter.split_text(segment)
chunks.extend(sub_chunks)
elif segment.strip():
chunks.append(segment)
return chunks
# Esempio di utilizzo
chunker = AdvancedChunker(chunk_size=512, chunk_overlap=64, strategy="recursive")
document = """
Cos'è il RAG?
Il Retrieval-Augmented Generation (RAG) è un'architettura che combina la ricerca su basi
di conoscenza con la generazione di testo degli LLM. Il principio fondamentale è semplice:
invece di lasciare che l'LLM risponda solo con la sua conoscenza interna (soggetta ad
allucinazioni), RAG prima recupera documenti rilevanti e poi li fornisce come contesto.
Come funziona il retrieval?
Il processo di retrieval avviene in due fasi. Prima, i documenti vengono convertiti in
embeddings vettoriali e memorizzati in un vector database. Poi, la query dell'utente viene
anch'essa convertita in un embedding e viene effettuata una ricerca di similarità per
trovare i documenti più rilevanti.
"""
chunks = chunker.chunk_with_metadata(
document,
doc_metadata={"source": "rag-intro.txt", "author": "AI Team"}
)
for chunk in chunks:
print(f"Chunk {chunk['metadata']['chunk_index']}: {len(chunk['text'])} chars")
print(f" {chunk['text'][:100]}...")
2.2 Chunking mezi rodiči a dětmi
Jednou z nejúčinnějších strategií pro pokročilé systémy RAG je Chunking mezi rodiči a dětmi: malé části (děti) jsou indexovány pro přesnost vyhledávání, ale velké kusy (rodič) jsou vráceny, aby poskytovaly dostatečný kontext pro LLM.
from langchain.storage import InMemoryStore
from langchain.retrievers import ParentDocumentRetriever
from langchain.vectorstores import Qdrant
from langchain_openai import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Splitter per chunks grandi (parent) - per il contesto
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
# Splitter per chunks piccoli (child) - per l'indicizzazione
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=40)
# Storage: chunks grandi in memory store, chunks piccoli in vector store
vectorstore = Qdrant.from_documents(
[], # inizialmente vuoto
OpenAIEmbeddings(),
url="http://localhost:6333",
collection_name="rag_child_chunks"
)
docstore = InMemoryStore() # o Redis per persistenza
# Parent-Child Retriever
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=docstore,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
# Come funziona:
# 1. add_documents() divide i documenti in parent e child chunks
# 2. I child chunks vengono indicizzati nel vector store
# 3. I parent chunks vengono salvati nel docstore con un ID univoco
# 4. I child chunks hanno un metadato "doc_id" che punta al parent
# Retrieval:
# 1. Cerca i child chunks più simili alla query
# 2. Recupera i parent chunks corrispondenti
# Risultato: massima precisione (child) + massimo contesto (parent)
3. Změna pořadí: Zlepšení přesnosti vyhledávání
Il reranking je to technika, která výrazně zlepšuje kvalitu vyhledávání aplikací druhého, přesnějšího (ale pomalejšího) modelu na výsledky iniciály. Typický tok je: získat 50-100 kandidátů pomocí rychlého vektorového vyhledávání, Pak doobjednejte s přesným křížovým kodérem, Nakonec získat top-k.
3.1 Bi-Encoder vs Cross-Encoder
Rozdíl mezi těmito dvěma přístupy je zásadní:
- Bi-kodér (načítání): kódovat dotazy a dokumenty samostatně do vektorů. Velmi rychlé načítání, protože dokumenty jsou předem vypočítané, ale méně přesné, protože při kódování dokumentů nevidí dotaz.
- Křížový kodér (přehodnocení): vezme dotaz + dokument zřetězený a vytvoří skóre relevance. Mnohem přesnější, ale ne tak škálovatelné jako vyhledávání, protože musí zpracovat každý pár dotaz-dokument v reálném čase.
from sentence_transformers import CrossEncoder
from typing import List, Tuple
import time
class RerankingRetriever:
"""Retriever con reranking a due stadi"""
def __init__(
self,
bi_encoder, # Sentence Transformer per retrieval veloce
vector_index, # FAISS o vector DB
cross_encoder_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"
):
self.bi_encoder = bi_encoder
self.index = vector_index
self.cross_encoder = CrossEncoder(cross_encoder_name)
self.documents = []
def retrieve_and_rerank(
self,
query: str,
initial_k: int = 50,
final_k: int = 5
) -> List[Tuple[str, float]]:
"""
Pipeline completa:
1. Retrieval veloce con bi-encoder (top-50)
2. Reranking preciso con cross-encoder (top-5)
"""
t0 = time.time()
# STAGE 1: Retrieval veloce
query_emb = self.bi_encoder.encode([query], normalize_embeddings=True)
scores, indices = self.index.search(query_emb.astype('float32'), initial_k)
candidates = [
(self.documents[i], float(s))
for s, i in zip(scores[0], indices[0])
if i != -1
]
t1 = time.time()
print(f"Retrieval: {(t1-t0)*1000:.1f}ms - {len(candidates)} candidati")
# STAGE 2: Reranking preciso con cross-encoder
if not candidates:
return []
# Prepara coppie (query, documento) per il cross-encoder
cross_pairs = [(query, doc) for doc, _ in candidates]
cross_scores = self.cross_encoder.predict(cross_pairs)
# Combina e riordina
reranked = sorted(
zip([doc for doc, _ in candidates], cross_scores),
key=lambda x: x[1],
reverse=True
)
t2 = time.time()
print(f"Reranking: {(t2-t1)*1000:.1f}ms")
return reranked[:final_k]
def reciprocal_rank_fusion(
self,
results_list: List[List[Tuple[str, float]]],
k: int = 60
) -> List[Tuple[str, float]]:
"""
Reciprocal Rank Fusion: combina risultati da multiple strategie
di retrieval (es. dense + sparse) in un singolo ranking.
Formula: score = sum(1 / (k + rank_i)) per ogni lista i
"""
doc_scores = {}
for results in results_list:
for rank, (doc, _) in enumerate(results):
if doc not in doc_scores:
doc_scores[doc] = 0.0
doc_scores[doc] += 1.0 / (k + rank + 1)
return sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
Modely křížového kodéru pro změnu pořadí
| Model | Rychlost | kvalitní | Doporučené použití |
|---|---|---|---|
| křížový kodér/ms-marco-MiniLM-L-6-v2 | Vysoký | Dobrý | Důležitá je produkce, latence |
| křížový kodér/ms-marco-elektro-báze | Průměrný | Vynikající | Dobrá rovnováha |
| BAAI/bge-reranker-large | Nízký | Vynikající | Maximální kvalita, nekritická latence |
| Cohere Rerank API | API | Vynikající | Prototypování, dostupný rozpočet |
4. Inteligentní ukládání do mezipaměti pro latenci a náklady
V produkčním RAG systému je velké procento dotazů podobných resp identické (např. FAQ, často kladené otázky). The sémantické ukládání do mezipaměti jde to dál přesná mezipaměť (mezipaměti pouze u identických dotazů) a opakované použití výsledků na dotaz sémanticky podobné, což výrazně snižuje náklady na odvození LLM.
import redis
import numpy as np
import json
import hashlib
from sentence_transformers import SentenceTransformer
from typing import Optional, Tuple
import time
class SemanticCache:
"""Cache semantica che riusa risposte per query simili"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
embedding_model: str = "all-MiniLM-L6-v2",
similarity_threshold: float = 0.95,
ttl_seconds: int = 3600 # 1 ora
):
self.redis = redis.from_url(redis_url)
self.model = SentenceTransformer(embedding_model)
self.threshold = similarity_threshold
self.ttl = ttl_seconds
def _get_cache_key(self, text: str) -> str:
return f"rag_cache:{hashlib.md5(text.encode()).hexdigest()}"
def get(self, query: str) -> Optional[Tuple[str, float]]:
"""
Cerca in cache:
1. Prima cerca match esatto (O(1))
2. Poi cerca match semantico (O(n) - ottimizzabile con FAISS)
Ritorna (risposta, similarity_score) o None
"""
# Cache esatto
exact_key = self._get_cache_key(query)
cached = self.redis.get(exact_key)
if cached:
data = json.loads(cached)
return data['response'], 1.0
# Cache semantico
query_emb = self.model.encode([query], normalize_embeddings=True)[0]
# Ottimizzazione: usa Redis SCAN per iterare sulle chiavi cached
# In produzione, usa un indice FAISS separato per il lookup semantico
best_score = 0.0
best_response = None
for key in self.redis.scan_iter("rag_cache:*"):
cached = self.redis.get(key)
if not cached:
continue
data = json.loads(cached)
cached_emb = np.array(data['embedding'])
sim = float(np.dot(query_emb, cached_emb))
if sim > best_score:
best_score = sim
best_response = data['response']
if best_score >= self.threshold:
return best_response, best_score
return None
def set(self, query: str, response: str):
"""Salva la risposta in cache con l'embedding della query"""
query_emb = self.model.encode([query], normalize_embeddings=True)[0]
key = self._get_cache_key(query)
data = {
'query': query,
'response': response,
'embedding': query_emb.tolist(),
'timestamp': time.time()
}
self.redis.setex(key, self.ttl, json.dumps(data))
def get_stats(self) -> dict:
"""Statistiche del cache"""
keys = list(self.redis.scan_iter("rag_cache:*"))
return {
'total_cached': len(keys),
'memory_bytes': sum(
self.redis.memory_usage(k) or 0 for k in keys[:100]
)
}
# Integrazione nel RAG pipeline
class CachedRAGPipeline:
def __init__(self, rag_pipeline, cache: SemanticCache):
self.rag = rag_pipeline
self.cache = cache
self.hits = 0
self.misses = 0
def query(self, question: str) -> dict:
# 1. Prova cache
cached = self.cache.get(question)
if cached:
response, sim = cached
self.hits += 1
return {
"answer": response,
"cached": True,
"similarity": sim,
"latency_ms": 1 # quasi istantaneo
}
# 2. Full RAG pipeline
t0 = time.time()
response = self.rag.generate(question)
latency = (time.time() - t0) * 1000
# 3. Salva in cache per future query simili
self.cache.set(question, response)
self.misses += 1
return {
"answer": response,
"cached": False,
"similarity": None,
"latency_ms": latency
}
@property
def hit_rate(self) -> float:
total = self.hits + self.misses
return self.hits / total if total > 0 else 0.0
5. Monitorování a pozorovatelnost
Systém RAG ve výrobě musí být pozorovatelný na všech úrovních. To nestačí monitorovat latenci HTTP: potřebujete měřit kvalitu vyhledávání, rychlost halucinace, spokojenost uživatelů a náklady na dotaz.
5.1 Metriky infrastruktury
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from functools import wraps
# Metriche infrastrutturali
rag_queries_total = Counter(
'rag_queries_total',
'Numero totale di query RAG',
['status', 'cached']
)
rag_query_duration = Histogram(
'rag_query_duration_seconds',
'Durata delle query RAG',
['component'],
buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0]
)
rag_retrieval_chunks = Histogram(
'rag_retrieval_chunks',
'Numero di chunks recuperati per query',
buckets=[1, 3, 5, 10, 20, 50]
)
rag_retrieval_score = Histogram(
'rag_retrieval_score',
'Score di rilevanza del top-1 chunk',
buckets=[0.1, 0.3, 0.5, 0.7, 0.8, 0.9, 0.95, 1.0]
)
rag_cache_hit_rate = Gauge(
'rag_cache_hit_rate',
'Tasso di hit del semantic cache'
)
rag_llm_tokens_total = Counter(
'rag_llm_tokens_total',
'Totale token LLM utilizzati',
['type'] # 'prompt' o 'completion'
)
# Metriche qualità (aggiornate da evaluation service)
rag_faithfulness_score = Gauge(
'rag_faithfulness_score',
'Score medio di faithfulness (risposta supportata da contesto)'
)
rag_answer_relevance = Gauge(
'rag_answer_relevance',
'Score medio di rilevanza della risposta'
)
# Decorator per timing automatico
def track_timing(component: str):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
try:
result = func(*args, **kwargs)
rag_queries_total.labels(status='success', cached='false').inc()
return result
except Exception as e:
rag_queries_total.labels(status='error', cached='false').inc()
raise
finally:
duration = time.time() - start
rag_query_duration.labels(component=component).observe(duration)
return wrapper
return decorator
# Esempio utilizzo
class MonitoredRAGService:
@track_timing('full_query')
def query(self, question: str) -> dict:
# Retrieval
with rag_query_duration.labels(component='retrieval').time():
chunks = self.retrieve(question)
rag_retrieval_chunks.observe(len(chunks))
if chunks:
rag_retrieval_score.observe(chunks[0][1]) # score top-1
# Generation
with rag_query_duration.labels(component='generation').time():
response = self.generate(question, chunks)
return response
5.2 Strukturované protokolování pro RAG
import structlog
import uuid
from contextlib import contextmanager
# Configura structlog per output JSON
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_log_level,
structlog.processors.JSONRenderer()
]
)
logger = structlog.get_logger()
class TracedRAGPipeline:
"""RAG pipeline con tracing distribuito"""
def query(self, question: str, user_id: str = None) -> dict:
trace_id = str(uuid.uuid4())
log = logger.bind(
trace_id=trace_id,
user_id=user_id,
question_hash=hashlib.md5(question.encode()).hexdigest()[:8]
)
log.info("rag_query_start", question_length=len(question))
# Retrieval
t0 = time.time()
try:
chunks = self.retrieve(question)
retrieval_time = time.time() - t0
log.info(
"rag_retrieval_complete",
chunks_retrieved=len(chunks),
top_score=chunks[0][1] if chunks else 0,
retrieval_ms=retrieval_time * 1000
)
except Exception as e:
log.error("rag_retrieval_error", error=str(e))
raise
# Generation
t1 = time.time()
try:
response = self.generate(question, chunks)
generation_time = time.time() - t1
log.info(
"rag_generation_complete",
response_length=len(response),
generation_ms=generation_time * 1000,
total_ms=(time.time() - t0) * 1000
)
except Exception as e:
log.error("rag_generation_error", error=str(e))
raise
return {
"answer": response,
"trace_id": trace_id,
"chunks_used": len(chunks),
"total_ms": (time.time() - t0) * 1000
}
6. Automatické vyhodnocení pomocí RAGAS
RAGAS (RAG Assessment) je nejrozšířenějším rámcem pro hodnocení automatické systémy RAG. Měří čtyři základní dimenze kvality:
- Věrnost: Je odpověď podporována načteným kontextem? (proti halucinacím)
- Relevance odpovědi: Je odpověď relevantní k otázce?
- Připomenutí kontextu: Obsahuje načtený kontext potřebné informace?
- Kontextová přesnost: Jsou všechny obnovené kousky relevantní?
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_recall,
context_precision,
answer_correctness
)
from datasets import Dataset
from typing import List, Dict
import pandas as pd
class RAGEvaluator:
"""Valutazione automatica sistema RAG con RAGAS"""
def __init__(self, rag_pipeline):
self.rag = rag_pipeline
def create_evaluation_dataset(
self,
test_questions: List[str],
ground_truths: List[str]
) -> Dataset:
"""
Crea dataset di valutazione processando le domande con il RAG.
Ogni row contiene: domanda, risposta, contesti recuperati, verita.
"""
questions = []
answers = []
contexts = []
for question, gt in zip(test_questions, ground_truths):
# Recupera contesti
chunks = self.rag.retrieve(question, top_k=5)
context_list = [chunk for chunk, _ in chunks]
# Genera risposta
answer = self.rag.generate(question)
questions.append(question)
answers.append(answer)
contexts.append(context_list)
return Dataset.from_dict({
"question": questions,
"answer": answers,
"contexts": contexts,
"ground_truth": ground_truths
})
def evaluate_pipeline(
self,
test_questions: List[str],
ground_truths: List[str]
) -> pd.DataFrame:
"""
Valuta il pipeline RAG e ritorna un report dettagliato.
"""
dataset = self.create_evaluation_dataset(test_questions, ground_truths)
results = evaluate(
dataset,
metrics=[
faithfulness, # 0-1: risposta supportata da contesto?
answer_relevancy, # 0-1: risposta pertinente alla domanda?
context_recall, # 0-1: contesto copre la ground truth?
context_precision, # 0-1: contesto è tutto pertinente?
answer_correctness # 0-1: risposta corretta vs ground truth?
]
)
# Report
df = results.to_pandas()
print("\n=== RAGAS Evaluation Report ===")
print(f"Faithfulness: {df['faithfulness'].mean():.3f}")
print(f"Answer Relevancy: {df['answer_relevancy'].mean():.3f}")
print(f"Context Recall: {df['context_recall'].mean():.3f}")
print(f"Context Precision:{df['context_precision'].mean():.3f}")
print(f"Answer Correct.: {df['answer_correctness'].mean():.3f}")
# Identifica casi problematici
low_faith = df[df['faithfulness'] < 0.5]
if len(low_faith) > 0:
print(f"\nATTENZIONE: {len(low_faith)} domande con faithfulness bassa:")
for _, row in low_faith.iterrows():
print(f" Q: {row['question'][:80]}...")
return df
def continuous_evaluation(self, sample_rate: float = 0.05):
"""
Valutazione continua in produzione: campiona il 5% delle query
e le valuta automaticamente per rilevare degradazione.
"""
import random
def evaluate_sample(question: str, answer: str, contexts: List[str]):
if random.random() > sample_rate:
return
# Stima qualità senza ground truth usando LLM-as-judge
from openai import OpenAI
client = OpenAI()
judge_prompt = f"""Valuta la qualità di questa risposta RAG.
Domanda: {question}
Contesti recuperati: {" | ".join(contexts[:2])}
Risposta generata: {answer}
Valuta su scala 1-5:
1. La risposta è supportata dai contesti? (faithfulness)
2. La risposta è pertinente alla domanda? (relevance)
Rispondi SOLO con JSON: {"faithfulness": X, "relevance": X}"""
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": judge_prompt}],
temperature=0
)
try:
import json
scores = json.loads(response.choices[0].message.content)
# Aggiorna metriche Prometheus
rag_faithfulness_score.set(scores['faithfulness'] / 5.0)
rag_answer_relevance.set(scores['relevance'] / 5.0)
except:
pass
return evaluate_sample
# Test set di esempio
test_questions = [
"Cos'è il RAG e come funziona?",
"Qual è la differenza tra BERT e Sentence Transformers?",
"Come si sceglie un vector database per la produzione?"
]
ground_truths = [
"RAG (Retrieval-Augmented Generation) combina la ricerca su basi di conoscenza con la generazione LLM per ridurre le allucinazioni.",
"BERT produce embeddings contestuali per token, mentre Sentence Transformers è ottimizzato per produrre embeddings a livello di frase per la similarity search.",
"La scelta dipende da scala, requisiti di latenza, budget e se serve hosting gestito o self-hosted."
]
7. Správa aktualizací korpusu
Systém RAG ve výrobě musí spravovat korpus, který se v čase mění: nové dokumenty jsou přidány, zastaralé dokumenty mají být odstraněny, stávající dokumenty jsou aktualizováno. S tím je problém řízení korpusu.
from qdrant_client import QdrantClient
from qdrant_client.models import (
PointStruct, UpdateStatus, Filter, FieldCondition, MatchValue
)
from sentence_transformers import SentenceTransformer
import hashlib
import time
from typing import List, Dict, Optional
class IncrementalRAGIndex:
"""Gestione aggiornamenti incrementali del corpus RAG"""
def __init__(
self,
collection_name: str = "rag_corpus",
embedding_model: str = "all-MiniLM-L6-v2"
):
self.client = QdrantClient(url="http://localhost:6333")
self.model = SentenceTransformer(embedding_model)
self.collection = collection_name
def _doc_hash(self, text: str) -> str:
"""Hash deterministico per deduplicazione"""
return hashlib.sha256(text.encode()).hexdigest()[:16]
def upsert_document(
self,
doc_id: str,
text: str,
metadata: Dict,
chunk_size: int = 512
) -> int:
"""
Upsert (insert o update) di un documento.
Se il documento è già presente e non modificato, skippa.
"""
# Calcola hash per deduplication
content_hash = self._doc_hash(text)
# Controlla se già presente e non modificato
existing = self.client.scroll(
collection_name=self.collection,
scroll_filter=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
),
limit=1
)
if existing[0] and existing[0][0].payload.get('content_hash') == content_hash:
return 0 # Nessun aggiornamento necessario
# Rimuovi vecchi chunks del documento
self.client.delete(
collection_name=self.collection,
points_selector=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
)
)
# Crea nuovi chunks
words = text.split()
chunk_words = chunk_size // 5 # ~5 chars/word
chunks = [
' '.join(words[i:i+chunk_words])
for i in range(0, len(words), chunk_words - 10) # 10 word overlap
if words[i:i+chunk_words]
]
if not chunks:
return 0
# Genera embeddings
embeddings = self.model.encode(chunks, normalize_embeddings=True)
# Crea points
points = []
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
point_id = int(hashlib.md5(f"{doc_id}_{i}".encode()).hexdigest()[:8], 16)
points.append(PointStruct(
id=point_id,
vector=embedding.tolist(),
payload={
**metadata,
"doc_id": doc_id,
"chunk_index": i,
"content_hash": content_hash,
"text": chunk,
"updated_at": time.time()
}
))
# Upsert in Qdrant
self.client.upsert(
collection_name=self.collection,
points=points
)
return len(chunks)
def delete_document(self, doc_id: str):
"""Rimuove tutti i chunks di un documento"""
self.client.delete(
collection_name=self.collection,
points_selector=Filter(
must=[FieldCondition(
key="doc_id",
match=MatchValue(value=doc_id)
)]
)
)
def get_corpus_stats(self) -> Dict:
"""Statistiche sul corpus"""
info = self.client.get_collection(self.collection)
return {
"total_chunks": info.points_count,
"index_status": info.status,
"vectors_config": str(info.config.params.vectors)
}
8. Best Practices a Anti-Patterns in Production
Kontrolní seznam připravenosti na výrobu RAG
- Chunking: použijte blok o velikosti 400-600 tokenů s 10-15% překrytím; otestujte různé strategie na vašem konkrétním korpusu
- Změna pořadí: vždy implementujte křížový kodér pro dotazy, kde je přesnost kritická; akceptuje extra latenci (100-300 ms)
- Ukládání do mezipaměti: sémantická mezipaměť s prahem 0,92–0,97 snižuje náklady LLM o 30–60 % u korpusu typu FAQ
- Sledování: sledovat věrnost, relevanci odpovědi, latenci vyhledávání, cenu tokenu LLM na dotaz
- Hodnocení: udržovat zlatou sadu testů (100–200 otázek se základní pravdou) a vyhodnocovat při každém nasazení
- Záložní: pokud vyhledávání nenajde nic relevantního (nejvyšší skóre < 0,5), místo halucinací prohlaste „nevím“
- Verze: verze šablon pro vkládání a reindexace při změně; udržovat indexy během migrace paralelní
Anti-vzory, kterým je třeba se ve výrobě vyhnout
- Žádné sledování kvality: sledování pouze latence a doby provozu nestačí. RAG může technicky „fungovat“, ale dává špatné odpovědi.
- Korpus není aktualizován: systém RAG na zastaralé dokumentaci je horší než žádný RAG: s jistotou reaguje na nesprávné informace.
- Pevná horní část: přizpůsobte počet načtených bloků délce dotazu. Složité dotazy vyžadují více kontextu.
- Ignorování latence vkládání: Generování vložení dotazu trvá 10-50 ms. Vynásobte 1000 req/s a stane se úzkým hrdlem.
- LLM jako absolutní rozhodčí: generativní model může „halucinovat“ z kontextu i s RAG. Implementujte zábradlí pro reakce.
Závěry
Uvedení systému RAG do výroby vyžaduje mnohem více než jen potrubí sekvenční. Viděli jsme, jak architektura připravená na výrobu odděluje plány příjem a dotazy, jak pokročilé dělení přímo ovlivňuje kvalitu, jak změna pořadí mezi kodérem zlepšuje přesnost vyhledávání a jak monitorování pomocí RAGAS vám umožňuje sledovat kvalitu v průběhu času.
Klíčové body:
- Vždy oddělte cestu příjmu a dotazu – mají radikálně odlišné požadavky
- Investujte do chunkingu: je to nejpůsobivější a často přehlížená proměnná
- Implementujte přehodnocení křížového kodéru pro případy použití, kde je přesnost rozhodující
- Použijte sémantické ukládání do mezipaměti ke snížení nákladů a latence u opakujících se dotazů
- Měřte věrnost a relevanci odpovědi, nejen latenci a dobu provozu
- Udržujte zlatou testovací sadu a vyhodnocujte při každém nasazení s RAGAS
V příštím článku prozkoumáme LangChain pro RAG: rámec nejoblíbenější pro budování LLM aplikací, se zaměřením na pokročilé vzory jako např konverzační RAG, multi-hop vyhledávání a volání nástrojů.
Série pokračuje
- Článek 1: Vysvětlení RAG – Základy
- Článek 2: Vložení a sémantické vyhledávání
- Článek 3: Vektorová databáze – Qdrant vs Pinecone
- Článek 4: Hybridní vyhledávání: BM25 + Vector
- Článek 5: RAG ve výrobě (aktuální)
- Článek 6: LangChain pro RAG
Další informace: pgvector pro RAG na PostgreSQL e MLOps: Model Serving ve výrobě.







