|
|
import asyncio
import inspect
import time
from datetime import datetime, timedelta
from typing import Optional, Callable, Dict, Any

from services.logging_service import get_logger
|
|
|
|
|
|
|
|
class LongRunningTaskMonitor:
    """
    Monitor for long-running tasks; periodically sends heartbeats while MCP tools run.

    Tasks are registered with ``start_monitoring`` and removed with
    ``stop_monitoring``. ``monitor_all_tasks`` is an endless polling loop: for every
    registered task whose last heartbeat is at least ``heartbeat_interval`` seconds
    old, it logs a status message and invokes the task's optional heartbeat callback.

    All timestamps stored per task (``start_time``, ``last_heartbeat``) are
    wall-clock values from ``time.time()``.
    """

    def __init__(self, heartbeat_interval: int = 300):
        """
        Args:
            heartbeat_interval: Minimum number of seconds between two heartbeats
                of the same task.
        """
        self.heartbeat_interval = heartbeat_interval
        self.logger = get_logger()
        # task_id -> per-task state dict (see start_monitoring for its keys).
        self.active_tasks: Dict[str, Dict[str, Any]] = {}

    def start_monitoring(self, task_id: str, task_name: str, chat_id: Optional[str] = None,
                         heartbeat_callback: Optional[Callable] = None):
        """
        Start monitoring a long-running task.

        Registering an already-known ``task_id`` replaces its previous state.

        Args:
            task_id: Unique task identifier.
            task_name: Human-readable task name (used in log messages).
            chat_id: Optional chat ID associated with the task.
            heartbeat_callback: Optional callable invoked as
                ``callback(task_id, task_info)`` on each heartbeat. It may be a
                plain function or a coroutine function; awaitable results are
                awaited.
        """
        # Read the clock once so start_time and last_heartbeat agree exactly.
        now = time.time()
        self.active_tasks[task_id] = {
            'task_name': task_name,
            'chat_id': chat_id,
            'start_time': now,
            'heartbeat_callback': heartbeat_callback,
            'last_heartbeat': now,
            'heartbeat_count': 0
        }

        self.logger.log_system_status(
            f"Started monitoring long-running task: {task_name}",
            {'task_id': task_id, 'chat_id': chat_id}
        )

    def stop_monitoring(self, task_id: str):
        """
        Stop monitoring a task and log its total duration.

        Unknown task IDs are ignored.

        Args:
            task_id: Unique task identifier.
        """
        # Remove first so that a failing logger cannot leave a stopped task registered.
        task_info = self.active_tasks.pop(task_id, None)
        if task_info is None:
            return

        duration = time.time() - task_info['start_time']
        self.logger.log_long_running_task(
            task_info['task_name'],
            duration,
            task_info['chat_id']
        )

    async def send_heartbeat(self, task_id: str):
        """
        Send a heartbeat for one task if its heartbeat interval has elapsed.

        Does nothing for unknown task IDs or when the last heartbeat is more
        recent than ``heartbeat_interval`` seconds. Errors raised by the
        heartbeat callback are logged as ``HeartbeatCallbackError`` and not
        propagated.

        Args:
            task_id: Unique task identifier.
        """
        task_info = self.active_tasks.get(task_id)
        if task_info is None:
            return

        current_time = time.time()
        if current_time - task_info['last_heartbeat'] < self.heartbeat_interval:
            return

        task_info['last_heartbeat'] = current_time
        task_info['heartbeat_count'] += 1
        duration = current_time - task_info['start_time']

        self.logger.log_system_status(
            f"Heartbeat for long-running task: {task_info['task_name']}",
            {
                'task_id': task_id,
                'chat_id': task_info['chat_id'],
                'duration_seconds': duration,
                'heartbeat_count': task_info['heartbeat_count']
            }
        )

        callback = task_info['heartbeat_callback']
        if not callback:
            return

        try:
            # The callback receives the live task_info dict, not a copy.
            result = callback(task_id, task_info)
            # Support both sync and async callbacks: only await real awaitables,
            # otherwise `await None` would raise a spurious TypeError.
            if inspect.isawaitable(result):
                await result
        except asyncio.CancelledError:
            # Never swallow cancellation (it subclasses Exception on Python < 3.8).
            raise
        except Exception as e:
            self.logger.log_error(
                "HeartbeatCallbackError",
                str(e),
                {'task_id': task_id, 'task_name': task_info['task_name']}
            )

    async def monitor_all_tasks(self, check_interval: Optional[float] = None):
        """
        Poll all active tasks forever and send heartbeats when they are due.

        This coroutine only returns by raising; cancel the asyncio task running
        it to stop it.

        Args:
            check_interval: Seconds to sleep between polling passes. Defaults to
                ``min(60, heartbeat_interval)`` so that heartbeat intervals shorter
                than a minute are honoured; falls back to 60 when
                ``heartbeat_interval`` is not positive, to avoid a busy loop.
        """
        if check_interval is None:
            check_interval = (
                min(60, self.heartbeat_interval) if self.heartbeat_interval > 0 else 60
            )

        while True:
            # Snapshot the keys: tasks may be added or removed while we await.
            for task_id in list(self.active_tasks):
                try:
                    await self.send_heartbeat(task_id)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    # Isolate failures per task so one bad task does not
                    # prevent heartbeats for the remaining ones.
                    self.logger.log_error(
                        "TaskMonitorError",
                        str(e),
                        {'active_tasks_count': len(self.active_tasks), 'task_id': task_id}
                    )

            await asyncio.sleep(check_interval)

    def get_active_tasks_info(self) -> Dict[str, Dict[str, Any]]:
        """
        Return a snapshot describing every active task.

        Returns:
            Mapping of task_id to a dict with ``task_name``, ``chat_id``,
            ``duration_seconds``, ``heartbeat_count`` and
            ``last_heartbeat_seconds_ago``. Callbacks and raw timestamps are
            not exposed.
        """
        current_time = time.time()
        return {
            task_id: {
                'task_name': task_info['task_name'],
                'chat_id': task_info['chat_id'],
                'duration_seconds': current_time - task_info['start_time'],
                'heartbeat_count': task_info['heartbeat_count'],
                'last_heartbeat_seconds_ago': current_time - task_info['last_heartbeat']
            }
            for task_id, task_info in self.active_tasks.items()
        }
|
|
|
|
|
|
|
|
|
|
|
# Module-wide monitor instance, created at import time with the default heartbeat interval.
task_monitor: LongRunningTaskMonitor = LongRunningTaskMonitor()
|
|
|
|
|
|
|
|
def get_task_monitor() -> "LongRunningTaskMonitor":
    """Return the global (module-level) task monitor instance."""
    monitor = task_monitor
    return monitor
|
|
|
|
|
|
|
|
async def start_task_monitoring():
    """
    Start task monitoring.

    Awaits ``monitor_all_tasks()`` on the monitor returned by
    ``get_task_monitor()``; that loop runs until it raises or the awaiting
    task is cancelled.
    """
    await get_task_monitor().monitor_all_tasks()