Save processed results to the database

#8
app/categorization/file_processing.py CHANGED
@@ -11,6 +11,9 @@ from dateparser import parse
11
 
12
  from app.categorization.categorizer_list import categorize_list
13
  from app.categorization.config import RESULT_OUTPUT_FILE, CATEGORY_REFERENCE_OUTPUT_FILE
 
 
 
14
 
15
  # Read file and process it (e.g. categorize transactions)
16
  async def process_file(file_path: str) -> Dict[str, Union[str, pd.DataFrame]]:
@@ -22,26 +25,25 @@ async def process_file(file_path: str) -> Dict[str, Union[str, pd.DataFrame]]:
22
 
23
  Returns:
24
  Dict[str, Union[str, pd.DataFrame]]: Dictionary containing the file name, processed output, and error information if any
25
- """
26
 
27
  file_name = os.path.basename(file_path)
28
- result= {'file_name': file_name, 'output': pd.DataFrame(), 'error': ''}
29
  try:
30
- # Read file into standardized tx format: source, date, type, category, description, amount
31
  tx_list = standardize_csv_file(file_path)
32
 
33
  # Categorize transactions
34
- result['output'] = await categorize_list(tx_list)
35
- print(f'File processed sucessfully: {file_name}')
36
 
37
  except Exception as e:
38
  # Return an error indicator and exception info
39
  logging.log(logging.ERROR, f"| File: {file_name} | Unexpected Error: {e}")
40
- print(f'ERROR processing file {file_name}: {e}')
41
- result['error'] = str(e)
42
-
43
- return result
44
 
 
45
 
46
 
47
  def standardize_csv_file(file_path: str) -> pd.DataFrame:
@@ -55,21 +57,21 @@ def standardize_csv_file(file_path: str) -> pd.DataFrame:
55
  pd.DataFrame: Prepared transaction data.
56
  """
57
 
58
- tx_list = pd.read_csv(file_path, index_col=False)
59
- tx_list.attrs['file_name'] = file_path
60
  tx_list.columns = tx_list.columns.str.lower().str.strip()
61
 
62
  # Standardize dates to YYYY/MM/DD format
63
- tx_list['date'] = pd.to_datetime(tx_list['date']).dt.strftime('%Y/%m/%d')
64
 
65
  # Add source and reindex to desired tx format; category column is new and therefore empty
66
- tx_list.loc[:, 'source'] = os.path.basename(file_path)
67
- tx_list = tx_list.reindex(columns=['date', 'expense/income', 'category', 'name/description', 'amount'])
68
 
69
  return tx_list
70
 
71
 
