"""Retrieval-augmented question answering over a persisted Chroma store.

The module builds a Chroma vector store backed by a Hugging Face
transformer encoder, wraps ``huggingface_hub.InferenceClient`` as a
LangChain ``LLM``, wires both into a ``RetrievalQA`` chain, and exposes
``predict`` (a streaming generator) plus a ``transformers.Tool`` wrapper.

Importing this module loads the embedding model, opens the vector store
and builds the default QA chain.
"""

import json
import os
import time
from typing import Any, Dict, Iterator, List, Optional, Union

import gradio as gr
import langchain
import torch
from dotenv import load_dotenv
from huggingface_hub import InferenceClient
from langchain.chains import RetrievalQA
# NOTE: the class defined below deliberately reuses (and replaces) this name.
from langchain.embeddings import HuggingFaceInstructEmbeddings  # noqa: F401
from langchain.embeddings.base import Embeddings
from langchain.llms.base import LLM
from langchain.vectorstores import Chroma
from pydantic import BaseModel, Field
from transformers import AutoModel, AutoTokenizer, Tool

load_dotenv()

path_work = "."
hf_token = os.getenv("HF")


class HuggingFaceInstructEmbeddings(Embeddings):  # noqa: F811
    """Text embeddings computed with a Hugging Face ``AutoModel``.

    Implements the LangChain ``Embeddings`` interface (``embed_documents``
    and ``embed_query``) so an instance can be passed to ``Chroma`` as its
    ``embedding_function``.

    Args:
        model_name: Identifier handed to ``AutoTokenizer.from_pretrained``
            and ``AutoModel.from_pretrained``.
        model_kwargs: Extra keyword arguments for
            ``AutoModel.from_pretrained``. A ``"device"`` entry is not
            forwarded to the loader; the loaded model is moved to that
            device instead.
    """

    def __init__(self, model_name: str, model_kwargs: Optional[Dict[str, Any]] = None):
        # Copy so the caller's dict is not mutated by the pop below.
        load_kwargs = dict(model_kwargs or {})
        # "device" is a placement choice, not a model constructor argument,
        # so it is applied with .to() after loading.
        self.device = load_kwargs.pop("device", None)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name, **load_kwargs)
        if self.device is not None:
            self.model.to(self.device)

    def __call__(self, input: Union[str, List[Any]]) -> torch.Tensor:
        """Embed a string, or an iterable of strings / document-like objects.

        Args:
            input: A single string, or an iterable whose items are strings
                or objects exposing ``page_content`` or ``text``.

        Returns:
            The tensor produced by ``_embed_text``, one row per input text.

        Raises:
            TypeError: If an item carries no text (non-text inputs such as
                images are not supported).
        """
        if isinstance(input, str):
            texts = [input]
        else:
            texts = [self._as_text(item) for item in input]
        return self._embed_text(texts)

    @staticmethod
    def _as_text(item: Any) -> str:
        """Return the text carried by *item*, or raise ``TypeError``."""
        if isinstance(item, str):
            return item
        for attribute in ("page_content", "text"):
            value = getattr(item, attribute, None)
            if isinstance(value, str):
                return value
        raise TypeError(
            f"Cannot embed object of type {type(item).__name__}: "
            "only text inputs are supported."
        )

    def _embed_text(self, texts: List[str]) -> torch.Tensor:
        """Tokenize *texts* and mean-pool the model's last hidden state.

        NOTE(review): the mean runs over every token position, padding
        included, exactly as in the original code; confirm this matches how
        the persisted vectors were produced.
        """
        inputs = self.tokenizer(texts, return_tensors="pt", padding=True, truncation=True)
        if self.device is not None:
            inputs = {key: value.to(self.device) for key, value in inputs.items()}
        with torch.no_grad():
            outputs = self.model(**inputs)
        return outputs.last_hidden_state.mean(dim=1)

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Return one embedding (a list of floats) per entry of *texts*."""
        texts = list(texts)
        if not texts:
            # The tokenizer cannot be called on an empty batch.
            return []
        return self._embed_text(texts).cpu().tolist()

    def embed_query(self, text: str) -> List[float]:
        """Return the embedding of a single query string."""
        return self.embed_documents([text])[0]


vectordb = Chroma(
    persist_directory=path_work + '/new_papers',
    embedding_function=HuggingFaceInstructEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        model_kwargs={"device": "cpu"},
    ),
)

# Retrieve the two closest chunks per question.
retriever = vectordb.as_retriever(search_kwargs={"k": 2})


class KwArgsModel(BaseModel):
    """Pydantic mixin holding a free-form ``kwargs`` dictionary."""

    kwargs: Dict[str, Any] = Field(default_factory=dict)


class CustomInferenceClient(LLM, KwArgsModel):
    """LangChain ``LLM`` that generates text through ``InferenceClient``.

    Args:
        model_name: Model identifier passed to ``InferenceClient``.
        hf_token: Token passed to ``InferenceClient``.
        kwargs: Keyword arguments forwarded to
            ``InferenceClient.text_generation`` on every call.
    """

    model_name: str
    inference_client: InferenceClient

    def __init__(self, model_name: str, hf_token: str, kwargs: Optional[Dict[str, Any]] = None):
        inference_client = InferenceClient(model=model_name, token=hf_token)
        # Only declared fields are handed to the pydantic constructor: the
        # token already lives inside the client, and ``kwargs`` must be a
        # dict (``None`` is not valid for the ``Dict`` field).
        super().__init__(
            model_name=model_name,
            kwargs=dict(kwargs or {}),
            inference_client=inference_client,
        )

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None
    ) -> str:
        """Generate a completion for *prompt* and return it as one string.

        Raises:
            ValueError: If *stop* is provided; stop sequences are rejected.
        """
        if stop is not None:
            raise ValueError("stop kwargs are not permitted.")
        # Merge so that a "stream" entry in the stored kwargs cannot collide
        # with the explicit streaming flag.
        generation_kwargs = {**self.kwargs, "stream": True}
        response_gen = self.inference_client.text_generation(prompt, **generation_kwargs)
        return ''.join(response_gen)

    @property
    def _llm_type(self) -> str:
        return "custom"

    @property
    def _identifying_params(self) -> dict:
        return {"model_name": self.model_name}


# Generation settings forwarded to ``text_generation`` by the default chain.
kwargs = {"max_new_tokens": 256, "temperature": 0.9, "top_p": 0.6, "repetition_penalty": 1.3, "do_sample": True}

model_list = [
    "meta-llama/Llama-2-13b-chat-hf",
    "HuggingFaceH4/zephyr-7b-alpha",
    "meta-llama/Llama-2-70b-chat-hf",
    "tiiuae/falcon-180B-chat",
]

qa_chain = None


def load_model(model_selected):
    """Build the ``RetrievalQA`` chain for *model_selected*.

    The chain is stored in the module-level ``qa_chain`` (used by
    ``predict``) and also returned.
    """
    global qa_chain
    llm = CustomInferenceClient(model_name=model_selected, hf_token=hf_token, kwargs=kwargs)
    qa_chain = RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
        verbose=True,
    )
    return qa_chain


load_model("meta-llama/Llama-2-70b-chat-hf")


def _format_response(answer: Any, sources: List[Any]) -> str:
    """Join the answer text and the list of source paths into one message."""
    return f"{answer}\n\n[Answer Source Documents (Ctrl + Click!)] :\n \n {sources}"


def predict(message, temperature=0.9, max_new_tokens=512, top_p=0.6, repetition_penalty=1.3) -> Iterator[str]:
    """Answer *message* with the QA chain and stream the reply.

    Args:
        message: The question passed to ``qa_chain``.
        temperature, max_new_tokens, top_p, repetition_penalty: Accepted for
            interface compatibility only. They are not forwarded anywhere;
            generation uses the settings the chain was built with.

    Yields:
        The reply so far, growing by one character per step, with a 10 ms
        pause between steps.
    """
    llm_response = qa_chain(message)
    res_relevant_doc = [source.metadata['source'] for source in llm_response["source_documents"]]
    response = _format_response(llm_response['result'], res_relevant_doc)
    print("response: =====> \n", response, "\n\n")

    # Stream the message itself. The earlier implementation split it on
    # newlines and skipped blank lines, which silently removed every line
    # break from the streamed text.
    for end in range(1, len(response) + 1):
        yield response[:end]
        time.sleep(0.01)


class TextGeneratorTool(Tool):
    """``transformers`` tool that answers a prompt through ``predict``."""

    name = "vector_retriever"
    description = "This tool searches in a vector store based on a given prompt."
    inputs = ["prompt"]
    outputs = ["text"]

    def __init__(self):
        super().__init__()

    def __call__(self, prompt: str):
        # NOTE(review): ``predict`` is a generator, so this returns a
        # generator of partial messages rather than a plain string, although
        # ``outputs`` declares text. Left unchanged to keep the return type
        # callers currently receive; confirm what consumers expect.
        return predict(prompt, 0.9, 512, 0.6, 1.4)