Spaces:
Paused
Paused
import os | |
import psycopg2 | |
import logging | |
from dotenv import load_dotenv | |
from functools import lru_cache | |
load_dotenv() | |
def get_connection(): | |
DATABASE_URL = os.environ['DATABASE_URL'] | |
conn = psycopg2.connect(DATABASE_URL, sslmode='require') | |
initialize_db(conn) | |
return conn | |
def initialize_db(conn): | |
cursor = conn.cursor() | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS mappings ( | |
input_word TEXT PRIMARY KEY, | |
cleaned_word TEXT, | |
dictionary_word TEXT, | |
specificity TEXT, | |
similarity_score REAL, | |
confidence_score REAL, | |
similar_words TEXT, | |
is_food BOOLEAN, | |
food_nonfood_score REAL, | |
reviewed BOOLEAN DEFAULT FALSE, | |
gpt_reviewed BOOLEAN DEFAULT FALSE, | |
flagged BOOLEAN DEFAULT FALSE, | |
ignore BOOLEAN DEFAULT FALSE, | |
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
) | |
''') | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS dictionary ( | |
fdc_id INTEGER PRIMARY KEY, | |
description TEXT, | |
sr_legacy_food_category TEXT, | |
wweia_category TEXT, | |
water_content REAL, | |
dry_matter_content REAL, | |
leakage REAL, | |
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
) | |
''') | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS organizations ( | |
id SERIAL PRIMARY KEY, | |
name TEXT, | |
alt_spellings TEXT[], | |
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
) | |
''') | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS run_meta ( | |
run_key TEXT PRIMARY KEY, | |
organization_id INTEGER, | |
year TEXT, | |
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
) | |
''') | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS rollups ( | |
run_key TEXT, | |
year TEXT, | |
donations_double_counting_correction REAL, | |
total_emissions_reduction_pre REAL, | |
total_emissions_reduction REAL, | |
total_weight_metric_tonnes REAL, | |
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
) | |
''') | |
cursor.execute(''' | |
CREATE TABLE IF NOT EXISTS results ( | |
id BIGSERIAL PRIMARY KEY, | |
run_key TEXT, | |
run_row INTEGER, | |
date TEXT, | |
input_word TEXT, | |
dictionary_word TEXT, | |
is_food BOOLEAN, | |
wweia_category TEXT, | |
sr_legacy_food_category TEXT, | |
dry_matter_content REAL, | |
leakage REAL, | |
weight REAL, | |
weight_metric_tonnes REAL, | |
donor TEXT, | |
similarity_score REAL, | |
food_nonfood_score REAL, | |
distance REAL, | |
ef REAL, | |
mt_lb_mile REAL, | |
baseline_emissions REAL, | |
leakage_emissions REAL, | |
project_emissions REAL, | |
total_emissions_reduction REAL, | |
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
) | |
''') | |
cursor.execute(''' | |
CREATE INDEX IF NOT EXISTS idx_cleaned_word ON mappings(cleaned_word); | |
CREATE INDEX IF NOT EXISTS idx_dictionary_word ON mappings(dictionary_word); | |
CREATE INDEX IF NOT EXISTS idx_description ON dictionary(description); | |
CREATE UNIQUE INDEX IF NOT EXISTS run_row_run_key_uniq ON results(run_key text_ops,run_row int4_ops); | |
CREATE UNIQUE INDEX IF NOT EXISTS rollups_run_key_key ON rollups(run_key text_ops); | |
''') | |
conn.commit() | |
def cached_get_mapping_from_db(db_cursor, word): | |
return get_mapping_from_db(db_cursor, word) | |
def get_mapping_from_db(cursor, cleaned_word): | |
cursor.execute('SELECT * FROM mappings WHERE cleaned_word = %s', (cleaned_word,)) | |
row = cursor.fetchone() | |
if row: | |
columns = [col[0] for col in cursor.description] | |
return dict(zip(columns, row)) | |
return None | |
def get_batch_mapping_from_db(cursor, cleaned_words): | |
if not cleaned_words: | |
return {} | |
# Create a query with a list of placeholders | |
placeholders = ', '.join(['%s'] * len(cleaned_words)) | |
query = f'SELECT * FROM mappings WHERE cleaned_word IN ({placeholders})' | |
cursor.execute(query, tuple(cleaned_words)) | |
rows = cursor.fetchall() | |
if rows: | |
columns = [col[0] for col in cursor.description] | |
return {row[columns.index('cleaned_word')]: dict(zip(columns, row)) for row in rows} | |
return {} | |
def get_dictionary_data_from_db(cursor, dictionary_word): | |
cursor.execute('SELECT * FROM dictionary WHERE description = %s', (dictionary_word,)) | |
row = cursor.fetchone() | |
if row: | |
columns = [col[0] for col in cursor.description] | |
return dict(zip(columns, row)) | |
return None | |
def store_mapping_to_db(cursor, conn, mapping): | |
logging.info(f" - Storing new mapping to db: {mapping}") | |
try: | |
cursor.execute(''' | |
INSERT INTO mappings (input_word, cleaned_word, dictionary_word, similarity_score, confidence_score, similar_words, is_food, food_nonfood_score, specificity) | |
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) | |
''', ( | |
mapping['input_word'], | |
mapping['cleaned_word'], | |
mapping['dictionary_word'], | |
mapping['similarity_score'], | |
mapping['confidence_score'], | |
mapping['similar_words'], | |
mapping['is_food'], | |
mapping['food_nonfood_score'], | |
mapping['specificity'] | |
)) | |
conn.commit() | |
except Exception as e: | |
logging.warn(f" - Error storing mapping to db: {e}") | |
conn.rollback() | |
return False | |
def store_result_to_db(cursor, conn, run_key, result): | |
values = ( | |
run_key, | |
result['run_row'], | |
result['date'], | |
result['input_word'], | |
result['dictionary_word'], | |
result['is_food'], | |
result['sr_legacy_food_category'], | |
result['wweia_category'], | |
result['dry_matter_content'], | |
result['leakage'], | |
result['weight'], | |
result['weight_metric_tonnes'], | |
result['donor'], | |
result['similarity_score'], | |
result['food_nonfood_score'], | |
result['distance'], | |
result['ef'], | |
result['mt_lb_mile'], | |
result['baseline_emissions'], | |
result['leakage_emissions'], | |
result['project_emissions'], | |
result['total_emissions_reduction'] | |
) | |
cursor.execute(''' | |
INSERT INTO results (run_key, run_row, date, input_word, dictionary_word, is_food, sr_legacy_food_category, wweia_category, dry_matter_content, leakage, weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score, distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions, project_emissions, total_emissions_reduction) | |
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) | |
''', values) | |
conn.commit() | |
return True | |