brightly-ai / algo.py
beweinreich's picture
rename spec classifier
0ff64fc
raw
history blame
15.2 kB
# algo.py
import math
import time
import queue
import logging
import threading
import pandas as pd
from tqdm import tqdm
from pluralizer import Pluralizer
from similarity_fast import SimilarityFast
from food_nonfood import classify_as_food_nonfood, pessimistic_food_nonfood_score
from utils import clean_word, is_empty_word
from db.db_utils import store_mapping_to_db, cached_get_mapping_from_db, get_dictionary_data_from_db, store_result_to_db
from ask_gpt import query_gpt
from multi_food_item_detector import extract_items, has_delimiters
from mapping_template import empty_template, heterogeneous_template, multi_item_template, nonfood_template, usda_template
from tasks import insert_result
from specificity_classifier import classify_text_to_specificity
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
similarity_threshold = 0.78
class Algo:
def __init__(self, db_conn, run_key=None):
self.db_conn = db_conn
self.run_key = run_key if run_key else int(time.time())
self.db_cursor = db_conn.cursor()
self.similarity_fast = SimilarityFast(self.db_cursor)
# self.similarity_slow = SimilaritySlow(self.db_cursor, self.db_conn)
self.pluralizer = Pluralizer()
self.mappings_with_dictionary = self.initialize_mappings()
def initialize_mappings(self):
self.db_cursor.execute('SELECT cleaned_word, dictionary_word, is_food, similarity_score, food_nonfood_score, wweia_category, sr_legacy_food_category, water_content, dry_matter_content, leakage, ignore, specificity from mappings join dictionary on mappings.dictionary_word = dictionary.description')
rows = self.db_cursor.fetchall()
mappings_with_dictionary = {}
for row in rows:
mappings_with_dictionary[row[0]] = {
'cleaned_word': row[0],
'dictionary_word': row[1],
'is_food': row[2],
'similarity_score': row[3],
'food_nonfood_score': row[4],
'wweia_category': row[5],
'sr_legacy_food_category': row[6],
'water_content': row[7],
'dry_matter_content': row[8],
'leakage': row[9],
'ignore': row[10],
'specificity': row[11]
}
return mappings_with_dictionary
def perform_mapping(self, input_word, attempts=0):
# if the input word is a USDA food item, we can skip the similarity check
# this is a special case because the USDA food items are Government Donation (Not Counted) items
if 'usda' in input_word.lower():
return usda_template(input_word, clean_word(input_word))
mapping = self.similarity_fast.find_most_similar_word(input_word)
logging.info(f" - Simlarity Fast mapping: {mapping}")
# check if the cleaned_word is a substring of the most_similar_word
is_substring = mapping['cleaned_word'] in mapping['most_similar_word']
if mapping['similarity_score'] < similarity_threshold and not is_substring:
logging.info(" - Attempting GPT mapping")
try:
gpt_recommended_word = query_gpt(input_word)
if gpt_recommended_word:
if gpt_recommended_word == 'Non-Food Item':
mapping.update(
{
'similarity_score': 1.0,
'confidence_score': 1.0,
'is_food': False,
'food_nonfood_score': 1.0
}
)
return mapping
elif gpt_recommended_word == 'Heterogeneous Mixture':
mapping.update(
{
'dictionary_word': 'Heterogeneous Mixture', 'similarity_score': 1.0,
'confidence_score': 1.0
}
)
return mapping
elif gpt_recommended_word == 'Broad Category':
category_mapping = self.similarity_fast.find_most_similar_word(input_word, True)
mapping.update(
{
'dictionary_word': category_mapping['dictionary_word'],
'similarity_score': category_mapping['similarity_score'],
'confidence_score': category_mapping['confidence_score']
}
)
else:
gpt_mapping = self.similarity_fast.find_most_similar_word(gpt_recommended_word)
if gpt_mapping['similarity_score'] > mapping['similarity_score']:
gpt_mapping.update(
{
'input_word': input_word,
'cleaned_word': mapping['cleaned_word']
}
)
mapping = gpt_mapping
except Exception as e:
logging.info(f" - Error querying GPT: {e}")
return mapping
def handle_multi_item(self, input_word):
# The input word has a comma or a slash in it
# If it has more commas, its comma-delimited
# If it has more slashes, its slash-delimited
# If it has equal number of commas and slashes, we'll go with slashes
logging.info(f"Handling multi-item {input_word}")
input_word_parts = extract_items(input_word)
logging.info(f" - Extracted items: {input_word_parts}")
mappings = []
for part in input_word_parts:
mapping = self.handle_single_item(part)
if mapping:
# Some words in the mapping can be ignored because they are
# just filler words that don't add any value to the mapping
if mapping['ignore'] == False:
mappings.append(mapping)
# look up the dictionary values for each mapping
# find the wweia category
# if all mappings have the same wweia category, return "homogenous", else "heterogeneous"
# if is_food is False for any mappings, return "Non-Food Item" as dictionary word
for mapping in mappings:
if mapping['is_food'] == False:
return nonfood_template(
input_word,
mapping['cleaned_word'],
mapping['food_nonfood_score']
)
break
dictionary_words = [mapping['dictionary_word'] for mapping in mappings]
if len(set(dictionary_words)) == 0:
return empty_template(input_word)
# check if "heterogeneous" is in the wweia category of any of the mappings
# otherwise we find the mapping with the lowest DMC value, and return that as the dictionary word, dmc, wc, and leakage values
heterogeneous_exists = False
most_conservative_mapping = None
for mapping in mappings:
if mapping['sr_legacy_food_category'] == "Heterogeneous Mixture":
heterogeneous_exists = True
break
else:
dry_matter_content = mapping.get('dry_matter_content')
if dry_matter_content is not None:
if most_conservative_mapping is None or dry_matter_content < most_conservative_mapping.get('dry_matter_content', float('inf')):
most_conservative_mapping = mapping
if heterogeneous_exists:
return heterogeneous_template(input_word)
elif most_conservative_mapping is not None:
return multi_item_template(input_word, None, most_conservative_mapping)
else:
logging.warning(f" - No mappings found for {input_word}")
return None
def handle_single_item(self, input_word):
input_word_clean = clean_word(input_word)
if not input_word_clean:
return None
if input_word_clean == "":
return None
# try the singular form of the word
singular = self.pluralizer.pluralize(input_word_clean, 1)
# mapping = cached_get_mapping_from_db(self.db_cursor, singular)
mapping_with_dict = self.mappings_with_dictionary.get(singular)
if mapping_with_dict:
mapping_with_dict.update({
'input_word': input_word,
})
logging.info(f" - Found mapping in db: {mapping_with_dict}")
return mapping_with_dict
# try the plural form of the word
plural = self.pluralizer.pluralize(input_word_clean, 2)
mapping_with_dict = self.mappings_with_dictionary.get(plural)
if mapping_with_dict:
mapping_with_dict.update({
'input_word': input_word,
})
logging.info(f" - Found mapping in db: {mapping_with_dict}")
return mapping_with_dict
food_nonfood = classify_as_food_nonfood(input_word_clean)
# if we're very confident that the word is non-food, let's not even classify it
if food_nonfood[1] > 0.9 and food_nonfood[0] == False:
mapping = nonfood_template(input_word, input_word_clean, food_nonfood[1])
store_mapping_to_db(self.db_cursor, self.db_conn, mapping)
self.mappings_with_dictionary[input_word_clean] = mapping
return self.wrap_mapping_with_dictionary_data(mapping)
mapping = self.perform_mapping(input_word)
specificity = classify_text_to_specificity(input_word_clean)
mapping.update({
'specificity': specificity
})
food_nonfood_pessimistic = pessimistic_food_nonfood_score(food_nonfood, mapping['similarity_score'])
mapping.update({
'is_food': food_nonfood_pessimistic[0],
'food_nonfood_score': food_nonfood_pessimistic[1]
})
store_mapping_to_db(self.db_cursor, self.db_conn, mapping)
self.mappings_with_dictionary[input_word_clean] = mapping
return self.wrap_mapping_with_dictionary_data(mapping)
def wrap_mapping_with_dictionary_data(self, mapping):
if not mapping:
return None
dictionary_result = get_dictionary_data_from_db(self.db_cursor, mapping['dictionary_word'])
# set default on ignore
ignore = mapping['ignore'] if 'ignore' in mapping else False
mapping.update({
'wweia_category': dictionary_result['wweia_category'] if dictionary_result else None,
'sr_legacy_food_category': dictionary_result['sr_legacy_food_category'] if dictionary_result else None,
'water_content': dictionary_result['water_content'] if dictionary_result else None,
'dry_matter_content': dictionary_result['dry_matter_content'] if dictionary_result else None,
'leakage': dictionary_result['leakage'] if dictionary_result else None,
'ignore': ignore
})
return mapping
def add_carbon_credit_data(self, mapping, donor, date, weight):
if not mapping:
return None
mapping.update({
'donor': donor
})
try:
weight = float(weight)
except ValueError:
weight = 0
except Exception as e:
logging.info(f" - Error converting weight to float: {e}")
weight = 0
if math.isnan(weight):
weight = 0
mapping.update({
'date': date,
'weight': weight,
'weight_metric_tonnes': weight * 0.000453592,
'distance': 250,
'ef': 2.968073544,
'mt_lb_mile': 0.0000000809,
})
required_fields_exist = 'leakage' in mapping and mapping['leakage'] is not None and 'dry_matter_content' in mapping and mapping['dry_matter_content'] is not None
if mapping['is_food'] == False or required_fields_exist == False:
return {
'baseline_emissions': None,
'leakage_emissions': None,
'project_emissions': None,
'total_emissions_reduction': None,
**mapping
}
logging.info(f" - Calculating carbon credits for: {mapping}")
baseline_emissions = mapping['weight_metric_tonnes'] * mapping['dry_matter_content'] * mapping['ef']
leakage_emissions = mapping['leakage'] * baseline_emissions
project_emissions = mapping['distance'] * mapping['mt_lb_mile'] * baseline_emissions
total_emissions_reduction = baseline_emissions - leakage_emissions - project_emissions
mapping.update({
'baseline_emissions': baseline_emissions,
'leakage_emissions': leakage_emissions,
'project_emissions': project_emissions,
'total_emissions_reduction': total_emissions_reduction
})
return mapping
def match_words(self, input_data):
# input_data is a list of tuples, where each tuple is (description, donor)
results = []
result_batch = []
for input_item in tqdm(input_data, desc="Processing input words"):
input_word = input_item[0]
input_word_alt = input_item[1] if len(input_item) > 1 else None
input_row_num = input_item[2] if len(input_item) > 2 else None
input_donor = input_item[3] if len(input_item) > 3 else None
input_date = input_item[4] if len(input_item) > 4 else None
input_weight = input_item[5] if len(input_item) > 5 else None
logging.info("")
logging.info(f"Processing: {input_word}")
is_empty = False
if is_empty_word(input_word):
if is_empty_word(input_word_alt):
mapping = empty_template(input_word)
is_empty = True
else:
input_word = input_word_alt
if not is_empty:
if has_delimiters(input_word):
mapping = self.handle_multi_item(input_word)
else:
mapping = self.handle_single_item(input_word)
if mapping:
mapping = dict(mapping)
mapping = self.add_carbon_credit_data(mapping, input_donor, input_date, input_weight)
mapping.update({
'run_row': input_row_num
})
result_batch.append(mapping)
# store_result_to_db(self.db_cursor, self.db_conn, self.run_key, mapping)
results.append(mapping)
if len(result_batch) >= 100:
insert_result(self.run_key, result_batch)
result_batch = []
if len(result_batch) > 0:
insert_result(self.run_key, result_batch)
result_batch = []
return results