backend / app /categorization /categorizer.py
praneethys's picture
01-Categorization-Transactions (#1)
e612627 verified
raw
history blame
No virus
5.93 kB
# Standard library imports
import re
import ast
import json
import logging
import os
from typing import Any, List, Tuple, Optional, Dict, Union
# Third-party library imports
import numpy as np
import pandas as pd
import asyncio
from rapidfuzz import process
from tenacity import retry, wait_random_exponential, stop_after_attempt
from pydantic import ValidationError
# Local application/library specific imports
from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from langchain.output_parsers import PydanticOutputParser, OutputFixingParser
from langchain.prompts import PromptTemplate
import app.categorization.template as CATEGORY_TEMPLATE
from app.categorization.config import CATEGORY_REFERENCE_OUTPUT_FILE, TX_PER_LLM_RUN
def fuzzy_match_list_categorizer(
description: str,
descriptions: np.ndarray,
description_category_pairs: pd.DataFrame,
threshold: int = 75,
) -> Optional[str]:
"""Find the most similar transaction description and return its category.
This function uses fuzzy string matching to compare the input description
against a list of known descriptions. If a sufficient match is found,
the function returns the category associated with the matched description.
Args:
description (str): The transaction description to categorize.
descriptions (np.ndarray): Known descriptions to compare against.
description_category_pairs (pd.DataFrame): DataFrame mapping descriptions to categories.
threshold (int): Minimum similarity score to consider a match.
Returns:
str or None: Category of the matched description, or None if no match found.
"""
# Fuzzy-match this description against the reference descriptions
match_results = process.extractOne(
description, descriptions, score_cutoff=threshold)
# If a match is found, return the category of the matched description
if match_results:
return description_category_pairs.at[match_results[2], 'category']
return None
async def llm_list_categorizer(tx_list: pd.DataFrame) -> pd.DataFrame:
"""Categorize a list of transactions using a language model.
This function uses a Language Model (LLM) to categorize a list of transaction descriptions.
It splits the input DataFrame into chunks and processes each chunk asynchronously to improve performance.
Args:
tx_list (pd.DataFrame): DataFrame containing the transaction descriptions to categorize.
Returns:
pd.DataFrame: DataFrame mapping transaction descriptions to their inferred categories.
"""
# Initialize language model and prompt
openai_api_key = os.environ['OPENAI_API_KEY']
llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-0125",
api_key=openai_api_key)
prompt = PromptTemplate.from_template(template=CATEGORY_TEMPLATE)
chain = LLMChain(llm=llm, prompt=prompt)
# Iterate over the DataFrame in batches of TX_PER_LLM_RUN transactions
tasks = [llm_sublist_categorizer(tx_list.attrs['file_name'], chain=chain, tx_descriptions="\n".join(chunk['name/description']).strip())
for chunk in np.array_split(tx_list, tx_list.shape[0] // TX_PER_LLM_RUN + 1)]
# Gather results and extract (valid) outputs
# The results variable is a list of 'results', each 'result' being the output of a single LLM run
results = await asyncio.gather(*tasks)
# Extract valid results (each valid result is a list of description-category pairs)
valid_results = [result['output'] for result in results if result['valid']]
# Flatten the list of valid results to obtain a single list of description-category pairs
valid_outputs = [
output for valid_result in valid_results for output in valid_result]
# Return a DataFrame with the valid outputs
return pd.DataFrame(valid_outputs, columns=['name/description', 'category'])
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6))
async def llm_sublist_categorizer(
file_name: str,
chain: LLMChain,
tx_descriptions: str,
) -> Dict[str, Union[bool, List[Tuple[str, str]]]]:
"""Categorize a batch of transactions using a language model.
This function takes a batch of transaction descriptions and passes them to a language model
for categorization. The function retries on failure, with an exponential backoff.
Args:
file_name (str): Name of the file the transaction descriptions were extracted from.
chain (LLMChain): Language model chain to use for categorization.
tx_descriptions (str): Concatenated transaction descriptions to categorize.
Returns:
dict: Dictionary containing a 'valid' flag and a list of categorized descriptions.
"""
raw_result = await chain.arun(input_data=tx_descriptions)
logger = logging.getLogger(__name__)
result = {'valid': True, 'output': []}
try:
# Create a pattern to match a list Description-Category pairs (List[Tuple[str, str]])
pattern = r"\['([^']+)', '([^']+)'\]"
# Use it to extract all the correctly formatted pairs from the raw result
matches = re.findall(pattern, raw_result.replace("\\'", "'"))
# Loop over the matches, and try to parse them to ensure the content is valid
valid_outputs = []
for match in matches:
try:
parsed_pair = ast.literal_eval(str(list(match)))
valid_outputs.append(parsed_pair)
except Exception as e:
logger.log(logging.ERROR,
f"Parsing Error: {e}\nMatch: {match}\n")
result['valid'] = False
result['output'] = valid_outputs
except Exception as e:
logging.log(
logging.ERROR, f"| File: {file_name} | Unexpected Error: {e}\nRaw Result: {raw_result}")
result['valid'] = False
return result