Benjamin Bossan
Add a source snippet to the results
71965fb
import sys
import time
from dataclasses import dataclass
from gistillery.base import JobInput
from gistillery.db import get_db_cursor
from gistillery.registry import ToolRegistry, get_tool_registry
# Seconds the worker sleeps before polling again when no pending jobs are found.
SLEEP_INTERVAL = 5
def check_pending_jobs() -> list[JobInput]:
    """Fetch all jobs whose status is 'pending'.

    Returns:
        One ``JobInput`` per pending row of the ``jobs`` table. ``id`` is the
        job's ``entry_id``; ``author`` and ``content`` are taken from the
        ``author`` and ``source`` columns of the matching ``entries`` row.
    """
    # pending jobs joined with their entry, which supplies author and source
    pending_sql = """
SELECT j.entry_id, e.author, e.source
FROM jobs j
JOIN entries e
ON j.entry_id = e.id
WHERE j.status = 'pending'
"""
    with get_db_cursor() as cursor:
        rows = cursor.execute(pending_sql).fetchall()

    return [
        JobInput(id=entry_id, author=entry_author, content=entry_source)
        for entry_id, entry_author, entry_source in rows
    ]
@dataclass
class JobOutput:
    """Results of running one job through a processor, summarizer and tagger."""

    # output of the processor; this is what the summarizer and tagger receive
    processed: str
    # output of the summarizer
    summary: str
    # output of the tagger
    tags: list[str]
    # get_name() of the processor that produced `processed`
    processor_name: str
    # get_name() of the summarizer that produced `summary`
    summarizer_name: str
    # get_name() of the tagger that produced `tags`
    tagger_name: str
def _process_job(job: JobInput, registry: ToolRegistry) -> JobOutput:
    """Run a job through the processor, tagger and summarizer of the registry.

    The processor is looked up for the specific job; its output is then passed
    to both the tagger and the summarizer. The name reported by each tool
    (``get_name()``) is kept next to the result it produced.

    Args:
        job: the job to process.
        registry: source of the processor, tagger and summarizer.

    Returns:
        A ``JobOutput`` holding the processed input, the summary, the tags and
        the names of the three tools that were used.
    """
    # step 1: the processor is chosen based on the job itself
    process = registry.get_processor(job)
    name_of_processor = process.get_name()
    processed_input = process(job)

    # step 2: tags are derived from the processed input
    tag = registry.get_tagger()
    name_of_tagger = tag.get_name()
    found_tags = tag(processed_input)

    # step 3: so is the summary
    summarize = registry.get_summarizer()
    name_of_summarizer = summarize.get_name()
    short_summary = summarize(processed_input)

    return JobOutput(
        processed=processed_input,
        summary=short_summary,
        tags=found_tags,
        processor_name=name_of_processor,
        summarizer_name=name_of_summarizer,
        tagger_name=name_of_tagger,
    )
def store(job: JobInput, output: JobOutput) -> None:
    """Persist the results of a processed job.

    Using a single cursor context, inserts the processed input into
    ``inputs``, the summary into ``summaries`` and one row per tag into
    ``tags``, all keyed by the job's id.

    Args:
        job: the job the results belong to; only its ``id`` is used.
        output: the results to write.
    """
    insert_input = "INSERT INTO inputs (entry_id, input) VALUES (?, ?)"
    insert_summary = (
        "INSERT INTO summaries (entry_id, summary, summarizer_name)"
        " VALUES (?, ?, ?)"
    )
    insert_tag = "INSERT INTO tags (entry_id, tag, tagger_name) VALUES (?, ?, ?)"

    with get_db_cursor() as cursor:
        cursor.execute(insert_input, (job.id, output.processed))
        cursor.execute(
            insert_summary, (job.id, output.summary, output.summarizer_name)
        )
        # one row per tag, each carrying the name of the tagger
        tag_rows = [(job.id, tag, output.tagger_name) for tag in output.tags]
        cursor.executemany(insert_tag, tag_rows)
def process_job(job: JobInput, registry: ToolRegistry) -> None:
    """Process one job, store its results and record the outcome in ``jobs``.

    On success the job's status is set to 'done'. If processing, storing or
    that status update raises an ``Exception``, the status is set to 'failed'
    and the error is printed instead of being re-raised; an error raised while
    recording the failure itself is not caught. The elapsed time is printed
    at the end in either case.

    Args:
        job: the job to process.
        registry: tool registry handed on to the processing step.
    """
    mark_done = "UPDATE jobs SET status = 'done' WHERE entry_id = ?"
    mark_failed = "UPDATE jobs SET status = 'failed' WHERE entry_id = ?"

    started = time.perf_counter()
    short_id = job.id[:8]
    print(f"Processing job for (id={short_id})")

    # NOTE: each cursor is acquired as late as possible, because acquiring it
    # leads to locking and other workers should not be blocked while the job
    # is being processed
    try:
        result = _process_job(job, registry)
        store(job, result)
        with get_db_cursor() as cursor:
            cursor.execute(mark_done, (job.id,))
    except Exception as exc:
        with get_db_cursor() as cursor:
            cursor.execute(mark_failed, (job.id,))
        print(f"Failed to process job for (id={short_id}): {exc}")

    elapsed = time.perf_counter() - started
    print(f"Finished processing job (id={short_id}) in {elapsed:0.3f} seconds")
def main() -> None:
    """Poll for pending jobs forever and process each batch that is found.

    The tool registry is created once up front. Whenever a poll returns no
    jobs, the loop sleeps for ``SLEEP_INTERVAL`` seconds before polling again;
    otherwise the jobs are processed one after another and the next poll
    follows immediately. This function does not return on its own.
    """
    registry = get_tool_registry()
    while True:
        pending = check_pending_jobs()
        if pending:
            print(f"Found {len(pending)} pending job(s), processing...")
            for job in pending:
                process_job(job, registry)
        else:
            print("No pending jobs found, sleeping...")
            time.sleep(SLEEP_INTERVAL)
if __name__ == "__main__":
    # main() loops indefinitely, so a keyboard interrupt (Ctrl+C) is handled
    # here to print a short message and exit instead of showing a traceback
    try:
        main()
    except KeyboardInterrupt:
        print("Shutting down...")
        sys.exit()