Fetch_Employer_Name / helper.py
chandanzeon's picture
minor change
4cdcc2f
import pandas as pd
import numpy as np
from rank_bm25 import BM25Okapi
import re
from nltk.stem import WordNetLemmatizer, PorterStemmer
from datetime import datetime
lemmatizer = WordNetLemmatizer()
threshold = 11.6 # Threshold score for employer match
def clean_text(text):
"""
Cleans and normalizes the input text by performing the following operations:
- Lowercases the text
- Removes special characters and digits
- Replaces abbreviations with full words (e.g., 'pvt' -> 'private', 'ltd' -> 'limited')
- Lemmatizes the words for normalization
Parameters:
text (str): The input text string to be cleaned.
Returns:
str: The cleaned and lemmatized text.
"""
cleaned_text = text.lower()
cleaned_text = re.sub(r'[^A-Za-z0-9\s./]', ' ', cleaned_text) # Remove special characters
cleaned_text = re.sub(r'\.', '', cleaned_text) # Remove periods
cleaned_text = re.sub(r'\/', '', cleaned_text) # Remove slashes
cleaned_text = re.sub(r'\d{3,}', '', cleaned_text) # Remove numbers with more than 3 digits
cleaned_text = re.sub('pvt', 'private', cleaned_text) # Replace 'pvt' with 'private'
cleaned_text = re.sub('ltd', 'limited', cleaned_text) # Replace 'ltd' with 'limited'
cleaned_text = re.sub(r'(?<!\w)dev(?!\w)', 'development', cleaned_text) # Replace 'dev' with 'development'
cleaned_text = re.sub(r'(?<!\w)co(?!\w)', 'corporation', cleaned_text) # Replace 'co' with 'corporation'
cleaned_text = re.sub(r'\s+', ' ', cleaned_text) # Remove extra spaces
cleaned_text = ' '.join([lemmatizer.lemmatize(word) for word in cleaned_text.split()]) # Lemmatize the words
return cleaned_text.strip()
def fetch_empno(text):
"""
Extracts 6-digit employee numbers from the input text using a regular expression.
Parameters:
text (str): The input text from which to extract employee numbers.
Returns:
list: A list of extracted 6-digit employee numbers.
"""
return re.findall(r'\b\d{6}\b', text)
def preprocess_query(query):
"""
Preprocesses the input query by cleaning and extracting the meaningful part of the text.
- Removes extra data from query if certain characters ('||', '-') are present
- Cleans the query using the `clean_text` function
Parameters:
query (str): The raw query text to preprocess.
Returns:
str: The cleaned and processed query text.
"""
new_query = query
# Extract part of the query after '||' or '-'
if '||' in query:
ind = query.find('||')
new_query = query[ind + 2:]
elif '-' in query:
ind = query.find('-')
new_query = query[ind:]
if len(new_query) < 20:
new_query = query # Restore original query if extracted part is too short
new_query = clean_text(new_query)
return new_query
def parse_date(date_str):
"""
Parses a date string and converts it to the format 'DD/MM/YYYY'.
Handles multiple input date formats.
Parameters:
date_str (str): The input date string.
Returns:
str: The date formatted as 'DD/MM/YYYY', or the original string if parsing fails.
"""
try:
return datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S').strftime('%d/%m/%Y')
except ValueError:
try:
return datetime.strptime(date_str, '%m/%d/%Y').strftime('%d/%m/%Y')
except ValueError:
return date_str # Return original string if parsing fails
def generate_df(master_data, df, employer_names):
"""
Generates a DataFrame by combining employer information from the master data
with transaction data from the input DataFrame.
Parameters:
master_data (DataFrame): The master data containing employer information.
df (DataFrame): The input data with transaction details.
employer_names (list): List of employer names to be matched with master data.
Returns:
DataFrame: A DataFrame combining transaction details with corresponding employer information.
"""
dates = [datetime.strptime(date_str, '%d%m%y').strftime('%d/%m/%Y') for date_str in df[4]]
bank_desc = list(df[9])
accounts = ['NASA' if i == '1001010071' else 'EDAS' if i == '1001233102' else '' for i in df[1]]
credits = list(df[7])
# Initialize lists for employer-related fields
employer_codes, bank_statemnt_ref, account_mgr = [], [], []
emp_province, region, industry, contributing_stts = [], [], [], []
date_joined, termination_date, email_addr = [], [], []
# Iterate through each employer name and retrieve details from the master data
for name in employer_names:
if name == "NOT FOUND":
employer_codes.append(np.nan)
bank_statemnt_ref.append(np.nan)
account_mgr.append(np.nan)
emp_province.append(np.nan)
region.append(np.nan)
industry.append(np.nan)
contributing_stts.append(np.nan)
date_joined.append(np.nan)
termination_date.append(np.nan)
email_addr.append(np.nan)
elif name == "EDAS":
employer_codes.append(np.nan)
bank_statemnt_ref.append(np.nan)
account_mgr.append(np.nan)
emp_province.append(np.nan)
region.append(np.nan)
industry.append(np.nan)
contributing_stts.append(np.nan)
date_joined.append(np.nan)
termination_date.append(np.nan)
email_addr.append(np.nan)
else:
tmp = master_data[master_data['Employer Name'] == name]
if tmp.empty:
employer_codes.append(np.nan)
bank_statemnt_ref.append(np.nan)
account_mgr.append(np.nan)
emp_province.append(np.nan)
region.append(np.nan)
industry.append(np.nan)
contributing_stts.append(np.nan)
date_joined.append(np.nan)
termination_date.append(np.nan)
email_addr.append(np.nan)
else:
employer_codes.append(list(tmp['Employer Number'])[-1])
bank_statemnt_ref.append(list(tmp['Bank Statement Reference'])[-1])
account_mgr.append(list(tmp['NASFUNDContact'])[-1])
emp_province.append(list(tmp['Employer Province'])[-1])
region.append(list(tmp['Region'])[-1])
industry.append(list(tmp['Industry'])[-1])
contributing_stts.append(list(tmp['Contributing Status'])[-1])
date = str(list(tmp['Date Joined Plan'])[-1])
date_joined.append(parse_date(date))
termination_date.append(list(tmp['Termination Date'])[-1])
email_addr.append(list(tmp['Email Addresses'])[-1])
# Construct the final DataFrame
res_df = pd.DataFrame({
'Receipt Date': dates,
'Bank Description': bank_desc,
'Account': accounts,
' Credit ': credits,
'Employer Code': employer_codes,
'Employer Name': employer_names,
'Bank Statement Reference': bank_statemnt_ref,
'Account Manager': account_mgr,
'Employer Province': emp_province,
'Region': region,
'Industry': industry,
'Contributing Status': contributing_stts,
'Date Joined Plan': date_joined,
'Termination Date': termination_date,
'Email Addresses': email_addr,
'First Name': np.nan,
'Surname': np.nan,
'Membership#': np.nan
})
return res_df
def get_res_df(master_data, df, thrshld):
"""
Retrieves the result DataFrame by matching employer names using BM25 algorithm
and employee numbers.
Parameters:
master_data (DataFrame): The master data containing employer information.
df (DataFrame): The input data with transaction details.
Returns:
DataFrame: A DataFrame containing matched employer data and transaction details.
"""
# Preprocess master data
threshold = thrshld
corpus = list(master_data['Employer Name'])
lower_case_corpus = [clean_text(name) for name in corpus]
corpus = corpus[1:] # Exclude the first row if it's a header
lower_case_corpus = lower_case_corpus[1:]
tokenized_corpus = [doc.split(' ') for doc in lower_case_corpus]
bm25 = BM25Okapi(tokenized_corpus) # BM25 model for employer name matching
# Preprocess queries from transaction data
queries = list(df[9])
queries = [query[:query.rindex('-')] for query in queries] # Extract part of the query before '-'
acc_nos = list(df[1])
empnos = [fetch_empno(text) for text in queries]
new_queries = [preprocess_query(query) for query in queries]
exact_matches = []
for query in queries:
match_found = False
for j, comp in enumerate(corpus):
if comp.lower().strip() in query.lower().strip():
exact_matches.append(corpus[j])
match_found = True
break
if not match_found:
exact_matches.append('')
res_names, found_by, scores = [], [], []
found_by_direct_search, found_by_emp_no, found_by_bm5, not_found, edas = 0, 0, 0, 0, 0
# Match each query to an employer
for query,empno_arr,exact_match,acc_no in zip(new_queries,empnos,exact_matches,acc_nos):
if acc_no == '1001233102':
edas+=1
res_names.append("")
found_by.append("EDAS")
else:
name = ""
# Find Employer by Direct Search
if exact_match!='':
name = exact_match
scores.append(100)
found_by_direct_search+=1
found_by.append("Direct Search")
res_names.append(name)
# Try to find an employer using the employee number if Direct Search Fails
elif len(empno_arr) != 0:
for empno in empno_arr:
names = list(master_data[master_data['Employer Number']==empno]['Employer Name'])
if len(names)!=0:
name=names[0]
scores.append(100) # Perfect match with employee number
found_by_emp_no+=1
found_by.append("Employer Number")
res_names.append(name)
break
# Fall back to BM25 matching if employee number fails
if name=="":
tokenized_query = query.split(" ")
name = bm25.get_top_n(tokenized_query, corpus, n=1)
doc_score = max(bm25.get_scores(tokenized_query))
scores.append(doc_score)
if doc_score>threshold:
found_by_bm5 += 1
res_names.append(name[0])
found_by.append("BM25")
else:
not_found+=1
res_names.append("NOT FOUND")
found_by.append("NOT FOUND")
# Generate the final result DataFrame
res_df = generate_df(master_data=master_data, df=df, employer_names=res_names)
print(f"{found_by_direct_search=},{found_by_emp_no=},{found_by_bm5=},{not_found=},{edas=}")
return res_df, found_by_direct_search, found_by_emp_no, found_by_bm5, not_found