# gistillery/src/gistillery/webservice.py
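"""FastAPI web service for gistillery.

Endpoints to submit content for processing, check job status, list recent
entries (optionally filtered by tag), show tag counts, and clear or back up
the underlying sqlite database.
"""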
import datetime as dt
import logging
import reprlib
import sqlite3
import uuid

from fastapi import FastAPI
from fastapi.responses import FileResponse

from gistillery.base import EntriesResult, JobStatus, JobStatusResult, RequestInput
from gistillery.db import TABLES, get_db_connection, get_db_cursor
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

app = FastAPI()

aRepr = reprlib.Repr()
aRepr.maxstring = 140
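# aRepr is used below to build the stored source_snippet: reprlib truncates
# long strings to roughly 140 characters (maxstring), so the snippet stays short.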


# status
@app.get("/status/")
def status() -> str:
    return "OK"
@app.post("/submit/")
def submit_job(input: RequestInput) -> str:
# submit a new job, poor man's job queue
_id = uuid.uuid4().hex
logger.info(f"Submitting job for (_id={_id[:8]})")
with get_db_cursor() as cursor:
# create a job
query = "INSERT INTO jobs (entry_id, status) VALUES (?, ?)"
cursor.execute(query, (_id, "pending"))
# create an entry
source_snippet = aRepr.repr(input.content).strip("'")
query = (
"INSERT INTO entries (id, author, source, source_snippet) VALUES "
"(?, ?, ?, ?)"
)
cursor.execute(query, (_id, input.author, input.content, source_snippet))
return f"Submitted job {_id}"
@app.get("/check_job_status/")
def check_job_status() -> str:
with get_db_cursor() as cursor:
cursor.execute(
"SELECT entry_id "
"FROM jobs WHERE status = 'pending' "
"ORDER BY last_updated ASC"
)
result = cursor.fetchall()
if not result:
return "No pending jobs found"
entry_ids = [r.entry_id for r in result]
num_entries = len(entry_ids)
if len(entry_ids) > 3:
entry_ids = entry_ids[:3] + ["..."]
return f"Found {num_entries} pending job(s): {', '.join(entry_ids)}"
@app.get("/check_job_status/{_id}")
def check_job_status_id(_id: str) -> JobStatusResult:
with get_db_cursor() as cursor:
cursor.execute(
"SELECT status, last_updated FROM jobs WHERE entry_id = ?", (_id,)
)
result = cursor.fetchone()
if result is None:
return JobStatusResult(id=_id, status=JobStatus.not_found, last_updated=None)
status, last_updated = result
return JobStatusResult(id=_id, status=status, last_updated=last_updated)
@app.get("/recent/")
def recent() -> list[EntriesResult]:
with get_db_cursor() as cursor:
# get the last 10 entries, join summary and tag, where each tag is
# joined to a comma separated str
cursor.execute("""
SELECT
e.id, e.author, e.created_at, e.source_snippet,
s.summary, GROUP_CONCAT(t.tag, ",") tags
FROM entries e
JOIN summaries s ON e.id = s.entry_id
JOIN tags t ON e.id = t.entry_id
GROUP BY e.id
ORDER BY e.created_at DESC
LIMIT 100
""")
results = cursor.fetchall()
entries = []
for row in results:
entry = EntriesResult(
id=row.id,
author=row.author,
summary=row.summary,
source_snippet=row.source_snippet,
tags=row.tags.split(","),
date=row.created_at,
)
entries.append(entry)
return entries
@app.get("/recent/{tag}")
def recent_tag(tag: str) -> list[EntriesResult]:
tags = tag.split(",")
tags = ["#" + tag for tag in tags if not tag.startswith("#")]
# same as recent, but filter by tags, where at least one tag matches
with get_db_cursor() as cursor:
cursor.execute(
"""
SELECT
e.id, e.author, e.source_snippet, e.created_at,
s.summary, GROUP_CONCAT(t.tag, ",") tags
FROM entries e
JOIN summaries s ON e.id = s.entry_id
JOIN tags t ON e.id = t.entry_id
WHERE e.id IN (
SELECT entry_id FROM tags WHERE tag IN ({})
)
GROUP BY e.id
ORDER BY e.created_at DESC
LIMIT 100
""".format(",".join("?" * len(tags))),
tags,
)
results = cursor.fetchall()
entries = []
for row in results:
entry = EntriesResult(
id=row.id,
author=row.author,
summary=row.summary,
source_snippet=row.source_snippet,
tags=row.tags.split(","),
date=row.created_at,
)
entries.append(entry)
return entries
@app.get("/tag_counts/")
def tag_counts() -> dict[str, int]:
with get_db_cursor() as cursor:
cursor.execute("""
SELECT tag, COUNT(*) count
FROM tags
GROUP BY tag
ORDER BY count DESC
""")
results = cursor.fetchall()
return {tag: count for tag, count in results}
@app.get("/clear/")
def clear() -> str:
# clear all tables
logger.warning("Clearing all tables")
with get_db_cursor() as cursor:
for table_name in TABLES:
cursor.execute(f"DELETE FROM {table_name}")
return "OK"
@app.get("/backup/")
def backup() -> FileResponse:
# create a backup and return it
def progress(status: int, remaining: int, total: int) -> None:
logger.debug(f"DB: Copied {total-remaining} of {total} pages...")
now = dt.datetime.now(dt.timezone.utc)
fname = f"sqlite-data_backup_{now.strftime('%Y-%m-%d_%H-%M-%S')}.db"
try:
conn = get_db_connection()
backup_db = sqlite3.connect(fname)
with backup_db:
conn.backup(backup_db, pages=1, progress=progress)
except Exception as e:
logger.error(f"Error creating backup: {e}")
conn.close()
backup_db.close()
raise e
return FileResponse(fname, media_type="application/octet-stream", filename=fname)
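

# To run the service locally, one would typically point an ASGI server at the
# `app` object, e.g. `uvicorn gistillery.webservice:app`; the exact command used
# by the project is not shown in this file.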