| """内部 OpenAI 兼容代理逻辑。""" |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import os |
| import uuid |
| from datetime import datetime, timezone |
| from time import perf_counter |
| from typing import Any |
|
|
| import httpx |
| from fastapi import HTTPException, Request |
| from fastapi.responses import JSONResponse, Response |
|
|
| import auth |
| import billing |
|
|
| logger = logging.getLogger("gateway") |
|
|
|
|
# Upstream endpoint for OpenAI-compatible chat completions; overridable via
# env var for testing or regional mirrors.
OPENAI_UPSTREAM_CHAT_URL = os.environ.get(
    "OPENAI_UPSTREAM_CHAT_URL", "https://api.openai.com/v1/chat/completions"
)
# Real upstream API key; empty string means "not configured" and the OpenAI
# route fails with HTTP 500 at request time (see handle_internal_chat_completions).
OPENAI_REAL_API_KEY = os.environ.get("OPENAI_API_KEY", "").strip()


# Loopback peers allowed to call the internal endpoint.
LOCALHOSTS = frozenset({"127.0.0.1", "::1", "localhost"})
|
|
|
|
| |
# Static routing table for models that are NOT proxied to the real OpenAI API.
# Keys are model names as sent by clients; values describe the upstream route.
MODEL_ROUTE_TABLE: dict[str, dict[str, Any]] = {
    "SiliconFlowFree": {
        # Use the simplified chatproxy protocol instead of the OpenAI wire format.
        "route_type": "chatproxy",
        # Tried in order; the first upstream that answers successfully wins.
        "base_urls": [
            "https://api1.pdf2zh-next.com/chatproxy",
            "https://api2.pdf2zh-next.com/chatproxy",
        ],
        # Empty key -> no Authorization header is sent upstream.
        "api_key": "",

        # "none" skips billing entirely; "upstream_usage" bills from the
        # upstream-reported usage block instead.
        "billing_mode": "none",
    }
}
|
|
|
|
| def _extract_bearer_token(request: Request) -> str: |
| """从 Authorization 头中提取 Bearer token。""" |
| header = request.headers.get("authorization", "") |
| if not header.lower().startswith("bearer "): |
| raise HTTPException(status_code=401, detail="Missing bearer token") |
| token = header[7:].strip() |
| if not token: |
| raise HTTPException(status_code=401, detail="Missing bearer token") |
| return token |
|
|
|
|
def _require_localhost(request: Request) -> None:
    """Reject the request with 403 unless it originates from loopback."""
    peer = request.client
    peer_host = ""
    if peer:
        peer_host = peer.host
    if peer_host not in LOCALHOSTS:
        raise HTTPException(status_code=403, detail="Internal endpoint only")
|
|
|
|
| def _extract_text_from_message_content(content: Any) -> str: |
| if isinstance(content, str): |
| return content |
| if not isinstance(content, list): |
| return "" |
|
|
| parts: list[str] = [] |
| for item in content: |
| if not isinstance(item, dict): |
| continue |
| if item.get("type") != "text": |
| continue |
| text = item.get("text") |
| if isinstance(text, str): |
| parts.append(text) |
| return "".join(parts) |
|
|
|
|
def _extract_text_from_messages(messages: Any) -> str:
    """Return the text of the most recent user message, falling back to the
    most recent message of any role that carries text.

    Raises:
        HTTPException: 400 when *messages* is not a list or holds no text.
    """
    if not isinstance(messages, list):
        raise HTTPException(status_code=400, detail="messages must be a list")

    def _latest_text(user_only: bool) -> str:
        # Scan from newest to oldest, skipping non-dict entries.
        for entry in reversed(messages):
            if not isinstance(entry, dict):
                continue
            if user_only and entry.get("role") != "user":
                continue
            text = _extract_text_from_message_content(entry.get("content"))
            if text:
                return text
        return ""

    found = _latest_text(user_only=True) or _latest_text(user_only=False)
    if found:
        return found

    raise HTTPException(status_code=400, detail="messages does not contain text content")
