# (stray export artifact removed: "Spaces: / Paused / Paused" — not part of the module)
# Standard library
import time
import queue
import logging
import threading

# Third-party
import pandas as pd
from tqdm import tqdm
from pluralizer import Pluralizer

# Project-local
from similarity_fast import SimilarityFast
from food_nonfood import classify_as_food_nonfood, pessimistic_food_nonfood_score
from utils import clean_word, is_empty_word
from db.db_utils import store_mapping_to_db, cached_get_mapping_from_db, get_dictionary_data_from_db, store_result_to_db
from ask_gpt import query_gpt
from multi_food_item_detector import extract_items, has_delimiters
from mapping_template import empty_template

logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s')

# Minimum fast-similarity score; below this the GPT fallback mapping is attempted.
similarity_threshold = 0.75
class Algo:
    """Pipeline that maps free-text donation descriptions to dictionary food
    items and augments each mapping with carbon-credit calculations.

    Results are cached in / read from the database via the ``db.db_utils``
    helpers; each processed row is stored under ``self.run_key``.
    """

    def __init__(self, db_conn, run_key=None):
        """Store DB handles and build the fast similarity matcher.

        db_conn: open database connection (must provide ``cursor()``).
        run_key: identifier for this run; defaults to the current epoch time.
        """
        self.db_conn = db_conn
        self.run_key = run_key if run_key else int(time.time())
        self.db_cursor = db_conn.cursor()
        self.similarity_fast = SimilarityFast(self.db_cursor)
        # NOTE: the slow similarity pass is currently disabled.
        # self.similarity_slow = SimilaritySlow(self.db_cursor, self.db_conn)
        self.pluralizer = Pluralizer()

    def perform_mapping(self, input_word, attempts=0):
        """Map ``input_word`` to a dictionary entry and return the mapping dict.

        Tries the fast similarity matcher first; when its score is below
        ``similarity_threshold`` (and the cleaned word is not a substring of
        the match) falls back to a GPT suggestion re-run through the matcher.
        ``attempts`` is currently unused; kept for interface compatibility.
        """
        # USDA food items are Government Donation (Not Counted) items, so we
        # can skip the similarity check entirely for them.
        if 'usda' in input_word.lower():
            return {
                'input_word': input_word,
                'cleaned_word': clean_word(input_word),
                'matching_word': 'USDA Food Item',
                'dictionary_word': 'USDA Food Item',
                'similarity_score': 1.0,
                'confidence_score': 1.0,
                'similar_words': None,
                'is_food': True,
                'food_nonfood_score': 1.0
            }

        mapping = self.similarity_fast.find_most_similar_word(input_word)

        # NOTE: the slow-mapping and reversed-word fallbacks that used to live
        # here are intentionally disabled; recover them from version control
        # if they need to be restored.

        # A cleaned word that is a substring of its match is accepted even
        # when the similarity score is below the threshold.
        is_substring = mapping['cleaned_word'] in mapping['matching_word']
        if mapping['similarity_score'] < similarity_threshold and not is_substring:
            logging.info(" - Attempting GPT mapping")
            try:
                gpt_recommended_word = query_gpt(input_word)
                if gpt_recommended_word:
                    if gpt_recommended_word == 'Non-Food Item':
                        mapping.update({
                            'similarity_score': 1.0,
                            'confidence_score': 1.0,
                            'is_food': False,
                            'food_nonfood_score': 1.0
                        })
                        return mapping
                    elif gpt_recommended_word == 'Mixed Food Items':
                        mapping.update({
                            'matching_word': 'Mixed Food Items',
                            'dictionary_word': 'Mixed Food Items',
                            'similarity_score': 1.0,
                            'confidence_score': 1.0
                        })
                        return mapping
                    else:
                        # Re-run the matcher on GPT's suggestion and keep the
                        # result only when it scores better than the original.
                        gpt_mapping = self.similarity_fast.find_most_similar_word(gpt_recommended_word)
                        if gpt_mapping['similarity_score'] > mapping['similarity_score']:
                            gpt_mapping.update({
                                'input_word': input_word,
                                'cleaned_word': mapping['cleaned_word']
                            })
                            mapping = gpt_mapping
            except Exception as e:
                # Best-effort fallback: a GPT failure leaves the fast mapping.
                logging.info(f" - Error querying GPT: {e}")
        return mapping

    def handle_multi_item(self, input_word):
        """Map a delimited multi-item description (commas and/or slashes).

        Each extracted part is mapped individually; the combined result is
        Non-Food if any part is non-food, a Heterogeneous Mixture if any part
        is one, otherwise the part with the lowest dry-matter content (the
        most conservative choice). Returns None when no part could be mapped.
        """
        logging.info(f"Handling multi-item {input_word}")
        input_word_parts = extract_items(input_word)
        logging.info(f" - Extracted items: {input_word_parts}")

        mappings = []
        for part in input_word_parts:
            mapping = self.handle_single_item(part)
            # Filler words carry ignore=True and add no value to the mapping.
            if mapping and not mapping['ignore']:
                mappings.append(mapping)

        # Any non-food part makes the whole entry a Non-Food Item.
        # (Fixed: the original had an unreachable `break` after this return.)
        for mapping in mappings:
            if mapping['is_food'] is False:
                return {
                    'input_word': input_word,
                    'cleaned_word': mapping['cleaned_word'],
                    'matching_word': 'Non-Food Item',
                    'dictionary_word': 'Non-Food Item',
                    'similarity_score': None,
                    'confidence_score': None,
                    'similar_words': None,
                    'is_food': False,
                    'food_nonfood_score': 1.0,
                    'wweia_category': 'Non-Food Item',
                    'foundation_category': 'Non-Food Item',
                    'water_content': None,
                    'dry_matter_content': None,
                    'leakage': None
                }

        # Nothing usable was extracted: return an empty mapping shell.
        if not mappings:
            return {
                'input_word': input_word,
                'cleaned_word': None,
                'matching_word': None,
                'dictionary_word': None,
                'similarity_score': None,
                'confidence_score': None,
                'similar_words': None,
                'is_food': None,
                'food_nonfood_score': None
            }

        # A Heterogeneous Mixture part wins outright; otherwise track the
        # mapping with the lowest dry-matter content.
        heterogeneous_exists = False
        most_conservative_mapping = None
        for mapping in mappings:
            if mapping['wweia_category'] == "Heterogeneous Mixture":
                heterogeneous_exists = True
                break
            dry_matter_content = mapping.get('dry_matter_content')
            if dry_matter_content is not None:
                if most_conservative_mapping is None or dry_matter_content < most_conservative_mapping.get('dry_matter_content', float('inf')):
                    most_conservative_mapping = mapping

        if heterogeneous_exists:
            # Fixed mixture constants — presumably standard assumptions for a
            # heterogeneous food mixture; TODO confirm source.
            mixture_data = {
                'matching_word': 'Heterogeneous Mixture',
                'dictionary_word': 'Heterogeneous Mixture',
                'wweia_category': 'Heterogeneous Mixture',
                'foundation_category': 'Heterogeneous Mixture',
                'dry_matter_content': 0.27,
                'water_content': 0.73,
                'leakage': 0.1
            }
        elif most_conservative_mapping is not None:
            mixture_data = {
                'matching_word': most_conservative_mapping['matching_word'],
                'dictionary_word': f"{most_conservative_mapping['dictionary_word']} (Lowest DMC)",
                'wweia_category': most_conservative_mapping['wweia_category'],
                'foundation_category': most_conservative_mapping['foundation_category'],
                'dry_matter_content': most_conservative_mapping['dry_matter_content'],
                'water_content': most_conservative_mapping['water_content'],
                'leakage': most_conservative_mapping['leakage']
            }
        else:
            # Every part lacked a dry-matter content value.
            logging.warning(f" - No mappings found for {input_word}")
            return None

        logging.info(f" - Mixture data: {mixture_data}")
        return {
            'input_word': input_word,
            'cleaned_word': None,
            'similarity_score': None,
            'confidence_score': None,
            'similar_words': None,
            'is_food': True,
            'food_nonfood_score': 1.0,
            **mixture_data
        }

    def handle_single_item(self, input_word):
        """Map one item: DB cache first, then food/non-food screen, then the
        similarity/GPT pipeline. Returns the mapping dict or None for an
        empty cleaned word.
        """
        input_word_clean = clean_word(input_word)
        # Covers both None and "" (the original checked "" twice).
        if not input_word_clean:
            return None

        # Check the DB cache under both the singular and the plural form.
        for count in (1, 2):
            candidate = self.pluralizer.pluralize(input_word_clean, count)
            mapping = cached_get_mapping_from_db(self.db_cursor, candidate)
            if mapping:
                logging.info(f" - Found mapping in db: {mapping}")
                return self.wrap_mapping_with_dictionary_data(mapping)

        food_nonfood = classify_as_food_nonfood(input_word_clean)
        # If we're very confident the word is non-food, skip mapping entirely.
        if food_nonfood[1] > 0.9 and food_nonfood[0] is False:
            mapping = {
                'input_word': input_word,
                'cleaned_word': input_word_clean,
                'matching_word': 'Non-Food Item',
                'dictionary_word': 'Non-Food Item',
                'similarity_score': None,
                'confidence_score': None,
                'similar_words': None,
                'is_food': False,
                'food_nonfood_score': food_nonfood[1],
            }
            store_mapping_to_db(self.db_cursor, self.db_conn, mapping)
            return self.wrap_mapping_with_dictionary_data(mapping)

        mapping = self.perform_mapping(input_word)
        # Combine the classifier score with the mapping score, taking the
        # more pessimistic (less food-like) view.
        food_nonfood_pessimistic = pessimistic_food_nonfood_score(food_nonfood, mapping['similarity_score'])
        mapping.update({
            'is_food': food_nonfood_pessimistic[0],
            'food_nonfood_score': food_nonfood_pessimistic[1]
        })
        store_mapping_to_db(self.db_cursor, self.db_conn, mapping)
        return self.wrap_mapping_with_dictionary_data(mapping)

    def wrap_mapping_with_dictionary_data(self, mapping):
        """Attach dictionary metadata (categories, water/dry-matter content,
        leakage) for ``mapping['dictionary_word']``; None values when the
        dictionary has no entry. Mutates and returns ``mapping``.
        """
        if not mapping:
            return None
        dictionary_result = get_dictionary_data_from_db(self.db_cursor, mapping['dictionary_word'])
        # Default 'ignore' to False when the mapping doesn't carry it.
        ignore = mapping.get('ignore', False)
        mapping.update({
            'wweia_category': dictionary_result['wweia_category'] if dictionary_result else None,
            'foundation_category': dictionary_result['foundation_category'] if dictionary_result else None,
            'water_content': dictionary_result['water_content'] if dictionary_result else None,
            'dry_matter_content': dictionary_result['dry_matter_content'] if dictionary_result else None,
            'leakage': dictionary_result['leakage'] if dictionary_result else None,
            'ignore': ignore
        })
        return mapping

    def add_carbon_credit_data(self, mapping, donor, date, weight):
        """Add donor/date/weight and emissions figures to ``mapping``.

        ``weight`` is expected in pounds; unparseable weights fall back to 0.
        Non-food mappings (and mappings missing leakage or dry-matter data)
        get None for all emissions fields. Returns None for a falsy mapping.
        """
        if not mapping:
            return None
        try:
            weight = float(weight)
        except ValueError:
            # Expected for blank / non-numeric cells; fall back silently.
            weight = 0
        except Exception as e:
            logging.info(f" - Error converting weight to float: {e}")
            weight = 0
        mapping.update({
            'donor': donor,
            'date': date,
            'weight': weight,
            'weight_metric_tonnes': weight * 0.000453592,  # pounds -> metric tonnes
            'distance': 250,            # assumed haul distance (miles) — TODO confirm
            'ef': 2.968073544,          # emissions factor — TODO confirm source
            'mt_lb_mile': 0.0000000809,  # transport emissions per lb-mile — TODO confirm
        })
        required_fields_exist = mapping.get('leakage') is not None and mapping.get('dry_matter_content') is not None
        if mapping['is_food'] is False or not required_fields_exist:
            return {
                'baseline_emissions': None,
                'leakage_emissions': None,
                'project_emissions': None,
                'total_emissions_reduction': None,
                **mapping
            }
        logging.info(f" - Calculating carbon credits for: {mapping}")
        baseline_emissions = mapping['weight_metric_tonnes'] * mapping['dry_matter_content'] * mapping['ef']
        leakage_emissions = mapping['leakage'] * baseline_emissions
        project_emissions = mapping['distance'] * mapping['mt_lb_mile'] * baseline_emissions
        total_emissions_reduction = baseline_emissions - leakage_emissions - project_emissions
        mapping.update({
            'baseline_emissions': baseline_emissions,
            'leakage_emissions': leakage_emissions,
            'project_emissions': project_emissions,
            'total_emissions_reduction': total_emissions_reduction
        })
        return mapping

    def match_words(self, input_data):
        """Process rows of input and return the list of result mappings.

        Each row is a tuple of up to six fields:
        (description, alt_description, row_num, donor, date, weight);
        trailing fields are optional. (The original comment claiming
        2-tuples was out of date.) Every successful mapping is also stored
        to the DB under this run's key.
        """
        results = []
        for input_item in tqdm(input_data, desc="Processing input words"):
            input_word = input_item[0]
            input_word_alt = input_item[1] if len(input_item) > 1 else None
            input_row_num = input_item[2] if len(input_item) > 2 else None
            input_donor = input_item[3] if len(input_item) > 3 else None
            input_date = input_item[4] if len(input_item) > 4 else None
            input_weight = input_item[5] if len(input_item) > 5 else None
            logging.info("")
            logging.info(f"Processing: {input_word}")

            is_empty = False
            if is_empty_word(input_word):
                if is_empty_word(input_word_alt):
                    # Both descriptions empty: emit an empty-template mapping.
                    mapping = empty_template(input_word)
                    is_empty = True
                else:
                    # Fall back to the alternate description.
                    input_word = input_word_alt
            if not is_empty:
                if has_delimiters(input_word):
                    mapping = self.handle_multi_item(input_word)
                else:
                    mapping = self.handle_single_item(input_word)

            if mapping:
                mapping = self.add_carbon_credit_data(mapping, input_donor, input_date, input_weight)
                mapping.update({
                    'run_row': input_row_num
                })
                results.append(mapping)
                store_result_to_db(self.db_cursor, self.db_conn, self.run_key, mapping)
        return results