Praneeth Yerrapragada commited on
Commit
6d92adc
1 Parent(s): ad33b38

feat: add llm service to generate income statement

Browse files
app/api/routers/file_upload.py CHANGED
@@ -1,11 +1,14 @@
1
  from typing import Annotated
2
- from fastapi import APIRouter, UploadFile
3
  from app.categorization.file_processing import process_file, save_results
4
  from app.schema.index import FileUploadCreate
5
  import asyncio
6
  import os
7
  import csv
8
 
 
 
 
9
  file_upload_router = r = APIRouter(prefix="/api/v1/file_upload", tags=["file_upload"])
10
 
11
  @r.post(
@@ -16,7 +19,7 @@ file_upload_router = r = APIRouter(prefix="/api/v1/file_upload", tags=["file_upl
16
  500: {"description": "Internal server error"},
17
  },
18
  )
19
- async def create_file(input_file: UploadFile):
20
  try:
21
  # Create directory to store all uploaded .csv files
22
  file_upload_directory_path = "data/tx_data/input"
@@ -31,10 +34,10 @@ async def create_file(input_file: UploadFile):
31
  # With the newly created file and it's path, process and save it for embedding
32
  processed_file = process_file(os.path.realpath(input_file.filename))
33
  result = await asyncio.gather(processed_file)
34
- save_results(result)
35
 
36
  except Exception:
37
  return {"message": "There was an error uploading this file. Ensure you have a .csv file with the following columns:"
38
- "\n source, date, type, category, description, amount"}
39
 
40
  return {"message": f"Successfully uploaded {input_file.filename}"}
 
1
  from typing import Annotated
2
+ from fastapi import APIRouter, UploadFile, Depends
3
  from app.categorization.file_processing import process_file, save_results
4
  from app.schema.index import FileUploadCreate
5
  import asyncio
6
  import os
7
  import csv
8
 
9
+ from app.engine.postgresdb import get_db_session
10
+ from sqlalchemy.ext.asyncio import AsyncSession
11
+
12
  file_upload_router = r = APIRouter(prefix="/api/v1/file_upload", tags=["file_upload"])
13
 
14
  @r.post(
 
19
  500: {"description": "Internal server error"},
20
  },
21
  )
22
+ async def create_file(input_file: UploadFile, db: AsyncSession = Depends(get_db_session)):
23
  try:
24
  # Create directory to store all uploaded .csv files
25
  file_upload_directory_path = "data/tx_data/input"
 
34
  # With the newly created file and it's path, process and save it for embedding
35
  processed_file = process_file(os.path.realpath(input_file.filename))
36
  result = await asyncio.gather(processed_file)
37
+ await save_results(db, result)
38
 
39
  except Exception:
40
  return {"message": "There was an error uploading this file. Ensure you have a .csv file with the following columns:"
41
+ "\n transaction_date, type, category, name_description, amount"}
42
 
43
  return {"message": f"Successfully uploaded {input_file.filename}"}
app/categorization/file_processing.py CHANGED
@@ -14,6 +14,8 @@ from app.categorization.config import RESULT_OUTPUT_FILE, CATEGORY_REFERENCE_OUT
14
  from app.model.transaction import Transaction
15
  from app.schema.index import TransactionCreate
16
 
 
 
17
 
18
  # Read file and process it (e.g. categorize transactions)
19
  async def process_file(file_path: str) -> Dict[str, Union[str, pd.DataFrame]]:
@@ -71,7 +73,7 @@ def standardize_csv_file(file_path: str) -> pd.DataFrame:
71
  return tx_list
72
 
73
 
74
- async def save_results(results: List) -> None:
75
  """
76
  Merge all interim results in the input folder and write the merged results to the output file.
77
 
@@ -104,7 +106,7 @@ async def save_results(results: List) -> None:
104
  # Save to database
105
  # FIXME: get user_id from session
106
  txn_list_to_save = [TransactionCreate(**row.to_dict(), user_id=1) for _, row in tx_list.iterrows()]
107
- await Transaction.bulk_create(txn_list_to_save)
108
 
109
  new_ref_data = tx_list[["name/description", "category"]]
110
  if os.path.exists(CATEGORY_REFERENCE_OUTPUT_FILE):
 
14
  from app.model.transaction import Transaction
15
  from app.schema.index import TransactionCreate
16
 
17
+ from sqlalchemy.ext.asyncio import AsyncSession
18
+
19
 
20
  # Read file and process it (e.g. categorize transactions)
21
  async def process_file(file_path: str) -> Dict[str, Union[str, pd.DataFrame]]:
 
73
  return tx_list
74
 
75
 
76
+ async def save_results(db: AsyncSession, results: List) -> None:
77
  """
78
  Merge all interim results in the input folder and write the merged results to the output file.
79
 
 
106
  # Save to database
107
  # FIXME: get user_id from session
108
  txn_list_to_save = [TransactionCreate(**row.to_dict(), user_id=1) for _, row in tx_list.iterrows()]
109
+ await Transaction.bulk_create(db, txn_list_to_save)
110
 
111
  new_ref_data = tx_list[["name/description", "category"]]
112
  if os.path.exists(CATEGORY_REFERENCE_OUTPUT_FILE):
app/model/transaction.py CHANGED
@@ -31,8 +31,12 @@ class Transaction(Base, BaseModel):
31
 
32
  @classmethod
