# NOTE: removed stray "Spaces: / Running / Running" banner text that had been
# pasted in from the Hugging Face Spaces page header — it was not valid Python.
# --- Standard Libraries ---
import asyncio
import contextlib
import html
import json
import logging
import os
import re
import traceback
from typing import Optional, Dict, Any, Tuple

# --- Frameworks ---
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import PlainTextResponse, JSONResponse, Response
from starlette.routing import Route

# --- Telegram Bot ---
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, Bot
from telegram.constants import ParseMode
from telegram.error import NetworkError, RetryAfter, TimedOut, BadRequest, TelegramError
from telegram.ext import (
    Application,
    CommandHandler,
    MessageHandler,
    filters,
    ContextTypes,
    CallbackQueryHandler,
)
from telegram.request import HTTPXRequest, BaseRequest

# --- Other Libraries ---
import httpx
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, before_sleep_log
from youtube_transcript_api import YouTubeTranscriptApi, TranscriptsDisabled, NoTranscriptFound
# Pick the BeautifulSoup parser backend: the faster lxml when installed,
# otherwise Python's built-in html.parser.
try:
    import lxml  # noqa: F401 -- imported only to probe availability
except ImportError:
    DEFAULT_PARSER = 'html.parser'
else:
    DEFAULT_PARSER = 'lxml'
# --- Crawl4AI (New Primary Web Scraper) ---
# Optional dependency probe: record availability only; the user-facing
# warning is emitted later, once logging has been configured.
try:
    from crawl4ai import AsyncWebCrawler
except ImportError:
    AsyncWebCrawler = None
    _crawl4ai_available = False
else:
    _crawl4ai_available = True
# --- Google Gemini ---
# Optional dependency probe for google-generativeai; the warning is logged
# later, after logging setup. All names default to None when the library is
# missing so later type hints and isinstance-free checks still work.
try:
    import google.generativeai as genai
    # Import specific types needed, check library for exact names if errors occur
    from google.generativeai.types import HarmCategory, HarmBlockThreshold, GenerateContentResponse
except ImportError:
    genai = None
    HarmCategory = None
    HarmBlockThreshold = None
    GenerateContentResponse = None  # placeholder so type hints remain importable
    _gemini_available = False
else:
    _gemini_available = True
# --- Logging Setup ---
# Root logger: timestamped single-line format at INFO level.
logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO )
# Quieten chatty third-party loggers; keep framework loggers at INFO.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("telegram.ext").setLevel(logging.INFO)
logging.getLogger('telegram.bot').setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
logging.getLogger('gunicorn.error').setLevel(logging.INFO)
logging.getLogger('uvicorn').setLevel(logging.INFO)
logging.getLogger('starlette').setLevel(logging.INFO)
if _gemini_available: logging.getLogger("google.ai.generativelanguage").setLevel(logging.WARNING)
# Suppress noisy crawl4ai/playwright logs if needed
logging.getLogger("crawl4ai").setLevel(logging.INFO) # Adjust level as needed
logging.getLogger("playwright").setLevel(logging.WARNING)
logger = logging.getLogger(__name__) # Use __name__
logger.info(f"Logging configured. Using BS4 parser: {DEFAULT_PARSER}")
# Deferred warnings for the optional-dependency probes above (logging was not
# yet configured when those imports ran).
if not _gemini_available: logger.warning("google-generativeai library not found. Gemini functionality disabled.")
if not _crawl4ai_available: logger.warning("crawl4ai library not found. Primary web scraping disabled.")
# --- Global variable for PTB app ---
# Populated at startup with the python-telegram-bot Application instance;
# None until then.
ptb_app: Optional[Application] = None
# --- Environment Variable Loading & Configuration ---
logger.info("Attempting to load secrets and configuration...")
def get_secret(secret_name):
    """Read a secret from the environment, logging its presence.

    Args:
        secret_name: Name of the environment variable to read.

    Returns:
        The variable's value, or None when it is unset or empty.

    Security note: unlike the previous revision, this never logs any part of
    the secret's value -- even a short prefix can identify the key type and
    leaks material into log storage. Only presence and length are logged.
    """
    value = os.environ.get(secret_name)
    log = logging.getLogger(__name__)
    if value:
        log.info(f"Secret '{secret_name}': Found (length: {len(value)})")
    else:
        log.warning(f"Secret '{secret_name}': Not Found")
    return value
# --- Secret / API-key loading (each logs presence via get_secret) ---
TELEGRAM_TOKEN = get_secret('TELEGRAM_TOKEN')
OPENROUTER_API_KEY = get_secret('OPENROUTER_API_KEY') # Fallback Summarizer
URLTOTEXT_API_KEY = get_secret('URLTOTEXT_API_KEY') # Fallback Web Scraper 2
SUPADATA_API_KEY = get_secret('SUPADATA_API_KEY') # Fallback YT Transcript 1
APIFY_API_TOKEN = get_secret('APIFY_API_TOKEN') # Fallback YT Transcript 2
WEBHOOK_SECRET = get_secret('WEBHOOK_SECRET')
GEMINI_API_KEY = get_secret('GEMINI_API_KEY') # Primary Summarizer
# Models (User can still configure via env vars)
OPENROUTER_MODEL = os.environ.get("OPENROUTER_MODEL", "deepseek/deepseek-chat-v3-0324:free") # Fallback Model
APIFY_ACTOR_ID = os.environ.get("APIFY_ACTOR_ID", "karamelo~youtube-transcripts")
# *** Reverted Model Name as requested ***
GEMINI_MODEL = os.environ.get("GEMINI_MODEL", "gemini-2.0-flash-001") # Primary Model
# --- Configuration Checks ---
# Only the Telegram token is hard-required; every other integration degrades
# to a warning/info log plus a disabled feature flag below.
if not TELEGRAM_TOKEN: logger.critical("❌ FATAL: TELEGRAM_TOKEN not found."); raise RuntimeError("Exiting: Telegram token missing.")
if not GEMINI_API_KEY: logger.error("❌ ERROR: GEMINI_API_KEY not found. Primary summarization (Gemini) will fail.")
if not OPENROUTER_API_KEY: logger.warning("⚠️ WARNING: OPENROUTER_API_KEY not found. Fallback summarization will fail.")
# Feature flags: Gemini needs both the library import to have succeeded and
# a non-empty key; the others need only their key/library respectively.
_gemini_primary_enabled = _gemini_available and bool(GEMINI_API_KEY)
if not _gemini_available: logger.warning("⚠️ WARNING: google-generativeai library missing. Gemini disabled.")
elif not GEMINI_API_KEY: logger.warning("⚠️ WARNING: GEMINI_API_KEY not found or empty. Gemini disabled.")
_openrouter_fallback_enabled = bool(OPENROUTER_API_KEY)
if not _openrouter_fallback_enabled: logger.warning("⚠️ WARNING: OPENROUTER_API_KEY not found. Fallback disabled.")
_crawl4ai_primary_web_enabled = _crawl4ai_available
if not _crawl4ai_primary_web_enabled: logger.warning("⚠️ WARNING: crawl4ai library missing. Primary Web Scraper disabled.")
_urltotext_fallback_enabled = bool(URLTOTEXT_API_KEY)
if not _urltotext_fallback_enabled: logger.info("ℹ️ INFO: URLTOTEXT_API_KEY not found. Fallback Web Scraper 2 (API) disabled.")
else: logger.info("ℹ️ INFO: URLTOTEXT_API_KEY found. Fallback Web Scraper 2 (API) enabled.")
if not SUPADATA_API_KEY: logger.info("ℹ️ INFO: SUPADATA_API_KEY not found. Fallback YT Transcript 1 (API) disabled.")
if not APIFY_API_TOKEN: logger.info("ℹ️ INFO: APIFY_API_TOKEN not found. Fallback YT Transcript 2 (API) disabled.")
if not WEBHOOK_SECRET: logger.info("ℹ️ INFO: Optional secret 'WEBHOOK_SECRET' not found. Webhook security disabled.")
# Startup summary of the effective fetch/summarize pipeline.
logger.info("Secret loading and configuration check finished.")
logger.info(f"Primary Web Scraper: {'Crawl4AI' if _crawl4ai_primary_web_enabled else 'DISABLED'}")
logger.info(f"Fallback Web Scraper 1: BeautifulSoup (Always available)")
logger.info(f"Fallback Web Scraper 2: urltotext.com API {'ENABLED' if _urltotext_fallback_enabled else 'DISABLED'}")
logger.info(f"Primary Summarizer: Gemini ({GEMINI_MODEL if _gemini_primary_enabled else 'DISABLED'})")
logger.info(f"Fallback Summarizer: OpenRouter ({OPENROUTER_MODEL if _openrouter_fallback_enabled else 'DISABLED'})")
logger.info(f"Primary YT Transcript: youtube-transcript-api (Always available)")
logger.info(f"Fallback YT Transcript 1: Supadata API {'ENABLED' if SUPADATA_API_KEY else 'DISABLED'}")
logger.info(f"Fallback YT Transcript 2: Apify REST API {'ENABLED' if APIFY_API_TOKEN else 'DISABLED'}")
_apify_token_exists = bool(APIFY_API_TOKEN) # Keep this for health check
# Configure the Gemini client; on failure, flip the flag so callers fall back
# to OpenRouter instead of crashing at request time.
if _gemini_primary_enabled:
    try:
        genai.configure(api_key=GEMINI_API_KEY)
        logger.info("Google GenAI client configured successfully.")
    except Exception as e:
        logger.error(f"Failed to configure Google GenAI client: {e}")
        _gemini_primary_enabled = False
