Spaces:
Running
Running
| import os | |
| from datetime import datetime | |
| from typing import Any, Dict, Optional | |
| from generator_function.video_analyzer_services import run_and_store_video_analysis | |
| from generator_function.script_generator import generate_scripts | |
| from database.operations import insert_script_result, start_job, finish_job | |
| from helpers_function.helpers import get_video_thumbnail_base64 | |
| from database.connections import get_results_collection | |
| from generator_function.image_function import generate_images_parallel | |
| from generator_function.image_processor import process_zip_and_generate_images | |
| from helpers_function.helper_email import all_tasks_completed_notification | |
| from core.task_enum import TaskType | |
| from .utils import safe_copy_temp, safe_unlink | |
| # ---------- TEXT TO IMAGE ---------- | |
def background_text_to_image(
    job_id: str,
    progress_cb,
    *,
    model_key: str,
    aspect_ratio: str,
    prompt: str,
    num_images: int,
    category: Optional[str],
    platform: Optional[str],
    created_by: Optional[str],
) -> Dict[str, Any]:
    """Generate images from a text prompt and record the run.

    Args:
        job_id: Identifier passed back to ``progress_cb`` on every update.
        progress_cb: Callable invoked as ``progress_cb(job_id, value, message)``.
        model_key: Forwarded to ``generate_images_parallel`` and stored in the
            job inputs.
        aspect_ratio: Forwarded to ``generate_images_parallel`` and stored in
            the job inputs.
        prompt: Text prompt; stored as ``user_prompt`` on the job record.
        num_images: Number of images requested.
        category: Job category; ``"general"`` is used when falsy.
        platform: Stored under the job's ``settings``.
        created_by: Stored on the job record.

    Returns:
        A dict with ``success``, ``type``, ``urls`` and ``errors``. ``success``
        is true only when at least one URL was produced, which matches the
        status written to the results collection.

    Raises:
        Exception: Whatever ``generate_images_parallel`` raises is re-raised
            after the job record has been finished as ``"failed"``.
    """
    progress_cb(job_id, 10, "Starting image generation...")

    results_col = get_results_collection()
    db_job_id: Optional[str] = None
    if results_col is not None:
        db_job_id = start_job(
            results_col,
            type="generation",
            created_by=created_by,
            category=(category or "general"),
            inputs={
                "model_key": model_key,
                "aspect_ratio": aspect_ratio,
                "num_images": num_images,
            },
            settings={"platform": platform},
            user_prompt=prompt,
        )

    try:
        r2_urls, source_urls, errors = generate_images_parallel(
            model_key, aspect_ratio, prompt, num_images
        )
    except Exception as exc:
        # Without this, the record opened by start_job would never be
        # finished when generation blows up.
        if results_col is not None and db_job_id:
            finish_job(
                results_col,
                db_job_id,
                status="failed",
                outputs_urls=[],
                provider_update={"errors": [str(exc)]},
            )
        raise

    # Prefer the first URL list, fall back to the second, and never leave
    # ``urls`` as None: len() is taken on it below.
    urls = r2_urls or source_urls or []
    succeeded = bool(urls)

    if results_col is not None and db_job_id:
        finish_job(
            results_col,
            db_job_id,
            status="completed" if succeeded else "failed",
            outputs_urls=urls,
            provider_update={"errors": errors} if errors else None,
        )

    progress_cb(job_id, 100, f"Generated {len(urls)} image(s).")
    return {
        # Keep the returned flag consistent with the persisted status.
        "success": succeeded,
        "type": TaskType.TEXT_TO_IMAGE,
        "urls": urls,
        "errors": errors or [],
    }
def register_text_to_image_tasks(task_manager) -> None:
    """Register the text-to-image worker on ``task_manager``.

    Calls ``task_manager.register`` with ``TaskType.TEXT_TO_IMAGE`` and
    ``background_text_to_image``.
    """
    handler = background_text_to_image
    task_manager.register(TaskType.TEXT_TO_IMAGE, handler)
