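"""Sizzle reel video generator.

Builds a short promotional video from a JSON script: for each step it
synthesizes a Deepgram text-to-speech voiceover, downloads a matching
stock clip from the Pexels video API, overlays the narration as captions,
and stitches the clips together with crossfade transitions via MoviePy.
"""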
import json
import logging
import os
import re

import numpy as np
import requests
from deepgram import DeepgramClient, SpeakOptions
from dotenv import load_dotenv
from moviepy import (AudioFileClip, ColorClip, CompositeVideoClip, TextClip,
                     VideoClip, VideoFileClip, concatenate_videoclips)
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s',
                    handlers=[
                        logging.FileHandler('logs/video_generation.log'),
                        logging.StreamHandler()
                    ])


class SizzleReelVideoGenerator:
    def __init__(self, min_clip_duration=3, max_clip_duration=10):
        load_dotenv()
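        # Expected .env entries (names taken from the os.getenv calls below;
        # the second and third Pexels keys are optional fallbacks):
        #   PEXELS_API_KEY=...  PEXELS_API_KEY_2=...  PEXELS_API_KEY_3=...
        #   DEEPGRAM_API_KEY=...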
        # Collect Pexels API keys, dropping any that are unset
        self.pexels_api_keys = [
            os.getenv('PEXELS_API_KEY'),
            os.getenv('PEXELS_API_KEY_2'),
            os.getenv('PEXELS_API_KEY_3')
        ]
        self.pexels_api_keys = [key for key in self.pexels_api_keys if key]
        # Initialize the Deepgram client with its single key
        self.deepgram_client = DeepgramClient(os.getenv('DEEPGRAM_API_KEY'))
        # Standard video settings; the aspect ratio is derived from the target
        # frame so the letterboxing math in _standardize_video_size stays consistent
        self.target_width = 800
        self.target_height = 600
        self.target_aspect_ratio = self.target_width / self.target_height
        # Clip duration constraints (seconds)
        self.min_clip_duration = min_clip_duration
        self.max_clip_duration = max_clip_duration
        # Use relative paths for outputs
        self.base_output_dir = os.path.join('outputs', 'sizzle_reel')
        self.audio_dir = os.path.join(self.base_output_dir, 'audio')
        self.video_dir = os.path.join(self.base_output_dir, 'videos')
        self.final_dir = os.path.join(self.base_output_dir, 'final')
        # Create all necessary output directories ('logs' is created at import time)
        for dir_path in [self.audio_dir, self.video_dir, self.final_dir]:
            os.makedirs(dir_path, exist_ok=True)

    def _rotate_pexels_key(self):
        """Rotate Pexels API keys if one fails.

        Note: fetch_pexels_video currently rotates through its own copy of the
        key list, so this helper is unused.
        """
        if len(self.pexels_api_keys) > 1:
            # Move the first key to the end
            self.pexels_api_keys.append(self.pexels_api_keys.pop(0))
        return self.pexels_api_keys[0]

    def fetch_pexels_video(self, query, max_retries=3):
        """Search Pexels for stock footage and return the smallest SD file link."""
        # Work on a copy of the API keys to avoid mutating the original list
        current_keys = self.pexels_api_keys.copy()
        for attempt in range(max_retries):
            # If no keys are left, give up
            if not current_keys:
                logging.error(f"No more Pexels API keys available for query: {query}")
                return None
            try:
                # Use the first available key
                current_key = current_keys[0]
                url = "https://api.pexels.com/videos/search"
                headers = {"Authorization": current_key}
                params = {
                    "query": query,
                    "per_page": 5,  # Fetch several results for fallback options
                    "page": 1
                }
                response = requests.get(url, headers=headers, params=params, timeout=30)
                # On a rate-limit or authentication error, drop this key and try the next
                if response.status_code in [401, 403]:
                    current_keys.pop(0)
                    logging.warning(f"Pexels API key failed. Trying next key. Remaining keys: {len(current_keys)}")
                    continue
                # Raise an exception for other HTTP errors
                response.raise_for_status()
                data = response.json()
                if not data.get('videos'):
                    logging.warning(f"No videos found for query: {query}")
                    return None
                # Look for an SD rendition among the results, preferring the smallest file
                for video in data['videos']:
                    video_files = video['video_files']
                    sd_videos = [v for v in video_files if v['quality'] == 'sd']
                    if sd_videos:
                        sd_videos.sort(key=lambda x: x['size'])
                        return sd_videos[0]['link']
                # No SD rendition in any result
                logging.warning(f"No SD quality videos found for query: {query}")
                return None
            except Exception as e:
                logging.error(f"Error fetching Pexels video (Attempt {attempt + 1}): {e}")
                # Drop the current key and retry with the next one
                current_keys.pop(0)
                logging.warning(f"Pexels API key failed. Trying next key. Remaining keys: {len(current_keys)}")
        # All retries exhausted
        logging.error(f"Failed to fetch video for query: {query} after multiple attempts")
        return None

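    # For reference, the Pexels search response is assumed to look roughly like
    # this (showing only the fields the method above actually reads):
    #   {"videos": [{"video_files": [
    #       {"quality": "sd", "size": 1048576, "link": "https://..."}, ...]}]}
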
    def _sanitize_filename(self, filename):
        # Replace unsafe characters with underscores and cap the length
        sanitized = re.sub(r'[^\w\-_\. ]', '_', filename)
        return sanitized[:50]

    def generate_voiceover(self, text, step_description, voice_id="aura-athena-en"):
        """Generate a Deepgram text-to-speech voiceover and save it as an MP3."""
        try:
            # Prepare the text payload and output filename
            text_payload = {"text": text}
            sanitized_step = self._sanitize_filename(step_description)
            audio_filename = f"{sanitized_step}_voiceover.mp3"
            audio_path = os.path.join(self.audio_dir, audio_filename)
            # Configure speak options; voice_id names a Deepgram Aura model
            options = SpeakOptions(
                model=voice_id,
            )
            # Generate the audio and write it straight to disk
            self.deepgram_client.speak.v("1").save(audio_path, text_payload, options)
            logging.info(f"Generated voiceover for step: {step_description}")
            return audio_path
        except Exception as e:
            logging.error(f"Error generating voiceover for step: {step_description}")
            logging.error(f"Error details: {e}")
            return None

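    # Minimal usage sketch (assumes DEEPGRAM_API_KEY is set in .env):
    #   generator = SizzleReelVideoGenerator()
    #   path = generator.generate_voiceover("Welcome to MindMate.", "intro")
    #   # -> outputs/sizzle_reel/audio/intro_voiceover.mp3, or None on failure
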
    def download_video(self, video_url, step_description):
        """Stream a video download into the videos directory and return its path."""
        response = requests.get(video_url, stream=True, timeout=60)
        response.raise_for_status()
        sanitized_step = self._sanitize_filename(step_description)
        video_filename = f"{sanitized_step}_video.mp4"
        video_path = os.path.join(self.video_dir, video_filename)
        with open(video_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return video_path

    def generate_step_video(self, step, query_type='narrator'):
        """Generate the voiceover and matching stock clip for a single step."""
        # Alternate between action and narrator text as the stock-footage query
        video_query = step['action'] if query_type == 'action' else step['narrator']
        try:
            # Generate the voiceover; bail out if TTS failed
            audio_path = self.generate_voiceover(
                step['narrator'],
                step['step_description']
            )
            if not audio_path:
                return None
            # Fetch and download the video; bail out if no stock clip was found
            video_url = self.fetch_pexels_video(video_query)
            if not video_url:
                logging.warning(f"No stock video found for query: {video_query}")
                return None
            video_path = self.download_video(
                video_url,
                step['step_description']
            )
            logging.info(f"Downloaded video for query: {video_query}")
            # Get the audio duration
            with AudioFileClip(audio_path) as audio_clip:
                audio_duration = audio_clip.duration
            return {
                'audio_path': audio_path,
                'video_path': video_path,
                'step_description': step['step_description'],
                'narrator': step['narrator'],
                'duration': audio_duration
            }
        except Exception as e:
            logging.error(f"Error generating video for step: {step['step_description']}")
            logging.error(f"Error details: {e}")
            return None

    def process_video_clip(self, video_path, audio_path):
        """Standardize, trim, and mux one step's video and audio into a single clip."""
        try:
            # Load video and audio
            video_clip = VideoFileClip(video_path)
            audio_clip = AudioFileClip(audio_path)
            # Standardize the video size first
            video_clip = self._standardize_video_size(video_clip)
            # Synchronize duration with constraints
            audio_duration = audio_clip.duration
            video_duration = video_clip.duration
            # Clamp the clip duration between the min and max constraints
            clip_duration = max(
                self.min_clip_duration,
                min(audio_duration, self.max_clip_duration, video_duration)
            )
            # Safely trim video and audio to the synchronized duration
            try:
                video_clip = video_clip.subclipped(0, min(clip_duration, video_duration))
            except Exception as ve:
                logging.warning(f"Error trimming video clip: {ve}. Using full video duration.")
                video_clip = video_clip.subclipped(0, video_duration)
            try:
                audio_clip = audio_clip.subclipped(0, min(clip_duration, audio_duration))
            except Exception as ae:
                logging.warning(f"Error trimming audio clip: {ae}. Using full audio duration.")
                audio_clip = audio_clip.subclipped(0, audio_duration)
            # Ensure audio and video end up with exactly the same duration
            min_duration = min(video_clip.duration, audio_clip.duration)
            video_clip = video_clip.subclipped(0, min_duration)
            audio_clip = audio_clip.subclipped(0, min_duration)
            # Attach the audio to the video
            video_clip = video_clip.with_audio(audio_clip)
            return video_clip
        except Exception as e:
            logging.error(f"Error processing video clip: {e}")
            # Additional diagnostic logging
            try:
                logging.error(f"Video path: {video_path}")
                logging.error(f"Audio path: {audio_path}")
                video_exists = os.path.exists(video_path)
                audio_exists = os.path.exists(audio_path)
                logging.error(f"Video file exists: {video_exists}")
                logging.error(f"Audio file exists: {audio_exists}")
                if video_exists:
                    logging.error(f"Video duration: {VideoFileClip(video_path).duration}")
                if audio_exists:
                    logging.error(f"Audio duration: {AudioFileClip(audio_path).duration}")
            except Exception as diag_error:
                logging.error(f"Additional diagnostic error: {diag_error}")
            return None

    def add_captions_to_video(self, video_clip, narrator_text, audio_duration):
        """Overlay the narrator text as a centered caption along the bottom edge."""
        # Create a TextClip for the caption. Note: MoviePy v2 expects 'font' to
        # be a path to a font file, so a bare name like 'Arial' may not resolve
        # on every system.
        caption = TextClip(
            text=narrator_text,
            font='Arial',
            color='white',
            stroke_color='black',
            stroke_width=2,
            method='caption',
            size=(video_clip.w, 100),
            bg_color=None,
            text_align='center',
            horizontal_align='center',
            vertical_align='center'
        )
        # Position the caption at the bottom of the video
        caption = caption.with_position(('center', 'bottom'))
        # Match the caption duration to the audio
        caption = caption.with_duration(audio_duration)
        # Composite the video with the caption on top
        return CompositeVideoClip([video_clip, caption])

    def create_smooth_transition(self, clip1, clip2, transition_duration=1):
        """Create a smooth crossfade transition between two video clips."""
        # Extract the underlying video clip if it's a CompositeVideoClip
        if isinstance(clip1, CompositeVideoClip):
            clip1 = clip1.clips[0]  # Assume the first clip is the base video
        if isinstance(clip2, CompositeVideoClip):
            clip2 = clip2.clips[0]  # Assume the first clip is the base video
        # Ensure both clips have the same size
        if clip1.size != clip2.size:
            clip2 = clip2.resized(clip1.size)

        def transition_func(t):
            if t < transition_duration:
                # Linear crossfade between the tail of clip1 and the head of clip2
                alpha1 = 1 - (t / transition_duration)
                alpha2 = t / transition_duration
                frame1 = clip1.get_frame(clip1.duration - transition_duration + t)
                frame2 = clip2.get_frame(t)
                return (alpha1 * frame1 + alpha2 * frame2).astype(np.uint8)
            # Safety fallback; t never exceeds transition_duration for this clip
            return clip2.get_frame(t - transition_duration)

        # Build the transition as a generated clip rather than loading a file
        # from disk and monkey-patching its get_frame
        transition_clip = VideoClip(transition_func, duration=transition_duration)
        return transition_clip

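    # Crossfade sketch: with transition_duration=1, at t=0.25 the emitted frame
    # is 0.75*frame1 + 0.25*frame2, shifting linearly to pure clip2 as t -> 1.
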
    def _standardize_video_size(self, clip):
        """Standardize video size to the target resolution, preserving aspect ratio."""
        # Get the current clip size
        w, h = clip.size
        current_aspect_ratio = w / h
        if current_aspect_ratio > self.target_aspect_ratio:
            # Video is wider than the target ratio: fit to width
            new_width = self.target_width
            new_height = int(new_width / current_aspect_ratio)
        else:
            # Video is taller than the target ratio: fit to height
            new_height = self.target_height
            new_width = int(new_height * current_aspect_ratio)
        # Resize the video
        resized_clip = clip.resized(width=new_width, height=new_height)
        # Create a black background of the target size
        bg = ColorClip(size=(self.target_width, self.target_height),
                       color=(0, 0, 0))
        bg = bg.with_duration(clip.duration)
        # Center the video on the background (letterbox/pillarbox)
        x_offset = (self.target_width - new_width) // 2
        y_offset = (self.target_height - new_height) // 2
        final_clip = CompositeVideoClip([
            bg,
            resized_clip.with_position((x_offset, y_offset))
        ])
        return final_clip

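    # Worked example: a 1920x1080 source (ratio ~1.78) is wider than the 800x600
    # target (ratio ~1.33), so it is resized to 800x450 and centered with
    # x_offset=0 and y_offset=(600-450)//2=75, i.e. 75px black bars top and bottom.
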
    def generate_sizzle_reel(self, script_json, app_name="CleverApp"):
        """Build the full sizzle reel from a script and return the output path."""
        # Parse the script based on the input type
        logging.debug(f"Incoming script: {script_json}")
        if isinstance(script_json, str):
            script = json.loads(script_json)
            script_steps = script.get('sizzle_reel_script', [])
        elif isinstance(script_json, dict) and 'sizzle_reel_script' in script_json:
            # Handle the case where sizzle_reel_script is a CrewOutput object
            crew_output = script_json['sizzle_reel_script']
            if hasattr(crew_output, 'raw'):
                # Parse the raw JSON string from CrewOutput
                try:
                    parsed_data = json.loads(crew_output.raw)
                    script_steps = parsed_data.get('sizzle_reel_script', [])
                except json.JSONDecodeError:
                    logging.error("Failed to parse raw CrewOutput JSON")
                    script_steps = []
            else:
                script_steps = crew_output
        else:
            script_steps = []
        # Process video steps
        processed_clips = []
        # Wrap a lone step in a list so the loop below always iterates steps
        if not isinstance(script_steps, list):
            script_steps = [script_steps]
        for i, step in enumerate(script_steps):
            # Convert Pydantic models to plain dicts if necessary
            if hasattr(step, 'dict'):
                step = step.dict()
            # Alternate the query type between narrator and action text
            query_type = 'narrator' if i % 2 == 0 else 'action'
            # Generate the step video
            step_video = self.generate_step_video(step, query_type)
            if not step_video:
                logging.warning(f"Skipping step {i} due to video generation failure")
                continue
            # Process the video clip
            processed_clip = self.process_video_clip(
                step_video['video_path'],
                step_video['audio_path']
            )
            if processed_clip:
                # Add captions
                try:
                    captioned_clip = self.add_captions_to_video(
                        processed_clip,
                        step_video['narrator'],
                        processed_clip.duration
                    )
                    processed_clips.append(captioned_clip)
                except Exception as caption_error:
                    logging.error(f"Error adding captions to clip {i}: {caption_error}")
                    # Fallback: use the processed clip without captions
                    processed_clips.append(processed_clip)
        # Abort if no clips could be produced
        if not processed_clips:
            logging.error("No video clips could be generated")
            return None
        # Concatenate processed clips with smooth transitions
        final_clips = []
        for i in range(len(processed_clips) - 1):
            final_clips.append(processed_clips[i])
            # Add a crossfade between consecutive clips
            try:
                transition = self.create_smooth_transition(
                    processed_clips[i],
                    processed_clips[i + 1],
                    transition_duration=1
                )
                final_clips.append(transition)
            except Exception as transition_error:
                logging.warning(f"Could not create transition between clips {i} and {i + 1}: {transition_error}")
        # Add the last clip
        final_clips.append(processed_clips[-1])
        # Concatenate video clips
        try:
            final_video = concatenate_videoclips(final_clips, method="compose")
        except Exception as e:
            logging.error(f"Error concatenating video clips: {e}")
            return None
        # Sanitize the app name for use in the output filename
        sanitized_app_name = re.sub(r'[^\w\-_\. ]', '_', app_name)
        output_filename = f"{sanitized_app_name}_sizzle_reel.mp4"
        output_path = os.path.join(self.final_dir, output_filename)
        try:
            final_video.write_videofile(
                output_path,
                codec='libx264',
                audio_codec='aac',
                preset='ultrafast',  # Fastest encode at the cost of larger files
                threads=12,
                # bitrate='600k',  # Optional bitrate cap
                fps=24,
            )
            logging.info(f"Successfully generated final video: {output_path}")
        except Exception as e:
            logging.error(f"Error writing final video: {e}")
            return None
        # Clean up resources
        for clip in processed_clips + final_clips + [final_video]:
            try:
                clip.close()
            except Exception as close_error:
                logging.warning(f"Error closing clip: {close_error}")
        return output_path


# Main execution
if __name__ == "__main__":
    # Sample script
    sample_script = {
        "sizzle_reel_script": [
            {
                "step_description": "Establishing the Problem",
                "narrator": "In a world filled with stress and uncertainty, finding peace can feel like an uphill battle.",
                "action": "A split-screen shows a person overwhelmed with stress on the left and another individual feeling calm and focused on the right.",
                "features": []
            },
            {
                "step_description": "The Solution",
                "narrator": "Meet MindMate, your personal companion for mental well-being. Track your moods, find balance, and embrace tranquility.",
                "action": "Transition to the MindMate app interface on a smartphone, showcasing its sleek design.",
                "features": []
            },
            {
                "step_description": "The Onboarding Experience",
                "narrator": "Start your journey by downloading the app and sharing your story through a simple assessment.",
                "action": "Quick clips of a user downloading the app, creating an account, and completing the initial assessment.",
                "features": []
            },
            {
                "step_description": "Daily Engagement",
                "narrator": "Log your emotions daily and receive gentle prompts to help you stay connected with your feelings.",
                "action": "User logs their feelings in the mood tracker, with reminders popping up on their phone.",
                "features": [
                    {"feature_name": "Mood Tracker", "description": "Log daily emotions."},
                    {"feature_name": "Reminders", "description": "Gentle prompts for mood check-ins."}
                ]
            },
            {
                "step_description": "Tailored Support",
                "narrator": "Experience personalized meditation sessions designed just for you, based on your unique mood patterns.",
                "action": "User engages in a serene meditation environment, surrounded by calming visuals and soundscapes.",
                "features": [
                    {"feature_name": "Personalized Meditations", "description": "Meditations based on mood patterns."}
                ]
            },
            {
                "step_description": "Empowering Analytics",
                "narrator": "Track your progress and discover how meditation can transform your mental health journey.",
                "action": "User reviews analytics on their progress, smiling as they see improvements.",
                "features": [
                    {"feature_name": "Progress Tracking", "description": "View mood trends and meditation effectiveness."}
                ]
            },
            {
                "step_description": "Connection",
                "narrator": "Join a community of individuals just like you, sharing stories, and celebrating progress together.",
                "action": "Users share experiences and feedback within the app community, fostering connection and support.",
                "features": [
                    {"feature_name": "Community Support", "description": "Share experiences and feedback."}
                ]
            },
            {
                "step_description": "Download Now",
                "narrator": "Ready to take the first step towards a healthier mind? Download MindMate today and start your journey to tranquility.",
                "action": "The MindMate logo appears with app store icons for iOS and Android.",
                "features": []
            },
            {
                "step_description": "Inspirational Tone",
                "narrator": "Your mental well-being matters. Let MindMate guide you.",
                "action": "A serene landscape with the tagline overlaid: 'MindMate - Your Path to Peace.'",
                "features": []
            }
        ]
    }
    generator = SizzleReelVideoGenerator()
    final_video = generator.generate_sizzle_reel(sample_script, app_name="MindMate")
    print(f"Sizzle reel generated: {final_video}")