3.6.4.9. RAG from Scratch#

Authored by Kalyan KS. To stay updated with LLM, RAG and Agent updates, you can follow me on Twitter.

  • Step-1 : Extract text

  • Step-2 : Chunk the extracted text

  • Step-3 : Create a vector store with the chunks

  • Step-4 : Create a retriever which returns the relevant chunks

  • Step-5 : Build context from the relevant chunk texts

  • Step-6 : Build the RAG pipeline

  • Step-7 : Run the RAG pipeline to get the answer.

3.6.4.9.1. Install libraries#

!pip install -qU PyPDF2 chromadb litellm
?25l     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 0.0/67.3 kB ? eta -:--:--
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 67.3/67.3 kB 2.5 MB/s eta 0:00:00
?25h  Installing build dependencies ... ?25l?25hdone
  Getting requirements to build wheel ... ?25l?25hdone
  Preparing metadata (pyproject.toml) ... ?25l?25hdone
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 232.6/232.6 kB 8.4 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 611.1/611.1 kB 21.0 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.4/2.4 MB 52.5 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 6.8/6.8 MB 54.9 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 278.6/278.6 kB 14.2 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 94.8/94.8 kB 5.4 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.0/2.0 MB 43.2 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 101.6/101.6 kB 5.8 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 13.3/13.3 MB 55.9 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 55.9/55.9 kB 2.8 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 177.4/177.4 kB 8.5 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 65.0/65.0 kB 3.4 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 118.7/118.7 kB 6.7 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 73.0/73.0 kB 3.7 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1.2/1.2 MB 32.5 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.3/62.3 kB 3.2 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 459.8/459.8 kB 20.6 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 319.7/319.7 kB 16.5 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 71.5/71.5 kB 3.3 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 4.0/4.0 MB 39.2 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 452.6/452.6 kB 13.0 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 46.0/46.0 kB 2.3 MB/s eta 0:00:00
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 86.8/86.8 kB 4.0 MB/s eta 0:00:00
?25h  Building wheel for pypika (pyproject.toml) ... ?25l?25hdone

3.6.4.9.2. Set up the LLM API Key#

from google.colab import userdata
import os
os.environ['OPENAI_API_KEY'] = userdata.get("OPENAI_API_KEY")

3.6.4.9.3. Extract Text#

from PyPDF2 import PdfReader
from typing import List
from PyPDF2 import PdfReader

def text_extract(pdf_path: str) -> str:
    """
    Extracts text from all pages of a given PDF file.

    Args:
        pdf_path (str): Path to the PDF file.

    Returns:
        str: Extracted text from the PDF, concatenated with newline separators.
    """

    # An empty list to store extracted text from PDF pages
    pdf_pages = []

    # Open the PDF file in binary read mode
    with open(pdf_path, 'rb') as file:

        # Create a PdfReader object to read the PDF
        pdf_reader = PdfReader(file)

        # Iterate through all pages in the PDF
        for page in pdf_reader.pages:

            # Extract text from the current page
            text = page.extract_text()

            # Append the extracted text to the list
            pdf_pages.append(text)

    # Join all extracted text using newline separator
    pdf_text = "\n".join(pdf_pages)

    # Return the extracted text as a single string
    return pdf_text
# Download the PDF file
import requests

pdf_url = 'https://proceedings.neurips.cc/paper_files/paper/2017/file/3f5ee243547dee91fbd053c1c4a845aa-Paper.pdf'
response = requests.get(pdf_url)

pdf_path = 'attention_is_all_you_need.pdf'
with open(pdf_path, 'wb') as file:
    file.write(response.content)
pdf_text = text_extract(pdf_path)
print(pdf_text[:300])
Attention Is All You Need
Ashish Vaswani
Google Brain
avaswani@google.comNoam Shazeer
Google Brain
noam@google.comNiki Parmar
Google Research
nikip@google.comJakob Uszkoreit
Google Research
usz@google.com
Llion Jones
Google Research
llion@google.comAidan N. Gomezy
University of Toronto
aidan@c

