| """ | |
| OpenAI Transfer Module - Handles conversion between OpenAI and Gemini API formats | |
| 被openai-router调用,负责OpenAI格式与Gemini格式的双向转换 | |
| """ | |
import time
import uuid
from typing import Dict, Any, Optional

from config import (
    DEFAULT_SAFETY_SETTINGS,
    get_base_model_name,
    get_thinking_budget,
    is_search_model,
    should_include_thoughts,
    get_compatibility_mode_enabled
)
from log import log

from .models import ChatCompletionRequest

async def openai_request_to_gemini_payload(openai_request: ChatCompletionRequest) -> Dict[str, Any]:
    """
    Convert an OpenAI chat completion request directly into a complete Gemini API payload.

    Args:
        openai_request: The request object in OpenAI format.

    Returns:
        A complete Gemini API payload containing "model" and "request" fields.
    """
    contents = []
    system_instructions = []

    # Check whether compatibility mode is enabled
    compatibility_mode = await get_compatibility_mode_enabled()

    # Process each message in the conversation.
    # Phase 1: collect leading consecutive system messages into the system
    # instruction (unless compatibility mode is enabled).
    collecting_system = not compatibility_mode
    for message in openai_request.messages:
        role = message.role

        # Handle system messages
        if role == "system":
            if compatibility_mode:
                # Compatibility mode: convert every system message into a user message
                role = "user"
            elif collecting_system:
                # Normal mode: still collecting the leading run of system messages
                if isinstance(message.content, str):
                    system_instructions.append(message.content)
                elif isinstance(message.content, list):
                    # Handle system messages given as a list of parts
                    for part in message.content:
                        if part.get("type") == "text" and part.get("text"):
                            system_instructions.append(part["text"])
                continue
            else:
                # Normal mode: system messages after the leading run become user messages
                role = "user"
        else:
            # A non-system message ends the system-message collection phase
            collecting_system = False

        # Map the OpenAI role onto the Gemini role
        if role == "assistant":
            role = "model"

        # Handle regular content
        if isinstance(message.content, list):
            parts = []
            for part in message.content:
                if part.get("type") == "text":
                    parts.append({"text": part.get("text", "")})
                elif part.get("type") == "image_url":
                    image_url = part.get("image_url", {}).get("url")
                    if image_url:
                        # Parse a data URI: "data:image/jpeg;base64,{base64_image}".
                        # maxsplit=1 keeps URIs with extra ";" parameters
                        # (e.g. a charset) from being silently dropped.
                        try:
                            mime_type, base64_data = image_url.split(";", 1)
                            _, mime_type = mime_type.split(":", 1)
                            _, base64_data = base64_data.split(",", 1)
                            parts.append({
                                "inlineData": {
                                    "mimeType": mime_type,
                                    "data": base64_data
                                }
                            })
                        except ValueError:
                            continue
            contents.append({"role": role, "parts": parts})
            # log.debug(f"Added message to contents: role={role}, parts={parts}")
        elif message.content:
            # Plain text content
            contents.append({"role": role, "parts": [{"text": message.content}]})
            # log.debug(f"Added message to contents: role={role}, content={message.content}")

    # Map OpenAI generation parameters onto the Gemini format
    generation_config = {}
    if openai_request.temperature is not None:
        generation_config["temperature"] = openai_request.temperature
    if openai_request.top_p is not None:
        generation_config["topP"] = openai_request.top_p
    if openai_request.max_tokens is not None:
        generation_config["maxOutputTokens"] = openai_request.max_tokens
    if openai_request.stop is not None:
        # Gemini supports stop sequences
        if isinstance(openai_request.stop, str):
            generation_config["stopSequences"] = [openai_request.stop]
        elif isinstance(openai_request.stop, list):
            generation_config["stopSequences"] = openai_request.stop
    if openai_request.frequency_penalty is not None:
        generation_config["frequencyPenalty"] = openai_request.frequency_penalty
    if openai_request.presence_penalty is not None:
        generation_config["presencePenalty"] = openai_request.presence_penalty
    if openai_request.n is not None:
        generation_config["candidateCount"] = openai_request.n
    if openai_request.seed is not None:
        generation_config["seed"] = openai_request.seed
    if openai_request.response_format is not None:
        # Handle JSON mode
        if openai_request.response_format.get("type") == "json_object":
            generation_config["responseMimeType"] = "application/json"

    # If contents is empty (e.g. the request held only system messages), add a
    # default user message to satisfy the Gemini API requirement.
    if not contents:
        contents.append({"role": "user", "parts": [{"text": "Please answer according to the system instructions."}]})

    # Build the request data
    request_data = {
        "contents": contents,
        "generationConfig": generation_config,
        "safetySettings": DEFAULT_SAFETY_SETTINGS,
    }

    # Add systemInstruction when system messages exist and compatibility mode is off
    if system_instructions and not compatibility_mode:
        combined_system_instruction = "\n\n".join(system_instructions)
        request_data["systemInstruction"] = {"parts": [{"text": combined_system_instruction}]}

    log.debug(f"Final request payload contents count: {len(contents)}, system_instruction: {bool(system_instructions and not compatibility_mode)}, compatibility_mode: {compatibility_mode}")

    # Add a thinking configuration for thinking models
    thinking_budget = get_thinking_budget(openai_request.model)
    if thinking_budget is not None:
        request_data["generationConfig"]["thinkingConfig"] = {
            "thinkingBudget": thinking_budget,
            "includeThoughts": should_include_thoughts(openai_request.model)
        }

    # Add the Google Search tool for search models
    if is_search_model(openai_request.model):
        request_data["tools"] = [{"googleSearch": {}}]

    # Remove None values
    request_data = {k: v for k, v in request_data.items() if v is not None}

    # Return the complete Gemini API payload
    return {
        "model": get_base_model_name(openai_request.model),
        "request": request_data
    }
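
# Illustrative sketch (comment only, not executed): for a minimal OpenAI
# request, the conversion above yields a payload shaped roughly like this.
# The model name is arbitrary, safetySettings comes from config, and this
# assumes get_base_model_name leaves an unsuffixed name unchanged and that
# no thinking budget is configured for it.
#
#   await openai_request_to_gemini_payload(
#       ChatCompletionRequest(model="gemini-2.5-pro",
#                             messages=[{"role": "user", "content": "Hi"}]))
#   # -> {"model": "gemini-2.5-pro",
#   #     "request": {"contents": [{"role": "user", "parts": [{"text": "Hi"}]}],
#   #                 "generationConfig": {},
#   #                 "safetySettings": DEFAULT_SAFETY_SETTINGS}}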

def _extract_content_and_reasoning(parts: list) -> tuple:
    """Extract the regular content and the reasoning content from Gemini response parts."""
    content = ""
    reasoning_content = ""
    for part in parts:
        # Handle text content
        if part.get("text"):
            # Check whether this part carries thinking tokens
            if part.get("thought", False):
                reasoning_content += part.get("text", "")
            else:
                content += part.get("text", "")
    return content, reasoning_content
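
# Worked example: parts flagged with "thought": True go to reasoning_content,
# everything else to content, each stream preserving its original order.
#
#   _extract_content_and_reasoning([
#       {"text": "Let me check...", "thought": True},
#       {"text": "The answer is 4."},
#   ])
#   # -> ("The answer is 4.", "Let me check...")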

def _build_message_with_reasoning(role: str, content: str, reasoning_content: str) -> dict:
    """Build a message object with optional reasoning content."""
    message = {
        "role": role,
        "content": content
    }
    # Attach reasoning_content when thinking tokens are present
    if reasoning_content:
        message["reasoning_content"] = reasoning_content
    return message

def gemini_response_to_openai(gemini_response: Dict[str, Any], model: str) -> Dict[str, Any]:
    """
    Convert a Gemini API response into the OpenAI chat completion format.

    Args:
        gemini_response: The response from the Gemini API.
        model: The model name to include in the response.

    Returns:
        A dict in OpenAI chat completion format.
    """
    choices = []
    for candidate in gemini_response.get("candidates", []):
        role = candidate.get("content", {}).get("role", "assistant")
        # Map the Gemini role back to the OpenAI role
        if role == "model":
            role = "assistant"

        # Extract and separate thinking tokens from regular content
        parts = candidate.get("content", {}).get("parts", [])
        content, reasoning_content = _extract_content_and_reasoning(parts)

        # Build the message object
        message = _build_message_with_reasoning(role, content, reasoning_content)

        choices.append({
            "index": candidate.get("index", 0),
            "message": message,
            "finish_reason": _map_finish_reason(candidate.get("finishReason")),
        })

    return {
        "id": str(uuid.uuid4()),
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": choices,
    }
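
# Illustrative sketch: a single-candidate Gemini response maps to one OpenAI
# choice, with "thought" parts surfacing as message["reasoning_content"].
#
#   gemini_response_to_openai(
#       {"candidates": [{"content": {"role": "model",
#                                    "parts": [{"text": "Hello!"}]},
#                        "finishReason": "STOP"}]},
#       "gemini-2.5-pro")
#   # -> {"id": "<random uuid4>", "object": "chat.completion",
#   #     "created": <unix timestamp>, "model": "gemini-2.5-pro",
#   #     "choices": [{"index": 0,
#   #                  "message": {"role": "assistant", "content": "Hello!"},
#   #                  "finish_reason": "stop"}]}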

def gemini_stream_chunk_to_openai(gemini_chunk: Dict[str, Any], model: str, response_id: str) -> Dict[str, Any]:
    """
    Convert a Gemini streaming response chunk into the OpenAI streaming format.

    Args:
        gemini_chunk: A single chunk from the Gemini streaming response.
        model: The model name to include in the response.
        response_id: A consistent ID for this streaming response.

    Returns:
        A dict in OpenAI streaming format.
    """
    choices = []
    for candidate in gemini_chunk.get("candidates", []):
        role = candidate.get("content", {}).get("role", "assistant")
        # Map the Gemini role back to the OpenAI role
        if role == "model":
            role = "assistant"

        # Extract and separate thinking tokens from regular content
        parts = candidate.get("content", {}).get("parts", [])
        content, reasoning_content = _extract_content_and_reasoning(parts)

        # Build the delta object
        delta = {}
        if content:
            delta["content"] = content
        if reasoning_content:
            delta["reasoning_content"] = reasoning_content

        choices.append({
            "index": candidate.get("index", 0),
            "delta": delta,
            "finish_reason": _map_finish_reason(candidate.get("finishReason")),
        })

    return {
        "id": response_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "choices": choices,
    }
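
# Note: this converts one already-decoded chunk; it performs no SSE framing.
# In the usual OpenAI streaming wire format each chunk is emitted as a
# "data: <json>" line and the stream is closed with "data: [DONE]", which is
# presumably handled by the calling router rather than here.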

def _map_finish_reason(gemini_reason: str) -> Optional[str]:
    """
    Map a Gemini finish reason onto an OpenAI finish reason.

    Args:
        gemini_reason: The finish reason from the Gemini API.

    Returns:
        An OpenAI-compatible finish reason, or None for unmapped reasons.
    """
    if gemini_reason == "STOP":
        return "stop"
    elif gemini_reason == "MAX_TOKENS":
        return "length"
    elif gemini_reason in ["SAFETY", "RECITATION"]:
        return "content_filter"
    else:
        return None

def validate_openai_request(request_data: Dict[str, Any]) -> ChatCompletionRequest:
    """
    Validate and normalize OpenAI request data.

    Args:
        request_data: The raw request data dict.

    Returns:
        A validated ChatCompletionRequest object.

    Raises:
        ValueError: If the request data is invalid.
    """
    try:
        return ChatCompletionRequest(**request_data)
    except Exception as e:
        raise ValueError(f"Invalid OpenAI request format: {str(e)}") from e

def normalize_openai_request(request_data: ChatCompletionRequest) -> ChatCompletionRequest:
    """
    Normalize OpenAI request data, applying defaults and limits.

    Args:
        request_data: The original request object.

    Returns:
        The normalized request object.
    """
    # Cap max_tokens
    if getattr(request_data, "max_tokens", None) is not None and request_data.max_tokens > 65535:
        request_data.max_tokens = 65535

    # Override top_k to 64
    setattr(request_data, "top_k", 64)

    # Filter out empty messages
    filtered_messages = []
    for m in request_data.messages:
        content = getattr(m, "content", None)
        if content:
            if isinstance(content, str) and content.strip():
                filtered_messages.append(m)
            elif isinstance(content, list) and len(content) > 0:
                has_valid_content = False
                for part in content:
                    if isinstance(part, dict):
                        if part.get("type") == "text" and part.get("text", "").strip():
                            has_valid_content = True
                            break
                        elif part.get("type") == "image_url" and part.get("image_url", {}).get("url"):
                            has_valid_content = True
                            break
                if has_valid_content:
                    filtered_messages.append(m)
    request_data.messages = filtered_messages

    return request_data
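
# Example (illustrative; `msg` is a hypothetical shorthand for a message
# object): whitespace-only text messages are dropped, while multimodal
# messages survive if any part carries non-empty text or an image URL.
#
#   request.messages = [msg("user", "   "), msg("user", "Hello")]
#   normalize_openai_request(request).messages
#   # -> [msg("user", "Hello")]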

def is_health_check_request(request_data: ChatCompletionRequest) -> bool:
    """
    Check whether the request is a health check.

    Args:
        request_data: The request object.

    Returns:
        True if the request is a health check request.
    """
    return (len(request_data.messages) == 1 and
            getattr(request_data.messages[0], "role", None) == "user" and
            getattr(request_data.messages[0], "content", None) == "Hi")

def create_health_check_response() -> Dict[str, Any]:
    """
    Create a health check response.

    Returns:
        The health check response dict.
    """
    return {
        "choices": [{
            "message": {
                "role": "assistant",
                "content": "gcli2api is working properly"
            }
        }]
    }

def extract_model_settings(model: str) -> Dict[str, Any]:
    """
    Extract settings information from a model name.

    Args:
        model: The model name.

    Returns:
        A dict containing the model settings.
    """
    return {
        "base_model": get_base_model_name(model),
        # "-假流式" is the routed-model suffix meaning "fake streaming";
        # it must stay verbatim to match the configured model names.
        "use_fake_streaming": model.endswith("-假流式"),
        "thinking_budget": get_thinking_budget(model),
        "include_thoughts": should_include_thoughts(model)
    }
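
# Illustrative sketch: for a routed name carrying the fake-streaming suffix,
# the settings would come out roughly as below. The concrete values depend on
# the helpers in config, so the stripped base name and the thinking fields
# are assumptions here.
#
#   extract_model_settings("gemini-2.5-flash-假流式")
#   # -> {"base_model": "gemini-2.5-flash",  # assuming the suffix is stripped
#   #     "use_fake_streaming": True,
#   #     "thinking_budget": None,           # assuming no thinking suffix
#   #     "include_thoughts": False}         # same assumption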