@woai
π§Ή Major code cleanup and internationalization - Remove Russian comments/strings, translate UI to English, clean linter errors, remove hardcoded tokens, delete test files. Ready for production deployment
e775565
import asyncio | |
import logging | |
from typing import Optional | |
import aiohttp | |
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup | |
from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ContextTypes | |
from telegram.constants import ParseMode | |
import os | |
from dotenv import load_dotenv | |
import warnings | |
# Ignore standard warnings | |
warnings.filterwarnings("ignore", message="SSL shutdown timed out") | |
warnings.filterwarnings("ignore", message="Certificate verification failed") | |
warnings.filterwarnings("ignore", message="SSL handshake failed") | |
warnings.filterwarnings("ignore", message="Connection lost") | |
# Disable SSL error logging from all possible sources | |
logging.getLogger("httpx").setLevel(logging.WARNING) | |
logging.getLogger("httpcore").setLevel(logging.WARNING) | |
logging.getLogger("httpcore.connection").setLevel(logging.ERROR) | |
logging.getLogger("httpcore.http11").setLevel(logging.ERROR) | |
logging.getLogger("asyncio").setLevel(logging.WARNING) | |
# Create custom filter to suppress SSL errors | |
class SSLErrorFilter(logging.Filter): | |
def filter(self, record): | |
message = record.getMessage() | |
return not any(phrase in message.lower() for phrase in [ | |
'ssl shutdown timed out', | |
'connection lost', | |
'ssl handshake failed', | |
'certificate verification failed' | |
]) | |
# Apply filter to root logger | |
logging.getLogger().addFilter(SSLErrorFilter()) | |
# Load environment variables | |
load_dotenv() | |
# Configuration | |
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN") | |
if not TELEGRAM_TOKEN: | |
raise ValueError("TELEGRAM_TOKEN environment variable is required") | |
# Support both variable name variants | |
MCP_BASE_URL = os.getenv("MCP_BASE_URL", os.getenv("MCP_BASE_URL", "https://youtube-bot.tuttech.net/api/mcp")) | |
# Set up logging | |
logging.basicConfig( | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
level=logging.INFO | |
) | |
logger = logging.getLogger(__name__) | |
def escape_markdown(text: str) -> str: | |
"""Escape markdown special characters.""" | |
escape_chars = ['_', '*', '[', ']', '(', ')', '~', '`', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!'] | |
for char in escape_chars: | |
text = text.replace(char, '\\' + char) | |
return text | |
class TubeMetaBot: | |
def __init__(self): | |
self.app = Application.builder().token(TELEGRAM_TOKEN).build() | |
self.setup_handlers() | |
def setup_handlers(self): | |
"""Set up command and message handlers""" | |
self.app.add_handler(CommandHandler("start", self.start_command)) | |
self.app.add_handler(CommandHandler("help", self.help_command)) | |
self.app.add_handler(CommandHandler("search", self.search_command)) | |
self.app.add_handler(CommandHandler("analyze", self.analyze_command)) | |
self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message)) | |
self.app.add_handler(CallbackQueryHandler(self.handle_callback_query)) | |
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): | |
"""Handle /start command""" | |
welcome_text = """ | |
π¬ **Welcome to TubeMeta Bot!** | |
I can help you with YouTube videos: | |
β’ π Search for videos | |
β’ π Get video metadata | |
β’ π Extract transcripts | |
β’ β° Generate AI timecodes with Gemini 2.0 | |
**How to use:** | |
β’ Send me a YouTube URL for full analysis | |
β’ Use `/search <query>` to find videos | |
β’ Send any text to search YouTube | |
Type `/help` for more information! | |
""" | |
await update.message.reply_text(welcome_text, parse_mode=ParseMode.MARKDOWN) | |
async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): | |
"""Handle /help command""" | |
help_text = """ | |
π€ **TubeMeta Bot Help** | |
**Commands:** | |
β’ `/start` - Welcome message | |
β’ `/help` - Show this help | |
β’ `/search <query>` - Search YouTube videos | |
β’ `/analyze` - Analyze YouTube video (send after this command) | |
**Features:** | |
β’ π **Video Search** - Find YouTube videos by keywords | |
β’ π **Video Analysis** - Get detailed metadata (title, duration, views, etc.) | |
β’ π **Transcripts** - Extract video transcripts/subtitles | |
β’ β° **AI Timecodes** - Generate smart timecodes with Gemini 2.0 | |
**Usage Examples:** | |
β’ Search: `/search machine learning tutorial` | |
β’ Analysis: `/analyze` then send YouTube URL | |
β’ Or just send: `python programming` for search | |
**Supported Languages:** | |
πΊπ¦ Ukrainian | π·πΊ Russian | π¬π§ English | |
Powered by Gemini 2.0 AI π§ | |
""" | |
await update.message.reply_text(help_text, parse_mode=ParseMode.MARKDOWN) | |
async def search_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): | |
"""Handle /search command""" | |
if not context.args: | |
await update.message.reply_text("Please provide a search query. Example: `/search python tutorial`") | |
return | |
query = " ".join(context.args) | |
await self.handle_search(update, query) | |
async def analyze_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE): | |
"""Handle /analyze command""" | |
if not update.message: | |
return # Skip if no message (shouldn't happen in command handlers) | |
if context.args: | |
# URL provided with command | |
url = " ".join(context.args) | |
if self.is_youtube_url(url): | |
await self.handle_youtube_url(update, url) | |
else: | |
await update.message.reply_text("β Please provide a valid YouTube URL. Example: `/analyze https://youtu.be/dQw4w9WgXcQ`") | |
else: | |
# Ask for URL | |
await update.message.reply_text("πΊ Please send me a YouTube URL to analyze.\n\nExample: `https://youtu.be/dQw4w9WgXcQ`", parse_mode=ParseMode.MARKDOWN) | |
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE): | |
"""Handle regular text messages""" | |
text = update.message.text.strip() | |
# Check if it's a YouTube URL | |
if self.is_youtube_url(text): | |
await update.message.reply_text( | |
"πΊ I see you sent a YouTube URL! Use the `/analyze` command to analyze it.\n\n" | |
"Example: `/analyze https://youtu.be/dQw4w9WgXcQ`\n" | |
"Or just type `/analyze` and then send the URL.", | |
parse_mode=ParseMode.MARKDOWN | |
) | |
else: | |
# Treat as search query | |
await self.handle_search(update, text) | |
def is_youtube_url(self, text: str) -> bool: | |
"""Check if text contains a YouTube URL""" | |
youtube_domains = [ | |
'youtube.com', 'youtu.be', 'www.youtube.com', | |
'm.youtube.com', 'music.youtube.com' | |
] | |
return any(domain in text.lower() for domain in youtube_domains) | |
async def handle_youtube_url(self, update: Update, url: str): | |
"""Handle YouTube URL - provide full analysis options""" | |
# Send initial message | |
processing_msg = await update.message.reply_text("π Analyzing YouTube video...") | |
try: | |
# Get basic video info first | |
video_info_response = await self.call_mcp_action("video_info", {"video_id": url}) | |
# Check if we got a valid response | |
if not video_info_response: | |
await processing_msg.edit_text("β Could not analyze this YouTube video. Please check the URL.") | |
return | |
# Check for error in response | |
if video_info_response.get("error"): | |
await processing_msg.edit_text(f"β Error: {video_info_response['error']}") | |
return | |
# Check if we have video data | |
video_data = video_info_response.get("data") | |
if not video_data: | |
await processing_msg.edit_text("β Could not retrieve video information. Please check the URL.") | |
return | |
# Format video info for display | |
info_text = self.format_video_info_from_data(video_data) | |
# Create action buttons | |
video_id = video_data.get("video_id", url) | |
# Limit callback data to avoid Button_data_invalid error (Telegram limit is 64 bytes) | |
safe_video_id = video_id[:30] if video_id else url[:30] # Limit video ID | |
video_title = video_data.get('title', 'related videos') | |
# Truncate title for search callback to fit in 64 byte limit | |
safe_title = video_title[:30] if len(video_title) > 30 else video_title | |
keyboard = [ | |
[ | |
InlineKeyboardButton("π Get Transcript", callback_data=f"transcript:{safe_video_id}"), | |
InlineKeyboardButton("β° AI Timecodes", callback_data=f"timecodes:{safe_video_id}") | |
], | |
[ | |
InlineKeyboardButton("π Search Similar", callback_data=f"search:{safe_title}") | |
] | |
] | |
reply_markup = InlineKeyboardMarkup(keyboard) | |
await processing_msg.edit_text(info_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML) | |
except Exception as e: | |
logger.error(f"Error handling YouTube URL: {e}") | |
await processing_msg.edit_text("β An error occurred while analyzing the video.") | |
def format_video_info_from_data(self, video_data: dict) -> str: | |
"""Format video information from MCP response data for display using HTML""" | |
title = video_data.get("title", "Unknown Title") | |
channel = video_data.get("channel_title", "Unknown Channel") | |
duration = video_data.get("duration", "Unknown") | |
view_count = video_data.get("view_count", "Unknown") | |
upload_date = video_data.get("published_at", "Unknown") | |
like_count = video_data.get("like_count", "Unknown") | |
comment_count = video_data.get("comment_count", "Unknown") | |
info_text = f"""π¬ <b>{title}</b> | |
π€ <b>Channel:</b> {channel} | |
β±οΈ <b>Duration:</b> {duration} | |
ποΈ <b>Views:</b> {view_count} | |
π <b>Likes:</b> {like_count} | |
π¬ <b>Comments:</b> {comment_count} | |
π <b>Uploaded:</b> {upload_date} | |
Choose an action below:""" | |
return info_text | |
async def handle_search(self, update: Update, query: str): | |
"""Handle search query""" | |
processing_msg = await update.message.reply_text(f"π Searching for: <b>{query}</b>", parse_mode=ParseMode.HTML) | |
try: | |
results = await self.call_mcp_action("search", {"query": query, "max_results": 5}) | |
if not results or (isinstance(results, dict) and "error" in results): | |
await processing_msg.edit_text("β No results found for your search.") | |
return | |
# Format search results | |
search_text = f"π <b>Search Results for:</b> {query}\n\n" | |
# Handle the case where results is a list (new format) | |
if isinstance(results, list): | |
videos = results | |
else: | |
# Fallback for old format | |
videos = results.get("videos", []) | |
for i, video_obj in enumerate(videos, 1): | |
# Extract video data from the response object | |
if isinstance(video_obj, dict) and "data" in video_obj: | |
video = video_obj["data"] | |
else: | |
video = video_obj | |
# Build video info (HTML auto-escapes dangerous chars) | |
title = video.get('title', 'Unknown Title') | |
channel = video.get('channel_title', video.get('channel', 'Unknown Channel')) | |
duration = str(video.get('duration', 'Unknown')) | |
view_count = str(video.get('view_count', 'Unknown')) | |
video_id = video.get('video_id', '') | |
search_text += f"<b>{i}. {title}</b>\n" | |
search_text += f"π€ {channel}\n" | |
if duration != 'Unknown': | |
search_text += f"β±οΈ {duration}\n" | |
if view_count != 'Unknown': | |
search_text += f"ποΈ {view_count} views\n" | |
search_text += f"π https://www.youtube.com/watch?v={video_id}\n\n" | |
# Add search refinement buttons | |
keyboard = [ | |
[InlineKeyboardButton("π New Search", callback_data="new_search")] | |
] | |
reply_markup = InlineKeyboardMarkup(keyboard) | |
await processing_msg.edit_text(search_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML) | |
except Exception as e: | |
logger.error(f"Error handling search: {e}") | |
await processing_msg.edit_text("β An error occurred during search.") | |
async def handle_callback_query(self, update: Update, context: ContextTypes.DEFAULT_TYPE): | |
"""Handle inline keyboard button presses""" | |
query = update.callback_query | |
await query.answer() | |
data = query.data | |
logger.info(f"Callback query: {data}") | |
if data.startswith("transcript:"): | |
url = data.replace("transcript:", "") | |
await self.get_transcript(query, url) | |
elif data.startswith("timecodes:"): | |
url = data.replace("timecodes:", "") | |
await self.get_timecodes(query, url) | |
elif data.startswith("search:"): | |
search_query = data.replace("search:", "") | |
await self.handle_search_callback(query, search_query) | |
elif data.startswith("back:"): | |
url = data.replace("back:", "") | |
await self.handle_back_to_video(query, url) | |
elif data.startswith("full_transcript:"): | |
url = data.replace("full_transcript:", "") | |
await self.send_full_transcript(query, url) | |
elif data.startswith("full_timecodes:"): | |
url = data.replace("full_timecodes:", "") | |
await self.send_full_timecodes(query, url) | |
elif data.startswith("analyze:"): | |
video_id = data.replace("analyze:", "") | |
await self.analyze_video(query, f"https://www.youtube.com/watch?v={video_id}") | |
elif data.startswith("back_to_analysis:"): | |
url = data.replace("back_to_analysis:", "") | |
await self.handle_back_to_video(query, url) | |
elif data == "new_search": | |
await query.edit_message_text( | |
"π **Send me a new search query!**\n\nJust type your search terms and I'll find YouTube videos for you.", | |
parse_mode=ParseMode.MARKDOWN | |
) | |
else: | |
await query.edit_message_text("β Unknown action") | |
async def get_transcript(self, query, url: str): | |
"""Get video transcript""" | |
await query.edit_message_text("π Extracting transcript...") | |
try: | |
transcript_response = await self.call_mcp_action("transcript", {"video_id": url}) | |
# Check if we got a valid response | |
if not transcript_response: | |
await query.edit_message_text("β Could not extract transcript. Please try again later.") | |
return | |
# Check for error in response | |
if transcript_response.get("error"): | |
await query.edit_message_text(f"β {transcript_response['error']}") | |
return | |
# Check if we have transcript data | |
if transcript_response.get("type") not in ["youtube_transcript"]: | |
await query.edit_message_text("β Invalid transcript response format.") | |
return | |
# Get the markdown formatted transcript | |
transcript_text = transcript_response.get("markdown", "") | |
if not transcript_text: | |
await query.edit_message_text("β Transcript is empty or unavailable.") | |
return | |
# Handle long transcripts more intelligently | |
max_length = 4000 # Leave room for buttons and formatting | |
if len(transcript_text) > max_length: | |
# Create a summary message with first part | |
summary_text = "π **Transcript Preview** (showing first {} characters)\n\n".format(max_length) | |
summary_text += transcript_text[:max_length-200] + "...\n\n" | |
summary_text += f"<i>π Full transcript: {len(transcript_text)} characters total</i>\n" | |
summary_text += "<i>β οΈ Transcript is too long for Telegram. Showing preview only.</i>" | |
# Add back button and full transcript button | |
keyboard = [ | |
[ | |
InlineKeyboardButton("π Get Full Text", callback_data=f"full_transcript:{url}"), | |
InlineKeyboardButton("β¬ οΈ Back", callback_data=f"back:{url}") | |
] | |
] | |
reply_markup = InlineKeyboardMarkup(keyboard) | |
await query.edit_message_text(summary_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML) | |
else: | |
# Short enough to display fully | |
# Add back button | |
keyboard = [[InlineKeyboardButton("β¬ οΈ Back", callback_data=f"back:{url}")]] | |
reply_markup = InlineKeyboardMarkup(keyboard) | |
await query.edit_message_text(transcript_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML) | |
except Exception as e: | |
logger.error(f"Error getting transcript: {e}") | |
await query.edit_message_text("β An error occurred while extracting transcript.") | |
async def get_timecodes(self, query, url: str): | |
"""Generate AI timecodes""" | |
await query.edit_message_text("β° Generating AI timecodes with Gemini 2.0...") | |
try: | |
timecodes_response = await self.call_mcp_action("gemini_timecodes", { | |
"video_id": url, | |
"format": "youtube" | |
}) | |
# Check if we got a valid response | |
if not timecodes_response: | |
await query.edit_message_text("β Could not generate timecodes. Please try again later.") | |
return | |
# Check for error in response | |
if timecodes_response.get("error"): | |
await query.edit_message_text(f"β {timecodes_response['error']}") | |
return | |
# Check if we have timecodes data | |
if timecodes_response.get("type") not in ["youtube_gemini_timecodes"]: | |
await query.edit_message_text("β Invalid timecodes response format.") | |
return | |
# Get the markdown formatted timecodes | |
timecodes_text = timecodes_response.get("markdown", "") | |
if not timecodes_text: | |
await query.edit_message_text("β No timecodes were generated.") | |
return | |
# Handle long timecodes more intelligently | |
max_length = 4000 # Leave room for buttons and formatting | |
if len(timecodes_text) > max_length: | |
# Create a summary message with preview | |
data = timecodes_response.get("data", {}) | |
timecodes_list = data.get("timecodes", []) | |
detected_language = data.get("detected_language", "unknown") | |
summary_text = "β° **AI Timecodes Generated**\n\n" | |
summary_text += f"π€ **Model:** {data.get('model', 'Gemini AI')}\n" | |
summary_text += f"π **Language:** {detected_language}\n" | |
summary_text += f"π **Total timecodes:** {len(timecodes_list)}\n\n" | |
# Calculate how many timecodes we can show | |
available_space = max_length - len(summary_text) - 300 # Reserve space for buttons and footer | |
# Show as many timecodes as possible within space limit | |
preview_text = "<b>Timecodes Preview:</b>\n<pre>" | |
current_length = 0 | |
shown_count = 0 | |
for tc in timecodes_list: | |
tc_line = f"{tc}\n" | |
if current_length + len(tc_line) < available_space: | |
preview_text += tc_line | |
current_length += len(tc_line) | |
shown_count += 1 | |
else: | |
break | |
preview_text += "</pre>\n\n" | |
if shown_count < len(timecodes_list): | |
summary_text += preview_text | |
summary_text += f"<i>π Showing {shown_count} of {len(timecodes_list)} timecodes</i>\n" | |
summary_text += "<i>πΎ Download full file for complete list</i>" | |
else: | |
# All timecodes fit, show them directly | |
summary_text = timecodes_text | |
# Add buttons for full timecodes and back | |
keyboard = [ | |
[ | |
InlineKeyboardButton("π Get Full List", callback_data=f"full_timecodes:{url}"), | |
InlineKeyboardButton("β¬ οΈ Back", callback_data=f"back:{url}") | |
] | |
] | |
reply_markup = InlineKeyboardMarkup(keyboard) | |
await query.edit_message_text(summary_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML) | |
else: | |
# Short enough to display fully | |
# Add back button | |
keyboard = [[InlineKeyboardButton("β¬ οΈ Back", callback_data=f"back:{url}")]] | |
reply_markup = InlineKeyboardMarkup(keyboard) | |
# Convert markdown to HTML for proper code block rendering | |
html_timecodes = self.convert_markdown_to_html(timecodes_text) | |
await query.edit_message_text(html_timecodes, reply_markup=reply_markup, parse_mode=ParseMode.HTML) | |
except Exception as e: | |
logger.error(f"Error generating timecodes: {e}") | |
await query.edit_message_text("β An error occurred while generating timecodes.") | |
async def send_full_transcript(self, query, url: str): | |
"""Send full transcript as a text file""" | |
await query.edit_message_text("π Preparing full transcript file...") | |
try: | |
transcript_response = await self.call_mcp_action("transcript", {"video_id": url}) | |
if not transcript_response or transcript_response.get("error"): | |
await query.edit_message_text("β Could not extract full transcript.") | |
return | |
# Get full transcript text | |
full_transcript = transcript_response.get("markdown", "") | |
if not full_transcript: | |
await query.edit_message_text("β Transcript is empty.") | |
return | |
# Create a simple text version (without markdown formatting) | |
simple_text = full_transcript.replace("# π Transcript\n\n", "") | |
simple_text = simple_text.replace("**[", "[").replace("]**", "]") | |
# Send as document | |
from io import BytesIO | |
import re | |
# Extract video title from URL for filename | |
video_info = await self.call_mcp_action("video_info", {"video_id": url}) | |
title = "transcript" | |
if video_info and video_info.get("data"): | |
title = video_info["data"].get("title", "transcript") | |
# Clean title for filename | |
title = re.sub(r'[<>:"/\\|?*]', '_', title)[:50] | |
# Create file | |
transcript_file = BytesIO(simple_text.encode('utf-8')) | |
transcript_file.name = f"{title}_transcript.txt" | |
# Send file and create a new message instead of editing | |
await query.message.reply_document( | |
document=transcript_file, | |
caption=f"π Full transcript for: {title}", | |
reply_markup=InlineKeyboardMarkup([[ | |
InlineKeyboardButton("πΉ Back to Video", callback_data=f"back_to_analysis:{url}") | |
]]) | |
) | |
# Edit original message to show completion | |
await query.edit_message_text( | |
"β Transcript file sent!\n\nπ Check the file above for the complete transcript.", | |
reply_markup=InlineKeyboardMarkup([[ | |
InlineKeyboardButton("πΉ Back to Video", callback_data=f"back_to_analysis:{url}") | |
]]) | |
) | |
except Exception as e: | |
logger.error(f"Error sending full transcript: {e}") | |
await query.edit_message_text("β An error occurred while preparing transcript file.") | |
async def send_full_timecodes(self, query, url: str): | |
"""Send full timecodes as a text file""" | |
await query.edit_message_text("π Preparing full timecodes file...") | |
try: | |
timecodes_response = await self.call_mcp_action("gemini_timecodes", { | |
"video_id": url, | |
"format": "youtube" | |
}) | |
if not timecodes_response or timecodes_response.get("error"): | |
await query.edit_message_text("β Could not generate full timecodes.") | |
return | |
# Get full timecodes | |
data = timecodes_response.get("data", {}) | |
timecodes_list = data.get("timecodes", []) | |
if not timecodes_list: | |
await query.edit_message_text("β No timecodes available.") | |
return | |
# Create text content | |
content = "AI Generated Timecodes\n" | |
content += f"Model: {data.get('model', 'Gemini AI')}\n" | |
content += f"Language: {data.get('detected_language', 'auto-detected')}\n" | |
content += f"Total: {len(timecodes_list)} timecodes\n\n" | |
content += "\n".join(timecodes_list) | |
# Send as document | |
from io import BytesIO | |
import re | |
# Extract video title for filename | |
video_info = await self.call_mcp_action("video_info", {"video_id": url}) | |
title = "timecodes" | |
if video_info and video_info.get("data"): | |
title = video_info["data"].get("title", "timecodes") | |
# Clean title for filename | |
title = re.sub(r'[<>:"/\\|?*]', '_', title)[:50] | |
# Create file | |
timecodes_file = BytesIO(content.encode('utf-8')) | |
timecodes_file.name = f"{title}_timecodes.txt" | |
# Send file and create a new message instead of editing | |
await query.message.reply_document( | |
document=timecodes_file, | |
caption=f"β° AI Timecodes for: {title}", | |
reply_markup=InlineKeyboardMarkup([[ | |
InlineKeyboardButton("πΉ Back to Video", callback_data=f"back_to_analysis:{url}") | |
]]) | |
) | |
# Edit original message to show completion | |
await query.edit_message_text( | |
"β Timecodes file sent!\n\nπ Check the file above for all timecodes.", | |
reply_markup=InlineKeyboardMarkup([[ | |
InlineKeyboardButton("πΉ Back to Video", callback_data=f"back_to_analysis:{url}") | |
]]) | |
) | |
except Exception as e: | |
logger.error(f"Error sending full timecodes: {e}") | |
await query.edit_message_text("β An error occurred while preparing timecodes file.") | |
async def handle_back_to_video(self, query, url: str): | |
"""Return to video analysis view""" | |
try: | |
# Check if the current message has text that can be edited | |
current_message = query.message | |
if not current_message or not current_message.text: | |
# If no text to edit, send a new message instead of editing | |
await query.answer("π Loading video information...") | |
# Get basic video info | |
video_info_response = await self.call_mcp_action("video_info", {"video_id": url}) | |
if not video_info_response or video_info_response.get("error"): | |
await current_message.reply_text("β Could not analyze this YouTube video. Please check the URL.") | |
return | |
video_data = video_info_response.get("data") | |
if not video_data: | |
await current_message.reply_text("β Could not retrieve video information. Please check the URL.") | |
return | |
# Format video info for display | |
info_text = self.format_video_info_from_data(video_data) | |
# Create action buttons | |
video_id = video_data.get("video_id", url) | |
safe_video_id = video_id[:30] if video_id else url[:30] | |
video_title = video_data.get('title', 'related videos') | |
safe_title = video_title[:30] if len(video_title) > 30 else video_title | |
keyboard = [ | |
[ | |
InlineKeyboardButton("π Get Transcript", callback_data=f"transcript:{safe_video_id}"), | |
InlineKeyboardButton("β° AI Timecodes", callback_data=f"timecodes:{safe_video_id}") | |
], | |
[ | |
InlineKeyboardButton("π Search Similar", callback_data=f"search:{safe_title}") | |
] | |
] | |
reply_markup = InlineKeyboardMarkup(keyboard) | |
await current_message.reply_text(info_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML) | |
return | |
# If message has text, proceed with normal editing | |
await query.edit_message_text("π Loading video information...") | |
# Re-analyze the video by calling handle_youtube_url logic | |
# Get basic video info first | |
video_info_response = await self.call_mcp_action("video_info", {"video_id": url}) | |
# Check if we got a valid response | |
if not video_info_response: | |
await query.edit_message_text("β Could not analyze this YouTube video. Please check the URL.") | |
return | |
# Check for error in response | |
if video_info_response.get("error"): | |
await query.edit_message_text(f"β Error: {video_info_response['error']}") | |
return | |
# Check if we have video data | |
video_data = video_info_response.get("data") | |
if not video_data: | |
await query.edit_message_text("β Could not retrieve video information. Please check the URL.") | |
return | |
# Format video info for display | |
info_text = self.format_video_info_from_data(video_data) | |
# Create action buttons | |
video_id = video_data.get("video_id", url) | |
# Limit callback data to avoid Button_data_invalid error (Telegram limit is 64 bytes) | |
safe_video_id = video_id[:30] if video_id else url[:30] # Limit video ID | |
video_title = video_data.get('title', 'related videos') | |
# Truncate title for search callback to fit in 64 byte limit | |
safe_title = video_title[:30] if len(video_title) > 30 else video_title | |
keyboard = [ | |
[ | |
InlineKeyboardButton("π Get Transcript", callback_data=f"transcript:{safe_video_id}"), | |
InlineKeyboardButton("β° AI Timecodes", callback_data=f"timecodes:{safe_video_id}") | |
], | |
[ | |
InlineKeyboardButton("π Search Similar", callback_data=f"search:{safe_title}") | |
] | |
] | |
reply_markup = InlineKeyboardMarkup(keyboard) | |
await query.edit_message_text(info_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML) | |
except Exception as e: | |
logger.error(f"Error returning to video: {e}") | |
# If edit fails, try to send a new message | |
try: | |
await query.answer("β Could not load video information.") | |
await query.message.reply_text("β Could not load video information. Please try again.") | |
except Exception: | |
pass # Ignore if this also fails | |
async def analyze_video(self, query, url: str): | |
"""Analyze video for callback queries (alias for handle_back_to_video)""" | |
await self.handle_back_to_video(query, url) | |
async def handle_search_callback(self, query, search_query: str): | |
"""Handle search callback from inline keyboard""" | |
await query.edit_message_text(f"π Searching for: {search_query}...") | |
try: | |
# Call MCP search action | |
search_response = await self.call_mcp_action("search", { | |
"query": search_query, | |
"max_results": 5 | |
}) | |
if not search_response: | |
await query.edit_message_text("β No results found for your search.") | |
return | |
# Format search results | |
if isinstance(search_response, list) and len(search_response) > 0: | |
results_text = f"π **Search Results for:** {search_query}\n\n" | |
keyboard = [] | |
for i, result in enumerate(search_response[:5], 1): | |
if result.get("data"): | |
video_data = result["data"] | |
title = video_data.get("title", "Unknown Title")[:50] | |
channel = video_data.get("channel_title", "Unknown Channel") | |
video_id = video_data.get("video_id", "") | |
results_text += f"**{i}.** {title}\n" | |
results_text += f"π€ {channel}\n\n" | |
# Add analyze button for each video | |
if video_id: | |
keyboard.append([InlineKeyboardButton( | |
f"πΉ Analyze Video {i}", | |
callback_data=f"analyze:{video_id}" | |
)]) | |
# Add new search button | |
keyboard.append([InlineKeyboardButton("π New Search", callback_data="new_search")]) | |
reply_markup = InlineKeyboardMarkup(keyboard) | |
await query.edit_message_text(results_text, reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN) | |
else: | |
await query.edit_message_text("β No results found for your search.") | |
except Exception as e: | |
logger.error(f"Error in search callback: {e}") | |
await query.edit_message_text("β An error occurred during search.") | |
async def call_mcp_action(self, action: str, params: dict) -> Optional[dict]: | |
"""Call MCP server action""" | |
try: | |
# Configure timeout and connection settings | |
timeout = aiohttp.ClientTimeout(total=30, connect=10) | |
# Create connector without SSL for full SSL verification bypass | |
connector = aiohttp.TCPConnector( | |
limit=10, | |
limit_per_host=5, | |
ttl_dns_cache=300, | |
use_dns_cache=True, | |
enable_cleanup_closed=False, # Disable cleanup to prevent errors | |
ssl=False | |
) | |
# Create session with full SSL bypass | |
async with aiohttp.ClientSession( | |
timeout=timeout, | |
connector=connector, | |
trust_env=True, | |
skip_auto_headers={'User-Agent'} | |
) as session: | |
payload = { | |
"action": action, | |
"parameters": params | |
} | |
headers = { | |
'Content-Type': 'application/json', | |
'User-Agent': 'TubeMetaBot/1.0' | |
} | |
try: | |
# Execute request with full SSL bypass | |
async with session.post( | |
MCP_BASE_URL, | |
json=payload, | |
headers=headers, | |
ssl=False | |
) as response: | |
if response.status == 200: | |
result = await response.json() | |
logger.info(f"MCP request successful: {action}") | |
return result | |
else: | |
logger.error(f"MCP server error: {response.status}") | |
return None | |
except aiohttp.ClientConnectorError as e: | |
logger.error(f"Connection error: {e}") | |
return None | |
except asyncio.TimeoutError as e: | |
logger.error(f"Timeout error: {e}") | |
return None | |
except Exception as e: | |
logger.error(f"Request error: {e}") | |
return None | |
except Exception as e: | |
logger.error(f"Error calling MCP server: {e}") | |
return None | |
def format_video_info(self, video_info: dict) -> str: | |
"""Format video information for display using HTML""" | |
title = video_info.get("title", "Unknown Title") | |
channel = video_info.get("channel", "Unknown Channel") | |
duration = video_info.get("duration", "Unknown") | |
view_count = video_info.get("view_count", "Unknown") | |
upload_date = video_info.get("upload_date", "Unknown") | |
description = video_info.get("description", "") | |
# Truncate description if too long | |
if len(description) > 200: | |
description = description[:200] + "..." | |
info_text = f"""π¬ <b>{title}</b> | |
π€ <b>Channel:</b> {channel} | |
β±οΈ <b>Duration:</b> {duration} | |
ποΈ <b>Views:</b> {view_count} | |
π <b>Uploaded:</b> {upload_date} | |
π <b>Description:</b> | |
{description} | |
Choose an action below:""" | |
return info_text | |
async def handle_analyze(self, update: Update): | |
"""Handle analyze request""" | |
await update.message.reply_text("πΊ Please send me a YouTube URL to analyze.\n\nExample: `https://youtu.be/dQw4w9WgXcQ`", parse_mode=ParseMode.MARKDOWN) | |
async def run(self): | |
"""Start the bot""" | |
logger.info("Starting TubeMeta Bot...") | |
try: | |
await self.app.initialize() | |
await self.app.start() | |
await self.app.updater.start_polling(drop_pending_updates=True) | |
logger.info(f"Bot successfully started! MCP URL: {MCP_BASE_URL}") | |
# Keep the bot running | |
await asyncio.Event().wait() | |
except KeyboardInterrupt: | |
logger.info("Shutting down bot due to keyboard interrupt...") | |
except Exception as e: | |
logger.error(f"Error in bot operation: {e}") | |
finally: | |
# Graceful shutdown | |
logger.info("Shutting down bot...") | |
try: | |
await self.app.updater.stop() | |
await self.app.stop() | |
await self.app.shutdown() | |
logger.info("Bot shutdown complete") | |
except Exception as e: | |
logger.error(f"Error during shutdown: {e}") | |
def convert_markdown_to_html(self, text: str) -> str: | |
"""Convert markdown formatting to HTML for Telegram.""" | |
# Convert code blocks (```) to HTML | |
import re | |
# Replace triple backticks with HTML pre tags | |
text = re.sub(r'```\n(.*?)\n```', r'<pre>\1</pre>', text, flags=re.DOTALL) | |
# Convert **bold** to HTML | |
text = re.sub(r'\*\*(.*?)\*\*', r'<b>\1</b>', text) | |
# Convert *italic* to HTML | |
text = re.sub(r'\*(.*?)\*', r'<i>\1</i>', text) | |
# Convert inline code `code` to HTML | |
text = re.sub(r'`(.*?)`', r'<code>\1</code>', text) | |
return text | |
async def main(): | |
"""Main function""" | |
bot = TubeMetaBot() | |
await bot.run() | |
if __name__ == "__main__": | |
asyncio.run(main()) | |