|
|
|
|
| def _should_request_json_mode(payload: dict[str, Any]) -> bool: |
| response_format = payload.get("response_format") |
| if not isinstance(response_format, dict): |
| return False |
| return response_format.get("type") == "json_object" |
|
|
|
|
| def _build_openai_compatible_response(model: str, content: str) -> dict[str, Any]: |
| return { |
| "id": f"chatcmpl-{uuid.uuid4().hex}", |
| "object": "chat.completion", |
| "created": int(datetime.now(timezone.utc).timestamp()), |
| "model": model, |
| "choices": [ |
| { |
| "index": 0, |
| "message": { |
| "role": "assistant", |
| "content": content, |
| }, |
| "finish_reason": "stop", |
| } |
| ], |
| "usage": { |
| "prompt_tokens": 0, |
| "completion_tokens": 0, |
| "total_tokens": 0, |
| }, |
| } |
|
|
|
|
async def _forward_to_chatproxy(
    http_client: httpx.AsyncClient,
    payload: dict[str, Any],
    model: str,
    route: dict[str, Any],
    request_id: str | None = None,
) -> dict[str, Any]:
    """Forward a chat request to a chatproxy-style upstream with failover.

    The OpenAI-format ``payload`` is reduced to a plain-text prompt and posted
    to each URL in ``route["base_urls"]`` in order; the first upstream that
    answers with a JSON body containing a string ``content`` field wins.

    Args:
        http_client: Shared async HTTP client used for the upstream calls.
        payload: Original OpenAI-compatible request body.
        model: Model name, used for logging and the synthesized response.
        route: Entry from ``MODEL_ROUTE_TABLE`` (``base_urls``, ``api_key``, ...).
        request_id: Short correlation id for log lines, if any.

    Returns:
        Dict with ``openai_response`` (OpenAI-shaped chat.completion built
        from the upstream's ``content``) and ``raw_upstream_body`` (the
        upstream JSON as received).

    Raises:
        HTTPException: 500 when no upstream is configured; 400 (via
            ``_extract_text_from_messages``) for malformed messages; 502 when
            every upstream attempt fails.
    """
    base_urls = route.get("base_urls", [])
    if not isinstance(base_urls, list) or not base_urls:
        raise HTTPException(status_code=500, detail=f"No upstream configured for model {model}")

    # chatproxy accepts a single text prompt, not a message list.
    request_json = {
        "text": _extract_text_from_messages(payload.get("messages")),
    }
    if _should_request_json_mode(payload):
        request_json["requestJsonMode"] = True

    # Only send an Authorization header when the route actually has a key.
    api_key = str(route.get("api_key") or "").strip()
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"

    # Failover loop: every failure records a reason and falls through to the
    # next upstream; `last_error` ends up in the final 502 detail.
    last_error = "No available upstream"
    for base_url in base_urls:
        try:
            upstream = await http_client.post(
                str(base_url),
                headers=headers,
                json=request_json,
            )
        except httpx.HTTPError as exc:
            # Transport-level failure (connect/timeout/protocol).
            last_error = str(exc)
            logger.warning(
                "chatproxy call failed: request_id=%s model=%s url=%s error=%s",
                request_id,
                model,
                base_url,
                exc,
            )
            continue

        if upstream.status_code >= 400:
            # Upstream answered but with an error status.
            last_error = f"status={upstream.status_code}"
            logger.warning(
                "chatproxy upstream returned error: request_id=%s model=%s url=%s status=%s",
                request_id,
                model,
                base_url,
                upstream.status_code,
            )
            continue

        try:
            body = upstream.json()
        except Exception as exc:
            # 2xx response whose body is not parseable JSON.
            last_error = f"invalid json response: {exc}"
            logger.warning(
                "chatproxy upstream returned invalid json: request_id=%s model=%s url=%s",
                request_id,
                model,
                base_url,
            )
            continue

        content = body.get("content")
        if not isinstance(content, str):
            # Valid JSON but not the expected {"content": "..."} shape.
            last_error = "missing content field"
            logger.warning(
                "chatproxy upstream missing content: request_id=%s model=%s url=%s body=%s",
                request_id,
                model,
                base_url,
                body,
            )
            continue

        # Success: wrap the plain-text answer in an OpenAI-compatible envelope
        # while keeping the raw body for callers that inspect upstream fields.
        return {
            "openai_response": _build_openai_compatible_response(
                model=model,
                content=content,
            ),
            "raw_upstream_body": body,
        }

    raise HTTPException(
        status_code=502,
        detail=f"All chatproxy upstreams failed for model {model}: {last_error}",
    )
