File size: 2,411 Bytes
cafc0bb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
"""
Request queue management for the Loomi Clothing Detection API.
"""
import asyncio
import logging
from typing import Any, Callable
from config import config

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

class RequestQueue:
    """Manages background processing of heavy API requests.

    Callers hand work to :meth:`submit_task`, which enqueues it together with
    a future and waits on that future. Worker tasks started by
    :meth:`start_workers` pull items off the queue one at a time, await the
    supplied coroutine function and deliver its result (or its exception)
    through the future.
    """

    def __init__(self):
        # NOTE(review): on Python < 3.10 asyncio.Queue binds to the event loop
        # that is current when it is constructed; confirm the instance is
        # created on (or before) the loop that later runs the workers.
        self.queue = asyncio.Queue()
        # Not read or updated anywhere in this class; kept so external code
        # that references the attribute keeps working.
        self.processing = False
        # asyncio.Task objects created by start_workers().
        self.workers = []

    async def start_workers(self, num_workers: int = None):
        """Start background workers.

        Args:
            num_workers: Number of worker tasks to create. When ``None`` the
                value of ``config.num_workers`` is used.
        """
        if num_workers is None:
            num_workers = config.num_workers

        for index in range(num_workers):
            self.workers.append(
                asyncio.create_task(self._worker(f"worker-{index}"))
            )
        logger.info(f"Started {num_workers} background workers")

    async def _worker(self, name: str):
        """Process queued requests until a ``None`` shutdown signal arrives.

        Args:
            name: Label used in this worker's log messages.
        """
        logger.info(f"Worker {name} started")
        while True:
            try:
                task = await self.queue.get()
                try:
                    if task is None:  # Shutdown signal
                        break

                    # user_id and endpoint travel with the item but are not
                    # needed to execute it.
                    _user_id, _endpoint, process_func, args, future = task
                    try:
                        result = await process_func(*args)
                    except asyncio.CancelledError:
                        # The worker is being torn down mid-request: release
                        # the submitter instead of leaving it waiting forever.
                        if not future.done():
                            future.cancel()
                        raise
                    except Exception as e:
                        # The submitter may already have cancelled its future;
                        # setting an outcome then raises InvalidStateError.
                        if not future.done():
                            future.set_exception(e)
                    else:
                        if not future.done():
                            future.set_result(result)
                finally:
                    # Exactly one task_done() per get(), including for the
                    # shutdown signal and malformed items, so queue.join()
                    # cannot hang.
                    self.queue.task_done()

            except Exception as e:
                logger.error(f"Worker {name} error: {e}")

    async def submit_task(self, user_id: str, endpoint: str, process_func: Callable, *args) -> Any:
        """Submit a task to the queue and wait for its outcome.

        Args:
            user_id: Stored with the queued item; not used by the worker.
            endpoint: Stored with the queued item; not used by the worker.
            process_func: Coroutine function awaited by a worker as
                ``process_func(*args)``.
            *args: Positional arguments forwarded to ``process_func``.

        Returns:
            Whatever ``process_func`` returns.

        Raises:
            Exception: Any exception raised by ``process_func`` is re-raised
                here.
        """
        # create_future() on the running loop is the documented way to obtain
        # a future tied to the loop that is actually executing this coroutine.
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((user_id, endpoint, process_func, args, future))
        return await future

    async def shutdown(self):
        """Shutdown workers.

        Sends one shutdown signal per running worker, waits for every worker
        task to finish, then forgets them so that status reporting and a
        repeated call stay accurate.
        """
        live_workers = [w for w in self.workers if not w.done()]
        for _ in live_workers:
            await self.queue.put(None)
        await asyncio.gather(*self.workers, return_exceptions=True)
        # Drop finished tasks; otherwise get_status() would keep reporting
        # them and a second shutdown() would enqueue stale signals.
        self.workers.clear()

    def get_status(self) -> dict:
        """Get current queue status for health checks.

        Returns:
            A dict with ``queue_size`` (items waiting in the queue) and
            ``active_workers`` (worker tasks that have not finished).
        """
        return {
            "queue_size": self.queue.qsize(),
            "active_workers": sum(1 for w in self.workers if not w.done())
        }

# Global request queue instance, created when this module is imported.
# No workers are running at this point; start_workers() must be awaited
# before submitted tasks can be processed.
request_queue = RequestQueue()