# admaker/api/server.py
# Repository file-viewer metadata: uploaded by karthikeya1212 ("Upload 24 files"),
# commit eb8c5e1 (verified), 12.8 kB.
# # api/server.py
# from fastapi import FastAPI, HTTPException
# from pydantic import BaseModel
# import asyncio
# from pipeline.pipeline_runner import run_pipeline_task
# app = FastAPI(title="AI ADD Maker API", version="1.0")
# # Request body schema
# class GenerateRequest(BaseModel):
# idea: str
# # Response schema (optional, for docs clarity)
# class GenerateResponse(BaseModel):
# task_id: str
# scenes: list
# images: list
# video: dict
# music: dict
# final_output: dict
# @app.post("/generate", response_model=GenerateResponse)
# async def generate_video(request: GenerateRequest):
# """
# Trigger the AI Ad/Video pipeline with a product or idea description.
# Returns all intermediate and final outputs.
# """
# idea = request.idea.strip()
# if not idea:
# raise HTTPException(status_code=400, detail="Idea cannot be empty.")
# try:
# # Run the pipeline asynchronously
# result = await run_pipeline_task(idea)
# return result
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Pipeline failed: {str(e)}")
# # Optional health check endpoint
# @app.get("/health")
# async def health_check():
# return {"status": "ok"}
# # server.py
# import uuid
# import asyncio
# from fastapi import FastAPI, HTTPException
# from pydantic import BaseModel
# from queue import QueueManager # your queue.py module
# import logging
# # -------------------------------
# # Setup logging
# # -------------------------------
# logging.basicConfig(
# level=logging.INFO,
# format="%(asctime)s [%(levelname)s] %(message)s"
# )
# # -------------------------------
# # FastAPI app
# # -------------------------------
# app = FastAPI(title="ADD Maker Server", version="1.0")
# # -------------------------------
# # Pydantic models
# # -------------------------------
# class IdeaRequest(BaseModel):
# idea: str
# class ConfirmationRequest(BaseModel):
# task_id: str
# confirm: bool
# # -------------------------------
# # Queue Manager Instance
# # -------------------------------
# queue_manager = QueueManager()
# # In-memory task storage for confirmation
# pending_confirmations = {} # task_id -> asyncio.Event
# # -------------------------------
# # Helper function for confirmation
# # -------------------------------
# async def wait_for_confirmation(task_id: str, timeout: int = 120):
# """Wait for user confirmation or auto-confirm after timeout."""
# event = asyncio.Event()
# pending_confirmations[task_id] = event
# try:
# await asyncio.wait_for(event.wait(), timeout=timeout)
# logging.info(f"Task {task_id} confirmed by user.")
# return True
# except asyncio.TimeoutError:
# logging.info(f"Task {task_id} auto-confirmed after {timeout} seconds.")
# return True
# finally:
# pending_confirmations.pop(task_id, None)
# # -------------------------------
# # API Endpoints
# # -------------------------------
# @app.post("/submit_idea")
# async def submit_idea(request: IdeaRequest):
# task_id = str(uuid.uuid4())
# logging.info(f"Received idea: {request.idea} | Task ID: {task_id}")
# # Push task to queue
# await queue_manager.enqueue({
# "task_id": task_id,
# "idea": request.idea
# })
# # Start confirmation wait in background
# asyncio.create_task(wait_for_confirmation(task_id))
# return {"status": "submitted", "task_id": task_id, "message": "Idea received, waiting for confirmation."}
# @app.post("/confirm")
# async def confirm_task(request: ConfirmationRequest):
# task_id = request.task_id
# if task_id not in pending_confirmations:
# raise HTTPException(status_code=404, detail="Task not pending confirmation or already confirmed.")
# if request.confirm:
# pending_confirmations[task_id].set()
# return {"status": "confirmed", "task_id": task_id}
# else:
# return {"status": "rejected", "task_id": task_id}
# @app.get("/")
# async def health_check():
# return {"status": "running"}
# # -------------------------------
# # Startup / Shutdown events
# # -------------------------------
# @app.on_event("startup")
# async def startup_event():
# logging.info("Server starting up...")
# @app.on_event("shutdown")
# async def shutdown_event():
# logging.info("Server shutting down...")
# # best
# # server.py
# import uuid
# import asyncio
# from fastapi import FastAPI, HTTPException
# from pydantic import BaseModel
# from services import queue_manager as queue_manager # ✅ import your actual queue module
# import logging
# from fastapi.middleware.cors import CORSMiddleware
# # -------------------------------
# # Setup logging
# # -------------------------------
# logging.basicConfig(
# level=logging.INFO,
# format="%(asctime)s [%(levelname)s] %(message)s"
# )
# # -------------------------------
# # FastAPI app
# # -------------------------------
# app = FastAPI(title="AI ADD Generator Server", version="1.0")
# # Enable CORS for local testing
# app.add_middleware(
# CORSMiddleware,
# allow_origins=["*"], # Allow requests from anywhere (for testing)
# allow_credentials=True,
# allow_methods=["*"], # Allow all HTTP methods
# allow_headers=["*"], # Allow all headers
# )
# # -------------------------------
# # Pydantic models
# # -------------------------------
# class IdeaRequest(BaseModel):
# idea: str
# class ConfirmationRequest(BaseModel):
# task_id: str
# confirm: bool
# # -------------------------------
# # In-memory confirmation tracker
# # -------------------------------
# pending_confirmations = {} # task_id -> asyncio.Event
# # -------------------------------
# # Helper function for confirmation
# # -------------------------------
# async def wait_for_confirmation(task_id: str, timeout: int = 120):
# """Wait for user confirmation or auto-confirm after timeout."""
# event = asyncio.Event()
# pending_confirmations[task_id] = event
# try:
# await asyncio.wait_for(event.wait(), timeout=timeout)
# logging.info(f"βœ… Task {task_id} confirmed by user.")
# await queue_manager.confirm_task(task_id)
# return True
# except asyncio.TimeoutError:
# logging.info(f"βŒ› Task {task_id} auto-confirmed after {timeout}s.")
# await queue_manager.confirm_task(task_id)
# return True
# finally:
# pending_confirmations.pop(task_id, None)
# # -------------------------------
# # API Endpoints
# # -------------------------------
# @app.post("/submit_idea")
# async def submit_idea(request: IdeaRequest):
# """Receives a new ad idea and enqueues it."""
# task_id = await queue_manager.add_task(request.idea)
# logging.info(f"πŸ’‘ New idea received | Task ID: {task_id}")
# # Start confirmation listener
# asyncio.create_task(wait_for_confirmation(task_id))
# return {
# "status": "submitted",
# "task_id": task_id,
# "message": "Idea received. Waiting for user confirmation after script generation."
# }
# @app.post("/confirm")
# async def confirm_task(request: ConfirmationRequest):
# """Confirms a paused task and continues the pipeline."""
# task_id = request.task_id
# if task_id not in pending_confirmations:
# raise HTTPException(status_code=404, detail="Task not pending confirmation or already confirmed.")
# if request.confirm:
# pending_confirmations[task_id].set()
# return {"status": "confirmed", "task_id": task_id}
# else:
# return {"status": "rejected", "task_id": task_id}
# @app.get("/status/{task_id}")
# async def get_status(task_id: str):
# """Check the current status of a task."""
# status = queue_manager.get_task_status(task_id)
# if not status:
# raise HTTPException(status_code=404, detail="Task not found.")
# return status
# @app.get("/")
# async def health_check():
# return {"status": "running", "message": "AI ADD Generator is live."}
# # -------------------------------
# # Startup / Shutdown events
# # -------------------------------
# @app.on_event("startup")
# async def startup_event():
# logging.info("πŸš€ Server starting up...")
# queue_manager.start_worker() # ✅ Start async worker loop
# @app.on_event("shutdown")
# async def shutdown_event():
# logging.info("πŸ›‘ Server shutting down...")
import uuid
import asyncio
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from services import queue_manager # ✅ import your actual queue module
import logging
from fastapi.middleware.cors import CORSMiddleware
# -------------------------------
# Setup logging
# -------------------------------
# Configure the root logger once at import time: INFO and above, each record
# rendered as "<timestamp> [<LEVEL>] <message>".
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# -------------------------------
# FastAPI app
# -------------------------------
app = FastAPI(version="1.0", title="AI ADD Generator Server")
# Enable CORS for local testing: every origin, method and header is accepted.
# NOTE(review): the FastAPI CORS documentation says wildcard origins/methods/
# headers should not be combined with allow_credentials=True. Before deploying
# beyond local testing, either list the allowed origins explicitly or drop
# credential support -- confirm which one the frontend needs.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)
# -------------------------------
# Pydantic models
# -------------------------------
class IdeaRequest(BaseModel):
    """Request body accepted by ``POST /submit_idea``."""

    # Free-text description of the ad idea supplied by the client.
    idea: str
