gistillery / src /gistillery /webservice.py
Benjamin Bossan
Move stuff around, make installable
e6fd86e
raw
history blame
3.39 kB
import logging
import uuid
from fastapi import FastAPI
from gistillery.base import EntriesResult, JobStatus, JobStatusResult, RequestInput
from gistillery.db import TABLES, get_db_cursor
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
app = FastAPI()
# status
@app.get("/status/")
def status() -> str:
return "OK"
@app.post("/submit/")
def submit_job(input: RequestInput) -> str:
# submit a new job, poor man's job queue
_id = uuid.uuid4().hex
logger.info(f"Submitting job for (_id={_id[:8]})")
with get_db_cursor() as cursor:
# create a job
query = "INSERT INTO jobs (entry_id, status) VALUES (?, ?)"
cursor.execute(query, (_id, "pending"))
# create an entry
query = "INSERT INTO entries (id, author, source) VALUES (?, ?, ?)"
cursor.execute(query, (_id, input.author, input.content))
return f"Submitted job {_id}"
@app.get("/check_status/{_id}")
def check_status(_id: str) -> JobStatusResult:
with get_db_cursor() as cursor:
cursor.execute(
"SELECT status, last_updated FROM jobs WHERE entry_id = ?", (_id,)
)
result = cursor.fetchone()
if result is None:
return JobStatusResult(id=_id, status=JobStatus.not_found, last_updated=None)
status, last_updated = result
return JobStatusResult(id=_id, status=status, last_updated=last_updated)
@app.get("/recent/")
def recent() -> list[EntriesResult]:
with get_db_cursor() as cursor:
# get the last 10 entries, join summary and tag, where each tag is
# joined to a comma separated str
cursor.execute("""
SELECT e.id, e.author, s.summary, GROUP_CONCAT(t.tag, ","), e.created_at
FROM entries e
JOIN summaries s ON e.id = s.entry_id
JOIN tags t ON e.id = t.entry_id
GROUP BY e.id
ORDER BY e.created_at DESC
LIMIT 10
""")
results = cursor.fetchall()
entries = []
for _id, author, summary, tags, date in results:
entry = EntriesResult(
id=_id, author=author, summary=summary, tags=tags.split(","), date=date
)
entries.append(entry)
return entries
@app.get("/recent/{tag}")
def recent_tag(tag: str) -> list[EntriesResult]:
if not tag.startswith("#"):
tag = "#" + tag
# same as recent, but filter by tag
with get_db_cursor() as cursor:
cursor.execute(
"""
SELECT e.id, e.author, s.summary, GROUP_CONCAT(t.tag, ","), e.created_at
FROM entries e
JOIN summaries s ON e.id = s.entry_id
JOIN tags t ON e.id = t.entry_id
WHERE t.tag = ?
GROUP BY e.id
ORDER BY e.created_at DESC
LIMIT 10
""",
(tag,),
)
results = cursor.fetchall()
entries = []
for _id, author, summary, tags, date in results:
entry = EntriesResult(
id=_id, author=author, summary=summary, tags=tags.split(","), date=date
)
entries.append(entry)
return entries
@app.get("/clear/")
def clear() -> str:
# clear all tables
logger.warning("Clearing all tables")
with get_db_cursor() as cursor:
for table_name in TABLES:
cursor.execute(f"DELETE FROM {table_name}")
return "OK"