fmab777's picture
Update main.py
cd72c3f verified
raw
history blame
20 kB
# main.py (Revised with background task connection fixes)
import os
import re
import logging
import asyncio
import json
import html
import contextlib
import traceback
from typing import Optional
# --- Frameworks ---
from flask import Flask, request, Response
from starlette.applications import Starlette
from starlette.routing import Mount
from starlette.middleware.wsgi import WSGIMiddleware
# --- Telegram Bot ---
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, Bot
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
CallbackQueryHandler,
)
from telegram.constants import ParseMode
from telegram.error import NetworkError, RetryAfter, TimedOut, BadRequest
from telegram.request import HTTPXRequest
# --- Other Libraries ---
import httpx
from youtube_transcript_api import YouTubeTranscriptApi
import requests
from bs4 import BeautifulSoup
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
_apify_token_exists = bool(os.environ.get('APIFY_API_TOKEN'))
if _apify_token_exists:
from apify_client import ApifyClient
else:
ApifyClient = None
# --- Logging Setup ---
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
# The root logger runs at DEBUG; chatty third-party loggers are capped
# individually so they do not flood the output.
_library_log_levels = {
    "httpx": logging.WARNING,
    "telegram.ext": logging.INFO,
    'telegram.bot': logging.INFO,
    "urllib3": logging.INFO,
    'gunicorn.error': logging.INFO,
    'uvicorn': logging.INFO,
    'starlette': logging.INFO,
}
if ApifyClient:
    # Only relevant when the optional Apify SDK was actually imported.
    _library_log_levels["apify_client"] = logging.WARNING
for _logger_name, _logger_level in _library_log_levels.items():
    logging.getLogger(_logger_name).setLevel(_logger_level)
logger = logging.getLogger(__name__)
logger.info("Logging configured.")
# --- Global variable for PTB app ---
# Holds the python-telegram-bot Application. It stays None until the ASGI
# lifespan handler assigns it during startup; the Flask views check for None
# before using it.
ptb_app: Optional[Application] = None
# --- Environment Variable Loading ---
logger.info("Attempting to load secrets...")
def get_secret(secret_name):
    """Return the environment variable ``secret_name`` and log whether it is set.

    Only the length of the value is logged, never the value itself. An unset
    or empty variable is reported with a warning, and whatever
    ``os.environ.get`` produced is returned unchanged.
    """
    secret = os.environ.get(secret_name)
    if not secret:
        logger.warning(f"Secret '{secret_name}': Not Found")
    else:
        logger.info(f"Secret '{secret_name}': Found (Value length: {len(secret)})")
    return secret
# Every secret is read once, at import time; an unset variable yields None.
# Required: setup_bot_config() raises ValueError when this is missing.
TELEGRAM_TOKEN = get_secret('TELEGRAM_TOKEN')
# Passed to generate_summary() for the AI summarisation call.
OPENROUTER_API_KEY = get_secret('OPENROUTER_API_KEY')
# Optional: enables the URL-to-text fallback when the direct website fetch returns nothing.
URLTOTEXT_API_KEY = get_secret('URLTOTEXT_API_KEY')
# Both are handed to get_youtube_transcript() for YouTube links.
SUPADATA_API_KEY = get_secret('SUPADATA_API_KEY')
APIFY_API_TOKEN = get_secret('APIFY_API_TOKEN')
logger.info("Secret loading attempt finished.")
# --- Retry Decorator for Bot Operations ---
def retry_bot_operation(func):
    """Decorate an async bot call so that transient failures are retried.

    The wrapped coroutine function is attempted up to three times, with an
    exponential back-off between 1 and 10 seconds, whenever it raises
    ``NetworkError`` or ``RuntimeError``. Each retry is logged as a warning.

    Once the attempts are exhausted (or a non-retryable exception occurs) the
    failure is logged once and the *original* exception is re-raised, so
    callers see the real error rather than tenacity's ``RetryError`` wrapper.

    Args:
        func: The coroutine function to protect.

    Returns:
        A coroutine function with the same signature and metadata as ``func``.
    """
    import functools  # local import keeps this block self-contained

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry=retry_if_exception_type((NetworkError, RuntimeError)),
        before_sleep=lambda retry_state: logger.warning(
            f"Retrying bot operation due to {retry_state.outcome.exception()}. "
            f"Attempt {retry_state.attempt_number}/3"
        ),
        # Surface the last real exception instead of wrapping it in RetryError.
        reraise=True,
    )
    async def attempt(*args, **kwargs):
        return await func(*args, **kwargs)

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        # The error log lives outside the retried coroutine so that it fires
        # once, after the final failure, and not on every single attempt.
        try:
            return await attempt(*args, **kwargs)
        except Exception as e:
            logger.error(f"Operation failed after retries: {e}")
            raise

    return wrapper
