"""Medium-MCP — src/database.py

SQLite-backed article cache shared by the scraper and the MCP server.

Author: Nikhil Pravin Pise
Commit ae588db — feat: comprehensive migration - merge Scraper + MCP Server
"""
import sqlite_utils
from datetime import datetime
import os
import json
from typing import Dict, Optional, Any
DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "articles.db")
def get_db():
    """Return a connection to the SQLite articles database.

    On first use, creates the ``articles`` table (primary-keyed on URL)
    and enables full-text search over title, markdown content, and author.

    Returns:
        sqlite_utils.Database: an open database handle.
    """
    db = sqlite_utils.Database(DB_PATH)
    if "articles" not in db.table_names():
        db["articles"].create({
            "url": str,
            "title": str,
            "author": str,
            "publication": str,
            "markdown_content": str,
            "json_state": str,    # Raw Apollo/JSON-LD state, stored as a JSON string
            "html_content": str,
            "last_scraped": str,  # ISO-8601 timestamp string
            "source": str,        # 'apollo', 'json-ld', 'html', 'archive', 'vision'
            "embedding": str,     # JSON string of a float list
        }, pk="url")
        # Enable FTS only when the table is first created: calling
        # enable_fts() again on a table that already has an FTS index
        # raises sqlite3.OperationalError in sqlite-utils.
        db["articles"].enable_fts(["title", "markdown_content", "author"])
    return db
def save_article(data: Dict[str, Any]):
    """Save or update an article in the database (upsert keyed on URL).

    Args:
        data: Scraped article payload. ``author`` may be either a plain
            string or a ``{"name": ...}`` dict; ``markdownContent``,
            ``html_debug``, and ``embedding`` may be absent or None.

    Refuses to persist content that looks like a Medium error page.
    """
    db = get_db()

    # Author may arrive as {"name": ...} (JSON-LD) or as a plain string.
    author = data.get("author")
    if isinstance(author, dict):
        author = author.get("name")

    record = {
        "url": data.get("url"),
        "title": data.get("title"),
        "author": author,
        "publication": data.get("publication"),
        "markdown_content": data.get("markdownContent"),
        "json_state": json.dumps(data.get("json_state", {})),
        # `or ""` guards against an explicit None (the .get default only
        # covers a *missing* key); truncate so a huge raw page can't
        # bloat the database.
        "html_content": (data.get("html_debug") or "")[:100000],
        "last_scraped": datetime.now().isoformat(),
        "source": data.get("source", "unknown"),
        "embedding": json.dumps(data.get("embedding")) if data.get("embedding") else None,
    }

    # Validation: don't save what looks like an error page.
    # `or ""` guards against None content — `"x" in None` raises TypeError.
    content = record.get("markdown_content") or ""
    # Same precedence as before, now explicit: an apology page is always
    # rejected; a bare "500" only rejects suspiciously short content.
    if ("Apologies, but something went wrong" in content
            or ("500" in content and len(content) < 500)):
        print(f"Refusing to save invalid article: {record['url']}")
        return

    db["articles"].upsert(record, pk="url")
def get_article(url: str) -> Optional[Dict[str, Any]]:
    """Retrieve a cached article by URL.

    Args:
        url: Primary key of the article row.

    Returns:
        A dict in the scraper's expected shape (with ``cached: True``),
        or None if the article is not in the database.
    """
    db = get_db()
    try:
        record = db["articles"].get(url)
    except sqlite_utils.db.NotFoundError:
        return None
    if not record:
        return None

    # Parse the stored JSON state back into a dict. Narrow except:
    # TypeError covers a NULL column, ValueError (incl. JSONDecodeError)
    # covers corrupt JSON — the old bare `except:` hid real bugs.
    try:
        record["json_state"] = json.loads(record["json_state"])
    except (TypeError, ValueError):
        record["json_state"] = {}

    # Map DB columns back to the scraper's expected payload shape.
    return {
        "url": record["url"],
        "title": record["title"],
        "author": {"name": record["author"]},
        "publication": record["publication"],
        "markdownContent": record["markdown_content"],
        "json_state": record["json_state"],
        "source": record["source"],
        "cached": True,
    }
def is_fresh(url: str, max_age_hours: int = 24) -> bool:
    """Check whether the cached copy of *url* is recent enough to reuse.

    Args:
        url: Article URL (primary key).
        max_age_hours: Maximum acceptable cache age, in hours.

    Returns:
        True if a cached row exists with a valid ``last_scraped``
        timestamp younger than ``max_age_hours``; False otherwise.
    """
    # Single fetch — the old version hit the database twice
    # (get_article() and then a second .get() for the timestamp).
    db = get_db()
    try:
        record = db["articles"].get(url)
    except sqlite_utils.db.NotFoundError:
        return False

    try:
        last_scraped = datetime.fromisoformat(record["last_scraped"])
    except (KeyError, TypeError, ValueError):
        # Missing column, NULL, or malformed timestamp: treat as stale
        # instead of crashing the caller.
        return False

    age_hours = (datetime.now() - last_scraped).total_seconds() / 3600
    return age_hours < max_age_hours
def search_similar(query_embedding: list, limit: int = 5) -> list:
    """Find stored articles most similar to a query embedding.

    Brute-force cosine similarity over every stored embedding — fine for
    a local cache; a production system should use a vector database.

    Args:
        query_embedding: Query vector as a list of floats.
        limit: Maximum number of results to return.

    Returns:
        Up to ``limit`` dicts ``{"url", "title", "similarity"}`` sorted
        by similarity, highest first.
    """
    import numpy as np

    query_vec = np.asarray(query_embedding, dtype=float)
    norm_query = np.linalg.norm(query_vec)
    if norm_query == 0:
        # Cosine similarity is undefined for a zero vector; the loop
        # below would skip every row anyway, so bail out early.
        return []

    db = get_db()
    rows = db.query(
        "SELECT url, title, embedding FROM articles WHERE embedding IS NOT NULL"
    )

    results = []
    for row in rows:
        # Narrow except: TypeError for a non-string cell, ValueError
        # (incl. JSONDecodeError) for corrupt JSON — replaces bare except.
        try:
            vec = np.asarray(json.loads(row["embedding"]), dtype=float)
        except (TypeError, ValueError):
            continue
        norm_vec = np.linalg.norm(vec)
        # Skip zero vectors and dimension mismatches (the old bare
        # except silently absorbed np.dot's shape errors).
        if norm_vec == 0 or vec.shape != query_vec.shape:
            continue
        similarity = float(np.dot(query_vec, vec) / (norm_query * norm_vec))
        results.append({
            "url": row["url"],
            "title": row["title"],
            "similarity": similarity,
        })

    results.sort(key=lambda x: x["similarity"], reverse=True)
    return results[:limit]