Spaces:
Sleeping
Sleeping
File size: 5,930 Bytes
e612627 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 |
# Standard library imports
import re
import ast
import json
import logging
import os
from typing import Any, List, Tuple, Optional, Dict, Union
# Third-party library imports
import numpy as np
import pandas as pd
import asyncio
from rapidfuzz import process
from tenacity import retry, wait_random_exponential, stop_after_attempt
from pydantic import ValidationError
# Local application/library specific imports
from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from langchain.output_parsers import PydanticOutputParser, OutputFixingParser
from langchain.prompts import PromptTemplate
import app.categorization.template as CATEGORY_TEMPLATE
from app.categorization.config import CATEGORY_REFERENCE_OUTPUT_FILE, TX_PER_LLM_RUN
def fuzzy_match_list_categorizer(
description: str,
descriptions: np.ndarray,
description_category_pairs: pd.DataFrame,
threshold: int = 75,
) -> Optional[str]:
"""Find the most similar transaction description and return its category.
This function uses fuzzy string matching to compare the input description
against a list of known descriptions. If a sufficient match is found,
the function returns the category associated with the matched description.
Args:
description (str): The transaction description to categorize.
descriptions (np.ndarray): Known descriptions to compare against.
description_category_pairs (pd.DataFrame): DataFrame mapping descriptions to categories.
threshold (int): Minimum similarity score to consider a match.
Returns:
str or None: Category of the matched description, or None if no match found.
"""
# Fuzzy-match this description against the reference descriptions
match_results = process.extractOne(
description, descriptions, score_cutoff=threshold)
# If a match is found, return the category of the matched description
if match_results:
return description_category_pairs.at[match_results[2], 'category']
return None
async def llm_list_categorizer(tx_list: pd.DataFrame) -> pd.DataFrame:
"""Categorize a list of transactions using a language model.
This function uses a Language Model (LLM) to categorize a list of transaction descriptions.
It splits the input DataFrame into chunks and processes each chunk asynchronously to improve performance.
Args:
tx_list (pd.DataFrame): DataFrame containing the transaction descriptions to categorize.
Returns:
pd.DataFrame: DataFrame mapping transaction descriptions to their inferred categories.
"""
# Initialize language model and prompt
openai_api_key = os.environ['OPENAI_API_KEY']
llm = ChatOpenAI(temperature=0, model="gpt-3.5-turbo-0125",
api_key=openai_api_key)
prompt = PromptTemplate.from_template(template=CATEGORY_TEMPLATE)
chain = LLMChain(llm=llm, prompt=prompt)
# Iterate over the DataFrame in batches of TX_PER_LLM_RUN transactions
tasks = [llm_sublist_categorizer(tx_list.attrs['file_name'], chain=chain, tx_descriptions="\n".join(chunk['name/description']).strip())
for chunk in np.array_split(tx_list, tx_list.shape[0] // TX_PER_LLM_RUN + 1)]
# Gather results and extract (valid) outputs
# The results variable is a list of 'results', each 'result' being the output of a single LLM run
results = await asyncio.gather(*tasks)
# Extract valid results (each valid result is a list of description-category pairs)
valid_results = [result['output'] for result in results if result['valid']]
# Flatten the list of valid results to obtain a single list of description-category pairs
valid_outputs = [
output for valid_result in valid_results for output in valid_result]
# Return a DataFrame with the valid outputs
return pd.DataFrame(valid_outputs, columns=['name/description', 'category'])
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6))
async def llm_sublist_categorizer(
file_name: str,
chain: LLMChain,
tx_descriptions: str,
) -> Dict[str, Union[bool, List[Tuple[str, str]]]]:
"""Categorize a batch of transactions using a language model.
This function takes a batch of transaction descriptions and passes them to a language model
for categorization. The function retries on failure, with an exponential backoff.
Args:
file_name (str): Name of the file the transaction descriptions were extracted from.
chain (LLMChain): Language model chain to use for categorization.
tx_descriptions (str): Concatenated transaction descriptions to categorize.
Returns:
dict: Dictionary containing a 'valid' flag and a list of categorized descriptions.
"""
raw_result = await chain.arun(input_data=tx_descriptions)
logger = logging.getLogger(__name__)
result = {'valid': True, 'output': []}
try:
# Create a pattern to match a list Description-Category pairs (List[Tuple[str, str]])
pattern = r"\['([^']+)', '([^']+)'\]"
# Use it to extract all the correctly formatted pairs from the raw result
matches = re.findall(pattern, raw_result.replace("\\'", "'"))
# Loop over the matches, and try to parse them to ensure the content is valid
valid_outputs = []
for match in matches:
try:
parsed_pair = ast.literal_eval(str(list(match)))
valid_outputs.append(parsed_pair)
except Exception as e:
logger.log(logging.ERROR,
f"Parsing Error: {e}\nMatch: {match}\n")
result['valid'] = False
result['output'] = valid_outputs
except Exception as e:
logging.log(
logging.ERROR, f"| File: {file_name} | Unexpected Error: {e}\nRaw Result: {raw_result}")
result['valid'] = False
return result
|