72
- def save_results(results: List) -> None:
73
  """
74
  Merge all interim results in the input folder and write the merged results to the output file.
75
 
@@ -87,29 +89,33 @@ def save_results(results: List) -> None:
87
  ko_files = []
88
  error_messages = []
89
 
90
- col_list = ['date', 'expense/income', 'category', 'name/description', 'amount']
91
  tx_list = pd.DataFrame(columns=col_list)
92
  for result in results:
93
- if not result['error']:
94
- ok_files.append(result['file_name'])
95
- result_df = result['output']
96
  result_df.columns = col_list
97
  tx_list = pd.concat([tx_list, result_df], ignore_index=True)
98
  else:
99
- ko_files.append(result['file_name'])
100
- error_messages.append(f"{result['file_name']}: {result['error']}")
101
 
102
- # Write contents to output file (based on file type)
103
- tx_list.to_csv(RESULT_OUTPUT_FILE, mode="a", index=False, header=not os.path.exists(RESULT_OUTPUT_FILE))
 
 
104
 
105
- new_ref_data = tx_list[['name/description', 'category']]
106
  if os.path.exists(CATEGORY_REFERENCE_OUTPUT_FILE):
107
  # If it exists, add master file to interim results
108
- old_ref_data = pd.read_csv(CATEGORY_REFERENCE_OUTPUT_FILE, names=['name/description', 'category'], header=0)
109
  new_ref_data = pd.concat([old_ref_data, new_ref_data], ignore_index=True)
110
-
111
  # Drop duplicates, sort, and write to create new Master File
112
- new_ref_data.drop_duplicates(subset=['name/description']).sort_values(by=['name/description']).to_csv(CATEGORY_REFERENCE_OUTPUT_FILE, mode="w", index=False, header=True)
 
 
113
 
114
  # Summarize results
115
  print(f"\nProcessed {len(results)} files: {len(ok_files)} successful, {len(ko_files)} with errors\n")
@@ -117,4 +123,4 @@ def save_results(results: List) -> None:
117
  print(f"Errors in the following files:")
118
  for message in error_messages:
119
  print(f" {message}")
120
- print('\n')
 
11
 
12
  from app.categorization.categorizer_list import categorize_list
13
  from app.categorization.config import RESULT_OUTPUT_FILE, CATEGORY_REFERENCE_OUTPUT_FILE
14
+ from app.model.transaction import Transaction
15
+ from app.schema.index import TransactionCreate
16
+
17
 
18
# Read file and process it (e.g. categorize transactions)
async def process_file(file_path: str) -> Dict[str, Union[str, pd.DataFrame]]:
    """Read a transaction file, categorize its contents, and report the outcome.

    Args:
        file_path: Path of the CSV file to process.

    Returns:
        Dict[str, Union[str, pd.DataFrame]]: Dictionary containing the file name,
        processed output, and error information if any.
    """

    file_name = os.path.basename(file_path)
    result = {"file_name": file_name, "output": pd.DataFrame(), "error": ""}
    try:
        # Read file into standardized tx format: source, date, type, category, description, amount
        tx_list = standardize_csv_file(file_path)

        # Categorize transactions
        result["output"] = await categorize_list(tx_list)
        # BUG FIX: corrected "sucessfully" typo in the user-facing message.
        print(f"File processed successfully: {file_name}")

    except Exception as e:
        # Return an error indicator and exception info instead of raising,
        # so one bad file does not abort the whole batch.
        logging.log(logging.ERROR, f"| File: {file_name} | Unexpected Error: {e}")
        print(f"ERROR processing file {file_name}: {e}")
        result["error"] = str(e)

    return result
47
 
48
 
49
def standardize_csv_file(file_path: str) -> pd.DataFrame:
    """Load a transaction CSV and normalize it into the standard tx layout.

    Args:
        file_path: Path of the CSV file to load.

    Returns:
        pd.DataFrame: Prepared transaction data.
    """

    raw = pd.read_csv(file_path, index_col=False)
    raw.attrs["file_name"] = file_path

    # Normalize header casing/whitespace so downstream column lookups are predictable
    raw.columns = raw.columns.str.lower().str.strip()

    # Standardize dates to YYYY/MM/DD format
    raw["date"] = pd.to_datetime(raw["date"]).dt.strftime("%Y/%m/%d")

    # Record provenance, then project onto the desired tx format;
    # 'category' is introduced by the reindex and therefore starts out empty
    raw.loc[:, "source"] = os.path.basename(file_path)
    desired_columns = ["date", "expense/income", "category", "name/description", "amount"]
    return raw.reindex(columns=desired_columns)
72
 
73
 
74
async def save_results(results: List) -> None:
    """
    Merge all interim results in the input folder and write the merged results to the output file.

    Args:
        results: Per-file dicts produced by process_file ('file_name', 'output', 'error').

    Returns:
        None
    """
    ok_files = []
    ko_files = []
    error_messages = []

    col_list = ["transaction_date", "type", "category", "name_description", "amount"]
    tx_list = pd.DataFrame(columns=col_list)
    for result in results:
        if not result["error"]:
            ok_files.append(result["file_name"])
            result_df = result["output"]
            result_df.columns = col_list
            tx_list = pd.concat([tx_list, result_df], ignore_index=True)
        else:
            ko_files.append(result["file_name"])
            error_messages.append(f"{result['file_name']}: {result['error']}")

    # Save to database
    # FIXME: get user_id from session
    # NOTE(review): Transaction.bulk_create is declared as (db, transactions) —
    # this call passes no session; confirm the intended invocation.
    txn_list_to_save = [TransactionCreate(**row.to_dict(), user_id=1) for _, row in tx_list.iterrows()]
    await Transaction.bulk_create(txn_list_to_save)

    # BUG FIX: tx_list columns were renamed to col_list above, so the reference
    # data must use the new 'name_description' label — indexing with the old
    # 'name/description' name raised a KeyError.
    new_ref_data = tx_list[["name_description", "category"]]
    if os.path.exists(CATEGORY_REFERENCE_OUTPUT_FILE):
        # If it exists, add master file to interim results
        old_ref_data = pd.read_csv(CATEGORY_REFERENCE_OUTPUT_FILE, names=["name_description", "category"], header=0)
        new_ref_data = pd.concat([old_ref_data, new_ref_data], ignore_index=True)

    # Drop duplicates, sort, and write to create new Master File
    new_ref_data.drop_duplicates(subset=["name_description"]).sort_values(by=["name_description"]).to_csv(
        CATEGORY_REFERENCE_OUTPUT_FILE, mode="w", index=False, header=True
    )

    # Summarize results
    print(f"\nProcessed {len(results)} files: {len(ok_files)} successful, {len(ko_files)} with errors\n")
    if ko_files:
        print(f"Errors in the following files:")
        for message in error_messages:
            print(f"  {message}")
    print("\n")
app/model/transaction.py CHANGED
@@ -7,6 +7,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
7
 
8
  from app.model.base import BaseModel
9
  from app.engine.postgresdb import Base
 
10
 
11
 
12
  class Transaction(Base, BaseModel):
@@ -23,11 +24,17 @@ class Transaction(Base, BaseModel):
23
  @classmethod
24
  async def create(cls: "type[Transaction]", db: AsyncSession, **kwargs) -> "Transaction":
25
  query = sql.insert(cls).values(**kwargs).returning(cls.id)
26
- transactions = await db.scalars(query)
27
  transaction = transactions.first()
28
  await db.commit()
29
  return transaction
30
 
 
 
 
 
 
 
31
  @classmethod
32
  async def update(cls: "type[Transaction]", db: AsyncSession, id: int, **kwargs) -> "Transaction":
33
  query = sql.update(cls).where(cls.id == id).values(**kwargs).execution_options(synchronize_session="fetch")
 
7
 
8
  from app.model.base import BaseModel
9
  from app.engine.postgresdb import Base
10
+ from app.schema.index import TransactionCreate
11
 
12
 
13
  class Transaction(Base, BaseModel):
 
24
    @classmethod
    async def create(cls: "type[Transaction]", db: AsyncSession, **kwargs) -> "Transaction":
        # Insert one transaction row, returning the newly generated id.
        query = sql.insert(cls).values(**kwargs).returning(cls.id)
        # NOTE(review): db.execute(...).first() yields a Row (an (id,) tuple),
        # whereas the previous db.scalars(...) returned the bare id; the return
        # annotation says "Transaction" — confirm what callers actually expect.
        transactions = await db.execute(query)
        transaction = transactions.first()
        # Commit immediately so the insert is durable before returning.
        await db.commit()
        return transaction
31
 
32
+ @classmethod
33
+ async def bulk_create(cls: "type[Transaction]", db: AsyncSession, transactions: List[TransactionCreate]) -> None:
34
+ query = sql.insert(cls).values(transactions)
35
+ await db.execute(query)
36
+ await db.commit()
37
+
38
  @classmethod
39
  async def update(cls: "type[Transaction]", db: AsyncSession, id: int, **kwargs) -> "Transaction":
40
  query = sql.update(cls).where(cls.id == id).values(**kwargs).execution_options(synchronize_session="fetch")
app/schema/index.py CHANGED
@@ -45,5 +45,9 @@ class TransactionResponse(PydanticBaseModel):
45
  type: TransactionType
46
 
47
 
 
 
 
 
48
  class Transaction(TransactionResponse):
49
  user: User
 
45
  type: TransactionType
46
 
47
 
48
class TransactionCreate(TransactionResponse):
    # Insert payload: all response fields plus the owning user's id.
    # NOTE(review): user_id presumably maps to the Transaction FK — confirm against the model.
    user_id: int
50
+
51
+
52
  class Transaction(TransactionResponse):
53
  user: User
app/transactions_rag/categorize_transactions.ipynb CHANGED
@@ -529,7 +529,7 @@
529
  " print(\"\\nProcessing file\")\n",
530
  " result = await asyncio.gather(processed_file)\n",
531
  "\n",
532
- " save_results(result)\n",
533
  " print(result)\n",
534
  "\n",
535
  " output_file = open(CATEGORY_REFERENCE_OUTPUT_FILE, \"r+\")\n",
@@ -537,8 +537,7 @@
537
  "\n",
538
  "\n",
539
  "result = await apply_categorization()\n",
540
- "print(result)\n",
541
- "\n"
542
  ]
543
  }
544
  ],
 
529
  " print(\"\\nProcessing file\")\n",
530
  " result = await asyncio.gather(processed_file)\n",
531
  "\n",
532
+ " await save_results(result)\n",
533
  " print(result)\n",
534
  "\n",
535
  " output_file = open(CATEGORY_REFERENCE_OUTPUT_FILE, \"r+\")\n",
 
537
  "\n",
538
  "\n",
539
  "result = await apply_categorization()\n",
540
+ "print(result)\n"
 
541
  ]
542
  }
543
  ],