# Page header captured together with this listing: "Spaces: Running"
# main.py (Revised with background task connection fixes) | |
import os | |
import re | |
import logging | |
import asyncio | |
import json | |
import html | |
import contextlib | |
import traceback | |
from typing import Optional | |
# --- Frameworks --- | |
from flask import Flask, request, Response | |
from starlette.applications import Starlette | |
from starlette.routing import Mount | |
from starlette.middleware.wsgi import WSGIMiddleware | |
# --- Telegram Bot --- | |
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, Bot | |
from telegram.ext import ( | |
Application, | |
CommandHandler, | |
MessageHandler, | |
filters, | |
ContextTypes, | |
CallbackQueryHandler, | |
) | |
from telegram.constants import ParseMode | |
from telegram.error import NetworkError, RetryAfter, TimedOut, BadRequest | |
from telegram.request import HTTPXRequest | |
# --- Other Libraries --- | |
import httpx | |
from youtube_transcript_api import YouTubeTranscriptApi | |
import requests | |
from bs4 import BeautifulSoup | |
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type | |
_apify_token_exists = bool(os.environ.get('APIFY_API_TOKEN')) | |
if _apify_token_exists: | |
from apify_client import ApifyClient | |
else: | |
ApifyClient = None | |
# --- Logging Setup --- | |
# Root logger runs at DEBUG; the library loggers below are capped individually
# so their output does not drown the bot's own messages.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

logging.getLogger("httpx").setLevel(logging.WARNING)
if ApifyClient:
    # Only configured when the optional Apify client was imported.
    logging.getLogger("apify_client").setLevel(logging.WARNING)

for _info_logger_name in (
    "telegram.ext",
    'telegram.bot',
    "urllib3",
    'gunicorn.error',
    'uvicorn',
    'starlette',
):
    logging.getLogger(_info_logger_name).setLevel(logging.INFO)
del _info_logger_name  # keep the module namespace free of the loop variable

logger = logging.getLogger(__name__)
logger.info("Logging configured.")
# --- Global variable for PTB app --- | |
ptb_app: Optional[Application] = None | |
# --- Environment Variable Loading --- | |
logger.info("Attempting to load secrets...") | |
def get_secret(secret_name):
    """Look up ``secret_name`` in the environment and log whether it is set.

    Only the length of the value is written to the log, never the value
    itself. A missing or empty variable is logged as "Not Found".

    Returns:
        The value exactly as ``os.environ.get`` returned it (``None`` when
        the variable is not set).
    """
    secret = os.environ.get(secret_name)
    if not secret:
        logger.warning(f"Secret '{secret_name}': Not Found")
        return secret
    logger.info(f"Secret '{secret_name}': Found (Value length: {len(secret)})")
    return secret