3.6.4.9.4. Chunk Text#

from typing import List
import re
from collections import deque


def text_chunk(text: str, max_length: int = 1000) -> List[str]:
    """
    Splits a given text into chunks while ensuring that sentences remain intact.

    The function maintains sentence boundaries by splitting based on punctuation
    (. ! ?) and attempts to fit as many sentences as possible within `max_length`
    per chunk.

    Args:
        text (str): The input text to be chunked.
        max_length (int, optional): Maximum length of each chunk. Default is 1000.

    Returns:
        List[str]: A list of text chunks, each containing full sentences.
    """

    # Split text into sentences while ensuring punctuation (. ! ?) stays at the end
    sentences = deque(re.split(r'(?<=[.!?])\s+', text.replace('\n', ' ')))

    # An empty list to store the final chunks
    chunks = []

    # Temporary string to hold the current chunk
    chunk_text = ""

    while sentences:
        # Access sentence from the deque and strip any extra spaces
        sentence = sentences.popleft().strip()

        # Check if the sentence is non-empty before processing
        if sentence:
            # If adding this sentence exceeds max_length and chunk_text is not empty, store the current chunk
            if len(chunk_text) + len(sentence) > max_length and chunk_text:

                # Save the current chunk
                chunks.append(chunk_text)

                # Start a new chunk with the current sentence
                chunk_text = sentence
            else:
                # Append the sentence to the current chunk with a space
                chunk_text += " " + sentence

    # Add the last chunk if there's any remaining text
    if chunk_text:
        chunks.append(chunk_text)

    return chunks
chunks = text_chunk(pdf_text)
print(f"Number of chunks ={len(chunks)}")
print(chunks[0])
Number of chunks =36
 Attention Is All You Need Ashish Vaswani Google Brain avaswani@google.comNoam Shazeer Google Brain noam@google.comNiki Parmar Google Research nikip@google.comJakob Uszkoreit Google Research usz@google.com Llion Jones Google Research llion@google.comAidan N. Gomezy University of Toronto aidan@cs.toronto.eduŁukasz Kaiser Google Brain lukaszkaiser@google.com Illia Polosukhinz illia.polosukhin@gmail.com Abstract The dominant sequence transduction models are based on complex recurrent or convolutional neural networks that include an encoder and a decoder. The best performing models also connect the encoder and decoder through an attention mechanism. We propose a new simple network architecture, the Transformer, based solely on attention mechanisms, dispensing with recurrence and convolutions entirely. Experiments on two machine translation tasks show these models to be superior in quality while being more parallelizable and requiring significantly less time to train.

3.6.4.9.5. Create the Vector Store#

# Set up Chromadb
import chromadb
from chromadb.utils import embedding_functions
from chromadb.api.models import Collection

def create_vector_store(db_path: str) -> Collection:
    """
    Creates a persistent ChromaDB vector store with OpenAI embeddings.

    Args:
        db_path (str): Path where the ChromaDB database will be stored.

    Returns:
        Collection: A ChromaDB collection object for storing and retrieving embedded vectors.
    """

    # Initialize a ChromaDB PersistentClient with the specified database path
    client = chromadb.PersistentClient(path=db_path)

    # Create an embedding function using OpenAI's text embedding model
    embeddings = embedding_functions.OpenAIEmbeddingFunction(
        api_key=userdata.get("OPENAI_API_KEY"),  # Retrieve API key from user data
        model_name="text-embedding-3-small"  # Specify the embedding model
    )

    # Create a new collection in the ChromaDB database with the embedding function
    db = client.create_collection(
        name="pdf_chunks",  # Name of the collection where embeddings will be stored
        embedding_function=embeddings  # Apply the embedding function
    )

    # Return the created ChromaDB collection
    return db
