import os
import glob
import shutil
import logging
import warnings
from typing import Optional, Union, Dict, List
from datetime import datetime

import pandas as pd
from dateparser import parse

from app.categorization.categorizer_list import categorize_list
from app.categorization.config import RESULT_OUTPUT_FILE, CATEGORY_REFERENCE_OUTPUT_FILE
from app.model.transaction import Transaction
from app.schema.index import TransactionCreate


# Read file and process it (e.g. categorize transactions)
async def process_file(file_path: str) -> Dict[str, Union[str, pd.DataFrame]]:
    """
    Process the input file by reading, standardizing, and categorizing the transactions.

    Any exception raised while reading or categorizing is caught, logged, and
    reported through the ``error`` entry of the returned dictionary instead of
    being propagated.

    Args:
        file_path (str): Path to the input file.

    Returns:
        Dict[str, Union[str, pd.DataFrame]]: Dictionary with the keys
        ``file_name`` (base name of ``file_path``), ``output`` (the result of
        ``categorize_list``, or an empty DataFrame on failure) and ``error``
        (empty string on success, otherwise a non-empty description).
    """
    file_name = os.path.basename(file_path)
    result = {"file_name": file_name, "output": pd.DataFrame(), "error": ""}

    try:
        # Read file into standardized tx format: date, type, category, description, amount
        tx_list = standardize_csv_file(file_path)

        # Categorize transactions
        result["output"] = await categorize_list(tx_list)
        print(f"File processed sucessfully: {file_name}")
    except Exception as e:
        # Return an error indicator and exception info
        logging.error("| File: %s | Unexpected Error: %s", file_name, e)
        print(f"ERROR processing file {file_name}: {e}")
        # Some exceptions stringify to "" (e.g. ``ValueError()``); fall back to
        # the class name so the failure is never mistaken for a success.
        result["error"] = str(e) or type(e).__name__

    return result


def standardize_csv_file(file_path: str) -> pd.DataFrame:
    """
    Read and prepare the data from the input file.

    Column names are lower-cased and stripped, the ``date`` column is
    reformatted as ``YYYY/MM/DD`` strings, and the frame is reduced to the
    columns ``date``, ``expense/income``, ``category``, ``name/description``
    and ``amount``. Requested columns missing from the file are created empty
    (NaN) by the reindex.

    Args:
        file_path (str): Path to the input file.

    Returns:
        pd.DataFrame: Prepared transaction data.

    Raises:
        KeyError: If the file has no ``date`` column.
    """
    tx_list = pd.read_csv(file_path, index_col=False)
    tx_list.attrs["file_name"] = file_path
    tx_list.columns = tx_list.columns.str.lower().str.strip()

    # Standardize dates to YYYY/MM/DD format
    tx_list["date"] = pd.to_datetime(tx_list["date"]).dt.strftime("%Y/%m/%d")

    # Reindex to the desired tx format. No "source" column is emitted: the
    # output is restricted to exactly the five columns below, and the
    # originating path is recorded in ``attrs["file_name"]`` above.
    return tx_list.reindex(
        columns=["date", "expense/income", "category", "name/description", "amount"]
    )


async def save_results(results: List) -> None:
    """
    Merge the per-file results, persist them, and update the category reference file.

    Successful results (those with an empty ``error``) are relabelled to the
    database column names and concatenated. The merged rows are saved through
    ``Transaction.bulk_create``, and the description/category pairs are merged
    into ``CATEGORY_REFERENCE_OUTPUT_FILE``. A summary, including the error of
    every failed file, is printed to the console.

    Args:
        results (List): Result dictionaries as returned by ``process_file``,
            each with the keys ``file_name``, ``output`` and ``error``.

    Returns:
        None
    """
    ok_files = []
    ko_files = []
    error_messages = []
    col_list = ["transaction_date", "type", "category", "name_description", "amount"]

    # Collect all (valid) results; they are concatenated once after the loop
    frames = []
    for result in results:
        if result["error"]:
            ko_files.append(result["file_name"])
            error_messages.append(f"{result['file_name']}: {result['error']}")
            continue

        ok_files.append(result["file_name"])
        # Relabel a copy so the caller's DataFrame is left untouched
        result_df = result["output"].copy()
        result_df.columns = col_list
        frames.append(result_df)

    if frames:
        tx_list = pd.concat(frames, ignore_index=True)
    else:
        tx_list = pd.DataFrame(columns=col_list)

    # Save to database
    # FIXME: get user_id from session
    txn_list_to_save = [
        TransactionCreate(**row, user_id=1) for row in tx_list.to_dict(orient="records")
    ]
    if txn_list_to_save:
        await Transaction.bulk_create(txn_list_to_save)

    # The merged frame uses the database column name "name_description", while
    # the reference file uses "name/description"; rename for the file.
    new_ref_data = tx_list[["name_description", "category"]].rename(
        columns={"name_description": "name/description"}
    )
    if os.path.exists(CATEGORY_REFERENCE_OUTPUT_FILE):
        # If it exists, add master file to interim results
        old_ref_data = pd.read_csv(
            CATEGORY_REFERENCE_OUTPUT_FILE, names=["name/description", "category"], header=0
        )
        # Existing entries come first so drop_duplicates keeps them on conflict
        new_ref_data = pd.concat([old_ref_data, new_ref_data], ignore_index=True)

    # Drop duplicates, sort, and write to create new Master File
    new_ref_data.drop_duplicates(subset=["name/description"]).sort_values(
        by=["name/description"]
    ).to_csv(CATEGORY_REFERENCE_OUTPUT_FILE, mode="w", index=False, header=True)

    # Summarize results
    print(f"\nProcessed {len(results)} files: {len(ok_files)} successful, {len(ko_files)} with errors\n")
    if ko_files:
        print("Errors in the following files:")
        for message in error_messages:
            print(f"  {message}")
        print("\n")