Hybrid Retrieval: Combinare BM25 e Vector Search per RAG di Produzione
La ricerca semantica con embedding ha rivoluzionato il modo in cui recuperiamo informazioni nei sistemi RAG, ma nasconde un limite fondamentale che emerge puntualmente in produzione: se l'utente cerca "GPT-4 hallucination rate benchmark Q3 2024", un modello di embedding trovera documenti semanticamente vicini al concetto di "allucinazione dei modelli linguistici", ma potrebbe non recuperare il documento esatto che contiene quella stringa specifica di testo. La ricerca per keyword, al contrario, trova esattamente quella frase, ma non sa che "LLM factuality issue" è concettualmente identica.
Il Hybrid Retrieval nasce proprio per risolvere questa tensione. Combinando la ricerca sparse (BM25 e varianti) con la ricerca dense (vector search), si ottiene un sistema che è sia preciso sulle corrispondenze esatte sia robusto sulla comprensione semantica. Ricerche recenti mostrano che i sistemi ibridi migliorano il retrieval quality del 48% rispetto ai metodi singoli nei benchmark BEIR e MTEB, con guadagni particolarmente marcati su query tecniche, nomi propri e terminologia specializzata.
Questo articolo è un deep dive tecnico sull'architettura hybrid retrieval: da BM25 come funziona internamente, ai metodi di fusione (Reciprocal Rank Fusion, weighted fusion), al re-ranking con cross-encoder, fino all'implementazione pratica con Qdrant e all'evaluation con metriche NDCG/MRR. L'obiettivo è fornire gli strumenti per costruire e ottimizzare pipeline di retrieval che funzionano in produzione, non solo nei benchmark.
Cosa Imparerai
- Limiti della ricerca solo-semantica e perchè BM25 rimane rilevante nel 2025
- Algoritmo BM25: term frequency saturation, IDF weighting, length normalization
- Architettura hybrid search: sparse + dense in parallelo
- Reciprocal Rank Fusion (RRF): formula, implementazione e tuning del parametro k
- Weighted score fusion: normalizzazione e bilanciamento dei contributi
- Cross-encoder re-ranking: quando usarlo e come ottimizzare latenza vs accuracy
- Implementazione con Qdrant sparse vectors e Query API
- Metriche di evaluation: NDCG@k, MRR, Precision@k per hybrid search
- Pipeline di produzione con caching, monitoring e ottimizzazione progressiva
Limiti della Ricerca Solo-Semantica
La ricerca semantica con dense vectors è potente per catturare il significato latente del testo, ma presenta vulnerabilità strutturali che diventano evidenti con query reali di produzione. Comprendere questi limiti è il primo passo per capire perchè il hybrid retrieval sia necessario e non solo opzionale in sistemi RAG seri.
Il problema principale è quello che i ricercatori chiamano vocabulary mismatch: il modello di embedding è addestrato su distribuzioni di testo generali e non sempre cattura la rilevanza di termini tecnici specifici, acronimi, nomi di prodotti, versioni software o codici identificativi. Un embedding model non sa che "MSMARCO-v2.1" si riferisce a un dataset specifico o che "CVE-2024-4577" è una vulnerabilità critica PHP, a meno che non sia stato fine-tuned su quel dominio.
Quando la Ricerca Semantica Fallisce
- Query con numeri di versione: "Python 3.12 asyncio.TaskGroup" vs "Python async patterns"
- Identificatori unici: CVE IDs, numeri ordine, codici fiscali, ISBN
- Acronimi non comuni: termini di dominio, sigle aziendali, codici normativi
- Nomi propri rari: nomi di persone, aziende piccole, localita geografiche
- Query molto corte: con 1-2 token l'embedding è poco discriminativo
- Terminologia tecnica recente: modelli con knowledge cutoff non conoscono termini nuovi
Un secondo problema è la score calibration: i similarity score dei dense vectors (tipicamente cosine similarity in range [-1, 1] o dot product non bounded) non hanno una semantica assoluta. Un documento con score 0.85 non è necessariamente più rilevante di uno con 0.82 in contesti diversi. Questo rende difficile confrontare o combinare score da sistemi diversi senza una normalizzazione appropriata.
Infine, la ricerca semantica soffre di semantic drift su query ambigue: una query come "Java" in un contesto di programmazione potrebbe recuperare documenti su "Isola di Giava" se il contesto del documento non è sufficientemente chiaro per l'embedding model, specialmente con chunk di testo molto brevi o decontestualizzati.
BM25: Refresh dell'Algoritmo Fondamentale
BM25 (Best Match 25) è una funzione di ranking sviluppata negli anni '90 che rimane, nel 2025, uno degli algoritmi di information retrieval più efficaci per ricerca keyword. Comprendere il suo funzionamento interno è necessario sia per usarlo correttamente sia per capire perchè complementa cosi bene la ricerca semantica.
BM25 estende TF-IDF con due meccanismi fondamentali: term frequency saturation e length normalization. La formula completa per il punteggio di un documento D rispetto a una query Q con termini {q1, ..., qn} e:
# Formula BM25 (pseudocodice matematico)
# score(D, Q) = sommatoria per qi in Q di:
# IDF(qi) * (TF(qi, D) * (k1 + 1)) / (TF(qi, D) + k1 * (1 - b + b * |D| / avgdl))
#
# Dove:
# IDF(qi) = log((N - df_i + 0.5) / (df_i + 0.5) + 1)
# TF(qi, D) = frequenza del termine qi nel documento D
# |D| = lunghezza del documento D in termini
# avgdl = lunghezza media dei documenti nella collection
# N = numero totale di documenti
# df_i = numero di documenti che contengono qi
# k1 = parametro di saturazione TF (default: 1.2-2.0)
# b = parametro di length normalization (default: 0.75)
# Implementazione Python con rank_bm25
from rank_bm25 import BM25Okapi
import nltk
from nltk.tokenize import word_tokenize
class BM25Retriever:
def __init__(self, corpus: list[str], k1: float = 1.5, b: float = 0.75):
self.k1 = k1
self.b = b
# Tokenizzazione e lowercase
self.tokenized_corpus = [
word_tokenize(doc.lower()) for doc in corpus
]
self.bm25 = BM25Okapi(self.tokenized_corpus, k1=k1, b=b)
self.corpus = corpus
def retrieve(self, query: str, top_k: int = 20) -> list[dict]:
tokenized_query = word_tokenize(query.lower())
scores = self.bm25.get_scores(tokenized_query)
# Crea lista di (index, score) ordinata per score decrescente
ranked = sorted(
enumerate(scores),
key=lambda x: x[1],
reverse=True
)[:top_k]
return [
{"doc_id": idx, "text": self.corpus[idx], "score": score}
for idx, score in ranked
if score > 0 # Filtra documenti senza match
]
# Uso pratico
corpus = [
"BM25 is a ranking function used in information retrieval",
"Vector search uses dense embeddings for semantic similarity",
"Hybrid search combines BM25 and vector search for better recall",
"Python asyncio enables concurrent programming",
]
retriever = BM25Retriever(corpus, k1=1.5, b=0.75)
results = retriever.retrieve("BM25 hybrid search retrieval", top_k=3)
for r in results:
print(f"Score: {r['score']:.4f} | Text: {r['text'][:60]}...")
Il parametro k1 controlla la saturazione della term frequency: con k1 basso (0.5), la differenza tra 1 e 2 occorrenze di un termine conta quasi quanto la differenza tra 10 e 100 occorrenze; con k1 alto (2.0), la TF continua ad avere impatto anche per frequenze elevate. Il parametro b controlla quanto penalizzare i documenti lunghi: b=0 disabilita la length normalization, b=1 la normalizza completamente.
Un aspetto spesso trascurato di BM25 è che i suoi score sono non bounded superiormente: un documento molto rilevante con molte occorrenze del termine di ricerca può avere score 10, 50 o 100 a seconda del corpus. Questo crea un problema di compatibilità diretta con i cosine similarity scores dei dense vectors, che stanno in [-1, 1]. La fusione ibrida deve gestire questa discrepanza.
Architettura Hybrid Search: Come Combinarli
L'architettura base di un sistema hybrid retrieval esegue sparse e dense search in parallelo su indexing separati, poi fonde i risultati prima di restituirli all'utente (o al LLM nel contesto RAG). Esistono tre punti architetturali dove la fusione può avvenire, con tradeoff diversi:
- Early fusion (pre-retrieval): i documenti vengono rappresentati con un vettore ibrido che combina features sparse e dense prima dell'indexing. Esempi: SPLADE, ColBERT in modalità end-to-end. Più costoso a livello di indexing, ma più coerente.
- Late fusion (post-retrieval): i due retriever operano indipendentemente su indici separati e i risultati vengono fusi a livello di ranking. E' l'approccio più comune e flessibile, permette di aggiornare i componenti indipendentemente.
- Re-ranking stage: un modello separato (cross-encoder) ri-ordina i risultati fusi dalla late fusion. Aggiunge latenza ma migliora significativamente la precision@k.
# Architettura base hybrid retrieval con late fusion
from typing import Protocol
import asyncio
class Retriever(Protocol):
async def search(self, query: str, top_k: int) -> list[dict]:
"""Ritorna lista di {'doc_id': str, 'text': str, 'score': float}"""
...
class HybridRetriever:
def __init__(
self,
sparse_retriever: Retriever,
dense_retriever: Retriever,
fusion_method: str = "rrf", # "rrf" | "weighted" | "dbsf"
sparse_weight: float = 0.4,
dense_weight: float = 0.6,
top_k_per_retriever: int = 50, # Recupera più docs per la fusione
):
self.sparse = sparse_retriever
self.dense = dense_retriever
self.fusion_method = fusion_method
self.sparse_weight = sparse_weight
self.dense_weight = dense_weight
self.top_k_per_retriever = top_k_per_retriever
async def search(self, query: str, final_top_k: int = 10) -> list[dict]:
# Esecuzione parallela dei due retriever
sparse_results, dense_results = await asyncio.gather(
self.sparse.search(query, self.top_k_per_retriever),
self.dense.search(query, self.top_k_per_retriever)
)
if self.fusion_method == "rrf":
return self._rrf_fusion(sparse_results, dense_results, final_top_k)
elif self.fusion_method == "weighted":
return self._weighted_fusion(sparse_results, dense_results, final_top_k)
else:
raise ValueError(f"Unknown fusion method: {self.fusion_method}")
def _rrf_fusion(self, sparse: list[dict], dense: list[dict], k: int) -> list[dict]:
# Implementato nella sezione successiva
pass
def _weighted_fusion(self, sparse: list[dict], dense: list[dict], k: int) -> list[dict]:
# Implementato nella sezione successiva
pass
Reciprocal Rank Fusion (RRF)
RRF è l'algoritmo di fusione più usato in hybrid search grazie alla sua semplicità, robustezza e indipendenza dalla scala degli score. Proposto originariamente da Cormack, Clarke e Buettcher nel 2009, assegna a ogni documento un punteggio basato esclusivamente sulla sua posizione nella lista di ranking di ciascun retriever, ignorando completamente il valore assoluto dello score.
La formula RRF per il punteggio di un documento D che compare nelle liste L1, L2, ..., Lm e:
RRF(D) = sommatoria per i=1..m di: 1 / (k + rank_i(D))
Dove k è una costante (di solito 60) che smorza l'impatto dei documenti in cima alle liste. Se D non appare nella lista i, il suo contributo è 0. Il parametro k=60 è stato determinato empiricamente: valori tipici vanno da 10 a 100.
# Implementazione completa RRF
from collections import defaultdict
def reciprocal_rank_fusion(
result_lists: list[list[dict]],
k: int = 60,
id_field: str = "doc_id"
) -> list[dict]:
"""
Fonde N liste di risultati usando Reciprocal Rank Fusion.
Args:
result_lists: Lista di liste, ognuna ordinata per relevance decrescente
k: Costante di smorzamento (default 60, range consigliato 10-100)
id_field: Campo usato come identificatore univoco del documento
Returns:
Lista fusa ordinata per RRF score decrescente
"""
rrf_scores = defaultdict(float)
doc_registry = {} # Mappa doc_id -> doc completo
for result_list in result_lists:
for rank, doc in enumerate(result_list, start=1):
doc_id = doc[id_field]
# Formula RRF: 1 / (k + rank)
rrf_scores[doc_id] += 1.0 / (k + rank)
# Salva il documento (usa l'ultimo trovato se compare più volte)
if doc_id not in doc_registry:
doc_registry[doc_id] = doc
# Ordina per RRF score decrescente
sorted_docs = sorted(
rrf_scores.items(),
key=lambda x: x[1],
reverse=True
)
# Costruisce la lista finale con score
return [
{**doc_registry[doc_id], "rrf_score": score}
for doc_id, score in sorted_docs
]
# Esempio di utilizzo
sparse_results = [
{"doc_id": "doc_A", "text": "BM25 text...", "score": 12.5},
{"doc_id": "doc_B", "text": "...", "score": 8.3},
{"doc_id": "doc_C", "text": "...", "score": 5.1},
]
dense_results = [
{"doc_id": "doc_C", "text": "...", "score": 0.92}, # Doc C primo in dense
{"doc_id": "doc_A", "text": "BM25 text...", "score": 0.88},
{"doc_id": "doc_D", "text": "...", "score": 0.85}, # Solo in dense
]
fused = reciprocal_rank_fusion(
[sparse_results, dense_results],
k=60
)
# RRF scores:
# doc_A: 1/(60+1) + 1/(60+2) = 0.01639 + 0.01613 = 0.03252
# doc_C: 1/(60+3) + 1/(60+1) = 0.01587 + 0.01639 = 0.03226
# doc_B: 1/(60+2) = 0.01613
# doc_D: 1/(60+3) = 0.01587
for doc in fused:
print(f"{doc['doc_id']}: RRF={doc['rrf_score']:.5f}")
La potenza di RRF sta nella sua robustezza agli outlier di score: non importa se BM25 assegna score 100 al primo documento e 50 al secondo, mentre cosine similarity da 0.99 e 0.97. L'unica cosa che conta è la posizione relativa. Questo lo rende particolarmente adatto quando i due retriever hanno scale di scoring completamente diverse.
Il parametro k influenza quanto peso viene dato ai documenti nelle posizioni alte rispetto a quelli nelle posizioni basse. Con k=60, un documento al rank 1 ottiene 1/61 = 0.0164, uno al rank 60 ottiene 1/120 = 0.0083: il primo vale meno del doppio dell'ultimo. Con k=10, il primo (1/11 = 0.091) vale quasi 7 volte il sessantesimo (1/70 = 0.014): ranking più "winner-takes-all". Per la maggior parte dei casi, k=60 e un ottimo punto di partenza.
Weighted Score Fusion con Normalizzazione
La weighted fusion combina score assoluti invece di rank, permettendo di controllare quanto peso dare a ciascun retriever. Il problema principale è la normalizzazione degli score: BM25 e cosine similarity vivono in scale completamente diverse, quindi la combinazione diretta ("bm25_score * 0.4 + dense_score * 0.6") non ha senso senza normalizzazione.
# Weighted fusion con normalizzazione Min-Max e normalizzazione Z-score
import numpy as np
from typing import Optional
def min_max_normalize(scores: list[float]) -> list[float]:
"""Normalizza scores in [0, 1] usando Min-Max scaling."""
if not scores:
return []
min_val = min(scores)
max_val = max(scores)
if max_val == min_val:
return [1.0] * len(scores) # Tutti uguali -> tutti 1.0
return [(s - min_val) / (max_val - min_val) for s in scores]
def dbsf_normalize(scores: list[float]) -> list[float]:
"""
Distribution-Based Score Fusion (DBSF) normalization.
Usa mean e std per normalizzazione più robusta agli outlier.
"""
if not scores:
return []
mean = np.mean(scores)
std = np.std(scores)
if std == 0:
return [0.5] * len(scores)
# Clamp tra 0 e 1 dopo la trasformazione
normalized = [(s - mean) / (3 * std) + 0.5 for s in scores]
return [max(0.0, min(1.0, n)) for n in normalized]
def weighted_fusion(
sparse_results: list[dict],
dense_results: list[dict],
sparse_weight: float = 0.3,
dense_weight: float = 0.7,
normalization: str = "minmax", # "minmax" | "dbsf"
top_k: Optional[int] = None,
id_field: str = "doc_id"
) -> list[dict]:
"""
Combina risultati sparse e dense con weighted fusion normalizzata.
"""
# Costruisce dizionari per lookup veloce
sparse_map = {d[id_field]: d for d in sparse_results}
dense_map = {d[id_field]: d for d in dense_results}
# Tutti i doc_ids unici
all_ids = set(sparse_map.keys()) | set(dense_map.keys())
# Normalizza gli score di ciascun retriever
normalize_fn = min_max_normalize if normalization == "minmax" else dbsf_normalize
if sparse_results:
sparse_scores_norm = dict(zip(
[d[id_field] for d in sparse_results],
normalize_fn([d["score"] for d in sparse_results])
))
else:
sparse_scores_norm = {}
if dense_results:
dense_scores_norm = dict(zip(
[d[id_field] for d in dense_results],
normalize_fn([d["score"] for d in dense_results])
))
else:
dense_scores_norm = {}
# Calcola score combinato
fused_docs = []
for doc_id in all_ids:
sparse_score = sparse_scores_norm.get(doc_id, 0.0)
dense_score = dense_scores_norm.get(doc_id, 0.0)
combined_score = sparse_weight * sparse_score + dense_weight * dense_score
# Prende il doc dal retriever che lo ha trovato
doc = sparse_map.get(doc_id) or dense_map.get(doc_id)
fused_docs.append({
**doc,
"combined_score": combined_score,
"sparse_score_norm": sparse_score,
"dense_score_norm": dense_score,
})
# Ordina per combined score
fused_docs.sort(key=lambda x: x["combined_score"], reverse=True)
return fused_docs[:top_k] if top_k else fused_docs
# Quando usare weighted vs RRF:
# - RRF: quando i retriever hanno scale molto diverse, come punto di partenza
# - Weighted + DBSF: quando vuoi controllare il bilanciamento sparse/dense
# basandoti su metriche di evaluation del tuo specifico dataset
# - Weighted + MinMax: più semplice, sensibile agli outlier di score
Re-Ranking con Cross-Encoder
La fusione (RRF o weighted) produce una lista di candidati ordinati per rilevanza stimata. Ma sia BM25 che i dense retriever usano bi-encoder: query e documento vengono encodati separatamente e la similarità è calcolata post-hoc. Questo è efficiente ma perde le interazioni fine-grained tra query e documento.
I cross-encoder elaborano query e documento insieme attraverso un modello transformer, permettendo al meccanismo di self-attention di catturare interazioni dirette tra i token della query e quelli del documento. Il risultato è un punteggio di rilevanza significativamente più accurato, ma a un costo computazionale proporzionale al numero di coppie (query, documento) da valutare.
# Cross-encoder re-ranking con sentence-transformers
from sentence_transformers import CrossEncoder
import time
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class CrossEncoderReranker:
"""
Re-ranker basato su cross-encoder per la fase di precision refinement.
Usa modello ms-marco per ranking di rilevanza query-documento.
"""
def __init__(
self,
model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
max_length: int = 512,
batch_size: int = 32,
device: Optional[str] = None, # None = auto-detect GPU/CPU
):
self.model = CrossEncoder(
model_name,
max_length=max_length,
device=device
)
self.batch_size = batch_size
logger.info(f"CrossEncoder loaded: {model_name}")
def rerank(
self,
query: str,
documents: list[dict],
text_field: str = "text",
top_k: Optional[int] = None,
) -> list[dict]:
"""
Re-ordina i documenti usando il cross-encoder.
Args:
query: Query originale dell'utente
documents: Lista di documenti da ri-ordinare (output del hybrid retriever)
text_field: Campo del documento che contiene il testo
top_k: Ritorna solo i top_k più rilevanti
Returns:
Documenti ri-ordinati con campo "rerank_score" aggiunto
"""
if not documents:
return []
start_time = time.time()
# Crea coppie (query, document_text) per il cross-encoder
query_doc_pairs = [
(query, doc[text_field]) for doc in documents
]
# Inferenza in batch per efficienza
scores = self.model.predict(
query_doc_pairs,
batch_size=self.batch_size,
show_progress_bar=False,
)
elapsed = time.time() - start_time
logger.debug(
f"Cross-encoder scored {len(documents)} docs in {elapsed:.3f}s "
f"({elapsed/len(documents)*1000:.1f}ms/doc)"
)
# Aggiunge score e ri-ordina
reranked = [
{**doc, "rerank_score": float(score)}
for doc, score in zip(documents, scores)
]
reranked.sort(key=lambda x: x["rerank_score"], reverse=True)
return reranked[:top_k] if top_k else reranked
# Pipeline completa: Hybrid Retrieval + Cross-Encoder Reranking
class RAGRetrievalPipeline:
def __init__(
self,
hybrid_retriever: HybridRetriever,
reranker: CrossEncoderReranker,
retrieval_top_k: int = 50, # Recupera molti per il reranker
final_top_k: int = 5, # Top-K finale per il LLM context
):
self.hybrid_retriever = hybrid_retriever
self.reranker = reranker
self.retrieval_top_k = retrieval_top_k
self.final_top_k = final_top_k
async def retrieve_for_llm(self, query: str) -> list[dict]:
"""
Pipeline completa: hybrid retrieval -> cross-encoder reranking.
Ottimizzato per massimizzare precision@5 (i 5 docs passati al LLM).
"""
# Step 1: Hybrid retrieval con largo top_k per il reranker
candidates = await self.hybrid_retriever.search(
query, final_top_k=self.retrieval_top_k
)
if not candidates:
return []
# Step 2: Re-ranking con cross-encoder
# Il reranker opera su self.retrieval_top_k docs, tipicamente 20-50
reranked = self.reranker.rerank(
query=query,
documents=candidates,
top_k=self.final_top_k
)
return reranked
# Performance tipica (GPU T4):
# - Hybrid retrieval (BM25 + HNSW): ~10-20ms
# - Cross-encoder reranking (20 docs): ~80-120ms
# - Cross-encoder reranking (50 docs): ~200-350ms
# Totale pipeline: ~100-370ms a seconda del top_k del reranker
Modelli Cross-Encoder Consigliati (2025)
- cross-encoder/ms-marco-MiniLM-L-6-v2: Bilanciamento ottimale velocità/accuracy. MAP 0.82 su MS MARCO. ~12ms/doc su GPU. Ideale per produzione.
- cross-encoder/ms-marco-MiniLM-L-12-v2: Più accurato, ~2x più lento. Per query ad alta priorità.
- BAAI/bge-reranker-v2-m3: Multilingue, ottimo per italiano. Supporta fino a 8192 token. Consigliato per RAG in italiano.
- Cohere Rerank API: Soluzione managed, ~50ms latenza, eccellente accuracy. Costo per query. Ottimo per proof-of-concept rapido.
- Jina Reranker v2: Open-source, 8192 token context, ottimo su testi tecnici.
Implementazione con Qdrant Sparse + Dense Vectors
Qdrant supporta nativamente il hybrid search attraverso la sua Query API con sparse vectors e il meccanismo di prefetch. A differenza di soluzioni che richiedono sistemi separati per sparse e dense, Qdrant gestisce entrambi in un'unica collection, semplificando significativamente l'architettura.
# Qdrant Hybrid Search: setup e query completa
from qdrant_client import QdrantClient
from qdrant_client import models
from fastembed import TextEmbedding, SparseTextEmbedding
import numpy as np
# Inizializzazione client e modelli di embedding
client = QdrantClient("localhost", port=6333)
# Modello dense: all-MiniLM-L6-v2 (384 dim, veloce)
dense_model = TextEmbedding("sentence-transformers/all-MiniLM-L6-v2")
# Modello sparse: BM25 tramite FastEmbed
sparse_model = SparseTextEmbedding("Qdrant/bm25")
COLLECTION_NAME = "hybrid_rag_collection"
def create_hybrid_collection():
"""Crea collection con supporto sparse + dense vectors."""
client.create_collection(
collection_name=COLLECTION_NAME,
vectors_config={
"dense": models.VectorParams(
size=384, # Dimensione all-MiniLM-L6-v2
distance=models.Distance.COSINE,
on_disk=False, # In-memory per latenza bassa
)
},
sparse_vectors_config={
"sparse": models.SparseVectorParams(
modifier=models.Modifier.IDF, # BM25-style IDF weighting
)
},
optimizers_config=models.OptimizersConfigDiff(
indexing_threshold=20000, # Inizia HNSW dopo 20k vettori
),
)
print(f"Collection '{COLLECTION_NAME}' creata con sparse + dense support")
def index_documents(documents: list[dict]):
"""
Indicizza documenti con vettori dense e sparse.
Args:
documents: Lista di {'id': str, 'text': str, 'metadata': dict}
"""
texts = [doc["text"] for doc in documents]
# Genera dense embeddings in batch
dense_embeddings = list(dense_model.embed(texts))
# Genera sparse embeddings (BM25) in batch
sparse_embeddings = list(sparse_model.embed(texts))
# Prepara i points per Qdrant
points = []
for i, doc in enumerate(documents):
sparse_emb = sparse_embeddings[i]
points.append(
models.PointStruct(
id=i, # Usa ID numerico o UUID
payload={
"text": doc["text"],
**doc.get("metadata", {})
},
vector={
"dense": dense_embeddings[i].tolist(),
"sparse": models.SparseVector(
indices=sparse_emb.indices.tolist(),
values=sparse_emb.values.tolist(),
)
}
)
)
# Upsert in batch
client.upsert(
collection_name=COLLECTION_NAME,
points=points,
wait=True
)
print(f"Indicizzati {len(documents)} documenti")
def hybrid_search_qdrant(
query: str,
top_k: int = 10,
prefetch_k: int = 50,
fusion: str = "rrf", # "rrf" | "dbsf"
) -> list[dict]:
"""
Esegue hybrid search con Qdrant Query API.
Usa il meccanismo di prefetch: recupera top_k*5 candidati da ogni
retriever, poi li fonde con RRF o DBSF.
"""
# Genera query embeddings
query_dense = list(dense_model.embed([query]))[0].tolist()
query_sparse_emb = list(sparse_model.embed([query]))[0]
query_sparse = models.SparseVector(
indices=query_sparse_emb.indices.tolist(),
values=query_sparse_emb.values.tolist(),
)
# Fusion method
fusion_model = (
models.Fusion.RRF if fusion == "rrf"
else models.Fusion.DBSF
)
# Query API con prefetch (hybrid search nativo Qdrant)
results = client.query_points(
collection_name=COLLECTION_NAME,
prefetch=[
# Prefetch BM25 sparse
models.Prefetch(
query=query_sparse,
using="sparse",
limit=prefetch_k,
),
# Prefetch dense semantic
models.Prefetch(
query=query_dense,
using="dense",
limit=prefetch_k,
),
],
query=models.FusionQuery(fusion=fusion_model),
limit=top_k,
with_payload=True,
)
return [
{
"doc_id": str(point.id),
"text": point.payload.get("text", ""),
"score": point.score,
"payload": point.payload
}
for point in results.points
]
# Esempio di utilizzo completo
if __name__ == "__main__":
# Crea la collection
create_hybrid_collection()
# Indicizza documenti di esempio
sample_docs = [
{"id": "1", "text": "Qdrant is a vector database optimized for ANN search with filtering", "metadata": {"category": "vectordb"}},
{"id": "2", "text": "BM25 algorithm for information retrieval with term frequency saturation", "metadata": {"category": "ir"}},
{"id": "3", "text": "Hybrid search combines sparse BM25 and dense vector embeddings", "metadata": {"category": "hybrid"}},
{"id": "4", "text": "Reciprocal Rank Fusion merges multiple ranked lists into a single ranking", "metadata": {"category": "fusion"}},
]
index_documents(sample_docs)
# Esegui query ibrida
query = "BM25 hybrid vector search fusion"
results = hybrid_search_qdrant(query, top_k=3, prefetch_k=20, fusion="rrf")
print(f"\nRisultati per: '{query}'")
for r in results:
print(f" Score: {r['score']:.4f} | {r['text'][:70]}...")
Evaluation: NDCG, MRR e Precision@k
Costruire un sistema hybrid retrieval senza un framework di evaluation e costruire senza misure. Prima di ottimizzare qualsiasi parametro (k di RRF, peso sparse/dense, threshold del reranker), serve un dataset di test con ground truth e metriche definite. Le tre metriche più importanti per retrieval sono NDCG, MRR e Precision@k.
# Framework di evaluation per hybrid retrieval
import numpy as np
from typing import Optional
def ndcg_at_k(
retrieved_ids: list[str],
relevant_ids: list[str],
k: int,
relevance_grades: Optional[dict] = None
) -> float:
"""
Normalized Discounted Cumulative Gain @k.
Misura la qualità del ranking considerando la posizione dei documenti rilevanti.
Valori in [0, 1], 1 = ranking perfetto.
Args:
retrieved_ids: Lista di doc_id nell'ordine recuperato
relevant_ids: Lista di doc_id rilevanti (ground truth)
k: Cut-off
relevance_grades: Dizionario {doc_id: grade} per rilevanza graduata (opzionale)
"""
relevant_set = set(relevant_ids)
top_k = retrieved_ids[:k]
# DCG: Discounted Cumulative Gain
dcg = 0.0
for i, doc_id in enumerate(top_k):
if relevance_grades:
grade = relevance_grades.get(doc_id, 0)
else:
grade = 1.0 if doc_id in relevant_set else 0.0
# Formula: rel_i / log2(i + 2) (i 0-indexed, log2(2) per i=0)
dcg += grade / np.log2(i + 2)
# IDCG: Ideal DCG (ranking perfetto)
if relevance_grades:
ideal_grades = sorted(
[relevance_grades.get(rid, 0) for rid in relevant_ids],
reverse=True
)[:k]
else:
ideal_grades = [1.0] * min(len(relevant_ids), k)
idcg = sum(
grade / np.log2(i + 2)
for i, grade in enumerate(ideal_grades)
)
return dcg / idcg if idcg > 0 else 0.0
def mrr(retrieved_results: list[list[str]], relevant_results: list[list[str]]) -> float:
"""
Mean Reciprocal Rank.
Media del reciproco del rank del primo risultato rilevante.
"""
reciprocal_ranks = []
for retrieved, relevant in zip(retrieved_results, relevant_results):
relevant_set = set(relevant)
rr = 0.0
for rank, doc_id in enumerate(retrieved, start=1):
if doc_id in relevant_set:
rr = 1.0 / rank
break
reciprocal_ranks.append(rr)
return np.mean(reciprocal_ranks)
def evaluate_retriever(
retriever_fn, # Funzione che prende query e ritorna lista di doc_id
test_queries: list[dict], # Lista di {'query': str, 'relevant_ids': list[str]}
k_values: list[int] = [1, 3, 5, 10]
) -> dict:
"""
Valuta un retriever su un test set con più metriche.
"""
all_retrieved = []
all_relevant = []
ndcg_scores = {k: [] for k in k_values}
for item in test_queries:
query = item["query"]
relevant_ids = item["relevant_ids"]
# Recupera risultati
results = retriever_fn(query, top_k=max(k_values))
retrieved_ids = [r["doc_id"] for r in results]
all_retrieved.append(retrieved_ids)
all_relevant.append(relevant_ids)
# Calcola NDCG per ogni k
for k in k_values:
ndcg = ndcg_at_k(retrieved_ids, relevant_ids, k)
ndcg_scores[k].append(ndcg)
# Aggrega metriche
metrics = {
"MRR": mrr(all_retrieved, all_relevant),
}
for k in k_values:
metrics[f"NDCG@{k}"] = np.mean(ndcg_scores[k])
return metrics
# Esempio: confronto sparse-only vs dense-only vs hybrid
def run_ablation_study(test_queries, sparse_retriever, dense_retriever, hybrid_retriever):
print("=== Ablation Study: Retrieval Methods ===\n")
for name, retriever in [
("BM25 only", sparse_retriever.retrieve),
("Dense only", dense_retriever.search),
("Hybrid RRF", hybrid_retriever.search),
]:
metrics = evaluate_retriever(retriever, test_queries)
print(f"{name}:")
for metric, value in metrics.items():
print(f" {metric}: {value:.4f}")
print()
Pipeline di Produzione
Una pipeline hybrid retrieval in produzione deve gestire più aspetti oltre alla semplice correttezza: latenza (SLA tipici: p95 sotto 500ms), caching per query frequenti, monitoring della qualità nel tempo e graceful degradation quando uno dei componenti va down.
# Pipeline di produzione con caching, monitoring e fallback
import asyncio
import hashlib
import json
import time
from functools import lru_cache
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class ProductionHybridPipeline:
"""
Pipeline hybrid retrieval production-ready con:
- Cache query risultati (TTL configurabile)
- Metriche latenza per monitoring
- Fallback a dense-only se sparse non disponibile
- Circuit breaker per reranker
"""
def __init__(
self,
sparse_retriever,
dense_retriever,
reranker: Optional[CrossEncoderReranker] = None,
cache_ttl_seconds: int = 300,
retrieval_top_k: int = 50,
final_top_k: int = 5,
reranker_enabled: bool = True,
reranker_timeout_ms: float = 500,
):
self.sparse = sparse_retriever
self.dense = dense_retriever
self.reranker = reranker
self.cache: dict = {}
self.cache_ttl = cache_ttl_seconds
self.retrieval_top_k = retrieval_top_k
self.final_top_k = final_top_k
self.reranker_enabled = reranker_enabled
self.reranker_timeout_ms = reranker_timeout_ms
# Metriche (in produzione: Prometheus o simili)
self.metrics = {
"total_queries": 0,
"cache_hits": 0,
"reranker_timeouts": 0,
"sparse_failures": 0,
"avg_latency_ms": 0.0,
}
def _cache_key(self, query: str) -> str:
"""Hash deterministico della query per cache key."""
return hashlib.sha256(query.encode()).hexdigest()[:16]
def _get_cached(self, cache_key: str) -> Optional[list[dict]]:
"""Recupera dalla cache se non scaduta."""
if cache_key in self.cache:
cached_at, results = self.cache[cache_key]
if time.time() - cached_at < self.cache_ttl:
return results
del self.cache[cache_key]
return None
async def search(
self,
query: str,
use_cache: bool = True,
force_rerank: bool = False,
) -> dict:
"""
Esegue la pipeline completa con caching e monitoring.
Returns:
{
'results': list[dict],
'latency_ms': float,
'cache_hit': bool,
'reranked': bool,
'fallback_mode': bool,
}
"""
start = time.time()
self.metrics["total_queries"] += 1
cache_key = self._cache_key(query)
# Controlla cache
if use_cache:
cached = self._get_cached(cache_key)
if cached:
self.metrics["cache_hits"] += 1
return {
"results": cached,
"latency_ms": (time.time() - start) * 1000,
"cache_hit": True,
"reranked": False,
"fallback_mode": False,
}
fallback_mode = False
# Hybrid retrieval con fallback
try:
sparse_task = asyncio.create_task(
self.sparse.search(query, self.retrieval_top_k)
)
dense_task = asyncio.create_task(
self.dense.search(query, self.retrieval_top_k)
)
sparse_results, dense_results = await asyncio.gather(
sparse_task, dense_task
)
candidates = reciprocal_rank_fusion(
[sparse_results, dense_results], k=60
)[:self.retrieval_top_k]
except Exception as e:
logger.warning(f"Sparse retriever failed: {e}. Falling back to dense-only.")
self.metrics["sparse_failures"] += 1
fallback_mode = True
candidates = await self.dense.search(query, self.retrieval_top_k)
# Re-ranking opzionale con timeout
reranked = False
if self.reranker and (self.reranker_enabled or force_rerank) and candidates:
try:
rerank_start = time.time()
final_results = self.reranker.rerank(
query=query,
documents=candidates[:20], # Max 20 per contenere latenza
top_k=self.final_top_k,
)
rerank_elapsed = (time.time() - rerank_start) * 1000
if rerank_elapsed > self.reranker_timeout_ms:
logger.warning(f"Reranker slow: {rerank_elapsed:.0f}ms")
self.metrics["reranker_timeouts"] += 1
reranked = True
except Exception as e:
logger.error(f"Reranker failed: {e}. Using hybrid results.")
final_results = candidates[:self.final_top_k]
else:
final_results = candidates[:self.final_top_k]
# Aggiorna cache e metriche
self.cache[cache_key] = (time.time(), final_results)
latency = (time.time() - start) * 1000
self.metrics["avg_latency_ms"] = (
self.metrics["avg_latency_ms"] * 0.95 + latency * 0.05 # EMA
)
return {
"results": final_results,
"latency_ms": latency,
"cache_hit": False,
"reranked": reranked,
"fallback_mode": fallback_mode,
}
Best Practices e Anti-Pattern
Costruire un sistema hybrid retrieval efficace richiede di evitare alcune trappole comuni che emergono in produzione e che non sono evidenti nei tutorial di base.
Best Practices Hybrid Retrieval
- Inizia con RRF k=60: E' il default empiricamente più robusto. Sperimenta altri valori solo dopo aver stabilito una baseline con metriche NDCG.
- top_k per retriever >= 3x final_top_k: Se vuoi i top 5 finali, recupera almeno 15 da ogni retriever per dare alla fusione materiale sufficiente.
- Tokenizzazione consistente: BM25 e l'embedding model devono usare la stessa pipeline di preprocessing (lowercase, stopwords, stemming) per coerenza.
- Cross-encoder su max 20-50 docs: Oltre 50 candidati il guadagno di precision è marginale rispetto al costo di latenza aggiunto.
- Valuta separatamente sparse e dense: Prima dell'integrazione, misura le metriche di ciascun componente. Se il dense retriever è già al 90% NDCG@5, l'ibrido potrebbe non aggiungere valore per il tuo dataset specifico.
- Cache a livello di query normalizzata: Lowercase e trim della query prima dell'hashing per massimizzare i cache hit rate.
Anti-Pattern da Evitare
- Combinare score non normalizzati: "BM25_score + cosine_score" senza normalizzazione produce risultati dominati dal retriever con scala più grande (quasi sempre BM25).
- Usare il reranker su tutti i risultati del retrieval: Re-rankare 200 docs aggiunge 2-3 secondi di latenza. Limita sempre il reranker a 20-50 candidati.
- Ignorare la qualità dei chunk: Hybrid retrieval non risolve chunk mal formati (troppo corti, tagliati a meta di un concetto). La qualità dell'indexing è il prerequisito fondamentale.
- Ottimizzare senza test set: Cambiare i pesi sparse/dense o il k di RRF senza misurare su un dataset di test porta a overfitting su impressioni soggettive.
- Non gestire il fallback: Se il BM25 index va offline, il sistema deve degradare gracefully a dense-only, non crashare.
Quando Hybrid Retrieval Non Basta
Il hybrid retrieval risolve molti problemi, ma non tutti. Se dopo l'implementazione le metriche di retrieval sono ancora insufficienti, considera queste strade avanzate:
- HyDE (Hypothetical Document Embeddings): Il LLM genera una risposta ipotetica alla query, che viene poi usata come query per il retriever. Migliora il recall semantico su query astratte o mal formate.
- Query expansion: Genera varianti della query (sinonimi, riformulazioni) con il LLM ed esegui il retrieval su tutte, poi fonde i risultati con RRF.
- SPLADE: Modello learned sparse che produce vettori sparsi "intelligenti" invece di term frequency pura. Più accurato di BM25 ma richiede inferenza ML.
- ColBERT/ColPali: Late interaction model che confronta ogni token della query con ogni token del documento. Accuracy superiore al cross-encoder con latenza di retrieval (non reranking).
- GraphRAG: Augmenta il vector retrieval con un knowledge graph che cattura relazioni strutturate tra entità. Ideale per domande che richiedono ragionamento multi-hop.
Conclusioni
Il Hybrid Retrieval è oggi la strategia standard per sistemi RAG di produzione che devono funzionare su query eterogenee: dal termine tecnico esatto alla domanda concettuale vaga. La combinazione BM25 + dense con RRF fornisce un baseline già molto robusto, che il cross-encoder re-ranking porta a livelli di precision difficilmente raggiungibili con approcci singoli.
La chiave per implementarlo con successo è nell'ordine delle operazioni: costruisci prima un test set con ground truth reale del tuo dominio, stabilisci baseline separate per BM25 e dense, poi sperimenta la fusione e misura il delta. Solo con metriche concrete (NDCG@5, MRR) puoi sapere se l'aggiunta del reranker vale i 200ms extra di latenza per il tuo caso d'uso.
Prossimi Passi
- Continua con LangChain RAG Pipeline: da Documento a Risposta per integrare questo retriever in una pipeline completa con LLM.
- Leggi RAG in Production: Monitoring, Evaluation, Optimization per un framework completo di evaluation e monitoring in produzione.
- Esplora Embedding e Vector Search: BERT vs Sentence Transformers per approfondire la scelta del modello dense ottimale per il tuo dominio.
- Considera pgvector e PostgreSQL AI se vuoi implementare hybrid search direttamente nel tuo database PostgreSQL esistente.
Risorse e Riferimenti
- Qdrant Hybrid Search Documentation - Query API e Sparse Vectors
- Cormack, Clarke, Buettcher (2009) - "Reciprocal Rank Fusion outperforms Condorcet and individual Rank Learning Methods"
- BEIR Benchmark - Heterogeneous Retrieval Benchmark
- sentence-transformers/cross-encoder documentation
- MTEB (Massive Text Embedding Benchmark) - Leaderboard 2025







