import json, os, time, uuid import pandas as pd import numpy as np from sklearn.metrics.pairwise import cosine_similarity from transformers import AutoTokenizer, AutoModel import chromadb from chromadb.config import Settings from chromadb.utils import embedding_functions from InstructorEmbedding import INSTRUCTOR from langchain.vectorstores import Chroma ''' If there is a transformers install error: pip install transformers==4.29.2 Python 3.8 and above will need to upgrade the transformers to 4.2x.xx https://github.com/huggingface/transformers/issues/11799 The goal is to creat a domain knowledge database based on existing transcribed labels. I modify the domain knowledge (an xlsx file) so that each row is embedded in a way that most closely resembles the raw OCR output, since that is what will be used to query against the db. Once the closest row is found, I use the id to go back to the xlsx and take the whole row, converting it into a dictionary similar to the desired output from the LLM. This dict is then added to the prompt as a hint for the LLM. ''' ''' pip uninstall protobuf pip install protobuf==3.19.5 ''' class VoucherVisionEmbedding: # def __init__(self, db_name, path_domain_knowledge, logger, build_new_db=False, model_name="hkunlp/instructor-xl", device="cuda"): # DB_DIR = os.path.join(os.path.dirname(__file__), db_name) # client_settings = chromadb.config.Settings( # chroma_db_impl="duckdb+parquet", # persist_directory=DB_DIR, # anonymized_telemetry=False # ) # embeddings = embedding_functions.InstructorEmbeddingFunction(model_name=model_name, device=device) # self.collection = Chroma( # collection_name="langchain_store", # embedding_function=embeddings, # client_settings=client_settings, # persist_directory=DB_DIR, # ) # total_rows = len(self.domain_knowledge) # for index, row in self.domain_knowledge.iterrows(): # try: # self.logger.info(f"[Creating New Embedding DB] --- Adding Row {index+1}/{total_rows}") # except: # print(f"Row {index+1}/{total_rows}") # id = str(row[0]) # document = str(' '.join(row[1:][row[1:].notna()].astype(str))) # self.collection.add_texts(document, None, id, embedding=embeddings) # self.collection.persist() # print(self.collection) def __init__(self, db_name, path_domain_knowledge, logger, build_new_db=False, model_name="hkunlp/instructor-xl", device="cuda"): DB_DIR = os.path.join(os.path.dirname(__file__), db_name) self.logger = logger self.path_domain_knowledge = path_domain_knowledge self.client = chromadb.PersistentClient(path=DB_DIR, settings=Settings(anonymized_telemetry=False)) ef = embedding_functions.InstructorEmbeddingFunction(model_name=model_name, device=device) self.domain_knowledge = pd.read_excel(path_domain_knowledge).fillna('').astype(str) if build_new_db: self.logger.info(f"Creating new DB from {self.path_domain_knowledge}") self.collection = self.client.create_collection(name=db_name, embedding_function=ef, metadata={"hnsw:space": "cosine"}) self.create_db_from_xlsx() else: try: self.collection = self.client.get_collection(name=db_name, embedding_function=ef) except: self.logger.error(f"Embedding database not found! Creating new DB from {self.path_domain_knowledge}") self.collection = self.client.create_collection(name=db_name, embedding_function=ef, metadata={"hnsw:space": "cosine"}) self.create_db_from_xlsx() def add_document(self, document, metadata, id): id = str(id) existing_documents = self.collection.get() if id not in existing_documents['ids']: try: self.collection.add(documents=[document], ids=[id]) except Exception as e: self.logger.error(f"Error while adding document {id}: {str(e)}") # try: # self.collection.add(documents=[document], ids=[id]) # except: # try: # time.sleep(0.1) # self.collection.add(documents=[document], ids=[id]) # except: # try: # self.logger.info(f"[Embedding Add Doc] --- Failed, skipping: {id}") # except: # print(f"Failed, skipping: {id}") else: try: self.logger.info(f"[Embedding Add Doc] --- ID already exists in the collection: {id}") except: print(f"ID already exists in the collection: {id}") def query_db(self, query_text, n_results): results = self.collection.query(query_texts=[query_text], n_results=n_results) self.similarity = round(results['distances'][0][0],3) self.similarity_exact = results['distances'][0][0] try: self.logger.info(f"[Embedding Search] --- Similarity (close to zero is best) {self.similarity}") except: print(f"Similarity (close to zero is best) --- {self.similarity}") self.domain_knowledge.iloc[:, 0] = self.domain_knowledge.iloc[:, 0].astype(str) # Initialize an empty list to hold dictionaries for id in results['ids']: row_dicts = self._get_row_from_df(id) if not row_dicts: # try: # self.logger.info(f"[Embedding Search] --- Similar Dictionary\n{row_dicts}") # except: # print(row_dicts) # else: try: self.logger.info(f"[Embedding Search] --- No row found for id {id}") except: print(f"No row found for id {id}") # Return the list of dictionaries if n_results > 1, else return single dictionary if n_results > 1: return row_dicts else: return row_dicts[0] if row_dicts else None def create_db_from_xlsx(self): total_rows = len(self.domain_knowledge) for index, row in self.domain_knowledge.iterrows(): try: self.logger.info(f"[Creating New Embedding DB] --- Adding Row {index+1}/{total_rows}") except: print(f"Row {index+1}/{total_rows}") id = str(row.iloc[0]) document = str(' '.join(row[0:][row[0:].notna()].astype(str))) self.add_document(document, None, id) def get_similarity(self): return self.similarity_exact def _get_row_from_df(self, ids): row_dicts = [] # initialize an empty list to hold dictionaries for id in ids: row = self.domain_knowledge[self.domain_knowledge.iloc[:, 0] == id] if not row.empty: row_dict = row.iloc[0].to_dict() row_dict.pop('Catalog Number', None) for key in row_dict: if pd.isna(row_dict[key]): row_dict[key] = '' row_dicts.append(row_dict) # append the dictionary to the list return row_dicts if row_dicts else None # return the list of dictionaries or None if it's empty # def _get_row_from_df(self, ids): # for id in ids: # row = self.domain_knowledge[self.domain_knowledge.iloc[:, 0] == id] # if not row.empty: # row_dict = row.iloc[0].to_dict() # row_dict.pop('Catalog Number', None) # for key in row_dict: # if pd.isna(row_dict[key]): # row_dict[key] = '' # return row_dict # return None class VoucherVisionEmbeddingTest: def __init__(self, ground_truth_dir, llm_output_dir, model_name="hkunlp/instructor-xl"): self.ground_truth_dir = ground_truth_dir self.llm_output_dir = llm_output_dir self.model_name = model_name self.model = INSTRUCTOR(model_name, device="cuda") self.instruction = "Represent the Science json dictionary document:" def compare_texts(self, ground_truth_text, predicted_text): # Convert the texts to embeddings using the given model ground_truth_embedding = self.model.encode([[self.instruction,ground_truth_text]]) predicted_embedding = self.model.encode([[self.instruction,predicted_text]]) # Compute the cosine similarity between the two embeddings similarity = cosine_similarity(ground_truth_embedding, predicted_embedding) return similarity[0][0] @staticmethod def json_to_text(json_dict): return str(json_dict) def get_max_difference(self, similarities): differences = [abs(1 - sim) for sim in similarities] return max(differences) def evaluate(self): # Get a list of all ground truth and LLM output files ground_truth_files = os.listdir(self.ground_truth_dir) llm_output_files = os.listdir(self.llm_output_dir) # Ensure file lists are sorted so they match up correctly ground_truth_files.sort() llm_output_files.sort() similarities = [] key_similarities = [] # List to store key similarity for ground_truth_file, llm_output_file in zip(ground_truth_files, llm_output_files): # Read the files and convert them to text with open(os.path.join(self.ground_truth_dir, ground_truth_file), 'r') as f: ground_truth_dict = json.load(f) ground_truth_text = self.json_to_text(ground_truth_dict) with open(os.path.join(self.llm_output_dir, llm_output_file), 'r') as ff: llm_output_dict = json.load(ff) llm_output_text = self.json_to_text(llm_output_dict) # Compute the similarity between the ground truth and the LLM output similarity = self.compare_texts(ground_truth_text, llm_output_text) # Clip and round to mitigate/smudge floating-point precision limitations similarity = np.clip(similarity, -1.0, 1.0) similarity = np.round(similarity, 6) similarities.append(similarity) # Compare keys ground_truth_keys = ', '.join(sorted(ground_truth_dict.keys())) llm_output_keys = ', '.join(sorted(llm_output_dict.keys())) key_similarity = self.compare_texts(ground_truth_keys, llm_output_keys) key_similarity = np.clip(key_similarity, -1.0, 1.0) key_similarity = np.round(key_similarity, 6) key_similarities.append(key_similarity) # Compute the mean similarity mean_similarity = np.mean(similarities) mean_key_similarity = np.mean(key_similarities) max_diff = self.get_max_difference(similarities) max_diff_key = self.get_max_difference(key_similarities) return mean_similarity, max_diff, similarities, mean_key_similarity, max_diff_key, key_similarities if __name__ == '__main__': # db_name = "VV_all_asia_minimal" db_name = "all_asia_minimal" path_domain_knowledge = 'D:/Dropbox/LeafMachine2/leafmachine2/transcription/domain_knowledge/AllAsiaMinimalasof25May2023_2__FOR-EMBEDDING.xlsx' # path_domain_knowledge = 'D:/Dropbox/LeafMachine2/leafmachine2/transcription/domain_knowledge/AllAsiaMinimalasof25May2023_2__TRIMMEDtiny.xlsx' build_new_db = False VVE = VoucherVisionEmbedding(db_name, path_domain_knowledge, build_new_db) test_query = "Golden Thread\nHerbaria of Michigan State University (MSC) and\nUniversiti Kebangsaan Malaysia, Sabah Campus (UKMS)\nUNITED STATES\n3539788\nNATIONAL HERBARIUM\nPLANTS OF BORNEO\nBrookea tomentosa Benth.\nMalaysia. Sabah. Beaufort District: Beaufort Hill. 5°22'N,\n115°45'E. Elev. 200 m. Burned logged dipterocarp forest.\nCrocker Formation. Small tree, corolla cream.\nDet. at K, 1986\n28 August 1983\nWith: Reed S. Beaman and Teofila E. Beamann\nJohn H. Beaman 6844" domain_knowledge_example = VVE.query_db(test_query, 1)