# tasks.py
import os
import logging
import time
import pandas as pd
from algo import Algo
from dotenv import load_dotenv
from redis import Redis
from rq import Queue
from db.db_utils import get_connection, store_result_to_db
load_dotenv()

# This didn't work: likely because `Job` was never imported in this module,
# rq's Job class has no `DoesNotExist` attribute (rq raises
# rq.exceptions.NoSuchJobError instead), and `redis_conn` is not defined here.
# def enqueue_unique_job(queue, func, job_id, *args, **kwargs):
#     # Check if a job with the same ID already exists
#     try:
#         existing_job = Job.fetch(job_id, connection=redis_conn)
#         if existing_job and not existing_job.is_finished and not existing_job.is_failed:
#             print(f"Job with ID {job_id} already exists and is in progress.")
#             return None
#     except Job.DoesNotExist:
#         # Job does not exist, so it's safe to enqueue a new one
#         pass
#     # Enqueue the job with a unique ID and additional kwargs
#     job = queue.enqueue(func, job_id=job_id, *args, **kwargs)
#     return job
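
# A hedged sketch of a working variant (an assumption, not verified against
# this repo's worker setup): it uses rq's real exception, reads the connection
# off the queue itself, and passes job_id as a keyword to enqueue(). Kept
# commented out, like the attempt above, so module behavior is unchanged.
# from rq.job import Job
# from rq.exceptions import NoSuchJobError
#
# def enqueue_unique_job(queue, func, job_id, *args, **kwargs):
#     try:
#         existing_job = Job.fetch(job_id, connection=queue.connection)
#         if not (existing_job.is_finished or existing_job.is_failed):
#             print(f"Job with ID {job_id} already exists and is in progress.")
#             return None
#     except NoSuchJobError:
#         pass  # no job with this ID yet, so it is safe to enqueue
#     return queue.enqueue(func, *args, job_id=job_id, **kwargs)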

def file_is_complete(raw_file_name, run_key, db_cursor):
    # Non-CSV files have nothing to process, so treat them as complete.
    if not raw_file_name.endswith('.csv'):
        return True, 0

    # Find the last (highest) CSV row already processed for this run key.
    db_cursor.execute('SELECT run_row FROM results WHERE run_key = %s ORDER BY run_row DESC LIMIT 1', (run_key,))
    last_row = db_cursor.fetchone()

    input_file_path = f'./raw/{raw_file_name}'
    df_input = pd.read_csv(input_file_path)
    # Convert column headers to lowercase
    df_input.columns = df_input.columns.str.lower()
    descriptions = df_input['description'].astype(str).tolist()

    # run_row is the last CSV row that was processed, 1-indexed with the
    # header on row 1, so run_row - 1 is the count of data rows already done.
    num_rows = len(descriptions)
    last_row_num = 0
    if last_row:
        last_row_num = last_row[0] - 1

    # num_rows counts only data rows; add one for the header row.
    print(f"CSV has {num_rows + 1} rows")

    csv_complete = last_row_num >= num_rows
    print("CSV is complete" if csv_complete else "CSV is not complete")
    return csv_complete, last_row_num
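
# Worked example of the row arithmetic above (illustrative values only): with
# the header on CSV row 1, data row i (0-indexed) lands on CSV row i + 2. If
# the last stored run_row is 11, then 11 - 1 = 10 data rows are already done,
# so the resume below slices input_data[10:] and restarts at CSV row 12.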

def process_file(raw_file_name):
    print(f"Processing {raw_file_name}")
    if not raw_file_name.endswith('.csv'):
        return

    # Use the file name without its extension as the results run key.
    # An earlier version appended a timestamp:
    # result_file_name = raw_file_name.split('.')[0]
    # run_key = f"{result_file_name}-{int(time.time())}"
    run_key = raw_file_name.split('.')[0]
    db_conn = get_connection()
    db_cursor = db_conn.cursor()
    print("obtained db connection")

    # Check if the file is in the run_meta table
    db_cursor.execute('SELECT run_key FROM run_meta WHERE run_key = %s', (run_key,))
    run_meta_row = db_cursor.fetchone()

    if not run_meta_row:
        # Prompt the user for the organization_id and year;
        # the user can select from a list of organizations.
        db_cursor.execute('SELECT id, name FROM organizations')
        organizations = db_cursor.fetchall()
        for i, org in enumerate(organizations):
            print(f"{i + 1}. {org[1]}")
        org_choice = int(input("Select an organization: "))
        organization_id = organizations[org_choice - 1][0]
        year = int(input("Enter the year: "))
        db_cursor.execute('INSERT INTO run_meta (run_key, organization_id, year) VALUES (%s, %s, %s)', (run_key, organization_id, year))
        db_conn.commit()

    input_file_path = f'./raw/{raw_file_name}'
    df_input = pd.read_csv(input_file_path)
    # Convert column headers to lowercase
    df_input.columns = df_input.columns.str.lower()

    descriptions = df_input['description'].astype(str).tolist()
    # Note: when 'description2' is missing, astype(str) turns the None fill
    # values into the literal string 'None'.
    descriptions2 = df_input.get('description2', pd.Series([None] * len(df_input))).astype(str).tolist()
    donors = df_input['donor'].astype(str).tolist()
    dates = df_input['date'].astype(str).tolist()
    weights = df_input['weight'].astype(str).tolist()

    # i + 2 is the 1-indexed CSV row number of each record (row 1 is the header).
    input_data = [(desc, desc2, i + 2, donor, date, weight) for i, (desc, desc2, donor, date, weight) in enumerate(zip(descriptions, descriptions2, donors, dates, weights))]

    csv_complete, last_row_num = file_is_complete(raw_file_name, run_key, db_cursor)
    if csv_complete:
        # Every row is already stored in results; without this early return the
        # whole file would be matched again from the top.
        print(f"{raw_file_name} is already complete")
        return
    print(f"Starting at row #{last_row_num + 1}")
    # Skip the rows a previous run already stored.
    input_data = input_data[last_row_num:]

    algo = Algo(db_conn, run_key)
    algo.match_words(input_data)
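
# The Redis/Queue imports above are otherwise unused in this file, which
# suggests process_file is meant to be enqueued with rq, matching the commit
# message ("enqueue items if they arent complete"). A hedged driver sketch,
# kept commented out (assumptions: a REDIS_URL env var, rq's default queue,
# and that every input lives under ./raw):
#
# if __name__ == "__main__":
#     redis_conn = Redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379"))
#     q = Queue(connection=redis_conn)
#     db_conn = get_connection()
#     db_cursor = db_conn.cursor()
#     for raw_file_name in sorted(os.listdir('./raw')):
#         run_key = raw_file_name.split('.')[0]
#         complete, _ = file_is_complete(raw_file_name, run_key, db_cursor)
#         if not complete:
#             q.enqueue(process_file, raw_file_name)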