| """Extensive generation (researcher → creative director → designer → copywriter).""" |
|
|
| import asyncio |
| import uuid |
| from typing import Dict, Any, Optional |
| from fastapi import APIRouter, HTTPException, Depends |
|
|
| from api.schemas import ( |
| ExtensiveGenerateRequest, |
| ExtensiveJobResponse, |
| BatchResponse, |
| InventOnlyRequest, |
| InventOnlyResponse, |
| InventedEssentialSchema, |
| ) |
| from services.generator import ad_generator |
| from services.auth_dependency import get_current_user |
|
|
# Router for the extensive-generation endpoints; routes are tagged "extensive".
router = APIRouter(tags=["extensive"])


# In-memory job registry keyed by job_id. Each record carries the keys
# "status", "result", "error" and "username" (see generate_extensive).
# NOTE(review): no code visible here ever removes records, and the dict lives
# in this process only — confirm unbounded growth / loss on restart is acceptable.
_extensive_jobs: Dict[str, Dict[str, Any]] = {}
|
|
|
|
async def _run_extensive_job_async(
    job_id: str,
    username: str,
    effective_niche: str,
    target_audience: Optional[str],
    offer: Optional[str],
    num_images: int,
    image_model: Optional[str],
    num_strategies: int,
    use_creative_inventor: bool = True,
    trend_context: Optional[str] = None,
):
    """Run extensive generation on the main event loop and record the outcome.

    The job record ``_extensive_jobs[job_id]`` must already exist; this
    coroutine updates it in place:

    * success   -> ``status="completed"`` and ``result`` set to a BatchResponse
    * exception -> ``status="failed"`` and ``error`` set to a message
    * cancelled -> ``status="failed"`` with a cancellation message, after
      which the CancelledError is re-raised so the task still ends cancelled

    Args:
        job_id: Key of the job record to update.
        username: Forwarded to the generator as ``username``.
        effective_niche: Forwarded to the generator as ``niche``.
        target_audience: Forwarded to the generator unchanged.
        offer: Forwarded to the generator unchanged.
        num_images: Forwarded to the generator unchanged.
        image_model: Forwarded to the generator unchanged.
        num_strategies: Forwarded to the generator unchanged.
        use_creative_inventor: Forwarded to the generator unchanged.
        trend_context: Forwarded to the generator unchanged.
    """
    import logging

    api_logger = logging.getLogger("api")
    job = _extensive_jobs[job_id]
    try:
        results = await ad_generator.generate_ad_extensive(
            niche=effective_niche,
            target_audience=target_audience,
            offer=offer,
            num_images=num_images,
            image_model=image_model,
            num_strategies=num_strategies,
            username=username,
            use_creative_inventor=use_creative_inventor,
            trend_context=trend_context,
        )
        # Build the response first so a construction error is reported as a
        # failure and "completed" is never set without a result.
        batch = BatchResponse(count=len(results), ads=results)
        job["result"] = batch
        job["status"] = "completed"
    except asyncio.CancelledError:
        # CancelledError derives from BaseException, so the handler below would
        # not see it; without this branch the job would stay "running" forever.
        api_logger.warning("Extensive job %s was cancelled", job_id)
        job["status"] = "failed"
        job["error"] = "Generation was cancelled"
        raise
    except Exception as e:
        api_logger.exception("Extensive job %s failed", job_id)
        job["status"] = "failed"
        # Some exceptions stringify to ""; fall back to the class name so the
        # stored error is never blank.
        job["error"] = str(e) or type(e).__name__
|
|
|
|
@router.post("/extensive/generate", status_code=202)
async def generate_extensive(
    request: ExtensiveGenerateRequest,
    username: str = Depends(get_current_user),
):
    """
    Start extensive ad generation. Returns 202 with job_id.
    Poll GET /extensive/status/{job_id} then GET /extensive/result/{job_id}.

    When ``request.niche`` is "others", ``request.custom_niche`` (stripped) is
    used as the niche and must be non-blank.

    Raises:
        HTTPException: 400 if niche is "others" and custom_niche is missing
            or only whitespace.
    """
    if request.niche == "others":
        if not request.custom_niche or not request.custom_niche.strip():
            raise HTTPException(status_code=400, detail="custom_niche is required when niche is 'others'")
        effective_niche = request.custom_niche.strip()
    else:
        effective_niche = request.niche

    job_id = str(uuid.uuid4())
    job: Dict[str, Any] = {
        "status": "running",
        "result": None,
        "error": None,
        "username": username,
    }
    _extensive_jobs[job_id] = job

    task = asyncio.create_task(
        _run_extensive_job_async(
            job_id,
            username,
            effective_niche,
            request.target_audience,
            request.offer,
            request.num_images,
            request.image_model,
            request.num_strategies,
            # getattr keeps this working if the request schema lacks these fields.
            getattr(request, "use_creative_inventor", True),
            getattr(request, "trend_context", None),
        )
    )
    # The event loop only holds weak references to tasks, so a task nobody
    # references can be garbage-collected mid-run. Keep a strong reference in
    # the job record and drop it once the task has finished.
    job["task"] = task
    task.add_done_callback(lambda _finished: job.pop("task", None))

    return ExtensiveJobResponse(job_id=job_id)
