# ClipScript — app.py
# (Hugging Face Spaces scrape artifact removed: commit "fix", 594b1dc, by muzzz)
from functools import wraps
import logging
import gradio as gr
import os
import modal
from openai import OpenAI
from dotenv import load_dotenv
import re
import time
import uuid
import yt_dlp
import tempfile
import shutil
from pathlib import Path
# Load environment variables (e.g. OPENROUTER_API_KEY) from a local .env file.
load_dotenv()
# Handles to the Modal-deployed backend: the remote media-processing function,
# the ASR service class, and the shared volume used to stage uploaded files.
process_media_remotely = modal.Function.from_name("clipscript-processing-service", "process_media")
asr_handle = modal.Cls.from_name("clipscript-asr-service", "ASR")
upload_volume = modal.Volume.from_name("clipscript-uploads", create_if_missing=True)
# OpenRouter model id used for blog generation and refinement.
llm = "deepseek/deepseek-r1-0528:free"
# May be None if the secret is not configured; the OpenAI client is built below.
api_key = os.environ.get("OPENROUTER_API_KEY")
def retry_on_rate_limit(max_retries: int = 5, base_delay: float = 2.0):
    """Decorator that retries a callable with exponential backoff on rate limits.

    A call is retried when the raised exception looks like an HTTP 429 /
    rate-limit error (status code, "429" in the message, or the words
    "rate limit"); any other exception propagates immediately.

    Args:
        max_retries: Maximum number of attempts before giving up.
        base_delay: Initial sleep in seconds; doubled after each retry.

    Raises:
        Exception: When all attempts are exhausted; the last rate-limit
            error is chained as the cause.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            last_exc = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # A 429 may surface as a response attribute, as "429" in the
                    # message text, or as the words "rate limit", depending on
                    # which client library raised it.
                    status_code = getattr(getattr(e, 'response', None), 'status_code', None)
                    if status_code == 429 or '429' in str(e) or 'rate limit' in str(e).lower():
                        last_exc = e
                        # Only sleep when another attempt remains — sleeping
                        # after the final failure just wastes time.
                        if attempt < max_retries - 1:
                            logging.warning(f"Rate limit hit. Retrying in {delay:.1f} seconds...")
                            time.sleep(delay)
                            delay *= 2
                    else:
                        raise
            # Chain the last rate-limit error so callers can see the root cause.
            raise Exception("Max retries exceeded due to rate limits or other persistent errors.") from last_exc
        return wrapper
    return decorator
def extract_youtube_video_id(url: str) -> "str | None":
    """Extract the YouTube video ID from various YouTube URL formats.

    Supports watch, youtu.be, embed, /v/, shorts, and live URLs.

    Args:
        url: Any URL string.

    Returns:
        The video ID, or None if the URL is not a recognised YouTube URL.
        (The original annotation claimed ``str`` but None is a valid result.)
    """
    patterns = [
        r'(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/embed\/|youtube\.com\/v\/)([^&\n?#]+)',
        r'youtube\.com\/watch\?.*v=([^&\n?#]+)',
        # Generalization: shorts and live URLs carry the ID in the path too.
        r'youtube\.com\/(?:shorts|live)\/([^&\n?#\/]+)'
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None
def get_youtube_thumbnail_url(video_id: str) -> str:
    """Build the max-resolution thumbnail URL for a YouTube video ID."""
    base = "https://img.youtube.com/vi"
    return f"{base}/{video_id}/maxresdefault.jpg"
# OpenAI-compatible client pointed at OpenRouter; used for all LLM calls below.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=api_key,
)
def download_and_convert_youtube_audio(url: str) -> str:
    """
    Downloads audio from a YouTube URL and converts it to a 16kHz mono WAV file.

    Uses a temporary directory for all intermediate files, ensuring cleanup
    even on failure. The caller is responsible for deleting the returned file.

    Args:
        url: A YouTube video URL.

    Returns:
        Path to a temporary 16 kHz mono WAV file containing the audio track.

    Raises:
        FileNotFoundError: If yt-dlp did not produce a WAV file (e.g. the
            video is protected or unavailable).
    """
    temp_dir = tempfile.mkdtemp()
    cookie_file_path = None
    try:
        # Check for YouTube cookies in secrets and write to a temporary file
        # (presumably to let yt-dlp access restricted videos — see yt-dlp docs).
        youtube_cookies = os.environ.get("YOUTUBE_COOKIES")
        if youtube_cookies:
            # Use NamedTemporaryFile to handle the file creation and cleanup;
            # delete=False so the path survives the 'with' for yt-dlp to read.
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as tmp_cookie_file:
                tmp_cookie_file.write(youtube_cookies)
                cookie_file_path = tmp_cookie_file.name
            print("Using YouTube cookies from secrets.")
        output_tmpl = os.path.join(temp_dir, "audio.%(ext)s")
        ydl_opts = {
            "format": "bestaudio/best",
            "outtmpl": output_tmpl,
            # Re-encode the downloaded audio stream to WAV via ffmpeg.
            "postprocessors": [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            # '-ar 16000 -ac 1' -> 16 kHz mono, matching what the ASR call expects.
            # NOTE(review): 'extractaudio' is assumed to be yt-dlp's alias for
            # the FFmpegExtractAudio postprocessor — confirm against yt-dlp docs.
            'postprocessor_args': {
                'extractaudio': ['-ar', '16000', '-ac', '1']
            },
            "quiet": True,
        }
        # Add cookiefile to options if it exists
        if cookie_file_path:
            ydl_opts['cookiefile'] = cookie_file_path
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        # Find the downloaded .wav file
        downloaded_files = list(Path(temp_dir).glob("*.wav"))
        if not downloaded_files:
            raise FileNotFoundError("yt-dlp failed to create a WAV file. The video might be protected or unavailable.")
        # Move the final file to a new temporary location so we can clean up the directory
        source_path = downloaded_files[0]
        fd, dest_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)  # mkstemp returns an open fd; close it before moving over the file.
        shutil.move(source_path, dest_path)
        return dest_path
    finally:
        # Clean up the cookie file if it was created
        if cookie_file_path and os.path.exists(cookie_file_path):
            os.remove(cookie_file_path)
        shutil.rmtree(temp_dir)
def handle_transcription(file, url):
    """Transcribe an uploaded file or a URL via the Modal backend.

    Exactly one input is expected. YouTube URLs are downloaded and converted
    locally, then the audio bytes are sent to the ASR service; other URLs and
    file uploads are processed remotely. Returns the transcript text, or an
    "Error: ..." string on failure (downstream handlers check that prefix).
    """
    if not file and not (url and url.strip()):
        gr.Warning("Please upload a file or enter a URL.")
        return "Error: Please upload a file or enter a URL."
    gr.Info("Starting secure transcription... This might take a moment.")
    try:
        result = None
        if url and url.strip():
            video_id = extract_youtube_video_id(url)
            if video_id:
                converted_wav_path = None
                try:
                    print(f"Detected YouTube URL. Processing locally: {url}")
                    converted_wav_path = download_and_convert_youtube_audio(url)
                    # Read audio bytes and call ASR service
                    with open(converted_wav_path, "rb") as f:
                        audio_bytes = f.read()
                    print("Sending audio bytes to ASR service.")
                    result = asr_handle().transcribe.remote(audio_bytes=audio_bytes)
                finally:
                    # Clean up the final temp file
                    if converted_wav_path and os.path.exists(converted_wav_path):
                        os.remove(converted_wav_path)
            else:
                # Process other URLs remotely and securely.
                print(f"Sending URL to Modal for processing: {url}")
                result = process_media_remotely.remote(url=url)
        elif file is not None:
            # For file uploads:
            # 1. Generate a unique ID for the file.
            upload_id = f"upload-{uuid.uuid4()}"
            print(f"Uploading file to Modal volume with ID: {upload_id}")
            # 2. Upload the local file to the remote volume
            with upload_volume.batch_upload() as batch:
                batch.put_file(file, upload_id)
            # 3. Trigger remote processing by passing the upload ID.
            print(f"Sending upload ID to Modal for processing: {upload_id}")
            result = process_media_remotely.remote(upload_id=upload_id)
        # All backend paths are assumed to return a dict with "text" or "error"
        # — TODO confirm against the ASR/processing services.
        if result.get("error"):
            return f"Error from ASR service: {result['error']}"
        return result["text"]
    except Exception as e:
        print(f"An error occurred: {e}")
        # It's good practice to remove the local temp file if it exists
        # (the finally below repeats this; the exists() guard keeps it safe).
        if file and os.path.exists(file):
            os.remove(file)
        return f"Error: {str(e)}"
    finally:
        # Gradio's gr.File widget creates a temporary file. We should clean it up.
        if file and os.path.exists(file):
            os.remove(file)
def add_transcript_to_chat(transcript: str):
    """Report transcription status to the user and reset the chat display.

    The transcript itself is never shown in the chatbot — it is passed to the
    LLM separately — so both branches return an empty history.
    """
    if not transcript.startswith("Error"):
        gr.Info("Transcript ready! Generating blog post...")
        return []
    gr.Error("Transcription failed. Please check the logs.")
    return []
def user_chat(user_message: str, history: list):
    """Append the user's message to a copy of the history and clear the input box."""
    updated = list(history)
    updated.append({"role": "user", "content": user_message})
    return "", updated
