Odzyskiwanie hybrydowe: połączenie BM25 i wyszukiwania wektorowego w celu uzyskania produkcyjnego RAG
Wyszukiwanie semantyczne z osadzaniem zrewolucjonizowało sposób wyszukiwania informacji w systemach RAG, kryje jednak podstawowe ograniczenie, które regularnie pojawia się w produkcji: jeśli użytkownik szuka „Porównanie wskaźnika halucynacji GPT-4 w trzecim kwartale 2024 r.”, model osadzania będzie wyszukiwał dokumenty semantycznie bliski koncepcji „halucynacji wzorców językowych”, ale może nie odzyskać dokumentu dokładnie ten, który zawiera ten konkretny ciąg tekstu. Z drugiej strony, wyszukiwanie słów kluczowych pozwala znaleźć dokładnie to zdanie, ale nie wie, że „kwestia faktów LLM” jest koncepcyjnie identyczna.
Il Odzyskiwanie hybrydowe narodził się właśnie po to, aby rozwiązać to napięcie. Łączenie badań rzadki (BM25 i warianty) z gęstym wyszukiwaniem (wyszukiwanie wektorowe), otrzymujesz system, który jest zarówno dokładny na dokładnych dopasowaniach opiera się na zrozumieniu semantycznym. Najnowsze badania pokazują że systemy hybrydowe poprawiają jakość wyszukiwania 48% w porównaniu do metod pojedynczych w benchmarkach BEIR i MTEB, ze szczególnie dużym wzrostem w zakresie zapytań technicznych, nazw własnych i specjalistyczna terminologia.
Ten artykuł zawiera szczegółowe techniczne omówienie architektury wyszukiwania hybrydowego: od BM25, jak to działa wewnętrznie, do metod fuzji (Reciprocal Rank Fusion, fuzja ważona), do ponownego rankingu z cross-enkoderem, aż do praktycznego wdrożenia z Qdrant i oceny za pomocą metryk NDCG/MRR. Celem jest zapewnienie narzędzi do budowania i optymalizowania działających potoków wyszukiwania w produkcji, a nie tylko w benchmarkach.
Czego się nauczysz
- Ograniczenia wyszukiwania wyłącznie semantycznego i dlaczego BM25 pozostaje aktualny w 2025 r
- Algorytm BM25: nasycenie częstotliwości terminów, ważenie IDF, normalizacja długości
- Hybrydowa architektura wyszukiwania: rzadka + gęsta równolegle
- Reciprocal Rank Fusion (RRF): formuła, implementacja i strojenie parametru k
- Fuzja wyników ważonych: normalizacja i równoważenie wkładów
- Ponowne rankingowanie międzykoderów: kiedy go używać i jak zoptymalizować opóźnienie w porównaniu z dokładnością
- Implementacja z wektorami rzadkimi Qdrant i API zapytań
- Metryki oceny: NDCG@k, MRR, Precision@k w przypadku wyszukiwania hybrydowego
- Rurociąg produkcyjny z buforowaniem, monitorowaniem i progresywną optymalizacją
Granice badań wyłącznie semantycznych
Wyszukiwanie semantyczne za pomocą gęstych wektorów jest skuteczne w uchwyceniu ukrytego znaczenia tekstu, ale ma luki strukturalne, które stają się widoczne w przypadku rzeczywistych zapytań produkcyjnych. Zrozumienie tych ograniczeń jest pierwszym krokiem do zrozumienia, dlaczego konieczne jest pobieranie hybrydowe a nie tylko opcjonalne w poważnych systemach RAG.
Głównym problemem jest to, co nazywają badacze niedopasowanie słownictwa: model osadzania jest szkolony w zakresie ogólnych dystrybucji tekstu i nie zawsze przechwytuje znaczenie określonych terminów technicznych, akronimów, nazw produktów, wersji oprogramowania lub kody identyfikacyjne. Model osadzania nie wie, że „MSMARCO-v2.1” odnosi się do zbioru danych specyficzny lub że „CVE-2024-4577” jest krytyczną luką w zabezpieczeniach PHP, chyba że została dostrojone w tej domenie.
Gdy wyszukiwanie semantyczne nie powiedzie się
- Zapytania z numerami wersji: „Python 3.12 asyncio.TaskGroup” a „wzorce asynchroniczne Pythona”
- Unikalne identyfikatory: Identyfikatory CVE, numery zamówień, kody podatkowe, ISBN
- Rzadkie akronimy: terminy domenowe, akronimy firm, kody regulacyjne
- Rzadkie imiona: nazwiska osób, małe firmy, lokalizacje geograficzne
- Bardzo krótkie zapytania: przy 1-2 tokenach osadzanie nie jest zbyt dyskryminujące
- Najnowsza terminologia techniczna: modele z granicami wiedzy nie znają nowych terminów
Drugim problemem jest kalibracja wyniku: wyniki podobieństwa gęstych wektorów (zazwyczaj podobieństwo cosinusowe w zakresie [-1, 1] lub nieograniczony iloczyn skalarny) nie mają semantyki absolutny. Dokument z wynikiem 0,85 niekoniecznie jest bardziej trafny niż dokument z wynikiem 0,82 cala różne konteksty. Utrudnia to porównywanie lub łączenie wyników z różnych systemów bez odpowiedniej normalizacji.
Wreszcie cierpi na tym wyszukiwanie semantyczne dryf semantyczny w przypadku zapytań niejednoznacznych: zapytanie takie jak „Java” w kontekście programowania może spowodować pobranie dokumentów dotyczących „wyspy Java” zwłaszcza jeśli kontekst dokumentu nie jest wystarczająco jasny dla modelu osadzania z bardzo krótkimi lub pozbawionymi kontekstu fragmentami tekstu.
BM25: Odświeżenie algorytmu podstawowego
BM25 (Best Match 25) to funkcja rankingowa opracowana w latach 90. XX wieku, która w 2025 r. pozostanie jeden z najskuteczniejszych algorytmów wyszukiwania informacji przy badaniu słów kluczowych. Zrozumienie jego wewnętrzne działanie jest niezbędne zarówno do prawidłowego korzystania z niego, jak i do zrozumienia, dlaczego doskonale uzupełnia wyszukiwanie semantyczne.
BM25 rozszerza TF-IDF o dwa kluczowe mechanizmy: termin nasycenie częstotliwości e normalizacja długości. Kompletna formuła punktacji dokumentu D w odniesieniu do zapytania Q z terminami {q1, ..., qn} oraz:
# Formula BM25 (pseudocodice matematico)
# score(D, Q) = sommatoria per qi in Q di:
# IDF(qi) * (TF(qi, D) * (k1 + 1)) / (TF(qi, D) + k1 * (1 - b + b * |D| / avgdl))
#
# Dove:
# IDF(qi) = log((N - df_i + 0.5) / (df_i + 0.5) + 1)
# TF(qi, D) = frequenza del termine qi nel documento D
# |D| = lunghezza del documento D in termini
# avgdl = lunghezza media dei documenti nella collection
# N = numero totale di documenti
# df_i = numero di documenti che contengono qi
# k1 = parametro di saturazione TF (default: 1.2-2.0)
# b = parametro di length normalization (default: 0.75)
# Implementazione Python con rank_bm25
from rank_bm25 import BM25Okapi
import nltk
from nltk.tokenize import word_tokenize
class BM25Retriever:
def __init__(self, corpus: list[str], k1: float = 1.5, b: float = 0.75):
self.k1 = k1
self.b = b
# Tokenizzazione e lowercase
self.tokenized_corpus = [
word_tokenize(doc.lower()) for doc in corpus
]
self.bm25 = BM25Okapi(self.tokenized_corpus, k1=k1, b=b)
self.corpus = corpus
def retrieve(self, query: str, top_k: int = 20) -> list[dict]:
tokenized_query = word_tokenize(query.lower())
scores = self.bm25.get_scores(tokenized_query)
# Crea lista di (index, score) ordinata per score decrescente
ranked = sorted(
enumerate(scores),
key=lambda x: x[1],
reverse=True
)[:top_k]
return [
{"doc_id": idx, "text": self.corpus[idx], "score": score}
for idx, score in ranked
if score > 0 # Filtra documenti senza match
]
# Uso pratico
corpus = [
"BM25 is a ranking function used in information retrieval",
"Vector search uses dense embeddings for semantic similarity",
"Hybrid search combines BM25 and vector search for better recall",
"Python asyncio enables concurrent programming",
]
retriever = BM25Retriever(corpus, k1=1.5, b=0.75)
results = retriever.retrieve("BM25 hybrid search retrieval", top_k=3)
for r in results:
print(f"Score: {r['score']:.4f} | Text: {r['text'][:60]}...")
Parametr k1 kontroluje nasycenie częstotliwości członu: przy niskim k1 (0,5), różnica między 1 a 2 wystąpieniami terminu ma prawie takie samo znaczenie jak różnica między 10 a 100 zdarzenia; przy wysokim k1 (2,0) TF nadal ma wpływ nawet na wysokie częstotliwości. Parametr b kontroluje wysokość kary nakładanej na długie dokumenty: b=0 wyłącza normalizacja długości, b=1 normalizuje ją całkowicie.
Często pomijanym aspektem BM25 jest to, że jego wyniki są takie nie ograniczony górnie: bardzo trafny dokument zawierający wiele wystąpień wyszukiwanego hasła może uzyskać wynik 10, 50 lub 100 w zależności od korpusu. Stwarza to bezpośredni problem ze zgodnością z wynikami podobieństwa cosinus gęstych wektorów, które znajdują się w [-1, 1]. Fuzja hybrydowa musi poradzić sobie z tą rozbieżnością.
Hybrydowa architektura wyszukiwania: jak je połączyć
Podstawowa architektura hybrydowego systemu wyszukiwania umożliwia równoległe wyszukiwanie rzadkie i gęste podczas osobnego indeksowania, a następnie łączy wyniki przed zwróceniem ich użytkownikowi (lub LLM w kontekście RAG). Istnieją trzy punkty architektoniczne, w których może nastąpić fuzja, z różnymi kompromisami:
- Wczesna fuzja (przed pobraniem): dokumenty są reprezentowane za pomocą wektora hybryda, która łączy rzadkie i gęste cechy przed indeksowaniem. Przykłady: SPLADE, ColBERT w trybie od końca do końca. Droższe pod względem indeksowania, ale bardziej spójne.
- Późna fuzja (po odzyskaniu): oba retrievery działają niezależnie na indeksach oddzielane, a wyniki łączone na poziomie rankingu. Jest to najbardziej powszechne i elastyczne podejście, umożliwia niezależną aktualizację komponentów.
- Etapy ponownego rankingu: oddzielny model (koder krzyżowy) zmienia kolejność wyników połączyły się w wyniku późnej fuzji. Dodaje opóźnienie, ale znacznie poprawia precyzję @ k.
# Architettura base hybrid retrieval con late fusion
from typing import Protocol
import asyncio
class Retriever(Protocol):
async def search(self, query: str, top_k: int) -> list[dict]:
"""Ritorna lista di {'doc_id': str, 'text': str, 'score': float}"""
...
class HybridRetriever:
def __init__(
self,
sparse_retriever: Retriever,
dense_retriever: Retriever,
fusion_method: str = "rrf", # "rrf" | "weighted" | "dbsf"
sparse_weight: float = 0.4,
dense_weight: float = 0.6,
top_k_per_retriever: int = 50, # Recupera più docs per la fusione
):
self.sparse = sparse_retriever
self.dense = dense_retriever
self.fusion_method = fusion_method
self.sparse_weight = sparse_weight
self.dense_weight = dense_weight
self.top_k_per_retriever = top_k_per_retriever
async def search(self, query: str, final_top_k: int = 10) -> list[dict]:
# Esecuzione parallela dei due retriever
sparse_results, dense_results = await asyncio.gather(
self.sparse.search(query, self.top_k_per_retriever),
self.dense.search(query, self.top_k_per_retriever)
)
if self.fusion_method == "rrf":
return self._rrf_fusion(sparse_results, dense_results, final_top_k)
elif self.fusion_method == "weighted":
return self._weighted_fusion(sparse_results, dense_results, final_top_k)
else:
raise ValueError(f"Unknown fusion method: {self.fusion_method}")
def _rrf_fusion(self, sparse: list[dict], dense: list[dict], k: int) -> list[dict]:
# Implementato nella sezione successiva
pass
def _weighted_fusion(self, sparse: list[dict], dense: list[dict], k: int) -> list[dict]:
# Implementato nella sezione successiva
pass
Fuzja wzajemnych rang (RRF)
RRF jest najczęściej używanym algorytmem fuzji w wyszukiwaniu hybrydowym ze względu na swoją prostotę i niezawodność i niezależność od skali punktacji. Pierwotnie zaproponowany przez Cormacka, Clarke'a i Buettchera w 2009 r. przypisuje każdemu dokumentowi ocenę wyłącznie na podstawie jego pozycji na liście rankingowej każdego retrievera, całkowicie ignorując bezwzględną wartość punktacji.
Wzór RRF na punktację dokumentu D występującego na listach L1, L2,..., Lm oraz:
RRF(D) = sumowanie dla i=1..m z: 1 / (k + rank_i(D))
Gdzie k jest stałą (zwykle 60), która tłumi wpływ dokumentów znajdujących się na górze list. Jeśli D nie pojawia się na liście i, jego wkład wynosi 0. Parametr k=60 został wyznaczony empirycznie: typowe wartości wahają się od 10 do 100.
# Implementazione completa RRF
from collections import defaultdict
def reciprocal_rank_fusion(
result_lists: list[list[dict]],
k: int = 60,
id_field: str = "doc_id"
) -> list[dict]:
"""
Fonde N liste di risultati usando Reciprocal Rank Fusion.
Args:
result_lists: Lista di liste, ognuna ordinata per relevance decrescente
k: Costante di smorzamento (default 60, range consigliato 10-100)
id_field: Campo usato come identificatore univoco del documento
Returns:
Lista fusa ordinata per RRF score decrescente
"""
rrf_scores = defaultdict(float)
doc_registry = {} # Mappa doc_id -> doc completo
for result_list in result_lists:
for rank, doc in enumerate(result_list, start=1):
doc_id = doc[id_field]
# Formula RRF: 1 / (k + rank)
rrf_scores[doc_id] += 1.0 / (k + rank)
# Salva il documento (usa l'ultimo trovato se compare più volte)
if doc_id not in doc_registry:
doc_registry[doc_id] = doc
# Ordina per RRF score decrescente
sorted_docs = sorted(
rrf_scores.items(),
key=lambda x: x[1],
reverse=True
)
# Costruisce la lista finale con score
return [
{**doc_registry[doc_id], "rrf_score": score}
for doc_id, score in sorted_docs
]
# Esempio di utilizzo
sparse_results = [
{"doc_id": "doc_A", "text": "BM25 text...", "score": 12.5},
{"doc_id": "doc_B", "text": "...", "score": 8.3},
{"doc_id": "doc_C", "text": "...", "score": 5.1},
]
dense_results = [
{"doc_id": "doc_C", "text": "...", "score": 0.92}, # Doc C primo in dense
{"doc_id": "doc_A", "text": "BM25 text...", "score": 0.88},
{"doc_id": "doc_D", "text": "...", "score": 0.85}, # Solo in dense
]
fused = reciprocal_rank_fusion(
[sparse_results, dense_results],
k=60
)
# RRF scores:
# doc_A: 1/(60+1) + 1/(60+2) = 0.01639 + 0.01613 = 0.03252
# doc_C: 1/(60+3) + 1/(60+1) = 0.01587 + 0.01639 = 0.03226
# doc_B: 1/(60+2) = 0.01613
# doc_D: 1/(60+3) = 0.01587
for doc in fused:
print(f"{doc['doc_id']}: RRF={doc['rrf_score']:.5f}")
Siła RRF tkwi w jego odporność na wartości odstające: To nie ma znaczenia jeśli BM25 przyzna pierwszemu dokumentowi ocenę 100, drugiemu 50, zaś oceny podobieństwa wynoszą 0,99 i 0,97. Jedyne, co się liczy, to względne położenie. Dzięki temu jest szczególnie odpowiedni, gdy oba retrievery mają zupełnie inną skalę punktacji.
Parametr k wpływa na wagę dokumentów znajdujących się na wysokich pozycjach w porównaniu z osobami na niższych stanowiskach. Przy k=60 dokument o randze 1 otrzymuje 1/61 = 0,0164, jeden na pozycji 60 otrzymuje 1/120 = 0,0083: pierwszy jest wart mniej niż dwukrotnie więcej niż ostatni. Przy k=10 pierwsza (1/11 = 0,091) jest warta prawie 7 razy sześćdziesiąta (1/70 = 0,014): ranking plus „zwycięzca bierze wszystko”. W większości przypadków dobrym punktem wyjścia jest k=60.
Fuzja ważonych wyników z normalizacją
Ważona fuzja łączy wyniki bezwzględne zamiast rang, co pozwala kontrolować, ile waga, jaką należy nadać każdemu retrieverowi. Głównym problemem jest normalizacja wyników: Podobieństwo BM25 i cosinus żyją w zupełnie innych skalach, stąd bezpośrednie połączenie („bm25_score * 0,4 + gęsty_score * 0,6”) bez normalizacji nie ma znaczenia.
# Weighted fusion con normalizzazione Min-Max e normalizzazione Z-score
import numpy as np
from typing import Optional
def min_max_normalize(scores: list[float]) -> list[float]:
"""Normalizza scores in [0, 1] usando Min-Max scaling."""
if not scores:
return []
min_val = min(scores)
max_val = max(scores)
if max_val == min_val:
return [1.0] * len(scores) # Tutti uguali -> tutti 1.0
return [(s - min_val) / (max_val - min_val) for s in scores]
def dbsf_normalize(scores: list[float]) -> list[float]:
"""
Distribution-Based Score Fusion (DBSF) normalization.
Usa mean e std per normalizzazione più robusta agli outlier.
"""
if not scores:
return []
mean = np.mean(scores)
std = np.std(scores)
if std == 0:
return [0.5] * len(scores)
# Clamp tra 0 e 1 dopo la trasformazione
normalized = [(s - mean) / (3 * std) + 0.5 for s in scores]
return [max(0.0, min(1.0, n)) for n in normalized]
def weighted_fusion(
sparse_results: list[dict],
dense_results: list[dict],
sparse_weight: float = 0.3,
dense_weight: float = 0.7,
normalization: str = "minmax", # "minmax" | "dbsf"
top_k: Optional[int] = None,
id_field: str = "doc_id"
) -> list[dict]:
"""
Combina risultati sparse e dense con weighted fusion normalizzata.
"""
# Costruisce dizionari per lookup veloce
sparse_map = {d[id_field]: d for d in sparse_results}
dense_map = {d[id_field]: d for d in dense_results}
# Tutti i doc_ids unici
all_ids = set(sparse_map.keys()) | set(dense_map.keys())
# Normalizza gli score di ciascun retriever
normalize_fn = min_max_normalize if normalization == "minmax" else dbsf_normalize
if sparse_results:
sparse_scores_norm = dict(zip(
[d[id_field] for d in sparse_results],
normalize_fn([d["score"] for d in sparse_results])
))
else:
sparse_scores_norm = {}
if dense_results:
dense_scores_norm = dict(zip(
[d[id_field] for d in dense_results],
normalize_fn([d["score"] for d in dense_results])
))
else:
dense_scores_norm = {}
# Calcola score combinato
fused_docs = []
for doc_id in all_ids:
sparse_score = sparse_scores_norm.get(doc_id, 0.0)
dense_score = dense_scores_norm.get(doc_id, 0.0)
combined_score = sparse_weight * sparse_score + dense_weight * dense_score
# Prende il doc dal retriever che lo ha trovato
doc = sparse_map.get(doc_id) or dense_map.get(doc_id)
fused_docs.append({
**doc,
"combined_score": combined_score,
"sparse_score_norm": sparse_score,
"dense_score_norm": dense_score,
})
# Ordina per combined score
fused_docs.sort(key=lambda x: x["combined_score"], reverse=True)
return fused_docs[:top_k] if top_k else fused_docs
# Quando usare weighted vs RRF:
# - RRF: quando i retriever hanno scale molto diverse, come punto di partenza
# - Weighted + DBSF: quando vuoi controllare il bilanciamento sparse/dense
# basandoti su metriche di evaluation del tuo specifico dataset
# - Weighted + MinMax: più semplice, sensibile agli outlier di score
Zmiana rankingu za pomocą Cross-Encodera
Fuzja (RRF lub ważona) tworzy listę kandydatów uporządkowaną według szacowanej istotności. Ale używają zarówno BM25, jak i gęstych retrieverów bi-enkoder: zapytanie i dokument przychodzą kodowane oddzielnie, a podobieństwo jest obliczane post hoc. Jest to skuteczne, ale nieszczelne szczegółowe interakcje między zapytaniem a dokumentem.
I koder krzyżowy wspólnie przetwarzają zapytanie i dokument poprzez model transformator, umożliwiający mechanizmowi samouważności wychwytywanie bezpośrednich interakcji między tokenami zapytań a tokenami dokumentów. Rezultatem jest wynik trafności znacznie dokładniejsze, ale przy koszcie obliczeniowym proporcjonalnym do liczby par (zapytanie, dokument) do oceny.
# Cross-encoder re-ranking con sentence-transformers
from sentence_transformers import CrossEncoder
import time
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class CrossEncoderReranker:
"""
Re-ranker basato su cross-encoder per la fase di precision refinement.
Usa modello ms-marco per ranking di rilevanza query-documento.
"""
def __init__(
self,
model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2",
max_length: int = 512,
batch_size: int = 32,
device: Optional[str] = None, # None = auto-detect GPU/CPU
):
self.model = CrossEncoder(
model_name,
max_length=max_length,
device=device
)
self.batch_size = batch_size
logger.info(f"CrossEncoder loaded: {model_name}")
def rerank(
self,
query: str,
documents: list[dict],
text_field: str = "text",
top_k: Optional[int] = None,
) -> list[dict]:
"""
Re-ordina i documenti usando il cross-encoder.
Args:
query: Query originale dell'utente
documents: Lista di documenti da ri-ordinare (output del hybrid retriever)
text_field: Campo del documento che contiene il testo
top_k: Ritorna solo i top_k più rilevanti
Returns:
Documenti ri-ordinati con campo "rerank_score" aggiunto
"""
if not documents:
return []
start_time = time.time()
# Crea coppie (query, document_text) per il cross-encoder
query_doc_pairs = [
(query, doc[text_field]) for doc in documents
]
# Inferenza in batch per efficienza
scores = self.model.predict(
query_doc_pairs,
batch_size=self.batch_size,
show_progress_bar=False,
)
elapsed = time.time() - start_time
logger.debug(
f"Cross-encoder scored {len(documents)} docs in {elapsed:.3f}s "
f"({elapsed/len(documents)*1000:.1f}ms/doc)"
)
# Aggiunge score e ri-ordina
reranked = [
{**doc, "rerank_score": float(score)}
for doc, score in zip(documents, scores)
]
reranked.sort(key=lambda x: x["rerank_score"], reverse=True)
return reranked[:top_k] if top_k else reranked
# Pipeline completa: Hybrid Retrieval + Cross-Encoder Reranking
class RAGRetrievalPipeline:
def __init__(
self,
hybrid_retriever: HybridRetriever,
reranker: CrossEncoderReranker,
retrieval_top_k: int = 50, # Recupera molti per il reranker
final_top_k: int = 5, # Top-K finale per il LLM context
):
self.hybrid_retriever = hybrid_retriever
self.reranker = reranker
self.retrieval_top_k = retrieval_top_k
self.final_top_k = final_top_k
async def retrieve_for_llm(self, query: str) -> list[dict]:
"""
Pipeline completa: hybrid retrieval -> cross-encoder reranking.
Ottimizzato per massimizzare precision@5 (i 5 docs passati al LLM).
"""
# Step 1: Hybrid retrieval con largo top_k per il reranker
candidates = await self.hybrid_retriever.search(
query, final_top_k=self.retrieval_top_k
)
if not candidates:
return []
# Step 2: Re-ranking con cross-encoder
# Il reranker opera su self.retrieval_top_k docs, tipicamente 20-50
reranked = self.reranker.rerank(
query=query,
documents=candidates,
top_k=self.final_top_k
)
return reranked
# Performance tipica (GPU T4):
# - Hybrid retrieval (BM25 + HNSW): ~10-20ms
# - Cross-encoder reranking (20 docs): ~80-120ms
# - Cross-encoder reranking (50 docs): ~200-350ms
# Totale pipeline: ~100-370ms a seconda del top_k del reranker
Polecane modele cross-enkoderów (2025)
- koder krzyżowy/ms-marco-MiniLM-L-6-v2: Optymalny balans prędkości/dokładności. MAPA 0.82 na MS MARCO. ~12ms/doc na GPU. Idealny do produkcji.
- koder krzyżowy/ms-marco-MiniLM-L-12-v2: Dokładniejszy, ~2x wolniejszy. W przypadku zapytań o wysokim priorytecie.
- BAAI/bge-reranker-v2-m3: Wielojęzyczny, doskonały do języka włoskiego. Obsługuje do 8192 tokenów. Polecane dla RAG w języku włoskim.
- Cohere Rerank API: Rozwiązanie zarządzane, opóźnienie ~50 ms, doskonała dokładność. Koszt zapytania. Idealne do szybkiego sprawdzenia koncepcji.
- Jina Reranker v2: Open-source, kontekst tokena 8192, doskonały w tekstach technicznych.
Implementacja przy użyciu wektorów Qdrant Sparse + Dense
Qdrant natywnie obsługuje wyszukiwanie hybrydowe poprzez swoje Zapytanie o API z rzadkimi wektorami i mechanizmem pobierz z wyprzedzeniem. W odróżnieniu od rozwiązań wymagając oddzielnych systemów dla rzadkich i gęstych, Qdrant obsługuje oba w jednym kolekcji, znacznie upraszczając architekturę.
# Qdrant Hybrid Search: setup e query completa
from qdrant_client import QdrantClient
from qdrant_client import models
from fastembed import TextEmbedding, SparseTextEmbedding
import numpy as np
# Inizializzazione client e modelli di embedding
client = QdrantClient("localhost", port=6333)
# Modello dense: all-MiniLM-L6-v2 (384 dim, veloce)
dense_model = TextEmbedding("sentence-transformers/all-MiniLM-L6-v2")
# Modello sparse: BM25 tramite FastEmbed
sparse_model = SparseTextEmbedding("Qdrant/bm25")
COLLECTION_NAME = "hybrid_rag_collection"
def create_hybrid_collection():
"""Crea collection con supporto sparse + dense vectors."""
client.create_collection(
collection_name=COLLECTION_NAME,
vectors_config={
"dense": models.VectorParams(
size=384, # Dimensione all-MiniLM-L6-v2
distance=models.Distance.COSINE,
on_disk=False, # In-memory per latenza bassa
)
},
sparse_vectors_config={
"sparse": models.SparseVectorParams(
modifier=models.Modifier.IDF, # BM25-style IDF weighting
)
},
optimizers_config=models.OptimizersConfigDiff(
indexing_threshold=20000, # Inizia HNSW dopo 20k vettori
),
)
print(f"Collection '{COLLECTION_NAME}' creata con sparse + dense support")
def index_documents(documents: list[dict]):
"""
Indicizza documenti con vettori dense e sparse.
Args:
documents: Lista di {'id': str, 'text': str, 'metadata': dict}
"""
texts = [doc["text"] for doc in documents]
# Genera dense embeddings in batch
dense_embeddings = list(dense_model.embed(texts))
# Genera sparse embeddings (BM25) in batch
sparse_embeddings = list(sparse_model.embed(texts))
# Prepara i points per Qdrant
points = []
for i, doc in enumerate(documents):
sparse_emb = sparse_embeddings[i]
points.append(
models.PointStruct(
id=i, # Usa ID numerico o UUID
payload={
"text": doc["text"],
**doc.get("metadata", {})
},
vector={
"dense": dense_embeddings[i].tolist(),
"sparse": models.SparseVector(
indices=sparse_emb.indices.tolist(),
values=sparse_emb.values.tolist(),
)
}
)
)
# Upsert in batch
client.upsert(
collection_name=COLLECTION_NAME,
points=points,
wait=True
)
print(f"Indicizzati {len(documents)} documenti")
def hybrid_search_qdrant(
query: str,
top_k: int = 10,
prefetch_k: int = 50,
fusion: str = "rrf", # "rrf" | "dbsf"
) -> list[dict]:
"""
Esegue hybrid search con Qdrant Query API.
Usa il meccanismo di prefetch: recupera top_k*5 candidati da ogni
retriever, poi li fonde con RRF o DBSF.
"""
# Genera query embeddings
query_dense = list(dense_model.embed([query]))[0].tolist()
query_sparse_emb = list(sparse_model.embed([query]))[0]
query_sparse = models.SparseVector(
indices=query_sparse_emb.indices.tolist(),
values=query_sparse_emb.values.tolist(),
)
# Fusion method
fusion_model = (
models.Fusion.RRF if fusion == "rrf"
else models.Fusion.DBSF
)
# Query API con prefetch (hybrid search nativo Qdrant)
results = client.query_points(
collection_name=COLLECTION_NAME,
prefetch=[
# Prefetch BM25 sparse
models.Prefetch(
query=query_sparse,
using="sparse",
limit=prefetch_k,
),
# Prefetch dense semantic
models.Prefetch(
query=query_dense,
using="dense",
limit=prefetch_k,
),
],
query=models.FusionQuery(fusion=fusion_model),
limit=top_k,
with_payload=True,
)
return [
{
"doc_id": str(point.id),
"text": point.payload.get("text", ""),
"score": point.score,
"payload": point.payload
}
for point in results.points
]
# Esempio di utilizzo completo
if __name__ == "__main__":
# Crea la collection
create_hybrid_collection()
# Indicizza documenti di esempio
sample_docs = [
{"id": "1", "text": "Qdrant is a vector database optimized for ANN search with filtering", "metadata": {"category": "vectordb"}},
{"id": "2", "text": "BM25 algorithm for information retrieval with term frequency saturation", "metadata": {"category": "ir"}},
{"id": "3", "text": "Hybrid search combines sparse BM25 and dense vector embeddings", "metadata": {"category": "hybrid"}},
{"id": "4", "text": "Reciprocal Rank Fusion merges multiple ranked lists into a single ranking", "metadata": {"category": "fusion"}},
]
index_documents(sample_docs)
# Esegui query ibrida
query = "BM25 hybrid vector search fusion"
results = hybrid_search_qdrant(query, top_k=3, prefetch_k=20, fusion="rrf")
print(f"\nRisultati per: '{query}'")
for r in results:
print(f" Score: {r['score']:.4f} | {r['text'][:70]}...")
Ocena: NDCG, MRR i Precision@k
Budowa hybrydowego systemu wyszukiwania bez ram ewaluacyjnych i budowanie bez pomiarów. Przed optymalizacją dowolnego parametru (k RRF, waga rzadka/gęsta, próg zmiany rankingu), you need a test dataset with ground truth and defined metrics. Trzy najważniejsze wskaźniki do wyszukiwania są to NDCG, MRR i Precision@k.
# Framework di evaluation per hybrid retrieval
import numpy as np
from typing import Optional
def ndcg_at_k(
retrieved_ids: list[str],
relevant_ids: list[str],
k: int,
relevance_grades: Optional[dict] = None
) -> float:
"""
Normalized Discounted Cumulative Gain @k.
Misura la qualità del ranking considerando la posizione dei documenti rilevanti.
Valori in [0, 1], 1 = ranking perfetto.
Args:
retrieved_ids: Lista di doc_id nell'ordine recuperato
relevant_ids: Lista di doc_id rilevanti (ground truth)
k: Cut-off
relevance_grades: Dizionario {doc_id: grade} per rilevanza graduata (opzionale)
"""
relevant_set = set(relevant_ids)
top_k = retrieved_ids[:k]
# DCG: Discounted Cumulative Gain
dcg = 0.0
for i, doc_id in enumerate(top_k):
if relevance_grades:
grade = relevance_grades.get(doc_id, 0)
else:
grade = 1.0 if doc_id in relevant_set else 0.0
# Formula: rel_i / log2(i + 2) (i 0-indexed, log2(2) per i=0)
dcg += grade / np.log2(i + 2)
# IDCG: Ideal DCG (ranking perfetto)
if relevance_grades:
ideal_grades = sorted(
[relevance_grades.get(rid, 0) for rid in relevant_ids],
reverse=True
)[:k]
else:
ideal_grades = [1.0] * min(len(relevant_ids), k)
idcg = sum(
grade / np.log2(i + 2)
for i, grade in enumerate(ideal_grades)
)
return dcg / idcg if idcg > 0 else 0.0
def mrr(retrieved_results: list[list[str]], relevant_results: list[list[str]]) -> float:
"""
Mean Reciprocal Rank.
Media del reciproco del rank del primo risultato rilevante.
"""
reciprocal_ranks = []
for retrieved, relevant in zip(retrieved_results, relevant_results):
relevant_set = set(relevant)
rr = 0.0
for rank, doc_id in enumerate(retrieved, start=1):
if doc_id in relevant_set:
rr = 1.0 / rank
break
reciprocal_ranks.append(rr)
return np.mean(reciprocal_ranks)
def evaluate_retriever(
retriever_fn, # Funzione che prende query e ritorna lista di doc_id
test_queries: list[dict], # Lista di {'query': str, 'relevant_ids': list[str]}
k_values: list[int] = [1, 3, 5, 10]
) -> dict:
"""
Valuta un retriever su un test set con più metriche.
"""
all_retrieved = []
all_relevant = []
ndcg_scores = {k: [] for k in k_values}
for item in test_queries:
query = item["query"]
relevant_ids = item["relevant_ids"]
# Recupera risultati
results = retriever_fn(query, top_k=max(k_values))
retrieved_ids = [r["doc_id"] for r in results]
all_retrieved.append(retrieved_ids)
all_relevant.append(relevant_ids)
# Calcola NDCG per ogni k
for k in k_values:
ndcg = ndcg_at_k(retrieved_ids, relevant_ids, k)
ndcg_scores[k].append(ndcg)
# Aggrega metriche
metrics = {
"MRR": mrr(all_retrieved, all_relevant),
}
for k in k_values:
metrics[f"NDCG@{k}"] = np.mean(ndcg_scores[k])
return metrics
# Esempio: confronto sparse-only vs dense-only vs hybrid
def run_ablation_study(test_queries, sparse_retriever, dense_retriever, hybrid_retriever):
print("=== Ablation Study: Retrieval Methods ===\n")
for name, retriever in [
("BM25 only", sparse_retriever.retrieve),
("Dense only", dense_retriever.search),
("Hybrid RRF", hybrid_retriever.search),
]:
metrics = evaluate_retriever(retriever, test_queries)
print(f"{name}:")
for metric, value in metrics.items():
print(f" {metric}: {value:.4f}")
print()
Rurociąg produkcyjny
Hybrydowy potok pobierania w środowisku produkcyjnym musi obsługiwać więcej aspektów niż tylko to poprawność: utajenie (Typowy SLA: p95 poniżej 500ms), buforowanie w przypadku częstych zapytań, monitorowanie jakości w czasie, np pełna wdzięku degradacja gdy jeden z elementów ulegnie awarii.
# Pipeline di produzione con caching, monitoring e fallback
import asyncio
import hashlib
import json
import time
from functools import lru_cache
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class ProductionHybridPipeline:
"""
Pipeline hybrid retrieval production-ready con:
- Cache query risultati (TTL configurabile)
- Metriche latenza per monitoring
- Fallback a dense-only se sparse non disponibile
- Circuit breaker per reranker
"""
def __init__(
self,
sparse_retriever,
dense_retriever,
reranker: Optional[CrossEncoderReranker] = None,
cache_ttl_seconds: int = 300,
retrieval_top_k: int = 50,
final_top_k: int = 5,
reranker_enabled: bool = True,
reranker_timeout_ms: float = 500,
):
self.sparse = sparse_retriever
self.dense = dense_retriever
self.reranker = reranker
self.cache: dict = {}
self.cache_ttl = cache_ttl_seconds
self.retrieval_top_k = retrieval_top_k
self.final_top_k = final_top_k
self.reranker_enabled = reranker_enabled
self.reranker_timeout_ms = reranker_timeout_ms
# Metriche (in produzione: Prometheus o simili)
self.metrics = {
"total_queries": 0,
"cache_hits": 0,
"reranker_timeouts": 0,
"sparse_failures": 0,
"avg_latency_ms": 0.0,
}
def _cache_key(self, query: str) -> str:
"""Hash deterministico della query per cache key."""
return hashlib.sha256(query.encode()).hexdigest()[:16]
def _get_cached(self, cache_key: str) -> Optional[list[dict]]:
"""Recupera dalla cache se non scaduta."""
if cache_key in self.cache:
cached_at, results = self.cache[cache_key]
if time.time() - cached_at < self.cache_ttl:
return results
del self.cache[cache_key]
return None
async def search(
self,
query: str,
use_cache: bool = True,
force_rerank: bool = False,
) -> dict:
"""
Esegue la pipeline completa con caching e monitoring.
Returns:
{
'results': list[dict],
'latency_ms': float,
'cache_hit': bool,
'reranked': bool,
'fallback_mode': bool,
}
"""
start = time.time()
self.metrics["total_queries"] += 1
cache_key = self._cache_key(query)
# Controlla cache
if use_cache:
cached = self._get_cached(cache_key)
if cached:
self.metrics["cache_hits"] += 1
return {
"results": cached,
"latency_ms": (time.time() - start) * 1000,
"cache_hit": True,
"reranked": False,
"fallback_mode": False,
}
fallback_mode = False
# Hybrid retrieval con fallback
try:
sparse_task = asyncio.create_task(
self.sparse.search(query, self.retrieval_top_k)
)
dense_task = asyncio.create_task(
self.dense.search(query, self.retrieval_top_k)
)
sparse_results, dense_results = await asyncio.gather(
sparse_task, dense_task
)
candidates = reciprocal_rank_fusion(
[sparse_results, dense_results], k=60
)[:self.retrieval_top_k]
except Exception as e:
logger.warning(f"Sparse retriever failed: {e}. Falling back to dense-only.")
self.metrics["sparse_failures"] += 1
fallback_mode = True
candidates = await self.dense.search(query, self.retrieval_top_k)
# Re-ranking opzionale con timeout
reranked = False
if self.reranker and (self.reranker_enabled or force_rerank) and candidates:
try:
rerank_start = time.time()
final_results = self.reranker.rerank(
query=query,
documents=candidates[:20], # Max 20 per contenere latenza
top_k=self.final_top_k,
)
rerank_elapsed = (time.time() - rerank_start) * 1000
if rerank_elapsed > self.reranker_timeout_ms:
logger.warning(f"Reranker slow: {rerank_elapsed:.0f}ms")
self.metrics["reranker_timeouts"] += 1
reranked = True
except Exception as e:
logger.error(f"Reranker failed: {e}. Using hybrid results.")
final_results = candidates[:self.final_top_k]
else:
final_results = candidates[:self.final_top_k]
# Aggiorna cache e metriche
self.cache[cache_key] = (time.time(), final_results)
latency = (time.time() - start) * 1000
self.metrics["avg_latency_ms"] = (
self.metrics["avg_latency_ms"] * 0.95 + latency * 0.05 # EMA
)
return {
"results": final_results,
"latency_ms": latency,
"cache_hit": False,
"reranked": reranked,
"fallback_mode": fallback_mode,
}
Najlepsze praktyki i anty-wzorce
Budowa skutecznego hybrydowego systemu wyszukiwania wymaga uniknięcia pewnych typowych pułapek które pojawiają się w środowisku produkcyjnym i nie są widoczne w podstawowych tutorialach.
Najlepsze praktyki w zakresie odzyskiwania hybrydowego
- Zacznij od RRF k=60: Jest to najbardziej solidna empirycznie wartość domyślna. Eksperymentuj z innymi wartościami dopiero po ustaleniu wartości bazowej za pomocą wskaźników NDCG.
- top_k dla retrievera >= 3x final_top_k: Jeśli chcesz finałową piątkę, odzyskać co najmniej 15 sztuk od każdego retrievera, aby zapewnić fuzji wystarczającą ilość materiału.
- Spójna tokenizacja: Należy zastosować BM25 i model osadzania ten sam potok przetwarzania wstępnego (małe litery, słowa blokujące, słowa macierzyste) w celu zapewnienia spójności.
- Koder krzyżowy dla maksymalnie 20–50 dokumentów: Zarabia ponad 50 kandydatów precyzja jest marginalna w porównaniu z dodatkowym kosztem opóźnienia.
- Oceń osobno rzadkie i gęste: Przed integracją mierzy metryki każdego komponentu. Jeśli gęsty retriever ma już 90% NDCG@5, hybryda może nie dodawać wartości do konkretnego zbioru danych.
- Znormalizowana pamięć podręczna na poziomie zapytania: Małe litery i przycięcie zapytania przed mieszaniem, aby zmaksymalizować współczynnik trafień w pamięci podręcznej.
Anty-wzorce, których należy unikać
- Łączenie nieznormalizowanych wyników: „BM25_score + cosinus_score” bez normalizacja daje wyniki zdominowane przez aportera o największej skali (prawie zawsze BM25).
- Użyj rerankera dla wszystkich wyników wyszukiwania: Zmień rangę 200 dokumentów dodaje 2-3 sekundy opóźnienia. Zawsze ograniczaj osobę przeprowadzającą reranking do 20–50 kandydatów.
- Ignorowanie jakości fragmentów: Pobieranie hybrydowe nie rozwiązuje fragmentów źle sformułowany (zbyt krótki, ucięty w połowie koncepcji). Jakość indeksowania jest to podstawowy warunek wstępny.
- Optymalizacja bez zestawów testowych: Zmień rzadkie/gęste wagi lub k RRF bez pomiaru na testowym zbiorze danych prowadzi do nadmiernego dopasowania do subiektywnych wrażeń.
- Nie obsługuj powrotu: Jeśli indeks BM25 przejdzie w tryb offline, system musi to zrobić degraduj się z wdziękiem do stanu gęstego, nie rozbijaj się.
Kiedy pobieranie hybrydowe nie wystarczy
Odzyskiwanie hybrydowe rozwiązuje wiele problemów, ale nie wszystkie. Jeśli po wdrożeniu metryki pobierania są nadal niewystarczające, rozważ te zaawansowane ścieżki:
- HyDE (hipotetyczne osadzanie dokumentów): LLM generuje hipotetyczną odpowiedź do zapytania, które jest następnie używane jako zapytanie dla modułu pobierającego. Poprawia zapamiętywanie semantyczne w przypadku abstrakcyjnych lub źle sformułowanych zapytań.
- Rozszerzenie zapytania: Generuj warianty zapytań (synonimy, przeformułowania) za pomocą LLM i wykonaj pobieranie na wszystkich, a następnie połącz wyniki z RRF.
- MIECZE: Nauczono się modelu rzadkiego, który zamiast tego generuje „inteligentne” wektory rzadkie czysta częstotliwość terminów. Bardziej dokładny niż BM25, ale wymaga wnioskowania ML.
- ColBERT/ColPali: Model późnej interakcji, który porównuje każdy token w zapytaniu z każdy żeton dokumentu. Dokładność lepsza niż w przypadku kodera krzyżowego z opóźnieniem pobierania (bez zmiany rankingu).
- WykresRAG: Wzbogać wyszukiwanie wektorów za pomocą przechwytującego wykresu wiedzy ustrukturyzowane relacje między podmiotami. Idealny w przypadku pytań wymagających rozumowania wieloprzeskokowego.
Wnioski
Odzyskiwanie hybrydowe jest obecnie standardową strategią dla produkcyjnych systemów RAG praca nad heterogenicznymi zapytaniami: od dokładnego terminu technicznego po niejasne pytanie pojęciowe. Połączenie BM25 + gęsty z RRF zapewnia już bardzo solidną linię bazową, która Zmiana rankingu między koderami prowadzi do poziomów precyzji, które są trudne do osiągnięcia pojedyncze podejścia.
Kluczem do jego pomyślnego wdrożenia jest kolejność działań: najpierw buduj zestaw testowy z rzeczywistą prawdą o Twojej domenie, ustal osobne wartości bazowe dla BM25 e gęsty, następnie poeksperymentuj z syntezą i zmierz deltę. Tylko z konkretnymi wskaźnikami (NDCG@5, MRR) możesz stwierdzić, czy dodanie modułu rerankingu jest warte dodatkowych 200 ms opóźnienia w twoim przypadku użycia.
Następne kroki
- Kontynuuj z Rurociąg LangChain RAG: Dokument do odpowiedzi zintegrować tego retrievera z kompletnym rurociągiem z LLM.
- Prawa RAG w produkcji: monitorowanie, ocena, optymalizacja dla kompletnych ram oceny i monitorowania w produkcji.
- Badać Osadzanie i wyszukiwanie wektorów: BERT kontra transformatory zdań aby dowiedzieć się więcej o wyborze optymalnego modelu gęstego dla Twojej domeny.
- Rozważać pgvector i PostgreSQL AI jeśli chcesz zaimplementować wyszukiwanie hybrydowe bezpośrednio w istniejącej bazie danych PostgreSQL.
Zasoby i referencje
- Dokumentacja wyszukiwania hybrydowego Qdrant — interfejs API zapytań i wektory rzadkie
- Cormack, Clarke, Buettcher (2009) - „Wzajemna fuzja rang przewyższa metody uczenia się Condorceta i indywidualnych rang”
- Benchmark BEIR — test porównawczy heterogenicznego wyszukiwania
- dokumentacja transformatorów zdań/cross-enkodera
- MTEB (test porównawczy ogromnego osadzania tekstu) — tabela wyników 2025







