brightly-ai / db /db_utils.py
beweinreich's picture
replace with foundation category
b1335de
raw
history blame
6.53 kB
import os
import psycopg2
import logging
from dotenv import load_dotenv
from functools import lru_cache
load_dotenv()
def get_connection():
    """Open a new PostgreSQL connection and make sure the schema exists.

    Reads the connection string from the DATABASE_URL environment variable
    (loaded from .env by load_dotenv at import time) and requires TLS.

    Returns:
        A psycopg2 connection with the tables/indexes already created.

    Raises:
        KeyError: if DATABASE_URL is not set.
        psycopg2.OperationalError: if the database is unreachable.
    """
    DATABASE_URL = os.environ['DATABASE_URL']
    # NOTE(review): sslmode='require' — presumably the hosted DB only accepts
    # TLS connections; confirm before relaxing.
    conn = psycopg2.connect(DATABASE_URL, sslmode='require')
    # Idempotent: initialize_db only issues CREATE ... IF NOT EXISTS DDL.
    initialize_db(conn)
    return conn
def initialize_db(conn):
    """Create the mappings, dictionary and results tables plus their indexes.

    Safe to call on every new connection: every statement uses
    IF NOT EXISTS, so existing data is never touched.

    Args:
        conn: an open psycopg2 connection; the DDL is committed on it.
    """
    cursor = conn.cursor()
    try:
        # Cache of input word -> dictionary match, with review workflow flags.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS mappings (
                input_word TEXT PRIMARY KEY,
                cleaned_word TEXT,
                matching_word TEXT,
                dictionary_word TEXT,
                similarity_score REAL,
                confidence_score REAL,
                similar_words TEXT,
                is_food BOOLEAN,
                food_nonfood_score REAL,
                reviewed BOOLEAN DEFAULT FALSE,
                flagged BOOLEAN DEFAULT FALSE,
                ignored BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Reference food dictionary, keyed by USDA FDC id.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS dictionary (
                fdc_id INTEGER PRIMARY KEY,
                description TEXT,
                foundation_category TEXT,
                wweia_category TEXT,
                water_content REAL,
                dry_matter_content REAL,
                leakage REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # One row per processed input row of a run (run_key groups a run).
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS results (
                id BIGSERIAL PRIMARY KEY,
                run_key TEXT,
                run_row INTEGER,
                date TEXT,
                input_word TEXT,
                dictionary_word TEXT,
                is_food BOOLEAN,
                wweia_category TEXT,
                foundation_category TEXT,
                dry_matter_content REAL,
                leakage REAL,
                weight REAL,
                weight_metric_tonnes REAL,
                donor TEXT,
                similarity_score REAL,
                food_nonfood_score REAL,
                distance REAL,
                ef REAL,
                mt_lb_mile REAL,
                baseline_emissions REAL,
                leakage_emissions REAL,
                project_emissions REAL,
                total_emissions_reduction REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Lookup indexes for the query helpers below.
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_cleaned_word ON mappings(cleaned_word);
            CREATE INDEX IF NOT EXISTS idx_dictionary_word ON mappings(dictionary_word);
            CREATE INDEX IF NOT EXISTS idx_description ON dictionary(description);
        ''')
        conn.commit()
    finally:
        # Release the cursor even if a DDL statement fails (the original
        # leaked it on every call).
        cursor.close()
# Word-keyed memo for get_mapping_from_db. The original used @lru_cache over
# (db_cursor, word), which made the cursor object part of the cache key: every
# new cursor/connection started with a cold cache, and closed cursors were kept
# alive by the cache (the usual lru_cache-on-stateful-argument pitfall).
# Keying on the word alone fixes both. Like the original, cached entries are
# NOT invalidated when the mappings table changes.
_MAPPING_CACHE_MAX_SIZE = 1024
_mapping_cache = {}


def cached_get_mapping_from_db(db_cursor, word):
    """Return get_mapping_from_db(db_cursor, word), memoized per word.

    The cursor is only used on a cache miss; hits never touch the database.
    Negative results (None) are cached too, matching the original behavior.
    """
    if word in _mapping_cache:
        return _mapping_cache[word]
    result = get_mapping_from_db(db_cursor, word)
    if len(_mapping_cache) >= _MAPPING_CACHE_MAX_SIZE:
        # Crude FIFO eviction via dict insertion order; keeps the cache
        # bounded at the original maxsize.
        _mapping_cache.pop(next(iter(_mapping_cache)))
    _mapping_cache[word] = result
    return result
def get_mapping_from_db(cursor, cleaned_word):
    """Look up a single mappings row by its cleaned_word.

    Args:
        cursor: a DB-API cursor over the brightly database.
        cleaned_word: the normalized word to match exactly.

    Returns:
        The matching row as a column-name -> value dict, or None when no
        row exists.
    """
    cursor.execute('SELECT * FROM mappings WHERE cleaned_word = %s', (cleaned_word,))
    record = cursor.fetchone()
    if not record:
        return None
    column_names = (descriptor[0] for descriptor in cursor.description)
    return dict(zip(column_names, record))
def get_batch_mapping_from_db(cursor, cleaned_words):
    """Fetch mappings rows for many cleaned words in a single query.

    Args:
        cursor: a DB-API cursor over the brightly database.
        cleaned_words: iterable of normalized words to match exactly.

    Returns:
        Dict of cleaned_word -> row-dict. Words with no matching row are
        simply absent. An empty input returns {} without touching the
        database.
    """
    if not cleaned_words:
        return {}
    # One IN (...) query with a parameter placeholder per word.
    placeholders = ', '.join(['%s'] * len(cleaned_words))
    query = f'SELECT * FROM mappings WHERE cleaned_word IN ({placeholders})'
    cursor.execute(query, tuple(cleaned_words))
    rows = cursor.fetchall()
    if not rows:
        return {}
    columns = [col[0] for col in cursor.description]
    # Hoisted: the original called columns.index('cleaned_word') once per row
    # inside the comprehension.
    key_idx = columns.index('cleaned_word')
    return {row[key_idx]: dict(zip(columns, row)) for row in rows}
def get_dictionary_data_from_db(cursor, dictionary_word):
    """Look up a dictionary row by its exact description text.

    Args:
        cursor: a DB-API cursor over the brightly database.
        dictionary_word: the description to match exactly.

    Returns:
        The matching row as a column-name -> value dict, or None when the
        description is not in the dictionary table.
    """
    cursor.execute('SELECT * FROM dictionary WHERE description = %s', (dictionary_word,))
    record = cursor.fetchone()
    if not record:
        return None
    column_names = (descriptor[0] for descriptor in cursor.description)
    return dict(zip(column_names, record))
def store_mapping_to_db(cursor, conn, mapping):
    """Insert one row into the mappings table and commit.

    Args:
        cursor: a DB-API cursor over the brightly database.
        conn: the connection owning `cursor` (used for commit/rollback).
        mapping: dict with keys input_word, cleaned_word, matching_word,
            dictionary_word, similarity_score, confidence_score,
            similar_words, is_food, food_nonfood_score.

    Returns:
        False after rolling back if the insert fails (e.g. duplicate
        input_word primary key); None on success — callers that only check
        `result is False` keep working.
    """
    logging.info(f" - Storing new mapping to db: {mapping}")
    try:
        cursor.execute('''
            INSERT INTO mappings (input_word, cleaned_word, matching_word, dictionary_word, similarity_score, confidence_score, similar_words, is_food, food_nonfood_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            mapping['input_word'],
            mapping['cleaned_word'],
            mapping['matching_word'],
            mapping['dictionary_word'],
            mapping['similarity_score'],
            mapping['confidence_score'],
            mapping['similar_words'],
            mapping['is_food'],
            mapping['food_nonfood_score']
        ))
        conn.commit()
    except Exception as e:
        # Log at ERROR (the original logged failures at INFO, hiding them),
        # then roll back so the connection stays usable.
        logging.error(f" - Error storing mapping to db: {e}")
        conn.rollback()
        return False
def store_result_to_db(cursor, conn, run_key, result):
    """Insert one computed result row into the results table and commit.

    The column list is declared once and both the placeholders and the value
    tuple are derived from it, so the three can never drift out of sync (the
    original hand-maintained 22 columns, 22 values and a separate %s list).

    Args:
        cursor: a DB-API cursor over the brightly database.
        conn: the connection owning `cursor` (used for commit/rollback).
        run_key: identifier grouping all rows of one processing run.
        result: dict holding a value for every column below except run_key.

    Returns:
        False after rolling back if the insert fails; None on success —
        callers that only check `result is False` keep working.
    """
    columns = (
        'run_key', 'run_row', 'date', 'input_word', 'dictionary_word',
        'is_food', 'foundation_category', 'wweia_category',
        'dry_matter_content', 'leakage', 'weight', 'weight_metric_tonnes',
        'donor', 'similarity_score', 'food_nonfood_score', 'distance', 'ef',
        'mt_lb_mile', 'baseline_emissions', 'leakage_emissions',
        'project_emissions', 'total_emissions_reduction',
    )
    values = (run_key,) + tuple(result[col] for col in columns[1:])
    placeholders = ', '.join(['%s'] * len(columns))
    query = f'INSERT INTO results ({", ".join(columns)}) VALUES ({placeholders})'
    try:
        cursor.execute(query, values)
        conn.commit()
    except Exception as e:
        # Log at ERROR (the original logged failures at INFO, hiding them),
        # then roll back so the connection stays usable.
        logging.error(f" - Error storing result to db: {e}")
        conn.rollback()
        return False