# web2api/core/protocol/service.py
# feat: align hosted Space deployment with latest upstream (commit 77169b4)
"""Canonical 请求桥接到 OpenAI 语义事件流(唯一中间态)。"""
from __future__ import annotations
from collections.abc import AsyncIterator
from core.api.chat_handler import ChatHandler
from core.api.schemas import (
InputAttachment,
OpenAIChatRequest,
OpenAIContentPart,
OpenAIMessage,
)
from core.protocol.images import (
MAX_IMAGE_COUNT,
download_remote_image,
parse_base64_image,
parse_data_url,
)
from core.hub.schemas import OpenAIStreamEvent
from core.protocol.schemas import CanonicalChatRequest, CanonicalContentBlock, CanonicalMessage
class CanonicalChatService:
    """Bridge canonical requests onto the OpenAI-semantics event stream.

    ``CanonicalChatRequest`` is the single intermediate representation; this
    service converts it into an ``OpenAIChatRequest`` and delegates the
    actual streaming to the injected ``ChatHandler``.
    """

    def __init__(self, handler: ChatHandler) -> None:
        self._handler = handler

    async def stream_raw(
        self, req: CanonicalChatRequest
    ) -> AsyncIterator[OpenAIStreamEvent]:
        """Yield OpenAI semantic stream events for *req* as they arrive."""
        openai_req = await self._to_openai_request(req)
        async for event in self._handler.stream_openai_events(req.provider, openai_req):
            yield event

    async def collect_raw(self, req: CanonicalChatRequest) -> list[OpenAIStreamEvent]:
        """Drain the whole event stream into a list (for non-streaming callers)."""
        return [event async for event in self.stream_raw(req)]

    async def _to_openai_request(self, req: CanonicalChatRequest) -> OpenAIChatRequest:
        """Translate a canonical request into an ``OpenAIChatRequest``.

        System content becomes a leading ``system`` message. Tool-result
        messages carry the ``tool_use_id`` of their first content block as
        ``tool_call_id`` so the upstream can correlate call and output.
        """
        messages: list[OpenAIMessage] = []
        if req.system:
            messages.append(
                OpenAIMessage(
                    role="system",
                    content=self._to_openai_content(req.system),
                )
            )
        for msg in req.messages:
            messages.append(
                OpenAIMessage(
                    role=msg.role,
                    content=self._to_openai_content(msg.content),
                    tool_call_id=msg.content[0].tool_use_id
                    if msg.role == "tool" and msg.content
                    else None,
                )
            )
        openai_tools = [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.input_schema,
                    "strict": tool.strict,
                },
            }
            for tool in req.tools
        ]
        last_user_attachments, all_attachments = await self._resolve_attachments(req)
        return OpenAIChatRequest(
            model=req.model,
            messages=messages,
            stream=req.stream,
            tools=openai_tools or None,
            tool_choice=req.tool_choice,
            resume_session_id=req.resume_session_id,
            # Missing or empty-string metadata values are normalized to None.
            upstream_model=str(req.metadata.get("upstream_model") or "") or None,
            # ChatHandler decides which attachment list is actually assigned
            # to attachment_files, depending on full_history.
            attachment_files=[],
            attachment_files_last_user=last_user_attachments,
            attachment_files_all_users=all_attachments,
        )

    async def _resolve_attachments(
        self, req: CanonicalChatRequest
    ) -> tuple[list[InputAttachment], list[InputAttachment]]:
        """Resolve image attachments as (last_user, all_users).

        - When a session is reused (full_history=False) only the images of
          the last user message are needed;
        - when a session is rebuilt (full_history=True) the images of every
          historical user message must be re-attached as well.

        Raises:
            ValueError: if more than ``MAX_IMAGE_COUNT`` images are present,
                or an image block carries no usable data.
        """
        last_user: CanonicalMessage | None = next(
            (msg for msg in reversed(req.messages) if msg.role == "user"), None
        )
        # Images from every user message (used to rebuild session history).
        all_image_blocks: list[CanonicalContentBlock] = [
            block
            for msg in req.messages
            if msg.role == "user"
            for block in msg.content
            if block.type == "image"
        ]
        last_user_blocks: list[CanonicalContentBlock] = []
        if last_user is not None:
            last_user_blocks = [
                block for block in last_user.content if block.type == "image"
            ]
        if len(all_image_blocks) > MAX_IMAGE_COUNT:
            raise ValueError(f"单次最多上传 {MAX_IMAGE_COUNT} 张图片")

        async def _prepare(
            blocks: list[CanonicalContentBlock],
        ) -> list[InputAttachment]:
            # Materialize image blocks into attachments; the 1-based index
            # only feeds the generated filename prefix.
            attachments: list[InputAttachment] = []
            for idx, block in enumerate(blocks, start=1):
                if block.url:
                    prepared = await download_remote_image(
                        block.url, prefix=f"message_image_{idx}"
                    )
                elif block.data and block.data.startswith("data:"):
                    prepared = parse_data_url(block.data, prefix=f"message_image_{idx}")
                elif block.data and block.mime_type:
                    prepared = parse_base64_image(
                        block.data,
                        block.mime_type,
                        prefix=f"message_image_{idx}",
                    )
                else:
                    raise ValueError("图片块缺少可用数据")
                attachments.append(
                    InputAttachment(
                        filename=prepared.filename,
                        mime_type=prepared.mime_type,
                        data=prepared.data,
                    )
                )
            return attachments

        # NOTE(review): blocks of the last user message appear in both lists,
        # so remote images there are fetched twice; per-list indexing keeps
        # filename prefixes consistent within each list, so the lists cannot
        # simply share prepared attachments.
        last_attachments = await _prepare(last_user_blocks)
        all_attachments = await _prepare(all_image_blocks)
        return last_attachments, all_attachments

    @staticmethod
    def _to_openai_content(
        blocks: list[CanonicalContentBlock],
    ) -> str | list[OpenAIContentPart]:
        """Flatten canonical content blocks into OpenAI message content.

        Returns a bare string when the content is empty or reduces to a
        single text part; otherwise a list of ``OpenAIContentPart``.
        Thinking and tool-result blocks are passed through as plain text;
        unrecognized block types are dropped.
        """
        if not blocks:
            return ""
        parts: list[OpenAIContentPart] = []
        for block in blocks:
            if block.type in {"text", "thinking", "tool_result"}:
                parts.append(OpenAIContentPart(type="text", text=block.text or ""))
            elif block.type == "image":
                # Prefer the remote URL; fall back to inline data
                # (data: URL or raw payload).
                url = block.url or block.data or ""
                parts.append(
                    OpenAIContentPart(
                        type="image_url",
                        image_url={"url": url},
                    )
                )
        if not parts:
            return ""
        if len(parts) == 1 and parts[0].type == "text":
            return parts[0].text or ""
        return parts