# DeDuplicate-Vendor: Flask + pywebview tool that flags duplicate vendor
# records in an uploaded Excel workbook and asks Google Gemini to explain
# each duplicate pair.
# --- dependencies -----------------------------------------------------------
# stdlib
import os
import shutil
import tkinter as tk
from tkinter import filedialog

# third-party
import openpyxl
import pandas as pd
import requests
import webview
from flask import Flask, render_template, request, redirect, url_for
from fuzzywuzzy import fuzz
from openpyxl.styles import PatternFill
from openpyxl.styles.alignment import Alignment
import google.generativeai as genai

# --- Flask application ------------------------------------------------------
app = Flask(__name__, static_folder='./static', template_folder='./templates')
app.config['UPLOAD_FOLDER'] = 'uploads'   # incoming workbooks are saved here
app.config['OUTPUT_FOLDER'] = 'output'    # processed workbooks are written here

# Path of the most recently generated result; shared across requests.
output_file = None

# Wrap the Flask app in a native desktop window (pywebview).
window = webview.create_window('DeDuplicae-Vendor', app)

# --- Google Gemini ----------------------------------------------------------
# SECURITY: the API key used to be hard-coded below. Prefer the environment;
# the inline fallback keeps existing deployments working, but this key is now
# public and should be rotated and removed from source control.
GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY', 'AIzaSyCtACPu9EOnEa1_iAWsv_u__PQRpaCT564')
genai.configure(api_key=GOOGLE_API_KEY)

# Load the Gemini model used to generate duplicate explanations.
model = genai.GenerativeModel('gemini-pro')
# Split the *_fuzzy_ratio entries out of a list of duplicate-pair row dicts.
def process_fuzzy_ratios(rows_dict):
    """Strip the six ``*_fuzzy_ratio`` entries out of a list of row dicts.

    The first row simply loses those keys; every later row has them moved
    into a returned mapping keyed ``"row_<position + 1>"``. The row dicts
    are mutated in place.

    Returns:
        tuple: ``(fuzz_data, rows_dict)`` — the collected ratios and the
        same (now trimmed) list of row dicts.
    """
    ratio_keys = (
        "address_fuzzy_ratio",
        "bank_fuzzy_ratio",
        "name_fuzzy_ratio",
        "accgrp_fuzzy_ratio",
        "tax_fuzzy_ratio",
        "postal_fuzzy_ratio",
    )
    fuzz_data = {}
    for position, record in enumerate(rows_dict):
        if position == 0:
            # Leading row: drop the ratio columns entirely.
            for ratio_key in ratio_keys:
                del record[ratio_key]
        else:
            # Later rows: move the ratios into fuzz_data, keyed by position.
            fuzz_data["row_" + str(position + 1)] = {
                ratio_key: record.pop(ratio_key) for ratio_key in ratio_keys
            }
    return fuzz_data, rows_dict
# Gemini-backed explanation of why adjacent rows were flagged as duplicates.
def gemini_analysis(dataframe):
    """Annotate duplicate pairs with a natural-language explanation.

    Walks the frame in order; whenever two consecutive rows are both marked
    'Duplicate' in the 'Remarks' column, sends the pair (plus their fuzzy
    ratios) to Gemini and stores the reply in the 'Explanation' column of
    the first row of the pair. Mutates *dataframe* in place; returns None.
    """
    prev_row_duplicate = False
    for index, row in dataframe.iterrows():
        if row['Remarks'] == 'Duplicate':
            if prev_row_duplicate:
                # Pair the current row with the one directly above it.
                duplicate_pairs = [dataframe.loc[index - 1].to_dict(), row.to_dict()]
                fuzzy_ratios, duplicate_pairs = process_fuzzy_ratios(duplicate_pairs)
                # Trim the last 12 remaining entries of each row dict (the
                # helper group/ratio columns appended during processing).
                for dictionary in duplicate_pairs:
                    for _ in range(12):
                        if dictionary:
                            dictionary.popitem()
                main_data_str = "[{}]".format(', '.join(str(d) for d in duplicate_pairs))
                fuzzy_data_str = "{}".format(fuzzy_ratios)
                # BUGFIX: `qs` was previously a tuple (commas instead of
                # concatenation); build one coherent prompt string instead.
                qs = ("I have the data " + main_data_str
                      + " The corresponding fuzzy ratios are here: " + fuzzy_data_str
                      + " Give a concise explanation why these two rows are duplicate"
                        " based on analyzing the main data and explaining which column"
                        " values are same and which column values are different?")
                # Ask Gemini to analyse the pair. The original code caught
                # requests.HTTPError, ValueError and Exception with identical
                # bodies; a single Exception handler is equivalent. Any
                # failure degrades to a fixed error message.
                try:
                    response = model.generate_content(qs)
                    dataframe.at[index - 1, 'Explanation'] = response.text
                except Exception:
                    dataframe.at[index - 1, 'Explanation'] = 'An error occurred'
            prev_row_duplicate = True
        else:
            prev_row_duplicate = False
