RAG with PostgreSQL: From Document to Answer
Have you ever wanted your AI system to answer questions based on your company's specific documents, without training a custom model? The solution is called Retrieval-Augmented Generation (RAG), and it is one of the most powerful and practical architectures in modern AI. PostgreSQL with pgvector is one of the best tools available to implement it.
RAG combines two complementary capabilities: semantic search (finding the documents most relevant to a question) and natural language generation (producing a coherent answer grounded in those documents). The result is a system that responds with knowledge from your own data, not just the general knowledge of a pre-trained model.
In this article, we will build a complete end-to-end RAG pipeline: from document ingestion to GPT-4-powered query answering, all running on PostgreSQL. No additional vector database, no external vector store service.
Series Overview
| # | Article | Focus |
|---|---|---|
| 1 | pgvector | Installation, operators, indexing |
| 2 | Embeddings Deep Dive | Models, distances, generation |
| 3 | RAG with PostgreSQL (you are here) | End-to-end RAG pipeline |
| 4 | Similarity Search | Algorithms and optimization |
| 5 | HNSW and IVFFlat | Advanced indexing strategies |
| 6 | RAG in Production | Scalability and performance |
What You Will Learn
- The complete architecture of a RAG system: components and data flow
- Document ingestion pipeline: loading, parsing, chunking
- Storage strategy in PostgreSQL with pgvector
- Retrieval: from query to selecting the most relevant chunks
- Generation: how to build the prompt and integrate with GPT-4
- Hybrid search: combining vector search and PostgreSQL full-text search
- RAG quality evaluation: metrics and tools
RAG Architecture: How It Works
A RAG system has two main phases that operate at different times:
Phase 1: Ingestion (offline)
This runs once (or periodically when documents change). The steps are:
- Load: Load documents from filesystem, URLs, databases, APIs
- Parse: Extract text from PDF, DOCX, HTML, Markdown
- Chunk: Split text into optimally-sized fragments
- Embed: Generate an embedding vector for each chunk
- Store: Save chunk + embedding + metadata in PostgreSQL
Phase 2: Retrieval + Generation (online, for each query)
- Query: The user asks a question in natural language
- Embed query: Transform the question into a vector using the same model
- Search: Find the k most similar chunks in PostgreSQL
- Context: Assemble the found chunks as context
- Generate: Send question + context to the LLM to get an answer
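The online phase reduces to a short pipeline. Here is a minimal sketch with stubbed components (`embed`, `search`, and `generate` are placeholder callables standing in for the real implementations built later in this article):

```python
def answer(question, embed, search, generate, k=5):
    """Online RAG phase: embed the query, retrieve context, then generate."""
    query_vector = embed(question)        # same embedding model as ingestion
    chunks = search(query_vector, k)      # k most similar chunks from the store
    context = "\n\n".join(chunks)         # assemble retrieved context
    return generate(question, context)    # grounded answer from the LLM
```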
RAG Flow Diagram
INGESTION (offline):
PDF Document
|
v
[Parser] -> Raw text
|
v
[Chunker] -> ["chunk 1", "chunk 2", ..., "chunk N"]
|
v
[Embedding Model] -> [[0.023, -0.841, ...], [0.891, 0.234, ...], ...]
|
v
[PostgreSQL + pgvector] -> Permanent storage
QUERY (online):
User question: "How does HNSW indexing work?"
|
v
[Embedding Model] -> [0.045, -0.823, ...] (query vector)
|
v
[PostgreSQL ANN Search] -> Top 5 most similar chunks
|
v
[Prompt Builder] -> "Use this context: [chunk1, chunk2, ...] Question: ..."
|
v
[GPT-4 / Claude] -> "HNSW (Hierarchical Navigable Small World) indexing ..."
|
v
Answer to user
Project Setup
Dependencies
# requirements.txt
openai>=1.12.0
psycopg2-binary>=2.9.9
langchain>=0.1.0
langchain-openai>=0.0.5
langchain-community>=0.0.20
pypdf>=3.17.0
python-dotenv>=1.0.0
tiktoken>=0.5.0
beautifulsoup4>=4.12.0
requests>=2.31.0
# Install
pip install -r requirements.txt
Database Configuration
-- PostgreSQL initial setup
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pg_trgm; -- trigram (fuzzy) matching; built-in full-text search needs no extension
-- Complete RAG schema
CREATE TABLE IF NOT EXISTS rag_documents (
id BIGSERIAL PRIMARY KEY,
-- Source information
source_path TEXT NOT NULL,
source_type TEXT NOT NULL CHECK (source_type IN ('pdf', 'txt', 'md', 'html', 'docx')),
source_hash TEXT NOT NULL, -- MD5 hash of original file
-- Chunk info
chunk_index INTEGER NOT NULL,
chunk_total INTEGER,
-- Content
title TEXT,
content TEXT NOT NULL,
content_length INTEGER GENERATED ALWAYS AS (length(content)) STORED,
-- Embedding
embedding_model TEXT NOT NULL DEFAULT 'text-embedding-3-small',
embedding vector(1536),
-- Metadata
metadata JSONB DEFAULT '{}',
tags TEXT[] DEFAULT '{}',
-- Timestamps
ingested_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (source_path, chunk_index, source_hash)
);
-- HNSW index for fast vector search
CREATE INDEX idx_rag_embedding_hnsw
ON rag_documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- GIN index for full-text search
CREATE INDEX idx_rag_content_fts
ON rag_documents
USING gin (to_tsvector('english', content));
-- Common filter indexes
CREATE INDEX idx_rag_source_type ON rag_documents (source_type);
CREATE INDEX idx_rag_tags ON rag_documents USING gin (tags);
CREATE INDEX idx_rag_metadata ON rag_documents USING gin (metadata);
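psycopg2 does not understand the `vector` type out of the box. One simple approach, used alongside the `%s::vector` casts later in this article, is to serialize embeddings as pgvector's text literal (the alternative is an adapter such as `register_vector` from the pgvector Python package). A minimal helper:

```python
def to_pgvector(values: list[float]) -> str:
    """Render a Python list as a pgvector text literal, e.g. [0.1,-0.2,0.3]."""
    return "[" + ",".join(repr(float(v)) for v in values) + "]"
```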
Document Ingestion Pipeline
Project Structure
rag_system/
├── config.py # DB config, API keys, parameters
├── ingestion/
│ ├── __init__.py
│ ├── loaders.py # Document loading from various sources
│ ├── parsers.py # PDF, DOCX, HTML, Markdown parsing
│ ├── chunkers.py # Chunking strategies
│ └── pipeline.py # Ingestion pipeline orchestrator
├── retrieval/
│ ├── __init__.py
│ ├── embedder.py # Embedding generation
│ └── searcher.py # Vector search and hybrid search
├── generation/
│ ├── __init__.py
│ ├── prompts.py # Prompt templates
│ └── generator.py # LLM integration
├── rag.py # Main RAGSystem class
└── main.py # Entry point
config.py
import os
from dataclasses import dataclass
from dotenv import load_dotenv
load_dotenv()
@dataclass
class Config:
# Database
db_host: str = os.getenv("DB_HOST", "localhost")
db_port: int = int(os.getenv("DB_PORT", "5432"))
db_name: str = os.getenv("DB_NAME", "ragdb")
db_user: str = os.getenv("DB_USER", "postgres")
db_password: str = os.getenv("DB_PASSWORD", "")
# OpenAI
openai_api_key: str = os.getenv("OPENAI_API_KEY", "")
embedding_model: str = "text-embedding-3-small"
embedding_dim: int = 1536
chat_model: str = "gpt-4o-mini" # cost-effective default
# Chunking
chunk_size: int = 800
chunk_overlap: int = 150
min_chunk_size: int = 100
# Retrieval
top_k: int = 5
similarity_threshold: float = 0.65 # minimum cosine similarity
# Generation
max_context_tokens: int = 8000
temperature: float = 0.1 # low temperature for factual answers
def get_db_url(self) -> str:
return f"postgresql://{self.db_user}:{self.db_password}@{self.db_host}:{self.db_port}/{self.db_name}"
config = Config()
ingestion/loaders.py - Multi-Source Document Loading
import hashlib
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
import requests
from bs4 import BeautifulSoup
@dataclass
class RawDocument:
content: str
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: Optional[dict] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
def load_text_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
return RawDocument(
content=content,
source_path=path,
source_type="txt",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=p.stem
)
def load_markdown_file(path: str) -> RawDocument:
p = Path(path)
content = p.read_text(encoding="utf-8")
# Extract title from frontmatter or first H1 line
title = None
for line in content.split("\n"):
if line.startswith("# "):
title = line[2:].strip()
break
return RawDocument(
content=content,
source_path=path,
source_type="md",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_pdf_file(path: str) -> RawDocument:
from pypdf import PdfReader
reader = PdfReader(path)
pages = []
for page in reader.pages:
pages.append(page.extract_text())
content = "\n\n".join(pages)
return RawDocument(
content=content,
source_path=path,
source_type="pdf",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=Path(path).stem,
metadata={"pages": len(reader.pages)}
)
def load_url(url: str) -> RawDocument:
response = requests.get(url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
# Remove script, style, nav, header, footer
for tag in soup(["script", "style", "nav", "header", "footer"]):
tag.decompose()
content = soup.get_text(separator="\n", strip=True)
title = soup.title.string if soup.title else url
return RawDocument(
content=content,
source_path=url,
source_type="html",
source_hash=hashlib.md5(content.encode()).hexdigest(),
title=title
)
def load_document(source: str) -> RawDocument:
"""Smart loader that picks the correct parser."""
if source.startswith(("http://", "https://")):
return load_url(source)
p = Path(source)
loaders = {
".txt": load_text_file,
".md": load_markdown_file,
".pdf": load_pdf_file,
}
loader = loaders.get(p.suffix.lower())
if not loader:
raise ValueError(f"Unsupported file type: {p.suffix}")
return loader(source)
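The `source_hash` field is what makes ingestion incremental: identical content always produces the same digest, so an unchanged document can be skipped without re-embedding. A self-contained illustration of the idea:

```python
import hashlib

def fingerprint(content: str) -> str:
    """MD5 digest used as the document's change-detection key."""
    return hashlib.md5(content.encode()).hexdigest()

# Same content -> same hash -> is_already_ingested() returns True, ingestion skipped.
# Any edit -> new hash -> the document is re-chunked and re-embedded.
```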
ingestion/chunkers.py - Smart Chunking
from langchain.text_splitter import RecursiveCharacterTextSplitter
from dataclasses import dataclass
from typing import Optional
@dataclass
class TextChunk:
content: str
chunk_index: int
source_path: str
source_type: str
source_hash: str
title: Optional[str] = None
metadata: Optional[dict] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}
class SmartChunker:
"""
Chunker that adapts its strategy to the document type.
"""
def __init__(self, chunk_size: int = 800, chunk_overlap: int = 150):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Generic text splitter
self._text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", "! ", "? ", "; ", ", ", " "],
length_function=len
)
# Markdown-aware splitter (respects document structure)
self._md_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["## ", "# ", "\n\n", "\n", ". "],
length_function=len
)
def chunk(self, doc) -> list[TextChunk]:
"""Chunk a document, choosing the right strategy."""
if doc.source_type == "md":
raw_chunks = self._md_splitter.split_text(doc.content)
else:
raw_chunks = self._text_splitter.split_text(doc.content)
# Filter out chunks that are too small
raw_chunks = [c for c in raw_chunks if len(c.strip()) > 100]
return [
TextChunk(
content=chunk.strip(),
chunk_index=i,
source_path=doc.source_path,
source_type=doc.source_type,
source_hash=doc.source_hash,
title=doc.title,
metadata={
**doc.metadata,
"chunk_total": len(raw_chunks),
"char_count": len(chunk)
}
)
for i, chunk in enumerate(raw_chunks)
]
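To see what `chunk_overlap` buys you, here is a toy character-window splitter (not the LangChain splitter used above, which also respects separators): consecutive chunks share `overlap` characters, so text cut at a boundary still appears whole in one of the two neighboring chunks.

```python
def window_chunks(text: str, size: int = 20, overlap: int = 5) -> list[str]:
    """Fixed-size windows with `overlap` shared characters between neighbors."""
    step = size - overlap
    return [text[i:i + size] for i in range(0, len(text), step)]
```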
ingestion/pipeline.py - Main Orchestrator
import psycopg2
from psycopg2.extras import execute_values
import json
import time
from .loaders import load_document
from .chunkers import SmartChunker
class IngestionPipeline:
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.chunker = SmartChunker(
chunk_size=config.chunk_size,
chunk_overlap=config.chunk_overlap
)
self.conn = psycopg2.connect(config.get_db_url())
def is_already_ingested(self, source_path: str, source_hash: str) -> bool:
"""Check if the document is already in DB with the same hash (unchanged)."""
with self.conn.cursor() as cur:
cur.execute(
"SELECT COUNT(*) FROM rag_documents WHERE source_path = %s AND source_hash = %s",
(source_path, source_hash)
)
return cur.fetchone()[0] > 0
def ingest(self, source: str, tags: list[str] = None, force: bool = False) -> dict:
"""
Process a document and insert it into PostgreSQL.
Returns statistics about the operation.
"""
tags = tags or []
start_time = time.time()
# 1. Load document
doc = load_document(source)
print(f"Loaded: {source} ({len(doc.content)} chars, hash: {doc.source_hash[:8]})")
# 2. Check if already present (incremental update)
if not force and self.is_already_ingested(source, doc.source_hash):
print(f" Skipped: document unchanged")
return {"skipped": True, "source": source}
# 3. Chunking
chunks = self.chunker.chunk(doc)
print(f" Chunked: {len(chunks)} chunks created")
# 4. Delete previous version (if exists)
with self.conn.cursor() as cur:
cur.execute("DELETE FROM rag_documents WHERE source_path = %s", (source,))
# 5. Generate embeddings in batch
texts = [c.content for c in chunks]
embeddings = self.embedder.embed_batch(texts)
print(f" Embeddings: {len(embeddings)} vectors of dim {len(embeddings[0])}")
# 6. Insert into PostgreSQL
rows = [
(
c.source_path,
c.source_type,
c.source_hash,
c.chunk_index,
len(chunks), # chunk_total
c.title,
c.content,
self.config.embedding_model,
str(embeddings[i]),  # pgvector text literal for the %s::vector cast below
json.dumps(c.metadata),
tags
)
for i, c in enumerate(chunks)
]
with self.conn.cursor() as cur:
execute_values(cur, """
INSERT INTO rag_documents
(source_path, source_type, source_hash, chunk_index, chunk_total,
title, content, embedding_model, embedding, metadata, tags)
VALUES %s
ON CONFLICT (source_path, chunk_index, source_hash) DO UPDATE SET
content = EXCLUDED.content,
embedding = EXCLUDED.embedding,
updated_at = NOW()
""", rows, template="(%s,%s,%s,%s,%s,%s,%s,%s,%s::vector,%s::jsonb,%s::text[])")
self.conn.commit()
elapsed = time.time() - start_time
stats = {
"source": source,
"chunks": len(chunks),
"embeddings": len(embeddings),
"elapsed_sec": round(elapsed, 2)
}
print(f" Completed in {elapsed:.1f}s - {stats}")
return stats
def ingest_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Ingest all documents in a directory."""
from pathlib import Path
extensions = extensions or [".txt", ".md", ".pdf"]
results = []
for path in Path(directory).rglob("*"):
if path.suffix.lower() in extensions:
result = self.ingest(str(path))
results.append(result)
return results
Retrieval: Finding the Right Chunks
retrieval/searcher.py
import psycopg2
from dataclasses import dataclass
from typing import Optional
@dataclass
class SearchResult:
id: int
source_path: str
source_type: str
chunk_index: int
title: Optional[str]
content: str
similarity: float
metadata: dict
class HybridSearcher:
"""
Combines vector search (semantic) with full-text search (keyword).
Uses Reciprocal Rank Fusion to merge the two result sets.
"""
def __init__(self, config, embedder):
self.config = config
self.embedder = embedder
self.conn = psycopg2.connect(config.get_db_url())
def vector_search(self, query: str, top_k: int = 10,
source_type: Optional[str] = None,
tags: Optional[list[str]] = None) -> list[SearchResult]:
"""Semantic search with optional filters."""
# serialize as a pgvector text literal so %s::vector works without an adapter
query_embedding = str(self.embedder.embed_single(query))
threshold = 1 - self.config.similarity_threshold # convert to cosine distance
# Build dynamic query with optional filters
filters = ["embedding <=> %s::vector < %s"]
params = [query_embedding, threshold]
if source_type:
filters.append("source_type = %s")
params.append(source_type)
if tags:
filters.append("tags && %s::text[]")  # array overlap: at least one tag in common
params.append(tags)
where_clause = " AND ".join(filters)
with self.conn.cursor() as cur:
cur.execute(f"""
SELECT
id, source_path, source_type, chunk_index, title, content,
1 - (embedding <=> %s::vector) AS similarity,
metadata
FROM rag_documents
WHERE {where_clause}
ORDER BY embedding <=> %s::vector
LIMIT %s
""", [query_embedding] + params + [query_embedding, top_k])
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(r[6], 4), metadata=r[7]
)
for r in rows
]
def fulltext_search(self, query: str, top_k: int = 10) -> list[SearchResult]:
"""Full-text search with ts_rank for ranking."""
with self.conn.cursor() as cur:
cur.execute("""
SELECT
id, source_path, source_type, chunk_index, title, content,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS rank,
metadata
FROM rag_documents
WHERE to_tsvector('english', content) @@
plainto_tsquery('english', %s)
ORDER BY rank DESC
LIMIT %s
""", (query, query, top_k))
rows = cur.fetchall()
return [
SearchResult(
id=r[0], source_path=r[1], source_type=r[2],
chunk_index=r[3], title=r[4], content=r[5],
similarity=round(float(r[6]), 4), metadata=r[7]
)
for r in rows
]
def hybrid_search(self, query: str, top_k: int = 5,
vector_weight: float = 0.7) -> list[SearchResult]:
"""
Reciprocal Rank Fusion (RRF) to combine vector and full-text results.
RRF Score = sum(1 / (k + rank)) for each result list.
"""
k_rrf = 60 # standard RRF constant
# Get both result sets
vector_results = self.vector_search(query, top_k=top_k * 2)
fts_results = self.fulltext_search(query, top_k=top_k * 2)
# Compute RRF scores
scores = {}
all_results = {}
for rank, result in enumerate(vector_results):
scores[result.id] = scores.get(result.id, 0) + vector_weight / (k_rrf + rank + 1)
all_results[result.id] = result
fts_weight = 1 - vector_weight
for rank, result in enumerate(fts_results):
scores[result.id] = scores.get(result.id, 0) + fts_weight / (k_rrf + rank + 1)
all_results[result.id] = result
# Sort by RRF score and take top_k
sorted_ids = sorted(scores.keys(), key=lambda x: scores[x], reverse=True)
final_results = [all_results[id] for id in sorted_ids[:top_k]]
# Update similarity with normalized RRF score
max_score = scores[sorted_ids[0]] if sorted_ids else 1
for result in final_results:
result.similarity = round(scores[result.id] / max_score, 4)
return final_results
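The fusion step can be checked in isolation. Here is a standalone weighted RRF over toy ranked lists of document ids, using the same formula as `hybrid_search` above (k = 60):

```python
def rrf(rankings: list[tuple[list[int], float]], k: int = 60) -> list[int]:
    """Fuse ranked id lists; each entry is (ids_best_first, weight)."""
    scores: dict[int, float] = {}
    for ids, weight in rankings:
        for rank, doc_id in enumerate(ids):
            # RRF: each appearance contributes weight / (k + rank), rank 1-based
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)
```

A document found by both searches accumulates score from both lists, which is exactly why hybrid search promotes it.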
Generation: From Context to Answer
generation/prompts.py
from string import Template
# System prompt that defines the AI's behavior
RAG_SYSTEM_PROMPT = """You are a precise and helpful AI assistant. Answer questions
based EXCLUSIVELY on the provided context documents.
Rules:
1. Use ONLY information present in the context. Do not make things up.
2. If the answer is not in the context, say so clearly.
3. Cite sources using [Source: filename, chunk X] after each claim.
4. Keep a professional and concise tone.
5. Structure the answer clearly with paragraphs or bullet points where appropriate.
"""
def build_rag_prompt(query: str, context_chunks: list, include_sources: bool = True) -> str:
"""
Build the prompt for the LLM with the retrieved context.
Args:
query: The user's question
context_chunks: List of SearchResult objects
include_sources: Whether to include source information
Returns:
The formatted prompt for the LLM
"""
if not context_chunks:
return f"Question: {query}\n\nNote: No relevant documents found in the knowledge base."
# Build context with numbering and source
context_parts = []
for i, chunk in enumerate(context_chunks, 1):
source_info = f"[Source: {chunk.source_path}, chunk {chunk.chunk_index}]" if include_sources else ""
context_parts.append(f"--- Document {i} {source_info} ---\n{chunk.content}")
context_text = "\n\n".join(context_parts)
return f"""Context from documents:
{context_text}
---
User question: {query}
Answer based on the provided context."""
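The context-assembly convention is easy to verify on its own. Below is a simplified stand-in for the assembly step of `build_rag_prompt` (`SimpleNamespace` substitutes for `SearchResult`):

```python
from types import SimpleNamespace

def assemble_context(chunks) -> str:
    """Number each chunk and tag it with its source, as the RAG prompt does."""
    parts = [
        f"--- Document {i} [Source: {c.source_path}, chunk {c.chunk_index}] ---\n{c.content}"
        for i, c in enumerate(chunks, 1)
    ]
    return "\n\n".join(parts)

chunk = SimpleNamespace(source_path="guide.md", chunk_index=0,
                        content="pgvector stores vectors.")
```

The per-chunk source tags are what allow the system prompt's citation rule ("[Source: filename, chunk X]") to work: the model can only cite what the prompt labels.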
generation/generator.py
from openai import OpenAI
from dataclasses import dataclass
from typing import Optional
import tiktoken
from .prompts import RAG_SYSTEM_PROMPT, build_rag_prompt
@dataclass
class RAGResponse:
answer: str
sources: list[dict]
model: str
total_tokens: int
prompt_tokens: int
completion_tokens: int
class RAGGenerator:
def __init__(self, config):
self.config = config
self.client = OpenAI(api_key=config.openai_api_key)
self.tokenizer = tiktoken.encoding_for_model("gpt-4o")  # o200k_base, shared by gpt-4o-mini
def count_tokens(self, text: str) -> int:
return len(self.tokenizer.encode(text))
def truncate_context(self, chunks: list, max_tokens: int) -> list:
"""
Truncate context to avoid exceeding the token limit.
Keeps the most relevant chunks (already sorted by similarity).
"""
selected = []
used_tokens = 0
for chunk in chunks:
chunk_tokens = self.count_tokens(chunk.content)
if used_tokens + chunk_tokens > max_tokens:
break
selected.append(chunk)
used_tokens += chunk_tokens
return selected
def generate(self, query: str, context_chunks: list,
stream: bool = False) -> RAGResponse:
"""
Generate a RAG response.
Args:
query: The user's question
context_chunks: Chunks retrieved from PostgreSQL
stream: If True, use streaming (not implemented here for simplicity)
"""
# Truncate context if necessary
max_context_tokens = self.config.max_context_tokens
truncated_chunks = self.truncate_context(context_chunks, max_context_tokens)
if len(truncated_chunks) < len(context_chunks):
print(f" Context truncated: {len(context_chunks)} -> {len(truncated_chunks)} chunks")
# Build the prompt
user_prompt = build_rag_prompt(query, truncated_chunks)
# Call the LLM
response = self.client.chat.completions.create(
model=self.config.chat_model,
messages=[
{"role": "system", "content": RAG_SYSTEM_PROMPT},
{"role": "user", "content": user_prompt}
],
temperature=self.config.temperature,
max_tokens=1500
)
answer = response.choices[0].message.content
usage = response.usage
# Prepare sources for the response
sources = [
{
"source": chunk.source_path,
"chunk_index": chunk.chunk_index,
"similarity": chunk.similarity,
"excerpt": chunk.content[:200] + "..."
}
for chunk in truncated_chunks
]
return RAGResponse(
answer=answer,
sources=sources,
model=self.config.chat_model,
total_tokens=usage.total_tokens,
prompt_tokens=usage.prompt_tokens,
completion_tokens=usage.completion_tokens
)
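The greedy budget logic in `truncate_context` can be verified without tiktoken. Here is the same algorithm with a pluggable counter (a whitespace word count stands in for the tokenizer):

```python
def fit_budget(texts: list[str], max_units: int,
               count=lambda t: len(t.split())) -> list[str]:
    """Keep texts in order (best-first) until the budget would be exceeded."""
    kept, used = [], 0
    for text in texts:
        cost = count(text)
        if used + cost > max_units:
            break  # stop at the first text that no longer fits
        kept.append(text)
        used += cost
    return kept
```

Because the chunks arrive sorted by similarity, breaking at the first overflow keeps the most relevant context and drops the least relevant.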
The Complete RAG System
rag.py - Main Class
from config import config, Config
from ingestion.pipeline import IngestionPipeline
from retrieval.searcher import HybridSearcher
from generation.generator import RAGGenerator
class EmbeddingService:
"""Wrapper for OpenAI embedding generation."""
def __init__(self, cfg: Config):
from openai import OpenAI
self.client = OpenAI(api_key=cfg.openai_api_key)
self.model = cfg.embedding_model
def embed_single(self, text: str) -> list[float]:
resp = self.client.embeddings.create(
input=[text.replace("\n", " ")],
model=self.model
)
return resp.data[0].embedding
def embed_batch(self, texts: list[str]) -> list[list[float]]:
cleaned = [t.replace("\n", " ").strip() for t in texts]
resp = self.client.embeddings.create(input=cleaned, model=self.model)
return [item.embedding for item in resp.data]
class RAGSystem:
"""
Complete RAG system: ingestion + retrieval + generation.
"""
def __init__(self, cfg: Config = None):
self.config = cfg or config
self.embedder = EmbeddingService(self.config)
self.ingestion = IngestionPipeline(self.config, self.embedder)
self.searcher = HybridSearcher(self.config, self.embedder)
self.generator = RAGGenerator(self.config)
def add_document(self, source: str, tags: list[str] = None) -> dict:
"""Add a document to the knowledge base."""
return self.ingestion.ingest(source, tags=tags)
def add_directory(self, directory: str, extensions: list[str] = None) -> list[dict]:
"""Add all documents in a directory."""
return self.ingestion.ingest_directory(directory, extensions)
def ask(self, question: str, use_hybrid: bool = True,
source_type: str = None) -> dict:
"""
Ask a question to the RAG system.
Returns:
dict with answer, sources, usage
"""
# 1. Retrieval
if use_hybrid:
chunks = self.searcher.hybrid_search(question, top_k=self.config.top_k)
else:
chunks = self.searcher.vector_search(
question, top_k=self.config.top_k, source_type=source_type
)
if not chunks:
return {
"answer": "No relevant information found to answer this question.",
"sources": [],
"retrieval": {"chunks_found": 0}
}
# 2. Generation
response = self.generator.generate(question, chunks)
return {
"answer": response.answer,
"sources": response.sources,
"retrieval": {
"chunks_found": len(chunks),
"top_similarity": chunks[0].similarity if chunks else 0
},
"usage": {
"model": response.model,
"total_tokens": response.total_tokens
}
}
main.py - System Usage
from rag import RAGSystem
# Initialize the system
rag = RAGSystem()
# --- INGESTION ---
print("=== Adding documents to the knowledge base ===")
# Add individual files
rag.add_document("docs/postgresql_guide.pdf", tags=["postgresql", "database"])
rag.add_document("docs/pgvector_tutorial.md", tags=["pgvector", "vector-search"])
rag.add_document("https://www.postgresql.org/docs/current/", tags=["official-docs"])
# Add an entire directory
stats = rag.add_directory("docs/", extensions=[".md", ".txt", ".pdf"])
print(f"Ingested {len(stats)} documents")
# --- QUERY ---
print("\n=== Querying the system ===")
questions = [
"How do I install pgvector on PostgreSQL 16?",
"What is the difference between HNSW and IVFFlat?",
"How do I optimize memory for vector search?",
]
for q in questions:
print(f"\nQuestion: {q}")
print("-" * 60)
result = rag.ask(q)
print(f"Answer:\n{result['answer']}")
print(f"\nSources used ({len(result['sources'])}):")
for src in result["sources"]:
print(f" - {src['source']} [similarity: {src['similarity']}]")
print(f"\nTokens used: {result['usage']['total_tokens']}")
Hybrid Search: PostgreSQL Full-Text + Vector
One of PostgreSQL's great strengths for RAG is that you can combine semantic (vector) search with classic full-text search in a single query. This is especially useful for queries containing precise technical terms (proper names, acronyms, software versions) that semantic search alone might not capture perfectly:
-- Pure SQL hybrid search: vector + full-text in a single query
WITH vector_search AS (
SELECT id, content, source_path, chunk_index,
1 - (embedding <=> %s::vector) AS vector_score,
ROW_NUMBER() OVER (ORDER BY embedding <=> %s::vector) AS vector_rank
FROM rag_documents
ORDER BY embedding <=> %s::vector
LIMIT 20
),
fts_search AS (
SELECT id, content, source_path, chunk_index,
ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) AS fts_score,
ROW_NUMBER() OVER (
ORDER BY ts_rank(to_tsvector('english', content),
plainto_tsquery('english', %s)) DESC
) AS fts_rank
FROM rag_documents
WHERE to_tsvector('english', content) @@ plainto_tsquery('english', %s)
LIMIT 20
),
-- Reciprocal Rank Fusion
rrf AS (
SELECT
COALESCE(v.id, f.id) AS id,
COALESCE(v.content, f.content) AS content,
COALESCE(v.source_path, f.source_path) AS source_path,
-- Weighted RRF: vector weight 0.7, full-text weight 0.3
COALESCE(0.7 / (60 + v.vector_rank), 0) +
COALESCE(0.3 / (60 + f.fts_rank), 0) AS rrf_score
FROM vector_search v
FULL OUTER JOIN fts_search f ON v.id = f.id
)
SELECT id, content, source_path, rrf_score
FROM rrf
ORDER BY rrf_score DESC
LIMIT 5;
RAG Quality Evaluation
How do you measure whether your RAG system is working well? The key metrics are:
| Metric | What It Measures | Target | How to Compute |
|---|---|---|---|
| Recall@K | Right documents found in top K results | > 0.70 | Test set with ground truth |
| Precision@K | Retrieved results are actually relevant | > 0.60 | Manual annotation |
| Answer Faithfulness | Answer is grounded in retrieved context | > 0.80 | RAGAS framework |
| Answer Relevancy | Answer addresses the question asked | > 0.75 | RAGAS framework |
| P95 Latency | Response time at 95th percentile | < 3s | Production monitoring |
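Recall@K and Precision@K need nothing more than a labeled test set of relevant chunk ids. A minimal implementation:

```python
def recall_at_k(retrieved: list[int], relevant: set[int], k: int) -> float:
    """Fraction of the relevant chunks that appear in the top-k results."""
    hits = len(set(retrieved[:k]) & relevant)
    return hits / len(relevant) if relevant else 0.0

def precision_at_k(retrieved: list[int], relevant: set[int], k: int) -> float:
    """Fraction of the top-k results that are actually relevant."""
    top = retrieved[:k]
    return len(set(top) & relevant) / len(top) if top else 0.0
```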
# Evaluation with RAGAS
# pip install ragas
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_recall
from datasets import Dataset
# Prepare the test dataset
test_data = {
"question": [
"How do I create an HNSW index in pgvector?",
"What is the vector dimension limit in pgvector?",
],
"answer": [
# Answers generated by your RAG system
rag.ask("How do I create an HNSW index in pgvector?")["answer"],
rag.ask("What is the vector dimension limit in pgvector?")["answer"],
],
"contexts": [
# The chunks retrieved for each question
[c["excerpt"] for c in rag.ask("How do I create an HNSW index in pgvector?")["sources"]],
[c["excerpt"] for c in rag.ask("What is the vector dimension limit in pgvector?")["sources"]],
],
"ground_truth": [
"CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64)",
"The limit is 16000 dimensions for vector type in pgvector 0.7+",
]
}
dataset = Dataset.from_dict(test_data)
results = evaluate(dataset, metrics=[faithfulness, answer_relevancy, context_recall])
print(results)
Advanced Chunking Strategies
Chunking quality is one of the most important factors for RAG performance. A poorly calibrated chunking strategy can degrade results even with the best embedding model. Here are advanced strategies for specific use cases:
Header-Based Chunking for Structured Documents
import re
from typing import Generator
def chunk_by_headers(markdown_text: str, max_chunk_size: int = 800) -> Generator:
"""
Chunking that respects the hierarchical structure of Markdown documents.
Each H2/H3 section becomes a separate context, preserving the title
as the chunk header (critical for embedding quality - the model needs
to understand what topic the chunk is about).
"""
# Regex to find Markdown headers (H1-H4)
header_pattern = re.compile(r'^(#{1,4})\s+(.+)$', re.MULTILINE)