# Spaces:
# Runtime error
# Runtime error
import sys
import time
import traceback
from dataclasses import dataclass

from gistillery.base import JobInput
from gistillery.db import get_db_cursor
from gistillery.registry import ToolRegistry, get_tool_registry
# Seconds main() sleeps before polling the DB again when no pending jobs exist.
SLEEP_INTERVAL = 5
def check_pending_jobs() -> list[JobInput]:
    """Return a JobInput for every job whose status is 'pending'.

    Each pending row of ``jobs`` is joined to its ``entries`` row so the
    resulting JobInput carries the entry's author, with the entry's
    ``source`` column passed through as ``content``.
    """
    pending_sql = """
    SELECT j.entry_id, e.author, e.source
    FROM jobs j
    JOIN entries e
    ON j.entry_id = e.id
    WHERE j.status = 'pending'
    """
    with get_db_cursor() as cur:
        rows = cur.execute(pending_sql).fetchall()
        pending: list[JobInput] = []
        for entry_id, entry_author, entry_source in rows:
            pending.append(
                JobInput(id=entry_id, author=entry_author, content=entry_source)
            )
        return pending
@dataclass
class JobOutput:
    """Results produced for a single job by ``_process_job``.

    Holds the processor's output, the summary and tags derived from it, and
    the names of the tools that produced each piece. The ``@dataclass``
    decorator is required: ``_process_job`` constructs instances with keyword
    arguments, which a plain annotated class does not accept.
    """

    # Output of the processor for the job; fed to the tagger and summarizer.
    processed: str
    # Summary produced by the summarizer from ``processed``.
    summary: str
    # Tags produced by the tagger from ``processed``.
    tags: list[str]
    # Names (from each tool's ``get_name()``) of the tools used.
    processor_name: str
    summarizer_name: str
    tagger_name: str
def _process_job(job: JobInput, registry: ToolRegistry) -> JobOutput:
    """Run the registry's processor, tagger and summarizer for *job*.

    The processor is selected for and applied to the job itself; its output
    is then passed to the tagger and afterwards to the summarizer. Each
    tool's name is queried right before the tool is invoked and is recorded
    next to its result.
    """

    def run_tool(tool, payload):
        # Query the name first, then invoke the tool on the payload.
        tool_name = tool.get_name()
        return tool_name, tool(payload)

    processor_name, processed_text = run_tool(registry.get_processor(job), job)
    tagger_name, tag_list = run_tool(registry.get_tagger(), processed_text)
    summarizer_name, summary_text = run_tool(
        registry.get_summarizer(), processed_text
    )

    return JobOutput(
        processed=processed_text,
        summary=summary_text,
        tags=tag_list,
        processor_name=processor_name,
        summarizer_name=summarizer_name,
        tagger_name=tagger_name,
    )
def store(job: JobInput, output: JobOutput) -> None:
    """Write the processed input, summary and tags for *job* to the DB.

    All rows are keyed by ``job.id`` and written through a single cursor:
    one row in ``inputs``, one row in ``summaries`` and one row in ``tags``
    per tag in ``output.tags``.
    """
    entry_id = job.id
    with get_db_cursor() as cur:
        cur.execute(
            "INSERT INTO inputs (entry_id, input) VALUES (?, ?)",
            (entry_id, output.processed),
        )
        cur.execute(
            "INSERT INTO summaries (entry_id, summary, summarizer_name) VALUES (?, ?, ?)",
            (entry_id, output.summary, output.summarizer_name),
        )
        tag_rows = [(entry_id, tag, output.tagger_name) for tag in output.tags]
        cur.executemany(
            "INSERT INTO tags (entry_id, tag, tagger_name) VALUES (?, ?, ?)",
            tag_rows,
        )
def process_job(job: JobInput, registry: ToolRegistry) -> None:
    """Process one job and record its final status in the ``jobs`` table.

    Runs the tool pipeline via ``_process_job``, stores the results with
    ``store`` and then sets the job's status to ``'done'``. If any of those
    steps raises, the error and its traceback are reported and the job's
    status is set to ``'failed'`` instead, so the worker loop can continue
    with the next job.

    Args:
        job: The job to process; ``job.id`` is its ``entry_id`` in ``jobs``.
        registry: Registry that supplies the processor, tagger and summarizer.
    """
    tic = time.perf_counter()
    short_id = job.id[:8]
    print(f"Processing job for (id={short_id})")

    # Acquire a DB cursor (which leads to locking) as late as possible: the
    # processing step can take a while, and we don't want to block other
    # workers for that whole time.
    try:
        output = _process_job(job, registry)
        store(job, output)
        with get_db_cursor() as cursor:
            cursor.execute(
                "UPDATE jobs SET status = 'done' WHERE entry_id = ?", (job.id,)
            )
    except Exception as e:
        # Report the error before touching the DB again, so it is not lost if
        # the status update below fails too. str(e) alone may be empty and
        # never says where the error happened, hence the traceback.
        print(f"Failed to process job for (id={short_id}): {e}")
        traceback.print_exc()
        with get_db_cursor() as cursor:
            cursor.execute(
                "UPDATE jobs SET status = 'failed' WHERE entry_id = ?", (job.id,)
            )

    toc = time.perf_counter()
    print(f"Finished processing job (id={short_id}) in {toc - tic:0.3f} seconds")
def main() -> None:
    """Poll the DB for pending jobs forever, processing each batch found.

    When no job is pending, sleep ``SLEEP_INTERVAL`` seconds before polling
    again. This function never returns normally.
    """
    tool_registry = get_tool_registry()
    while True:
        pending = check_pending_jobs()
        if pending:
            print(f"Found {len(pending)} pending job(s), processing...")
            for pending_job in pending:
                process_job(pending_job, tool_registry)
        else:
            print("No pending jobs found, sleeping...")
            time.sleep(SLEEP_INTERVAL)
if __name__ == "__main__":
    # main() loops forever; a keyboard interrupt (e.g. Ctrl+C) is the normal
    # way to stop the worker, after which we exit cleanly.
    try:
        main()
    except KeyboardInterrupt:
        print("Shutting down...")
    sys.exit()