# NOTE(review): removed non-code artifact lines ("Spaces:", "Paused", "Paused")
# that appear to be residue from a UI capture, not part of the module.
# algo.py
import logging
import math
import queue
import threading
import time

import pandas as pd
from pluralizer import Pluralizer
from tqdm import tqdm

from ask_gpt import query_gpt
# store_batch_results_to_db was used by match_words but never imported;
# added here — confirm it is exported by db.db_utils.
from db.db_utils import (
    cached_get_mapping_from_db,
    get_dictionary_data_from_db,
    store_batch_results_to_db,
    store_mapping_to_db,
    store_result_to_db,
)
from food_nonfood import classify_as_food_nonfood, pessimistic_food_nonfood_score
from mapping_template import empty_template, heterogeneous_template, multi_item_template, nonfood_template, usda_template
from multi_food_item_detector import extract_items, has_delimiters
from similarity_fast import SimilarityFast
from specificity_classifier import classify_text_to_specificity
from utils import clean_word, is_empty_word
# Minimum fast-similarity score for a match to be accepted without
# falling back to a GPT recommendation (see Algo.perform_mapping).
similarity_threshold = 0.78

logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s')
class Algo:
    """Food-description mapping engine.

    Resolves free-text food-item descriptions to dictionary entries
    (fast similarity search with a GPT fallback) and augments the
    results with carbon-credit emission calculations.
    """

    def __init__(self, db_conn, run_key=None):
        """Set up DB access, helper objects, and the in-memory mapping cache.

        db_conn -- open DB-API connection to the mappings database.
        run_key -- optional identifier for this run; defaults to the
                   current Unix timestamp so every run is distinct.
        """
        self.db_conn = db_conn
        self.run_key = run_key or int(time.time())
        self.db_cursor = db_conn.cursor()
        self.pluralizer = Pluralizer()
        self.similarity_fast = SimilarityFast(self.db_cursor)
        # self.similarity_slow = SimilaritySlow(self.db_cursor, self.db_conn)
        # Preload all stored mappings (joined with dictionary data) so
        # later lookups are in-memory dict hits rather than DB queries.
        self.mappings_with_dictionary = self.initialize_mappings()
