# ClipScript — Gradio app (Hugging Face Space)
import logging
import os
import re
import shutil
import tempfile
import time
import uuid
from functools import wraps
from pathlib import Path
from typing import Optional

import gradio as gr
import modal
import yt_dlp
from dotenv import load_dotenv
from openai import OpenAI
load_dotenv() | |
process_media_remotely = modal.Function.from_name("clipscript-processing-service", "process_media") | |
asr_handle = modal.Cls.from_name("clipscript-asr-service", "ASR") | |
upload_volume = modal.Volume.from_name("clipscript-uploads", create_if_missing=True) | |
llm = "deepseek/deepseek-r1-0528:free" | |
api_key = os.environ.get("OPENROUTER_API_KEY") | |
def retry_on_rate_limit(max_retries: int = 5, base_delay: float = 2.0): | |
"""Decorator for exponential backoff on rate limits""" | |
def decorator(func): | |
def wrapper(*args, **kwargs): | |
delay = base_delay | |
for attempt in range(max_retries): | |
try: | |
return func(*args, **kwargs) | |
except Exception as e: | |
# Check for 429 status code in different ways | |
status_code = getattr(getattr(e, 'response', None), 'status_code', None) | |
if status_code == 429 or '429' in str(e) or 'rate limit' in str(e).lower(): | |
logging.warning(f"Rate limit hit. Retrying in {delay:.1f} seconds...") | |
time.sleep(delay) | |
delay *= 2 | |
else: | |
raise | |
raise Exception("Max retries exceeded due to rate limits or other persistent errors.") | |
return wrapper | |
return decorator | |
def extract_youtube_video_id(url: str) -> str: | |
"""Extract YouTube video ID from various YouTube URL formats.""" | |
patterns = [ | |
r'(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/embed\/|youtube\.com\/v\/)([^&\n?#]+)', | |
r'youtube\.com\/watch\?.*v=([^&\n?#]+)' | |
] | |
for pattern in patterns: | |
match = re.search(pattern, url) | |
if match: | |
return match.group(1) | |
return None | |
def get_youtube_thumbnail_url(video_id: str) -> str: | |
"""Get the high quality thumbnail URL for a YouTube video.""" | |
return f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg" | |
client = OpenAI( | |
base_url="https://openrouter.ai/api/v1", | |
api_key=api_key, | |
) | |
def download_and_convert_youtube_audio(url: str) -> str: | |
""" | |
Downloads audio from a YouTube URL and converts it to a 16kHz mono WAV file. | |
Uses a temporary directory for all intermediate files, ensuring cleanup. | |
Returns the path to the final temporary WAV file. | |
""" | |
temp_dir = tempfile.mkdtemp() | |
cookie_file_path = None | |
try: | |
# Check for YouTube cookies in secrets and write to a temporary file | |
youtube_cookies = os.environ.get("YOUTUBE_COOKIES") | |
if youtube_cookies: | |
# Use NamedTemporaryFile to handle the file creation and cleanup | |
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as tmp_cookie_file: | |
tmp_cookie_file.write(youtube_cookies) | |
cookie_file_path = tmp_cookie_file.name | |
print("Using YouTube cookies from secrets.") | |
output_tmpl = os.path.join(temp_dir, "audio.%(ext)s") | |
ydl_opts = { | |
"format": "bestaudio/best", | |
"outtmpl": output_tmpl, | |
"postprocessors": [{ | |
'key': 'FFmpegExtractAudio', | |
'preferredcodec': 'wav', | |
}], | |
'postprocessor_args': { | |
'extractaudio': ['-ar', '16000', '-ac', '1'] | |
}, | |
"quiet": True, | |
} | |
# Add cookiefile to options if it exists | |
if cookie_file_path: | |
ydl_opts['cookiefile'] = cookie_file_path | |
with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
ydl.download([url]) | |
# Find the downloaded .wav file | |
downloaded_files = list(Path(temp_dir).glob("*.wav")) | |
if not downloaded_files: | |
raise FileNotFoundError("yt-dlp failed to create a WAV file. The video might be protected or unavailable.") | |
# Move the final file to a new temporary location so we can clean up the directory | |
source_path = downloaded_files[0] | |
fd, dest_path = tempfile.mkstemp(suffix=".wav") | |
os.close(fd) | |
shutil.move(source_path, dest_path) | |
return dest_path | |
finally: | |
# Clean up the cookie file if it was created | |
if cookie_file_path and os.path.exists(cookie_file_path): | |
os.remove(cookie_file_path) | |
shutil.rmtree(temp_dir) | |
def handle_transcription(file, url): | |
if not file and not (url and url.strip()): | |
gr.Warning("Please upload a file or enter a URL.") | |
return "Error: Please upload a file or enter a URL." | |
gr.Info("Starting secure transcription... This might take a moment.") | |
try: | |
result = None | |
if url and url.strip(): | |
video_id = extract_youtube_video_id(url) | |
if video_id: | |
converted_wav_path = None | |
try: | |
print(f"Detected YouTube URL. Processing locally: {url}") | |
converted_wav_path = download_and_convert_youtube_audio(url) | |
# Read audio bytes and call ASR service | |
with open(converted_wav_path, "rb") as f: | |
audio_bytes = f.read() | |
print("Sending audio bytes to ASR service.") | |
result = asr_handle().transcribe.remote(audio_bytes=audio_bytes) | |
finally: | |
# Clean up the final temp file | |
if converted_wav_path and os.path.exists(converted_wav_path): | |
os.remove(converted_wav_path) | |
else: | |
# Process other URLs remotely and securely. | |
print(f"Sending URL to Modal for processing: {url}") | |
result = process_media_remotely.remote(url=url) | |
elif file is not None: | |
# For file uploads: | |
# 1. Generate a unique ID for the file. | |
upload_id = f"upload-{uuid.uuid4()}" | |
print(f"Uploading file to Modal volume with ID: {upload_id}") | |
# 2. Upload the local file to the remote volume | |
with upload_volume.batch_upload() as batch: | |
batch.put_file(file, upload_id) | |
# 3. Trigger remote processing by passing the upload ID. | |
print(f"Sending upload ID to Modal for processing: {upload_id}") | |
result = process_media_remotely.remote(upload_id=upload_id) | |
if result.get("error"): | |
return f"Error from ASR service: {result['error']}" | |
return result["text"] | |
except Exception as e: | |
print(f"An error occurred: {e}") | |
# It's good practice to remove the local temp file if it exists | |
if file and os.path.exists(file): | |
os.remove(file) | |
return f"Error: {str(e)}" | |
finally: | |
# Gradio's gr.File widget creates a temporary file. We should clean it up. | |
if file and os.path.exists(file): | |
os.remove(file) | |
def add_transcript_to_chat(transcript: str): | |
if transcript.startswith("Error"): | |
gr.Error("Transcription failed. Please check the logs.") | |
return [] | |
gr.Info("Transcript ready! Generating blog post...") | |
# Return empty list for display but store transcript for LLM processing | |
return [] | |
def user_chat(user_message: str, history: list): | |
return "", history + [{"role": "user", "content": user_message}] | |
def _stream_chat_response(history: list, system_prompt: str, transcript: str = None): | |
if not history and not transcript: | |
# Don't do anything if there's no history and no transcript | |
return | |
if transcript and transcript.startswith("Error"): | |
return | |
# Include transcript as first user message if provided, but don't display it | |
messages = [{"role": "system", "content": system_prompt}] | |
if transcript: | |
messages.append({"role": "user", "content": transcript}) | |
messages.extend(history) | |
stream = client.chat.completions.create( | |
model=llm, | |
messages=messages, | |
stream=True | |
) | |
history.append({"role": "assistant", "content": ""}) | |
response_content = "" | |
for chunk in stream: | |
content = chunk.choices[0].delta.content | |
if content: | |
response_content += content | |
history[-1]["content"] = response_content | |
yield history | |
def generate_blog_post(history: list, transcript: str, context: str): | |
system_prompt = """You are an expert blog writer and editor. Your task is to transform a raw video transcription into a well-structured, engaging, and publish-ready blog post in Markdown format. | |
Core Mandate: Erase the Video Origin | |
This is a critical function. The reader must not know the content came from a video. | |
Eliminate all video-specific language: Remove phrases like "in this video," "thanks for watching," "as you can see here," "welcome to the channel," etc. | |
Scrub all platform calls-to-action: No "like and subscribe," "hit the bell icon," or "comment below." | |
Remove sponsor messages and ads: Completely omit any sponsor mentions. | |
Rephrase visual references: Convert "look at this screen" to a description of the information itself (e.g., "The data reveals that..."). | |
Content & Formatting Rules: | |
Title: Create a compelling, SEO-friendly H1 title. | |
Structure: Use ## for main headings and ### for subheadings to create a logical flow. | |
Readability: Use short paragraphs, bulleted/numbered lists, and bolding for key terms. | |
Refine Prose: Convert conversational speech into clean, professional writing. | |
Remove all filler words (um, uh, like, you know). | |
Fix grammar and consolidate rambling sentences. | |
Flow: Start with a strong introduction and end with a concise summary or conclusion. | |
Your output must be a complete, polished article in Markdown.""" | |
# Combine transcript with additional context if provided | |
full_transcript = transcript | |
if context and context.strip(): | |
full_transcript = f"{transcript}\n\n--- Additional Context ---\n{context.strip()}\n\nThis is some additional context relevant to the transcription above." | |
yield from _stream_chat_response(history, system_prompt, full_transcript) | |
def bot_chat(history: list): | |
system_prompt = "You are a helpful assistant that helps refine a blog post created from an audio transcript. The user will provide instructions for changes and you will return only the updated blog post." | |
yield from _stream_chat_response(history, system_prompt) | |
def update_thumbnail_display(url: str): | |
"""Update the thumbnail display when YouTube URL is entered.""" | |
if not url or not url.strip(): | |
return gr.update(visible=False, value=None) | |
video_id = extract_youtube_video_id(url) | |
if video_id: | |
thumbnail_url = get_youtube_thumbnail_url(video_id) | |
return gr.update(visible=True, value=thumbnail_url) | |
else: | |
return gr.update(visible=False, value=None) | |
# Gradio Interface | |
theme = gr.themes.Ocean() | |
with gr.Blocks(title="ClipScript", theme=theme) as demo: | |
gr.Markdown("# 🎬➡️📝 ClipScript: Video-to-Blog Transformer", elem_classes="hero-title") | |
gr.Markdown("### Upload an audio file, or provide a YouTube/direct URL *of any size*.") | |
with gr.Row(): | |
# Column 1: File input, URL input, and thumbnail | |
with gr.Column(scale=1): | |
file_input = gr.File(label="Upload any audio file (Recommended)", type="filepath", height=200, file_types=["audio", ".webm", ".mp3", ".mp4", ".m4a", ".ogg", ".wav"]) | |
with gr.Row(): | |
with gr.Column(): | |
url_input = gr.Textbox( | |
label="YouTube or Direct Audio URL", | |
placeholder="youtube.com/watch?v=... OR xyz.com/audio.mp3", | |
scale=2 | |
) | |
# YouTube thumbnail display | |
thumbnail_display = gr.Image( | |
label="Thumbnail", | |
visible=False, | |
height=100, | |
show_download_button=False, | |
interactive=False, | |
scale=2 | |
) | |
# Column 2: Transcript view | |
with gr.Column(scale=2): | |
transcript_output = gr.Textbox(label="Transcription POWERED by Modal Labs", lines=12, interactive=True, show_copy_button=True) | |
transcribe_button = gr.Button("Blogify", variant="primary") | |
gr.Markdown("---") | |
# Add Context section | |
context_input = gr.Textbox( | |
label="Additional Context", | |
placeholder="Enter any additional context, code, articles, or any references that relate to the video content...", | |
lines=5, | |
interactive=True | |
) | |
chatbot = gr.Chatbot( | |
label="Blog Post", type="messages", height=500, show_copy_all_button=True, show_copy_button=True, show_share_button=True | |
) | |
chat_input = gr.Textbox( | |
label="Your message", | |
placeholder="Refine the blog post or ask for changes...", | |
container=False, | |
) | |
clear_button = gr.ClearButton([chat_input, chatbot]) | |
# Event handlers to disable/enable inputs based on usage | |
def on_file_upload(file): | |
if file is not None: | |
return gr.update(interactive=False), gr.update(visible=False, value=None) | |
else: | |
return gr.update(interactive=True), gr.update(visible=False, value=None) | |
def on_url_change(url): | |
if url and url.strip(): | |
thumbnail_update = update_thumbnail_display(url) | |
return gr.update(interactive=False), thumbnail_update | |
else: | |
return gr.update(interactive=True), gr.update(visible=False, value=None) | |
file_input.change(fn=on_file_upload, inputs=file_input, outputs=[url_input, thumbnail_display]) | |
url_input.change(fn=on_url_change, inputs=url_input, outputs=[file_input, thumbnail_display]) | |
# Chained events for blog generation | |
( | |
transcribe_button.click( | |
fn=handle_transcription, | |
inputs=[file_input, url_input], | |
outputs=transcript_output, | |
) | |
.then( | |
fn=lambda: gr.update(value=None, interactive=True), | |
outputs=file_input, | |
queue=False, | |
) | |
.then( | |
fn=add_transcript_to_chat, | |
inputs=transcript_output, | |
outputs=chatbot, | |
queue=False, | |
) | |
.then(fn=generate_blog_post, inputs=[chatbot, transcript_output, context_input], outputs=chatbot) | |
) | |
# Event handler for follow-up chat | |
chat_input.submit( | |
fn=user_chat, | |
inputs=[chat_input, chatbot], | |
outputs=[chat_input, chatbot], | |
queue=False, | |
).then(fn=bot_chat, inputs=chatbot, outputs=chatbot) | |
if __name__ == "__main__": | |
demo.launch() |