# --- Helper Functions (unchanged from your original) ---
def is_youtube_url(url):
    """Check whether ``url`` contains a YouTube video or shorts link.

    Recognised forms are ``youtube.com/watch?v=<id>``, ``youtube.com/shorts/<id>``
    and ``youtu.be/<id>``, where ``<id>`` is exactly 11 characters from
    ``[A-Za-z0-9_-]``. A longer ID-like token is rejected, because no valid
    video ID could be extracted from it afterwards.

    Args:
        url: The text to inspect.

    Returns:
        True if a YouTube link with a well-formed video ID is found, else False.
    """
    # The trailing (?![\w-]) makes sure the 11-character ID is the whole token
    # and not merely the prefix of something longer.
    youtube_regex = (
        r'(https?://)?(www\.)?(youtube\.com/(watch\?v=|shorts/)|youtu\.be/)'
        r'([\w-]{11})(?![\w-])'
    )
    match = re.search(youtube_regex, url)
    logger.debug(f"is_youtube_url check for '{url}': {'Match found' if match else 'No match'}")
    return bool(match)
def extract_youtube_id(url):
    """Extract the 11-character YouTube video ID from ``url``.

    Supports ``youtube.com/watch?v=<id>``, ``youtube.com/shorts/<id>`` and
    ``youtu.be/<id>``. The ID may be followed by anything that is not itself an
    ID character: a query string, a fragment, a slash, trailing punctuation,
    or the end of the string.

    Args:
        url: The URL (or text containing one) to parse.

    Returns:
        The video ID, or None when no well-formed ID is present.
    """
    # A negative lookahead replaces the former fixed terminator list
    # (?, &, whitespace, end), which rejected valid links such as
    # "https://youtu.be/<id>#t=5" or a link followed by "." or ")".
    youtube_id_regex = r'(?:youtube\.com/(?:watch\?v=|shorts/)|youtu\.be/)([\w-]{11})(?![\w-])'
    match = re.search(youtube_id_regex, url)
    if not match:
        logger.warning(f"Could not extract YouTube ID from URL: {url}")
        return None
    video_id = match.group(1)
    logger.debug(f"Extracted YouTube ID '{video_id}' from URL: {url}")
    return video_id
