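"""Generate a "sizzle reel" marketing video: for each step of a script, fetch
stock footage from Pexels, synthesize a Deepgram voiceover, overlay captions,
and stitch the clips together with crossfade transitions using MoviePy."""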
import json
import logging
import os
import re
import numpy as np
import requests
from deepgram import DeepgramClient, SpeakOptions
from dotenv import load_dotenv
from moviepy import (AudioFileClip, ColorClip, CompositeVideoClip, TextClip,
                     VideoClip, VideoFileClip, concatenate_videoclips)
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('logs/video_generation.log'),
logging.StreamHandler()
])
class SizzleReelVideoGenerator:
def __init__(self, min_clip_duration=3, max_clip_duration=10):
load_dotenv()
# Collect Pexels API keys
self.pexels_api_keys = [
os.getenv('PEXELS_API_KEY'),
os.getenv('PEXELS_API_KEY_2'),
os.getenv('PEXELS_API_KEY_3')
]
self.pexels_api_keys = [key for key in self.pexels_api_keys if key]
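        # Expected .env entries: PEXELS_API_KEY (plus optional PEXELS_API_KEY_2
        # and PEXELS_API_KEY_3 fallbacks) and DEEPGRAM_API_KEY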
        # Initialize the Deepgram client for text-to-speech
        self.deepgram_client = DeepgramClient(os.getenv('DEEPGRAM_API_KEY'))
        # Standard video settings; the aspect ratio is derived from the target
        # dimensions (800x600 = 4:3) so the letterboxing in
        # _standardize_video_size stays consistent with the canvas
        self.target_width = 800
        self.target_height = 600
        self.target_aspect_ratio = self.target_width / self.target_height
# Clip duration constraints
self.min_clip_duration = min_clip_duration
self.max_clip_duration = max_clip_duration
# Use relative paths for outputs
self.base_output_dir = os.path.join('outputs', 'sizzle_reel')
self.audio_dir = os.path.join(self.base_output_dir, 'audio')
self.video_dir = os.path.join(self.base_output_dir, 'videos')
self.final_dir = os.path.join(self.base_output_dir, 'final')
        # Create all necessary output directories
for dir_path in [self.audio_dir, self.video_dir, self.final_dir]:
os.makedirs(dir_path, exist_ok=True)
def _rotate_pexels_key(self):
"""Rotate Pexels API keys if one fails"""
if len(self.pexels_api_keys) > 1:
# Move the first key to the end
self.pexels_api_keys.append(self.pexels_api_keys.pop(0))
return self.pexels_api_keys[0]
def fetch_pexels_video(self, query, max_retries=3):
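        """Search Pexels for stock footage matching `query` and return the
        link to the smallest SD-quality file, rotating through API keys on
        rate-limit or authentication failures. Returns None if nothing fits."""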
# Create a copy of the API keys to avoid modifying the original list
current_keys = self.pexels_api_keys.copy()
for attempt in range(max_retries):
# If no keys left, break the loop
if not current_keys:
logging.error(f"No more Pexels API keys available for query: {query}")
return None
try:
# Use the first available key
current_key = current_keys[0]
url = "https://api.pexels.com/videos/search"
headers = {"Authorization": current_key}
params = {
"query": query,
"per_page": 5, # Increase to have more fallback options
"page": 1
}
response = requests.get(url, headers=headers, params=params)
# Check for rate limit or authentication error
if response.status_code in [401, 403]:
# Remove the current key and continue with the next
current_keys.pop(0)
logging.warning(f"Pexels API key failed. Trying next key. Remaining keys: {len(current_keys)}")
continue
# Raise an exception for other HTTP errors
response.raise_for_status()
data = response.json()
if not data.get('videos'):
logging.warning(f"No videos found for query: {query}")
return None
# Try to find an SD video from the list of results
for video in data['videos']:
video_files = video['video_files']
sd_videos = [v for v in video_files if v['quality'] == 'sd']
if sd_videos:
sd_videos.sort(key=lambda x: x['size'])
return sd_videos[0]['link']
# If no SD videos found
logging.warning(f"No SD quality videos found for query: {query}")
return None
except Exception as e:
logging.error(f"Error fetching Pexels video (Attempt {attempt + 1}): {e}")
# Remove the current key and continue with the next
current_keys.pop(0)
logging.warning(f"Pexels API key failed. Trying next key. Remaining keys: {len(current_keys)}")
# If all attempts fail
logging.error(f"Failed to fetch video for query: {query} after multiple attempts")
return None
def _sanitize_filename(self, filename):
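        """Replace filesystem-unsafe characters with '_' and cap the result
        at 50 characters, e.g. "Step 1: Intro?" -> "Step 1_ Intro_"."""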
sanitized = re.sub(r'[^\w\-_\. ]', '_', filename)
return sanitized[:50]
def generate_voiceover(self, text, step_description, voice_id="aura-athena-en"):
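        """Synthesize `text` to an MP3 with Deepgram's Speak API using the
        given Aura voice model; returns the saved file path, or None on error."""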
try:
# Prepare text and filename
text_payload = {"text": text}
sanitized_step = self._sanitize_filename(step_description)
audio_filename = f"{sanitized_step}_voiceover.mp3"
audio_path = os.path.join(self.audio_dir, audio_filename)
            # Configure speak options with the requested Aura voice model
            options = SpeakOptions(
                model=voice_id,
            )
# Generate and save the audio
response = self.deepgram_client.speak.v("1").save(audio_path, text_payload, options)
logging.info(f"Generated voiceover for step: {step_description}")
return audio_path
except Exception as e:
logging.error(f"Error generating voiceover for step: {step_description}")
logging.error(f"Error details: {e}")
return None
def download_video(self, video_url, step_description):
        response = requests.get(video_url, stream=True)
        response.raise_for_status()  # Fail fast instead of saving an HTTP error page as video
sanitized_step = self._sanitize_filename(step_description)
video_filename = f"{sanitized_step}_video.mp4"
video_path = os.path.join(self.video_dir, video_filename)
with open(video_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return video_path
def generate_step_video(self, step, query_type='narrator'):
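        """Produce the assets for one script step: a voiceover, a matching
        stock clip, and metadata including the audio duration."""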
# Alternate between action and narrator for video query
video_query = step['action'] if query_type == 'action' else step['narrator']
        try:
            # Generate voiceover; bail out early if synthesis failed
            audio_path = self.generate_voiceover(
                step['narrator'],
                step['step_description']
            )
            if not audio_path:
                return None
            logging.info(f"Generated voiceover for step: {step['step_description']}")
            # Fetch and download video; bail out early if no footage was found
            video_url = self.fetch_pexels_video(video_query)
            if not video_url:
                logging.warning(f"No usable footage found for query: {video_query}")
                return None
            video_path = self.download_video(
                video_url,
                step['step_description']
            )
            logging.info(f"Downloaded video for query: {video_query}")
# Get audio duration
with AudioFileClip(audio_path) as audio_clip:
audio_duration = audio_clip.duration
return {
'audio_path': audio_path,
'video_path': video_path,
'step_description': step['step_description'],
'narrator': step['narrator'],
'duration': audio_duration
}
except Exception as e:
logging.error(f"Error generating video for step: {step['step_description']}")
logging.error(f"Error details: {e}")
return None
def process_video_clip(self, video_path, audio_path):
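        """Load, letterbox, and trim a clip so video and voiceover share one
        duration (within the min/max constraints), then attach the audio.
        Returns the combined clip, or None on failure."""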
try:
# Load video and audio
video_clip = VideoFileClip(video_path)
audio_clip = AudioFileClip(audio_path)
# Standardize video size first
video_clip = self._standardize_video_size(video_clip)
# Synchronize duration with constraints
audio_duration = audio_clip.duration
video_duration = video_clip.duration
# Adjust clip duration to be within min and max constraints
clip_duration = max(
self.min_clip_duration,
min(audio_duration, self.max_clip_duration, video_duration)
)
# Safely trim video and audio to synchronized duration
try:
video_clip = video_clip.subclipped(0, min(clip_duration, video_duration))
except Exception as ve:
logging.warning(f"Error trimming video clip: {ve}. Using full video duration.")
video_clip = video_clip.subclipped(0, video_duration)
try:
audio_clip = audio_clip.subclipped(0, min(clip_duration, audio_duration))
except Exception as ae:
logging.warning(f"Error trimming audio clip: {ae}. Using full audio duration.")
audio_clip = audio_clip.subclipped(0, audio_duration)
# Ensure audio and video have the same duration
min_duration = min(video_clip.duration, audio_clip.duration)
video_clip = video_clip.subclipped(0, min_duration)
audio_clip = audio_clip.subclipped(0, min_duration)
# Attach audio to video
video_clip = video_clip.with_audio(audio_clip)
return video_clip
except Exception as e:
logging.error(f"Comprehensive error processing video clip: {e}")
# Additional diagnostic logging
try:
logging.error(f"Video path: {video_path}")
logging.error(f"Audio path: {audio_path}")
                # Log file details if possible
                video_exists = os.path.exists(video_path)
                audio_exists = os.path.exists(audio_path)
                logging.error(f"Video file exists: {video_exists}")
                logging.error(f"Audio file exists: {audio_exists}")
if video_exists:
video_clip = VideoFileClip(video_path)
logging.error(f"Video duration: {video_clip.duration}")
if audio_exists:
audio_clip = AudioFileClip(audio_path)
logging.error(f"Audio duration: {audio_clip.duration}")
except Exception as diag_error:
logging.error(f"Additional diagnostic error: {diag_error}")
return None
def add_captions_to_video(self, video_clip, narrator_text, audio_duration):
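        """Overlay `narrator_text` as a centered caption strip at the bottom
        of the clip for the duration of the audio."""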
# Create a TextClip for the captions
caption = TextClip(
text=narrator_text,
font='Arial',
color='white',
stroke_color='black',
stroke_width=2,
method='caption',
size=(video_clip.w, 100),
bg_color=None,
text_align='center',
horizontal_align='center',
vertical_align='center'
)
# Position the caption at the bottom of the video
caption = caption.with_position(('center', 'bottom'))
# Set the duration to match audio
caption = caption.with_duration(audio_duration)
# Composite the video with the caption
return CompositeVideoClip([video_clip, caption])
def create_smooth_transition(self, clip1, clip2, transition_duration=1):
"""
Create a smooth crossfade transition between two video clips
"""
# Extract the underlying video clip if it's a CompositeVideoClip
if isinstance(clip1, CompositeVideoClip):
clip1 = clip1.clips[0] # Assume the first clip is the base video
if isinstance(clip2, CompositeVideoClip):
clip2 = clip2.clips[0] # Assume the first clip is the base video
# Ensure both clips have the same size and fps
if clip1.size != clip2.size:
clip2 = clip2.resized(clip1.size)
# Create a transition clip
def transition_func(t):
if t < transition_duration:
# Linear crossfade
alpha1 = 1 - (t / transition_duration)
alpha2 = t / transition_duration
# Get frames from both clips
frame1 = clip1.get_frame(clip1.duration - transition_duration + t)
frame2 = clip2.get_frame(t)
# Blend frames
blended_frame = (alpha1 * frame1 + alpha2 * frame2).astype(np.uint8)
return blended_frame
else:
# After transition, return the second clip's frame
return clip2.get_frame(t - transition_duration)
        # Build the transition as a fresh VideoClip rendered by the crossfade
        # function (MoviePy 2.x generates frames via frame_function)
        transition_clip = VideoClip(
            frame_function=transition_func,
            duration=transition_duration
        ).with_fps(getattr(clip2, 'fps', None) or 24)
        return transition_clip
def _standardize_video_size(self, clip):
"""
Standardize video size to target resolution while maintaining aspect ratio
"""
# Get current clip size
w, h = clip.size
current_aspect_ratio = w/h
if current_aspect_ratio > self.target_aspect_ratio:
# Video is wider than target ratio
new_width = self.target_width
new_height = int(new_width / current_aspect_ratio)
else:
# Video is taller than target ratio
new_height = self.target_height
new_width = int(new_height * current_aspect_ratio)
# Resize video
resized_clip = clip.resized(width=new_width, height=new_height)
# Create black background of target size
        bg = ColorClip(size=(self.target_width, self.target_height),
                       color=(0, 0, 0))
bg = bg.with_duration(clip.duration)
# Center the video on the background
x_offset = (self.target_width - new_width) // 2
y_offset = (self.target_height - new_height) // 2
final_clip = CompositeVideoClip([
bg,
resized_clip.with_position((x_offset, y_offset))
])
return final_clip
def generate_sizzle_reel(self, script_json, app_name="CleverApp"):
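        """Build the final reel from `script_json`: a JSON string, or a dict
        whose 'sizzle_reel_script' value is a list of steps or a CrewOutput
        whose .raw attribute holds that JSON. Returns the output path."""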
# Parse script based on input type
print("Debug", script_json)
if isinstance(script_json, str):
script = json.loads(script_json)
script_steps = script.get('sizzle_reel_script', [])
elif isinstance(script_json, dict) and 'sizzle_reel_script' in script_json:
# Handle the case where sizzle_reel_script is a CrewOutput object
crew_output = script_json['sizzle_reel_script']
if hasattr(crew_output, 'raw'):
# Parse the raw JSON string from CrewOutput
try:
parsed_data = json.loads(crew_output.raw)
script_steps = parsed_data.get('sizzle_reel_script', [])
except json.JSONDecodeError:
logging.error("Failed to parse raw CrewOutput JSON")
script_steps = []
else:
script_steps = crew_output
else:
script_steps = []
# Process video steps
processed_clips = []
# Convert to list if it's not already
if not isinstance(script_steps, list):
script_steps = [script_steps]
for i, step in enumerate(script_steps):
            # Convert a Pydantic model to a plain dict if necessary
            # (model_dump() on Pydantic v2, dict() on v1)
            if hasattr(step, 'model_dump'):
                step = step.model_dump()
            elif hasattr(step, 'dict'):
                step = step.dict()
# Alternate query type
query_type = 'narrator' if i % 2 == 0 else 'action'
# Generate step video
step_video = self.generate_step_video(step, query_type)
if not step_video:
logging.warning(f"Skipping step {i} due to video generation failure")
continue
# Process video clip
processed_clip = self.process_video_clip(
step_video['video_path'],
step_video['audio_path']
)
if processed_clip:
# Add captions
try:
captioned_clip = self.add_captions_to_video(
processed_clip,
step_video['narrator'],
processed_clip.duration
)
processed_clips.append(captioned_clip)
except Exception as caption_error:
logging.error(f"Error adding captions to clip {i}: {caption_error}")
# Fallback: use processed clip without captions
processed_clips.append(processed_clip)
# Check if we have any processed clips
if not processed_clips:
logging.error("No video clips could be generated")
return None
# Concatenate processed clips with smooth transitions
final_clips = []
for i in range(len(processed_clips) - 1):
final_clips.append(processed_clips[i])
# Add transition between clips
try:
transition = self.create_smooth_transition(
processed_clips[i],
processed_clips[i+1],
transition_duration=1
)
final_clips.append(transition)
except Exception as transition_error:
logging.warning(f"Could not create transition between clips {i} and {i+1}: {transition_error}")
# Add the last clip
final_clips.append(processed_clips[-1])
# Concatenate video clips
try:
final_video = concatenate_videoclips(final_clips, method="compose")
except Exception as e:
logging.error(f"Error concatenating video clips: {e}")
return None
# Sanitize app name for filename
sanitized_app_name = re.sub(r'[^\w\-_\. ]', '_', app_name)
# Output final video with app name
output_filename = f"{sanitized_app_name}_sizzle_reel.mp4"
output_path = os.path.join(self.final_dir, output_filename)
try:
            final_video.write_videofile(
                output_path,
                codec='libx264',
                audio_codec='aac',
                preset='ultrafast',  # Favor encoding speed over file size
                threads=12,
                fps=24,
            )
logging.info(f"Successfully generated final video: {output_path}")
except Exception as e:
logging.error(f"Error writing final video: {e}")
return None
        # Clean up resources (final_clips already contains every processed
        # clip, so iterate over each object only once)
        for clip in final_clips + [final_video]:
try:
clip.close()
except Exception as close_error:
logging.warning(f"Error closing clip: {close_error}")
return output_path
# Main execution
if __name__ == "__main__":
    # Sample script for the fictional MindMate app
sample_script = {
"sizzle_reel_script": [
{
"step_description": "Establishing the Problem",
"narrator": "In a world filled with stress and uncertainty, finding peace can feel like an uphill battle.",
"action": "A split-screen shows a person overwhelmed with stress on the left and another individual feeling calm and focused on the right.",
"features": []
},
{
"step_description": "The Solution",
"narrator": "Meet MindMate, your personal companion for mental well-being. Track your moods, find balance, and embrace tranquility.",
"action": "Transition to the MindMate app interface on a smartphone, showcasing its sleek design.",
"features": []
},
{
"step_description": "The Onboarding Experience",
"narrator": "Start your journey by downloading the app and sharing your story through a simple assessment.",
"action": "Quick clips of a user downloading the app, creating an account, and completing the initial assessment.",
"features": []
},
{
"step_description": "Daily Engagement",
"narrator": "Log your emotions daily and receive gentle prompts to help you stay connected with your feelings.",
"action": "User logs their feelings in the mood tracker, with reminders popping up on their phone.",
"features": [
{"feature_name": "Mood Tracker", "description": "Log daily emotions."},
{"feature_name": "Reminders", "description": "Gentle prompts for mood check-ins."}
]
},
{
"step_description": "Tailored Support",
"narrator": "Experience personalized meditation sessions designed just for you, based on your unique mood patterns.",
"action": "User engages in a serene meditation environment, surrounded by calming visuals and soundscapes.",
"features": [
{"feature_name": "Personalized Meditations", "description": "Meditations based on mood patterns."}
]
},
{
"step_description": "Empowering Analytics",
"narrator": "Track your progress and discover how meditation can transform your mental health journey.",
"action": "User reviews analytics on their progress, smiling as they see improvements.",
"features": [
{"feature_name": "Progress Tracking", "description": "View mood trends and meditation effectiveness."}
]
},
{
"step_description": "Connection",
"narrator": "Join a community of individuals just like you, sharing stories, and celebrating progress together.",
"action": "Users share experiences and feedback within the app community, fostering connection and support.",
"features": [
{"feature_name": "Community Support", "description": "Share experiences and feedback."}
]
},
{
"step_description": "Download Now",
"narrator": "Ready to take the first step towards a healthier mind? Download MindMate today and start your journey to tranquility.",
"action": "The MindMate logo appears with app store icons for iOS and Android.",
"features": []
},
{
"step_description": "Inspirational Tone",
"narrator": "Your mental well-being matters. Let MindMate guide you.",
"action": "A serene landscape with the tagline overlaid: 'MindMate - Your Path to Peace.'",
"features": []
}
]
}
generator = SizzleReelVideoGenerator()
final_video = generator.generate_sizzle_reel(sample_script, app_name="MindMate")
print(f"Sizzle reel generated: {final_video}")