# Insert chunks into vector store
import os
import uuid

def insert_chunks_vectordb(chunks: List[str], db: Collection, file_path: str) -> None:
    """
    Inserts text chunks into a ChromaDB vector store with metadata.

    Args:
        chunks (List[str]): List of text chunks to be stored.
        db (Collection): The ChromaDB collection where the chunks will be inserted.
        file_path (str): Path of the source file for metadata.

    Returns:
        None
    """

    # Extract the file name from the given file path
    file_name = os.path.basename(file_path)

    # Generate unique IDs for each chunk
    id_list = [str(uuid.uuid4()) for _ in range(len(chunks))]

    # Create metadata for each chunk, storing the chunk index and source file name
    metadata_list = [{"chunk": i, "source": file_name} for i in range(len(chunks))]

    # Define batch size for inserting chunks to optimize performance
    batch_size = 40

    # Insert chunks into the database in batches
    for i in range(0, len(chunks), batch_size):
        end_id = min(i + batch_size, len(chunks))  # Ensure we don't exceed list length

        # Add the batch of chunks to the vector store
        db.add(
            documents=chunks[i:end_id],
            metadatas=metadata_list[i:end_id],
            ids=id_list[i:end_id]
        )

    print(f"{len(chunks)} chunks added to the vector store")

3.6.4.9.6. Retrieve Chunks#

from typing import Any, List

def retrieve_chunks(db: Collection, query: str, n_results: int = 2) -> List[Any]:
    """
    Retrieves relevant chunks from the  vector store for the given query.

    Args:
        db (Collection): The vector store object
        query (str): The search query text.
        n_results (int, optional): The number of relevant chunks to retrieve. Defaults to 2.

    Returns:
        List[Any]: A list of relevant chunks retrieved from the vector store.
    """

    # Perform a query on the database to get the most relevant chunks
    relevant_chunks = db.query(query_texts=[query], n_results=n_results)

    # Return the retrieved relevant chunks
    return relevant_chunks

3.6.4.9.7. Build Context#


def build_context(relevant_chunks) -> str:
    """
    Builds a single context string by combining texts from relevant chunks.

    Args:
        relevant_chunks: relevant chunks retrieved from the vector store.

    Returns:
        str: A single string containing all document chunks combined with newline separators.
    """

    # combine the text from relevant chunks with newline separator
    context = "\n".join(relevant_chunks['documents'][0])

    # Return the combined context string
    return context

3.6.4.9.8. Build RAG Pipeline#

import os
from typing import Tuple

def get_context(pdf_path: str, query: str, db_path: str) -> Tuple[str, str]:
    """
    Retrieves the relevant chunks from the vector store and then builds context from them.

    Args:
        pdf_path (str): The file path to the PDF document.
        query (str): The query string to search within the vector store.
        db_path (str): The file path to the persistent vector store database.

    Returns:
        Tuple[str, str]: A tuple containing the context related to the query and the original query string.
    """

    # Check if the vector store already exists
    if os.path.exists(db_path):
        print("Loading existing vector store...")

        # Initialize the persistent client for the existing database
        client = chromadb.PersistentClient(path=db_path)

        # Create the embedding function using OpenAI embeddings
        embeddings = embedding_functions.OpenAIEmbeddingFunction(
            api_key=userdata.get("OPENAI_API_KEY"),  # Fetch API key from userdata
            model_name="text-embedding-3-small"      # Specify the embedding model
        )

        # Get the collection of PDF chunks from the existing vector store
        db = client.get_collection(name="pdf_chunks", embedding_function=embeddings)
    else:
        print("Creating new vector store...")

        # Extract text from the provided PDF
        pdf_text = text_extract(pdf_path)

        # Chunk the extracted text
        chunks = text_chunk(pdf_text)

        # Create a new vector store
        db = create_vector_store(db_path)

        # Insert the text chunks into the vector store
        insert_chunks_vectordb(chunks, db, pdf_path)

    # Retrieve the relevant chunks based on the query
    relevant_chunks = retrieve_chunks(db, query)

    # Build the context from the relevant chunks
    context = build_context(relevant_chunks)

    # Return the context and the original query
    return context, query

