File size: 12,190 Bytes
287c9ca 09d5c67 5470dfc 287c9ca 41b47a8 287c9ca 50c620f 09d5c67 41b47a8 50c620f 5470dfc 41b47a8 09d5c67 b97795f 03bb9f6 5470dfc 09d5c67 41b47a8 09d5c67 b97795f 50c620f 41b47a8 09d5c67 41b47a8 50c620f 5470dfc 50c620f 41b47a8 5470dfc 41b47a8 50c620f 41b47a8 09d5c67 41b47a8 09d5c67 41b47a8 09d5c67 41b47a8 5470dfc 41b47a8 5470dfc 41b47a8 5470dfc 41b47a8 b97795f 41b47a8 09d5c67 287c9ca 09d5c67 41b47a8 b97795f 287c9ca 09d5c67 287c9ca 41b47a8 287c9ca 41b47a8 287c9ca 41b47a8 b97795f 41b47a8 09d5c67 41b47a8 b97795f 41b47a8 b97795f 41b47a8 b97795f 41b47a8 b97795f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# core/visual_engine.py
from PIL import Image, ImageDraw, ImageFont
from moviepy.editor import ImageClip, concatenate_videoclips
import os
import openai # Import OpenAI library
import requests # To download images from URLs
import io # To handle image data in memory
class VisualEngine:
    """Generates per-scene visuals and assembles them into a video.

    Images come from DALL-E when an OpenAI API key has been supplied via
    :meth:`set_openai_api_key`; otherwise (and on any generation failure)
    a text placeholder card is rendered with PIL. Finished images are
    concatenated into an MP4 slideshow with MoviePy.
    """

    def __init__(self, output_dir="temp_generated_media"):
        """Create the engine and ensure *output_dir* exists.

        Args:
            output_dir: Directory where generated images and videos are
                written. Created if missing.
        """
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

        # Font used when drawing placeholder images. The hard-coded path
        # matches the container layout this app deploys into; we fall back
        # to PIL's built-in bitmap font when it is unavailable.
        self.font_filename = "arial.ttf"
        self.font_path_in_container = f"/usr/local/share/fonts/truetype/mycustomfonts/{self.font_filename}"
        self.font_size_pil = 24
        try:
            self.font = ImageFont.truetype(self.font_path_in_container, self.font_size_pil)
            print(f"Successfully loaded font: {self.font_path_in_container} for placeholders.")
        except IOError:
            print(f"Warning: Could not load font from '{self.font_path_in_container}'. Placeholders will use default font.")
            self.font = ImageFont.load_default()
            # The default bitmap font is small; track a matching nominal
            # size so the text-measurement estimates stay roughly right.
            self.font_size_pil = 11

        # --- OpenAI API client state ---
        # The key is injected later via set_openai_api_key() (e.g. from
        # Streamlit secrets in app.py:
        #   st.session_state.visual_engine.set_openai_api_key(st.secrets["OPENAI_API_KEY"])
        # ). AI generation stays disabled until then.
        self.openai_api_key = None
        self.USE_AI_IMAGE_GENERATION = False

        # DALL-E 3 is generally better than DALL-E 2; it supports sizes
        # "1024x1024", "1792x1024" and "1024x1792".
        self.dalle_model = "dall-e-3"
        self.image_size = "1024x1024"

    def set_openai_api_key(self, api_key):
        """Enable or disable DALL-E generation depending on *api_key*.

        Args:
            api_key: OpenAI API key string, or a falsy value to disable AI
                generation and fall back to placeholder images.
        """
        if api_key:
            self.openai_api_key = api_key
            # Also set the legacy module-level key for openai<1.0 callers.
            openai.api_key = self.openai_api_key
            self.USE_AI_IMAGE_GENERATION = True
            print("OpenAI API key set. AI Image Generation Enabled with DALL-E.")
        else:
            self.USE_AI_IMAGE_GENERATION = False
            print("OpenAI API key not provided. AI Image Generation Disabled. Using placeholders.")

    def _get_text_dimensions(self, text_content, font_obj):
        """Return (width, height) in pixels for *text_content* in *font_obj*.

        Prefers Pillow's ``getbbox`` (Pillow >= 8), falls back to the
        deprecated ``getsize``, and finally to a rough per-character
        estimate. The reported height is never 0.
        """
        def _estimated_size():
            # Rough heuristic: ~0.6 em average glyph width, 1.2 em line height.
            avg_char_width = self.font_size_pil * 0.6
            height_estimate = self.font_size_pil * 1.2
            return (int(len(text_content) * avg_char_width),
                    int(height_estimate if height_estimate > 0 else self.font_size_pil))

        if text_content == "" or text_content is None:
            return 0, self.font_size_pil
        try:
            if hasattr(font_obj, 'getbbox'):
                bbox = font_obj.getbbox(text_content)
                width = bbox[2] - bbox[0]
                height = bbox[3] - bbox[1]
                return width, height if height > 0 else self.font_size_pil
            elif hasattr(font_obj, 'getsize'):
                width, height = font_obj.getsize(text_content)
                return width, height if height > 0 else self.font_size_pil
            return _estimated_size()
        except Exception as e:
            print(f"Warning: Error getting text dimensions for '{text_content}': {e}. Using estimates.")
            return _estimated_size()

    def _create_placeholder_image_content(self, text_description, filename, size=(1024, 576)):
        """Render *text_description* word-wrapped onto a dark card and save it.

        Args:
            text_description: Text to draw (a default message when falsy).
            filename: File name inside ``self.output_dir``.
            size: (width, height) of the image in pixels.

        Returns:
            The saved file path, or None if saving failed.
        """
        img = Image.new('RGB', size, color=(30, 30, 60))
        draw = ImageDraw.Draw(img)
        padding = 30
        max_text_width = size[0] - (2 * padding)

        if not text_description:
            text_description = "(No description provided for placeholder)"

        # Greedy word-wrap against the measured pixel width.
        lines = []
        current_line = ""
        for word in text_description.split():
            candidate = current_line + word + " "
            line_width, _ = self._get_text_dimensions(candidate.strip(), self.font)
            if line_width <= max_text_width:
                current_line = candidate
            elif current_line != "":
                lines.append(current_line.strip())
                current_line = word + " "
            else:
                # A single word wider than the line: hard-truncate it.
                temp_word = word
                while self._get_text_dimensions(temp_word, self.font)[0] > max_text_width and len(temp_word) > 0:
                    temp_word = temp_word[:-1]
                lines.append(temp_word)
                current_line = ""
        if current_line.strip():
            lines.append(current_line.strip())
        if not lines:
            lines.append("(Text error in placeholder)")

        # Vertically centre the line block, clamped to the top padding.
        _, single_line_height = self._get_text_dimensions("Tg", self.font)
        if single_line_height == 0:
            single_line_height = self.font_size_pil
        line_spacing_factor = 1.3
        estimated_line_block_height = len(lines) * single_line_height * line_spacing_factor
        y_text = max(float(padding), (size[1] - estimated_line_block_height) / 2.0)

        for line_idx, line in enumerate(lines):
            # Cap the rendered text at 8 lines, replacing the tail with "...".
            if line_idx >= 7 and len(lines) > 8:
                draw.text(xy=(float(padding), y_text), text="...", fill=(200, 200, 130), font=self.font)
                break
            line_width, _ = self._get_text_dimensions(line, self.font)
            x_text = max(float(padding), (size[0] - line_width) / 2.0)
            draw.text(xy=(x_text, y_text), text=line, fill=(220, 220, 150), font=self.font)
            y_text += single_line_height * line_spacing_factor

        filepath = os.path.join(self.output_dir, filename)
        try:
            img.save(filepath)
        except Exception as e:
            print(f"Error saving placeholder image {filepath}: {e}")
            return None
        return filepath

    def generate_image_visual(self, image_prompt_text, scene_identifier_filename):
        """Generate one scene image and return its saved path.

        Uses DALL-E when enabled; on any failure (or when generation is
        disabled) a placeholder image is rendered instead. Returns None
        only if even the placeholder cannot be saved.
        """
        filepath = os.path.join(self.output_dir, scene_identifier_filename)
        if self.USE_AI_IMAGE_GENERATION and self.openai_api_key:
            try:
                print(f"Generating DALL-E ({self.dalle_model}) image for: {image_prompt_text[:100]}...")
                # openai>=1.0 client interface. DALL-E 3 works best with
                # descriptive prompts and may internally revise short ones.
                client = openai.OpenAI(api_key=self.openai_api_key)
                response = client.images.generate(
                    model=self.dalle_model,
                    prompt=image_prompt_text,
                    n=1,                     # number of images to generate
                    size=self.image_size,    # e.g. "1024x1024"
                    quality="standard",      # "hd" available for DALL-E 3 at higher cost
                    response_format="url"    # we download the result ourselves below
                    # style="vivid"          # or "natural" for DALL-E 3
                )
                image_url = response.data[0].url
                # revised_prompt is only populated by DALL-E 3; getattr keeps
                # a DALL-E 2 response from raising AttributeError and wasting
                # an otherwise-successful generation.
                revised_prompt_dalle3 = getattr(response.data[0], "revised_prompt", None)
                if revised_prompt_dalle3:
                    print(f"DALL-E 3 revised prompt: {revised_prompt_dalle3[:150]}...")
                image_response = requests.get(image_url, timeout=30)
                image_response.raise_for_status()  # bad HTTP status -> RequestException
                # The target filename already carries the desired extension
                # (.png from app.py); Pillow infers the format from it.
                img_data = Image.open(io.BytesIO(image_response.content))
                img_data.save(filepath)
                print(f"AI Image (DALL-E) saved: {filepath}")
                return filepath
            except openai.APIError as e:
                print(f"OpenAI API Error generating image: {e}")
                print(f"Status Code: {e.status_code}, Error Type: {e.type}")
                print(f"Message: {e.message}")
            except requests.exceptions.RequestException as e:
                print(f"Error downloading image from DALL-E URL: {e}")
            except Exception as e:
                print(f"Generic error during DALL-E image generation or saving: {e}")
            # Any failure above falls through here (success returned already);
            # the placeholder records the original prompt for debugging.
            print("Falling back to placeholder image due to DALL-E error.")
            return self._create_placeholder_image_content(
                f"[DALL-E Generation Failed] Original Prompt: {image_prompt_text}",
                scene_identifier_filename
            )
        else:
            # AI generation not enabled or no API key: placeholder only.
            return self._create_placeholder_image_content(image_prompt_text, scene_identifier_filename)

    def create_video_from_images(self, image_paths, output_filename="final_video.mp4", fps=1, duration_per_image=3):
        """Concatenate still images into an MP4 slideshow.

        Args:
            image_paths: Iterable of image file paths; None entries and
                missing files are skipped.
            output_filename: Video file name inside ``self.output_dir``.
            fps: Frame rate of the output video.
            duration_per_image: Seconds each image is shown.

        Returns:
            The output video path, or None if nothing could be written.
        """
        if not image_paths:
            print("No images provided to create video.")
            return None
        valid_image_paths = [p for p in image_paths if p and os.path.exists(p)]
        if not valid_image_paths:
            print("No valid image paths found to create video.")
            return None
        print(f"Attempting to create video from {len(valid_image_paths)} images.")
        try:
            clips = []
            for m_path in valid_image_paths:
                try:
                    clips.append(ImageClip(m_path).set_duration(duration_per_image))
                except Exception as e_clip:
                    print(f"Error creating ImageClip for {m_path}: {e_clip}. Skipping.")
            if not clips:
                print("Could not create any ImageClips.")
                return None
            video_clip = concatenate_videoclips(clips, method="compose")
            output_path = os.path.join(self.output_dir, output_filename)
            print(f"Writing video to: {output_path}")
            # A unique temp-audio name avoids collisions between parallel runs.
            video_clip.write_videofile(
                output_path, fps=fps, codec='libx264', audio_codec='aac',
                temp_audiofile=os.path.join(self.output_dir, f'temp-audio-{os.urandom(4).hex()}.m4a'),
                remove_temp=True, threads=os.cpu_count() or 2, logger='bar'
            )
            # Release the ffmpeg readers held by the clips.
            for clip_to_close in clips:
                clip_to_close.close()
            if hasattr(video_clip, 'close'):
                video_clip.close()
            print(f"Video successfully created: {output_path}")
            return output_path
        except Exception as e:
            print(f"Error during video creation: {e}")
            return None