mikami / app /api /nonstream_handlers.py
mikamipan's picture
Upload 122 files
985f275 verified
import asyncio
from fastapi import HTTPException, status, Request
from app.models import ChatCompletionRequest
from app.services import GeminiClient
from app.utils import cache_response, update_api_call_stats,handle_api_error
from app.utils.logging import log
from .client_disconnect import check_client_disconnect, handle_client_disconnect
from .gemini_handlers import run_gemini_completion
import app.config.settings as settings
import random
from typing import Literal
# 非流式请求处理函数
async def process_nonstream_request(
chat_request: ChatCompletionRequest,
http_request: Request,
request_type: str,
contents,
system_instruction,
current_api_key: str,
response_cache_manager,
active_requests_manager,
safety_settings,
safety_settings_g2,
api_call_stats,
cache_key: str = None,
client_ip: str = None
):
"""处理非流式API请求"""
gemini_client = GeminiClient(current_api_key)
# 创建任务
gemini_task = asyncio.create_task(
run_gemini_completion(
gemini_client,
chat_request,
contents,
system_instruction,
request_type,
current_api_key,
safety_settings,
safety_settings_g2
)
)
disconnect_task = asyncio.create_task(
check_client_disconnect(
http_request,
current_api_key,
request_type,
chat_request.model
)
)
try:
# 先等待看是否API任务先完成,或者客户端先断开连接
done, pending = await asyncio.wait(
[gemini_task, disconnect_task],
return_when=asyncio.FIRST_COMPLETED
)
if disconnect_task in done:
# 客户端已断开连接,但我们仍继续完成API请求以便缓存结果
result = await handle_client_disconnect(
gemini_task,
chat_request,
request_type,
current_api_key,
response_cache_manager,
cache_key,
client_ip,
chat_request.model,
current_api_key
)
return (result, "success" if result else "empty")
else:
# API任务先完成,取消断开检测任务
disconnect_task.cancel()
# 获取响应内容
response_content = await gemini_task
# 检查响应内容是否为空
if not response_content or not response_content.text:
log('warning', f"非流式请求: API密钥 {current_api_key[:8]}... 返回空响应",
extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
return (None, "empty")
# 检查缓存是否已经存在,如果存在则不再创建新缓存
cached_response, cache_hit = response_cache_manager.get(cache_key)
if cache_hit:
log('info', f"缓存已存在,直接返回: {cache_key[:8]}...",
extra={'cache_operation': 'use-existing', 'request_type': request_type})
# 安全删除缓存
if cache_key in response_cache_manager.cache:
del response_cache_manager.cache[cache_key]
log('info', f"缓存使用后已删除: {cache_key[:8]}...",
extra={'cache_operation': 'used-and-removed', 'request_type': request_type})
return (cached_response, "success")
# 创建响应
from app.utils.response import create_response
response = create_response(chat_request, response_content)
update_api_call_stats(settings.api_call_stats, endpoint=current_api_key, model=chat_request.model)
# 缓存响应
cache_response(response, cache_key, client_ip, response_cache_manager, endpoint=current_api_key,model=chat_request.model)
# 立即删除缓存,确保只能使用一次
if cache_key and cache_key in response_cache_manager.cache:
del response_cache_manager.cache[cache_key]
log('info', f"缓存创建后立即删除: {cache_key[:8]}...",
extra={'request_type': request_type})
# 返回响应
return (response, "success")
except asyncio.CancelledError:
extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message':"请求被取消"}
log('info', "请求取消", extra=extra_log)
# 在请求被取消时先检查缓存中是否已有结果
cached_response, cache_hit = response_cache_manager.get(cache_key)
if cache_hit:
log('info', f"请求取消但找到有效缓存,使用缓存响应: {cache_key[:8]}...",
extra={'cache_operation': 'use-cache-on-cancel', 'request_type': request_type})
# 安全删除缓存
if cache_key in response_cache_manager.cache:
del response_cache_manager.cache[cache_key]
log('info', f"缓存使用后已删除: {cache_key[:8]}...",
extra={'cache_operation': 'used-and-removed', 'request_type': request_type})
return (cached_response, "success")
# 尝试完成正在进行的API请求
if not gemini_task.done():
log('info', "请求取消但API请求尚未完成,继续等待...",
extra={'key': current_api_key[:8], 'request_type': request_type})
# 使用shield确保任务不会被取消
response_content = await asyncio.shield(gemini_task)
# 检查响应内容是否为空
if not response_content or not response_content.text:
log('warning', f"非流式请求(取消后): API密钥 {current_api_key[:8]}... 返回空响应",
extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
return (None, "empty")
update_api_call_stats(settings.api_call_stats, endpoint=current_api_key, model=chat_request.model)
# 创建响应
from app.utils.response import create_response
response = create_response(chat_request, response_content)
# 不缓存这个响应,直接返回
return (response, "success")
else:
# 任务已完成,获取结果
response_content = gemini_task.result()
# 检查响应内容是否为空
if not response_content or not response_content.text:
log('warning', f"非流式请求(已完成): API密钥 {current_api_key[:8]}... 返回空响应",
extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
return (None, "empty")
# 创建响应
from app.utils.response import create_response
response = create_response(chat_request, response_content)
# 不缓存这个响应,直接返回
return (response, "success")
except HTTPException as e:
if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model,
'status_code': 408, 'error_message': '客户端连接中断'}
log('error', "客户端连接中断,终止后续重试", extra=extra_log)
raise
else:
raise
except Exception as e:
# 其他异常,返回None以便并发请求可以继续尝试其他密钥
log('error', f"非流式请求异常: {str(e)[:8]}",
extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
return (None, "error")
# route 中发起请求的处理函数
async def process_request(
chat_request: ChatCompletionRequest,
http_request: Request,
request_type: Literal['stream', 'non-stream'],
key_manager,
response_cache_manager,
active_requests_manager,
safety_settings,
safety_settings_g2,
api_call_stats,
cache_key: str = None,
client_ip: str = None
):
"""处理非流式请求"""
global current_api_key
# 转换消息格式
contents, system_instruction = GeminiClient.convert_messages(
GeminiClient, chat_request.messages,model=chat_request.model)
# 非流式请求处理 - 使用并发请求
# 重置已尝试的密钥
key_manager.reset_tried_keys_for_request()
# 设置初始并发数
current_concurrent = settings.CONCURRENT_REQUESTS
# 获取所有可用的API密钥
all_keys = key_manager.api_keys.copy()
random.shuffle(all_keys)
# 如果可用密钥数量小于并发数,则使用所有可用密钥
if len(all_keys) < current_concurrent:
current_concurrent = len(all_keys)
# 尝试使用不同API密钥,直到所有密钥都尝试过
while all_keys:
# 获取当前批次的密钥
current_batch = all_keys[:current_concurrent]
all_keys = all_keys[current_concurrent:]
# 创建并发任务
tasks = []
for api_key in current_batch:
# 记录当前尝试的密钥信息
log('info', f"并发请求使用密钥: {api_key[:8]}...",
extra={'key': api_key[:8], 'request_type': request_type, 'model': chat_request.model})
# 创建任务
task = asyncio.create_task(
process_nonstream_request(
chat_request,
http_request,
request_type,
contents,
system_instruction,
api_key,
response_cache_manager,
active_requests_manager,
safety_settings,
safety_settings_g2,
api_call_stats,
cache_key,
client_ip
)
)
tasks.append((api_key, task))
# 等待所有任务完成
done, pending = await asyncio.wait(
[task for _, task in tasks],
return_when=asyncio.ALL_COMPLETED
)
# 检查是否有成功的响应
success = False
for api_key, task in tasks:
if task in done:
try:
result, status = task.result()
if status == "success" and result: # 如果有成功响应内容
success = True
log('info', f"并发请求成功,使用密钥: {api_key[:8]}...",
extra={'key': api_key[:8], 'request_type': request_type, 'model': chat_request.model})
return result
except Exception as e:
# 使用统一的API错误处理函数
error_result = await handle_api_error(
e,
api_key,
key_manager,
request_type,
chat_request.model,
0
)
# 如果需要删除缓存,清除缓存
if error_result.get('remove_cache', False) and cache_key and cache_key in response_cache_manager.cache:
log('info', f"因API错误,删除缓存: {cache_key[:8]}...",
extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
del response_cache_manager.cache[cache_key]
# 如果所有请求都失败或返回空响应,增加并发数并继续尝试
if not success and all_keys:
# 增加并发数,但不超过最大并发数
current_concurrent = min(current_concurrent + settings.INCREASE_CONCURRENT_ON_FAILURE, settings.MAX_CONCURRENT_REQUESTS)
log('info', f"所有并发请求失败或返回空响应,增加并发数至: {current_concurrent}",
extra={'request_type': request_type, 'model': chat_request.model})
# 如果所有尝试都失败
msg = "所有API密钥均请求失败,请稍后重试"
log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'request_type': 'switch_key'})