"""Worker that polls the database for pending jobs and runs the ML pipeline.

Each pending job is dispatched to a processor, then tagged and summarized.
The results are written to the ``summaries`` and ``tags`` tables. The job
status is set to ``'done'``, or to ``'failed'`` if any step raised.
"""

import sys
import time

from base import JobInput
from db import get_db_cursor
from ml import ProcessorRegistry, Summarizer, Tagger

# Seconds to wait before polling again when no pending jobs were found.
SLEEP_INTERVAL = 5

# Instantiated once at import time and shared by every job this worker runs.
processor_registry = ProcessorRegistry()
summarizer = Summarizer()
tagger = Tagger()
print("loaded ML models")


def check_pending_jobs() -> list[JobInput]:
    """Check DB for pending jobs.

    Returns:
        One ``JobInput`` per row of ``jobs`` whose status is ``'pending'``,
        populated from the matching ``entries`` row.
    """
    with get_db_cursor() as cursor:
        # Fetch pending jobs, joining author and content from the entries table.
        # NOTE(review): ``content`` is read from ``entries.source``; confirm
        # that column holds the text the processors expect.
        query = """
            SELECT j.entry_id, e.author, e.source
            FROM jobs j
            JOIN entries e ON j.entry_id = e.id
            WHERE j.status = 'pending'
        """
        # Materialize the rows while the cursor is still open.
        res = list(cursor.execute(query))
    return [
        JobInput(id=_id, author=author, content=content)
        for _id, author, content in res
    ]


def store(
    job: JobInput,
    *,
    summary: str,
    tags: list[str],
    processor_name: str,
    summarizer_name: str,
    tagger_name: str,
) -> None:
    """Persist the summary and tags produced for ``job``.

    Inserts one row into ``summaries`` and one row per tag into ``tags``,
    all within a single ``get_db_cursor()`` context.

    Args:
        job: The job whose ``id`` is written as ``entry_id``.
        summary: Summary text to store.
        tags: Tags to store, one ``tags`` row each.
        processor_name: Name of the processor that produced the input.
            NOTE(review): accepted but currently not persisted anywhere;
            confirm whether it should be stored.
        summarizer_name: Stored alongside the summary.
        tagger_name: Stored alongside each tag.
    """
    with get_db_cursor() as cursor:
        # Write to the summaries and tags tables.
        cursor.execute(
            (
                "INSERT INTO summaries (entry_id, summary, summarizer_name)"
                " VALUES (?, ?, ?)"
            ),
            (job.id, summary, summarizer_name),
        )
        cursor.executemany(
            "INSERT INTO tags (entry_id, tag, tagger_name) VALUES (?, ?, ?)",
            [(job.id, tag, tagger_name) for tag in tags],
        )


def process_job(job: JobInput) -> None:
    """Run the ML pipeline for ``job`` and record the outcome in the DB.

    On success, the results are stored via ``store`` and the job status is
    set to ``'done'``. If any step raises an ``Exception``, the status is set
    to ``'failed'``, the error is printed, and the function returns normally.
    An error raised while marking the job failed is not caught and propagates.
    """
    tic = time.perf_counter()
    print(f"Processing job for (id={job.id[:8]})")

    # Care: acquire a cursor (which leads to locking) as late as possible.
    # The ML processing below runs without holding one, so we don't block
    # other workers during that time.
    try:
        processor = processor_registry.dispatch(job)
        processor_name = processor.get_name()
        processed = processor(job)

        tagger_name = tagger.get_name()
        tags = tagger(processed)

        summarizer_name = summarizer.get_name()
        summary = summarizer(processed)

        store(
            job,
            summary=summary,
            tags=tags,
            processor_name=processor_name,
            summarizer_name=summarizer_name,
            tagger_name=tagger_name,
        )

        # Update job status to done.
        # NOTE(review): this runs in a separate cursor from store(). If it
        # fails, the except branch marks the job 'failed' even though its
        # results may already have been written.
        with get_db_cursor() as cursor:
            cursor.execute(
                "UPDATE jobs SET status = 'done' WHERE entry_id = ?", (job.id,)
            )
    except Exception as e:
        # Update job status to failed.
        with get_db_cursor() as cursor:
            cursor.execute(
                "UPDATE jobs SET status = 'failed' WHERE entry_id = ?", (job.id,)
            )
        print(f"Failed to process job for (id={job.id[:8]}): {e}")

    toc = time.perf_counter()
    print(f"Finished processing job (id={job.id[:8]}) in {toc - tic:0.3f} seconds")


def main() -> None:
    """Poll for pending jobs forever and process them one at a time.

    When no pending jobs are found, sleep ``SLEEP_INTERVAL`` seconds before
    polling again.
    """
    while True:
        jobs = check_pending_jobs()
        if not jobs:
            print("No pending jobs found, sleeping...")
            time.sleep(SLEEP_INTERVAL)
            continue

        print(f"Found {len(jobs)} pending job(s), processing...")
        # NOTE(review): jobs stay 'pending' until they finish, so two workers
        # polling concurrently could both pick up the same job. Confirm only
        # one worker runs, or add a step that claims jobs before processing.
        for job in jobs:
            process_job(job)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Shutting down...")
        # Use sys.exit rather than the site-provided exit(). exit() is meant
        # for the interactive interpreter and is absent under `python -S`.
        sys.exit(0)