def get_prompt(context: str, query: str) -> str:
    """
    Generates a rag prompt based on the given context and query.

    Args:
        context (str): The context the LLM should use to answer the question.
        query (str): The user query that needs to be answered based on the context.

    Returns:
        str: The generated rag prompt.
    """

    # Format the prompt with the provided context and query
    rag_prompt = f""" You are an AI model trained for question answering. You should answer the
    given question based on the given context only.
    Question : {query}
    \n
    Context : {context}
    \n
    If the answer is not present in the given context, respond as: The answer to this question is not available
    in the provided content.
    """

    # Return the formatted prompt
    return rag_prompt
from litellm import completion

def get_response(rag_prompt: str) -> str:
    """
    Sends a prompt to the OpenAI LLM and returns the answer.

    Args:
        rag_prompt (str): The rag prompt.

    Returns:
        str: The LLM generated answer.
    """
    # Specify the LLM to use
    model = "openai/gpt-4o-mini"

    # Prepare the message to be sent to the model
    messages = [{"role": "user", "content": rag_prompt}]

    # Call the completion function to get a response from the model
    response = completion(model=model, messages=messages, temperature=0)

    # Return the answer
    answer = response.choices[0].message.content
    return answer

def rag_pipeline(pdf_path: str, query: str, db_path: str) -> str:
    """
    Runs a Retrieval-Augmented Generation (RAG) pipeline to retrieve context from a vector store,
    generate the rag prompt, and then get the answer from the model.

    Args:
        pdf_path (str): The file path to the PDF document from which context is extracted.
        query (str): The query for which a response is needed, based on the context.
        db_path (str): The file path to the persistent vector store database used for context retrieval.

    Returns:
        str: The model's response based on the context and the provided query.
    """

    # get the context
    context, query = get_context(pdf_path, query, db_path)

    # Generate the rag prompt based on the context and query
    rag_prompt = get_prompt(context, query)

    # Get the response from the model using the rag prompt
    response = get_response(rag_prompt)

    # Return the model's response
    return response

3.6.4.9.9. Run RAG Pipeline#

# Set the chroma DB path
current_dir = "/content/rag"
persistent_directory = os.path.join(current_dir, "db", "chroma_db_pdf")

# PDF path
pdf_path = "/content/attention_is_all_you_need.pdf"

# RAG query
query = "What is the transformer architecture?"
# Run the RAG pipeline
answer = rag_pipeline(pdf_path, query, persistent_directory)
Creating new vector store...
36 chunks added to the vector store
print(f"Query:{query}")
print(f"Generated answer:{answer}")
Query:What is the transformer architecture?
Generated answer:The Transformer architecture is a model that relies entirely on an attention mechanism to draw global dependencies between input and output, eschewing recurrence. It consists of stacked self-attention and point-wise, fully connected layers for both the encoder and decoder. The encoder is composed of a stack of six identical layers, each with a multi-head self-attention mechanism and a position-wise fully connected feed-forward network, along with residual connections and layer normalization. This architecture allows for significant parallelization and achieves state-of-the-art translation quality with relatively short training times.
# RAG query
query = "What is self-attention?"

# Run the RAG pipeline
answer = rag_pipeline(pdf_path, query, persistent_directory)

print(f"Query:{query}")
print(f"Generated answer:{answer}")
Loading existing vector store...
Query:What is self-attention?
Generated answer:Self-attention, sometimes called intra-attention, is an attention mechanism that relates different positions of a single sequence in order to compute a representation of that sequence. It has been successfully used in various tasks, including reading comprehension, abstractive summarization, textual entailment, and learning task-independent sentence representations.