# NOTE: "Spaces: Paused / Paused" hosting-UI status text was pasted here; kept as a
# comment so the module stays importable.
# tasks.py
import os
import logging
import time
import pandas as pd
from algo import Algo
from dotenv import load_dotenv
from redis import Redis
from rq import Queue
# from celery import Celery
from db.db_utils import get_connection, store_result_to_db
load_dotenv()
# app = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)
# app.conf.update(
#     result_expires=3600,
#     task_serializer='json',
#     result_serializer='json',
#     accept_content=['json'],
#     timezone='UTC',
#     enable_utc=True,
#     broker_connection_retry_on_startup=True
# )
# @app.task
# def insert_result(db_conn, run_key, mappings):
#     db_cursor = db_conn.cursor()
#     for mapping in mappings:
#         store_result_to_db(db_cursor, db_conn, run_key, mapping)
# @app.task
def _ensure_run_meta(db_cursor, db_conn, run_key):
    """Register run_key in run_meta, prompting interactively if it is new.

    If the run is already registered this is a no-op; otherwise the user is
    asked to pick an organization (from the organizations table) and a year,
    and the row is committed immediately.
    """
    db_cursor.execute('SELECT run_key FROM run_meta WHERE run_key = %s', (run_key,))
    if db_cursor.fetchone():
        return
    # Let the user select from the known organizations by 1-based index.
    db_cursor.execute('SELECT id, name FROM organizations')
    organizations = db_cursor.fetchall()
    for i, org in enumerate(organizations):
        print(f"{i+1}. {org[1]}")
    org_choice = int(input("Select an organization: "))
    organization_id = organizations[org_choice - 1][0]
    year = int(input("Enter the year: "))
    db_cursor.execute(
        'INSERT INTO run_meta (run_key, organization_id, year) VALUES (%s, %s, %s)',
        (run_key, organization_id, year),
    )
    db_conn.commit()


def _load_input_rows(raw_file_name):
    """Read ./raw/<raw_file_name> and return a list of input tuples.

    Each tuple is (description, description2, run_row, donor, date, weight),
    where run_row = CSV line number (i + 2: 1-based plus the header line).
    Column headers are lowercased; 'description2' is optional and filled with
    None (stringified to 'None' by astype) when absent.
    """
    input_file_path = f'./raw/{raw_file_name}'
    df_input = pd.read_csv(input_file_path)
    # Normalize headers so the required columns are found case-insensitively.
    df_input.columns = df_input.columns.str.lower()
    descriptions = df_input['description'].astype(str).tolist()
    descriptions2 = df_input.get('description2', pd.Series([None] * len(df_input))).astype(str).tolist()
    donors = df_input['donor'].astype(str).tolist()
    dates = df_input['date'].astype(str).tolist()
    weights = df_input['weight'].astype(str).tolist()
    return [
        (desc, desc2, i + 2, donor, date, weight)
        for i, (desc, desc2, donor, date, weight)
        in enumerate(zip(descriptions, descriptions2, donors, dates, weights))
    ]


def process_file(raw_file_name):
    """Process one raw CSV file, resuming from the last stored result row.

    Skips non-CSV names. Ensures a run_meta entry exists (prompting the user
    when the run is new), determines how many rows were already processed for
    this run_key from the results table, then hands the (possibly truncated)
    input to Algo.match_words.
    """
    print(f"Processing {raw_file_name}")
    if not raw_file_name.endswith('.csv'):
        return
    # Fix: splitext drops only the final extension, so dotted names like
    # "2023.q1.csv" keep their full stem ("2023.q1") as the run key;
    # split('.')[0] would have truncated at the first dot.
    run_key = os.path.splitext(raw_file_name)[0]
    db_conn = get_connection()
    db_cursor = db_conn.cursor()
    print("obtained db connection")
    _ensure_run_meta(db_cursor, db_conn, run_key)
    # Highest run_row already stored for this run, if any.
    db_cursor.execute('SELECT run_row FROM results WHERE run_key = %s ORDER BY run_row DESC', (run_key,))
    last_row = db_cursor.fetchone()
    input_data = _load_input_rows(raw_file_name)
    num_rows = len(input_data)
    # run_row values start at 2 (header offset), so rows-processed = run_row - 1.
    last_row_num = last_row[0] - 1 if last_row else 0
    # num_rows counts data rows; +1 accounts for the header line in the file.
    print(f"CSV has {num_rows+1} rows")
    csv_complete = last_row_num >= num_rows
    print("CSV is complete" if csv_complete else "CSV is not complete")
    if not csv_complete:
        print(f"Starting at row #{last_row_num + 1}")
        input_data = input_data[last_row_num:]
    # NOTE(review): when the CSV is already complete, the FULL input list is
    # still passed to match_words (original behavior, preserved) — presumably
    # match_words is idempotent per run_row; confirm before changing.
    algo = Algo(db_conn, run_key)
    algo.match_words(input_data)