33
  async def bulk_create(cls: "type[Transaction]", db: AsyncSession, transactions: List[TransactionCreate]) -> None:
34
- query = sql.insert(cls).values(transactions)
35
- await db.execute(query)
 
 
 
 
36
  await db.commit()
37
 
38
  @classmethod
 
31
 
32
  @classmethod
33
  async def bulk_create(cls: "type[Transaction]", db: AsyncSession, transactions: List[TransactionCreate]) -> None:
34
+ transactions_list = [cls(transaction_date=transaction.transaction_date,
35
+ category=transaction.category,
36
+ type=transaction.type,
37
+ amount= transaction.amount,
38
+ name_description=transaction.name_description) for transaction in transactions]
39
+ db.add(transactions_list)
40
  await db.commit()
41
 
42
  @classmethod
app/schema/index.py CHANGED
@@ -74,3 +74,7 @@ class IncomeStatementResponse(PydanticBaseModel):
74
  date_to: datetime
75
  income: dict
76
  expenses: dict
 
 
 
 
 
74
  date_to: datetime
75
  income: dict
76
  expenses: dict
77
+
78
+ class IncomeStatementLLMResponse(PydanticBaseModel):
79
+ income: dict
80
+ expenses: dict
app/service/income_statement.py CHANGED
@@ -3,13 +3,16 @@ from app.model.transaction import Transaction as TransactionModel
3
  from app.model.income_statement import IncomeStatement as IncomeStatementModel
4
  from sqlalchemy.ext.asyncio import AsyncSession
5
 
 
 
6
 
7
  async def call_llm_to_create_income_statement(payload: IncomeStatementCreate, db: AsyncSession) -> None:
8
  transactions = await TransactionModel.get_by_user_between_dates(
9
  db, payload.user_id, payload.date_from, payload.date_to
10
  )
11
 
12
- # TODO: Call LLM to generate income and expenses
13
- income = {}
14
- expenses = {}
 
15
  await IncomeStatementModel.create(db, **payload, income=income, expenses=expenses)
 
3
  from app.model.income_statement import IncomeStatement as IncomeStatementModel
4
  from sqlalchemy.ext.asyncio import AsyncSession
5
 
6
+ from app.service.llm import call_llm
7
+
8
 
9
  async def call_llm_to_create_income_statement(payload: IncomeStatementCreate, db: AsyncSession) -> None:
10
  transactions = await TransactionModel.get_by_user_between_dates(
11
  db, payload.user_id, payload.date_from, payload.date_to
12
  )
13
 
14
+ response = call_llm(transactions)
15
+ income = response.income
16
+ expenses = response.expenses
17
+
18
  await IncomeStatementModel.create(db, **payload, income=income, expenses=expenses)
app/service/llm.py ADDED
@@ -0,0 +1,53 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from llama_index.core.settings import Settings
3
+ from llama_index.core import PromptTemplate
4
+
5
+ from app.model.transaction import Transaction
6
+ from app.schema.index import IncomeStatementLLMResponse
7
+
8
+ def income_statement_prompt (inputData: Transaction) -> PromptTemplate:
9
+ context_str = f"""
10
+ You are an accountant skilled at organizing transactions from multiple different bank
11
+ accounts and credit card statements to prepare an income statement.
12
+
13
+ Input data has the following format: transaction_date, type, category, name_description, amount.
14
+ The <IN> tag is prepended to the input data as follows:
15
+ ```<IN>
16
+ {inputData}
17
+ ```
18
+
19
+ Your task is to prepare an income statement. The income statement's output should be in a json format.
20
+ An example of the expected output is as follows with the <OUT> tag. Note that not all categories of the transactions
21
+ have been listed below. Use below <OUT> tag as a reference in preparing the income statement.
22
+ ```<OUT>
23
+ {
24
+ "REVENUE": {
25
+ "Gross Sales": 73351.11,
26
+ "Other Income": 0,
27
+ "Balance Dec 2022": 3987.39,
28
+ },
29
+ "EXPENSES": {
30
+ "Advertising": 0,
31
+ "Commissions": 0,
32
+ "Insurance": 0,
33
+ "Memberships": 0,
34
+ "Utilities": 0,
35
+ }
36
+ }
37
+ ```
38
+
39
+ """
40
+ prompt = PromptTemplate(context_str, template_var_mappings={"inputData": inputData})
41
+ logging.info(f"Prompt: {prompt}")
42
+
43
+ async def call_llm(prompt: PromptTemplate) -> str:
44
+ llm = Settings.llm.copy()
45
+ prompt = income_statement_prompt()
46
+ llm.check_prompts(prompt)
47
+
48
+ llm.system_prompt = prompt
49
+
50
+ output = await llm.astructured_predict(output_cls=IncomeStatementLLMResponse, prompt=prompt)
51
+
52
+ logging.info(f"Output: {output}")
53
+ return output
pyproject.toml CHANGED
@@ -52,6 +52,9 @@ version = "0.2.2"
52
  [tool.black]
53
  line-length = 119
54
 
 
 
 
55
  [build-system]
56
  requires = [ "poetry-core" ]
57
  build-backend = "poetry.core.masonry.api"
 
52
  [tool.black]
53
  line-length = 119
54
 
55
+ [tool.pytest.ini_options]
56
+ asyncio_mode = "auto"
57
+
58
  [build-system]
59
  requires = [ "poetry-core" ]
60
  build-backend = "poetry.core.masonry.api"