import json
import logging
import os
import re

import numpy as np
import requests
from deepgram import DeepgramClient, SpeakOptions
from dotenv import load_dotenv
from moviepy import (AudioFileClip, CompositeVideoClip, TextClip,
                     VideoFileClip, concatenate_videoclips)

# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/video_generation.log'),
        logging.StreamHandler()
    ]
)


class SizzleReelVideoGenerator:
    def __init__(self, min_clip_duration=3, max_clip_duration=10):
        load_dotenv()

        # Collect Pexels API keys (primary plus optional fallbacks)
        self.pexels_api_keys = [
            os.getenv('PEXELS_API_KEY'),
            os.getenv('PEXELS_API_KEY_2'),
            os.getenv('PEXELS_API_KEY_3')
        ]
        self.pexels_api_keys = [key for key in self.pexels_api_keys if key]

        # Initialize the Deepgram client with its single key
        self.deepgram_client = DeepgramClient(os.getenv('DEEPGRAM_API_KEY'))

        # Standard video settings. The aspect ratio is derived from the target
        # canvas (800x600 = 4:3) so letterboxed clips always fit inside it.
        self.target_width = 800
        self.target_height = 600
        self.target_aspect_ratio = self.target_width / self.target_height

        # Clip duration constraints
        self.min_clip_duration = min_clip_duration
        self.max_clip_duration = max_clip_duration

        # Use relative paths for outputs
        self.base_output_dir = os.path.join('outputs', 'sizzle_reel')
        self.audio_dir = os.path.join(self.base_output_dir, 'audio')
        self.video_dir = os.path.join(self.base_output_dir, 'videos')
        self.final_dir = os.path.join(self.base_output_dir, 'final')

        # Create all necessary directories
        os.makedirs('logs', exist_ok=True)
        for dir_path in [self.audio_dir, self.video_dir, self.final_dir]:
            os.makedirs(dir_path, exist_ok=True)

    def _rotate_pexels_key(self):
        """Rotate Pexels API keys if one fails."""
        if len(self.pexels_api_keys) > 1:
            # Move the first key to the end
            self.pexels_api_keys.append(self.pexels_api_keys.pop(0))
        return self.pexels_api_keys[0]

    def fetch_pexels_video(self, query, max_retries=3):
        """Search Pexels for a stock video matching the query and return an SD download link."""
        # Work on a copy of the API keys to avoid modifying the original list
        current_keys = self.pexels_api_keys.copy()

        for attempt in range(max_retries):
            # If no keys are left, stop retrying
            if not current_keys:
                logging.error(f"No more Pexels API keys available for query: {query}")
                return None

            try:
                # Use the first available key
                current_key = current_keys[0]
                url = "https://api.pexels.com/videos/search"
                headers = {"Authorization": current_key}
                params = {
                    "query": query,
                    "per_page": 5,  # Request several results to have fallback options
                    "page": 1
                }

                response = requests.get(url, headers=headers, params=params)

                # Check for rate limit or authentication errors
                if response.status_code in [401, 403]:
                    # Drop the current key and continue with the next one
                    current_keys.pop(0)
                    logging.warning(f"Pexels API key failed. Trying next key. Remaining keys: {len(current_keys)}")
                    continue

                # Raise an exception for other HTTP errors
                response.raise_for_status()
                data = response.json()

                if not data.get('videos'):
                    logging.warning(f"No videos found for query: {query}")
                    return None

                # Try to find an SD video among the results
                for video in data['videos']:
                    video_files = video['video_files']
                    sd_videos = [v for v in video_files if v['quality'] == 'sd']
                    if sd_videos:
                        sd_videos.sort(key=lambda x: x['size'])
                        return sd_videos[0]['link']

                # No SD videos found
                logging.warning(f"No SD quality videos found for query: {query}")
                return None

            except Exception as e:
                logging.error(f"Error fetching Pexels video (Attempt {attempt + 1}): {e}")
                # Drop the current key and continue with the next one
                current_keys.pop(0)
                logging.warning(f"Pexels API key failed. Trying next key. Remaining keys: {len(current_keys)}")
        # If all attempts fail
        logging.error(f"Failed to fetch video for query: {query} after multiple attempts")
        return None

    def _sanitize_filename(self, filename):
        """Replace unsafe characters and truncate so the result is usable as a filename."""
        sanitized = re.sub(r'[^\w\-_\. ]', '_', filename)
        return sanitized[:50]

    # def generate_elevenlabs_voiceover(self, text, step_description, voice_id="JBFqnCBsd6RMkjVDRZzb"):
    #     audio_stream = self.elevenlabs_client.text_to_speech.convert(
    #         text=text,
    #         voice_id=voice_id,
    #         model_id="eleven_flash_v2",
    #         output_format="mp3_44100_128",
    #         voice_settings={
    #             "stability": 0.0,
    #             "similarity_boost": 1.0,
    #             "style": 0.0,
    #             "use_speaker_boost": True
    #         }
    #     )
    #     sanitized_step = self._sanitize_filename(step_description)
    #     audio_filename = f"{sanitized_step}_voiceover.mp3"
    #     audio_path = os.path.join(self.audio_dir, audio_filename)
    #     with open(audio_path, 'wb') as f:
    #         for chunk in audio_stream:
    #             if chunk:
    #                 f.write(chunk)
    #     return audio_path

    def generate_voiceover(self, text, step_description, voice_id="aura-athena-en"):
        """Generate a voiceover MP3 for the narrator text using Deepgram text-to-speech."""
        try:
            # Prepare text and filename
            text_payload = {"text": text}
            sanitized_step = self._sanitize_filename(step_description)
            audio_filename = f"{sanitized_step}_voiceover.mp3"
            audio_path = os.path.join(self.audio_dir, audio_filename)

            # Configure speak options (use the specified voice, with a default)
            options = SpeakOptions(
                model=voice_id,
            )

            # Generate and save the audio
            response = self.deepgram_client.speak.v("1").save(audio_path, text_payload, options)
            logging.info(f"Generated voiceover for step: {step_description}")
            return audio_path
        except Exception as e:
            logging.error(f"Error generating voiceover for step: {step_description}")
            logging.error(f"Error details: {e}")
            return None

    def download_video(self, video_url, step_description):
        """Stream a video file from its URL into the videos directory."""
        response = requests.get(video_url, stream=True)

        sanitized_step = self._sanitize_filename(step_description)
        video_filename = f"{sanitized_step}_video.mp4"
        video_path = os.path.join(self.video_dir, video_filename)

        with open(video_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return video_path

    def generate_step_video(self, step, query_type='narrator'):
        """Generate the voiceover and download a matching stock video for one script step."""
        # Alternate between action and narrator text for the video search query
        video_query = step['action'] if query_type == 'action' else step['narrator']

        try:
            # Generate voiceover
            audio_path = self.generate_voiceover(
                step['narrator'],
                step['step_description']
            )
            logging.info(f"Generated voiceover for step: {step['step_description']}")

            # Fetch and download video
            video_url = self.fetch_pexels_video(video_query)
            if not video_url:
                logging.warning(f"No stock video found for query: {video_query}")
                return None
            video_path = self.download_video(
                video_url,
                step['step_description']
            )
            logging.info(f"Downloaded video for query: {video_query}")

            # Get audio duration
            with AudioFileClip(audio_path) as audio_clip:
                audio_duration = audio_clip.duration

            return {
                'audio_path': audio_path,
                'video_path': video_path,
                'step_description': step['step_description'],
                'narrator': step['narrator'],
                'duration': audio_duration
            }
        except Exception as e:
            logging.error(f"Error generating video for step: {step['step_description']}")
            logging.error(f"Error details: {e}")
            return None
    def process_video_clip(self, video_path, audio_path):
        """Load, resize, trim, and mux a downloaded video with its voiceover audio."""
        try:
            # Load video and audio
            video_clip = VideoFileClip(video_path)
            audio_clip = AudioFileClip(audio_path)

            # Standardize video size first
            video_clip = self._standardize_video_size(video_clip)

            # Synchronize duration with constraints
            audio_duration = audio_clip.duration
            video_duration = video_clip.duration

            # Clamp the clip duration to the min/max constraints
            clip_duration = max(
                self.min_clip_duration,
                min(audio_duration, self.max_clip_duration, video_duration)
            )

            # Safely trim video and audio to the synchronized duration
            try:
                video_clip = video_clip.subclipped(0, min(clip_duration, video_duration))
            except Exception as ve:
                logging.warning(f"Error trimming video clip: {ve}. Using full video duration.")
                video_clip = video_clip.subclipped(0, video_duration)

            try:
                audio_clip = audio_clip.subclipped(0, min(clip_duration, audio_duration))
            except Exception as ae:
                logging.warning(f"Error trimming audio clip: {ae}. Using full audio duration.")
                audio_clip = audio_clip.subclipped(0, audio_duration)

            # Ensure audio and video have the same duration
            min_duration = min(video_clip.duration, audio_clip.duration)
            video_clip = video_clip.subclipped(0, min_duration)
            audio_clip = audio_clip.subclipped(0, min_duration)

            # Attach audio to video
            video_clip = video_clip.with_audio(audio_clip)
            return video_clip

        except Exception as e:
            logging.error(f"Error processing video clip: {e}")
            # Additional diagnostic logging
            try:
                logging.error(f"Video path: {video_path}")
                logging.error(f"Audio path: {audio_path}")

                # Log file details if possible
                video_exists = os.path.exists(video_path)
                audio_exists = os.path.exists(audio_path)
                logging.error(f"Video file exists: {video_exists}")
                logging.error(f"Audio file exists: {audio_exists}")

                if video_exists:
                    with VideoFileClip(video_path) as diagnostic_video:
                        logging.error(f"Video duration: {diagnostic_video.duration}")
                if audio_exists:
                    with AudioFileClip(audio_path) as diagnostic_audio:
                        logging.error(f"Audio duration: {diagnostic_audio.duration}")
            except Exception as diag_error:
                logging.error(f"Additional diagnostic error: {diag_error}")
            return None

    def add_captions_to_video(self, video_clip, narrator_text, audio_duration):
        """Overlay the narrator text as a caption at the bottom of the clip."""
        # Create a TextClip for the captions
        caption = TextClip(
            text=narrator_text,
            font='Arial',
            color='white',
            stroke_color='black',
            stroke_width=2,
            method='caption',
            size=(video_clip.w, 100),
            bg_color=None,
            text_align='center',
            horizontal_align='center',
            vertical_align='center'
        )

        # Position the caption at the bottom of the video
        caption = caption.with_position(('center', 'bottom'))
        # Set the duration to match the audio
        caption = caption.with_duration(audio_duration)

        # Composite the video with the caption
        return CompositeVideoClip([video_clip, caption])

    def create_smooth_transition(self, clip1, clip2, transition_duration=1):
        """Create a smooth crossfade transition between two video clips."""
        # Extract the underlying video clip if it's a CompositeVideoClip
        if isinstance(clip1, CompositeVideoClip):
            clip1 = clip1.clips[0]  # Assume the first clip is the base video
        if isinstance(clip2, CompositeVideoClip):
            clip2 = clip2.clips[0]  # Assume the first clip is the base video

        # Ensure both clips have the same size
        if clip1.size != clip2.size:
            clip2 = clip2.resized(clip1.size)

        def transition_func(t):
            if t < transition_duration:
                # Linear crossfade between the end of clip1 and the start of clip2
                alpha1 = 1 - (t / transition_duration)
                alpha2 = t / transition_duration

                # Get frames from both clips
                frame1 = clip1.get_frame(clip1.duration - transition_duration + t)
                frame2 = clip2.get_frame(t)

                # Blend frames
                return (alpha1 * frame1 + alpha2 * frame2).astype(np.uint8)
            # After the transition, return the second clip's frame
            return clip2.get_frame(t - transition_duration)

        # Create a carrier clip and override its frame generator with the crossfade
        transition_clip = VideoFileClip(
            clip1.filename if hasattr(clip1, 'filename') else '/tmp/transition.mp4',
            audio=False
        ).with_duration(transition_duration)
        transition_clip.get_frame = transition_func
        return transition_clip

    def _standardize_video_size(self, clip):
        """Standardize video size to the target resolution while maintaining aspect ratio."""
        # Get current clip size
        w, h = clip.size
        current_aspect_ratio = w / h

        if current_aspect_ratio > self.target_aspect_ratio:
            # Video is wider than the target ratio: fit to width
            new_width = self.target_width
            new_height = int(new_width / current_aspect_ratio)
        else:
            # Video is taller than the target ratio: fit to height
            new_height = self.target_height
            new_width = int(new_height * current_aspect_ratio)
        # Resize video
        resized_clip = clip.resized(width=new_width, height=new_height)

        # Create a black background of the target size
        from moviepy.video.VideoClip import ColorClip
        bg = ColorClip(size=(self.target_width, self.target_height), color=(0, 0, 0))
        bg = bg.with_duration(clip.duration)

        # Center the video on the background
        x_offset = (self.target_width - new_width) // 2
        y_offset = (self.target_height - new_height) // 2

        final_clip = CompositeVideoClip([
            bg,
            resized_clip.with_position((x_offset, y_offset))
        ])
        return final_clip

    def generate_sizzle_reel(self, script_json, app_name="CleverApp"):
        """Build the full sizzle reel from a script and return the path of the rendered video."""
        # Parse the script based on the input type
        logging.debug(f"Received script input: {script_json}")
        if isinstance(script_json, str):
            script = json.loads(script_json)
            script_steps = script.get('sizzle_reel_script', [])
        elif isinstance(script_json, dict) and 'sizzle_reel_script' in script_json:
            # Handle the case where sizzle_reel_script is a CrewOutput object
            crew_output = script_json['sizzle_reel_script']
            if hasattr(crew_output, 'raw'):
                # Parse the raw JSON string from CrewOutput
                try:
                    parsed_data = json.loads(crew_output.raw)
                    script_steps = parsed_data.get('sizzle_reel_script', [])
                except json.JSONDecodeError:
                    logging.error("Failed to parse raw CrewOutput JSON")
                    script_steps = []
            else:
                script_steps = crew_output
        else:
            script_steps = []

        # Process video steps
        processed_clips = []

        # Convert to a list if it's not one already
        if not isinstance(script_steps, list):
            script_steps = [script_steps]

        for i, step in enumerate(script_steps):
            # Convert Pydantic model to dict if necessary
            if hasattr(step, 'dict'):
                step = step.dict()

            # Alternate query type between narrator and action text
            query_type = 'narrator' if i % 2 == 0 else 'action'

            # Generate step video
            step_video = self.generate_step_video(step, query_type)
            if not step_video:
                logging.warning(f"Skipping step {i} due to video generation failure")
                continue

            # Process video clip
            processed_clip = self.process_video_clip(
                step_video['video_path'],
                step_video['audio_path']
            )

            if processed_clip:
                # Add captions
                try:
                    captioned_clip = self.add_captions_to_video(
                        processed_clip,
                        step_video['narrator'],
                        processed_clip.duration
                    )
                    processed_clips.append(captioned_clip)
                except Exception as caption_error:
                    logging.error(f"Error adding captions to clip {i}: {caption_error}")
                    # Fallback: use the processed clip without captions
                    processed_clips.append(processed_clip)

        # Check that at least one clip was produced
        if not processed_clips:
            logging.error("No video clips could be generated")
            return None

        # Concatenate processed clips with smooth transitions
        final_clips = []
        for i in range(len(processed_clips) - 1):
            final_clips.append(processed_clips[i])
            # Add a transition between consecutive clips
            try:
                transition = self.create_smooth_transition(
                    processed_clips[i],
                    processed_clips[i + 1],
                    transition_duration=1
                )
                final_clips.append(transition)
            except Exception as transition_error:
                logging.warning(f"Could not create transition between clips {i} and {i + 1}: {transition_error}")

        # Add the last clip
        final_clips.append(processed_clips[-1])

        # Concatenate video clips
        try:
            final_video = concatenate_videoclips(final_clips, method="compose")
        except Exception as e:
            logging.error(f"Error concatenating video clips: {e}")
            return None

        # Sanitize the app name for the filename
        sanitized_app_name = re.sub(r'[^\w\-_\. ]', '_', app_name)
        # Output final video with app name
        output_filename = f"{sanitized_app_name}_sizzle_reel.mp4"
        output_path = os.path.join(self.final_dir, output_filename)

        try:
            final_video.write_videofile(
                output_path,
                codec='libx264',
                audio_codec='aac',
                preset='ultrafast',  # Changed from 'medium' to 'ultrafast'
                threads=12,
                # bitrate='600k',
                fps=24,
            )
            logging.info(f"Successfully generated final video: {output_path}")
        except Exception as e:
            logging.error(f"Error writing final video: {e}")
            return None

        # Clean up resources
        for clip in processed_clips + final_clips + [final_video]:
            try:
                clip.close()
            except Exception as close_error:
                logging.warning(f"Error closing clip: {close_error}")

        return output_path


# Main execution
if __name__ == "__main__":
    # Your sample script here
    sample_script = {
        "sizzle_reel_script": [
            {
                "step_description": "Establishing the Problem",
                "narrator": "In a world filled with stress and uncertainty, finding peace can feel like an uphill battle.",
                "action": "A split-screen shows a person overwhelmed with stress on the left and another individual feeling calm and focused on the right.",
                "features": []
            },
            {
                "step_description": "The Solution",
                "narrator": "Meet MindMate, your personal companion for mental well-being. Track your moods, find balance, and embrace tranquility.",
                "action": "Transition to the MindMate app interface on a smartphone, showcasing its sleek design.",
                "features": []
            },
            {
                "step_description": "The Onboarding Experience",
                "narrator": "Start your journey by downloading the app and sharing your story through a simple assessment.",
                "action": "Quick clips of a user downloading the app, creating an account, and completing the initial assessment.",
                "features": []
            },
            {
                "step_description": "Daily Engagement",
                "narrator": "Log your emotions daily and receive gentle prompts to help you stay connected with your feelings.",
                "action": "User logs their feelings in the mood tracker, with reminders popping up on their phone.",
                "features": [
                    {"feature_name": "Mood Tracker", "description": "Log daily emotions."},
                    {"feature_name": "Reminders", "description": "Gentle prompts for mood check-ins."}
                ]
            },
            {
                "step_description": "Tailored Support",
                "narrator": "Experience personalized meditation sessions designed just for you, based on your unique mood patterns.",
                "action": "User engages in a serene meditation environment, surrounded by calming visuals and soundscapes.",
                "features": [
                    {"feature_name": "Personalized Meditations", "description": "Meditations based on mood patterns."}
                ]
            },
            {
                "step_description": "Empowering Analytics",
                "narrator": "Track your progress and discover how meditation can transform your mental health journey.",
                "action": "User reviews analytics on their progress, smiling as they see improvements.",
                "features": [
                    {"feature_name": "Progress Tracking", "description": "View mood trends and meditation effectiveness."}
                ]
            },
            {
                "step_description": "Connection",
                "narrator": "Join a community of individuals just like you, sharing stories, and celebrating progress together.",
                "action": "Users share experiences and feedback within the app community, fostering connection and support.",
                "features": [
                    {"feature_name": "Community Support", "description": "Share experiences and feedback."}
                ]
            },
            {
                "step_description": "Download Now",
                "narrator": "Ready to take the first step towards a healthier mind? Download MindMate today and start your journey to tranquility.",
                "action": "The MindMate logo appears with app store icons for iOS and Android.",
                "features": []
            },
            {
                "step_description": "Inspirational Tone",
                "narrator": "Your mental well-being matters. Let MindMate guide you.",
                "action": "A serene landscape with the tagline overlaid: 'MindMate - Your Path to Peace.'",
                "features": []
            }
        ]
    }

    generator = SizzleReelVideoGenerator()
    final_video = generator.generate_sizzle_reel(sample_script, app_name="MindMate")
    print(f"Sizzle reel generated: {final_video}")
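
# A minimal sketch of the .env file this module expects, based only on the
# os.getenv calls in __init__ above; the values below are placeholders.
# PEXELS_API_KEY_2 and PEXELS_API_KEY_3 are optional fallback keys used when
# the primary key is rate-limited or rejected:
#
#   PEXELS_API_KEY=your-primary-pexels-key
#   PEXELS_API_KEY_2=optional-fallback-pexels-key
#   PEXELS_API_KEY_3=optional-fallback-pexels-key
#   DEEPGRAM_API_KEY=your-deepgram-key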