
Production RAG Pipelines

End-to-end RAG, document loading, chunking strategies, retrieval strategies (dense/sparse/hybrid/reranking), query transformation, RAGAS evaluation

~55 min


Retrieval-Augmented Generation (RAG) enhances LLM responses by grounding them in relevant external knowledge. Instead of relying solely on the model's training data, RAG fetches relevant documents and injects them into the prompt --- dramatically reducing hallucinations and enabling answers over private data.

The RAG Pipeline

     User Query
         |
    [1. Query Processing]
         |
    [2. Retrieval] -----> Vector DB / Search Index
         |
    [3. Context Assembly]
         |
    [4. Generation] -----> LLM
         |
     Response

Each stage offers multiple strategies and optimizations. Let's explore them in depth.

The RAG Triad

Every RAG system is evaluated on three axes: (1) Context Relevance --- did you retrieve the right documents? (2) Groundedness --- is the answer supported by the retrieved context? (3) Answer Relevance --- does the answer actually address the user's question? Optimizing all three simultaneously is the core challenge of RAG engineering.

Document Loading

The first step is ingesting documents from various sources into your pipeline.

from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    CSVLoader,
    UnstructuredHTMLLoader,
    WebBaseLoader,
    DirectoryLoader,
    NotionDirectoryLoader,
)

Load a PDF

pdf_loader = PyPDFLoader("report.pdf")
pdf_docs = pdf_loader.load()
print(f"Loaded {len(pdf_docs)} pages from PDF")
print(f"Page 1 content: {pdf_docs[0].page_content[:200]}")
print(f"Metadata: {pdf_docs[0].metadata}")  # {'source': 'report.pdf', 'page': 0}

Load a website

web_loader = WebBaseLoader("https://example.com/article")
web_docs = web_loader.load()

Load an entire directory of files

dir_loader = DirectoryLoader(
    "./documents/",
    glob="**/*.txt",
    loader_cls=TextLoader,
    show_progress=True,
)
all_docs = dir_loader.load()
print(f"Loaded {len(all_docs)} documents from directory")

Chunking Strategies

Documents must be split into chunks that are small enough to be relevant but large enough to be coherent. This is one of the most impactful decisions in RAG pipeline design.

Fixed-Size Chunking

from langchain.text_splitter import RecursiveCharacterTextSplitter

The most common and generally effective splitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,    # Target size in characters
    chunk_overlap=200,  # Overlap between consecutive chunks
    separators=["\n\n", "\n", ". ", " ", ""],  # Split priority
    length_function=len,
)

text = """
Chapter 1: Introduction to Machine Learning

Machine learning is a branch of artificial intelligence...
[imagine a long document here]
"""

chunks = splitter.create_documents([text])  # returns Document objects (needed by the vector store later)
print(f"Created {len(chunks)} chunks")
for i, chunk in enumerate(chunks[:3]):
    print(f"Chunk {i}: {len(chunk.page_content)} chars --- {chunk.page_content[:80]}...")

Semantic Chunking

Groups content by meaning rather than character count:

from langchain_experimental.text_splitter import SemanticChunker
from langchain_openai import OpenAIEmbeddings

Split based on semantic similarity between sentences

semantic_splitter = SemanticChunker(
    embeddings=OpenAIEmbeddings(),
    breakpoint_threshold_type="percentile",
    breakpoint_threshold_amount=85,  # Split at 85th percentile of dissimilarity
)

semantic_chunks = semantic_splitter.split_text(text)
print(f"Created {len(semantic_chunks)} semantic chunks")

Document-Structure-Aware Chunking

from langchain.text_splitter import MarkdownHeaderTextSplitter

headers_to_split_on = [
    ("#", "h1"),
    ("##", "h2"),
    ("###", "h3"),
]

md_splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=headers_to_split_on,
    strip_headers=False,
)

markdown_text = """
# Machine Learning

## Supervised Learning

### Classification

Classification assigns inputs to discrete categories...

### Regression

Regression predicts continuous values...

## Unsupervised Learning

### Clustering

Clustering groups similar data points...
"""

md_chunks = md_splitter.split_text(markdown_text)
for chunk in md_chunks:
    print(f"Headers: {chunk.metadata} | Content: {chunk.page_content[:60]}...")

Chunking Best Practices

- Start with RecursiveCharacterTextSplitter (chunk_size=1000, overlap=200) as a solid baseline.

- Use semantic chunking when document structure is inconsistent.

- Always preserve metadata (source file, page number, section header) through the chunking process.

- Test different chunk sizes --- smaller chunks (200-500 chars) improve precision, larger chunks (1000-2000 chars) improve context richness.

- For code, use language-aware splitters that respect function and class boundaries.

Retrieval Strategies

How you retrieve chunks is just as important as how you create them.

Dense Retrieval (Vector Search)

Uses embedding similarity --- the standard approach:

from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

Create vector store from documents

vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    collection_name="my_docs",
)

Basic similarity search

retriever = vectorstore.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 5},
)

docs = retriever.invoke("How does attention work in transformers?")

Sparse Retrieval (BM25 Keyword Search)

Good for exact term matching (names, IDs, technical terms):

from langchain_community.retrievers import BM25Retriever

bm25_retriever = BM25Retriever.from_documents(chunks, k=5)
bm25_results = bm25_retriever.invoke("BERT attention mechanism")

Hybrid Retrieval (Dense + Sparse)

Combines the best of both worlds:

from langchain.retrievers import EnsembleRetriever

Combine dense and sparse retrievers with weights

hybrid_retriever = EnsembleRetriever(
    retrievers=[retriever, bm25_retriever],
    weights=[0.6, 0.4],  # 60% vector, 40% keyword
)

results = hybrid_retriever.invoke("BERT attention mechanism")

Reranking

After initial retrieval, a cross-encoder reranks results for better precision:

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CrossEncoderReranker
from langchain_community.cross_encoders import HuggingFaceCrossEncoder

Load a cross-encoder reranker

cross_encoder = HuggingFaceCrossEncoder(model_name="cross-encoder/ms-marco-MiniLM-L-6-v2")
reranker = CrossEncoderReranker(model=cross_encoder, top_n=3)

Wrap the base retriever with reranking

reranking_retriever = ContextualCompressionRetriever(
    base_compressor=reranker,
    base_retriever=hybrid_retriever,  # candidates from hybrid retrieval, reranked down to top 3
)

reranked_docs = reranking_retriever.invoke("How does self-attention work?")

Retrieval is the Bottleneck

In production RAG systems, retrieval quality is the dominant driver of final answer quality --- a common rule of thumb attributes roughly 80% of it to retrieval. A perfect LLM fed bad retrieval will still hallucinate or answer off-topic. Always invest in retrieval before trying to optimize the generation prompt.
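One cheap way to act on this advice is to measure retrieval directly, before generation enters the picture, using a small hand-labeled query set. A minimal recall@k sketch in plain Python --- the `labeled` and `retrieved` data below are hypothetical stand-ins for your own queries and retriever output:

```python
def recall_at_k(retrieved_ids, relevant_ids, k=5):
    """Fraction of the relevant documents that appear in the top-k results."""
    if not relevant_ids:
        return 0.0
    top_k = set(retrieved_ids[:k])
    return len(top_k & set(relevant_ids)) / len(relevant_ids)

# Hypothetical labeled set: query -> ids of chunks known to be relevant
labeled = {
    "What is RLHF?": ["doc_12", "doc_40"],
    "chunk size tradeoffs": ["doc_7"],
}

# Hypothetical retriever output (ordered, best first) for each query
retrieved = {
    "What is RLHF?": ["doc_12", "doc_3", "doc_40", "doc_9", "doc_1"],
    "chunk size tradeoffs": ["doc_2", "doc_5", "doc_8", "doc_7", "doc_4"],
}

scores = {q: recall_at_k(retrieved[q], rel, k=5) for q, rel in labeled.items()}
print(scores)  # both queries find all their relevant chunks within the top 5
```

Tracking this number as you vary chunk size, retriever weights, or reranking lets you improve retrieval in isolation, without the noise of LLM generation.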

Query Transformation

Raw user queries are often vague, incomplete, or poorly suited for embedding search. Query transformation techniques rewrite or expand queries to improve retrieval.

HyDE (Hypothetical Document Embeddings)

Generate a hypothetical answer, then use THAT as the search query:

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

llm = ChatOpenAI(model="gpt-4o-mini")

hyde_prompt = ChatPromptTemplate.from_template(
    """Please write a short passage that would answer the following question.
Do not explain that you are writing a passage. Just write the passage directly.

Question: {question}

Passage:"""
)

def hyde_retrieval(question: str, retriever):
    """Use HyDE to improve retrieval."""
    # Step 1: Generate a hypothetical document
    chain = hyde_prompt | llm
    hypothetical_doc = chain.invoke({"question": question}).content

    # Step 2: Use the hypothetical doc as the search query
    # (it's closer in embedding space to the real answer)
    results = retriever.invoke(hypothetical_doc)
    return results

docs = hyde_retrieval("What is RLHF?", retriever)

Multi-Query Retrieval

Generate multiple query variations to improve recall:

from langchain.retrievers.multi_query import MultiQueryRetriever

multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
    llm=ChatOpenAI(model="gpt-4o-mini"),
)

# Internally generates ~3 query variations.
# e.g., "What is RLHF?" might also search:
#   - "How does reinforcement learning from human feedback work?"
#   - "Explain the RLHF training process for language models"
docs = multi_query_retriever.invoke("What is RLHF?")

Step-Back Prompting

Retrieve context for a more general version of the question:

step_back_prompt = ChatPromptTemplate.from_template(
    """You are an expert at generating step-back questions.
    Given a specific question, generate a more general question that,
    if answered, would help answer the original question.

Original question: {question}

Step-back question:"""
)

def step_back_retrieval(question: str, retriever, llm):
    chain = step_back_prompt | llm
    general_question = chain.invoke({"question": question}).content

    # Retrieve for both the original and the step-back question
    original_docs = retriever.invoke(question)
    general_docs = retriever.invoke(general_question)

    # Combine and deduplicate
    all_docs = original_docs + general_docs
    seen = set()
    unique_docs = []
    for doc in all_docs:
        if doc.page_content not in seen:
            seen.add(doc.page_content)
            unique_docs.append(doc)
    return unique_docs

End-to-End RAG Pipeline

Putting it all together into a production-ready pipeline:

from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

1. Setup

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vectorstore = Chroma(
    collection_name="my_knowledge_base",
    embedding_function=embeddings,
    persist_directory="./chroma_db",
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

2. RAG Prompt

rag_prompt = ChatPromptTemplate.from_template(
    """You are a helpful assistant. Answer the question based ONLY on the following context.
If the context doesn't contain the answer, say "I don't have enough information to answer that question."

Context: {context}

Question: {question}

Answer:"""
)

3. Format retrieved documents

def format_docs(docs):
    return "\n\n---\n\n".join(
        f"Source: {doc.metadata.get('source', 'unknown')}\n{doc.page_content}"
        for doc in docs
    )

4. Build the chain

rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)

5. Query

answer = rag_chain.invoke("What are the benefits of RAG?")
print(answer)

RAGAS Evaluation

RAGAS (Retrieval Augmented Generation Assessment) provides automated metrics for RAG systems.

from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)
from datasets import Dataset

Prepare evaluation data

eval_data = {
    "question": [
        "What is RAG?",
        "How does chunking affect retrieval?",
    ],
    "answer": [
        "RAG is a technique that augments LLM responses with retrieved documents.",
        "Smaller chunks improve precision while larger chunks provide more context.",
    ],
    "contexts": [
        ["RAG stands for Retrieval-Augmented Generation. It fetches relevant documents to ground LLM responses."],
        ["Chunk size is a critical parameter. Smaller chunks (200-500 chars) increase precision. Larger chunks (1000-2000 chars) preserve context."],
    ],
    "ground_truth": [
        "RAG enhances LLM outputs by retrieving relevant external documents and including them in the prompt.",
        "Chunk size impacts retrieval quality: smaller chunks are more precise, larger chunks retain more surrounding context.",
    ],
}

dataset = Dataset.from_dict(eval_data)

Run evaluation

results = evaluate(
    dataset,
    metrics=[
        faithfulness,       # Is the answer grounded in the context?
        answer_relevancy,   # Does the answer address the question?
        context_precision,  # Are retrieved docs relevant?
        context_recall,     # Were all relevant docs retrieved?
    ],
)

print(results)

# {'faithfulness': 0.92, 'answer_relevancy': 0.88,
#  'context_precision': 0.85, 'context_recall': 0.90}

RAGAS Metrics Explained

- Faithfulness: Measures whether the generated answer is supported by the retrieved context (reduces hallucination).

- Answer Relevancy: Checks whether the answer actually addresses the user's question.

- Context Precision: Evaluates whether the retrieved chunks are relevant (high = fewer irrelevant chunks).

- Context Recall: Measures whether all necessary information was retrieved (high = nothing important was missed).

Aim for all metrics above 0.8 in production.
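The "above 0.8" rule of thumb is easy to enforce automatically, e.g. as a gate in CI. A minimal sketch in plain Python --- the scores dict mirrors the example output earlier, and the 0.8 threshold is this lesson's suggestion, not a RAGAS default:

```python
THRESHOLD = 0.8

def check_rag_metrics(scores: dict, threshold: float = THRESHOLD) -> list:
    """Return the names of any metrics that fall below the threshold."""
    return [name for name, value in scores.items() if value < threshold]

# Scores as they would come out of a RAGAS run (example values)
scores = {
    "faithfulness": 0.92,
    "answer_relevancy": 0.88,
    "context_precision": 0.85,
    "context_recall": 0.90,
}

failing = check_rag_metrics(scores)
if failing:
    raise SystemExit(f"RAG eval below threshold: {failing}")
print("All RAGAS metrics above", THRESHOLD)
```

Running this after every pipeline change (new chunk size, new retriever weights, new prompt) turns the four metrics into a regression test rather than a one-off measurement.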