admaker / services /queue_manager.py
karthikeya1212's picture
Update services/queue_manager.py
756c289 verified
raw
history blame
2.07 kB
import asyncio
import uuid
from typing import Dict, Any, Optional
from enum import Enum
from pipeline.pipeline import run_pipeline
# ---------------------------
# Task status enum
# ---------------------------
class TaskStatus(str, Enum):
    """Closed set of values stored under a task record's ``"status"`` key.

    The ``str`` mix-in means every member is itself a string and compares
    equal to its plain value (``TaskStatus.PENDING == "pending"``).
    """

    # Status given to a freshly created record by add_task.
    PENDING = "pending"
    RUNNING = "running"
    # The only status in which confirm_task will signal the confirmation event.
    WAITING_CONFIRMATION = "waiting_for_confirmation"
    CONFIRMED = "confirmed"
    COMPLETED = "completed"
    FAILED = "failed"
# ---------------------------
# Globals
# ---------------------------
# Every task record created by add_task, keyed by its task id.
tasks: Dict[str, Dict[str, Any]] = dict()

# Receives the id of each newly added task. Nothing in the visible code reads
# from this queue (start_worker is a no-op), so consumers, if any, live elsewhere.
task_queue: asyncio.Queue = asyncio.Queue()

# One asyncio.Event per task id; confirm_task sets it to signal confirmation.
pending_confirmations: Dict[str, asyncio.Event] = dict()
# ---------------------------
# Add task
# ---------------------------
# Strong references to in-flight pipeline tasks. The event loop only keeps weak
# references to tasks, so a task nobody else references may be garbage
# collected before it finishes.
_pipeline_tasks = set()


def _on_pipeline_done(task_id: str, pipeline_task: "asyncio.Task") -> None:
    """Release a finished pipeline task and surface any unhandled failure."""
    _pipeline_tasks.discard(pipeline_task)
    if pipeline_task.cancelled():
        # exception() would raise CancelledError; a cancelled run is left as-is.
        return
    error = pipeline_task.exception()
    if error is None:
        return
    # The pipeline raised instead of finishing: reflect that in the record so
    # the task does not look like it is still in progress forever.
    record = tasks.get(task_id)
    if record is not None:
        record["status"] = TaskStatus.FAILED
    print(f"❌ Pipeline failed for task {task_id}: {error!r}")


async def add_task(idea: str) -> str:
    """Register a new task for ``idea`` and start its pipeline in the background.

    A record with status ``TaskStatus.PENDING`` is stored in ``tasks``, a fresh
    ``asyncio.Event`` is stored in ``pending_confirmations``, the task id is
    put on ``task_queue``, and ``run_pipeline(record, event)`` is scheduled on
    the running event loop.

    Args:
        idea: Free-form input stored under the record's ``"idea"`` key.

    Returns:
        The generated task id (a UUID4 string).

    Raises:
        RuntimeError: If called without a running event loop
            (raised by ``asyncio.create_task``).
    """
    task_id = str(uuid.uuid4())
    confirmation_event = asyncio.Event()

    tasks[task_id] = {
        "id": task_id,
        "idea": idea,
        "status": TaskStatus.PENDING,
        "result": {},
        "confirmation_required": False,
    }
    pending_confirmations[task_id] = confirmation_event
    await task_queue.put(task_id)

    # Start the pipeline immediately in the background. Keep a reference until
    # it completes, and watch for crashes that would otherwise go unnoticed.
    pipeline_task = asyncio.create_task(
        run_pipeline(tasks[task_id], confirmation_event)
    )
    _pipeline_tasks.add(pipeline_task)
    pipeline_task.add_done_callback(
        lambda finished: _on_pipeline_done(task_id, finished)
    )

    print(f"🧩 Task added and pipeline started: {task_id}")
    return task_id
# ---------------------------
# Confirm task
# ---------------------------
async def confirm_task(task_id: str) -> Dict[str, str]:
    """Signal confirmation for a task that is waiting for it.

    Sets the ``asyncio.Event`` registered for ``task_id`` in
    ``pending_confirmations`` (the same event add_task passes to
    ``run_pipeline``). The task's status is not modified here.

    Args:
        task_id: Id previously returned by add_task.

    Returns:
        ``{"message": ...}`` when the event was set, otherwise
        ``{"error": ...}`` describing why nothing was signalled: unknown id,
        task not in ``TaskStatus.WAITING_CONFIRMATION``, or no event registered.
    """
    task = tasks.get(task_id)
    if not task:
        return {"error": "Invalid task ID"}

    if task["status"] != TaskStatus.WAITING_CONFIRMATION:
        return {"error": "Task is not waiting for confirmation"}

    event = pending_confirmations.get(task_id)
    if event is None:
        # Without an event there is nothing to wake up; reporting success here
        # would tell the caller the task was confirmed when it was not.
        return {"error": "No pending confirmation registered for this task"}

    event.set()
    return {"message": f"Task {task_id} confirmed"}
# ---------------------------
# Get task status
# ---------------------------
def get_task_status(task_id: str) -> Optional[Dict[str, Any]]:
    """Look up the record stored for ``task_id``.

    Returns the dict held in ``tasks`` itself (not a copy), or ``None`` when
    the id is unknown.
    """
    return tasks[task_id] if task_id in tasks else None
# ---------------------------
# Worker (optional)
# ---------------------------
def start_worker():
    """Placeholder worker entry point: only prints a notice and returns ``None``.

    No loop is started here; add_task launches the pipeline for each task.
    """
    notice = "⚙️ Worker loop not required, pipeline runs per task"
    print(notice)