"""WhatsApp delivery helpers (Gupshup) and the Redis-cached daily digest formatter.

Importing this module configures logging, reads its settings from environment
variables and opens (and pings) a Redis connection; a failed ping re-raises.
"""

import datetime  # Used to stamp the digest with today's date.
import json
import logging
import os
from typing import Dict, List, Optional, Tuple

import redis
import requests

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# NOTE(review): several runtime strings below (log/error prefixes, topic emojis,
# apostrophes) look mis-encoded, as if UTF-8 emoji had been decoded with a legacy
# code page. They are reproduced unchanged here -- confirm the intended
# characters against the canonical file before repairing them.


# -----------------------------
# Helpers for clean environment
# -----------------------------
def getenv_strip(name: str, default: Optional[str] = None) -> Optional[str]:
    """Return environment variable *name* with surrounding whitespace removed.

    Args:
        name: Name of the environment variable to read.
        default: Value used when the variable is unset.

    Returns:
        The stripped string, or ``default`` unchanged when it is not a string
        (e.g. ``None``).
    """
    value = os.environ.get(name, default)
    return value.strip() if isinstance(value, str) else value


# -----------------------------------------
# Configuration from environment variables
# -----------------------------------------
REDIS_URL = getenv_strip("UPSTASH_REDIS_URL", "redis://localhost:6379")

# Gupshup is the outbound provider. Override the endpoint via WHATSAPP_API_URL
# if the default path does not match your account.
WHATSAPP_API_URL = getenv_strip("WHATSAPP_API_URL", "https://api.gupshup.io/sm/api/v1/msg")
WHATSAPP_TOKEN = getenv_strip("WHATSAPP_TOKEN")
WHATSAPP_TO_NUMBER = getenv_strip("WHATSAPP_TO_NUMBER", "353899495777")
GUPSHUP_SOURCE_NUMBER = getenv_strip("GUPSHUP_SOURCE_NUMBER")
GUPSHUP_APP_NAME = getenv_strip("GUPSHUP_APP_NAME")

# Default upper bound (seconds) for a single Gupshup HTTP call. Without a
# timeout, requests waits indefinitely on a stalled connection.
GUPSHUP_TIMEOUT_SECONDS = 15.0

# Redis connection: fail fast at import time if the cache is unreachable.
try:
    redis_client = redis.Redis.from_url(REDIS_URL, decode_responses=True)
    redis_client.ping()
    logging.info("Redis client connected successfully.")
except Exception as exc:
    logging.error("โŒ Failed to connect to Redis: %s", exc)
    raise

# --- Topic mapping for display names and emojis ---
TOPIC_DISPLAY_MAP = {
    "india": {"name": "India", "emoji": "๐Ÿ‡ฎ๐Ÿ‡ณ"},
    "world": {"name": "World", "emoji": "๐ŸŒ"},
    "tech": {"name": "Technology", "emoji": "๐Ÿง "},
    "finance": {"name": "Business & Markets", "emoji": "๐Ÿ’ฐ"},
    "sports": {"name": "Sports", "emoji": "๐Ÿ†"},
}


# -----------------------------
# Config sanity & builders
# -----------------------------
def _missing_config(destination_number: Optional[str] = None) -> Optional[Dict[str, object]]:
    """Validate the Gupshup settings and the destination number.

    Args:
        destination_number: Recipient number for the message about to be sent.

    Returns:
        ``None`` when everything required is present; otherwise a failure dict
        with ``status``, ``error`` and an integer ``code`` (500 for missing
        environment variables, 400 for an empty destination).
    """
    required = (
        ("WHATSAPP_TOKEN", WHATSAPP_TOKEN),
        ("GUPSHUP_SOURCE_NUMBER", GUPSHUP_SOURCE_NUMBER),
        ("GUPSHUP_APP_NAME", GUPSHUP_APP_NAME),
    )
    missing = [name for name, value in required if not value]
    if missing:
        error_msg = (
            "โŒ Missing one or more critical Gupshup WhatsApp API environment variables: "
            + ", ".join(missing)
        )
        logging.error(error_msg)
        return {"status": "failed", "error": error_msg, "code": 500}

    if not destination_number:
        error_msg = "โŒ Destination number cannot be empty."
        logging.error(error_msg)
        return {"status": "failed", "error": error_msg, "code": 400}

    return None