# --- Content Fetching Functions (unchanged from your original) ---
# NOTE: the content-fetching helpers are NOT defined in this listing. Keep all of
# your existing implementations here exactly as they were; the code below calls them:
# get_transcript_via_supadata, get_transcript_via_apify, get_youtube_transcript,
# get_website_content_via_requests, get_website_content_via_urltotext_api, generate_summary
# --- Revised Background Task Processing ---
async def process_summary_task(
    user_id: int,
    chat_id: int,
    message_id_to_edit: int,
    url: str,
    summary_type: str,
    bot_token: str  # Now receiving token instead of bot instance
) -> None:
    """Fetch the content behind ``url``, summarise it and deliver the result.

    Intended to run as a background task. A dedicated ``Bot`` is created from
    ``bot_token`` for the lifetime of the task and shut down at the end.

    Flow:
        1. Turn the button message into a "working on it" status (or send a
           new status message when the edit fails).
        2. Fetch a YouTube transcript or the website text.
        3. Generate the summary and send it to the chat.
        4. On failure send a feedback message; finally remove or rewrite the
           status message.

    Args:
        user_id: Telegram user id, used only for the log prefix.
        chat_id: Chat that receives status, summary and error messages.
        message_id_to_edit: Message carrying the format buttons; it is edited
            into the status text and removed or overwritten at the end.
        url: Link to summarise.
        summary_type: Summary format chosen by the user; passed through to
            ``generate_summary``.
        bot_token: Token used to build the task-local ``Bot``.

    Returns:
        None. Exceptions are logged and reported to the user, not raised.
    """
    task_id = f"{user_id}-{message_id_to_edit}"
    logger.info(f"[Task {task_id}] Starting processing for URL: {url}")
    # Create a new bot instance for this task
    bot = Bot(token=bot_token)
    # Bind everything the ``finally`` clause reads *before* entering the try
    # block, so an early failure cannot surface as an UnboundLocalError there.
    status_message_sent_id = None
    content = None
    user_feedback_message = None
    success = False
    try:
        # Sets up the bot's HTTP resources; paired with bot.shutdown() below.
        await bot.initialize()

        # --- Inform User Processing Has Started ---
        processing_message_text = f"⏳ Working on your '{summary_type}' summary for the link...\n_(This might take up to a minute depending on the content)_"

        @retry_bot_operation
        async def edit_or_send_status():
            nonlocal status_message_sent_id, message_id_to_edit
            # On a retry the original message may already have been given up
            # on; in that case go straight to sending a fresh status message.
            if message_id_to_edit is not None:
                try:
                    await bot.edit_message_text(
                        chat_id=chat_id,
                        message_id=message_id_to_edit,
                        text=processing_message_text
                    )
                    logger.debug(f"[Task {task_id}] Successfully edited message {message_id_to_edit}")
                    return
                except (TimedOut, NetworkError, BadRequest) as e:
                    logger.warning(f"[Task {task_id}] Could not edit original message: {e}. Sending new status message.")
                    message_id_to_edit = None
            status_message = await bot.send_message(
                chat_id=chat_id,
                text=processing_message_text
            )
            status_message_sent_id = status_message.message_id
            logger.debug(f"[Task {task_id}] Sent new status message {status_message_sent_id}")

        await edit_or_send_status()

        # --- Main Content Fetching and Summarization ---
        try:
            # Send 'typing' action
            @retry_bot_operation
            async def send_typing():
                await bot.send_chat_action(chat_id=chat_id, action='typing')

            await send_typing()

            # --- Determine Content Type and Fetch ---
            is_yt = is_youtube_url(url)
            logger.debug(f"[Task {task_id}] URL is YouTube: {is_yt}")
            if is_yt:
                video_id = extract_youtube_id(url)
                if video_id:
                    logger.info(f"[Task {task_id}] Fetching YouTube transcript for {video_id}")
                    content = await get_youtube_transcript(
                        video_id,
                        url,
                        SUPADATA_API_KEY,
                        APIFY_API_TOKEN
                    )
                # Also covers the case where no video ID could be extracted.
                if not content:
                    user_feedback_message = "⚠️ Sorry, I couldn't retrieve the transcript for that YouTube video."
            else:
                logger.info(f"[Task {task_id}] Attempting website scrape for: {url}")
                content = await get_website_content_via_requests(url)
                if not content and URLTOTEXT_API_KEY:
                    await send_typing()
                    content = await get_website_content_via_urltotext_api(url, URLTOTEXT_API_KEY)
                if not content:
                    user_feedback_message = "⚠️ Sorry, I couldn't fetch the content from that website."

            # --- Generate Summary if Content Was Fetched ---
            if content:
                logger.info(f"[Task {task_id}] Generating '{summary_type}' summary")
                await send_typing()
                summary = await generate_summary(content, summary_type, OPENROUTER_API_KEY)
                if summary.startswith("Error:") or summary.startswith("Sorry,"):
                    user_feedback_message = f"⚠️ {summary}"
                else:
                    @retry_bot_operation
                    async def send_summary():
                        await bot.send_message(
                            chat_id=chat_id,
                            text=summary,
                            parse_mode=ParseMode.MARKDOWN,
                            link_preview_options={'is_disabled': True}
                        )

                    await send_summary()
                    success = True
        except Exception as e:
            logger.error(f"[Task {task_id}] Error during processing: {e}", exc_info=True)
            user_feedback_message = "❌ An unexpected error occurred while processing your request."

        # --- Send Final Feedback Message if Processing Failed ---
        if user_feedback_message and not success:
            @retry_bot_operation
            async def send_feedback():
                await bot.send_message(chat_id=chat_id, text=user_feedback_message)

            await send_feedback()
    except Exception as e:
        logger.error(f"[Task {task_id}] Critical error in task: {e}", exc_info=True)
        try:
            await bot.send_message(
                chat_id=chat_id,
                text="❌ A critical error occurred. Please try again later."
            )
        except Exception as notify_err:
            # Best effort only: the chat may be unreachable for the same reason.
            logger.warning(f"[Task {task_id}] Could not notify user about critical error: {notify_err}")
    finally:
        # --- Clean up Status Message(s) ---
        try:
            if status_message_sent_id:
                await bot.delete_message(chat_id=chat_id, message_id=status_message_sent_id)
            elif message_id_to_edit and success:
                await bot.delete_message(chat_id=chat_id, message_id=message_id_to_edit)
            elif message_id_to_edit and not success:
                final_error_text = user_feedback_message or "❌ An error occurred."
                await bot.edit_message_text(
                    chat_id=chat_id,
                    message_id=message_id_to_edit,
                    # Stay below Telegram's message length limit.
                    text=final_error_text[:4090]
                )
        except Exception as e:
            logger.warning(f"[Task {task_id}] Cleanup error: {e}")
        # Release the bot's HTTP resources. python-telegram-bot's Bot exposes
        # shutdown() for this; it has no ``session`` attribute.
        try:
            await bot.shutdown()
        except Exception as e:
            logger.warning(f"[Task {task_id}] Error while shutting down bot: {e}")
        logger.info(f"[Task {task_id}] Task completed. Success: {success}")