# The duplicate-detection pipeline: successive per-column fuzzy grouping passes.
def process_csv(file, check=['Tax','Bank','Address','Name','PostCode','AccGrp']):
    """Run the full duplicate-detection pipeline over an uploaded workbook.

    Reads the 'General Data ' sheet of the Excel file at *file*, builds
    concatenated comparison columns, fuzzy-matches neighbouring rows one
    column at a time (each pass sub-grouping the previous pass's groups),
    marks rows whose final combined group key repeats as 'Duplicate', asks
    Gemini to explain each duplicate pair, and writes a highlighted output
    workbook.

    Args:
        file: path to the uploaded .xlsx workbook.
        check: which comparison passes to run.
            NOTE(review): mutable (list) default argument — harmless here
            because it is never mutated, but fragile. Also, each pass reads
            the group column produced by the previous pass, so omitting an
            earlier item of `check` while keeping a later one would raise a
            KeyError — confirm the UI only ever passes prefixes.

    Returns:
        str: path of the generated workbook ('output/output.xlsx').
    """
    def calculate_tax_duplicacy(df):
        # Sort by the combined Tax string so similar values become adjacent.
        df.sort_values(['Tax'], inplace=True)
        df = df.reset_index(drop=True)
        # First and last rows get a pinned ratio of 100.
        # NOTE(review): pinning the LAST row to 100 means it always joins
        # the preceding group regardless of its actual value, and the
        # neighbour loop below never compares it — looks unintentional.
        df.at[0, 'tax_fuzzy_ratio'] = 100
        last_row_index = len(df) - 1
        df.at[last_row_index, 'tax_fuzzy_ratio'] = 100
        # Fuzzy-compare each row's Tax with the previous row's.
        for i in range(1, last_row_index):
            current_tax = df['Tax'].iloc[i]
            previous_tax = df['Tax'].iloc[i - 1]
            fuzzy_ratio = fuzz.ratio(previous_tax, current_tax)
            df.at[i, 'tax_fuzzy_ratio'] = fuzzy_ratio
        df['tax_fuzzy_ratio'] = pd.to_numeric(df['tax_fuzzy_ratio'], errors='coerce')
        # Calculate the duplicate groups based on tax column: ratio > 90
        # keeps the row in the previous group, otherwise open a new group.
        group_counter = 1
        df.at[0, 'tax_based_group'] = group_counter
        for i in range(1, len(df)):
            if df.at[i, 'tax_fuzzy_ratio'] > 90:
                df.at[i, 'tax_based_group'] = df.at[i - 1, 'tax_based_group']
            else:
                group_counter += 1
                df.at[i, 'tax_based_group'] = group_counter
        return df
    def calculate_bank_duplicacy(df):
        # Sort within each tax group so similar Bank strings are adjacent.
        df.sort_values(['Group_tax', 'Bank'], inplace=True)
        df = df.reset_index(drop=True)
        df.at[0, 'bank_fuzzy_ratio'] = 100
        # NOTE(review): unlike the tax pass, this (and all later passes)
        # uses the enclosing scope's `last_row_index` (set from len(df) - 1
        # in the outer body before these helpers are called).
        df.at[last_row_index, 'bank_fuzzy_ratio'] = 100
        for i in range(1, last_row_index):
            current_address = df['Bank'].iloc[i]
            previous_address = df['Bank'].iloc[i - 1]
            fuzzy_ratio = fuzz.ratio(previous_address, current_address)
            df.at[i, 'bank_fuzzy_ratio'] = fuzzy_ratio
        df['bank_fuzzy_ratio'] = pd.to_numeric(df['bank_fuzzy_ratio'], errors='coerce')
        # Bank sub-group counter restarts at 1 whenever the parent tax group
        # changes; only an exact match (>= 100) continues the previous group.
        bank_group_counter = 1
        df.at[0, 'bank_based_group'] = str(bank_group_counter)
        group = df.at[0, 'tax_based_group']
        for i in range(1, len(df)):
            if df.at[i, 'bank_fuzzy_ratio'] >= 100:
                df.at[i, 'bank_based_group'] = df.at[i - 1, 'bank_based_group']
            else:
                if df.at[i, 'tax_based_group'] != group:
                    bank_group_counter = 1
                    group = df.at[i, 'tax_based_group']
                else:
                    bank_group_counter += 1
                df.at[i, 'bank_based_group'] = str(bank_group_counter)
        return df
    def calculate_address_duplicacy(df):
        # Sort within each tax+bank group; threshold for Address is > 70.
        df.sort_values(['Group_tax_bank', 'Address'], inplace=True)
        df = df.reset_index(drop=True)
        df.at[0, 'address_fuzzy_ratio'] = 100
        df.at[last_row_index, 'address_fuzzy_ratio'] = 100
        for i in range(1, last_row_index):
            current_address = df['Address'].iloc[i]
            previous_address = df['Address'].iloc[i - 1]
            fuzzy_ratio = fuzz.ratio(previous_address, current_address)
            df.at[i, 'address_fuzzy_ratio'] = fuzzy_ratio
        df['address_fuzzy_ratio'] = pd.to_numeric(df['address_fuzzy_ratio'], errors='coerce')
        # Address sub-groups, restarting when the parent tax+bank group changes.
        address_group_counter = 1
        df.at[0, 'address_based_group'] = str(address_group_counter)
        group = df.at[0, 'Group_tax_bank']
        for i in range(1, len(df)):
            if df.at[i, 'address_fuzzy_ratio'] > 70:
                df.at[i, 'address_based_group'] = df.at[i - 1, 'address_based_group']
            else:
                if df.at[i, 'Group_tax_bank'] != group:
                    address_group_counter = 1
                    group = df.at[i, 'Group_tax_bank']
                else:
                    address_group_counter += 1
                df.at[i, 'address_based_group'] = str(address_group_counter)
        return df
    def calculate_name_duplicacy(df):
        # Sort within each tax+bank+address group; threshold for Name is > 80.
        df.sort_values(['Group_tax_bank_add', 'Name'], inplace=True)
        df = df.reset_index(drop=True)
        df.at[0, 'name_fuzzy_ratio'] = 100
        df.at[last_row_index, 'name_fuzzy_ratio'] = 100
        for i in range(1, last_row_index):
            # (variable names are copy-pasted from the address pass; the
            # values compared are Name strings)
            current_address = df['Name'].iloc[i]
            previous_address = df['Name'].iloc[i - 1]
            fuzzy_ratio = fuzz.ratio(previous_address, current_address)
            df.at[i, 'name_fuzzy_ratio'] = fuzzy_ratio
        df['name_fuzzy_ratio'] = pd.to_numeric(df['name_fuzzy_ratio'], errors='coerce')
        # Name sub-groups, restarting when the parent group changes.
        name_group_counter = 1
        df.at[0, 'name_based_group'] = str(name_group_counter)
        group = df.at[0, 'Group_tax_bank_add']
        for i in range(1, len(df)):
            if df.at[i, 'name_fuzzy_ratio'] > 80:
                df.at[i, 'name_based_group'] = df.at[i - 1, 'name_based_group']
            else:
                if df.at[i, 'Group_tax_bank_add'] != group:
                    name_group_counter = 1
                    group = df.at[i, 'Group_tax_bank_add']
                else:
                    name_group_counter += 1
                df.at[i, 'name_based_group'] = str(name_group_counter)
        return df
    def calculate_postcode_duplicacy(df):
        # Sort within the prior combined group; threshold for POSTCODE1 is > 90.
        df.sort_values(['Group_tax_bank_add_name', 'POSTCODE1'], inplace=True)
        df = df.reset_index(drop=True)
        df.at[0, 'postal_fuzzy_ratio'] = 100
        df.at[last_row_index, 'postal_fuzzy_ratio'] = 100
        for i in range(1, last_row_index):
            current_address = df['POSTCODE1'].iloc[i]
            previous_address = df['POSTCODE1'].iloc[i - 1]
            fuzzy_ratio = fuzz.ratio(previous_address, current_address)
            df.at[i, 'postal_fuzzy_ratio'] = fuzzy_ratio
        df['postal_fuzzy_ratio'] = pd.to_numeric(df['postal_fuzzy_ratio'], errors='coerce')
        # Postcode sub-groups, restarting when the parent group changes.
        postcode_group_counter = 1
        df.at[0, 'postal_based_group'] = str(postcode_group_counter)
        group = df.at[0, 'Group_tax_bank_add_name']
        for i in range(1, len(df)):
            if df.at[i, 'postal_fuzzy_ratio'] > 90:
                df.at[i, 'postal_based_group'] = df.at[i - 1, 'postal_based_group']
            else:
                if df.at[i, 'Group_tax_bank_add_name'] != group:
                    postcode_group_counter = 1
                    group = df.at[i, 'Group_tax_bank_add_name']
                else:
                    postcode_group_counter += 1
                df.at[i, 'postal_based_group'] = str(postcode_group_counter)
        return df
    def calculate_accgrp_duplicacy(df):
        # Sort within the prior combined group; KTOKK (account group) must
        # match exactly (>= 100) to stay in the same sub-group.
        df.sort_values(['Group_tax_bank_add_name_post', 'KTOKK'], inplace=True)
        df = df.reset_index(drop=True)
        df.at[0, 'accgrp_fuzzy_ratio'] = 100
        df.at[last_row_index, 'accgrp_fuzzy_ratio'] = 100
        for i in range(1, last_row_index):
            current_address = df['KTOKK'].iloc[i]
            previous_address = df['KTOKK'].iloc[i - 1]
            fuzzy_ratio = fuzz.ratio(previous_address, current_address)
            df.at[i, 'accgrp_fuzzy_ratio'] = fuzzy_ratio
        df['accgrp_fuzzy_ratio'] = pd.to_numeric(df['accgrp_fuzzy_ratio'], errors='coerce')
        # Account-group sub-groups, restarting when the parent group changes.
        accgrp_group_counter = 1
        df.at[0, 'accgrp_based_group'] = str(accgrp_group_counter)
        group = df.at[0, 'Group_tax_bank_add_name_post']
        for i in range(1, len(df)):
            if df.at[i, 'accgrp_fuzzy_ratio'] >= 100:
                df.at[i, 'accgrp_based_group'] = df.at[i - 1, 'accgrp_based_group']
            else:
                if df.at[i, 'Group_tax_bank_add_name_post'] != group:
                    accgrp_group_counter = 1
                    group = df.at[i, 'Group_tax_bank_add_name_post']
                else:
                    accgrp_group_counter += 1
                df.at[i, 'accgrp_based_group'] = str(accgrp_group_counter)
        return df
    # Locate the row containing the expected column headers, then read the
    # sheet below it into a DataFrame.
    def find_header_row(file_path, specified_headers, sheet_name):
        workbook = openpyxl.load_workbook(file_path)
        sheet = workbook[sheet_name]
        header_row = None
        temp_values = []
        # Scan cells until any one of the expected header names is found.
        for row in sheet.iter_rows():
            for cell in row:
                if cell.value in specified_headers:
                    header_row = cell.row
                    break
            if header_row is not None:
                break
        if header_row is None:
            # NOTE(review): returns bare None here, but the caller unpacks
            # three values — a missing header row would raise TypeError at
            # the call site (caught upstream by upload_file's handler).
            return
        # Store the pre-header cell values in a temporary variable.
        for row in range(1, header_row):
            for cell in sheet[row]:
                temp_values.append(cell.value)
        # Read the data below the header row into a DataFrame, using the
        # header row's cell values as column names.
        df = pd.DataFrame(sheet.iter_rows(min_row=header_row + 1, values_only=True),
                          columns=[cell.value for cell in next(sheet.iter_rows(min_row=header_row))])
        return header_row, temp_values, df
    # Sheet name includes a trailing space — it must match the workbook exactly.
    sheet_name1 = 'General Data '
    specified_headers = ["LIFNR", "KTOKK", "NAMEFIRST", "NAMELAST", "NAME3", "NAME4", "STREET", "POSTCODE1", "CITY1", "COUNTRY", "REGION", "SMTPADDR", "BANKL", "BANKN", "TAXTYPE", "TAXNUM", "Unnamed: 16", "Unnamed: 17", "Unnamed: 18"]
    header_row, temp_values, df = find_header_row(file, specified_headers, sheet_name1)
    # Replace null values with a blank space so string concatenation works.
    df = df.fillna(" ")
    # Build the combined comparison columns from the raw SAP-style fields.
    df['Address'] = df['STREET'].astype(str) + '-' + df['CITY1'].astype(str) + '-' + df['COUNTRY'].astype(str) + '-' + \
        df['REGION'].astype(str)
    df['Name'] = df['NAMEFIRST'].astype(str) + '-' + df['NAMELAST'].astype(str) + '-' + df['NAME3'].astype(str) + '-' + \
        df['NAME4'].astype(str)
    df['Bank'] = df['BANKL'].astype(str) + '-' + df['BANKN'].astype(str)
    df['Tax'] = df['TAXTYPE'].astype(str) + '-' + df['TAXNUM'].astype(str)
    # Lowercase the combined columns so fuzzy matching is case-insensitive.
    df['Name'] = df['Name'].str.lower()
    df['Address'] = df['Address'].str.lower()
    df['Bank'] = df['Bank'].str.lower()
    df['Tax'] = df['Tax'].str.lower()
    # Placeholder columns for the per-pass fuzzy ratios.
    df['name_fuzzy_ratio'] = ''
    df['accgrp_fuzzy_ratio'] = ''
    df['address_fuzzy_ratio'] = ''
    df['bank_fuzzy_ratio'] = ''
    df['tax_fuzzy_ratio'] = ''
    df['postal_fuzzy_ratio'] = ''
    # Placeholder columns for the per-pass group labels.
    df['name_based_group'] = ''
    df['accgrp_based_group'] = ''
    df['address_based_group'] = ''
    df['bank_based_group'] = ''
    df['tax_based_group'] = ''
    df['postal_based_group'] = ''
    # Last row index, captured once here and used by the helper closures above.
    last_row_index = len(df) - 1
    # Each enabled pass refines the grouping and appends a combined group key.
    if 'Tax' in check:
        df = calculate_tax_duplicacy(df)
        df['Group_tax'] = df.apply(lambda row: '{}'.format(row['tax_based_group']), axis=1)
    if 'Bank' in check:
        df = calculate_bank_duplicacy(df)
        df['Group_tax_bank'] = df.apply(lambda row: '{}_{}'.format(row['tax_based_group'], row['bank_based_group']), axis=1)
    if 'Address' in check:
        df = calculate_address_duplicacy(df)
        df['Group_tax_bank_add'] = df.apply(lambda row: '{}_{}'.format(row['Group_tax_bank'], row['address_based_group']),
                                            axis=1)
    if 'Name' in check:
        df = calculate_name_duplicacy(df)
        df['Group_tax_bank_add_name'] = df.apply(
            lambda row: '{}_{}'.format(row['Group_tax_bank_add'], row['name_based_group']), axis=1)
    if 'PostCode' in check:
        df = calculate_postcode_duplicacy(df)
        df['Group_tax_bank_add_name_post'] = df.apply(
            lambda row: '{}_{}'.format(row['Group_tax_bank_add_name'], row['postal_based_group']), axis=1)
    if 'AccGrp' in check:
        df = calculate_accgrp_duplicacy(df)
        df['Group_tax_bank_add_name_post_accgrp'] = df.apply(
            lambda row: '{}_{}'.format(row['Group_tax_bank_add_name_post'], row['accgrp_based_group']), axis=1)
    # A repeated final combined key means the row duplicates another (AND of
    # all enabled passes); keep=False marks every member of such a group.
    duplicate_groups = df['Group_tax_bank_add_name_post_accgrp'].duplicated(keep=False)
    df['Remarks'] = ['Duplicate' if is_duplicate else 'Unique' for is_duplicate in duplicate_groups]
    # Ask Gemini to explain each duplicate pair (fills the 'Explanation' column).
    gemini_analysis(df)
    # Drop the intermediate ratio/group columns before writing the output.
    columns_to_drop = ['name_fuzzy_ratio', 'accgrp_fuzzy_ratio', 'address_fuzzy_ratio', 'bank_fuzzy_ratio',
                       'tax_fuzzy_ratio', 'postal_fuzzy_ratio', 'name_based_group', 'accgrp_based_group',
                       'address_based_group', 'bank_based_group', 'tax_based_group', 'postal_based_group',
                       'Group_tax_bank', 'Group_tax_bank_add', 'Group_tax_bank_add_name',
                       'Group_tax_bank_add_name_post', 'Group_tax', 'Group_tax_bank_add_name_post_accgrp']
    df = df.drop(columns=columns_to_drop, axis=1)
    # NOTE(review): the workbook is written twice — this plain write is
    # immediately overwritten by the ExcelWriter below and looks redundant.
    df.to_excel('output/output.xlsx', index=False)
    excel_writer = pd.ExcelWriter('output/output.xlsx', engine='openpyxl')
    df.to_excel(excel_writer, index=False, sheet_name='Sheet1')
    # Access the underlying openpyxl workbook to apply formatting.
    workbook = excel_writer.book
    worksheet = workbook['Sheet1']
    # Highlight duplicate rows in yellow and wrap their cell text.
    duplicate_fill = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")
    for idx, row in df.iterrows():
        if row['Remarks'] == 'Duplicate':
            # +2: worksheet rows are 1-based and row 1 holds the headers.
            for cell in worksheet[idx + 2]:
                cell.alignment = Alignment(wrap_text=True)
                cell.fill = duplicate_fill
    # Fixed column width for readability.
    for col in worksheet.columns:
        col_letter = col[0].column_letter
        worksheet.column_dimensions[col_letter].width = 28
    # Fixed row height for readability.
    for row in worksheet.iter_rows():
        worksheet.row_dimensions[row[0].row].height = 20
    # Save the formatted workbook.
    excel_writer.close()
    output_path = os.path.join(app.config['OUTPUT_FOLDER'], 'output.xlsx')
    return output_path
