# NOTE(review): removed editor/UI paste residue ("Spaces:" / "Paused") that is
# not valid Python and was never part of this module.
import os | |
import psycopg2 | |
import logging | |
from dotenv import load_dotenv | |
from functools import lru_cache | |
load_dotenv() | |
def get_connection():
    """Open a psycopg2 connection from DATABASE_URL and ensure the schema exists.

    Returns:
        An open psycopg2 connection with all tables/indexes initialized.

    Raises:
        KeyError: if the DATABASE_URL environment variable is not set.
        psycopg2.OperationalError: if the connection cannot be established.
    """
    database_url = os.environ['DATABASE_URL']
    # sslmode='require' because the target database is a hosted instance.
    conn = psycopg2.connect(database_url, sslmode='require')
    # Ensure tables and indexes exist before handing the connection out.
    initialize_db(conn)
    return conn
def initialize_db(conn):
    """Create the application tables and indexes if they do not already exist.

    Commits the DDL on *conn*. Safe to call repeatedly: every statement is
    guarded by IF NOT EXISTS.

    Args:
        conn: an open DB-API connection (psycopg2).
    """
    cursor = conn.cursor()
    try:
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS mappings (
                input_word TEXT PRIMARY KEY,
                cleaned_word TEXT,
                matching_word TEXT,
                dictionary_word TEXT,
                similarity_score REAL,
                confidence_score REAL,
                similar_words TEXT,
                is_food BOOLEAN,
                food_nonfood_score REAL,
                reviewed BOOLEAN DEFAULT FALSE,
                flagged BOOLEAN DEFAULT FALSE,
                ignored BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS dictionary (
                fdc_id INTEGER PRIMARY KEY,
                description TEXT,
                food_category TEXT,
                wweia_category TEXT,
                water_content REAL,
                dry_matter_content REAL,
                leakage REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS results (
                id BIGSERIAL PRIMARY KEY,
                run_key TEXT,
                run_row INTEGER,
                date TEXT,
                input_word TEXT,
                dictionary_word TEXT,
                is_food BOOLEAN,
                wweia_category TEXT,
                dry_matter_content REAL,
                leakage REAL,
                weight REAL,
                weight_metric_tonnes REAL,
                donor TEXT,
                similarity_score REAL,
                food_nonfood_score REAL,
                distance REAL,
                ef REAL,
                mt_lb_mile REAL,
                baseline_emissions REAL,
                leakage_emissions REAL,
                project_emissions REAL,
                total_emissions_reduction REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_cleaned_word ON mappings(cleaned_word);
            CREATE INDEX IF NOT EXISTS idx_dictionary_word ON mappings(dictionary_word);
            CREATE INDEX IF NOT EXISTS idx_description ON dictionary(description);
            -- Fix: store_results_batch upserts with ON CONFLICT (run_key, run_row),
            -- which PostgreSQL rejects unless a matching unique constraint or
            -- index exists. A unique index (rather than a table constraint)
            -- also upgrades pre-existing databases created without it.
            CREATE UNIQUE INDEX IF NOT EXISTS idx_results_run_key_row
                ON results(run_key, run_row);
        ''')
        conn.commit()
    finally:
        # Release the cursor even if DDL fails; the connection stays usable.
        cursor.close()
def cached_get_mapping_from_db(db_cursor, word):
    # NOTE(review): despite the name, this is a plain pass-through with NO
    # caching. functools.lru_cache is imported at module level but unused here,
    # presumably because the cursor argument is unhashable — confirm whether
    # callers rely on memoization before removing or decorating this wrapper.
    return get_mapping_from_db(db_cursor, word)
def get_mapping_from_db(cursor, cleaned_word):
    """Look up the single mappings row for *cleaned_word*.

    Returns a column-name -> value dict, or None when no row matches.
    """
    cursor.execute('SELECT * FROM mappings WHERE cleaned_word = %s', (cleaned_word,))
    record = cursor.fetchone()
    # Guard clause: no matching row.
    if not record:
        return None
    column_names = (col[0] for col in cursor.description)
    return dict(zip(column_names, record))
def get_batch_mapping_from_db(cursor, cleaned_words):
    """Fetch mappings rows for many cleaned words in a single query.

    Returns {cleaned_word: row_dict}; words without a row are simply absent.
    An empty input yields an empty dict without touching the database.
    """
    if not cleaned_words:
        return {}
    # One %s placeholder per word, bound as query parameters (no SQL injection).
    placeholders = ', '.join(['%s'] * len(cleaned_words))
    query = f'SELECT * FROM mappings WHERE cleaned_word IN ({placeholders})'
    cursor.execute(query, tuple(cleaned_words))
    rows = cursor.fetchall()
    if not rows:
        return {}
    columns = [col[0] for col in cursor.description]
    key_position = columns.index('cleaned_word')
    mapping_by_word = {}
    for row in rows:
        mapping_by_word[row[key_position]] = dict(zip(columns, row))
    return mapping_by_word
def get_dictionary_data_from_db(cursor, dictionary_word):
    """Look up the dictionary row whose description equals *dictionary_word*.

    Returns a column-name -> value dict, or None when no row matches.
    """
    cursor.execute('SELECT * FROM dictionary WHERE description = %s', (dictionary_word,))
    record = cursor.fetchone()
    # Guard clause: no matching row.
    if not record:
        return None
    column_names = (col[0] for col in cursor.description)
    return dict(zip(column_names, record))
def store_mapping_to_db(cursor, conn, mapping):
    """Insert one row into mappings; commit on success, rollback on failure.

    Args:
        cursor: open DB cursor.
        conn: connection owning *cursor* (used for commit/rollback).
        mapping: dict carrying the nine column values named in the INSERT.

    Returns:
        True on success, False on failure. (Fix: the original returned None on
        success and False on failure — both falsy, so callers could not tell
        the outcomes apart.)
    """
    logging.info(f" - Storing new mapping to db: {mapping}")
    try:
        cursor.execute('''
            INSERT INTO mappings (input_word, cleaned_word, matching_word, dictionary_word, similarity_score, confidence_score, similar_words, is_food, food_nonfood_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            mapping['input_word'],
            mapping['cleaned_word'],
            mapping['matching_word'],
            mapping['dictionary_word'],
            mapping['similarity_score'],
            mapping['confidence_score'],
            mapping['similar_words'],
            mapping['is_food'],
            mapping['food_nonfood_score']
        ))
        conn.commit()
        return True
    except Exception as e:
        # Fix: log failures at ERROR (was INFO) so they surface at default levels.
        logging.error(f" - Error storing mapping to db: {e}")
        conn.rollback()
        return False
def store_result_to_db(cursor, conn, run_key, result):
    """Insert one row into results; commit on success, rollback on failure.

    Args:
        cursor: open DB cursor.
        conn: connection owning *cursor* (used for commit/rollback).
        run_key: identifier shared by all rows of one processing run.
        result: dict carrying the remaining twenty column values.

    Returns:
        True on success, False on failure. (Fix: the original returned None on
        success and False on failure — both falsy, so callers could not tell
        the outcomes apart.)
    """
    try:
        cursor.execute('''
            INSERT INTO results (run_key, run_row, date, input_word, dictionary_word, is_food, wweia_category, dry_matter_content, leakage, weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score, distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions, project_emissions, total_emissions_reduction)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            run_key,
            result['run_row'],
            result['date'],
            result['input_word'],
            result['dictionary_word'],
            result['is_food'],
            result['wweia_category'],
            result['dry_matter_content'],
            result['leakage'],
            result['weight'],
            result['weight_metric_tonnes'],
            result['donor'],
            result['similarity_score'],
            result['food_nonfood_score'],
            result['distance'],
            result['ef'],
            result['mt_lb_mile'],
            result['baseline_emissions'],
            result['leakage_emissions'],
            result['project_emissions'],
            result['total_emissions_reduction'],
        ))
        conn.commit()
        return True
    except Exception as e:
        # Fix: log failures at ERROR (was INFO) so they surface at default levels.
        logging.error(f" - Error storing result to db: {e}")
        conn.rollback()
        return False
def store_results_batch(cursor, conn, run_key, results_batch):
    """Bulk upsert result rows keyed by (run_key, run_row).

    NOTE(review): the ON CONFLICT clause requires a unique constraint or index
    on results(run_key, run_row); the CREATE TABLE in this file does not
    declare one, so verify it exists in the target database.

    Args:
        cursor: open DB cursor.
        conn: connection owning *cursor* (used for commit/rollback).
        run_key: identifier shared by all rows of this run.
        results_batch: iterable of result dicts (same keys as store_result_to_db).

    Returns:
        True on success, False on failure. (Fix: unlike the sibling store_*
        functions, the original had no error handling — a failed executemany
        propagated without rollback, leaving the transaction aborted.)
    """
    logging.info(f"Storing batch of length {len(results_batch)}")
    rows = [
        (
            run_key,
            result['run_row'],
            result['date'],
            result['input_word'],
            result['dictionary_word'],
            result['is_food'],
            result['wweia_category'],
            result['dry_matter_content'],
            result['leakage'],
            result['weight'],
            result['weight_metric_tonnes'],
            result['donor'],
            result['similarity_score'],
            result['food_nonfood_score'],
            result['distance'],
            result['ef'],
            result['mt_lb_mile'],
            result['baseline_emissions'],
            result['leakage_emissions'],
            result['project_emissions'],
            result['total_emissions_reduction']
        ) for result in results_batch
    ]
    try:
        cursor.executemany('''
            INSERT INTO results (
                run_key, run_row, date, input_word, dictionary_word, is_food, wweia_category, dry_matter_content, leakage, weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score, distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions, project_emissions, total_emissions_reduction
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (run_key, run_row)
            DO UPDATE SET
                date = EXCLUDED.date,
                input_word = EXCLUDED.input_word,
                dictionary_word = EXCLUDED.dictionary_word,
                is_food = EXCLUDED.is_food,
                wweia_category = EXCLUDED.wweia_category,
                dry_matter_content = EXCLUDED.dry_matter_content,
                leakage = EXCLUDED.leakage,
                weight = EXCLUDED.weight,
                weight_metric_tonnes = EXCLUDED.weight_metric_tonnes,
                donor = EXCLUDED.donor,
                similarity_score = EXCLUDED.similarity_score,
                food_nonfood_score = EXCLUDED.food_nonfood_score,
                distance = EXCLUDED.distance,
                ef = EXCLUDED.ef,
                mt_lb_mile = EXCLUDED.mt_lb_mile,
                baseline_emissions = EXCLUDED.baseline_emissions,
                leakage_emissions = EXCLUDED.leakage_emissions,
                project_emissions = EXCLUDED.project_emissions,
                total_emissions_reduction = EXCLUDED.total_emissions_reduction
        ''', rows)
        conn.commit()
        return True
    except Exception as e:
        logging.error(f" - Error storing results batch to db: {e}")
        conn.rollback()
        return False