def _build_headers() -> Dict[str, str]:
    """Return the HTTP headers sent with every Gupshup request."""
    return {
        "Content-Type": "application/x-www-form-urlencoded",
        "apikey": WHATSAPP_TOKEN,
        "Cache-Control": "no-cache",
        "accept": "application/json",
    }


def _base_payload(destination_number: str) -> Dict[str, object]:
    """Return the form fields shared by every outbound message.

    Args:
        destination_number: Recipient number placed in the ``destination`` field.
    """
    # NOTE(review): requests form-encodes Python booleans via str(), so these two
    # flags go out as "False" -- confirm Gupshup accepts that spelling.
    return {
        "channel": "whatsapp",
        "source": GUPSHUP_SOURCE_NUMBER,
        "destination": destination_number,
        "src.name": GUPSHUP_APP_NAME,
        "disablePreview": False,
        "encode": False,
    }


def _parse_body(response: "requests.Response") -> object:
    """Return the decoded JSON body, or ``{"raw": <text>}`` when it is not JSON."""
    try:
        return response.json()
    except ValueError:
        return {"raw": response.text}


def _post_to_gupshup(
    payload: Dict[str, object],
    action: str,
    timeout: Optional[float] = GUPSHUP_TIMEOUT_SECONDS,
) -> Dict[str, object]:
    """POST *payload* to the Gupshup endpoint and normalise the outcome.

    Args:
        payload: Form fields to send (see ``_base_payload``).
        action: Human-readable description of the send, used in log messages.
        timeout: Seconds to wait for the HTTP call; ``None`` disables the limit.

    Returns:
        ``{"status": "success", "details": body}`` for responses below 400,
        ``{"status": "failed", "details": body, "code": status}`` for HTTP
        errors, or ``{"status": "failed", "error": str, "code": int}`` when the
        request itself raised. This function does not raise.
    """
    try:
        logging.info(
            "Attempting to send %s to %s via Gupshup. API URL: %s",
            action,
            payload.get("destination"),
            WHATSAPP_API_URL,
        )
        response = requests.post(
            WHATSAPP_API_URL,
            headers=_build_headers(),
            data=payload,
            timeout=timeout,
        )
        body = _parse_body(response)
        if response.status_code >= 400:
            logging.error(
                "Gupshup POST %s failed: %s | %s",
                WHATSAPP_API_URL,
                response.status_code,
                response.text,
            )
            return {"status": "failed", "details": body, "code": response.status_code}
        return {"status": "success", "details": body}
    except requests.exceptions.RequestException as exc:
        logging.error("โŒ Failed to send %s: %s", action, exc)
        return {
            "status": "failed",
            "error": str(exc),
            # Fall back to 500 when the exception carries no HTTP response.
            "code": getattr(getattr(exc, "response", None), "status_code", 500),
        }
    except Exception as exc:  # pragma: no cover - defensive guardrail
        logging.error("โŒ An unexpected error occurred during %s send: %s", action, exc)
        return {"status": "failed", "error": str(exc), "code": 500}


# -----------------------------
# Business logic
# -----------------------------
def _story_sort_key(ref_id: object) -> Tuple[int, int, str]:
    """Order story ids numerically; ids that are not integers sort last, by text."""
    try:
        return (0, int(ref_id), "")
    except (TypeError, ValueError):
        return (1, 0, str(ref_id))