def save_error_message(error_message):
    """Persist *error_message* to static/error.txt so the frontend can show it.

    Writes with explicit UTF-8 so non-ASCII text survives regardless of the
    platform's default encoding (previously platform-dependent, which could
    raise UnicodeEncodeError on e.g. Windows cp1252).
    """
    with open('static/error.txt', 'w', encoding='utf-8') as f:
        f.write(error_message)
def upload_file():
    """Serve the index page; on POST, run duplicate detection on the upload.

    Saves the posted workbook into UPLOAD_FOLDER, runs process_csv on it,
    and redirects back here on success. On failure the exception text is
    persisted and rendered into the page.
    """
    global output_file
    error_message = None
    if request.method == 'POST':
        upload = request.files['file']
        # NOTE(review): the selected options are read but never forwarded to
        # process_csv(check=...) — confirm whether that was intended.
        selected_options = request.form.getlist('option')
        if upload:
            try:
                saved_path = os.path.join(app.config['UPLOAD_FOLDER'], upload.filename)
                upload.save(saved_path)
                output_file = process_csv(saved_path)
                return redirect(url_for('upload_file'))
            except Exception as exc:
                error_message = str(exc)
                save_error_message(error_message)
    return render_template('index.html', output_file=output_file, error_message=error_message)
def save_file_dialog(default_filename="output.xlsx", filetypes=(("XLSX files", ".xlsx"), ("All files", ".*"))):
    """Show a native save-as dialog and return the path the user picked.

    A throwaway Tk root is created and immediately hidden so that only the
    file dialog itself is visible. Returns '' if the user cancels.
    """
    hidden_root = tk.Tk()
    hidden_root.withdraw()  # keep the Tk main window off-screen
    chosen_path = filedialog.asksaveasfilename(initialfile=default_filename, filetypes=filetypes, defaultextension=".xlsx")
    return chosen_path
def download_file():
    """Copy the generated workbook to a user-chosen location.

    Opens a save-as dialog; if the user picks a destination, the result
    workbook from OUTPUT_FOLDER is copied there. Always redirects back to
    the index page.
    """
    generated_workbook = os.path.join(app.config['OUTPUT_FOLDER'], 'output.xlsx')
    destination = save_file_dialog()
    if destination:
        shutil.copyfile(generated_workbook, destination)
    return redirect(url_for('upload_file'))
if __name__ == '__main__':
    # NOTE(review): debug=True enables the Werkzeug debugger and reloader —
    # must not be left on in production. Also, `window` is created at import
    # time but webview.start() is never called here; presumably the desktop
    # shell is launched elsewhere — confirm.
    app.run(debug=True)