"""Gradio front-end for the YouTube MCP API.

Each handler below POSTs a JSON payload to one endpoint under ``API_URL`` and
turns the JSON reply into a pair of values for the UI: a human-readable string
and the raw data for a JSON viewer.
"""

import json
import os
import traceback

import gradio as gr
import httpx
from dotenv import load_dotenv

from utils import format_timestamp, extract_video_id

# Load environment variables
load_dotenv()

# API URL for local development
API_URL = "http://127.0.0.1:8080/api"

# API URL for Hugging Face Spaces
# API_URL = "https://your-huggingface-space-url/api"

# Shared label/placeholder for every "video ID or URL" input box in the UI.
_VIDEO_ID_LABEL = "Video ID or URL"
_VIDEO_ID_PLACEHOLDER = "Enter video ID or full URL (youtube.com, youtu.be, shorts, embed)..."


async def _post(endpoint, payload, **request_kwargs):
    """POST *payload* as JSON to ``{API_URL}/{endpoint}`` and return the response.

    Args:
        endpoint: Path segment appended to ``API_URL``.
        payload: JSON-serialisable request body.
        **request_kwargs: Forwarded unchanged to ``client.post`` (e.g.
            ``timeout``). When omitted, the client's defaults apply.

    Returns:
        The ``httpx`` response object. The body is fully read before the
        client is closed, so ``.json()`` may be called by the caller.
    """
    async with httpx.AsyncClient() as client:
        return await client.post(f"{API_URL}/{endpoint}", json=payload, **request_kwargs)


def _format_search_result(video):
    """Render one search hit as a Markdown snippet (raises KeyError on missing fields)."""
    return (
        f"**{video['title']}**\n"
        f"ID: {video['video_id']}\n"
        f"Channel: {video['channel_title']}\n"
        f"Published: {video['published_at']}\n"
        f"[Thumbnail]({video['thumbnail']})\n\n"
        f"{video['description'][:200]}...\n\n"
        f"---\n"
    )


async def search_youtube(query, max_results, order, video_duration):
    """Search for videos on YouTube via the ``/search`` endpoint.

    Args:
        query: Search text.
        max_results: Maximum number of results to request.
        order: Sort order passed through to the API.
        video_duration: Duration filter; the value ``"any"`` is sent as ``None``.

    Returns:
        ``(markdown, json_text)`` on success, or ``("Error: ...", None)`` when
        the API reports an error or any exception is raised.
    """
    try:
        response = await _post(
            "search",
            {
                "query": query,
                "max_results": max_results,
                "order": order,
                "video_duration": video_duration if video_duration != "any" else None,
            },
        )
        data = response.json()

        error = data.get("error")
        if error:
            return f"Error: {error}", None

        results = data.get("content", [])
        formatted_results = [_format_search_result(video) for video in results]
        return "\n".join(formatted_results), json.dumps(results, indent=2, ensure_ascii=False)
    except Exception as e:
        # UI boundary: surface the failure as text instead of crashing the handler.
        return f"Error: {str(e)}", None


