# brightly-ai/db/db_utils.py
import os
import logging
from functools import lru_cache

import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv

load_dotenv()

def get_connection():
    """Open a psycopg2 connection using DATABASE_URL and ensure the schema exists."""
    DATABASE_URL = os.environ['DATABASE_URL']
    print("Connecting to database...")
    try:
        conn = psycopg2.connect(DATABASE_URL, sslmode='require')
        initialize_db(conn)  # creates tables/indexes on first use; a no-op once they exist
        print("Database connection established")
        return conn
    except Exception as e:
        print(f"Failed to connect to database: {e}")
        raise
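
# A minimal usage sketch, assuming DATABASE_URL is set in the environment (or
# in a local .env file picked up by load_dotenv()) and points at a reachable
# Postgres instance:
#
#   conn = get_connection()
#   cursor = conn.cursor()
#   ...call the query/store helpers below...
#   cursor.close()
#   conn.close()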

def initialize_db(conn):
    """Create the application tables and indexes if they do not already exist."""
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS mappings (
            input_word TEXT PRIMARY KEY,
            cleaned_word TEXT,
            dictionary_word TEXT,
            specificity TEXT,
            similarity_score REAL,
            confidence_score REAL,
            similar_words TEXT,
            is_food BOOLEAN,
            food_nonfood_score REAL,
            reviewed BOOLEAN DEFAULT FALSE,
            gpt_reviewed BOOLEAN DEFAULT FALSE,
            flagged BOOLEAN DEFAULT FALSE,
            ignore BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS dictionary (
            fdc_id INTEGER PRIMARY KEY,
            description TEXT,
            sr_legacy_food_category TEXT,
            wweia_category TEXT,
            water_content REAL,
            dry_matter_content REAL,
            leakage REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS organizations (
            id SERIAL PRIMARY KEY,
            name TEXT,
            alt_spellings TEXT[],
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS run_meta (
            run_key TEXT PRIMARY KEY,
            organization_id INTEGER,
            year TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS rollups (
            run_key TEXT,
            year TEXT,
            donations_double_counting_correction REAL,
            total_emissions_reduction_pre REAL,
            total_emissions_reduction REAL,
            total_weight_metric_tonnes REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS results (
            id BIGSERIAL PRIMARY KEY,
            run_key TEXT,
            run_row INTEGER,
            date TEXT,
            input_word TEXT,
            dictionary_word TEXT,
            is_food BOOLEAN,
            wweia_category TEXT,
            sr_legacy_food_category TEXT,
            dry_matter_content REAL,
            leakage REAL,
            weight REAL,
            weight_metric_tonnes REAL,
            donor TEXT,
            similarity_score REAL,
            food_nonfood_score REAL,
            distance REAL,
            ef REAL,
            mt_lb_mile REAL,
            baseline_emissions REAL,
            leakage_emissions REAL,
            project_emissions REAL,
            total_emissions_reduction REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # The unique indexes below back the ON CONFLICT (run_key, run_row) upsert
    # in store_batch_results_to_db.
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_cleaned_word ON mappings(cleaned_word);
        CREATE INDEX IF NOT EXISTS idx_dictionary_word ON mappings(dictionary_word);
        CREATE INDEX IF NOT EXISTS idx_description ON dictionary(description);
        CREATE UNIQUE INDEX IF NOT EXISTS run_row_run_key_uniq ON results(run_key text_ops, run_row int4_ops);
        CREATE UNIQUE INDEX IF NOT EXISTS rollups_run_key_key ON rollups(run_key text_ops);
    ''')
    conn.commit()
    cursor.close()

@lru_cache(maxsize=1024)
def cached_get_mapping_from_db(db_cursor, word):
    # Caveat: the cursor object is part of the cache key, so entries are only
    # reused while the same cursor is passed in (and the cursor is kept alive
    # by the cache). Cached results can also go stale if the mappings table
    # changes underneath the cache.
    return get_mapping_from_db(db_cursor, word)

def get_mapping_from_db(cursor, cleaned_word):
    """Return the mappings row for cleaned_word as a dict, or None if absent."""
    cursor.execute('SELECT * FROM mappings WHERE cleaned_word = %s', (cleaned_word,))
    row = cursor.fetchone()
    if row:
        columns = [col[0] for col in cursor.description]
        return dict(zip(columns, row))
    return None
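
# A hedged sketch of the return shape (the keys come from the mappings table
# above; the values are illustrative, not real data):
#
#   row = get_mapping_from_db(cursor, 'apples')
#   # -> {'input_word': 'Aples', 'cleaned_word': 'apples',
#   #     'dictionary_word': 'Apples, raw, with skin', 'is_food': True, ...}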

def get_batch_mapping_from_db(cursor, cleaned_words):
    """Fetch mappings for many cleaned words in one query.

    Returns a dict keyed by cleaned_word; if several rows share a
    cleaned_word, the last row fetched wins.
    """
    if not cleaned_words:
        return {}
    # Build a single IN (...) query with one placeholder per word
    placeholders = ', '.join(['%s'] * len(cleaned_words))
    query = f'SELECT * FROM mappings WHERE cleaned_word IN ({placeholders})'
    cursor.execute(query, tuple(cleaned_words))
    rows = cursor.fetchall()
    if rows:
        columns = [col[0] for col in cursor.description]
        return {row[columns.index('cleaned_word')]: dict(zip(columns, row)) for row in rows}
    return {}
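
# Usage sketch (the word list is illustrative):
#
#   batch = get_batch_mapping_from_db(cursor, ['apples', 'bananas'])
#   # -> {'apples': {...row dict...}, 'bananas': {...row dict...}}
#   # Words with no mapping are simply absent from the result.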

def get_dictionary_data_from_db(cursor, dictionary_word):
    """Return the dictionary row whose description matches, as a dict, or None."""
    cursor.execute('SELECT * FROM dictionary WHERE description = %s', (dictionary_word,))
    row = cursor.fetchone()
    if row:
        columns = [col[0] for col in cursor.description]
        return dict(zip(columns, row))
    return None

def store_mapping_to_db(cursor, conn, mapping):
    """Insert a new mapping row. Returns True on success, False on failure."""
    logging.info(f" - Storing new mapping to db: {mapping}")
    try:
        cursor.execute('''
            INSERT INTO mappings (input_word, cleaned_word, dictionary_word, similarity_score, confidence_score, similar_words, is_food, food_nonfood_score, specificity)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            mapping['input_word'],
            mapping['cleaned_word'],
            mapping['dictionary_word'],
            mapping['similarity_score'],
            mapping['confidence_score'],
            mapping['similar_words'],
            mapping['is_food'],
            mapping['food_nonfood_score'],
            mapping['specificity']
        ))
        conn.commit()
        return True
    except Exception as e:
        logging.warning(f" - Error storing mapping to db: {e}")  # logging.warn is deprecated
        conn.rollback()
        return False
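
# A hedged sketch of the mapping dict this expects; every key below is
# required by the INSERT above, and the values are placeholders, not real data:
#
#   mapping = {'input_word': 'Aples', 'cleaned_word': 'apples',
#              'dictionary_word': 'Apples, raw, with skin',
#              'similarity_score': 0.93, 'confidence_score': 0.88,
#              'similar_words': 'apple, fuji apple', 'is_food': True,
#              'food_nonfood_score': 0.99, 'specificity': 'specific'}
#   store_mapping_to_db(cursor, conn, mapping)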

def store_result_to_db(cursor, conn, run_key, result):
    """Insert a single computed result row for the given run."""
    values = (
        run_key,
        result['run_row'],
        result['date'],
        result['input_word'],
        result['dictionary_word'],
        result['is_food'],
        result['sr_legacy_food_category'],
        result['wweia_category'],
        result['dry_matter_content'],
        result['leakage'],
        result['weight'],
        result['weight_metric_tonnes'],
        result['donor'],
        result['similarity_score'],
        result['food_nonfood_score'],
        result['distance'],
        result['ef'],
        result['mt_lb_mile'],
        result['baseline_emissions'],
        result['leakage_emissions'],
        result['project_emissions'],
        result['total_emissions_reduction']
    )
    cursor.execute('''
        INSERT INTO results (run_key, run_row, date, input_word, dictionary_word, is_food, sr_legacy_food_category, wweia_category, dry_matter_content, leakage, weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score, distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions, project_emissions, total_emissions_reduction)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ''', values)
    conn.commit()
    return True
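
# Hedged single-row usage sketch: `result` must contain every key listed in
# the tuple above (run_row through total_emissions_reduction). Unlike the
# batch version below, this plain INSERT will raise on a duplicate
# (run_key, run_row) pair because of the unique index created above.
#
#   store_result_to_db(cursor, conn, 'my-run-key', result)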

def store_batch_results_to_db(conn, cursor, run_key, results):
    """Bulk-upsert result rows, keyed on (run_key, run_row).

    Uses psycopg2's execute_values to send one multi-row statement; rows that
    already exist for this run are updated in place via ON CONFLICT.
    """
    values = [
        (
            run_key,
            result['run_row'],
            result['date'],
            result['input_word'],
            result['dictionary_word'],
            result['is_food'],
            result['sr_legacy_food_category'],
            result['wweia_category'],
            result['dry_matter_content'],
            result['leakage'],
            result['weight'],
            result['weight_metric_tonnes'],
            result['donor'],
            result['similarity_score'],
            result['food_nonfood_score'],
            result['distance'],
            result['ef'],
            result['mt_lb_mile'],
            result['baseline_emissions'],
            result['leakage_emissions'],
            result['project_emissions'],
            result['total_emissions_reduction']
        )
        for result in results
    ]
    insert_query = '''
        INSERT INTO results (
            run_key, run_row, date, input_word, dictionary_word, is_food,
            sr_legacy_food_category, wweia_category, dry_matter_content, leakage,
            weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score,
            distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions,
            project_emissions, total_emissions_reduction
        ) VALUES %s
        ON CONFLICT (run_key, run_row)
        DO UPDATE SET
            date = EXCLUDED.date,
            input_word = EXCLUDED.input_word,
            dictionary_word = EXCLUDED.dictionary_word,
            is_food = EXCLUDED.is_food,
            sr_legacy_food_category = EXCLUDED.sr_legacy_food_category,
            wweia_category = EXCLUDED.wweia_category,
            dry_matter_content = EXCLUDED.dry_matter_content,
            leakage = EXCLUDED.leakage,
            weight = EXCLUDED.weight,
            weight_metric_tonnes = EXCLUDED.weight_metric_tonnes,
            donor = EXCLUDED.donor,
            similarity_score = EXCLUDED.similarity_score,
            food_nonfood_score = EXCLUDED.food_nonfood_score,
            distance = EXCLUDED.distance,
            ef = EXCLUDED.ef,
            mt_lb_mile = EXCLUDED.mt_lb_mile,
            baseline_emissions = EXCLUDED.baseline_emissions,
            leakage_emissions = EXCLUDED.leakage_emissions,
            project_emissions = EXCLUDED.project_emissions,
            total_emissions_reduction = EXCLUDED.total_emissions_reduction;
    '''
    execute_values(cursor, insert_query, values)
    conn.commit()
    return True
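
# A minimal smoke-test sketch, assuming DATABASE_URL points at a reachable
# Postgres instance; the lookup word 'apples' is illustrative only.
if __name__ == "__main__":
    connection = get_connection()
    cur = connection.cursor()
    print(get_mapping_from_db(cur, "apples"))
    cur.close()
    connection.close()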