def fetch_cached_headlines(selected_topics: Optional[List[str]] = None) -> str:
    """Fetch and format the daily headlines into a WhatsApp-friendly message.

    Args:
        selected_topics: Optional topic slugs to include. Slugs are matched
            case-insensitively against ``TOPIC_DISPLAY_MAP``; duplicates and
            unknown values are dropped. When nothing matches, every topic is
            rendered and a short notice is added.

    Returns:
        The formatted digest, or a short warning/error message when the cache
        is empty, unreadable, or does not hold a JSON object.
    """
    try:
        raw = redis_client.get("daily_news_feed_cache")
        if not raw:
            logging.warning("โš ๏ธ No detailed news headlines found in cache.")
            return "โš ๏ธ No daily headlines found in cache."
        data = json.loads(raw)
        if not isinstance(data, dict):
            # Route malformed payloads through the same error path as read
            # failures instead of crashing later on ``data.get``.
            raise ValueError("cached payload is not a JSON object")
    except Exception as exc:
        logging.error("โŒ Error reading from Redis: %s", exc)
        return f"โŒ Error reading from Redis: {exc}"

    formatted_date = datetime.date.today().strftime("%B %d, %Y")
    message_parts = [f"Nuse Daily - {formatted_date}", ""]

    normalized_topics: List[str] = []
    if selected_topics:
        seen = set()
        for topic in selected_topics:
            # Non-string entries cannot name a topic; treat them as unmatched.
            slug = topic.strip().lower() if isinstance(topic, str) else ""
            if slug in TOPIC_DISPLAY_MAP and slug not in seen:
                normalized_topics.append(slug)
                seen.add(slug)

    topics_to_render = normalized_topics or list(TOPIC_DISPLAY_MAP.keys())

    if selected_topics and not normalized_topics:
        message_parts.append(
            "(I couldnโ€™t match those preferences yet, so hereโ€™s the full digest.)"
        )
        message_parts.append("")

    for topic_key in topics_to_render:
        display_info = TOPIC_DISPLAY_MAP.get(topic_key)
        stories_for_topic = data.get(topic_key)
        # Skip unknown topics and topics whose cache entry is empty or malformed.
        if not display_info or not stories_for_topic or not isinstance(stories_for_topic, dict):
            continue

        message_parts.append(f"{display_info['emoji']} *{display_info['name']}*")
        for ref_id in sorted(stories_for_topic, key=_story_sort_key):
            item = stories_for_topic[ref_id]
            if not isinstance(item, dict):
                continue
            summary = item.get("summary", "")
            description = item.get("explanation", "")
            message_parts.append(f"\u2007{ref_id}. \u2007{summary} - {description}")
        # NOTE(review): the original indentation was lost; this blank line is
        # assumed to follow each topic section -- confirm against the source.
        message_parts.append("")

    message_parts.append("")
    message_parts.append("Curated by Nuse.")
    return "\n".join(message_parts)


def send_to_whatsapp(message_text: str, destination_number: str) -> Dict[str, object]:
    """Send a standard text message via Gupshup.

    Args:
        message_text: Body of the message.
        destination_number: Recipient number.

    Returns:
        The configuration-failure dict from ``_missing_config`` when validation
        fails, otherwise the result dict from ``_post_to_gupshup``.
    """
    config_error = _missing_config(destination_number)
    if config_error:
        return config_error

    payload = _base_payload(destination_number)
    payload["message"] = message_text
    return _post_to_gupshup(payload, "standard text WhatsApp message")


def send_seen_receipt(destination_number: str, message_id: Optional[str]) -> Dict[str, object]:
    """Stub for mark-as-read; Meta integration disabled.

    Only logs at debug level and returns a ``skipped`` result; nothing is sent.
    """
    logging.debug(
        "Seen receipt skipped for %s (Meta integration disabled). message_id=%s",
        destination_number,
        message_id,
    )
    return {"status": "skipped", "reason": "meta_disabled"}


def send_typing_indicator(destination_number: str, status: str = "typing") -> Dict[str, object]:
    """Stub for the typing indicator; Meta integration disabled.

    Only logs at debug level and returns a ``skipped`` result; nothing is sent.
    """
    logging.debug(
        "Typing indicator '%s' skipped for %s (Meta integration disabled).",
        status,
        destination_number,
    )
    return {"status": "skipped", "reason": "meta_disabled"}