# --- Constants ---
MAX_SUMMARY_CHUNK_SIZE = 4000 # Max characters per Telegram message (allow buffer)
# Approximate input cap before truncation. NOTE(review): gemini-2.0-flash's
# real context window may differ from 1.5 Flash's 1M tokens -- verify against
# current Gemini docs and adjust if needed.
MAX_INPUT_TOKEN_APPROX = 500000
# --- Retry Helper ---
async def retry_bot_operation(func, *args, **kwargs):
    """Await a Telegram bot operation, retrying transient network failures.

    Despite its name, the previous revision never retried anything: it logged
    transient errors and re-raised, relying on a tenacity decorator that was
    never applied. This version performs the retries itself.

    Args:
        func: Awaitable bot method (e.g. bot.send_message).
        *args, **kwargs: Forwarded to func unchanged.

    Returns:
        The operation's result, or None for ignorable BadRequest errors.

    Raises:
        BadRequest: For non-ignorable client errors.
        TelegramError: Transient errors that persist past the final attempt,
            and any other non-transient Telegram error.
        Exception: Anything unexpected (logged with traceback first).
    """
    max_attempts = 4
    for attempt in range(1, max_attempts + 1):
        try:
            return await func(*args, **kwargs)
        except BadRequest as e:
            # These BadRequests occur in normal operation and are safe to drop.
            ignore_errors = [
                "message is not modified", "query is too old", "message to edit not found",
                "chat not found", "bot was blocked by the user",
            ]
            if any(err in str(e).lower() for err in ignore_errors):
                logger.warning(f"Ignoring non-critical BadRequest: {e}")
                return None
            logger.error(f"Potentially critical BadRequest: {e}")
            raise
        except (TimedOut, NetworkError, RetryAfter) as e:
            if attempt >= max_attempts:
                logger.error(f"Telegram transient error persisted after {max_attempts} attempts: {e}")
                raise
            # Honour Telegram's flood-control hint when present; otherwise
            # back off exponentially (capped at 30s).
            delay = getattr(e, "retry_after", None) or min(2 ** attempt, 30)
            logger.warning(f"Telegram transient error (attempt {attempt}/{max_attempts}, retrying in {delay}s): {e}")
            await asyncio.sleep(delay)
        except TelegramError as e:
            logger.error(f"Unhandled TelegramError: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error during bot operation: {e}", exc_info=True)
            raise
# --- Helper Functions ---
def is_youtube_url(url):
    """Return True if *url* looks like a YouTube video link.

    Fixes the previous pattern, whose unescaped metacharacters meant that
    'watch?v=' was parsed as 'watc' + optional 'h' + 'v=' -- so standard
    https://www.youtube.com/watch?v=... links were never recognised. Dots in
    the domain are now escaped as well.
    """
    youtube_regex = re.compile(
        r'(?:https?://)?(?:www\.)?(?:m\.)?(?:youtube(?:-nocookie)?\.com|youtu\.be)/'
        r'(?:watch\?v=|embed/|v/|shorts/|live/|attribution_link\?a=.*&u=/watch\?v=)?'
        r'([\w-]{11})'      # the 11-character video ID
        r'(?:\S+)?', re.IGNORECASE)
    match = youtube_regex.search(url)
    logging.getLogger(__name__).debug(f"is_youtube_url '{url}': {bool(match)}")
    return bool(match)
def extract_youtube_id(url):
    """Extract the 11-character YouTube video ID from *url*, or None.

    Uses the same corrected pattern as is_youtube_url: the old pattern's
    unescaped '?' and '.' meant standard watch?v= URLs never matched.
    """
    youtube_regex = re.compile(
        r'(?:https?://)?(?:www\.)?(?:m\.)?(?:youtube(?:-nocookie)?\.com|youtu\.be)/'
        r'(?:watch\?v=|embed/|v/|shorts/|live/|attribution_link\?a=.*&u=/watch\?v=)?'
        r'([\w-]{11})'      # the 11-character video ID
        r'(?:\S+)?', re.IGNORECASE)
    match = youtube_regex.search(url)
    log = logging.getLogger(__name__)
    if match:
        video_id = match.group(1)
        log.debug(f"Extracted YT ID '{video_id}' from {url}")
        return video_id
    log.warning(f"Could not extract YT ID from {url}")
    return None
# --- Content Fetching Functions ---
# - YouTube Transcript Fetching (get_transcript_via_supadata, get_transcript_via_apify, get_youtube_transcript) -
async def get_transcript_via_supadata(video_id: str, api_key: str) -> Optional[str]:
    """Fetch a YouTube transcript from the Supadata API (Fallback YT 1).

    Returns the transcript text stripped of surrounding whitespace, or None
    on any failure (missing arguments, HTTP errors, empty payload).
    """
    if not video_id:
        logger.error("[Supadata] No video_id provided")
        return None
    if not api_key:
        logger.error("[Supadata] API key missing.")
        return None
    logger.info(f"[Supadata] Attempting fetch for video ID: {video_id}")
    api_endpoint = "https://api.supadata.ai/v1/youtube/transcript"
    request_params = {"videoId": video_id, "format": "text"}
    request_headers = {"X-API-Key": api_key}
    try:
        async with httpx.AsyncClient(timeout=30.0) as http_client:
            response = await http_client.get(api_endpoint, headers=request_headers, params=request_params)
            logger.debug(f"[Supadata] Status code {response.status_code} for {video_id}")
            if response.status_code == 200:
                try:
                    # Prefer a JSON payload; fall back to the raw body text.
                    try:
                        data = response.json()
                    except json.JSONDecodeError:
                        data = None
                    content = None
                    if data:
                        if isinstance(data, str):
                            content = data
                        else:
                            content = data.get("transcript") or data.get("text") or data.get("data")
                    if not content and response.text:
                        content = response.text
                    if content and isinstance(content, str):
                        logger.info(f"[Supadata] Success for {video_id}. Length: {len(content)}")
                        return content.strip()
                    logger.warning(f"[Supadata] Success but content empty/invalid for {video_id}. Response: {response.text[:200]}")
                    return None
                except Exception as e:
                    logger.error(f"[Supadata] Error processing success response for {video_id}: {e}", exc_info=True)
                    return None
            if response.status_code in [401, 403]:
                logger.error(f"[Supadata] Auth error ({response.status_code}). Check API key.")
                return None
            if response.status_code == 404:
                logger.warning(f"[Supadata] Not found (404) for {video_id}.")
                return None
            logger.error(f"[Supadata] Unexpected status {response.status_code} for {video_id}. Resp: {response.text[:200]}")
            return None
    except httpx.TimeoutException:
        logger.error(f"[Supadata] Timeout connecting for {video_id}")
        return None
    except httpx.RequestError as e:
        if "CERTIFICATE_VERIFY_FAILED" in str(e):
            logger.error(f"[Supadata] SSL Cert Verify Failed for {video_id}: {e}")
        else:
            logger.error(f"[Supadata] Request error for {video_id}: {e}")
        return None
    except Exception as e:
        logger.error(f"[Supadata] Unexpected error for {video_id}: {e}", exc_info=True)
        return None
async def get_transcript_via_apify(video_url: str, api_token: str) -> Optional[str]:
    """Fetch a YouTube transcript via Apify's run-sync-get-dataset-items REST
    endpoint (Fallback YT 2). Returns stripped transcript text or None on any
    failure; never raises."""
    global APIFY_ACTOR_ID
    if not video_url: logger.error("[Apify SyncItems] No video_url provided"); return None
    if not api_token: logger.error("[Apify SyncItems] API token missing."); return None
    logger.info(f"[Apify SyncItems] Attempting fetch for URL: {video_url} (Actor: {APIFY_ACTOR_ID})")
    sync_items_endpoint = f"https://api.apify.com/v2/acts/{APIFY_ACTOR_ID}/run-sync-get-dataset-items"
    params = {"token": api_token}
    # Actor input: single URL, single-string transcript output, metadata fields disabled.
    payload = { "urls": [video_url], "outputFormat": "singleStringText", "maxRetries": 5, "channelHandleBoolean": False, "channelNameBoolean": False, "datePublishedBoolean": False, "relativeDateTextBoolean": False, }
    headers = {"Content-Type": "application/json"}
    try:
        # Long timeout: this endpoint blocks until the actor run completes.
        async with httpx.AsyncClient(timeout=120.0) as client:
            log_headers = {k: v for k, v in headers.items()}
            logger.debug(f"[Apify SyncItems] POST Request Details:\nURL: {sync_items_endpoint}\nParams: {params}\nHeaders: {log_headers}\nPayload: {json.dumps(payload)}")
            response = await client.post(sync_items_endpoint, headers=headers, params=params, json=payload)
            logger.debug(f"[Apify SyncItems] Received status code {response.status_code} for {video_url}")
            if response.status_code == 200:
                try:
                    results = response.json()
                    if isinstance(results, list) and len(results) > 0:
                        item = results[0]
                        content = None
                        # Different actor versions expose the transcript under
                        # different keys/shapes; probe them in order of likelihood.
                        if "captions" in item and isinstance(item["captions"], str): content = item["captions"]
                        elif "text" in item and isinstance(item["text"], str): content = item["text"]
                        elif "transcript" in item and isinstance(item["transcript"], str): content = item["transcript"]
                        elif "captions" in item and isinstance(item["captions"], list):
                            # List form: either dicts carrying a 'text' field, or plain strings.
                            if len(item["captions"]) > 0 and isinstance(item["captions"][0], dict) and 'text' in item["captions"][0]: content = " ".join(line.get("text", "") for line in item["captions"] if line.get("text"))
                            elif len(item["captions"]) > 0 and isinstance(item["captions"][0], str): content = " ".join(item["captions"])
                        if content and isinstance(content, str): logger.info(f"[Apify SyncItems] Success via REST for {video_url}. Length: {len(content)}"); return content.strip()
                        else: logger.warning(f"[Apify SyncItems] Dataset item parsed but transcript content empty/invalid format for {video_url}. Item keys: {list(item.keys())}"); return None
                    else: logger.warning(f"[Apify SyncItems] Actor success but dataset was empty for {video_url}. Response: {results}"); return None
                except json.JSONDecodeError: logger.error(f"[Apify SyncItems] Failed JSON decode. Status:{response.status_code}. Resp:{response.text[:200]}"); return None
                except Exception as e: logger.error(f"[Apify SyncItems] Error processing success response for {video_url}: {e}", exc_info=True); return None
            elif response.status_code == 400: logger.error(f"[Apify SyncItems] Bad Request (400) for {video_url}. Check payload. Resp:{response.text[:200]}"); return None
            elif response.status_code == 401: logger.error("[Apify SyncItems] Auth error (401). Check token."); return None
            elif response.status_code == 404:
                # 404 can mean either a bad endpoint or an unknown actor; surface
                # Apify's own error message when the body is JSON.
                error_info = "";
                try: error_info = response.json().get("error", {}).get("message", "")
                except Exception: pass
                logger.error(f"[Apify SyncItems] Endpoint/Actor Not Found (404). Error: '{error_info}' Resp:{response.text[:200]}");
                return None
            else: logger.error(f"[Apify SyncItems] Unexpected status {response.status_code} for {video_url}. Resp:{response.text[:200]}"); return None
    except httpx.TimeoutException as e: logger.error(f"[Apify SyncItems] Timeout during API interaction for {video_url}: {e}"); return None
    except httpx.HTTPStatusError as e: logger.error(f"[Apify SyncItems] HTTP Status Error during API interaction for {video_url}: {e}"); return None
    except httpx.RequestError as e: logger.error(f"[Apify SyncItems] Request error during API interaction for {video_url}: {e}"); return None
    except Exception as e: logger.error(f"[Apify SyncItems] Unexpected error during Apify SyncItems REST call for {video_url}: {e}", exc_info=True); return None
