# gistillery / src/worker.py
# Benjamin Bossan
# Initial commit
# a240da9
import time
from base import JobInput
from db import get_db_cursor
from ml import ProcessorRegistry, Summarizer, Tagger
# Seconds the worker sleeps before polling again when no job is pending.
SLEEP_INTERVAL: int = 5

# ML components, built once when this module is imported and reused for every
# job this worker processes.
processor_registry: ProcessorRegistry = ProcessorRegistry()
summarizer: Summarizer = Summarizer()
tagger: Tagger = Tagger()
print("loaded ML models")
# Pending jobs joined with the author and source text of their entry.
_PENDING_JOBS_QUERY = """
SELECT j.entry_id, e.author, e.source
FROM jobs j
JOIN entries e
ON j.entry_id = e.id
WHERE j.status = 'pending'
"""


def check_pending_jobs() -> list[JobInput]:
    """Return a ``JobInput`` for every job whose status is 'pending'.

    Each job is joined with its row in ``entries``: the entry id becomes
    ``JobInput.id``, the author ``JobInput.author`` and the entry's ``source``
    column ``JobInput.content``.
    """
    with get_db_cursor() as cursor:
        rows = list(cursor.execute(_PENDING_JOBS_QUERY))

    pending: list[JobInput] = []
    for entry_id, author, source in rows:
        pending.append(JobInput(id=entry_id, author=author, content=source))
    return pending
def store(
    job: JobInput,
    *,
    summary: str,
    tags: list[str],
    processor_name: str,
    summarizer_name: str,
    tagger_name: str,
) -> None:
    """Persist the summary and tags produced for ``job``.

    Inserts one row into ``summaries`` and one row per tag into ``tags``, all
    keyed by ``job.id`` and recording which summarizer/tagger produced them.
    Both inserts go through the same DB cursor.

    NOTE(review): ``processor_name`` is accepted but never written anywhere;
    confirm whether the schema was meant to record it.
    """
    summary_sql = (
        "INSERT INTO summaries (entry_id, summary, summarizer_name) VALUES (?, ?, ?)"
    )
    tag_sql = "INSERT INTO tags (entry_id, tag, tagger_name) VALUES (?, ?, ?)"

    with get_db_cursor() as cursor:
        cursor.execute(summary_sql, (job.id, summary, summarizer_name))
        tag_rows = [(job.id, tag, tagger_name) for tag in tags]
        cursor.executemany(tag_sql, tag_rows)
def _set_job_status(job: JobInput, status: str) -> None:
    """Set ``status`` on the ``jobs`` row whose ``entry_id`` is ``job.id``."""
    with get_db_cursor() as cursor:
        cursor.execute(
            "UPDATE jobs SET status = ? WHERE entry_id = ?", (status, job.id)
        )


def process_job(job: JobInput) -> None:
    """Run the ML pipeline on ``job``, store its results and record the outcome.

    A processor is chosen for the job via ``processor_registry.dispatch``. Its
    output is tagged and summarized, the results are written with ``store``,
    and the job is marked ``'done'``. If any of these steps raises, the job is
    marked ``'failed'`` and the error is printed instead of propagated, so one
    bad job does not stop the worker loop.
    """
    tic = time.perf_counter()
    print(f"Processing job for (id={job.id[:8]})")
    # care: acquire the DB cursor (which leads to locking) as late as possible,
    # since the ML processing below can take a while and we don't want to block
    # other workers during that time
    try:
        processor = processor_registry.dispatch(job)
        processor_name = processor.get_name()
        processed = processor(job)

        tagger_name = tagger.get_name()
        tags = tagger(processed)

        summarizer_name = summarizer.get_name()
        summary = summarizer(processed)

        store(
            job,
            summary=summary,
            tags=tags,
            processor_name=processor_name,
            summarizer_name=summarizer_name,
            tagger_name=tagger_name,
        )
        _set_job_status(job, "done")
    except Exception as e:
        # Report the original error first so it is not lost if the status
        # update below fails as well.
        print(f"Failed to process job for (id={job.id[:8]}): {e}")
        try:
            _set_job_status(job, "failed")
        except Exception as status_err:
            # e.g. the DB is locked by another worker. Don't let this escape
            # and kill the polling loop. The job's status is presumably left
            # as 'pending', so it may be picked up again on a later poll.
            print(f"Failed to mark job (id={job.id[:8]}) as failed: {status_err}")

    toc = time.perf_counter()
    print(f"Finished processing job (id={job.id[:8]}) in {toc - tic:0.3f} seconds")
def main() -> None:
    """Poll the database for pending jobs forever and process them.

    Jobs are processed one at a time, in the order ``check_pending_jobs``
    returns them. When nothing is pending, sleep ``SLEEP_INTERVAL`` seconds
    before polling again.
    """
    while True:
        pending = check_pending_jobs()
        if pending:
            print(f"Found {len(pending)} pending job(s), processing...")
            for pending_job in pending:
                process_job(pending_job)
        else:
            print("No pending jobs found, sleeping...")
            time.sleep(SLEEP_INTERVAL)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C is the way to stop the worker: exit cleanly with status 0
        # instead of printing a traceback.
        print("Shutting down...")
        # ``exit`` is injected by the ``site`` module for interactive use and is
        # not guaranteed to exist (e.g. under ``python -S``). Raise SystemExit
        # directly instead.
        raise SystemExit(0)