def enqueue_text_to_image(task_manager, **kwargs) -> str:
    """Create a text-to-image job and submit its worker.

    Args:
        task_manager: Object providing ``create_job`` and ``submit``.
        **kwargs: Forwarded unchanged to ``background_text_to_image``.

    Returns:
        The job id returned by ``task_manager.create_job``.
    """
    new_job_id = task_manager.create_job(TaskType.TEXT_TO_IMAGE)
    task_manager.submit(new_job_id, background_text_to_image, **kwargs)
    return new_job_id
| # ---------- IMAGE GEN ---------- | |
def background_image_gen(
    job_id: str,
    progress_cb,
    *,
    upload_path: str,
    category: str,
    size: str,
    quality: str,
    user_prompt: str,
    sentiment: str,
    platform: str,
    num_images: int,
    blur: bool,
    created_by: Optional[str] = None,
) -> Dict[str, Any]:
    """Run image generation from uploaded asset(s) (zip or single).

    Args:
        job_id: Identifier passed back to ``progress_cb`` on every update.
        progress_cb: Callable invoked as ``progress_cb(job_id, value, message)``.
        upload_path: Path of the uploaded input handed to
            ``process_zip_and_generate_images``.
        category, size, quality, user_prompt, sentiment, platform, num_images,
        blur: Forwarded positionally to ``process_zip_and_generate_images``.
        created_by: Requesting user; ``"anonymous"`` is passed downstream when
            falsy, and the completion notification is sent only when set.

    Returns:
        A dict with ``success``, ``type`` and ``images`` (always a list).
    """
    progress_cb(job_id, 10, "Processing input archive...")

    generated = process_zip_and_generate_images(
        upload_path,
        category,
        size,
        quality,
        user_prompt,
        sentiment,
        platform,
        num_images,
        # NOTE(review): the meaning of the next two positional arguments is
        # defined by process_zip_and_generate_images and is not visible here.
        False,
        [],
        blur,
        created_by or "anonymous",
    )
    # Normalise before use: the result may be falsy/None, and len(None) would
    # raise while building the progress message.
    images = generated or []

    progress_cb(job_id, 100, f"Generated {len(images)} images.")
    if created_by:
        all_tasks_completed_notification(created_by, category)

    return {"success": True, "type": TaskType.IMAGE_GEN, "images": images}
def register_image_gen_tasks(task_manager) -> None:
    """Register the image-generation worker on ``task_manager``.

    Calls ``task_manager.register`` with ``TaskType.IMAGE_GEN`` and
    ``background_image_gen``.
    """
    handler = background_image_gen
    task_manager.register(TaskType.IMAGE_GEN, handler)
def enqueue_image_gen(task_manager, **kwargs) -> str:
    """Create an image-generation job and submit its worker.

    Args:
        task_manager: Object providing ``create_job`` and ``submit``.
        **kwargs: Forwarded unchanged to ``background_image_gen``.

    Returns:
        The job id returned by ``task_manager.create_job``.
    """
    new_job_id = task_manager.create_job(TaskType.IMAGE_GEN)
    task_manager.submit(new_job_id, background_image_gen, **kwargs)
    return new_job_id
