beweinreich committed on
Commit 9323d71
1 Parent(s): 0e61327

switch to redis queue

Files changed (12)
  1. .gitignore +1 -0
  2. README.md +12 -4
  3. algo.py +9 -3
  4. audits/1720098227.csv +1 -0
  5. audits/1720098318.csv +1 -0
  6. db/db_utils.py +10 -3
  7. kill_redis.py +34 -27
  8. redis_queue.py +6 -0
  9. requirements.txt +1 -0
  10. run.py +20 -69
  11. tasks.py +83 -9
  12. tasks.py.orig +0 -44
.gitignore CHANGED
@@ -3,6 +3,7 @@
  *.pyc
  .env
  raw/*
+ raw copy/*
  results/*
  logs/*
  specificity-model/*
README.md CHANGED
@@ -27,12 +27,20 @@ Additionally, it handles various word forms and multi-term descriptions to maint
  ## Running
 
  ```
- # Start celery worker
- celery -A tasks worker --loglevel=info
+ # Start redis queue worker
+ rq worker -c redis_queue
  python run.py
 
- # clear tasks queue
- celery -A tasks purge
+ # view rq-dashboard
+ pip install rq-dashboard
+ rq-dashboard --redis-url REDIS_URL
+ ```
+
+ ## Kill redis connections
+
+ ```
+ heroku redis:cli -a brightly-ai-db
+ CLIENT KILL TYPE normal
  ```
 
  ```
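Both the worker and run.py read REDIS_URL from the environment, so it can help to confirm the URL is reachable before starting `rq worker`. A minimal sketch (not part of the commit), using redis-py's `ping()`:

```
# Sketch: confirm REDIS_URL is reachable before starting the worker.
import os
from dotenv import load_dotenv
from redis import Redis

load_dotenv()
Redis.from_url(os.environ['REDIS_URL']).ping()  # raises if Redis is unreachable
print("Redis reachable")
```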
algo.py CHANGED
@@ -14,13 +14,19 @@ from db.db_utils import store_mapping_to_db, cached_get_mapping_from_db, get_dic
  from ask_gpt import query_gpt
  from multi_food_item_detector import extract_items, has_delimiters
  from mapping_template import empty_template, heterogeneous_template, multi_item_template, nonfood_template, usda_template
- from tasks import insert_result
+ # from tasks import insert_result
  from specificity_classifier import classify_text_to_specificity
 
  logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
  similarity_threshold = 0.78
 
 
+ def insert_result(db_conn, run_key, mappings):
+     db_cursor = db_conn.cursor()
+     for mapping in mappings:
+         store_result_to_db(db_cursor, db_conn, run_key, mapping)
+
+
  class Algo:
      def __init__(self, db_conn, run_key=None):
          self.db_conn = db_conn
@@ -342,11 +348,11 @@ class Algo:
      results.append(mapping)
 
      if len(result_batch) >= 100:
-         insert_result(self.run_key, result_batch)
+         insert_result(self.db_conn, self.run_key, result_batch)
          result_batch = []
 
      if len(result_batch) > 0:
-         insert_result(self.run_key, result_batch)
+         insert_result(self.db_conn, self.run_key, result_batch)
          result_batch = []
 
 
audits/1720098227.csv ADDED
@@ -0,0 +1 @@
+ input_word,original_dictionary_word,new_dictionary_word
audits/1720098318.csv ADDED
@@ -0,0 +1 @@
+ input_word,original_dictionary_word,new_dictionary_word
db/db_utils.py CHANGED
@@ -9,9 +9,16 @@ load_dotenv()
 
  def get_connection():
      DATABASE_URL = os.environ['DATABASE_URL']
-     conn = psycopg2.connect(DATABASE_URL, sslmode='require')
-     initialize_db(conn)
-     return conn
+     print(f"Connecting to database...")
+     try:
+         conn = psycopg2.connect(DATABASE_URL, sslmode='require')
+         initialize_db(conn)  # Ensure this function is defined and correctly initializes the database if needed
+         print("Database connection established")
+         return conn
+     except Exception as e:
+         print(f"Failed to connect to database: {e}")
+         raise
+
 
  def initialize_db(conn):
      cursor = conn.cursor()
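The wrapped get_connection now logs and re-raises on failure. A minimal usage sketch (not part of the commit), assuming DATABASE_URL is set in .env:

```
# Sketch: exercise get_connection locally as a connectivity check.
from db.db_utils import get_connection

conn = get_connection()      # prints progress, raises if the connection fails
cur = conn.cursor()
cur.execute('SELECT 1')      # trivial round trip
print(cur.fetchone())        # -> (1,)
conn.close()
```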
kill_redis.py CHANGED
@@ -1,37 +1,44 @@
- import os
- import subprocess
- from dotenv import load_dotenv
- from redis import Redis
-
- load_dotenv()
-
- REDIS_URL = os.environ['REDIS_URL']
- redis_client = Redis.from_url(REDIS_URL)
-
- def kill_all_redis_clients():
-     try:
-         # Fetch Redis URL from environment variable
-         if not REDIS_URL:
-             raise ValueError("REDIS_URL is not set in the environment variables.")
-
-         # Extract host and port from the Redis URL
-         redis_host = REDIS_URL.split("//")[-1].split("@")[1].split(":")[0]
-         redis_port = REDIS_URL.split("//")[-1].split("@")[1].split(":")[1]
-
-         # Execute Redis CLI commands to list and kill clients
-         client_list_command = f'redis-cli -h {redis_host} -p {redis_port} CLIENT LIST'
-         client_list_output = subprocess.check_output(client_list_command, shell=True).decode('utf-8')
-
-         client_ids = [line.split(' ')[0].split('=')[1] for line in client_list_output.strip().split('\n') if 'id=' in line]
-
-         for client_id in client_ids:
-             kill_command = f'redis-cli -h {redis_host} -p {redis_port} CLIENT KILL ID {client_id}'
-             subprocess.check_call(kill_command, shell=True)
-
-         print("Successfully killed all Redis clients.")
-
-     except Exception as e:
-         print(f"An error occurred: {e}")
-
- if __name__ == "__main__":
-     kill_all_redis_clients()
+ # heroku redis:cli -a brightly-ai-db
+ # CLIENT KILL TYPE normal
+
+
+ # import os
+ # import subprocess
+ # from dotenv import load_dotenv
+ # from redis import Redis
+
+ # load_dotenv()
+
+ # REDIS_URL = os.environ['REDIS_URL']
+ # redis_client = Redis.from_url(REDIS_URL)
+
+ # def kill_all_redis_clients():
+ #     try:
+ #         # Fetch Redis URL from environment variable
+ #         if not REDIS_URL:
+ #             raise ValueError("REDIS_URL is not set in the environment variables.")
+
+ #         # Extract host and port from the Redis URL
+ #         redis_host = REDIS_URL.split("//")[-1].split("@")[1].split(":")[0]
+ #         redis_port = REDIS_URL.split("//")[-1].split("@")[1].split(":")[1]
+
+ #         # Execute Redis CLI commands to list and kill clients
+ #         client_list_command = f'redis-cli -h {redis_host} -p {redis_port} CLIENT LIST'
+ #         client_list_output = subprocess.check_output(client_list_command, shell=True).decode('utf-8')
+
+ #         client_ids = [line.split(' ')[0].split('=')[1] for line in client_list_output.strip().split('\n') if 'id=' in line]
+
+ #         for client_id in client_ids:
+ #             kill_command = f'redis-cli -h {redis_host} -p {redis_port} CLIENT KILL ID {client_id}'
+ #             subprocess.check_call(kill_command, shell=True)
+
+ #         print("Successfully killed all Redis clients.")
+
+ #     except Exception as e:
+ #         print(f"An error occurred: {e}")
+
+ # if __name__ == "__main__":
+ #     kill_all_redis_clients()
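The script now just records the Heroku CLI commands and comments out the old redis-cli subprocess approach. For reference, a sketch (not part of the commit) of the same cleanup done through redis-py instead of shelling out; it assumes the pinned redis==5.0.7 client exposes client_kill_filter:

```
# Sketch: kill all "normal" Redis client connections via redis-py,
# mirroring `CLIENT KILL TYPE normal`. REDIS_URL comes from .env as elsewhere.
import os
from dotenv import load_dotenv
from redis import Redis

load_dotenv()
r = Redis.from_url(os.environ['REDIS_URL'])

killed = r.client_kill_filter(_type='normal', skipme=True)  # don't kill ourselves
print(f"Killed {killed} client connection(s)")
```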
redis_queue.py ADDED
@@ -0,0 +1,6 @@
+ import os
+ from dotenv import load_dotenv
+
+ load_dotenv()
+
+ REDIS_URL = os.environ['REDIS_URL']
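This new module is the settings file that the README's `rq worker -c redis_queue` command imports; RQ reads REDIS_URL from it. A sketch (not part of the commit) of the programmatic equivalent, using RQ's Worker class directly:

```
# Sketch: start a worker in-process instead of via `rq worker -c redis_queue`.
import os
from dotenv import load_dotenv
from redis import Redis
from rq import Queue, Worker

load_dotenv()
redis_conn = Redis.from_url(os.environ['REDIS_URL'])

# Listen on the same 'default' queue that run.py enqueues to.
worker = Worker([Queue('default', connection=redis_conn)], connection=redis_conn)
worker.work()
```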
requirements.txt CHANGED
@@ -13,6 +13,7 @@ psycopg2-binary==2.9.9
  python-dotenv==1.0.1
  python-Levenshtein==0.25.1
  redis==5.0.7
+ rq==1.16.2
  requests==2.32.3
  sentence_transformers==3.0.1
  spacy==3.7.5
run.py CHANGED
@@ -1,14 +1,28 @@
+ # run.py
  import os
  import time
  import cProfile
  import pstats
  import pandas as pd
+ from dotenv import load_dotenv
  from algo import Algo
  from db.db_utils import get_connection
+ from tasks import process_file
+ from redis import Redis
+ from rq import Queue
+
+ load_dotenv()
+
+ REDIS_URL = os.environ['REDIS_URL']
+ WORKER_TIMEOUT = 7200  # 2 hours
+
+ redis_conn = Redis.from_url(REDIS_URL)
+ q = Queue('default', connection=redis_conn)
+
 
  if __name__ == "__main__":
-     db_conn = get_connection()
-     db_cursor = db_conn.cursor()
+     # db_conn = get_connection()
+     # db_cursor = db_conn.cursor()
      # raw_file_name = 'food-forward-2022-raw-data.csv'
      # raw_file_name = 'MFB-2023-raw-data.csv'
 
@@ -20,74 +34,11 @@ if __name__ == "__main__":
      # for raw_file_name in ['sharing-excess-2020-raw-data.csv', 'sharing-excess-2021-raw-data.csv', 'sharing-excess-2022-raw-data.csv', 'sharing-excess-2023-raw-data.csv']:
      # for raw_file_name in ['spoonfuls-2023-Raw-Data.csv']:
      for raw_file_name in raw_files:
-         if not raw_file_name.endswith('.csv'):
-             continue
-
-         # chop off the extension for the results run key
-         # result_file_name = raw_file_name.split('.')[0]
-         # run_key = f"{result_file_name}-{int(time.time())}"
-         run_key = raw_file_name.split('.')[0]
-         print(f"Processing {raw_file_name}")
-
-
-         # Check if the file is in the run_meta table
-         db_cursor.execute('SELECT run_key FROM run_meta WHERE run_key = %s', (run_key,))
-         run_meta_row = db_cursor.fetchone()
-         if not run_meta_row:
-             # prompt the user for the organization_id and year
-             # the user can select from a list of organizations
-             db_cursor.execute('SELECT id, name FROM organizations')
-             organizations = db_cursor.fetchall()
-             for i, org in enumerate(organizations):
-                 print(f"{i+1}. {org[1]}")
-             org_choice = int(input("Select an organization: "))
-
-             organization_id = organizations[org_choice-1][0]
-
-             year = int(input("Enter the year: "))
-
-             db_cursor.execute('INSERT INTO run_meta (run_key, organization_id, year) VALUES (%s, %s, %s)', (run_key, organization_id, year))
-             db_conn.commit()
-
-         # find the number of rows that were already processed associated with this run key
-         db_cursor.execute('SELECT run_row FROM results WHERE run_key = %s ORDER BY run_row DESC', (run_key,))
-
-         # get the last row that was processed
-         last_row = db_cursor.fetchone()
-
-         input_file_path = f'./raw/{raw_file_name}'
-         df_input = pd.read_csv(input_file_path)
-
-         # Convert column headers to lowercase
-         df_input.columns = df_input.columns.str.lower()
-
-         descriptions = df_input['description'].astype(str).tolist()
-         descriptions2 = df_input.get('description2', pd.Series([None] * len(df_input))).astype(str).tolist()
-         donors = df_input['donor'].astype(str).tolist()
-         dates = df_input['date'].astype(str).tolist()
-         weights = df_input['weight'].astype(str).tolist()
-
-         input_data = [(desc, desc2, i + 2, donor, date, weight) for i, (desc, desc2, donor, date, weight) in enumerate(zip(descriptions, descriptions2, donors, dates, weights))]
-
-         # run_row is the the last row from the CSV file that was processed, so let's offset from there
-         num_rows = len(input_data)
-         last_row_num = 0
-         if last_row:
-             last_row_num = last_row[0] - 1
-
-         # num_rows is the total number of rows in the CSV file but we add one row for the header
-         print(f"CSV has {num_rows+1} rows")
-         csv_complete = last_row_num >= num_rows
-         print("CSV is complete" if csv_complete else "CSV is not complete")
-         if not csv_complete:
-             print(f"Starting at row #{last_row_num + 1}")
-
-         input_data = input_data[last_row_num:]
-
-         algo = Algo(db_conn, run_key)
-         algo.match_words(input_data)
+         job = q.enqueue(process_file, raw_file_name, job_timeout=WORKER_TIMEOUT)
+         print(f"Task enqueued with job ID: {job.id}")
 
 
+         # process_file.delay(raw_file_name)
 
      # algo.match_words([['bananas']])
 
-     db_conn.close()
+     # db_conn.close()
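run.py now only enqueues jobs and prints their IDs; the worker does the processing. A sketch (not part of the commit) of checking on an enqueued job from another process, assuming the same REDIS_URL; the job ID placeholder is hypothetical:

```
# Sketch: inspect a job enqueued by run.py using RQ's Job.fetch API.
import os
from dotenv import load_dotenv
from redis import Redis
from rq.job import Job

load_dotenv()
redis_conn = Redis.from_url(os.environ['REDIS_URL'])

job_id = "replace-with-the-id-printed-by-run.py"   # hypothetical placeholder
job = Job.fetch(job_id, connection=redis_conn)
print(job.get_status())                            # queued / started / finished / failed
print(job.enqueued_at, job.started_at, job.ended_at)
```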
tasks.py CHANGED
@@ -1,14 +1,17 @@
  # tasks.py
  import os
  import logging
- from redis import Redis
+ import time
+ import pandas as pd
+ from algo import Algo
  from dotenv import load_dotenv
- from celery import Celery
+ from redis import Redis
+ from rq import Queue
+ # from celery import Celery
  from db.db_utils import get_connection, store_result_to_db
 
  load_dotenv()
 
- # REDIS_URL = os.environ['REDIS_URL']
  # app = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)
 
  # app.conf.update(
@@ -22,10 +25,81 @@ load_dotenv()
  # )
 
  # @app.task
- db_conn = get_connection()
- db_cursor = db_conn.cursor()
- def insert_result(run_key, mappings):
-     for mapping in mappings:
-         store_result_to_db(db_cursor, db_conn, run_key, mapping)
+ # def insert_result(db_conn, run_key, mappings):
+ #     db_cursor = db_conn.cursor()
+ #     for mapping in mappings:
+ #         store_result_to_db(db_cursor, db_conn, run_key, mapping)
+
+ # @app.task
+ def process_file(raw_file_name):
+     print(f"Processing {raw_file_name}")
+     if not raw_file_name.endswith('.csv'):
+         return
+
+     # chop off the extension for the results run key
+     # result_file_name = raw_file_name.split('.')[0]
+     # run_key = f"{result_file_name}-{int(time.time())}"
+     run_key = raw_file_name.split('.')[0]
+     print(f"Processing {raw_file_name}")
+
+     db_conn = get_connection()
+     db_cursor = db_conn.cursor()
+
+     print("obtained db connection")
+
+     # Check if the file is in the run_meta table
+     db_cursor.execute('SELECT run_key FROM run_meta WHERE run_key = %s', (run_key,))
+     run_meta_row = db_cursor.fetchone()
+     if not run_meta_row:
+         # prompt the user for the organization_id and year
+         # the user can select from a list of organizations
+         db_cursor.execute('SELECT id, name FROM organizations')
+         organizations = db_cursor.fetchall()
+         for i, org in enumerate(organizations):
+             print(f"{i+1}. {org[1]}")
+         org_choice = int(input("Select an organization: "))
+
+         organization_id = organizations[org_choice-1][0]
+
+         year = int(input("Enter the year: "))
+
+         db_cursor.execute('INSERT INTO run_meta (run_key, organization_id, year) VALUES (%s, %s, %s)', (run_key, organization_id, year))
+         db_conn.commit()
+
+     # find the number of rows that were already processed associated with this run key
+     db_cursor.execute('SELECT run_row FROM results WHERE run_key = %s ORDER BY run_row DESC', (run_key,))
+
+     # get the last row that was processed
+     last_row = db_cursor.fetchone()
+
+     input_file_path = f'./raw/{raw_file_name}'
+     df_input = pd.read_csv(input_file_path)
+
+     # Convert column headers to lowercase
+     df_input.columns = df_input.columns.str.lower()
+
+     descriptions = df_input['description'].astype(str).tolist()
+     descriptions2 = df_input.get('description2', pd.Series([None] * len(df_input))).astype(str).tolist()
+     donors = df_input['donor'].astype(str).tolist()
+     dates = df_input['date'].astype(str).tolist()
+     weights = df_input['weight'].astype(str).tolist()
+
+     input_data = [(desc, desc2, i + 2, donor, date, weight) for i, (desc, desc2, donor, date, weight) in enumerate(zip(descriptions, descriptions2, donors, dates, weights))]
+
+     # run_row is the the last row from the CSV file that was processed, so let's offset from there
+     num_rows = len(input_data)
+     last_row_num = 0
+     if last_row:
+         last_row_num = last_row[0] - 1
+
+     # num_rows is the total number of rows in the CSV file but we add one row for the header
+     print(f"CSV has {num_rows+1} rows")
+     csv_complete = last_row_num >= num_rows
+     print("CSV is complete" if csv_complete else "CSV is not complete")
+     if not csv_complete:
+         print(f"Starting at row #{last_row_num + 1}")
+
+     input_data = input_data[last_row_num:]
 
- # db_conn.close()
+     algo = Algo(db_conn, run_key)
+     algo.match_words(input_data)
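The per-file logic that used to live in run.py now runs inside the worker as process_file. A sketch (not part of the commit) of running the task synchronously for local debugging, bypassing the queue entirely; the file name is illustrative and the interactive input() prompts only work when this is run in a terminal:

```
# Sketch: call the RQ task directly, without a worker, for debugging.
from tasks import process_file

process_file('spoonfuls-2023-Raw-Data.csv')  # reads ./raw/spoonfuls-2023-Raw-Data.csv
```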
tasks.py.orig DELETED
@@ -1,44 +0,0 @@
- # tasks.py
- import os
- import logging
- from redis import Redis
- from dotenv import load_dotenv
- from celery import Celery
- from db.db_utils import get_connection, store_result_to_db
-
- load_dotenv()
-
- # REDIS_URL = os.environ['REDIS_URL']
- # app = Celery('tasks', broker=REDIS_URL, backend=REDIS_URL)
-
- # app.conf.update(
-     # result_expires=3600,
-     # task_serializer='json',
-     # result_serializer='json',
-     # accept_content=['json'],
-     # timezone='UTC',
-     # enable_utc=True,
-     # broker_connection_retry_on_startup=True
- # )
- <<<<<<< HEAD
-
- # @app.task
- db_conn = get_connection()
- db_cursor = db_conn.cursor()
-
- =======
-
- # @app.task
- db_conn = get_connection()
- db_cursor = db_conn.cursor()
- >>>>>>> e938c9da41dfd18544b4ea8aa7107f56cf05b2f2
- def insert_result(run_key, mappings):
-     for mapping in mappings:
-         store_result_to_db(db_cursor, db_conn, run_key, mapping)
-
- <<<<<<< HEAD
-
- # db_conn.close()
- =======
- # db_conn.close()
- >>>>>>> e938c9da41dfd18544b4ea8aa7107f56cf05b2f2