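"""Database layer for the food-mapping pipeline.

Creates the Postgres schema (mappings, dictionary, organizations,
run_meta, rollups, results) on first connection and provides read/write
helpers for word-to-dictionary mappings and per-run emissions results.
"""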
import os
import psycopg2
import logging
from dotenv import load_dotenv
from functools import lru_cache
from psycopg2.extras import execute_values


load_dotenv()

def get_connection():
    DATABASE_URL = os.environ['DATABASE_URL']
    logging.info("Connecting to database...")
    try:
        conn = psycopg2.connect(DATABASE_URL, sslmode='require')
        initialize_db(conn)  # create tables and indexes if they don't exist yet
        logging.info("Database connection established")
        return conn
    except Exception as e:
        logging.error(f"Failed to connect to database: {e}")
        raise

def initialize_db(conn):
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS mappings (
            input_word TEXT PRIMARY KEY,
            cleaned_word TEXT,
            dictionary_word TEXT,
            specificity TEXT,
            similarity_score REAL,
            confidence_score REAL,
            similar_words TEXT,
            is_food BOOLEAN,
            food_nonfood_score REAL,
            reviewed BOOLEAN DEFAULT FALSE,
            gpt_reviewed BOOLEAN DEFAULT FALSE,
            flagged BOOLEAN DEFAULT FALSE,
            ignore BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS dictionary (
            fdc_id INTEGER PRIMARY KEY,
            description TEXT,
            sr_legacy_food_category TEXT,
            wweia_category TEXT,
            water_content REAL,
            dry_matter_content REAL,
            leakage REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS organizations (
            id SERIAL PRIMARY KEY,
            name TEXT,
            alt_spellings TEXT[],
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS run_meta (
            run_key TEXT PRIMARY KEY,
            organization_id INTEGER,
            year TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS rollups (
            run_key TEXT,
            year TEXT,
            donations_double_counting_correction REAL,
            total_emissions_reduction_pre REAL,
            total_emissions_reduction REAL,
            total_weight_metric_tonnes REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS results (
            id BIGSERIAL PRIMARY KEY,
            run_key TEXT,
            run_row INTEGER,
            date TEXT,
            input_word TEXT,
            dictionary_word TEXT,
            is_food BOOLEAN,
            wweia_category TEXT,
            sr_legacy_food_category TEXT,
            dry_matter_content REAL,
            leakage REAL,
            weight REAL,
            weight_metric_tonnes REAL,
            donor TEXT,
            similarity_score REAL,
            food_nonfood_score REAL,
            distance REAL,
            ef REAL,
            mt_lb_mile REAL,
            baseline_emissions REAL,
            leakage_emissions REAL,
            project_emissions REAL,
            total_emissions_reduction REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
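    # The unique index on results(run_key, run_row) created below is what
    # backs the ON CONFLICT upsert in store_batch_results_to_db.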
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_cleaned_word ON mappings(cleaned_word);
        CREATE INDEX IF NOT EXISTS idx_dictionary_word ON mappings(dictionary_word);
        CREATE INDEX IF NOT EXISTS idx_description ON dictionary(description);
        CREATE UNIQUE INDEX IF NOT EXISTS run_row_run_key_uniq ON results(run_key text_ops,run_row int4_ops);
        CREATE UNIQUE INDEX IF NOT EXISTS rollups_run_key_key ON rollups(run_key text_ops);
    ''')

    conn.commit()

@lru_cache(maxsize=1024)
def cached_get_mapping_from_db(db_cursor, word):
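    # Caveat: db_cursor is part of the lru_cache key, so cached entries are
    # only reused for lookups made through the same cursor instance, and the
    # cache holds a reference to that cursor for the lifetime of each entry.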
    return get_mapping_from_db(db_cursor, word)

def get_mapping_from_db(cursor, cleaned_word):
    cursor.execute('SELECT * FROM mappings WHERE cleaned_word = %s', (cleaned_word,))
    row = cursor.fetchone()
    if row:
        columns = [col[0] for col in cursor.description]
        return dict(zip(columns, row))
    return None

def get_batch_mapping_from_db(cursor, cleaned_words):
    if not cleaned_words:
        return {}
    
    # Create a query with a list of placeholders
    placeholders = ', '.join(['%s'] * len(cleaned_words))
    query = f'SELECT * FROM mappings WHERE cleaned_word IN ({placeholders})'
    
    cursor.execute(query, tuple(cleaned_words))
    rows = cursor.fetchall()
    
    if rows:
        columns = [col[0] for col in cursor.description]
        key_idx = columns.index('cleaned_word')
        return {row[key_idx]: dict(zip(columns, row)) for row in rows}
    
    return {}

def get_dictionary_data_from_db(cursor, dictionary_word):
    cursor.execute('SELECT * FROM dictionary WHERE description = %s', (dictionary_word,))
    row = cursor.fetchone()
    if row:
        columns = [col[0] for col in cursor.description]
        return dict(zip(columns, row))
    return None

def store_mapping_to_db(cursor, conn, mapping):
    logging.info(f" - Storing new mapping to db: {mapping}")
    try:
        cursor.execute('''
            INSERT INTO mappings (input_word, cleaned_word, dictionary_word, similarity_score, confidence_score, similar_words, is_food, food_nonfood_score, specificity)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            mapping['input_word'], 
            mapping['cleaned_word'], 
            mapping['dictionary_word'],
            mapping['similarity_score'], 
            mapping['confidence_score'], 
            mapping['similar_words'], 
            mapping['is_food'], 
            mapping['food_nonfood_score'],
            mapping['specificity']
        ))
        conn.commit()
        return True
    except Exception as e:
        logging.warning(f" - Error storing mapping to db: {e}")
        conn.rollback()
        return False

def store_result_to_db(cursor, conn, run_key, result):
    values = (
        run_key,
        result['run_row'],
        result['date'], 
        result['input_word'], 
        result['dictionary_word'], 
        result['is_food'], 
        result['sr_legacy_food_category'], 
        result['wweia_category'], 
        result['dry_matter_content'],
        result['leakage'],
        result['weight'],
        result['weight_metric_tonnes'],
        result['donor'],
        result['similarity_score'], 
        result['food_nonfood_score'],
        result['distance'],
        result['ef'], 
        result['mt_lb_mile'], 
        result['baseline_emissions'], 
        result['leakage_emissions'],
        result['project_emissions'],  
        result['total_emissions_reduction']
    )
    cursor.execute('''
        INSERT INTO results (run_key, run_row, date, input_word, dictionary_word, is_food, sr_legacy_food_category, wweia_category, dry_matter_content, leakage, weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score, distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions, project_emissions, total_emissions_reduction)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ''', values)
    
    conn.commit()
    return True


def store_batch_results_to_db(conn, cursor, run_key, results):
    values = [
        (
            run_key,
            result['run_row'],
            result['date'], 
            result['input_word'], 
            result['dictionary_word'], 
            result['is_food'], 
            result['sr_legacy_food_category'], 
            result['wweia_category'], 
            result['dry_matter_content'],
            result['leakage'],
            result['weight'],
            result['weight_metric_tonnes'],
            result['donor'],
            result['similarity_score'], 
            result['food_nonfood_score'],
            result['distance'],
            result['ef'], 
            result['mt_lb_mile'], 
            result['baseline_emissions'], 
            result['leakage_emissions'],
            result['project_emissions'],  
            result['total_emissions_reduction']
        )
        for result in results
    ]
    
    insert_query = '''
        INSERT INTO results (
            run_key, run_row, date, input_word, dictionary_word, is_food, 
            sr_legacy_food_category, wweia_category, dry_matter_content, leakage, 
            weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score, 
            distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions, 
            project_emissions, total_emissions_reduction
        ) VALUES %s
        ON CONFLICT (run_key, run_row) 
        DO UPDATE SET 
            date = EXCLUDED.date,
            input_word = EXCLUDED.input_word,
            dictionary_word = EXCLUDED.dictionary_word,
            is_food = EXCLUDED.is_food,
            sr_legacy_food_category = EXCLUDED.sr_legacy_food_category,
            wweia_category = EXCLUDED.wweia_category,
            dry_matter_content = EXCLUDED.dry_matter_content,
            leakage = EXCLUDED.leakage,
            weight = EXCLUDED.weight,
            weight_metric_tonnes = EXCLUDED.weight_metric_tonnes,
            donor = EXCLUDED.donor,
            similarity_score = EXCLUDED.similarity_score,
            food_nonfood_score = EXCLUDED.food_nonfood_score,
            distance = EXCLUDED.distance,
            ef = EXCLUDED.ef,
            mt_lb_mile = EXCLUDED.mt_lb_mile,
            baseline_emissions = EXCLUDED.baseline_emissions,
            leakage_emissions = EXCLUDED.leakage_emissions,
            project_emissions = EXCLUDED.project_emissions,
            total_emissions_reduction = EXCLUDED.total_emissions_reduction;
    '''
    
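    # psycopg2's execute_values expands the single "VALUES %s" placeholder
    # into one row tuple per result, so the whole batch is upserted in a
    # single round trip.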
    execute_values(cursor, insert_query, values)
    conn.commit()
    return True
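

if __name__ == "__main__":
    # Minimal smoke test (a sketch, not part of the pipeline; assumes a
    # DATABASE_URL entry in the local .env file and the hypothetical words
    # 'apples' and 'bread' in the mappings table).
    conn = get_connection()
    cur = conn.cursor()
    print(get_batch_mapping_from_db(cur, ['apples', 'bread']))
    cur.close()
    conn.close()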