@retry_on_rate_limit(max_retries=3, base_delay=1.0)
def _create_chat_stream(messages: list):
    """Open a streaming chat completion; retried on rate limits.

    Kept as a plain (non-generator) function so the retry decorator actually
    observes exceptions from the API call. Decorating the generator below was
    a no-op: calling a generator function raises nothing — the request only
    fires on first iteration, outside the decorator's try block.
    """
    return client.chat.completions.create(
        model=llm,
        messages=messages,
        stream=True
    )

def _stream_chat_response(history: list, system_prompt: str, transcript: str = None):
    """Stream an assistant reply into *history*, yielding after each chunk.

    Args:
        history: Chat messages in OpenAI format; mutated in place — an
            assistant message is appended and grown as chunks arrive.
        system_prompt: System instruction prepended to the conversation.
        transcript: Optional transcript injected as a hidden first user
            message (not displayed in the chat UI).

    Yields:
        The updated history after each received content chunk.
    """
    if not history and not transcript:
        # Don't do anything if there's no history and no transcript
        return
    if transcript and transcript.startswith("Error"):
        # Transcription failed upstream; skip the LLM call entirely.
        return
    # Include transcript as first user message if provided, but don't display it
    messages = [{"role": "system", "content": system_prompt}]
    if transcript:
        messages.append({"role": "user", "content": transcript})
    messages.extend(history)
    stream = _create_chat_stream(messages)
    history.append({"role": "assistant", "content": ""})
    response_content = ""
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            response_content += content
            history[-1]["content"] = response_content
            yield history