class ConfirmationRequest(BaseModel):
    """Request body accepted by ``POST /confirm``."""

    # Identifier of the task the client is answering for.
    task_id: str
    # True to approve the task and continue; False to reject it.
    confirm: bool
# -------------------------------
# In-memory confirmation tracker
# -------------------------------
# Both registries are plain module-level dicts, so their contents live only as
# long as this process does.
#
# task_id -> asyncio.Event. No active endpoint in this file reads or writes
# it; presumably kept for code that still imports the name -- verify before
# removing.
pending_confirmations: dict = {}
# task_id -> generated script for confirmation. Handed to
# queue_manager.wait_for_script() by the /submit_idea endpoint.
script_results: dict = {}
# -------------------------------
# API Endpoints
# -------------------------------
# Strong references to in-flight background tasks. The event loop itself only
# keeps weak references to tasks, so a task that nothing else references can
# be garbage-collected before it finishes.
_background_tasks = set()


@app.post("/submit_idea")
async def submit_idea(request: IdeaRequest):
    """Receive a new ad idea, enqueue it, and start listening for its script.

    Args:
        request: Body carrying the free-text ``idea``.

    Returns:
        A dict with ``status`` ("submitted"), the new ``task_id`` and a
        human-readable ``message``.

    Raises:
        HTTPException: 400 if the idea is empty or contains only whitespace.
    """
    # Reject blank ideas up front instead of enqueuing work that cannot
    # produce a meaningful script.
    idea = request.idea.strip()
    if not idea:
        raise HTTPException(status_code=400, detail="Idea cannot be empty.")

    task_id = await queue_manager.add_task(idea)
    logging.info(f"πŸ’‘ New idea received | Task ID: {task_id}")

    # Start worker listener. Keep a reference until the task completes so it
    # cannot disappear mid-execution; the done-callback releases it again.
    listener = asyncio.create_task(
        queue_manager.wait_for_script(task_id, script_results)
    )
    _background_tasks.add(listener)
    listener.add_done_callback(_background_tasks.discard)

    return {
        "status": "submitted",
        "task_id": task_id,
        "message": "Idea received. Script will be generated shortly.",
    }
