Production RAG Pipelines
Retrieval-Augmented Generation (RAG) grounds LLM responses in relevant external knowledge. Instead of relying solely on the model's training data, a RAG system fetches documents relevant to the query and injects them into the prompt, which reduces hallucinations and enables answers over private data.
The RAG Pipeline
User Query
     |
[1. Query Processing]
     |
[2. Retrieval] -----> Vector DB / Search Index
     |
[3. Context Assembly]
     |
[4. Generation] -----> LLM
     |
  Response
Each stage offers multiple strategies and optimizations. Let's explore them in depth.
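Before diving into each stage, here is a minimal end-to-end sketch of the four stages as plain functions. The keyword-overlap retriever and every name here are illustrative stand-ins for the real components covered below, not a production implementation:

```python
import re

def process_query(query: str) -> str:
    """Stage 1: normalize the raw query (lowercase, strip punctuation)."""
    return re.sub(r"[^\w\s]", "", query).lower().strip()

def retrieve(query: str, corpus: list[str], k: int = 2) -> list[str]:
    """Stage 2: toy retrieval by keyword overlap (stand-in for a vector DB)."""
    q_terms = set(query.split())
    scored = sorted(
        corpus,
        key=lambda d: len(q_terms & set(d.lower().split())),
        reverse=True,
    )
    return scored[:k]

def assemble_context(docs: list[str]) -> str:
    """Stage 3: join retrieved chunks into a single context block."""
    return "\n\n".join(docs)

def generate(query: str, context: str) -> str:
    """Stage 4: build the prompt an LLM would receive (LLM call omitted)."""
    return f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"

corpus = [
    "RAG grounds answers in retrieved documents.",
    "Transformers use attention.",
]
q = process_query("What is RAG?")
docs = retrieve(q, corpus)
prompt = generate(q, assemble_context(docs))
```

Each real pipeline swaps one of these toy functions for a production component: a query rewriter, a vector store, a context formatter, and an LLM call.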
Document Loading
The first step is ingesting documents from various sources into your pipeline.
from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    CSVLoader,
    UnstructuredHTMLLoader,
    WebBaseLoader,
    DirectoryLoader,
    NotionDirectoryLoader,
)

# Load a PDF
pdf_loader = PyPDFLoader("report.pdf")
pdf_docs = pdf_loader.load()
print(f"Loaded {len(pdf_docs)} pages from PDF")
print(f"Page 1 content: {pdf_docs[0].page_content[:200]}")
print(f"Metadata: {pdf_docs[0].metadata}")  # {'source': 'report.pdf', 'page': 0}

# Load a website
web_loader = WebBaseLoader("https://example.com/article")
web_docs = web_loader.load()

# Load an entire directory of files
dir_loader = DirectoryLoader(
    "./documents/",
    glob="**/*.txt",
    loader_cls=TextLoader,
    show_progress=True,
)
all_docs = dir_loader.load()
print(f"Loaded {len(all_docs)} documents from directory")
Chunking Strategies
Documents must be split into chunks that are small enough to be relevant but large enough to be coherent. This is one of the most impactful decisions in RAG pipeline design.
Fixed-Size Chunking
from langchain.text_splitter import RecursiveCharacterTextSplitter

# The most common and generally effective splitter
splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,      # Target size in characters
    chunk_overlap=200,    # Overlap between consecutive chunks
    separators=["\n\n", "\n", ". ", " ", ""],  # Split priority
    length_function=len,
)

text = """
Chapter 1: Introduction to Machine Learning
Machine learning is a branch of artificial intelligence...
[imagine a long document here]
"""

chunks = splitter.split_text(text)
print(f"Created {len(chunks)} chunks")
for i, chunk in enumerate(chunks[:3]):
    print(f"Chunk {i}: {len(chunk)} chars --- {chunk[:80]}...")
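Mechanically, fixed-size chunking with overlap is a sliding window: each chunk starts `chunk_size - overlap` characters after the previous one, so the tail of one chunk is repeated at the head of the next. A hand-rolled sketch (without the splitter's separator-priority logic, purely to show the window arithmetic):

```python
def chunk_text(text: str, chunk_size: int = 100, overlap: int = 20) -> list[str]:
    """Slide a chunk_size window over text, stepping by chunk_size - overlap."""
    step = chunk_size - overlap
    return [
        text[i:i + chunk_size]
        for i in range(0, max(len(text) - overlap, 1), step)
    ]

doc = "x" * 250
windows = chunk_text(doc, chunk_size=100, overlap=20)
print(len(windows))             # → 3
print([len(w) for w in windows])  # → [100, 100, 90]
```

The overlap is what prevents a sentence from being cut in half at every boundary with no chunk containing it whole, at the cost of some duplicated storage and embedding.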
Semantic Chunking
Groups content by meaning rather than character count:
from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings

# Split based on semantic similarity between sentences
semantic_splitter = SemanticChunker(
    embeddings=OpenAIEmbeddings(),
    breakpoint_threshold_type="percentile",
    breakpoint_threshold_amount=85,  # Split at 85th percentile of dissimilarity
)

semantic_chunks = semantic_splitter.split_text(text)
print(f"Created {len(semantic_chunks)} semantic chunks")
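The breakpoint idea can be illustrated without an embedding API: embed each sentence, compute the distance between consecutive embeddings, and split wherever the distance reaches a percentile threshold. Below, toy 2-dimensional vectors stand in for real embeddings, and the percentile uses a simple nearest-rank method (a sketch of the concept, not the library's implementation):

```python
import math

def cosine_distance(a, b):
    """1 - cosine similarity between two vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(x * x for x in b))
    return 1 - dot / (na * nb)

def semantic_split(sentences, vectors, percentile=85):
    """Split sentences into chunks at high-dissimilarity breakpoints."""
    # Distances between consecutive sentence embeddings
    dists = [cosine_distance(vectors[i], vectors[i + 1])
             for i in range(len(vectors) - 1)]
    # Percentile threshold (nearest-rank)
    cutoff = sorted(dists)[min(int(len(dists) * percentile / 100), len(dists) - 1)]
    chunks, current = [], [sentences[0]]
    for sent, dist in zip(sentences[1:], dists):
        if dist >= cutoff:          # topic shift: close the current chunk
            chunks.append(" ".join(current))
            current = []
        current.append(sent)
    chunks.append(" ".join(current))
    return chunks

sentences = ["Dogs bark.", "Puppies play.", "Stocks fell.", "Markets closed."]
vectors = [(1.0, 0.1), (0.9, 0.2), (0.1, 1.0), (0.2, 0.9)]
print(semantic_split(sentences, vectors))
# → ['Dogs bark. Puppies play.', 'Stocks fell. Markets closed.']
```

The two pet sentences point in a similar direction and stay together; the jump to the finance sentences is the single large distance, so that is where the split lands.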
Document-Structure-Aware Chunking
from langchain.text_splitter import MarkdownHeaderTextSplitter

headers_to_split_on = [
    ("#", "h1"),
    ("##", "h2"),
    ("###", "h3"),
]
md_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=headers_to_split_on,
    strip_headers=False,
)

markdown_text = """
# Machine Learning

## Supervised Learning

### Classification
Classification assigns inputs to discrete categories...

### Regression
Regression predicts continuous values...

## Unsupervised Learning

### Clustering
Clustering groups similar data points...
"""

md_chunks = md_splitter.split_text(markdown_text)
for chunk in md_chunks:
    print(f"Headers: {chunk.metadata} | Content: {chunk.page_content[:60]}...")
Retrieval Strategies
How you retrieve chunks is just as important as how you create them.
Dense Retrieval (Vector Search)
Uses embedding similarity --- the standard approach:
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# Create vector store from documents
# (chunks must be Document objects, e.g. from a splitter's split_documents)
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    collection_name="my_docs",
)

# Basic similarity search
retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5},
)

docs = retriever.invoke("How does attention work in transformers?")
Sparse Retrieval (BM25 Keyword Search)
Good for exact term matching (names, IDs, technical terms):
from langchain_community.retrievers import BM25Retriever

bm25_retriever = BM25Retriever.from_documents(chunks, k=5)
bm25_results = bm25_retriever.invoke("BERT attention mechanism")
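To see why BM25 excels at exact term matching, here is the scoring formula in miniature: each query term contributes an IDF weight times a saturating term-frequency factor, normalized by document length. This is a compact illustrative sketch using the common non-negative IDF variant and default `k1`/`b` values, not the library's implementation:

```python
import math

def bm25_scores(query, corpus, k1=1.5, b=0.75):
    """Score each tokenized document in corpus against the tokenized query."""
    n = len(corpus)
    avgdl = sum(len(d) for d in corpus) / n  # average document length
    scores = []
    for doc in corpus:
        score = 0.0
        for term in query:
            df = sum(1 for d in corpus if term in d)  # document frequency
            if df == 0:
                continue
            idf = math.log((n - df + 0.5) / (df + 0.5) + 1)
            tf = doc.count(term)
            # Term frequency saturates as tf grows; b controls length normalization
            score += idf * tf * (k1 + 1) / (tf + k1 * (1 - b + b * len(doc) / avgdl))
        scores.append(score)
    return scores

corpus = [
    "bert uses attention".split(),
    "rnns process sequences".split(),
]
scores = bm25_scores("bert attention".split(), corpus)
print(scores)  # first document scores higher; second matches no terms
```

Because matching is on exact tokens, rare identifiers like "BERT" get a high IDF and dominate the score, which is exactly the behavior dense retrieval can miss.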
Hybrid Retrieval (Dense + Sparse)
Combines the best of both worlds:
from langchain.retrievers import EnsembleRetriever

# Combine dense and sparse retrievers with weights
hybrid_retriever = EnsembleRetriever(
    retrievers=[retriever, bm25_retriever],
    weights=[0.6, 0.4],  # 60% vector, 40% keyword
)

results = hybrid_retriever.invoke("BERT attention mechanism")
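Under the hood, the ensemble fuses the two ranked lists with weighted Reciprocal Rank Fusion: a document's fused score is the sum, over lists it appears in, of that list's weight divided by a constant plus its rank. A minimal sketch of the fusion step (document IDs and the constant `c=60` are illustrative):

```python
def weighted_rrf(rankings, weights, c=60):
    """Fuse ranked lists of doc IDs by weighted reciprocal rank."""
    scores = {}
    for ranking, weight in zip(rankings, weights):
        for rank, doc in enumerate(ranking, start=1):
            scores[doc] = scores.get(doc, 0.0) + weight / (c + rank)
    return sorted(scores, key=scores.get, reverse=True)

dense = ["doc_a", "doc_b", "doc_c"]   # vector search ranking
sparse = ["doc_c", "doc_d"]           # BM25 ranking
fused = weighted_rrf([dense, sparse], weights=[0.6, 0.4])
print(fused)  # → ['doc_c', 'doc_a', 'doc_b', 'doc_d']
```

Note how `doc_c` wins despite ranking last in the dense list: appearing in both lists beats a single strong appearance, which is the point of hybrid retrieval.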
Reranking
After initial retrieval, a cross-encoder reranks results for better precision:
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder

# Load a cross-encoder reranker
cross_encoder = HuggingFaceCrossEncoder(model_name="cross-encoder/ms-marco-MiniLM-L-6-v2")
reranker = CrossEncoderReranker(model=cross_encoder, top_n=3)

# Wrap the base retriever with reranking
reranking_retriever = ContextualCompressionRetriever(
    base_compressor=reranker,
    base_retriever=hybrid_retriever,  # rerank the hybrid candidates down to top 3
)

reranked_docs = reranking_retriever.invoke("How does self-attention work?")
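The pattern itself is simple: a cheap first stage returns many candidates, then a more expensive model scores each (query, document) pair jointly and keeps the best few. The sketch below uses a toy term-overlap scorer as a stand-in for the cross-encoder, which in reality reads the query and document together through a transformer:

```python
def rerank(query, candidates, score_fn, top_n=3):
    """Score each (query, doc) pair and keep the top_n highest-scoring docs."""
    scored = sorted(candidates, key=lambda doc: score_fn(query, doc), reverse=True)
    return scored[:top_n]

# Stand-in scorer: shared-term count (a real cross-encoder scores the pair jointly)
def overlap_score(query, doc):
    return len(set(query.lower().split()) & set(doc.lower().split()))

candidates = [
    "Self-attention lets each token attend to every other token.",
    "Gradient descent minimizes a loss function.",
    "Attention weights are computed with softmax.",
    "Convolutions use local receptive fields.",
]
top = rerank("how does attention work", candidates, overlap_score, top_n=2)
print(top[0])  # → Attention weights are computed with softmax.
```

The two-stage split is what makes cross-encoders affordable: scoring every document in the corpus pairwise would be prohibitively slow, but scoring a handful of retrieved candidates is cheap.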
Query Transformation
Raw user queries are often vague, incomplete, or poorly suited for embedding search. Query transformation techniques rewrite or expand queries to improve retrieval.
HyDE (Hypothetical Document Embeddings)
Generate a hypothetical answer, then use THAT as the search query:
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini")

hyde_prompt = ChatPromptTemplate.from_template(
    """Please write a short passage that would answer the following question.
Do not explain that you are writing a passage. Just write the passage directly.

Question: {question}

Passage:"""
)

def hyde_retrieval(question: str, retriever):
    """Use HyDE to improve retrieval."""
    # Step 1: Generate a hypothetical document
    chain = hyde_prompt | llm
    hypothetical_doc = chain.invoke({"question": question}).content
    # Step 2: Use the hypothetical doc as the search query
    # (it's closer in embedding space to the real answer)
    results = retriever.invoke(hypothetical_doc)
    return results

docs = hyde_retrieval("What is RLHF?", retriever)
Multi-Query Retrieval
Generate multiple query variations to improve recall:
from langchain.retrievers.multi_query import MultiQueryRetriever

multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
    llm=ChatOpenAI(model="gpt-4o-mini"),
)

# Internally generates ~3 query variations
# e.g., "What is RLHF?" might also search:
#   - "How does reinforcement learning from human feedback work?"
#   - "Explain the RLHF training process for language models"
docs = multi_query_retriever.invoke("What is RLHF?")
Step-Back Prompting
Retrieve context for a more general version of the question:
step_back_prompt = ChatPromptTemplate.from_template(
    """You are an expert at generating step-back questions.
Given a specific question, generate a more general question that,
if answered, would help answer the original question.

Original question: {question}

Step-back question:"""
)

def step_back_retrieval(question: str, retriever, llm):
    chain = step_back_prompt | llm
    general_question = chain.invoke({"question": question}).content
    # Retrieve for both original and step-back question
    original_docs = retriever.invoke(question)
    general_docs = retriever.invoke(general_question)
    # Combine and deduplicate
    all_docs = original_docs + general_docs
    seen = set()
    unique_docs = []
    for doc in all_docs:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            unique_docs.append(doc)
    return unique_docs
End-to-End RAG Pipeline
Putting it all together into a production-ready pipeline:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

# 1. Setup
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
    collection_name="my_knowledge_base",
    embedding_function=embeddings,
    persist_directory="./chroma_db",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# 2. RAG prompt
rag_prompt = ChatPromptTemplate.from_template(
    """You are a helpful assistant. Answer the question based ONLY on
the following context. If the context doesn't contain the answer,
say "I don't have enough information to answer that question."

Context:
{context}

Question: {question}

Answer:"""
)

# 3. Format retrieved documents
def format_docs(docs):
    return "\n\n---\n\n".join(
        f"Source: {doc.metadata.get('source', 'unknown')}\n{doc.page_content}"
        for doc in docs
    )

# 4. Build the chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)

# 5. Query
answer = rag_chain.invoke("What are the benefits of RAG?")
print(answer)
RAGAS Evaluation
RAGAS (Retrieval Augmented Generation Assessment) provides automated metrics for RAG systems.
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)
from datasets import Dataset

# Prepare evaluation data
eval_data = {
    "question": [
        "What is RAG?",
        "How does chunking affect retrieval?",
    ],
    "answer": [
        "RAG is a technique that augments LLM responses with retrieved documents.",
        "Smaller chunks improve precision while larger chunks provide more context.",
    ],
    "contexts": [
        ["RAG stands for Retrieval-Augmented Generation. It fetches relevant documents to ground LLM responses."],
        ["Chunk size is a critical parameter. Smaller chunks (200-500 chars) increase precision. Larger chunks (1000-2000 chars) preserve context."],
    ],
    "ground_truth": [
        "RAG enhances LLM outputs by retrieving relevant external documents and including them in the prompt.",
        "Chunk size impacts retrieval quality: smaller chunks are more precise, larger chunks retain more surrounding context.",
    ],
}

dataset = Dataset.from_dict(eval_data)

# Run evaluation
results = evaluate(
    dataset,
    metrics=[
        faithfulness,       # Is the answer grounded in the context?
        answer_relevancy,   # Does the answer address the question?
        context_precision,  # Are retrieved docs relevant?
        context_recall,     # Were all relevant docs retrieved?
    ],
)

print(results)
# {'faithfulness': 0.92, 'answer_relevancy': 0.88,
#  'context_precision': 0.85, 'context_recall': 0.90}