def generate_blog_post(history: list, transcript: str, context: str):
    """Stream the initial blog post generated from the transcript.

    Optional user-supplied context is folded into the transcript payload
    before it is handed to the LLM.
    """
    system_prompt = """You are an expert blog writer and editor. Your task is to transform a raw video transcription into a well-structured, engaging, and publish-ready blog post in Markdown format.
Core Mandate: Erase the Video Origin
This is a critical function. The reader must not know the content came from a video.
Eliminate all video-specific language: Remove phrases like "in this video," "thanks for watching," "as you can see here," "welcome to the channel," etc.
Scrub all platform calls-to-action: No "like and subscribe," "hit the bell icon," or "comment below."
Remove sponsor messages and ads: Completely omit any sponsor mentions.
Rephrase visual references: Convert "look at this screen" to a description of the information itself (e.g., "The data reveals that...").
Content & Formatting Rules:
Title: Create a compelling, SEO-friendly H1 title.
Structure: Use ## for main headings and ### for subheadings to create a logical flow.
Readability: Use short paragraphs, bulleted/numbered lists, and bolding for key terms.
Refine Prose: Convert conversational speech into clean, professional writing.
Remove all filler words (um, uh, like, you know).
Fix grammar and consolidate rambling sentences.
Flow: Start with a strong introduction and end with a concise summary or conclusion.
Your output must be a complete, polished article in Markdown."""
    # Attach additional context (code, articles, references) when present.
    extra = context.strip() if context else ""
    if extra:
        payload = f"{transcript}\n\n--- Additional Context ---\n{extra}\n\nThis is some additional context relevant to the transcription above."
    else:
        payload = transcript
    yield from _stream_chat_response(history, system_prompt, payload)
def bot_chat(history: list):
    """Stream a refined blog post based on the user's latest instructions."""
    refine_prompt = "You are a helpful assistant that helps refine a blog post created from an audio transcript. The user will provide instructions for changes and you will return only the updated blog post."
    yield from _stream_chat_response(history, refine_prompt)
