# main.py (Revised with background task connection fixes)
import os
import re
import logging
import asyncio
import json
import html
import contextlib
import functools
import traceback
from typing import Optional

# --- Frameworks ---
from flask import Flask, request, Response
from starlette.applications import Starlette
from starlette.routing import Mount
from starlette.middleware.wsgi import WSGIMiddleware

# --- Telegram Bot ---
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, Bot
from telegram.ext import (
    Application,
    CommandHandler,
    MessageHandler,
    filters,
    ContextTypes,
    CallbackQueryHandler,
)
from telegram.constants import ParseMode
from telegram.error import NetworkError, RetryAfter, TimedOut, BadRequest
from telegram.request import HTTPXRequest

# --- Other Libraries ---
import httpx
from youtube_transcript_api import YouTubeTranscriptApi
import requests
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# The Apify client is optional: it is only imported when a token is configured.
_apify_token_exists = bool(os.environ.get('APIFY_API_TOKEN'))
if _apify_token_exists:
    from apify_client import ApifyClient
else:
    ApifyClient = None

# --- Logging Setup ---
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG
)
# Quieten chatty third-party loggers; our own logger stays at DEBUG.
logging.getLogger("httpx").setLevel(logging.WARNING)
if ApifyClient:
    logging.getLogger("apify_client").setLevel(logging.WARNING)
logging.getLogger("telegram.ext").setLevel(logging.INFO)
logging.getLogger('telegram.bot').setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
logging.getLogger('gunicorn.error').setLevel(logging.INFO)
logging.getLogger('uvicorn').setLevel(logging.INFO)
logging.getLogger('starlette').setLevel(logging.INFO)
logger = logging.getLogger(__name__)
logger.info("Logging configured.")

# --- Global variable for PTB app ---
ptb_app: Optional[Application] = None

# --- Environment Variable Loading ---
logger.info("Attempting to load secrets...")


def get_secret(secret_name: str) -> Optional[str]:
    """Read a secret from the environment and log whether it was found.

    Only the length of the value is logged, never the value itself.

    Args:
        secret_name: Name of the environment variable to read.

    Returns:
        The variable's value, or None when it is not set.
    """
    value = os.environ.get(secret_name)
    if value:
        logger.info(f"Secret '{secret_name}': Found (Value length: {len(value)})")
    else:
        logger.warning(f"Secret '{secret_name}': Not Found")
    return value


TELEGRAM_TOKEN = get_secret('TELEGRAM_TOKEN')
OPENROUTER_API_KEY = get_secret('OPENROUTER_API_KEY')
URLTOTEXT_API_KEY = get_secret('URLTOTEXT_API_KEY')
SUPADATA_API_KEY = get_secret('SUPADATA_API_KEY')
APIFY_API_TOKEN = get_secret('APIFY_API_TOKEN')
logger.info("Secret loading attempt finished.")


# --- Retry Decorator for Bot Operations ---
def retry_bot_operation(func):
    """Decorate an async bot call so transient failures are retried.

    The wrapped coroutine is attempted up to 3 times with exponential
    back-off (1s..10s) whenever it raises NetworkError or RuntimeError.
    Any other exception propagates immediately. When the attempts are
    exhausted, tenacity raises its RetryError, as before.

    The failure is logged exactly once, after the last attempt. Previously
    the try/except lived inside the retried function, so "failed after
    retries" was logged on every single attempt.

    Args:
        func: The coroutine function to protect.

    Returns:
        A coroutine function with the same signature and metadata as *func*.
    """
    # NOTE(review): in python-telegram-bot, TimedOut and BadRequest appear to
    # subclass NetworkError, so they are retried as well -- confirm that
    # retrying BadRequest is intended.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((NetworkError, RuntimeError)),
        before_sleep=lambda retry_state: logger.warning(
            f"Retrying bot operation due to {retry_state.outcome.exception()}. "
            f"Attempt {retry_state.attempt_number}/3"
        )
    )
    async def attempt(*args, **kwargs):
        return await func(*args, **kwargs)

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await attempt(*args, **kwargs)
        except Exception as e:
            logger.error(f"Operation failed after retries: {e}")
            raise

    return wrapper