async def get_youtube_transcript(video_id: str, video_url: str) -> Optional[str]:
    """Fetch a YouTube transcript, trying three sources in order.

    Order: youtube-transcript-api (primary), Supadata API (fallback 1),
    Apify actor REST endpoint (fallback 2).

    Args:
        video_id: 11-character YouTube video ID (primary + Supadata).
        video_url: Full video URL (Apify fallback).

    Returns:
        Stripped transcript text, or None when every source fails.
    """
    global SUPADATA_API_KEY, APIFY_API_TOKEN
    if not video_id:
        logger.error("get_youtube_transcript: No video_id")
        return None
    logger.info(f"Fetching transcript for video ID: {video_id} (URL: {video_url})")
    transcript_text = None
    logger.info("[Primary YT] Attempting youtube-transcript-api...")
    try:
        def _fetch_primary():
            # The whole chain (listing transcripts, selecting one, fetching it)
            # performs blocking network I/O. Previously only the bound .fetch
            # method was handed to asyncio.to_thread, so list_transcripts()
            # and find_generated_transcript() ran ON the event loop thread;
            # wrapping the entire chain keeps the loop responsive.
            return YouTubeTranscriptApi.list_transcripts(video_id).find_generated_transcript(['en', 'en-GB', 'en-US']).fetch()
        transcript_list = await asyncio.to_thread(_fetch_primary)
        if transcript_list:
            transcript_text = " ".join([item['text'] for item in transcript_list if 'text' in item])
        if transcript_text:
            logger.info(f"[Primary YT] Success via lib for {video_id} (len: {len(transcript_text)})")
            return transcript_text.strip()
        else:
            logger.warning(f"[Primary YT] Transcript list/text empty for {video_id}")
            transcript_text = None
    except TranscriptsDisabled:
        logger.warning(f"[Primary YT] Transcripts are disabled for video {video_id}")
        transcript_text = None
    except NoTranscriptFound:
        logger.warning(f"[Primary YT] No English transcript found for video {video_id}")
        transcript_text = None
    except Exception as e:
        logger.warning(f"[Primary YT] Error via lib for {video_id}: {e}")
        transcript_text = None
    if transcript_text is None:
        logger.info("[Fallback YT 1] Trying Supadata API...")
        if SUPADATA_API_KEY:
            transcript_text = await get_transcript_via_supadata(video_id, SUPADATA_API_KEY)
            if transcript_text:
                logger.info(f"[Fallback YT 1] Success via Supadata for {video_id}")
                return transcript_text  # already stripped by the helper
            else:
                logger.warning(f"[Fallback YT 1] Supadata failed or no content for {video_id}.")
        else:
            logger.warning("[Fallback YT 1] Supadata API key unavailable. Skipping.")
    if transcript_text is None:
        logger.info("[Fallback YT 2] Trying Apify REST API (SyncItems)...")
        if APIFY_API_TOKEN:
            transcript_text = await get_transcript_via_apify(video_url, APIFY_API_TOKEN)
            if transcript_text:
                logger.info(f"[Fallback YT 2] Success via Apify SyncItems REST for {video_url}")
                return transcript_text  # already stripped by the helper
            else:
                logger.warning(f"[Fallback YT 2] Apify SyncItems REST failed or no content for {video_url}.")
        else:
            logger.warning("[Fallback YT 2] Apify API token unavailable. Skipping.")
    if transcript_text is None:
        logger.error(f"All methods failed for YT transcript: {video_id}")
        return None
    return transcript_text
# - Website Content Fetching -
# NEW: Primary Web Scraper using Crawl4AI
async def get_website_content_via_crawl4ai(url: str) -> Optional[str]:
    """Fetches website content using Crawl4AI (Primary Method).

    Returns the page rendered as stripped markdown (preferred) or plain text,
    or None on any failure. Assumes HOME=/tmp is set in the Dockerfile so
    Crawl4AI's cache directory is writable -- NOTE(review): confirm the
    Dockerfile still sets this.
    """
    global _crawl4ai_primary_web_enabled
    if not _crawl4ai_primary_web_enabled:
        logger.error("[Crawl4AI Primary] Called but library is not available.")
        return None
    if not url:
        logger.error("[Crawl4AI Primary] No URL provided.")
        return None
    logger.info(f"[Crawl4AI Primary] Attempting to crawl URL: {url}")
    # A previous revision created /tmp/.crawl4ai here explicitly; that is now
    # handled by setting HOME=/tmp in the Dockerfile instead.
    try:
        # Use AsyncWebCrawler context manager (no explicit cache_dir).
        async with AsyncWebCrawler() as crawler:
            logger.info(f"[Crawl4AI Primary] Initialized crawler (HOME should be /tmp).")
            # arun crawls a single URL; presumably returns an object exposing
            # .markdown and .text -- verify against the crawl4ai version in use.
            result = await crawler.arun(url=url, crawler_strategy="playwright", timeout=90) # 90 sec timeout
            if result and result.markdown:
                content = result.markdown.strip()
                if content:
                    logger.info(f"[Crawl4AI Primary] Success crawling {url}. Markdown length: {len(content)}")
                    return content
                else:
                    logger.warning(f"[Crawl4AI Primary] Crawl successful for {url}, but extracted markdown content is empty.")
                    # Try result.text as a fallback within crawl4ai success
                    if result.text:
                        content = result.text.strip()
                        if content:
                            logger.info(f"[Crawl4AI Primary] Using .text fallback after empty markdown. Length: {len(content)}")
                            return content
                    return None # Return None if both markdown and text are empty
            elif result and result.text: # Fallback to raw text if markdown is somehow missing entirely
                content = result.text.strip()
                if content:
                    logger.info(f"[Crawl4AI Primary] Success crawling {url} (using .text, markdown missing). Length: {len(content)}")
                    return content
                else:
                    logger.warning(f"[Crawl4AI Primary] Crawl successful for {url}, but extracted text content is empty (markdown missing).")
                    return None
            else:
                logger.warning(f"[Crawl4AI Primary] Crawl failed or returned no result/content for {url}.")
                return None
    except asyncio.TimeoutError:
        logger.error(f"[Crawl4AI Primary] Timeout occurred while crawling {url}")
        return None
    except PermissionError as e: # Catch the specific error
        # Crawl4AI writes under $HOME; a read-only filesystem or an unset HOME
        # surfaces here rather than as an ordinary crawl failure.
        logger.error(f"[Crawl4AI Primary] Permission denied during crawl for {url}. Target path: '{e.filename}'. Likely filesystem issue or HOME=/tmp not effective. Error: {e}", exc_info=True)
        return None # Fail gracefully for this method
    except Exception as e:
        logger.error(f"[Crawl4AI Primary] Unexpected error during crawl for {url}: {e}", exc_info=True)
        # Log specific crawl4ai errors if they become apparent
        return None
# Fallback 1: Original BeautifulSoup Scraper (fetch_url_content_for_scrape + get_website_content_bs4)
async def fetch_url_content_for_scrape(url: str, timeout: int = 25) -> Optional[str]:
    """Fetch raw HTML for the BeautifulSoup fallback scraper.

    Args:
        url: Page to fetch.
        timeout: Per-request timeout in seconds.

    Returns:
        The decoded HTML body, or None for non-HTML responses and any
        network/HTTP error (all errors are logged, never raised).
    """
    # Browser-like headers reduce trivial bot blocking. The Accept value was
    # previously 'image/webp,/;q=0.8' -- an invalid media type where the
    # standard '*/*' wildcard had been mangled away; restored here.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Connection': 'keep-alive',
        'DNT': '1',
        'Upgrade-Insecure-Requests': '1',
    }
    try:
        async with httpx.AsyncClient(follow_redirects=True, timeout=timeout, headers=headers) as client:
            logger.debug(f"[Web Scrape BS4] Sending request to {url}")
            response = await client.get(url)
            logger.debug(f"[Web Scrape BS4] Received response {response.status_code} from {url}")
            response.raise_for_status()
            content_type = response.headers.get('content-type', '').lower()
            if 'html' not in content_type:
                logger.warning(f"[Web Scrape BS4] Non-HTML content type from {url}: {content_type}")
                return None
            try:
                return response.text  # httpx picks the charset and decodes
            except Exception as e:
                logger.error(f"[Web Scrape BS4] Error getting response text for {url}: {e}")
                return None
    except httpx.HTTPStatusError as e: logger.error(f"[Web Scrape BS4] HTTP error {e.response.status_code} fetching {url}: {e}")
    except httpx.TimeoutException: logger.error(f"[Web Scrape BS4] Timeout error fetching {url}")
    except httpx.TooManyRedirects: logger.error(f"[Web Scrape BS4] Too many redirects fetching {url}")
    except httpx.RequestError as e: logger.error(f"[Web Scrape BS4] Request error fetching {url}: {e}")
    except Exception as e: logger.error(f"[Web Scrape BS4] Unexpected error fetching {url}: {e}", exc_info=True)
    return None
