Spaces:
Sleeping
Sleeping
import os | |
import glob | |
import shutil | |
import logging | |
import warnings | |
from typing import Optional, Union, Dict, List | |
from datetime import datetime | |
import pandas as pd | |
from dateparser import parse | |
from app.categorization.categorizer_list import categorize_list | |
from app.categorization.config import RESULT_OUTPUT_FILE, CATEGORY_REFERENCE_OUTPUT_FILE | |
# Read file and process it (e.g. categorize transactions) | |
async def process_file(file_path: str) -> Dict[str, Union[str, pd.DataFrame]]:
    """
    Read one transaction file, standardize it, and categorize its transactions.

    Any exception raised while reading or categorizing is caught, logged together
    with its traceback, and reported through the ``error`` entry of the returned
    dictionary instead of being propagated to the caller.

    Args:
        file_path (str): Path to the input file.

    Returns:
        Dict[str, Union[str, pd.DataFrame]]: Dictionary with the keys
            ``file_name`` (base name of ``file_path``), ``output`` (the value
            returned by ``categorize_list``; an empty DataFrame on failure) and
            ``error`` (empty string on success, otherwise the exception text).
    """
    file_name = os.path.basename(file_path)
    result = {'file_name': file_name, 'output': pd.DataFrame(), 'error': ''}

    try:
        # Read file into the standardized transaction layout, then categorize it.
        tx_list = standardize_csv_file(file_path)
        result['output'] = await categorize_list(tx_list)
    except Exception as e:
        # Boundary handler: one bad file must not abort the whole batch, so the
        # failure is recorded in the result. exc_info keeps the traceback in the
        # log; the message text alone is rarely enough to locate the cause.
        logging.error("| File: %s | Unexpected Error: %s", file_name, e, exc_info=True)
        print(f'ERROR processing file {file_name}: {e}')
        result['error'] = str(e)
    else:
        print(f'File processed successfully: {file_name}')

    return result
def standardize_csv_file(file_path: str) -> pd.DataFrame:
    """
    Read a transaction CSV file and return it in the standard column layout.

    Column names are lower-cased and stripped of surrounding whitespace, the
    ``date`` column is rewritten as ``YYYY/MM/DD`` strings, and the frame is
    reindexed to exactly these columns, in this order:
    ``date``, ``expense/income``, ``category``, ``name/description``, ``amount``.
    Columns from that list which the file does not provide (normally
    ``category``) are created empty; any other input column is dropped.

    Args:
        file_path (str): Path to the input file.

    Returns:
        pd.DataFrame: Prepared transaction data.

    Raises:
        KeyError: If the file has no ``date`` column.
    """
    output_columns = ['date', 'expense/income', 'category', 'name/description', 'amount']

    tx_list = pd.read_csv(file_path, index_col=False)
    # Remember where the data came from on the frame read from disk.
    tx_list.attrs['file_name'] = file_path
    tx_list.columns = tx_list.columns.str.lower().str.strip()

    # Fail with a message that names the file and what was actually found; the
    # bare KeyError('date') from the column lookup is unreadable once it is
    # turned into an error string by the caller.
    if 'date' not in tx_list.columns:
        raise KeyError(
            f"required column 'date' not found in {os.path.basename(file_path)}; "
            f"columns present: {list(tx_list.columns)}"
        )

    # Standardize dates to YYYY/MM/DD format
    tx_list['date'] = pd.to_datetime(tx_list['date']).dt.strftime('%Y/%m/%d')

    # No 'source' column is materialized here: it is not part of the output
    # layout, so the reindex below would discard it again.
    return tx_list.reindex(columns=output_columns)
def save_results(results: List) -> None:
    """
    Merge the per-file results, persist them, and print a summary.

    Results without an error are concatenated and appended to
    ``RESULT_OUTPUT_FILE``; the header row is written only when that file does
    not exist yet. Their description/category pairs are then combined with the
    contents of ``CATEGORY_REFERENCE_OUTPUT_FILE`` (if it exists), and that file
    is rewritten with one row per description, sorted by description. Existing
    reference rows come first in the merge, so they win over new rows with the
    same description.

    Args:
        results (List): Result dictionaries, each with the keys ``file_name``,
            ``output`` (a DataFrame) and ``error`` (empty when the file was
            processed successfully).

    Returns:
        None
    """
    col_list = ['date', 'expense/income', 'category', 'name/description', 'amount']
    ok_files = []
    ko_files = []
    error_messages = []
    frames = []

    for result in results:
        if result['error']:
            ko_files.append(result['file_name'])
            error_messages.append(f"{result['file_name']}: {result['error']}")
            continue

        ok_files.append(result['file_name'])
        result_df = result['output']
        # Positional rename, applied to the caller's frame in place. Raises
        # ValueError if the frame does not have exactly len(col_list) columns.
        result_df.columns = col_list
        frames.append(result_df)

    # Concatenate once instead of growing a frame inside the loop, which would
    # copy all previously accumulated rows on every iteration.
    if frames:
        tx_list = pd.concat(frames, ignore_index=True)
    else:
        tx_list = pd.DataFrame(columns=col_list)

    # Append to the result file; emit the header only when creating the file.
    tx_list.to_csv(
        RESULT_OUTPUT_FILE,
        mode="a",
        index=False,
        header=not os.path.exists(RESULT_OUTPUT_FILE),
    )

    new_ref_data = tx_list[['name/description', 'category']]
    if os.path.exists(CATEGORY_REFERENCE_OUTPUT_FILE):
        # If it exists, put the existing reference data ahead of the new rows
        old_ref_data = pd.read_csv(
            CATEGORY_REFERENCE_OUTPUT_FILE,
            names=['name/description', 'category'],
            header=0,
        )
        new_ref_data = pd.concat([old_ref_data, new_ref_data], ignore_index=True)

    # Drop duplicates (first occurrence kept), sort, and rewrite the reference file
    (
        new_ref_data
        .drop_duplicates(subset=['name/description'])
        .sort_values(by=['name/description'])
        .to_csv(CATEGORY_REFERENCE_OUTPUT_FILE, mode="w", index=False, header=True)
    )

    # Summarize results
    print(f"\nProcessed {len(results)} files: {len(ok_files)} successful, {len(ko_files)} with errors\n")
    if ko_files:
        print("Errors in the following files:")
        for message in error_messages:
            print(f" {message}")
        print('\n')