Recuperare hibridă: combinarea BM25 și căutarea vectorială pentru RAG de producție
Căutarea semantică cu încorporare a revoluționat modul în care regăsim informații în sistemele RAG, dar ascunde o limită fundamentală care apare regulat în producţie: dacă utilizatorul caută „Indica de referință pentru rata halucinațiilor GPT-4 T3 2024”, un model de încorporare va găsi documentele semantic aproape de conceptul de „halucinație a modelelor de limbaj”, dar este posibil să nu recupereze documentul exact cel care conține acel șir specific de text. Căutarea prin cuvinte cheie, pe de altă parte, găsește exact acea propoziție, dar nu știe că „problema de fapt LLM” este identică din punct de vedere conceptual.
Il Recuperare hibridă s-a născut tocmai pentru a rezolva această tensiune. Combinând cercetarea rar (BM25 și variante) cu căutare densă (căutare vectorială), obțineți un sistem care este atât precis pe potriviri exacte este robustă pe înțelegerea semantică. Cercetările recente arată că sistemele hibride îmbunătățesc calitatea regăsirii 48% comparativ cu metodele unice în benchmark-urile BEIR și MTEB, cu câștiguri deosebit de puternice la interogări tehnice, nume proprii și terminologie de specialitate.
Acest articol este o scufundare tehnică profundă asupra arhitecturii de recuperare hibridă: de la BM25 cum funcționează intern, la metode de fuziune (Reciprocal Rank Fusion, fuziune ponderată), la re-clasificare cu cross-encoder, până la implementare practică cu Qdrant și evaluare cu metrici NDCG/MRR. Scopul este de a oferi instrumentele pentru a construi și optimiza conductele de recuperare care funcționează în producție, nu doar în benchmark-uri.
Ce vei învăța
- Limitările căutării numai semantice și de ce BM25 rămâne relevant în 2025
- Algoritmul BM25: saturație de frecvență a termenului, ponderare IDF, normalizare lungimi
- Arhitectură de căutare hibridă: rar + dens în paralel
- Reciprocal Rank Fusion (RRF): formula, implementarea și reglarea parametrului k
- Fuziunea scorului ponderat: normalizarea și echilibrarea contribuțiilor
- Re-clasificare încrucișată: când să-l folosiți și cum să optimizați latența față de acuratețe
- Implementare cu vectori rari Qdrant și API-ul de interogare
- Valori de evaluare: NDCG@k, MRR, Precision@k pentru căutare hibridă
- Conducta de producție cu stocare în cache, monitorizare și optimizare progresivă
Limitele cercetării numai semantice
Căutarea semantică cu vectori denși este puternică pentru a capta sensul latent al textului, dar are vulnerabilități structurale care devin evidente cu interogări reale de producție. Înțelegerea acestor limitări este primul pas în înțelegerea de ce este necesară recuperarea hibridă și nu doar opțional în sistemele RAG serioase.
Problema principală este ceea ce numesc cercetătorii nepotrivire de vocabular: modelul de încorporare este antrenat pe distribuții generale de text și nu captează întotdeauna relevanța anumitor termeni tehnici, acronime, nume de produse, versiuni de software sau coduri de identificare. Un model de încorporare nu știe că „MSMARCO-v2.1” se referă la un set de date specific sau că „CVE-2024-4577” este o vulnerabilitate critică PHP, cu excepția cazului în care a fost reglat fin pe acel domeniu.
Când căutarea semantică eșuează
- Interogări cu numere de versiune: „Python 3.12 asyncio.TaskGroup” vs „Șabloane asincrone Python”
- Identificatori unici: ID-uri CVE, numere de comandă, coduri fiscale, ISBN
- Acronime neobișnuite: termeni de domeniu, acronime de companie, coduri de reglementare
- Prenume rare: nume de persoane, companii mici, locații geografice
- Întrebări foarte scurte: cu 1-2 jetoane încorporarea nu este foarte discriminativă
- Terminologie tehnică recentă: modelele cu limite de cunoaștere nu cunosc termeni noi
O a doua problemă este calibrarea scorului: scorurile de similaritate ale vectorilor densi (de obicei, asemănarea cosinusului în intervalul [-1, 1] sau produs punctual nemărginit) nu au semantică absolută. Un document cu un scor de 0,85 nu este neapărat mai relevant decât unul cu 0,82 in contexte diferite. Acest lucru face dificilă compararea sau combinarea scorurilor din diferite sisteme fără o normalizare adecvată.
În cele din urmă, căutarea semantică are de suferit deriva semantică la întrebări ambigue: o interogare precum „Java” într-un context de programare ar putea prelua documente despre „Java Island” dacă contextul documentului nu este suficient de clar pentru modelul de încorporare, mai ales cu bucăți de text foarte scurte sau decontextualizate.
BM25: Reîmprospătarea algoritmului fundamental
BM25 (Best Match 25) este o funcție de clasare dezvoltată în anii 1990 care rămâne, în 2025, unul dintre cei mai eficienți algoritmi de regăsire a informațiilor pentru cercetarea cuvintelor cheie. Înțelegerea funcționările sale interne sunt necesare atât pentru a-l folosi corect, cât și pentru a înțelege de ce completează atât de bine căutarea semantică.
BM25 extinde TF-IDF cu două mecanisme cheie: saturație de frecvență termen e normalizarea lungimii. Formula completă pentru notarea unui document D în raport cu o interogare Q cu termenii {q1, ..., qn} și:
# Formula BM25 (pseudocodice matematico)
# score(D, Q) = sommatoria per qi in Q di:
# IDF(qi) * (TF(qi, D) * (k1 + 1)) / (TF(qi, D) + k1 * (1 - b + b * |D| / avgdl))
#
# Dove:
# IDF(qi) = log((N - df_i + 0.5) / (df_i + 0.5) + 1)
# TF(qi, D) = frequenza del termine qi nel documento D
# |D| = lunghezza del documento D in termini
# avgdl = lunghezza media dei documenti nella collection
# N = numero totale di documenti
# df_i = numero di documenti che contengono qi
# k1 = parametro di saturazione TF (default: 1.2-2.0)
# b = parametro di length normalization (default: 0.75)
# Implementazione Python con rank_bm25
from rank_bm25 import BM25Okapi
import nltk
from nltk.tokenize import word_tokenize
class BM25Retriever:
def __init__(self, corpus: list[str], k1: float = 1.5, b: float = 0.75):
self.k1 = k1
self.b = b
# Tokenizzazione e lowercase
self.tokenized_corpus = [
word_tokenize(doc.lower()) for doc in corpus
]
self.bm25 = BM25Okapi(self.tokenized_corpus, k1=k1, b=b)
self.corpus = corpus
def retrieve(self, query: str, top_k: int = 20) -> list[dict]:
tokenized_query = word_tokenize(query.lower())
scores = self.bm25.get_scores(tokenized_query)
# Crea lista di (index, score) ordinata per score decrescente
ranked = sorted(
enumerate(scores),
key=lambda x: x[1],
reverse=True
)[:top_k]
return [
{"doc_id": idx, "text": self.corpus[idx], "score": score}
for idx, score in ranked
if score > 0 # Filtra documenti senza match
]
# Uso pratico
corpus = [
"BM25 is a ranking function used in information retrieval",
"Vector search uses dense embeddings for semantic similarity",
"Hybrid search combines BM25 and vector search for better recall",
"Python asyncio enables concurrent programming",
]
retriever = BM25Retriever(corpus, k1=1.5, b=0.75)
results = retriever.retrieve("BM25 hybrid search retrieval", top_k=3)
for r in results:
print(f"Score: {r['score']:.4f} | Text: {r['text'][:60]}...")
Parametrul k1 controlează saturația termenului de frecvență: cu k1 scăzut (0,5), diferența dintre 1 și 2 apariții ale unui termen contează aproape la fel de mult ca diferența dintre 10 și 100 apariții; cu k1 ridicat (2,0), TF continuă să aibă un impact chiar și pentru frecvențe înalte. Parametrul b controlează cât de mult să penalizeze documentele lungi: b=0 dezactivează normalizarea lungimii, b=1 o normalizează complet.
Un aspect adesea trecut cu vederea al BM25 este că scorurile sale sunt nu mărginit superior: un document foarte relevant cu multe apariții ale termenului de căutare poate avea un scor de 10, 50 sau 100 in functie de corpus. Acest lucru creează o problemă de compatibilitate directă cu scorurile de similaritate cosinus de vectori densi, care sunt în [-1, 1]. Fuziunea hibridă trebuie să se ocupe de această discrepanță.
Arhitectura de căutare hibridă: cum să le combinați
Arhitectura de bază a unui sistem hibrid de recuperare efectuează căutări rare și dense în paralel la indexare separată, apoi îmbină rezultatele înainte de a le returna utilizatorului (sau LLM în contextul RAG). Există trei puncte arhitecturale în care poate avea loc fuziunea, cu diferite compromisuri:
- Fuziune timpurie (pre-recuperare): documentele sunt reprezentate cu un vector hibrid care combină caracteristici rare și dense înainte de indexare. Exemple: SPLADE, ColBERT în mod cap la cap. Mai scump din punct de vedere al indexării, dar mai consistent.
- Fuziune târzie (post-recuperare): cei doi retrievers operează independent pe indici separate și rezultatele sunt îmbinate la nivel de clasare. Este cea mai comună și flexibilă abordare, vă permite să actualizați componente în mod independent.
- Etape de reclasificare: un model separat (cross-encoder) reordonează rezultatele contopit prin fuziune târzie. Adaugă latență, dar îmbunătățește semnificativ precision@k.
# Architettura base hybrid retrieval con late fusion
from typing import Protocol
import asyncio
class Retriever(Protocol):
async def search(self, query: str, top_k: int) -> list[dict]:
"""Ritorna lista di {'doc_id': str, 'text': str, 'score': float}"""
...
class HybridRetriever:
def __init__(
self,
sparse_retriever: Retriever,
dense_retriever: Retriever,
fusion_method: str = "rrf", # "rrf" | "weighted" | "dbsf"
sparse_weight: float = 0.4,
dense_weight: float = 0.6,
top_k_per_retriever: int = 50, # Recupera più docs per la fusione
):
self.sparse = sparse_retriever
self.dense = dense_retriever
self.fusion_method = fusion_method
self.sparse_weight = sparse_weight
self.dense_weight = dense_weight
self.top_k_per_retriever = top_k_per_retriever
async def search(self, query: str, final_top_k: int = 10) -> list[dict]:
# Esecuzione parallela dei due retriever
sparse_results, dense_results = await asyncio.gather(
self.sparse.search(query, self.top_k_per_retriever),
self.dense.search(query, self.top_k_per_retriever)
)
if self.fusion_method == "rrf":
return self._rrf_fusion(sparse_results, dense_results, final_top_k)
elif self.fusion_method == "weighted":
return self._weighted_fusion(sparse_results, dense_results, final_top_k)
else:
raise ValueError(f"Unknown fusion method: {self.fusion_method}")
def _rrf_fusion(self, sparse: list[dict], dense: list[dict], k: int) -> list[dict]:
# Implementato nella sezione successiva
pass
def _weighted_fusion(self, sparse: list[dict], dense: list[dict], k: int) -> list[dict]:
# Implementato nella sezione successiva
pass
Fuziune de rang reciproc (RRF)
RRF este cel mai folosit algoritm de fuziune în căutarea hibridă datorită simplității și robusteței sale și independența față de scala de scor. Propus inițial de Cormack, Clarke și Buettcher în 2009, atribuie fiecărui document un punctaj bazat exclusiv pe poziţia sa în clasamentul fiecărui retriever, ignorând complet valoarea absolută a scorului.
Formula RRF pentru notarea unui document D care apare în listele L1, L2, ..., Lm și:
RRF(D) = însumarea pentru i=1..m a: 1 / (k + rang_i(D))
Unde k este o constantă (de obicei 60) care atenuează impactul documentelor din partea de sus a listelor. Dacă D nu apare în lista i, contribuția sa este 0. Parametrul k=60 a fost determinat empiric: valorile tipice variază de la 10 la 100.
# Implementazione completa RRF
from collections import defaultdict
def reciprocal_rank_fusion(
result_lists: list[list[dict]],
k: int = 60,
id_field: str = "doc_id"
) -> list[dict]:
"""
Fonde N liste di risultati usando Reciprocal Rank Fusion.
Args:
result_lists: Lista di liste, ognuna ordinata per relevance decrescente
k: Costante di smorzamento (default 60, range consigliato 10-100)
id_field: Campo usato come identificatore univoco del documento
Returns:
Lista fusa ordinata per RRF score decrescente
"""
rrf_scores = defaultdict(float)
doc_registry = {} # Mappa doc_id -> doc completo
for result_list in result_lists:
for rank, doc in enumerate(result_list, start=1):
doc_id = doc[id_field]
# Formula RRF: 1 / (k + rank)
rrf_scores[doc_id] += 1.0 / (k + rank)
# Salva il documento (usa l'ultimo trovato se compare più volte)
if doc_id not in doc_registry:
doc_registry[doc_id] = doc
# Ordina per RRF score decrescente
sorted_docs = sorted(
rrf_scores.items(),
key=lambda x: x[1],
reverse=True
)
# Costruisce la lista finale con score
return [
{**doc_registry[doc_id], "rrf_score": score}
for doc_id, score in sorted_docs
]
# Esempio di utilizzo
sparse_results = [
{"doc_id": "doc_A", "text": "BM25 text...", "score": 12.5},
{"doc_id": "doc_B", "text": "...", "score": 8.3},
{"doc_id": "doc_C", "text": "...", "score": 5.1},
]
dense_results = [
{"doc_id": "doc_C", "text": "...", "score": 0.92}, # Doc C primo in dense
{"doc_id": "doc_A", "text": "BM25 text...", "score": 0.88},
{"doc_id": "doc_D", "text": "...", "score": 0.85}, # Solo in dense
]
fused = reciprocal_rank_fusion(
[sparse_results, dense_results],
k=60
)
# RRF scores:
# doc_A: 1/(60+1) + 1/(60+2) = 0.01639 + 0.01613 = 0.03252
# doc_C: 1/(60+3) + 1/(60+1) = 0.01587 + 0.01639 = 0.03226
# doc_B: 1/(60+2) = 0.01613
# doc_D: 1/(60+3) = 0.01587
for doc in fused:
print(f"{doc['doc_id']}: RRF={doc['rrf_score']:.5f}")
Puterea RRF constă în ea robustețe pentru a nota valori aberante: Nu contează dacă BM25 atribuie un scor de 100 primului document și 50 celui de-al doilea, în timp ce scorurile de similaritate sunt 0,99 și 0,97. Singurul lucru care contează este poziția relativă. Acest lucru îl face deosebit de potrivit când cei doi retrievers au scale de notare complet diferite.
Parametrul k influențează cât de multă pondere se acordă documentelor aflate în poziții înalte comparativ cu cei aflati pe pozitii joase. Cu k=60, un document la rangul 1 devine 1/61 = 0,0164, unul la rangul 60 primește 1/120 = 0,0083: primul valorează mai puțin decât dublu pe ultimul. Cu k=10, primul (1/11 = 0,091) valorează de aproape 7 ori a șaizecimea (1/70 = 0,014): clasament plus „câștigătorul-i-a-toate”. Pentru majoritatea cazurilor, k=60 este un bun punct de plecare.
Fuziunea scorului ponderat cu normalizarea
Fuziunea ponderată combină scoruri absolute în loc de ranguri, permițându-vă să controlați cât de mult greutate de dat fiecărui retriever. Problema principală este normalizarea scorurilor: BM25 și asemănarea cosinus trăiesc în scări complet diferite, de unde și combinația directă ("bm25_score * 0,4 + dense_score * 0,6") nu are sens fără normalizare.
# Weighted fusion con normalizzazione Min-Max e normalizzazione Z-score
import numpy as np
from typing import Optional
def min_max_normalize(scores: list[float]) -> list[float]:
"""Normalizza scores in [0, 1] usando Min-Max scaling."""
if not scores:
return []
min_val = min(scores)
max_val = max(scores)
if max_val == min_val:
return [1.0] * len(scores) # Tutti uguali -> tutti 1.0
return [(s - min_val) / (max_val - min_val) for s in scores]
def dbsf_normalize(scores: list[float]) -> list[float]:
"""
Distribution-Based Score Fusion (DBSF) normalization.
Usa mean e std per normalizzazione più robusta agli outlier.
"""
if not scores:
return []
mean = np.mean(scores)
std = np.std(scores)
if std == 0:
return [0.5] * len(scores)
# Clamp tra 0 e 1 dopo la trasformazione
normalized = [(s - mean) / (3 * std) + 0.5 for s in scores]
return [max(0.0, min(1.0, n)) for n in normalized]
def weighted_fusion(
sparse_results: list[dict],
dense_results: list[dict],
sparse_weight: float = 0.3,
dense_weight: float = 0.7,
normalization: str = "minmax", # "minmax" | "dbsf"
top_k: Optional[int] = None,
id_field: str = "doc_id"
) -> list[dict]:
"""
Combina risultati sparse e dense con weighted fusion normalizzata.
"""
# Costruisce dizionari per lookup veloce
sparse_map = {d[id_field]: d for d in sparse_results}
dense_map = {d[id_field]: d for d in dense_results}
# Tutti i doc_ids unici
all_ids = set(sparse_map.keys()) | set(dense_map.keys())
# Normalizza gli score di ciascun retriever
normalize_fn = min_max_normalize if normalization == "minmax" else dbsf_normalize
if sparse_results:
sparse_scores_norm = dict(zip(
[d[id_field] for d in sparse_results],
normalize_fn([d["score"] for d in sparse_results])
))
else:
sparse_scores_norm = {}
if dense_results:
dense_scores_norm = dict(zip(
[d[id_field] for d in dense_results],
normalize_fn([d["score"] for d in dense_results])
))
else:
dense_scores_norm = {}
# Calcola score combinato
fused_docs = []
for doc_id in all_ids:
sparse_score = sparse_scores_norm.get(doc_id, 0.0)
dense_score = dense_scores_norm.get(doc_id, 0.0)
combined_score = sparse_weight * sparse_score + dense_weight * dense_score
# Prende il doc dal retriever che lo ha trovato
doc = sparse_map.get(doc_id) or dense_map.get(doc_id)
fused_docs.append({
**doc,
"combined_score": combined_score,
"sparse_score_norm": sparse_score,
"dense_score_norm": dense_score,
})
# Ordina per combined score
fused_docs.sort(key=lambda x: x["combined_score"], reverse=True)
return fused_docs[:top_k] if top_k else fused_docs
# Quando usare weighted vs RRF:
# - RRF: quando i retriever hanno scale molto diverse, come punto di partenza
# - Weighted + DBSF: quando vuoi controllare il bilanciamento sparse/dense
# basandoti su metriche di evaluation del tuo specifico dataset
# - Weighted + MinMax: più semplice, sensibile agli outlier di score
Re-clasificare cu Cross-Encoder
Fuziunea (RRF sau ponderată) produce o listă de candidați ordonate după relevanța estimată. Dar atât BM25, cât și retrieverii denși folosesc bi-coder: interogarea și documentul vin codificat separat, iar asemănarea este calculată post-hoc. Acesta este eficient, dar cu scurgeri interacțiuni detaliate între interogare și document.
I encoder încrucișat procesează interogarea și se documentează împreună printr-un model transformator, permițând mecanismului de autoatenție să capteze interacțiuni directe între jetoanele de interogare și jetoanele de document. Rezultatul este un scor de relevanță semnificativ mai precis, dar la un cost de calcul proporțional cu numărul de perechi (interogare, document) de evaluat.
# Cross-encoder re-ranking con sentence-transformers
from sentence_transformers import CrossEncoder
import time
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class CrossEncoderReranker:
"""
Re-ranker basato su cross-encoder per la fase di precision refinement.
Usa modello ms-marco per ranking di rilevanza query-documento.
"""
def __init__(
self,
model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
max_length: int = 512,
batch_size: int = 32,
device: Optional[str] = None, # None = auto-detect GPU/CPU
):
self.model = CrossEncoder(
model_name,
max_length=max_length,
device=device
)
self.batch_size = batch_size
logger.info(f"CrossEncoder loaded: {model_name}")
def rerank(
self,
query: str,
documents: list[dict],
text_field: str = "text",
top_k: Optional[int] = None,
) -> list[dict]:
"""
Re-ordina i documenti usando il cross-encoder.
Args:
query: Query originale dell'utente
documents: Lista di documenti da ri-ordinare (output del hybrid retriever)
text_field: Campo del documento che contiene il testo
top_k: Ritorna solo i top_k più rilevanti
Returns:
Documenti ri-ordinati con campo "rerank_score" aggiunto
"""
if not documents:
return []
start_time = time.time()
# Crea coppie (query, document_text) per il cross-encoder
query_doc_pairs = [
(query, doc[text_field]) for doc in documents
]
# Inferenza in batch per efficienza
scores = self.model.predict(
query_doc_pairs,
batch_size=self.batch_size,
show_progress_bar=False,
)
elapsed = time.time() - start_time
logger.debug(
f"Cross-encoder scored {len(documents)} docs in {elapsed:.3f}s "
f"({elapsed/len(documents)*1000:.1f}ms/doc)"
)
# Aggiunge score e ri-ordina
reranked = [
{**doc, "rerank_score": float(score)}
for doc, score in zip(documents, scores)
]
reranked.sort(key=lambda x: x["rerank_score"], reverse=True)
return reranked[:top_k] if top_k else reranked
# Pipeline completa: Hybrid Retrieval + Cross-Encoder Reranking
class RAGRetrievalPipeline:
def __init__(
self,
hybrid_retriever: HybridRetriever,
reranker: CrossEncoderReranker,
retrieval_top_k: int = 50, # Recupera molti per il reranker
final_top_k: int = 5, # Top-K finale per il LLM context
):
self.hybrid_retriever = hybrid_retriever
self.reranker = reranker
self.retrieval_top_k = retrieval_top_k
self.final_top_k = final_top_k
async def retrieve_for_llm(self, query: str) -> list[dict]:
"""
Pipeline completa: hybrid retrieval -> cross-encoder reranking.
Ottimizzato per massimizzare precision@5 (i 5 docs passati al LLM).
"""
# Step 1: Hybrid retrieval con largo top_k per il reranker
candidates = await self.hybrid_retriever.search(
query, final_top_k=self.retrieval_top_k
)
if not candidates:
return []
# Step 2: Re-ranking con cross-encoder
# Il reranker opera su self.retrieval_top_k docs, tipicamente 20-50
reranked = self.reranker.rerank(
query=query,
documents=candidates,
top_k=self.final_top_k
)
return reranked
# Performance tipica (GPU T4):
# - Hybrid retrieval (BM25 + HNSW): ~10-20ms
# - Cross-encoder reranking (20 docs): ~80-120ms
# - Cross-encoder reranking (50 docs): ~200-350ms
# Totale pipeline: ~100-370ms a seconda del top_k del reranker
Modele de codificatoare încrucișate recomandate (2025)
- cross-encoder/ms-marco-MiniLM-L-6-v2: Echilibru optim viteză/precizie. HARTĂ 0,82 pe MS MARCO. ~12 ms/doc pe GPU. Ideal pentru productie.
- cross-encoder/ms-marco-MiniLM-L-12-v2: Mai precis, ~2x mai lent. Pentru interogări cu prioritate ridicată.
- BAAI/bge-reranker-v2-m3: Multilingv, excelent pentru italiană. Suportă până la 8192 de jetoane. Recomandat pentru RAG în italiană.
- API-ul Cohere Rerank: Soluție gestionată, latență de ~50 ms, precizie excelentă. Cost pe interogare. Excelent pentru o dovadă rapidă a conceptului.
- Jina Reranker v2: Open-source, context 8192 token, excelent pe texte tehnice.
Implementare cu Qdrant Sparse + Dense Vectors
Qdrant acceptă în mod nativ căutarea hibridă prin intermediul acestuia API de interogare cu vectori rari și mecanismul preluare. Spre deosebire de soluții necesitând sisteme separate pentru rar și dens, Qdrant se ocupă de ambele într-unul colecție, simplificând semnificativ arhitectura.
# Qdrant Hybrid Search: setup e query completa
from qdrant_client import QdrantClient
from qdrant_client import models
from fastembed import TextEmbedding, SparseTextEmbedding
import numpy as np
# Inizializzazione client e modelli di embedding
client = QdrantClient("localhost", port=6333)
# Modello dense: all-MiniLM-L6-v2 (384 dim, veloce)
dense_model = TextEmbedding("sentence-transformers/all-MiniLM-L6-v2")
# Modello sparse: BM25 tramite FastEmbed
sparse_model = SparseTextEmbedding("Qdrant/bm25")
COLLECTION_NAME = "hybrid_rag_collection"
def create_hybrid_collection():
"""Crea collection con supporto sparse + dense vectors."""
client.create_collection(
collection_name=COLLECTION_NAME,
vectors_config={
"dense": models.VectorParams(
size=384, # Dimensione all-MiniLM-L6-v2
distance=models.Distance.COSINE,
on_disk=False, # In-memory per latenza bassa
)
},
sparse_vectors_config={
"sparse": models.SparseVectorParams(
modifier=models.Modifier.IDF, # BM25-style IDF weighting
)
},
optimizers_config=models.OptimizersConfigDiff(
indexing_threshold=20000, # Inizia HNSW dopo 20k vettori
),
)
print(f"Collection '{COLLECTION_NAME}' creata con sparse + dense support")
def index_documents(documents: list[dict]):
"""
Indicizza documenti con vettori dense e sparse.
Args:
documents: Lista di {'id': str, 'text': str, 'metadata': dict}
"""
texts = [doc["text"] for doc in documents]
# Genera dense embeddings in batch
dense_embeddings = list(dense_model.embed(texts))
# Genera sparse embeddings (BM25) in batch
sparse_embeddings = list(sparse_model.embed(texts))
# Prepara i points per Qdrant
points = []
for i, doc in enumerate(documents):
sparse_emb = sparse_embeddings[i]
points.append(
models.PointStruct(
id=i, # Usa ID numerico o UUID
payload={
"text": doc["text"],
**doc.get("metadata", {})
},
vector={
"dense": dense_embeddings[i].tolist(),
"sparse": models.SparseVector(
indices=sparse_emb.indices.tolist(),
values=sparse_emb.values.tolist(),
)
}
)
)
# Upsert in batch
client.upsert(
collection_name=COLLECTION_NAME,
points=points,
wait=True
)
print(f"Indicizzati {len(documents)} documenti")
def hybrid_search_qdrant(
query: str,
top_k: int = 10,
prefetch_k: int = 50,
fusion: str = "rrf", # "rrf" | "dbsf"
) -> list[dict]:
"""
Esegue hybrid search con Qdrant Query API.
Usa il meccanismo di prefetch: recupera top_k*5 candidati da ogni
retriever, poi li fonde con RRF o DBSF.
"""
# Genera query embeddings
query_dense = list(dense_model.embed([query]))[0].tolist()
query_sparse_emb = list(sparse_model.embed([query]))[0]
query_sparse = models.SparseVector(
indices=query_sparse_emb.indices.tolist(),
values=query_sparse_emb.values.tolist(),
)
# Fusion method
fusion_model = (
models.Fusion.RRF if fusion == "rrf"
else models.Fusion.DBSF
)
# Query API con prefetch (hybrid search nativo Qdrant)
results = client.query_points(
collection_name=COLLECTION_NAME,
prefetch=[
# Prefetch BM25 sparse
models.Prefetch(
query=query_sparse,
using="sparse",
limit=prefetch_k,
),
# Prefetch dense semantic
models.Prefetch(
query=query_dense,
using="dense",
limit=prefetch_k,
),
],
query=models.FusionQuery(fusion=fusion_model),
limit=top_k,
with_payload=True,
)
return [
{
"doc_id": str(point.id),
"text": point.payload.get("text", ""),
"score": point.score,
"payload": point.payload
}
for point in results.points
]
# Esempio di utilizzo completo
if __name__ == "__main__":
# Crea la collection
create_hybrid_collection()
# Indicizza documenti di esempio
sample_docs = [
{"id": "1", "text": "Qdrant is a vector database optimized for ANN search with filtering", "metadata": {"category": "vectordb"}},
{"id": "2", "text": "BM25 algorithm for information retrieval with term frequency saturation", "metadata": {"category": "ir"}},
{"id": "3", "text": "Hybrid search combines sparse BM25 and dense vector embeddings", "metadata": {"category": "hybrid"}},
{"id": "4", "text": "Reciprocal Rank Fusion merges multiple ranked lists into a single ranking", "metadata": {"category": "fusion"}},
]
index_documents(sample_docs)
# Esegui query ibrida
query = "BM25 hybrid vector search fusion"
results = hybrid_search_qdrant(query, top_k=3, prefetch_k=20, fusion="rrf")
print(f"\nRisultati per: '{query}'")
for r in results:
print(f" Score: {r['score']:.4f} | {r['text'][:70]}...")
Evaluare: NDCG, MRR și Precision@k
Construirea unui sistem hibrid de recuperare fără un cadru de evaluare și construirea fără măsurători. Înainte de optimizarea oricărui parametru (k de RRF, greutate rară/densă, pragul reranker), aveți nevoie de un set de date de testare cu adevărul de bază și metrici definite. Cele mai importante trei valori pentru recuperare sunt NDCG, MRR și Precision@k.
# Framework di evaluation per hybrid retrieval
import numpy as np
from typing import Optional
def ndcg_at_k(
retrieved_ids: list[str],
relevant_ids: list[str],
k: int,
relevance_grades: Optional[dict] = None
) -> float:
"""
Normalized Discounted Cumulative Gain @k.
Misura la qualità del ranking considerando la posizione dei documenti rilevanti.
Valori in [0, 1], 1 = ranking perfetto.
Args:
retrieved_ids: Lista di doc_id nell'ordine recuperato
relevant_ids: Lista di doc_id rilevanti (ground truth)
k: Cut-off
relevance_grades: Dizionario {doc_id: grade} per rilevanza graduata (opzionale)
"""
relevant_set = set(relevant_ids)
top_k = retrieved_ids[:k]
# DCG: Discounted Cumulative Gain
dcg = 0.0
for i, doc_id in enumerate(top_k):
if relevance_grades:
grade = relevance_grades.get(doc_id, 0)
else:
grade = 1.0 if doc_id in relevant_set else 0.0
# Formula: rel_i / log2(i + 2) (i 0-indexed, log2(2) per i=0)
dcg += grade / np.log2(i + 2)
# IDCG: Ideal DCG (ranking perfetto)
if relevance_grades:
ideal_grades = sorted(
[relevance_grades.get(rid, 0) for rid in relevant_ids],
reverse=True
)[:k]
else:
ideal_grades = [1.0] * min(len(relevant_ids), k)
idcg = sum(
grade / np.log2(i + 2)
for i, grade in enumerate(ideal_grades)
)
return dcg / idcg if idcg > 0 else 0.0
def mrr(retrieved_results: list[list[str]], relevant_results: list[list[str]]) -> float:
"""
Mean Reciprocal Rank.
Media del reciproco del rank del primo risultato rilevante.
"""
reciprocal_ranks = []
for retrieved, relevant in zip(retrieved_results, relevant_results):
relevant_set = set(relevant)
rr = 0.0
for rank, doc_id in enumerate(retrieved, start=1):
if doc_id in relevant_set:
rr = 1.0 / rank
break
reciprocal_ranks.append(rr)
return np.mean(reciprocal_ranks)
def evaluate_retriever(
retriever_fn, # Funzione che prende query e ritorna lista di doc_id
test_queries: list[dict], # Lista di {'query': str, 'relevant_ids': list[str]}
k_values: list[int] = [1, 3, 5, 10]
) -> dict:
"""
Valuta un retriever su un test set con più metriche.
"""
all_retrieved = []
all_relevant = []
ndcg_scores = {k: [] for k in k_values}
for item in test_queries:
query = item["query"]
relevant_ids = item["relevant_ids"]
# Recupera risultati
results = retriever_fn(query, top_k=max(k_values))
retrieved_ids = [r["doc_id"] for r in results]
all_retrieved.append(retrieved_ids)
all_relevant.append(relevant_ids)
# Calcola NDCG per ogni k
for k in k_values:
ndcg = ndcg_at_k(retrieved_ids, relevant_ids, k)
ndcg_scores[k].append(ndcg)
# Aggrega metriche
metrics = {
"MRR": mrr(all_retrieved, all_relevant),
}
for k in k_values:
metrics[f"NDCG@{k}"] = np.mean(ndcg_scores[k])
return metrics
# Esempio: confronto sparse-only vs dense-only vs hybrid
def run_ablation_study(test_queries, sparse_retriever, dense_retriever, hybrid_retriever):
print("=== Ablation Study: Retrieval Methods ===\n")
for name, retriever in [
("BM25 only", sparse_retriever.retrieve),
("Dense only", dense_retriever.search),
("Hybrid RRF", hybrid_retriever.search),
]:
metrics = evaluate_retriever(retriever, test_queries)
print(f"{name}:")
for metric, value in metrics.items():
print(f" {metric}: {value:.4f}")
print()
Conducta de producție
O conductă de recuperare hibridă în producție trebuie să se ocupe de mai multe aspecte decât atât corectitudine: latenta (SLA tipic: p95 sub 500 ms), stocarea în cache pentru întrebări frecvente, monitorizare de calitate în timp e degradare grațioasă când una dintre componente scade.
# Pipeline di produzione con caching, monitoring e fallback
import asyncio
import hashlib
import json
import time
from functools import lru_cache
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class ProductionHybridPipeline:
"""
Pipeline hybrid retrieval production-ready con:
- Cache query risultati (TTL configurabile)
- Metriche latenza per monitoring
- Fallback a dense-only se sparse non disponibile
- Circuit breaker per reranker
"""
def __init__(
self,
sparse_retriever,
dense_retriever,
reranker: Optional[CrossEncoderReranker] = None,
cache_ttl_seconds: int = 300,
retrieval_top_k: int = 50,
final_top_k: int = 5,
reranker_enabled: bool = True,
reranker_timeout_ms: float = 500,
):
self.sparse = sparse_retriever
self.dense = dense_retriever
self.reranker = reranker
self.cache: dict = {}
self.cache_ttl = cache_ttl_seconds
self.retrieval_top_k = retrieval_top_k
self.final_top_k = final_top_k
self.reranker_enabled = reranker_enabled
self.reranker_timeout_ms = reranker_timeout_ms
# Metriche (in produzione: Prometheus o simili)
self.metrics = {
"total_queries": 0,
"cache_hits": 0,
"reranker_timeouts": 0,
"sparse_failures": 0,
"avg_latency_ms": 0.0,
}
def _cache_key(self, query: str) -> str:
"""Hash deterministico della query per cache key."""
return hashlib.sha256(query.encode()).hexdigest()[:16]
def _get_cached(self, cache_key: str) -> Optional[list[dict]]:
"""Recupera dalla cache se non scaduta."""
if cache_key in self.cache:
cached_at, results = self.cache[cache_key]
if time.time() - cached_at < self.cache_ttl:
return results
del self.cache[cache_key]
return None
async def search(
self,
query: str,
use_cache: bool = True,
force_rerank: bool = False,
) -> dict:
"""
Esegue la pipeline completa con caching e monitoring.
Returns:
{
'results': list[dict],
'latency_ms': float,
'cache_hit': bool,
'reranked': bool,
'fallback_mode': bool,
}
"""
start = time.time()
self.metrics["total_queries"] += 1
cache_key = self._cache_key(query)
# Controlla cache
if use_cache:
cached = self._get_cached(cache_key)
if cached:
self.metrics["cache_hits"] += 1
return {
"results": cached,
"latency_ms": (time.time() - start) * 1000,
"cache_hit": True,
"reranked": False,
"fallback_mode": False,
}
fallback_mode = False
# Hybrid retrieval con fallback
try:
sparse_task = asyncio.create_task(
self.sparse.search(query, self.retrieval_top_k)
)
dense_task = asyncio.create_task(
self.dense.search(query, self.retrieval_top_k)
)
sparse_results, dense_results = await asyncio.gather(
sparse_task, dense_task
)
candidates = reciprocal_rank_fusion(
[sparse_results, dense_results], k=60
)[:self.retrieval_top_k]
except Exception as e:
logger.warning(f"Sparse retriever failed: {e}. Falling back to dense-only.")
self.metrics["sparse_failures"] += 1
fallback_mode = True
candidates = await self.dense.search(query, self.retrieval_top_k)
# Re-ranking opzionale con timeout
reranked = False
if self.reranker and (self.reranker_enabled or force_rerank) and candidates:
try:
rerank_start = time.time()
final_results = self.reranker.rerank(
query=query,
documents=candidates[:20], # Max 20 per contenere latenza
top_k=self.final_top_k,
)
rerank_elapsed = (time.time() - rerank_start) * 1000
if rerank_elapsed > self.reranker_timeout_ms:
logger.warning(f"Reranker slow: {rerank_elapsed:.0f}ms")
self.metrics["reranker_timeouts"] += 1
reranked = True
except Exception as e:
logger.error(f"Reranker failed: {e}. Using hybrid results.")
final_results = candidates[:self.final_top_k]
else:
final_results = candidates[:self.final_top_k]
# Aggiorna cache e metriche
self.cache[cache_key] = (time.time(), final_results)
latency = (time.time() - start) * 1000
self.metrics["avg_latency_ms"] = (
self.metrics["avg_latency_ms"] * 0.95 + latency * 0.05 # EMA
)
return {
"results": final_results,
"latency_ms": latency,
"cache_hit": False,
"reranked": reranked,
"fallback_mode": fallback_mode,
}
Cele mai bune practici și anti-modele
Construirea unui sistem eficient de recuperare hibrid necesită evitarea unor capcane comune care apar în producție și nu sunt evidente în tutorialele de bază.
Cele mai bune practici de recuperare hibridă
- Începe cu RRF k=60: Este cea mai robustă implicită din punct de vedere empiric. Experimentați cu alte valori numai după stabilirea unei linii de bază cu valorile NDCG.
- top_k pentru retriever >= 3x final_top_k: Dacă vrei top 5 final, recuperați cel puțin 15 de la fiecare retriever pentru a oferi materialului de fuziune suficient.
- Tokenizare constantă: BM25 și modelul de încorporare trebuie să folosească aceeași conductă de preprocesare (minuscule, cuvinte stop, stemming) pentru consecvență.
- Cross-encoder pe maximum 20-50 de documente: Peste 50 de candidați câștigă de precizie este marginală în comparație cu costul de latență adăugat.
- Evaluați separat și dens: Înainte de integrare, măsoară valorile fiecărei componente. Dacă retrieverul dens este deja la 90% NDCG@5, hibridul poate să nu adauge valoare pentru setul dvs. de date specific.
- Cache normalizat la nivel de interogare: Litere mici și tăiați interogarea înainte de hashing pentru a maximiza ratele de accesare a memoriei cache.
Anti-modele de evitat
- Combinarea scorurilor nenormalizate: „BM25_score + cosine_score” fără normalizarea produce rezultate dominate de cel mai mare scale retriever (aproape întotdeauna BM25).
- Folosiți reranker-ul pentru toate rezultatele extragerii: Reclasează 200 de documente adaugă 2-3 secunde de latență. Limitați întotdeauna rerankerul la 20-50 de candidați.
- Ignorarea calității fragmentului: Recuperarea hibridă nu rezolvă bucățile slab format (prea scurt, tăiat la jumătatea unui concept). Calitatea indexării este premisa fundamentală.
- Optimizați fără seturi de testare: Schimbați greutățile rare/dense sau k din RRF fără măsurarea pe un set de date de testare duce la supraadaptarea impresiilor subiective.
- Nu gestionați alternativă: Dacă indexul BM25 este offline, sistemul trebuie degradați grațios la numai dens, nu vă prăbușiți.
Când recuperarea hibridă nu este suficientă
Recuperarea hibridă rezolvă multe probleme, dar nu toate. Dacă după implementare metrica de recuperare sunt încă insuficiente, luați în considerare aceste căi avansate:
- HyDE (înglobare de documente ipotetice): LLM generează un răspuns ipotetic la interogare, care este apoi folosită ca interogare pentru retriever. Îmbunătățește reamintirea semantică pe interogări abstracte sau slab formate.
- Extindere interogare: Generați variante de interogare (sinonime, reformulări) cu LLM și efectuați recuperarea pe toate, apoi îmbinați rezultatele cu RRF.
- SABII: Model rar învățat care produce vectori rari „inteligenti” în loc de frecvența pură a termenului. Mai precis decât BM25, dar necesită inferență ML.
- ColBERT/ColPali: Model de interacțiune tardivă care compară fiecare token din interogare cu fiecare jeton de document. Precizie superioară codificatorului încrucișat cu latență de recuperare (nu reclasificare).
- GraphRAG: Îmbunătățiți recuperarea vectorului cu un grafic de captare a cunoștințelor relaţii structurate între entităţi. Ideal pentru întrebări care necesită raționament multi-hop.
Concluzii
Hybrid Retrieval este strategia standard pentru sistemele RAG de producție în prezent lucru pe interogări eterogene: de la termenul tehnic exact la întrebarea conceptuală vagă. Combinația BM25 + dens cu RRF oferă o linie de bază deja foarte robustă, pe care Re-clasificarea încrucișată a codificatorilor duce la niveluri de precizie cu care sunt dificil de atins abordări unice.
Cheia implementării cu succes a acestuia este în ordinea operațiunilor: Construiți mai întâi un set de testare cu adevărul real al domeniului dvs., stabiliți linii de bază separate pentru BM25 e dens, apoi experimentați cu fuziunea și măsurați delta. Numai cu metrici concrete (NDCG@5, MRR) vă puteți da seama dacă adăugarea rerankerului merită cei 200 ms de latență în plus pentru cazul dvs. de utilizare.
Următorii pași
- Continuați cu Conducta LangChain RAG: document până la răspuns pentru a integra acest retriever într-o conductă completă cu LLM.
- Legile RAG în producție: Monitorizare, Evaluare, Optimizare pentru un cadru complet de evaluare și monitorizare în producție.
- Explora Încorporare și căutare vectorială: BERT vs Sentence Transformers pentru a afla mai multe despre alegerea modelului dens optim pentru domeniul dvs.
- Luați în considerare pgvector și PostgreSQL AI dacă doriți să implementați căutarea hibridă direct în baza de date PostgreSQL existentă.
Resurse și referințe
- Documentație de căutare hibridă Qdrant - API-ul de interogare și vectori rari
- Cormack, Clarke, Buettcher (2009) - „Fuziunea de rang reciproc depășește metodele Condorcet și de învățare a rangului individual”
- Benchmark BEIR - Benchmark de regăsire heterogenă
- documentație privind transformatoarele de propoziție/cross-encoder
- MTEB (Massive Text Embedding Benchmark) - Clasament 2025







