@woai
🧹 Major code cleanup and internationalization - Remove Russian comments/strings, translate UI to English, clean linter errors, remove hardcoded tokens, delete test files. Ready for production deployment
e775565
import gradio as gr | |
import json | |
import httpx | |
import os | |
import traceback | |
from dotenv import load_dotenv | |
from utils import format_timestamp, extract_video_id | |
# Load environment variables
load_dotenv()

# Base URL of the backend API. It defaults to the local development server.
# Set YOUTUBE_MCP_API_URL (for example in .env or in the Hugging Face Space
# settings) to point the UI at a deployed backend such as
# "https://your-huggingface-space-url/api" without editing this file.
# A trailing slash is stripped so that f"{API_URL}/search" never contains "//".
API_URL = os.getenv("YOUTUBE_MCP_API_URL", "http://127.0.0.1:8080/api").rstrip("/")
async def search_youtube(query, max_results, order, video_duration):
    """Search for videos through the backend ``/search`` endpoint.

    Args:
        query: Search text forwarded to the API.
        max_results: Number of results to request.
        order: Sort order forwarded to the API.
        video_duration: Duration filter; the value ``"any"`` is sent as ``None``.

    Returns:
        A ``(markdown, json_text)`` tuple. ``markdown`` lists one entry per
        video and ``json_text`` is the pretty-printed result list. On any
        failure the tuple is ``("Error: <message>", None)``.
    """
    try:
        payload = {
            "query": query,
            "max_results": max_results,
            "order": order,
            "video_duration": video_duration if video_duration != "any" else None,
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(f"{API_URL}/search", json=payload)

        data = response.json()
        if data.get("error"):
            return f"Error: {data['error']}", None

        # "content" may be absent or null; treat both as "no results".
        results = data.get("content") or []

        formatted_results = []
        for video in results:
            # Use tolerant lookups so that a single entry with a missing or
            # null field does not abort the whole result list.
            description = video.get("description") or ""
            formatted_results.append(
                f"**{video.get('title')}**\n"
                f"ID: {video.get('video_id')}\n"
                f"Channel: {video.get('channel_title')}\n"
                f"Published: {video.get('published_at')}\n"
                f"[Thumbnail]({video.get('thumbnail')})\n\n"
                f"{description[:200]}...\n\n"
                f"---\n"
            )

        return "\n".join(formatted_results), json.dumps(results, indent=2, ensure_ascii=False)
    except Exception as e:
        # UI boundary: report the failure in the output pane instead of raising.
        return f"Error: {str(e)}", None
async def get_video_info(video_id):
    """Fetch details for one video through the backend ``/video_info`` endpoint.

    Args:
        video_id: Video ID or URL exactly as typed by the user.

    Returns:
        A ``(markdown, json_text)`` tuple with a formatted summary and the
        pretty-printed response content. On any failure the tuple is
        ``("Error: <message>", None)``.
    """
    try:
        # No need to extract video ID here, it is done on the server
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_URL}/video_info",
                json={"video_id": video_id},
            )

        data = response.json()
        if data.get("error"):
            return f"Error: {data['error']}", None

        # "content" may be absent or null; fall back to an empty mapping.
        video_info = data.get("content") or {}

        # "tags" may be absent or null. str.join also requires strings, so
        # each tag is coerced defensively.
        tags = video_info.get("tags") or []

        formatted_info = (
            f"**{video_info.get('title')}**\n\n"
            f"Channel: {video_info.get('channel_title')}\n"
            f"Published: {video_info.get('published_at')}\n"
            f"Views: {video_info.get('view_count')}\n"
            f"Likes: {video_info.get('like_count')}\n"
            f"Comments: {video_info.get('comment_count')}\n"
            f"Duration: {video_info.get('duration')}\n\n"
            f"**Description:**\n{video_info.get('description')}\n\n"
            f"**Tags:**\n{', '.join(map(str, tags))}"
        )
        return formatted_info, json.dumps(video_info, indent=2, ensure_ascii=False)
    except Exception as e:
        # UI boundary: report the failure in the output pane instead of raising.
        return f"Error: {str(e)}", None
