Spaces:
Sleeping
Sleeping
Refactored jupyter rag logic and categorizer methods
Browse files
.vscode/settings.json
ADDED
@@ -0,0 +1,3 @@
|
|
|
|
|
|
|
|
|
1 |
+
{
|
2 |
+
"jupyter.notebookFileRoot": "${workspaceFolder}"
|
3 |
+
}
|
app/categorization/file_processing.py
ADDED
@@ -0,0 +1,120 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import os
|
2 |
+
import glob
|
3 |
+
import shutil
|
4 |
+
import logging
|
5 |
+
import warnings
|
6 |
+
from typing import Optional, Union, Dict, List
|
7 |
+
from datetime import datetime
|
8 |
+
|
9 |
+
import pandas as pd
|
10 |
+
from dateparser import parse
|
11 |
+
|
12 |
+
from categorizer_list import categorize_list
|
13 |
+
from config import TEXT_OUTPUT_FILE, CATEGORY_REFERENCE_OUTPUT_FILE
|
14 |
+
|
15 |
+
# Read file and process it (e.g. categorize transactions)
async def process_file(file_path: str) -> Dict[str, Union[str, pd.DataFrame]]:
    """
    Process the input file by reading, cleaning, standardizing, and categorizing the transactions.

    Args:
        file_path (str): Path to the input file.

    Returns:
        Dict[str, Union[str, pd.DataFrame]]: Dictionary with keys
            'file_name' (str): base name of the processed file,
            'output' (pd.DataFrame): categorized transactions; empty on failure,
            'error' (str): exception text, or '' when processing succeeded.
    """

    file_name = os.path.basename(file_path)
    result = {'file_name': file_name, 'output': pd.DataFrame(), 'error': ''}
    try:
        # Read file into the standardized tx format produced by standardize_csv_file:
        # date, expense/income, category, name/description, amount
        tx_list = standardize_csv_file(file_path)

        # Categorize transactions (async call into the categorizer)
        result['output'] = await categorize_list(tx_list)
        print(f'File processed successfully: {file_name}')

    except Exception as e:
        # Deliberately swallow the exception: the caller inspects result['error'],
        # so one bad file does not abort the whole batch.
        # Use logging.error with lazy %-args instead of logging.log + f-string.
        logging.error("| File: %s | Unexpected Error: %s", file_name, e)
        print(f'ERROR processing file {file_name}: {e}')
        result['error'] = str(e)

    return result
|
44 |
+
|
45 |
+
|
46 |
+
|
47 |
+
def standardize_csv_file(file_path: str) -> pd.DataFrame:
    """
    Read and prepare the data from the input file.

    Args:
        file_path (str): Path to the input CSV file.

    Returns:
        pd.DataFrame: Transactions reindexed to the standard columns
            ['date', 'expense/income', 'category', 'name/description', 'amount'];
            'category' is newly created by the reindex and therefore empty (NaN).
    """

    tx_list = pd.read_csv(file_path, index_col=False)
    # Keep the data's origin on the frame itself for downstream diagnostics
    tx_list.attrs['file_name'] = file_path
    # Normalize headers so the reindex below matches regardless of case/padding
    tx_list.columns = tx_list.columns.str.lower().str.strip()

    # Standardize dates to YYYY/MM/DD format
    tx_list['date'] = pd.to_datetime(tx_list['date']).dt.strftime('%Y/%m/%d')

    # Reindex to the standard tx format; the category column is new and therefore empty.
    # NOTE(review): the original also wrote a 'source' column here, but this reindex
    # immediately dropped it (it is not in the column list below), so that dead store
    # has been removed. If per-row source tracking is intended, add 'source' to this
    # list AND to the matching col_list in save_results.
    tx_list = tx_list.reindex(columns=['date', 'expense/income', 'category', 'name/description', 'amount'])

    return tx_list
|
70 |
+
|
71 |
+
|
72 |
+
def save_results(results: List[Dict[str, Union[str, pd.DataFrame]]]) -> None:
    """
    Merge all per-file results and append them to the output files.

    Args:
        results (List[Dict]): One dict per processed file, as produced by
            process_file: keys 'file_name' (str), 'output' (pd.DataFrame),
            'error' (str, empty on success).

    Returns:
        None
    """

    # Partition results into successes and failures; collect error text for the summary.
    # Errors are printed to the console at the end.
    ok_files = []
    ko_files = []
    error_messages = []

    col_list = ['date', 'expense/income', 'category', 'name/description', 'amount']
    tx_list = pd.DataFrame(columns=col_list)
    for result in results:
        if not result['error']:
            ok_files.append(result['file_name'])
            result_df = result['output']
            # Force the standard column names so heterogeneous outputs line up
            result_df.columns = col_list
            tx_list = pd.concat([tx_list, result_df], ignore_index=True)
        else:
            ko_files.append(result['file_name'])
            error_messages.append(f"{result['file_name']}: {result['error']}")

    # Append merged transactions to the output file (header only on first creation)
    tx_list.to_csv(TEXT_OUTPUT_FILE, mode="a", index=False, header=not os.path.exists(TEXT_OUTPUT_FILE))

    new_ref_data = tx_list[['name/description', 'category']]
    if os.path.exists(CATEGORY_REFERENCE_OUTPUT_FILE):
        # If it exists, fold the existing master file into the new reference data
        old_ref_data = pd.read_csv(CATEGORY_REFERENCE_OUTPUT_FILE, names=['name/description', 'category'], header=0)
        new_ref_data = pd.concat([old_ref_data, new_ref_data], ignore_index=True)

    # Drop duplicates, sort, and overwrite to create the new master file
    new_ref_data.drop_duplicates(subset=['name/description']).sort_values(by=['name/description']).to_csv(CATEGORY_REFERENCE_OUTPUT_FILE, mode="w", index=False, header=True)

    # Summarize results
    print(f"\nProcessed {len(results)} files: {len(ok_files)} successful, {len(ko_files)} with errors\n")
    if len(ko_files):
        # Plain string: the original used an f-string with no placeholders (lint F541)
        print("Errors in the following files:")
        for message in error_messages:
            print(f"  {message}")
        print('\n')
|
app/{rag β transactions_rag}/categorize_transactions.ipynb
RENAMED
File without changes
|
app/{rag/transactions_2022_2023.csv β transactions_rag/transactions_2024.csv}
RENAMED
File without changes
|