| """Load balancing module for Flow2API""" |
| import random |
| from typing import Optional |
| from ..core.models import Token |
| from .concurrency_manager import ConcurrencyManager |
| from ..core.logger import debug_logger |
|
|
|
|
| class LoadBalancer: |
| """Token load balancer with random selection""" |
|
|
| def __init__(self, token_manager, concurrency_manager: Optional[ConcurrencyManager] = None): |
| self.token_manager = token_manager |
| self.concurrency_manager = concurrency_manager |
|
|
| async def select_token( |
| self, |
| for_image_generation: bool = False, |
| for_video_generation: bool = False, |
| model: Optional[str] = None |
| ) -> Optional[Token]: |
| """ |
| Select a token using random load balancing |
| |
| Args: |
| for_image_generation: If True, only select tokens with image_enabled=True |
| for_video_generation: If True, only select tokens with video_enabled=True |
| model: Model name (used to filter tokens for specific models) |
| |
| Returns: |
| Selected token or None if no available tokens |
| """ |
| debug_logger.log_info(f"[LOAD_BALANCER] 开始选择Token (图片生成={for_image_generation}, 视频生成={for_video_generation}, 模型={model})") |
|
|
| active_tokens = await self.token_manager.get_active_tokens() |
| debug_logger.log_info(f"[LOAD_BALANCER] 获取到 {len(active_tokens)} 个活跃Token") |
|
|
| if not active_tokens: |
| debug_logger.log_info(f"[LOAD_BALANCER] ❌ 没有活跃的Token") |
| return None |
|
|
| |
| available_tokens = [] |
| filtered_reasons = {} |
|
|
| for token in active_tokens: |
| |
| if not await self.token_manager.is_at_valid(token.id): |
| filtered_reasons[token.id] = "AT无效或已过期" |
| continue |
|
|
| |
| if for_image_generation: |
| if not token.image_enabled: |
| filtered_reasons[token.id] = "图片生成已禁用" |
| continue |
|
|
| |
| if self.concurrency_manager and not await self.concurrency_manager.can_use_image(token.id): |
| filtered_reasons[token.id] = "图片并发已满" |
| continue |
|
|
| |
| if for_video_generation: |
| if not token.video_enabled: |
| filtered_reasons[token.id] = "视频生成已禁用" |
| continue |
|
|
| |
| if self.concurrency_manager and not await self.concurrency_manager.can_use_video(token.id): |
| filtered_reasons[token.id] = "视频并发已满" |
| continue |
|
|
| available_tokens.append(token) |
|
|
| |
| if filtered_reasons: |
| debug_logger.log_info(f"[LOAD_BALANCER] 已过滤Token:") |
| for token_id, reason in filtered_reasons.items(): |
| debug_logger.log_info(f"[LOAD_BALANCER] - Token {token_id}: {reason}") |
|
|
| if not available_tokens: |
| debug_logger.log_info(f"[LOAD_BALANCER] ❌ 没有可用的Token (图片生成={for_image_generation}, 视频生成={for_video_generation})") |
| return None |
|
|
| |
| selected = random.choice(available_tokens) |
| debug_logger.log_info(f"[LOAD_BALANCER] ✅ 已选择Token {selected.id} ({selected.email}) - 余额: {selected.credits}") |
| return selected |
|
|