async def get_website_content_bs4(url: str) -> Optional[str]:
    """Fetches and parses website content using BeautifulSoup (Fallback 1).

    Downloads the page via fetch_url_content_for_scrape, strips non-content
    elements, locates a likely main-content node, and returns its flattened,
    whitespace-normalised text -- or None on any failure.
    """
    if not url:
        logger.error("[BS4 Fallback] get_website_content_bs4: No URL")
        return None
    logger.info(f"[BS4 Fallback] Attempting basic fetch & parse for: {url}")
    html_content = await fetch_url_content_for_scrape(url)
    if not html_content:
        logger.warning(f"[BS4 Fallback] Failed to fetch HTML for {url}")
        return None
    try:
        # CPU-bound parsing runs in a worker thread via asyncio.to_thread.
        def _extract_text(raw_html):
            soup = BeautifulSoup(raw_html, DEFAULT_PARSER)
            # Strip elements that never carry article text.
            noise_tags = ["script", "style", "header", "footer", "nav", "aside", "form",
                          "button", "input", "iframe", "img", "svg", "link", "meta",
                          "noscript", "figure", "figcaption", "video", "audio"]
            for noise in soup(noise_tags):
                noise.extract()
            # Walk likely main-content selectors; first non-empty hit wins.
            candidate_selectors = ['main', 'article', '[role="main"]', '#content', '.content',
                                   '#main-content', '.main-content', '#body', '.body',
                                   '#article-body', '.article-body', '.post-content', '.entry-content']
            main_node = None
            for css in candidate_selectors:
                try:
                    main_node = soup.select_one(css)
                    if main_node:
                        break
                except Exception as sel_e:
                    # Guard against an invalid CSS selector in the list.
                    logger.warning(f"[BS4 Fallback] Invalid selector '{css}': {sel_e}")
                    continue
            if not main_node:
                main_node = soup.body  # fall back to the whole body
            if not main_node:
                logger.warning(f"[BS4 Fallback] Could not find body/main for parsing {url}")
                return None
            stripped_lines = [ln.strip() for ln in main_node.get_text(separator='\n', strip=True).splitlines() if ln.strip()]
            flattened = " ".join(stripped_lines)
            flattened = re.sub(r'\s{2,}', ' ', flattened).strip()
            if not flattened:
                logger.warning(f"[BS4 Fallback] Extracted text is empty after cleaning for {url}")
                return None
            return flattened
        text_content = await asyncio.to_thread(_extract_text, html_content)
        if text_content:
            logger.info(f"[BS4 Fallback] Success scrape/parse for {url} (final len: {len(text_content)})")
            return text_content
        logger.warning(f"[BS4 Fallback] Parsing resulted in empty content for {url}")
        return None
    except Exception as e:
        logger.error(f"[BS4 Fallback] Error scraping/parsing {url}: {e}", exc_info=True)
        return None
# Fallback 2: urltotext.com API
async def get_website_content_via_api(url: str, api_key: str) -> Optional[str]:
    """Fetches website content using urltotext.com API (Fallback 2).

    Posts the URL to the hosted extraction service (JS rendering enabled) and
    returns the stripped main-content text, or None on any failure; never
    raises.
    """
    if not url: logger.error("[API Fallback] No URL"); return None
    if not api_key: logger.error("[API Fallback] urltotext.com API key missing."); return None
    logger.info(f"[API Fallback] Attempting fetch for: {url} using urltotext.com API")
    api_endpoint = "https://urltotext.com/api/v1/urltotext/"
    # render_javascript=True so client-rendered pages still yield content.
    payload = { "url": url, "output_format": "text", "extract_main_content": True, "render_javascript": True, "residential_proxy": False }
    headers = { "Authorization": f"Token {api_key}", "Content-Type": "application/json" }
    try:
        async with httpx.AsyncClient(timeout=45.0) as client:
            logger.debug(f"[API Fallback] Sending request to urltotext.com API for {url}")
            response = await client.post(api_endpoint, headers=headers, json=payload)
            logger.debug(f"[API Fallback] Received status {response.status_code} from urltotext.com API for {url}")
            if response.status_code == 200:
                try:
                    data = response.json()
                    # Response shape: {"data": {"content": ..., "warning": ...}, "credits_used": ...}
                    content = data.get("data", {}).get("content"); credits = data.get("credits_used", "N/A"); warning = data.get("data", {}).get("warning")
                    if warning: logger.warning(f"[API Fallback] urltotext.com API Warning for {url}: {warning}")
                    if content: logger.info(f"[API Fallback] Success via urltotext.com API for {url}. Len: {len(content)}. Credits: {credits}"); return content.strip()
                    else: logger.warning(f"[API Fallback] urltotext.com API success but content empty for {url}. Resp: {data}"); return None
                except json.JSONDecodeError: logger.error(f"[API Fallback] Failed JSON decode urltotext.com for {url}. Resp:{response.text[:500]}"); return None
                except Exception as e: logger.error(f"[API Fallback] Error processing urltotext.com success response for {url}: {e}", exc_info=True); return None
            elif response.status_code == 402: # Specifically handle insufficient credits
                logger.error(f"[API Fallback] Error 402 (Insufficient Credits) from urltotext.com API for {url}. Resp:{response.text[:200]}"); return None
            elif response.status_code in [400, 401, 403, 422, 500]: logger.error(f"[API Fallback] Error {response.status_code} from urltotext.com API for {url}. Resp:{response.text[:200]}"); return None
            else: logger.error(f"[API Fallback] Unexpected status {response.status_code} from urltotext.com API for {url}. Resp:{response.text[:200]}"); return None
    except httpx.TimeoutException: logger.error(f"[API Fallback] Timeout connecting to urltotext.com API for {url}"); return None
    except httpx.RequestError as e: logger.error(f"[API Fallback] Request error connecting to urltotext.com API for {url}: {e}"); return None
    except Exception as e: logger.error(f"[API Fallback] Unexpected error during urltotext.com API call for {url}: {e}", exc_info=True); return None
# --- Summarization Functions --- | |
async def _call_gemini(text: str, summary_type: str) -> Tuple[Optional[str], Optional[str]]:
    """Call the Google Gemini API to generate a summary of *text*.

    Args:
        text: Source text to summarise. Truncated in place to
            MAX_INPUT_TOKEN_APPROX characters (rough character proxy for tokens).
        summary_type: "paragraph" or "points"; selects the prompt template.

    Returns:
        A ``(summary, error)`` tuple where exactly one element is non-None.
        Error strings cover: service disabled, invalid summary_type, blocked or
        empty response, unexpected finish reason, empty summary text, and any
        API/communication failure.
    """
    global GEMINI_MODEL, _gemini_primary_enabled
    if not _gemini_primary_enabled:
        logger.error("[Gemini Primary] Called but is disabled.");
        return None, "Error: Primary AI service (Gemini) not configured/available."
    # Truncate input text if it exceeds the approximate limit
    if len(text) > MAX_INPUT_TOKEN_APPROX:
        logger.warning(f"[Gemini Primary] Input text length ({len(text)}) exceeds limit ({MAX_INPUT_TOKEN_APPROX}). Truncating.")
        text = text[:MAX_INPUT_TOKEN_APPROX]
    logger.info(f"[Gemini Primary] Generating {summary_type} summary using {GEMINI_MODEL}. Input length: {len(text)}")
    # Define prompts (text is embedded verbatim between --- delimiters)
    if summary_type == "paragraph":
        prompt = f"""Please summarise the following text into a concise paragraph. Focus on the main points and key information. Avoid unnecessary jargon or overly complex sentences.
Text to summarise:
---
{text}
---
Concise Paragraph Summary:"""
    elif summary_type == "points":
        prompt = f"""Please summarise the following text into a list of key bullet points. Each point should capture a distinct main idea or important piece of information. Aim for clarity and conciseness.
Text to summarise:
---
{text}
---
Key Bullet Points Summary:"""
    else:
        logger.error(f"[Gemini Primary] Invalid summary_type: {summary_type}")
        return None, f"Error: Invalid summary type '{summary_type}' specified."
    # Configure safety settings (adjust as needed)
    safety_settings = {
        HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
        HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
        HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
        HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
    }
    # Configure generation settings (optional)
    generation_config = genai.types.GenerationConfig(
        # candidate_count=1, # Default is 1
        # stop_sequences=["\n"],
        max_output_tokens=2048, # Increased max tokens
        temperature=0.7, # Adjust creativity vs factualness
        # top_p=1.0, # Default
        # top_k=None # Default
    )
    try:
        model = genai.GenerativeModel(GEMINI_MODEL)
        logger.debug(f"[Gemini Primary] Sending request to model {GEMINI_MODEL}")
        response: GenerateContentResponse = await model.generate_content_async( # Use async version
            prompt,
            generation_config=generation_config,
            safety_settings=safety_settings,
            # request_options={"timeout": 100} # Optional: Add timeout for the API call itself
        )
        # Check for safety blocks or other issues in response
        if not response.candidates:
            block_reason = "Unknown"
            if hasattr(response, 'prompt_feedback') and response.prompt_feedback:
                block_reason = response.prompt_feedback.block_reason or "Reason not specified"
            error_msg = f"Error: Gemini response blocked or empty. Reason: {block_reason}"
            logger.error(f"[Gemini Primary] {error_msg}")
            return None, error_msg
        # *** FIXED FinishReason Check ***
        # Access finish_reason from the first candidate
        finish_reason_val = response.candidates[0].finish_reason
        logger.debug(f"[Gemini Primary] Received response. Finish reason value: {finish_reason_val}")
        # Convert the finish reason (might be enum/int/string) to uppercase string for reliable comparison
        # NOTE(review): str() of an enum value may render as e.g. "FINISHREASON.STOP"
        # in some google-generativeai releases, which would not match this list —
        # confirm against the installed library version.
        finish_reason_str = str(finish_reason_val).upper()
        # Check if it's NOT a successful or expected non-error finish
        # Expected "good" finishes: STOP, MAX_TOKENS
        # Problematic finishes: SAFETY, RECITATION, OTHER, etc.
        if finish_reason_str not in ["STOP", "MAX_TOKENS"]:
            # Log safety ratings if available
            safety_ratings_str = "N/A"
            if hasattr(response.candidates[0], 'safety_ratings') and response.candidates[0].safety_ratings:
                safety_ratings_str = ', '.join([f"{r.category.name}: {r.probability.name}" for r in response.candidates[0].safety_ratings])
            error_msg = f"Error: Gemini generation finished unexpectedly. Reason: {finish_reason_str}. Safety: {safety_ratings_str}"
            logger.error(f"[Gemini Primary] {error_msg}")
            # Decide how to handle specific non-STOP reasons
            if finish_reason_str == "SAFETY":
                return None, error_msg # Return specific error for safety blocks
            # For RECITATION, OTHER, etc., it's safer to return an error than potentially broken output
            return None, f"Error: Gemini generation finished unexpectedly ({finish_reason_str})."
        # Extract text (use response.text shortcut if available and reliable)
        summary_text = ""
        try:
            # response.text might combine parts automatically, safer if available
            summary_text = response.text
        except ValueError as e:
            # Handle cases where .text might raise error (e.g., if blocked, but caught above ideally)
            logger.warning(f"[Gemini Primary] Error accessing response.text: {e}. Trying parts manually.")
            # Fallback to joining parts if .text fails (less common now)
            if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
                summary_text = "".join(part.text for part in response.candidates[0].content.parts)
        if not summary_text or not summary_text.strip():
            logger.warning("[Gemini Primary] Gemini returned an empty summary.")
            return None, "Error: AI generated an empty summary."
        logger.info(f"[Gemini Primary] Summary generated successfully (len: {len(summary_text)}). Finish: {finish_reason_str}")
        return summary_text.strip(), None
    except AttributeError as e:
        # Catch potential errors if response structure is different than expected
        logger.error(f"[Gemini Primary] Attribute error accessing Gemini response: {e}. Response structure might have changed.", exc_info=True)
        return None, f"Error: Failed to parse Gemini response. Details: {e}"
    except Exception as e:
        logger.error(f"[Gemini Primary] Error during API call to {GEMINI_MODEL}: {e}", exc_info=True)
        # Check for specific Google API errors if needed
        # from google.api_core import exceptions as google_exceptions
        # if isinstance(e, google_exceptions.GoogleAPIError): ...
        return None, f"Error: Failed to communicate with the primary AI service (Gemini). Details: {e}"
