# Source: brightly-ai / algo.py (16.4 kB)
# Last commit: 034c968 "switch wweia -> sr_legacy" (beweinreich)
import time
import queue
import logging
import threading
import pandas as pd
from tqdm import tqdm
from pluralizer import Pluralizer
from similarity_fast import SimilarityFast
from food_nonfood import classify_as_food_nonfood, pessimistic_food_nonfood_score
from utils import clean_word, is_empty_word
from db.db_utils import store_mapping_to_db, cached_get_mapping_from_db, get_dictionary_data_from_db, store_result_to_db
from ask_gpt import query_gpt
from multi_food_item_detector import extract_items, has_delimiters
from mapping_template import empty_template
# Root logger setup, executed once at import time: only WARNING and above
# are emitted, each record prefixed with a timestamp and its level name.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.WARNING,
)

# Similarity score below which Algo.perform_mapping stops trusting the fast
# similarity match and asks GPT for a recommendation instead.
similarity_threshold = 0.75
class Algo:
    """Map free-text item descriptions to dictionary words and attach
    carbon-credit figures to each mapped row.

    The public entry point is :meth:`match_words`; the other methods are the
    individual pipeline stages (single/multi item handling, similarity/GPT
    mapping, dictionary enrichment, emissions arithmetic).
    """

    # Numeric constants written into every row by add_carbon_credit_data.
    # NOTE(review): units are inferred from the output key names only
    # ('weight_metric_tonnes', 'mt_lb_mile'); confirm against the methodology.
    WEIGHT_TO_METRIC_TONNES = 0.000453592  # factor applied to the raw weight
    DISTANCE = 250                         # stored under 'distance'
    EF = 2.968073544                       # stored under 'ef'
    MT_LB_MILE = 0.0000000809              # stored under 'mt_lb_mile'

    # Fixed values reported for a multi-item input as soon as any of its
    # parts falls in the "Heterogeneous Mixture" category.
    _HETEROGENEOUS_MIXTURE = {
        'matching_word': 'Heterogeneous Mixture',
        'dictionary_word': 'Heterogeneous Mixture',
        'wweia_category': 'Heterogeneous Mixture',
        'sr_legacy_food_category': 'Heterogeneous Mixture',
        'dry_matter_content': 0.27,
        'water_content': 0.73,
        'leakage': 0.1,
    }

    # Dictionary columns copied onto a mapping by
    # wrap_mapping_with_dictionary_data.
    _DICTIONARY_FIELDS = (
        'wweia_category',
        'sr_legacy_food_category',
        'water_content',
        'dry_matter_content',
        'leakage',
    )

    def __init__(self, db_conn, run_key=None):
        """Set up the DB cursor, similarity engine and pluralizer.

        Args:
            db_conn: Open database connection; a cursor is created from it.
            run_key: Identifier stored with every result of this run. A falsy
                value is replaced by the current Unix time in whole seconds.
        """
        self.db_conn = db_conn
        self.run_key = run_key if run_key else int(time.time())
        self.db_cursor = db_conn.cursor()
        self.similarity_fast = SimilarityFast(self.db_cursor)
        # The slow similarity engine (SimilaritySlow) is currently disabled.
        self.pluralizer = Pluralizer()

    def perform_mapping(self, input_word, attempts=0):
        """Find the best dictionary match for a single input word.

        Args:
            input_word: Raw description to map.
            attempts: Unused; kept so existing callers keep working.

        Returns:
            A mapping dict (input_word, cleaned_word, matching_word,
            dictionary_word, scores, ...).
        """
        # USDA food items are Government Donation (Not Counted) items, so
        # they bypass the similarity search entirely.
        if 'usda' in input_word.lower():
            return {
                'input_word': input_word,
                'cleaned_word': clean_word(input_word),
                'matching_word': 'USDA Food Item',
                'dictionary_word': 'USDA Food Item',
                'similarity_score': 1.0,
                'confidence_score': 1.0,
                'similar_words': None,
                'is_food': True,
                'food_nonfood_score': 1.0,
            }

        mapping = self.similarity_fast.find_most_similar_word(input_word)

        # Two further fallbacks existed here and are disabled for now: a slow
        # similarity search, and retrying the fast search with the words of
        # the input in reverse order.

        # A weak score is still accepted when the cleaned word is literally
        # contained in the matched word; otherwise GPT is consulted.
        is_substring = mapping['cleaned_word'] in mapping['matching_word']
        if mapping['similarity_score'] < similarity_threshold and not is_substring:
            mapping = self._map_with_gpt(input_word, mapping)
        return mapping

    def _map_with_gpt(self, input_word, mapping):
        """Try to improve a weak `mapping` using GPT's recommended word.

        Best effort: any error leaves `mapping` as it was and is only logged.
        """
        logging.info(" - Attempting GPT mapping")
        try:
            gpt_recommended_word = query_gpt(input_word)
            if not gpt_recommended_word:
                return mapping

            if gpt_recommended_word == 'Non-Food Item':
                # NOTE(review): handle_single_item overwrites 'is_food' and
                # 'food_nonfood_score' right after this call; confirm that
                # is the intended precedence.
                mapping.update({
                    'similarity_score': 1.0,
                    'confidence_score': 1.0,
                    'is_food': False,
                    'food_nonfood_score': 1.0,
                })
                return mapping

            if gpt_recommended_word == 'Mixed Food Items':
                mapping.update({
                    'matching_word': 'Mixed Food Items',
                    'dictionary_word': 'Mixed Food Items',
                    'similarity_score': 1.0,
                    'confidence_score': 1.0,
                })
                return mapping

            # Otherwise map GPT's suggestion and keep it only if it scores
            # strictly better than the direct match.
            gpt_mapping = self.similarity_fast.find_most_similar_word(gpt_recommended_word)
            if gpt_mapping['similarity_score'] > mapping['similarity_score']:
                gpt_mapping.update({
                    'input_word': input_word,
                    'cleaned_word': mapping['cleaned_word'],
                })
                return gpt_mapping
        except Exception as e:
            # Logged at WARNING: the module configures logging at WARNING
            # level, so an INFO record would hide every GPT failure.
            logging.warning(f" - Error querying GPT: {e}")
        return mapping

    def handle_multi_item(self, input_word):
        """Map an input that lists several items and collapse it to one row.

        The parts come from `extract_items` (per the original author's note:
        comma- or slash-delimited, whichever delimiter is more frequent, with
        slashes winning ties). Each part is mapped with
        :meth:`handle_single_item`; parts flagged `ignore` are dropped.

        Returns:
            * a "Non-Food Item" row if any part is non-food;
            * an all-``None`` row if no part produced a usable mapping;
            * a "Heterogeneous Mixture" row if any part is in that category;
            * otherwise the part with the lowest dry matter content;
            * ``None`` if no part has a dry matter content.
        """
        logging.info(f"Handling multi-item {input_word}")
        input_word_parts = extract_items(input_word)
        logging.info(f" - Extracted items: {input_word_parts}")

        mappings = []
        for part in input_word_parts:
            mapping = self.handle_single_item(part)
            # `ignore` marks filler words that add no value to the mapping.
            if mapping and mapping['ignore'] == False:
                mappings.append(mapping)

        # A single non-food part makes the whole input non-food.
        # `== False` (not truthiness) so that an unknown `None` is not
        # treated as non-food.
        for mapping in mappings:
            if mapping['is_food'] == False:
                return {
                    'input_word': input_word,
                    'cleaned_word': mapping['cleaned_word'],
                    'matching_word': 'Non-Food Item',
                    'dictionary_word': 'Non-Food Item',
                    'similarity_score': None,
                    'confidence_score': None,
                    'similar_words': None,
                    'is_food': False,
                    'food_nonfood_score': 1.0,
                    'wweia_category': 'Non-Food Item',
                    'sr_legacy_food_category': 'Non-Food Item',
                    'water_content': None,
                    'dry_matter_content': None,
                    'leakage': None,
                }

        if not mappings:
            return {
                'input_word': input_word,
                'cleaned_word': None,
                'matching_word': None,
                'dictionary_word': None,
                'similarity_score': None,
                'confidence_score': None,
                'similar_words': None,
                'is_food': None,
                'food_nonfood_score': None,
            }

        heterogeneous_exists = any(
            mapping['sr_legacy_food_category'] == "Heterogeneous Mixture"
            for mapping in mappings
        )
        if heterogeneous_exists:
            mixture_data = dict(self._HETEROGENEOUS_MIXTURE)
        else:
            # Most conservative choice: the part with the lowest dry matter
            # content (the first one wins on ties).
            candidates = [
                mapping for mapping in mappings
                if mapping.get('dry_matter_content') is not None
            ]
            if not candidates:
                logging.warning(f" - No mappings found for {input_word}")
                return None
            most_conservative_mapping = min(
                candidates, key=lambda mapping: mapping['dry_matter_content']
            )
            mixture_data = {
                'matching_word': most_conservative_mapping['matching_word'],
                'dictionary_word': f"{most_conservative_mapping['dictionary_word']} (Lowest DMC)",
                'wweia_category': most_conservative_mapping['wweia_category'],
                'sr_legacy_food_category': most_conservative_mapping['sr_legacy_food_category'],
                'dry_matter_content': most_conservative_mapping['dry_matter_content'],
                'water_content': most_conservative_mapping['water_content'],
                'leakage': most_conservative_mapping['leakage'],
            }

        logging.info(f" - Mixture data: {mixture_data}")
        return {
            'input_word': input_word,
            'cleaned_word': None,
            'similarity_score': None,
            'confidence_score': None,
            'similar_words': None,
            'is_food': True,
            'food_nonfood_score': 1.0,
            **mixture_data,
        }

    def handle_single_item(self, input_word):
        """Map one item, preferring a stored mapping over a fresh computation.

        Returns:
            The mapping enriched with dictionary data, or ``None`` when the
            cleaned word is empty.
        """
        input_word_clean = clean_word(input_word)
        if not input_word_clean:
            return None

        # Look for a stored mapping under the singular form first, then the
        # plural form.
        for count in (1, 2):
            word_form = self.pluralizer.pluralize(input_word_clean, count)
            mapping = cached_get_mapping_from_db(self.db_cursor, word_form)
            if mapping:
                logging.info(f" - Found mapping in db: {mapping}")
                return self.wrap_mapping_with_dictionary_data(mapping)

        food_nonfood = classify_as_food_nonfood(input_word_clean)

        # When the classifier is very confident the word is non-food, skip
        # the similarity/GPT mapping altogether.
        if food_nonfood[1] > 0.9 and food_nonfood[0] == False:
            mapping = {
                'input_word': input_word,
                'cleaned_word': input_word_clean,
                'matching_word': 'Non-Food Item',
                'dictionary_word': 'Non-Food Item',
                'similarity_score': None,
                'confidence_score': None,
                'similar_words': None,
                'is_food': False,
                'food_nonfood_score': food_nonfood[1],
            }
            store_mapping_to_db(self.db_cursor, self.db_conn, mapping)
            return self.wrap_mapping_with_dictionary_data(mapping)

        mapping = self.perform_mapping(input_word)
        food_nonfood_pessimistic = pessimistic_food_nonfood_score(
            food_nonfood, mapping['similarity_score']
        )
        mapping.update({
            'is_food': food_nonfood_pessimistic[0],
            'food_nonfood_score': food_nonfood_pessimistic[1],
        })
        store_mapping_to_db(self.db_cursor, self.db_conn, mapping)
        return self.wrap_mapping_with_dictionary_data(mapping)

    def wrap_mapping_with_dictionary_data(self, mapping):
        """Add the dictionary columns for `mapping['dictionary_word']`.

        The mapping is updated in place and returned. Every dictionary column
        is ``None`` when the word has no dictionary entry; `ignore` defaults
        to ``False`` when the mapping does not carry it yet.

        Returns:
            The same mapping object, or ``None`` for a falsy `mapping`.
        """
        if not mapping:
            return None

        dictionary_result = get_dictionary_data_from_db(
            self.db_cursor, mapping['dictionary_word']
        )
        enrichment = {
            field: dictionary_result[field] if dictionary_result else None
            for field in self._DICTIONARY_FIELDS
        }
        enrichment['ignore'] = mapping['ignore'] if 'ignore' in mapping else False
        mapping.update(enrichment)
        return mapping

    def add_carbon_credit_data(self, mapping, donor, date, weight):
        """Attach donor/date/weight and the derived emissions to `mapping`.

        Args:
            mapping: Mapping dict; updated in place.
            donor: Stored under 'donor'.
            date: Stored under 'date'.
            weight: Anything `float()` accepts; unparsable values become 0.

        Returns:
            ``None`` for a falsy `mapping`. For non-food rows, or rows missing
            leakage or dry matter content, a new dict with the four emission
            fields defaulting to ``None``. Otherwise `mapping` itself with
            the emission fields filled in.
        """
        if not mapping:
            return None

        try:
            weight = float(weight)
        except ValueError:
            weight = 0
        except Exception as e:
            # e.g. TypeError when no weight was supplied (None).
            logging.info(f" - Error converting weight to float: {e}")
            weight = 0

        mapping.update({
            'donor': donor,
            'date': date,
            'weight': weight,
            'weight_metric_tonnes': weight * self.WEIGHT_TO_METRIC_TONNES,
            'distance': self.DISTANCE,
            'ef': self.EF,
            'mt_lb_mile': self.MT_LB_MILE,
        })

        required_fields_exist = (
            mapping.get('leakage') is not None
            and mapping.get('dry_matter_content') is not None
        )
        # `== False` on purpose: an `is_food` of None must not count as
        # non-food here.
        if mapping['is_food'] == False or not required_fields_exist:
            return {
                'baseline_emissions': None,
                'leakage_emissions': None,
                'project_emissions': None,
                'total_emissions_reduction': None,
                **mapping,
            }

        logging.info(f" - Calculating carbon credits for: {mapping}")
        baseline_emissions = mapping['weight_metric_tonnes'] * mapping['dry_matter_content'] * mapping['ef']
        leakage_emissions = mapping['leakage'] * baseline_emissions
        # NOTE(review): project emissions scale with baseline_emissions, yet
        # the 'mt_lb_mile' name suggests a per-pound-mile factor; confirm the
        # intended formula.
        project_emissions = mapping['distance'] * mapping['mt_lb_mile'] * baseline_emissions
        total_emissions_reduction = baseline_emissions - leakage_emissions - project_emissions

        mapping.update({
            'baseline_emissions': baseline_emissions,
            'leakage_emissions': leakage_emissions,
            'project_emissions': project_emissions,
            'total_emissions_reduction': total_emissions_reduction,
        })
        return mapping

    @staticmethod
    def _optional_field(input_item, index):
        """Return `input_item[index]`, or ``None`` if the item is too short."""
        return input_item[index] if len(input_item) > index else None

    def match_words(self, input_data):
        """Map every input row, store each result, and return all results.

        Args:
            input_data: Iterable of sequences laid out positionally as
                ``(word, alt_word, row_num, donor, date, weight)``; only the
                first element is required.

        Returns:
            List of result dicts, one per row that produced a mapping.
        """
        results = []
        for input_item in tqdm(input_data, desc="Processing input words"):
            input_word = input_item[0]
            input_word_alt = self._optional_field(input_item, 1)
            input_row_num = self._optional_field(input_item, 2)
            input_donor = self._optional_field(input_item, 3)
            input_date = self._optional_field(input_item, 4)
            input_weight = self._optional_field(input_item, 5)

            logging.info("")
            logging.info(f"Processing: {input_word}")

            is_empty = False
            if is_empty_word(input_word):
                if is_empty_word(input_word_alt):
                    mapping = empty_template(input_word)
                    is_empty = True
                else:
                    # Fall back to the alternative description.
                    input_word = input_word_alt

            if not is_empty:
                if has_delimiters(input_word):
                    mapping = self.handle_multi_item(input_word)
                else:
                    mapping = self.handle_single_item(input_word)

            if mapping:
                # Work on a copy: the mapping may be an object shared between
                # rows (e.g. one handed out by the DB cache), and writing the
                # per-row donor/date/weight/run_row into it would make
                # repeated inputs alias a single dict inside `results`.
                mapping = self.add_carbon_credit_data(
                    dict(mapping), input_donor, input_date, input_weight
                )
                mapping.update({'run_row': input_row_num})
                results.append(mapping)
                store_result_to_db(self.db_cursor, self.db_conn, self.run_key, mapping)

        return results