Building a RAG Pipeline
Time to put everything together. In this tutorial, we will build a complete RAG pipeline from scratch — loading documents, splitting them into chunks, generating embeddings, storing them in a vector database, and generating grounded answers with an LLM.
The Six Steps of a RAG Pipeline
Step 1: Load Documents → Get your data from files, URLs, databases
Step 2: Chunk Documents → Split large docs into smaller pieces
Step 3: Generate Embeddings → Convert chunks to vectors
Step 4: Store in Vector DB → Save vectors for fast retrieval
Step 5: Query and Retrieve → Find relevant chunks for a question
Step 6: Generate Answer → Use LLM with retrieved context
Let's build each step.
Step 1: Load Documents
First, get your data into the pipeline. Documents can come from files, URLs, databases, or APIs.
```python
import os

# Simple file loading
def load_text_file(path: str) -> str:
    with open(path, "r", encoding="utf-8") as f:
        return f.read()

# Load a directory of text files
def load_directory(dir_path: str) -> list[dict]:
    documents = []
    for filename in os.listdir(dir_path):
        if filename.endswith((".txt", ".md")):
            filepath = os.path.join(dir_path, filename)
            content = load_text_file(filepath)
            documents.append({
                "content": content,
                "source": filename,
                "path": filepath
            })
    return documents
```
For PDFs and other formats, use libraries like PyPDF2, pdfplumber, or LangChain's document loaders:
```python
# Using LangChain for various file types
from langchain_community.document_loaders import (
    PyPDFLoader,
    TextLoader,
    UnstructuredMarkdownLoader,
    WebBaseLoader
)

# Load a PDF
pdf_loader = PyPDFLoader("company-handbook.pdf")
pdf_docs = pdf_loader.load()

# Load from a web page
web_loader = WebBaseLoader("https://docs.example.com/faq")
web_docs = web_loader.load()
```
Step 2: Chunk Documents
Large documents need to be split into smaller pieces. Why? Because:
- Embedding models have token limits
- Smaller chunks produce more precise retrieval
- LLM context windows are limited
```python
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into overlapping chunks."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]
        chunks.append(chunk)
        start = end - overlap  # overlap preserves context across chunk boundaries
    return chunks

# Example
text = load_text_file("company-handbook.txt")
chunks = chunk_text(text, chunk_size=500, overlap=50)
print(f"Document split into {len(chunks)} chunks")
```
For smarter chunking, use LangChain's text splitters:
```python
from langchain.text_splitter import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=50,
    separators=["\n\n", "\n", ". ", " ", ""]
)
chunks = splitter.split_text(text)
```
The recursive splitter tries to split on paragraph breaks first, then sentences, then words — keeping chunks as semantically coherent as possible.
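To build intuition for that priority order, here is a simplified sketch of the recursive idea. It is an illustration only, not LangChain's actual implementation — the real splitter also merges small adjacent pieces back up toward `chunk_size` and applies overlap:

```python
def recursive_split(text: str, separators: list[str], chunk_size: int) -> list[str]:
    """Simplified illustration of recursive splitting: try the highest-priority
    separator first; recurse with the next separator for pieces still too big."""
    if len(text) <= chunk_size:
        return [text]
    if not separators:
        # No separators left: fall back to hard character cuts
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    sep, rest = separators[0], separators[1:]
    pieces = text.split(sep) if sep else list(text)
    chunks = []
    for piece in pieces:
        if len(piece) <= chunk_size:
            chunks.append(piece)
        else:
            chunks.extend(recursive_split(piece, rest, chunk_size))
    return [c for c in chunks if c.strip()]

print(recursive_split("para one.\n\npara two.", ["\n\n", "\n", " "], 10))
# → ['para one.', 'para two.']
```

Because the paragraph break is tried first, both paragraphs survive intact; only oversized pieces fall through to the lower-priority separators.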
Step 3: Generate Embeddings
Convert each chunk into a vector:
```python
from openai import OpenAI

client = OpenAI()

def get_embeddings(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
    """Generate embeddings for a batch of texts."""
    response = client.embeddings.create(model=model, input=texts)
    return [item.embedding for item in response.data]

# Embed all chunks (batch for efficiency)
chunk_embeddings = get_embeddings(chunks)
print(f"Generated {len(chunk_embeddings)} embeddings of dimension {len(chunk_embeddings[0])}")
```
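The retrieval steps that follow work by comparing these vectors. Cosine similarity is the most common measure; a minimal pure-Python sketch, just for intuition (real vector databases use optimized index structures, not pairwise loops):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors: 1.0 means same direction,
    0.0 means orthogonal (unrelated)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # → 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # → 0.0
```

Chunks whose embeddings score highest against the question's embedding are the ones retrieved in Step 5.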
Step 4: Store in Vector Database
Store the chunks and their embeddings for fast retrieval. Here we will use ChromaDB:
```python
import chromadb

# Use a distinct name so we don't shadow the OpenAI client
chroma_client = chromadb.PersistentClient(path="./rag_data")
collection = chroma_client.get_or_create_collection(name="knowledge_base")

# Prepare data
ids = [f"chunk-{i}" for i in range(len(chunks))]
metadatas = [{"source": "company-handbook.txt", "chunk_index": i} for i in range(len(chunks))]

# Store chunks with embeddings
collection.add(
    ids=ids,
    documents=chunks,
    embeddings=chunk_embeddings,
    metadatas=metadatas
)
print(f"Stored {collection.count()} chunks in the vector database")
```
Step 5: Query and Retrieve
When a user asks a question, find the most relevant chunks:
```python
def retrieve(question: str, n_results: int = 5) -> list[str]:
    """Retrieve the most relevant chunks for a question."""
    # Embed the query with the same model used for the chunks, so both
    # live in the same vector space
    results = collection.query(
        query_embeddings=get_embeddings([question]),
        n_results=n_results
    )
    return results["documents"][0]

# Example
relevant_chunks = retrieve("What is our vacation policy?")
for i, chunk in enumerate(relevant_chunks):
    print(f"--- Chunk {i+1} ---")
    print(chunk[:200] + "...")
```
Step 6: Generate Answer with Context
Pass the retrieved chunks to the LLM as context:
```python
def generate_answer(question: str, context_chunks: list[str]) -> str:
    """Generate an answer using retrieved context."""
    context = "\n\n---\n\n".join(context_chunks)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": """You are a helpful assistant that answers questions
based on the provided context. If the context doesn't contain enough
information to answer the question, say "I don't have enough information
to answer that question." Always cite which part of the context your
answer is based on."""
            },
            {
                "role": "user",
                "content": f"""Context:
{context}

Question: {question}

Answer:"""
            }
        ],
        temperature=0.2  # lower temperature for more factual answers
    )
    return response.choices[0].message.content
```
Full Working Example
Here is the complete pipeline in one script:
```python
"""
Complete RAG Pipeline
---------------------
Load → Chunk → Embed → Store → Retrieve → Generate
"""
from openai import OpenAI
import chromadb

# ── Initialize clients ──────────────────────────────────
openai_client = OpenAI()
chroma_client = chromadb.PersistentClient(path="./rag_data")
collection = chroma_client.get_or_create_collection(name="knowledge_base")

# ── Step 1 & 2: Load and chunk documents ────────────────
def load_and_chunk(file_path: str, chunk_size: int = 500, overlap: int = 50) -> list[dict]:
    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read()
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append({
            "content": text[start:end],
            "source": file_path,
            "chunk_index": len(chunks)
        })
        start = end - overlap
    return chunks

# ── Step 3 & 4: Embed and store ─────────────────────────
def embed(texts: list[str]) -> list[list[float]]:
    response = openai_client.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return [item.embedding for item in response.data]

def index_documents(chunks: list[dict]):
    texts = [c["content"] for c in chunks]
    collection.add(
        ids=[f"chunk-{c['chunk_index']}" for c in chunks],
        documents=texts,
        embeddings=embed(texts),
        metadatas=[{"source": c["source"], "index": c["chunk_index"]} for c in chunks]
    )
    print(f"Indexed {len(chunks)} chunks")

# ── Step 5 & 6: Retrieve and generate ───────────────────
def ask(question: str) -> str:
    # Retrieve — embed the query with the same model used for the chunks
    results = collection.query(query_embeddings=embed([question]), n_results=5)
    context = "\n\n---\n\n".join(results["documents"][0])

    # Generate
    response = openai_client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Answer based on the context provided. "
                                          "If the context is insufficient, say so."},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
        ],
        temperature=0.2
    )
    return response.choices[0].message.content

# ── Run the pipeline ────────────────────────────────────
if __name__ == "__main__":
    # Index your documents (run once)
    chunks = load_and_chunk("company-handbook.txt")
    index_documents(chunks)

    # Ask questions (run as many times as you like)
    answer = ask("What is the vacation policy?")
    print(answer)
```
What to ask your AI: "Help me build a RAG pipeline that loads [my documents], chunks them, stores embeddings in [ChromaDB/Pinecone], and answers questions using [GPT-4o/Claude]."
Common Improvements
Once your basic pipeline works, consider these enhancements:
- Metadata filtering — Filter by source, date, or category before vector search
- Re-ranking — Use a cross-encoder to re-rank retrieved results for better precision
- Hybrid search — Combine vector search with keyword search for better recall
- Streaming responses — Stream the LLM output for a better user experience
- Conversation history — Include previous Q&A pairs for multi-turn conversations
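As one concrete example of the hybrid-search idea, reciprocal rank fusion (RRF) is a simple, widely used way to merge a vector-search ranking with a keyword-search ranking. A minimal sketch, assuming each search returns an ordered list of document ids (the ids and rankings below are illustrative):

```python
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Merge several ranked lists of document ids. Each id earns
    1 / (k + rank) per list it appears in; scores are summed across lists,
    so documents ranked well by multiple searches rise to the top."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

# A doc ranked highly by both searches beats one favored by only one search
vector_ranking = ["doc-a", "doc-b", "doc-c"]
keyword_ranking = ["doc-a", "doc-d", "doc-b"]
print(reciprocal_rank_fusion([vector_ranking, keyword_ranking]))
# → ['doc-a', 'doc-b', 'doc-d', 'doc-c']
```

The constant `k` (60 is the value commonly used in the RRF literature) damps the gap between adjacent ranks so that a single top position doesn't dominate the fused score.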
What's Next?
Your pipeline works, but how you chunk your documents has a huge impact on retrieval quality. Next, we dive deep into chunking strategies — the art of splitting documents for optimal RAG performance.