File size: 7,612 Bytes
99fc92f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8b166bc
 
99fc92f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""

API管理器 - 管理API请求的节流控制和队列调度

提供API调用的桥接功能和请求限流控制

"""
import time
import threading
import importlib
from typing import Any, Callable, Dict, List, Tuple, Union
import config

class ApiManager:
    """

    API管理器类 - 负责处理API请求的节流控制和队列管理

    

    为每个平台实现两个队列:

    1. 完成队列: 记录发起请求的时间戳

    2. 等待队列: 记录等待执行的请求

    

    定时任务每秒执行一次,清理完成队列中的过期请求,

    并处理等待队列中的请求(如果有空位)

    """
    def __init__(self):
        # 从config导入节流控制配置
        self.platform_limits = config.PLATFORM_LIMITS
        self.time_window = config.TIME_WINDOW
        
        # 初始化平台队列
        self.completed_queues = {platform: [] for platform in self.platform_limits}
        self.waiting_queues = {platform: [] for platform in self.platform_limits}
        
        # 线程安全锁
        self.locks = {platform: threading.Lock() for platform in self.platform_limits}
        
        # 启动调度器
        self._scheduler_active = True
        self._start_scheduler()
    
    def _start_scheduler(self):
        """启动定时调度器,每秒执行一次队列管理"""
        self._process_queues()
        if self._scheduler_active:
            # 每秒执行一次
            threading.Timer(1.0, self._start_scheduler).start()
    
    def _process_queues(self):
        """处理所有平台的队列"""
        current_time = time.time()
        
        for platform in self.platform_limits:
            with self.locks[platform]:
                # 1. 移除已超过时间窗口的完成请求
                self._clean_completed_queue(platform, current_time)
                
                # 2. 处理等待队列中的请求(如果有空位)
                self._process_waiting_queue(platform)
    
    def _clean_completed_queue(self, platform: str, current_time: float):
        """清理完成队列中超时的请求"""
        # 移除已经超过时间窗口的请求
        self.completed_queues[platform] = [
            timestamp for timestamp in self.completed_queues[platform]
            if current_time - timestamp < self.time_window
        ]
    
    def _process_waiting_queue(self, platform: str):
        """处理等待队列中的请求"""
        # 获取当前可用的请求数量
        available_slots = self._get_available_slots(platform)
        
        # 处理等待队列中的请求
        while available_slots > 0 and self.waiting_queues[platform]:
            # 获取等待队列头部的请求
            request_data = self.waiting_queues[platform].pop(0)
            request_func, args, kwargs, result_event, result_container = request_data
            
            # 在新线程中执行请求
            threading.Thread(
                target=self._execute_request,
                args=(platform, request_func, args, kwargs, result_event, result_container)
            ).start()
            
            # 减少可用槽位
            available_slots -= 1
    
    def _get_available_slots(self, platform: str) -> int:
        """计算平台当前可用的请求槽位数"""
        limit = self.platform_limits.get(platform, self.platform_limits['default'])
        return max(0, limit - len(self.completed_queues[platform]))
    
    def _execute_request(self, platform: str, request_func: Callable, 

                        args: Tuple, kwargs: Dict, 

                        result_event: threading.Event, 

                        result_container: List):
        """执行请求并存储结果"""
        try:
            # 立即记录请求时间(添加到完成队列)
            with self.locks[platform]:
                self.completed_queues[platform].append(time.time())
            
            # 执行请求
            result = request_func(*args, **kwargs)
            
            # 存储结果
            result_container.append(result)
        except Exception as e:
            # 存储异常
            result_container.append(e)
        finally:
            # 通知等待线程结果已准备好
            result_event.set()
    
    def execute(self, platform: str, method_name: str, *args, **kwargs):
        """

        执行API请求,处理节流控制

        

        Args:

            platform: API平台名称 (如 'openai', 'anthropic' 等)

            method_name: 要调用的方法名称 (如 'validate_api_key')

            *args, **kwargs: 传递给方法的参数

            

        Returns:

            方法的返回值

        """
        # 确保平台支持
        if platform not in self.platform_limits and platform != 'default':
            # 如果不是已知平台,则使用默认限制
            platform = 'default'
            
        # 导入相应平台的模块
        try:
            module = importlib.import_module(f"core.api.{platform}")
        except ImportError:
            raise ValueError(f"不支持的平台: {platform}")
        
        # 获取方法
        if not hasattr(module, method_name):
            raise ValueError(f"平台 {platform} 没有方法 {method_name}")
        
        method = getattr(module, method_name)
        
        # 创建结果容器和事件
        result_container = []
        result_event = threading.Event()
        
        # 请求函数
        request_func = lambda *a, **kw: method(*a, **kw)
        
        with self.locks[platform]:
            # 检查是否有可用槽位
            if len(self.completed_queues[platform]) < self.platform_limits.get(platform, self.platform_limits['default']):
                # 有槽位,立即执行
                threading.Thread(
                    target=self._execute_request,
                    args=(platform, request_func, args, kwargs, result_event, result_container)
                ).start()
            else:
                # 没有槽位,添加到等待队列
                self.waiting_queues[platform].append((request_func, args, kwargs, result_event, result_container))
        
        # 等待结果(同步阻塞)
        result_event.wait()
        
        # 获取结果
        if not result_container:
            raise RuntimeError("请求执行失败,没有结果")
        
        result = result_container[0]
        if isinstance(result, Exception):
            raise result
            
        return result
        
    def shutdown(self):
        """关闭调度器"""
        self._scheduler_active = False

# 全局API管理器实例
_api_manager = None

def get_api_manager():
    """

    获取全局API管理器实例

    

    Returns:

        ApiManager: API管理器实例

    """
    global _api_manager
    if _api_manager is None:
        _api_manager = ApiManager()
    return _api_manager

def start_service():
    """

    启动API管理服务

    

    Returns:

        list: 服务相关的线程列表

    """
    # 初始化API管理器
    api_manager = get_api_manager()
    
    # 此处可以添加其他需要启动的服务线程
    # 例如: 监控线程、日志线程等
    
    # 返回服务线程列表(目前API管理器的调度线程是内部管理的,所以返回空列表)
    return []