AdGenesis-App / background_task /generation_tasks.py
userIdc2024's picture
Update background_task/generation_tasks.py
b0ee0be verified
import os
from datetime import datetime
from typing import Any, Dict, Optional
from generator_function.video_analyzer_services import run_and_store_video_analysis
from generator_function.script_generator import generate_scripts
from database.operations import insert_script_result, start_job, finish_job
from helpers_function.helpers import get_video_thumbnail_base64
from database.connections import get_results_collection
from generator_function.image_function import generate_images_parallel
from generator_function.image_processor import process_zip_and_generate_images
from helpers_function.helper_email import all_tasks_completed_notification
from core.task_enum import TaskType
from .utils import safe_copy_temp, safe_unlink
# ---------- TEXT TO IMAGE ----------
def background_text_to_image(
job_id: str,
progress_cb,
*,
model_key: str,
aspect_ratio: str,
prompt: str,
num_images: int,
category: Optional[str],
platform: Optional[str],
created_by: Optional[str],
) -> Dict[str, Any]:
progress_cb(job_id, 10, "Starting image generation...")
results_col = get_results_collection()
db_job_id: Optional[str] = None
if results_col is not None:
db_job_id = start_job(
results_col,
type="generation",
created_by=created_by,
category=(category or "general"),
inputs={
"model_key": model_key,
"aspect_ratio": aspect_ratio,
"num_images": num_images,
},
settings={"platform": platform},
user_prompt=prompt,
)
r2_urls, source_urls, errors = generate_images_parallel(
model_key, aspect_ratio, prompt, num_images
)
urls = r2_urls or source_urls
if results_col is not None and db_job_id:
finish_job(
results_col,
db_job_id,
status="completed" if urls else "failed",
outputs_urls=urls or [],
provider_update={"errors": errors} if errors else None,
)
progress_cb(job_id, 100, f"Generated {len(urls)} image(s).")
return {
"success": True,
"type": TaskType.TEXT_TO_IMAGE,
"urls": urls,
"errors": errors or [],
}
def register_text_to_image_tasks(task_manager) -> None:
task_manager.register(TaskType.TEXT_TO_IMAGE, background_text_to_image)
def enqueue_text_to_image(task_manager, **kwargs) -> str:
job_id = task_manager.create_job(TaskType.TEXT_TO_IMAGE)
task_manager.submit(job_id, background_text_to_image, **kwargs)
return job_id
# ---------- IMAGE GEN ----------
def background_image_gen(
job_id: str,
progress_cb,
*,
upload_path: str,
category: str,
size: str,
quality: str,
user_prompt: str,
sentiment: str,
platform: str,
num_images: int,
blur: bool,
created_by: Optional[str] = None,
) -> Dict[str, Any]:
"""Runs image generation from uploaded asset(s) (zip or single)."""
progress_cb(job_id, 10, "Processing input archive...")
images = process_zip_and_generate_images(
upload_path,
category,
size,
quality,
user_prompt,
sentiment,
platform,
num_images,
False,
[],
blur,
created_by or "anonymous",
)
progress_cb(job_id, 100, f"Generated {len(images)} images.")
if created_by:
all_tasks_completed_notification(created_by, category)
return {"success": True, "type": TaskType.IMAGE_GEN, "images": images or []}
def register_image_gen_tasks(task_manager) -> None:
task_manager.register(TaskType.IMAGE_GEN, background_image_gen)
def enqueue_image_gen(task_manager, **kwargs) -> str:
job_id = task_manager.create_job(TaskType.IMAGE_GEN)
task_manager.submit(job_id, background_image_gen, **kwargs)
return job_id
# ---------- VIDEO ANALYZER ----------
def background_video_analyzer(
job_id: str,
progress_cb,
*,
uploaded_file_path: str,
uploaded_file_name: str,
category: str,
created_by: Optional[str],
script_num: int = 3,
script_duration: int = 60,
offer_details: str = "",
target_audience: str = "",
specific_hooks: str = "",
additional_context: str = "",
task_manager=None,
) -> Dict[str, Any]:
progress_cb(job_id, 5, "Preparing video...")
if not os.path.exists(uploaded_file_path):
raise FileNotFoundError(uploaded_file_path)
tmp = safe_copy_temp(
uploaded_file_path,
suffix=(os.path.splitext(uploaded_file_name or ".mp4")[1] or ".mp4"),
)
try:
progress_cb(job_id, 20, "Analyzing video...")
result = run_and_store_video_analysis(
category=category,
uploaded_file_path=tmp,
created_by=created_by,
)
if not isinstance(result, dict) or not result.get("results"):
raise RuntimeError("Video analysis failed.")
progress_cb(job_id, 95, "Analysis saved.")
chained_job_id = None
if task_manager:
chained_job_id = enqueue_script_generation(
task_manager,
video_path=tmp,
video_name=uploaded_file_name,
offer_details=offer_details,
target_audience=target_audience,
specific_hooks=specific_hooks,
additional_context=additional_context,
num_scripts=script_num,
duration=script_duration,
created_by=created_by,
category=category,
)
progress_cb(job_id, 100, "Video analysis complete.")
if created_by:
all_tasks_completed_notification(created_by, category)
return {
"success": True,
"type": TaskType.VIDEO_ANALYZER,
"video_analysis_id": result.get("_id"),
"chained_script_job_id": chained_job_id,
}
finally:
safe_unlink(tmp)
def register_video_analyzer_tasks(task_manager) -> None:
task_manager.register(TaskType.VIDEO_ANALYZER, background_video_analyzer)
def enqueue_video_analyzer(task_manager, **kwargs) -> str:
job_id = task_manager.create_job(TaskType.VIDEO_ANALYZER)
task_manager.submit(
job_id, background_video_analyzer, task_manager=task_manager, **kwargs
)
return job_id
# ---------- SCRIPT GENERATOR ----------
def background_script_generation(
job_id: str,
progress_cb,
*,
video_path: str,
video_name: str,
offer_details: str,
target_audience: str,
specific_hooks: str,
additional_context: str,
num_scripts: int,
duration: int,
created_by: Optional[str],
category: Optional[str] = None,
) -> Dict[str, Any]:
"""Generates script variations from the provided video and persists the run."""
progress_cb(job_id, 5, "Preparing inputs...")
if not os.path.exists(video_path):
raise FileNotFoundError(video_path)
tmp = safe_copy_temp(
video_path, suffix=(os.path.splitext(video_name or ".mp4")[1] or ".mp4")
)
try:
progress_cb(job_id, 20, "Generating scripts...")
gen = generate_scripts(
tmp,
offer_details,
target_audience,
specific_hooks,
additional_context,
num_scripts=max(1, int(num_scripts)),
duration=max(0, int(duration)),
)
if not gen or "script_variations" not in gen or not gen["script_variations"]:
raise RuntimeError("Script generation returned no variations.")
packed_round = [
{"prompt_used": "Auto after analysis", "variations": gen["script_variations"]}
]
progress_cb(job_id, 75, "Creating thumbnail...")
thumb = ""
try:
thumb = get_video_thumbnail_base64(tmp) or ""
except Exception:
pass
progress_cb(job_id, 90, "Saving run...")
insert_script_result(
video_name=video_name or os.path.basename(tmp),
offer_details=offer_details or "",
target_audience=target_audience or "",
specific_hook=specific_hooks or "",
additional_context=additional_context or "",
response=packed_round,
thumbnail=thumb,
created_by=created_by,
num_scripts=len(gen["script_variations"]),
category=category or "general",
)
progress_cb(job_id, 100, "Scripts saved.")
if created_by:
all_tasks_completed_notification(created_by, category or video_name)
return {
"success": True,
"type": TaskType.SCRIPT_GENERATION,
"num_variations": len(gen["script_variations"]),
"created_at": datetime.utcnow().isoformat(),
}
finally:
safe_unlink(tmp)
def register_script_generator_tasks(task_manager) -> None:
task_manager.register(TaskType.SCRIPT_GENERATION, background_script_generation)
def enqueue_script_generation(task_manager, **kwargs) -> str:
job_id = task_manager.create_job(TaskType.SCRIPT_GENERATION)
task_manager.submit(job_id, background_script_generation, **kwargs)
return job_id