async def get_transcript(video_id, language_code):
    """Request a video's transcript and render it as timestamped lines.

    Args:
        video_id: Video ID or URL forwarded to the ``/transcript`` endpoint.
        language_code: Optional language code; an empty value is sent as ``None``.

    Returns:
        A ``(text, json_text)`` tuple. ``text`` holds one
        ``[start - end] text`` line per entry, each followed by a blank line.
        On any failure the tuple is ``("Error: <message>", None)``.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_URL}/transcript",
                json={"video_id": video_id, "language_code": language_code or None},
            )
            data = response.json()

        error = data["error"] if "error" in data else None
        if error:
            return f"Error: {error}", None

        transcript = data.get("content", [])

        def render(entry):
            # The end time is derived from the entry's start and duration.
            begin = entry.get("start", 0)
            finish = begin + entry.get("duration", 0)
            # Format time to hours:minutes:seconds format
            begin_label = format_timestamp(begin)
            finish_label = format_timestamp(finish)
            return f"[{begin_label} - {finish_label}] {entry.get('text', '')}\n\n"

        rendered = "".join(render(entry) for entry in transcript)
        return rendered, json.dumps(transcript, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error: {str(e)}", None
async def get_available_languages(video_id):
    """List the transcript languages the backend reports for a video.

    Args:
        video_id: Video ID or URL forwarded to ``/transcript_languages``.

    Returns:
        A ``(text, json_text)`` tuple. ``text`` has one line per language
        giving its name, code, subtitle origin and translation availability.
        On any failure the tuple is ``("Error: <message>", None)``.
    """

    def describe(lang):
        # One human-readable line per language entry.
        origin = "Auto-generated" if lang.get("is_generated") else "Official subtitles"
        if lang.get("is_translatable"):
            translation = "Translation available"
        else:
            translation = "Translation not available"
        return f"{lang.get('language')} ({lang.get('language_code')}): {origin}, {translation}"

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_URL}/transcript_languages",
                json={"video_id": video_id},
            )
            data = response.json()

        error = data["error"] if "error" in data else None
        if error:
            return f"Error: {error}", None

        languages = data.get("content", [])
        lines = [describe(lang) for lang in languages]
        return "\n".join(lines), json.dumps(languages, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error: {str(e)}", None
async def generate_timecodes(video_id, language_code, segment_length, format_type):
    """Ask the backend ``/timecodes`` endpoint for timecodes and format them.

    Args:
        video_id: Video ID or URL forwarded to the API.
        language_code: Optional language code; an empty value is sent as ``None``.
        segment_length: Segment length forwarded to the API.
        format_type: Timecode format; ``"markdown"`` is shown as-is and every
            other value is wrapped in a fenced code block.

    Returns:
        A ``(text, json_text)`` tuple with the display text and the
        pretty-printed response content. On any failure the tuple is
        ``("Error: <message>", None)``.
    """
    try:
        request_body = {
            "video_id": video_id,
            "language_code": language_code or None,
            "segment_length": segment_length,
            "format": format_type,
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(f"{API_URL}/timecodes", json=request_body)
            data = response.json()

        error = data["error"] if "error" in data else None
        if error:
            return f"Error: {error}", None

        content = data.get("content", {})
        joined = "\n".join(content.get("timecodes", []))

        # Only markdown output is rendered directly; "youtube" and any
        # unrecognised format are fenced so the lines display verbatim.
        if format_type == "markdown":
            display_text = joined
        else:
            display_text = "```\n" + joined + "\n```"

        return display_text, json.dumps(content, indent=2, ensure_ascii=False)
    except Exception as e:
        return f"Error: {str(e)}", None
async def generate_gemini_timecodes(video_id, language_code, format_type, model):
    """Generate timecodes through the backend ``/gemini_timecodes`` endpoint.

    Args:
        video_id: Video ID or URL forwarded to the API.
        language_code: Optional language code; an empty value is sent as ``None``.
        format_type: Timecode format forwarded to the API.
        model: Gemini model name forwarded to the API.

    Returns:
        A ``(summary, payload)`` tuple. On success ``summary`` is a short
        description of the run and ``payload`` is the response's ``content``
        dict. When the API reports an error, or the request fails, ``payload``
        is ``{"error": ...}``. When no timecodes come back it is
        ``{"message": "No timecodes generated"}``.
    """
    try:
        print(f"Sending request to {API_URL}/gemini_timecodes")
        print(f"Parameters: video_id={video_id}, language_code={language_code}, format={format_type}, model={model}")

        # Send request to API
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_URL}/gemini_timecodes",
                json={
                    "video_id": video_id,
                    # Same normalisation as the transcript and timecodes
                    # requests: an empty textbox means "no language given".
                    "language_code": language_code or None,
                    "format": format_type,
                    "model": model,
                },
                timeout=120,  # Increase timeout for Gemini API
            )

        print(f"Response status: {response.status_code}")

        # Parse response
        data = response.json()

        # Only a truthy "error" is a failure. A response carrying
        # {"error": null} next to its content is a success, as in the other
        # request helpers in this file.
        error = data.get("error")
        if error:
            print(f"Error in API response: {error}")
            return f"⚠️ Error: {error}", {"error": error}

        # Extract timecodes from response, tolerating absent or null fields.
        content = data.get("content") or {}
        timecodes = content.get("timecodes") or []
        print(f"Received {len(timecodes)} timecodes")

        if not timecodes:
            return "⚠️ No timecodes generated", {"message": "No timecodes generated"}

        # Model and language information
        model_info = content.get("model", "Unknown")
        language_info = content.get("detected_language", "Unknown")
        duration_info = content.get("video_duration_minutes", "Unknown")

        summary = f"🤖 Model: {model_info}\n🗣️ Language: {language_info}\n⏱️ Duration: {duration_info} min\n📝 Timecodes: {len(timecodes)}"
        # The whole content object is returned; the caller pulls the
        # timecode lines out of it for display.
        return summary, content
    except Exception as e:
        print(f"Exception during timecode generation: {str(e)}")
        traceback.print_exc()
        return f"Error: {str(e)}", {"error": str(e)}
# Create Gradio interface
# NOTE(review): the row/column nesting below was reconstructed from a paste
# that had lost its indentation. Confirm the grouping against the original
# layout.
with gr.Blocks(title="YouTube MCP") as demo:
    gr.Markdown("# YouTube Model Context Protocol (MCP)")
    gr.Markdown("This interface allows interaction with YouTube API through MCP protocol")

    # Tab 1: search. Inputs on the left, rendered results and raw JSON on the right.
    with gr.Tab("Video Search"):
        with gr.Row():
            with gr.Column():
                search_query = gr.Textbox(label="Search Query", placeholder="Enter search query...")
                with gr.Row():
                    max_results = gr.Slider(minimum=1, maximum=50, value=10, step=1, label="Number of Results")
                    order = gr.Dropdown(
                        choices=["relevance", "date", "viewCount", "rating", "title"],
                        value="relevance",
                        label="Sort By"
                    )
                    # "any" is translated to None by search_youtube before the request.
                    video_duration = gr.Dropdown(
                        choices=["any", "short", "medium", "long"],
                        value="any",
                        label="Duration"
                    )
                search_button = gr.Button("Search")
            with gr.Column():
                search_results = gr.Markdown(label="Results")
                search_json = gr.JSON(label="JSON Data")
        search_button.click(
            search_youtube,
            inputs=[search_query, max_results, order, video_duration],
            outputs=[search_results, search_json]
        )

    # Tab 2: details for a single video.
    with gr.Tab("Video Information"):
        with gr.Row():
            with gr.Column():
                video_id_input = gr.Textbox(
                    label="Video ID or URL",
                    placeholder="Enter video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
                )
                get_info_button = gr.Button("Get Information")
            with gr.Column():
                video_info_output = gr.Markdown(label="Video Information")
                video_info_json = gr.JSON(label="JSON Data")
        get_info_button.click(
            get_video_info,
            inputs=[video_id_input],
            outputs=[video_info_output, video_info_json]
        )

    # Tab 3: transcript and available languages.
    with gr.Tab("Video Transcript"):
        with gr.Row():
            with gr.Column():
                transcript_video_id = gr.Textbox(
                    label="Video ID or URL",
                    placeholder="Enter video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
                )
                language_code = gr.Textbox(label="Language Code (optional)", placeholder="ru, en, etc...")
                with gr.Row():
                    get_transcript_button = gr.Button("Get Transcript")
                    get_languages_button = gr.Button("Get Available Languages")
            with gr.Column():
                transcript_output = gr.Markdown(label="Transcript")
                transcript_json = gr.JSON(label="JSON Data")
        # Both buttons write to the same output pair, so each click replaces
        # whatever the other one displayed.
        get_transcript_button.click(
            get_transcript,
            inputs=[transcript_video_id, language_code],
            outputs=[transcript_output, transcript_json]
        )
        get_languages_button.click(
            get_available_languages,
            inputs=[transcript_video_id],
            outputs=[transcript_output, transcript_json]
        )

    # Tab 4: timecodes produced by the /timecodes endpoint.
    with gr.Tab("Timecodes"):
        with gr.Row():
            with gr.Column():
                timecode_video_id = gr.Textbox(
                    label="Video ID or URL",
                    placeholder="Enter video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
                )
                timecode_language = gr.Textbox(label="Language Code (optional)", placeholder="ru, en, etc...")
                segment_length = gr.Slider(minimum=30, maximum=300, value=60, step=30, label="Segment Length (seconds)")
                format_type = gr.Dropdown(
                    choices=["youtube", "markdown"],
                    value="youtube",
                    label="Timecode Format"
                )
                generate_timecodes_button = gr.Button("Generate Timecodes")
            with gr.Column():
                timecodes_output = gr.Markdown(label="Timecodes")
                timecodes_json = gr.JSON(label="JSON Data")
        generate_timecodes_button.click(
            generate_timecodes,
            inputs=[timecode_video_id, timecode_language, segment_length, format_type],
            outputs=[timecodes_output, timecodes_json]
        )

    # Tab 5: timecodes produced by the /gemini_timecodes endpoint.
    with gr.Tab("Gemini Timecodes"):
        with gr.Row():
            with gr.Column():
                gemini_video_id = gr.Textbox(
                    label="Video ID or URL",
                    placeholder="Enter video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
                )
                gemini_language = gr.Textbox(label="Language Code (optional)", placeholder="ru, en, etc...")
                gemini_format = gr.Dropdown(
                    choices=["youtube", "markdown"],
                    value="youtube",
                    label="Timecode Format"
                )
                gemini_model = gr.Dropdown(
                    choices=["gemini-2.0-flash-001", "gemini-2.0-pro-001", "gemini-2.0-pro-vision-001"],
                    value="gemini-2.0-flash-001",
                    label="Gemini Model"
                )
                generate_gemini_button = gr.Button("Generate Timecodes with Gemini")
            with gr.Column():
                gemini_output = gr.Markdown(label="Generation Information")
                gemini_timecodes = gr.Textbox(label="Timecodes", lines=10, max_lines=20, show_copy_button=True)
                gemini_json = gr.JSON(label="JSON Data")

        # The parameters language_code and format_type shadow the components
        # of the same name created in the earlier tabs. Inside this function
        # they are the submitted values, not the widgets.
        async def process_gemini_result(video_id, language_code, format_type, model):
            """Adapt generate_gemini_timecodes' two results to the three outputs of this tab.

            Returns a (summary, timecodes_text, json_data) tuple for the
            Markdown, Textbox and JSON components respectively.
            """
            result = await generate_gemini_timecodes(video_id, language_code, format_type, model)
            # Defensive guard: generate_gemini_timecodes returns a tuple on
            # every visible path, including its exception handler.
            if result is None:
                return "Error occurred", "", {}
            summary, json_data = result
            # Extract timecodes from json_data
            # Error and "no timecodes" payloads have no "timecodes" key, so
            # they fall through to the placeholder text below.
            timecodes = json_data.get("timecodes", [])
            timecodes_text = "\n".join(timecodes) if timecodes else "No timecodes generated"
            return summary, timecodes_text, json_data

        generate_gemini_button.click(
            process_gemini_result,
            inputs=[gemini_video_id, gemini_language, gemini_format, gemini_model],
            outputs=[gemini_output, gemini_timecodes, gemini_json]
        )

# Launch the application
if __name__ == "__main__":
    demo.launch()