import pandas as pd import numpy as np import re import os import warnings import gradio as gr import re import zipfile import datetime import openpyxl from openpyxl.styles import Font, PatternFill from openpyxl.utils import column_index_from_string, get_column_letter g_mapping = None elems = """ #button { /* Permalink - use to edit and share this gradient: https://colorzilla.com/gradient-editor/#f6e6b4+0,ed9017+100;Yellow+3D+%231 */ background: #f6e6b4; /* Old browsers */ background: -moz-linear-gradient(top, #f6e6b4 0%, #ed9017 100%); /* FF3.6-15 */ background: -webkit-linear-gradient(top, #f6e6b4 0%,#ed9017 100%); /* Chrome10-25,Safari5.1-6 */ background: linear-gradient(to bottom, #f6e6b4 0%,#ed9017 100%); /* W3C, IE10+, FF16+, Chrome26+, Opera12+, Safari7+ */ filter: progid:DXImageTransform.Microsoft.gradient( startColorstr='#f6e6b4', endColorstr='#ed9017',GradientType=0 ); /* IE6-9 */ text-shadow: 2px 2px 10px #000000; } """ def download_csv_as_dataframe(url): import io import pandas as pd import requests if 'drive.google.com' in url: # Google Drive link file_id = url.split('/')[-2] download_url = f'https://drive.google.com/uc?id={file_id}' elif 'docs.google.com/spreadsheets' in url: # Google Sheets link file_id = url.split('/')[-2] download_url = f'https://docs.google.com/spreadsheets/d/{file_id}/export?format=csv' else: print('Invalid URL') return None # Send a GET request to download the file response = requests.get(download_url) # Read the content as CSV and convert to DataFrame content = response.content.decode('utf-8') df = pd.read_csv(io.StringIO(content)) return df def map_names(odf,fname): global g_mapping msg = None if g_mapping is None: g_mapping = download_csv_as_dataframe('https://docs.google.com/spreadsheets/d/1rVoLrrTEDzU79x2H2Z1lJ7-z_jRbt-NMUdTarjLvSGo/edit?usp=drive_link') mapping = g_mapping#pd.read_csv("data_automation_mapping.csv") fname = fname.lower() ftype = next((element for element in [x for x in list(mapping['type'].unique())] if element.lower() in fname), None) fcompany = next((element for element in [x for x in list(mapping['company'].unique())] if element.lower() in fname), None) mapped_frame = None if ftype is not None and fcompany is not None: print(fname,"has been successfully remapped") query_result = mapping[(mapping['type'].str.lower() == ftype.lower()) & (mapping['company'].str.lower() == fcompany.lower())] mapped_frame = query_result for index, row in mapped_frame.iterrows(): original_val = row['original'] rename_val = row['rename'] odf = odf.replace(original_val, rename_val) #display(odf) mapped_frame = odf else: mapped_frame = odf msg = ' LOB has not been mapped for this file as name must have insurance line of business type (example: as_motor_summary.csv)' print(msg) return mapped_frame,msg def get_lob(df): global g_mapping if g_mapping is None: g_mapping = download_csv_as_dataframe('https://docs.google.com/spreadsheets/d/1rVoLrrTEDzU79x2H2Z1lJ7-z_jRbt-NMUdTarjLvSGo/edit?usp=drive_link') mapping = g_mapping column_names = set(df.columns) best_match_col = None max_matches = 0 for pattern in ["lob", "market_segment", "product", "class_of_business", 'type']: matching_columns = {col for col in column_names if pattern in col.lower()} for col in matching_columns: matches = sum(df[col].isin(g_mapping['original'])) if matches > max_matches: best_match_col = col max_matches = matches column_names -= matching_columns return best_match_col if max_matches > 0 else None def get_paid_amount(df): for col in df.columns: # Replace "Gross" with "amount" in column name if "Gross" in col or "gross" in col: new_col = col.replace("Gross", "amount").replace("gross", "amount") else: new_col = col # If "paid" and "amount" are in the column name, return the column name if "paid" in new_col.lower() and "amount" in new_col.lower(): return col # If "paid" and "claim" are in the column name, return the column name if "paid" in new_col.lower() and "claim" in new_col.lower(): return col return None def get_gross_os(df): for col in df.columns: if 'ri' in col.lower(): continue new_col = col.replace("gross", "amount").replace("Gross", "Amount") if "amount" in new_col.lower() and "os" in new_col.lower(): return col if "os" in new_col.lower() and "claim" in new_col.lower(): return col return None def get_recover_os(df): for col in df.columns: # If "recover" and "os" are in the column name, return the column name if "recover" in col.lower() and "os" in col.lower() and "ed" not in col.lower(): return col return None def get_gross_recoveries(df): for col in df.columns: # Replace "settled" with "amount" in column name new_col = col.replace("settled", "amount").replace("Settled", "Amount") # If "recover" and "amount" are in the column name, return the column name if "recover" in new_col.lower() and "amount" in new_col.lower(): return col # If "gross" and "recover" are in the column name, return the column name if "gross" in new_col.lower() and "recover" in new_col.lower(): return col return None def get_claim_count(df): for col in df.columns: # If "claim" and "count" are in the column name, return the column name if "claim" in col.lower() and "count" in col.lower(): return col return None def get_quarter_bracket(df): columns = df.columns for col in columns: if col.lower() == "quarter_bracket": return col return None def get_earned(df): for col in df.columns: # If "GEP" is in the column name, return the column name if "gep" in col.lower(): return col # If "premium" and "earned" are in the column name, return the column name if "premium" in col.lower() and "earned" in col.lower(): return col return None def get_erp(df): for col in df.columns: # If "ERP" is in the column name, return the column name if "erp" in col.lower(): return col return None def quarters(df): valid_cols = [] df = df.applymap(lambda x: str(int(x)) if isinstance(x, (int, float)) and str(x) != 'nan' else str(x)) for col in df.columns: # Check if all values in column are either 'nan' or numeric if all(df[col].apply(lambda x: str(x).isnumeric() or str(x) == 'nan')): # Check if column has at least one value with length of 6 if any(df[col].apply(lambda x: len(str(x))) == 6): # Check if all non-zero numeric values end with '03', '06', '09', or '12' filtered = df[df[col] != '0'] filtered = filtered[filtered[col].apply(lambda x: str(x).isnumeric())] if filtered[col].apply(lambda x: x[-2:]).isin(['03', '06', '09', '12']).all(): valid_cols.append(col) valid_cols = [elem for elem in valid_cols if "report" not in elem.lower() if "effect" not in elem.lower()] return valid_cols def col_to_ints(df,columns_to_convert): for col in columns_to_convert: df[col] = df[col].apply(lambda x: str(int(x)) if isinstance(x, (int, float)) and str(x) != 'nan' else str(x)) return df def fill_missing_quarters(df, lob, acc, transaction): filled = [] missing_count = 0 lobs_dict = dict() print('accident',acc,'transaction',transaction) columns_to_convert = [acc,transaction] # Only affect acc and transaction print('Number of NaN values in', acc, ':', df[acc].isna().sum()) print('Number of NaN values in', transaction, ':', df[transaction].isna().sum()) for col in columns_to_convert: df[col] = df[col].apply(lambda x: str(int(x)) if isinstance(x, (int, float)) and str(x) != 'nan' else str(x)) quarters = [] start_year = 2017 end_year = 2022 # df_temp = df.copy(deep=True) # df_temp = df_temp.dropna() end_year = min(int(df[acc].max()[:4]), 2022) print("the end year", end_year) print("safe and sound") for year in range(start_year, end_year+1): for quarter in ['03', '06', '09', '12']: quarters.append(str(year) + quarter) # Find the missing quarters by LOB missing_quarters = [] for l in df[lob].unique(): l_df = df[df[lob] == l] l_quarters = set(quarters) - set(l_df[acc]) l_missing_df = pd.DataFrame({acc: list(l_quarters), transaction: [str(end_year)+'12'] * len(l_quarters)}) for col in df.columns: # Fill the missing #print("\n"*5,col,transaction) if col != lob: # These two checks are nesscary in case we are filling for the premium then we only fill it with the missing quarters without the 202212 for transactions if col == acc: l_missing_df[col] = list(l_quarters) elif str(col) == str(transaction): l_missing_df[col] = [str(end_year) + '12'] * len(l_quarters) else: # Pad l_missing_df[col] = 0.1 # Count padding per lob if col not in lobs_dict: lobs_dict[col] = 0 lobs_dict[col] = 0.1 + lobs_dict[col] # Count total paddings missing_count = missing_count + 1 if len(l_quarters) > 0 : filled_warn = str(l)+' was filled with the dates '+str(l_quarters) print(filled_warn) filled.append(filled_warn) l_missing_df[lob] = l missing_quarters.append(l_missing_df) filled.append([lobs_dict.keys(),lobs_dict.values()]) #filled.append("Total paddings (0.1): "+str(missing_count)) print("=="*100) print('Unique values in', acc, 'for missing quarters:', l_missing_df[acc].unique()) # Concatenate the original dataframe and the missing quarters dataframe filled_df = pd.concat([df] + missing_quarters, ignore_index=True) print('Number of NaN values in', acc, 'after concatenation:', filled_df[acc].isna().sum()) print('Unique values in', acc, 'before conversion:', filled_df[acc].unique()) # Convert the 'accident_quarter_bracket' column to datetime format filled_df[acc] = pd.to_datetime(filled_df[acc], format='%Y%m').dt.strftime('%Y%m') print('Unique values in', acc, 'after conversion:', filled_df[acc].unique()) print("=="*100) # Sort the dataframe by quarter filled_df = filled_df.sort_values(acc) # Reset the index filled_df = filled_df.reset_index(drop=True) # Print the filled quarters or a message if there are no missing quarters filled_quarters = filled_df[acc].unique() filtered_quarters = [q for q in filled_quarters if q[:4] in [str(year1) for year1 in range(start_year, end_year + 1)]] if len(filtered_quarters) == 0: msg = "No missing quarters between "+start_year+"-"+str(end_year) print(msg) filled.append(msg) else: pass#print(filtered_quarters) #filled_df = filled_df[[acc, transaction] + [col for col in filled_df.columns if col not in [acc, transaction]]] return filled_df,filled def drop_missing_rows(df, columns): #import sys removed_rows = df[df[columns].isnull().any(axis=1)] #display(removed_rows) print("LOB NAME", columns[0]) #sys.exit() removed_rows = df[df[columns].isnull().any(axis=1)].dropna(subset=columns[0], how='any') removed_rows = removed_rows[removed_rows[columns].isnull().any(axis=1)].dropna(subset=columns[0], how='any') df = df.dropna(subset=columns, how='any') return df,removed_rows # def write_log(sheet_data_dict): # workbook = openpyxl.Workbook() # max_sheet_name_length = 31 # for sheet_name, data_dict in sheet_data_dict.items(): # sheet_name = sheet_name[:max_sheet_name_length] # sheet = workbook.create_sheet(title=sheet_name) # col_index = 1 # Start from column 1 (A), column 0 does not exist in Excel # adjacent_col_index = 2 # Initialize adjacent column index to 2 (B) # row_index = 1 # Initialize row index to 1 to start writing from the first row # for title, data in data_dict.items(): # lst, color = data[0], (data[1] if len(data) > 1 else None) # adjacent = data[2] if len(data) > 2 else False # if adjacent: # write_col_index = adjacent_col_index # Use adjacent column # adjacent_col_index += 1 # Increment adjacent column index for next adjacent data # else: # write_col_index = col_index # Use column 1 (A) for non-adjacent data # row_index = sheet.max_row + 1 if sheet.max_row > 0 else 1 # Start from next available row in column 1 # # Write title # title_cell = sheet.cell(row=row_index, column=write_col_index) # title_cell.value = title # title_cell.font = Font(size=14, bold=True) # # Write list items and apply color # for item_index, item in enumerate(lst, start=row_index + 1): # cell = sheet.cell(row=item_index, column=write_col_index) # cell.value = item # if color: # fill = PatternFill(start_color=color, end_color=color, fill_type="solid") # cell.fill = fill # # Adjust column width # max_length = 0 # for cell in sheet[get_column_letter(write_col_index)]: # try: # if len(str(cell.value)) > max_length: # max_length = len(cell.value) # except: # pass # adjusted_width = (max_length + 2) # sheet.column_dimensions[get_column_letter(write_col_index)].width = adjusted_width # if "Sheet" in workbook.sheetnames: # workbook.remove(workbook["Sheet"]) # workbook.save('Log.xlsx') def write_log(sheet_data_dict): workbook = openpyxl.Workbook() max_sheet_name_length = 31 for sheet_name, data_dict in sheet_data_dict.items(): sheet_name = sheet_name[:max_sheet_name_length] sheet = workbook.create_sheet(title=sheet_name) col_index = 1 adjacent_col_index = 1 start_row_index = 1 for title, data in data_dict.items(): lst, color = data[0], (data[1] if len(data) > 1 else None) adjacent = data[2] if len(data) > 2 else False if adjacent: adjacent_col_index += 1 # Move to the next column for adjacent data write_col_index = adjacent_col_index # Write data in the adjacent column else: col_index = 1 # Reset to column 1 (A) for non-adjacent data adjacent_col_index = col_index # Reset adjacent column index write_col_index = col_index # Write data in column 1 (A) start_row_index = sheet.max_row + 1 if sheet.max_row > 0 else 1 # Start from the next available row in column 1 (A) # Write the title title_cell = sheet.cell(row=start_row_index, column=write_col_index) title_cell.value = title title_cell.font = Font(size=14, bold=True) # Write list items and apply color for item_index, item in enumerate(lst, start=start_row_index + 1): cell = sheet.cell(row=item_index, column=write_col_index) cell.value = item if color: fill = PatternFill(start_color=color, end_color=color, fill_type="solid") cell.fill = fill # Adjust the column width max_length = max(len(str(val)) for val in [title, *lst]) adjusted_width = (max_length + 2) sheet.column_dimensions[get_column_letter(write_col_index)].width = adjusted_width if "Sheet" in workbook.sheetnames: workbook.remove(workbook["Sheet"]) workbook.save('Log.xlsx') def column_letter(index): """Convert a column index into a column letter""" letters = "" while index > 0: index, remainder = divmod(index - 1, 26) letters = chr(65 + remainder) + letters return letters warnings = [] def is_found(c,text): global warnings if c[-1] == None: warnings.append(text+" was not found") def get_alts(atype): if atype == 'claim': return ['lob','accident_quarter_bracket','transaction_quarter_bracket','paid_amount','gross_recoveries_settled','os_amount','gross_os_recoveries','claim_count'] return ['lob','quarter_bracket','gross_premium_earned','ERP'] def filter_claims(df): print("Sum of Null beginning: ",df.isnull().sum()) print("Sum of Null beginning 2: ",(df == '').sum()) print(df.dtypes) filled_warn = [] global warnings warnings = [] columns = [] # Find lob columns.append(get_lob(df)) is_found(columns,"lob") if None in columns: return None,None # Find quarters sublist = quarters(df) print("\n"*10,sublist,"\n"*10) columns.extend(sublist) # min_col = min(sublist, key=lambda col: df.dropna()[col].sum()) # max_col = max(sublist, key=lambda col: df.dropna()[col].sum()) min_col = df[sublist].sum().idxmin() max_col = [col for col in sublist if col != min_col][0] df,temp = drop_missing_rows(df,columns) print('missing: ',df[df.columns[1]].isnull().sum()) #df.to_csv("gayassshit.csv") #temp.to_csv("gayassshit1.csv") #df.to_csv("before_filling.csv") #print("\n"*10,columns[0],min_col,max_col,"\n"*10) df, filled_warn = fill_missing_quarters(df,columns[0],min_col,max_col) #df.to_csv("after_filling.csv") #print(columns[0],min_col,max_col) #temp = fill_missing_quarters(temp,columns[0],min_col,max_col) df = col_to_ints(df,sublist) #df = df[[min_col, max_col] + [col for col in df.columns if col not in [min_col, max_col]]] #display(df) min_col_index = columns.index(min_col) # Find the index of min_col max_col_index = columns.index(max_col) # Find the index of max_col # Rearrange the columns list if min_col_index > max_col_index: columns.insert(max_col_index, columns.pop(min_col_index)) is_found(columns,"quarters") # Find paid amount columns.append(get_paid_amount(df)) is_found(columns,"paid amount") # Find gross recoveries columns.append(get_gross_recoveries(df)) is_found(columns,"gross recoveries") # Find gross os columns.append(get_gross_os(df)) is_found(columns,"gross os") # Find recover os columns.append(get_recover_os(df)) is_found(columns,"recover os") # Find claims count columns.append(get_claim_count(df)) is_found(columns,"claim count") # Warn for i,w in enumerate(warnings): print(str(i+1)+'-',w) #df = pd.concat([df, temp], ignore_index=True) df = df.replace('nan',0) df = df.fillna({col: 0 for col in df.columns if col not in sublist}) return df,columns,temp,filled_warn def filter_premiums(df): global warnings warnings = [] columns = [] filled_warn = [] # Find lob columns.append(get_lob(df)) is_found(columns,"lob") if None in columns: return None,None # Find quarter bracket columns.append(get_quarter_bracket(df)) df,filled_warn = fill_missing_quarters(df,columns[0],columns[-1],columns[-1]) is_found(columns,"quarter") # Find premium earned columns.append(get_earned(df)) is_found(columns,"premium earned") # Find ERP columns.append(get_erp(df)) is_found(columns,"ERP") # Warn for i,w in enumerate(warnings): print(str(i+1)+'-',w) return df,columns,filled_warn css_code='body{background-image:url("https://picsum.photos/seed/picsum/200/300");}' # def unzip_files(zip_file_path): # file_extension = os.path.splitext(zip_file_path)[1] # if file_extension == '.zip': # with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: # file_list = zip_ref.namelist() # csv_excel_files = [file for file in file_list if file.endswith(('.csv', '.xls', '.xlsx'))] # return csv_excel_files # else: # return [zip_file_path] def unzip_files(zip_file_path): file_extension = os.path.splitext(zip_file_path)[1] if file_extension == '.zip': with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: file_list = zip_ref.namelist() csv_excel_files = [file for file in file_list if file.endswith(('.csv', '.xls', '.xlsx'))] extracted_files = [] for file in csv_excel_files: zip_ref.extract(file) extracted_files.append(file) return extracted_files else: return [zip_file_path] def zip_files(file_paths): current_date = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M") new_file_name = f"processed_files_{current_date}.zip" with zipfile.ZipFile(new_file_name, 'w') as zipf: for file_path in file_paths: file_name = file_path.split('/')[-1] zipf.write(file_path, file_name) print(f"{len(file_paths)} files compressed and saved as '{new_file_name}'.") return new_file_name def valid(text): file_extensions = [".zip", ".xlsx", ".csv"] pattern = r"\b({})\b".format("|".join(map(re.escape, file_extensions))) match = re.search(pattern, text, flags=re.IGNORECASE) return bool(match) def op_outcome(name,msg): name = os.path.basename(name) return name+msg def process(files,button): global warnings fail = ' ❌\n' passe = ' ✔️\n' warn = ' ⚠️\n' status = [] cleaned_names = [] if files is None: msg = 'No file provided'+fail return None, msg names = unzip_files(files.name) sheet_data = dict() for name in names: #name = os.path.basename(name) if valid(name): # return zip_files([files.name]),'Success'+passe temp = None columns = [] filled_warn = [] replacens = dict() print("Processing:", name) try: df = pd.read_csv(name) except: df = pd.read_excel(name) old_cols = df.columns old_olds = list(old_cols) sums_old = ['{:,.2f}'.format(df[col].sum()) if np.issubdtype(df[col].dtype, np.number) else "-" for col in old_cols] print("Before columns") print(old_olds) if "summ" in name: print("Summary:") df,columns,filled_warn = filter_premiums(df) if columns == None: print(name,'has no LOB column') print("--"*50) status.append(op_outcome(name,' has no LOB column'+fail)) continue altnames = get_alts('summ') else: print("Claims:") df,columns,temp,filled_warn = filter_claims(df) if columns == None: print(name,'has no LOB column') print("--"*50) status.append(op_outcome(name,' has no LOB column'+fail)) continue altnames = get_alts('claim') finalnames = [] for ind,col in enumerate(columns): if col is not None: finalnames.append(columns[ind]+" ("+altnames[ind]+")") columns = [x for x in columns if x is not None] print("After columns") print(columns) df, msg = map_names(df,name) df = df[columns] print("temp",temp) if isinstance(temp,pd.DataFrame): temp, _ = map_names(temp,name) temp = temp[columns] temp = temp[temp.iloc[:, 3:].sum(axis=1) != 0] df = pd.concat([df, temp], ignore_index=True) column_mapping = dict(zip(columns, finalnames)) df = df.rename(columns=column_mapping) # sum new ncols = df.columns sums_new = ['{:,.2f}'.format(df[col].sum()) if np.issubdtype(df[col].dtype, np.number) else "-" for col in ncols] #display(df) name = os.path.basename(name) #print(columns) #print(warnings) sheetwarnings = [['No warnings'],'00FF00'] if len(warnings) > 0: sheetwarnings = [warnings,'FFA500'] filled_warn.pop(-1) if len(filled_warn) == 0: filled_warn = ['No fillings'] # else: # # tempt_list = [element for element in filled_warn[-2][0] if element in columns] # # filled_warn[-2] = "Padded columns "+str(list(tempt_list))+" with total of "+str(round(filled_warn[-2][1],3))+" each" # pass # fillings_amounts = filled_warn[-1][1] sheet_data[name] = { "Before columns": [old_olds], 'Sum Before':[sums_old,None,True], "After columns": [ncols, '00FF00'], 'Sum After':[sums_new,None,True], #'Filling amount':[fillings_amounts,None,True], 'Fillings':[filled_warn,None], "Warnings": sheetwarnings } c_name = name.split('.')[0]+'_cleaned.csv' df.to_csv(c_name,index=False) cleaned_names.append(c_name) formatted_warnings = '' if len(warnings) > 0: formatted_warnings = '📝:\n'+'\n'.join(warnings) if msg == None: status.append(op_outcome(name,' was processed'+passe+formatted_warnings)) else: status.append(op_outcome(name,msg+warn+formatted_warnings)) else: name = os.path.basename(name) status.append(op_outcome(name,' Failed (Only .csv, .xlsx, .zip are allowed)'+fail)) if len(cleaned_names) > 0: write_log(sheet_data) cleaned_names.append('Log.xlsx') final_file = zip_files(cleaned_names) else: final_file = None msg = '\n'.join(f"{index + 1}.{value}" for index, value in enumerate(status)) return gr.File.update(value=final_file,visible=True),msg #return(str(files)+'fole') with gr.Blocks(css=elems) as demo: gr.Markdown( """
Excel Sheet

Upload a singular xlsx/csv file to clean

Zip Icon Excel Sheet

Or upload multiple compressed into a zip file

""" ) with gr.Row(): inp = gr.File(label='Input file/s') with gr.Row(): bt = gr.Button(value='🧹 Clean',elem_id='button') #bt1 = gr.Button(value='Restart') for _ in range(2): with gr.Row(): pass with gr.Row(): out = gr.File(label='Cleaned files',visible=False) with gr.Row(): log = gr.Textbox(label='Process log 📄',visible=True) bt.click(fn = process, inputs=[inp,bt], outputs=[out,log]) demo.launch(debug=True)