Spaces:
Sleeping
Sleeping
File size: 2,411 Bytes
cafc0bb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
"""
Request queue management for the Loomi Clothing Detection API.
"""
import asyncio
import logging
from typing import Any, Callable
from config import config
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class RequestQueue:
    """Manages background processing of heavy API requests.

    Callers hand work to :meth:`submit_task`, which enqueues it and waits on
    a future. Worker tasks created by :meth:`start_workers` pull items off
    the queue, await the supplied coroutine function and resolve the future
    with its result or its exception.
    """

    def __init__(self):
        # Items are (user_id, endpoint, process_func, args, future) tuples,
        # or None, which tells exactly one worker to exit.
        self.queue = asyncio.Queue()
        # Not used by any method of this class; kept so that external code
        # reading the attribute keeps working.
        self.processing = False
        # asyncio.Task objects for the currently started workers.
        self.workers = []

    async def start_workers(self, num_workers: int = None):
        """Start background workers.

        Args:
            num_workers: Number of worker tasks to create. When ``None``,
                ``config.num_workers`` is used.
        """
        if num_workers is None:
            num_workers = config.num_workers

        for i in range(num_workers):
            worker = asyncio.create_task(self._worker(f"worker-{i}"))
            self.workers.append(worker)

        logger.info(f"Started {num_workers} background workers")

    async def _worker(self, name: str):
        """Background worker for processing requests.

        Loops until it dequeues the ``None`` shutdown sentinel. Every
        dequeued item, including the sentinel, is acknowledged with
        ``task_done()`` so that ``queue.join()`` can complete.

        Args:
            name: Label used in this worker's log messages.
        """
        logger.info(f"Worker {name} started")

        while True:
            task = await self.queue.get()
            try:
                if task is None:  # Shutdown signal
                    break

                _user_id, _endpoint, process_func, args, future = task
                try:
                    # Process the request
                    result = await process_func(*args)
                except asyncio.CancelledError:
                    # This worker is being cancelled mid-request: release the
                    # submitter instead of leaving it waiting forever.
                    if not future.done():
                        future.cancel()
                    raise
                except Exception as e:
                    # The submitter may already have cancelled the future
                    # (e.g. a timeout); setting it then would raise
                    # InvalidStateError.
                    if not future.done():
                        future.set_exception(e)
                else:
                    if not future.done():
                        future.set_result(result)
            except Exception as e:
                # Anything unexpected (e.g. a malformed queue item) is logged
                # and the worker keeps serving the queue.
                logger.error(f"Worker {name} error: {e}")
            finally:
                self.queue.task_done()

    async def submit_task(self, user_id: str, endpoint: str, process_func: Callable, *args) -> Any:
        """Submit a task to the queue and wait for its outcome.

        Args:
            user_id: Identifier stored with the queued item.
            endpoint: Endpoint name stored with the queued item.
            process_func: Coroutine function the worker awaits as
                ``process_func(*args)``.
            *args: Positional arguments forwarded to ``process_func``.

        Returns:
            Whatever ``process_func(*args)`` returns.

        Raises:
            Exception: Any exception raised by ``process_func`` is re-raised
                here.
        """
        # create_future() is the documented way to obtain a future bound to
        # the running event loop.
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((user_id, endpoint, process_func, args, future))
        return await future

    async def shutdown(self):
        """Shutdown workers.

        Enqueues one sentinel per started worker, waits for all worker tasks
        to finish, then forgets them so ``get_status`` no longer counts them.
        """
        for _ in self.workers:
            await self.queue.put(None)

        await asyncio.gather(*self.workers, return_exceptions=True)
        self.workers.clear()

    def get_status(self) -> dict:
        """Get current queue status for health checks.

        Returns:
            A dict with ``"queue_size"`` (items currently waiting in the
            queue) and ``"active_workers"`` (number of tracked worker tasks).
        """
        return {
            "queue_size": self.queue.qsize(),
            "active_workers": len(self.workers),
        }
# Global request queue instance, constructed once when this module is imported.
# NOTE(review): presumably shared by the API handlers — confirm against callers.
request_queue = RequestQueue()
|