async def _call_openrouter(text: str, summary_type: str) -> Tuple[Optional[str], Optional[str]]:
    """Call the OpenRouter chat-completions API to generate a summary of *text*.

    Fallback path used when Gemini fails or is disabled.

    Args:
        text: Source text to summarise. Truncated to a hard-coded 100k-char limit.
        summary_type: "paragraph" or "points"; selects the user-prompt template.

    Returns:
        A ``(summary, error)`` tuple where exactly one element is non-None.
        Error strings cover: service disabled, invalid summary_type, malformed
        or empty API response, non-200 status codes, timeouts, and network errors.
    """
    global OPENROUTER_API_KEY, OPENROUTER_MODEL, _openrouter_fallback_enabled
    if not _openrouter_fallback_enabled:
        logger.error("[OpenRouter Fallback] Called but is disabled.");
        return None, "Error: Fallback AI service (OpenRouter) not configured/available."
    max_input_len_openrouter = 100000 # Adjust based on OPENROUTER_MODEL limits if known (Deepseek is large)
    if len(text) > max_input_len_openrouter:
        logger.warning(f"[OpenRouter Fallback] Input text length ({len(text)}) exceeds approx limit ({max_input_len_openrouter}) for {OPENROUTER_MODEL}. Truncating.")
        text = text[:max_input_len_openrouter]
    logger.info(f"[OpenRouter Fallback] Generating {summary_type} summary using {OPENROUTER_MODEL}. Input length: {len(text)}")
    # Define prompts (similar structure to Gemini)
    if summary_type == "paragraph":
        prompt_content = f"""Please summarise the following text into a concise paragraph. Focus on the main points and key information.
Text:
---
{text}
---
Concise Paragraph Summary:"""
    elif summary_type == "points":
        prompt_content = f"""Please summarise the following text into a list of key bullet points. Each point should capture a distinct main idea.
Text:
---
{text}
---
Key Bullet Points Summary:"""
    else:
        logger.error(f"[OpenRouter Fallback] Invalid summary_type: {summary_type}")
        return None, f"Error: Invalid summary type '{summary_type}' specified."
    headers = {
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "Content-Type": "application/json",
        "HTTP-Referer": "https://github.com/fmab777/telegram-summary-bot", # Optional: Identify your app
        "X-Title": "Telegram Summary Bot", # Optional: Identify your app
    }
    # OpenAI-compatible chat payload: system role sets behavior, user role carries the prompt
    payload = {
        "model": OPENROUTER_MODEL,
        "messages": [
            {"role": "system", "content": "You are an expert summarizer. Provide summaries as requested."},
            {"role": "user", "content": prompt_content}
        ],
        "max_tokens": 2048, # Adjust as needed
        "temperature": 0.7,
    }
    api_url = "https://openrouter.ai/api/v1/chat/completions"
    try:
        async with httpx.AsyncClient(timeout=120.0) as client: # Longer timeout for potentially slower models
            logger.debug(f"[OpenRouter Fallback] Sending request to {api_url} for model {OPENROUTER_MODEL}")
            response = await client.post(api_url, headers=headers, json=payload)
            logger.debug(f"[OpenRouter Fallback] Received status code {response.status_code}")
            if response.status_code == 200:
                try:
                    data = response.json()
                    # Expected shape: {"choices": [{"message": {"content": ...}, "finish_reason": ...}]}
                    if data.get("choices") and len(data["choices"]) > 0:
                        choice = data["choices"][0]
                        message = choice.get("message")
                        finish_reason = choice.get("finish_reason", "N/A") # Get finish reason
                        if message and message.get("content"):
                            summary_text = message["content"].strip()
                            if summary_text:
                                logger.info(f"[OpenRouter Fallback] Summary generated successfully (len: {len(summary_text)}). Finish: {finish_reason}")
                                # Check for length finish reason?
                                if finish_reason == 'length':
                                    logger.warning("[OpenRouter Fallback] Summary may be truncated due to max_tokens limit.")
                                return summary_text, None
                            else:
                                logger.warning("[OpenRouter Fallback] OpenRouter returned an empty summary content.")
                                return None, "Error: Fallback AI generated an empty summary."
                        else:
                            logger.error(f"[OpenRouter Fallback] Invalid response structure (missing message/content). Data: {data}")
                            return None, "Error: Fallback AI returned an invalid response format."
                    else:
                        logger.error(f"[OpenRouter Fallback] Invalid response structure (missing choices). Data: {data}")
                        # Check for error object in response
                        api_error = data.get("error", {}).get("message", "Unknown API error")
                        return None, f"Error: Fallback AI response missing summary. API msg: {api_error}"
                except json.JSONDecodeError:
                    logger.error(f"[OpenRouter Fallback] Failed to decode JSON response. Status: {response.status_code}, Text: {response.text[:500]}")
                    return None, "Error: Fallback AI sent an invalid JSON response."
                except Exception as e:
                    logger.error(f"[OpenRouter Fallback] Error processing success response: {e}", exc_info=True)
                    return None, f"Error: Failed to process Fallback AI response. Details: {e}"
            else:
                # Handle API errors (rate limits, auth, etc.)
                error_message = f"Error: Fallback AI service ({OPENROUTER_MODEL}) returned status {response.status_code}."
                try:
                    error_details = response.json().get("error", {}).get("message", response.text[:200])
                    error_message += f" Details: {error_details}"
                except Exception:
                    error_message += f" Response: {response.text[:200]}"
                logger.error(f"[OpenRouter Fallback] {error_message}")
                return None, error_message
    except httpx.TimeoutException:
        logger.error(f"[OpenRouter Fallback] Timeout connecting to OpenRouter API for {OPENROUTER_MODEL}")
        return None, "Error: Timed out connecting to the fallback AI service."
    except httpx.RequestError as e:
        logger.error(f"[OpenRouter Fallback] Request error connecting to OpenRouter API: {e}")
        return None, f"Error: Network error connecting to the fallback AI service. Details: {e}"
    except Exception as e:
        logger.error(f"[OpenRouter Fallback] Unexpected error during OpenRouter API call: {e}", exc_info=True)
        return None, f"Error: Unexpected issue with the fallback AI service. Details: {e}"
async def generate_summary(text: str, summary_type: str) -> str:
    """Summarise *text*, preferring Gemini and falling back to OpenRouter.

    Returns the summary string on success, or a human-readable error string
    describing which service(s) failed and why.
    """
    global _gemini_primary_enabled, _openrouter_fallback_enabled, GEMINI_MODEL, OPENROUTER_MODEL
    logger.info(f"[Summary Generation] Starting process. Primary: Gemini ({GEMINI_MODEL}), Fallback: OpenRouter ({OPENROUTER_MODEL})")
    # Holds the primary-path failure description so it can be combined with
    # any fallback failure in the final error message.
    error_message: Optional[str] = None
    # --- Primary: Gemini ---
    if not _gemini_primary_enabled:
        logger.warning("[Summary Generation] Primary AI (Gemini) disabled. Proceeding to fallback.")
        error_message = "Primary AI (Gemini) unavailable."
    else:
        logger.info(f"[Summary Generation] Attempting primary AI: Gemini ({GEMINI_MODEL})")
        primary_summary, primary_error = await _call_gemini(text, summary_type)
        if primary_summary:
            logger.info(f"[Summary Generation] Success with primary AI (Gemini).")
            return primary_summary
        logger.warning(f"[Summary Generation] Primary AI (Gemini) failed. Error: {primary_error}. Proceeding to fallback.")
        error_message = f"Primary AI ({GEMINI_MODEL}) failed: {primary_error}"
    # --- Fallback: OpenRouter ---
    if not _openrouter_fallback_enabled:
        logger.error("[Summary Generation] Fallback AI (OpenRouter) is disabled. Cannot proceed.")
        if error_message:
            return f"{error_message}\nFallback AI is also unavailable."
        return "Error: Both primary and fallback AI services are unavailable."
    logger.info(f"[Summary Generation] Attempting fallback AI: OpenRouter ({OPENROUTER_MODEL})")
    fallback_summary, fallback_error = await _call_openrouter(text, summary_type)
    if fallback_summary:
        logger.info(f"[Summary Generation] Success with fallback AI (OpenRouter).")
        return fallback_summary
    logger.error(f"[Summary Generation] Fallback AI (OpenRouter) also failed. Error: {fallback_error}")
    if error_message:
        return f"{error_message}\nFallback AI ({OPENROUTER_MODEL}) also failed: {fallback_error}"
    return f"Fallback AI ({OPENROUTER_MODEL}) failed: {fallback_error}"
