File size: 7,333 Bytes
9189e38
 
b72dd6f
9189e38
b59ded9
9189e38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73fda7b
20bd98f
9189e38
 
 
 
22ad617
 
 
 
1c28270
22ad617
 
 
 
 
 
 
 
59e569d
132d401
 
f7839d7
132d401
 
 
 
 
 
 
 
 
 
59e569d
 
 
 
 
 
 
 
9e8c21a
59e569d
 
1ea315f
59e569d
 
 
 
b1f9aab
 
 
 
efa7589
b1f9aab
 
 
 
 
1c28270
b1f9aab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b59ded9
 
 
 
59e569d
f7839d7
b59ded9
 
9189e38
 
b59ded9
 
 
 
9189e38
 
 
 
 
 
 
 
a216741
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b1c94e2
 
 
 
 
 
 
 
9189e38
dbcabfb
9189e38
 
e5de092
1c8b854
9189e38
 
 
 
 
 
 
 
 
 
 
 
1c8b854
9189e38
b1f9aab
 
 
3b8029a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import logging
import os
from collections import OrderedDict
from functools import lru_cache

import psycopg2
from dotenv import load_dotenv

load_dotenv()


def get_connection():
    """Open a new SSL-required PostgreSQL connection and ensure the schema exists.

    The connection string is read from the DATABASE_URL environment variable
    (a missing variable raises KeyError). Every call runs initialize_db(),
    which is idempotent, before handing back the live connection.
    """
    conn = psycopg2.connect(os.environ['DATABASE_URL'], sslmode='require')
    initialize_db(conn)
    return conn

