|
import gradio as gr |
|
import json |
|
import httpx |
|
import os |
|
import traceback |
|
import asyncio |
|
import threading |
|
import uvicorn |
|
from fastapi import FastAPI, HTTPException |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from dotenv import load_dotenv |
|
from utils import format_timestamp, extract_video_id |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
from api_server import app as fastapi_app |
|
|
|
|
|
def start_fastapi_server():
    """Run the imported FastAPI app under uvicorn on 0.0.0.0:8000."""
    bind_host = "0.0.0.0"
    bind_port = 8000
    uvicorn.run(fastapi_app, host=bind_host, port=bind_port)
|
|
|
|
|
# Run the API server in a daemon thread of this process so it does not keep
# the interpreter alive once the main thread exits.
server_thread = threading.Thread(target=start_fastapi_server, daemon=True)
server_thread.start()

# Give the server a moment to come up before probing it.
import time
time.sleep(3)

# Best-effort readiness probe: a failure is only reported, never raised.
try:
    import requests
    # requests applies no timeout unless one is given; without it a server
    # that accepts the connection but never answers would block start-up.
    requests.get("http://localhost:8000/", timeout=5)
    print("✅ FastAPI server is running successfully")
except Exception as e:
    print(f"β οΈ Warning: FastAPI server may not be ready: {e}")

