# shorts-generator / modal_deploy.py
import modal
import os
import sys
import random
from dotenv import load_dotenv
# Add current directory to the Python path to enable direct imports
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
sys.path.insert(0, current_dir)
# Load environment variables first
load_dotenv()
# Create the Modal app
app = modal.App("shorts-generator")
# Create volume for persistent storage
volume = modal.Volume.from_name("shorts-generator-vol", create_if_missing=True)
# Define base image with all dependencies
image = (
modal.Image.debian_slim()
.apt_install(["ffmpeg", "libsm6", "libxext6"])
.pip_install([
"ffmpeg-python",
"openai>=1.0.0",
"openai-whisper>=20231117",
"pytube>=15.0.0",
"yt-dlp>=2023.3.4",
"langchain>=0.1.0",
"python-dotenv>=1.0.0"
])
)
def setup_modal_secrets():
"""Set up Modal secrets using environment variables"""
try:
# Create environment dictionary
secret_env = {
"OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY", ""),
"MISTRAL_API_KEY": os.environ.get("MISTRAL_API_KEY", ""),
"YOUTUBE_API_KEY": os.environ.get("YOUTUBE_API_KEY", ""),
"GOOGLE_API_KEY": os.environ.get("GOOGLE_API_KEY", ""),
}
        # Build the secret from the environment. Recent Modal releases expose
        # Secret.from_dict() for this; modal.Secret has no public dict
        # constructor or save() method, so the named secret referenced by
        # from_name() below should be created once via the CLI if missing.
        modal.Secret.from_dict(secret_env)
        print("Environment keys loaded; persist the named secret with `modal secret create`")
except Exception as e:
print(f"Error setting up Modal secrets: {e}")
# Create a setup_directories function
@app.function(image=image, volumes={"/data": volume})
def setup_directories():
import os
os.makedirs("/data/videos", exist_ok=True)
os.makedirs("/data/clips", exist_ok=True)
print("Directories created in Modal volume")
return True
# Define the download_youtube_video function at the module level
@app.function(
image=image,
volumes={"/data": volume},
timeout=600,
secrets=[modal.Secret.from_name("shorts-generator-secrets")]
)
def download_youtube_video(youtube_url):
import os
import uuid
import yt_dlp
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
try:
# Create output path
video_id = f"video_{uuid.uuid4().hex}.mp4"
output_path = f"/data/videos/{video_id}"
# Download with yt-dlp
logger.info(f"Downloading video from: {youtube_url}")
ydl_opts = {
'format': 'mp4',
'outtmpl': output_path,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(youtube_url, download=True)
title = info.get('title', 'Unknown')
logger.info(f"Downloaded: {title}")
return output_path, title
except Exception as e:
logger.error(f"Download failed: {str(e)}")
return None, str(e)
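# A minimal usage sketch (hypothetical URL), assuming the app is deployed;
# current Modal releases invoke functions with .remote():
#
#   path, title = download_youtube_video.remote("https://www.youtube.com/watch?v=...")
#   print(path, title)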
# Define transcribe_video_enhanced at the module level
@app.function(
image=image,
volumes={"/data": volume},
gpu="T4", # Request GPU for faster transcription
timeout=900,
secrets=[modal.Secret.from_name("shorts-generator-secrets")]
)
def transcribe_video_enhanced(video_path_or_url):
"""Enhanced video transcription with better error handling and validation"""
import os
import tempfile
import whisper
import subprocess
import json
import logging
import uuid
import yt_dlp
import shutil
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info(f"Processing transcription request for: {video_path_or_url}")
# Function to repair a corrupted video file
def repair_video(path):
"""Attempts to repair a corrupted video file"""
logger.info(f"Attempting to repair video file: {path}")
if not os.path.exists(path):
return False, "File not found"
try:
            # Create temporary directory for repair (ensure the parent exists)
            os.makedirs("/data/tmp", exist_ok=True)
            repair_dir = tempfile.mkdtemp(dir="/data/tmp")
repaired_path = os.path.join(repair_dir, f"repaired_{os.path.basename(path)}")
# Try to repair with ffmpeg by re-encoding
cmd = [
"ffmpeg",
"-y",
"-err_detect", "ignore_err",
"-analyzeduration", "100M",
"-probesize", "100M",
"-i", path,
"-c:v", "libx264",
"-preset", "ultrafast",
"-c:a", "aac",
repaired_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"Repair failed: {result.stderr}")
return False, f"Repair failed: {result.stderr[:100]}..."
# Check if repaired file is valid
valid, msg = validate_video(repaired_path)
if valid:
return True, repaired_path
else:
return False, f"Repaired file still invalid: {msg}"
except Exception as e:
logger.error(f"Error during repair: {str(e)}")
return False, f"Repair error: {str(e)}"
# Function to validate a video file
def validate_video(path):
if not os.path.exists(path):
return False, "File not found"
# Check file size first
file_size = os.path.getsize(path)
if file_size < 10000: # Less than 10KB
return False, f"File too small: {file_size} bytes"
# Use ffprobe with increased analyzeduration and probesize
cmd = [
"ffprobe",
"-v", "error",
"-analyzeduration", "100M",
"-probesize", "100M",
"-show_entries", "stream=codec_type,codec_name,width,height,pix_fmt",
"-of", "json",
path
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode == 0:
data = json.loads(result.stdout)
streams = data.get('streams', [])
# Check if we have valid video streams
for stream in streams:
if stream.get('codec_type') == 'video':
# Check if pixel format is specified
if not stream.get('pix_fmt'):
return False, "Video stream has no pixel format"
# Check dimensions
if not stream.get('width') or not stream.get('height'):
return False, "Video stream has no dimensions"
# If we got here, the video stream seems valid
return True, "Video validated"
# No valid video stream found
return False, "No valid video stream found"
else:
return False, f"FFprobe error: {result.stderr[:100]}..."
except Exception as e:
return False, f"Validation error: {str(e)}"
# Function to extract audio from video
def extract_audio(video_path, output_dir):
"""Extract audio from video file for transcription"""
audio_path = os.path.join(output_dir, "audio.wav")
# Try two different approaches for extraction
try:
# First attempt: standard extraction
cmd = [
"ffmpeg",
"-y",
"-analyzeduration", "100M",
"-probesize", "100M",
"-i", video_path,
"-vn", # No video
"-acodec", "pcm_s16le", # PCM 16-bit audio
"-ar", "16000", # 16kHz sample rate
"-ac", "1", # Mono
audio_path
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if result.returncode == 0 and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000:
return True, audio_path
# Second attempt: copy stream directly
logger.info("First audio extraction failed, trying alternate method")
cmd = [
"ffmpeg",
"-y",
"-analyzeduration", "100M",
"-probesize", "100M",
"-i", video_path,
"-vn",
"-acodec", "copy",
os.path.join(output_dir, "audio_copy.aac")
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
# Convert the copied audio to WAV
if result.returncode == 0:
cmd = [
"ffmpeg",
"-y",
"-i", os.path.join(output_dir, "audio_copy.aac"),
"-acodec", "pcm_s16le",
"-ar", "16000",
"-ac", "1",
audio_path
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if result.returncode == 0 and os.path.exists(audio_path) and os.path.getsize(audio_path) > 1000:
return True, audio_path
# If both methods failed, return failure
return False, f"Audio extraction failed: {result.stderr[:100]}..."
except Exception as e:
logger.error(f"Error extracting audio: {str(e)}")
return False, f"Audio extraction error: {str(e)}"
# Handle URL or local path differently
video_path = video_path_or_url
temp_dir = None
proc_temp_dir = None
try:
        # Create temp directories (the volume has no /data/tmp until we make it)
        os.makedirs("/data/tmp", exist_ok=True)
        temp_dir = tempfile.mkdtemp(dir="/data/tmp")
        proc_temp_dir = tempfile.mkdtemp(dir="/data/tmp")
# If it's a URL, download it first
if isinstance(video_path_or_url, str) and video_path_or_url.startswith(('http://', 'https://')):
logger.info("Input is a URL, downloading first...")
try:
output_path = os.path.join(temp_dir, f"video_{uuid.uuid4().hex}.mp4")
# Try yt-dlp first (better for YouTube)
ydl_opts = {
'format': 'best[ext=mp4]',
'outtmpl': output_path,
'quiet': False,
'no_warnings': False
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([video_path_or_url])
video_path = output_path
logger.info(f"Downloaded video to {video_path}")
except Exception as e:
logger.error(f"Error downloading video: {str(e)}")
raise Exception(f"Failed to download video: {str(e)}")
# Validate the video file
logger.info(f"Validating video: {video_path}")
is_valid, message = validate_video(video_path)
# If invalid, try to repair
if not is_valid:
logger.warning(f"Invalid video file: {message}. Attempting repair...")
repair_success, repair_result = repair_video(video_path)
if repair_success:
logger.info("Video repaired successfully!")
video_path = repair_result
else:
logger.error(f"Video repair failed: {repair_result}")
raise Exception(f"Invalid video file and repair failed: {repair_result}")
# Create a temporary copy for whisper processing
working_video_path = os.path.join(proc_temp_dir, os.path.basename(video_path))
logger.info(f"Creating working copy at {working_video_path}")
shutil.copy2(video_path, working_video_path)
# Extract audio to improve transcription reliability
logger.info("Extracting audio for better transcription...")
audio_success, audio_path = extract_audio(working_video_path, proc_temp_dir)
if not audio_success:
logger.error(f"Failed to extract audio: {audio_path}")
raise Exception(f"Failed to extract audio: {audio_path}")
logger.info(f"Audio extracted to {audio_path}")
# Load Whisper model with GPU acceleration
logger.info("Loading Whisper model...")
try:
# Try faster tiny model first
model = whisper.load_model("tiny")
logger.info("Using tiny Whisper model for initial pass")
except Exception as e:
logger.warning(f"Error loading tiny model: {str(e)}, trying base")
model = whisper.load_model("base")
# Transcribe the audio
logger.info("Starting transcription...")
result = model.transcribe(
audio_path,
fp16=True, # Use FP16 for GPU acceleration
language="en", # Specify language if known
word_timestamps=True
)
# Check if we got a good result
if not result.get('text') or len(result.get('text', '')) < 10:
logger.warning("Initial transcription returned little or no text, trying with base model")
try:
# Try using base model for better quality
model = whisper.load_model("base")
result = model.transcribe(
audio_path,
fp16=True,
language="en",
word_timestamps=True
)
except Exception as e:
logger.error(f"Error in second transcription attempt: {str(e)}")
# Format output with timestamps
transcript_with_timestamps = []
for segment in result["segments"]:
transcript_with_timestamps.append({
"start": segment["start"],
"end": segment["end"],
"text": segment["text"]
})
# Clean up temp files
try:
if temp_dir:
shutil.rmtree(temp_dir, ignore_errors=True)
if proc_temp_dir:
shutil.rmtree(proc_temp_dir, ignore_errors=True)
except Exception as e:
logger.warning(f"Failed to clean up some temporary files: {str(e)}")
# Return structured result
transcript_result = {
"full_text": result["text"],
"segments": transcript_with_timestamps,
"method": "modal-whisper-gpu"
}
logger.info(f"Transcription completed successfully. Text length: {len(result['text'])}")
return transcript_result
except Exception as e:
logger.error(f"Transcription failed: {str(e)}")
# Clean up temp files in case of error
try:
if temp_dir:
shutil.rmtree(temp_dir, ignore_errors=True)
if proc_temp_dir:
shutil.rmtree(proc_temp_dir, ignore_errors=True)
        except Exception:
            pass
# Return a minimal valid response rather than fail completely
return {
"full_text": f"Transcription failed: {str(e)}",
"segments": [{"start": 0, "end": 5, "text": "Transcription failed"}],
"method": "modal-error"
}
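# For reference, a successful call returns a dict shaped like this sketch
# (timestamps and text are illustrative):
#
#   {
#       "full_text": "Hello and welcome to the channel ...",
#       "segments": [{"start": 0.0, "end": 4.2, "text": "Hello and welcome"}],
#       "method": "modal-whisper-gpu"
#   }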
# Add a video validation and repair function at the module level
@app.function(
image=image,
volumes={"/data": volume},
timeout=600,
secrets=[modal.Secret.from_name("shorts-generator-secrets")]
)
def validate_repair_video(video_path):
"""Validate a video file and repair/re-download if needed"""
import os
import subprocess
import json
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info(f"Validating video: {video_path}")
# Check if file exists
if not os.path.exists(video_path):
logger.error(f"Video file not found: {video_path}")
return False, "File not found"
# Check if video is valid using ffprobe with increased analyzeduration and probesize
probe_cmd = [
"ffprobe",
"-v", "error",
"-analyzeduration", "100M",
"-probesize", "100M",
"-show_entries",
"stream=codec_type,codec_name,width,height",
"-of", "json",
video_path
]
try:
result = subprocess.run(probe_cmd, capture_output=True, text=True)
if result.returncode == 0:
data = json.loads(result.stdout)
streams = data.get('streams', [])
# Check if we have video streams with dimensions
valid = any(s.get('codec_type') == 'video' and
s.get('width') is not None and
s.get('height') is not None
for s in streams)
if valid:
logger.info("Video file is valid")
return True, "Video is valid"
else:
logger.warning("Video has no valid video streams")
else:
logger.warning(f"FFprobe validation failed: {result.stderr}")
except Exception as e:
logger.error(f"Error validating video: {str(e)}")
logger.warning("Video file is corrupt or invalid, cleaning cache...")
# Try to remove the invalid file
try:
os.remove(video_path)
logger.info(f"Removed invalid file: {video_path}")
except Exception as e:
logger.error(f"Failed to remove file: {str(e)}")
return False, "Video is invalid"
# Helper function for generic highlights
def _generate_generic_highlights(num_highlights, duration=60):
"""Generate generic highlights when all else fails"""
highlights = []
for i in range(num_highlights):
start_time = i * 90 # Space out every 90 seconds
highlights.append({
"start_time": start_time,
"end_time": start_time + duration,
"title": f"Highlight {i+1}",
"description": f"Auto-selected highlight starting at {int(start_time//60)}:{int(start_time%60):02d}"
})
return highlights
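# For example, _generate_generic_highlights(2) spaces highlights 90s apart:
#   [{"start_time": 0,  "end_time": 60,  "title": "Highlight 1", ...},
#    {"start_time": 90, "end_time": 150, "title": "Highlight 2", ...}]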
# Add a new smart clip generator function at the module level
@app.function(
image=image,
volumes={"/data": volume},
timeout=600,
secrets=[modal.Secret.from_name("shorts-generator-secrets")]
)
def create_smart_clips(video_path, transcript_data, min_duration=20, max_duration=60,
target_clips=3):
"""
Creates variable-length clips based on content relevance rather than fixed duration.
Args:
video_path: Path to the video file
transcript_data: Transcript with timestamps
min_duration: Minimum clip duration in seconds (default: 20)
max_duration: Maximum clip duration in seconds (default: 60)
target_clips: Number of clips to generate (default: 3)
Returns:
List of clip info dictionaries with paths and metadata
"""
    import os
    import subprocess
    import json
    import logging
    import random  # used for the evenly spaced fallback below
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info(f"Creating smart variable-length clips from: {video_path}")
logger.info(f"Parameters: min={min_duration}s, max={max_duration}s, target={target_clips} clips")
def find_natural_segments(transcript_data):
"""Find natural break points in the transcript for better clip boundaries"""
segments = transcript_data.get("segments", [])
if not segments:
logger.warning("No transcript segments found, using time-based segmentation")
return []
natural_breaks = []
# Find pauses between sentences (typically longer gaps)
for i in range(len(segments) - 1):
current_seg = segments[i]
next_seg = segments[i + 1]
# Calculate gap between segments
gap = next_seg['start'] - current_seg['end']
# Check if segment ends with sentence-ending punctuation
ends_sentence = current_seg['text'].rstrip().endswith(('.', '!', '?'))
# Consider it a natural break if there's a significant pause or sentence end
if gap > 0.75 or ends_sentence:
natural_breaks.append({
'time': current_seg['end'],
'quality': (5 if ends_sentence else 3) + (min(gap * 2, 5)), # Score quality of break
'text_context': current_seg['text']
})
# Sort by quality (higher is better)
natural_breaks.sort(key=lambda x: x['quality'], reverse=True)
return natural_breaks
def create_clip(input_path, output_dir, start_time, end_time, index):
"""Create a clip using ffmpeg with the specified start and end times"""
# Create a unique filename
output_filename = f"clip_{index}_smart_{int(start_time)}to{int(end_time)}.mp4"
output_path = os.path.join(output_dir, output_filename)
# Build the ffmpeg command
duration = end_time - start_time
cmd = [
"ffmpeg", "-y",
"-analyzeduration", "100M", "-probesize", "100M",
"-ss", str(start_time),
"-i", input_path,
"-t", str(duration),
"-c:v", "libx264", "-preset", "medium",
"-c:a", "aac", "-strict", "experimental", "-b:a", "128k",
output_path
]
try:
logger.info(f"Creating clip {index+1}: {start_time:.1f}s to {end_time:.1f}s (duration: {duration:.1f}s)")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"Failed to create clip: {result.stderr[:200]}")
return None
return {
"path": output_path,
"start_time": start_time,
"end_time": end_time,
"duration": duration,
"title": f"Clip {index+1}: {start_time:.1f}s to {end_time:.1f}s"
}
except Exception as e:
logger.error(f"Error creating clip: {str(e)}")
return None
try:
# Create output directory if using local path
output_dir = "/data/clips"
os.makedirs(output_dir, exist_ok=True)
# Get total video duration
duration_cmd = [
"ffprobe", "-v", "error",
"-show_entries", "format=duration",
"-of", "json",
video_path
]
result = subprocess.run(duration_cmd, capture_output=True, text=True)
duration_data = json.loads(result.stdout)
total_duration = float(duration_data.get('format', {}).get('duration', 0))
if total_duration <= 0:
logger.error("Could not determine video duration")
return []
logger.info(f"Video total duration: {total_duration:.2f} seconds")
# Find natural breaks in the content
natural_breaks = find_natural_segments(transcript_data)
logger.info(f"Found {len(natural_breaks)} potential natural break points")
clips = []
# Use natural breaks if we have enough of them
if natural_breaks and len(natural_breaks) >= target_clips - 1:
# Take top N-1 best breaks to create N clips
selected_breaks = natural_breaks[:target_clips - 1]
selected_times = sorted([b['time'] for b in selected_breaks])
# Create start/end pairs for clips
start_times = [0] + selected_times
end_times = selected_times + [total_duration]
# Validate and adjust segments
for i in range(len(start_times)):
start = start_times[i]
end = end_times[i]
duration = end - start
# Skip segments that are too short
if duration < min_duration:
continue
# Cap segments that are too long
if duration > max_duration:
end = start + max_duration
# Create the clip
clip_info = create_clip(video_path, output_dir, start, end, i)
if clip_info:
clips.append(clip_info)
else:
# If we don't have good natural breaks, use evenly spaced clips
# with slight variations for more natural feel
logger.info("Using evenly spaced clips with variations")
# Determine base clip duration with some randomness
base_duration = min(max_duration, total_duration / target_clips)
for i in range(target_clips):
# Add some variation to make it feel more natural
variation = random.uniform(-3, 3) if base_duration > 25 else 0
clip_duration = base_duration + variation
# Ensure duration constraints
clip_duration = max(min_duration, min(clip_duration, max_duration))
# Calculate start and end, ensuring we don't exceed video length
start_time = i * (total_duration / target_clips)
end_time = min(start_time + clip_duration, total_duration)
# Create the clip
clip_info = create_clip(video_path, output_dir, start_time, end_time, i)
if clip_info:
clips.append(clip_info)
logger.info(f"Successfully created {len(clips)} variable-length clips")
return clips
except Exception as e:
logger.error(f"Error in smart clip creation: {str(e)}")
return []
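# A usage sketch for the smart clipper (the path and transcript are assumed to
# come from download_youtube_video and transcribe_video_enhanced above):
#
#   clips = create_smart_clips.remote(
#       "/data/videos/video_abc123.mp4", transcript_result,
#       min_duration=20, max_duration=45, target_clips=3,
#   )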
# Add a smart highlight selector at the module level
@app.function(
image=image,
volumes={"/data": volume},
timeout=600,
secrets=[modal.Secret.from_name("shorts-generator-secrets")]
)
def smart_highlight_selector(transcript_data, video_title, num_highlights=3,
min_duration=15, max_duration=60, content_type="interesting"):
"""
Selects highlights from a transcript based on specific content preferences.
Args:
transcript_data: Transcript with timestamps
video_title: Title of the video
num_highlights: Number of highlights to select
min_duration: Minimum highlight duration in seconds
max_duration: Maximum highlight duration in seconds
content_type: Type of content to look for (funny, interesting, etc.)
Returns:
List of highlight info dictionaries with timestamps
"""
import os
import json
import openai
import logging
import random
from difflib import SequenceMatcher
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configure OpenAI client
client = openai.OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
# Define content type specific instructions
content_type_instructions = {
"interesting": "Focus on intellectually engaging and thought-provoking moments that viewers will find fascinating.",
"funny": "Look for humorous moments, jokes, laughter, or amusing anecdotes that will entertain viewers.",
"dramatic": "Identify emotionally powerful moments with high tension, conflicts, or impactful revelations.",
"educational": "Find explanations of concepts, demonstrations, or moments that teach something valuable.",
"surprising": "Look for unexpected twists, shocking revelations, or moments that defy expectations.",
"inspiring": "Identify motivational content, success stories, or uplifting moments that inspire action."
}
# Get specific instructions or use default
content_instruction = content_type_instructions.get(
content_type, "Focus on the most engaging moments that will work well as short clips."
)
logger.info(f"Selecting {content_type} highlights from video: {video_title}")
# Extract transcript segments and full text
segments = transcript_data.get("segments", [])
full_text = transcript_data.get("full_text", "")
if not segments or not full_text:
logger.warning("Empty transcript data, returning generic highlights")
return _generate_generic_highlights(num_highlights, max_duration)
try:
        # Build the LLM prompt for the specified content type; the transcript is
        # truncated to ~4000 characters to stay within the model's token limit
prompt = f"""
You are an expert video editor specializing in finding {content_type} moments in videos for social media shorts.
{content_instruction}
For the video titled "{video_title}", analyze this transcript and identify {num_highlights}
distinct moments that would make great shorts with durations between {min_duration}
and {max_duration} seconds.
TRANSCRIPT:
        {full_text[:4000]}
For each highlight, provide:
1. A brief description of the {content_type} moment
2. A catchy title that will grab viewer attention
3. Specific text from the transcript that matches this moment
Format your response as a JSON array:
[
{{
"title": "Catchy Title Here",
"description": "Description of the moment",
"transcript_text": "Exact text from transcript for matching"
}}
]
ONLY include the JSON array in your response, no other text.
"""
# Call the LLM to identify highlights
logger.info("Calling LLM to identify highlights...")
response = client.chat.completions.create(
model="gpt-4-turbo", # Use an appropriate model
messages=[
{"role": "system", "content": f"You are an expert video editor specializing in {content_type} content for social media."},
{"role": "user", "content": prompt}
],
temperature=0.7
)
# Parse the response
content = response.choices[0].message.content
# Extract JSON (handling the possibility of code blocks or plain JSON)
import re
json_match = re.search(r'(\[[\s\S]*\])', content)
if json_match:
highlighted_moments = json.loads(json_match.group(1))
else:
# Try parsing the whole thing as JSON
try:
highlighted_moments = json.loads(content)
            except Exception:
logger.error("Could not parse LLM response as JSON")
return _generate_generic_highlights(num_highlights, max_duration)
logger.info(f"Found {len(highlighted_moments)} potential highlighted moments")
# Match each highlighted moment with transcript segments
highlights = []
for moment in highlighted_moments:
transcript_text = moment.get("transcript_text", "").lower()
# Find best matching segment
best_segment = None
highest_similarity = 0
for segment in segments:
segment_text = segment["text"].lower()
# Calculate similarity between the moment text and segment text
similarity = SequenceMatcher(None, transcript_text, segment_text).ratio()
if similarity > highest_similarity:
highest_similarity = similarity
best_segment = segment
if not best_segment:
# Fall back to random segment if no match found
best_segment = random.choice(segments)
# Calculate start and end times
start_time = best_segment["start"]
# Get segments that fit within desired duration
clip_segments = []
current_duration = 0
for segment in segments:
if segment["start"] >= start_time:
segment_duration = segment["end"] - segment["start"]
if current_duration + segment_duration <= max_duration:
clip_segments.append(segment)
current_duration += segment_duration
else:
break
if clip_segments:
end_time = clip_segments[-1]["end"]
# Ensure minimum duration
if end_time - start_time < min_duration:
end_time = start_time + min_duration
# Add highlight
highlights.append({
"start_time": start_time,
"end_time": end_time,
"title": moment.get("title", f"{content_type.capitalize()} Highlight"),
"description": moment.get("description", f"A {content_type} moment from {video_title}")
})
# If we got fewer highlights than requested, pad with generic ones
if len(highlights) < num_highlights:
additional_needed = num_highlights - len(highlights)
highlights.extend(_generate_generic_highlights(additional_needed, max_duration))
return highlights
except Exception as e:
logger.error(f"Error selecting highlights: {str(e)}")
return _generate_generic_highlights(num_highlights, max_duration)
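# Each highlight returned above is a dict of this shape (values illustrative):
#   {"start_time": 42.5, "end_time": 78.0,
#    "title": "Catchy Title Here", "description": "Description of the moment"}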
# Add select_highlights at the module level
@app.function(
image=image,
volumes={"/data": volume},
timeout=300,
secrets=[modal.Secret.from_name("shorts-generator-secrets")]
)
def select_highlights(transcript_data, video_title, num_highlights=3, max_duration=60):
"""
Basic highlight selection based on transcript data.
This is used as a fallback when smart_highlight_selector is not available.
"""
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info(f"Selecting basic highlights from {video_title}")
segments = transcript_data.get("segments", [])
if not segments:
logger.warning("No transcript segments found, generating generic highlights")
return _generate_generic_highlights(num_highlights, max_duration)
# Find segments spaced evenly throughout the video
total_segments = len(segments)
step = max(1, total_segments // (num_highlights + 1))
highlights = []
for i in range(1, min(num_highlights + 1, total_segments)):
idx = i * step
if idx >= total_segments:
break
segment = segments[idx]
start_time = segment["start"]
end_time = min(start_time + max_duration, segments[-1]["end"])
highlights.append({
"start_time": start_time,
"end_time": end_time,
"title": f"Highlight {i}",
"description": f"Segment starting at {int(start_time//60)}:{int(start_time%60):02d}"
})
logger.info(f"Selected {len(highlights)} basic highlights")
return highlights
# Add clip_video at the module level
@app.function(
image=image,
volumes={"/data": volume},
timeout=300,
secrets=[modal.Secret.from_name("shorts-generator-secrets")]
)
def clip_video(video_path, highlights):
"""
Create video clips based on highlight timestamps.
This is used as a fallback when create_smart_clips is not available.
"""
import os
import subprocess
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info(f"Creating clips from: {video_path}")
output_dir = "/data/clips"
os.makedirs(output_dir, exist_ok=True)
clips = []
for i, highlight in enumerate(highlights):
start_time = highlight.get("start_time", 0)
end_time = highlight.get("end_time", start_time + 60)
duration = end_time - start_time
output_path = os.path.join(output_dir, f"clip_{i}_{int(start_time)}to{int(end_time)}.mp4")
# Build the ffmpeg command
cmd = [
"ffmpeg", "-y",
"-analyzeduration", "100M", "-probesize", "100M",
"-ss", str(start_time),
"-i", video_path,
"-t", str(duration),
"-c:v", "libx264", "-preset", "medium",
"-c:a", "aac",
output_path
]
try:
logger.info(f"Creating clip {i+1}: {start_time:.1f}s to {end_time:.1f}s")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.error(f"Failed to create clip: {result.stderr[:200]}")
continue
clips.append({
"path": output_path,
"start_time": start_time,
"end_time": end_time,
"duration": duration,
"title": highlight.get("title", f"Clip {i+1}")
})
except Exception as e:
logger.error(f"Error creating clip: {str(e)}")
logger.info(f"Created {len(clips)} clips")
return clips
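# A sketch of the fallback pipeline these two functions form (the video path
# and transcript_result are assumed outputs of the functions defined earlier):
#
#   highlights = select_highlights.remote(transcript_result, "My Video", num_highlights=3)
#   clips = clip_video.remote("/data/videos/video_abc123.mp4", highlights)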
# Add generate_caption at the module level
@app.function(
image=image,
timeout=300,
secrets=[modal.Secret.from_name("shorts-generator-secrets")]
)
def generate_caption(clip_info, transcript_data, video_title):
"""
Generate engaging captions for a video clip.
"""
import os
import openai
import logging
import json
import re
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
logger.info(f"Generating caption for clip: {clip_info.get('title')}")
# Extract the transcript text for this clip's time range
start_time = clip_info.get("start_time", 0)
end_time = clip_info.get("end_time", 0)
clip_text = ""
segments = transcript_data.get("segments", [])
for segment in segments:
# Include segments that overlap with the clip time range
if (segment["start"] <= end_time and segment["end"] >= start_time):
clip_text += segment["text"] + " "
clip_text = clip_text.strip()
# If no text found in clip range, use generic caption
if not clip_text:
logger.warning("No transcript text found for clip range")
return {
"title": f"Interesting moment from {video_title}",
"caption": f"Check out this clip from {video_title}! #shorts",
"hashtags": "#shorts #viral #trending"
}
# Generate caption using OpenAI
try:
client = openai.OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
prompt = f"""
You are a social media expert creating engaging captions for YouTube Shorts.
Video title: "{video_title}"
Transcript of clip:
{clip_text[:500]}
Create a captivating caption package with:
1. A catchy title (max 60 characters)
2. An engaging caption (2-3 sentences max)
3. 3-5 relevant hashtags
Format as JSON: {{"title": "...", "caption": "...", "hashtags": "..."}}
"""
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are a social media caption expert."},
{"role": "user", "content": prompt}
],
temperature=0.7
)
content = response.choices[0].message.content
# Extract JSON response
json_match = re.search(r'{.*}', content, re.DOTALL)
if json_match:
caption_data = json.loads(json_match.group(0))
else:
logger.warning("Could not parse JSON response, using generic caption")
caption_data = {
"title": f"Highlight from {video_title}",
"caption": f"Check out this amazing moment! #shorts",
"hashtags": "#shorts #trending"
}
# Ensure all required fields exist
if "title" not in caption_data:
caption_data["title"] = f"Highlight from {video_title}"
if "caption" not in caption_data:
caption_data["caption"] = f"Check out this amazing moment! #shorts"
if "hashtags" not in caption_data:
caption_data["hashtags"] = "#shorts #trending"
logger.info(f"Generated caption for clip: {caption_data['title']}")
return caption_data
except Exception as e:
logger.error(f"Error generating caption: {str(e)}")
return {
"title": f"Highlight from {video_title}",
"caption": f"Check out this amazing moment! #shorts",
"hashtags": "#shorts #trending"
}
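# generate_caption returns a dict of this form (values illustrative):
#   {"title": "Highlight from My Video",
#    "caption": "Check out this amazing moment! #shorts",
#    "hashtags": "#shorts #trending"}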
# Main execution block
if __name__ == "__main__":
print("Starting Modal deployment process...")
# First set up secrets
setup_modal_secrets()
# Deploy the app - this must be done BEFORE any remote functions are called
print("Deploying Modal app 'shorts-generator'...")
app.deploy()
print("Modal app deployed successfully!")
    # Skip the client-based directory setup call; the Modal Client API has changed
    print("Note: skipping explicit directory setup (the Modal Client API has changed).")
    print("The directories are created automatically when the functions run.")
# Add Windows-specific connection handling
if os.name == 'nt': # Check if running on Windows
print("\nNote: On Windows, you may see 'ConnectionResetError' messages in the console.")
print("These are harmless asyncio socket issues and can be safely ignored.")
print("Your videos should still process correctly despite these messages.")
print("\n===== SHORTS GENERATOR FOR MCP HACKATHON =====")
print("YouTube Shorts Generator using Modal for processing & Gradio for UI")
print("Part of the MCP Hackathon - Track 3: Agentic Demo Showcase")
print("\nTo work around Modal Client API issues, use these options:")
print("1. Add processing functions directly to this file:")
print("\n2. Test your functions directly:")
print("""
# Example test of function (add this to the end of the script)
test_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ" # Change to a valid URL
print(f"\\nTesting download with: {test_url}")
try:
    # Invoke the deployed function; current Modal releases use .remote()
    result = download_youtube_video.remote(test_url)
print(f"Success! Video path: {result[0]}, Title: {result[1]}")
except Exception as e:
print(f"Test failed: {str(e)}")
""")
print("\nℹ️ MCP Hackathon Submission:")
print("- README.md should include tag: \"agent-demo-track\"")
print("- Include a video demo link in README.md")
print("- Deadline: June 8 at 11:59 PM UTC")
print("- Documentation: modelcontextprotocol.io")
print("\n==== TROUBLESHOOTING CORRUPT VIDEOS ====")
print("If you're experiencing issues with corrupted video files:")
print("1. Clear the local cache: delete files in %TEMP%/shorts_generator_cache")
print("2. Use the validate_repair_video function to check video integrity:")
print("""
# Example for validating videos:
video_path = "/data/videos/your_video.mp4"
    is_valid, message = validate_repair_video.remote(video_path)
print(f"Video valid: {is_valid}, Message: {message}")
""")
print("3. Add analyzeduration and probesize options to ffmpeg commands:")
print(" ffmpeg -analyzeduration 100M -probesize 100M -i video.mp4 ...")
print("\nModal deployment complete!")