def update_thumbnail_display(url: str):
    """Show the YouTube thumbnail for a recognised URL; hide it otherwise."""
    hidden = gr.update(visible=False, value=None)
    if not url or not url.strip():
        return hidden
    video_id = extract_youtube_video_id(url)
    if not video_id:
        return hidden
    return gr.update(visible=True, value=get_youtube_thumbnail_url(video_id))
# Gradio Interface
# Layout: one row with inputs (file/URL/thumbnail) beside the transcript box,
# followed by a context box, the blog-post chatbot, and a refinement input.
theme = gr.themes.Ocean()
with gr.Blocks(title="ClipScript", theme=theme) as demo:
    gr.Markdown("# 🎬➡️📝 ClipScript: Video-to-Blog Transformer", elem_classes="hero-title")
    gr.Markdown("### Upload an audio file, or provide a YouTube/direct URL *of any size*.")
    with gr.Row():
        # Column 1: File input, URL input, and thumbnail
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload any audio file (Recommended)", type="filepath", height=200, file_types=["audio", ".webm", ".mp3", ".mp4", ".m4a", ".ogg", ".wav"])
            with gr.Row():
                with gr.Column():
                    url_input = gr.Textbox(
                        label="YouTube or Direct Audio URL",
                        placeholder="youtube.com/watch?v=... OR xyz.com/audio.mp3",
                        scale=2
                    )
                    # YouTube thumbnail display (hidden until a YouTube URL is typed)
                    thumbnail_display = gr.Image(
                        label="Thumbnail",
                        visible=False,
                        height=100,
                        show_download_button=False,
                        interactive=False,
                        scale=2
                    )
        # Column 2: Transcript view
        with gr.Column(scale=2):
            transcript_output = gr.Textbox(label="Transcription POWERED by Modal Labs", lines=12, interactive=True, show_copy_button=True)
    transcribe_button = gr.Button("Blogify", variant="primary")
    gr.Markdown("---")
    # Add Context section
    context_input = gr.Textbox(
        label="Additional Context",
        placeholder="Enter any additional context, code, articles, or any references that relate to the video content...",
        lines=5,
        interactive=True
    )
    chatbot = gr.Chatbot(
        label="Blog Post", type="messages", height=500, show_copy_all_button=True, show_copy_button=True, show_share_button=True
    )
    chat_input = gr.Textbox(
        label="Your message",
        placeholder="Refine the blog post or ask for changes...",
        container=False,
    )
    clear_button = gr.ClearButton([chat_input, chatbot])
    # Event handlers to disable/enable inputs based on usage
    # (the two input methods are mutually exclusive: using one disables the other)
    def on_file_upload(file):
        # A chosen file locks the URL box and hides any thumbnail.
        if file is not None:
            return gr.update(interactive=False), gr.update(visible=False, value=None)
        else:
            return gr.update(interactive=True), gr.update(visible=False, value=None)
    def on_url_change(url):
        # A non-empty URL locks the file input and may show a thumbnail.
        if url and url.strip():
            thumbnail_update = update_thumbnail_display(url)
            return gr.update(interactive=False), thumbnail_update
        else:
            return gr.update(interactive=True), gr.update(visible=False, value=None)
    file_input.change(fn=on_file_upload, inputs=file_input, outputs=[url_input, thumbnail_display])
    url_input.change(fn=on_url_change, inputs=url_input, outputs=[file_input, thumbnail_display])
    # Chained events for blog generation:
    # transcribe -> reset file widget -> validate/clear chat -> stream blog post
    (
        transcribe_button.click(
            fn=handle_transcription,
            inputs=[file_input, url_input],
            outputs=transcript_output,
        )
        .then(
            fn=lambda: gr.update(value=None, interactive=True),
            outputs=file_input,
            queue=False,
        )
        .then(
            fn=add_transcript_to_chat,
            inputs=transcript_output,
            outputs=chatbot,
            queue=False,
        )
        .then(fn=generate_blog_post, inputs=[chatbot, transcript_output, context_input], outputs=chatbot)
    )
    # Event handler for follow-up chat
    chat_input.submit(
        fn=user_chat,
        inputs=[chat_input, chatbot],
        outputs=[chat_input, chatbot],
        queue=False,
    ).then(fn=bot_chat, inputs=chatbot, outputs=chatbot)
if __name__ == "__main__":
    demo.launch()