# --- Helper Functions (unchanged from your original) ---
# One pattern shared by is_youtube_url() and extract_youtube_id() so the two
# can never disagree. It accepts watch URLs whose `v` parameter is not the
# first one, shorts URLs and youtu.be links. The negative lookahead requires
# the ID to be exactly 11 characters, whatever delimiter follows it.
_YOUTUBE_URL_RE = re.compile(
    r'(?:youtube\.com/(?:watch\?(?:[^\s&#]*&)*v=|shorts/)|youtu\.be/)'
    r'([\w-]{11})(?![\w-])'
)


def is_youtube_url(url):
    """Checks if the URL is a valid YouTube video or shorts URL.

    Returns True exactly when extract_youtube_id() can pull an ID out of it.
    """
    match = _YOUTUBE_URL_RE.search(url)
    logger.debug(f"is_youtube_url check for '{url}': {'Match found' if match else 'No match'}")
    return bool(match)


def extract_youtube_id(url):
    """Extracts the YouTube video ID from a URL.

    Returns:
        The 11-character video ID, or None when the URL contains none.
    """
    match = _YOUTUBE_URL_RE.search(url)
    if match:
        video_id = match.group(1)
        logger.debug(f"Extracted YouTube ID '{video_id}' from URL: {url}")
        return video_id
    logger.warning(f"Could not extract YouTube ID from URL: {url}")
    return None


# --- Content Fetching Functions (unchanged from your original) ---
# [Keep all your existing content fetching functions exactly as they
# were]
# get_transcript_via_supadata, get_transcript_via_apify, get_youtube_transcript,
# get_website_content_via_requests, get_website_content_via_urltotext_api, generate_summary
# NOTE(review): none of these functions is defined in the visible source, yet
# process_summary_task() below calls get_youtube_transcript,
# get_website_content_via_requests, get_website_content_via_urltotext_api and
# generate_summary. They must be present here for the bot to work.

# --- Shared state for handlers and background work ---
# Strong references to in-flight summary tasks. The event loop only keeps weak
# references to tasks, so an otherwise unreferenced task can be garbage
# collected before it finishes.
_background_tasks = set()

# Event loop that runs the ASGI lifespan and owns ptb_app. Set by lifespan().
_main_loop: Optional[asyncio.AbstractEventLoop] = None

# Callback-data values produced by the keyboard in handle_potential_url().
_VALID_SUMMARY_TYPES = ("paragraph", "points")

# TLD length is 2-63 (the DNS label limit). The previous cap of 6 truncated
# URLs on longer TLDs, e.g. "https://example.technology" -> "...example.techno".
_URL_PATTERN = re.compile(
    r'https?://(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,63}(?:/[^\s]*)?'
)


