"""Worker loop: poll the DB for pending jobs, process them and store the results."""

import sys
import time
import traceback
from dataclasses import dataclass

from gistillery.base import JobInput
from gistillery.db import get_db_cursor
from gistillery.registry import ToolRegistry, get_tool_registry

SLEEP_INTERVAL = 5  # seconds to wait before polling again when no job is pending


def check_pending_jobs() -> list[JobInput]:
    """Return a JobInput for every job whose status is 'pending'.

    The author and source of each job's entry are read from the ``entries``
    table; the entry's ``source`` becomes the job's ``content``.
    """
    with get_db_cursor() as cursor:
        # fetch pending jobs together with the author and source of their entry
        query = """
        SELECT j.entry_id, e.author, e.source
        FROM jobs j
        JOIN entries e
        ON j.entry_id = e.id
        WHERE j.status = 'pending'
        """
        res = cursor.execute(query).fetchall()

    return [
        JobInput(id=_id, author=author, content=content)
        for _id, author, content in res
    ]


@dataclass
class JobOutput:
    """Results produced for a single job, plus the names of the tools used."""

    processed: str  # processor output; fed to both the tagger and the summarizer
    summary: str
    tags: list[str]
    processor_name: str
    summarizer_name: str
    tagger_name: str


def _process_job(job: JobInput, registry: ToolRegistry) -> JobOutput:
    """Run the processor, tagger and summarizer for ``job`` (no DB access).

    The registry chooses the processor based on ``job``; the processor's output
    is then passed to both the tagger and the summarizer.
    """
    processor = registry.get_processor(job)
    processor_name = processor.get_name()
    processed = processor(job)

    tagger = registry.get_tagger()
    tagger_name = tagger.get_name()
    tags = tagger(processed)

    summarizer = registry.get_summarizer()
    summarizer_name = summarizer.get_name()
    summary = summarizer(processed)

    return JobOutput(
        processed=processed,
        summary=summary,
        tags=tags,
        processor_name=processor_name,
        summarizer_name=summarizer_name,
        tagger_name=tagger_name,
    )


def store(job: JobInput, output: JobOutput) -> None:
    """Persist the processed input, the summary and every tag for ``job``.

    All rows are keyed by ``job.id`` and written through a single cursor
    context; whether they are committed atomically depends on ``get_db_cursor``.
    """
    with get_db_cursor() as cursor:
        cursor.execute(
            "INSERT INTO inputs (entry_id, input) VALUES (?, ?)",
            (job.id, output.processed),
        )
        cursor.execute(
            (
                "INSERT INTO summaries (entry_id, summary, summarizer_name)"
                " VALUES (?, ?, ?)"
            ),
            (job.id, output.summary, output.summarizer_name),
        )
        # one row per tag
        cursor.executemany(
            "INSERT INTO tags (entry_id, tag, tagger_name) VALUES (?, ?, ?)",
            [(job.id, tag, output.tagger_name) for tag in output.tags],
        )


def process_job(job: JobInput, registry: ToolRegistry) -> None:
    """Process ``job``, store its results and mark it 'done'.

    If processing, storing or marking the job as done raises, the error is
    printed together with its traceback and the job is marked 'failed', so
    ``check_pending_jobs`` no longer returns it. An exception raised while
    marking the job as failed propagates to the caller.
    """
    tic = time.perf_counter()
    print(f"Processing job for (id={job.id[:8]})")

    # NOTE: acquiring a cursor locks the database, so a cursor is only acquired
    # after the (potentially slow) processing step has finished; we don't want
    # to block other workers in the meantime.
    try:
        output = _process_job(job, registry)
        store(job, output)

        # update job status to done
        with get_db_cursor() as cursor:
            cursor.execute(
                "UPDATE jobs SET status = 'done' WHERE entry_id = ?", (job.id,)
            )
    except Exception as e:
        # Report before touching the DB again: if marking the job as failed
        # raises, the original error (and its traceback) has still been shown.
        print(f"Failed to process job for (id={job.id[:8]}): {e}")
        traceback.print_exc()

        # update job status to failed so it is not picked up as pending again
        with get_db_cursor() as cursor:
            cursor.execute(
                "UPDATE jobs SET status = 'failed' WHERE entry_id = ?", (job.id,)
            )

    toc = time.perf_counter()
    print(f"Finished processing job (id={job.id[:8]}) in {toc - tic:0.3f} seconds")


def main() -> None:
    """Poll for pending jobs forever and process them one after the other.

    When no job is pending, sleep ``SLEEP_INTERVAL`` seconds before polling again.
    """
    registry = get_tool_registry()
    while True:
        jobs = check_pending_jobs()
        if not jobs:
            print("No pending jobs found, sleeping...")
            time.sleep(SLEEP_INTERVAL)
            continue

        print(f"Found {len(jobs)} pending job(s), processing...")
        for job in jobs:
            process_job(job, registry)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Shutting down...")
        sys.exit()