# --- Telegram Bot Handlers ---
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /start command by greeting the user.

    Args:
        update: Incoming update carrying the /start command.
        context: Handler context (unused).
    """
    user = update.effective_user
    # effective_message also covers an *edited* /start command, for which
    # update.message is None.
    message = update.effective_message
    if not user or not message:
        return
    logger.info(f"User {user.id} initiated /start.")
    # The reply is parsed as HTML, so a plain first name must be escaped:
    # a name containing "<" or "&" would otherwise break the message markup.
    mention = user.mention_html() if user.username else html.escape(user.first_name)
    start_message = (
        f"👋 Hello {mention}!\n\n"
        "I can summarise YouTube videos or web articles for you.\n\n"
        "Just send me a link (URL) and I'll ask you whether you want the summary as a paragraph or bullet points.\n\n"
        "Type /help for more details."
    )
    await message.reply_html(start_message)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle the /help command by sending the usage instructions.

    Args:
        update: Incoming update carrying the /help command.
        context: Handler context (unused).
    """
    user = update.effective_user
    logger.info(f"User {user.id if user else '?'} requested /help.")
    # effective_message also covers an *edited* /help command, for which
    # update.message is None and the old code raised AttributeError.
    message = update.effective_message
    if not message:
        return
    help_text = (
        "**How to Use Me:**\n"
        "1. Send me a direct link (URL) to a YouTube video or a web article.\n"
        "2. I will ask you to choose the summary format: `Paragraph` or `Points`.\n"
        "3. Click the button for your preferred format.\n"
        "4. I'll fetch the content, summarise it using AI, and send it back to you!\n\n"
        "**Important Notes:**\n"
        "- **YouTube:** Getting transcripts can sometimes fail if they are disabled or unavailable.\n"
        "- **Websites:** Complex websites might not work perfectly.\n"
        "- **AI Summaries:** The AI tries its best to be accurate.\n\n"
        "Just send a link to get started!"
    )
    await message.reply_text(help_text, parse_mode=ParseMode.MARKDOWN)
# First http(s) URL in a message: dotted host name, optional port, then an
# optional path / query / fragment running up to the next whitespace.
# The TLD may be up to 63 letters; the former limit of 6 silently truncated
# links such as "https://example.technology/page" to "https://example.techno".
_URL_PATTERN = re.compile(
    r'https?://(?:[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]{2,63}'
    r'(?::\d{1,5})?(?:[/?#][^\s]*)?'
)


