# tasks.py
import os
import logging
import time

import pandas as pd
from algo import Algo
from dotenv import load_dotenv
from redis import Redis
from rq import Queue
# from celery import Celery

from db.db_utils import get_connection, store_result_to_db

load_dotenv()

# app = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)
# app.conf.update(
#     result_expires=3600,
#     task_serializer='json',
#     result_serializer='json',
#     accept_content=['json'],
#     timezone='UTC',
#     enable_utc=True,
#     broker_connection_retry_on_startup=True
# )

# @app.task
# def insert_result(db_conn, run_key, mappings):
#     db_cursor = db_conn.cursor()
#     for mapping in mappings:
#         store_result_to_db(db_cursor, db_conn, run_key, mapping)


def _choose_organization_id(organizations):
    """Print a numbered organization menu and return the id of the chosen row.

    Args:
        organizations: non-empty sequence of ``(id, name)`` rows.

    Returns:
        The ``id`` (first column) of the selected row.

    Raises:
        ValueError: if the typed value is not an integer (same as before).
    """
    for number, org in enumerate(organizations, start=1):
        print(f"{number}. {org[1]}")

    while True:
        org_choice = int(input("Select an organization: "))
        # Reject 0 and negatives explicitly: Python would otherwise index from
        # the end of the list and silently pick the wrong organization.
        if 1 <= org_choice <= len(organizations):
            return organizations[org_choice - 1][0]
        print(f"Please enter a number between 1 and {len(organizations)}.")


def _load_input_rows(input_file_path):
    """Read the CSV at *input_file_path* into a list of row tuples.

    Column headers are lower-cased before lookup. The columns ``description``,
    ``donor``, ``date`` and ``weight`` are required; ``description2`` is
    optional. Every value is converted to ``str``.

    Returns:
        A list of ``(description, description2, run_row, donor, date, weight)``
        tuples, where ``run_row`` starts at 2 for the first data row (row 1 is
        the CSV header).
    """
    df_input = pd.read_csv(input_file_path)

    # Convert column headers to lowercase
    df_input.columns = df_input.columns.str.lower()

    descriptions = df_input['description'].astype(str).tolist()
    descriptions2 = df_input.get('description2', pd.Series([None] * len(df_input))).astype(str).tolist()
    donors = df_input['donor'].astype(str).tolist()
    dates = df_input['date'].astype(str).tolist()
    weights = df_input['weight'].astype(str).tolist()

    columns = zip(descriptions, descriptions2, donors, dates, weights)
    return [
        (desc, desc2, run_row, donor, date, weight)
        for run_row, (desc, desc2, donor, date, weight) in enumerate(columns, start=2)
    ]


# @app.task
def process_file(raw_file_name):
    """Match the rows of ``./raw/<raw_file_name>`` that are not yet in ``results``.

    Non-``.csv`` names are ignored. The run key is the part of the file name
    before its first ``.``. If ``run_meta`` has no row for that key, the user
    is prompted on stdin for an organization and a year, and the row is
    inserted. Processing then resumes after the highest ``run_row`` already
    stored in ``results`` for the key, handing the remaining rows to
    ``Algo(db_conn, run_key).match_words``.

    Args:
        raw_file_name: name of a file inside the ``./raw`` directory.

    Returns:
        None.

    Raises:
        LookupError: if a new run needs an organization but none exist.
        ValueError: if a prompted value is not an integer.
    """
    print(f"Processing {raw_file_name}")
    if not raw_file_name.endswith('.csv'):
        return

    # chop off the extension for the results run key
    # result_file_name = raw_file_name.split('.')[0]
    # run_key = f"{result_file_name}-{int(time.time())}"
    run_key = raw_file_name.split('.')[0]

    # NOTE(review): the connection is passed on to Algo and is not closed
    # here; confirm who owns its lifetime.
    db_conn = get_connection()
    db_cursor = db_conn.cursor()
    print("obtained db connection")

    try:
        # Check if the file is in the run_meta table
        db_cursor.execute('SELECT run_key FROM run_meta WHERE run_key = %s', (run_key,))
        run_meta_row = db_cursor.fetchone()

        if not run_meta_row:
            # prompt the user for the organization_id and year
            # the user can select from a list of organizations
            db_cursor.execute('SELECT id, name FROM organizations')
            organizations = db_cursor.fetchall()
            if not organizations:
                raise LookupError("No organizations are available to select from")

            organization_id = _choose_organization_id(organizations)
            year = int(input("Enter the year: "))
            db_cursor.execute(
                'INSERT INTO run_meta (run_key, organization_id, year) VALUES (%s, %s, %s)',
                (run_key, organization_id, year),
            )
            db_conn.commit()

        # Find the last row already processed for this run key. MAX() yields
        # exactly one row (NULL when nothing was processed), so the whole
        # result set is consumed by the single fetchone() below.
        db_cursor.execute('SELECT MAX(run_row) FROM results WHERE run_key = %s', (run_key,))
        max_row = db_cursor.fetchone()
    finally:
        # The cursor is not needed past this point; release it even on error.
        db_cursor.close()

    last_run_row = max_row[0] if max_row else None

    input_data = _load_input_rows(f'./raw/{raw_file_name}')

    # run_row is the last row from the CSV file that was processed, so let's
    # offset from there: run_row counts the header as row 1, so run_row - 1 is
    # the list index of the first unprocessed row.
    num_rows = len(input_data)
    last_row_num = last_run_row - 1 if last_run_row is not None else 0

    # num_rows is the total number of data rows in the CSV file but we add one row for the header
    print(f"CSV has {num_rows+1} rows")

    csv_complete = last_row_num >= num_rows
    print("CSV is complete" if csv_complete else "CSV is not complete")

    if not csv_complete:
        print(f"Starting at row #{last_row_num + 1}")
        input_data = input_data[last_row_num:]
        algo = Algo(db_conn, run_key)
        algo.match_words(input_data)