# Secrets are read one after another in this fixed order, so the
# "Found" / "Not Found" log lines always appear in the same sequence.
(
    TELEGRAM_TOKEN,
    OPENROUTER_API_KEY,
    URLTOTEXT_API_KEY,
    SUPADATA_API_KEY,
    APIFY_API_TOKEN,
) = (
    get_secret(_secret_name)
    for _secret_name in (
        'TELEGRAM_TOKEN',
        'OPENROUTER_API_KEY',
        'URLTOTEXT_API_KEY',
        'SUPADATA_API_KEY',
        'APIFY_API_TOKEN',
    )
)
logger.info("Secret loading attempt finished.")
# --- Retry Decorator for Bot Operations --- | |
def retry_bot_operation(func):
    """Decorate an async bot operation so that failures are logged before propagating.

    The wrapper awaits ``func`` once. Any exception is logged and re-raised
    unchanged, so callers see exactly the error the operation produced.
    Despite the decorator's name, no retry is performed here.

    Args:
        func: The coroutine function to wrap.

    Returns:
        A coroutine function with the same call signature and the same
        ``__name__`` / ``__doc__`` as ``func``.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)  # keep the wrapped function's name/docstring for logs and debugging
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            operation = getattr(func, "__name__", repr(func))
            logger.error(f"Operation '{operation}' failed: {e}")
            raise
    return wrapper
# --- Helper Functions (unchanged from your original) --- | |
def is_youtube_url(url):
    """Tell whether ``url`` contains a YouTube watch, shorts or youtu.be link.

    The link must carry an 11-character video ID. The pattern is searched
    anywhere in ``url`` rather than anchored at its start.
    """
    pattern = r'(https?://)?(www\.)?(youtube\.com/(watch\?v=|shorts/)|youtu\.be/)([\w-]{11})'
    found = re.search(pattern, url) is not None
    verdict = 'Match found' if found else 'No match'
    logger.debug(f"is_youtube_url check for '{url}': {verdict}")
    return found
def extract_youtube_id(url):
    """Extract the 11-character YouTube video ID from ``url``.

    Recognises ``youtube.com/watch?v=<id>``, ``youtube.com/shorts/<id>`` and
    ``youtu.be/<id>`` links.

    Args:
        url: Text containing the YouTube link.

    Returns:
        The video ID, or ``None`` when no ID can be found.
    """
    # The ID must not be followed by another ID character. A negative
    # lookahead accepts every terminator the old explicit list did
    # (``?``, ``&``, whitespace, end of string) and also ``#``, ``/``, ``.``
    # and similar, so links such as ``youtu.be/<id>#t=30`` are handled.
    youtube_id_regex = r'(?:youtube\.com/(?:watch\?v=|shorts/)|youtu\.be/)([\w-]{11})(?![\w-])'
    match = re.search(youtube_id_regex, url)
    if match:
        video_id = match.group(1)
        logger.debug(f"Extracted YouTube ID '{video_id}' from URL: {url}")
        return video_id
    logger.warning(f"Could not extract YouTube ID from URL: {url}")
    return None
# --- Content Fetching Functions (unchanged from the original version) ---
# NOTE: the content fetching functions are not included in this listing.
# They must be inserted here unchanged, because process_summary_task calls them:
# get_transcript_via_supadata, get_transcript_via_apify, get_youtube_transcript,
# get_website_content_via_requests, get_website_content_via_urltotext_api, generate_summary
# --- Revised Background Task Processing --- | |
async def process_summary_task(
    user_id: int,
    chat_id: int,
    message_id_to_edit: int,
    url: str,
    summary_type: str,
    bot_token: str  # Now receiving token instead of bot instance
) -> None:
    """Fetch the content behind ``url``, summarise it and report back to the chat.

    Runs as a background task with its own ``Bot`` built from ``bot_token``:

    1. Turn message ``message_id_to_edit`` into a "working" notice, or send a
       new status message when that edit fails.
    2. Fetch the YouTube transcript or the website text for ``url``.
    3. Generate a ``summary_type`` summary and send it to ``chat_id``.
    4. Delete the status message on success, or replace the original message
       with the error text on failure.

    Args:
        user_id: Telegram user ID, used only to build the task ID for logs.
        chat_id: Chat that receives status, summary and error messages.
        message_id_to_edit: Message to reuse as the status message.
        url: Link to summarise.
        summary_type: Summary format passed through to ``generate_summary``.
        bot_token: Token used to create the task-local ``Bot``.

    Exceptions raised while processing are logged and reported to the user
    instead of being propagated.
    """
    task_id = f"{user_id}-{message_id_to_edit}"
    logger.info(f"[Task {task_id}] Starting processing for URL: {url}")

    # Bound before the ``try`` so the ``finally`` clause and the final log line
    # can always read them, even if the task fails during the status step.
    status_message_sent_id = None
    user_feedback_message = None
    success = False

    # Create a new bot instance for this task
    bot = Bot(token=bot_token)
    try:
        # ``Bot.shutdown()`` only releases resources of an initialised bot,
        # so initialise explicitly before the first request.
        await bot.initialize()

        # --- Inform User Processing Has Started ---
        processing_message_text = f"⏳ Working on your '{summary_type}' summary for the link...\n_(This might take up to a minute depending on the content)_"

        async def edit_or_send_status():
            nonlocal status_message_sent_id, message_id_to_edit
            try:
                await bot.edit_message_text(
                    chat_id=chat_id,
                    message_id=message_id_to_edit,
                    text=processing_message_text
                )
                logger.debug(f"[Task {task_id}] Successfully edited message {message_id_to_edit}")
            except (TimedOut, NetworkError, BadRequest) as e:
                logger.warning(f"[Task {task_id}] Could not edit original message: {e}. Sending new status message.")
                # The original message is no longer usable; track the new one instead.
                message_id_to_edit = None
                status_message = await bot.send_message(
                    chat_id=chat_id,
                    text=processing_message_text
                )
                status_message_sent_id = status_message.message_id
                logger.debug(f"[Task {task_id}] Sent new status message {status_message_sent_id}")

        await edit_or_send_status()

        # --- Main Content Fetching and Summarization ---
        content = None
        try:
            # Send 'typing' action
            @retry_bot_operation
            async def send_typing():
                await bot.send_chat_action(chat_id=chat_id, action='typing')

            await send_typing()

            # --- Determine Content Type and Fetch ---
            is_yt = is_youtube_url(url)
            logger.debug(f"[Task {task_id}] URL is YouTube: {is_yt}")
            if is_yt:
                video_id = extract_youtube_id(url)
                if video_id:
                    logger.info(f"[Task {task_id}] Fetching YouTube transcript for {video_id}")
                    content = await get_youtube_transcript(
                        video_id,
                        url,
                        SUPADATA_API_KEY,
                        APIFY_API_TOKEN
                    )
                    if not content:
                        user_feedback_message = "⚠️ Sorry, I couldn't retrieve the transcript for that YouTube video."
            else:
                logger.info(f"[Task {task_id}] Attempting website scrape for: {url}")
                content = await get_website_content_via_requests(url)
                if not content and URLTOTEXT_API_KEY:
                    await send_typing()
                    content = await get_website_content_via_urltotext_api(url, URLTOTEXT_API_KEY)
                if not content:
                    user_feedback_message = "⚠️ Sorry, I couldn't fetch the content from that website."

            # --- Generate Summary if Content Was Fetched ---
            if content:
                logger.info(f"[Task {task_id}] Generating '{summary_type}' summary")
                await send_typing()
                summary = await generate_summary(content, summary_type, OPENROUTER_API_KEY)
                if summary.startswith("Error:") or summary.startswith("Sorry,"):
                    user_feedback_message = f"⚠️ {summary}"
                else:
                    @retry_bot_operation
                    async def send_summary():
                        try:
                            await bot.send_message(
                                chat_id=chat_id,
                                text=summary,
                                parse_mode=ParseMode.MARKDOWN,
                                link_preview_options={'is_disabled': True}
                            )
                        except BadRequest as e:
                            # Generated text may contain unbalanced Markdown
                            # markers; deliver it as plain text rather than
                            # losing the summary.
                            logger.warning(f"[Task {task_id}] Markdown send failed ({e}); retrying as plain text")
                            await bot.send_message(
                                chat_id=chat_id,
                                text=summary,
                                link_preview_options={'is_disabled': True}
                            )

                    await send_summary()
                    success = True
        except Exception as e:
            logger.error(f"[Task {task_id}] Error during processing: {e}", exc_info=True)
            user_feedback_message = "❌ An unexpected error occurred while processing your request."

        # --- Send Final Feedback Message if Processing Failed ---
        if user_feedback_message and not success:
            @retry_bot_operation
            async def send_feedback():
                await bot.send_message(chat_id=chat_id, text=user_feedback_message)

            await send_feedback()
    except Exception as e:
        logger.error(f"[Task {task_id}] Critical error in task: {e}", exc_info=True)
        try:
            await bot.send_message(
                chat_id=chat_id,
                text="❌ A critical error occurred. Please try again later."
            )
        except Exception as notify_err:
            # Best effort only: the connection itself may be what failed.
            logger.debug(f"[Task {task_id}] Could not send critical error notice: {notify_err}")
    finally:
        # --- Clean up Status Message(s) ---
        try:
            if status_message_sent_id:
                await bot.delete_message(chat_id=chat_id, message_id=status_message_sent_id)
            elif message_id_to_edit and success:
                await bot.delete_message(chat_id=chat_id, message_id=message_id_to_edit)
            elif message_id_to_edit and not success:
                final_error_text = user_feedback_message or "❌ An error occurred."
                await bot.edit_message_text(
                    chat_id=chat_id,
                    message_id=message_id_to_edit,
                    text=final_error_text[:4090]
                )
        except Exception as e:
            logger.warning(f"[Task {task_id}] Cleanup error: {e}")
        # Ensure the task-local bot releases its HTTP resources
        try:
            await bot.shutdown()
        except Exception as e:
            logger.warning(f"[Task {task_id}] Bot shutdown error: {e}")
        logger.info(f"[Task {task_id}] Task completed. Success: {success}")
# --- Telegram Bot Handlers --- | |
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /start command by greeting the user and explaining the bot.

    Does nothing when the update carries no user or no message to reply to.
    """
    user = update.effective_user
    # ``effective_message`` also covers edited commands, where ``update.message`` is None.
    message = update.effective_message
    if not user or not message:
        return
    logger.info(f"User {user.id} initiated /start.")
    # The greeting is sent as HTML, so a plain first name must be escaped;
    # ``mention_html`` already returns ready-to-use markup.
    mention = user.mention_html() if user.username else html.escape(user.first_name)
    start_message = (
        f"👋 Hello {mention}!\n\n"
        "I can summarise YouTube videos or web articles for you.\n\n"
        "Just send me a link (URL) and I'll ask you whether you want the summary as a paragraph or bullet points.\n\n"
        "Type /help for more details."
    )
    await message.reply_html(start_message)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /help command by replying with the usage instructions.

    Does nothing when the update carries no message to reply to.
    """
    user = update.effective_user
    logger.info(f"User {user.id if user else '?'} requested /help.")
    # ``effective_message`` also covers edited commands, where ``update.message`` is None.
    message = update.effective_message
    if not message:
        return
    help_text = (
        "**How to Use Me:**\n"
        "1. Send me a direct link (URL) to a YouTube video or a web article.\n"
        "2. I will ask you to choose the summary format: `Paragraph` or `Points`.\n"
        "3. Click the button for your preferred format.\n"
        "4. I'll fetch the content, summarise it using AI, and send it back to you!\n\n"
        "**Important Notes:**\n"
        "- **YouTube:** Getting transcripts can sometimes fail if they are disabled or unavailable.\n"
        "- **Websites:** Complex websites might not work perfectly.\n"
        "- **AI Summaries:** The AI tries its best to be accurate.\n\n"
        "Just send a link to get started!"
    )
    await message.reply_text(help_text, parse_mode=ParseMode.MARKDOWN)
async def handle_potential_url(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a text message that may contain a URL.

    The first http(s) URL found in the message is stored in
    ``context.user_data['url_to_summarize']`` and the user is asked to pick
    a summary format via inline buttons. Messages without a URL get a hint,
    unless they start with ``/``.
    """
    if not update.message or not update.message.text:
        return
    message_text = update.message.text.strip()
    user = update.effective_user
    if not user:
        return
    # Host labels, then a TLD of up to 63 letters, an optional port and an
    # optional path/query/fragment. A short TLD limit would cut the match off
    # mid-word for longer TLDs and store a truncated URL.
    url_pattern = (
        r'https?://(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,63}'
        r'(?::\d{1,5})?(?:[/?#][^\s]*)?'
    )
    match = re.search(url_pattern, message_text)
    if match:
        url = match.group(0)
        logger.info(f"User {user.id} sent URL: {url}")
        # Remembered until the user presses one of the format buttons.
        context.user_data['url_to_summarize'] = url
        keyboard = [
            [
                InlineKeyboardButton("📜 Paragraph Summary", callback_data="paragraph"),
                InlineKeyboardButton("🔹 Bullet Points", callback_data="points")
            ]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await update.message.reply_text(
            f"✅ Link received:\n`{url}`\n\nChoose your desired summary format:",
            reply_markup=reply_markup,
            parse_mode=ParseMode.MARKDOWN,
            link_preview_options={'is_disabled': True}
        )
    elif not message_text.startswith('/'):
        await update.message.reply_text("Please send me a valid URL (starting with http:// or https://) to summarize.")
async def handle_summary_type_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a press on one of the summary-format buttons.

    Acknowledges the callback query, takes the URL stored in
    ``context.user_data['url_to_summarize']`` and schedules
    ``process_summary_task`` for it in the background. When no URL is stored,
    the button message is replaced by a request to send the link again.
    """
    query = update.callback_query
    if not query:
        return

    # Acknowledge the button press immediately. A failed acknowledgement
    # should not stop the request from being processed.
    try:
        await query.answer()
    except Exception as e:
        logger.warning(f"Could not answer callback query: {e}")

    if not query.from_user or not query.message:
        return

    user = query.from_user
    summary_type = query.data
    url = context.user_data.get('url_to_summarize')
    logger.info(f"User {user.id} chose summary type '{summary_type}'")
    if not url:
        logger.warning(f"No URL found for user {user.id}")
        try:
            await query.edit_message_text(text="⚠️ Oops! I lost the context for that link. Please send the link again.")
        except Exception as e:
            logger.error(f"Failed to edit message: {e}")
        return

    # Clear the URL from context
    context.user_data.pop('url_to_summarize', None)

    # Schedule the background task through the application so that a
    # reference to it is kept until it finishes. The token is passed
    # instead of the bot instance.
    context.application.create_task(
        process_summary_task(
            user_id=user.id,
            chat_id=query.message.chat_id,
            message_id_to_edit=query.message.message_id,
            url=url,
            summary_type=summary_type,
            bot_token=TELEGRAM_TOKEN
        ),
        update=update,
        name=f"SummaryTask-{user.id}-{query.message.message_id}"
    )
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Write the error attached to ``context`` to the log, with its traceback.

    Nothing is logged when ``context.error`` is empty.
    """
    failure = context.error
    if not failure:
        return
    logger.error(f"Exception while handling an update: {failure}", exc_info=failure)
# --- Bot Setup Function --- | |
async def setup_bot_config() -> Application:
    """Build the PTB ``Application`` and register all handlers on it.

    Returns:
        The configured application; it is neither initialised nor started here.

    Raises:
        ValueError: If ``TELEGRAM_TOKEN`` is not set.
    """
    logger.info("Configuring Telegram Application...")
    if not TELEGRAM_TOKEN:
        raise ValueError("TELEGRAM_TOKEN environment variable not set.")

    # HTTP client settings for all Bot API requests made by the application.
    request_settings = {
        "connect_timeout": 10.0,
        "read_timeout": 30.0,
        "write_timeout": 30.0,
        "pool_timeout": 30.0,
        "http_version": "1.1",
    }
    builder = Application.builder().token(TELEGRAM_TOKEN)
    application = builder.request(HTTPXRequest(**request_settings)).build()

    # Registered in the same order as before: commands, plain text, buttons.
    update_handlers = (
        CommandHandler("start", start),
        CommandHandler("help", help_command),
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_potential_url),
        CallbackQueryHandler(handle_summary_type_callback),
    )
    for update_handler in update_handlers:
        application.add_handler(update_handler)
    application.add_error_handler(error_handler)

    logger.info("Telegram application handlers configured.")
    return application
