| import feedparser |
| import pytz |
| from datetime import datetime |
| from typing import Optional |
| from bot.logger import get_logger |
|
|
# Module-level logger, obtained from the project's logging helper under the
# name "monitor".
logger = get_logger("monitor")


# ID of the YouTube channel whose videos feed is read by fetch_latest_videos().
CHANNEL_ID = "UCzHMxST9eZUo0XIaSyaTizA"
# URL of YouTube's per-channel videos feed (XML) for CHANNEL_ID.
RSS_URL = f"https://www.youtube.com/feeds/videos.xml?channel_id={CHANNEL_ID}"
# Time zone (Asia/Yangon) used to build the "published_mmt" field of each video.
MMT = pytz.timezone("Asia/Yangon")
|
|
|
|
def fetch_latest_videos(seen_ids: set) -> list[dict]:
    """Parse the channel RSS feed and return the videos that are not yet seen.

    Args:
        seen_ids: Video IDs that were already handled. Entries whose ID is in
            this set are skipped. The set itself is not modified here.

    Returns:
        One dict per new feed entry, in feed order, with the keys
        ``video_id``, ``title``, ``url``, ``published_time`` (the datetime
        returned by ``_parse_published``) and ``published_mmt`` (the same
        instant converted to the ``MMT`` time zone). Entries without a usable
        ID or publish date are skipped. An empty list is returned when the
        feed cannot be fetched or parsed, or when any unexpected error occurs.
    """
    try:
        feed = feedparser.parse(RSS_URL)

        # feedparser does not raise on network or XML errors: it flags the
        # result with ``bozo`` and stores the cause in ``bozo_exception``.
        # Without this check a failed fetch looks exactly like "no new videos".
        # A bozo feed that still yielded entries is processed as usual.
        if feed.get("bozo") and not feed.entries:
            logger.error(f"RSS fetch error: {feed.get('bozo_exception')}")
            return []

        new_videos = []

        for entry in feed.entries:
            # Prefer the dedicated video-id field; otherwise take the last
            # ":"-separated segment of the entry id.
            vid_id = entry.get("yt_videoid") or entry.get("id", "").split(":")[-1]
            if not vid_id or vid_id in seen_ids:
                continue

            pub_dt = _parse_published(entry.get("published", ""))
            if pub_dt is None:
                # Unparseable or missing publish date: skip this entry.
                continue

            video = {
                "video_id": vid_id,
                "title": entry.get("title", "No Title"),
                "url": f"https://www.youtube.com/watch?v={vid_id}",
                "published_time": pub_dt,
                "published_mmt": pub_dt.astimezone(MMT),
            }
            new_videos.append(video)
            logger.info(f"New video detected: [{vid_id}] '{video['title']}' @ {video['published_mmt']}")

        return new_videos

    except Exception as e:
        # Top-level boundary for the poller: log and report "nothing new"
        # rather than letting one bad poll propagate to the caller.
        logger.error(f"RSS fetch error: {e}")
        return []
|
|
|
|
def _parse_published(s: str) -> Optional[datetime]:
    """Parse a feed timestamp string into a timezone-aware UTC datetime.

    ISO 8601 (e.g. ``2024-01-15T10:30:00+00:00`` or with a trailing ``Z``) is
    tried first; RFC 2822 (e.g. ``Mon, 15 Jan 2024 10:30:00 +0000``) is the
    fallback.

    Args:
        s: The timestamp text. Empty input yields ``None``.

    Returns:
        An aware datetime normalised to UTC, or ``None`` when ``s`` is empty
        or cannot be parsed (a warning is logged in the latter case). A
        timestamp without any offset is assumed to already be in UTC.
    """
    # Function-scope imports: the module only imports ``datetime`` itself at
    # the top level.
    from datetime import timezone
    from email.utils import parsedate_to_datetime

    if not s:
        return None

    try:
        try:
            # fromisoformat() only accepts a literal "Z" suffix on Python
            # 3.11+, so rewrite it as an explicit offset.
            parsed = datetime.fromisoformat(s.replace("Z", "+00:00"))
        except ValueError:
            parsed = parsedate_to_datetime(s)

        if parsed.tzinfo is None:
            # Naive result (no offset in the input, or RFC 2822 "-0000").
            # Attach UTC explicitly; otherwise a later astimezone() call would
            # interpret the value as system-local time.
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)
    except (TypeError, ValueError, OverflowError) as e:
        # TypeError: parsedate_to_datetime on unparseable input before 3.10.
        # OverflowError: offset conversion near the datetime range limits.
        logger.warning(f"Cannot parse date '{s}': {e}")
        return None
|
|