# Standard library imports
import ast
import asyncio
import logging
import os
import re
from typing import Dict, List, Optional, Union

# Third-party library imports
import numpy as np
import pandas as pd
from rapidfuzz import process
from tenacity import retry, stop_after_attempt, wait_random_exponential

# Local application/library specific imports
from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
# Assumes app/categorization/template.py exposes the prompt string as
# CATEGORY_TEMPLATE (a module object would fail in PromptTemplate.from_template)
from app.categorization.template import CATEGORY_TEMPLATE
from app.categorization.config import TX_PER_LLM_RUN


def fuzzy_match_list_categorizer(
    description: str,
    descriptions: np.ndarray,
    description_category_pairs: pd.DataFrame,
    threshold: int = 75,
) -> Optional[str]:
    """Find the most similar transaction description and return its category.

    This function uses fuzzy string matching to compare the input description
    against a list of known descriptions. If a match scores at or above the
    threshold, the function returns the category of the matched description.

    Args:
        description (str): The transaction description to categorize.
        descriptions (np.ndarray): Known descriptions to compare against.
        description_category_pairs (pd.DataFrame): DataFrame mapping descriptions to categories.
        threshold (int): Minimum similarity score to consider a match.

    Returns:
        str or None: Category of the matched description, or None if no match is found.
    """

    # Fuzzy-match this description against the reference descriptions
    match_results = process.extractOne(
        description, descriptions, score_cutoff=threshold)
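    # extractOne returns a (match, score, index) tuple, or None if nothing
    # scores at or above the cutoff; the index is the position within
    # `descriptions` and is assumed here to line up with the DataFrame's row
    # labels (e.g. a default RangeIndex)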

    # If a match is found, return the category of the matched description
    if match_results:
        return description_category_pairs.at[match_results[2], 'category']

    return None
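
# Example usage (a minimal sketch; the file path and sample description are
# illustrative assumptions, not part of this module):
#
#   reference = pd.read_csv("reference.csv")  # 'name/description', 'category'
#   descriptions = reference['name/description'].to_numpy()
#   category = fuzzy_match_list_categorizer(
#       "AMZN Mktp US*1A2B3C", descriptions, reference)
#   if category is None:
#       ...  # fall back to llm_list_categorizer below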


async def llm_list_categorizer(tx_list: pd.DataFrame) -> pd.DataFrame:
    """Categorize a list of transactions using a language model.

    This function uses a large language model (LLM) to categorize a list of transaction descriptions.
    It splits the input DataFrame into chunks and processes the chunks concurrently to reduce overall latency.

    Args:
        tx_list (pd.DataFrame): DataFrame containing the transaction descriptions to categorize.

    Returns:
        pd.DataFrame: DataFrame mapping transaction descriptions to their inferred categories.
    """

    # Initialize language model and prompt
    openai_api_key = os.environ['OPENAI_API_KEY']  # raises KeyError if unset
    llm = ChatOpenAI(
        temperature=0,
        model="gpt-3.5-turbo-0125",
        api_key=openai_api_key,
    )
    prompt = PromptTemplate.from_template(template=CATEGORY_TEMPLATE)
    chain = LLMChain(llm=llm, prompt=prompt)

    # Split the transactions into batches of roughly TX_PER_LLM_RUN rows and
    # create one categorization task per batch
    n_batches = tx_list.shape[0] // TX_PER_LLM_RUN + 1
    tasks = [
        llm_sublist_categorizer(
            tx_list.attrs['file_name'],
            chain=chain,
            tx_descriptions="\n".join(chunk['name/description']).strip(),
        )
        for chunk in np.array_split(tx_list, n_batches)
    ]

    # Run all batches concurrently; each element of `results` is the output of
    # a single LLM run
    results = await asyncio.gather(*tasks)

    # Extract valid results (each valid result is a list of description-category pairs)
    valid_results = [result['output'] for result in results if result['valid']]

    # Flatten the list of valid results to obtain a single list of description-category pairs
    valid_outputs = [
        output for valid_result in valid_results for output in valid_result]

    # Return a DataFrame with the valid outputs
    return pd.DataFrame(valid_outputs, columns=['name/description', 'category'])
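
# Example usage (a minimal sketch; the file name and description are
# illustrative):
#
#   tx_list = pd.DataFrame({'name/description': ['STARBUCKS #1234']})
#   tx_list.attrs['file_name'] = 'statement.csv'
#   categorized = asyncio.run(llm_list_categorizer(tx_list))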


# Retry the whole batch with randomized exponential backoff (1-20 s), up to
# six attempts, e.g. when the OpenAI call fails or is rate-limited
@retry(wait=wait_random_exponential(min=1, max=20), stop=stop_after_attempt(6))
async def llm_sublist_categorizer(
    file_name: str,
    chain: LLMChain,
    tx_descriptions: str,
) -> Dict[str, Union[bool, List[List[str]]]]:
    """Categorize a batch of transactions using a language model.

    This function takes a batch of transaction descriptions and passes them to a language model
    for categorization. The function retries on failure, with an exponential backoff.

    Args:
        file_name (str): Name of the file the transaction descriptions were extracted from.
        chain (LLMChain): Language model chain to use for categorization.
        tx_descriptions (str): Concatenated transaction descriptions to categorize.

    Returns:
        dict: Dictionary containing a 'valid' flag and a list of categorized descriptions.
    """

    # Run the chain on the concatenated descriptions (tenacity retries this
    # call on failure)
    raw_result = await chain.arun(input_data=tx_descriptions)

    logger = logging.getLogger(__name__)
    result = {'valid': True, 'output': []}
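    # Best-effort parsing below: the function always returns a result dict,
    # flipping 'valid' to False whenever a pair (or the whole response) fails
    # to parse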
    try:
        # Pattern matching a "['<description>', '<category>']" pair in the
        # raw LLM output
        pattern = r"\['([^']+)', '([^']+)'\]"
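        # e.g. it matches "['STARBUCKS #1234', 'Dining']" and captures the
        # two quoted fields (an illustrative pair, not actual model output)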

        # Use it to extract all the correctly formatted pairs from the raw result
        matches = re.findall(pattern, raw_result.replace("\\'", "'"))

        # Loop over the matches, and try to parse them to ensure the content is valid
        valid_outputs = []
        for match in matches:
            try:
                # Round-trip through literal_eval to confirm the pair is
                # well-formed before keeping it
                parsed_pair = ast.literal_eval(str(list(match)))
                valid_outputs.append(parsed_pair)
            except Exception as e:
                logger.error(f"Parsing Error: {e}\nMatch: {match}\n")
                result['valid'] = False

        result['output'] = valid_outputs

    except Exception as e:
        logger.error(
            f"| File: {file_name} | Unexpected Error: {e}\nRaw Result: {raw_result}")
        result['valid'] = False

    return result