google / src /services /load_balancer.py
Admin
整合sora2api
22a3c56
"""Load balancing module"""
import random
from datetime import datetime
from typing import Optional

from ..core.models import Token
from ..core.config import config
from .token_manager import TokenManager
from .token_lock import TokenLock
class LoadBalancer:
    """Token load balancer with random selection and an image generation lock.

    Candidate tokens come from the injected ``TokenManager``. One token is
    picked with ``random.choice`` from those that pass the filters for the
    requested kind of generation.
    """

    # Active tokens whose expiry is at most this many hours away (including
    # tokens whose expiry time has already passed) are submitted for
    # auto-refresh before a selection is made.
    _REFRESH_WINDOW_HOURS = 24

    def __init__(self, token_manager: TokenManager):
        """Store the token manager and create the image generation lock.

        Args:
            token_manager: Provider of tokens and of the refresh operations
                invoked during selection.
        """
        self.token_manager = token_manager
        # Use image timeout from config as lock timeout
        self.token_lock = TokenLock(lock_timeout=config.image_timeout)

    async def select_token(self, for_image_generation: bool = False, for_video_generation: bool = False) -> Optional[Token]:
        """
        Select a token using random load balancing

        When ``config.at_auto_refresh_enabled`` is set, active tokens that
        expire within the refresh window are first handed to the token
        manager for auto-refresh; the active token list is fetched afterwards.

        Args:
            for_image_generation: If True, only select tokens that are not locked for image generation and have image_enabled=True
            for_video_generation: If True, filter out tokens with Sora2 quota exhausted (sora2_cooldown_until not expired), tokens that don't support Sora2, and tokens with video_enabled=False

        Returns:
            Selected token or None if no available tokens
        """
        if config.at_auto_refresh_enabled:
            await self._refresh_expiring_tokens()

        candidates = await self.token_manager.get_active_tokens()
        if not candidates:
            return None

        # The video filter runs first so that, when both flags are set, the
        # image filter only inspects tokens that already passed it.
        if for_video_generation:
            candidates = await self._filter_for_video(candidates)
            if not candidates:
                return None

        if for_image_generation:
            candidates = await self._filter_for_image(candidates)
            if not candidates:
                return None

        # Random selection from whatever survived the requested filters.
        # Video-only and unfiltered requests need no lock check.
        return random.choice(candidates)

    async def _refresh_expiring_tokens(self) -> None:
        """Request auto-refresh for every active token inside the refresh window."""
        for token in await self.token_manager.get_all_tokens():
            # Inactive tokens and tokens without an expiry time are left alone.
            if not (token.is_active and token.expiry_time):
                continue
            remaining = token.expiry_time - datetime.now()
            hours_until_expiry = remaining.total_seconds() / 3600
            if hours_until_expiry <= self._REFRESH_WINDOW_HOURS:
                await self.token_manager.auto_refresh_expiring_token(token.id)

    async def _filter_for_video(self, tokens: list) -> list:
        """Return the tokens from ``tokens`` that may be used for video generation.

        A token is dropped when video is disabled, when Sora2 is unsupported,
        when it can no longer be loaded after a cooldown refresh, or when its
        Sora2 cooldown is still in the future.
        """
        usable = []
        for token in tokens:
            # Skip tokens that don't have video enabled or don't support Sora2
            if not token.video_enabled or not token.sora2_supported:
                continue

            # Cooldown already expired: let the manager refresh the remaining
            # quota, then reload the token so the checks below see fresh data.
            if token.sora2_cooldown_until and token.sora2_cooldown_until <= datetime.now():
                await self.token_manager.refresh_sora2_remaining_if_cooldown_expired(token.id)
                token = await self.token_manager.db.get_token(token.id)

            # The reload may yield nothing; such a token cannot be selected.
            if not token:
                continue

            # Skip tokens that are in Sora2 cooldown (quota exhausted)
            if token.sora2_cooldown_until and token.sora2_cooldown_until > datetime.now():
                continue

            usable.append(token)
        return usable

    async def _filter_for_image(self, tokens: list) -> list:
        """Return the tokens from ``tokens`` that are image-enabled and not locked."""
        # `image_enabled` is tested first so the lock is only queried for
        # tokens that could be selected at all.
        return [
            token
            for token in tokens
            if token.image_enabled and not await self.token_lock.is_locked(token.id)
        ]