Spaces:
Paused
Paused
File size: 8,876 Bytes
9189e38 b72dd6f 9189e38 b59ded9 9189e38 cc37897 9189e38 73fda7b a04fda8 9189e38 22ad617 b1f9aab efa7589 b1f9aab b59ded9 9189e38 b59ded9 9189e38 a216741 b1c94e2 9189e38 dbcabfb 9189e38 b72dd6f 9189e38 b1f9aab efa7589 b1f9aab efa7589 b1f9aab b72dd6f b1f9aab efa7589 f2740a4 a4b0df8 49b296c a4b0df8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 |
import os
import psycopg2
import logging
from dotenv import load_dotenv
from functools import lru_cache
load_dotenv()
def get_connection():
    """Open a PostgreSQL connection from the DATABASE_URL env var.

    Ensures the schema exists (tables/indexes) before returning, so callers
    can use the connection immediately.

    Returns:
        An open psycopg2 connection with SSL required.

    Raises:
        KeyError: if DATABASE_URL is not set in the environment.
    """
    # Fail fast (KeyError) if the env var is missing — same contract as
    # os.environ['DATABASE_URL'], but with an explicit message.
    database_url = os.environ.get('DATABASE_URL')
    if database_url is None:
        raise KeyError('DATABASE_URL environment variable is not set')
    conn = psycopg2.connect(database_url, sslmode='require')
    # Idempotent: all DDL in initialize_db uses IF NOT EXISTS.
    initialize_db(conn)
    return conn
