# backend/app/categorization/file_processing.py
# Repository viewer residue: uploaded by praneethys, commit "01-Categorization-Transactions (#1)" (e612627, verified), 4.43 kB.
import os
import glob
import shutil
import logging
import warnings
from typing import Optional, Union, Dict, List
from datetime import datetime
import pandas as pd
from dateparser import parse
from app.categorization.categorizer_list import categorize_list
from app.categorization.config import RESULT_OUTPUT_FILE, CATEGORY_REFERENCE_OUTPUT_FILE
# Read one file and process it (e.g. categorize its transactions)
async def process_file(file_path: str) -> Dict[str, Union[str, pd.DataFrame]]:
    """
    Standardize one transaction file and categorize its transactions.

    Exceptions are not propagated: any failure is logged, printed, and reported
    through the 'error' entry of the returned dictionary.

    Args:
        file_path (str): Path to the input file.

    Returns:
        Dict[str, Union[str, pd.DataFrame]]: Dictionary with 'file_name' (base name
        of the path), 'output' (categorized transactions, or an empty DataFrame if
        processing failed) and 'error' (exception text, or '' on success).
    """
    base_name = os.path.basename(file_path)
    outcome = {'file_name': base_name, 'output': pd.DataFrame(), 'error': ''}

    try:
        # Bring the file into the standardized transaction layout, then categorize it
        standardized = standardize_csv_file(file_path)
        outcome['output'] = await categorize_list(standardized)
        print(f'File processed sucessfully: {base_name}')
    except Exception as exc:
        # Record the failure in the result instead of raising
        logging.error(f"| File: {base_name} | Unexpected Error: {exc}")
        print(f'ERROR processing file {base_name}: {exc}')
        outcome['error'] = str(exc)

    return outcome
def standardize_csv_file(file_path: str) -> pd.DataFrame:
    """
    Read a transaction CSV file and bring it into the standardized layout.

    Column names are lower-cased and stripped, dates are rewritten as
    YYYY/MM/DD strings, and the result is restricted to the columns
    date, expense/income, category, name/description, amount (in that order).
    Columns absent from the file (such as 'category') come back empty; any
    other input columns are dropped.

    Args:
        file_path (str): Path to the input file.

    Returns:
        pd.DataFrame: Prepared transaction data. The input path is stored in
        ``attrs['file_name']`` before the columns are reindexed.

    Raises:
        KeyError: If the file has no 'date' column.
    """
    tx_list = pd.read_csv(file_path, index_col=False)
    tx_list.attrs['file_name'] = file_path
    tx_list.columns = tx_list.columns.str.lower().str.strip()

    # Fail with an explicit message rather than a bare KeyError('date')
    if 'date' not in tx_list.columns:
        raise KeyError(
            f"Required column 'date' not found in {file_path}; "
            f"columns present: {list(tx_list.columns)}"
        )

    # Standardize dates to YYYY/MM/DD format
    tx_list['date'] = pd.to_datetime(tx_list['date']).dt.strftime('%Y/%m/%d')

    # Reindex to the desired tx format; category column is new and therefore empty.
    # (No 'source' column is added: it is not part of the output layout, so the
    # reindex would discard it immediately.)
    tx_list = tx_list.reindex(columns=['date', 'expense/income', 'category', 'name/description', 'amount'])

    return tx_list
def save_results(results: List) -> None:
    """
    Merge the successful per-file results, append them to the results file, and
    refresh the category reference file.

    Successful results are concatenated and appended to RESULT_OUTPUT_FILE (the
    header is written only when that file does not exist yet). Their
    description/category pairs are then merged with any existing
    CATEGORY_REFERENCE_OUTPUT_FILE, de-duplicated on description, sorted, and
    written back. A summary, including per-file errors, is printed to the console.

    Args:
        results (List): Per-file result dictionaries with the keys 'file_name',
            'output' (DataFrame of categorized transactions) and 'error'
            (empty string when the file was processed successfully).

    Returns:
        None

    Raises:
        ValueError: Raised by pandas when a successful result's 'output' does not
            have exactly five columns to relabel.
    """
    col_list = ['date', 'expense/income', 'category', 'name/description', 'amount']

    ok_files = []
    ko_files = []
    error_messages = []
    frames = []

    for result in results:
        if result['error']:
            ko_files.append(result['file_name'])
            error_messages.append(f"{result['file_name']}: {result['error']}")
            continue

        ok_files.append(result['file_name'])
        # Relabel a copy so the DataFrame held by the caller is left untouched
        result_df = result['output'].copy()
        result_df.columns = col_list
        frames.append(result_df)

    # Concatenate once, after the loop: avoids re-copying the accumulated rows on
    # every iteration and avoids concatenating onto an empty seed frame
    if frames:
        tx_list = pd.concat(frames, ignore_index=True)
    else:
        tx_list = pd.DataFrame(columns=col_list)

    # Append to the results file; write the header only when the file is being created
    tx_list.to_csv(RESULT_OUTPUT_FILE, mode="a", index=False, header=not os.path.exists(RESULT_OUTPUT_FILE))

    new_ref_data = tx_list[['name/description', 'category']]
    if os.path.exists(CATEGORY_REFERENCE_OUTPUT_FILE):
        # Existing reference rows go first, so drop_duplicates (keep first) below
        # retains the category already on file for a known description
        old_ref_data = pd.read_csv(CATEGORY_REFERENCE_OUTPUT_FILE, names=['name/description', 'category'], header=0)
        new_ref_data = pd.concat([old_ref_data, new_ref_data], ignore_index=True)

    # Drop duplicates, sort, and overwrite the reference file
    deduplicated = new_ref_data.drop_duplicates(subset=['name/description'])
    deduplicated.sort_values(by=['name/description']).to_csv(
        CATEGORY_REFERENCE_OUTPUT_FILE, mode="w", index=False, header=True
    )

    # Summarize results
    print(f"\nProcessed {len(results)} files: {len(ok_files)} successful, {len(ko_files)} with errors\n")
    if ko_files:
        print("Errors in the following files:")
        for message in error_messages:
            print(f" {message}")
        print('\n')