# --- Main Processing Task --- | |
async def process_summary_task(
    user_id: int,
    chat_id: int,
    message_id_to_edit: Optional[int],
    url: str,
    summary_type: str,
    bot_token: str
) -> None:
    """Background task: fetch content for *url*, summarise it, and deliver the result.

    Runs with its own Bot instance (built from *bot_token*) so it can operate
    outside the webhook request cycle. Edits the original button message
    (*message_id_to_edit*) with progress/results where possible, falling back
    to new messages. All failures are reported to the user; nothing is raised.

    Args:
        user_id: Telegram user who requested the summary (used for logging).
        chat_id: Chat to deliver progress and results to.
        message_id_to_edit: ID of the message carrying the summary-type buttons,
            or None if unavailable.
        url: YouTube or website URL to summarise.
        summary_type: "paragraph" or "points".
        bot_token: Token used to construct the background Bot.
    """
    task_id = f"{user_id}-{message_id_to_edit or 'new'}"
    logger.info(f"[Task {task_id}] Starting processing for URL: {url}")
    background_request: Optional[BaseRequest] = None
    bot: Optional[Bot] = None
    content: Optional[str] = None
    user_feedback_message: Optional[str] = None
    success = False
    status_message_id = message_id_to_edit # Keep track of the original button message ID
    try:
        # Initialize background bot with generous timeouts for long operations
        background_request = HTTPXRequest(connect_timeout=15.0, read_timeout=60.0, write_timeout=60.0, pool_timeout=60.0)
        bot = Bot(token=bot_token, request=background_request)
    except Exception as e:
        logger.critical(f"[Task {task_id}] Failed to create background bot: {e}", exc_info=True)
        # Cannot send feedback without bot
        return
    try:
        # Send/Edit "Processing..." message
        # We *always* edit the original button message first
        processing_message_text = f"Got it! Generating '{summary_type}' summary for:\n{html.escape(url)}\n\nThis might take a moment..."
        edited_original_msg = False
        if status_message_id: # If we have the ID of the message with buttons
            try:
                await retry_bot_operation(
                    bot.edit_message_text,
                    chat_id=chat_id,
                    message_id=status_message_id,
                    text=processing_message_text,
                    parse_mode=ParseMode.HTML, # Use HTML for escaped URL
                    reply_markup=None, # Remove buttons
                    link_preview_options={'is_disabled': True} # Disable preview here too
                )
                logger.debug(f"[Task {task_id}] Edited original button message {status_message_id} to 'Processing'")
                edited_original_msg = True
            except Exception as e:
                logger.warning(f"[Task {task_id}] Could not edit original button message {status_message_id}: {e}. Will proceed without initial status update on that message.")
                # Don't send a new message here, the final result/error will handle it.
                # Keep status_message_id so we know the original message still exists.
        # Indicate activity
        try: await retry_bot_operation(bot.send_chat_action, chat_id=chat_id, action='typing')
        except Exception: pass # Non-critical if this fails
        # --- Determine URL Type and Fetch Content ---
        is_youtube = is_youtube_url(url)
        logger.debug(f"[Task {task_id}] URL type: {'YouTube' if is_youtube else 'Website'}")
        if is_youtube:
            video_id = extract_youtube_id(url)
            if video_id:
                content = await get_youtube_transcript(video_id, url) # Uses its own internal fallbacks
            else:
                user_feedback_message = "Sorry, I couldn't understand that YouTube URL format."
            if not content and not user_feedback_message:
                user_feedback_message = "Sorry, I couldn't get the transcript for that YouTube video using any available method (unavailable/private/no captions?)."
        else:
            # Website URL: Crawl4AI -> BeautifulSoup -> urltotext API fallback chain
            logger.info(f"[Task {task_id}] URL is website. Attempting Crawl4AI (Primary)...")
            content = await get_website_content_via_crawl4ai(url)
            if not content:
                # Log the failure reason from Crawl4AI if available (already logged in the function)
                logger.warning(f"[Task {task_id}] Crawl4AI failed for {url}. Attempting BeautifulSoup (Fallback 1)...")
                try: await retry_bot_operation(bot.send_chat_action, chat_id=chat_id, action='typing')
                except Exception: pass
                content = await get_website_content_bs4(url)
                if not content:
                    logger.warning(f"[Task {task_id}] BeautifulSoup also failed for {url}. Attempting API (Fallback 2)...")
                    global URLTOTEXT_API_KEY, _urltotext_fallback_enabled
                    if _urltotext_fallback_enabled:
                        try: await retry_bot_operation(bot.send_chat_action, chat_id=chat_id, action='typing')
                        except Exception: pass
                        content = await get_website_content_via_api(url, URLTOTEXT_API_KEY)
                        if not content:
                            logger.error(f"[Task {task_id}] API fallback (urltotext) also failed for {url}.")
                            user_feedback_message = "Sorry, I couldn't fetch content from that website using any method (Crawl4AI/BS4 failed, API failed or ran out of credits)."
                    else:
                        logger.warning(f"[Task {task_id}] API fallback is disabled. Cannot attempt Fallback 2.")
                        user_feedback_message = "Sorry, I couldn't fetch content from that website using Crawl4AI or BeautifulSoup, and the API fallback is not enabled."
            # Final check if all web methods failed
            if not content and not user_feedback_message:
                logger.error(f"[Task {task_id}] All website fetching methods seem to have failed without setting a specific user message.")
                user_feedback_message = "Sorry, I couldn't fetch content from that website using any available method (blocked/inaccessible/empty?)."
        # --- Generate Summary if Content was Fetched ---
        if content and not user_feedback_message:
            logger.info(f"[Task {task_id}] Content fetched (len:{len(content)}). Generating summary type: '{summary_type}'.")
            try: await retry_bot_operation(bot.send_chat_action, chat_id=chat_id, action='typing')
            except Exception: pass
            final_summary = await generate_summary(content, summary_type) # Calls Gemini -> OpenRouter
            if final_summary.startswith("Error:") or final_summary.startswith("Sorry,"):
                user_feedback_message = final_summary # Pass AI error message to user
                logger.warning(f"[Task {task_id}] Summary generation failed: {final_summary}")
                success = False # Explicitly mark as failure
            else:
                # Success - Split and Send the summary
                summary_parts = []
                current_part = ""
                # Split respecting newlines, ensure no part exceeds MAX_SUMMARY_CHUNK_SIZE
                lines = final_summary.splitlines(keepends=True)
                for line in lines:
                    if len(current_part) + len(line) > MAX_SUMMARY_CHUNK_SIZE:
                        if current_part.strip(): # Don't add empty parts
                            summary_parts.append(current_part.strip())
                        current_part = line
                        if len(current_part) > MAX_SUMMARY_CHUNK_SIZE:
                            logger.warning(f"[Task {task_id}] Truncating overly long line in summary.")
                            current_part = current_part[:MAX_SUMMARY_CHUNK_SIZE]
                    else:
                        current_part += line
                if current_part.strip(): summary_parts.append(current_part.strip())
                if not summary_parts:
                    summary_parts.append("Summary generated, but it appears to be empty.")
                    logger.warning(f"[Task {task_id}] Summary was non-empty initially but splitting resulted in zero parts.")
                logger.info(f"[Task {task_id}] Summary generated (orig len: {len(final_summary)}). Sending in {len(summary_parts)} part(s).")
                # Try to edit the original button message with the *first* part of the summary
                edited_final_result = False
                if status_message_id: # If we have the original button message ID
                    try:
                        await retry_bot_operation(
                            bot.edit_message_text,
                            chat_id=chat_id,
                            message_id=status_message_id,
                            text=summary_parts[0],
                            parse_mode=None, # Send as plain text initially, safer
                            link_preview_options={'is_disabled': True}
                        )
                        logger.debug(f"[Task {task_id}] Edited original button message {status_message_id} with first summary part.")
                        edited_final_result = True
                    except Exception as edit_err:
                        logger.warning(f"[Task {task_id}] Failed to edit original button message {status_message_id} with summary part 1: {edit_err}. Sending as new message.")
                        # Original message remains as "Processing..." or its initial state if first edit failed
                # If editing failed, or if there was no original message_id (shouldn't happen), send first part as new message
                if not edited_final_result:
                    sent_msg = await retry_bot_operation(
                        bot.send_message,
                        chat_id=chat_id,
                        text=summary_parts[0],
                        parse_mode=None,
                        link_preview_options={'is_disabled': True}
                    )
                    if not sent_msg:
                        logger.error(f"[Task {task_id}] Failed to send first summary part even as new message.")
                        user_feedback_message = "Sorry, failed to send the summary."
                        success = False
                # Send remaining parts (if any and first part succeeded).
                # BUGFIX: this previously tested `success`, which is only set True
                # at the *end* of this block — so parts 2..N were never sent for
                # multi-part summaries. Gate on the absence of a failure message
                # instead (i.e. "first part was delivered").
                if not user_feedback_message and len(summary_parts) > 1:
                    for i, part in enumerate(summary_parts[1:], start=2):
                        await asyncio.sleep(0.5) # Small delay between parts
                        try:
                            await retry_bot_operation(
                                bot.send_message,
                                chat_id=chat_id,
                                text=part,
                                parse_mode=None,
                                link_preview_options={'is_disabled': True}
                            )
                            logger.debug(f"[Task {task_id}] Sent summary part {i}/{len(summary_parts)}.")
                        except Exception as part_err:
                            logger.error(f"[Task {task_id}] Failed to send summary part {i}: {part_err}")
                            user_feedback_message = f"Sorry, failed to send part {i} of the summary."
                            success = False # Mark as failure
                            break # Stop sending remaining parts
                # Determine overall success based on whether feedback message is set *during this block*
                if not user_feedback_message:
                    success = True
        # --- Handle Failure Cases (Content Fetching Failed or Summary Failed) ---
        if not success: # Check overall success flag
            if not user_feedback_message: # If success is false but no specific message, set a generic one
                user_feedback_message = "Sorry, an unknown error occurred during processing."
                logger.error(f"[Task {task_id}] Task failed but no specific user_feedback_message was set.")
            logger.warning(f"[Task {task_id}] Sending failure/error feedback to user: {user_feedback_message}")
            try:
                # Try editing the original button message with the error
                edited_final_error = False
                if status_message_id:
                    try:
                        await retry_bot_operation(
                            bot.edit_message_text,
                            chat_id=chat_id,
                            message_id=status_message_id,
                            text=user_feedback_message,
                            link_preview_options={'is_disabled': True},
                            reply_markup=None # Remove buttons
                        )
                        logger.debug(f"[Task {task_id}] Edited original button message {status_message_id} with failure feedback.")
                        edited_final_error = True
                    except Exception as edit_err:
                        logger.warning(f"[Task {task_id}] Failed to edit original button message {status_message_id} with failure feedback: {edit_err}. Sending as new message.")
                # If editing failed or wasn't applicable, send error as a new message
                if not edited_final_error:
                    await retry_bot_operation(
                        bot.send_message,
                        chat_id=chat_id,
                        text=user_feedback_message,
                        link_preview_options={'is_disabled': True}
                    )
                    logger.debug(f"[Task {task_id}] Sent failure feedback as new message.")
            except Exception as send_err:
                logger.error(f"[Task {task_id}] Failed even to send failure feedback message: {send_err}")
    except Exception as e:
        # Catch-all for unexpected errors during the main processing logic
        logger.error(f"[Task {task_id}] Unexpected error during processing task: {e}", exc_info=True)
        success = False
        user_feedback_message = "Oops! Something went wrong while processing your request. Please try again later."
        if bot: # Ensure bot exists before trying to send
            try:
                # Attempt to send a final error message (maybe edit original if possible?)
                edited_final_crash_error = False
                if status_message_id:
                    try:
                        await retry_bot_operation( bot.edit_message_text, chat_id=chat_id, message_id=status_message_id, text=user_feedback_message, reply_markup=None, link_preview_options={'is_disabled': True} )
                        edited_final_crash_error = True
                    except Exception: pass # Ignore error editing here, just try sending new
                if not edited_final_crash_error:
                    await retry_bot_operation( bot.send_message, chat_id=chat_id, text=user_feedback_message )
            except Exception as final_err:
                logger.error(f"[Task {task_id}] Failed to send the final unexpected error feedback: {final_err}")
    finally:
        # --- Cleanup ---
        # No automatic deletion needed now. The original message (status_message_id) is either
        # edited with the result/error, or left as is if editing failed.
        # Close the background bot's HTTP client (private attr — no public close API on HTTPXRequest)
        if background_request and hasattr(background_request, '_client') and background_request._client:
            try:
                await background_request._client.aclose()
                logger.debug(f"[Task {task_id}] Background bot's HTTPX client closed.")
            except Exception as close_err:
                logger.warning(f"[Task {task_id}] Error closing background bot's client: {close_err}")
        logger.info(f"[Task {task_id}] Task completed. Success: {success}")