@app.post("/confirm")
async def confirm_task(request: ConfirmationRequest):
    """Confirm or reject a task that is waiting for user confirmation.

    On confirmation the story is generated from the task's script, stored
    under ``task["result"]["story_script"]``, and the task is marked
    COMPLETED. On rejection the task is marked FAILED.

    Args:
        request: Body carrying the ``task_id`` and the ``confirm`` decision.

    Returns:
        ``{"status": "completed", "task": <task dict>}`` when confirmed, or
        ``{"status": "rejected", "task_id": <id>}`` when rejected.

    Raises:
        HTTPException: 404 if the task is unknown, 400 if it is not waiting
            for confirmation, 500 if story generation fails.
    """
    task_id = request.task_id
    task = queue_manager.get_task_status(task_id)
    if not task:
        raise HTTPException(status_code=404, detail="Task not found.")
    if task["status"] != queue_manager.TaskStatus.WAITING_CONFIRMATION:
        raise HTTPException(status_code=400, detail="Task not waiting for confirmation.")

    # Rejection path: nothing to generate, just record the outcome.
    if not request.confirm:
        task["status"] = queue_manager.TaskStatus.FAILED
        return {"status": "rejected", "task_id": task_id}

    # Confirm task
    await queue_manager.confirm_task(task_id)
    logging.info(f"βœ… Task {task_id} confirmed by user.")

    # Generate story immediately. A missing script or a generator error must
    # not leave the task stuck in a confirmed-but-unfinished state, so record
    # the failure on the task before reporting it to the client.
    try:
        script_result = task["result"]["script"]
        story_result = await queue_manager.generate_story_after_confirm(script_result)
    except Exception as exc:
        task["status"] = queue_manager.TaskStatus.FAILED
        logging.exception(f"Task {task_id} story generation failed.")
        raise HTTPException(status_code=500, detail="Story generation failed.") from exc

    task["result"]["story_script"] = story_result
    task["status"] = queue_manager.TaskStatus.COMPLETED
    logging.info(f"🎬 Task {task_id} story generated and task completed.")
    return {"status": "completed", "task": task}
@app.get("/status/{task_id}")
async def get_status(task_id: str):
    """Report the current state of a task.

    While the task is waiting for confirmation only its status and generated
    script are returned; in every other state the full task record is
    returned as-is.

    Raises:
        HTTPException: 404 if no task is known under ``task_id``.
    """
    record = queue_manager.get_task_status(task_id)
    if not record:
        raise HTTPException(status_code=404, detail="Task not found.")

    awaiting_user = (
        record["status"] == queue_manager.TaskStatus.WAITING_CONFIRMATION
    )
    if not awaiting_user:
        return record

    # Waiting for confirmation: expose just the script the user must review.
    return {"status": record["status"], "script": record["result"]["script"]}
@app.get("/")
async def health_check():
    """Liveness probe: return a fixed payload showing the server is up."""
    payload = {
        "status": "running",
        "message": "AI ADD Generator is live.",
    }
    return payload
# -------------------------------
# Startup / Shutdown events
# -------------------------------
@app.on_event("startup")
async def startup_event():
    """Startup hook: log the start-up, then start the queue worker."""
    logging.info("πŸš€ Server starting up...")
    # Delegates to the queue module; what the worker does is defined there.
    queue_manager.start_worker()
@app.on_event("shutdown")
async def shutdown_event():
    """Shutdown hook: only logs that the server is going down."""
    logging.info("πŸ›‘ Server shutting down...")