# Base URL of the local API that the helper functions in this file post to.
API_URL = "http://localhost:8000/api"
|
|
|
async def search_youtube(query, max_results, order, video_duration):
    """Search YouTube through the local API and render the hits as Markdown.

    Posts to ``{API_URL}/search``; a ``video_duration`` of ``"any"`` is sent
    as ``None``.

    Returns:
        A ``(markdown, json_data)`` pair. On success ``json_data`` is the
        result list serialised as an indented JSON string. When the API
        reports an error, or any exception is raised, the first element is
        an ``"Error: ..."`` message and the second an ``{"error": ...}`` dict.
    """
    try:
        request_body = {
            "query": query,
            "max_results": max_results,
            "order": order,
            "video_duration": None if video_duration == "any" else video_duration,
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(f"{API_URL}/search", json=request_body)
            payload = response.json()

        # Only a truthy "error" field counts as a failure.
        if "error" in payload and payload["error"]:
            api_error = payload["error"]
            return f"Error: {api_error}", {"error": api_error}

        videos = payload.get("content", [])
        cards = [
            (
                f"**{item['title']}**\n"
                f"ID: {item['video_id']}\n"
                f"Channel: {item['channel_title']}\n"
                f"Published: {item['published_at']}\n"
                f"[Thumbnail]({item['thumbnail']})\n\n"
                f"{item['description'][:200]}...\n\n"
                f"---\n"
            )
            for item in videos
        ]
        return "\n".join(cards), json.dumps(videos, indent=2, ensure_ascii=False)
    except Exception as exc:
        # Any failure (network, JSON decoding, missing keys) is reported in
        # the UI instead of being raised.
        return f"Error: {str(exc)}", {"error": str(exc)}
|
|
|
async def get_video_info(video_id):
    """Fetch metadata for one video from the local API and format it.

    Posts ``{"video_id": video_id}`` to ``{API_URL}/video_info``.

    Returns:
        A ``(markdown, json_data)`` pair. On success ``json_data`` is the
        ``content`` object serialised as an indented JSON string; on an API
        error or exception it is an ``{"error": ...}`` dict and the first
        element is an ``"Error: ..."`` message.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_URL}/video_info", json={"video_id": video_id}
            )
            payload = response.json()

        # Only a truthy "error" field counts as a failure.
        if "error" in payload and payload["error"]:
            api_error = payload["error"]
            return f"Error: {api_error}", {"error": api_error}

        details = payload.get("content", {})
        sections = [
            f"**{details.get('title')}**\n\n",
            f"Channel: {details.get('channel_title')}\n",
            f"Published: {details.get('published_at')}\n",
            f"Views: {details.get('view_count')}\n",
            f"Likes: {details.get('like_count')}\n",
            f"Comments: {details.get('comment_count')}\n",
            f"Duration: {details.get('duration')}\n\n",
            f"**Description:**\n{details.get('description')}\n\n",
            f"**Tags:**\n{', '.join(details.get('tags', []))}",
        ]
        return "".join(sections), json.dumps(details, indent=2, ensure_ascii=False)
    except Exception as exc:
        # Failures are shown in the UI rather than raised.
        return f"Error: {str(exc)}", {"error": str(exc)}
|
|
|
async def get_transcript(video_id, language_code):
    """Fetch a video's transcript from the local API and format it.

    Posts to ``{API_URL}/transcript``; an empty ``language_code`` is sent as
    ``None``. Each entry is rendered as ``[start - end] text`` using
    ``format_timestamp``, where end is ``start + duration``.

    Returns:
        A ``(markdown, json_data)`` pair. On success ``json_data`` is the
        transcript list as an indented JSON string. API errors whose text
        contains one of the two block markers return an explanatory message
        with ``{"error": "IP_BLOCKED", "original_error": ...}``; other
        errors and exceptions return ``"Error: ..."`` with an
        ``{"error": ...}`` dict.
    """
    try:
        request_body = {
            "video_id": video_id,
            "language_code": language_code or None,
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(f"{API_URL}/transcript", json=request_body)
            payload = response.json()

        # Only a truthy "error" field counts as a failure.
        if "error" in payload and payload["error"]:
            failure = payload["error"]
            block_markers = (
                "YouTube is blocking requests",
                "Could not retrieve a transcript",
            )
            if not any(marker in failure for marker in block_markers):
                return f"Error: {failure}", {"error": failure}

            notice = (
                "π« **YouTube IP Block Detected**\n\n"
                "YouTube is blocking requests from this cloud platform. This is a common limitation for:\n"
                "- Hugging Face Spaces\n"
                "- AWS, Google Cloud, Azure\n"
                "- Other cloud providers\n\n"
                "**Solutions:**\n"
                "- Try a different video\n"
                "- Use the Video Search feature (usually works)\n"
                "- Run this tool locally for full access\n\n"
                f"Technical error: {failure[:200]}..."
            )
            return notice, {"error": "IP_BLOCKED", "original_error": failure}

        segments = payload.get("content", [])
        rendered = []
        for segment in segments:
            begins_at = segment.get("start", 0)
            ends_at = begins_at + segment.get("duration", 0)
            begin_label = format_timestamp(begins_at)
            end_label = format_timestamp(ends_at)
            rendered.append(
                f"[{begin_label} - {end_label}] {segment.get('text', '')}\n\n"
            )

        return "".join(rendered), json.dumps(segments, indent=2, ensure_ascii=False)
    except Exception as exc:
        # Failures are shown in the UI rather than raised.
        return f"Error: {str(exc)}", {"error": str(exc)}
|
|
|
async def get_available_languages(video_id):
    """List the transcript languages the local API reports for a video.

    Posts ``{"video_id": video_id}`` to ``{API_URL}/transcript_languages``
    and renders one line per language with its generated/official and
    translatable status.

    Returns:
        A ``(text, json_data)`` pair. On success ``json_data`` is the
        language list as an indented JSON string. API errors whose text
        contains one of the two block markers return an explanatory message
        with ``{"error": "IP_BLOCKED", "original_error": ...}``; other
        errors and exceptions return ``"Error: ..."`` with an
        ``{"error": ...}`` dict.
    """

    def describe(entry):
        # One summary line for a single language entry.
        origin = "Auto-generated" if entry.get("is_generated") else "Official subtitles"
        if entry.get("is_translatable"):
            translation = "Translation available"
        else:
            translation = "Translation not available"
        return f"{entry.get('language')} ({entry.get('language_code')}): {origin}, {translation}"

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_URL}/transcript_languages", json={"video_id": video_id}
            )
            payload = response.json()

        # Only a truthy "error" field counts as a failure.
        if "error" in payload and payload["error"]:
            failure = payload["error"]
            block_markers = (
                "YouTube is blocking requests",
                "Could not retrieve a transcript",
            )
            if not any(marker in failure for marker in block_markers):
                return f"Error: {failure}", {"error": failure}

            notice = (
                "π« **YouTube IP Block Detected**\n\n"
                "Cannot check available languages due to YouTube's cloud IP blocking.\n\n"
                "**Try:**\n"
                "- Video Search (usually works)\n"
                "- Different video\n"
                "- Local installation\n\n"
                f"Technical error: {failure[:200]}..."
            )
            return notice, {"error": "IP_BLOCKED", "original_error": failure}

        entries = payload.get("content", [])
        summary_lines = [describe(entry) for entry in entries]
        return "\n".join(summary_lines), json.dumps(entries, indent=2, ensure_ascii=False)
    except Exception as exc:
        # Failures are shown in the UI rather than raised.
        return f"Error: {str(exc)}", {"error": str(exc)}
|
|
|
async def generate_timecodes(video_id, language_code, segment_length, format_type):
    """Generate segment-based timecodes for a video through the local API.

    Posts to ``{API_URL}/timecodes``.

    Args:
        video_id: Video ID or URL, forwarded unchanged.
        language_code: Optional transcript language; an empty value is sent
            as ``None``.
        segment_length: Forwarded as ``segment_length``.
        format_type: Forwarded as ``format``. ``"youtube"`` wraps the
            returned lines in a fenced code block; ``"markdown"`` and any
            other value yield the lines joined by newlines.

    Returns:
        A ``(markdown, json_data)`` pair. On success ``json_data`` is the
        API's ``content`` object as an indented JSON string. API errors
        whose text contains one of the two block markers return an
        explanatory message with
        ``{"error": "IP_BLOCKED", "original_error": ...}``; other errors and
        exceptions return ``"Error: ..."`` with an ``{"error": ...}`` dict.
    """
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_URL}/timecodes",
                json={
                    "video_id": video_id,
                    "language_code": language_code if language_code else None,
                    "segment_length": segment_length,
                    "format": format_type,
                },
            )
            data = response.json()

        # Only a truthy "error" field counts as a failure.
        if "error" in data and data["error"]:
            error_msg = data["error"]

            if "YouTube is blocking requests" in error_msg or "Could not retrieve a transcript" in error_msg:
                return (
                    "π« **Cannot Generate Timecodes - YouTube IP Block**\n\n"
                    "Timecode generation requires transcript access, which is blocked by YouTube for cloud platforms.\n\n"
                    "**Alternatives:**\n"
                    "- Use Video Search to find videos\n"
                    "- Try AI Timecodes (may work with different approach)\n"
                    "- Run locally for full transcript access\n\n"
                    f"Technical error: {error_msg[:200]}..."
                ), {"error": "IP_BLOCKED", "original_error": error_msg}
            return f"Error: {error_msg}", {"error": error_msg}

        content = data.get("content", {})
        joined_timecodes = "\n".join(content.get("timecodes", []))

        if format_type == "youtube":
            formatted_timecodes = "```\n" + joined_timecodes + "\n```"
        else:
            # "markdown" and any unrecognised format fall back to the plain
            # list. Previously an unknown format left the variable unbound
            # and surfaced as a confusing "local variable" error.
            formatted_timecodes = joined_timecodes

        return formatted_timecodes, json.dumps(content, indent=2, ensure_ascii=False)
    except Exception as e:
        # Failures are shown in the UI rather than raised.
        return f"Error: {str(e)}", {"error": str(e)}
|
|
|
async def generate_gemini_timecodes(video_id, language_code, format_type, model):
    """Generate AI timecodes through the local API's Gemini endpoint.

    Posts to ``{API_URL}/gemini_timecodes`` with a 120-second timeout and
    logs the request, response status and outcome with ``print``.

    Args:
        video_id: Video ID or URL, forwarded unchanged.
        language_code: Transcript language, forwarded unchanged.
        format_type: Forwarded as ``format``.
        model: Model name, forwarded as ``model``.

    Returns:
        A ``(summary, json_data)`` pair. On success ``json_data`` is the
        API's ``content`` dict (which carries the ``timecodes`` list) and
        ``summary`` lists model, language, duration and timecode count.
        With no timecodes the pair is a notice plus a ``{"message": ...}``
        dict. API errors whose text contains one of the two block markers
        return an explanatory message with
        ``{"error": "IP_BLOCKED", "original_error": ...}``; other errors and
        exceptions return an error message with an ``{"error": ...}`` dict.
    """
    try:
        print(f"Sending request to {API_URL}/gemini_timecodes")
        print(f"Parameters: video_id={video_id}, language_code={language_code}, format={format_type}, model={model}")

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{API_URL}/gemini_timecodes",
                json={
                    "video_id": video_id,
                    "language_code": language_code,
                    "format": format_type,
                    "model": model,
                },
                timeout=120,
            )

        print(f"Response status: {response.status_code}")

        data = response.json()

        # Only a truthy "error" counts as a failure, matching the other
        # request helpers in this file. A present-but-empty field (such as
        # null) previously entered this branch and raised TypeError on the
        # substring checks below.
        if "error" in data and data["error"]:
            print(f"Error in API response: {data['error']}")
            error_msg = data['error']

            if "YouTube is blocking requests" in error_msg or "Could not retrieve a transcript" in error_msg:
                return (
                    "π« **AI Timecodes Unavailable - YouTube IP Block**\n\n"
                    "AI timecode generation requires transcript access, which YouTube blocks on cloud platforms.\n\n"
                    "**What still works:**\n"
                    "- π Video Search\n"
                    "- βΉοΈ Video Info\n\n"
                    "**For full AI features:**\n"
                    "- Download and run locally\n"
                    "- Use personal computer/server\n\n"
                    f"Technical details: {error_msg[:200]}..."
                ), {"error": "IP_BLOCKED", "original_error": error_msg}

            return f"β οΈ Error: {error_msg}", {"error": error_msg}

        content = data.get("content", {})
        timecodes = content.get("timecodes", [])

        print(f"Received {len(timecodes)} timecodes")

        if not timecodes:
            return "β οΈ No timecodes generated", {"message": "No timecodes generated"}

        model_info = content.get("model", "Unknown")
        language_info = content.get("detected_language", "Unknown")
        duration_info = content.get("video_duration_minutes", "Unknown")

        summary = f"π€ Model: {model_info}\nπ£οΈ Language: {language_info}\nβ±οΈ Duration: {duration_info} min\nπ Timecodes: {len(timecodes)}"

        return summary, content

    except Exception as e:
        # Log the full traceback for diagnosis; the UI gets a short message.
        print(f"Exception during timecode generation: {str(e)}")
        traceback.print_exc()
        return f"Error: {str(e)}", {"error": str(e)}
|
|
|
|
|
# Gradio UI: a header followed by one tab per feature. Each tab pairs an input
# column with an output column (rendered text plus raw JSON) and wires one
# button to the matching async helper defined in this file.
with gr.Blocks(title="YouTube MCP", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# π¬ YouTube Creator MetaData Extractor")
    gr.Markdown("This tool helps content creators analyze YouTube videos and generate metadata using AI")
    gr.Markdown("### Supports all YouTube URL formats: regular links, short links, shorts and embedded videos")
    gr.Markdown("**π‘ Language codes:** uk = Ukrainian, ru = Russian, en = English (ISO 639-1 standard)")

    # Static notice about the IP-block errors the helpers can report.
    gr.Markdown("""
### β οΈ Important Notice for Cloud Platforms
**YouTube may block requests from cloud IPs (Hugging Face Spaces, AWS, etc.)**

If you encounter "Video not found" or "IP blocked" errors:
- This is a YouTube limitation, not a bug in our tool
- **Video Search** and **Video Info** should work normally
- **Transcripts** and **Timecodes** may be limited for some videos
- For full functionality, consider running this tool locally
""")

    gr.Markdown("---")

    # Tab: keyword search -> search_youtube.
    with gr.Tab("π Video Search"):
        with gr.Row():
            with gr.Column():
                search_query = gr.Textbox(label="Search Query", placeholder="Enter your search query...")
                with gr.Row():
                    max_results = gr.Slider(minimum=1, maximum=50, value=10, step=1, label="Max Results")
                    order = gr.Dropdown(
                        choices=["relevance", "date", "viewCount", "rating", "title"],
                        value="relevance",
                        label="Sort By"
                    )
                    # "any" is translated to None by search_youtube before the request.
                    video_duration = gr.Dropdown(
                        choices=["any", "short", "medium", "long"],
                        value="any",
                        label="Duration"
                    )
                search_button = gr.Button("π Search", variant="primary")

            with gr.Column():
                search_results = gr.Markdown(label="Search Results")
                search_json = gr.JSON(label="JSON Data")

        search_button.click(
            search_youtube,
            inputs=[search_query, max_results, order, video_duration],
            outputs=[search_results, search_json]
        )

    # Tab: single-video metadata -> get_video_info.
    with gr.Tab("βΉοΈ Video Info"):
        with gr.Row():
            with gr.Column():
                video_id_input = gr.Textbox(
                    label="Video ID or URL",
                    placeholder="Enter video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
                )
                get_info_button = gr.Button("π Get Info", variant="primary")

            with gr.Column():
                video_info_output = gr.Markdown(label="Video Information")
                video_info_json = gr.JSON(label="JSON Data")

        get_info_button.click(
            get_video_info,
            inputs=[video_id_input],
            outputs=[video_info_output, video_info_json]
        )

    # Tab: transcript and language listing; both buttons write to the same
    # pair of output components.
    with gr.Tab("π Transcript"):
        with gr.Row():
            with gr.Column():
                transcript_video_id = gr.Textbox(
                    label="Video ID or URL",
                    placeholder="Enter video ID or full URL..."
                )
                language_code = gr.Textbox(label="Language Code (optional)", placeholder="uk (Ukrainian), ru (Russian), en (English), etc...")
                with gr.Row():
                    get_transcript_button = gr.Button("π Get Transcript", variant="primary")
                    get_languages_button = gr.Button("π Available Languages")

            with gr.Column():
                transcript_output = gr.Markdown(label="Transcript")
                transcript_json = gr.JSON(label="JSON Data")

        get_transcript_button.click(
            get_transcript,
            inputs=[transcript_video_id, language_code],
            outputs=[transcript_output, transcript_json]
        )

        get_languages_button.click(
            get_available_languages,
            inputs=[transcript_video_id],
            outputs=[transcript_output, transcript_json]
        )

    # Tab: fixed-length segment timecodes -> generate_timecodes.
    with gr.Tab("β±οΈ Basic Timecodes"):
        with gr.Row():
            with gr.Column():
                timecode_video_id = gr.Textbox(
                    label="Video ID or URL",
                    placeholder="Enter video ID or full URL..."
                )
                timecode_language = gr.Textbox(label="Language Code (optional)", placeholder="uk (Ukrainian), ru (Russian), en (English), etc...")
                segment_length = gr.Slider(minimum=30, maximum=300, value=60, step=30, label="Segment Length (seconds)")
                format_type = gr.Dropdown(
                    choices=["youtube", "markdown"],
                    value="youtube",
                    label="Format"
                )
                generate_timecodes_button = gr.Button("β±οΈ Generate Timecodes", variant="primary")

            with gr.Column():
                timecodes_output = gr.Markdown(label="Timecodes")
                timecodes_json = gr.JSON(label="JSON Data")

        generate_timecodes_button.click(
            generate_timecodes,
            inputs=[timecode_video_id, timecode_language, segment_length, format_type],
            outputs=[timecodes_output, timecodes_json]
        )

    # Tab: AI-generated timecodes -> generate_gemini_timecodes, adapted to
    # three outputs by process_gemini_result below.
    with gr.Tab("π€ AI Timecodes"):
        with gr.Row():
            with gr.Column():
                gemini_video_id = gr.Textbox(
                    label="Video ID or URL",
                    placeholder="Enter video ID or full URL..."
                )
                gemini_language = gr.Textbox(label="Language Code (optional)", placeholder="uk (Ukrainian), ru (Russian), en (English), etc...")
                gemini_format = gr.Dropdown(
                    choices=["youtube", "markdown"],
                    value="youtube",
                    label="Format"
                )
                gemini_model = gr.Dropdown(
                    choices=["gemini-2.0-flash-001", "gemini-2.0-pro-001", "gemini-2.0-pro-vision-001"],
                    value="gemini-2.0-flash-001",
                    label="AI Model"
                )
                generate_gemini_button = gr.Button("π€ Generate AI Timecodes", variant="primary")

            with gr.Column():
                gemini_output = gr.Markdown(label="Generation Info")
                gemini_timecodes = gr.Textbox(label="AI Timecodes", lines=10, max_lines=20, show_copy_button=True)
                gemini_json = gr.JSON(label="JSON Data")

        async def process_gemini_result(video_id, language_code, format_type, model):
            """Adapt generate_gemini_timecodes' pair to this tab's three outputs.

            Returns ``(summary, timecodes_text, json_data)``, where
            ``timecodes_text`` is the ``timecodes`` list from ``json_data``
            joined by newlines, or "No timecodes generated" when that list
            is missing or empty (which includes the error dicts).
            """
            result = await generate_gemini_timecodes(video_id, language_code, format_type, model)
            # Defensive guard: generate_gemini_timecodes, as written in this
            # file, returns a pair on every path.
            if result is None:
                return "Error occurred", "", {}

            summary, json_data = result

            timecodes = json_data.get("timecodes", [])
            timecodes_text = "\n".join(timecodes) if timecodes else "No timecodes generated"

            return summary, timecodes_text, json_data

        generate_gemini_button.click(
            process_gemini_result,
            inputs=[gemini_video_id, gemini_language, gemini_format, gemini_model],
            outputs=[gemini_output, gemini_timecodes, gemini_json]
        )
|
|
|
|
|
if __name__ == "__main__":
    # Settings used when this file is run directly as a script.
    launch_settings = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "show_error": True,
        "debug": False,
    }
    demo.launch(**launch_settings)