def initialize_db(conn):
    """Create the mappings, dictionary, and results tables plus indexes.

    Idempotent: every statement uses IF NOT EXISTS, so this is safe to run
    on every new connection.

    Args:
        conn: an open psycopg2 connection; committed on success.
    """
    cursor = conn.cursor()
    try:
        # Word -> dictionary-entry mapping cache, keyed by the raw input word.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS mappings (
                input_word TEXT PRIMARY KEY,
                cleaned_word TEXT,
                matching_word TEXT,
                dictionary_word TEXT,
                similarity_score REAL,
                confidence_score REAL,
                similar_words TEXT,
                is_food BOOLEAN,
                food_nonfood_score REAL,
                reviewed BOOLEAN DEFAULT FALSE,
                flagged BOOLEAN DEFAULT FALSE,
                ignored BOOLEAN DEFAULT FALSE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Reference food dictionary (FDC-style entries).
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS dictionary (
                fdc_id INTEGER PRIMARY KEY,
                description TEXT,
                food_category TEXT,
                wweia_category TEXT,
                water_content REAL,
                dry_matter_content REAL,
                leakage REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Per-run computed emissions results, one row per input row of a run.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS results (
                id BIGSERIAL PRIMARY KEY,
                run_key TEXT,
                run_row INTEGER,
                date TEXT,
                input_word TEXT,
                dictionary_word TEXT,
                is_food BOOLEAN,
                wweia_category TEXT,
                dry_matter_content REAL,
                leakage REAL,
                weight REAL,
                weight_metric_tonnes REAL,
                donor TEXT,
                similarity_score REAL,
                food_nonfood_score REAL,
                distance REAL,
                ef REAL,
                mt_lb_mile REAL,
                baseline_emissions REAL,
                leakage_emissions REAL,
                project_emissions REAL,
                total_emissions_reduction REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_cleaned_word ON mappings(cleaned_word);
            CREATE INDEX IF NOT EXISTS idx_dictionary_word ON mappings(dictionary_word);
            CREATE INDEX IF NOT EXISTS idx_description ON dictionary(description);
        ''')
        # store_results_batch upserts with ON CONFLICT (run_key, run_row);
        # PostgreSQL requires a unique index on exactly those columns or the
        # upsert raises "no unique or exclusion constraint matching".
        # NOTE(review): on a pre-existing database that already contains
        # duplicate (run_key, run_row) pairs this CREATE will fail — dedupe
        # legacy rows first.
        cursor.execute('''
            CREATE UNIQUE INDEX IF NOT EXISTS idx_results_run_key_row
                ON results(run_key, run_row);
        ''')
        conn.commit()
    finally:
        # Always release the cursor, even if a DDL statement fails.
        cursor.close()
# Bounded in-process cache of mapping lookups, keyed by the cleaned word only.
# The previous @lru_cache keyed on (db_cursor, word), which fragmented the
# cache per cursor and kept every cursor object alive for the cache's
# lifetime (the classic lru_cache-over-a-resource leak).
_MAPPING_CACHE_MAX = 1024
_mapping_cache = {}


def cached_get_mapping_from_db(db_cursor, word):
    """Memoized wrapper around get_mapping_from_db.

    NOTE(review): cached entries can go stale if the mappings table is
    updated by another process during this process's lifetime — assumed
    acceptable for this read-mostly workload; confirm with callers.

    Args:
        db_cursor: open DB cursor used only on a cache miss.
        word: cleaned word to look up.

    Returns:
        The mapping row dict, or None if no row exists (misses are cached too).
    """
    if word in _mapping_cache:
        return _mapping_cache[word]
    result = get_mapping_from_db(db_cursor, word)
    if len(_mapping_cache) >= _MAPPING_CACHE_MAX:
        # FIFO eviction: drop the oldest entry (dicts keep insertion order).
        _mapping_cache.pop(next(iter(_mapping_cache)))
    _mapping_cache[word] = result
    return result


# Preserve the lru_cache-style reset hook for any caller that used it.
cached_get_mapping_from_db.cache_clear = _mapping_cache.clear
def get_mapping_from_db(cursor, cleaned_word):
    """Look up a single row from mappings by its cleaned_word column.

    Args:
        cursor: open DB cursor.
        cleaned_word: normalized word to match against mappings.cleaned_word.

    Returns:
        The row as a {column_name: value} dict, or None when no row matches.
    """
    cursor.execute('SELECT * FROM mappings WHERE cleaned_word = %s', (cleaned_word,))
    record = cursor.fetchone()
    if record is None:
        return None
    column_names = (col[0] for col in cursor.description)
    return dict(zip(column_names, record))
def get_batch_mapping_from_db(cursor, cleaned_words):
    """Fetch mapping rows for many cleaned words in one query.

    Args:
        cursor: open DB cursor.
        cleaned_words: iterable of cleaned word strings (any iterable is
            accepted; it is materialized once).

    Returns:
        Dict mapping each found cleaned_word to its full row dict. Words
        with no mapping row are simply absent; empty input returns {}.
    """
    # Materialize so generators work too (the original called len() on the
    # argument directly, which fails for non-sized iterables).
    words = list(cleaned_words)
    if not words:
        return {}
    # One placeholder per word; psycopg2 handles quoting/escaping.
    placeholders = ', '.join(['%s'] * len(words))
    cursor.execute(
        f'SELECT * FROM mappings WHERE cleaned_word IN ({placeholders})',
        tuple(words),
    )
    rows = cursor.fetchall()
    if not rows:
        return {}
    columns = [col[0] for col in cursor.description]
    # Hoisted out of the comprehension: the original recomputed
    # columns.index('cleaned_word') for every row (O(rows * cols)).
    key_idx = columns.index('cleaned_word')
    return {row[key_idx]: dict(zip(columns, row)) for row in rows}
def get_dictionary_data_from_db(cursor, dictionary_word):
    """Fetch the dictionary row whose description equals dictionary_word.

    Args:
        cursor: open DB cursor.
        dictionary_word: exact description text to match.

    Returns:
        The matching row as a {column_name: value} dict, or None if absent.
    """
    cursor.execute('SELECT * FROM dictionary WHERE description = %s', (dictionary_word,))
    record = cursor.fetchone()
    if record is None:
        return None
    names = [field[0] for field in cursor.description]
    return dict(zip(names, record))
def store_mapping_to_db(cursor, conn, mapping):
    """Insert one word-mapping row; commit on success, roll back on failure.

    Args:
        cursor: open DB cursor.
        conn: connection owning the cursor (used for commit/rollback).
        mapping: dict with keys input_word, cleaned_word, matching_word,
            dictionary_word, similarity_score, confidence_score,
            similar_words, is_food, food_nonfood_score.

    Returns:
        True on success, False on failure. (The original returned None on
        success — falsy — which made the boolean contract ambiguous.)
    """
    logging.info(f" - Storing new mapping to db: {mapping}")
    try:
        cursor.execute('''
            INSERT INTO mappings (input_word, cleaned_word, matching_word, dictionary_word, similarity_score, confidence_score, similar_words, is_food, food_nonfood_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            mapping['input_word'],
            mapping['cleaned_word'],
            mapping['matching_word'],
            mapping['dictionary_word'],
            mapping['similarity_score'],
            mapping['confidence_score'],
            mapping['similar_words'],
            mapping['is_food'],
            mapping['food_nonfood_score']
        ))
        conn.commit()
        return True
    except Exception as e:
        # Log at ERROR level (was INFO) so failures surface in production logs.
        logging.error(f" - Error storing mapping to db: {e}")
        conn.rollback()
        return False
def store_result_to_db(cursor, conn, run_key, result):
    """Insert one computed-result row; commit on success, roll back on failure.

    Args:
        cursor: open DB cursor.
        conn: connection owning the cursor (used for commit/rollback).
        run_key: identifier of the processing run this row belongs to.
        result: dict with the per-row computed fields (run_row, date,
            input_word, ..., total_emissions_reduction).

    Returns:
        True on success, False on failure. (The original returned None on
        success — falsy — which made the boolean contract ambiguous.)
    """
    try:
        cursor.execute('''
            INSERT INTO results (run_key, run_row, date, input_word, dictionary_word, is_food, wweia_category, dry_matter_content, leakage, weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score, distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions, project_emissions, total_emissions_reduction)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            run_key,
            result['run_row'],
            result['date'],
            result['input_word'],
            result['dictionary_word'],
            result['is_food'],
            result['wweia_category'],
            result['dry_matter_content'],
            result['leakage'],
            result['weight'],
            result['weight_metric_tonnes'],
            result['donor'],
            result['similarity_score'],
            result['food_nonfood_score'],
            result['distance'],
            result['ef'],
            result['mt_lb_mile'],
            result['baseline_emissions'],
            result['leakage_emissions'],
            result['project_emissions'],
            result['total_emissions_reduction'],
        ))
        conn.commit()
        return True
    except Exception as e:
        # Log at ERROR level (was INFO) so failures surface in production logs.
        logging.error(f" - Error storing result to db: {e}")
        conn.rollback()
        return False
def store_results_batch(cursor, conn, run_key, results_batch):
    """Upsert a batch of result rows keyed by (run_key, run_row).

    Existing rows for the same (run_key, run_row) are updated in place via
    ON CONFLICT; new rows are inserted.

    NOTE(review): the ON CONFLICT clause requires a unique index on
    results(run_key, run_row) — confirm the schema defines one, otherwise
    PostgreSQL rejects the statement.

    Args:
        cursor: open DB cursor.
        conn: connection owning the cursor (used for commit/rollback).
        run_key: identifier of the processing run shared by all rows.
        results_batch: iterable of per-row result dicts.

    Returns:
        True on success, False on failure — consistent with
        store_mapping_to_db / store_result_to_db. (The original had no
        error handling at all: a failure propagated with no rollback,
        leaving the transaction aborted.)
    """
    logging.info(f"Storing batch of length {len(results_batch)}")
    # Build the parameter rows up front so a bad dict fails before any SQL.
    rows = [
        (
            run_key,
            result['run_row'],
            result['date'],
            result['input_word'],
            result['dictionary_word'],
            result['is_food'],
            result['wweia_category'],
            result['dry_matter_content'],
            result['leakage'],
            result['weight'],
            result['weight_metric_tonnes'],
            result['donor'],
            result['similarity_score'],
            result['food_nonfood_score'],
            result['distance'],
            result['ef'],
            result['mt_lb_mile'],
            result['baseline_emissions'],
            result['leakage_emissions'],
            result['project_emissions'],
            result['total_emissions_reduction']
        )
        for result in results_batch
    ]
    try:
        cursor.executemany('''
            INSERT INTO results (
                run_key, run_row, date, input_word, dictionary_word, is_food, wweia_category, dry_matter_content, leakage, weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score, distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions, project_emissions, total_emissions_reduction
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (run_key, run_row)
            DO UPDATE SET
                date = EXCLUDED.date,
                input_word = EXCLUDED.input_word,
                dictionary_word = EXCLUDED.dictionary_word,
                is_food = EXCLUDED.is_food,
                wweia_category = EXCLUDED.wweia_category,
                dry_matter_content = EXCLUDED.dry_matter_content,
                leakage = EXCLUDED.leakage,
                weight = EXCLUDED.weight,
                weight_metric_tonnes = EXCLUDED.weight_metric_tonnes,
                donor = EXCLUDED.donor,
                similarity_score = EXCLUDED.similarity_score,
                food_nonfood_score = EXCLUDED.food_nonfood_score,
                distance = EXCLUDED.distance,
                ef = EXCLUDED.ef,
                mt_lb_mile = EXCLUDED.mt_lb_mile,
                baseline_emissions = EXCLUDED.baseline_emissions,
                leakage_emissions = EXCLUDED.leakage_emissions,
                project_emissions = EXCLUDED.project_emissions,
                total_emissions_reduction = EXCLUDED.total_emissions_reduction
        ''', rows)
        conn.commit()
        return True
    except Exception as e:
        logging.error(f" - Error storing results batch to db: {e}")
        conn.rollback()
        return False