LangChain for RAG: Advanced Framework and Patterns
LangChain has become the reference framework for building LLM-based applications. With over 80,000 GitHub stars and a rapidly growing community, it offers powerful abstractions for every component of a RAG system: document loaders, text splitters, embedding models, vector stores, retrievers and chains. But its real power emerges when you combine these building blocks into advanced patterns.
In this article we will build complete RAG systems with LangChain: starting from the basic pipeline up to advanced patterns like conversational RAG (contextual memory between consecutive questions), multi-hop retrieval (queries requiring multiple reasoning steps), self-query retrieval (automatic semantic filtering of metadata), and ensemble retrieval (hybrid search with BM25 + dense). All with executable code examples.
What You Will Learn
- LangChain architecture: chains, runnables and LCEL (LangChain Expression Language)
- Basic RAG pipeline with LangChain: from documentation to response
- Conversational RAG: contextual memory and history management
- Multi-hop retrieval for questions requiring multi-step reasoning
- Self-query retrieval: automatic metadata filtering from the query
- Ensemble retriever and hybrid search in LangChain
- Streaming responses for better UX in production
- Debugging and testing LangChain pipelines with LangSmith
1. LangChain Expression Language (LCEL)
With version 0.1.0, LangChain standardized on the LangChain Expression Language (LCEL): a declarative syntax based on the pipe operator (|) for composing chains in a readable, type-safe way. LCEL is optimized for streaming, parallelism and tracing, and is the modern way to build LangChain pipelines.
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_qdrant import QdrantVectorStore
# Setup base components
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.1)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# RAG prompt template
rag_prompt = ChatPromptTemplate.from_template("""
You are a precise technical assistant. Answer the question based ONLY on the
provided context. If the context does not contain sufficient information, say so.
Context:
{context}
Question: {question}
Answer:""")
# Vector store (assuming Qdrant locally)
vectorstore = QdrantVectorStore.from_existing_collection(
    embedding=embeddings,
    url="http://localhost:6333",
    collection_name="rag_docs"
)

retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5}
)

def format_docs(docs):
    """Format retrieved documents as context string"""
    return "\n\n---\n\n".join(
        f"[Source: {doc.metadata.get('source', 'N/A')}]\n{doc.page_content}"
        for doc in docs
    )

# LCEL pipeline with pipe syntax
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)

# Simple invocation
answer = rag_chain.invoke("What is RAG and what problems does it solve?")
print(answer)

# Streaming (important for production UX!)
for chunk in rag_chain.stream("What are the main vector databases?"):
    print(chunk, end="", flush=True)
1.1 RunnableParallel for Multiple Contexts
One of LCEL's capabilities is parallel composition: you can retrieve contexts from different sources in parallel and combine them before passing to the LLM.
from langchain_core.runnables import RunnableParallel
# Two different retrievers: technical docs and FAQ
tech_retriever = tech_vectorstore.as_retriever(search_kwargs={"k": 3})
faq_retriever = faq_vectorstore.as_retriever(search_kwargs={"k": 2})
# Pipeline with parallel retrieval
multi_source_chain = (
    RunnableParallel(
        tech_context=tech_retriever | format_docs,
        faq_context=faq_retriever | format_docs,
        question=RunnablePassthrough()
    )
    | ChatPromptTemplate.from_template("""
Question: {question}

Technical Documentation:
{tech_context}

FAQ:
{faq_context}

Answer based on both sources:""")
    | llm
    | StrOutputParser()
)
answer = multi_source_chain.invoke("How do I configure authentication?")
2. Complete Base RAG Pipeline
Before tackling advanced patterns, let us build a complete and robust RAG pipeline with LangChain: from document ingestion to retrieval to answer generation.
from langchain_community.document_loaders import (
PyPDFLoader, TextLoader, DirectoryLoader
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_qdrant import QdrantVectorStore
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from typing import List
import logging
logger = logging.getLogger(__name__)
class LangChainRAGSystem:
    """Complete RAG system with LangChain"""

    def __init__(self, collection_name="rag_docs",
                 embedding_model="text-embedding-3-small",
                 llm_model="gpt-4o-mini"):
        self.embeddings = OpenAIEmbeddings(model=embedding_model)
        self.llm = ChatOpenAI(model=llm_model, temperature=0.1)
        self.collection_name = collection_name

        # Text splitter optimized for RAG
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=512,
            chunk_overlap=64,
            # the final "" guarantees a character-level fallback for long unbreakable runs
            separators=["\n\n", "\n", ". ", "! ", "? ", " ", ""],
            add_start_index=True  # saves position in original document
        )

        self.vectorstore = self._init_vectorstore()

        # MMR for diversity: fetch 20 candidates, select top 5 with max diversity
        self.retriever = self.vectorstore.as_retriever(
            search_type="mmr",
            search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.7}
        )

        self.prompt = ChatPromptTemplate.from_template("""
You are a precise technical assistant. Answer the question based EXCLUSIVELY on the
provided context. Do not invent information not present in the context.
If the context is insufficient for a complete answer, say so explicitly.

Context:
{context}

Question: {question}

Answer:""")

        self.chain = self._build_chain()

    def _init_vectorstore(self):
        try:
            return QdrantVectorStore.from_existing_collection(
                embedding=self.embeddings,
                url="http://localhost:6333",
                collection_name=self.collection_name
            )
        except Exception:
            # Collection does not exist yet: create it empty
            return QdrantVectorStore.from_documents(
                documents=[],
                embedding=self.embeddings,
                url="http://localhost:6333",
                collection_name=self.collection_name
            )

    def _build_chain(self):
        def format_docs(docs):
            formatted = []
            for i, doc in enumerate(docs, 1):
                source = doc.metadata.get("source", "N/A")
                page = doc.metadata.get("page", "")
                header = f"[Source {i}: {source}{f', p.{page}' if page else ''}]"
                formatted.append(f"{header}\n{doc.page_content}")
            return "\n\n---\n\n".join(formatted)

        return (
            {"context": self.retriever | format_docs, "question": RunnablePassthrough()}
            | self.prompt
            | self.llm
            | StrOutputParser()
        )

    def ingest_pdf(self, pdf_path: str) -> int:
        """Ingest a PDF into the RAG system"""
        loader = PyPDFLoader(pdf_path)
        documents = loader.load()
        chunks = self.text_splitter.split_documents(documents)
        for chunk in chunks:
            chunk.metadata["doc_type"] = "pdf"
        self.vectorstore.add_documents(chunks)
        logger.info(f"Ingested {len(chunks)} chunks from {pdf_path}")
        return len(chunks)

    def query(self, question: str) -> str:
        return self.chain.invoke(question)

    def query_with_sources(self, question: str) -> dict:
        docs = self.retriever.invoke(question)
        answer = self.chain.invoke(question)
        sources = list(set(doc.metadata.get("source", "N/A") for doc in docs))
        return {"answer": answer, "sources": sources, "num_docs": len(docs)}
3. Conversational RAG: Contextual Memory
The problem with basic RAG is that every query is treated independently. In a real conversation, users expect the system to remember the context of previous questions. "What about the second option?" makes no sense without knowing what was being discussed. Conversational RAG solves this problem.
LangChain handles conversation in two steps:
- Query reformulation: given chat history, reformulate the current question into a standalone query containing all necessary context for retrieval
- RAG with history: use the reformulated query for retrieval, then generate the response providing both retrieved context and chat history
from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from typing import Dict
class ConversationalRAG:
    """Conversational RAG with chat history memory"""

    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm
        self.store: Dict[str, ChatMessageHistory] = {}

        # Step 1: Prompt to reformulate query using history
        contextualize_q_prompt = ChatPromptTemplate.from_messages([
            ("system", """Given a chat history and the user's latest question,
which might reference context in the chat history, formulate a standalone question
that is understandable without the chat history. Do NOT answer the question,
just reformulate it if needed, otherwise return it as-is."""),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}")
        ])

        # History-aware retriever: reformulates query before retrieval
        self.history_aware_retriever = create_history_aware_retriever(
            llm, retriever, contextualize_q_prompt
        )

        # Step 2: Prompt for answer with context and history
        qa_prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a precise technical assistant. Answer the question
based on the provided context and conversation history.
If the context does not contain the answer, say so clearly.

Context:
{context}"""),
            MessagesPlaceholder("chat_history"),
            ("human", "{input}")
        ])

        question_answer_chain = create_stuff_documents_chain(llm, qa_prompt)
        self.rag_chain = create_retrieval_chain(
            self.history_aware_retriever, question_answer_chain
        )

        # Wrapper with automatic history management
        self.conversational_rag = RunnableWithMessageHistory(
            self.rag_chain,
            self._get_session_history,
            input_messages_key="input",
            history_messages_key="chat_history",
            output_messages_key="answer"
        )

    def _get_session_history(self, session_id: str) -> ChatMessageHistory:
        if session_id not in self.store:
            self.store[session_id] = ChatMessageHistory()
        return self.store[session_id]

    def chat(self, message: str, session_id: str = "default") -> str:
        result = self.conversational_rag.invoke(
            {"input": message},
            config={"configurable": {"session_id": session_id}}
        )
        return result["answer"]
# Multi-turn conversation example
conv_rag = ConversationalRAG(retriever=retriever, llm=llm)
questions = [
"What is LangChain?",
"What are its main components?", # "its" refers to LangChain
"Which of these is most important for RAG?" # "these" = components mentioned
]
for q in questions:
    answer = conv_rag.chat(q, session_id="user123")
    print(f"Q: {q}")
    print(f"A: {answer}\n")
4. Self-Query Retrieval: Automatic Metadata Filtering
Self-Query Retrieval is one of LangChain's most powerful patterns: it lets the LLM interpret the user's natural-language query and automatically extract both a semantic query and metadata filters. The user writes "2024 tutorials on RAG for beginners" and the system derives the semantic query "RAG" together with filters such as year=2024 and difficulty="beginner".
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
from langchain_openai import ChatOpenAI
# Describe available metadata in the vector store
metadata_field_info = [
AttributeInfo(
name="source",
description="The source file or URL of the document",
type="string",
),
AttributeInfo(
name="author",
description="The author of the document or article",
type="string",
),
AttributeInfo(
name="year",
description="Publication year (e.g. 2023, 2024)",
type="integer",
),
AttributeInfo(
name="category",
description="Content category (e.g. 'tutorial', 'paper', 'documentation')",
type="string",
),
AttributeInfo(
name="difficulty",
description="Difficulty level (beginner, intermediate, advanced)",
type="string",
),
]
document_content_description = """
Technical articles and documentation on AI engineering, RAG, LLMs, embeddings,
vector databases and machine learning.
"""
self_query_retriever = SelfQueryRetriever.from_llm(
llm=ChatOpenAI(model="gpt-4o-mini", temperature=0),
vectorstore=vectorstore,
document_contents=document_content_description,
metadata_field_info=metadata_field_info,
verbose=True, # shows the generated structured query
search_kwargs={"k": 5}
)
# Natural language queries with implicit filters
examples = [
"2024 tutorials on RAG for beginners",
"Advanced papers on embeddings written by Reimers",
"Documentation on Qdrant or Pinecone"
]
for query in examples:
    print(f"\nQuery: {query}")
    docs = self_query_retriever.invoke(query)
    print(f"Found: {len(docs)} documents")
    for doc in docs:
        print(f"  - {doc.metadata.get('source', 'N/A')} ({doc.metadata.get('year', 'N/A')})")
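Internally, the self-query step makes the LLM emit a structured query: a semantic search string plus a filter expression, which the retriever then translates into the vector store's native filter syntax. A hand-written sketch of that decomposition for the first example query (this dict is illustrative; it is not LangChain's internal StructuredQuery representation):

```python
# Hand-written sketch of the decomposition the self-query LLM produces for
# "2024 tutorials on RAG for beginners" -- illustrative only, not the
# library's actual internal object:
structured_query = {
    "query": "RAG",  # semantic part, used for the vector search
    "filter": {      # metadata part, applied by the vector store
        "and": [
            {"eq": ["year", 2024]},
            {"eq": ["category", "tutorial"]},
            {"eq": ["difficulty", "beginner"]},
        ]
    },
}
```

The key property is that year, category and difficulty never reach the embedding model: they are enforced exactly by the store, while only the semantic remainder of the query is embedded.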
5. Multi-Hop Retrieval for Complex Questions
Some questions require multiple reasoning steps: "Which provider's models does LangChain use by default, and when was that company founded?" requires first finding that LangChain defaults to OpenAI models, then finding OpenAI's founding date. This is called multi-hop retrieval.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import List
class MultiHopRAG:
    """RAG with query decomposition into sub-queries"""

    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm

        # Chain to decompose query into sub-queries
        self.decompose_chain = (
            ChatPromptTemplate.from_template("""
Decompose this complex question into 2-4 simpler sub-questions that, answered
in sequence, allow answering the original question.

Original question: {question}

Provide sub-questions as a numbered list, one per line.
Only the list, nothing else.""")
            | llm
            | StrOutputParser()
        )

        # Chain for final answer with all contexts
        self.answer_chain = (
            ChatPromptTemplate.from_template("""
You have received information from multiple search steps to answer the question.
Synthesize this information into a coherent and complete answer.

Original question: {original_question}

Gathered information:
{gathered_info}

Synthesized answer:""")
            | llm
            | StrOutputParser()
        )

    def _parse_subquestions(self, text: str) -> List[str]:
        """Extract sub-questions from the LLM response"""
        lines = text.strip().split('\n')
        subquestions = []
        for line in lines:
            line = line.strip()
            if line and (line[0].isdigit() or line.startswith('-')):
                clean = line.lstrip('0123456789.-) ').strip()
                if clean:
                    subquestions.append(clean)
        return subquestions

    def multi_hop_query(self, question: str) -> dict:
        """Execute multi-hop retrieval with query decomposition"""
        print(f"Original question: {question}\n")

        # Step 1: Query decomposition
        subquestions_text = self.decompose_chain.invoke({"question": question})
        subquestions = self._parse_subquestions(subquestions_text)
        print(f"Generated sub-queries: {len(subquestions)}")

        # Step 2: Retrieval and answer for each sub-query
        gathered_info = []
        all_sources = []
        for i, subq in enumerate(subquestions, 1):
            print(f"  Hop {i}: {subq}")
            docs = self.retriever.invoke(subq)
            context = "\n".join(doc.page_content for doc in docs[:3])
            partial_answer = self.llm.invoke(
                f"Context: {context}\nQuestion: {subq}\nBrief answer:"
            ).content
            gathered_info.append(
                f"Sub-question {i}: {subq}\nAnswer: {partial_answer}"
            )
            all_sources.extend(doc.metadata.get("source", "") for doc in docs)

        # Step 3: Final synthesis
        final_answer = self.answer_chain.invoke({
            "original_question": question,
            "gathered_info": "\n\n".join(gathered_info)
        })

        return {
            "answer": final_answer,
            "subquestions": subquestions,
            "num_hops": len(subquestions),
            "sources": list(set(s for s in all_sources if s))
        }
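The decomposition parser accepts both numbered and dashed lists from the LLM. A standalone copy of the same logic shows the expected behavior on a typical decomposition response (the sample text is made up for illustration):

```python
def parse_subquestions(text: str) -> list:
    # Same logic as MultiHopRAG._parse_subquestions: keep lines that start
    # with a digit or a dash, then strip the list markers.
    subquestions = []
    for line in text.strip().split("\n"):
        line = line.strip()
        if line and (line[0].isdigit() or line.startswith("-")):
            clean = line.lstrip("0123456789.-) ").strip()
            if clean:
                subquestions.append(clean)
    return subquestions

# A made-up LLM decomposition response
sample = """1. What model does LangChain use by default?
2. Who developed that model?
3. When was that company founded?"""

subs = parse_subquestions(sample)
# -> three clean sub-questions, markers removed
```

Keeping the parser tolerant matters in practice: LLMs occasionally switch between "1.", "1)" and "-" formats even when the prompt asks for a numbered list.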
6. Ensemble Retriever and Hybrid Search
LangChain provides an EnsembleRetriever that combines multiple retrievers with configurable weights, applying Reciprocal Rank Fusion for the final ranking. It is the simplest way to implement hybrid search (BM25 + vector) in LangChain.
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever

# BM25 retriever for keyword search
bm25_retriever = BM25Retriever.from_documents(documents, k=5)
# Dense retriever for semantic search
dense_retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
# Ensemble with weights: 40% BM25, 60% dense
ensemble_retriever = EnsembleRetriever(
    retrievers=[bm25_retriever, dense_retriever],
    weights=[0.4, 0.6]
    # Reciprocal Rank Fusion combines results from both retrievers
)

# Standard usage - identical interface to any retriever
docs = ensemble_retriever.invoke("How to implement reranking?")

# Integration in an LCEL chain
hybrid_rag_chain = (
    {"context": ensemble_retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)
answer = hybrid_rag_chain.invoke("BM25 + vector search tutorial")
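To make the fusion step concrete, weighted Reciprocal Rank Fusion can be sketched in a few lines. This is an illustrative standalone implementation, not EnsembleRetriever's exact code; the function name and the conventional constant k=60 are assumptions:

```python
def weighted_rrf(rankings, weights, k=60):
    """Weighted Reciprocal Rank Fusion: each retriever contributes
    weight / (k + rank) to the score of every document it returns."""
    scores = {}
    for ranking, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank)
    # Highest fused score first
    return sorted(scores, key=scores.get, reverse=True)

# "doc_b" ranks well in both lists, so it wins the fused ranking
bm25_ranked = ["doc_a", "doc_b", "doc_c"]
dense_ranked = ["doc_b", "doc_d", "doc_a"]
fused = weighted_rrf([bm25_ranked, dense_ranked], weights=[0.4, 0.6])
```

Because RRF only uses ranks, not raw scores, it sidesteps the problem that BM25 scores and cosine similarities live on incomparable scales.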
7. LangSmith: Tracing and Debugging
LangSmith is the observability platform for LangChain. It allows you to visualize every step of the chain, prompts sent to the LLM, retrieved documents, latencies and costs. It is essential for debugging in development and monitoring in production.
import os
from langsmith import Client
# Configure LangSmith
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "rag-production"
# All chain invocations are now automatically traced!
# Visit smith.langchain.com to see traces
# Custom evaluator for faithfulness
from langsmith.evaluation import evaluate as ls_evaluate
from langsmith.schemas import Run, Example
def faithfulness_evaluator(run: Run, example: Example) -> dict:
    """Custom faithfulness evaluator using LLM-as-judge"""
    answer = run.outputs.get("answer", "")
    context = run.outputs.get("context", "")
    judge = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    score = judge.invoke(
        f"""On a scale 0-1, how well is the following answer supported by the context?
Answer: {answer}
Context: {context[:500]}
Reply ONLY with a number between 0 and 1."""
    ).content
    try:
        return {"score": float(score.strip()), "key": "faithfulness"}
    except ValueError:
        # The judge returned something that is not a number: fall back to neutral
        return {"score": 0.5, "key": "faithfulness"}
# Evaluate chain on a test dataset
client = Client()
results = ls_evaluate(
    lambda inputs: rag_chain.invoke(inputs["question"]),
    data="rag-evaluation-dataset",
    evaluators=[faithfulness_evaluator],
    experiment_prefix="v1-baseline"
)
8. Streaming Responses for Better UX
In production, LLM responses can take 5-15 seconds. Showing words as they are generated (streaming) dramatically improves the user's perception of speed. LCEL supports streaming natively.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
app = FastAPI()
# LCEL chains expose both sync (invoke/stream) and async (ainvoke/astream) APIs
async_rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm  # llm supports native streaming
    | StrOutputParser()
)

@app.get("/rag/stream")
async def stream_rag(question: str):
    """Streaming endpoint via Server-Sent Events"""
    async def generate():
        docs = await retriever.ainvoke(question)
        context = format_docs(docs)
        async for chunk in llm.astream(
            rag_prompt.format_messages(context=context, question=question)
        ):
            if chunk.content:
                yield f"data: {chunk.content}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    )

@app.post("/rag/query")
async def query_rag(question: str):
    """Standard (non-streaming) endpoint"""
    answer = await async_rag_chain.ainvoke(question)
    return {"answer": answer}
9. Best Practices and Anti-Patterns
LangChain Best Practices
- Always use LCEL instead of legacy chains (LLMChain, RetrievalQA). LCEL is more performant, type-safe and natively supports streaming.
- Enable LangSmith in development: automatic tracing saves hours of debugging. You can disable it in production to save costs.
- MMR for diversity: use Maximum Marginal Relevance (search_type="mmr") instead of pure similarity to avoid retrieving nearly identical chunks.
- async/await for throughput: use ainvoke and astream for I/O operations (LLM, vector DB). Allows handling concurrent requests without thread overhead.
- Separate retrieval logic from generation: makes code testable and allows mocking the retriever in tests.
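The async/await point deserves a concrete illustration. In the sketch below, fake_ainvoke is a stand-in for chain.ainvoke (so it runs without an API key), but the concurrency pattern is exactly the same:

```python
import asyncio

# Hypothetical stand-in for chain.ainvoke: any awaitable behaves the same way
async def fake_ainvoke(question: str) -> str:
    await asyncio.sleep(0.01)  # simulates LLM / vector DB latency
    return f"answer to: {question}"

async def answer_batch(questions):
    # asyncio.gather runs all I/O-bound calls concurrently:
    # total wall time ~= the slowest call, not the sum of all calls
    return await asyncio.gather(*(fake_ainvoke(q) for q in questions))

answers = asyncio.run(answer_batch(["q1", "q2", "q3"]))
```

With real chains, replace fake_ainvoke with async_rag_chain.ainvoke; no threads are involved, so this scales to many concurrent requests cheaply.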
Anti-Patterns to Avoid
- Overly nested chains: LangChain allows composing very complex chains. Beyond 3-4 nesting levels it becomes difficult to debug. Consider breaking the chain into functions.
- Ignoring token costs: every document in the context increases cost. Measure and optimize the number of tokens sent to the LLM.
- Unversioned prompt templates: prompts are code. Version them, test them, and track their changes like any other component.
- High LLM temperature for RAG: for RAG use temperature 0.0-0.2. High temperature increases variability, not quality, and tends to increase hallucinations.
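For the token-cost point, even a rough estimate beats not measuring at all. A minimal sketch: the ~4 characters/token ratio and the per-1k price are assumptions for illustration; use a real tokenizer (e.g. tiktoken) and your provider's current price list for exact numbers:

```python
def estimate_tokens(text: str) -> int:
    # Rough heuristic: ~4 characters per token for English prose.
    return max(1, len(text) // 4)

def context_cost(chunks, price_per_1k_tokens=0.00015):
    # price_per_1k_tokens is an illustrative rate, not a quoted price
    tokens = sum(estimate_tokens(c) for c in chunks)
    return tokens, round(tokens / 1000 * price_per_1k_tokens, 6)

# Five 512-character chunks: the context cost of a single k=5 query
tokens, usd = context_cost(["x" * 512] * 5)  # tokens == 640
```

Multiply that per-query figure by expected daily traffic and the trade-off between k, chunk_size and cost becomes a number you can actually reason about.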
Conclusions
LangChain transforms the complexity of a RAG system into a series of composable building blocks. We built pipelines from the simplest (basic RAG with LCEL) to the most advanced (conversational RAG, multi-hop, self-query), covering every aspect relevant to production: streaming, LangSmith tracing, hybrid search and quality best practices.
Key takeaways:
- LCEL is the modern way to compose chains: readable, type-safe, streaming-native
- Conversational RAG requires query reformulation before retrieval
- Self-query retrieval automates metadata filtering from natural language queries
- Multi-hop retrieval decomposes complex questions into sequential sub-queries
- EnsembleRetriever combines BM25 + dense with a single command
- LangSmith is essential for debugging and evaluation in production
In the next article we will explore Context Window Management: how to manage and optimize the LLM's token budget when available context exceeds the model's capabilities.