|
|
|
|
async def handle_internal_chat_completions(
    request: Request,
    http_client: httpx.AsyncClient | None,
    active_job_by_user: dict[str, str],
) -> Response:
    """Handle an internal OpenAI-compatible chat-completions request.

    Access is gated twice: the caller must come from loopback AND present a
    bearer token that maps to a known user.  The model name then selects a
    route: models listed in ``MODEL_ROUTE_TABLE`` with ``route_type
    "chatproxy"`` are translated and forwarded with failover; every other
    model is proxied verbatim to the real OpenAI endpoint with usage billed
    per token.

    Args:
        request: Incoming FastAPI request (non-streaming JSON body expected).
        http_client: Shared async client; 500 when it is not ready yet.
        active_job_by_user: Maps username -> active job id, used to attribute
            billing records.

    Returns:
        A JSONResponse (or raw Response for non-JSON upstream bodies)
        mirroring the upstream answer and status code.

    Raises:
        HTTPException: 403 (non-loopback), 401 (missing/unknown token),
            400 (bad JSON, stream requested, missing model), 500 (client or
            OPENAI_API_KEY not configured), 502 (upstream failure).
    """
    _require_localhost(request)
    token = _extract_bearer_token(request)

    username = auth.INTERNAL_KEY_TO_USER.get(token)
    if not username:
        raise HTTPException(status_code=401, detail="Invalid internal API key")

    try:
        payload = await request.json()
    except json.JSONDecodeError as exc:
        raise HTTPException(status_code=400, detail=f"Invalid JSON body: {exc}") from exc

    # NOTE(review): a body that is valid JSON but not an object (e.g. a list)
    # would make payload.get below raise AttributeError -> 500; consider an
    # explicit isinstance(payload, dict) check.
    if payload.get("stream"):
        raise HTTPException(status_code=400, detail="stream=true is not supported")

    if http_client is None:
        raise HTTPException(status_code=500, detail="HTTP client is not ready")

    model = str(payload.get("model") or "").strip()
    if not model:
        raise HTTPException(status_code=400, detail="model is required")

    # Short correlation id for all log lines of this request; models absent
    # from MODEL_ROUTE_TABLE fall through to the generic OpenAI route.
    request_id = uuid.uuid4().hex[:8]
    route = MODEL_ROUTE_TABLE.get(model)
    job_id = active_job_by_user.get(username)
    route_type = route.get("route_type") if route else "openai"

    logger.info(
        "Internal chat request start: request_id=%s username=%s model=%s route_type=%s job_id=%s",
        request_id,
        username,
        model,
        route_type,
        job_id,
    )

    # --- chatproxy route -------------------------------------------------
    if route and route.get("route_type") == "chatproxy":
        t0 = perf_counter()
        try:
            result = await _forward_to_chatproxy(
                http_client=http_client,
                payload=payload,
                model=model,
                route=route,
                request_id=request_id,
            )
            response_json = result.get("openai_response") or {}
            raw_body = result.get("raw_upstream_body") or {}

            # Bill chatproxy traffic only when the route opts in; the current
            # table uses billing_mode="none", so this branch is normally skipped.
            billing_mode = str(route.get("billing_mode") or "none")
            if billing_mode == "upstream_usage":
                usage = raw_body.get("usage") or {}
                prompt_tokens = int(usage.get("prompt_tokens") or 0)
                completion_tokens = int(usage.get("completion_tokens") or 0)
                total_tokens = int(
                    usage.get("total_tokens") or (prompt_tokens + completion_tokens)
                )
                if total_tokens > 0:
                    billing.record_usage(
                        username=username,
                        job_id=job_id,
                        model=model,
                        prompt_tokens=prompt_tokens,
                        completion_tokens=completion_tokens,
                        total_tokens=total_tokens,
                    )

            return JSONResponse(response_json, status_code=200)
        finally:
            # Runs on success and on HTTPException alike, so the duration is
            # always logged exactly once.
            duration = perf_counter() - t0
            logger.info(
                "Internal chat request finished: request_id=%s route_type=chatproxy duration=%.3fs",
                request_id,
                duration,
            )

    # --- generic OpenAI passthrough route --------------------------------
    if not OPENAI_REAL_API_KEY:
        raise HTTPException(status_code=500, detail="OPENAI_API_KEY is not configured")

    headers = {
        "Authorization": f"Bearer {OPENAI_REAL_API_KEY}",
        "Content-Type": "application/json",
    }

    upstream: httpx.Response | None = None
    t0 = perf_counter()
    try:
        # The client-supplied payload is forwarded verbatim.
        upstream = await http_client.post(
            OPENAI_UPSTREAM_CHAT_URL,
            headers=headers,
            json=payload,
        )
    except httpx.HTTPError as exc:
        duration = perf_counter() - t0
        logger.error(
            "Upstream OpenAI call failed: request_id=%s model=%s username=%s duration=%.3fs error=%s",
            request_id,
            model,
            username,
            duration,
            exc,
        )
        raise HTTPException(status_code=502, detail="Upstream OpenAI request failed") from exc
    else:
        duration = perf_counter() - t0
        logger.info(
            "Upstream OpenAI call finished: request_id=%s model=%s status=%s duration=%.3fs",
            request_id,
            model,
            upstream.status_code,
            duration,
        )

    # Try to decode the upstream body as JSON; on failure fall back to a raw
    # byte-for-byte passthrough at the bottom of the function.
    response_json: dict[str, Any] | None = None
    content_type = upstream.headers.get("content-type", "")
    if "application/json" in content_type.lower():
        try:
            response_json = upstream.json()
        except Exception:
            response_json = None

    # Bill only successful JSON responses; error responses are passed through
    # unbilled.
    if upstream.status_code < 400 and response_json is not None:
        usage = response_json.get("usage") or {}
        prompt_tokens = int(usage.get("prompt_tokens") or 0)
        completion_tokens = int(usage.get("completion_tokens") or 0)
        total_tokens = int(
            usage.get("total_tokens") or (prompt_tokens + completion_tokens)
        )

        if (
            prompt_tokens == 0
            and completion_tokens == 0
            and total_tokens == 0
        ):
            # A success response without usage is suspicious; log it so the
            # missing billing can be investigated.
            logger.warning(
                "Upstream OpenAI response missing or zero usage: request_id=%s model=%s username=%s",
                request_id,
                model,
                username,
            )

        # Re-read in case the active job changed while the upstream call was
        # in flight (job_id was first read before the request started).
        job_id = active_job_by_user.get(username)

        # A zero-usage record is still written, matching the warning above.
        billing.record_usage(
            username=username,
            job_id=job_id,
            model=model,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens,
        )

    if response_json is not None:
        return JSONResponse(response_json, status_code=upstream.status_code)

    # Non-JSON (or undecodable) upstream body: mirror it byte-for-byte.
    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        media_type=content_type or None,
    )
|
|