File size: 7,294 Bytes
f66af3d ec5ad7b f66af3d 4538fa2 3487766 f66af3d d8c7d29 f66af3d d8c7d29 f66af3d 3487766 f66af3d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 |
import json
import logging
import os
import time

import boto3
import botocore
import gradio as gr
import numpy as np
from scipy.spatial.distance import cosine as cosine_similarity
# Gradio Base theme with the 'sm' text size; handed to the chat interface.
theme = gr.themes.Base(text_size="sm")
# Deployment settings are taken from the process environment. Any variable
# that is not set resolves to None.
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
AWS_REGION = os.environ.get('REGION_NAME')
AWS_SESSION = os.environ.get('AWS_SESSION')
# Location of the extractions JSON object in S3 (bucket and object key).
BUCKET_NAME = os.environ.get('BUCKET_NAME')
EXTRACTIONS_PATH = os.environ.get('EXTRACTIONS_PATH')
def create_bedrock_client():
    """Build a boto3 client for the 'bedrock-runtime' service.

    The region and credentials are the module-level AWS_* constants.

    Returns:
    The client object returned by ``boto3.client``.
    """
    client_options = {
        'region_name': AWS_REGION,
        'aws_access_key_id': AWS_ACCESS_KEY_ID,
        'aws_secret_access_key': AWS_SECRET_ACCESS_KEY,
        'aws_session_token': AWS_SESSION,
    }
    return boto3.client('bedrock-runtime', **client_options)
def create_s3_client():
    """Build a boto3 client for the 's3' service.

    Only the credential constants are passed; no region_name is given here.

    Returns:
    The client object returned by ``boto3.client``.
    """
    credentials = {
        'aws_access_key_id': AWS_ACCESS_KEY_ID,
        'aws_secret_access_key': AWS_SECRET_ACCESS_KEY,
        'aws_session_token': AWS_SESSION,
    }
    return boto3.client('s3', **credentials)
def read_json_from_s3():
    """Fetch the extractions object from S3 and return it parsed as JSON.

    Reads key EXTRACTIONS_PATH from bucket BUCKET_NAME through the
    module-level ``s3_client``, decodes the body as UTF-8 and parses it.

    Returns:
    The decoded JSON value.
    """
    s3_object = s3_client.get_object(Bucket=BUCKET_NAME, Key=EXTRACTIONS_PATH)
    raw_text = s3_object['Body'].read().decode('utf-8')
    return json.loads(raw_text)
def get_titan_embedding(bedrock, doc_name, text, attempt=0, cutoff=10000):
    """
    Retrieve a text embedding for a document from the Amazon Titan embedding model.

    The text is sent to the 'amazon.titan-embed-text-v1' model through
    ``bedrock.invoke_model``. Throttling errors are retried with exponential
    backoff. On a ValidationException (assumed to be caused by the input being
    too large) the text is split into chunks of at most ``cutoff`` characters,
    each chunk is embedded separately and the element-wise mean is returned.

    This is a plain function: it always returns a value (never a generator),
    and failures are reported through the ``logging`` module.

    Parameters:
    bedrock: Bedrock runtime client exposing ``invoke_model``.
    doc_name (str): The name of the document, used only in log messages.
    text (str): The text content to embed.
    attempt (int): The current attempt number, incremented on throttling retries. Defaults to 0.
    cutoff (int): Maximum number of characters per chunk when the text is split after a ValidationException. Defaults to 10000.

    Returns:
    list, numpy.ndarray or None: The 'embedding' value from the model response,
    the mean of the chunk embeddings when the text had to be split, or None
    when the request failed or the retry limit was reached.
    """
    max_attempts = 5
    log = logging.getLogger(__name__)
    try:
        response = bedrock.invoke_model(
            body=json.dumps({"inputText": text}),
            modelId='amazon.titan-embed-text-v1',
            accept='application/json',
            contentType='application/json',
        )
        response_body = json.loads(response['body'].read())
    # Handle a few common client exceptions
    except botocore.exceptions.ClientError as error:
        error_code = error.response['Error']['Code']
        if error_code == 'ThrottlingException':
            if attempt + 1 >= max_attempts:
                return None
            # Exponential backoff: 2, 4, 8, ... seconds.
            time.sleep(2 ** (attempt + 1))
            return get_titan_embedding(
                bedrock, doc_name, text, attempt=attempt + 1, cutoff=cutoff
            )
        if error_code == 'ValidationException':
            # Splitting only helps when it yields strictly smaller pieces;
            # otherwise the same request would be retried forever.
            if cutoff <= 0 or len(text) <= cutoff:
                log.error(
                    "ValidationException when processing %s: input cannot be split further",
                    doc_name,
                )
                return None
            chunks = [text[i:i + cutoff] for i in range(0, len(text), cutoff)]
            chunk_embeddings = []
            for chunk in chunks:
                embedding = get_titan_embedding(bedrock, doc_name, chunk, cutoff=cutoff)
                # Failed chunks are skipped so they cannot break the average.
                if embedding is not None:
                    chunk_embeddings.append(embedding)
            if not chunk_embeddings:
                return None
            # Return the average of the chunk embeddings.
            return np.mean(chunk_embeddings, axis=0)
        log.error("Unhandled Exception when processing %s! : %s", doc_name, error_code)
        return None
    # Catch-all for any other exceptions
    except Exception as error:
        log.error(
            "Unhandled Exception when processing %s: %s", doc_name, type(error).__name__
        )
        return None
    return response_body.get('embedding')
