# NOTE(review): non-Python residue from the hosting page (Hugging Face Space
# status "Paused", "File size: 6,526 Bytes", git blame hashes, and a line-number
# gutter) was captured here during extraction. Converted to this comment so the
# module parses.
import logging
import os
from collections import OrderedDict
from functools import lru_cache

import psycopg2
from dotenv import load_dotenv
load_dotenv()
def get_connection():
    """Open a Postgres connection from $DATABASE_URL and ensure the schema exists.

    Returns:
        An open psycopg2 connection with all tables/indexes created.

    Raises:
        KeyError: if the DATABASE_URL environment variable is unset.
        psycopg2.OperationalError: if the database is unreachable.
    """
    # sslmode='require' because the target database is reached over the network.
    database_url = os.environ['DATABASE_URL']
    conn = psycopg2.connect(database_url, sslmode='require')
    initialize_db(conn)
    return conn
def initialize_db(conn):
    """Create the application's tables and indexes if they do not already exist.

    Idempotent: every statement uses IF NOT EXISTS, so calling this on an
    already-initialized database is a no-op. Commits once at the end.

    Args:
        conn: an open psycopg2 connection.
    """
    # Context manager closes the cursor deterministically instead of leaking
    # it until garbage collection (the original cursor was never closed).
    with conn.cursor() as cursor:
        # Word-mapping cache: one row per distinct input word.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS mappings (
                input_word TEXT PRIMARY KEY,
                cleaned_word TEXT,
                matching_word TEXT,
                dictionary_word TEXT,
                similarity_score REAL,
                confidence_score REAL,
                similar_words TEXT,
                is_food BOOLEAN,
                food_nonfood_score REAL,
                reviewed BOOLEAN DEFAULT FALSE,
                flagged BOOLEAN DEFAULT FALSE,
                ignored BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Reference food dictionary keyed by USDA FDC id.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS dictionary (
                fdc_id INTEGER PRIMARY KEY,
                description TEXT,
                foundation_category TEXT,
                wweia_category TEXT,
                water_content REAL,
                dry_matter_content REAL,
                leakage REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Per-run computation results (one row per processed input row).
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS results (
                id BIGSERIAL PRIMARY KEY,
                run_key TEXT,
                run_row INTEGER,
                date TEXT,
                input_word TEXT,
                dictionary_word TEXT,
                is_food BOOLEAN,
                wweia_category TEXT,
                foundation_category TEXT,
                dry_matter_content REAL,
                leakage REAL,
                weight REAL,
                weight_metric_tonnes REAL,
                donor TEXT,
                similarity_score REAL,
                food_nonfood_score REAL,
                distance REAL,
                ef REAL,
                mt_lb_mile REAL,
                baseline_emissions REAL,
                leakage_emissions REAL,
                project_emissions REAL,
                total_emissions_reduction REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Lookup-path indexes for the SELECTs issued elsewhere in this module.
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_cleaned_word ON mappings(cleaned_word);
            CREATE INDEX IF NOT EXISTS idx_dictionary_word ON mappings(dictionary_word);
            CREATE INDEX IF NOT EXISTS idx_description ON dictionary(description);
        ''')
    conn.commit()
# Bounded LRU cache of cleaned_word -> mapping dict (or None), shared across
# cursors. The previous @lru_cache keyed entries on (db_cursor, word), so a
# hit required the exact same cursor object and the cache kept every cursor
# it had ever seen alive (B019-style leak).
_MAPPING_CACHE = OrderedDict()
_MAPPING_CACHE_MAX = 1024


def cached_get_mapping_from_db(db_cursor, word):
    """Memoized wrapper around get_mapping_from_db, keyed on *word* only.

    Holds at most 1024 entries (least-recently-used entries are evicted).
    NOTE(review): cached entries go stale if the mappings table is updated
    during the process lifetime — same trade-off as the original lru_cache.
    """
    if word in _MAPPING_CACHE:
        _MAPPING_CACHE.move_to_end(word)  # mark as most recently used
        return _MAPPING_CACHE[word]
    value = get_mapping_from_db(db_cursor, word)
    _MAPPING_CACHE[word] = value
    if len(_MAPPING_CACHE) > _MAPPING_CACHE_MAX:
        _MAPPING_CACHE.popitem(last=False)  # evict least recently used
    return value
def get_mapping_from_db(cursor, cleaned_word):
    """Fetch the mappings row for *cleaned_word* as a column->value dict.

    Returns None when no row matches.
    """
    cursor.execute('SELECT * FROM mappings WHERE cleaned_word = %s', (cleaned_word,))
    record = cursor.fetchone()
    if not record:
        return None
    field_names = (column[0] for column in cursor.description)
    return dict(zip(field_names, record))
def get_batch_mapping_from_db(cursor, cleaned_words):
    """Fetch mappings rows for many cleaned words in a single query.

    Args:
        cursor: an open DB cursor.
        cleaned_words: iterable of cleaned words to look up.

    Returns:
        {cleaned_word: row-dict} for every word that has a row; words with
        no mapping are simply absent. Empty dict for empty input.
    """
    if not cleaned_words:
        return {}
    # One %s placeholder per word; values are passed separately (parameterized,
    # so no SQL injection through the word list).
    placeholders = ', '.join(['%s'] * len(cleaned_words))
    query = f'SELECT * FROM mappings WHERE cleaned_word IN ({placeholders})'
    cursor.execute(query, tuple(cleaned_words))
    rows = cursor.fetchall()
    if not rows:
        return {}
    columns = [col[0] for col in cursor.description]
    # Hoisted out of the comprehension: the original recomputed
    # columns.index('cleaned_word') once per row.
    key_idx = columns.index('cleaned_word')
    return {row[key_idx]: dict(zip(columns, row)) for row in rows}
def get_dictionary_data_from_db(cursor, dictionary_word):
    """Look up one dictionary row by its description.

    Returns the row as a column->value dict, or None if no row matches.
    """
    cursor.execute('SELECT * FROM dictionary WHERE description = %s', (dictionary_word,))
    match = cursor.fetchone()
    if not match:
        return None
    return dict(zip((col[0] for col in cursor.description), match))
def store_mapping_to_db(cursor, conn, mapping):
    """Insert one row into mappings and commit.

    Args:
        cursor: an open DB cursor.
        conn: the cursor's connection (needed for commit/rollback).
        mapping: dict with the nine inserted keys (input_word, cleaned_word,
            matching_word, dictionary_word, similarity_score,
            confidence_score, similar_words, is_food, food_nonfood_score).

    Returns:
        True on success, False on failure (the original returned None on
        success, so truthiness checks were impossible; the transaction is
        rolled back on failure either way).
    """
    logging.info(f" - Storing new mapping to db: {mapping}")
    try:
        cursor.execute('''
            INSERT INTO mappings (input_word, cleaned_word, matching_word, dictionary_word, similarity_score, confidence_score, similar_words, is_food, food_nonfood_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            mapping['input_word'],
            mapping['cleaned_word'],
            mapping['matching_word'],
            mapping['dictionary_word'],
            mapping['similarity_score'],
            mapping['confidence_score'],
            mapping['similar_words'],
            mapping['is_food'],
            mapping['food_nonfood_score']
        ))
        conn.commit()
        return True
    except Exception as e:
        # Failures (duplicate input_word, missing mapping keys, connection
        # errors) were logged at INFO level; use ERROR so they are visible.
        logging.error(f" - Error storing mapping to db: {e}")
        conn.rollback()
        return False
def store_result_to_db(cursor, conn, run_key, result):
    """Insert one row into results and commit.

    Args:
        cursor: an open DB cursor.
        conn: the cursor's connection (needed for commit/rollback).
        run_key: identifier grouping all rows of one processing run.
        result: dict with the 21 remaining inserted keys (run_row, date,
            input_word, ..., total_emissions_reduction).

    Returns:
        True on success, False on failure (the original returned None on
        success; made explicit here, matching store_mapping_to_db).
    """
    try:
        cursor.execute('''
            INSERT INTO results (run_key, run_row, date, input_word, dictionary_word, is_food, foundation_category, wweia_category, dry_matter_content, leakage, weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score, distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions, project_emissions, total_emissions_reduction)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            run_key,
            result['run_row'],
            result['date'],
            result['input_word'],
            result['dictionary_word'],
            result['is_food'],
            result['foundation_category'],
            result['wweia_category'],
            result['dry_matter_content'],
            result['leakage'],
            result['weight'],
            result['weight_metric_tonnes'],
            result['donor'],
            result['similarity_score'],
            result['food_nonfood_score'],
            result['distance'],
            result['ef'],
            result['mt_lb_mile'],
            result['baseline_emissions'],
            result['leakage_emissions'],
            result['project_emissions'],
            result['total_emissions_reduction'],
        ))
        conn.commit()
        return True
    except Exception as e:
        # ERROR level (was INFO) so failed inserts stand out in logs.
        logging.error(f" - Error storing result to db: {e}")
        conn.rollback()
        return False