Hybridní vyhledávání: Kombinace BM25 a vektorového vyhledávání pro produkční RAG
Sémantické vyhledávání s vkládáním způsobilo revoluci ve způsobu, jakým získáváme informace v systémech RAG, ale skrývá zásadní limit, který se v produkci pravidelně objevuje: pokud uživatel hledá "Srovnávací míra halucinací GPT-4 Q3 2024", model vkládání najde dokumenty sémanticky blízký konceptu „halucinace jazykových vzorů“, ale nemusí dokument obnovit přesně ten, který obsahuje tento konkrétní řetězec textu. Vyhledávání podle klíčových slov naopak najde přesně ta věta, ale neví, že „problém faktičnosti LLM“ je koncepčně totožný.
Il Hybridní vyhledávání se narodil právě proto, aby vyřešil toto napětí. Kombinace výzkumu řídké (BM25 a varianty) s hustým vyhledáváním (vektorové vyhledávání), získáte systém, který je jak přesný na přesné shody je robustní na sémantické porozumění. Nedávný výzkum ukazuje že hybridní systémy zlepšují kvalitu vyhledávání 48 % ve srovnání s jednotlivými metodami v benchmarcích BEIR a MTEB, se zvláště silným ziskem u technických dotazů, vlastních jmen a odbornou terminologií.
Tento článek je technickým hlubokým ponorem do architektury hybridního vyhledávání: z BM25, jak to funguje interně k fúzním metodám (Reciprocal Rank Fusion, vážená fúze), k přehodnocení s křížovým kodérem, až po praktickou implementaci s Qdrant a vyhodnocení s metrikami NDCG/MRR. Cílem je poskytnout nástroje pro vytváření a optimalizaci vyhledávacích kanálů, které fungují ve výrobě, nejen v benchmarcích.
Co se naučíte
- Omezení pouze sémantického vyhledávání a proč BM25 zůstává relevantní i v roce 2025
- Algoritmus BM25: časová frekvenční saturace, IDF vážení, délková normalizace
- Architektura hybridního vyhledávání: řídká + hustá paralelně
- Reciprocal Rank Fusion (RRF): vzorec, implementace a ladění parametru k
- Fúze váženého skóre: normalizace a vyvážení příspěvků
- Přehodnocení mezi kodérem: kdy jej použít a jak optimalizovat latenci vs. přesnost
- Implementace s řídkými vektory Qdrant a Query API
- Metriky hodnocení: NDCG@k, MRR, Precision@k pro hybridní vyhledávání
- Produkční kanál s cachováním, monitorováním a progresivní optimalizací
Limity pouze sémantického výzkumu
Sémantické vyhledávání s hustými vektory je mocné pro zachycení latentního významu textu, ale má strukturální zranitelnosti, které se projeví při skutečných produkčních dotazech. Pochopení těchto omezení je prvním krokem k pochopení toho, proč je hybridní vyhledávání nezbytné a nejen volitelné u seriózních systémů RAG.
Hlavním problémem je to, co vědci nazývají nesoulad slovní zásoby: model vkládání je trénován na obecných distribucích textu a ne vždy zachytí relevanci konkrétních technických termínů, akronymů, názvů produktů, verzí softwaru popř identifikační kódy. Model vkládání neví, že „MSMARCO-v2.1“ odkazuje na datovou sadu konkrétní nebo že „CVE-2024-4577“ je kritická zranitelnost PHP, pokud nebyla vyladěné na této doméně.
Když sémantické vyhledávání selže
- Dotazy s čísly verzí: "Python 3.12 asyncio.TaskGroup" vs "Asynchronní vzory Pythonu"
- Jedinečné identifikátory: CVE ID, objednací čísla, daňové kódy, ISBN
- Neobvyklé zkratky: doménové termíny, zkratky společností, regulační kódy
- Vzácná křestní jména: jména lidí, malé firmy, zeměpisné polohy
- Velmi krátké dotazy: s 1-2 tokeny není vkládání příliš diskriminační
- Nejnovější technická terminologie: modely se znalostními limity neznají nové pojmy
Druhým problémem je kalibrace skóre: skóre podobnosti hustých vektorů (typicky kosinusová podobnost v rozsahu [-1, 1] nebo neomezený součin teček) nemají sémantiku absolutní. Dokument se skóre 0,85 nemusí být nutně relevantnější než dokument se skóre 0,82 palce různé kontexty. To ztěžuje porovnávání nebo kombinování skóre z různých systémů bez patřičné normalizace.
Konečně trpí sémantické vyhledávání sémantický drift na nejednoznačné dotazy: dotaz jako "Java" v kontextu programování může načíst dokumenty o "Java Island" zejména pokud kontext dokumentu není dostatečně jasný pro model vkládání s velmi krátkými nebo dekontextualizovanými kusy textu.
BM25: Refresh of the Fundamental Algorithm
BM25 (Best Match 25) je funkce hodnocení vyvinutá v 90. letech minulého století, která zůstává i v roce 2025 jeden z nejúčinnějších algoritmů vyhledávání informací pro výzkum klíčových slov. Pochopení jeho vnitřní fungování je nezbytné jak pro jeho správné použití, tak pro pochopení proč tak dobře doplňuje sémantické vyhledávání.
BM25 rozšiřuje TF-IDF o dva klíčové mechanismy: termínová frekvenční saturace e normalizace délky. Kompletní vzorec pro hodnocení dokumentu D s ohledem na dotaz Q s pojmy {q1, ..., qn} a:
# Formula BM25 (pseudocodice matematico)
# score(D, Q) = sommatoria per qi in Q di:
# IDF(qi) * (TF(qi, D) * (k1 + 1)) / (TF(qi, D) + k1 * (1 - b + b * |D| / avgdl))
#
# Dove:
# IDF(qi) = log((N - df_i + 0.5) / (df_i + 0.5) + 1)
# TF(qi, D) = frequenza del termine qi nel documento D
# |D| = lunghezza del documento D in termini
# avgdl = lunghezza media dei documenti nella collection
# N = numero totale di documenti
# df_i = numero di documenti che contengono qi
# k1 = parametro di saturazione TF (default: 1.2-2.0)
# b = parametro di length normalization (default: 0.75)
# Implementazione Python con rank_bm25
from rank_bm25 import BM25Okapi
import nltk
from nltk.tokenize import word_tokenize
class BM25Retriever:
def __init__(self, corpus: list[str], k1: float = 1.5, b: float = 0.75):
self.k1 = k1
self.b = b
# Tokenizzazione e lowercase
self.tokenized_corpus = [
word_tokenize(doc.lower()) for doc in corpus
]
self.bm25 = BM25Okapi(self.tokenized_corpus, k1=k1, b=b)
self.corpus = corpus
def retrieve(self, query: str, top_k: int = 20) -> list[dict]:
tokenized_query = word_tokenize(query.lower())
scores = self.bm25.get_scores(tokenized_query)
# Crea lista di (index, score) ordinata per score decrescente
ranked = sorted(
enumerate(scores),
key=lambda x: x[1],
reverse=True
)[:top_k]
return [
{"doc_id": idx, "text": self.corpus[idx], "score": score}
for idx, score in ranked
if score > 0 # Filtra documenti senza match
]
# Uso pratico
corpus = [
"BM25 is a ranking function used in information retrieval",
"Vector search uses dense embeddings for semantic similarity",
"Hybrid search combines BM25 and vector search for better recall",
"Python asyncio enables concurrent programming",
]
retriever = BM25Retriever(corpus, k1=1.5, b=0.75)
results = retriever.retrieve("BM25 hybrid search retrieval", top_k=3)
for r in results:
print(f"Score: {r['score']:.4f} | Text: {r['text'][:60]}...")
Parametr k1 řídí saturaci termínové frekvence: s nízkou k1 (0,5), na rozdílu mezi 1 a 2 výskyty termínu záleží téměř stejně jako na rozdílu mezi 10 a 100 výskyty; s vysokým k1 (2,0) má TF nadále vliv i pro vysoké frekvence. Parametr b řídí, jak moc se mají penalizovat dlouhé dokumenty: b=0 deaktivuje normalizace délky, b=1 ji zcela normalizuje.
Často přehlíženým aspektem BM25 je, že má skóre není ohraničena nadřazeně: velmi relevantní dokument s mnoha výskyty hledaného výrazu může mít skóre 10, 50 nebo 100 v závislosti na korpusu. To vytváří přímý problém s kompatibilitou se skóre kosinové podobnosti hustých vektorů, které jsou v [-1, 1]. Hybridní fúze musí zvládnout tento rozpor.
Architektura hybridního vyhledávání: Jak je kombinovat
Základní architektura hybridního vyhledávacího systému provádí paralelní řídké a husté vyhledávání na samostatné indexování, poté výsledky sloučí, než je vrátí uživateli (nebo LLM v kontextu RAG). Existují tři architektonické body, kde může dojít k fúzi, s různými kompromisy:
- Včasná fúze (před získáním): dokumenty jsou reprezentovány vektorem hybrid, který před indexováním kombinuje řídké a husté prvky. Příklady: SPLADE, ColBERT v režimu end-to-end. Dražší z hlediska indexování, ale konzistentnější.
- Pozdní fúze (po získání): dva retrívry pracují nezávisle na indexech odděleny a výsledky jsou sloučeny na úrovni pořadí. Je to nejběžnější a nejflexibilnější přístup, umožňuje nezávisle aktualizovat komponenty.
- Fáze přehodnocení: samostatný model (křížový kodér) přeuspořádá výsledky sloučené pozdní fúzí. Přidává latenci, ale výrazně zlepšuje precision@k.
# Architettura base hybrid retrieval con late fusion
from typing import Protocol
import asyncio
class Retriever(Protocol):
async def search(self, query: str, top_k: int) -> list[dict]:
"""Ritorna lista di {'doc_id': str, 'text': str, 'score': float}"""
...
class HybridRetriever:
def __init__(
self,
sparse_retriever: Retriever,
dense_retriever: Retriever,
fusion_method: str = "rrf", # "rrf" | "weighted" | "dbsf"
sparse_weight: float = 0.4,
dense_weight: float = 0.6,
top_k_per_retriever: int = 50, # Recupera più docs per la fusione
):
self.sparse = sparse_retriever
self.dense = dense_retriever
self.fusion_method = fusion_method
self.sparse_weight = sparse_weight
self.dense_weight = dense_weight
self.top_k_per_retriever = top_k_per_retriever
async def search(self, query: str, final_top_k: int = 10) -> list[dict]:
# Esecuzione parallela dei due retriever
sparse_results, dense_results = await asyncio.gather(
self.sparse.search(query, self.top_k_per_retriever),
self.dense.search(query, self.top_k_per_retriever)
)
if self.fusion_method == "rrf":
return self._rrf_fusion(sparse_results, dense_results, final_top_k)
elif self.fusion_method == "weighted":
return self._weighted_fusion(sparse_results, dense_results, final_top_k)
else:
raise ValueError(f"Unknown fusion method: {self.fusion_method}")
def _rrf_fusion(self, sparse: list[dict], dense: list[dict], k: int) -> list[dict]:
# Implementato nella sezione successiva
pass
def _weighted_fusion(self, sparse: list[dict], dense: list[dict], k: int) -> list[dict]:
# Implementato nella sezione successiva
pass
Reciprocal Rank Fusion (RRF)
RRF je nejpoužívanějším fúzním algoritmem v hybridním vyhledávání díky své jednoduchosti a robustnosti a nezávislost na bodové škále. Původně navrhli Cormack, Clarke a Buettcher v roce 2009 přiděluje každému dokumentu skóre pouze na základě jeho pozice v žebříčku každého retrívra, přičemž zcela ignoruje absolutní hodnotu skóre.
Vzorec RRF pro hodnocení dokumentu D objevujícího se v seznamech L1, L2, ..., Lm a:
RRF(D) = součet pro i=1..m z: 1 / (k + pořadí_i(D))
Kde k je konstanta (obvykle 60), která tlumí dopad dokumentů v horní části seznamů. Pokud se D neobjeví v seznamu i, jeho příspěvek je 0. Parametr k=60 byl stanoven empiricky: typické hodnoty se pohybují od 10 do 100.
# Implementazione completa RRF
from collections import defaultdict
def reciprocal_rank_fusion(
result_lists: list[list[dict]],
k: int = 60,
id_field: str = "doc_id"
) -> list[dict]:
"""
Fonde N liste di risultati usando Reciprocal Rank Fusion.
Args:
result_lists: Lista di liste, ognuna ordinata per relevance decrescente
k: Costante di smorzamento (default 60, range consigliato 10-100)
id_field: Campo usato come identificatore univoco del documento
Returns:
Lista fusa ordinata per RRF score decrescente
"""
rrf_scores = defaultdict(float)
doc_registry = {} # Mappa doc_id -> doc completo
for result_list in result_lists:
for rank, doc in enumerate(result_list, start=1):
doc_id = doc[id_field]
# Formula RRF: 1 / (k + rank)
rrf_scores[doc_id] += 1.0 / (k + rank)
# Salva il documento (usa l'ultimo trovato se compare più volte)
if doc_id not in doc_registry:
doc_registry[doc_id] = doc
# Ordina per RRF score decrescente
sorted_docs = sorted(
rrf_scores.items(),
key=lambda x: x[1],
reverse=True
)
# Costruisce la lista finale con score
return [
{**doc_registry[doc_id], "rrf_score": score}
for doc_id, score in sorted_docs
]
# Esempio di utilizzo
sparse_results = [
{"doc_id": "doc_A", "text": "BM25 text...", "score": 12.5},
{"doc_id": "doc_B", "text": "...", "score": 8.3},
{"doc_id": "doc_C", "text": "...", "score": 5.1},
]
dense_results = [
{"doc_id": "doc_C", "text": "...", "score": 0.92}, # Doc C primo in dense
{"doc_id": "doc_A", "text": "BM25 text...", "score": 0.88},
{"doc_id": "doc_D", "text": "...", "score": 0.85}, # Solo in dense
]
fused = reciprocal_rank_fusion(
[sparse_results, dense_results],
k=60
)
# RRF scores:
# doc_A: 1/(60+1) + 1/(60+2) = 0.01639 + 0.01613 = 0.03252
# doc_C: 1/(60+3) + 1/(60+1) = 0.01587 + 0.01639 = 0.03226
# doc_B: 1/(60+2) = 0.01613
# doc_D: 1/(60+3) = 0.01587
for doc in fused:
print(f"{doc['doc_id']}: RRF={doc['rrf_score']:.5f}")
Síla RRF spočívá v jeho robustnost pro dosažení odlehlých hodnot: To je jedno pokud BM25 přiřadí skóre 100 prvnímu dokumentu a 50 druhému, zatímco skóre podobnosti je 0,99 a 0,97. Jediné, na čem záleží, je relativní poloha. Díky tomu je zvláště vhodný, když tito dva retrívři mají úplně jiné bodovací stupnice.
Parametr k ovlivňuje, jakou váhu mají dokumenty na vysokých pozicích ve srovnání s těmi na nízkých pozicích. S k=60 získá dokument na pozici 1 1/61 = 0,0164, jeden na 60. místě získá 1/120 = 0,0083: první má hodnotu méně než dvojnásobek posledního. Při k=10 má první (1/11 = 0,091) téměř 7násobek šedesátiny (1/70 = 0,014): hodnocení plus „vítěz bere vše“. Pro většinu případů je k=60 dobrým výchozím bodem.
Spojení váženého skóre s normalizací
Vážená fúze kombinuje absolutní skóre namísto hodnocení, což vám umožňuje kontrolovat, kolik váhu dát každému retrívrovi. Hlavním problémem je normalizace skóre: BM25 a kosinusová podobnost žijí ve zcela odlišných měřítcích, proto přímá kombinace ("bm25_score * 0,4 + husté_score * 0,6") nemá bez normalizace smysl.
# Weighted fusion con normalizzazione Min-Max e normalizzazione Z-score
import numpy as np
from typing import Optional
def min_max_normalize(scores: list[float]) -> list[float]:
"""Normalizza scores in [0, 1] usando Min-Max scaling."""
if not scores:
return []
min_val = min(scores)
max_val = max(scores)
if max_val == min_val:
return [1.0] * len(scores) # Tutti uguali -> tutti 1.0
return [(s - min_val) / (max_val - min_val) for s in scores]
def dbsf_normalize(scores: list[float]) -> list[float]:
"""
Distribution-Based Score Fusion (DBSF) normalization.
Usa mean e std per normalizzazione più robusta agli outlier.
"""
if not scores:
return []
mean = np.mean(scores)
std = np.std(scores)
if std == 0:
return [0.5] * len(scores)
# Clamp tra 0 e 1 dopo la trasformazione
normalized = [(s - mean) / (3 * std) + 0.5 for s in scores]
return [max(0.0, min(1.0, n)) for n in normalized]
def weighted_fusion(
sparse_results: list[dict],
dense_results: list[dict],
sparse_weight: float = 0.3,
dense_weight: float = 0.7,
normalization: str = "minmax", # "minmax" | "dbsf"
top_k: Optional[int] = None,
id_field: str = "doc_id"
) -> list[dict]:
"""
Combina risultati sparse e dense con weighted fusion normalizzata.
"""
# Costruisce dizionari per lookup veloce
sparse_map = {d[id_field]: d for d in sparse_results}
dense_map = {d[id_field]: d for d in dense_results}
# Tutti i doc_ids unici
all_ids = set(sparse_map.keys()) | set(dense_map.keys())
# Normalizza gli score di ciascun retriever
normalize_fn = min_max_normalize if normalization == "minmax" else dbsf_normalize
if sparse_results:
sparse_scores_norm = dict(zip(
[d[id_field] for d in sparse_results],
normalize_fn([d["score"] for d in sparse_results])
))
else:
sparse_scores_norm = {}
if dense_results:
dense_scores_norm = dict(zip(
[d[id_field] for d in dense_results],
normalize_fn([d["score"] for d in dense_results])
))
else:
dense_scores_norm = {}
# Calcola score combinato
fused_docs = []
for doc_id in all_ids:
sparse_score = sparse_scores_norm.get(doc_id, 0.0)
dense_score = dense_scores_norm.get(doc_id, 0.0)
combined_score = sparse_weight * sparse_score + dense_weight * dense_score
# Prende il doc dal retriever che lo ha trovato
doc = sparse_map.get(doc_id) or dense_map.get(doc_id)
fused_docs.append({
**doc,
"combined_score": combined_score,
"sparse_score_norm": sparse_score,
"dense_score_norm": dense_score,
})
# Ordina per combined score
fused_docs.sort(key=lambda x: x["combined_score"], reverse=True)
return fused_docs[:top_k] if top_k else fused_docs
# Quando usare weighted vs RRF:
# - RRF: quando i retriever hanno scale molto diverse, come punto di partenza
# - Weighted + DBSF: quando vuoi controllare il bilanciamento sparse/dense
# basandoti su metriche di evaluation del tuo specifico dataset
# - Weighted + MinMax: più semplice, sensibile agli outlier di score
Přehodnocení pomocí Cross-Encoder
Fúze (RRF nebo vážená) vytvoří seznam kandidátů seřazený podle odhadované relevance. Ale používají jak BM25, tak husté retrívry bi-kodér: přijde dotaz a dokument kódovány odděleně a podobnost se vypočítává post-hoc. To je účinné, ale děravé jemné interakce mezi dotazem a dokumentem.
I křížový kodér společně zpracují dotaz a dokument prostřednictvím modelu transformátor, umožňující mechanismu sebepozorování zachytit přímé interakce mezi tokeny dotazu a tokeny dokumentů. Výsledkem je skóre relevance výrazně přesnější, ale za výpočetní náklady úměrné počtu párů (dotaz, dokument) k vyhodnocení.
# Cross-encoder re-ranking con sentence-transformers
from sentence_transformers import CrossEncoder
import time
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class CrossEncoderReranker:
"""
Re-ranker basato su cross-encoder per la fase di precision refinement.
Usa modello ms-marco per ranking di rilevanza query-documento.
"""
def __init__(
self,
model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
max_length: int = 512,
batch_size: int = 32,
device: Optional[str] = None, # None = auto-detect GPU/CPU
):
self.model = CrossEncoder(
model_name,
max_length=max_length,
device=device
)
self.batch_size = batch_size
logger.info(f"CrossEncoder loaded: {model_name}")
def rerank(
self,
query: str,
documents: list[dict],
text_field: str = "text",
top_k: Optional[int] = None,
) -> list[dict]:
"""
Re-ordina i documenti usando il cross-encoder.
Args:
query: Query originale dell'utente
documents: Lista di documenti da ri-ordinare (output del hybrid retriever)
text_field: Campo del documento che contiene il testo
top_k: Ritorna solo i top_k più rilevanti
Returns:
Documenti ri-ordinati con campo "rerank_score" aggiunto
"""
if not documents:
return []
start_time = time.time()
# Crea coppie (query, document_text) per il cross-encoder
query_doc_pairs = [
(query, doc[text_field]) for doc in documents
]
# Inferenza in batch per efficienza
scores = self.model.predict(
query_doc_pairs,
batch_size=self.batch_size,
show_progress_bar=False,
)
elapsed = time.time() - start_time
logger.debug(
f"Cross-encoder scored {len(documents)} docs in {elapsed:.3f}s "
f"({elapsed/len(documents)*1000:.1f}ms/doc)"
)
# Aggiunge score e ri-ordina
reranked = [
{**doc, "rerank_score": float(score)}
for doc, score in zip(documents, scores)
]
reranked.sort(key=lambda x: x["rerank_score"], reverse=True)
return reranked[:top_k] if top_k else reranked
# Pipeline completa: Hybrid Retrieval + Cross-Encoder Reranking
class RAGRetrievalPipeline:
def __init__(
self,
hybrid_retriever: HybridRetriever,
reranker: CrossEncoderReranker,
retrieval_top_k: int = 50, # Recupera molti per il reranker
final_top_k: int = 5, # Top-K finale per il LLM context
):
self.hybrid_retriever = hybrid_retriever
self.reranker = reranker
self.retrieval_top_k = retrieval_top_k
self.final_top_k = final_top_k
async def retrieve_for_llm(self, query: str) -> list[dict]:
"""
Pipeline completa: hybrid retrieval -> cross-encoder reranking.
Ottimizzato per massimizzare precision@5 (i 5 docs passati al LLM).
"""
# Step 1: Hybrid retrieval con largo top_k per il reranker
candidates = await self.hybrid_retriever.search(
query, final_top_k=self.retrieval_top_k
)
if not candidates:
return []
# Step 2: Re-ranking con cross-encoder
# Il reranker opera su self.retrieval_top_k docs, tipicamente 20-50
reranked = self.reranker.rerank(
query=query,
documents=candidates,
top_k=self.final_top_k
)
return reranked
# Performance tipica (GPU T4):
# - Hybrid retrieval (BM25 + HNSW): ~10-20ms
# - Cross-encoder reranking (20 docs): ~80-120ms
# - Cross-encoder reranking (50 docs): ~200-350ms
# Totale pipeline: ~100-370ms a seconda del top_k del reranker
Doporučené modely křížových kodérů (2025)
- křížový kodér/ms-marco-MiniLM-L-6-v2: Optimální poměr rychlost/přesnost. MAP 0,82 na MS MARCO. ~12 ms/doc na GPU. Ideální pro výrobu.
- křížový kodér/ms-marco-MiniLM-L-12-v2: Přesnější, ~2x pomalejší. Pro dotazy s vysokou prioritou.
- BAAI/bge-reranker-v2-m3: Vícejazyčné, vynikající pro italštinu. Podporuje až 8192 tokenů. Doporučeno pro RAG v italštině.
- Cohere Rerank API: Řízené řešení, latence ~50 ms, vynikající přesnost. Cena za dotaz. Skvělé pro rychlé ověření konceptu.
- Jina Reranker v2: Open-source, kontext tokenu 8192, vynikající na technické texty.
Implementace pomocí Qdrant Sparse + Dense Vectors
Qdrant nativně podporuje hybridní vyhledávání prostřednictvím svého Dotaz API s řídkými vektory a mechanismem přednačítání. Na rozdíl od řešení vyžadující samostatné systémy pro řídké a husté, Qdrant zvládá obojí v jednom kolekce, což výrazně zjednodušuje architekturu.
# Qdrant Hybrid Search: setup e query completa
from qdrant_client import QdrantClient
from qdrant_client import models
from fastembed import TextEmbedding, SparseTextEmbedding
import numpy as np
# Inizializzazione client e modelli di embedding
client = QdrantClient("localhost", port=6333)
# Modello dense: all-MiniLM-L6-v2 (384 dim, veloce)
dense_model = TextEmbedding("sentence-transformers/all-MiniLM-L6-v2")
# Modello sparse: BM25 tramite FastEmbed
sparse_model = SparseTextEmbedding("Qdrant/bm25")
COLLECTION_NAME = "hybrid_rag_collection"
def create_hybrid_collection():
"""Crea collection con supporto sparse + dense vectors."""
client.create_collection(
collection_name=COLLECTION_NAME,
vectors_config={
"dense": models.VectorParams(
size=384, # Dimensione all-MiniLM-L6-v2
distance=models.Distance.COSINE,
on_disk=False, # In-memory per latenza bassa
)
},
sparse_vectors_config={
"sparse": models.SparseVectorParams(
modifier=models.Modifier.IDF, # BM25-style IDF weighting
)
},
optimizers_config=models.OptimizersConfigDiff(
indexing_threshold=20000, # Inizia HNSW dopo 20k vettori
),
)
print(f"Collection '{COLLECTION_NAME}' creata con sparse + dense support")
def index_documents(documents: list[dict]):
"""
Indicizza documenti con vettori dense e sparse.
Args:
documents: Lista di {'id': str, 'text': str, 'metadata': dict}
"""
texts = [doc["text"] for doc in documents]
# Genera dense embeddings in batch
dense_embeddings = list(dense_model.embed(texts))
# Genera sparse embeddings (BM25) in batch
sparse_embeddings = list(sparse_model.embed(texts))
# Prepara i points per Qdrant
points = []
for i, doc in enumerate(documents):
sparse_emb = sparse_embeddings[i]
points.append(
models.PointStruct(
id=i, # Usa ID numerico o UUID
payload={
"text": doc["text"],
**doc.get("metadata", {})
},
vector={
"dense": dense_embeddings[i].tolist(),
"sparse": models.SparseVector(
indices=sparse_emb.indices.tolist(),
values=sparse_emb.values.tolist(),
)
}
)
)
# Upsert in batch
client.upsert(
collection_name=COLLECTION_NAME,
points=points,
wait=True
)
print(f"Indicizzati {len(documents)} documenti")
def hybrid_search_qdrant(
query: str,
top_k: int = 10,
prefetch_k: int = 50,
fusion: str = "rrf", # "rrf" | "dbsf"
) -> list[dict]:
"""
Esegue hybrid search con Qdrant Query API.
Usa il meccanismo di prefetch: recupera top_k*5 candidati da ogni
retriever, poi li fonde con RRF o DBSF.
"""
# Genera query embeddings
query_dense = list(dense_model.embed([query]))[0].tolist()
query_sparse_emb = list(sparse_model.embed([query]))[0]
query_sparse = models.SparseVector(
indices=query_sparse_emb.indices.tolist(),
values=query_sparse_emb.values.tolist(),
)
# Fusion method
fusion_model = (
models.Fusion.RRF if fusion == "rrf"
else models.Fusion.DBSF
)
# Query API con prefetch (hybrid search nativo Qdrant)
results = client.query_points(
collection_name=COLLECTION_NAME,
prefetch=[
# Prefetch BM25 sparse
models.Prefetch(
query=query_sparse,
using="sparse",
limit=prefetch_k,
),
# Prefetch dense semantic
models.Prefetch(
query=query_dense,
using="dense",
limit=prefetch_k,
),
],
query=models.FusionQuery(fusion=fusion_model),
limit=top_k,
with_payload=True,
)
return [
{
"doc_id": str(point.id),
"text": point.payload.get("text", ""),
"score": point.score,
"payload": point.payload
}
for point in results.points
]
# Esempio di utilizzo completo
if __name__ == "__main__":
# Crea la collection
create_hybrid_collection()
# Indicizza documenti di esempio
sample_docs = [
{"id": "1", "text": "Qdrant is a vector database optimized for ANN search with filtering", "metadata": {"category": "vectordb"}},
{"id": "2", "text": "BM25 algorithm for information retrieval with term frequency saturation", "metadata": {"category": "ir"}},
{"id": "3", "text": "Hybrid search combines sparse BM25 and dense vector embeddings", "metadata": {"category": "hybrid"}},
{"id": "4", "text": "Reciprocal Rank Fusion merges multiple ranked lists into a single ranking", "metadata": {"category": "fusion"}},
]
index_documents(sample_docs)
# Esegui query ibrida
query = "BM25 hybrid vector search fusion"
results = hybrid_search_qdrant(query, top_k=3, prefetch_k=20, fusion="rrf")
print(f"\nRisultati per: '{query}'")
for r in results:
print(f" Score: {r['score']:.4f} | {r['text'][:70]}...")
Hodnocení: NDCG, MRR a Precision@k
Budování hybridního vyhledávacího systému bez hodnotícího rámce a budování bez měření. Před optimalizací jakéhokoli parametru (k RRF, řídká/hustá hmotnost, práh změny pořadí), potřebujete testovací datovou sadu se základní pravdou a definovanými metrikami. Tři nejdůležitější metriky pro vyhledávání jsou to NDCG, MRR a Precision@k.
# Framework di evaluation per hybrid retrieval
import numpy as np
from typing import Optional
def ndcg_at_k(
retrieved_ids: list[str],
relevant_ids: list[str],
k: int,
relevance_grades: Optional[dict] = None
) -> float:
"""
Normalized Discounted Cumulative Gain @k.
Misura la qualità del ranking considerando la posizione dei documenti rilevanti.
Valori in [0, 1], 1 = ranking perfetto.
Args:
retrieved_ids: Lista di doc_id nell'ordine recuperato
relevant_ids: Lista di doc_id rilevanti (ground truth)
k: Cut-off
relevance_grades: Dizionario {doc_id: grade} per rilevanza graduata (opzionale)
"""
relevant_set = set(relevant_ids)
top_k = retrieved_ids[:k]
# DCG: Discounted Cumulative Gain
dcg = 0.0
for i, doc_id in enumerate(top_k):
if relevance_grades:
grade = relevance_grades.get(doc_id, 0)
else:
grade = 1.0 if doc_id in relevant_set else 0.0
# Formula: rel_i / log2(i + 2) (i 0-indexed, log2(2) per i=0)
dcg += grade / np.log2(i + 2)
# IDCG: Ideal DCG (ranking perfetto)
if relevance_grades:
ideal_grades = sorted(
[relevance_grades.get(rid, 0) for rid in relevant_ids],
reverse=True
)[:k]
else:
ideal_grades = [1.0] * min(len(relevant_ids), k)
idcg = sum(
grade / np.log2(i + 2)
for i, grade in enumerate(ideal_grades)
)
return dcg / idcg if idcg > 0 else 0.0
def mrr(retrieved_results: list[list[str]], relevant_results: list[list[str]]) -> float:
"""
Mean Reciprocal Rank.
Media del reciproco del rank del primo risultato rilevante.
"""
reciprocal_ranks = []
for retrieved, relevant in zip(retrieved_results, relevant_results):
relevant_set = set(relevant)
rr = 0.0
for rank, doc_id in enumerate(retrieved, start=1):
if doc_id in relevant_set:
rr = 1.0 / rank
break
reciprocal_ranks.append(rr)
return np.mean(reciprocal_ranks)
def evaluate_retriever(
retriever_fn, # Funzione che prende query e ritorna lista di doc_id
test_queries: list[dict], # Lista di {'query': str, 'relevant_ids': list[str]}
k_values: list[int] = [1, 3, 5, 10]
) -> dict:
"""
Valuta un retriever su un test set con più metriche.
"""
all_retrieved = []
all_relevant = []
ndcg_scores = {k: [] for k in k_values}
for item in test_queries:
query = item["query"]
relevant_ids = item["relevant_ids"]
# Recupera risultati
results = retriever_fn(query, top_k=max(k_values))
retrieved_ids = [r["doc_id"] for r in results]
all_retrieved.append(retrieved_ids)
all_relevant.append(relevant_ids)
# Calcola NDCG per ogni k
for k in k_values:
ndcg = ndcg_at_k(retrieved_ids, relevant_ids, k)
ndcg_scores[k].append(ndcg)
# Aggrega metriche
metrics = {
"MRR": mrr(all_retrieved, all_relevant),
}
for k in k_values:
metrics[f"NDCG@{k}"] = np.mean(ndcg_scores[k])
return metrics
# Esempio: confronto sparse-only vs dense-only vs hybrid
def run_ablation_study(test_queries, sparse_retriever, dense_retriever, hybrid_retriever):
print("=== Ablation Study: Retrieval Methods ===\n")
for name, retriever in [
("BM25 only", sparse_retriever.retrieve),
("Dense only", dense_retriever.search),
("Hybrid RRF", hybrid_retriever.search),
]:
metrics = evaluate_retriever(retriever, test_queries)
print(f"{name}:")
for metric, value in metrics.items():
print(f" {metric}: {value:.4f}")
print()
Výrobní potrubí
Hybridní sběrné potrubí ve výrobě musí zvládnout více aspektů než jen to správnost: latence (Typická SLA: p95 pod 500 ms), ukládání do mezipaměti na časté dotazy, sledování kvality v čase e půvabná degradace když jedna ze součástí spadne.
# Pipeline di produzione con caching, monitoring e fallback
import asyncio
import hashlib
import json
import time
from functools import lru_cache
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class ProductionHybridPipeline:
"""
Pipeline hybrid retrieval production-ready con:
- Cache query risultati (TTL configurabile)
- Metriche latenza per monitoring
- Fallback a dense-only se sparse non disponibile
- Circuit breaker per reranker
"""
def __init__(
self,
sparse_retriever,
dense_retriever,
reranker: Optional[CrossEncoderReranker] = None,
cache_ttl_seconds: int = 300,
retrieval_top_k: int = 50,
final_top_k: int = 5,
reranker_enabled: bool = True,
reranker_timeout_ms: float = 500,
):
self.sparse = sparse_retriever
self.dense = dense_retriever
self.reranker = reranker
self.cache: dict = {}
self.cache_ttl = cache_ttl_seconds
self.retrieval_top_k = retrieval_top_k
self.final_top_k = final_top_k
self.reranker_enabled = reranker_enabled
self.reranker_timeout_ms = reranker_timeout_ms
# Metriche (in produzione: Prometheus o simili)
self.metrics = {
"total_queries": 0,
"cache_hits": 0,
"reranker_timeouts": 0,
"sparse_failures": 0,
"avg_latency_ms": 0.0,
}
def _cache_key(self, query: str) -> str:
"""Hash deterministico della query per cache key."""
return hashlib.sha256(query.encode()).hexdigest()[:16]
def _get_cached(self, cache_key: str) -> Optional[list[dict]]:
"""Recupera dalla cache se non scaduta."""
if cache_key in self.cache:
cached_at, results = self.cache[cache_key]
if time.time() - cached_at < self.cache_ttl:
return results
del self.cache[cache_key]
return None
async def search(
self,
query: str,
use_cache: bool = True,
force_rerank: bool = False,
) -> dict:
"""
Esegue la pipeline completa con caching e monitoring.
Returns:
{
'results': list[dict],
'latency_ms': float,
'cache_hit': bool,
'reranked': bool,
'fallback_mode': bool,
}
"""
start = time.time()
self.metrics["total_queries"] += 1
cache_key = self._cache_key(query)
# Controlla cache
if use_cache:
cached = self._get_cached(cache_key)
if cached:
self.metrics["cache_hits"] += 1
return {
"results": cached,
"latency_ms": (time.time() - start) * 1000,
"cache_hit": True,
"reranked": False,
"fallback_mode": False,
}
fallback_mode = False
# Hybrid retrieval con fallback
try:
sparse_task = asyncio.create_task(
self.sparse.search(query, self.retrieval_top_k)
)
dense_task = asyncio.create_task(
self.dense.search(query, self.retrieval_top_k)
)
sparse_results, dense_results = await asyncio.gather(
sparse_task, dense_task
)
candidates = reciprocal_rank_fusion(
[sparse_results, dense_results], k=60
)[:self.retrieval_top_k]
except Exception as e:
logger.warning(f"Sparse retriever failed: {e}. Falling back to dense-only.")
self.metrics["sparse_failures"] += 1
fallback_mode = True
candidates = await self.dense.search(query, self.retrieval_top_k)
# Re-ranking opzionale con timeout
reranked = False
if self.reranker and (self.reranker_enabled or force_rerank) and candidates:
try:
rerank_start = time.time()
final_results = self.reranker.rerank(
query=query,
documents=candidates[:20], # Max 20 per contenere latenza
top_k=self.final_top_k,
)
rerank_elapsed = (time.time() - rerank_start) * 1000
if rerank_elapsed > self.reranker_timeout_ms:
logger.warning(f"Reranker slow: {rerank_elapsed:.0f}ms")
self.metrics["reranker_timeouts"] += 1
reranked = True
except Exception as e:
logger.error(f"Reranker failed: {e}. Using hybrid results.")
final_results = candidates[:self.final_top_k]
else:
final_results = candidates[:self.final_top_k]
# Aggiorna cache e metriche
self.cache[cache_key] = (time.time(), final_results)
latency = (time.time() - start) * 1000
self.metrics["avg_latency_ms"] = (
self.metrics["avg_latency_ms"] * 0.95 + latency * 0.05 # EMA
)
return {
"results": final_results,
"latency_ms": latency,
"cache_hit": False,
"reranked": reranked,
"fallback_mode": fallback_mode,
}
Osvědčené postupy a anti-vzorce
Vybudování účinného hybridního vyhledávacího systému vyžaduje vyhnout se některým běžným nástrahám které se objevují ve výrobě a nejsou patrné v základních výukových programech.
Best Practices Hybrid Retrieval
- Začněte s RRF k=60: Je to empiricky nejrobustnější výchozí nastavení. Experimentujte s jinými hodnotami až po stanovení základní linie pomocí metrik NDCG.
- top_k pro retrívra >= 3x final_top_k: Pokud chcete konečných 5 nejlepších, získat alespoň 15 z každého retrívra, aby měl fúze dostatek materiálu.
- Konzistentní tokenizace: Je nutné použít BM25 a model vkládání stejný kanál předběžného zpracování (malá písmena, ignorovaná slova, stemming) pro konzistenci.
- Křížový kodér na max. 20–50 dokumentech: Vydělává více než 50 kandidátů přesnost je marginální ve srovnání s vyššími náklady na latenci.
- Vyhodnoťte odděleně řídké a husté: Před integrací, měří metriky každé komponenty. Pokud je hustý retrívr již na 90 % NDCG@5, hybrid nemusí přidávat hodnotu pro váš konkrétní datový soubor.
- Normalizovaná mezipaměť na úrovni dotazu: Malá písmena a oříznutí dotazu před hashováním, aby se maximalizovala míra přístupu do mezipaměti.
Anti-vzory, kterým je třeba se vyhnout
- Kombinace nenormalizovaných skóre: "BM25_score + cosine_score" bez normalizace produkuje výsledky, kterým dominuje největší retrívr (téměř vždy BM25).
- Použijte reranker na všechny výsledky vyhledávání: Přehodnotit 200 dokumentů přidá 2-3 sekundy latence. Vždy omezte reranker na 20-50 kandidátů.
- Ignorování kvality bloku: Hybridní načítání neřeší bloky špatně tvarovaný (příliš krátký, odříznutý v polovině konceptu). Kvalita indexování je to základní předpoklad.
- Optimalizace bez testovacích sad: Změňte řídké/husté hmotnosti nebo k RRF bez měření na testovacím datovém souboru vede k nadměrnému přizpůsobení subjektivním dojmům.
- Neřešit záložní: Pokud index BM25 přejde do režimu offline, systém musí ladně degradovat na husté pouze, nehavarovat.
Když hybridní vyhledávání nestačí
Hybridní vyhledávání řeší mnoho problémů, ale ne všechny. Pokud po implementaci metriky vyhledávání jsou stále nedostatečné, zvažte tyto pokročilé cesty:
- HyDE (Hypothetical Document Embeddings): LLM generuje hypotetickou odpověď na dotaz, který je pak použit jako dotaz pro retriever. Zlepšuje sémantické vybavování na abstraktní nebo špatně vytvořené dotazy.
- Rozšíření dotazu: Generujte varianty dotazů (synonyma, přeformulování) pomocí LLM a proveďte vyhledávání na všech z nich, poté sloučte výsledky s RRF.
- MEČE: Naučený řídký model, který místo toho produkuje „chytré“ řídké vektory čistá termínová frekvence. Přesnější než BM25, ale vyžaduje odvození ML.
- ColBERT/ColPali: Model pozdní interakce, který porovnává každý token v dotazu s každý token dokumentu. Přesnost lepší než křížový kodér s latencí načítání (nikoli přehodnocením).
- GraphRAG: Rozšiřte vyhledávání vektorů pomocí grafu zachycování znalostí strukturované vztahy mezi entitami. Ideální pro otázky, které vyžadují víceskokové uvažování.
Závěry
Hybrid Retrieval je dnes standardní strategií pro produkční systémy RAG práce na heterogenních dotazech: od přesného odborného termínu po vágní koncepční otázku. Kombinace BM25 + dense s RRF poskytuje již tak velmi robustní základní linii, kterou Změna pořadí mezi kodérem vede k úrovním přesnosti, které je obtížné dosáhnout jednotlivé přístupy.
Klíčem k úspěšné implementaci je pořadí operací: Nejprve sestavit testovací sada se skutečnou základní pravdou vaší domény, stanovte samostatné základní linie pro BM25 e hustý, pak experimentujte s fúzí a změřte deltu. Pouze s konkrétními metrikami (NDCG@5, MRR) můžete říci, zda přidání rerankeru stojí za dalších 200 ms latence pro váš případ použití.
Další kroky
- Pokračujte s LangChain RAG Pipeline: Dokument k reakci integrovat tento retriever do kompletního potrubí s LLM.
- zákony RAG ve výrobě: monitorování, hodnocení, optimalizace pro kompletní rámec hodnocení a monitorování ve výrobě.
- Prozkoumat Vkládání a vyhledávání vektorů: BERT vs Sentence Transformers se dozvíte více o výběru optimálního hustého modelu pro vaši doménu.
- Zvážit pgvector a PostgreSQL AI pokud chcete implementovat hybridní vyhledávání přímo do vaší stávající databáze PostgreSQL.
Zdroje a reference
- Qdrant Hybrid Search Documentation - Query API a Sparse Vectors
- Cormack, Clarke, Buettcher (2009) – „Reciprocal Rank Fusion překonává Condorcet a individuální metody Rank Learning“
- Benchmark BEIR – Benchmark heterogenního vyhledávání
- dokumentace větných transformátorů/křížových kodérů
- MTEB (Massive Text Embedding Benchmark) – žebříček 2025







