# langchainapp.py
import spaces
from torch.nn import DataParallel
from torch import Tensor
from transformers import AutoTokenizer, AutoModel
from huggingface_hub import InferenceClient
from openai import OpenAI
from langchain_community.embeddings import HuggingFaceInstructEmbeddings
from langchain_community.document_loaders import UnstructuredFileLoader
from langchain_chroma import Chroma
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.config import Settings
import chromadb #import HttpClient
from typing import List, Tuple, Dict, Any
import os
import re
import uuid
import gradio as gr
import torch
import torch.nn.functional as F
from dotenv import load_dotenv
from utils import load_env_variables, parse_and_route, escape_special_characters
from globalvars import API_BASE, intention_prompt, tasks, system_message, model_name, metadata_prompt
# import time
# import httpx
from langchain_community.chat_models import ChatOpenAI
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.retrievers import ContextualCompressionRetriever
from langchain.prompts.chat import ChatPromptTemplate, HumanMessagePromptTemplate
# from langchain.vectorstores import Chroma
load_dotenv()
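# Conservative CUDA settings: cap the allocator's split size to limit
# fragmentation, make kernel launches synchronous for clearer error traces,
# and disable the JIT compilation cache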
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:30'
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
os.environ['CUDA_CACHE_DISABLE'] = '1'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
### Utils
hf_token, yi_token = load_env_variables()
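# Tokenizer and embedding model load once at startup; trust_remote_code is
# required because the checkpoint ships its own modeling code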
tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token, trust_remote_code=True)
nvidiamodel = AutoModel.from_pretrained(model_name, token=hf_token, trust_remote_code=True).to(device)
# nvidiamodel.set_pooling_include_prompt(include_prompt=False)
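# Frees cached GPU memory between heavy embedding calls (not wired up yet)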
def clear_cuda_cache():
torch.cuda.empty_cache()
client = OpenAI(api_key=yi_token, base_url=API_BASE)
chroma_client = chromadb.Client(Settings())
# Create the collection; get_or_create avoids a crash when the collection
# already exists on a Space reload
chroma_collection = chroma_client.get_or_create_collection("all-my-documents")
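# MyEmbeddingFunction adapts HuggingFaceInstructEmbeddings to chromadb's
# EmbeddingFunction interface. For each text it first asks the Yi chat model
# for the intention (to pick an instruction prefix from `tasks`) and for
# metadata, then embeds the text with the routed instruction.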
class MyEmbeddingFunction(EmbeddingFunction):
def __init__(self, model_name: str, token: str, intention_client):
self.model_name = model_name
self.token = token
self.intention_client = intention_client
self.hf_embeddings = HuggingFaceInstructEmbeddings(
model_name=model_name,
model_kwargs={'device': 'cuda' if torch.cuda.is_available() else 'cpu'},
encode_kwargs={'normalize_embeddings': True}
)
def create_embedding_generator(self):
return self.hf_embeddings
    def __call__(self, input: Documents) -> Embeddings:
        # chromadb passes raw strings, and the EmbeddingFunction interface
        # only allows embeddings back, so the per-text metadata is dropped here
        return [self.compute_embeddings(text)[0] for text in input]
@spaces.GPU
def compute_embeddings(self, input_text: str):
escaped_input_text = escape_special_characters(input_text)
# Get the intention
intention_completion = self.intention_client.chat.completions.create(
model="yi-large",
messages=[
{"role": "system", "content": escape_special_characters(intention_prompt)},
{"role": "user", "content": escaped_input_text}
]
)
intention_output = intention_completion.choices[0].message.content
parsed_task = parse_and_route(intention_output)
selected_task = parsed_task if parsed_task in tasks else "DEFAULT"
task_description = tasks[selected_task]
        # Construct the embed_instruction dynamically from the routed task
        embed_instruction = f"Instruct: {task_description}\nQuery:"
        # Update the hf_embeddings object with the new instruction
        self.hf_embeddings.embed_instruction = embed_instruction
# Get the metadata
metadata_completion = self.intention_client.chat.completions.create(
model="yi-large",
messages=[
{"role": "system", "content": escape_special_characters(metadata_prompt)},
{"role": "user", "content": escaped_input_text}
]
)
metadata_output = metadata_completion.choices[0].message.content
metadata = self.extract_metadata(metadata_output)
# Get the embeddings
embeddings = self.hf_embeddings.embed_documents([escaped_input_text])
return embeddings[0], metadata
def extract_metadata(self, metadata_output: str) -> Dict[str, str]:
pattern = re.compile(r'\"(\w+)\": \"([^\"]+)\"')
matches = pattern.findall(metadata_output)
metadata = {key: value for key, value in matches}
return metadata
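### Pipeline helpers
# load_documents -> add_documents_to_chroma -> query_chroma cover the full
# index-and-retrieve path for the collection created above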
def load_documents(file_path: str, mode: str = "elements"):
loader = UnstructuredFileLoader(file_path, mode=mode)
docs = loader.load()
return [doc.page_content for doc in docs]
def initialize_chroma(collection_name: str, embedding_function: MyEmbeddingFunction):
    # langchain_chroma's Chroma expects a LangChain Embeddings object, so the
    # wrapped HuggingFaceInstructEmbeddings instance is what gets passed in
    db = Chroma(client=chroma_client, collection_name=collection_name, embedding_function=embedding_function.hf_embeddings)
    return db
def add_documents_to_chroma(documents: list, embedding_function: MyEmbeddingFunction):
    # compute_embeddings returns one embedding vector and one metadata dict
    # per text, so each document is added to the collection individually
    for doc in documents:
        embedding, metadata = embedding_function.compute_embeddings(doc)
        chroma_collection.add(
            ids=[str(uuid.uuid1())],
            documents=[doc],
            embeddings=[embedding],
            metadatas=[metadata]
        )
def query_chroma(query_text: str, embedding_function: MyEmbeddingFunction):
    # Embed the query with the same instruction-routed embedder used at index
    # time, and query by embedding so both sides share one vector space
    query_embedding, _query_metadata = embedding_function.compute_embeddings(query_text)
    result_docs = chroma_collection.query(
        query_embeddings=[query_embedding],
        n_results=3
    )
    return result_docs
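# answer_query stacks three retrievers: the base Chroma retriever, a
# MultiQueryRetriever that rephrases the question, and a
# ContextualCompressionRetriever that trims each hit to its relevant spans,
# then stuffs the surviving context into a single answer prompt.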
def answer_query(message: str, chat_history: List[Tuple[str, str]], *extra_inputs):
    # gr.ChatInterface calls fn(message, history, *additional_inputs); the
    # system-message and sampling sliders arrive in *extra_inputs and are
    # accepted for signature compatibility, though the chain ignores them
    base_compressor = LLMChainExtractor.from_llm(chat_model)
    db = Chroma(persist_directory="output/general_knowledge", embedding_function=embedding_function.hf_embeddings)
    base_retriever = db.as_retriever()
    mq_retriever = MultiQueryRetriever.from_llm(retriever=base_retriever, llm=chat_model)
    compression_retriever = ContextualCompressionRetriever(base_compressor=base_compressor, base_retriever=mq_retriever)
    matched_docs = compression_retriever.get_relevant_documents(query=message)
context = ""
for doc in matched_docs:
page_content = doc.page_content
context += page_content
context += "\n\n"
template = """
Answer the following question only by using the context given below in the triple backticks, do not use any other information to answer the question.
If you can't answer the given question with the given context, you can return an empty string ('')
Context: ```{context}```
----------------------------
Question: {query}
----------------------------
Answer: """
human_message_prompt = HumanMessagePromptTemplate.from_template(template=template)
chat_prompt = ChatPromptTemplate.from_messages([human_message_prompt])
prompt = chat_prompt.format_prompt(query=message, context=context)
    # ChatInterface expects the response string back, not the
    # (textbox_value, history) pair used by a manual Chatbot layout
    response = chat_model.invoke(prompt.to_messages()).content
    return response
# Initialize clients
intention_client = OpenAI(api_key=yi_token, base_url=API_BASE)
# LangChain-compatible chat model over the same Yi endpoint; the retrieval
# chain in answer_query needs this rather than the raw OpenAI client
chat_model = ChatOpenAI(openai_api_key=yi_token, openai_api_base=API_BASE, model_name="yi-large")
embedding_function = MyEmbeddingFunction(model_name=model_name, token=hf_token, intention_client=intention_client)
chroma_db = initialize_chroma(collection_name="Tonic-instruct", embedding_function=embedding_function)
def upload_documents(files):
    for file in files:
        loader = UnstructuredFileLoader(file.name)
        documents = loader.load()
        # add_documents_to_chroma expects raw text, so unwrap the Document
        # objects returned by the loader
        add_documents_to_chroma([doc.page_content for doc in documents], embedding_function)
    return "Documents uploaded and processed successfully!"
def query_documents(query):
    results = query_chroma(query, embedding_function)
    # chromadb returns a dict of lists; the documents for the first (and
    # only) query are at results["documents"][0]
    return "\n\n".join(results["documents"][0])
with gr.Blocks() as demo:
with gr.Tab("Upload Documents"):
        document_upload = gr.File(file_count="multiple", file_types=[".pdf", ".docx", ".txt", ".md"])
upload_button = gr.Button("Upload and Process")
upload_button.click(upload_documents, inputs=document_upload, outputs=gr.Text())
with gr.Tab("Ask Questions"):
with gr.Row():
chat_interface = gr.ChatInterface(
answer_query,
additional_inputs=[
gr.Textbox(value="You are a friendly Chatbot.", label="System message"),
gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
],
)
query_input = gr.Textbox(label="Query")
query_button = gr.Button("Query")
query_output = gr.Textbox()
query_button.click(query_documents, inputs=query_input, outputs=query_output)
if __name__ == "__main__":
# os.system("chroma run --host localhost --port 8000 &")
demo.launch()
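# A minimal smoke test of the pipeline without the UI (hypothetical file
# path; assumes the collection above starts empty):
#
#   docs = load_documents("example.txt")
#   add_documents_to_chroma(docs, embedding_function)
#   print(query_documents("What is this document about?"))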