| # ---------- VIDEO ANALYZER ---------- | |
def background_video_analyzer(
    job_id: str,
    progress_cb,
    *,
    uploaded_file_path: str,
    uploaded_file_name: str,
    category: str,
    created_by: Optional[str],
    script_num: int = 3,
    script_duration: int = 60,
    offer_details: str = "",
    target_audience: str = "",
    specific_hooks: str = "",
    additional_context: str = "",
    task_manager=None,
) -> Dict[str, Any]:
    """Analyze an uploaded video and optionally chain a script-generation job.

    The upload is copied to a temporary file for analysis, and that copy is
    always removed before returning.

    Args:
        job_id: Identifier passed back to ``progress_cb`` on every update.
        progress_cb: Callable invoked as ``progress_cb(job_id, value, message)``.
        uploaded_file_path: Path of the uploaded video; must exist.
        uploaded_file_name: Original file name; its extension is used as the
            temp-file suffix (``".mp4"`` when missing).
        category: Forwarded to the analysis, the notification and the chained
            job.
        created_by: Requesting user; the notification is sent only when set.
        script_num, script_duration, offer_details, target_audience,
        specific_hooks, additional_context: Forwarded to the chained
            script-generation job.
        task_manager: When truthy, a script-generation job is enqueued on it.

    Returns:
        A dict with ``success``, ``type``, ``video_analysis_id`` and
        ``chained_script_job_id`` (``None`` when nothing was chained).

    Raises:
        FileNotFoundError: ``uploaded_file_path`` does not exist.
        RuntimeError: The analysis result is not a dict or has no ``results``.
    """
    progress_cb(job_id, 5, "Preparing video...")
    if not os.path.exists(uploaded_file_path):
        raise FileNotFoundError(uploaded_file_path)

    suffix = os.path.splitext(uploaded_file_name or ".mp4")[1] or ".mp4"
    tmp = safe_copy_temp(uploaded_file_path, suffix=suffix)
    try:
        progress_cb(job_id, 20, "Analyzing video...")
        result = run_and_store_video_analysis(
            category=category,
            uploaded_file_path=tmp,
            created_by=created_by,
        )
        if not isinstance(result, dict) or not result.get("results"):
            raise RuntimeError("Video analysis failed.")
        progress_cb(job_id, 95, "Analysis saved.")

        chained_job_id = None
        if task_manager:
            # Hand the chained job the original upload, not ``tmp``: ``tmp`` is
            # unlinked in the ``finally`` below as soon as this function
            # returns, so a job that starts later would not find it.
            # background_script_generation makes its own temp copy.
            # NOTE(review): this assumes the upload outlives the chained job;
            # confirm against whoever cleans up ``uploaded_file_path``.
            chained_job_id = enqueue_script_generation(
                task_manager,
                video_path=uploaded_file_path,
                video_name=uploaded_file_name,
                offer_details=offer_details,
                target_audience=target_audience,
                specific_hooks=specific_hooks,
                additional_context=additional_context,
                num_scripts=script_num,
                duration=script_duration,
                created_by=created_by,
                category=category,
            )

        progress_cb(job_id, 100, "Video analysis complete.")
        if created_by:
            all_tasks_completed_notification(created_by, category)

        return {
            "success": True,
            "type": TaskType.VIDEO_ANALYZER,
            "video_analysis_id": result.get("_id"),
            "chained_script_job_id": chained_job_id,
        }
    finally:
        # Always remove this job's private copy, on success or failure.
        safe_unlink(tmp)
def register_video_analyzer_tasks(task_manager) -> None:
    """Register the video-analyzer worker on ``task_manager``.

    Calls ``task_manager.register`` with ``TaskType.VIDEO_ANALYZER`` and
    ``background_video_analyzer``.
    """
    handler = background_video_analyzer
    task_manager.register(TaskType.VIDEO_ANALYZER, handler)
def enqueue_video_analyzer(task_manager, **kwargs) -> str:
    """Create a video-analyzer job and submit its worker.

    The task manager itself is passed to the worker as ``task_manager`` so the
    worker can enqueue a follow-up job.

    Args:
        task_manager: Object providing ``create_job`` and ``submit``.
        **kwargs: Forwarded unchanged to ``background_video_analyzer``.

    Returns:
        The job id returned by ``task_manager.create_job``.
    """
    new_job_id = task_manager.create_job(TaskType.VIDEO_ANALYZER)
    task_manager.submit(
        new_job_id,
        background_video_analyzer,
        task_manager=task_manager,
        **kwargs,
    )
    return new_job_id