# --- Telegram Handlers --- | |
# (start, help_command, handle_potential_url, handle_summary_type_callback, error_handler) | |
# No changes needed in these handlers based on logs. | |
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: greet the user with a short intro message (HTML mention link)."""
    user = update.effective_user
    # BUGFIX: guard BEFORE dereferencing. The original called
    # user.mention_html() first, raising AttributeError whenever
    # effective_user was None (e.g. channel posts).
    if not user or not update.message: return
    mention = user.mention_html()
    logger.info(f"User {user.id} ({user.username or 'N/A'}) used /start.")
    await update.message.reply_html( f"👋 Hello {mention}! I can summarise YouTube links or website URLs.\n\nJust send me a link anytime!" )
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /help: describe usage, the fetching/summarising pipeline, and commands."""
    requester = update.effective_user
    if not requester or not update.message: return
    logger.info(f"User {requester.id} ({requester.username or 'N/A'}) used /help.")
    # Assemble the help text from its sections; joined result is identical
    # to the original single concatenated literal.
    sections = [
        "🔍 **How to use:**\n\n",
        "1. Send me any YouTube video link or website URL.\n",
        "2. I'll ask how you want it summarised (paragraph or points).\n",
        "3. Click the button for your choice.\n",
        "4. Wait while I process it!\n\n",
        "⚙️ **Behind the scenes:**\n",
        "• **Websites:** I try `Crawl4AI` (smart crawl), then `BeautifulSoup` (basic scrape), and `urltotext.com` API (if configured & credits available).\n",
        "• **YouTube:** I use `youtube-transcript-api` first, then fall back to `Supadata` and `Apify` APIs if needed.\n",
        f"• **Summaries:** Generated using Google `{GEMINI_MODEL}` (primary) or `{OPENROUTER_MODEL}` (fallback, if configured).\n\n",
        "**Commands:**\n",
        "`/start` - Display welcome message\n",
        "`/help` - Show this help message",
    ]
    await update.message.reply_text("".join(sections), parse_mode=ParseMode.MARKDOWN)