# --- ASGI Lifespan Context Manager --- | |
@contextlib.asynccontextmanager
async def lifespan(app: Starlette):
    """Start the PTB application on ASGI startup and stop it on shutdown.

    Startup builds, initialises and starts the bot. When ``SPACE_HOST`` is
    set, the Telegram webhook is registered at ``https://<SPACE_HOST>/webhook``.
    A failure to set the webhook is logged but does not abort startup. Any
    other startup error is logged and re-raised. Shutdown always runs.
    """
    global ptb_app
    logger.info("ASGI Lifespan: Startup sequence initiated...")
    try:
        try:
            ptb_app = await setup_bot_config()
            await ptb_app.initialize()
            await ptb_app.start()
            bot_info = await ptb_app.bot.get_me()
            logger.info(f"Bot started: @{bot_info.username}")
            WEBHOOK_URL_BASE = os.environ.get("SPACE_HOST")
            if WEBHOOK_URL_BASE:
                if not WEBHOOK_URL_BASE.startswith("https://"):
                    WEBHOOK_URL_BASE = f"https://{WEBHOOK_URL_BASE}"
                webhook_path = "/webhook"
                full_webhook_url = f"{WEBHOOK_URL_BASE.rstrip('/')}{webhook_path}"
                logger.info(f"Setting webhook to: {full_webhook_url}")
                await asyncio.sleep(2.0)
                try:
                    await ptb_app.bot.set_webhook(
                        url=full_webhook_url,
                        allowed_updates=Update.ALL_TYPES,
                        drop_pending_updates=True
                    )
                    webhook_info = await ptb_app.bot.get_webhook_info()
                    logger.info(f"Webhook set: {webhook_info}")
                except Exception as e:
                    logger.error(f"Failed to set webhook: {e}")
            logger.info("ASGI Lifespan: Startup complete.")
        except Exception as startup_err:
            logger.critical(f"Startup error: {startup_err}", exc_info=True)
            raise
        # Kept outside the startup ``except`` so errors raised while the app
        # is serving are not reported as startup errors.
        yield
    finally:
        logger.info("ASGI Lifespan: Shutdown sequence initiated...")
        if ptb_app:
            try:
                # ``stop()`` is only valid on a running application; skipping
                # it after a failed startup still lets ``shutdown()`` run.
                if ptb_app.running:
                    await ptb_app.stop()
                await ptb_app.shutdown()
                logger.info("PTB Application shut down gracefully.")
            except Exception as shutdown_err:
                logger.error(f"Shutdown error: {shutdown_err}")
        logger.info("ASGI Lifespan: Shutdown complete.")
