
    Building a RAG Pipeline


    Time to put everything together. In this tutorial, we will build a complete RAG pipeline from scratch — loading documents, splitting them into chunks, generating embeddings, storing them in a vector database, and generating grounded answers with an LLM.

    The Six Steps of a RAG Pipeline

    Step 1: Load Documents       → Get your data from files, URLs, databases
    Step 2: Chunk Documents      → Split large docs into smaller pieces
    Step 3: Generate Embeddings  → Convert chunks to vectors
    Step 4: Store in Vector DB   → Save vectors for fast retrieval
    Step 5: Query and Retrieve   → Find relevant chunks for a question
    Step 6: Generate Answer      → Use LLM with retrieved context
    

    Let's build each step.

    Step 1: Load Documents

    First, get your data into the pipeline. Documents can come from files, URLs, databases, or APIs.

    # Simple file loading
    def load_text_file(path: str) -> str:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    
    # Load a directory of text files
    import os
    
    def load_directory(dir_path: str) -> list[dict]:
        documents = []
        for filename in os.listdir(dir_path):
            if filename.endswith((".txt", ".md")):
                filepath = os.path.join(dir_path, filename)
                content = load_text_file(filepath)
                documents.append({
                    "content": content,
                    "source": filename,
                    "path": filepath
                })
        return documents

    For PDFs and other formats, use libraries like PyPDF2, pdfplumber, or LangChain's document loaders:

    # Using LangChain for various file types
    from langchain_community.document_loaders import (
        PyPDFLoader,
        TextLoader,
        UnstructuredMarkdownLoader,
        WebBaseLoader
    )
    
    # Load a PDF
    pdf_loader = PyPDFLoader("company-handbook.pdf")
    pdf_docs = pdf_loader.load()
    
    # Load from a web page
    web_loader = WebBaseLoader("https://docs.example.com/faq")
    web_docs = web_loader.load()

    Step 2: Chunk Documents

    Large documents need to be split into smaller pieces. Why? Because:

    • Embedding models have token limits
    • Smaller chunks produce more precise retrieval
    • LLM context windows are limited

    def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
        """Split text into overlapping chunks."""
        if overlap >= chunk_size:
            raise ValueError("overlap must be smaller than chunk_size")
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunks.append(text[start:end])
            start = end - overlap    # overlap preserves context across chunk boundaries
        return chunks
    
    # Example
    text = load_text_file("company-handbook.txt")
    chunks = chunk_text(text, chunk_size=500, overlap=50)
    print(f"Document split into {len(chunks)} chunks")

    For smarter chunking, use LangChain's text splitters:

    from langchain.text_splitter import RecursiveCharacterTextSplitter
    
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        separators=["\n\n", "\n", ". ", " ", ""]
    )
    
    chunks = splitter.split_text(text)

    The recursive splitter tries to split on paragraph breaks first, then sentences, then words — keeping chunks as semantically coherent as possible.
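    To make that strategy concrete, here is a simplified, pure-Python sketch of recursive splitting. It is an illustration of the idea, not LangChain's actual implementation; among other simplifications, it drops the separators instead of keeping them and does not merge small pieces back together:

```python
def recursive_split(text: str, separators: list[str], chunk_size: int) -> list[str]:
    """Sketch of recursive splitting: try the coarsest separator first,
    then recurse with finer separators on any piece that is still too big."""
    if len(text) <= chunk_size:
        return [text]
    if not separators:
        # No separators left: fall back to a hard character cut
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    sep, rest = separators[0], separators[1:]
    pieces = text.split(sep) if sep else list(text)
    chunks = []
    for piece in pieces:
        if len(piece) <= chunk_size:
            chunks.append(piece)
        else:
            chunks.extend(recursive_split(piece, rest, chunk_size))
    return [c for c in chunks if c.strip()]

text = "First paragraph.\n\nSecond paragraph. It has two sentences.\n\nThird."
chunks = recursive_split(text, ["\n\n", ". ", " "], chunk_size=30)
# Paragraphs that fit stay whole; the oversized middle one splits at sentences
```

    Notice that the first and third paragraphs survive intact, and only the oversized middle paragraph is broken further, at a sentence boundary rather than mid-word.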

    Step 3: Generate Embeddings

    Convert each chunk into a vector:

    from openai import OpenAI
    
    client = OpenAI()
    
    def get_embeddings(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
        """Generate embeddings for a batch of texts."""
        response = client.embeddings.create(model=model, input=texts)
        return [item.embedding for item in response.data]
    
    # Embed all chunks (batch for efficiency)
    chunk_embeddings = get_embeddings(chunks)
    print(f"Generated {len(chunk_embeddings)} embeddings of dimension {len(chunk_embeddings[0])}")

    Step 4: Store in Vector Database

    Store the chunks and their embeddings for fast retrieval. Here we will use ChromaDB:

    import chromadb
    
    # Use a distinct name so we don't shadow the OpenAI client from Step 3
    chroma_client = chromadb.PersistentClient(path="./rag_data")
    
    collection = chroma_client.get_or_create_collection(name="knowledge_base")
    
    # Prepare data
    ids = [f"chunk-{i}" for i in range(len(chunks))]
    metadatas = [{"source": "company-handbook.txt", "chunk_index": i} for i in range(len(chunks))]
    
    # Store chunks with embeddings
    collection.add(
        ids=ids,
        documents=chunks,
        embeddings=chunk_embeddings,
        metadatas=metadatas
    )
    
    print(f"Stored {collection.count()} chunks in the vector database")
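    It's worth understanding what the vector database is doing on your behalf: a nearest-neighbor search over the stored embeddings. A brute-force version of the same idea, on toy 3-dimensional vectors, looks like this (real embeddings have hundreds of dimensions, and real databases use approximate indexes such as HNSW to stay fast at scale):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors: 1.0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def brute_force_search(query: list[float], stored: dict[str, list[float]], k: int = 2) -> list[str]:
    """Score every stored vector against the query; return the top-k ids."""
    ranked = sorted(stored.items(), key=lambda kv: cosine_similarity(query, kv[1]), reverse=True)
    return [doc_id for doc_id, _ in ranked[:k]]

# Toy 3-dimensional "embeddings" standing in for real ones
stored = {
    "chunk-0": [0.9, 0.1, 0.0],   # about vacations
    "chunk-1": [0.0, 1.0, 0.2],   # about payroll
    "chunk-2": [0.8, 0.2, 0.1],   # also about vacations
}
query = [1.0, 0.0, 0.0]
print(brute_force_search(query, stored))  # → ['chunk-0', 'chunk-2']
```

    The vector database gives you exactly this ranking, just indexed so that it doesn't have to score every chunk on every query.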

    Step 5: Query and Retrieve

    When a user asks a question, find the most relevant chunks:

    def retrieve(question: str, n_results: int = 5) -> list[str]:
        """Retrieve the most relevant chunks for a question."""
        # Embed the question with the same model used to index the chunks.
        # Passing query_texts instead would use Chroma's default embedding
        # function, which doesn't match the OpenAI vectors we stored.
        question_embedding = get_embeddings([question])[0]
        results = collection.query(
            query_embeddings=[question_embedding],
            n_results=n_results
        )
        return results["documents"][0]
    
    # Example
    relevant_chunks = retrieve("What is our vacation policy?")
    for i, chunk in enumerate(relevant_chunks):
        print(f"--- Chunk {i+1} ---")
        print(chunk[:200] + "...")

    Step 6: Generate Answer with Context

    Pass the retrieved chunks to the LLM as context:

    def generate_answer(question: str, context_chunks: list[str]) -> str:
        """Generate an answer using retrieved context."""
        context = "\n\n---\n\n".join(context_chunks)
    
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": """You are a helpful assistant that answers questions
    based on the provided context. If the context doesn't contain enough
    information to answer the question, say "I don't have enough information
    to answer that question." Always cite which part of the context your
    answer is based on."""
                },
                {
                    "role": "user",
                    "content": f"""Context:
    {context}
    
    Question: {question}
    
    Answer:"""
                }
            ],
            temperature=0.2    # Lower temperature for more factual answers
        )
    
        return response.choices[0].message.content

    Full Working Example

    Here is the complete pipeline in one script:

    """
    Complete RAG Pipeline
    ---------------------
    Load → Chunk → Embed → Store → Retrieve → Generate
    """
    
    from openai import OpenAI
    import chromadb
    
    # ── Initialize clients ──────────────────────────────────
    openai_client = OpenAI()
    chroma_client = chromadb.PersistentClient(path="./rag_data")
    collection = chroma_client.get_or_create_collection(name="knowledge_base")
    
    # ── Step 1 & 2: Load and chunk documents ────────────────
    def load_and_chunk(file_path: str, chunk_size: int = 500, overlap: int = 50) -> list[dict]:
        with open(file_path, "r", encoding="utf-8") as f:
            text = f.read()
    
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            chunks.append({
                "content": text[start:end],
                "source": file_path,
                "chunk_index": len(chunks)
            })
            start = end - overlap
        return chunks
    
    # ── Step 3 & 4: Embed and store ────────────────────────
    def index_documents(chunks: list[dict]):
        texts = [c["content"] for c in chunks]
        response = openai_client.embeddings.create(
            model="text-embedding-3-small", input=texts
        )
        embeddings = [item.embedding for item in response.data]
    
        collection.add(
            ids=[f"chunk-{c['chunk_index']}" for c in chunks],
            documents=texts,
            embeddings=embeddings,
            metadatas=[{"source": c["source"], "index": c["chunk_index"]} for c in chunks]
        )
        print(f"Indexed {len(chunks)} chunks")
    
    # ── Step 5 & 6: Retrieve and generate ──────────────────
    def ask(question: str) -> str:
        # Retrieve: embed the question with the same model used for indexing,
        # so the query vector lives in the same space as the stored chunks
        q_emb = openai_client.embeddings.create(
            model="text-embedding-3-small", input=[question]
        ).data[0].embedding
        results = collection.query(query_embeddings=[q_emb], n_results=5)
        context = "\n\n---\n\n".join(results["documents"][0])
    
        # Generate
        response = openai_client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": "Answer based on the context provided. "
                 "If the context is insufficient, say so."},
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"}
            ],
            temperature=0.2
        )
        return response.choices[0].message.content
    
    # ── Run the pipeline ────────────────────────────────────
    if __name__ == "__main__":
        # Index your documents (run once)
        chunks = load_and_chunk("company-handbook.txt")
        index_documents(chunks)
    
        # Ask questions (run as many times as you like)
        answer = ask("What is the vacation policy?")
        print(answer)

    What to ask your AI: "Help me build a RAG pipeline that loads [my documents], chunks them, stores embeddings in [ChromaDB/Pinecone], and answers questions using [GPT-4o/Claude]."

    Common Improvements

    Once your basic pipeline works, consider these enhancements:

    • Metadata filtering — Filter by source, date, or category before vector search
    • Re-ranking — Use a cross-encoder to re-rank retrieved results for better precision
    • Hybrid search — Combine vector search with keyword search for better recall
    • Streaming responses — Stream the LLM output for a better user experience
    • Conversation history — Include previous Q&A pairs for multi-turn conversations
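    As a taste of one of these, the hybrid-search idea can be sketched in plain Python: score each chunk by keyword overlap and by vector similarity, then blend the two with a tunable weight. The scoring function and the similarity values below are toy stand-ins; in practice you would use BM25 for the keyword side and the similarities returned by your vector database:

```python
def keyword_score(question: str, chunk: str) -> float:
    """Fraction of question words found in the chunk (toy stand-in for BM25)."""
    q_words = set(question.lower().split())
    c_words = set(chunk.lower().split())
    return len(q_words & c_words) / len(q_words) if q_words else 0.0

def hybrid_search(question: str, chunks: list[str], vector_scores: list[float], alpha: float = 0.5) -> list[str]:
    """Blend keyword and vector scores; alpha balances the two signals."""
    combined = [
        alpha * vector_scores[i] + (1 - alpha) * keyword_score(question, chunk)
        for i, chunk in enumerate(chunks)
    ]
    ranked = sorted(range(len(chunks)), key=lambda i: combined[i], reverse=True)
    return [chunks[i] for i in ranked]

chunks = [
    "Employees accrue 1.5 vacation days per month.",
    "Expense reports are due by the 5th of each month.",
]
# Pretend similarities from the vector retriever (hypothetical values)
vector_scores = [0.82, 0.35]
results = hybrid_search("vacation days policy", chunks, vector_scores)
```

    The `alpha` weight is the main knob: pull it toward 1.0 when your users phrase questions loosely, toward 0.0 when exact terms (IDs, product names) matter most.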

    What's Next?

    Your pipeline works, but how you chunk your documents has a huge impact on retrieval quality. Next, we dive deep into chunking strategies — the art of splitting documents for optimal RAG performance.

