Spaces:
Sleeping
Sleeping
| import asyncio | |
| import uuid | |
| from typing import Dict, Any, Optional | |
| from enum import Enum | |
| from pipeline.pipeline import run_pipeline | |
| # --------------------------- | |
| # Task status enum | |
| # --------------------------- | |
class TaskStatus(str, Enum):
    """Lifecycle states recorded under a task's ``"status"`` key.

    The ``str`` mixin makes every member compare equal to its plain string
    value (``TaskStatus.PENDING == "pending"``).
    """

    # Assigned by add_task when the task record is first created.
    PENDING = "pending"
    # The states below are not assigned anywhere in this module; presumably
    # the pipeline sets them as it progresses -- verify against run_pipeline.
    RUNNING = "running"
    # confirm_task only signals the event while the task is in this state.
    WAITING_CONFIRMATION = "waiting_for_confirmation"
    CONFIRMED = "confirmed"
    COMPLETED = "completed"
    FAILED = "failed"
| # --------------------------- | |
| # Globals | |
| # --------------------------- | |
# Task records keyed by task id; each record is the dict built in add_task.
tasks: Dict[str, Dict[str, Any]] = {}

# Ids of submitted tasks, in submission order. Nothing in this module reads
# from the queue.
task_queue = asyncio.Queue()

# One asyncio.Event per task id; confirm_task sets it to signal confirmation.
pending_confirmations: Dict[str, asyncio.Event] = {}
| # --------------------------- | |
| # Add task | |
| # --------------------------- | |
# Strong references to in-flight pipeline tasks. The event loop only keeps
# weak references to tasks, so a task that nobody else holds can be
# garbage-collected before it finishes.
_background_tasks: set = set()


def _on_pipeline_done(task_id: str, background: "asyncio.Task") -> None:
    """Release a finished pipeline task and surface any unhandled error."""
    _background_tasks.discard(background)
    if background.cancelled():
        return
    # Retrieving the exception also stops asyncio from logging
    # "Task exception was never retrieved" when the task is collected.
    error = background.exception()
    if error is None:
        return
    record = tasks.get(task_id)
    if record is not None:
        # Without this the record would stay in a non-terminal status forever.
        record["status"] = TaskStatus.FAILED
    print(f"Pipeline for task {task_id} failed: {error!r}")


async def add_task(idea: str) -> str:
    """Register a new task for *idea* and start its pipeline in the background.

    A record is stored in ``tasks``, a confirmation event is stored in
    ``pending_confirmations``, the id is pushed onto ``task_queue``, and
    ``run_pipeline`` is scheduled with the record and the event.

    Args:
        idea: Free-form text stored under the record's ``"idea"`` key.

    Returns:
        The generated task id (a UUID4 string).
    """
    task_id = str(uuid.uuid4())
    confirmation_event = asyncio.Event()

    tasks[task_id] = {
        "id": task_id,
        "idea": idea,
        "status": TaskStatus.PENDING,
        "result": {},
        "confirmation_required": False,
    }
    pending_confirmations[task_id] = confirmation_event

    # Nothing in this module consumes the queue; the put is kept so that any
    # external consumer keeps seeing submitted ids.
    await task_queue.put(task_id)

    # Start the pipeline immediately in the background, holding a reference
    # until it completes so it cannot disappear mid-execution.
    background = asyncio.create_task(
        run_pipeline(tasks[task_id], confirmation_event)
    )
    _background_tasks.add(background)
    background.add_done_callback(
        lambda done: _on_pipeline_done(task_id, done)
    )

    print(f"🧩 Task added and pipeline started: {task_id}")
    return task_id
| # --------------------------- | |
| # Confirm task | |
| # --------------------------- | |
async def confirm_task(task_id: str) -> Dict[str, str]:
    """Signal the confirmation event of a task that is waiting for it.

    Args:
        task_id: Id previously returned by ``add_task``.

    Returns:
        ``{"message": ...}`` when the event was set, otherwise
        ``{"error": ...}`` describing why nothing was confirmed: unknown id,
        task not in ``WAITING_CONFIRMATION``, or no event registered.
    """
    task = tasks.get(task_id)
    if not task:
        return {"error": "Invalid task ID"}

    if task["status"] != TaskStatus.WAITING_CONFIRMATION:
        return {"error": "Task is not waiting for confirmation"}

    event = pending_confirmations.get(task_id)
    if event is None:
        # Nothing to signal: do not claim success when no waiter can be woken.
        return {"error": "No pending confirmation for this task"}

    event.set()
    return {"message": f"Task {task_id} confirmed"}
| # --------------------------- | |
| # Get task status | |
| # --------------------------- | |
def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
    """Look up the stored record for *task_id*.

    Returns:
        The task's record dict from ``tasks`` (the live object, not a copy),
        or ``None`` when the id is unknown.
    """
    try:
        return tasks[task_id]
    except KeyError:
        return None
| # --------------------------- | |
| # Worker (optional) | |
| # --------------------------- | |
def start_worker():
    """Print a notice that no worker loop is needed; does nothing else."""
    notice = "⚙️ Worker loop not required, pipeline runs per task"
    print(notice)