gistillery / src /db.py
Benjamin Bossan
Initial commit
a240da9
import logging
import sqlite3
from contextlib import contextmanager
from typing import Generator
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
schema_entries = """
CREATE TABLE entries
(
id TEXT PRIMARY KEY,
author TEXT NOT NULL,
source TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
# create schema for 'summary' table, id is a uuid4
schema_summary = """
CREATE TABLE summaries
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
entry_id TEXT NOT NULL,
summary TEXT NOT NULL,
summarizer_name TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(entry_id) REFERENCES entries(id)
)
"""
schema_tag = """
CREATE TABLE tags
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
entry_id TEXT NOT NULL,
tag TEXT NOT NULL,
tagger_name TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(entry_id) REFERENCES entries(id)
)
"""
schema_job = """
CREATE TABLE jobs
(
id INTEGER PRIMARY KEY AUTOINCREMENT,
entry_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY(entry_id) REFERENCES entries(id)
)
"""
TABLES = {
"entries": schema_entries,
"summaries": schema_summary,
"tags": schema_tag,
"jobs": schema_job,
}
TABLES_CREATED = False
def _get_db_connection() -> sqlite3.Connection:
global TABLES_CREATED
# sqlite cannot deal with concurrent access, so we set a big timeout
conn = sqlite3.connect("sqlite-data.db", timeout=30)
if TABLES_CREATED:
return conn
cursor = conn.cursor()
# create tables if needed
for table_name, schema in TABLES.items():
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table_name,),
)
table_exists = cursor.fetchone() is not None
if not table_exists:
logger.info(f"'{table_name}' table does not exist, creating it now...")
cursor.execute(schema)
conn.commit()
logger.info("done")
TABLES_CREATED = True
return conn
@contextmanager
def get_db_cursor() -> Generator[sqlite3.Cursor, None, None]:
conn = _get_db_connection()
cursor = conn.cursor()
try:
yield cursor
finally:
conn.commit()
cursor.close()
conn.close()