| # ---------- SCRIPT GENERATOR ---------- | |
def background_script_generation(
    job_id: str,
    progress_cb,
    *,
    video_path: str,
    video_name: str,
    offer_details: str,
    target_audience: str,
    specific_hooks: str,
    additional_context: str,
    num_scripts: int,
    duration: int,
    created_by: Optional[str],
    category: Optional[str] = None,
) -> Dict[str, Any]:
    """Generate script variations from the provided video and persist the run.

    The video is copied to a temporary file, which is always removed before
    returning.

    Args:
        job_id: Identifier passed back to ``progress_cb`` on every update.
        progress_cb: Callable invoked as ``progress_cb(job_id, value, message)``.
        video_path: Path of the source video; must exist.
        video_name: Display name; its extension is used as the temp-file suffix
            (``".mp4"`` when missing).
        offer_details, target_audience, specific_hooks, additional_context:
            Forwarded to ``generate_scripts`` and stored with the result.
        num_scripts: Requested number of scripts; clamped to at least 1.
        duration: Requested duration; clamped to at least 0.
        created_by: Requesting user; the notification is sent only when set.
        category: Stored with the result; ``"general"`` when falsy.

    Returns:
        A dict with ``success``, ``type``, ``num_variations`` and
        ``created_at`` (ISO-formatted UTC timestamp).

    Raises:
        FileNotFoundError: ``video_path`` does not exist.
        RuntimeError: ``generate_scripts`` returned no ``script_variations``.
    """
    import logging

    log = logging.getLogger(__name__)

    progress_cb(job_id, 5, "Preparing inputs...")
    if not os.path.exists(video_path):
        raise FileNotFoundError(video_path)

    suffix = os.path.splitext(video_name or ".mp4")[1] or ".mp4"
    tmp = safe_copy_temp(video_path, suffix=suffix)
    try:
        progress_cb(job_id, 20, "Generating scripts...")
        gen = generate_scripts(
            tmp,
            offer_details,
            target_audience,
            specific_hooks,
            additional_context,
            num_scripts=max(1, int(num_scripts)),
            duration=max(0, int(duration)),
        )
        if not gen or "script_variations" not in gen or not gen["script_variations"]:
            raise RuntimeError("Script generation returned no variations.")
        variations = gen["script_variations"]
        packed_round = [
            {"prompt_used": "Auto after analysis", "variations": variations}
        ]

        progress_cb(job_id, 75, "Creating thumbnail...")
        thumb = ""
        try:
            thumb = get_video_thumbnail_base64(tmp) or ""
        except Exception:
            # The thumbnail is best-effort: keep going with an empty one, but
            # leave a trace instead of swallowing the failure silently.
            log.warning(
                "Thumbnail creation failed for job %s", job_id, exc_info=True
            )

        progress_cb(job_id, 90, "Saving run...")
        insert_script_result(
            video_name=video_name or os.path.basename(tmp),
            offer_details=offer_details or "",
            target_audience=target_audience or "",
            specific_hook=specific_hooks or "",
            additional_context=additional_context or "",
            response=packed_round,
            thumbnail=thumb,
            created_by=created_by,
            num_scripts=len(variations),
            category=category or "general",
        )

        progress_cb(job_id, 100, "Scripts saved.")
        if created_by:
            all_tasks_completed_notification(created_by, category or video_name)

        return {
            "success": True,
            "type": TaskType.SCRIPT_GENERATION,
            "num_variations": len(variations),
            "created_at": datetime.utcnow().isoformat(),
        }
    finally:
        # Always remove this job's private copy, on success or failure.
        safe_unlink(tmp)
def register_script_generator_tasks(task_manager) -> None:
    """Register the script-generation worker on ``task_manager``.

    Calls ``task_manager.register`` with ``TaskType.SCRIPT_GENERATION`` and
    ``background_script_generation``.
    """
    handler = background_script_generation
    task_manager.register(TaskType.SCRIPT_GENERATION, handler)
def enqueue_script_generation(task_manager, **kwargs) -> str:
    """Create a script-generation job and submit its worker.

    Args:
        task_manager: Object providing ``create_job`` and ``submit``.
        **kwargs: Forwarded unchanged to ``background_script_generation``.

    Returns:
        The job id returned by ``task_manager.create_job``.
    """
    new_job_id = task_manager.create_job(TaskType.SCRIPT_GENERATION)
    task_manager.submit(new_job_id, background_script_generation, **kwargs)
    return new_job_id