# --- Revised Background Task Processing ---
async def process_summary_task(
    user_id: int,
    chat_id: int,
    message_id_to_edit: int,
    url: str,
    summary_type: str,
    bot_token: str  # Now receiving token instead of bot instance
) -> None:
    """Handles the actual fetching and summarization in a background task.

    Flow: show a "working" status (by editing the keyboard message, or by
    sending a new one if the edit fails), fetch the content (YouTube
    transcript or web page), generate the summary, send it, then clean up
    the status message.

    Args:
        user_id: Telegram user who asked for the summary (used for logging).
        chat_id: Chat that receives status, summary and error messages.
        message_id_to_edit: Message to turn into the status line.
        url: Link to summarise.
        summary_type: Summary format passed through to generate_summary().
        bot_token: Token used to build a Bot dedicated to this task.
    """
    task_id = f"{user_id}-{message_id_to_edit}"
    logger.info(f"[Task {task_id}] Starting processing for URL: {url}")

    # Everything the `finally` block reads is initialised *before* the try.
    # Otherwise an early failure (e.g. the status message could not be sent)
    # would raise UnboundLocalError during cleanup.
    status_message_sent_id = None
    user_feedback_message = None
    success = False

    # Create a new bot instance for this task
    bot = Bot(token=bot_token)

    try:
        # Bot.shutdown() is a no-op on a bot that was never initialised, so
        # the HTTP client can only be released later if we initialise here.
        await bot.initialize()

        # --- Inform User Processing Has Started ---
        processing_message_text = f"⏳ Working on your '{summary_type}' summary for the link...\n_(This might take up to a minute depending on the content)_"

        @retry_bot_operation
        async def edit_or_send_status():
            nonlocal status_message_sent_id, message_id_to_edit
            try:
                await bot.edit_message_text(
                    chat_id=chat_id,
                    message_id=message_id_to_edit,
                    text=processing_message_text
                )
                logger.debug(f"[Task {task_id}] Successfully edited message {message_id_to_edit}")
            except (TimedOut, NetworkError, BadRequest) as e:
                logger.warning(f"[Task {task_id}] Could not edit original message: {e}. Sending new status message.")
                # The original message is no longer usable; track the new one.
                message_id_to_edit = None
                status_message = await bot.send_message(
                    chat_id=chat_id,
                    text=processing_message_text
                )
                status_message_sent_id = status_message.message_id
                logger.debug(f"[Task {task_id}] Sent new status message {status_message_sent_id}")

        await edit_or_send_status()

        # --- Main Content Fetching and Summarization ---
        content = None

        try:
            # Send 'typing' action
            @retry_bot_operation
            async def send_typing():
                await bot.send_chat_action(chat_id=chat_id, action='typing')

            await send_typing()

            # --- Determine Content Type and Fetch ---
            is_yt = is_youtube_url(url)
            logger.debug(f"[Task {task_id}] URL is YouTube: {is_yt}")

            if is_yt:
                video_id = extract_youtube_id(url)
                if video_id:
                    logger.info(f"[Task {task_id}] Fetching YouTube transcript for {video_id}")
                    content = await get_youtube_transcript(
                        video_id,
                        url,
                        SUPADATA_API_KEY,
                        APIFY_API_TOKEN
                    )
                # Also covers the "no video ID" case, so the user always gets
                # a specific explanation rather than a generic error.
                if not content:
                    user_feedback_message = "⚠️ Sorry, I couldn't retrieve the transcript for that YouTube video."
            else:
                logger.info(f"[Task {task_id}] Attempting website scrape for: {url}")
                content = await get_website_content_via_requests(url)
                if not content and URLTOTEXT_API_KEY:
                    # Fall back to the URL-to-text API when plain scraping fails.
                    await send_typing()
                    content = await get_website_content_via_urltotext_api(url, URLTOTEXT_API_KEY)
                if not content:
                    user_feedback_message = "⚠️ Sorry, I couldn't fetch the content from that website."

            # --- Generate Summary if Content Was Fetched ---
            if content:
                logger.info(f"[Task {task_id}] Generating '{summary_type}' summary")
                await send_typing()
                summary = await generate_summary(content, summary_type, OPENROUTER_API_KEY)

                # These two prefixes are how the summary call reports failure.
                if summary.startswith(("Error:", "Sorry,")):
                    user_feedback_message = f"⚠️ {summary}"
                else:
                    @retry_bot_operation
                    async def send_summary():
                        await bot.send_message(
                            chat_id=chat_id,
                            text=summary,
                            parse_mode=ParseMode.MARKDOWN,
                            link_preview_options={'is_disabled': True}
                        )

                    await send_summary()
                    success = True

        except Exception as e:
            logger.error(f"[Task {task_id}] Error during processing: {e}", exc_info=True)
            user_feedback_message = "❌ An unexpected error occurred while processing your request."

        # --- Send Final Feedback Message if Processing Failed ---
        if user_feedback_message and not success:
            @retry_bot_operation
            async def send_feedback():
                await bot.send_message(chat_id=chat_id, text=user_feedback_message)

            await send_feedback()

    except Exception as e:
        logger.error(f"[Task {task_id}] Critical error in task: {e}", exc_info=True)
        try:
            await bot.send_message(
                chat_id=chat_id,
                text="❌ A critical error occurred. Please try again later."
            )
        except Exception as notify_err:
            # Best effort only: the connection may be the very thing that broke.
            logger.warning(f"[Task {task_id}] Could not notify user of critical error: {notify_err}")
    finally:
        # --- Clean up Status Message(s) ---
        try:
            if status_message_sent_id:
                await bot.delete_message(chat_id=chat_id, message_id=status_message_sent_id)
            elif message_id_to_edit and success:
                await bot.delete_message(chat_id=chat_id, message_id=message_id_to_edit)
            elif message_id_to_edit and not success:
                final_error_text = user_feedback_message or "❌ An error occurred."
                await bot.edit_message_text(
                    chat_id=chat_id,
                    message_id=message_id_to_edit,
                    text=final_error_text[:4090]
                )
        except Exception as e:
            logger.warning(f"[Task {task_id}] Cleanup error: {e}")

        # Ensure the bot's HTTP client is closed. telegram.Bot has no
        # `session` attribute, so the old `bot.session.close()` always failed
        # silently and leaked the connection pool.
        try:
            await bot.shutdown()
        except Exception as e:
            logger.warning(f"[Task {task_id}] Failed to shut down bot: {e}")

        logger.info(f"[Task {task_id}] Task completed. Success: {success}")


