import numpy as np
import gradio as gr
import pandas as pd
import Levenshtein
from typing import List, Type
from datetime import datetime
import re
from search_funcs.helper_functions import create_highlighted_excel_wb, output_folder, load_spacy_model
from spacy import prefer_gpu
from spacy.matcher import Matcher, PhraseMatcher

PandasDataFrame = Type[pd.DataFrame]

today_rev = datetime.now().strftime("%Y%m%d")

def spacy_fuzzy_search(string_query: str,
                       tokenised_data: List[List[str]],
                       original_data: PandasDataFrame,
                       text_column: str,
                       in_join_file: PandasDataFrame,
                       search_df_join_column: str,
                       in_join_column: str,
                       spelling_mistakes_max: int = 1,
                       search_whole_phrase: bool = False,
                       progress=gr.Progress(track_tqdm=True)):
    '''
    Conduct a fuzzy (spelling-tolerant) keyword search over a list of tokenised documents.

    Returns the text of the top-scoring result and the path to an Excel file of all results,
    or an error message and None if the data has not been prepared or has too many rows.
    '''

    if not tokenised_data:
        out_message = "Prepared data not found. Have you clicked 'Load data' above to prepare a search index?"
        print(out_message)
        return out_message, None

    # Lower-case the query
    string_query = string_query.lower()

    prefer_gpu()

    # Load the spaCy model
    nlp = load_spacy_model()

    # Convert the tokenised data back into a list of strings
    df_list = list(map(" ".join, tokenised_data))
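    # Note: because the text is rebuilt by joining tokens with single spaces, the "search_text"
    # shown in the results may differ slightly from the original text column (e.g. spacing around
    # punctuation), depending on how the data was tokenised.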

    if len(df_list) > 100000:
        out_message = "Your data has more than 100,000 rows and will take more than 30 minutes to do a fuzzy search. Please try keyword or semantic search for data of this size."
        return out_message, None

    query = nlp(string_query)
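
    # Two search modes follow. Token-level matching builds a Matcher with a fuzzy text pattern
    # (allowing up to spelling_mistakes_max edits per token) and a lemma pattern, so that a query
    # token such as "run" can also match inflected forms like "running". Whole-phrase matching
    # instead uses a PhraseMatcher and then filters the matched spans by Levenshtein distance.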
    if not search_whole_phrase:
        tokenised_query = [token.text for token in query]

        spelling_mistakes_fuzzy_pattern = "FUZZY" + str(spelling_mistakes_max)

        if len(tokenised_query) > 1:
            pattern_lemma = [{"LEMMA": {"IN": tokenised_query}}]
            pattern_fuzz = [{"TEXT": {spelling_mistakes_fuzzy_pattern: {"IN": tokenised_query}}}]
        else:
            pattern_lemma = [{"LEMMA": tokenised_query[0]}]
            pattern_fuzz = [{"TEXT": {spelling_mistakes_fuzzy_pattern: tokenised_query[0]}}]

        matcher = Matcher(nlp.vocab)
        matcher.add(string_query, [pattern_fuzz])
        matcher.add(string_query, [pattern_lemma])
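        # Adding both pattern lists under the same key extends the patterns for that key, so a
        # token counts as a match if either the fuzzy text pattern or the lemma pattern fires.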
    else:
        # If matching a whole phrase, use the spaCy PhraseMatcher, then check how close each
        # matched span is to the query using Levenshtein distance.
        tokenised_query = [string_query.lower()]

        matcher = PhraseMatcher(nlp.vocab, attr="LOWER")
        patterns = [nlp.make_doc(string_query)]  # Convert the query into a Doc object
        matcher.add("PHRASE", patterns)

    batch_size = 256
    docs = nlp.pipe(df_list, batch_size=batch_size)
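    # nlp.pipe streams the documents lazily in batches, which is much faster than calling
    # nlp() on each row individually.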

    all_matches = []

    # Get the number of matches per document
    for doc in progress.tqdm(docs, desc="Searching text", unit="rows"):
        matches = matcher(doc)
        match_count = len(matches)

        # If matching each sub-term individually, record the raw match count. If matching the
        # whole phrase, discount matches that differ from the query by more than the allowed
        # number of spelling mistakes.
        if not search_whole_phrase:
            all_matches.append(match_count)
        else:
            for match_id, start, end in matches:
                span = str(doc[start:end]).strip()
                query_search = str(query).strip()

                # Calculate the Levenshtein (edit) distance between the matched span and the query
                distance = Levenshtein.distance(query_search, span)

                if distance > spelling_mistakes_max:
                    match_count = match_count - 1

            all_matches.append(match_count)
| #print("all_matches:", all_matches) | |
| print("Search complete") | |
| ## Get document lengths | |
| lengths = [] | |
| for element in df_list: | |
| lengths.append(len(element)) | |
| # Score is number of matches divided by length of document | |
| match_scores = (np.array(all_matches)/np.array(lengths)).tolist() | |
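    # Because the score is normalised by document length, a short document containing the query
    # will score higher than a long document with the same number of matches.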

    # Prepare results and export
    results_df = pd.DataFrame(data={"index": list(range(len(df_list))),
                                    "search_text": df_list,
                                    "search_score_abs": match_scores})
    results_df['search_score_abs'] = abs(round(results_df['search_score_abs']*100, 2))
    results_df_out = results_df[['index', 'search_text', 'search_score_abs']].merge(original_data, left_on="index", right_index=True, how="left").drop(["index_x", "index_y"], axis=1, errors="ignore")

    # Keep only results with at least one match
    results_df_out = results_df_out.loc[results_df["search_score_abs"] > 0, :]

    # Join on an additional data file if one was provided
    if not in_join_file.empty:
        progress(0.5, desc="Joining on additional data file")
        join_df = in_join_file

        # Strip trailing ".0" left over from numeric-to-string conversion so the join keys line up
        join_df[in_join_column] = join_df[in_join_column].astype(str).str.replace(r"\.0$", "", regex=True)
        results_df_out[search_df_join_column] = results_df_out[search_df_join_column].astype(str).str.replace(r"\.0$", "", regex=True)

        # Duplicates are dropped from the join file so as not to expand out the results dataframe
        join_df = join_df.drop_duplicates(in_join_column)

        results_df_out = results_df_out.merge(join_df, left_on=search_df_join_column, right_on=in_join_column, how="left", suffixes=('', '_y'))  #.drop(in_join_column, axis=1)

    # Reorder results by score
    results_df_out = results_df_out.sort_values('search_score_abs', ascending=False)

    # Build the output file name
    query_str_file = "_".join(tokenised_query).replace(" ", "_")  # Replace spaces with underscores
    query_str_file = re.sub(r'[<>:"/\\|?*]', '', query_str_file)  # Remove characters that are invalid in file names
    query_str_file = query_str_file[:100]  # Limit to 100 characters

    results_df_name = output_folder + "fuzzy_keyword_search_result_" + today_rev + "_" + query_str_file + ".xlsx"

    print("Saving search file output")
    progress(0.7, desc="Saving search output to file")

    #results_df_out.to_excel(results_df_name, index=None)
    #print("string_query:", string_query)
    #print(results_df_out)

    # Highlight the found text and save to file
    results_df_out_wb = create_highlighted_excel_wb(results_df_out, string_query, "search_text")
    results_df_out_wb.save(results_df_name)

    # Check if the dataframe is empty or if the text column does not exist
    if results_df_out.empty or text_column not in results_df_out.columns:
        results_first_text = ""  # or None, depending on how "no results" should be handled downstream
        print("Nothing found.")
    else:
        results_first_text = results_df_out[text_column].iloc[0]

    print("Returning results")

    return results_first_text, results_df_name
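
# Example wiring (a minimal sketch, not from the original file): in the Gradio app this function is
# intended to be called from an event handler so that the `progress` tracker is active. The
# component and state names below (search_btn, query_box, tokenised_state, data_state, join_state,
# results_box, file_out, etc.) are hypothetical placeholders, not names defined in this project.
#
# search_btn.click(
#     fn=spacy_fuzzy_search,
#     inputs=[query_box, tokenised_state, data_state, text_column_dropdown,
#             join_state, search_join_col_dropdown, join_col_dropdown,
#             spelling_mistakes_slider, whole_phrase_checkbox],
#     outputs=[results_box, file_out],
# )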