def initialize_db(conn):
    """Create all application tables and indexes if they do not already exist.

    Idempotent: every statement uses IF NOT EXISTS, so this is safe to run on
    every new connection. Commits once at the end and closes the cursor.

    Tables: mappings (word -> dictionary match), dictionary (USDA food data),
    organizations, run_meta, rollups (per-run totals), results (per-row output).
    """
    cursor = conn.cursor()
    # Cache of input-word -> dictionary-word matches, keyed by the raw input.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS mappings (
            input_word TEXT PRIMARY KEY,
            cleaned_word TEXT,
            dictionary_word TEXT,
            similarity_score REAL,
            confidence_score REAL,
            similar_words TEXT,
            is_food BOOLEAN,
            food_nonfood_score REAL,
            reviewed BOOLEAN DEFAULT FALSE,
            flagged BOOLEAN DEFAULT FALSE,
            ignore BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # Reference food data, looked up by description (see get_dictionary_data_from_db).
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS dictionary (
            fdc_id INTEGER PRIMARY KEY,
            description TEXT,
            sr_legacy_food_category TEXT,
            wweia_category TEXT,
            water_content REAL,
            dry_matter_content REAL,
            leakage REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS organizations (
            id SERIAL PRIMARY KEY,
            name TEXT,
            alt_spellings TEXT[],
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS run_meta (
            run_key TEXT PRIMARY KEY,
            organization_id INTEGER,
            year TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # Per-run aggregate totals (one row per run_key, enforced by unique index below).
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS rollups (
            run_key TEXT,
            year TEXT,
            donations_double_counting_correction REAL,
            total_emissions_reduction_pre REAL,
            total_emissions_reduction REAL,
            total_weight_metric_tonnes REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # Row-level computation output; (run_key, run_row) is unique (index below).
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS results (
            id BIGSERIAL PRIMARY KEY,
            run_key TEXT,
            run_row INTEGER,
            date TEXT,
            input_word TEXT,
            dictionary_word TEXT,
            is_food BOOLEAN,
            wweia_category TEXT,
            sr_legacy_food_category TEXT,
            dry_matter_content REAL,
            leakage REAL,
            weight REAL,
            weight_metric_tonnes REAL,
            donor TEXT,
            similarity_score REAL,
            food_nonfood_score REAL,
            distance REAL,
            ef REAL,
            mt_lb_mile REAL,
            baseline_emissions REAL,
            leakage_emissions REAL,
            project_emissions REAL,
            total_emissions_reduction REAL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    # Multiple statements in one execute() — psycopg2 sends them as one batch.
    cursor.execute('''
        CREATE INDEX IF NOT EXISTS idx_cleaned_word ON mappings(cleaned_word);
        CREATE INDEX IF NOT EXISTS idx_dictionary_word ON mappings(dictionary_word);
        CREATE INDEX IF NOT EXISTS idx_description ON dictionary(description);
        CREATE UNIQUE INDEX IF NOT EXISTS run_row_run_key_uniq ON results(run_key text_ops,run_row int4_ops);
        CREATE UNIQUE INDEX IF NOT EXISTS rollups_run_key_key ON rollups(run_key text_ops);
    ''')

    conn.commit()
    cursor.close()  # fix: the cursor was previously never released

# Process-wide LRU cache of mapping rows, keyed on the looked-up word ONLY.
# fix: the previous @lru_cache keyed entries on (db_cursor, word), so the
# cache never hit across cursors and held strong references to up to 1024
# cursor objects (pinning their connections) for the life of the process.
_MAPPING_CACHE_MAX = 1024
_mapping_cache = OrderedDict()

def cached_get_mapping_from_db(db_cursor, word):
    """Memoized wrapper around get_mapping_from_db().

    Caches the row (or None) per word, bounded to _MAPPING_CACHE_MAX entries
    with LRU eviction. NOTE(review): cached rows can go stale if the mappings
    table is modified elsewhere while this process runs — same trade-off the
    original lru_cache made.
    """
    try:
        value = _mapping_cache[word]
        _mapping_cache.move_to_end(word)  # mark as most recently used
        return value
    except KeyError:
        pass
    value = get_mapping_from_db(db_cursor, word)
    _mapping_cache[word] = value
    if len(_mapping_cache) > _MAPPING_CACHE_MAX:
        _mapping_cache.popitem(last=False)  # evict least recently used
    return value

def get_mapping_from_db(cursor, cleaned_word):
    """Fetch the mappings row for *cleaned_word* as a column-name -> value dict.

    Returns None when no row matches.
    """
    cursor.execute('SELECT * FROM mappings WHERE cleaned_word = %s', (cleaned_word,))
    record = cursor.fetchone()
    if record is None:
        return None
    names = (col[0] for col in cursor.description)
    return dict(zip(names, record))

def get_batch_mapping_from_db(cursor, cleaned_words):
    """Fetch mappings rows for many cleaned words in a single round trip.

    Returns {cleaned_word: row_dict} for the words that were found; words
    with no matching row are simply absent. An empty input returns {}.
    """
    if not cleaned_words:
        return {}

    # fix: use psycopg2's array adaptation (= ANY(%s)) instead of building an
    # IN (...) clause with an f-string of N placeholders — one bound parameter
    # regardless of batch size, and no SQL string construction.
    cursor.execute(
        'SELECT * FROM mappings WHERE cleaned_word = ANY(%s)',
        (list(cleaned_words),),
    )
    rows = cursor.fetchall()
    if not rows:
        return {}

    columns = [col[0] for col in cursor.description]
    # Hoisted out of the comprehension: the original called
    # columns.index('cleaned_word') once per row.
    key_idx = columns.index('cleaned_word')
    return {row[key_idx]: dict(zip(columns, row)) for row in rows}

def get_dictionary_data_from_db(cursor, dictionary_word):
    """Look up the dictionary row whose description equals *dictionary_word*.

    Returns the row as a column-name -> value dict, or None if not found.
    """
    cursor.execute('SELECT * FROM dictionary WHERE description = %s', (dictionary_word,))
    record = cursor.fetchone()
    if record is None:
        return None
    return dict(zip((col[0] for col in cursor.description), record))

def store_mapping_to_db(cursor, conn, mapping):
    """Insert one row into mappings; commit on success, roll back on failure.

    mapping must contain the keys: input_word, cleaned_word, dictionary_word,
    similarity_score, confidence_score, similar_words, is_food,
    food_nonfood_score.

    Returns True on success, False on any insert/commit error (the error is
    logged as a warning and the transaction rolled back).
    """
    logging.info(f" - Storing new mapping to db: {mapping}")
    try:
        cursor.execute('''
            INSERT INTO mappings (input_word, cleaned_word, dictionary_word, similarity_score, confidence_score, similar_words, is_food, food_nonfood_score)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ''', (
            mapping['input_word'],
            mapping['cleaned_word'],
            mapping['dictionary_word'],
            mapping['similarity_score'],
            mapping['confidence_score'],
            mapping['similar_words'],
            mapping['is_food'],
            mapping['food_nonfood_score']
        ))
        conn.commit()
        # fix: the success path previously fell through and returned None,
        # while the failure path returned False — make success explicit.
        return True
    except Exception as e:
        # fix: logging.warn is a deprecated alias of logging.warning
        logging.warning(f" - Error storing mapping to db: {e}")
        conn.rollback()
        return False

def store_result_to_db(cursor, conn, run_key, result):
    """Insert one computed result row for *run_key* into results and commit.

    result must supply every key listed in `fields` below; a missing key
    raises KeyError before anything is sent to the database. Always returns
    True; database errors propagate to the caller (no rollback here).
    """
    # Order mirrors the column list in the INSERT statement (after run_key).
    fields = (
        'run_row', 'date', 'input_word', 'dictionary_word', 'is_food',
        'sr_legacy_food_category', 'wweia_category', 'dry_matter_content',
        'leakage', 'weight', 'weight_metric_tonnes', 'donor',
        'similarity_score', 'food_nonfood_score', 'distance', 'ef',
        'mt_lb_mile', 'baseline_emissions', 'leakage_emissions',
        'project_emissions', 'total_emissions_reduction',
    )
    values = (run_key,) + tuple(result[name] for name in fields)
    cursor.execute('''
        INSERT INTO results (run_key, run_row, date, input_word, dictionary_word, is_food, sr_legacy_food_category, wweia_category, dry_matter_content, leakage, weight, weight_metric_tonnes, donor, similarity_score, food_nonfood_score, distance, ef, mt_lb_mile, baseline_emissions, leakage_emissions, project_emissions, total_emissions_reduction)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    ''', values)

    conn.commit()
    return True