# NOTE: removed repository-viewer residue ("Spaces/Paused" chrome, commit
# hashes, and a line-number gutter) that is not Python source and would
# break this module at import time.
import os
import psycopg2
import logging
from dotenv import load_dotenv
from functools import lru_cache
load_dotenv()
def get_connection():
    """Open a new PostgreSQL connection and ensure the schema exists.

    Reads the connection string from the DATABASE_URL environment
    variable (raises KeyError if unset), connects with SSL required,
    runs the idempotent schema setup, and returns the live connection.
    """
    database_url = os.environ['DATABASE_URL']
    connection = psycopg2.connect(database_url, sslmode='require')
    # Schema setup is IF NOT EXISTS throughout, so doing it per
    # connection is safe (if somewhat redundant).
    initialize_db(connection)
    return connection
def initialize_db(conn):
    """Create all tables and indexes used by the pipeline (idempotent).

    Every statement uses IF NOT EXISTS, so this is safe to run on every
    new connection. Commits once after all DDL has executed.

    Fix vs. previous version: the cursor is now a context manager, so it
    is closed even if a DDL statement raises (the old code leaked it).
    """
    ddl_statements = (
        '''
        CREATE TABLE IF NOT EXISTS mappings (
            input_word TEXT PRIMARY KEY,
            cleaned_word TEXT,
            dictionary_word TEXT,
            similarity_score REAL,
            confidence_score REAL,
            similar_words TEXT,
            is_food BOOLEAN,
            food_nonfood_score REAL,
            reviewed BOOLEAN DEFAULT FALSE,
            flagged BOOLEAN DEFAULT FALSE,
            ignore BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS dictionary (
            fdc_id INTEGER PRIMARY KEY,
            description TEXT,
            sr_legacy_food_category TEXT,
            wweia_category TEXT,
            water_content REAL,
            dry_matter_content REAL,
            leakage REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS organizations (
            id SERIAL PRIMARY KEY,
            name TEXT,
            alt_spellings TEXT[],
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS run_meta (
            run_key TEXT PRIMARY KEY,
            organization_id INTEGER,
            year TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS rollups (
            run_key TEXT,
            year TEXT,
            donations_double_counting_correction REAL,
            total_emissions_reduction_pre REAL,
            total_emissions_reduction REAL,
            total_weight_metric_tonnes REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        '''
        CREATE TABLE IF NOT EXISTS results (
            id BIGSERIAL PRIMARY KEY,
            run_key TEXT,
            run_row INTEGER,
            date TEXT,
            input_word TEXT,
            dictionary_word TEXT,
            is_food BOOLEAN,
            wweia_category TEXT,
            sr_legacy_food_category TEXT,
            dry_matter_content REAL,
            leakage REAL,
            weight REAL,
            weight_metric_tonnes REAL,
            donor TEXT,
            similarity_score REAL,
            food_nonfood_score REAL,
            distance REAL,
            ef REAL,
            mt_lb_mile REAL,
            baseline_emissions REAL,
            leakage_emissions REAL,
            project_emissions REAL,
            total_emissions_reduction REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        ''',
        # Lookup indexes, plus unique indexes that enforce one row per
        # (run_key, run_row) in results and one rollup row per run_key.
        '''
        CREATE INDEX IF NOT EXISTS idx_cleaned_word ON mappings(cleaned_word);
        CREATE INDEX IF NOT EXISTS idx_dictionary_word ON mappings(dictionary_word);
        CREATE INDEX IF NOT EXISTS idx_description ON dictionary(description);
        CREATE UNIQUE INDEX IF NOT EXISTS run_row_run_key_uniq ON results(run_key text_ops,run_row int4_ops);
        CREATE UNIQUE INDEX IF NOT EXISTS rollups_run_key_key ON rollups(run_key text_ops);
        ''',
    )
    # psycopg2 cursors are context managers: closed on exit, even on error.
    with conn.cursor() as cursor:
        for statement in ddl_statements:
            cursor.execute(statement)
    conn.commit()
# Cache mapping lookups keyed by word ONLY. The previous
# @lru_cache(maxsize=1024) keyed on (db_cursor, word), which (a) missed
# whenever a different cursor was passed and (b) kept every cursor object
# (and its connection) alive for the lifetime of the cache.
_mapping_lookup_cache = {}
_MAPPING_LOOKUP_CACHE_MAX = 1024  # same bound as the old lru_cache


def cached_get_mapping_from_db(db_cursor, word):
    """Memoized wrapper around get_mapping_from_db.

    Results (including None for "no row") are cached per word, bounded
    to _MAPPING_LOOKUP_CACHE_MAX entries with oldest-first eviction.
    NOTE(review): assumes one database per process, so a word maps to
    the same row regardless of which cursor is used — confirm.
    """
    try:
        return _mapping_lookup_cache[word]
    except KeyError:
        pass
    result = get_mapping_from_db(db_cursor, word)
    if len(_mapping_lookup_cache) >= _MAPPING_LOOKUP_CACHE_MAX:
        # dicts preserve insertion order, so this evicts the oldest entry.
        del _mapping_lookup_cache[next(iter(_mapping_lookup_cache))]
    _mapping_lookup_cache[word] = result
    return result
def get_mapping_from_db(cursor, cleaned_word):
    """Fetch the mappings row for one cleaned word.

    Returns the row as a column-name -> value dict, or None when no
    matching row exists.
    """
    cursor.execute('SELECT * FROM mappings WHERE cleaned_word = %s', (cleaned_word,))
    record = cursor.fetchone()
    if not record:
        return None
    names = (col[0] for col in cursor.description)
    return dict(zip(names, record))
def get_batch_mapping_from_db(cursor, cleaned_words):
    """Fetch mapping rows for several cleaned words with one query.

    Returns a dict keyed by cleaned_word, each value being the full row
    as a column-name -> value dict. Words with no row are simply absent;
    an empty input (or no matches) yields {}.
    """
    if not cleaned_words:
        return {}
    # One %s placeholder per word; values are passed separately, so the
    # f-string never interpolates user data into the SQL itself.
    placeholders = ', '.join(['%s'] * len(cleaned_words))
    cursor.execute(
        f'SELECT * FROM mappings WHERE cleaned_word IN ({placeholders})',
        tuple(cleaned_words),
    )
    rows = cursor.fetchall()
    if not rows:
        return {}
    columns = [col[0] for col in cursor.description]
    key_index = columns.index('cleaned_word')
    batch = {}
    for row in rows:
        batch[row[key_index]] = dict(zip(columns, row))
    return batch
def get_dictionary_data_from_db(cursor, dictionary_word):
    """Fetch the dictionary row whose description equals dictionary_word.

    Returns the row as a column-name -> value dict, or None when the
    description is not present in the dictionary table.
    """
    cursor.execute('SELECT * FROM dictionary WHERE description = %s', (dictionary_word,))
    record = cursor.fetchone()
    if not record:
        return None
    names = (col[0] for col in cursor.description)
    return dict(zip(names, record))
def store_mapping_to_db(cursor, conn, mapping):
    """Insert a new row into mappings and commit.

    Args:
        cursor: open database cursor.
        conn: the cursor's connection (needed for commit/rollback).
        mapping: dict with keys input_word, cleaned_word, dictionary_word,
            similarity_score, confidence_score, similar_words, is_food,
            food_nonfood_score.

    Returns:
        True on success, False after rolling back on any database error
        (e.g. a duplicate input_word primary key).

    Fixes vs. previous version: success now returns True (it implicitly
    returned None, while the failure path returned False), and the
    deprecated logging.warn alias is replaced by logging.warning.
    """
    # Lazy %-style args avoid formatting the mapping unless the record is emitted.
    logging.info(" - Storing new mapping to db: %s", mapping)
    try:
        cursor.execute('''
            INSERT INTO mappings (input_word, cleaned_word, dictionary_word, similarity_score, confidence_score, similar_words, is_food, food_nonfood_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            mapping['input_word'],
            mapping['cleaned_word'],
            mapping['dictionary_word'],
            mapping['similarity_score'],
            mapping['confidence_score'],
            mapping['similar_words'],
            mapping['is_food'],
            mapping['food_nonfood_score'],
        ))
        conn.commit()
        return True
    except Exception as e:
        logging.warning(" - Error storing mapping to db: %s", e)
        # Roll back so the connection stays usable for later statements.
        conn.rollback()
        return False
def store_result_to_db(cursor, conn, run_key, result):
    """Insert one computed result row for a run and commit.

    Args:
        cursor: open database cursor.
        conn: the cursor's connection (needed for commit/rollback).
        run_key: identifier of the run this row belongs to.
        result: dict carrying the 21 per-row fields inserted below.

    Returns:
        True on success, False after rolling back on any database error
        (e.g. the unique index on (run_key, run_row) rejecting a
        duplicate row).

    Fix vs. previous version: errors were unhandled, leaving the psycopg2
    connection in an aborted state with no rollback; error handling now
    matches store_mapping_to_db.
    """
    values = (
        run_key,
        result['run_row'],
        result['date'],
        result['input_word'],
        result['dictionary_word'],
        result['is_food'],
        result['sr_legacy_food_category'],
        result['wweia_category'],
        result['dry_matter_content'],
        result['leakage'],
        result['weight'],
        result['weight_metric_tonnes'],
        result['donor'],
        result['similarity_score'],
        result['food_nonfood_score'],
        result['distance'],
        result['ef'],
        result['mt_lb_mile'],
        result['baseline_emissions'],
        result['leakage_emissions'],
        result['project_emissions'],
        result['total_emissions_reduction'],
    )
    try:
        cursor.execute('''
            INSERT INTO results (run_key, run_row, date, input_word, dictionary_word, is_food, sr_legacy_food_category, wweia_category, dry_matter_content, leakage, weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score, distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions, project_emissions, total_emissions_reduction)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        ''', values)
        conn.commit()
        return True
    except Exception as e:
        logging.warning(" - Error storing result to db: %s", e)
        # Roll back so the connection stays usable for later statements.
        conn.rollback()
        return False