def initialize_mappings(self): | |
self.db_cursor.execute('SELECT cleaned_word, dictionary_word, is_food, similarity_score, food_nonfood_score, wweia_category, sr_legacy_food_category, water_content, dry_matter_content, leakage, ignore, specificity from mappings join dictionary on mappings.dictionary_word = dictionary.description') | |
rows = self.db_cursor.fetchall() | |
mappings_with_dictionary = {} | |
for row in rows: | |
mappings_with_dictionary[row[0]] = { | |
'cleaned_word': row[0], | |
'dictionary_word': row[1], | |
'is_food': row[2], | |
'similarity_score': row[3], | |
'food_nonfood_score': row[4], | |
'wweia_category': row[5], | |
'sr_legacy_food_category': row[6], | |
'water_content': row[7], | |
'dry_matter_content': row[8], | |
'leakage': row[9], | |
'ignore': row[10], | |
'specificity': row[11] | |
} | |
return mappings_with_dictionary | |
def perform_mapping(self, input_word, attempts=0): | |
# if the input word is a USDA food item, we can skip the similarity check | |
# this is a special case because the USDA food items are Government Donation (Not Counted) items | |
if 'usda' in input_word.lower(): | |
return usda_template(input_word, clean_word(input_word)) | |
mapping = self.similarity_fast.find_most_similar_word(input_word) | |
logging.info(f" - Simlarity Fast mapping: {mapping}") | |
# check if the cleaned_word is a substring of the most_similar_word | |
is_substring = mapping['cleaned_word'] in mapping['most_similar_word'] | |
if mapping['similarity_score'] < similarity_threshold and not is_substring: | |
logging.info(" - Attempting GPT mapping") | |
try: | |
gpt_recommended_word = query_gpt(input_word) | |
if gpt_recommended_word: | |
if gpt_recommended_word == 'Non-Food Item': | |
mapping.update( | |
{ | |
'similarity_score': 1.0, | |
'confidence_score': 1.0, | |
'is_food': False, | |
'food_nonfood_score': 1.0 | |
} | |
) | |
return mapping | |
elif gpt_recommended_word == 'Heterogeneous Mixture': | |
mapping.update( | |
{ | |
'dictionary_word': 'Heterogeneous Mixture', 'similarity_score': 1.0, | |
'confidence_score': 1.0 | |
} | |
) | |
return mapping | |
elif gpt_recommended_word == 'Broad Category': | |
category_mapping = self.similarity_fast.find_most_similar_word(input_word, True) | |
mapping.update( | |
{ | |
'dictionary_word': category_mapping['dictionary_word'], | |
'similarity_score': category_mapping['similarity_score'], | |
'confidence_score': category_mapping['confidence_score'] | |
} | |
) | |
else: | |
gpt_mapping = self.similarity_fast.find_most_similar_word(gpt_recommended_word) | |
if gpt_mapping['similarity_score'] > mapping['similarity_score']: | |
gpt_mapping.update( | |
{ | |
'input_word': input_word, | |
'cleaned_word': mapping['cleaned_word'] | |
} | |
) | |
mapping = gpt_mapping | |
except Exception as e: | |
logging.info(f" - Error querying GPT: {e}") | |
return mapping | |
def handle_multi_item(self, input_word): | |
# The input word has a comma or a slash in it | |
# If it has more commas, its comma-delimited | |
# If it has more slashes, its slash-delimited | |
# If it has equal number of commas and slashes, we'll go with slashes | |
logging.info(f"Handling multi-item {input_word}") | |
input_word_parts = extract_items(input_word) | |
logging.info(f" - Extracted items: {input_word_parts}") | |
mappings = [] | |
for part in input_word_parts: | |
mapping = self.handle_single_item(part) | |
if mapping: | |
# Some words in the mapping can be ignored because they are | |
# just filler words that don't add any value to the mapping | |
if mapping['ignore'] == False: | |
mappings.append(mapping) | |
# look up the dictionary values for each mapping | |
# find the wweia category | |
# if all mappings have the same wweia category, return "homogenous", else "heterogeneous" | |
# if is_food is False for any mappings, return "Non-Food Item" as dictionary word | |
for mapping in mappings: | |
if mapping['is_food'] == False: | |
return nonfood_template( | |
input_word, | |
mapping['cleaned_word'], | |
mapping['food_nonfood_score'] | |
) | |
break | |
dictionary_words = [mapping['dictionary_word'] for mapping in mappings] | |
if len(set(dictionary_words)) == 0: | |
return empty_template(input_word) | |
# check if "heterogeneous" is in the wweia category of any of the mappings | |
# otherwise we find the mapping with the lowest DMC value, and return that as the dictionary word, dmc, wc, and leakage values | |
heterogeneous_exists = False | |
most_conservative_mapping = None | |
for mapping in mappings: | |
if mapping['sr_legacy_food_category'] == "Heterogeneous Mixture": | |
heterogeneous_exists = True | |
break | |
else: | |
dry_matter_content = mapping.get('dry_matter_content') | |
if dry_matter_content is not None: | |
if most_conservative_mapping is None or dry_matter_content < most_conservative_mapping.get('dry_matter_content', float('inf')): | |
most_conservative_mapping = mapping | |
if heterogeneous_exists: | |
return heterogeneous_template(input_word) | |
elif most_conservative_mapping is not None: | |
return multi_item_template(input_word, None, most_conservative_mapping) | |
else: | |
logging.warning(f" - No mappings found for {input_word}") | |
return None | |
def handle_single_item(self, input_word): | |
input_word_clean = clean_word(input_word) | |
if not input_word_clean: | |
return None | |
if input_word_clean == "": | |
return None | |
# try the singular form of the word | |
singular = self.pluralizer.pluralize(input_word_clean, 1) | |
# mapping = cached_get_mapping_from_db(self.db_cursor, singular) | |
mapping_with_dict = self.mappings_with_dictionary.get(singular) | |
if mapping_with_dict: | |
mapping_with_dict.update({ | |
'input_word': input_word, | |
}) | |
logging.info(f" - Found mapping in db: {mapping_with_dict}") | |
return mapping_with_dict | |
# try the plural form of the word | |
plural = self.pluralizer.pluralize(input_word_clean, 2) | |
mapping_with_dict = self.mappings_with_dictionary.get(plural) | |
if mapping_with_dict: | |
mapping_with_dict.update({ | |
'input_word': input_word, | |
}) | |
logging.info(f" - Found mapping in db: {mapping_with_dict}") | |
return mapping_with_dict | |
food_nonfood = classify_as_food_nonfood(input_word_clean) | |
# if we're very confident that the word is non-food, let's not even classify it | |
if food_nonfood[1] > 0.9 and food_nonfood[0] == False: | |
mapping = nonfood_template(input_word, input_word_clean, food_nonfood[1]) | |
store_mapping_to_db(self.db_cursor, self.db_conn, mapping) | |
self.mappings_with_dictionary[input_word_clean] = mapping | |
return self.wrap_mapping_with_dictionary_data(mapping) | |
mapping = self.perform_mapping(input_word) | |
specificity = classify_text_to_specificity(input_word_clean) | |
mapping.update({ | |
'specificity': specificity | |
}) | |
food_nonfood_pessimistic = pessimistic_food_nonfood_score(food_nonfood, mapping['similarity_score']) | |
mapping.update({ | |
'is_food': food_nonfood_pessimistic[0], | |
'food_nonfood_score': food_nonfood_pessimistic[1] | |
}) | |
store_mapping_to_db(self.db_cursor, self.db_conn, mapping) | |
self.mappings_with_dictionary[input_word_clean] = mapping | |
return self.wrap_mapping_with_dictionary_data(mapping) | |
def wrap_mapping_with_dictionary_data(self, mapping): | |
if not mapping: | |
return None | |
dictionary_result = get_dictionary_data_from_db(self.db_cursor, mapping['dictionary_word']) | |
# set default on ignore | |
ignore = mapping['ignore'] if 'ignore' in mapping else False | |
mapping.update({ | |
'wweia_category': dictionary_result['wweia_category'] if dictionary_result else None, | |
'sr_legacy_food_category': dictionary_result['sr_legacy_food_category'] if dictionary_result else None, | |
'water_content': dictionary_result['water_content'] if dictionary_result else None, | |
'dry_matter_content': dictionary_result['dry_matter_content'] if dictionary_result else None, | |
'leakage': dictionary_result['leakage'] if dictionary_result else None, | |
'ignore': ignore | |
}) | |
return mapping | |
def add_carbon_credit_data(self, mapping, donor, date, weight): | |
if not mapping: | |
return None | |
mapping.update({ | |
'donor': donor | |
}) | |
try: | |
weight = float(weight) | |
except ValueError: | |
weight = 0 | |
except Exception as e: | |
logging.info(f" - Error converting weight to float: {e}") | |
weight = 0 | |
if math.isnan(weight): | |
weight = 0 | |
mapping.update({ | |
'date': date, | |
'weight': weight, | |
'weight_metric_tonnes': weight * 0.000453592, | |
'distance': 250, | |
'ef': 2.968073544, | |
'mt_lb_mile': 0.0000000809, | |
}) | |
required_fields_exist = 'leakage' in mapping and mapping['leakage'] is not None and 'dry_matter_content' in mapping and mapping['dry_matter_content'] is not None | |
if mapping['is_food'] == False or required_fields_exist == False: | |
return { | |
'baseline_emissions': None, | |
'leakage_emissions': None, | |
'project_emissions': None, | |
'total_emissions_reduction': None, | |
**mapping | |
} | |
logging.info(f" - Calculating carbon credits for: {mapping}") | |
baseline_emissions = mapping['weight_metric_tonnes'] * mapping['dry_matter_content'] * mapping['ef'] | |
leakage_emissions = mapping['leakage'] * baseline_emissions | |
project_emissions = mapping['distance'] * mapping['mt_lb_mile'] * baseline_emissions | |
total_emissions_reduction = baseline_emissions - leakage_emissions - project_emissions | |
mapping.update({ | |
'baseline_emissions': baseline_emissions, | |
'leakage_emissions': leakage_emissions, | |
'project_emissions': project_emissions, | |
'total_emissions_reduction': total_emissions_reduction | |
}) | |
return mapping | |
def match_words(self, input_data): | |
# input_data is a list of tuples, where each tuple is (description, donor) | |
results = [] | |
result_batch = [] | |
for input_item in tqdm(input_data, desc="Processing input words"): | |
input_word = input_item[0] | |
input_word_alt = input_item[1] if len(input_item) > 1 else None | |
input_row_num = input_item[2] if len(input_item) > 2 else None | |
input_donor = input_item[3] if len(input_item) > 3 else None | |
input_date = input_item[4] if len(input_item) > 4 else None | |
input_weight = input_item[5] if len(input_item) > 5 else None | |
logging.info("") | |
logging.info(f"Processing: {input_word}") | |
is_empty = False | |
if is_empty_word(input_word): | |
if is_empty_word(input_word_alt): | |
mapping = empty_template(input_word) | |
is_empty = True | |
else: | |
input_word = input_word_alt | |
if not is_empty: | |
if has_delimiters(input_word): | |
mapping = self.handle_multi_item(input_word) | |
else: | |
mapping = self.handle_single_item(input_word) | |
if mapping: | |
mapping = dict(mapping) | |
mapping = self.add_carbon_credit_data(mapping, input_donor, input_date, input_weight) | |
mapping.update({ | |
'run_row': input_row_num | |
}) | |
result_batch.append(mapping) | |
# store_result_to_db(self.db_cursor, self.db_conn, self.run_key, mapping) | |
results.append(mapping) | |
if len(result_batch) >= 500: | |
store_batch_results_to_db(self.db_conn, self.run_key, result_batch) | |
result_batch = [] | |
if len(result_batch) > 0: | |
store_batch_results_to_db(self.db_conn, self.run_key, result_batch) | |
result_batch = [] | |
return results | |