"""SQLite-backed cache for scraped articles.

Stores article content, raw scraper state, and embeddings keyed by URL,
with full-text search and brute-force embedding similarity lookup.
"""
| import sqlite_utils | |
| from datetime import datetime | |
| import os | |
| import json | |
| from typing import Dict, Optional, Any | |
# Path to the SQLite file, located in the parent directory of this module's directory.
DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "articles.db")
def get_db():
    """Open the articles database, creating the schema on first use."""
    db = sqlite_utils.Database(DB_PATH)
    if "articles" not in db.table_names():
        schema = {
            "url": str,
            "title": str,
            "author": str,
            "publication": str,
            "markdown_content": str,
            "json_state": str,       # Raw Apollo/JSON-LD state, serialized as JSON
            "html_content": str,
            "last_scraped": str,
            "source": str,           # 'apollo', 'json-ld', 'html', 'archive', 'vision'
            "embedding": str,        # JSON string of float list
        }
        db["articles"].create(schema, pk="url")
        # Full-text search over the human-readable columns; done only at
        # creation time since enabling FTS twice would raise.
        db["articles"].enable_fts(["title", "markdown_content", "author"])
    return db
def save_article(data: Dict[str, Any]):
    """Save or update an article in the database, keyed by URL.

    Maps the scraper's result dict onto the flat table schema and upserts
    it. Rows whose markdown looks like a scraped error page are skipped.

    Args:
        data: Scraper output; `author` may be a plain string or a
            {"name": ...} dict, other fields are optional.
    """
    db = get_db()

    # `author` arrives either as a plain string or as {"name": ...}.
    author = data.get("author")
    if isinstance(author, dict):
        author = author.get("name")

    record = {
        "url": data.get("url"),
        "title": data.get("title"),
        "author": author,
        "publication": data.get("publication"),
        "markdown_content": data.get("markdownContent"),
        "json_state": json.dumps(data.get("json_state", {})),
        # `or ""` shields an explicit None before truncating oversized HTML.
        "html_content": (data.get("html_debug") or "")[:100000],
        "last_scraped": datetime.now().isoformat(),
        "source": data.get("source", "unknown"),
        "embedding": json.dumps(data.get("embedding")) if data.get("embedding") else None,
    }

    # Validation: don't cache what looks like an error page. `or ""` avoids a
    # TypeError when markdown_content is None. Precedence (made explicit with
    # parentheses): the apology page is rejected outright, while a bare "500"
    # only disqualifies short documents.
    content = record["markdown_content"] or ""
    if "Apologies, but something went wrong" in content or (
        "500" in content and len(content) < 500
    ):
        print(f"Refusing to save invalid article: {record['url']}")
        return

    db["articles"].upsert(record, pk="url")
def get_article(url: str) -> Optional[Dict[str, Any]]:
    """Retrieve a cached article by URL.

    Args:
        url: Primary key of the article row.

    Returns:
        The row reshaped into the scraper's result format (with
        ``cached=True``), or None when the URL is not cached.
    """
    db = get_db()
    try:
        record = db["articles"].get(url)
    except sqlite_utils.db.NotFoundError:
        return None
    if not record:
        return None

    # json_state is stored as a JSON string; fall back to {} on missing or
    # corrupt data. (json.JSONDecodeError is a subclass of ValueError;
    # TypeError covers a NULL column.)
    try:
        json_state = json.loads(record["json_state"])
    except (TypeError, ValueError):
        json_state = {}

    # Map the flat row back to the format callers expect.
    return {
        "url": record["url"],
        "title": record["title"],
        "author": {"name": record["author"]},
        "publication": record["publication"],
        "markdownContent": record["markdown_content"],
        "json_state": json_state,
        "source": record["source"],
        "cached": True,
    }
def is_fresh(url: str, max_age_hours: int = 24) -> bool:
    """Check whether the cached copy of *url* is newer than *max_age_hours*.

    Fetches the row once directly (the original implementation did a full
    `get_article` reshape and then re-fetched the same row).

    Returns:
        True when a cached row exists and its `last_scraped` timestamp is
        within the age limit; False for uncached articles or missing /
        unparseable timestamps.
    """
    db = get_db()
    try:
        record = db["articles"].get(url)
    except sqlite_utils.db.NotFoundError:
        return False

    try:
        last_scraped = datetime.fromisoformat(record["last_scraped"])
    except (TypeError, ValueError):
        # Missing or corrupt timestamp: treat as stale so it gets re-scraped.
        return False

    age_hours = (datetime.now() - last_scraped).total_seconds() / 3600
    return age_hours < max_age_hours
def search_similar(query_embedding: list, limit: int = 5) -> list:
    """Rank cached articles by cosine similarity to *query_embedding*.

    Brute-force scan over every stored embedding — fine for small caches;
    a production system should use a dedicated vector index.

    Args:
        query_embedding: Query vector as a list of floats.
        limit: Maximum number of results to return.

    Returns:
        Up to *limit* dicts with ``url``, ``title`` and ``similarity``,
        best match first.
    """
    import numpy as np

    db = get_db()
    query_vec = np.asarray(query_embedding, dtype=float)
    query_norm = np.linalg.norm(query_vec)
    if query_norm == 0:
        # A zero query vector can never produce a similarity; skip the scan.
        return []

    rows = db.query(
        "SELECT url, title, embedding FROM articles WHERE embedding IS NOT NULL"
    )
    results = []
    for row in rows:
        # Targeted handling instead of a bare except: skip rows whose
        # embedding column holds corrupt JSON or a non-list value.
        try:
            vec = np.asarray(json.loads(row["embedding"]), dtype=float)
        except (TypeError, ValueError):
            continue
        vec_norm = np.linalg.norm(vec)
        # Shape mismatch previously "worked" only because the bare except
        # swallowed np.dot's ValueError; skip such rows explicitly.
        if vec_norm == 0 or vec.shape != query_vec.shape:
            continue
        results.append({
            "url": row["url"],
            "title": row["title"],
            "similarity": float(query_vec.dot(vec) / (query_norm * vec_norm)),
        })

    results.sort(key=lambda r: r["similarity"], reverse=True)
    return results[:limit]