import logging
from typing import Optional, Any, Dict
import json
import redis.asyncio as redis
from app.core.config import get_settings
logger = logging.getLogger(__name__)
class QueueService:
    """
    Queue service for managing asynchronous analysis tasks.

    Tasks are pushed onto a Redis list and results are read back from
    Redis keys. When ``REDIS_ENABLED`` is false, or the client could not
    be created, no queue is available: ``enqueue_analysis`` returns
    ``False`` and ``get_task_result`` returns ``None``.
    """

    def __init__(self) -> None:
        """Load settings and create the Redis client if Redis is enabled."""
        self.settings = get_settings()
        self.redis_client = None
        if self.settings.REDIS_ENABLED:
            self._initialize_redis()

    @staticmethod
    def _redact_url(url: str) -> str:
        """Return *url* with any embedded password replaced by ``***``.

        Masks both the ``user:password@host`` form and a ``password=``
        query parameter, so the URL can be written to logs safely.
        """
        # Local imports keep this helper self-contained; the module's
        # import block is left untouched.
        import re
        from urllib.parse import urlsplit

        try:
            parts = urlsplit(url)
        except ValueError:
            # Never echo a URL we cannot parse; it may still hold a secret.
            return "<unparseable URL>"

        # rpartition: an unescaped "@" inside the password must not be
        # mistaken for the userinfo/host separator.
        userinfo, at, hostinfo = parts.netloc.rpartition("@")
        username, colon, _secret = userinfo.partition(":")
        if at and colon:
            url = parts._replace(netloc=f"{username}:***@{hostinfo}").geturl()
        return re.sub(r"([?&]password=)[^&#]*", r"\1***", url)

    def _initialize_redis(self) -> None:
        """Create the async Redis client from ``REDIS_URL``.

        On any failure the error is logged and ``redis_client`` is left
        as ``None``, which disables queue operations.
        """
        try:
            self.redis_client = redis.from_url(
                self.settings.REDIS_URL, decode_responses=True
            )
            # NOTE(review): from_url presumably does not open a connection,
            # so "successfully" only means the client object was built.
            # Log the redacted URL: REDIS_URL may carry a password.
            logger.info(
                "Redis queue service initialized successfully: %s",
                self._redact_url(self.settings.REDIS_URL),
            )
        except Exception as e:
            logger.error(f"Failed to initialize Redis client: {e}")
            self.redis_client = None

    async def enqueue_analysis(
        self,
        file_url: str,
        model: str,
        task_id: str,
    ) -> bool:
        """
        Enqueue an analysis task (future background worker implementation).

        The task is serialized to JSON and pushed onto the list named by
        ``REDIS_QUEUE_NAME``.

        Returns:
            True if the task was pushed to Redis; False if Redis is
            disabled or no client is available.

        Errors raised by the Redis client are not caught here.
        """
        task_data = {
            "task_id": task_id,
            "file_url": file_url,
            "model": model,
        }
        logger.info("Enqueuing analysis task: %s", task_id)
        if self.settings.REDIS_ENABLED and self.redis_client:
            await self.redis_client.lpush(
                self.settings.REDIS_QUEUE_NAME,
                json.dumps(task_data),
            )
            return True
        return False

    async def get_task_result(self, task_id: str) -> Optional[Dict[str, Any]]:
        """
        Get analysis result for a task.

        Reads the Redis key ``result:<task_id>`` and decodes it as JSON.

        Returns:
            The decoded result, or None if the key is missing or empty,
            Redis is disabled, or no client is available.
        """
        logger.info("Retrieving result for task: %s", task_id)
        if self.settings.REDIS_ENABLED and self.redis_client:
            result = await self.redis_client.get(f"result:{task_id}")
            return json.loads(result) if result else None
        return None
# Module-level slot for the shared QueueService instance. It stays None
# until get_queue_service() creates the instance on first use.
_queue_service: Optional[QueueService] = None
def get_queue_service() -> QueueService:
    """Return the shared QueueService, building it on the first call."""
    global _queue_service
    # Fast path: the instance already exists, so reuse it.
    if _queue_service is not None:
        return _queue_service
    _queue_service = QueueService()
    return _queue_service
|