async def handle_potential_url(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a plain text message that may contain a URL to summarise.

    Validates the text against a loose http(s) pattern, stashes the URL in
    user_data for the button callback, and offers the summary-type choices.
    """
    message = update.message
    if not message or not message.text: return
    user = update.effective_user
    if not user: return
    url = message.text.strip()
    # Loose sanity check only — downstream fetchers do the real validation.
    if not re.match(r'https?://[^\s/$.?#].[^\s]*', url, re.IGNORECASE):
        logger.debug(f"Ignoring non-URL from {user.id}: {url}")
        await message.reply_text("Hmm, that doesn't look like a valid web URL. Please make sure it starts with `http://` or `https://` and includes a domain.", parse_mode=ParseMode.MARKDOWN)
        return
    logger.info(f"User {user.id} ({user.username or 'N/A'}) sent potential URL: {url}")
    # Remember the URL (and the message that carried it) for the callback handler.
    context.user_data['url_to_summarize'] = url
    context.user_data['original_message_id'] = message.message_id
    buttons = InlineKeyboardMarkup([[
        InlineKeyboardButton("Paragraph Summary", callback_data="paragraph"),
        InlineKeyboardButton("Points Summary", callback_data="points"),
    ]])
    await message.reply_html(
        f"Okay, I see this link:\n<code>{html.escape(url)}</code>\n\nHow would you like it summarised?",
        reply_markup=buttons,
        disable_web_page_preview=True,
    )
async def handle_summary_type_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the inline-keyboard choice (paragraph vs. points) and schedule the summary task."""
    query = update.callback_query
    if not query or not query.message or not query.from_user:
        logger.warning("Callback query missing data.")
        return
    user = query.from_user
    summary_type = query.data
    query_id = query.id
    # Acknowledge the callback promptly; a failure here is non-fatal.
    try:
        await query.answer()
        logger.debug(f"Ack callback {query_id} from {user.id} ({user.username or 'N/A'})")
    except Exception as e:
        logger.warning(f"Error answering callback {query_id}: {e}")
    url = context.user_data.get('url_to_summarize')
    message_id_to_edit = query.message.message_id  # the message carrying the buttons
    logger.info(f"User {user.id} ({user.username or 'N/A'}) chose '{summary_type}' for msg {message_id_to_edit}. URL in context: {'Yes' if url else 'No'}")
    if not url:
        # Context expired (e.g. restart or stale button press) — ask for the link again.
        logger.warning(f"No URL in context for user {user.id} (cb {query_id}). Expired?")
        try:
            await query.edit_message_text(text="Sorry, I seem to have lost the context for that link. 🤔 Please send the URL again.", reply_markup=None)
        except BadRequest as e:
            if "message is not modified" not in str(e).lower():
                logger.error(f"Failed edit 'URL not found' msg: {e}")
        except Exception as e:
            logger.error(f"Failed edit 'URL not found' msg: {e}")
        return
    # Verify required configuration before scheduling any background work.
    global TELEGRAM_TOKEN, _gemini_primary_enabled, _openrouter_fallback_enabled
    if not TELEGRAM_TOKEN:
        logger.critical("TG TOKEN missing! Cannot schedule task.")
        try:
            await query.edit_message_text(text="❌ Bot configuration error (Token Missing). Cannot proceed.", reply_markup=None)
        except Exception:
            pass
        return
    if not _gemini_primary_enabled and not _openrouter_fallback_enabled:
        logger.critical("Neither Gemini nor OpenRouter API keys are configured/valid! Cannot summarize.")
        try:
            await query.edit_message_text(text="❌ AI configuration error: No summarization models are available. Cannot proceed.", reply_markup=None)
        except Exception:
            pass
        return
    if not _gemini_primary_enabled:
        logger.warning("Primary AI (Gemini) is unavailable, will rely on fallback (OpenRouter).")
    elif not _openrouter_fallback_enabled:
        logger.warning("Fallback AI (OpenRouter) is unavailable, relying on primary (Gemini).")
    logger.info(f"Scheduling task for user {user.id}, chat {query.message.chat_id}, msg {message_id_to_edit} (URL: {url})")
    # Fire-and-forget: the task edits the button message with its own status updates.
    asyncio.ensure_future(
        process_summary_task(
            user_id=user.id,
            chat_id=query.message.chat_id,
            message_id_to_edit=message_id_to_edit,
            url=url,
            summary_type=summary_type,
            bot_token=TELEGRAM_TOKEN,
        )
    )
    # Clear the stored URL only AFTER the task has been scheduled.
    context.user_data.pop('url_to_summarize', None)
    context.user_data.pop('original_message_id', None)
    logger.debug(f"Cleared URL context for user {user.id} after scheduling task.")
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Global PTB error handler: downgrade known transient/benign errors to warnings."""
    err = context.error
    if isinstance(err, (AttributeError, BadRequest, TimedOut, NetworkError, RetryAfter)):
        transient = isinstance(err, (TimedOut, NetworkError, RetryAfter))
        benign_fragments = ["message is not modified", "query is too old", "message to edit not found", "chat not found", "bot was blocked by the user"]
        lowered = str(err).lower()
        if transient or any(fragment in lowered for fragment in benign_fragments):
            logger.warning(f"Ignoring known/handled/transient error in error_handler: {err}")
            return
    logger.error("Exception while handling an update:", exc_info=err)
# --- Application Setup --- | |
async def setup_bot_config() -> Application:
    """Build the PTB Application with custom HTTP timeouts and register all handlers."""
    logger.info("Configuring Telegram Application...")
    global TELEGRAM_TOKEN
    if not TELEGRAM_TOKEN:
        raise ValueError("TELEGRAM_TOKEN missing.")
    # Generous read/pool timeouts: summaries may keep the shared HTTPX pool busy.
    request = HTTPXRequest(connect_timeout=10.0, read_timeout=30.0, write_timeout=30.0, pool_timeout=60.0)
    application = Application.builder().token(TELEGRAM_TOKEN).request(request).build()
    # Command handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    # Plain-text messages that look like links (entity- or regex-detected)
    link_filter = filters.TEXT & ~filters.COMMAND & (filters.Entity("url") | filters.Entity("text_link") | filters.Regex(r'https?://[^\s]+'))
    application.add_handler(MessageHandler(link_filter, handle_potential_url))
    application.add_handler(CallbackQueryHandler(handle_summary_type_callback))
    application.add_error_handler(error_handler)
    logger.info("Telegram application handlers configured.")
    return application
# --- ASGI Lifespan & Routes --- | |
# (lifespan, telegram_webhook remain the same) | |
async def lifespan(app: Starlette):
    """ASGI lifespan for the Starlette app.

    Startup: build and initialize the PTB application, delete any stale
    Telegram webhook, register a fresh one derived from SPACE_HOST, verify it,
    then start PTB update processing. Any unrecoverable problem raises
    RuntimeError (after best-effort cleanup of a partially started bot).
    Shutdown: stop and shut down the PTB application.
    """
    global ptb_app, WEBHOOK_SECRET, TELEGRAM_TOKEN
    logger.info("ASGI Lifespan: Startup initiated...");
    if not TELEGRAM_TOKEN: logger.critical("TG TOKEN missing."); raise RuntimeError("Telegram token missing.")
    try:
        ptb_app = await setup_bot_config()
        await ptb_app.initialize() # This sets the _initialized flag
        bot_info = await ptb_app.bot.get_me()
        logger.info(f"Bot initialized: @{bot_info.username} (ID: {bot_info.id})")
        # Delete any stale webhook first so set_webhook starts from a clean slate.
        current_webhook_info = await ptb_app.bot.get_webhook_info()
        webhook_delete_success = True # Assume success unless proven otherwise
        if current_webhook_info and current_webhook_info.url:
            logger.info(f"Found existing webhook: {current_webhook_info.url}. Deleting...")
            try:
                if await ptb_app.bot.delete_webhook(drop_pending_updates=True): logger.info("Webhook deleted.")
                else: logger.warning("Failed delete webhook (API returned False)."); webhook_delete_success = False
            except Exception as e: logger.warning(f"Could not delete webhook: {e}"); await asyncio.sleep(1); webhook_delete_success = False
        # Determine the public webhook URL from the HF Spaces SPACE_HOST env var.
        space_host = os.environ.get("SPACE_HOST")
        webhook_path = "/webhook" # Standard path
        full_webhook_url = None
        if space_host:
            protocol = "https"
            host = space_host.split('://')[-1]  # tolerate SPACE_HOST given with or without a scheme
            full_webhook_url = f"{protocol}://{host.rstrip('/')}{webhook_path}"
            logger.info(f"Using SPACE_HOST to determine webhook URL: {full_webhook_url}")
        else:
            logger.critical("Could not construct webhook URL (SPACE_HOST env var missing or invalid).")
            raise RuntimeError("Webhook URL undetermined.")
        # Set the webhook, then re-fetch webhook info to verify Telegram stored the same URL.
        if full_webhook_url and webhook_delete_success:
            logger.info(f"Attempting to set webhook: {full_webhook_url}")
            set_webhook_args = {
                "url": full_webhook_url,
                "allowed_updates": Update.ALL_TYPES,
                "drop_pending_updates": True
            }
            if WEBHOOK_SECRET:
                set_webhook_args["secret_token"] = WEBHOOK_SECRET
                logger.info("Webhook secret token will be used.")
            await asyncio.sleep(1.0) # Short delay before setting
            try:
                webhook_set = await ptb_app.bot.set_webhook(**set_webhook_args)
                if not webhook_set:
                    raise RuntimeError("set_webhook API call returned False.")
                await asyncio.sleep(1.5) # Longer delay after setting before verification
                webhook_info = await ptb_app.bot.get_webhook_info()
                if webhook_info and webhook_info.url == full_webhook_url:
                    logger.info(f"Webhook set and verified successfully: URL='{webhook_info.url}', Secret={'YES' if WEBHOOK_SECRET else 'NO'}")
                    if webhook_info.last_error_message:
                        logger.warning(f"Webhook status has last error: {webhook_info.last_error_message}")
                else:
                    logger.error(f"Webhook verification failed! Expected '{full_webhook_url}', Got info: {webhook_info}")
                    raise RuntimeError("Webhook verification failed after setting.")
                await ptb_app.start() # Start PTB processing updates AFTER successful webhook setup
                logger.info("PTB Application started (webhook mode).")
            except Exception as e:
                logger.error(f"FATAL: Failed to set or verify webhook: {e}", exc_info=True)
                raise RuntimeError(f"Failed to set/verify webhook: {e}") from e
        elif not webhook_delete_success:
            logger.error("FATAL: Failed to delete previous webhook. Cannot set new one.")
            raise RuntimeError("Failed to delete previous webhook.")
        # else: unreachable — a missing full_webhook_url already raised above
        logger.info("ASGI Lifespan: Startup complete.");
        yield # Application runs here
    except Exception as startup_err:
        logger.critical(f"Application startup failed: {startup_err}", exc_info=True)
        # Best-effort teardown of a partially started bot before re-raising.
        if ptb_app:
            try:
                if ptb_app.running: await ptb_app.stop()
                # PTB-internal flag; only shut down if initialize() completed
                if ptb_app._initialized: await ptb_app.shutdown()
            except Exception as shutdown_err:
                logger.error(f"Error during shutdown after startup failure: {shutdown_err}")
        raise # Reraise the original startup error
    finally:
        # Shutdown sequence (runs on normal exit AND after startup failure).
        logger.info("ASGI Lifespan: Shutdown initiated...")
        if ptb_app:
            try:
                if ptb_app.running:
                    logger.info("Stopping PTB application...")
                    await ptb_app.stop()
                # PTB-internal flag; shutdown() requires a completed initialize()
                if ptb_app._initialized:
                    logger.info("Shutting down PTB application...")
                    await ptb_app.shutdown()
                    logger.info("PTB Application shut down successfully.")
                else:
                    logger.info("PTB Application was not fully initialized, skipping shutdown.")
            except Exception as e:
                logger.error(f"Error during PTB shutdown: {e}", exc_info=True)
        else:
            logger.info("PTB application object not created.")
        logger.info("ASGI Lifespan: Shutdown complete.")
async def health_check(request: Request) -> PlainTextResponse:
    """Report bot/webhook status plus availability of each configured service.

    Mounted at GET /. The ``global`` declaration was removed (the names are
    only read, never assigned) and ``f``-prefixes dropped from constant
    strings with no placeholders.
    """
    bot_status = "Not Initialized"
    bot_username = "N/A"
    # ptb_app._initialized is PTB-internal but is the reliable "initialize() ran" flag.
    if ptb_app and ptb_app.bot and ptb_app._initialized:
        try:
            # Webhook info is a cheaper and more relevant liveness probe than get_me() alone.
            wh_info = await ptb_app.bot.get_webhook_info()
            if ptb_app.running and wh_info and wh_info.url:
                bot_info = await ptb_app.bot.get_me()
                bot_username = f"@{bot_info.username}"
                bot_status = f"Running (Webhook OK, {bot_username})"
            elif ptb_app.running:
                bot_status = f"Running (Webhook Status: {wh_info.url if wh_info else 'N/A'}, Last Error: {wh_info.last_error_message if wh_info else 'N/A'})"
            else:
                bot_status = "Initialized/Not running"
        except Exception as e:
            logger.error(f"Error checking bot status in health check: {e}", exc_info=True)
            bot_status = f"Error checking status: {e}"
    elif ptb_app:
        bot_status = "Initializing..."  # created but initialize() has not completed
    health_info = [
        "=== Telegram Summary Bot Status ===",
        f"Bot Application: {bot_status}",
        "--- Services ---",
        f"Primary Web Scraper: {'Crawl4AI' if _crawl4ai_primary_web_enabled else 'DISABLED (Lib Missing)'}",
        "Fallback Web Scraper 1: BeautifulSoup",
        f"Fallback Web Scraper 2: {'urltotext.com API' if _urltotext_fallback_enabled else 'DISABLED (No Key)'}",
        f"Primary Summarizer: {'Gemini (' + GEMINI_MODEL + ')' if _gemini_primary_enabled else 'DISABLED (No Key/Lib)'}",
        f"Fallback Summarizer: {'OpenRouter (' + OPENROUTER_MODEL + ')' if _openrouter_fallback_enabled else 'DISABLED (No Key)'}",
        "Primary YT Transcript: youtube-transcript-api",
        f"Fallback YT Transcript 1: {'Supadata API' if SUPADATA_API_KEY else 'DISABLED (No Key)'}",
        f"Fallback YT Transcript 2: {'Apify (' + APIFY_ACTOR_ID + ')' if _apify_token_exists else 'DISABLED (No Key)'}",
    ]
    return PlainTextResponse("\n".join(health_info))
async def telegram_webhook(request: Request) -> Response:
    """Starlette endpoint for Telegram webhook POSTs: validate, decode, dispatch."""
    global WEBHOOK_SECRET, ptb_app
    # Refuse updates until the PTB application is fully up and processing.
    if not ptb_app:
        logger.error("Webhook received but PTB application not initialized.")
        return PlainTextResponse('Bot not initialized', status_code=503)
    if not ptb_app._initialized:
        logger.error("Webhook received but PTB application not running (not initialized).")
        return PlainTextResponse('Bot not initialized', status_code=503)
    if not ptb_app.running:
        logger.warning("Webhook received but PTB application not running.")
        return PlainTextResponse('Bot not running', status_code=503)
    # Enforce the shared secret when one is configured.
    if WEBHOOK_SECRET:
        supplied = request.headers.get("X-Telegram-Bot-Api-Secret-Token")
        if supplied != WEBHOOK_SECRET:
            logger.warning(f"Webhook received with invalid secret token. Header: '{supplied}'")
            return Response(content="Invalid secret token", status_code=403)
    try:
        payload = await request.json()
        update = Update.de_json(data=payload, bot=ptb_app.bot)
        logger.debug(f"Processing update_id: {update.update_id} via webhook")
        await ptb_app.process_update(update)
        return Response(status_code=200)  # acknowledge receipt quickly
    except json.JSONDecodeError:
        logger.error("Webhook received invalid JSON.")
        return PlainTextResponse('Bad Request: Invalid JSON', status_code=400)
    except Exception as e:
        # Still return 200 so Telegram does not keep retrying a poison update.
        logger.error(f"Error processing webhook update: {e}", exc_info=True)
        return Response(status_code=200)
# --- ASGI App Definition ---
# Two routes: "/" doubles as the Space's health endpoint, "/webhook" receives
# Telegram updates. The lifespan hook manages PTB startup/shutdown.
app = Starlette(
    debug=False, # Set to True for more verbose errors during development if needed
    lifespan=lifespan,
    routes=[
        Route("/", endpoint=health_check, methods=["GET"]),
        Route("/webhook", endpoint=telegram_webhook, methods=["POST"]),
    ]
)
logger.info("Starlette ASGI application created with native routes.")
# --- Development Runner ---
# Direct Uvicorn entry point for local testing; production runs via ASGI host.
if __name__ == '__main__':
    import uvicorn
    logger.warning("Running in development mode using Uvicorn directly - FOR LOCAL TESTING ONLY")
    uvicorn_log_level = os.environ.get("LOGGING_LEVEL", "info").lower()
    dev_port = int(os.environ.get('PORT', 8080))  # same PORT var as production, default 8080
    # Best-effort .env loading for local runs (python-dotenv is optional).
    try:
        from dotenv import load_dotenv
        load_dotenv()
        logger.info("Loaded environment variables from .env file for local development.")
    except ImportError:
        logger.info(".env file not found or python-dotenv not installed, using system environment variables.")
    # Warn early about missing credentials now that .env may have been loaded.
    if not get_secret('TELEGRAM_TOKEN'):
        logger.critical("Local Dev: TELEGRAM_TOKEN not found.")
    if not get_secret('GEMINI_API_KEY'):
        logger.error("Local Dev: GEMINI_API_KEY not found.")
    uvicorn.run(
        "main:app",
        host='0.0.0.0',
        port=dev_port,
        log_level=uvicorn_log_level,
        reload=True,  # auto-reload on code changes during development
    )