save processed results to database #8
by praneethys · opened
app/categorization/file_processing.py  CHANGED

@@ -11,6 +11,9 @@ from dateparser import parse
 
 from app.categorization.categorizer_list import categorize_list
 from app.categorization.config import RESULT_OUTPUT_FILE, CATEGORY_REFERENCE_OUTPUT_FILE
+from app.model.transaction import Transaction
+from app.schema.index import TransactionCreate
+
 
 # Read file and process it (e.g. categorize transactions)
 async def process_file(file_path: str) -> Dict[str, Union[str, pd.DataFrame]]:
@@ -22,26 +25,25 @@ async def process_file(file_path: str) -> Dict[str, Union[str, pd.DataFrame]]:
 
     Returns:
         Dict[str, Union[str, pd.DataFrame]]: Dictionary containing the file name, processed output, and error information if any
+    """
 
     file_name = os.path.basename(file_path)
+    result = {"file_name": file_name, "output": pd.DataFrame(), "error": ""}
     try:
+        # Read file into standardized tx format: source, date, type, category, description, amount
         tx_list = standardize_csv_file(file_path)
 
         # Categorize transactions
+        result["output"] = await categorize_list(tx_list)
+        print(f"File processed successfully: {file_name}")
 
     except Exception as e:
         # Return an error indicator and exception info
         logging.log(logging.ERROR, f"| File: {file_name} | Unexpected Error: {e}")
+        print(f"ERROR processing file {file_name}: {e}")
+        result["error"] = str(e)
 
+    return result
 
 
 def standardize_csv_file(file_path: str) -> pd.DataFrame:
@@ -55,21 +57,21 @@
         pd.DataFrame: Prepared transaction data.
     """
 
+    tx_list = pd.read_csv(file_path, index_col=False)
+    tx_list.attrs["file_name"] = file_path
     tx_list.columns = tx_list.columns.str.lower().str.strip()
 
     # Standardize dates to YYYY/MM/DD format
+    tx_list["date"] = pd.to_datetime(tx_list["date"]).dt.strftime("%Y/%m/%d")
 
     # Add source and reindex to desired tx format; category column is new and therefore empty
+    tx_list.loc[:, "source"] = os.path.basename(file_path)
+    tx_list = tx_list.reindex(columns=["date", "expense/income", "category", "name/description", "amount"])
 
     return tx_list
 
 
-def save_results(results: List) -> None:
+async def save_results(results: List) -> None:
     """
     Merge all interim results in the input folder and write the merged results to the output file.
 
@@ -87,29 +89,33 @@ def save_results(results: List) -> None:
     ko_files = []
     error_messages = []
 
+    col_list = ["transaction_date", "type", "category", "name_description", "amount"]
     tx_list = pd.DataFrame(columns=col_list)
     for result in results:
+        if not result["error"]:
+            ok_files.append(result["file_name"])
+            result_df = result["output"]
             result_df.columns = col_list
             tx_list = pd.concat([tx_list, result_df], ignore_index=True)
         else:
+            ko_files.append(result["file_name"])
+            error_messages.append(f"{result['file_name']}: {result['error']}")
 
+    # Save to database
+    # FIXME: get user_id from session
+    txn_list_to_save = [TransactionCreate(**row.to_dict(), user_id=1) for _, row in tx_list.iterrows()]
+    await Transaction.bulk_create(txn_list_to_save)
 
+    new_ref_data = tx_list[["name/description", "category"]]
     if os.path.exists(CATEGORY_REFERENCE_OUTPUT_FILE):
        # If it exists, add master file to interim results
+        old_ref_data = pd.read_csv(CATEGORY_REFERENCE_OUTPUT_FILE, names=["name/description", "category"], header=0)
         new_ref_data = pd.concat([old_ref_data, new_ref_data], ignore_index=True)
+
     # Drop duplicates, sort, and write to create new Master File
+    new_ref_data.drop_duplicates(subset=["name/description"]).sort_values(by=["name/description"]).to_csv(
+        CATEGORY_REFERENCE_OUTPUT_FILE, mode="w", index=False, header=True
+    )
 
     # Summarize results
     print(f"\nProcessed {len(results)} files: {len(ok_files)} successful, {len(ko_files)} with errors\n")
@@ -117,4 +123,4 @@ def save_results(results: List) -> None:
     print(f"Errors in the following files:")
     for message in error_messages:
         print(f" {message}")
+    print("\n")
app/model/transaction.py  CHANGED

@@ -7,6 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
 
 from app.model.base import BaseModel
 from app.engine.postgresdb import Base
+from app.schema.index import TransactionCreate
 
 
 class Transaction(Base, BaseModel):
@@ -23,11 +24,17 @@
     @classmethod
     async def create(cls: "type[Transaction]", db: AsyncSession, **kwargs) -> "Transaction":
         query = sql.insert(cls).values(**kwargs).returning(cls.id)
+        transactions = await db.execute(query)
         transaction = transactions.first()
         await db.commit()
         return transaction
 
+    @classmethod
+    async def bulk_create(cls: "type[Transaction]", db: AsyncSession, transactions: List[TransactionCreate]) -> None:
+        query = sql.insert(cls).values(transactions)
+        await db.execute(query)
+        await db.commit()
+
     @classmethod
     async def update(cls: "type[Transaction]", db: AsyncSession, id: int, **kwargs) -> "Transaction":
         query = sql.update(cls).where(cls.id == id).values(**kwargs).execution_options(synchronize_session="fetch")
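
SQLAlchemy's insert().values() performs an executemany-style bulk insert when handed a list of plain dicts, so the Pydantic models in transactions would normally be converted first. A minimal sketch of that variant, assuming Pydantic v2 (v1 would use .dict()):

@classmethod
async def bulk_create(cls: "type[Transaction]", db: AsyncSession, transactions: List[TransactionCreate]) -> None:
    # values() binds a list of plain dicts; model_dump() assumes Pydantic v2.
    payload = [t.model_dump() for t in transactions]
    await db.execute(sql.insert(cls).values(payload))
    await db.commit()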
app/schema/index.py  CHANGED

@@ -45,5 +45,9 @@ class TransactionResponse(PydanticBaseModel):
     type: TransactionType
 
 
+class TransactionCreate(TransactionResponse):
+    user_id: int
+
+
 class Transaction(TransactionResponse):
     user: User
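
For context, save_results builds these objects from DataFrame rows. A sketch of a single instance follows; every field except user_id and type is an assumption inferred from col_list in file_processing.py, since TransactionResponse's full field list is not shown in this diff:

# Illustrative only: field names besides user_id and type are assumed.
row = TransactionCreate(
    transaction_date="2024/01/31",
    type=TransactionType.EXPENSE,  # enum member name is an assumption
    category="groceries",
    name_description="LOCAL MARKET",
    amount=42.50,
    user_id=1,  # hard-coded in the PR pending the FIXME for session-based user_id
)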
app/transactions_rag/categorize_transactions.ipynb  CHANGED

@@ -529,7 +529,7 @@
     "    print(\"\\nProcessing file\")\n",
     "    result = await asyncio.gather(processed_file)\n",
     "\n",
-    "    save_results(result)\n",
+    "    await save_results(result)\n",
     "    print(result)\n",
     "\n",
     "    output_file = open(CATEGORY_REFERENCE_OUTPUT_FILE, \"r+\")\n",
@@ -537,8 +537,7 @@
     "\n",
     "\n",
     "result = await apply_categorization()\n",
-    "print(result)\n",
-    "\n"
+    "print(result)\n"
    ]
   }
  ],
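
The added await matters here: save_results is now a coroutine, so calling it bare would only create a coroutine object and the database write would never run. Notebook-style usage after this change (top-level await works in Jupyter; the file path is illustrative):

import asyncio

processed_file = process_file("data/sample.csv")  # illustrative path
result = await asyncio.gather(processed_file)     # returns a list of per-file results
await save_results(result)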
|