import numpy as np | |
import gradio as gr | |
import pandas as pd | |
import Levenshtein | |
from typing import List, Type | |
from datetime import datetime | |
import re | |
from search_funcs.helper_functions import create_highlighted_excel_wb, output_folder, load_spacy_model | |
from spacy import prefer_gpu | |
from spacy.matcher import Matcher, PhraseMatcher | |
PandasDataFrame = Type[pd.DataFrame] | |
today_rev ="%Y%m%d") | |
def spacy_fuzzy_search(string_query:str, tokenised_data: List[List[str]], original_data: PandasDataFrame, text_column:str, in_join_file: PandasDataFrame, search_df_join_column:str, in_join_column:str, spelling_mistakes_max:int = 1, search_whole_phrase:bool=False, progress=gr.Progress(track_tqdm=True)): | |
''' Conduct fuzzy match on a list of data.''' | |
if not tokenised_data: | |
out_message = "Prepared data not found. Have you clicked 'Load data' above to prepare a search index?" | |
print(out_message) | |
return out_message, None | |
# Lower case query | |
string_query = string_query.lower() | |
prefer_gpu() | |
# Load spaCy model | |
nlp = load_spacy_model() | |
# Convert tokenised data back into a list of strings | |
df_list = list(map(" ".join, tokenised_data)) | |
if len(df_list) > 100000: | |
out_message = "Your data has more than 100,000 rows and will take more than 30 minutes to do a fuzzy search. Please try keyword or semantic search for data of this size." | |
return out_message, None | |
query = nlp(string_query) | |
if search_whole_phrase == False: | |
tokenised_query = [token.text for token in query] | |
spelling_mistakes_fuzzy_pattern = "FUZZY" + str(spelling_mistakes_max) | |
if len(tokenised_query) > 1: | |
pattern_lemma = [{"LEMMA": {"IN": tokenised_query}}] | |
pattern_fuzz = [{"TEXT": {spelling_mistakes_fuzzy_pattern: {"IN": tokenised_query}}}] | |
else: | |
pattern_lemma = [{"LEMMA": tokenised_query[0]}] | |
pattern_fuzz = [{"TEXT": {spelling_mistakes_fuzzy_pattern: tokenised_query[0]}}] | |
matcher = Matcher(nlp.vocab) | |
matcher.add(string_query, [pattern_fuzz]) | |
matcher.add(string_query, [pattern_lemma]) | |
else: | |
# If matching a whole phrase, use Spacy PhraseMatcher, then consider similarity after using Levenshtein distance. | |
tokenised_query = [string_query.lower()] | |
# If you want to match the whole phrase, use phrase matcher | |
matcher = PhraseMatcher(nlp.vocab, attr="LOWER") | |
patterns = [nlp.make_doc(string_query)] # Convert query into a Doc object | |
matcher.add("PHRASE", patterns) | |
batch_size = 256 | |
docs = nlp.pipe(df_list, batch_size=batch_size) | |
# %% | |
all_matches = [] | |
# Get number of matches per doc | |
for doc in progress.tqdm(docs, desc = "Searching text", unit = "rows"): | |
matches = matcher(doc) | |
match_count = len(matches) | |
# If considering each sub term individually, append match. If considering together, consider weight of the relevance to that of the whole phrase. | |
if search_whole_phrase==False: | |
all_matches.append(match_count) | |
else: | |
for match_id, start, end in matches: | |
span = str(doc[start:end]).strip() | |
query_search = str(query).strip() | |
distance = Levenshtein.distance(query_search, span) | |
# Compute a semantic similarity estimate. Defaults to cosine over vectors. | |
if distance > spelling_mistakes_max: | |
# Calculate Levenshtein distance | |
match_count = match_count - 1 | |
all_matches.append(match_count) | |
#print("all_matches:", all_matches) | |
print("Search complete") | |
## Get document lengths | |
lengths = [] | |
for element in df_list: | |
lengths.append(len(element)) | |
# Score is number of matches divided by length of document | |
match_scores = (np.array(all_matches)/np.array(lengths)).tolist() | |
# Prepare results and export | |
results_df = pd.DataFrame(data={"index": list(range(len(df_list))), | |
"search_text": df_list, | |
"search_score_abs": match_scores}) | |
results_df['search_score_abs'] = abs(round(results_df['search_score_abs']*100, 2)) | |
results_df_out = results_df[['index', 'search_text', 'search_score_abs']].merge(original_data,left_on="index", right_index=True, how="left").drop(["index_x", "index_y"], axis=1, errors="ignore") | |
# Keep only results with at least one match | |
results_df_out = results_df_out.loc[results_df["search_score_abs"] > 0, :] | |
# Join on additional files | |
if not in_join_file.empty: | |
progress(0.5, desc = "Joining on additional data file") | |
join_df = in_join_file | |
join_df[in_join_column] = join_df[in_join_column].astype(str).str.replace("\.0$","", regex=True) | |
results_df_out[search_df_join_column] = results_df_out[search_df_join_column].astype(str).str.replace("\.0$","", regex=True) | |
# Duplicates dropped so as not to expand out dataframe | |
join_df = join_df.drop_duplicates(in_join_column) | |
results_df_out = results_df_out.merge(join_df,left_on=search_df_join_column, right_on=in_join_column, how="left", suffixes=('','_y'))#.drop(in_join_column, axis=1) | |
# Reorder results by score | |
results_df_out = results_df_out.sort_values('search_score_abs', ascending=False) | |
# Out file | |
query_str_file = "_".join(tokenised_query).replace(" ", "_") # Replace spaces with underscores | |
query_str_file = re.sub(r'[<>:"/\\|?*]', '', query_str_file) # Remove invalid characters | |
query_str_file = query_str_file[:100] # Limit to 100 characters | |
results_df_name = output_folder + "fuzzy_keyword_search_result_" + today_rev + "_" + query_str_file + ".xlsx" | |
print("Saving search file output") | |
progress(0.7, desc = "Saving search output to file") | |
#results_df_out.to_excel(results_df_name, index= None) | |
#print("string_query:", string_query) | |
#print(results_df_out) | |
# Highlight found text and save to file | |
results_df_out_wb = create_highlighted_excel_wb(results_df_out, string_query, "search_text") | | | |
#results_first_text = results_df_out[text_column].iloc[0] | |
# Check if the DataFrame is empty or if the column does not exist | |
if results_df_out.empty or text_column not in results_df_out.columns: | |
results_first_text = "" #None # or handle it as needed | |
print("Nothing found.") | |
else: | |
results_first_text = results_df_out[text_column].iloc[0] | |
print("Returning results") | |
return results_first_text, results_df_name |