import feedparser
import requests
from typing import Dict, List, Any, Optional
from datetime import datetime
import time
import hashlib
import re
from urllib.parse import urlparse

from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data

logger = setup_logger(__name__)


class RSSFeedIntegration:
    """RSS Feed integration for content aggregation"""

    def __init__(self):
        """Initialize RSS Feed integration"""
        self.feeds = {}
        self.last_fetch = {}
        self.cached_entries = {}

    @handle_exceptions
    def add_feed(self, url: str, name: Optional[str] = None, category: str = "General") -> Dict[str, Any]:
        """Add an RSS feed

        Args:
            url: Feed URL
            name: Feed name (optional, will be extracted from feed if not provided)
            category: Feed category (default: General)

        Returns:
            Feed information
        """
        parsed_url = urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise IntegrationError(f"Invalid feed URL: {url}")

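        # Feed IDs are derived from the URL, so adding the same URL twice is a
        # no-op that simply returns the already-registered feed record.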
        feed_id = self._generate_feed_id(url)
        if feed_id in self.feeds:
            return self.feeds[feed_id]

        try:
            feed_data = feedparser.parse(url)

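            # feedparser sets the "bozo" flag when a feed is malformed; treat that
            # as fatal only if no entries could be parsed at all.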
            if feed_data.get("bozo", 0) == 1 and not feed_data.get("entries"):
                bozo_exception = feed_data.get("bozo_exception")
                error_msg = str(bozo_exception) if bozo_exception else "Unknown error"
                raise IntegrationError(f"Invalid feed: {error_msg}")

            feed_info = {
                "id": feed_id,
                "url": url,
                "name": name or feed_data.feed.get("title", url),
                "description": feed_data.feed.get("description", ""),
                "category": category,
                "last_updated": feed_data.feed.get("updated", ""),
                "added_at": datetime.now().isoformat(),
                "entry_count": len(feed_data.entries)
            }

            self.feeds[feed_id] = feed_info
            self.last_fetch[feed_id] = time.time()

            self._cache_entries(feed_id, feed_data.entries)

            return feed_info

        except Exception as e:
            if not isinstance(e, IntegrationError):
                logger.error(f"Failed to add feed {url}: {str(e)}")
                raise IntegrationError(f"Failed to add feed: {str(e)}")
            raise

    @handle_exceptions
    def remove_feed(self, feed_id: str) -> bool:
        """Remove an RSS feed

        Args:
            feed_id: Feed ID

        Returns:
            True if successful, False otherwise
        """
        if feed_id not in self.feeds:
            return False

        del self.feeds[feed_id]
        if feed_id in self.last_fetch:
            del self.last_fetch[feed_id]
        if feed_id in self.cached_entries:
            del self.cached_entries[feed_id]

        return True

    @handle_exceptions
    def get_feeds(self, category: Optional[str] = None) -> List[Dict[str, Any]]:
        """Get all feeds or feeds in a specific category

        Args:
            category: Feed category (optional)

        Returns:
            List of feed information
        """
        if category:
            return [feed for feed in self.feeds.values() if feed.get("category") == category]
        else:
            return list(self.feeds.values())

    @handle_exceptions
    def update_feed(self, feed_id: str, name: Optional[str] = None,
                    category: Optional[str] = None) -> Dict[str, Any]:
        """Update feed information

        Args:
            feed_id: Feed ID
            name: New feed name (optional)
            category: New feed category (optional)

        Returns:
            Updated feed information
        """
        if feed_id not in self.feeds:
            raise IntegrationError(f"Feed not found: {feed_id}")

        feed_info = self.feeds[feed_id]

        if name is not None:
            feed_info["name"] = name

        if category is not None:
            feed_info["category"] = category

        feed_info["updated_at"] = datetime.now().isoformat()

        self.feeds[feed_id] = feed_info

        return feed_info

    @handle_exceptions
    def fetch_feed_entries(self, feed_id: str, max_entries: int = 20,
                           force_refresh: bool = False) -> List[Dict[str, Any]]:
        """Fetch entries from a feed

        Args:
            feed_id: Feed ID
            max_entries: Maximum number of entries to fetch (default: 20)
            force_refresh: Force refresh even if cache is recent (default: False)

        Returns:
            List of feed entries
        """
        if feed_id not in self.feeds:
            raise IntegrationError(f"Feed not found: {feed_id}")

        feed_info = self.feeds[feed_id]
        current_time = time.time()

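        # Refresh from the network only when forced or when the cached copy is
        # older than 300 seconds; otherwise serve entries from the in-memory cache.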
        cache_age = current_time - self.last_fetch.get(feed_id, 0)
        if force_refresh or cache_age > 300:
            try:
                feed_data = feedparser.parse(feed_info["url"])

                feed_info["last_updated"] = feed_data.feed.get("updated", "")
                feed_info["entry_count"] = len(feed_data.entries)
                self.feeds[feed_id] = feed_info

                self.last_fetch[feed_id] = current_time
                self._cache_entries(feed_id, feed_data.entries)

            except Exception as e:
                logger.error(f"Failed to fetch feed {feed_info['url']}: {str(e)}")

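                # Fall back to previously cached entries; fail hard only if this
                # feed has never been fetched successfully.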
                if feed_id not in self.cached_entries:
                    raise IntegrationError(f"Failed to fetch feed: {str(e)}")

        entries = self.cached_entries.get(feed_id, [])
        return entries[:max_entries]

    @handle_exceptions
    def fetch_all_entries(self, max_entries_per_feed: int = 10,
                          categories: Optional[List[str]] = None) -> Dict[str, List[Dict[str, Any]]]:
        """Fetch entries from all feeds or feeds in specific categories

        Args:
            max_entries_per_feed: Maximum number of entries per feed (default: 10)
            categories: List of categories to include (optional)

        Returns:
            Dictionary mapping feed IDs to lists of entries
        """
        result = {}

        feeds_to_fetch = self.feeds.values()
        if categories:
            feeds_to_fetch = [feed for feed in feeds_to_fetch if feed.get("category") in categories]

        for feed in feeds_to_fetch:
            try:
                entries = self.fetch_feed_entries(feed["id"], max_entries_per_feed)
                result[feed["id"]] = entries
            except Exception as e:
                logger.error(f"Failed to fetch entries for feed {feed['url']}: {str(e)}")
                result[feed["id"]] = []

        return result

    @handle_exceptions
    def get_latest_entries(self, max_entries: int = 20,
                           categories: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Get latest entries from all feeds or feeds in specific categories

        Args:
            max_entries: Maximum number of entries to return (default: 20)
            categories: List of categories to include (optional)

        Returns:
            List of latest entries
        """
        all_entries = self.fetch_all_entries(max_entries, categories)

        entries = []
        for feed_id, feed_entries in all_entries.items():
            for entry in feed_entries:
                entry["feed_id"] = feed_id
                entry["feed_name"] = self.feeds[feed_id]["name"]
                entries.append(entry)

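        # Sort newest first by the parsed publication time; entries without a
        # parsed date sort to the end rather than raising a comparison error.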
        entries.sort(
            key=lambda x: time.mktime(x["published_parsed"]) if x.get("published_parsed") else 0,
            reverse=True
        )

        return entries[:max_entries]

    @handle_exceptions
    def search_entries(self, query: str, max_results: int = 20,
                       categories: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Search for entries matching a query

        Args:
            query: Search query
            max_results: Maximum number of results to return (default: 20)
            categories: List of categories to include (optional)

        Returns:
            List of matching entries
        """
        all_entries = self.fetch_all_entries(50, categories)

        entries = []
        for feed_id, feed_entries in all_entries.items():
            for entry in feed_entries:
                entry["feed_id"] = feed_id
                entry["feed_name"] = self.feeds[feed_id]["name"]
                entries.append(entry)

        query = query.lower()
        matching_entries = []

        for entry in entries:
            title = entry.get("title", "").lower()
            summary = entry.get("summary", "").lower()
            content = ""

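            # feedparser exposes full content as a list of dicts, each with a
            # "value" field; concatenate them for a simple substring match.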
if "content" in entry: |
|
for content_item in entry["content"]: |
|
content += content_item.get("value", "").lower() |
|
|
|
|
|
if query in title or query in summary or query in content: |
|
matching_entries.append(entry) |
|
|
|
return matching_entries[:max_results] |
|
|
|
@handle_exceptions |
|
def get_feed_categories(self) -> List[str]: |
|
"""Get all feed categories |
|
|
|
Returns: |
|
List of categories |
|
""" |
|
categories = set(feed.get("category", "General") for feed in self.feeds.values()) |
|
return sorted(list(categories)) |
|
|
|
@handle_exceptions |
|
def export_opml(self) -> str: |
|
"""Export feeds as OPML |
|
|
|
Returns: |
|
OPML content as string |
|
""" |
|
opml = '<?xml version="1.0" encoding="UTF-8"?>\n' |
|
opml += '<opml version="2.0">\n' |
|
opml += ' <head>\n' |
|
opml += f' <title>MONA RSS Feeds Export</title>\n' |
|
opml += f' <dateCreated>{datetime.now().strftime("%a, %d %b %Y %H:%M:%S %z")}</dateCreated>\n' |
|
opml += ' </head>\n' |
|
opml += ' <body>\n' |
|
|
|
|
|
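        # Group feeds by category so each category becomes one container
        # <outline> element in the OPML body.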
        categories = {}
        for feed in self.feeds.values():
            category = feed.get("category", "General")
            if category not in categories:
                categories[category] = []
            categories[category].append(feed)

        for category, feeds in categories.items():
            opml += f'    <outline text="{category}" title="{category}">\n'

            for feed in feeds:
title = feed.get("name", "").replace('"', '"') |
|
url = feed.get("url", "").replace('"', '"') |
|
description = feed.get("description", "").replace('"', '"') |
|
|
|
                opml += f'      <outline type="rss" text="{title}" title="{title}" xmlUrl="{url}" description="{description}" />\n'

            opml += '    </outline>\n'

        opml += '  </body>\n'
        opml += '</opml>'

        return opml

    @handle_exceptions
    def import_opml(self, opml_content: str) -> Dict[str, Any]:
        """Import feeds from OPML

        Args:
            opml_content: OPML content as string

        Returns:
            Import results
        """
        import xml.etree.ElementTree as ET

        try:
            root = ET.fromstring(opml_content)

            # ElementTree elements have no getparent(), so build a child -> parent
            # map up front to recover each feed's enclosing category outline.
            parent_map = {child: parent for parent in root.iter() for child in parent}

            results = {
                "total": 0,
                "imported": 0,
                "failed": 0,
                "existing": 0,
                "feeds": []
            }

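            # OPML feed entries are <outline> elements; accept any outline that
            # declares type="rss" or carries an xmlUrl attribute.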
            for outline in root.findall(".//outline"):
                outline_type = outline.get("type")

                if outline_type == "rss" or outline.get("xmlUrl"):
                    results["total"] += 1

                    url = outline.get("xmlUrl")
                    title = outline.get("title") or outline.get("text")

category = "General" |
|
parent = outline.getparent() if hasattr(outline, "getparent") else None |
|
if parent is not None and parent.get("title"): |
|
category = parent.get("title") |
|
|
|
|
|
                    try:
                        feed_id = self._generate_feed_id(url)

                        if feed_id in self.feeds:
                            results["existing"] += 1
                            results["feeds"].append({
                                "url": url,
                                "title": title,
                                "status": "existing"
                            })
                        else:
                            self.add_feed(url, title, category)
                            results["imported"] += 1
                            results["feeds"].append({
                                "url": url,
                                "title": title,
                                "status": "imported"
                            })

                    except Exception as e:
                        results["failed"] += 1
                        results["feeds"].append({
                            "url": url,
                            "title": title,
                            "status": "failed",
                            "error": str(e)
                        })

            return results

        except Exception as e:
            logger.error(f"Failed to import OPML: {str(e)}")
            raise IntegrationError(f"Failed to import OPML: {str(e)}")

    def _generate_feed_id(self, url: str) -> str:
        """Generate a unique ID for a feed URL

        Args:
            url: Feed URL

        Returns:
            Feed ID
        """
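        # MD5 is used here only as a stable, non-cryptographic identifier for the URL.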
        return hashlib.md5(url.encode()).hexdigest()

    def _cache_entries(self, feed_id: str, entries: List[Dict[str, Any]]) -> None:
        """Cache feed entries

        Args:
            feed_id: Feed ID
            entries: List of feed entries
        """
        processed_entries = []

        for entry in entries:
            processed_entry = {
                "id": entry.get("id", ""),
                "title": entry.get("title", ""),
                "link": entry.get("link", ""),
                "summary": entry.get("summary", ""),
                "published": entry.get("published", ""),
                "published_parsed": entry.get("published_parsed"),
                "updated": entry.get("updated", ""),
                "updated_parsed": entry.get("updated_parsed"),
                "authors": entry.get("authors", []),
                "tags": entry.get("tags", [])
            }

            if "content" in entry:
                processed_entry["content"] = entry["content"]

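            # Best-effort plain-text copy of the summary: strip HTML tags with a
            # simple regex rather than a full HTML parser.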
if processed_entry["summary"] and re.search(r"<[^>]+>", processed_entry["summary"]): |
|
|
|
processed_entry["summary_text"] = re.sub(r"<[^>]+>", "", processed_entry["summary"]) |
|
|
|
processed_entries.append(processed_entry) |
|
|
|
|
|
self.cached_entries[feed_id] = processed_entries |
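

# Minimal usage sketch (illustrative only): the feed URL below is a placeholder,
# and the utils.* helpers are assumed to be importable from this project.
if __name__ == "__main__":
    rss = RSSFeedIntegration()
    feed = rss.add_feed("https://example.com/feed.xml", category="News")
    if feed:
        for item in rss.fetch_feed_entries(feed["id"], max_entries=5):
            print(item.get("title", ""), item.get("link", ""))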