import gradio as gr
import boto3
import json
import os
import numpy as np
import botocore
import time
from scipy.spatial.distance import cosine as cosine_similarity
theme = gr.themes.Base(text_size='sm')
# Retrieve AWS credentials from environment variables
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_REGION = os.getenv('REGION_NAME')
AWS_SESSION = os.getenv('AWS_SESSION')
BUCKET_NAME = os.getenv('BUCKET_NAME')
EXTRACTIONS_PATH = os.getenv('EXTRACTIONS_PATH')
# Create AWS Bedrock client using environment variables
def create_bedrock_client():
return boto3.client(
'bedrock-runtime',
region_name=AWS_REGION,
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_session_token=AWS_SESSION
)
# Create AWS S3 client using environment variables
def create_s3_client():
return boto3.client(
's3',
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
aws_session_token=AWS_SESSION
)
# Read JSON directly into mem from S3
def read_json_from_s3():
response = s3_client.get_object(Bucket=BUCKET_NAME, Key=EXTRACTIONS_PATH)
file_content = response['Body'].read().decode('utf-8')
json_content = json.loads(file_content)
return json_content
# Get AWS Titan embedding of text
def get_titan_embedding(bedrock_client, doc_name, text, attempt=0, cutoff=10000):
"""
Retrieves a text embedding for a given document using the Amazon Titan Embedding model.
This function sends the provided text to the Amazon Titan text embedding model
and retrieves the resulting embedding. It handles retries for throttling exceptions
and input size limitations by recursively calling itself with adjusted parameters.
Parameters:
doc_name (str): The name of the document, used for logging and error messages.
text (str): The text content to be processed by the Titan embedding model.
attempt (int): The current attempt number (used in recursive calls to handle retries). Defaults to 0.
cutoff (int): The maximum number of words to include from the input text if a ValidationException occurs due to input size limits. Defaults to 5000.
Returns:
dict or None: The embedding response from the Titan model as a dictionary, or None if the operation fails or exceeds the retry limits.
"""
retries = 5
model_id = 'amazon.titan-embed-text-v1'
accept = 'application/json'
content_type = 'application/json'
body = json.dumps({
"inputText": text,
})
# Invoke model
response = bedrock_client.invoke_model(
body=body,
modelId=model_id,
accept=accept,
contentType=content_type
)
# Print response
response_body = json.loads(response['body'].read())
return response_body.get('embedding')
# Main Chat
def ask_ds(message, history):
if len(message) == 0:
return
question = message
# RAG
question_embedding = get_titan_embedding(bedrock_client, 'question', question)
similar_documents = []
for file, data in extractions.items():
similarity = cosine_similarity(question_embedding, np.array(data['embedding']))
similar_documents.append((file, similarity))
similar_documents.sort(key=lambda x: x[1], reverse=False)
top_3 = similar_documents[:3]
similar_content = ''
for file, _ in top_3:
similar_content += extractions[file]['content'] + '\n'
# Invoke
response = bedrock_client.invoke_model_with_response_stream(
modelId="anthropic.claude-3-sonnet-20240229-v1:0",
body=json.dumps(
{
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 4096,
"system": f"""You are a helpful, excited assistant that answers questions about certain provided documents.
Your task is to review the provided relevant information and answer the user's question to the best of your ability.
Try to use only the information in the document to answer. Refrain from saying things like 'According to the relevant information provided'.
Format your output nicely with sentences that are not too long. You should prefer lists or bullet points but only when applicable.
Begin by thanking the user for their question, and at the end of your answer, say "Thank you for using Ask Dane Street!"
{similar_content}
""",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": message
}
]
}
],
}
),
)
# Stream the response
all_text = ''
stream = response.get('body')
if stream:
for event in stream:
chunk = event.get('chunk')
if chunk and json.loads(chunk.get('bytes').decode()):
# check if delta is present
try:
this_text = json.loads(chunk.get('bytes').decode()).get('delta').get('text')
all_text += this_text
yield all_text # Stream the text back to the UI
except:
pass
# Print relevant files
output = '\n\nCheck out the following documents for more information:\n'
for file, sim in top_3:
output += f"\n{file.replace('.txt', '.pdf')}"
yield all_text + output
# Create necessary services and collect data
bedrock_client = create_bedrock_client()
s3_client = create_s3_client()
extractions = read_json_from_s3()
demo = gr.ChatInterface(fn=ask_ds, title="AskDS_HR", multimodal=False, chatbot=gr.Chatbot(value=[(None, "Welcome to Ask HR at Dane Street! Whether you're new to the team or just looking for some quick information, I'm here to guide you through our comprehensive range of documents. From Benefits and Payroll queries to understanding our Policies and Paylocity, simply ask your question and I'll provide you with the most relevant information I can.\n\nCurious about your 401(k) account changes? Or maybe you need details on the 2023 Benefits Guide? I can help with that and much more! Don't hesitate to ask, and thank you for making Dane Street an amazing place to work. If there’s something specific you'd like to know, please type your question below, and I will do my best to assist you.")],),theme=theme)
demo.launch()