async def get_video_info(video_id):
    """Fetch video details via the ``/video_info`` endpoint.

    Args:
        video_id: Video ID or URL, sent to the API unchanged.

    Returns:
        ``(markdown, json_text)`` on success, or ``("Error: ...", None)`` when
        the API reports an error or any exception is raised.
    """
    try:
        # No need to extract video ID here, it is done on the server
        response = await _post("video_info", {"video_id": video_id})
        data = response.json()

        error = data.get("error")
        if error:
            return f"Error: {error}", None

        video_info = data.get("content", {})
        # `or []` also covers a "tags" key that is present but null.
        tags = ", ".join(video_info.get("tags") or [])

        formatted_info = (
            f"**{video_info.get('title')}**\n\n"
            f"Channel: {video_info.get('channel_title')}\n"
            f"Published: {video_info.get('published_at')}\n"
            f"Views: {video_info.get('view_count')}\n"
            f"Likes: {video_info.get('like_count')}\n"
            f"Comments: {video_info.get('comment_count')}\n"
            f"Duration: {video_info.get('duration')}\n\n"
            f"**Description:**\n{video_info.get('description')}\n\n"
            f"**Tags:**\n{tags}"
        )
        return formatted_info, json.dumps(video_info, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error: {str(e)}", None


async def get_transcript(video_id, language_code):
    """Fetch a video transcript via the ``/transcript`` endpoint.

    Args:
        video_id: Video ID or URL, sent to the API unchanged.
        language_code: Requested language; an empty value is sent as ``None``.

    Returns:
        ``(text, json_text)`` where *text* has one
        ``[start - end] text`` paragraph per transcript entry, or
        ``("Error: ...", None)`` on failure.
    """
    try:
        response = await _post(
            "transcript",
            {
                "video_id": video_id,
                "language_code": language_code if language_code else None,
            },
        )
        data = response.json()

        error = data.get("error")
        if error:
            return f"Error: {error}", None

        transcript = data.get("content", [])

        paragraphs = []
        for entry in transcript:
            start_time = entry.get("start", 0)
            end_time = start_time + entry.get("duration", 0)
            # Timestamps are rendered by the shared helper from `utils`.
            start_formatted = format_timestamp(start_time)
            end_formatted = format_timestamp(end_time)
            paragraphs.append(
                f"[{start_formatted} - {end_formatted}] {entry.get('text', '')}\n\n"
            )

        return "".join(paragraphs), json.dumps(transcript, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error: {str(e)}", None


async def get_available_languages(video_id):
    """List available transcript languages via ``/transcript_languages``.

    Args:
        video_id: Video ID or URL, sent to the API unchanged.

    Returns:
        ``(text, json_text)`` with one line per language, or
        ``("Error: ...", None)`` on failure.
    """
    try:
        response = await _post("transcript_languages", {"video_id": video_id})
        data = response.json()

        error = data.get("error")
        if error:
            return f"Error: {error}", None

        languages = data.get("content", [])

        formatted_languages = []
        for lang in languages:
            status = "Auto-generated" if lang.get("is_generated") else "Official subtitles"
            translatable = (
                "Translation available"
                if lang.get("is_translatable")
                else "Translation not available"
            )
            formatted_languages.append(
                f"{lang.get('language')} ({lang.get('language_code')}): {status}, {translatable}"
            )

        return "\n".join(formatted_languages), json.dumps(languages, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error: {str(e)}", None


async def generate_timecodes(video_id, language_code, segment_length, format_type):
    """Generate timecodes via the ``/timecodes`` endpoint.

    Args:
        video_id: Video ID or URL, sent to the API unchanged.
        language_code: Requested language; an empty value is sent as ``None``.
        segment_length: Segment length passed through to the API.
        format_type: Output format. ``"markdown"`` is shown as-is; any other
            value (including ``"youtube"``) is wrapped in a code fence.

    Returns:
        ``(text, json_text)`` on success, or ``("Error: ...", None)`` on failure.
    """
    try:
        response = await _post(
            "timecodes",
            {
                "video_id": video_id,
                "language_code": language_code if language_code else None,
                "segment_length": segment_length,
                "format": format_type,
            },
        )
        data = response.json()

        error = data.get("error")
        if error:
            return f"Error: {error}", None

        content = data.get("content", {})
        joined = "\n".join(content.get("timecodes", []))

        if format_type == "markdown":
            formatted_timecodes = joined
        else:
            # Fence plain-text timecodes so Markdown does not reflow them.
            formatted_timecodes = "```\n" + joined + "\n```"

        return formatted_timecodes, json.dumps(content, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error: {str(e)}", None


async def generate_gemini_timecodes(video_id, language_code, format_type, model):
    """Generate timecodes with Gemini via the ``/gemini_timecodes`` endpoint.

    Progress and failures are printed to stdout; exceptions also print a
    traceback.

    Args:
        video_id: Video ID or URL, sent to the API unchanged.
        language_code: Requested language; an empty value is sent as ``None``,
            matching the other transcript-based handlers.
        format_type: Output format passed through to the API.
        model: Gemini model name passed through to the API.

    Returns:
        Always a 2-tuple ``(summary, payload)``:
        the response ``content`` dict on success,
        ``{"message": ...}`` when no timecodes came back, or
        ``{"error": ...}`` on an API error or exception.
    """
    try:
        print(f"Sending request to {API_URL}/gemini_timecodes")
        print(f"Parameters: video_id={video_id}, language_code={language_code}, format={format_type}, model={model}")

        # Send request to API
        response = await _post(
            "gemini_timecodes",
            {
                "video_id": video_id,
                "language_code": language_code if language_code else None,
                "format": format_type,
                "model": model,
            },
            timeout=120,  # Increase timeout for Gemini API
        )
        print(f"Response status: {response.status_code}")

        # Parse response
        data = response.json()

        # Only a truthy "error" is a failure; a null/empty one means success.
        error = data.get("error")
        if error:
            print(f"Error in API response: {error}")
            return f"⚠️ Error: {error}", {"error": error}

        # Extract timecodes from response
        content = data.get("content", {})
        timecodes = content.get("timecodes", [])
        print(f"Received {len(timecodes)} timecodes")

        if not timecodes:
            return "⚠️ No timecodes generated", {"message": "No timecodes generated"}

        # Model and language information
        model_info = content.get("model", "Unknown")
        language_info = content.get("detected_language", "Unknown")
        duration_info = content.get("video_duration_minutes", "Unknown")

        summary = f"🤖 Model: {model_info}\n🗣️ Language: {language_info}\n⏱️ Duration: {duration_info} min\n📝 Timecodes: {len(timecodes)}"
        # Return the content object; the caller extracts the timecode text.
        return summary, content
    except Exception as e:
        print(f"Exception during timecode generation: {str(e)}")
        traceback.print_exc()
        return f"Error: {str(e)}", {"error": str(e)}


async def process_gemini_result(video_id, language_code, format_type, model):
    """Adapt ``generate_gemini_timecodes`` to the three Gemini-tab outputs.

    Returns:
        ``(summary, timecodes_text, json_data)``. *timecodes_text* is the
        newline-joined ``"timecodes"`` list from *json_data*, or
        ``"No timecodes generated"`` when that list is missing or empty
        (which includes the error payloads).
    """
    summary, json_data = await generate_gemini_timecodes(
        video_id, language_code, format_type, model
    )

    # Extract timecodes from json_data
    timecodes = json_data.get("timecodes", [])
    timecodes_text = "\n".join(timecodes) if timecodes else "No timecodes generated"

    return summary, timecodes_text, json_data


# Create Gradio interface
# NOTE(review): the container nesting below (which components sit inside which
# Row/Column) was reconstructed from statement order — confirm against the
# intended layout.
with gr.Blocks(title="YouTube MCP") as demo:
    gr.Markdown("# YouTube Model Context Protocol (MCP)")
    gr.Markdown("This interface allows interaction with YouTube API through MCP protocol")

    with gr.Tab("Video Search"):
        with gr.Row():
            with gr.Column():
                search_query = gr.Textbox(label="Search Query", placeholder="Enter search query...")
                with gr.Row():
                    max_results = gr.Slider(minimum=1, maximum=50, value=10, step=1, label="Number of Results")
                    order = gr.Dropdown(
                        choices=["relevance", "date", "viewCount", "rating", "title"],
                        value="relevance",
                        label="Sort By",
                    )
                    video_duration = gr.Dropdown(
                        choices=["any", "short", "medium", "long"],
                        value="any",
                        label="Duration",
                    )
                search_button = gr.Button("Search")

            with gr.Column():
                search_results = gr.Markdown(label="Results")
                search_json = gr.JSON(label="JSON Data")

        search_button.click(
            search_youtube,
            inputs=[search_query, max_results, order, video_duration],
            outputs=[search_results, search_json],
        )

    with gr.Tab("Video Information"):
        with gr.Row():
            with gr.Column():
                video_id_input = gr.Textbox(
                    label=_VIDEO_ID_LABEL,
                    placeholder=_VIDEO_ID_PLACEHOLDER,
                )
                get_info_button = gr.Button("Get Information")

            with gr.Column():
                video_info_output = gr.Markdown(label="Video Information")
                video_info_json = gr.JSON(label="JSON Data")

        get_info_button.click(
            get_video_info,
            inputs=[video_id_input],
            outputs=[video_info_output, video_info_json],
        )

    with gr.Tab("Video Transcript"):
        with gr.Row():
            with gr.Column():
                transcript_video_id = gr.Textbox(
                    label=_VIDEO_ID_LABEL,
                    placeholder=_VIDEO_ID_PLACEHOLDER,
                )
                language_code = gr.Textbox(label="Language Code (optional)", placeholder="ru, en, etc...")
                with gr.Row():
                    get_transcript_button = gr.Button("Get Transcript")
                    get_languages_button = gr.Button("Get Available Languages")

            with gr.Column():
                transcript_output = gr.Markdown(label="Transcript")
                transcript_json = gr.JSON(label="JSON Data")

        get_transcript_button.click(
            get_transcript,
            inputs=[transcript_video_id, language_code],
            outputs=[transcript_output, transcript_json],
        )

        # Both buttons share the same output pair, so the latest click wins.
        get_languages_button.click(
            get_available_languages,
            inputs=[transcript_video_id],
            outputs=[transcript_output, transcript_json],
        )

    with gr.Tab("Timecodes"):
        with gr.Row():
            with gr.Column():
                timecode_video_id = gr.Textbox(
                    label=_VIDEO_ID_LABEL,
                    placeholder=_VIDEO_ID_PLACEHOLDER,
                )
                timecode_language = gr.Textbox(label="Language Code (optional)", placeholder="ru, en, etc...")
                segment_length = gr.Slider(minimum=30, maximum=300, value=60, step=30, label="Segment Length (seconds)")
                format_type = gr.Dropdown(
                    choices=["youtube", "markdown"],
                    value="youtube",
                    label="Timecode Format",
                )
                generate_timecodes_button = gr.Button("Generate Timecodes")

            with gr.Column():
                timecodes_output = gr.Markdown(label="Timecodes")
                timecodes_json = gr.JSON(label="JSON Data")

        generate_timecodes_button.click(
            generate_timecodes,
            inputs=[timecode_video_id, timecode_language, segment_length, format_type],
            outputs=[timecodes_output, timecodes_json],
        )

    with gr.Tab("Gemini Timecodes"):
        with gr.Row():
            with gr.Column():
                gemini_video_id = gr.Textbox(
                    label=_VIDEO_ID_LABEL,
                    placeholder=_VIDEO_ID_PLACEHOLDER,
                )
                gemini_language = gr.Textbox(label="Language Code (optional)", placeholder="ru, en, etc...")
                gemini_format = gr.Dropdown(
                    choices=["youtube", "markdown"],
                    value="youtube",
                    label="Timecode Format",
                )
                gemini_model = gr.Dropdown(
                    choices=["gemini-2.0-flash-001", "gemini-2.0-pro-001", "gemini-2.0-pro-vision-001"],
                    value="gemini-2.0-flash-001",
                    label="Gemini Model",
                )
                generate_gemini_button = gr.Button("Generate Timecodes with Gemini")

            with gr.Column():
                gemini_output = gr.Markdown(label="Generation Information")
                gemini_timecodes = gr.Textbox(label="Timecodes", lines=10, max_lines=20, show_copy_button=True)
                gemini_json = gr.JSON(label="JSON Data")

        generate_gemini_button.click(
            process_gemini_result,
            inputs=[gemini_video_id, gemini_language, gemini_format, gemini_model],
            outputs=[gemini_output, gemini_timecodes, gemini_json],
        )

# Launch the application
if __name__ == "__main__":
    demo.launch()