# --- Flask App Setup --- | |
flask_core_app = Flask(__name__) | |
@flask_core_app.route('/')
def index():
    """Basic health check endpoint served at ``/``.

    Returns a plain-text status line. The bot's username is included once
    the PTB application object exists.
    """
    bot_status = "Unknown"
    if ptb_app and ptb_app.bot:
        bot_status = f"Running (@{ptb_app.bot.username})"
    return f"Telegram Bot Summarizer - Status: {bot_status}"
@flask_core_app.route('/webhook', methods=['POST'])
async def webhook() -> Response:
    """Webhook endpoint called by Telegram with one update per POST request.

    Returns:
        503 while the bot is not initialised, 400 for a missing or
        unparsable JSON body, 500 when processing the update fails and
        200 otherwise.
    """
    if not ptb_app:
        return Response('Bot not initialized', status=503)
    try:
        # ``silent=True`` turns a malformed body into ``None`` so it is
        # answered with 400 below instead of ending up as a 500.
        update_data = request.get_json(silent=True)
        if not update_data:
            return Response('Bad Request', status=400)
        update = Update.de_json(update_data, ptb_app.bot)
        await ptb_app.process_update(update)
        return Response('ok', status=200)
    except Exception as e:
        logger.error(f"Webhook error: {e}", exc_info=True)
        return Response('Internal Server Error', status=500)
# --- Create Starlette ASGI Application --- | |
# The Flask app is mounted at the root through the WSGI adapter, while
# Starlette itself contributes the lifespan hook.
app = Starlette(
    routes=[Mount("/", app=WSGIMiddleware(flask_core_app))],
    lifespan=lifespan,
    debug=False,
)
logger.info("Starlette ASGI application created.")
# --- Development Server Execution Block --- | |
if __name__ == '__main__':
    # Local development entry point: only the Flask app is served here, so the
    # Starlette lifespan hook does not run.
    # NOTE(review): debug=True together with host 0.0.0.0 exposes Flask's
    # debug mode on every interface - keep this for local use only.
    logger.warning("Running in development mode (Flask server only)")
    flask_core_app.run(
        host='0.0.0.0',
        port=int(os.environ.get('PORT', 8080)),
        use_reloader=False,
        debug=True,
    )