import queue import threading import time from dataclasses import dataclass, field from typing import Dict, List, Any, Optional import uuid @dataclass class QueueItem: """代表队列中的一个评估请求""" id: str # 唯一标识符 input_data: Dict[str, Any] # 输入数据 status: str = "queued" # 状态:queued, processing, completed, error result: Optional[Dict[str, Any]] = None # 评估结果 created_at: float = field(default_factory=time.time) # 创建时间 started_at: Optional[float] = None # 开始处理时间 completed_at: Optional[float] = None # 完成时间 class QueueManager: """管理代码评估请求队列""" def __init__(self): self.queue = queue.Queue() # 请求队列 self.queue_items = {} # 存储所有队列项,以ID为键 self.processing_thread = None # 处理线程 self.running = False # 是否正在运行 self.evaluator = None # 评估器,将在开始时设置 def set_evaluator(self, evaluator_func): """设置评估函数 Args: evaluator_func: 评估函数,接受输入数据并返回结果 """ self.evaluator = evaluator_func def enqueue(self, input_data): """添加新的评估请求到队列 Args: input_data: 输入数据,包含要评估的代码 Returns: str: 请求的唯一ID """ item_id = str(uuid.uuid4()) queue_item = QueueItem(id=item_id, input_data=input_data) self.queue_items[item_id] = queue_item self.queue.put(item_id) # 如果处理线程未运行,则启动它 if not self.running: self.start_processing() return item_id def start_processing(self): """启动队列处理线程""" if self.processing_thread is None or not self.processing_thread.is_alive(): self.running = True self.processing_thread = threading.Thread(target=self._process_queue) self.processing_thread.daemon = True self.processing_thread.start() def stop_processing(self): """停止队列处理""" self.running = False if self.processing_thread and self.processing_thread.is_alive(): self.processing_thread.join(timeout=1.0) def _process_queue(self): """处理队列中的请求(在单独的线程中运行)""" while self.running: try: # 尝试从队列获取一个项目,如果队列为空,等待1秒后重试 try: item_id = self.queue.get(timeout=1.0) except queue.Empty: continue # 获取队列项并更新状态 queue_item = self.queue_items[item_id] queue_item.status = "processing" queue_item.started_at = time.time() # 处理请求 try: if self.evaluator: result = self.evaluator(queue_item.input_data) queue_item.result = result queue_item.status = "completed" else: queue_item.status = "error" queue_item.result = {"status": "Error", "error": "No evaluator function set"} except Exception as e: queue_item.status = "error" queue_item.result = {"status": "Exception", "error": str(e)} # 更新完成时间 queue_item.completed_at = time.time() # 标记任务为完成 self.queue.task_done() except Exception as e: print(f"Error in queue processing: {e}") time.sleep(1) # 防止过度CPU使用 def get_queue_status(self): """获取队列状态信息 Returns: dict: 包含队列状态的字典 """ queued = 0 processing = 0 completed = 0 error = 0 # 统计各状态的数量 for item in self.queue_items.values(): if item.status == "queued": queued += 1 elif item.status == "processing": processing += 1 elif item.status == "completed": completed += 1 elif item.status == "error": error += 1 # 获取队列中的项目,仅包括等待和处理中的 queue_items = [] for item in self.queue_items.values(): if item.status in ["queued", "processing"]: queue_items.append({ "id": item.id, "status": item.status, "created_at": item.created_at, "started_at": item.started_at, }) # 按创建时间排序 queue_items.sort(key=lambda x: x["created_at"]) return { "stats": { "queued": queued, "processing": processing, "completed": completed, "error": error, "total": len(self.queue_items) }, "queue_items": queue_items } def get_result(self, item_id): """获取特定请求的结果 Args: item_id: 请求的唯一ID Returns: dict: 请求的状态和结果 """ if item_id in self.queue_items: item = self.queue_items[item_id] return { "id": item.id, "status": item.status, "result": item.result, "created_at": item.created_at, "started_at": item.started_at, "completed_at": item.completed_at } return {"status": "not_found", "error": "Request ID not found"} def clean_completed(self, max_age=3600): """清理完成的请求 Args: max_age: 最大保留时间(秒),默认为1小时 """ current_time = time.time() to_remove = [] for item_id, item in self.queue_items.items(): if item.status in ["completed", "error"]: if item.completed_at and (current_time - item.completed_at) > max_age: to_remove.append(item_id) for item_id in to_remove: del self.queue_items[item_id]