# --- Telegram Bot Handlers ---
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handles the /start command."""
    user = update.effective_user
    # effective_message also covers edited messages, where update.message is None.
    message = update.effective_message
    if not user or not message:
        return

    logger.info(f"User {user.id} initiated /start.")
    # first_name is user-controlled and goes into an HTML reply: escape it.
    mention = user.mention_html() if user.username else html.escape(user.first_name)
    start_message = (
        f"👋 Hello {mention}!\n\n"
        "I can summarise YouTube videos or web articles for you.\n\n"
        "Just send me a link (URL) and I'll ask you whether you want the summary as a paragraph or bullet points.\n\n"
        "Type /help for more details."
    )
    await message.reply_html(start_message)


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handles the /help command."""
    user = update.effective_user
    message = update.effective_message
    logger.info(f"User {user.id if user else '?'} requested /help.")
    if not message:
        return

    # Legacy Markdown (ParseMode.MARKDOWN) marks bold with single asterisks;
    # the previous "**text**" form did not render as bold.
    help_text = (
        "*How to Use Me:*\n"
        "1. Send me a direct link (URL) to a YouTube video or a web article.\n"
        "2. I will ask you to choose the summary format: `Paragraph` or `Points`.\n"
        "3. Click the button for your preferred format.\n"
        "4. I'll fetch the content, summarise it using AI, and send it back to you!\n\n"
        "*Important Notes:*\n"
        "- *YouTube:* Getting transcripts can sometimes fail if they are disabled or unavailable.\n"
        "- *Websites:* Complex websites might not work perfectly.\n"
        "- *AI Summaries:* The AI tries its best to be accurate.\n\n"
        "Just send a link to get started!"
    )
    await message.reply_text(help_text, parse_mode=ParseMode.MARKDOWN)


