# algo.py
import math
import time
import queue
import logging
import threading

import pandas as pd
from tqdm import tqdm
from pluralizer import Pluralizer

from similarity_fast import SimilarityFast
from food_nonfood import classify_as_food_nonfood, pessimistic_food_nonfood_score
from utils import clean_word, is_empty_word
from db.db_utils import (
    store_mapping_to_db,
    cached_get_mapping_from_db,
    get_dictionary_data_from_db,
    store_result_to_db,
    store_batch_results_to_db,
)
from ask_gpt import query_gpt
from multi_food_item_detector import extract_items, has_delimiters
from mapping_template import (
    empty_template,
    heterogeneous_template,
    multi_item_template,
    nonfood_template,
    usda_template,
)
from specificity_classifier import classify_text_to_specificity

# Minimum similarity score below which perform_mapping() falls back to GPT
# (unless the cleaned word is a substring of the best match).
similarity_threshold = 0.78

logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s')


class Algo:
    """Map free-text item descriptions to dictionary entries and attach
    carbon-credit figures to the resulting mappings.

    An in-memory cache (``mappings_with_dictionary``) keyed by cleaned word is
    loaded from the database at construction time and extended as new words
    are mapped.
    """

    # Column order of the SELECT issued by initialize_mappings(); these are
    # also the keys of every cached mapping dict.
    _MAPPING_COLUMNS = (
        'cleaned_word',
        'dictionary_word',
        'is_food',
        'similarity_score',
        'food_nonfood_score',
        'wweia_category',
        'sr_legacy_food_category',
        'water_content',
        'dry_matter_content',
        'leakage',
        'ignore',
        'specificity',
    )

    # Dictionary-table fields copied onto a mapping by
    # wrap_mapping_with_dictionary_data().
    _DICTIONARY_FIELDS = (
        'wweia_category',
        'sr_legacy_food_category',
        'water_content',
        'dry_matter_content',
        'leakage',
    )

    # Number of results accumulated by match_words() before each batch write.
    _RESULT_BATCH_SIZE = 500

    def __init__(self, db_conn, run_key=None):
        """Create the matcher.

        Args:
            db_conn: Open database connection; a cursor is created from it.
            run_key: Identifier stored with batch results. When falsy, the
                current Unix time (whole seconds) is used.
        """
        self.db_conn = db_conn
        self.run_key = run_key if run_key else int(time.time())
        self.db_cursor = db_conn.cursor()
        self.similarity_fast = SimilarityFast(self.db_cursor)
        self.pluralizer = Pluralizer()
        self.mappings_with_dictionary = self.initialize_mappings()

    def initialize_mappings(self):
        """Load every stored mapping joined with its dictionary row.

        Returns:
            dict: cleaned word -> mapping dict keyed by ``_MAPPING_COLUMNS``.
            If a cleaned word occurs in several rows, the last row wins.
        """
        self.db_cursor.execute('SELECT cleaned_word, dictionary_word, is_food, similarity_score, food_nonfood_score, wweia_category, sr_legacy_food_category, water_content, dry_matter_content, leakage, ignore, specificity from mappings join dictionary on mappings.dictionary_word = dictionary.description')
        return {
            row[0]: dict(zip(self._MAPPING_COLUMNS, row))
            for row in self.db_cursor.fetchall()
        }

    def perform_mapping(self, input_word, attempts=0):
        """Find a dictionary mapping for ``input_word``.

        The fast similarity lookup is tried first. If its score is below
        ``similarity_threshold`` and the cleaned word is not a substring of
        the best match, GPT is asked for a recommendation, which may replace
        or adjust the mapping.

        Args:
            input_word: Raw item description.
            attempts: Unused; kept for interface compatibility.

        Returns:
            dict: The mapping (shape defined by SimilarityFast / the templates).
        """
        # If the input word is a USDA food item, we can skip the similarity
        # check. This is a special case because the USDA food items are
        # Government Donation (Not Counted) items.
        if 'usda' in input_word.lower():
            return usda_template(input_word, clean_word(input_word))

        mapping = self.similarity_fast.find_most_similar_word(input_word)
        logging.info(f" - Simlarity Fast mapping: {mapping}")

        # A cleaned word contained in the best match counts as good enough.
        is_substring = mapping['cleaned_word'] in mapping['most_similar_word']
        if mapping['similarity_score'] >= similarity_threshold or is_substring:
            return mapping

        logging.info(" - Attempting GPT mapping")
        try:
            gpt_recommended_word = query_gpt(input_word)
            if not gpt_recommended_word:
                return mapping

            if gpt_recommended_word == 'Non-Food Item':
                mapping.update({
                    'similarity_score': 1.0,
                    'confidence_score': 1.0,
                    'is_food': False,
                    'food_nonfood_score': 1.0,
                })
            elif gpt_recommended_word == 'Heterogeneous Mixture':
                mapping.update({
                    'dictionary_word': 'Heterogeneous Mixture',
                    'similarity_score': 1.0,
                    'confidence_score': 1.0,
                })
            elif gpt_recommended_word == 'Broad Category':
                category_mapping = self.similarity_fast.find_most_similar_word(input_word, True)
                mapping.update({
                    'dictionary_word': category_mapping['dictionary_word'],
                    'similarity_score': category_mapping['similarity_score'],
                    'confidence_score': category_mapping['confidence_score'],
                })
            else:
                # GPT proposed a concrete word: keep whichever candidate
                # scores higher, but preserve the original input/cleaned word.
                gpt_mapping = self.similarity_fast.find_most_similar_word(gpt_recommended_word)
                if gpt_mapping['similarity_score'] > mapping['similarity_score']:
                    gpt_mapping.update({
                        'input_word': input_word,
                        'cleaned_word': mapping['cleaned_word'],
                    })
                    mapping = gpt_mapping
        except Exception as e:
            # Best-effort fallback: keep whatever mapping we have, but log at
            # WARNING so the failure is visible under the configured level.
            logging.warning(f" - Error querying GPT: {e}")

        return mapping

    def handle_multi_item(self, input_word):
        """Map a delimited description that names several items.

        Each extracted part is mapped individually. The combined result is,
        in order of precedence: a non-food template if any part is non-food,
        an empty template if no part produced a usable mapping, a
        heterogeneous template if any part's SR legacy category is
        "Heterogeneous Mixture", otherwise a multi-item template built from
        the part with the lowest dry matter content.

        Returns:
            dict | None: The combined mapping, or None when no part has a
            dry matter content value.
        """
        logging.info(f"Handling multi-item {input_word}")
        input_word_parts = extract_items(input_word)
        logging.info(f" - Extracted items: {input_word_parts}")

        # Some words can be ignored because they are just filler words that
        # don't add any value to the mapping.
        # NOTE(review): `== False` is intentional; values may be 0/1 from the
        # database, so `is False` would not be equivalent. A NULL `ignore`
        # drops the part here - confirm that is intended.
        mappings = []
        for part in input_word_parts:
            mapping = self.handle_single_item(part)
            if mapping and mapping['ignore'] == False:
                mappings.append(mapping)

        # If any part is non-food, the whole entry is treated as non-food.
        for mapping in mappings:
            if mapping['is_food'] == False:
                return nonfood_template(
                    input_word,
                    mapping['cleaned_word'],
                    mapping['food_nonfood_score'],
                )

        if not mappings:
            return empty_template(input_word)

        if any(m['sr_legacy_food_category'] == "Heterogeneous Mixture" for m in mappings):
            return heterogeneous_template(input_word)

        # Otherwise use the mapping with the lowest dry matter content;
        # on ties the earliest part wins.
        candidates = [m for m in mappings if m.get('dry_matter_content') is not None]
        if candidates:
            most_conservative_mapping = min(candidates, key=lambda m: m['dry_matter_content'])
            return multi_item_template(input_word, None, most_conservative_mapping)

        logging.warning(f" - No mappings found for {input_word}")
        return None

    def handle_single_item(self, input_word):
        """Map a single item description, using the cache when possible.

        Lookup order: cached singular form, cached plural form, confident
        non-food classification, then perform_mapping(). Newly created
        mappings are stored in the database and added to the cache.

        Returns:
            dict | None: The mapping, or None if the cleaned word is empty.
            Cache hits are returned as copies so callers cannot alter the
            cached entry.
        """
        input_word_clean = clean_word(input_word)
        if not input_word_clean:
            return None

        # Try the singular form first, then the plural form.
        for count in (1, 2):
            word_form = self.pluralizer.pluralize(input_word_clean, count)
            cached = self.mappings_with_dictionary.get(word_form)
            if cached:
                mapping_with_dict = {**cached, 'input_word': input_word}
                logging.info(f" - Found mapping in db: {mapping_with_dict}")
                return mapping_with_dict

        food_nonfood = classify_as_food_nonfood(input_word_clean)

        # If we're very confident that the word is non-food, don't even
        # attempt a similarity mapping.
        if food_nonfood[1] > 0.9 and food_nonfood[0] == False:
            mapping = nonfood_template(input_word, input_word_clean, food_nonfood[1])
            return self._store_and_wrap(input_word_clean, mapping)

        mapping = self.perform_mapping(input_word)
        mapping.update({
            'specificity': classify_text_to_specificity(input_word_clean),
        })

        food_nonfood_pessimistic = pessimistic_food_nonfood_score(
            food_nonfood, mapping['similarity_score']
        )
        mapping.update({
            'is_food': food_nonfood_pessimistic[0],
            'food_nonfood_score': food_nonfood_pessimistic[1],
        })
        return self._store_and_wrap(input_word_clean, mapping)

    def _store_and_wrap(self, cleaned_word, mapping):
        """Persist a new mapping, cache it under ``cleaned_word``, and return
        it enriched with dictionary data."""
        store_mapping_to_db(self.db_cursor, self.db_conn, mapping)
        self.mappings_with_dictionary[cleaned_word] = mapping
        return self.wrap_mapping_with_dictionary_data(mapping)

    def wrap_mapping_with_dictionary_data(self, mapping):
        """Add dictionary-table fields to ``mapping`` in place.

        The fields in ``_DICTIONARY_FIELDS`` are copied from the dictionary
        row for ``mapping['dictionary_word']`` (or set to None when no row is
        found). ``ignore`` keeps its existing value, defaulting to False.

        Returns:
            dict | None: The same mapping object, or None if it was falsy.
        """
        if not mapping:
            return None

        dictionary_result = get_dictionary_data_from_db(self.db_cursor, mapping['dictionary_word'])

        enrichment = {
            field: dictionary_result[field] if dictionary_result else None
            for field in self._DICTIONARY_FIELDS
        }
        enrichment['ignore'] = mapping.get('ignore', False)
        mapping.update(enrichment)
        return mapping

    def add_carbon_credit_data(self, mapping, donor, date, weight):
        """Attach donor/date/weight and emissions figures to ``mapping``.

        ``weight`` is coerced to float; unparseable or NaN values become 0.
        Emissions are only computed for food items that have both ``leakage``
        and ``dry_matter_content``; otherwise the four emissions fields are
        None.

        Returns:
            dict | None: None if ``mapping`` is falsy. For the no-emissions
            case a new dict is returned; otherwise ``mapping`` itself,
            updated in place.
        """
        if not mapping:
            return None

        try:
            weight = float(weight)
        except ValueError:
            weight = 0
        except Exception as e:
            logging.info(f" - Error converting weight to float: {e}")
            weight = 0

        if math.isnan(weight):
            weight = 0

        mapping.update({
            'donor': donor,
            'date': date,
            'weight': weight,
            # 0.000453592 is metric tonnes per pound, so `weight` is
            # presumably in pounds - TODO confirm against the data source.
            'weight_metric_tonnes': weight * 0.000453592,
            'distance': 250,
            'ef': 2.968073544,
            'mt_lb_mile': 0.0000000809,
        })

        required_fields_exist = (
            mapping.get('leakage') is not None
            and mapping.get('dry_matter_content') is not None
        )
        if mapping['is_food'] == False or not required_fields_exist:
            # Existing keys in `mapping` take precedence over the None defaults.
            return {
                'baseline_emissions': None,
                'leakage_emissions': None,
                'project_emissions': None,
                'total_emissions_reduction': None,
                **mapping,
            }

        logging.info(f" - Calculating carbon credits for: {mapping}")

        baseline_emissions = mapping['weight_metric_tonnes'] * mapping['dry_matter_content'] * mapping['ef']
        leakage_emissions = mapping['leakage'] * baseline_emissions
        project_emissions = mapping['distance'] * mapping['mt_lb_mile'] * baseline_emissions
        total_emissions_reduction = baseline_emissions - leakage_emissions - project_emissions

        mapping.update({
            'baseline_emissions': baseline_emissions,
            'leakage_emissions': leakage_emissions,
            'project_emissions': project_emissions,
            'total_emissions_reduction': total_emissions_reduction,
        })
        return mapping

    def match_words(self, input_data):
        """Map every input row and persist the results in batches.

        Args:
            input_data: Iterable of sequences laid out positionally as
                (description, alternate description, row number, donor, date,
                weight). Only the first element is required; missing trailing
                elements are treated as None.

        Returns:
            list: One result dict per row that produced a mapping. Rows whose
            mapping is empty/None are skipped.
        """
        def field(item, index):
            # Optional positional field: None when the row is too short.
            return item[index] if len(item) > index else None

        results = []
        result_batch = []

        for input_item in tqdm(input_data, desc="Processing input words"):
            input_word = input_item[0]
            input_word_alt = field(input_item, 1)
            input_row_num = field(input_item, 2)
            input_donor = field(input_item, 3)
            input_date = field(input_item, 4)
            input_weight = field(input_item, 5)

            logging.info("")
            logging.info(f"Processing: {input_word}")

            # Fall back to the alternate description when the primary one is
            # empty; if both are empty the row gets the empty template.
            is_empty = False
            if is_empty_word(input_word):
                if is_empty_word(input_word_alt):
                    mapping = empty_template(input_word)
                    is_empty = True
                else:
                    input_word = input_word_alt

            if not is_empty:
                if has_delimiters(input_word):
                    mapping = self.handle_multi_item(input_word)
                else:
                    mapping = self.handle_single_item(input_word)

            if mapping:
                # Work on a copy so per-row data never leaks into the cache.
                mapping = dict(mapping)
                mapping = self.add_carbon_credit_data(mapping, input_donor, input_date, input_weight)
                mapping.update({
                    'run_row': input_row_num,
                })
                result_batch.append(mapping)
                results.append(mapping)

            if len(result_batch) >= self._RESULT_BATCH_SIZE:
                store_batch_results_to_db(self.db_conn, self.db_cursor, self.run_key, result_batch)
                result_batch = []

        # Flush whatever is left after the last full batch.
        if result_batch:
            store_batch_results_to_db(self.db_conn, self.db_cursor, self.run_key, result_batch)
            result_batch = []

        return results