gistillery / src /webservice.py
Benjamin Bossan
Initial commit
a240da9
import logging
import uuid
from fastapi import FastAPI
from base import EntriesResult, JobStatus, JobStatusResult, RequestInput
from db import TABLES, get_db_cursor
# Module-level logger named after this module, with its threshold set to DEBUG.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# FastAPI application object; the route decorators in this module register on it.
app = FastAPI()
@app.post("/submit/")
def submit_job(input: RequestInput) -> str:
    """Register a new job together with the entry it belongs to.

    A fresh hex UUID is generated and used both as ``jobs.entry_id`` and as
    ``entries.id``. The job row is created with status ``"pending"``; the
    entry row stores ``input.author`` and ``input.content`` (the latter in
    the ``source`` column).

    Args:
        input: Request payload providing ``author`` and ``content``.

    Returns:
        A confirmation message that contains the generated id.
    """
    # The jobs table doubles as a very simple job queue.
    insert_job = "INSERT INTO jobs (entry_id, status) VALUES (?, ?)"
    insert_entry = "INSERT INTO entries (id, author, source) VALUES (?, ?, ?)"

    _id = uuid.uuid4().hex
    logger.info(f"Submitting job for (_id={_id[:8]})")

    with get_db_cursor() as db:
        # Job row first, then the entry it refers to.
        db.execute(insert_job, (_id, "pending"))
        db.execute(insert_entry, (_id, input.author, input.content))

    return f"Submitted job {_id}"
@app.get("/check_status/{_id}")
def check_status(_id: str) -> JobStatusResult:
    """Report the status of the job whose ``entry_id`` equals ``_id``.

    Args:
        _id: Identifier taken from the URL path.

    Returns:
        A ``JobStatusResult`` carrying the stored ``status`` and
        ``last_updated`` values. When no matching job row exists, the status
        is ``JobStatus.not_found`` and ``last_updated`` is ``None``.
    """
    lookup = "SELECT status, last_updated FROM jobs WHERE entry_id = ?"
    with get_db_cursor() as db:
        db.execute(lookup, (_id,))
        row = db.fetchone()

    if row is not None:
        status, last_updated = row
    else:
        # Unknown id: answer with an explicit "not found" status.
        status, last_updated = JobStatus.not_found, None
    return JobStatusResult(id=_id, status=status, last_updated=last_updated)
@app.get("/recent/")
def recent() -> list[EntriesResult]:
    """Return up to 10 entries, newest first, with their summary and tags.

    Entries are inner-joined with ``summaries`` and ``tags``, so an entry
    that has no summary row or no tag row is not part of the result. The
    tags of each entry are concatenated in SQL and split back into a list
    here.

    Returns:
        A list of ``EntriesResult`` ordered by ``created_at`` descending.
    """
    with get_db_cursor() as cursor:
        # The separator is a single-quoted SQL string literal: a
        # double-quoted token is an identifier in standard SQL and is only
        # accepted as a string by SQLite's legacy fallback.
        cursor.execute("""
            SELECT e.id, e.author, s.summary, GROUP_CONCAT(t.tag, ','), e.created_at
            FROM entries e
            JOIN summaries s ON e.id = s.entry_id
            JOIN tags t ON e.id = t.entry_id
            GROUP BY e.id
            ORDER BY e.created_at DESC
            LIMIT 10
        """)
        results = cursor.fetchall()

    return [
        EntriesResult(
            id=_id, author=author, summary=summary, tags=tags.split(","), date=date
        )
        for _id, author, summary, tags, date in results
    ]
@app.get("/recent/{tag}")
def recent_tag(tag: str) -> list[EntriesResult]:
    """Return up to 10 entries carrying ``tag``, newest first.

    Args:
        tag: Tag to filter by; a leading ``#`` is prepended when missing.

    Returns:
        A list of ``EntriesResult`` ordered by ``created_at`` descending.
        Each result lists all tags of the entry, not only the one that was
        used as the filter.
    """
    if not tag.startswith("#"):
        tag = "#" + tag

    with get_db_cursor() as cursor:
        # Matching entries are picked through a subquery. Filtering the
        # joined tag rows directly (WHERE t.tag = ?) would drop every other
        # tag before GROUP_CONCAT runs, leaving only the requested tag.
        # The separator is a single-quoted SQL string literal; a
        # double-quoted token is an identifier in standard SQL.
        cursor.execute(
            """
            SELECT e.id, e.author, s.summary, GROUP_CONCAT(t.tag, ','), e.created_at
            FROM entries e
            JOIN summaries s ON e.id = s.entry_id
            JOIN tags t ON e.id = t.entry_id
            WHERE e.id IN (SELECT entry_id FROM tags WHERE tag = ?)
            GROUP BY e.id
            ORDER BY e.created_at DESC
            LIMIT 10
            """,
            (tag,),
        )
        results = cursor.fetchall()

    return [
        EntriesResult(
            id=_id, author=author, summary=summary, tags=tags.split(","), date=date
        )
        for _id, author, summary, tags, date in results
    ]
@app.get("/clear/")
def clear() -> str:
    """Delete all rows from every table named in ``TABLES``.

    Returns:
        The string ``"OK"`` once all delete statements have been executed.
    """
    # NOTE(review): this destructive operation is reachable via a plain GET.
    logger.warning("Clearing all tables")
    with get_db_cursor() as db:
        # Table names come from the TABLES constant, not from the request.
        statements = [f"DELETE FROM {table_name}" for table_name in TABLES]
        for statement in statements:
            db.execute(statement)
    return "OK"