"""Postgres persistence layer for food-mapping runs: schema/index setup plus
read and write helpers for mappings, dictionary entries, and per-run results."""

import logging
import os
from functools import lru_cache

import psycopg2
from dotenv import load_dotenv

load_dotenv()


def get_connection():
    DATABASE_URL = os.environ['DATABASE_URL']
    conn = psycopg2.connect(DATABASE_URL, sslmode='require')
    # Idempotent: all DDL below uses IF NOT EXISTS, so running this on every
    # new connection is safe, if slightly wasteful.
    initialize_db(conn)
    return conn


def initialize_db(conn):
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS mappings (
            input_word TEXT PRIMARY KEY,
            cleaned_word TEXT,
            dictionary_word TEXT,
            similarity_score REAL,
            confidence_score REAL,
            similar_words TEXT,
            is_food BOOLEAN,
            food_nonfood_score REAL,
            reviewed BOOLEAN DEFAULT FALSE,
            flagged BOOLEAN DEFAULT FALSE,
            ignore BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS dictionary (
            fdc_id INTEGER PRIMARY KEY,
            description TEXT,
            sr_legacy_food_category TEXT,
            wweia_category TEXT,
            water_content REAL,
            dry_matter_content REAL,
            leakage REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS organizations (
            id SERIAL PRIMARY KEY,
            name TEXT,
            alt_spellings TEXT[],
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS run_meta (
            run_key TEXT PRIMARY KEY,
            organization_id INTEGER,
            year TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS rollups (
            run_key TEXT,
            year TEXT,
            donations_double_counting_correction REAL,
            total_emissions_reduction_pre REAL,
            total_emissions_reduction REAL,
            total_weight_metric_tonnes REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS results (
            id BIGSERIAL PRIMARY KEY,
            run_key TEXT,
            run_row INTEGER,
            date TEXT,
            input_word TEXT,
            dictionary_word TEXT,
            is_food BOOLEAN,
            wweia_category TEXT,
            sr_legacy_food_category TEXT,
            dry_matter_content REAL,
            leakage REAL,
            weight REAL,
            weight_metric_tonnes REAL,
            donor TEXT,
            similarity_score REAL,
            food_nonfood_score REAL,
            distance REAL,
            ef REAL,
            mt_lb_mile REAL,
            baseline_emissions REAL,
            leakage_emissions REAL,
            project_emissions REAL,
            total_emissions_reduction REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_cleaned_word ON mappings(cleaned_word);
        CREATE INDEX IF NOT EXISTS idx_dictionary_word ON mappings(dictionary_word);
        CREATE INDEX IF NOT EXISTS idx_description ON dictionary(description);
        CREATE UNIQUE INDEX IF NOT EXISTS run_row_run_key_uniq
            ON results(run_key text_ops, run_row int4_ops);
        CREATE UNIQUE INDEX IF NOT EXISTS rollups_run_key_key ON rollups(run_key text_ops);
    ''')
    conn.commit()
    cursor.close()


@lru_cache(maxsize=1024)
def cached_get_mapping_from_db(db_cursor, word):
    # Caveat: lru_cache keys on the cursor object as well as the word, so
    # entries are only reused while the same cursor is passed in, and stale
    # rows can be served if the mapping is updated elsewhere.
    return get_mapping_from_db(db_cursor, word)


def get_mapping_from_db(cursor, cleaned_word):
    cursor.execute('SELECT * FROM mappings WHERE cleaned_word = %s', (cleaned_word,))
    row = cursor.fetchone()
    if row:
        columns = [col[0] for col in cursor.description]
        return dict(zip(columns, row))
    return None


def get_batch_mapping_from_db(cursor, cleaned_words):
    if not cleaned_words:
        return {}
    # Build a query with one placeholder per word so psycopg2 handles quoting.
    placeholders = ', '.join(['%s'] * len(cleaned_words))
    query = f'SELECT * FROM mappings WHERE cleaned_word IN ({placeholders})'
    cursor.execute(query, tuple(cleaned_words))
    rows = cursor.fetchall()
    if rows:
        columns = [col[0] for col in cursor.description]
        return {row[columns.index('cleaned_word')]: dict(zip(columns, row)) for row in rows}
    return {}


def get_dictionary_data_from_db(cursor, dictionary_word):
    cursor.execute('SELECT * FROM dictionary WHERE description = %s', (dictionary_word,))
    row = cursor.fetchone()
    if row:
        columns = [col[0] for col in cursor.description]
        return dict(zip(columns, row))
    return None


def store_mapping_to_db(cursor, conn, mapping):
    logging.info(f" - Storing new mapping to db: {mapping}")
    try:
        cursor.execute('''
            INSERT INTO mappings (input_word, cleaned_word, dictionary_word,
                                  similarity_score, confidence_score, similar_words,
                                  is_food, food_nonfood_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            mapping['input_word'],
            mapping['cleaned_word'],
            mapping['dictionary_word'],
            mapping['similarity_score'],
            mapping['confidence_score'],
            mapping['similar_words'],
            mapping['is_food'],
            mapping['food_nonfood_score'],
        ))
        conn.commit()
        return True
    except Exception as e:
        logging.warning(f" - Error storing mapping to db: {e}")
        conn.rollback()
        return False


def store_result_to_db(cursor, conn, run_key, result):
    values = (
        run_key,
        result['run_row'],
        result['date'],
        result['input_word'],
        result['dictionary_word'],
        result['is_food'],
        result['sr_legacy_food_category'],
        result['wweia_category'],
        result['dry_matter_content'],
        result['leakage'],
        result['weight'],
        result['weight_metric_tonnes'],
        result['donor'],
        result['similarity_score'],
        result['food_nonfood_score'],
        result['distance'],
        result['ef'],
        result['mt_lb_mile'],
        result['baseline_emissions'],
        result['leakage_emissions'],
        result['project_emissions'],
        result['total_emissions_reduction'],
    )
    cursor.execute('''
        INSERT INTO results (run_key, run_row, date, input_word, dictionary_word,
                             is_food, sr_legacy_food_category, wweia_category,
                             dry_matter_content, leakage, weight, weight_metric_tonnes,
                             donor, similarity_score, food_nonfood_score, distance,
                             ef, mt_lb_mile, baseline_emissions, leakage_emissions,
                             project_emissions, total_emissions_reduction)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ''', values)
    conn.commit()
    return True
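

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal example of how these helpers compose, assuming a DATABASE_URL
# env var pointing at a reachable Postgres instance; the lookup words below
# are hypothetical placeholders.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    conn = get_connection()  # connects and runs the idempotent DDL above
    cur = conn.cursor()
    # Single lookup: returns a dict keyed by column name, or None if absent.
    print(get_mapping_from_db(cur, 'apple'))
    # Batch lookup: returns {cleaned_word: row_dict} for only the words found.
    print(sorted(get_batch_mapping_from_db(cur, ['apple', 'banana'])))
    cur.close()
    conn.close()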