TG-Storage / db.py
NitinBot001's picture
Update db.py
233e17d verified
"""
db.py — Supabase (PostgreSQL) metadata store.
Table: files
Before first run, create the table in Supabase SQL Editor:
CREATE TABLE IF NOT EXISTS files (
id UUID DEFAULT gen_random_uuid() PRIMARY KEY,
file_id TEXT UNIQUE NOT NULL,
filename TEXT NOT NULL,
mime_type TEXT NOT NULL,
size_bytes BIGINT NOT NULL,
tg_message_id BIGINT NOT NULL,
tg_file_id TEXT,
public_url TEXT NOT NULL,
custom_path TEXT UNIQUE,
uploaded_at TIMESTAMPTZ DEFAULT now()
);
"""
import os
from datetime import datetime, timezone
from typing import Optional
from supabase import create_client, Client
# Lazily created Supabase client shared by the helpers in this module.
# Stays None until _get_client() builds it on first use.
_supabase: Optional[Client] = None
# Name of the Supabase table every query in this module targets.
TABLE = "files"
def _get_client() -> Client:
    """Return the shared Supabase client, creating it on the first call.

    SUPABASE_URL and SUPABASE_KEY are read from the environment here, at
    call time, rather than when the module is imported. That way values
    placed in the environment after import (for example by load_dotenv())
    are picked up, and empty strings are never cached at import time.

    Raises:
        RuntimeError: If either variable is missing or empty.
    """
    global _supabase

    # Fast path: the client was already built by an earlier call.
    if _supabase is not None:
        return _supabase

    supabase_url = os.getenv("SUPABASE_URL", "")
    supabase_key = os.getenv("SUPABASE_KEY", "")
    if not (supabase_url and supabase_key):
        raise RuntimeError(
            "SUPABASE_URL and SUPABASE_KEY must be set in environment / .env"
        )

    _supabase = create_client(supabase_url, supabase_key)
    return _supabase
def init_db():
    """Check that Supabase is usable by running one minimal query.

    Selects at most one ``file_id`` from the files table and discards the
    result; only success or failure of the round trip matters.

    Raises:
        RuntimeError: If obtaining the client or executing the query fails
            for any reason. The original exception is chained as the cause.
    """
    try:
        probe = _get_client().table(TABLE).select("file_id").limit(1)
        probe.execute()
    except Exception as err:
        # Wrap the failure in a clearer message so startup code can log it.
        raise RuntimeError(f"Supabase connection failed: {err}") from err
# ──────────────────────────────────────────────────
# CRUD
# ──────────────────────────────────────────────────
def save_file_record(
    *,
    file_id: str,
    filename: str,
    mime_type: str,
    size: int,
    tg_message_id: int,
    tg_file_id: str | None,
    public_url: str,
    custom_path: str | None = None,
):
    """Insert one metadata row into the files table.

    All arguments are keyword-only. ``size`` is stored in the ``size_bytes``
    column, and ``uploaded_at`` is set to the current UTC time in ISO-8601
    form. ``custom_path`` is written only when it is truthy, so ``None`` and
    the empty string both leave the column out of the insert.
    """
    db = _get_client()

    record = dict(
        file_id=file_id,
        filename=filename,
        mime_type=mime_type,
        size_bytes=size,
        tg_message_id=tg_message_id,
        tg_file_id=tg_file_id,
        public_url=public_url,
        uploaded_at=datetime.now(timezone.utc).isoformat(),
    )
    # Omit the column entirely when no custom path was supplied.
    if custom_path:
        record.update(custom_path=custom_path)

    db.table(TABLE).insert(record).execute()
def get_file_record(file_id: str) -> dict | None:
    """Look up a single row by its ``file_id``.

    Returns:
        The first matching row as a dict, or ``None`` when the query
        yields no data.
    """
    query = (
        _get_client()
        .table(TABLE)
        .select("*")
        .eq("file_id", file_id)
        .limit(1)
    )
    rows = query.execute().data
    return rows[0] if rows else None
def get_file_by_custom_path(custom_path: str) -> dict | None:
    """Look up a single row by its ``custom_path``.

    Returns:
        The first matching row as a dict, or ``None`` when the query
        yields no data.
    """
    query = (
        _get_client()
        .table(TABLE)
        .select("*")
        .eq("custom_path", custom_path)
        .limit(1)
    )
    rows = query.execute().data
    return rows[0] if rows else None
def list_file_records(limit: int = 50, offset: int = 0) -> list[dict]:
    """Return one page of file rows, newest ``uploaded_at`` first.

    Args:
        limit: Maximum number of rows to return. A value of zero or less
            yields an empty list without querying Supabase.
        offset: Number of rows to skip from the start of the ordering.
            Negative values are treated as 0.

    Returns:
        The rows of the requested page as dicts, or an empty list when the
        query yields no data.
    """
    # A non-positive limit would otherwise produce an inverted range
    # (offset .. offset - 1), so answer it locally.
    if limit <= 0:
        return []

    # Clamp rather than forward a negative offset to the backend.
    start = max(offset, 0)

    client = _get_client()
    resp = (
        client.table(TABLE)
        .select("*")
        .order("uploaded_at", desc=True)
        .range(start, start + limit - 1)  # range() bounds are inclusive
        .execute()
    )
    return resp.data or []
def delete_file_record(file_id: str):
    """Delete the rows whose ``file_id`` column equals ``file_id``.

    Returns nothing; the response of the delete request is discarded.
    """
    deletion = _get_client().table(TABLE).delete().eq("file_id", file_id)
    deletion.execute()
def count_files() -> int:
    """Return the total number of rows in the files table.

    The query asks for an exact count. The count reported with the response
    covers the whole table and is independent of the page size, so the
    response body is capped at one row rather than carrying every
    ``file_id`` just to be thrown away.

    Returns:
        The exact row count, or 0 when the response carries no count.
    """
    client = _get_client()
    resp = (
        client.table(TABLE)
        .select("file_id", count="exact")
        .limit(1)  # only the count is used; keep the payload minimal
        .execute()
    )
    return resp.count or 0