| |
| |
| """Freebuff OpenAI API 反代代理 - Hugging Face Docker Space 版 |
| |
| 说明: |
| - 保留原有 OpenAI 兼容接口: |
| - /v1/chat/completions |
| - /v1/responses |
| - /v1/models |
| - /v1/reset-run |
| - /health |
| - 将原来的命令行/TTY 登录改为网页管理页: |
| - / |
| - /admin/login/start |
| - /admin/login/status |
| - 适配 Hugging Face Spaces: |
| - 读取 PORT 环境变量 |
| - 凭据优先保存到 /data(若已开通持久化存储) |
| - 可通过 Secrets 传入 API_KEY / ADMIN_PASSWORD / ACCOUNTS_JSON |
| """ |
|
|
| import asyncio |
| import json |
| import os |
| import platform |
| import random |
| import string |
| import sys |
| import time |
| import uuid |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional, Tuple |
| from urllib.parse import quote |
|
|
| try: |
| import aiohttp |
| from aiohttp import web |
| except ImportError: |
| print("请先安装 aiohttp: pip install aiohttp") |
| sys.exit(1) |
|
|
| API_BASE = os.environ.get("API_BASE", "www.codebuff.com") |
| PORT = int(os.environ.get("PORT", "7860")) |
| HOST = os.environ.get("HOST", "0.0.0.0") |
| POLL_INTERVAL_S = int(os.environ.get("POLL_INTERVAL_S", "5")) |
| TIMEOUT_S = int(os.environ.get("TIMEOUT_S", "300")) |
|
|
| |
| PROXY_API_KEY = os.environ.get("API_KEY", "") |
| |
| ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "") |
| FRONTEND_PASSWORD = os.environ.get("FRONTEND_PASSWORD", "") |
| FRONTEND_COOKIE_NAME = "frontend_gate" |
|
|
| MODEL_TO_AGENT = { |
| "minimax/minimax-m2.7": "base2-free", |
| "z-ai/glm-5.1": "base2-free", |
| "google/gemini-2.5-flash-lite": "file-picker", |
| "google/gemini-3.1-flash-lite-preview": "file-picker-max", |
| "google/gemini-3.1-pro-preview": "thinker-with-files-gemini", |
| } |
|
|
| default_model = os.environ.get("DEFAULT_MODEL", "minimax/minimax-m2.7") |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| token_pool: List[Dict[str, Any]] = [] |
| next_token_index = 0 |
|
|
| |
| run_cache: Dict[Tuple[str, str], str] = {} |
|
|
| |
| pending_logins: Dict[str, Dict[str, Any]] = {} |
|
|
| C = { |
| "R": "\033[0m", "B": "\033[1m", "G": "\033[32m", |
| "Y": "\033[33m", "E": "\033[31m", "C": "\033[36m", "D": "\033[90m", |
| } |
|
|
|
|
| def log(msg: str, t: str = "info") -> None: |
| c = {"success": C["G"], "error": C["E"], "warn": C["Y"]}.get(t, C["C"]) |
| icon = {"success": "✓", "error": "✗", "warn": "⚠"}.get(t, "ℹ") |
| print(f"{c}{icon}{C['R']} {msg}", flush=True) |
|
|
|
|
| def token_fingerprint(auth_token: str) -> str: |
| if not auth_token: |
| return "none" |
| if len(auth_token) < 12: |
| return auth_token[:3] + "..." |
| return f"{auth_token[:6]}...{auth_token[-4:]}" |
|
|
|
|
| def generate_fingerprint_id() -> str: |
| chars = string.ascii_lowercase + string.digits |
| return f"codebuff-cli-{''.join(random.choices(chars, k=26))}" |
|
|
|
|
| def get_config_paths() -> Tuple[Path, Path]: |
| |
| if Path("/data").exists() and os.access("/data", os.W_OK): |
| config_dir = Path("/data") / "manicode" |
| return config_dir, config_dir / "credentials.json" |
|
|
| home = Path.home() |
| if platform.system() == "Windows": |
| config_dir = Path(os.environ.get("APPDATA", str(home))) / "manicode" |
| else: |
| config_dir = home / ".config" / "manicode" |
| return config_dir, config_dir / "credentials.json" |
|
|
|
|
| def normalize_accounts(creds: Any) -> List[Dict[str, Any]]: |
| """兼容旧格式(default)和新格式(accounts)。""" |
| if not isinstance(creds, dict): |
| return [] |
|
|
| accounts: List[Dict[str, Any]] = [] |
|
|
| raw_accounts = creds.get("accounts") |
| if isinstance(raw_accounts, list): |
| for entry in raw_accounts: |
| if isinstance(entry, dict) and entry.get("authToken"): |
| accounts.append({ |
| "id": entry.get("id"), |
| "name": entry.get("name") or "unknown", |
| "email": entry.get("email") or "unknown", |
| "authToken": entry.get("authToken"), |
| "credits": entry.get("credits", 0), |
| }) |
|
|
| default_entry = creds.get("default") |
| if isinstance(default_entry, dict) and default_entry.get("authToken"): |
| default_token = default_entry.get("authToken") |
| exists = any(acc.get("authToken") == default_token for acc in accounts) |
| if not exists: |
| accounts.insert(0, { |
| "id": default_entry.get("id"), |
| "name": default_entry.get("name") or "default", |
| "email": default_entry.get("email") or "unknown", |
| "authToken": default_token, |
| "credits": default_entry.get("credits", 0), |
| }) |
|
|
| return accounts |
|
|
|
|
| def load_accounts() -> List[Dict[str, Any]]: |
| _, creds_path = get_config_paths() |
| disk_accounts: List[Dict[str, Any]] = [] |
|
|
| if creds_path.exists(): |
| try: |
| creds = json.loads(creds_path.read_text(encoding="utf-8")) |
| disk_accounts = normalize_accounts(creds) |
| except Exception as e: |
| log(f"读取本地凭据失败: {e}", "warn") |
|
|
| |
| env_accounts: List[Dict[str, Any]] = [] |
| raw_env = os.environ.get("ACCOUNTS_JSON", "").strip() |
| if raw_env: |
| try: |
| parsed = json.loads(raw_env) |
| if isinstance(parsed, list): |
| env_accounts = [ |
| { |
| "id": entry.get("id"), |
| "name": entry.get("name") or "unknown", |
| "email": entry.get("email") or "unknown", |
| "authToken": entry.get("authToken"), |
| "credits": entry.get("credits", 0), |
| } |
| for entry in parsed |
| if isinstance(entry, dict) and entry.get("authToken") |
| ] |
| elif isinstance(parsed, dict): |
| env_accounts = normalize_accounts(parsed) |
| except Exception as e: |
| log(f"解析 ACCOUNTS_JSON 失败: {e}", "warn") |
|
|
| merged: List[Dict[str, Any]] = [] |
| seen = set() |
| for acc in disk_accounts + env_accounts: |
| token = acc.get("authToken") |
| if token and token not in seen: |
| seen.add(token) |
| merged.append(acc) |
| return merged |
|
|
|
|
| def save_accounts(accounts: List[Dict[str, Any]]) -> None: |
| config_dir, creds_path = get_config_paths() |
| config_dir.mkdir(parents=True, exist_ok=True) |
|
|
| if not accounts: |
| return |
|
|
| |
| data = { |
| "default": { |
| "id": accounts[0].get("id"), |
| "name": accounts[0].get("name"), |
| "email": accounts[0].get("email"), |
| "authToken": accounts[0].get("authToken"), |
| "credits": accounts[0].get("credits", 0), |
| }, |
| "accounts": accounts, |
| } |
| creds_path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") |
|
|
|
|
| async def warm_account_default_agent(session: aiohttp.ClientSession, account: Dict[str, Any]) -> Optional[str]: |
| auth_token = account.get("authToken") |
| if not auth_token: |
| return None |
| default_agent = MODEL_TO_AGENT.get(default_model, "base2-free") |
| try: |
| run_id = await create_agent_run(session, auth_token, default_agent) |
| run_cache[get_run_cache_key(auth_token, default_agent)] = run_id |
| return run_id |
| except Exception as e: |
| log( |
| f"预热失败: user={account.get('name')}, token={token_fingerprint(auth_token)}, err={e}", |
| "warn", |
| ) |
| return None |
|
|
|
|
| def append_account(user_obj: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: |
| """把新登录账号追加到 token 池并持久化(按 authToken 去重)。""" |
| global token_pool |
|
|
| new_acc = { |
| "id": user_obj.get("id"), |
| "name": user_obj.get("name") or "unknown", |
| "email": user_obj.get("email") or "unknown", |
| "authToken": user_obj.get("authToken") or user_obj.get("auth_token"), |
| "credits": user_obj.get("credits", 0), |
| } |
|
|
| if not new_acc["authToken"]: |
| raise RuntimeError("登录返回中缺少 authToken") |
|
|
| existing = {acc.get("authToken") for acc in token_pool} |
| if new_acc["authToken"] not in existing: |
| token_pool.append(new_acc) |
| save_accounts(token_pool) |
| return True, new_acc |
| return False, new_acc |
|
|
|
|
| def pick_next_account() -> Tuple[Dict[str, Any], int]: |
| """轮询选择下一个账号。""" |
| global next_token_index |
|
|
| if not token_pool: |
| raise RuntimeError("没有可用 token,请先在管理页添加至少一个账号") |
|
|
| idx = next_token_index % len(token_pool) |
| next_token_index = (next_token_index + 1) % len(token_pool) |
| return token_pool[idx], idx |
|
|
|
|
| def get_run_cache_key(auth_token: str, agent_id: str) -> Tuple[str, str]: |
| return auth_token, agent_id |
|
|
|
|
| |
|
|
| async def api_request( |
| session: aiohttp.ClientSession, |
| hostname: str, |
| path: str, |
| body: Optional[Dict[str, Any]] = None, |
| auth_token: Optional[str] = None, |
| method: str = "POST", |
| ) -> Dict[str, Any]: |
| url = f"https://{hostname}{path}" |
| headers = { |
| "Content-Type": "application/json", |
| "Accept": "application/json", |
| "User-Agent": "freebuff-proxy/1.0", |
| } |
| if auth_token: |
| headers["Authorization"] = f"Bearer {auth_token}" |
|
|
| kwargs: Dict[str, Any] = { |
| "headers": headers, |
| "timeout": aiohttp.ClientTimeout(total=30), |
| } |
| if body is not None and method.upper() != "GET": |
| kwargs["json"] = body |
|
|
| async with session.request(method.upper(), url, **kwargs) as resp: |
| try: |
| data = await resp.json() |
| except Exception: |
| data = await resp.text() |
| return {"status": resp.status, "data": data} |
|
|
|
|
| |
|
|
| async def start_login_flow(session: aiohttp.ClientSession) -> Dict[str, Any]: |
| fp_id = generate_fingerprint_id() |
| res = await api_request(session, "freebuff.com", "/api/auth/cli/code", {"fingerprintId": fp_id}) |
| if res["status"] != 200 or "loginUrl" not in res["data"]: |
| raise RuntimeError(f"获取登录 URL 失败: {res['data']}") |
|
|
| d = res["data"] |
| login_id = uuid.uuid4().hex |
| pending_logins[login_id] = { |
| "fingerprintId": fp_id, |
| "fingerprintHash": d["fingerprintHash"], |
| "expiresAt": d["expiresAt"], |
| "loginUrl": d["loginUrl"], |
| "createdAt": time.time(), |
| } |
| return { |
| "loginId": login_id, |
| "fingerprintId": fp_id, |
| "fingerprintHash": d["fingerprintHash"], |
| "expiresAt": d["expiresAt"], |
| "loginUrl": d["loginUrl"], |
| } |
|
|
|
|
| async def poll_login_status(session: aiohttp.ClientSession, login_id: str) -> Dict[str, Any]: |
| pending = pending_logins.get(login_id) |
| if not pending: |
| raise RuntimeError("loginId 不存在或已过期") |
|
|
| |
| if time.time() - float(pending.get("createdAt", 0)) > TIMEOUT_S: |
| pending_logins.pop(login_id, None) |
| raise RuntimeError("登录已超时,请重新发起") |
|
|
| path = ( |
| f"/api/auth/cli/status?fingerprintId={quote(str(pending['fingerprintId']))}" |
| f"&fingerprintHash={quote(str(pending['fingerprintHash']))}" |
| f"&expiresAt={quote(str(pending['expiresAt']))}" |
| ) |
| sr = await api_request(session, "freebuff.com", path, method="GET") |
| if sr["status"] == 200 and isinstance(sr["data"], dict) and "user" in sr["data"]: |
| user = sr["data"]["user"] |
| added, new_acc = append_account(user) |
| pending_logins.pop(login_id, None) |
| await warm_account_default_agent(session, new_acc) |
| return { |
| "status": "success", |
| "added": added, |
| "user": { |
| "id": new_acc.get("id"), |
| "name": new_acc.get("name"), |
| "email": new_acc.get("email"), |
| "credits": new_acc.get("credits", 0), |
| "token": token_fingerprint(new_acc.get("authToken", "")), |
| }, |
| } |
|
|
| return { |
| "status": "pending", |
| "message": "尚未完成登录,请在新标签页完成授权后继续轮询", |
| "upstream": sr.get("data"), |
| } |
|
|
|
|
| |
|
|
| async def create_agent_run(session: aiohttp.ClientSession, auth_token: str, agent_id: str) -> str: |
| t = time.time() |
| res = await api_request( |
| session, |
| API_BASE, |
| "/api/v1/agent-runs", |
| {"action": "START", "agentId": agent_id}, |
| auth_token, |
| ) |
| ms = int((time.time() - t) * 1000) |
| if res["status"] != 200 or "runId" not in res["data"]: |
| raise RuntimeError(f"创建 Agent Run 失败: {json.dumps(res['data'], ensure_ascii=False)}") |
| log(f"创建新 Agent Run: {res['data']['runId']} (耗时 {ms}ms, token={token_fingerprint(auth_token)})") |
| return res["data"]["runId"] |
|
|
|
|
| async def get_or_create_agent_run(session: aiohttp.ClientSession, auth_token: str, agent_id: str) -> str: |
| key = get_run_cache_key(auth_token, agent_id) |
| run_id = run_cache.get(key) |
| if run_id: |
| return run_id |
|
|
| run_id = await create_agent_run(session, auth_token, agent_id) |
| run_cache[key] = run_id |
| return run_id |
|
|
|
|
| async def finish_agent_run(session: aiohttp.ClientSession, auth_token: str, run_id: str) -> None: |
| await api_request(session, API_BASE, "/api/v1/agent-runs", { |
| "action": "FINISH", "runId": run_id, "status": "completed", |
| "totalSteps": 1, "directCredits": 0, "totalCredits": 0, |
| }, auth_token) |
|
|
|
|
| def make_freebuff_body(openai_body: Dict[str, Any], run_id: str) -> Dict[str, Any]: |
| body = dict(openai_body) |
| body["codebuff_metadata"] = { |
| "run_id": run_id, |
| "client_id": f"freebuff-proxy-{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}", |
| "cost_mode": "free", |
| } |
| return body |
|
|
|
|
| def build_openai_response(run_id: str, model: str, choice_data: Dict[str, Any], usage_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: |
| choice = choice_data or {} |
| message = choice.get("message", {}) |
| resp = { |
| "id": f"freebuff-{run_id}", |
| "object": "chat.completion", |
| "created": int(time.time()), |
| "model": model, |
| "choices": [{ |
| "index": 0, |
| "message": { |
| "role": "assistant", |
| "content": message.get("content", ""), |
| }, |
| "finish_reason": choice.get("finish_reason", "stop"), |
| }], |
| "usage": { |
| "prompt_tokens": (usage_data or {}).get("prompt_tokens", 0), |
| "completion_tokens": (usage_data or {}).get("completion_tokens", 0), |
| "total_tokens": (usage_data or {}).get("total_tokens", 0), |
| }, |
| } |
| if message.get("tool_calls"): |
| resp["choices"][0]["message"]["tool_calls"] = message["tool_calls"] |
| return resp |
|
|
|
|
| |
|
|
| async def stream_to_openai_format( |
| session: aiohttp.ClientSession, |
| freebuff_body: Dict[str, Any], |
| auth_token: str, |
| response: web.StreamResponse, |
| model: str, |
| ) -> None: |
| url = f"https://{API_BASE}/api/v1/chat/completions" |
| headers = { |
| "Content-Type": "application/json", |
| "Authorization": f"Bearer {auth_token}", |
| "Accept": "text/event-stream", |
| "User-Agent": "freebuff-proxy/1.0", |
| } |
| response_id = f"freebuff-{int(time.time() * 1000)}" |
| finish_reason = "stop" |
|
|
| timeout = aiohttp.ClientTimeout(total=120) |
| async with session.post(url, json=freebuff_body, headers=headers, timeout=timeout) as resp: |
| if resp.status != 200: |
| err = await resp.text() |
| raise RuntimeError(f"HTTP {resp.status}: {err}") |
|
|
| buffer = "" |
| async for chunk in resp.content.iter_any(): |
| buffer += chunk.decode("utf-8", errors="replace") |
| lines = buffer.split("\n") |
| buffer = lines.pop() |
|
|
| for line in lines: |
| trimmed = line.strip() |
| if not trimmed or not trimmed.startswith("data: "): |
| continue |
| json_str = trimmed[6:].strip() |
| if json_str == "[DONE]": |
| await response.write(b"data: [DONE]\n\n") |
| continue |
| try: |
| parsed = json.loads(json_str) |
| delta = (parsed.get("choices") or [{}])[0].get("delta", {}) |
| cfr = (parsed.get("choices") or [{}])[0].get("finish_reason") |
| if cfr: |
| finish_reason = cfr |
|
|
| delta_obj: Dict[str, Any] = {} |
| if delta.get("content"): |
| delta_obj["content"] = delta["content"] |
| if delta.get("tool_calls"): |
| delta_obj["tool_calls"] = delta["tool_calls"] |
| if delta.get("role"): |
| delta_obj["role"] = delta["role"] |
|
|
| if delta_obj: |
| openai_chunk = { |
| "id": response_id, |
| "object": "chat.completion.chunk", |
| "created": int(time.time()), |
| "model": model, |
| "choices": [{"index": 0, "delta": delta_obj, "finish_reason": None}], |
| } |
| await response.write(f"data: {json.dumps(openai_chunk, ensure_ascii=False)}\n\n".encode()) |
| except Exception: |
| pass |
|
|
| final_chunk = { |
| "id": response_id, |
| "object": "chat.completion.chunk", |
| "created": int(time.time()), |
| "model": model, |
| "choices": [{"index": 0, "delta": {}, "finish_reason": finish_reason}], |
| } |
| await response.write(f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n".encode()) |
| await response.write(b"data: [DONE]\n\n") |
| await response.write_eof() |
|
|
|
|
| |
|
|
| @web.middleware |
| async def auth_middleware(request: web.Request, handler): |
| """当 PROXY_API_KEY 非空时,对 /v1/* 路由强制验证 Bearer token。""" |
| if PROXY_API_KEY and request.path.startswith("/v1/"): |
| auth_header = request.headers.get("Authorization", "") |
| token = auth_header[7:] if auth_header.startswith("Bearer ") else "" |
| if token != PROXY_API_KEY: |
| log(f"鉴权失败: path={request.path}, ip={request.remote}", "warn") |
| return web.json_response( |
| {"error": {"message": "Incorrect API key provided", "type": "invalid_request_error", "code": "invalid_api_key"}}, |
| status=401, |
| ) |
| return await handler(request) |
|
|
|
|
| def require_admin(request: web.Request) -> Optional[web.Response]: |
| if not ADMIN_PASSWORD: |
| return None |
| supplied = request.headers.get("X-Admin-Password") or request.query.get("password") or "" |
| if supplied != ADMIN_PASSWORD: |
| return web.json_response({"error": {"message": "Admin password required"}}, status=401) |
| return None |
|
|
|
|
| def get_frontend_gate_password() -> str: |
| return FRONTEND_PASSWORD or ADMIN_PASSWORD |
|
|
|
|
| def is_frontend_allowed(request: web.Request) -> bool: |
| gate_password = get_frontend_gate_password() |
| if not gate_password: |
| return True |
| return request.cookies.get(FRONTEND_COOKIE_NAME) == gate_password |
|
|
|
|
| def frontend_gate_page(error: str = "") -> str: |
| escaped_error = quote(error) if error else "" |
| return f"""<!doctype html> |
| <html lang="zh-CN"> |
| <head> |
| <meta charset="utf-8" /> |
| <meta name="viewport" content="width=device-width, initial-scale=1" /> |
| <title>进入管理页</title> |
| <style> |
| body {{ font-family: -apple-system, BlinkMacSystemFont, Segoe UI, sans-serif; margin: 0; min-height: 100vh; display: grid; place-items: center; background: #0b1020; color: #e8eefc; }} |
| .card {{ width: min(92vw, 420px); background: #141b34; border: 1px solid #2a355e; border-radius: 18px; padding: 24px; box-shadow: 0 20px 60px rgba(0,0,0,.35); }} |
| h1 {{ margin: 0 0 12px; font-size: 24px; }} |
| p {{ color: #9fb0dd; line-height: 1.6; }} |
| input, button {{ width: 100%; box-sizing: border-box; border-radius: 12px; border: 1px solid #3a4b80; background: #0d1430; color: #fff; padding: 12px; font-size: 14px; }} |
| button {{ margin-top: 12px; cursor: pointer; background: #3451ff; border: none; font-weight: 700; }} |
| .err {{ margin-top: 12px; color: #ff9d9d; min-height: 20px; }} |
| .hint {{ font-size: 13px; }} |
| </style> |
| </head> |
| <body> |
| <form class="card" method="post" action="/gate/login"> |
| <h1>进入管理页</h1> |
| <p>这个页面已加简单门禁。输入访问密码后才会展示前端管理面板。</p> |
| <input type="password" name="password" placeholder="请输入访问密码" autofocus /> |
| <button type="submit">进入</button> |
| <div class="err" id="err">{error}</div> |
| <p class="hint">默认优先读取 <code>FRONTEND_PASSWORD</code>;如果没设置,会回退使用 <code>ADMIN_PASSWORD</code>。</p> |
| </form> |
| </body> |
| </html> |
| """ |
|
|
|
|
| |
|
|
|
|
| def _responses_parse_input(data: Dict[str, Any]) -> List[Dict[str, Any]]: |
| """将 Responses API 的 input 字段转换为 OpenAI messages 列表。""" |
| raw_input = data.get("input", "") |
| instructions = data.get("instructions", "") |
| messages: List[Dict[str, Any]] = [] |
|
|
| if instructions: |
| messages.append({"role": "system", "content": instructions}) |
|
|
| if isinstance(raw_input, str): |
| if raw_input: |
| messages.append({"role": "user", "content": raw_input}) |
| elif isinstance(raw_input, list): |
| for item in raw_input: |
| if isinstance(item, str): |
| messages.append({"role": "user", "content": item}) |
| elif isinstance(item, dict): |
| role = item.get("role", "user") |
| content = item.get("content", "") |
| item_type = item.get("type", "") |
| if item_type == "function_call_output": |
| messages.append({ |
| "role": "tool", |
| "tool_call_id": item.get("call_id", ""), |
| "content": item.get("output", ""), |
| }) |
| elif role in ("user", "assistant", "system", "developer"): |
| if role == "developer": |
| role = "system" |
| if isinstance(content, list): |
| text_parts: List[str] = [] |
| for part in content: |
| if isinstance(part, dict): |
| if part.get("type") in ("input_text", "text"): |
| text_parts.append(part.get("text", "")) |
| elif isinstance(part, str): |
| text_parts.append(part) |
| content = "".join(text_parts) |
| messages.append({"role": role, "content": content}) |
| return messages |
|
|
|
|
|
|
| def _responses_make_base(resp_id: str, model: str, created: float, |
| instructions: Optional[str] = None, status: str = "completed") -> Dict[str, Any]: |
| return { |
| "id": resp_id, |
| "object": "response", |
| "created_at": created, |
| "status": status, |
| "model": model, |
| "output": [], |
| "parallel_tool_calls": True, |
| "tool_choice": "auto", |
| "tools": [], |
| "temperature": 1.0, |
| "top_p": 1.0, |
| "max_output_tokens": None, |
| "truncation": "disabled", |
| "instructions": instructions, |
| "metadata": {}, |
| "incomplete_details": None, |
| "error": None, |
| "usage": None, |
| } |
|
|
|
|
| |
|
|
| async def handle_chat_completion(request: web.Request) -> web.StreamResponse: |
| start = time.time() |
| session: aiohttp.ClientSession = request.app["client_session"] |
|
|
| try: |
| body = await request.json() |
| except Exception: |
| return web.json_response({"error": {"message": "Invalid JSON body"}}, status=400) |
|
|
| model = body.get("model", default_model) |
| agent_id = MODEL_TO_AGENT.get(model, "base2-free") |
|
|
| try: |
| account, account_idx = pick_next_account() |
| auth_token = account["authToken"] |
| except Exception as e: |
| return web.json_response({"error": {"message": str(e)}}, status=503) |
|
|
| log( |
| "收到请求: " |
| f"model={model}, messages={len(body.get('messages', []))}, stream={body.get('stream', False)}, " |
| f"account_index={account_idx}, user={account.get('name')}, token={token_fingerprint(auth_token)}" |
| ) |
|
|
| try: |
| run_id = await get_or_create_agent_run(session, auth_token, agent_id) |
| except Exception as e: |
| return web.json_response({"error": {"message": str(e)}}, status=500) |
|
|
| fb_body = make_freebuff_body(body, run_id) |
|
|
| try: |
| if body.get("stream"): |
| response = web.StreamResponse( |
| status=200, |
| headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive"}, |
| ) |
| await response.prepare(request) |
| await stream_to_openai_format(session, fb_body, auth_token, response, model) |
| log(f"请求完成,总耗时 {int((time.time() - start) * 1000)}ms", "success") |
| return response |
|
|
| res = await api_request(session, API_BASE, "/api/v1/chat/completions", fb_body, auth_token) |
| if res["status"] == 200: |
| choice = (res["data"].get("choices") or [{}])[0] |
| resp = build_openai_response(run_id, model, choice, res["data"].get("usage")) |
| log(f"请求完成,总耗时 {int((time.time() - start) * 1000)}ms", "success") |
| return web.json_response(resp) |
| if res["status"] in (400, 404): |
| log("Agent Run 失效,重新创建...", "warn") |
| run_cache.pop(get_run_cache_key(auth_token, agent_id), None) |
| run_id = await get_or_create_agent_run(session, auth_token, agent_id) |
| fb_body["codebuff_metadata"]["run_id"] = run_id |
| retry = await api_request(session, API_BASE, "/api/v1/chat/completions", fb_body, auth_token) |
| if retry["status"] == 200: |
| choice = (retry["data"].get("choices") or [{}])[0] |
| resp = build_openai_response(run_id, model, choice, retry["data"].get("usage")) |
| log(f"重试成功,总耗时 {int((time.time() - start) * 1000)}ms", "success") |
| return web.json_response(resp) |
| return web.json_response({"error": {"message": retry["data"]}}, status=retry["status"]) |
| return web.json_response({"error": {"message": res["data"]}}, status=res["status"]) |
| except Exception as e: |
| log(f"请求失败: {e}", "error") |
| return web.json_response({"error": {"message": str(e)}}, status=500) |
|
|
|
|
| async def handle_responses(request: web.Request) -> web.StreamResponse: |
| """OpenAI Responses API 兼容端点 /v1/responses。""" |
| start = time.time() |
| session: aiohttp.ClientSession = request.app["client_session"] |
|
|
| try: |
| data = await request.json() |
| except Exception: |
| return web.json_response({"error": {"message": "Invalid JSON body"}}, status=400) |
|
|
| model = data.get("model", default_model) |
| stream = data.get("stream", False) |
| instructions = data.get("instructions") |
| agent_id = MODEL_TO_AGENT.get(model, "base2-free") |
|
|
| messages = _responses_parse_input(data) |
| if not messages: |
| return web.json_response({"error": {"message": "input is required", "type": "invalid_request_error"}}, status=400) |
|
|
| try: |
| account, account_idx = pick_next_account() |
| auth_token = account["authToken"] |
| except Exception as e: |
| return web.json_response({"error": {"message": str(e)}}, status=503) |
|
|
| log( |
| f"收到 Responses API 请求: model={model}, msgs={len(messages)}, stream={stream}, " |
| f"account_index={account_idx}, user={account.get('name')}, token={token_fingerprint(auth_token)}" |
| ) |
|
|
| try: |
| run_id = await get_or_create_agent_run(session, auth_token, agent_id) |
| except Exception as e: |
| return web.json_response({"error": {"message": str(e)}}, status=500) |
|
|
| fb_body = make_freebuff_body({"model": model, "messages": messages, "stream": True}, run_id) |
|
|
| resp_id = f"resp_{int(time.time() * 1000)}" |
| msg_id = f"msg_{int(time.time() * 1000)}" |
| created = time.time() |
| msg_chars = sum(len(str(m.get("content", ""))) for m in messages) |
|
|
| url = f"https://{API_BASE}/api/v1/chat/completions" |
| headers = { |
| "Content-Type": "application/json", |
| "Authorization": f"Bearer {auth_token}", |
| "Accept": "text/event-stream", |
| "User-Agent": "freebuff-proxy/1.0", |
| } |
|
|
| try: |
| if stream: |
| response = web.StreamResponse( |
| status=200, |
| headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache", "Connection": "keep-alive"}, |
| ) |
| await response.prepare(request) |
|
|
| timeout = aiohttp.ClientTimeout(total=120) |
| async with session.post(url, json=fb_body, headers=headers, timeout=timeout) as upstream_resp: |
| if upstream_resp.status != 200: |
| err = await upstream_resp.text() |
| raise RuntimeError(f"HTTP {upstream_resp.status}: {err}") |
|
|
| seq = 0 |
| full_text_parts: List[str] = [] |
| base = _responses_make_base(resp_id, model, created, instructions, "in_progress") |
|
|
| async def emit(event_type: str, payload: Dict[str, Any]) -> None: |
| nonlocal seq |
| data_str = json.dumps({"type": event_type, "sequence_number": seq, **payload}, ensure_ascii=False) |
| await response.write(f"event: {event_type}\ndata: {data_str}\n\n".encode()) |
| seq += 1 |
|
|
| await emit("response.created", {"response": base}) |
| await emit("response.in_progress", {"response": base}) |
|
|
| item_skeleton = {"id": msg_id, "type": "message", "role": "assistant", "status": "in_progress", "content": []} |
| await emit("response.output_item.added", {"output_index": 0, "item": item_skeleton}) |
|
|
| part_skeleton = {"type": "output_text", "text": "", "annotations": []} |
| await emit("response.content_part.added", {"item_id": msg_id, "output_index": 0, "content_index": 0, "part": part_skeleton}) |
|
|
| buffer = "" |
| async for chunk in upstream_resp.content.iter_any(): |
| buffer += chunk.decode("utf-8", errors="replace") |
| lines = buffer.split("\n") |
| buffer = lines.pop() |
| for line in lines: |
| trimmed = line.strip() |
| if not trimmed or not trimmed.startswith("data: "): |
| continue |
| json_str = trimmed[6:].strip() |
| if json_str == "[DONE]": |
| continue |
| try: |
| parsed = json.loads(json_str) |
| delta_content = (parsed.get("choices") or [{}])[0].get("delta", {}).get("content", "") |
| if delta_content: |
| full_text_parts.append(delta_content) |
| await emit("response.output_text.delta", { |
| "item_id": msg_id, |
| "output_index": 0, |
| "content_index": 0, |
| "delta": delta_content, |
| }) |
| except Exception: |
| pass |
|
|
| full_text = "".join(full_text_parts) |
|
|
| await emit("response.output_text.done", { |
| "item_id": msg_id, |
| "output_index": 0, |
| "content_index": 0, |
| "text": full_text, |
| }) |
|
|
| done_part = {"type": "output_text", "text": full_text, "annotations": []} |
| await emit("response.content_part.done", { |
| "item_id": msg_id, |
| "output_index": 0, |
| "content_index": 0, |
| "part": done_part, |
| }) |
|
|
| done_item = {"id": msg_id, "type": "message", "role": "assistant", "status": "completed", "content": [done_part]} |
| await emit("response.output_item.done", {"output_index": 0, "item": done_item}) |
|
|
| usage = { |
| "input_tokens": msg_chars // 4, |
| "input_tokens_details": {"cached_tokens": 0}, |
| "output_tokens": len(full_text) // 4, |
| "output_tokens_details": {"reasoning_tokens": 0}, |
| "total_tokens": (msg_chars + len(full_text)) // 4, |
| } |
| final = _responses_make_base(resp_id, model, created, instructions, "completed") |
| final["output"] = [done_item] |
| final["usage"] = usage |
| await emit("response.completed", {"response": final}) |
|
|
| await response.write_eof() |
| log(f"Responses API 请求完成,总耗时 {int((time.time() - start) * 1000)}ms", "success") |
| return response |
|
|
| timeout = aiohttp.ClientTimeout(total=120) |
| content_parts: List[str] = [] |
| async with session.post(url, json=fb_body, headers=headers, timeout=timeout) as upstream_resp: |
| if upstream_resp.status != 200: |
| err = await upstream_resp.text() |
| if upstream_resp.status in (400, 404): |
| log("Responses API: Agent Run 失效,重新创建...", "warn") |
| run_cache.pop(get_run_cache_key(auth_token, agent_id), None) |
| run_id = await get_or_create_agent_run(session, auth_token, agent_id) |
| fb_body["codebuff_metadata"]["run_id"] = run_id |
| async with session.post(url, json=fb_body, headers=headers, timeout=timeout) as retry_resp: |
| if retry_resp.status != 200: |
| raise RuntimeError(f"HTTP {retry_resp.status}: {await retry_resp.text()}") |
| buffer = "" |
| async for chunk in retry_resp.content.iter_any(): |
| buffer += chunk.decode("utf-8", errors="replace") |
| for line in buffer.split("\n"): |
| trimmed = line.strip() |
| if not trimmed or not trimmed.startswith("data: "): |
| continue |
| json_str = trimmed[6:].strip() |
| if json_str == "[DONE]": |
| continue |
| try: |
| parsed = json.loads(json_str) |
| c = (parsed.get("choices") or [{}])[0].get("delta", {}).get("content", "") |
| if c: |
| content_parts.append(c) |
| except Exception: |
| pass |
| else: |
| raise RuntimeError(f"HTTP {upstream_resp.status}: {err}") |
| else: |
| buffer = "" |
| async for chunk in upstream_resp.content.iter_any(): |
| buffer += chunk.decode("utf-8", errors="replace") |
| for line in buffer.split("\n"): |
| trimmed = line.strip() |
| if not trimmed or not trimmed.startswith("data: "): |
| continue |
| json_str = trimmed[6:].strip() |
| if json_str == "[DONE]": |
| continue |
| try: |
| parsed = json.loads(json_str) |
| c = (parsed.get("choices") or [{}])[0].get("delta", {}).get("content", "") |
| if c: |
| content_parts.append(c) |
| except Exception: |
| pass |
|
|
| full_text = "".join(content_parts) |
| output_item = { |
| "id": msg_id, "type": "message", "role": "assistant", "status": "completed", |
| "content": [{"type": "output_text", "text": full_text, "annotations": []}], |
| } |
| usage = { |
| "input_tokens": msg_chars // 4, |
| "input_tokens_details": {"cached_tokens": 0}, |
| "output_tokens": len(full_text) // 4, |
| "output_tokens_details": {"reasoning_tokens": 0}, |
| "total_tokens": (msg_chars + len(full_text)) // 4, |
| } |
| result = _responses_make_base(resp_id, model, created, instructions, "completed") |
| result["output"] = [output_item] |
| result["usage"] = usage |
| log(f"Responses API 请求完成,总耗时 {int((time.time() - start) * 1000)}ms", "success") |
| return web.json_response(result) |
|
|
| except Exception as e: |
| log(f"Responses API 请求失败: {e}", "error") |
| return web.json_response({"error": {"message": str(e)}}, status=500) |
|
|
|
|
| async def handle_models(request: web.Request) -> web.Response: |
| models = [{"id": m, "object": "model", "created": 1700000000, "owned_by": "freebuff"} for m in MODEL_TO_AGENT] |
| return web.json_response({"object": "list", "data": models}) |
|
|
|
|
| async def handle_reset_run(request: web.Request) -> web.Response: |
| run_cache.clear() |
| log("Agent Run 缓存已清除") |
| return web.json_response({"status": "cleared"}) |
|
|
|
|
| async def handle_health(request: web.Request) -> web.Response: |
| config_dir, creds_path = get_config_paths() |
| account_brief = [ |
| { |
| "index": i, |
| "name": acc.get("name"), |
| "email": acc.get("email"), |
| "token": token_fingerprint(acc.get("authToken", "")), |
| } |
| for i, acc in enumerate(token_pool) |
| ] |
| return web.json_response({ |
| "status": "ok", |
| "model": default_model, |
| "accounts": account_brief, |
| "accountCount": len(token_pool), |
| "nextAccountIndex": next_token_index, |
| "cachedRunCount": len(run_cache), |
| "storage": { |
| "configDir": str(config_dir), |
| "credentialsFile": str(creds_path), |
| "persistentDataMounted": Path("/data").exists(), |
| }, |
| "security": { |
| "apiKeyEnabled": bool(PROXY_API_KEY), |
| "adminPasswordEnabled": bool(ADMIN_PASSWORD), |
| "frontendPasswordEnabled": bool(get_frontend_gate_password()), |
| }, |
| }) |
|
|
|
|
| async def handle_root(request: web.Request) -> web.Response: |
| if not is_frontend_allowed(request): |
| return web.Response(text=frontend_gate_page(), content_type="text/html") |
|
|
| html = f"""<!doctype html> |
| <html lang="zh-CN"> |
| <head> |
| <meta charset="utf-8" /> |
| <meta name="viewport" content="width=device-width, initial-scale=1" /> |
| <title>Freebuff OpenAI Proxy - HF Space</title> |
| <style> |
| body {{ font-family: -apple-system, BlinkMacSystemFont, Segoe UI, sans-serif; margin: 0; background: #0b1020; color: #e8eefc; }} |
| .wrap {{ max-width: 1100px; margin: 0 auto; padding: 28px; }} |
| .card {{ background: #141b34; border: 1px solid #2a355e; border-radius: 16px; padding: 18px; margin-bottom: 18px; }} |
| .row {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(240px, 1fr)); gap: 12px; }} |
| input, button, textarea {{ width: 100%; box-sizing: border-box; border-radius: 12px; border: 1px solid #3a4b80; background: #0d1430; color: #fff; padding: 12px; }} |
| button {{ cursor: pointer; background: #3451ff; border: none; font-weight: 700; }} |
| button.secondary {{ background: #273053; }} |
| a {{ color: #94c7ff; }} |
| code, pre {{ background: #091024; color: #d6e6ff; border-radius: 10px; }} |
| pre {{ padding: 14px; overflow: auto; white-space: pre-wrap; }} |
| .muted {{ color: #9fb0dd; }} |
| .ok {{ color: #8dffb2; }} |
| .warn {{ color: #ffd37a; }} |
| </style> |
| </head> |
| <body> |
| <div class="wrap"> |
| <div class="card"> |
| <div style="display:flex;justify-content:space-between;gap:12px;align-items:flex-start;flex-wrap:wrap;"> |
| <div> |
| <h1 style="margin:0 0 8px;">Freebuff OpenAI Proxy · Hugging Face Space</h1> |
| <p class="muted">保留原脚本的 OpenAI 兼容接口、Responses API、账号池轮询、Run 缓存、健康检查与重置能力,并把登录迁移到网页管理页。</p> |
| </div> |
| {('<form method="post" action="/gate/logout" style="margin:0;"><button class="secondary" style="width:auto;padding:10px 16px;">退出前端门禁</button></form>' if get_frontend_gate_password() else '')} |
| </div> |
| <div class="row"> |
| <div> |
| <strong>聊天接口</strong> |
| <pre>POST /v1/chat/completions</pre> |
| </div> |
| <div> |
| <strong>Responses API</strong> |
| <pre>POST /v1/responses</pre> |
| </div> |
| <div> |
| <strong>模型列表 / 健康检查</strong> |
| <pre>GET /v1/models |
| GET /health</pre> |
| </div> |
| </div> |
| </div> |
| |
| <div class="card"> |
| <h2>管理权限</h2> |
| <p class="muted">如果你设置了 <code>ADMIN_PASSWORD</code>,请先输入。它只用于管理页,不影响 /v1 接口。/v1 接口仍由 <code>API_KEY</code> 控制。</p> |
| <input id="adminPassword" type="password" placeholder="ADMIN_PASSWORD(可留空)" /> |
| </div> |
| |
| <div class="card"> |
| <h2>账号管理</h2> |
| <div class="row"> |
| <div><button onclick="startLogin()">1. 发起网页登录</button></div> |
| <div><button class="secondary" onclick="checkLogin()">2. 检查登录状态</button></div> |
| <div><button class="secondary" onclick="refreshHealth()">刷新状态</button></div> |
| </div> |
| <p class="muted">登录后账号会写入凭据文件。若 Space 挂载了持久化存储,会写入 <code>/data/manicode/credentials.json</code>。</p> |
| <pre id="loginBox">尚未发起登录。</pre> |
| </div> |
| |
| <div class="card"> |
| <h2>当前状态</h2> |
| <pre id="healthBox">加载中...</pre> |
| </div> |
| |
| <div class="card"> |
| <h2>调用示例</h2> |
| <pre id="curlBox">curl {(' -H "Authorization: Bearer ' + PROXY_API_KEY + '"') if PROXY_API_KEY else ''} \ |
| -H "Content-Type: application/json" \ |
| -X POST "./v1/chat/completions" \ |
| -d '{{ |
| "model": "{default_model}", |
| "messages": [{{"role": "user", "content": "你好"}}], |
| "stream": false |
| }}'</pre> |
| <p class="muted">在浏览器中用相对路径展示;实际接入请替换成你的 Space 域名。</p> |
| </div> |
| </div> |
| |
| <script> |
| let currentLoginId = localStorage.getItem('loginId') || ''; |
| const loginBox = document.getElementById('loginBox'); |
| const healthBox = document.getElementById('healthBox'); |
| const adminPassword = document.getElementById('adminPassword'); |
| adminPassword.value = localStorage.getItem('adminPassword') || ''; |
| adminPassword.addEventListener('input', () => localStorage.setItem('adminPassword', adminPassword.value)); |
| |
| function adminHeaders() {{ |
| const headers = {{ 'Content-Type': 'application/json' }}; |
| if (adminPassword.value) headers['X-Admin-Password'] = adminPassword.value; |
| return headers; |
| }} |
| |
| async function refreshHealth() {{ |
| const res = await fetch('/health'); |
| const data = await res.json(); |
| healthBox.textContent = JSON.stringify(data, null, 2); |
| }} |
| |
| async function startLogin() {{ |
| const res = await fetch('/admin/login/start', {{ method: 'POST', headers: adminHeaders() }}); |
| const data = await res.json(); |
| if (!res.ok) {{ |
| loginBox.textContent = JSON.stringify(data, null, 2); |
| return; |
| }} |
| currentLoginId = data.loginId; |
| localStorage.setItem('loginId', currentLoginId); |
| loginBox.innerHTML = `请在新标签页完成授权:\n\n${{JSON.stringify(data, null, 2)}}\n\n打开登录链接:\n${{data.loginUrl}}`; |
| window.open(data.loginUrl, '_blank'); |
| }} |
| |
| async function checkLogin() {{ |
| if (!currentLoginId) {{ |
| loginBox.textContent = '请先发起登录。'; |
| return; |
| }} |
| const url = '/admin/login/status?loginId=' + encodeURIComponent(currentLoginId) + (adminPassword.value ? ('&password=' + encodeURIComponent(adminPassword.value)) : ''); |
| const res = await fetch(url, {{ headers: adminHeaders() }}); |
| const data = await res.json(); |
| loginBox.textContent = JSON.stringify(data, null, 2); |
| await refreshHealth(); |
| }} |
| |
| refreshHealth(); |
| </script> |
| </body> |
| </html> |
| """ |
| return web.Response(text=html, content_type="text/html") |
|
|
|
|
| async def handle_frontend_login(request: web.Request) -> web.Response: |
| gate_password = get_frontend_gate_password() |
| if not gate_password: |
| raise web.HTTPFound("/") |
|
|
| data = await request.post() |
| supplied = (data.get("password") or "").strip() |
| if supplied != gate_password: |
| return web.Response(text=frontend_gate_page("密码错误,请重试"), content_type="text/html", status=401) |
|
|
| response = web.HTTPFound("/") |
| response.set_cookie( |
| FRONTEND_COOKIE_NAME, |
| gate_password, |
| max_age=7 * 24 * 60 * 60, |
| httponly=True, |
| samesite="Lax", |
| ) |
| raise response |
|
|
|
|
| async def handle_frontend_logout(request: web.Request) -> web.Response: |
| response = web.HTTPFound("/") |
| response.del_cookie(FRONTEND_COOKIE_NAME) |
| raise response |
|
|
|
|
| async def handle_admin_login_start(request: web.Request) -> web.Response: |
| denied = require_admin(request) |
| if denied: |
| return denied |
| session: aiohttp.ClientSession = request.app["client_session"] |
| try: |
| payload = await start_login_flow(session) |
| return web.json_response(payload) |
| except Exception as e: |
| return web.json_response({"error": {"message": str(e)}}, status=500) |
|
|
|
|
| async def handle_admin_login_status(request: web.Request) -> web.Response: |
| denied = require_admin(request) |
| if denied: |
| return denied |
| session: aiohttp.ClientSession = request.app["client_session"] |
| login_id = request.query.get("loginId", "") |
| if not login_id: |
| return web.json_response({"error": {"message": "loginId is required"}}, status=400) |
| try: |
| payload = await poll_login_status(session, login_id) |
| return web.json_response(payload) |
| except Exception as e: |
| return web.json_response({"error": {"message": str(e)}}, status=400) |
|
|
|
|
| |
|
|
| async def on_startup(app: web.Application) -> None: |
| global token_pool |
| session = aiohttp.ClientSession() |
| app["client_session"] = session |
|
|
| token_pool = load_accounts() |
| if token_pool: |
| log(f"已加载账号池: {len(token_pool)} 个") |
| for i, acc in enumerate(token_pool): |
| log( |
| f" [{i}] {acc.get('name')} <{acc.get('email')}> token={token_fingerprint(acc.get('authToken', ''))}" |
| ) |
|
|
| log("预热:为账号池创建默认 Agent Run...") |
| warmed = 0 |
| for acc in token_pool: |
| run_id = await warm_account_default_agent(session, acc) |
| if run_id: |
| warmed += 1 |
|
|
| if warmed: |
| log(f"预热完成,已缓存 {warmed} 个默认 Agent Run", "success") |
| else: |
| log("当前没有已预热账号;可在管理页登录新增账号", "warn") |
|
|
|
|
| async def on_cleanup(app: web.Application) -> None: |
| session: aiohttp.ClientSession = app["client_session"] |
| if run_cache: |
| log("关闭代理,结束全部缓存 Agent Run...") |
| for (auth_token, _agent_id), run_id in list(run_cache.items()): |
| try: |
| await finish_agent_run(session, auth_token, run_id) |
| except Exception: |
| pass |
| log("Agent Run 清理完成", "success") |
| await session.close() |
|
|
|
|
|
|
| def create_app() -> web.Application: |
| app = web.Application(middlewares=[auth_middleware], client_max_size=20 * 1024 * 1024) |
| app.on_startup.append(on_startup) |
| app.on_cleanup.append(on_cleanup) |
|
|
| app.router.add_get("/", handle_root) |
| app.router.add_post("/gate/login", handle_frontend_login) |
| app.router.add_post("/gate/logout", handle_frontend_logout) |
| app.router.add_get("/health", handle_health) |
| app.router.add_post("/v1/chat/completions", handle_chat_completion) |
| app.router.add_post("/v1/responses", handle_responses) |
| app.router.add_get("/v1/models", handle_models) |
| app.router.add_post("/v1/reset-run", handle_reset_run) |
| app.router.add_post("/admin/login/start", handle_admin_login_start) |
| app.router.add_get("/admin/login/status", handle_admin_login_status) |
| return app |
|
|
|
|
| if __name__ == "__main__": |
| app = create_app() |
| log(f"代理地址: http://{HOST}:{PORT}/v1/chat/completions") |
| log(f"Responses: http://{HOST}:{PORT}/v1/responses") |
| log(f"模型列表: http://{HOST}:{PORT}/v1/models") |
| log(f"重置缓存: http://{HOST}:{PORT}/v1/reset-run (POST)") |
| log(f"健康检查: http://{HOST}:{PORT}/health") |
| if PROXY_API_KEY: |
| log(f"API 鉴权: 已启用 (Bearer {token_fingerprint(PROXY_API_KEY)})", "success") |
| else: |
| log("API 鉴权: 未启用(所有 /v1 请求均可访问)", "warn") |
| if ADMIN_PASSWORD: |
| log("管理页鉴权: 已启用", "success") |
| else: |
| log("管理页鉴权: 未启用", "warn") |
| web.run_app(app, host=HOST, port=PORT) |
|
|