|
|
|
|
@router.get("/extensive/status/{job_id}")
async def extensive_job_status(
    job_id: str,
    username: str = Depends(get_current_user),
):
    """Report the current status of an extensive generation job.

    Returns a dict with ``job_id``, ``status`` and ``error``; ``error`` is
    only populated when the status is "failed".

    Raises:
        HTTPException: 404 if the job is unknown or was started by a
            different user (the two cases are deliberately indistinguishable).
    """
    record = _extensive_jobs.get(job_id)
    if record is None or record["username"] != username:
        raise HTTPException(status_code=404, detail="Job not found")

    current = record["status"]
    failure = record.get("error") if current == "failed" else None
    return {"job_id": job_id, "status": current, "error": failure}
|
|
|
|
@router.get("/extensive/result/{job_id}", response_model=BatchResponse)
async def extensive_job_result(
    job_id: str,
    username: str = Depends(get_current_user),
):
    """Get result of a completed extensive generation job. 425 if still running.

    Returns:
        The stored ``result`` of the job record.

    Raises:
        HTTPException: 404 if the job is unknown or belongs to another user;
            425 while the job status is "running"; 500 with the recorded
            error message when the job status is "failed".
    """
    job = _extensive_jobs.get(job_id)
    if job is None or job["username"] != username:
        raise HTTPException(status_code=404, detail="Job not found")

    status = job["status"]
    if status == "running":
        raise HTTPException(status_code=425, detail="Generation still in progress")
    if status == "failed":
        # The "error" key is always present in the record (initialised to
        # None), so a dict.get default would never apply; use `or` so a None
        # or empty message still yields a meaningful detail.
        raise HTTPException(status_code=500, detail=job.get("error") or "Generation failed")
    return job["result"]
|
|
|
|
@router.post("/extensive/invent", response_model=InventOnlyResponse)
async def invent_only(
    request: InventOnlyRequest,
    username: str = Depends(get_current_user),
):
    """
    Invent ad angles, concepts, visuals, and psychological triggers without generating ads.

    The niche is shown in title case with underscores turned into spaces, and a
    default offer is derived from it when the request supplies none. The
    inventor service calls run in worker threads via ``asyncio.to_thread``.
    Returns the invented essentials as schemas, plus a text export when
    ``request.export_as_text`` is set and at least one essential was produced.
    """
    # Imported at call time rather than at module import.
    from services.creative_inventor import creative_inventor_service

    niche_display = request.niche.replace("_", " ").title()
    if request.offer:
        offer = request.offer
    else:
        offer = f"Get the best {niche_display} solution"

    invented = await asyncio.to_thread(
        creative_inventor_service.invent,
        niche=niche_display,
        offer=offer,
        n=request.n,
        target_audience_hint=request.target_audience,
        trend_context=request.trend_context,
    )

    # Convert each invented essential into its response schema; the optional
    # attributes fall back to empty values when absent or falsy.
    schema_list = []
    for item in invented:
        schema_list.append(
            InventedEssentialSchema(
                psychology_trigger=item.psychology_trigger,
                angles=item.angles,
                concepts=item.concepts,
                visual_directions=item.visual_directions,
                hooks=getattr(item, "hooks", []) or [],
                visual_styles=getattr(item, "visual_styles", []) or [],
                target_audience=getattr(item, "target_audience", "") or "",
            )
        )

    export_text = None
    if request.export_as_text and invented:
        export_text = await asyncio.to_thread(
            creative_inventor_service.invent_and_export,
            niche=niche_display,
            offer=offer,
            essentials=invented,
            target_audience_hint=request.target_audience,
        )

    return InventOnlyResponse(essentials=schema_list, export_text=export_text)
|
|