news-whisper-api / backend /utils /db_utils.py
Devang1290
feat: deploy News Whisper on-demand search API (FastAPI + Docker)
2cb327c
"""
Database Utilities (Supabase)
=============================
Manages all Supabase operations: deduplication checks and article insertion.
Tables used:
- `registry` — tracks which article IDs have been processed (deduplication)
- `articles` — stores final processed articles with summaries and audio URLs
Configuration (required in .env):
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_KEY=your_service_role_key
Usage:
from backend.utils.db_utils import DatabaseManager
db = DatabaseManager()
# Check which articles are already processed
existing = db.check_registry(["id1", "id2", "id3"])
# Insert a fully processed article
db.insert_article(article_dict)
"""
import os
from supabase import create_client, Client
from dotenv import load_dotenv
import logging
# Import-time side effect: load variables from a .env file (if present) into
# the process environment so SUPABASE_URL / SUPABASE_KEY can be read through
# os.environ when a DatabaseManager is constructed.
load_dotenv()
# Module-level logger named after this module; used by every DatabaseManager
# method to report skipped or failed database operations.
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Supabase client wrapper for article deduplication and storage.

    If SUPABASE_URL or SUPABASE_KEY are missing, or the client cannot be
    created, ``self.supabase`` is ``None`` and every operation returns an
    empty result / ``False`` instead of raising.

    NOTE(review): this class adds no locking of its own; whether one instance
    can be shared across threads depends on the underlying Supabase client.
    Confirm before relying on thread-safety.
    """

    @staticmethod
    def _normalize_url(raw_url: str) -> str:
        """Return ``raw_url`` stripped, with ``https://`` added if it has no scheme.

        Only a real ``http://`` or ``https://`` prefix (case-insensitive)
        counts as a scheme, so a bare host that merely starts with the
        letters "http" (e.g. ``httpbin.example.co``) is still prefixed.
        An empty or ``None`` value yields an empty string.
        """
        url = (raw_url or "").strip()
        if url and not url.lower().startswith(("http://", "https://")):
            url = f"https://{url}"
        return url

    def __init__(self):
        """Build the Supabase client from SUPABASE_URL / SUPABASE_KEY.

        Never raises: missing configuration or a client-construction error is
        logged and leaves ``self.supabase`` as ``None``.
        """
        # None means "database disabled"; every public method checks for it.
        self.supabase: "Client | None" = None

        url = self._normalize_url(os.environ.get("SUPABASE_URL", ""))
        key = os.environ.get("SUPABASE_KEY", "").strip()

        # A bare scheme with no host is treated the same as a missing URL.
        if not url or not key or url.lower() in ("http://", "https://"):
            logger.warning("Supabase URL or Key not found. Database operations will be skipped.")
            return

        try:
            self.supabase = create_client(url, key)
        except Exception as e:
            # Deliberate best-effort: a bad URL/key must not crash the caller.
            logger.error("Failed to initialize Supabase client: %s", e)

    def check_registry(self, article_ids: list) -> set:
        """Check which article IDs are already in the `registry` table.

        Args:
            article_ids: List of article ID strings to check.

        Returns:
            Set of IDs that already exist in the registry (should be skipped).
            An empty set is returned when the client is not configured, when
            ``article_ids`` is empty, or when the query fails; in the failure
            case callers will therefore see every ID as unprocessed.
        """
        if not self.supabase or not article_ids:
            return set()

        try:
            response = (
                self.supabase.table("registry")
                .select("id")
                .in_("id", article_ids)
                .execute()
            )
            # Guard against a missing payload so the comprehension cannot
            # fail on None.
            return {row["id"] for row in (response.data or [])}
        except Exception as e:
            logger.error("Error checking registry: %s", e)
            return set()

    def insert_article(self, article_data: dict) -> bool:
        """Upsert a processed article into the `articles` and `registry` tables.

        The two upserts are issued as separate calls, `articles` first and
        then `registry`. If the second one fails, ``False`` is returned even
        though the `articles` row may already have been written; re-running is
        safe because both writes are upserts.

        Args:
            article_data: Dict with keys matching the articles table schema:
                id, category, title, author, url, content, summary,
                audio_url, published_date, scraped_at, summary_generated_at.
                The 'id' key is required.

        Returns:
            True on success. False when the client is not configured, when
            ``article_data`` is empty/None or has no 'id', or when any
            database call fails.
        """
        if not self.supabase:
            return False
        # Empty or None input can never carry an ID; bail out before touching
        # the database (same silent outcome as a missing 'id').
        if not article_data:
            return False

        # Resolved inside the try, but declared here so the error handler can
        # report it without calling back into article_data (which may be the
        # very thing that failed).
        article_id = None
        try:
            article_id = article_data.get('id')
            if not article_id:
                return False

            category = article_data.get('category', '')
            title = article_data.get('title', '')

            article_record = {
                "id": article_id,
                "category": category,
                "title": title,
                "author": article_data.get('author', ''),
                "url": article_data.get('url', ''),
                "content": article_data.get('content', ''),
                "summary": article_data.get('summary', ''),
                "audio_url": article_data.get('audio_url', ''),
                # Input key is 'published_date'; the table column is 'published_at'.
                "published_at": article_data.get('published_date'),
                "scraped_at": article_data.get('scraped_at'),
                "summary_generated_at": article_data.get('summary_generated_at'),
            }
            registry_record = {
                "id": article_id,
                "category": category,
                "title": title,
                "status": "completed",
            }

            self.supabase.table("articles").upsert(article_record).execute()
            self.supabase.table("registry").upsert(registry_record).execute()
            logger.debug("Successfully saved article %s to database.", article_id)
            return True
        except Exception as e:
            logger.error("Error inserting article %s: %s", article_id, e)
            return False