# app/api/nonstream_handlers.py
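"""Non-streaming request handlers.

Handle non-streaming chat completion requests: rotate through API keys with a
per-key daily limit, retry with growing concurrency, cache successful responses,
and optionally keep the client connection alive while waiting.
"""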
import asyncio
from fastapi import HTTPException, Request
from fastapi.responses import StreamingResponse, Response
from app.models.schemas import ChatCompletionRequest
from app.services import GeminiClient
from app.utils import update_api_call_stats
from app.utils.error_handling import handle_gemini_error
from app.utils.logging import log
import app.config.settings as settings
from app.utils.response import gemini_from_text, openAI_from_Gemini, openAI_from_text
from app.utils.stats import get_api_key_usage
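
# The key_manager and response_cache_manager objects are supplied by the route
# layer; this module only relies on key_manager.get_available_key() and
# key_manager._reset_key_stack(), plus response_cache_manager.store() and
# response_cache_manager.get_and_remove(cache_key).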

# Non-streaming request handler
async def process_nonstream_request(
chat_request: ChatCompletionRequest,
contents,
system_instruction,
current_api_key: str,
response_cache_manager,
safety_settings,
safety_settings_g2,
cache_key: str
):
"""处理非流式API请求"""
gemini_client = GeminiClient(current_api_key)
    # Create the main task that calls the Gemini API
gemini_task = asyncio.create_task(
gemini_client.complete_chat(
chat_request,
contents,
safety_settings_g2 if 'gemini-2.5' in chat_request.model else safety_settings,
system_instruction
)
)
    # Shield the task so it is not easily cancelled from outside
shielded_gemini_task = asyncio.shield(gemini_task)
try:
        # Wait for the shielded API call to complete
        response_content = await shielded_gemini_task

        # Check for an empty response before touching it
        if not response_content or (not response_content.text and not response_content.function_call):
            log('warning', f"API key {current_api_key[:8]}... returned an empty response",
                extra={'key': current_api_key[:8], 'request_type': 'non-stream', 'model': chat_request.model})
            return "empty"
        response_content.set_model(chat_request.model)
        # Cache the response
await response_cache_manager.store(cache_key, response_content)
        # Update API call statistics
        await update_api_call_stats(settings.api_call_stats, endpoint=current_api_key, model=chat_request.model, token=response_content.total_token_count)
return "success"
except Exception as e:
        # Handle any exception raised during the API call
handle_gemini_error(e, current_api_key)
return "error"

# Non-streaming request handler with keep-alive
async def process_nonstream_request_with_keepalive(
chat_request: ChatCompletionRequest,
contents,
system_instruction,
current_api_key: str,
response_cache_manager,
safety_settings,
safety_settings_g2,
cache_key: str,
    keepalive_interval: float = 30.0  # keep-alive interval in seconds, default 30
):
"""处理非流式API请求,带TCP保活功能"""
gemini_client = GeminiClient(current_api_key)
    # Create the main task that calls the Gemini API
gemini_task = asyncio.create_task(
gemini_client.complete_chat(
chat_request,
contents,
safety_settings_g2 if 'gemini-2.5' in chat_request.model else safety_settings,
system_instruction
)
)
    # Create the keep-alive task
keepalive_task = asyncio.create_task(
send_keepalive_messages(keepalive_interval)
)
try:
        # Wait for the Gemini task to complete
        response_content = await gemini_task

        # Cancel the keep-alive task
        keepalive_task.cancel()

        # Check for an empty response before touching it
        if not response_content or (not response_content.text and not response_content.function_call):
            log('warning', f"API key {current_api_key[:8]}... returned an empty response",
                extra={'key': current_api_key[:8], 'request_type': 'non-stream', 'model': chat_request.model})
            return "empty"
        response_content.set_model(chat_request.model)

        # Cache the response
        await response_cache_manager.store(cache_key, response_content)

        # Update API call statistics
        await update_api_call_stats(settings.api_call_stats, endpoint=current_api_key, model=chat_request.model, token=response_content.total_token_count)
return "success"
except Exception as e:
        # Cancel the keep-alive task
        keepalive_task.cancel()
        # Handle any exception raised during the API call
handle_gemini_error(e, current_api_key)
return "error"

# Simplified keep-alive - send newline characters while waiting
async def process_nonstream_request_with_simple_keepalive(
chat_request: ChatCompletionRequest,
contents,
system_instruction,
current_api_key: str,
response_cache_manager,
safety_settings,
safety_settings_g2,
cache_key: str,
    keepalive_interval: float = 30.0  # keep-alive interval in seconds, default 30
):
"""处理非流式API请求,带简化TCP保活功能"""
gemini_client = GeminiClient(current_api_key)
    # Create the main task that calls the Gemini API
gemini_task = asyncio.create_task(
gemini_client.complete_chat(
chat_request,
contents,
safety_settings_g2 if 'gemini-2.5' in chat_request.model else safety_settings,
system_instruction
)
)
    # Create the keep-alive task
keepalive_task = asyncio.create_task(
send_keepalive_messages(keepalive_interval)
)
try:
        # Wait for the Gemini task to complete
        response_content = await gemini_task

        # Cancel the keep-alive task
        keepalive_task.cancel()

        # Check for an empty response before touching it
        if not response_content or (not response_content.text and not response_content.function_call):
            log('warning', f"API key {current_api_key[:8]}... returned an empty response",
                extra={'key': current_api_key[:8], 'request_type': 'non-stream', 'model': chat_request.model})
            return "empty"
        response_content.set_model(chat_request.model)

        # Cache the response
        await response_cache_manager.store(cache_key, response_content)

        # Update API call statistics
        await update_api_call_stats(settings.api_call_stats, endpoint=current_api_key, model=chat_request.model, token=response_content.total_token_count)
return "success"
except Exception as e:
        # Cancel the keep-alive task
        keepalive_task.cancel()
        # Handle any exception raised during the API call
handle_gemini_error(e, current_api_key)
return "error"

async def send_keepalive_messages(interval: float):
    """Periodic keep-alive task."""
try:
while True:
await asyncio.sleep(interval)
            # A newline would be the keep-alive message here, but note:
            # in a non-streaming response we cannot push data directly to the client
except asyncio.CancelledError:
        # Exit normally when the task is cancelled
pass
except Exception as e:
        log('error', f"Keep-alive task error: {str(e)}",
extra={'request_type': 'non-stream', 'keepalive': True})
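
# Because the keep-alive coroutine above cannot actually write to the client,
# the two *_with_keepalive handlers look the same as process_nonstream_request
# from the client's point of view; process_nonstream_with_keepalive_stream
# further below is the variant that actually pushes keep-alive bytes.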

# Entry point called from the route handlers
async def process_request(
chat_request,
key_manager,
response_cache_manager,
safety_settings,
safety_settings_g2,
cache_key: str
):
"""处理非流式请求"""
global current_api_key
format_type = getattr(chat_request, 'format_type', None)
if format_type and (format_type == "gemini"):
is_gemini = True
contents, system_instruction = None,None
else:
is_gemini = False
        # Convert the message format
        contents, system_instruction = GeminiClient.convert_messages(GeminiClient, chat_request.messages, model=chat_request.model)
    # Initial concurrency
    current_concurrent = settings.CONCURRENT_REQUESTS
    max_retry_num = settings.MAX_RETRY_NUM
    # Number of attempts made so far
    current_try_num = 0
    # Empty-response counter
    empty_response_count = 0

    # Try different API keys until the retry limit or the empty-response limit is reached
while (current_try_num < max_retry_num) and (empty_response_count < settings.MAX_EMPTY_RESPONSES):
        # Number of keys to use in this batch
        batch_num = min(max_retry_num - current_try_num, current_concurrent)

        # Collect the keys for this batch
        valid_keys = []
        checked_keys = set()  # keys already examined
        all_keys_checked = False  # whether every key has been examined

        # Try to gather enough valid keys
while len(valid_keys) < batch_num:
api_key = await key_manager.get_available_key()
if not api_key:
break
            # Seeing a key twice means every key has already been examined
            if api_key in checked_keys:
                all_keys_checked = True
                break
            checked_keys.add(api_key)

            # Look up how many calls this key has made today
            usage = await get_api_key_usage(settings.api_call_stats, api_key)
            # Add the key if it is still under the daily limit
            if usage < settings.API_KEY_DAILY_LIMIT:
                valid_keys.append(api_key)
            else:
                log('warning', f"API key {api_key[:8]}... has reached its daily call limit ({usage}/{settings.API_KEY_DAILY_LIMIT})",
                    extra={'key': api_key[:8], 'request_type': 'non-stream', 'model': chat_request.model})
        # If every key has been examined and none is valid, reset the key stack
        if all_keys_checked and not valid_keys:
            log('warning', "All API keys have reached the daily call limit; resetting the key stack",
                extra={'request_type': 'non-stream', 'model': chat_request.model})
            key_manager._reset_key_stack()
            # Fetch a key again after the reset
api_key = await key_manager.get_available_key()
if api_key:
valid_keys = [api_key]
        # No valid key could be obtained; stop trying
        if not valid_keys:
            break

        # Update the attempt counter
        current_try_num += len(valid_keys)

        # Create concurrent tasks
        tasks = []
        tasks_map = {}
for api_key in valid_keys:
            # Log which key this attempt uses
            log('info', f"Non-streaming request started with key: {api_key[:8]}...",
                extra={'key': api_key[:8], 'request_type': 'non-stream', 'model': chat_request.model})

            # Create the task - keep-alive is used or not depending on the configuration
if settings.NONSTREAM_KEEPALIVE_ENABLED:
task = asyncio.create_task(
process_nonstream_request_with_simple_keepalive(
chat_request,
contents,
system_instruction,
api_key,
response_cache_manager,
safety_settings,
safety_settings_g2,
cache_key,
settings.NONSTREAM_KEEPALIVE_INTERVAL
)
)
else:
task = asyncio.create_task(
process_nonstream_request(
chat_request,
contents,
system_instruction,
api_key,
response_cache_manager,
safety_settings,
safety_settings_g2,
cache_key
)
)
tasks.append((api_key, task))
tasks_map[task] = api_key
        # Wait until one task succeeds or all tasks finish
        success = False
        while tasks and not success:
            # Block until at least one task completes
done, pending = await asyncio.wait(
[task for _, task in tasks],
return_when=asyncio.FIRST_COMPLETED
)
            # Check whether any completed task succeeded
for task in done:
api_key = tasks_map[task]
try:
status = task.result()
                    # A successful response was produced
                    if status == "success":
                        success = True
                        log('info', "Non-streaming request succeeded",
                            extra={'key': api_key[:8], 'request_type': 'non-stream', 'model': chat_request.model})
                        cached_response, cache_hit = await response_cache_manager.get_and_remove(cache_key)
                        if is_gemini:
                            return cached_response.data
                        else:
                            return openAI_from_Gemini(cached_response, stream=False)
                    elif status == "empty":
                        # Bump the empty-response counter
                        empty_response_count += 1
                        log('warning', f"Empty response count: {empty_response_count}/{settings.MAX_EMPTY_RESPONSES}",
                            extra={'key': api_key[:8], 'request_type': 'non-stream', 'model': chat_request.model})
except Exception as e:
handle_gemini_error(e, api_key)
            # Remove finished tasks from the task list
            tasks = [(k, t) for k, t in tasks if not t.done()]

        # If this batch produced no success and keys are still available, keep trying
        if not success and valid_keys:
            # Raise the concurrency, capped at the configured maximum
            current_concurrent = min(current_concurrent + settings.INCREASE_CONCURRENT_ON_FAILURE, settings.MAX_CONCURRENT_REQUESTS)
            log('info', f"All concurrent requests failed or returned empty responses; raising concurrency to: {current_concurrent}",
                extra={'request_type': 'non-stream', 'model': chat_request.model})

    # If the empty-response limit was reached, stop and return a normal client-facing response containing the error message
    if empty_response_count >= settings.MAX_EMPTY_RESPONSES:
        log('warning', f"Empty response limit reached ({empty_response_count}/{settings.MAX_EMPTY_RESPONSES}); stopping key rotation",
            extra={'request_type': 'non-stream', 'model': chat_request.model})
        if is_gemini:
            return gemini_from_text(content="Empty response limit reached\nPlease adjust the input prompt", finish_reason="STOP", stream=False)
        else:
            return openAI_from_text(model=chat_request.model, content="Empty response limit reached\nPlease adjust the input prompt", finish_reason="stop", stream=False)
    # All attempts failed
    log('error', "API key rotation failed: every API key has been tried. Please reconfigure or retry later", extra={'request_type': 'switch_key'})
    if is_gemini:
        return gemini_from_text(content="All API keys failed\nSee the rotation log for details", finish_reason="STOP", stream=False)
    else:
        return openAI_from_text(model=chat_request.model, content="All API keys failed\nSee the rotation log for details", finish_reason="stop", stream=False)
    # raise HTTPException(status_code=500, detail="API key rotation failed: every API key has been tried. Please reconfigure or retry later")
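
# A minimal usage sketch (hypothetical, not part of this module): a FastAPI route
# would typically build cache_key and the shared managers elsewhere and then
# delegate to process_request roughly like this. The names router, make_cache_key,
# key_manager and response_cache_manager below are illustrative assumptions.
#
#     @router.post("/v1/chat/completions")
#     async def chat_completions(chat_request: ChatCompletionRequest):
#         cache_key = make_cache_key(chat_request)  # hypothetical helper
#         return await process_request(
#             chat_request,
#             key_manager,
#             response_cache_manager,
#             safety_settings,
#             safety_settings_g2,
#             cache_key,
#         )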

# Handle a keep-alive non-streaming request (implemented as a streaming response)
async def process_nonstream_with_keepalive_stream(
chat_request,
key_manager,
response_cache_manager,
safety_settings,
safety_settings_g2,
cache_key: str,
is_gemini: bool
):
"""处理带保活的非流式请求,使用流式响应发送保活消息但最终返回非流式格式"""
from fastapi.responses import StreamingResponse
import json
async def keepalive_stream_generator():
"""生成带保活的流式响应"""
try:
# 转换消息格式
format_type = getattr(chat_request, 'format_type', None)
            if format_type == "gemini":
contents, system_instruction = None, None
else:
contents, system_instruction = GeminiClient.convert_messages(GeminiClient, chat_request.messages, model=chat_request.model)
            # Initial concurrency
            current_concurrent = settings.CONCURRENT_REQUESTS
            max_retry_num = settings.MAX_RETRY_NUM
            # Number of attempts made so far
            current_try_num = 0
            # Empty-response counter
            empty_response_count = 0

            # Try different API keys until the retry limit or the empty-response limit is reached
while (current_try_num < max_retry_num) and (empty_response_count < settings.MAX_EMPTY_RESPONSES):
                # Number of keys to use in this batch
                batch_num = min(max_retry_num - current_try_num, current_concurrent)

                # Collect the keys for this batch
                valid_keys = []
                checked_keys = set()  # keys already examined
                all_keys_checked = False  # whether every key has been examined

                # Try to gather enough valid keys
while len(valid_keys) < batch_num:
api_key = await key_manager.get_available_key()
if not api_key:
break
                    # Seeing a key twice means every key has already been examined
                    if api_key in checked_keys:
                        all_keys_checked = True
                        break
                    checked_keys.add(api_key)

                    # Look up how many calls this key has made today
                    usage = await get_api_key_usage(settings.api_call_stats, api_key)
                    # Add the key if it is still under the daily limit
                    if usage < settings.API_KEY_DAILY_LIMIT:
                        valid_keys.append(api_key)
                    else:
                        log('warning', f"API key {api_key[:8]}... has reached its daily call limit ({usage}/{settings.API_KEY_DAILY_LIMIT})",
                            extra={'key': api_key[:8], 'request_type': 'non-stream', 'model': chat_request.model})
                # If every key has been examined and none is valid, reset the key stack
                if all_keys_checked and not valid_keys:
                    log('warning', "All API keys have reached the daily call limit; resetting the key stack",
                        extra={'request_type': 'non-stream', 'model': chat_request.model})
                    key_manager._reset_key_stack()
                    # Fetch a key again after the reset
api_key = await key_manager.get_available_key()
if api_key:
valid_keys = [api_key]
                # No valid key could be obtained; stop trying
                if not valid_keys:
                    break

                # Update the attempt counter
                current_try_num += len(valid_keys)

                # Create concurrent tasks
                tasks = []
                tasks_map = {}
for api_key in valid_keys:
                    # Log which key this attempt uses
                    log('info', f"Non-streaming request started with key: {api_key[:8]}...",
                        extra={'key': api_key[:8], 'request_type': 'non-stream', 'model': chat_request.model})

                    # Create the task
task = asyncio.create_task(
process_nonstream_request(
chat_request,
contents,
system_instruction,
api_key,
response_cache_manager,
safety_settings,
safety_settings_g2,
cache_key
)
)
tasks.append((api_key, task))
tasks_map[task] = api_key
                # Wait until one task succeeds or all tasks finish
                success = False
                keepalive_counter = 0
                while tasks and not success:
                    # Wait briefly for a task to complete
done, pending = await asyncio.wait(
[task for _, task in tasks],
timeout=settings.NONSTREAM_KEEPALIVE_INTERVAL,
return_when=asyncio.FIRST_COMPLETED
)
                    # No task finished within the interval; send a keep-alive message
                    if not done:
                        keepalive_counter += 1
                        # A plain newline serves as the keep-alive message
                        yield "\n"
                        continue

                    # Check whether any completed task succeeded
for task in done:
api_key = tasks_map[task]
try:
status = task.result()
                            # A successful response was produced
                            if status == "success":
                                success = True
                                log('info', "Non-streaming request succeeded",
                                    extra={'key': api_key[:8], 'request_type': 'non-stream', 'model': chat_request.model})
                                cached_response, cache_hit = await response_cache_manager.get_and_remove(cache_key)

                                # Send the final non-streaming response
                                if is_gemini:
                                    final_response = cached_response.data
                                else:
                                    final_response = openAI_from_Gemini(cached_response, stream=False)
                                # Send the non-streaming response as a single JSON string
                                yield json.dumps(final_response, ensure_ascii=False)
                                return
                            elif status == "empty":
                                # Bump the empty-response counter
                                empty_response_count += 1
                                log('warning', f"Empty response count: {empty_response_count}/{settings.MAX_EMPTY_RESPONSES}",
                                    extra={'key': api_key[:8], 'request_type': 'non-stream', 'model': chat_request.model})
except Exception as e:
handle_gemini_error(e, api_key)
                    # Remove finished tasks from the task list
                    tasks = [(k, t) for k, t in tasks if not t.done()]

                # If this batch produced no success and keys are still available, keep trying
                if not success and valid_keys:
                    # Raise the concurrency, capped at the configured maximum
                    current_concurrent = min(current_concurrent + settings.INCREASE_CONCURRENT_ON_FAILURE, settings.MAX_CONCURRENT_REQUESTS)
                    log('info', f"All concurrent requests failed or returned empty responses; raising concurrency to: {current_concurrent}",
                        extra={'request_type': 'non-stream', 'model': chat_request.model})

            # If the empty-response limit was reached, stop and return a normal client-facing response containing the error message
            if empty_response_count >= settings.MAX_EMPTY_RESPONSES:
                log('warning', f"Empty response limit reached ({empty_response_count}/{settings.MAX_EMPTY_RESPONSES}); stopping key rotation",
                    extra={'request_type': 'non-stream', 'model': chat_request.model})
                if is_gemini:
                    error_response = gemini_from_text(content="Empty response limit reached\nPlease adjust the input prompt", finish_reason="STOP", stream=False)
                else:
                    error_response = openAI_from_text(model=chat_request.model, content="Empty response limit reached\nPlease adjust the input prompt", finish_reason="stop", stream=False)
                yield json.dumps(error_response, ensure_ascii=False)
                return
            # All attempts failed
            log('error', "API key rotation failed: every API key has been tried. Please reconfigure or retry later", extra={'request_type': 'switch_key'})
            if is_gemini:
                error_response = gemini_from_text(content="All API keys failed\nSee the rotation log for details", finish_reason="STOP", stream=False)
            else:
                error_response = openAI_from_text(model=chat_request.model, content="All API keys failed\nSee the rotation log for details", finish_reason="stop", stream=False)
            yield json.dumps(error_response, ensure_ascii=False)
except Exception as e:
            log('error', f"Keep-alive streaming handler error: {str(e)}",
extra={'request_type': 'non-stream', 'keepalive': True})
raise
    # Return a streaming response, but with the application/json media type
return StreamingResponse(
keepalive_stream_generator(),
media_type="application/json"
)
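
# Note for consumers of this endpoint: keep-alive newlines are yielded before the
# final payload, so the body a client receives is zero or more "\n" characters
# followed by a single JSON document. A minimal client-side sketch (hypothetical;
# raw_body is assumed to be the fully read response body):
#
#     import json
#     payload = json.loads(raw_body.strip())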