File size: 3,744 Bytes
b9a27ee
773fa99
b9a27ee
9323d71
 
 
773fa99
9323d71
 
 
b9a27ee
 
773fa99
b9a27ee
e938c9d
b9a27ee
e938c9d
 
 
 
 
 
 
 
 
b9a27ee
e938c9d
9323d71
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22bb999
9323d71
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# tasks.py
import os
import logging
import time
import pandas as pd
from algo import Algo
from dotenv import load_dotenv
from redis import Redis
from rq import Queue
# from celery import Celery
from db.db_utils import get_connection, store_result_to_db

load_dotenv()

# app = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)

# app.conf.update(
#     result_expires=3600,
#     task_serializer='json',
#     result_serializer='json',
#     accept_content=['json'],
#     timezone='UTC',
#     enable_utc=True,
#     broker_connection_retry_on_startup=True
# )

# @app.task
# def insert_result(db_conn, run_key, mappings):
#     db_cursor = db_conn.cursor()
#     for mapping in mappings:
#         store_result_to_db(db_cursor, db_conn, run_key, mapping)

# @app.task
def process_file(raw_file_name):
    """Run word matching over one raw CSV file, resuming after the last stored row.

    The file is read from ``./raw/<raw_file_name>``.  The run key is the part
    of the file name before the first ``.``; rows already present in the
    ``results`` table for that key are skipped so an interrupted run can be
    continued.  If the run key has no ``run_meta`` row yet, the user is
    prompted on stdin for the organization and year, and the row is inserted.

    Args:
        raw_file_name: Name of the file inside ``./raw``.  Names that do not
            end in ``.csv`` are ignored.

    Returns:
        None.

    Raises:
        ValueError: If a prompt answer is not an integer, or the organization
            choice is outside the listed range.
        RuntimeError: If a new run must be registered but the
            ``organizations`` table is empty.
        KeyError: If the CSV lacks one of the required columns.
    """
    print(f"Processing {raw_file_name}")
    if not raw_file_name.endswith('.csv'):
        return

    # The run key is everything before the first '.', so "a.b.csv" -> "a".
    run_key = raw_file_name.split('.')[0]

    db_conn = get_connection()
    db_cursor = db_conn.cursor()
    print("obtained db connection")

    try:
        _register_run_if_missing(db_conn, db_cursor, run_key)

        # Highest CSV row already stored for this run key.  MAX() avoids
        # sorting and transferring every result row just to read the first.
        db_cursor.execute('SELECT MAX(run_row) FROM results WHERE run_key = %s', (run_key,))
        last_row = db_cursor.fetchone()
    finally:
        # The cursor is only needed for the lookups above; the connection
        # itself stays open because it is handed to Algo below.
        db_cursor.close()

    input_data = _load_input_rows(f'./raw/{raw_file_name}')

    # run_row is the last row from the CSV file that was processed (the header
    # is row 1, so the first data row is row 2).  Subtracting one turns it
    # into the number of data rows already handled, i.e. the index to resume at.
    num_rows = len(input_data)
    last_row_num = 0
    if last_row and last_row[0] is not None:  # MAX() yields NULL when no rows exist
        last_row_num = last_row[0] - 1

    # num_rows is the number of data rows in the CSV; add one for the header
    print(f"CSV has {num_rows+1} rows")
    csv_complete = last_row_num >= num_rows
    print("CSV is complete" if csv_complete else "CSV is not complete")
    if csv_complete:
        return

    print(f"Starting at row #{last_row_num + 1}")

    algo = Algo(db_conn, run_key)
    algo.match_words(input_data[last_row_num:])


def _register_run_if_missing(db_conn, db_cursor, run_key):
    """Insert a run_meta row for run_key, prompting for its details, if absent."""
    db_cursor.execute('SELECT run_key FROM run_meta WHERE run_key = %s', (run_key,))
    if db_cursor.fetchone():
        return

    # Let the user pick the organization from a numbered list.
    db_cursor.execute('SELECT id, name FROM organizations')
    organizations = db_cursor.fetchall()
    if not organizations:
        raise RuntimeError(
            f"Cannot register run '{run_key}': the organizations table is empty"
        )
    for number, org in enumerate(organizations, start=1):
        print(f"{number}. {org[1]}")

    org_choice = int(input("Select an organization: "))
    # Reject 0 and negatives explicitly: Python would otherwise index from the
    # end of the list and silently pick the wrong organization.
    if not 1 <= org_choice <= len(organizations):
        raise ValueError(
            f"Organization choice must be between 1 and {len(organizations)}, "
            f"got {org_choice}"
        )
    organization_id = organizations[org_choice - 1][0]

    year = int(input("Enter the year: "))

    db_cursor.execute(
        'INSERT INTO run_meta (run_key, organization_id, year) VALUES (%s, %s, %s)',
        (run_key, organization_id, year),
    )
    db_conn.commit()


def _load_input_rows(input_file_path):
    """Read the CSV and return (desc, desc2, csv_row, donor, date, weight) tuples."""
    df_input = pd.read_csv(input_file_path)

    # Convert column headers to lowercase so lookups are case-insensitive
    df_input.columns = df_input.columns.str.lower()

    descriptions = df_input['description'].astype(str).tolist()
    # 'description2' is optional; a column of None values stands in when absent.
    descriptions2 = df_input.get('description2', pd.Series([None] * len(df_input))).astype(str).tolist()
    donors = df_input['donor'].astype(str).tolist()
    dates = df_input['date'].astype(str).tolist()
    weights = df_input['weight'].astype(str).tolist()

    # CSV row numbers start at 2 because row 1 is the header.
    return [
        (desc, desc2, csv_row, donor, date, weight)
        for csv_row, (desc, desc2, donor, date, weight) in enumerate(
            zip(descriptions, descriptions2, donors, dates, weights), start=2
        )
    ]