Spaces:
Sleeping
Sleeping
# Standard library imports | |
import re | |
import ast | |
import json | |
import logging | |
import os | |
from typing import Any, List, Tuple, Optional, Dict, Union | |
# Third-party library imports | |
import numpy as np | |
import pandas as pd | |
import asyncio | |
from rapidfuzz import process | |
from tenacity import retry, wait_random_exponential, stop_after_attempt | |
from pydantic import ValidationError | |
# Local application/library specific imports | |
from langchain_openai import ChatOpenAI | |
from langchain.chains import LLMChain | |
from langchain.output_parsers import PydanticOutputParser, OutputFixingParser | |
from langchain.prompts import PromptTemplate | |
import app.categorization.template as CATEGORY_TEMPLATE | |
from app.categorization.config import CATEGORY_REFERENCE_OUTPUT_FILE, TX_PER_LLM_RUN | |
def fuzzy_match_list_categorizer( | |
description: str, | |
descriptions: np.ndarray, | |
description_category_pairs: pd.DataFrame, | |
threshold: int = 75, | |
) -> Optional[str]: | |
"""Find the most similar transaction description and return its category. | |
This function uses fuzzy string matching to compare the input description | |
against a list of known descriptions. If a sufficient match is found, | |
the function returns the category associated with the matched description. | |
Args: | |
description (str): The transaction description to categorize. | |
descriptions (np.ndarray): Known descriptions to compare against. | |
description_category_pairs (pd.DataFrame): DataFrame mapping descriptions to categories. | |
threshold (int): Minimum similarity score to consider a match. | |
Returns: | |
str or None: Category of the matched description, or None if no match found. | |
""" | |
# Fuzzy-match this description against the reference descriptions | |
match_results = process.extractOne( | |
description, descriptions, score_cutoff=threshold) | |
# If a match is found, return the category of the matched description | |
if match_results: | |
return description_category_pairs.at[match_results[2], 'category'] | |
return None | |
async def llm_list_categorizer(tx_list: pd.DataFrame) -> pd.DataFrame: | |
"""Categorize a list of transactions using a language model. | |
This function uses a Language Model (LLM) to categorize a list of transaction descriptions. | |
It splits the input DataFrame into chunks and processes each chunk asynchronously to improve performance. | |
Args: | |
tx_list (pd.DataFrame): DataFrame containing the transaction descriptions to categorize. | |
Returns: | |
pd.DataFrame: DataFrame mapping transaction descriptions to their inferred categories. | |
""" | |
# Initialize language model and prompt | |
openai_api_key = os.environ['OPENAI_API_KEY'] | |
llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-0125", | |
api_key=openai_api_key) | |
prompt = PromptTemplate.from_template(template=CATEGORY_TEMPLATE) | |
chain = LLMChain(llm=llm, prompt=prompt) | |
# Iterate over the DataFrame in batches of TX_PER_LLM_RUN transactions | |
tasks = [llm_sublist_categorizer(tx_list.attrs['file_name'], chain=chain, tx_descriptions="\n".join(chunk['name/description']).strip()) | |
for chunk in np.array_split(tx_list, tx_list.shape[0] // TX_PER_LLM_RUN + 1)] | |
# Gather results and extract (valid) outputs | |
# The results variable is a list of 'results', each 'result' being the output of a single LLM run | |
results = await asyncio.gather(*tasks) | |
# Extract valid results (each valid result is a list of description-category pairs) | |
valid_results = [result['output'] for result in results if result['valid']] | |
# Flatten the list of valid results to obtain a single list of description-category pairs | |
valid_outputs = [ | |
output for valid_result in valid_results for output in valid_result] | |
# Return a DataFrame with the valid outputs | |
return pd.DataFrame(valid_outputs, columns=['name/description', 'category']) | |
async def llm_sublist_categorizer( | |
file_name: str, | |
chain: LLMChain, | |
tx_descriptions: str, | |
) -> Dict[str, Union[bool, List[Tuple[str, str]]]]: | |
"""Categorize a batch of transactions using a language model. | |
This function takes a batch of transaction descriptions and passes them to a language model | |
for categorization. The function retries on failure, with an exponential backoff. | |
Args: | |
file_name (str): Name of the file the transaction descriptions were extracted from. | |
chain (LLMChain): Language model chain to use for categorization. | |
tx_descriptions (str): Concatenated transaction descriptions to categorize. | |
Returns: | |
dict: Dictionary containing a 'valid' flag and a list of categorized descriptions. | |
""" | |
raw_result = await chain.arun(input_data=tx_descriptions) | |
logger = logging.getLogger(__name__) | |
result = {'valid': True, 'output': []} | |
try: | |
# Create a pattern to match a list Description-Category pairs (List[Tuple[str, str]]) | |
pattern = r"\['([^']+)', '([^']+)'\]" | |
# Use it to extract all the correctly formatted pairs from the raw result | |
matches = re.findall(pattern, raw_result.replace("\\'", "'")) | |
# Loop over the matches, and try to parse them to ensure the content is valid | |
valid_outputs = [] | |
for match in matches: | |
try: | |
parsed_pair = ast.literal_eval(str(list(match))) | |
valid_outputs.append(parsed_pair) | |
except Exception as e: | |
logger.log(logging.ERROR, | |
f"Parsing Error: {e}\nMatch: {match}\n") | |
result['valid'] = False | |
result['output'] = valid_outputs | |
except Exception as e: | |
logging.log( | |
logging.ERROR, f"| File: {file_name} | Unexpected Error: {e}\nRaw Result: {raw_result}") | |
result['valid'] = False | |
return result | |