async def handle_potential_url(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handles messages containing potential URLs.

    The first URL found is stored in ``context.user_data['url_to_summarize']``
    and the user is asked to pick a summary format. Any other non-command
    text gets a hint to send a valid URL.
    """
    if not update.message or not update.message.text:
        return

    message_text = update.message.text.strip()
    user = update.effective_user
    if not user:
        return

    match = _URL_PATTERN.search(message_text)

    if match:
        url = match.group(0)
        logger.info(f"User {user.id} sent URL: {url}")
        context.user_data['url_to_summarize'] = url

        keyboard = [
            [
                InlineKeyboardButton("📜 Paragraph Summary", callback_data="paragraph"),
                InlineKeyboardButton("🔹 Bullet Points", callback_data="points")
            ]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await update.message.reply_text(
            f"✅ Link received:\n`{url}`\n\nChoose your desired summary format:",
            reply_markup=reply_markup,
            parse_mode=ParseMode.MARKDOWN,
            link_preview_options={'is_disabled': True}
        )
    elif not message_text.startswith('/'):
        await update.message.reply_text("Please send me a valid URL (starting with http:// or https://) to summarize.")


def _on_summary_task_done(task: asyncio.Task) -> None:
    """Release a finished summary task and log any exception it left behind."""
    _background_tasks.discard(task)
    if task.cancelled():
        logger.warning(f"Background task {task.get_name()} was cancelled.")
        return
    exc = task.exception()
    if exc:
        logger.error(f"Background task {task.get_name()} failed: {exc}", exc_info=exc)


async def handle_summary_type_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handles button presses for summary type selection.

    Acknowledges the press, validates the stored URL and the chosen format,
    then schedules process_summary_task() in the background so the handler
    returns immediately.
    """
    query = update.callback_query
    if not query:
        return

    # Acknowledge the button press immediately. This is best effort: a failed
    # acknowledgement must not prevent the summary.
    try:
        await query.answer()
    except Exception as e:
        logger.warning(f"Could not answer callback query: {e}")

    if not query.from_user or not query.message:
        return

    user = query.from_user
    summary_type = query.data
    url = context.user_data.get('url_to_summarize')

    logger.info(f"User {user.id} chose summary type '{summary_type}'")

    if summary_type not in _VALID_SUMMARY_TYPES:
        # This handler receives every callback query, so unknown data is ignored.
        logger.warning(f"Ignoring unknown summary type '{summary_type}' from user {user.id}")
        return

    if not url:
        logger.warning(f"No URL found for user {user.id}")
        try:
            await query.edit_message_text(text="⚠️ Oops! I lost the context for that link. Please send the link again.")
        except Exception as e:
            logger.error(f"Failed to edit message: {e}")
        return

    # Clear the URL from context
    context.user_data.pop('url_to_summarize', None)

    # Schedule background task with token instead of bot instance
    task = asyncio.create_task(
        process_summary_task(
            user_id=user.id,
            chat_id=query.message.chat_id,
            message_id_to_edit=query.message.message_id,
            url=url,
            summary_type=summary_type,
            bot_token=TELEGRAM_TOKEN
        ),
        name=f"SummaryTask-{user.id}-{query.message.message_id}"
    )
    # Keep the task alive until it finishes (see _background_tasks).
    _background_tasks.add(task)
    task.add_done_callback(_on_summary_task_done)


async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Log Errors caused by Updates or background tasks."""
    if context.error:
        logger.error(f"Exception while handling an update: {context.error}", exc_info=context.error)


# --- Bot Setup Function ---
async def setup_bot_config() -> Application:
    """Configures the PTB Application.

    Returns:
        An Application with all handlers registered; it is neither
        initialised nor started here.

    Raises:
        ValueError: If TELEGRAM_TOKEN is not set.
    """
    logger.info("Configuring Telegram Application...")
    if not TELEGRAM_TOKEN:
        raise ValueError("TELEGRAM_TOKEN environment variable not set.")

    custom_request = HTTPXRequest(
        connect_timeout=10.0,
        read_timeout=30.0,
        write_timeout=30.0,
        pool_timeout=30.0,
        http_version="1.1"
    )

    application = (
        Application.builder()
        .token(TELEGRAM_TOKEN)
        .request(custom_request)
        .build()
    )

    application.add_handler(CommandHandler("start", start))
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_potential_url))
    application.add_handler(CallbackQueryHandler(handle_summary_type_callback))
    application.add_error_handler(error_handler)

    logger.info("Telegram application handlers configured.")
    return application


# --- ASGI Lifespan Context Manager ---
@contextlib.asynccontextmanager
async def lifespan(app: Starlette):
    """Handles PTB startup and shutdown during ASGI lifespan.

    Startup builds, initialises and starts the PTB application, records the
    running event loop for the webhook view, and registers the Telegram
    webhook when SPACE_HOST is set. Shutdown stops and shuts down the
    application.
    """
    global ptb_app, _main_loop
    logger.info("ASGI Lifespan: Startup sequence initiated...")

    try:
        # Only genuine startup failures are reported as "Startup error".
        # Exceptions raised while the app is serving (thrown in at `yield`)
        # bypass this inner handler.
        try:
            _main_loop = asyncio.get_running_loop()
            ptb_app = await setup_bot_config()
            await ptb_app.initialize()
            await ptb_app.start()

            bot_info = await ptb_app.bot.get_me()
            logger.info(f"Bot started: @{bot_info.username}")

            webhook_url_base = os.environ.get("SPACE_HOST")
            if webhook_url_base:
                if not webhook_url_base.startswith("https://"):
                    webhook_url_base = f"https://{webhook_url_base}"
                webhook_path = "/webhook"
                full_webhook_url = f"{webhook_url_base.rstrip('/')}{webhook_path}"

                logger.info(f"Setting webhook to: {full_webhook_url}")
                # Short pause kept from the original; presumably it gives the
                # server time to start accepting requests -- TODO confirm.
                await asyncio.sleep(2.0)

                try:
                    await ptb_app.bot.set_webhook(
                        url=full_webhook_url,
                        allowed_updates=Update.ALL_TYPES,
                        drop_pending_updates=True
                    )
                    webhook_info = await ptb_app.bot.get_webhook_info()
                    logger.info(f"Webhook set: {webhook_info}")
                except Exception as e:
                    logger.error(f"Failed to set webhook: {e}")
            else:
                logger.warning("SPACE_HOST not set; skipping webhook registration.")
        except Exception as startup_err:
            logger.critical(f"Startup error: {startup_err}", exc_info=True)
            raise

        logger.info("ASGI Lifespan: Startup complete.")
        yield

    finally:
        logger.info("ASGI Lifespan: Shutdown sequence initiated...")
        if ptb_app:
            try:
                # stop() raises when the application never started. Skip it
                # in that case so shutdown() still releases resources.
                if ptb_app.running:
                    await ptb_app.stop()
                await ptb_app.shutdown()
                logger.info("PTB Application shut down gracefully.")
            except Exception as shutdown_err:
                logger.error(f"Shutdown error: {shutdown_err}")
        # From here on the webhook view must answer 503 instead of scheduling work.
        _main_loop = None
        logger.info("ASGI Lifespan: Shutdown complete.")


# --- Flask App Setup ---
flask_core_app = Flask(__name__)


@flask_core_app.route('/')
def index():
    """Basic health check endpoint."""
    bot_status = "Unknown"
    if ptb_app and ptb_app.bot:
        bot_status = f"Running (@{ptb_app.bot.username})"
    return f"Telegram Bot Summarizer - Status: {bot_status}"


@flask_core_app.route('/webhook', methods=['POST'])
def webhook() -> Response:
    """Webhook endpoint called by Telegram.

    The view is synchronous on purpose. An ``async def`` Flask view runs on a
    short-lived per-request event loop, so PTB would be driven from a loop
    other than the one that initialised it, and any task a handler spawns
    would be cancelled when the request ends. Instead the update is handed to
    the lifespan's event loop and this worker thread waits for the result.
    This must not be called from the lifespan loop's own thread, or the wait
    would deadlock.

    Returns:
        200 on success, 400 for a missing or invalid JSON body, 503 while the
        bot is not running, 500 when processing fails.
    """
    if not ptb_app or _main_loop is None:
        return Response('Bot not initialized', status=503)

    try:
        # silent=True: a malformed body yields None (-> 400) instead of an
        # exception that would be reported as a 500.
        update_data = request.get_json(silent=True)
        if not update_data:
            return Response('Bad Request', status=400)

        update = Update.de_json(update_data, ptb_app.bot)
        future = asyncio.run_coroutine_threadsafe(
            ptb_app.process_update(update),
            _main_loop
        )
        future.result()  # propagate handler-dispatch errors to this thread
        return Response('ok', status=200)
    except Exception as e:
        logger.error(f"Webhook error: {e}", exc_info=True)
        return Response('Internal Server Error', status=500)


# ---
# Create Starlette ASGI Application ---
# The Flask app is mounted at the root through the WSGI adapter; the lifespan
# hook starts and stops the Telegram application alongside the server.
app = Starlette(
    debug=False,
    lifespan=lifespan,
    routes=[
        Mount("/", app=WSGIMiddleware(flask_core_app))
    ]
)
logger.info("Starlette ASGI application created.")

# --- Development Server Execution Block ---
if __name__ == '__main__':
    # Only the Flask app is served here; the ASGI lifespan never runs, so the
    # bot is not started and /webhook answers 503.
    logger.warning("Running in development mode (Flask server only)")
    local_port = int(os.environ.get('PORT', 8080))
    # The server binds to all interfaces, so the Werkzeug debugger (which can
    # execute arbitrary code) must be opt-in rather than always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    flask_core_app.run(host='0.0.0.0', port=local_port, debug=debug_enabled, use_reloader=False)