# app.py
import spaces
from torch.nn import DataParallel
from torch import Tensor
from transformers import AutoTokenizer, AutoModel
from huggingface_hub import InferenceClient
from openai import OpenAI
from langchain_community.embeddings import HuggingFaceInstructEmbeddings
from langchain_community.document_loaders import UnstructuredFileLoader
from langchain_chroma import Chroma
from chromadb import Documents, EmbeddingFunction, Embeddings
from chromadb.config import Settings
import chromadb  # import HttpClient
from typing import List, Tuple, Dict, Any
import os
import re
import uuid
import gradio as gr
import torch
import torch.nn.functional as F
from dotenv import load_dotenv
from utils import load_env_variables, parse_and_route, escape_special_characters
from globalvars import API_BASE, intention_prompt, tasks, system_message, model_name, metadata_prompt
# import time
# import httpx
from langchain_community.chat_models import ChatOpenAI
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.retrievers import ContextualCompressionRetriever
from langchain.prompts.chat import ChatPromptTemplate, HumanMessagePromptTemplate
# from langchain.vectorstores import Chroma

load_dotenv()

os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:50'
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'
os.environ['CUDA_CACHE_DISABLE'] = '1'

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Utils
hf_token, yi_token = load_env_variables()

# tokenizer = AutoTokenizer.from_pretrained(model_name, token=hf_token, trust_remote_code=True)

# Lazy load model
model = None


@spaces.GPU
def load_model():
    global model
    if model is None:
        from transformers import AutoModel
        model = AutoModel.from_pretrained(model_name, token=hf_token, trust_remote_code=True).to(device)
    return model


# Load model
nvidiamodel = load_model()
# nvidiamodel.set_pooling_include_prompt(include_prompt=False)


def clear_cuda_cache():
    torch.cuda.empty_cache()


client = OpenAI(api_key=yi_token, base_url=API_BASE)

chroma_client = chromadb.Client(Settings())
# Create a collection
chroma_collection = chroma_client.create_collection("all-my-documents")


# Note: spaces.GPU is a function decorator, so it is applied to compute_embeddings below rather than to the class.
class MyEmbeddingFunction(EmbeddingFunction):
    def __init__(self, model_name: str, token: str, intention_client):
        self.model_name = model_name
        self.token = token
        self.intention_client = intention_client
        self.hf_embeddings = HuggingFaceInstructEmbeddings(
            model_name=model_name,
            model_kwargs={'device': 'cuda' if torch.cuda.is_available() else 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )

    def create_embedding_generator(self):
        return self.hf_embeddings

    def __call__(self, input: Documents) -> Tuple[List[List[float]], List[Dict[str, Any]]]:
        # chromadb passes documents as plain strings, so each one is embedded directly
        embeddings_with_metadata = [self.compute_embeddings(doc) for doc in input]
        embeddings = [item[0] for item in embeddings_with_metadata]
        metadata = [item[1] for item in embeddings_with_metadata]
        return embeddings, metadata

    @spaces.GPU
    def compute_embeddings(self, input_text: str):
        escaped_input_text = escape_special_characters(input_text)
        # Get the intention
        intention_completion = self.intention_client.chat.completions.create(
            model="yi-large",
            messages=[
                {"role": "system", "content": escape_special_characters(intention_prompt)},
                {"role": "user", "content": escaped_input_text}
            ]
        )
        intention_output = intention_completion.choices[0].message.content
        parsed_task = parse_and_route(intention_output)
        selected_task = parsed_task if parsed_task in tasks else "DEFAULT"
        task_description = tasks[selected_task]
        # query_prefix = "Instruct: " + tasks[selected_task] + "\nQuery: "

        # Construct the embed_instruction and query_instruction dynamically
        embed_instruction = f"Instruct: {task_description}\nQuery:"
        # query_instruction = f""

        # Update the hf_embeddings object with the new instructions
        self.hf_embeddings.embed_instruction = embed_instruction
        # self.hf_embeddings.query_instruction = query_instruction

        # Get the metadata
        metadata_completion = self.intention_client.chat.completions.create(
            model="yi-large",
            messages=[
                {"role": "system", "content": escape_special_characters(metadata_prompt)},
                {"role": "user", "content": escaped_input_text}
            ]
        )
        metadata_output = metadata_completion.choices[0].message.content
        metadata = self.extract_metadata(metadata_output)

        # Get the embeddings
        embeddings = self.hf_embeddings.embed_documents([escaped_input_text])
        return embeddings[0], metadata

    def extract_metadata(self, metadata_output: str) -> Dict[str, str]:
        pattern = re.compile(r'\"(\w+)\": \"([^\"]+)\"')
        matches = pattern.findall(metadata_output)
        metadata = {key: value for key, value in matches}
        return metadata


def load_documents(file_path: str, mode: str = "elements"):
    loader = UnstructuredFileLoader(file_path, mode=mode)
    docs = loader.load()
    return [doc.page_content for doc in docs]


def initialize_chroma(collection_name: str, embedding_function: MyEmbeddingFunction):
    # langchain_chroma.Chroma expects a LangChain Embeddings object, so hand it the underlying HF embeddings
    db = Chroma(client=chroma_client, collection_name=collection_name, embedding_function=embedding_function.hf_embeddings)
    return db


def add_documents_to_chroma(documents: list, embedding_function: MyEmbeddingFunction):
    for doc in documents:
        # compute_embeddings returns one embedding vector and one metadata dict per document
        embedding, metadata = embedding_function.compute_embeddings(doc)
        chroma_collection.add(
            ids=[str(uuid.uuid1())],
            documents=[doc],
            embeddings=[embedding],
            metadatas=[metadata]
        )


def query_chroma(query_text: str, embedding_function: MyEmbeddingFunction):
    model = load_model()
    # Query with a precomputed embedding so the search happens in the same embedding space as the stored documents
    query_embedding, query_metadata = embedding_function.compute_embeddings(query_text)
    result_docs = chroma_collection.query(
        query_embeddings=[query_embedding],
        n_results=3
    )
    return result_docs


def answer_query(message: str, chat_history: List[Tuple[str, str]], system_message_text: str = system_message,
                 max_tokens: int = 512, temperature: float = 0.7, top_p: float = 0.95):
    # The extra parameters mirror the ChatInterface additional_inputs below; the LangChain retrievers need a
    # LangChain chat model rather than the raw OpenAI client.
    langchain_llm = ChatOpenAI(model_name="yi-large", openai_api_key=yi_token, openai_api_base=API_BASE,
                               temperature=temperature, max_tokens=max_tokens)
    base_compressor = LLMChainExtractor.from_llm(langchain_llm)
    db = Chroma(persist_directory="output/general_knowledge", embedding_function=embedding_function.hf_embeddings)
    base_retriever = db.as_retriever()
    mq_retriever = MultiQueryRetriever.from_llm(retriever=base_retriever, llm=langchain_llm)
    compression_retriever = ContextualCompressionRetriever(base_compressor=base_compressor, base_retriever=mq_retriever)
    matched_docs = compression_retriever.get_relevant_documents(query=message)
    context = ""
    for doc in matched_docs:
        context += doc.page_content
        context += "\n\n"
    template = """
    Answer the following question only by using the context given below in the triple backticks, do not use any other information to answer the question.
    If you can't answer the given question with the given context, you can return an empty string ('')

    Context: ```{context}```
    ----------------------------
    Question: {query}
    ----------------------------
    Answer: """
    human_message_prompt = HumanMessagePromptTemplate.from_template(template=template)
    chat_prompt = ChatPromptTemplate.from_messages([human_message_prompt])
    prompt = chat_prompt.format_prompt(query=message, context=context)
    response = langchain_llm.invoke(prompt.to_messages()).content
    # gr.ChatInterface expects just the assistant reply as the return value and manages the history itself
    return response


# Initialize clients
intention_client = OpenAI(api_key=yi_token, base_url=API_BASE)
embedding_function = MyEmbeddingFunction(model_name=model_name, token=hf_token, intention_client=intention_client)
chroma_db = initialize_chroma(collection_name="Tonic-instruct", embedding_function=embedding_function)


def upload_documents(files):
    for file in files:
        # load_documents returns plain text chunks, which is what add_documents_to_chroma expects
        documents = load_documents(file.name)
        add_documents_to_chroma(documents, embedding_function)
    return "Documents uploaded and processed successfully!"


def query_documents(query):
    model = load_model()
    results = query_chroma(query, embedding_function)
    # chroma_collection.query returns a dict; the matched texts live under "documents"
    matched_texts = results.get("documents", [[]])[0]
    return "\n\n".join(matched_texts)


with gr.Blocks() as demo:
    with gr.Tab("Upload Documents"):
        document_upload = gr.File(file_count="multiple", file_types=["document"])
        upload_button = gr.Button("Upload and Process")
        upload_button.click(upload_documents, inputs=document_upload, outputs=gr.Text())
    with gr.Tab("Ask Questions"):
        with gr.Row():
            chat_interface = gr.ChatInterface(
                answer_query,
                additional_inputs=[
                    gr.Textbox(value="You are a friendly Chatbot.", label="System message"),
                    gr.Slider(minimum=1, maximum=2048, value=512, step=1, label="Max new tokens"),
                    gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature"),
                    gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
                ],
            )
            query_input = gr.Textbox(label="Query")
            query_button = gr.Button("Query")
            query_output = gr.Textbox()
            query_button.click(query_documents, inputs=query_input, outputs=query_output)

if __name__ == "__main__":
    # os.system("chroma run --host localhost --port 8000 &")
    demo.launch()
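# A minimal, commented-out smoke-test sketch for the helpers above. It assumes a valid .env supplying the
# Hugging Face and Yi tokens, and "sample.txt" is a hypothetical local file used purely for illustration;
# it exercises ingestion and retrieval without starting the Gradio UI.
#
# docs = load_documents("sample.txt", mode="elements")      # split the file into plain-text chunks
# add_documents_to_chroma(docs, embedding_function)         # embed each chunk and store it with its metadata
# print(query_documents("What is this document about?"))    # print the three closest chunks for a test query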