def ask_ds(message, history):
    """
    Answer a chat message using retrieval-augmented generation, streaming the reply.

    The question is embedded with ``get_titan_embedding``, every entry in the
    module-level ``extractions`` mapping is ranked against it, and the content
    of the five closest documents is placed in the system prompt of a Claude 3
    Sonnet request made through ``bedrock_client``. The reply is streamed back.

    Parameters:
    message (str): The user's question. An empty or None message produces no output.
    history: Chat history passed by the caller; not used.

    Yields:
    str: The accumulated reply text after each streamed text delta, then a
    final value with a list of the source documents appended.
    """
    if not message:
        return
    top_k = 5

    # RAG: embed the question and rank the stored documents against it.
    question_embedding = get_titan_embedding(bedrock_client, 'question', message)
    if question_embedding is None:
        # Without an embedding nothing can be ranked; report instead of crashing.
        yield "Sorry, I could not process your question right now. Please try again."
        return

    # NOTE: `cosine_similarity` is scipy.spatial.distance.cosine, which is a
    # *distance* (smaller means more alike), so an ascending sort puts the
    # most relevant documents first.
    ranked_documents = sorted(
        (
            (file, cosine_similarity(question_embedding, np.array(data['embedding'])))
            for file, data in extractions.items()
        ),
        key=lambda pair: pair[1],
    )
    top_files = [file for file, _ in ranked_documents[:top_k]]
    similar_content = ''.join(extractions[file]['content'] + '\n' for file in top_files)

    system_prompt = f"""You are a helpful, excited assistant that answers questions about certain provided documents.
<Task>
Your task is to review the provided relevant information and answer the user's question to the best of your ability.
Try to use only the information in the document to answer. Refrain from saying things like 'According to the relevant information provided'.
Format your output nicely with sentences that are not too long. You should prefer lists or bullet points when applicable.
Begin by thanking the user for their question, and at the end of your answer, say "Thank you for using Ask Dane Street!"
</Task>
<Relevant Information>
{similar_content}
</Relevant Information>"""

    # Invoke
    response = bedrock_client.invoke_model_with_response_stream(
        modelId="anthropic.claude-3-sonnet-20240229-v1:0",
        body=json.dumps(
            {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 4096,
                "system": system_prompt,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "text",
                                "text": message,
                            }
                        ],
                    }
                ],
            }
        ),
    )

    all_text = ''
    stream = response.get('body')
    if stream:
        for event in stream:
            chunk = event.get('chunk')
            if not chunk:
                continue
            raw_bytes = chunk.get('bytes')
            if not raw_bytes:
                continue
            # Decode each event once. Only events carrying a text delta add
            # to the reply; all other event types are skipped.
            payload = json.loads(raw_bytes.decode())
            if not isinstance(payload, dict):
                continue
            delta = payload.get('delta')
            this_text = delta.get('text') if isinstance(delta, dict) else None
            if not isinstance(this_text, str):
                continue
            all_text += this_text
            yield all_text  # Stream the text back to the UI

    output = '\n\nCheck out the following documents for more information:\n'
    output += ''.join(f"\n{file.replace('.txt', '.pdf')}" for file in top_files)
    yield all_text + output
# Module-level clients and document data used by ask_ds. s3_client has to be
# created before read_json_from_s3() is called, because that function reads it.
bedrock_client = create_bedrock_client()
s3_client = create_s3_client()
extractions = read_json_from_s3()

# Chat UI backed by ask_ds; the chatbot is seeded with a single (None, "") entry.
demo = gr.ChatInterface(
    fn=ask_ds,
    title="AskDS_HR",
    multimodal=False,
    chatbot=gr.Chatbot(value=[(None, "")]),
    theme=theme,
)
demo.launch()
|