Q&A on Custom Dataset Using Retrieval-Augmented Generation (RAG) with GPT-J Embeddings

Introduction

I have been working on an exciting project using SageMaker for Natural Language Processing (NLP). With SageMaker, I built a text embedding model that can understand and analyze textual data from a custom dataset. By combining the Retrieval-Augmented Generation (RAG) pattern with the good old KNN algorithm, I developed a system capable of generating meaningful, factually grounded answers from a given textual dataset.

!pip install --upgrade sagemaker --quiet
!pip install ipywidgets==7.0.0 --quiet
import time
import sagemaker
import boto3
import json
from sagemaker.session import Session as SM_Session
from sagemaker.model import Model as SM_Model
from sagemaker import (
    image_uris,
    model_uris,
    script_uris,
    hyperparameters
)
from sagemaker.predictor import Predictor as SM_Predictor
from sagemaker.utils import name_from_base as sm_name_from_base

sm_session = SM_Session()
sm_role = sm_session.get_caller_identity_arn()
sm_region = boto3.Session().region_name
sm_version = "*"


Document Retrieval and LLM Integration

In this project, the primary objective is to leverage document embeddings to retrieve the most relevant documents from our extensive knowledge library. These documents will then be combined with the prompt I provide to the Language Model (LLM) for further analysis and processing.

To achieve this, I have outlined the following steps:

1. Generating Document Embeddings

I will utilize the powerful GPT-J-6B embedding model to generate embeddings for each document in our knowledge library. These embeddings capture the semantic meaning and contextual information of the documents.

2. Identifying Top K Relevant Documents

Given a user query, I will follow these steps to identify the top K most relevant documents:

  • I will generate an embedding for the user query using the same GPT-J-6B embedding model.
  • Utilizing the SageMaker KNN algorithm, I will search for the top K most relevant documents based on the embedding space. This algorithm allows me to efficiently and accurately retrieve documents that closely match the query.
  • Using the indexes obtained from the KNN algorithm, I will retrieve the corresponding documents.

3. Combining Retrieved Documents with Prompt and Question

To provide a comprehensive input to the LLM, I will merge the retrieved documents with the prompt and question. This consolidated input will then be analyzed and processed by the LLM. It is crucial that the combined context and question contain enough information to answer the question while adhering to the LLM's maximum sequence length of 1024 tokens.

By following these steps, I aim to enhance the information retrieval process by leveraging document embeddings and harnessing the capabilities of the GPT-J-6B model. This approach empowers me to extract relevant information and create a more comprehensive input for the LLM, enabling accurate analysis and insightful responses.

Please note that the retrieved documents need to strike a balance between containing sufficient information and fitting within the constraints of the LLM’s maximum sequence length of 1024 tokens. This ensures efficient processing and optimal utilization of the LLM’s capabilities.
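
As a rough illustration of this constraint, below is a minimal sketch of how the assembled prompt could be checked against a token budget before it is sent to the LLM. It assumes the Hugging Face transformers library is available and uses the GPT-2 tokenizer purely as a stand-in for the LLM's actual tokenizer; the fits_token_budget helper and the prompt_template placeholder are illustrative assumptions, not part of the original setup.

# Minimal sketch (assumption: the `transformers` package is installed; GPT-2's
# tokenizer is only a stand-in for the LLM's real tokenizer).
from transformers import GPT2TokenizerFast

MAX_TOTAL_TOKENS = 1024  # maximum sequence length mentioned above

tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")

def fits_token_budget(context: str, question: str, prompt_template: str) -> bool:
    """Return True if the fully assembled prompt stays within the token limit."""
    full_prompt = prompt_template.replace("{context}", context).replace(
        "{question}", question
    )
    return len(tokenizer.encode(full_prompt)) <= MAX_TOTAL_TOKENS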

Deploying the endpoint for the GPT-J-6B embedding model

import random
import string
import sagemaker
import boto3
from sagemaker.model import Model as SM_Model
from sagemaker import image_uris, model_uris
from sagemaker.session import Session as SM_Session
from sagemaker.predictor import Predictor as SM_Predictor

def generate_random_string(length):
    letters = string.ascii_lowercase
    return ''.join(random.choice(letters) for i in range(length))

model_id, model_version = "huggingface-textembedding-gpt-j-6b", "*"
endpoint_name = generate_random_string(10)

inference_instance_type = "ml.g5.24xlarge"

deploy_image_uri = image_uris.retrieve(
    region=boto3.Session().region_name,
    framework="pytorch",
    image_scope="inference",
    model_id=model_id,
    model_version=model_version,
    instance_type=inference_instance_type,
)

model_uri = model_uris.retrieve(
    model_id=model_id,
    model_version=model_version,
    model_scope="inference"
)

model = SM_Model(
    image_uri=deploy_image_uri,
    model_data=model_uri,
    role=sm_role,
    predictor_cls=SM_Predictor,  # so deploy() returns a Predictor object
    name=endpoint_name,
    env={"TS_DEFAULT_WORKERS_PER_MODEL": "1"}
)

model_predictor = model.deploy(
    initial_instance_count=1,
    instance_type=inference_instance_type,
    endpoint_name=endpoint_name,
)



import pandas as pd
from tqdm import tqdm


def query_endpoint(encoded_json, endpoint, content_type="application/json"):
    client = boto3.client("runtime.sagemaker")
    response = client.invoke_endpoint(
        EndpointName=endpoint, ContentType=content_type, Body=encoded_json
    )
    return response

def parse_query_response(response):
    predictions = json.loads(response["Body"].read())
    embeddings = predictions["embedding"]
    return embeddings

def build_embedding_table(df, endpoint, column, batch_size=10):
    embeddings = []
    for idx in tqdm(range(0, df.shape[0], batch_size)):
        content = df.loc[idx : (idx + batch_size - 1)][column].tolist()
        payload = {"text_inputs": content}
        response = query_endpoint(
            json.dumps(payload).encode("utf-8"), endpoint
        )
        generated_embeddings = parse_query_response(response)
        embeddings.extend(generated_embeddings)
    embeddings_df = pd.DataFrame(embeddings)
    return embeddings_df

# Reuse the embedding endpoint deployed above to embed every document in the knowledge library
embeddings_df = build_embedding_table(df_knowledge, endpoint_name, "Answer")

Indexing the embedding knowledge library using SageMaker KNN algorithm

1. Training Job for Embedding Indexing

I start the KNN by initiating a training job that creates an index for the embedding knowledge data. To accomplish this, I employ the powerful Faiss algorithm, which efficiently indexes the data for similarity search.

2. Endpoint for Nearest Document Retrieval

Once the indexing is complete, I create the endpoint. This endpoint handles the task of accepting query embeddings and promptly returning the top K nearest indexes of the relevant documents.

Note: During the KNN training job, the feature matrix has an N-by-P structure, where N is the number of documents in the knowledge library, P is the embedding dimension, and each row corresponds to the embedding of one document.

import numpy as np
import os
import io
from sagemaker.session import Session as SM_Session
import sagemaker.amazon.common as smac

# Feature matrix built from the document embeddings generated earlier (N docs x P dims)
train_features = np.array(embeddings_df)

# Label each embedding with its row index so the KNN endpoint returns document indexes
train_labels = np.arange(len(train_features))

print("train_features shape =", train_features.shape)
print("train_labels shape =", train_labels.shape)

buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(buf, train_features, train_labels)
buf.seek(0)

bucket_name = SM_Session().default_bucket()  # modify to your bucket name
prefix = "RAGDatabase"
key = "Amazon-SageMaker-RAG"

s3_client = boto3.client("s3")
s3_client.upload_fileobj(buf, bucket_name, os.path.join(prefix, "train", key))
s3_train_data = f"s3://{bucket_name}/{prefix}/train/{key}"
print(f"Uploaded training data location: {s3_train_data}")

TOP_K = 5

from sagemaker.session import Session as SM_Session
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.estimator import Estimator as SM_Estimator


def create_estimator(train_data, hyperparameters, output_path):
    """
    Create an Estimator from the given hyperparameters, fit to training data,
    and return a deployed predictor.
    """
    # Set up the estimator
    knn = SM_Estimator(
        get_image_uri(boto3.Session().region_name, "knn"),
        role,
        instance_count=1,
        instance_type="ml.m5.2xlarge",
        output_path=output_path,
        sagemaker_session=session
    )
    knn.set_hyperparameters(**hyperparameters)

    # Train a model. fit_input contains the locations of the train data
    fit_input = {"train": train_data}
    knn.fit(fit_input)
    return knn

hyperparameters = {
    "feature_dim": train_features.shape[1],
    "k": TOP_K,
    "sample_size": train_features.shape[0],
    "predictor_type": "classifier"
}
output_path = f"s3://{bucket_name}/{prefix}/default_example/output"
knn_estimator = create_estimator(s3_train_data, hyperparameters, output_path)


Deploying the KNN endpoint for retrieving the indexes of the top K most relevant documents.

from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer


def predictor_from_estimator(estimator, instance_type, endpoint_name=None):
    predictor = estimator.deploy(
        initial_instance_count=1,
        instance_type=instance_type,
        endpoint_name=endpoint_name
    )
    predictor.serializer = CSVSerializer()
    predictor.deserializer = JSONDeserializer()
    return predictor


instance_type = "ml.m4.xlarge"
endpoint_name = name_from_base(f"jumpstart-example-ragknn")

knn_predictor = predictor_from_estimator(knn_estimator, instance_type, endpoint_name=endpoint_name)

Retrieving the most relevant documents

Given the embedding of a query, the endpoint is queried to obtain the indexes of the top K most relevant documents. These indexes will then be used to retrieve the corresponding textual documents.

The textual documents are retrieved while ensuring that the combined length does not exceed MAX_SECTION_LEN. This step is crucial to ensure that the context sent into the prompt contains a substantial amount of information while staying within the capacity of the model.


MAX_SECTION_LEN = 500
SEPARATOR = "\n* "

def construct_context(context_predictions, data_frame) -> str:
    selected_sections = []
    total_length = 0

    for index in context_predictions:
        document_section = data_frame.loc[index]
        section_length = len(document_section) + 2
        if total_length + section_length > MAX_SECTION_LEN:
            break

        selected_sections.append(SEPARATOR + document_section.replace("\n", " "))
        total_length += section_length

    concatenated_doc = "".join(selected_sections)
    print(
        f"With maximum sequence length {MAX_SECTION_LEN}, selected top {len(selected_sections)} document sections: {concatenated_doc}"
    )

    return concatenated_doc


start_time = time.time()

# Embed the question with the GPT-J embedding endpoint deployed earlier (endpoint_name)
query_response = query_endpoint(
    question.encode("utf-8"), endpoint_name, content_type="application/x-text"
)
question_embedding = parse_query_response(query_response)

# Getting the most relevant context using KNN
context_predictions = knn_predictor.predict(
    np.array(question_embedding),
    initial_args={"ContentType": "text/csv", "Accept": "application/json; verbose=true"},
)["predictions"][0]["labels"]

context_embed_retrieve = construct_context(context_predictions, df_knowledge["Answer"])

elapsed_time = time.time() - start_time

print(
    f"\nElapsed time for computing the embedding of a query and retrieving the top K most relevant documents: {elapsed_time} seconds.\n"
)


Combining the retrieved documents, prompt, and question to query the LLM
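
The loop below assumes a _MODEL_CONFIG_ dictionary describing each deployed LLM endpoint and a parameters dictionary of generation settings, neither of which was defined earlier. The following is a hypothetical sketch of what they could look like; the model key, endpoint name, prompt template, generation parameters, and the "generated_texts" response key are illustrative assumptions that should be adapted to the LLM endpoints actually deployed.

# Hypothetical example only: adjust endpoint names, prompt wording, generation
# parameters, and the response key to match the deployed LLM endpoints.
parameters = {
    "max_length": 200,
    "num_return_sequences": 1,
    "top_k": 250,
    "top_p": 0.95,
    "do_sample": False,
}

def parse_response_generated_texts(query_response):
    # Assumes the endpoint returns a JSON body with a "generated_texts" field
    model_predictions = json.loads(query_response["Body"].read())
    return model_predictions["generated_texts"]

_MODEL_CONFIG_ = {
    "huggingface-text2text-flan-t5-xl": {
        "endpoint_name": "jumpstart-example-rag-flan-t5-xl",  # assumed endpoint name
        "prompt": "Answer based on context:\n\n{context}\n\n{question}",
        "parse_function": parse_response_generated_texts,
    },
}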


for model_id in _MODEL_CONFIG_:
    endpoint_name = _MODEL_CONFIG_[model_id]["endpoint_name"]

    model_prompt = _MODEL_CONFIG_[model_id]["prompt"]

    text_input = model_prompt.replace("{context}", context_embed_retrieve)
    text_input = text_input.replace("{question}", question)

    payload = {"text_inputs": text_input, **parameters}

    query_response = query_endpoint(
        json.dumps(payload).encode("utf-8"), endpoint_name
    )
    generated_texts = _MODEL_CONFIG_[model_id]["parse_function"](query_response)
    print(f"For model: {model_id}, the generated output is: {generated_texts[0]}\n")