Spaces:
Runtime error
Runtime error
import time | |
from base import JobInput | |
from db import get_db_cursor | |
from ml import ProcessorRegistry, Summarizer, Tagger | |
# Seconds the main loop sleeps before polling again when no job is pending.
SLEEP_INTERVAL = 5

# The ML components are instantiated once, at import time, and the same
# instances are reused for every job this worker processes.
processor_registry = ProcessorRegistry()
summarizer = Summarizer()
tagger = Tagger()
print("loaded ML models")
def check_pending_jobs() -> list[JobInput]:
    """Return a ``JobInput`` for every row of ``jobs`` whose status is 'pending'.

    Each job row is joined to its ``entries`` row (``jobs.entry_id`` equals
    ``entries.id``) so that the entry's ``author`` and ``source`` columns are
    fetched along with it. Note that the ``source`` column is what ends up in
    ``JobInput.content``.
    """
    pending_sql = """
        SELECT j.entry_id, e.author, e.source
        FROM jobs j
        JOIN entries e
        ON j.entry_id = e.id
        WHERE j.status = 'pending'
    """

    # Pull all rows into a list while the cursor is open; the JobInput objects
    # are only built once the ``with`` block has exited.
    with get_db_cursor() as cursor:
        rows = list(cursor.execute(pending_sql))

    return [
        JobInput(id=entry_id, author=author, content=source)
        for entry_id, author, source in rows
    ]
def store(
    job: JobInput,
    *,
    summary: str,
    tags: list[str],
    processor_name: str,
    summarizer_name: str,
    tagger_name: str,
) -> None:
    """Persist the summary and the tags produced for ``job``.

    Inserts one row into ``summaries`` and one row per tag into ``tags``, all
    keyed by ``job.id`` and all through a single cursor.

    Args:
        job: The job whose ``id`` is used as ``entry_id`` for every row.
        summary: Summary text written to ``summaries.summary``.
        tags: Tags to write; each one becomes its own row in ``tags``.
        processor_name: Accepted for the caller's benefit but not written
            anywhere by this function.
        summarizer_name: Stored next to the summary.
        tagger_name: Stored next to each tag.
    """
    insert_summary = (
        "INSERT INTO summaries (entry_id, summary, summarizer_name)"
        " VALUES (?, ?, ?)"
    )
    insert_tag = "INSERT INTO tags (entry_id, tag, tagger_name) VALUES (?, ?, ?)"

    with get_db_cursor() as cursor:
        # Summary first, then the tags, matching the order rows are produced.
        cursor.execute(insert_summary, (job.id, summary, summarizer_name))
        tag_rows = [(job.id, label, tagger_name) for label in tags]
        cursor.executemany(insert_tag, tag_rows)
def process_job(job: JobInput) -> None:
    """Process, tag and summarise one job, store the results and set its status.

    The processor is chosen by ``processor_registry.dispatch(job)``; its output
    is fed to both the tagger and the summarizer. On success the results are
    written via ``store`` and the job row is set to 'done'. If anything in
    that sequence raises an ``Exception``, the job row is set to 'failed' and
    the error is printed instead of being propagated.

    A "Finished processing" line with the elapsed time is printed afterwards.
    NOTE(review): the source's indentation was lost; this layout assumes that
    timing line sits after the try/except and so runs in both cases -- confirm.
    """
    started = time.perf_counter()
    print(f"Processing job for (id={job.id[:8]})")

    def set_status(sql: str) -> None:
        # Every status change uses its own cursor, scoped to this one statement.
        with get_db_cursor() as cursor:
            cursor.execute(sql, (job.id,))

    # Per the original author's note: acquiring a cursor leads to locking, so
    # it is acquired as late as possible and other workers are not blocked
    # while the ML processing runs.
    try:
        handler = processor_registry.dispatch(job)
        handler_name = handler.get_name()
        document = handler(job)

        tagger_label = tagger.get_name()
        labels = tagger(document)

        summarizer_label = summarizer.get_name()
        digest = summarizer(document)

        store(
            job,
            summary=digest,
            tags=labels,
            processor_name=handler_name,
            summarizer_name=summarizer_label,
            tagger_name=tagger_label,
        )
        set_status("UPDATE jobs SET status = 'done' WHERE entry_id = ?")
    except Exception as error:
        set_status("UPDATE jobs SET status = 'failed' WHERE entry_id = ?")
        print(f"Failed to process job for (id={job.id[:8]}): {error}")

    finished = time.perf_counter()
    print(
        f"Finished processing job (id={job.id[:8]}) in {finished - started:0.3f} seconds"
    )
def main() -> None:
    """Poll for pending jobs forever and process each batch in order.

    When a poll returns no jobs the loop sleeps for ``SLEEP_INTERVAL`` seconds
    before polling again; after a non-empty batch it polls again immediately.
    This function never returns on its own.
    """
    while True:
        pending = check_pending_jobs()
        if pending:
            print(f"Found {len(pending)} pending job(s), processing...")
            for job in pending:
                process_job(job)
        else:
            print("No pending jobs found, sleeping...")
            time.sleep(SLEEP_INTERVAL)
# Script entry point: run the worker loop until the user interrupts it.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Shutting down...")
        # Exit with status 0 by raising SystemExit directly. The bare `exit`
        # helper is injected by the `site` module for interactive use and is
        # not available when Python runs with -S or in some frozen builds.
        raise SystemExit(0)