async def handle_potential_url(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a text message that may contain a URL.

    When a URL is found it is stored in ``context.user_data['url_to_summarize']``
    and the user is asked to pick a summary format via inline buttons.
    Any other non-command text gets a hint asking for a valid URL.

    Args:
        update: Incoming update with the user's text message.
        context: Handler context; its ``user_data`` remembers the URL until
            the user presses a format button.
    """
    if not update.message or not update.message.text:
        return
    message_text = update.message.text.strip()
    user = update.effective_user
    if not user:
        return

    match = _URL_PATTERN.search(message_text)
    if match:
        url = match.group(0)
        logger.info(f"User {user.id} sent URL: {url}")
        context.user_data['url_to_summarize'] = url
        keyboard = [
            [
                InlineKeyboardButton("📜 Paragraph Summary", callback_data="paragraph"),
                InlineKeyboardButton("🔹 Bullet Points", callback_data="points")
            ]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await update.message.reply_text(
            f"✅ Link received:\n`{url}`\n\nChoose your desired summary format:",
            reply_markup=reply_markup,
            parse_mode=ParseMode.MARKDOWN,
            link_preview_options={'is_disabled': True}
        )
    elif not message_text.startswith('/'):
        await update.message.reply_text("Please send me a valid URL (starting with http:// or https://) to summarize.")
# Strong references to in-flight summary tasks. The event loop itself keeps
# only weak references, so an unreferenced task may be garbage-collected
# before it finishes.
_background_summary_tasks = set()


async def handle_summary_type_callback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle a press on one of the summary-format buttons.

    Acknowledges the callback, looks up the URL remembered in
    ``context.user_data`` and schedules ``process_summary_task`` in the
    background so the handler returns immediately.

    Args:
        update: Incoming update carrying the callback query.
        context: Handler context holding the remembered URL.
    """
    query = update.callback_query
    if not query:
        # Nothing to acknowledge without a callback query.
        return
    if not query.from_user or not query.message:
        try:
            await query.answer()
        except Exception as e:
            logger.debug(f"Could not answer incomplete callback query: {e}")
        return

    await query.answer()  # Acknowledge the button press immediately
    user = query.from_user
    summary_type = query.data
    url = context.user_data.get('url_to_summarize')
    logger.info(f"User {user.id} chose summary type '{summary_type}'")

    if not url:
        logger.warning(f"No URL found for user {user.id}")
        try:
            await query.edit_message_text(text="⚠️ Oops! I lost the context for that link. Please send the link again.")
        except Exception as e:
            logger.error(f"Failed to edit message: {e}")
        return

    # Clear the URL from context
    context.user_data.pop('url_to_summarize', None)

    # Schedule background task with token instead of bot instance
    task = asyncio.create_task(
        process_summary_task(
            user_id=user.id,
            chat_id=query.message.chat_id,
            message_id_to_edit=query.message.message_id,
            url=url,
            summary_type=summary_type,
            bot_token=TELEGRAM_TOKEN
        ),
        name=f"SummaryTask-{user.id}-{query.message.message_id}"
    )
    # Hold the task until it completes, then let it go.
    _background_summary_tasks.add(task)
    task.add_done_callback(_background_summary_tasks.discard)
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Log the exception attached to ``context``, if there is one."""
    error = context.error
    if not error:
        return
    logger.error(f"Exception while handling an update: {error}", exc_info=error)
# --- Bot Setup Function ---
async def setup_bot_config() -> Application:
    """Build the PTB ``Application`` and register all handlers.

    Returns:
        The configured application; it is neither initialized nor started here.

    Raises:
        ValueError: If ``TELEGRAM_TOKEN`` is not set.
    """
    logger.info("Configuring Telegram Application...")
    if not TELEGRAM_TOKEN:
        raise ValueError("TELEGRAM_TOKEN environment variable not set.")

    # Explicit HTTP timeouts and protocol version for all Bot API calls.
    request_settings = {
        "connect_timeout": 10.0,
        "read_timeout": 30.0,
        "write_timeout": 30.0,
        "pool_timeout": 30.0,
        "http_version": "1.1",
    }
    builder = Application.builder().token(TELEGRAM_TOKEN)
    builder = builder.request(HTTPXRequest(**request_settings))
    bot_application = builder.build()

    update_handlers = (
        CommandHandler("start", start),
        CommandHandler("help", help_command),
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_potential_url),
        CallbackQueryHandler(handle_summary_type_callback),
    )
    for update_handler in update_handlers:
        bot_application.add_handler(update_handler)
    bot_application.add_error_handler(error_handler)

    logger.info("Telegram application handlers configured.")
    return bot_application
# --- ASGI Lifespan Context Manager ---
@contextlib.asynccontextmanager
async def lifespan(app: Starlette):
    """Start the PTB application on ASGI startup and stop it on shutdown.

    Startup builds, initializes and starts the bot and, when the
    ``SPACE_HOST`` environment variable is set, registers
    ``https://<SPACE_HOST>/webhook`` as the Telegram webhook. A webhook
    registration failure is logged but does not abort startup.

    Shutdown stops the application (only if it is running) and then shuts it
    down. The two steps are handled independently, so a failure in the first
    cannot skip the second.

    Args:
        app: The Starlette application this lifespan belongs to (unused).

    Raises:
        Exception: Any startup error is logged as critical and re-raised.
    """
    global ptb_app
    logger.info("ASGI Lifespan: Startup sequence initiated...")
    try:
        # Only the startup phase is wrapped by the "Startup error" handler, so
        # exceptions raised while the application is serving are not mislabelled.
        try:
            ptb_app = await setup_bot_config()
            await ptb_app.initialize()
            await ptb_app.start()
            bot_info = await ptb_app.bot.get_me()
            logger.info(f"Bot started: @{bot_info.username}")

            WEBHOOK_URL_BASE = os.environ.get("SPACE_HOST")
            if WEBHOOK_URL_BASE:
                if not WEBHOOK_URL_BASE.startswith("https://"):
                    WEBHOOK_URL_BASE = f"https://{WEBHOOK_URL_BASE}"
                webhook_path = "/webhook"
                full_webhook_url = f"{WEBHOOK_URL_BASE.rstrip('/')}{webhook_path}"
                logger.info(f"Setting webhook to: {full_webhook_url}")
                await asyncio.sleep(2.0)
                try:
                    await ptb_app.bot.set_webhook(
                        url=full_webhook_url,
                        allowed_updates=Update.ALL_TYPES,
                        drop_pending_updates=True
                    )
                    webhook_info = await ptb_app.bot.get_webhook_info()
                    logger.info(f"Webhook set: {webhook_info}")
                except Exception as e:
                    logger.error(f"Failed to set webhook: {e}")
            else:
                logger.warning("SPACE_HOST is not set; no webhook was registered.")
            logger.info("ASGI Lifespan: Startup complete.")
        except Exception as startup_err:
            logger.critical(f"Startup error: {startup_err}", exc_info=True)
            raise
        yield
    finally:
        logger.info("ASGI Lifespan: Shutdown sequence initiated...")
        if ptb_app:
            # stop() raises when the application never reached the running
            # state (e.g. startup failed before start()), so check first.
            try:
                if ptb_app.running:
                    await ptb_app.stop()
            except Exception as stop_err:
                logger.error(f"Shutdown error: {stop_err}")
            # Always attempt shutdown() so resources are released even when
            # stop() failed or was skipped.
            try:
                await ptb_app.shutdown()
                logger.info("PTB Application shut down gracefully.")
            except Exception as shutdown_err:
                logger.error(f"Shutdown error: {shutdown_err}")
        logger.info("ASGI Lifespan: Shutdown complete.")
# --- Flask App Setup ---
flask_core_app = Flask(__name__)


@flask_core_app.route('/')
def index():
    """Health-check endpoint reporting whether the bot is available."""
    if ptb_app and ptb_app.bot:
        bot_status = f"Running (@{ptb_app.bot.username})"
    else:
        bot_status = "Unknown"
    return f"Telegram Bot Summarizer - Status: {bot_status}"
@flask_core_app.route('/webhook', methods=['POST'])
async def webhook() -> Response:
    """Webhook endpoint called by Telegram with a JSON-encoded update.

    Returns:
        503 while the bot is not initialized, 400 for a missing, empty or
        malformed JSON body, 500 when processing the update fails, and
        200 ('ok') otherwise.
    """
    # NOTE(review): Flask is mounted through WSGIMiddleware, and Flask runs
    # async views on their own event loop. This view is therefore presumably
    # not on the loop where ptb_app was initialized, which may be the source
    # of the background-task connection problems. Confirm, and consider a
    # native Starlette route for this endpoint.
    if not ptb_app:
        return Response('Bot not initialized', status=503)

    # silent=True turns a malformed or non-JSON body into None (-> 400)
    # instead of an exception that used to be reported as a 500.
    update_data = request.get_json(silent=True)
    if not update_data:
        return Response('Bad Request', status=400)

    try:
        update = Update.de_json(update_data, ptb_app.bot)
        await ptb_app.process_update(update)
        return Response('ok', status=200)
    except Exception as e:
        # Include the traceback; the message alone rarely identifies the cause.
        logger.error(f"Webhook error: {e}", exc_info=True)
        return Response('Internal Server Error', status=500)
# --- Create Starlette ASGI Application ---
# The Flask (WSGI) app is wrapped so Starlette can serve it at the root path,
# while Starlette's lifespan hook drives bot startup and shutdown.
_flask_wsgi_bridge = WSGIMiddleware(flask_core_app)
app = Starlette(
    debug=False,
    routes=[Mount("/", app=_flask_wsgi_bridge)],
    lifespan=lifespan,
)
logger.info("Starlette ASGI application created.")
# --- Development Server Execution Block ---
if __name__ == '__main__':
    # Runs only the Flask app directly; the Starlette app defined above
    # (and its lifespan hook) is not involved on this path.
    logger.warning("Running in development mode (Flask server only)")
    flask_core_app.run(
        host='0.0.0.0',
        port=int(os.environ.get('PORT', 8080)),
        debug=True,
        use_reloader=False,
    )