from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
import os
import requests
import logging
import shutil
from pathlib import Path
from dotenv import load_dotenv
import subprocess
import uuid
import time
from PIL import Image as PILImage
from collections import defaultdict
import datetime
# Logging configuration: console and file output
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('video_generation.log')
    ]
)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()

# Create directories
IMAGES_DIR = Path("static/images")
IMAGES_DIR.mkdir(parents=True, exist_ok=True)
AUDIO_DIR = Path("static/audio")
AUDIO_DIR.mkdir(parents=True, exist_ok=True)
VIDEOS_DIR = Path("static/videos")
VIDEOS_DIR.mkdir(parents=True, exist_ok=True)
FONTS_DIR = Path("static/fonts")
FONTS_DIR.mkdir(parents=True, exist_ok=True)
# Video settings
VIDEO_ORIENTATIONS = {
    "horizontal": {
        "width": 1920,
        "height": 1080,
    },
    "vertical": {
        "width": 1080,
        "height": 1920,
    }
}
VIDEO_QUALITY = "high"  # Options: low, medium, high (informational only; encoding quality is set via CRF below)
# Font settings for captions
FONT_FILE = "OpenSans-Bold.ttf"
FONT_PATH = str(FONTS_DIR / FONT_FILE)

# Caption settings for different orientations
CAPTION_SETTINGS = {
    "horizontal": {
        "font_size": 48,
        "y_position": "(h-text_h-100)",  # 100 pixels from bottom
        "max_width": "w*0.9",  # 90% of video width
        "line_spacing": 20,
        "box_opacity": 0.8,
        "box_padding": 15,
        "font_color": "white",
        "border_width": 3,
        "border_color": "black@0.9"
    },
    "vertical": {
        "font_size": 52,  # Larger font for vertical videos
        "y_position": "(h-text_h-150)",  # 150 pixels from bottom
        "max_width": "w*0.95",  # 95% of video width for vertical
        "line_spacing": 22,
        "box_opacity": 0.85,
        "box_padding": 20,
        "font_color": "white",
        "border_width": 3,
        "border_color": "black@0.9"
    }
}
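# Note: these settings feed the drawtext filter assembled in process_scenes();
# for instance, box_opacity 0.8 becomes "boxcolor=black@0.8" and y_position
# "(h-text_h-100)" anchors the caption 100 px above the bottom edge.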
# Track video generations per IP
video_generations = defaultdict(list)
MAX_GENERATIONS_PER_IP = 2
def escape_text_for_ffmpeg(text):
    """Properly escape text for the FFmpeg drawtext filter"""
    # Escape backslashes first, then the characters that are significant
    # to the filter parser
    escaped = text.replace("\\", "\\\\")
    escaped = escaped.replace("'", "\\'")   # Escape single quotes
    escaped = escaped.replace(":", "\\:")   # Escape colons
    escaped = escaped.replace(",", "\\,")   # Escape commas
    escaped = escaped.replace("[", "\\[")   # Escape square brackets
    escaped = escaped.replace("]", "\\]")
    return escaped
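# For reference, the escaping above turns e.g. "It's 5:00, go!" into
# "It\'s 5\:00\, go!" before the text is embedded in a drawtext filter.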
def format_caption_text(text, max_chars=50):
    """Format caption text into lines with proper breaks"""
    words = text.split()
    lines = []
    current_line = []
    current_length = 0
    for word in words:
        if current_length + len(word) + 1 <= max_chars:
            current_line.append(word)
            current_length += len(word) + 1
        else:
            if current_line:  # Only add non-empty lines
                lines.append(" ".join(current_line))
            current_line = [word]
            current_length = len(word)
    if current_line:
        lines.append(" ".join(current_line))
    # Escape each line and join with a literal \n for FFmpeg
    return "\\n".join(escape_text_for_ffmpeg(line) for line in lines)
# Check for FFmpeg installation
try:
    subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
    logger.info("FFmpeg found and working")
except (subprocess.CalledProcessError, FileNotFoundError):
    logger.error("FFmpeg not found. Please install FFmpeg first.")
    raise RuntimeError("FFmpeg not found. Please install FFmpeg first.")
# Deepgram API key (loaded from the environment rather than hardcoded)
DEEPGRAM_API_KEY = os.getenv("DEEPGRAM_API_KEY")
if not DEEPGRAM_API_KEY:
    logger.warning("DEEPGRAM_API_KEY not set in environment!")
app = FastAPI(title="Image Generation API")

# Mount static files directory
app.mount("/static", StaticFiles(directory="static"), name="static")

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000", "http://localhost:3002"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class ImageRequest(BaseModel):
    prompt: str

class TextToSpeechRequest(BaseModel):
    text: str
def get_image_dimensions(image_path):
    """Get the dimensions of an image file"""
    with PILImage.open(image_path) as img:
        return img.size

def download_image(url: str, filename: str) -> str:
    """Download an image and return its local static path"""
    filepath = IMAGES_DIR / filename
    response = requests.get(url)
    response.raise_for_status()
    with open(filepath, "wb") as f:
        f.write(response.content)
    return f"/static/images/{filename}"
def save_scene_info(scene_number: str, image_path: str, audio_path: str):
    """Save scene information to a report file"""
    report_file = Path("static/scene_report.txt")
    # Convert bare filenames to static paths
    if image_path and not image_path.startswith('/'):
        image_path = f"/static/images/{image_path}"
    if audio_path and not audio_path.startswith('/'):
        audio_path = f"/static/audio/{audio_path}"
    # Read existing content
    content = ""
    if report_file.exists():
        with open(report_file, "r") as f:
            content = f.read()
    # Split into sections and filter out empty sections
    sections = [s.strip() for s in content.split("-" * 50) if s.strip()]
    # Create a dictionary of existing scenes
    scene_dict = {}
    for section in sections:
        lines = section.strip().split("\n")
        if lines and lines[0].startswith("Scene"):
            curr_scene = lines[0].split()[1].strip(":")
            scene_dict[curr_scene] = {
                "image": lines[1].replace("Image: ", ""),
                "audio": lines[2].replace("Audio: ", "")
            }
    # Update or add the new scene
    if scene_number in scene_dict:
        if image_path:
            scene_dict[scene_number]["image"] = image_path
        if audio_path:
            scene_dict[scene_number]["audio"] = audio_path
    else:
        scene_dict[scene_number] = {
            "image": image_path,
            "audio": audio_path
        }
    # Sort scenes by number and build the new content
    sorted_scenes = sorted(scene_dict.keys(), key=lambda x: int(x))
    new_content = []
    for scene in sorted_scenes:
        new_content.extend([
            f"Scene {scene}:",
            f"Image: {scene_dict[scene]['image']}",
            f"Audio: {scene_dict[scene]['audio']}"
        ])
        new_content.append("-" * 50)
    # Write back to file
    with open(report_file, "w") as f:
        f.write("\n".join(new_content) + "\n")
    logger.info(f"Scene report updated for scene {scene_number}")
def get_scene_paths(scene_number: str) -> tuple:
    """Get image and audio paths for a scene from the report"""
    report_file = Path("static/scene_report.txt")
    if not report_file.exists():
        raise Exception("Scene report file not found")
    with open(report_file, "r") as f:
        content = f.read()
    # Find the section for this scene
    scene_marker = f"Scene {scene_number}:"
    sections = content.split("-" * 50)
    for section in sections:
        if scene_marker in section:
            lines = section.strip().split("\n")
            image_path = lines[1].replace("Image: ", "")
            audio_path = lines[2].replace("Audio: ", "")
            return image_path, audio_path
    raise Exception(f"Scene {scene_number} not found in report")
@app.post("/generate-image")  # Route path assumed; match it to your frontend.
async def generate_image(request: ImageRequest):
    try:
        logger.info(f"Received prompt: {request.prompt}")
        # Extract scene number
        scene_number = '1'
        if 'Scene' in request.prompt:
            try:
                scene_text = request.prompt.split('Scene')[1].strip()
                scene_number = scene_text.split()[0]
            except (IndexError, ValueError):
                pass
        # Generate a random filename (DALL-E returns PNG data)
        filename = f"image_{uuid.uuid4()}.png"
        filepath = IMAGES_DIR / filename
        # Generate image using OpenAI DALL-E
        try:
            response = requests.post(
                "https://api.openai.com/v1/images/generations",
                headers={
                    "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}",
                    "Content-Type": "application/json"
                },
                json={
                    "model": "dall-e-2",
                    "prompt": request.prompt,
                    "n": 1,
                    "size": "1024x1024",
                    "response_format": "url"
                }
            )
            if response.status_code != 200:
                logger.error(f"DALL-E API error: {response.text}")
                raise HTTPException(status_code=500, detail="Failed to generate image with DALL-E")
            result = response.json()
            image_url = result['data'][0]['url']
            # Download and save the image
            image_response = requests.get(image_url)
            image_response.raise_for_status()
            with open(filepath, "wb") as f:
                f.write(image_response.content)
            logger.info(f"Image saved locally at: {filepath}")
            try:
                save_scene_info(scene_number, filename, "")
                logger.info(f"Scene {scene_number} info saved to report")
            except Exception as e:
                logger.error(f"Error saving scene info: {str(e)}")
            return JSONResponse({
                "imageUrl": f"http://localhost:8000/static/images/{filename}",
                "imagePath": f"/static/images/{filename}"
            })
        except HTTPException:
            raise  # Preserve explicit HTTP errors instead of re-wrapping them
        except Exception as e:
            logger.error(f"Error with DALL-E API: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error generating image: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/generate-voice")  # Route path assumed; match it to your frontend.
async def generate_voice(request: TextToSpeechRequest):
    try:
        # Extract scene number
        scene_number = '1'
        if 'Scene' in request.text:
            try:
                scene_text = request.text.split('Scene')[1].strip()
                scene_number = scene_text.split()[0]
            except (IndexError, ValueError):
                pass
        # Generate a filename for this scene's audio
        filename = f"audio_scene{scene_number}_{uuid.uuid4()}.mp3"
        filepath = AUDIO_DIR / filename
        # Extract the voiceover text (everything after a "Voiceover" header, if present)
        voiceover_text = request.text
        if 'Voiceover\n' in voiceover_text:
            voiceover_text = voiceover_text.split('Voiceover\n')[1].strip()
        # Set up the Deepgram request
        DEEPGRAM_URL = "https://api.deepgram.com/v1/speak?model=aura-asteria-en"
        headers = {
            "Authorization": f"Token {DEEPGRAM_API_KEY}",
            "Content-Type": "application/json"
        }
        # Request the audio first, then stream it to disk, so a failed
        # request does not leave an empty file behind
        response = requests.post(
            DEEPGRAM_URL,
            headers=headers,
            json={"text": voiceover_text},
            stream=True
        )
        if not response.ok:
            logger.error(f"Deepgram API error: {response.text}")
            raise HTTPException(status_code=500, detail="Failed to generate audio")
        with open(filepath, 'wb') as file_stream:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    file_stream.write(chunk)
        logger.info(f"Audio saved for scene {scene_number} as {filename}")
        # Update scene information with the audio path
        try:
            # Get existing scene info
            image_path, _ = get_scene_paths(scene_number)
            # Update audio path
            save_scene_info(
                scene_number=scene_number,
                image_path=image_path.split('/')[-1] if image_path else "",
                audio_path=filename
            )
            logger.info(f"Scene {scene_number} updated with audio: {filename}")
        except Exception as e:
            logger.error(f"Could not update scene report: {str(e)}")
            raise
        return JSONResponse({
            "audioUrl": f"http://localhost:8000/static/audio/{filename}",
            "audioPath": f"/static/audio/{filename}",
            "sceneNumber": scene_number
        })
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error generating voice: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")  # Route path assumed; adjust to match your deployment checks.
async def health_check():
    return {"status": "healthy"}
# Detailed FFmpeg error logging
def log_ffmpeg_error(error: subprocess.CalledProcessError, stage: str):
    """Log detailed FFmpeg error information"""
    logger.error(f"FFmpeg error during {stage}:")
    logger.error(f"Command that failed: {' '.join(error.cmd)}")
    logger.error(f"Error output: {error.stderr.decode() if error.stderr else 'No error output'}")
    logger.error(f"Standard output: {error.stdout.decode() if error.stdout else 'No standard output'}")
    logger.error(f"Return code: {error.returncode}")
def extract_seconds(time_str):
    """Extract seconds from a time string that might include 'seconds' or other text"""
    # Strip "seconds" and any other non-digit characters
    return int(''.join(filter(str.isdigit, time_str)))
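# For example: extract_seconds("15 seconds") -> 15, extract_seconds("0s") -> 0.
# The digit filter simply concatenates digits, so "1:30" would become 130;
# scene times are expected as plain seconds, e.g. "0-5".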
def can_generate_video(ip_address: str) -> bool:
    """Check if an IP can generate more videos today"""
    today = datetime.datetime.now().date()
    # Drop entries from previous days
    video_generations[ip_address] = [
        timestamp for timestamp in video_generations[ip_address]
        if timestamp.date() == today
    ]
    return len(video_generations[ip_address]) < MAX_GENERATIONS_PER_IP
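# With MAX_GENERATIONS_PER_IP = 2, an IP that has generated two videos today is
# blocked until the date rolls over; "recreate" requests bypass this check in
# generate_video() below. Note this in-memory counter resets on server restart.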
@app.post("/generate-video")  # Route path assumed; match it to your frontend.
async def generate_video(request: Request):
    try:
        client_ip = request.client.host
        data = await request.json()
        is_recreate = data.get("isRecreate", False)
        if not is_recreate and not can_generate_video(client_ip):
            raise HTTPException(
                status_code=429,
                detail="Daily video generation limit reached. You can still recreate existing videos."
            )
        scenes = data.get("scenes", [])
        orientation = data.get("orientation", "horizontal")
        if not scenes:
            raise HTTPException(status_code=400, detail="No scenes provided")
        logger.info(f"Starting video generation with {len(scenes)} scenes")
        logger.info(f"Video orientation: {orientation}")
        # Get video dimensions based on orientation
        video_config = VIDEO_ORIENTATIONS[orientation]
        width = video_config["width"]
        height = video_config["height"]
        # Process scenes and generate the final video
        output_video = process_scenes(scenes, orientation)
        if not os.path.exists(output_video):
            raise HTTPException(status_code=500, detail="Failed to generate video")
        # Calculate total duration safely
        total_duration = 0
        for scene in scenes:
            try:
                if isinstance(scene, dict) and 'time' in scene:
                    time_parts = scene['time'].split('-')
                    if len(time_parts) == 2:
                        start = extract_seconds(time_parts[0])
                        end = extract_seconds(time_parts[1])
                        total_duration += (end - start)
                    else:
                        total_duration += 5  # Default duration if time format is invalid
                else:
                    total_duration += 5  # Default duration if time is missing
            except (ValueError, IndexError):
                total_duration += 5  # Default duration if parsing fails
        # Prepare response with video details
        video_details = {
            "resolution": f"{width}x{height}",
            "quality": "High",
            "scenes": len(scenes),
            "hasCaptions": any(scene.get("voiceover") for scene in scenes),
            "duration": total_duration,
            "orientation": orientation
        }
        logger.info("Video generation completed successfully")
        logger.info(f"Video details: {video_details}")
        # Track the generation unless this is a recreation
        if not is_recreate:
            video_generations[client_ip].append(datetime.datetime.now())
        # Build the full video URL for the client
        video_filename = os.path.basename(output_video)
        return {
            "videoUrl": f"http://localhost:8000/static/videos/{video_filename}",
            "details": video_details,
            "remainingGenerations": MAX_GENERATIONS_PER_IP - len(video_generations[client_ip])
        }
    except HTTPException:
        # Preserve intended status codes (429, 400) instead of re-wrapping as 500
        raise
    except Exception as e:
        logger.error(f"Error generating video: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
def process_scenes(scenes, orientation):
    """Process scenes and generate the final video"""
    # Bind temp_dir before the try block so the finally clause can always
    # reference it, even if directory setup fails
    temp_dir = VIDEOS_DIR / "temp"
    try:
        # Create directories
        VIDEOS_DIR.mkdir(exist_ok=True)
        temp_dir.mkdir(exist_ok=True)
        logger.info("Created temporary directories for video processing")
        # Generate a unique video filename
        video_id = uuid.uuid4()
        timestamp = int(time.time())
        video_filename = f"video_{timestamp}_{video_id}_{orientation}.mp4"
        video_path = VIDEOS_DIR / video_filename
        # Get video dimensions
        video_config = VIDEO_ORIENTATIONS[orientation]
        VIDEO_WIDTH = video_config["width"]
        VIDEO_HEIGHT = video_config["height"]
        scene_videos = []
        # Process each scene
        for i, scene in enumerate(scenes, 1):
            logger.info(f"Processing scene {i}/{len(scenes)}")
            try:
                # Validate required scene properties
                if not isinstance(scene, dict):
                    raise ValueError(f"Scene {i} is not a valid dictionary")
                if 'time' not in scene:
                    logger.warning(f"Scene {i} missing time property, using default duration")
                    scene['time'] = "0-5"  # Default 5 second duration
                # Get paths from scene data
                image_url = scene.get('imageUrl', '')
                audio_url = scene.get('audioUrl', '')
                voiceover = scene.get('voiceover', '')
                if not image_url or not audio_url:
                    raise ValueError(f"Scene {i} missing required image or audio URL")
                # Convert URLs to local paths
                image_path = image_url.replace('http://localhost:8000', '')
                audio_path = audio_url.replace('http://localhost:8000', '')
                # Convert to Path objects
                local_image_path = Path(image_path.lstrip('/'))
                local_audio_path = Path(audio_path.lstrip('/'))
                if not local_image_path.exists():
                    raise Exception(f"Image file not found: {image_path}")
                if not local_audio_path.exists():
                    raise Exception(f"Audio file not found: {audio_path}")
                # Parse time safely
                try:
                    time_parts = scene['time'].split('-')
                    start = extract_seconds(time_parts[0])
                    end = extract_seconds(time_parts[1])
                    duration = end - start
                    if duration <= 0:
                        logger.warning(f"Scene {i} has invalid duration, using default")
                        duration = 5
                except (IndexError, ValueError) as e:
                    logger.warning(f"Scene {i} has invalid time format, using default duration: {e}")
                    duration = 5
                # Get image dimensions
                img_width, img_height = get_image_dimensions(local_image_path)
                # Calculate scaling and padding: fit the image inside the
                # target frame while preserving aspect ratio, then
                # letterbox/pillarbox with black bars (the same math covers
                # both orientations)
                target_ratio = VIDEO_WIDTH / VIDEO_HEIGHT
                img_ratio = img_width / img_height
                if img_ratio > target_ratio:
                    # Image is relatively wider: full width, bars top/bottom
                    scale_width = VIDEO_WIDTH
                    scale_height = int(VIDEO_WIDTH / img_ratio)
                    pad_x = 0
                    pad_y = (VIDEO_HEIGHT - scale_height) // 2
                else:
                    # Image is relatively taller: full height, bars left/right
                    scale_height = VIDEO_HEIGHT
                    scale_width = int(VIDEO_HEIGHT * img_ratio)
                    pad_x = (VIDEO_WIDTH - scale_width) // 2
                    pad_y = 0
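                # Worked example (assuming a 1024x1024 DALL-E image in a
                # 1920x1080 horizontal video): img_ratio = 1.0 < target_ratio
                # of ~1.78, so the image scales to 1080x1080 and is
                # pillarboxed with pad_x = (1920 - 1080) // 2 = 420, pad_y = 0.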
                # Create the base video from the still image
                scene_video = temp_dir / f"scene_{i}.mp4"
                # FFmpeg command tuned for quality (slow preset, low CRF)
                img_cmd = [
                    'ffmpeg', '-y',
                    '-loop', '1',
                    '-i', str(local_image_path),
                    '-c:v', 'libx264',
                    '-t', str(duration),
                    '-pix_fmt', 'yuv420p',
                    '-vf', (
                        f'scale={scale_width}:{scale_height}:force_original_aspect_ratio=decrease,'
                        f'pad={VIDEO_WIDTH}:{VIDEO_HEIGHT}:{pad_x}:{pad_y}:color=black,'
                        'format=yuv420p'
                    ),
                    '-preset', 'slow',
                    '-crf', '18',  # Lower CRF means better quality (range 0-51)
                    str(scene_video)
                ]
                logger.info(f"Scene {i}: Creating base video from image")
                try:
                    subprocess.run(img_cmd, check=True, capture_output=True)
                    logger.info(f"Scene {i}: Base video created successfully")
                except subprocess.CalledProcessError as e:
                    log_ffmpeg_error(e, f"scene {i} image to video conversion")
                    raise
                # Add audio to the video
                scene_with_audio = temp_dir / f"scene_{i}_with_audio.mp4"
                audio_cmd = [
                    'ffmpeg', '-y',
                    '-i', str(scene_video),
                    '-i', str(local_audio_path),
                    '-c:v', 'copy',
                    '-c:a', 'aac',
                    '-b:a', '192k',
                    str(scene_with_audio)
                ]
                logger.info(f"Scene {i}: Adding audio")
                try:
                    subprocess.run(audio_cmd, check=True, capture_output=True)
                    logger.info(f"Scene {i}: Audio added successfully")
                except subprocess.CalledProcessError as e:
                    log_ffmpeg_error(e, f"scene {i} audio addition")
                    raise
                # Add captions if a voiceover exists
                if voiceover:
                    logger.info(f"Scene {i}: Adding captions")
                    caption_video = temp_dir / f"scene_{i}_with_caption.mp4"
                    # Format and escape the caption text
                    formatted_caption = format_caption_text(voiceover)
                    # Get caption settings for the current orientation
                    caption_config = CAPTION_SETTINGS[orientation]
                    # Build the drawtext filter with positioning and styling
                    caption_filter = (
                        f"drawtext=fontfile={FONT_PATH}:"
                        f"text='{formatted_caption}':"
                        f"fontcolor={caption_config['font_color']}:"
                        f"fontsize={caption_config['font_size']}:"
                        f"line_spacing={caption_config['line_spacing']}:"
                        f"x=(w-text_w)/2:"  # Center horizontally
                        f"y={caption_config['y_position']}:"  # Position from bottom
                        f"box=1:"
                        f"boxcolor=black@{caption_config['box_opacity']}:"
                        f"boxborderw={caption_config['box_padding']}:"
                        f"bordercolor={caption_config['border_color']}:"
                        f"borderw={caption_config['border_width']}:"
                        f"fix_bounds=true:"
                        f"shadowcolor=black@0.7:"  # Shadow for better readability
                        f"shadowx=2:"
                        f"shadowy=2:"
                        f"expansion=normal"
                    )
                    try:
                        drawtext_cmd = [
                            'ffmpeg', '-y',
                            '-i', str(scene_with_audio),
                            '-vf', caption_filter,
                            '-codec:a', 'copy',
                            str(caption_video)
                        ]
                        subprocess.run(drawtext_cmd, check=True, capture_output=True)
                        logger.info(f"Scene {i}: Captions added successfully")
                        scene_videos.append(caption_video)
                    except subprocess.CalledProcessError as e:
                        log_ffmpeg_error(e, f"scene {i} caption addition")
                        logger.warning(f"Scene {i}: Falling back to video without captions")
                        scene_videos.append(scene_with_audio)
                else:
                    logger.info(f"Scene {i}: No captions to add")
                    scene_videos.append(scene_with_audio)
            except Exception as e:
                logger.error(f"Error processing scene {i}: {str(e)}", exc_info=True)
                raise
logger.info("All scenes processed, creating final video") | |
# Create concat file | |
concat_file = temp_dir / "concat.txt" | |
with open(concat_file, 'w') as f: | |
for video in scene_videos: | |
f.write(f"file '{video.absolute()}'\n") | |
# Final concatenation command | |
concat_cmd = [ | |
'ffmpeg', '-y', | |
'-f', 'concat', | |
'-safe', '0', | |
'-i', str(concat_file), | |
'-c:v', 'libx264', | |
'-preset', 'slow', | |
'-crf', '18', | |
'-c:a', 'aac', | |
'-b:a', '192k', | |
str(video_path) | |
] | |
try: | |
subprocess.run(concat_cmd, check=True, capture_output=True) | |
logger.info(f"Successfully generated final video: {video_filename}") | |
except subprocess.CalledProcessError as e: | |
log_ffmpeg_error(e, "final video concatenation") | |
raise Exception("Failed to concatenate videos") | |
return video_path | |
except Exception as e: | |
logger.error(f"Error in process_scenes: {str(e)}", exc_info=True) | |
raise | |
finally: | |
# Clean up temporary files | |
if temp_dir.exists(): | |
shutil.rmtree(temp_dir) | |
logger.info("Cleaned up temporary files") | |
if __name__ == "__main__":
    import uvicorn

    # Check for the API token
    if not os.getenv('OPENAI_API_KEY'):
        logger.warning("OPENAI_API_KEY not set in environment!")
    else:
        logger.info("OPENAI_API_KEY found in environment")
    logger.info("Starting FastAPI server...")
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")
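# Example requests against the route paths assumed above (adjust as needed):
#   curl -X POST http://localhost:8000/generate-image \
#        -H "Content-Type: application/json" \
#        -d '{"prompt": "Scene 1 A red fox in fresh snow"}'
#   curl -X POST http://localhost:8000/generate-voice \
#        -H "Content-Type: application/json" \
#        -d '{"text": "Scene 1 Voiceover\nA red fox pads through fresh snow."}'
#   curl http://localhost:8000/health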