BirkhoffLee committed
Commit 757f620 · unverified · 1 Parent(s): b2d8381

fix: complete the Phase 3 fixes and hardening

TODO.md DELETED
@@ -1,123 +0,0 @@
- # TODO: PDFMathTranslate-next refactoring plan
-
- Goal: without introducing excessive complexity, split the current monolithic `gateway.py` into maintainable pieces, make the frontend interaction more stable and clearer, and make backend job and billing semantics more controllable.
-
- The refactoring proceeds in three phases. Each phase should be "shippable at any time", so we avoid a big-bang change.
-
- ---
-
- ## Phase 1: Structural split (as close to zero functional change as possible)
-
- ### 1.1 Backend modularization
-
- - Split minimal modules out under `src/` (file names may be adjusted during implementation):
-   - `auth.py`: user loading, password verification, session creation/validation logic.
-   - `storage.py`: `sqlite3` initialization, connection management, the `_db_execute/_db_fetch*` wrappers.
-   - `jobs.py`: CRUD functions for the jobs table (create job, update job, query jobs by user, etc.).
-   - `billing.py`: writing and querying billing records.
-   - `proxy.py`: the internal OpenAI proxy, the model routing table, and upstream call logic.
- - Slim down `gateway.py`:
-   - Keep only the FastAPI `app` instance, route registration, and the `startup/shutdown` hooks.
-   - Import the business logic from the modules above; avoid touching low-level details (e.g. raw SQL) directly in `gateway.py`.
-
- ### 1.2 Splitting frontend templates and static assets
-
- - Create frontend asset directories (example):
-   - `src/web/templates/login.html`
-   - `src/web/templates/dashboard.html`
-   - `src/web/static/dashboard.js`
- - Move the HTML/CSS/JS currently embedded in `_LOGIN_HTML` and `_dashboard_page` into those files:
-   - Login page: keep only an error placeholder (e.g. `__ERROR_BLOCK__`) to be substituted by the backend.
-   - Dashboard page: move the JS inside `<script>...</script>` into `dashboard.js`, and reference it from the HTML only via `<script src="/static/dashboard.js"></script>`.
- - Do not introduce a template engine; keep it KISS:
-   - On the backend, use the simplest file read plus `str.replace` or `format` to inject `username/lang_in/lang_out`.
-   - Add a minimal wrapper for template file paths, e.g. `load_template("dashboard.html")`.
-
- ### 1.3 Sorting out HTTP route boundaries
-
- - Explicitly group routes (which module owns which block) so code stops piling up in a single file:
-   - Auth: `/login`, `/logout`, and `/_session`-style logic.
-   - Page rendering: `/` (dashboard) and the `/login` HTML.
-   - Job API: `/api/jobs*`.
-   - Billing API: `/api/billing*`.
-   - Internal OpenAI API: `/internal/openai/v1/*`.
- - Note in code comments that "these routes carry no backward-compatibility guarantee and exist only for the current Space prototype", to prevent later misuse.
-
- ---
-
- ## Phase 2: Job state machine + realtime refresh
-
- ### 2.1 Tightening the backend job state machine
-
- - Restrict job status to a finite set, e.g. `queued/running/succeeded/failed/cancelled`.
- - Extract a state-transition function instead of calling `_update_job(status=...)` ad hoc everywhere:
-   - Example: `transition_job(job_id, event, **kwargs)`, which decides internally which state changes each `event` may trigger and writes to the database.
-   - Common events: `queued`, `start`, `progress`, `finish_ok`, `finish_error`, `cancel_requested`, `cancel_immediate`, `restart_failed`.
- - On an illegal transition (e.g. from `succeeded` back to `running`), log an error and refuse the update, so the data cannot get corrupted.
-
- ### 2.2 Realtime job status push (SSE)
-
- - Add an SSE endpoint (example): `GET /api/jobs/stream`:
-   - Push only updates for the currently logged-in user's jobs.
-   - Keep the event payload minimal: `{id, status, progress, message, updated_at, artifact_urls}`.
-   - Handle reconnects simply: re-fetch the most recent N records starting from the latest update time to fill the gap.
- - In the job execution logic, call an "event push" hook at key state changes:
-   - Enqueued, started, progress update, succeeded, failed, cancelled, etc.
- - Keep the existing `/api/jobs` list endpoint for the initial page load and as a fallback when SSE fails.
-
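SSE frames are plain text: an optional `event:` line, a `data:` line (JSON here), and a blank line terminating the frame. A minimal formatter for the payload shape above (the field names come from the plan; the event name `job_update` is an assumption):

```python
import json
from typing import Any


def sse_event(payload: dict[str, Any], event: str = "job_update") -> str:
    """Serialize one job update as a text/event-stream frame."""
    return f"event: {event}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"
```

In FastAPI, strings like this would be yielded from an async generator wrapped in a `StreamingResponse` with `media_type="text/event-stream"`.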
- ### 2.3 Unifying the frontend state machine and UI behavior
-
- - Define a single status mapping table in `dashboard.js`, e.g.:
-   - `STATUS_CONFIG = { queued: {...}, running: {...}, succeeded: {...}, failed: {...}, cancelled: {...} }`
-   - Each status carries: display text, color/style, and the available actions (whether "cancel" is allowed, whether there is a download button, etc.).
- - Replace the logic currently scattered across DOM string concatenation with rendering driven by `STATUS_CONFIG`:
-   - Status text: `statusText(status)`.
-   - Button visibility: derived from `STATUS_CONFIG[status].actions`.
-   - Row styling (e.g. highlight failures in red, highlight running jobs).
- - Subscribe to `/api/jobs/stream` with `EventSource`:
-   - On each event, update the in-memory `jobs` map and patch the DOM locally rather than redrawing the whole table.
-   - Fall back to the existing polling logic when SSE fails (e.g. call `/api/jobs` every 10 seconds).
- - Adjust polling frequency:
-   - While the page is visible, poll less often or rely entirely on SSE.
-   - While the page is hidden (`document.hidden === true`), slow down further or pause polling.
-
- ---
-
-
- ## Phase 3: Upload limits + billing semantics fixes
-
- ### 3.1 Upload and execution constraints
-
- - Add a hard size cap for uploaded files:
-   - Define a constant or environment variable (e.g. `MAX_UPLOAD_MB`), defaulting to e.g. 100MB.
-   - Validate in `api_create_job` against `Content-Length` or the bytes actually read, and return 413 / 400 when the limit is exceeded.
- - If feasible, also limit PDF page count or execution time:
-   - Try to read the PDF metadata at job start; if that fails, enforce only the size limit.
-   - Add a "maximum runtime" in the worker (e.g. 2 hours); on timeout, mark the job `failed` with an error message attached.
-
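The "bytes actually read" check can be enforced while streaming the upload to disk in fixed-size chunks, so an oversized body is cut off early instead of being buffered whole. A sketch under stated assumptions (the function name and return shape are illustrative; the real handler would also delete the partial file and reply 413):

```python
import io


def copy_with_limit(
    src: io.BufferedIOBase,
    dst: io.BufferedIOBase,
    max_bytes: int,
    chunk_size: int = 1024 * 1024,
) -> tuple[int, bool]:
    """Copy src to dst chunk by chunk; stop early once max_bytes is exceeded.

    Returns (bytes_seen, too_large).
    """
    total = 0
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            return total, False
        total += len(chunk)
        if total > max_bytes:
            # Caller should discard dst and reply 413.
            return total, True
        dst.write(chunk)
```

Streaming also keeps memory flat regardless of upload size, unlike reading the whole body first.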
- ### 3.2 Billing model and frontend display
-
- - Make each model's billing behavior explicit:
-   - For OpenAI-style models whose upstream returns `usage`, keep the current "price by tokens" policy.
-   - For models like `SiliconFlowFree`/chatproxy, where accurate usage is unavailable:
-     - Option A: if the upstream provides a rough per-request token count or cost, parse it in `_forward_to_chatproxy` and write it into `usage_records`.
-     - Option B: if no usage can be obtained, explicitly mark the model as "non-billable"; the backend writes no `usage_records` (or writes 0), and the frontend notes that "this model produces no billable charges".
- - Adjust the billing page copy:
-   - In the "My billing" area, note which models are billed accurately and which are placeholders or non-billable.
-   - Display zero-cost records explicitly as "$0.000000 (non-billable model)" or similar, so users are not misled.
-
- ### 3.3 Better observability for the internal proxy
-
- - Add finer-grained logging to `/internal/openai/v1/chat/completions` and the upstream call logic:
-   - For every request, log a request id (e.g. `job_id+uuid`), model name, routed target URL, duration, and status code.
-   - Classify chatproxy failure causes (network error / non-2xx / JSON parse failure / missing fields, etc.) to ease debugging.
- - Guard against OpenAI upstream responses missing the `usage` field:
-   - Do not let the missing field raise and abort; use a safe default (0, or derive from `prompt+completion`) for such requests.
-
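The safe default for a missing `usage` field reduces to a small pure function (a sketch; the response shape follows the OpenAI chat-completions format, and the helper name is an assumption):

```python
from typing import Any


def extract_usage(response_json: dict[str, Any]) -> tuple[int, int, int]:
    """Return (prompt_tokens, completion_tokens, total_tokens) with safe defaults.

    Missing or null fields become 0; a missing total is derived from prompt+completion.
    """
    usage = response_json.get("usage") or {}
    prompt = int(usage.get("prompt_tokens") or 0)
    completion = int(usage.get("completion_tokens") or 0)
    total = int(usage.get("total_tokens") or (prompt + completion))
    return prompt, completion, total
```

The `or {}` / `or 0` pattern covers both absent keys and explicit `null` values, which upstreams do send.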
- ---
-
- ## Suggested execution order and release strategy
-
- - Ship once after Phase 1 to confirm the structural split has no side effects, then continue with the later phases.
- - In Phase 2, implement SSE and the state machine on the backend first, then migrate the frontend logic to SSE step by step, keeping polling as a fallback.
- - Before Phase 3 introduces limits and billing changes, prepare a simple rollback path (e.g. an environment-variable switch), so a badly tuned threshold cannot hurt normal users.
-
src/gateway.py CHANGED
@@ -49,6 +49,12 @@ DEFAULT_LANG_IN = os.environ.get("DEFAULT_LANG_IN", "en").strip()
  DEFAULT_LANG_OUT = os.environ.get("DEFAULT_LANG_OUT", "zh").strip()
  TRANSLATION_QPS = int(os.environ.get("TRANSLATION_QPS", "4"))
 
+ # Upload and job-execution constraint configuration
+ MAX_UPLOAD_MB = int(os.environ.get("MAX_UPLOAD_MB", "100"))
+ MAX_UPLOAD_BYTES = MAX_UPLOAD_MB * 1024 * 1024
+ MAX_JOB_RUNTIME_SECONDS = int(os.environ.get("MAX_JOB_RUNTIME_SECONDS", "7200"))
+ UPLOAD_CHUNK_SIZE = 1024 * 1024  # 1MB per chunk
+
  logging.basicConfig(
      level=logging.INFO,
      format="%(asctime)s %(levelname)s %(name)s - %(message)s",
@@ -87,6 +93,76 @@ def _build_settings_for_job(row: sqlite3.Row) -> SettingsModel:
      return settings
 
 
+ async def _consume_translation_stream(
+     job_id: str,
+     settings: SettingsModel,
+     input_path: Path,
+     output_dir: Path,
+ ) -> None:
+     """Consume translation stream events and drive the job state machine.
+
+     Note: this function does not enforce a timeout; the caller bounds the
+     maximum runtime via asyncio.wait_for.
+     """
+     async for event in do_translate_async_stream(settings, input_path):
+         event_type = event.get("type")
+         if event_type in {"progress_start", "progress_update", "progress_end"}:
+             progress = float(event.get("overall_progress", 0.0))
+             stage = event.get("stage", "")
+             await _transition_and_notify(
+                 job_id,
+                 "progress",
+                 progress=max(0.0, min(100.0, progress)),
+                 message=f"{stage}" if stage else "Running",
+             )
+         elif event_type == "error":
+             error_msg = str(event.get("error", "Unknown translation error"))
+             await _transition_and_notify(
+                 job_id,
+                 "finish_error",
+                 error=error_msg,
+                 message="Translation failed",
+                 finished_at=storage.now_iso(),
+             )
+             return
+         elif event_type == "finish":
+             result = event.get("translate_result")
+             mono_path = str(getattr(result, "mono_pdf_path", "") or "")
+             dual_path = str(getattr(result, "dual_pdf_path", "") or "")
+             glossary_path = str(
+                 getattr(result, "auto_extracted_glossary_path", "") or ""
+             )
+
+             # Fallback: if the paths are empty, scan the output directory for the usual files
+             if not mono_path or not dual_path:
+                 files = list(output_dir.glob("*.pdf"))
+                 for file in files:
+                     name = file.name.lower()
+                     if ".mono.pdf" in name and not mono_path:
+                         mono_path = str(file)
+                     elif ".dual.pdf" in name and not dual_path:
+                         dual_path = str(file)
+
+             await _transition_and_notify(
+                 job_id,
+                 "finish_ok",
+                 progress=100.0,
+                 message="Translation finished",
+                 finished_at=storage.now_iso(),
+                 mono_pdf_path=mono_path or None,
+                 dual_pdf_path=dual_path or None,
+                 glossary_path=glossary_path or None,
+             )
+             return
+
+     await _transition_and_notify(
+         job_id,
+         "finish_error",
+         error="Translation stream ended unexpectedly",
+         message="Translation failed",
+         finished_at=storage.now_iso(),
+     )
+
+
  async def _run_single_job(job_id: str) -> None:
      row = jobs.get_job_row(job_id)
      if row is None:
@@ -117,62 +193,22 @@ async def _run_single_job(job_id: str) -> None:
 
      try:
          settings = _build_settings_for_job(row)
-         async for event in do_translate_async_stream(settings, input_path):
-             event_type = event.get("type")
-             if event_type in {"progress_start", "progress_update", "progress_end"}:
-                 progress = float(event.get("overall_progress", 0.0))
-                 stage = event.get("stage", "")
-                 await _transition_and_notify(
-                     job_id,
-                     "progress",
-                     progress=max(0.0, min(100.0, progress)),
-                     message=f"{stage}" if stage else "Running",
-                 )
-             elif event_type == "error":
-                 error_msg = str(event.get("error", "Unknown translation error"))
-                 await _transition_and_notify(
-                     job_id,
-                     "finish_error",
-                     error=error_msg,
-                     message="Translation failed",
-                     finished_at=storage.now_iso(),
-                 )
-                 return
-             elif event_type == "finish":
-                 result = event.get("translate_result")
-                 mono_path = str(getattr(result, "mono_pdf_path", "") or "")
-                 dual_path = str(getattr(result, "dual_pdf_path", "") or "")
-                 glossary_path = str(
-                     getattr(result, "auto_extracted_glossary_path", "") or ""
-                 )
-
-                 # Fallback: if the paths are empty, scan the output directory for the usual files
-                 if not mono_path or not dual_path:
-                     files = list(output_dir.glob("*.pdf"))
-                     for file in files:
-                         name = file.name.lower()
-                         if ".mono.pdf" in name and not mono_path:
-                             mono_path = str(file)
-                         elif ".dual.pdf" in name and not dual_path:
-                             dual_path = str(file)
-
-                 await _transition_and_notify(
-                     job_id,
-                     "finish_ok",
-                     progress=100.0,
-                     message="Translation finished",
-                     finished_at=storage.now_iso(),
-                     mono_pdf_path=mono_path or None,
-                     dual_pdf_path=dual_path or None,
-                     glossary_path=glossary_path or None,
-                 )
-                 return
-
+         await asyncio.wait_for(
+             _consume_translation_stream(
+                 job_id=job_id,
+                 settings=settings,
+                 input_path=input_path,
+                 output_dir=output_dir,
+             ),
+             timeout=MAX_JOB_RUNTIME_SECONDS,
+         )
+     except asyncio.TimeoutError:
+         logger.warning("Translation job timed out: job_id=%s", job_id)
          await _transition_and_notify(
              job_id,
              "finish_error",
-             error="Translation stream ended unexpectedly",
-             message="Translation failed",
+             error="Translation timed out",
+             message="Translation timed out",
              finished_at=storage.now_iso(),
          )
      except asyncio.CancelledError:
@@ -455,6 +491,7 @@ async def api_get_job(
 
  @app.post("/api/jobs")
  async def api_create_job(
+     request: Request,
      file: UploadFile = File(...),
      lang_in: str = Form(DEFAULT_LANG_IN),
      lang_out: str = Form(DEFAULT_LANG_OUT),
@@ -464,18 +501,63 @@ async def api_create_job(
      if not filename.lower().endswith(".pdf"):
          raise HTTPException(status_code=400, detail="仅支持 PDF 文件")
 
+     # If the client sent Content-Length, do a rough precheck to reject obviously oversized requests
+     content_length = request.headers.get("content-length")
+     if content_length:
+         try:
+             total_len = int(content_length)
+         except ValueError:
+             total_len = 0
+         if total_len > MAX_UPLOAD_BYTES * 2:
+             logger.warning(
+                 "Upload rejected by Content-Length: username=%s size=%s limit=%s",
+                 username,
+                 total_len,
+                 MAX_UPLOAD_BYTES,
+             )
+             raise HTTPException(
+                 status_code=413,
+                 detail=f"上传文件过大,最大 {MAX_UPLOAD_MB}MB",
+             )
+
      job_id = uuid.uuid4().hex
      safe_filename = Path(filename).name
      input_path = (storage.UPLOAD_DIR / f"{job_id}.pdf").resolve()
      output_dir = (storage.JOB_DIR / job_id).resolve()
      output_dir.mkdir(parents=True, exist_ok=True)
 
+     total_bytes = 0
+     too_large = False
      try:
          with input_path.open("wb") as f:
-             shutil.copyfileobj(file.file, f)
+             while True:
+                 chunk = file.file.read(UPLOAD_CHUNK_SIZE)
+                 if not chunk:
+                     break
+                 total_bytes += len(chunk)
+                 if total_bytes > MAX_UPLOAD_BYTES:
+                     too_large = True
+                     break
+                 f.write(chunk)
      finally:
          await file.close()
 
+     if too_large:
+         # Remove the partially written file so nothing is left behind
+         with contextlib.suppress(FileNotFoundError):
+             input_path.unlink()
+         logger.warning(
+             "Upload too large: username=%s job_id=%s size=%s limit=%s",
+             username,
+             job_id,
+             total_bytes,
+             MAX_UPLOAD_BYTES,
+         )
+         raise HTTPException(
+             status_code=413,
+             detail=f"上传文件过大,最大 {MAX_UPLOAD_MB}MB",
+         )
+
      job_dict = jobs.create_job_record(
          job_id=job_id,
          username=username,
@@ -599,6 +681,10 @@ async def api_billing_records(
      username: str = Depends(auth._require_user),
  ) -> dict[str, Any]:
      limit = max(1, min(limit, 200))
+     records = billing.get_billing_records(username=username, limit=limit)
+     return {"records": records}
+
+
  @app.post("/internal/openai/v1/chat/completions")
  async def internal_openai_chat_completions(request: Request) -> Response:
      return await proxy.handle_internal_chat_completions(
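The diff above bounds each job with `asyncio.wait_for`; the pattern in isolation looks like this (a sketch: the 10-second sleep stands in for `_consume_translation_stream`, and `wait_for` cancels it once the deadline passes):

```python
import asyncio


async def run_with_deadline() -> str:
    try:
        # A long-running coroutine cut off by the deadline; wait_for
        # cancels it and raises TimeoutError in the caller.
        await asyncio.wait_for(asyncio.sleep(10), timeout=0.01)
        return "finished"
    except asyncio.TimeoutError:
        return "timed out"


result = asyncio.run(run_with_deadline())
```

Because `wait_for` cancels the inner coroutine, the worker does not keep translating after the job has already been marked failed.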
src/proxy.py CHANGED
@@ -7,6 +7,7 @@ import logging
  import os
  import uuid
  from datetime import datetime, timezone
+ from time import perf_counter
  from typing import Any
 
  import httpx
@@ -36,6 +37,10 @@ MODEL_ROUTE_TABLE: dict[str, dict[str, Any]] = {
          "https://api2.pdf2zh-next.com/chatproxy",
      ],
      "api_key": "",
+     # Billing mode:
+     # - "none": not billed (current default for SiliconFlowFree)
+     # - "upstream_usage": billed from the usage field returned by the upstream
+     "billing_mode": "none",
  }
 }
 
@@ -136,6 +141,7 @@ async def _forward_to_chatproxy(
      payload: dict[str, Any],
      model: str,
      route: dict[str, Any],
+     request_id: str | None = None,
  ) -> dict[str, Any]:
      base_urls = route.get("base_urls", [])
      if not isinstance(base_urls, list) or not base_urls:
@@ -162,13 +168,20 @@ async def _forward_to_chatproxy(
          )
      except httpx.HTTPError as exc:
          last_error = str(exc)
-         logger.warning("chatproxy call failed: model=%s url=%s error=%s", model, base_url, exc)
+         logger.warning(
+             "chatproxy call failed: request_id=%s model=%s url=%s error=%s",
+             request_id,
+             model,
+             base_url,
+             exc,
+         )
          continue
 
      if upstream.status_code >= 400:
          last_error = f"status={upstream.status_code}"
          logger.warning(
-             "chatproxy upstream returned error: model=%s url=%s status=%s",
+             "chatproxy upstream returned error: request_id=%s model=%s url=%s status=%s",
+             request_id,
              model,
              base_url,
              upstream.status_code,
@@ -180,7 +193,8 @@ async def _forward_to_chatproxy(
      except Exception as exc:  # noqa: BLE001
          last_error = f"invalid json response: {exc}"
          logger.warning(
-             "chatproxy upstream returned invalid json: model=%s url=%s",
+             "chatproxy upstream returned invalid json: request_id=%s model=%s url=%s",
+             request_id,
              model,
              base_url,
          )
@@ -190,14 +204,21 @@ async def _forward_to_chatproxy(
      if not isinstance(content, str):
          last_error = "missing content field"
          logger.warning(
-             "chatproxy upstream missing content: model=%s url=%s body=%s",
+             "chatproxy upstream missing content: request_id=%s model=%s url=%s body=%s",
+             request_id,
              model,
              base_url,
              body,
          )
          continue
 
-     return _build_openai_compatible_response(model=model, content=content)
+     return {
+         "openai_response": _build_openai_compatible_response(
+             model=model,
+             content=content,
+         ),
+         "raw_upstream_body": body,
+     }
 
  raise HTTPException(
      status_code=502,
@@ -233,23 +254,59 @@ async def handle_internal_chat_completions(
      if not model:
          raise HTTPException(status_code=400, detail="model is required")
 
+     request_id = uuid.uuid4().hex[:8]
      route = MODEL_ROUTE_TABLE.get(model)
+     job_id = active_job_by_user.get(username)
+     route_type = route.get("route_type") if route else "openai"
+
+     logger.info(
+         "Internal chat request start: request_id=%s username=%s model=%s route_type=%s job_id=%s",
+         request_id,
+         username,
+         model,
+         route_type,
+         job_id,
+     )
+
      if route and route.get("route_type") == "chatproxy":
-         response_json = await _forward_to_chatproxy(
-             http_client=http_client,
-             payload=payload,
-             model=model,
-             route=route,
-         )
-         billing.record_usage(
-             username=username,
-             job_id=active_job_by_user.get(username),
-             model=model,
-             prompt_tokens=0,
-             completion_tokens=0,
-             total_tokens=0,
-         )
-         return JSONResponse(response_json, status_code=200)
+         t0 = perf_counter()
+         try:
+             result = await _forward_to_chatproxy(
+                 http_client=http_client,
+                 payload=payload,
+                 model=model,
+                 route=route,
+                 request_id=request_id,
+             )
+             response_json = result.get("openai_response") or {}
+             raw_body = result.get("raw_upstream_body") or {}
+
+             billing_mode = str(route.get("billing_mode") or "none")
+             if billing_mode == "upstream_usage":
+                 usage = raw_body.get("usage") or {}
+                 prompt_tokens = int(usage.get("prompt_tokens") or 0)
+                 completion_tokens = int(usage.get("completion_tokens") or 0)
+                 total_tokens = int(
+                     usage.get("total_tokens") or (prompt_tokens + completion_tokens)
+                 )
+                 if total_tokens > 0:
+                     billing.record_usage(
+                         username=username,
+                         job_id=job_id,
+                         model=model,
+                         prompt_tokens=prompt_tokens,
+                         completion_tokens=completion_tokens,
+                         total_tokens=total_tokens,
+                     )
+
+             return JSONResponse(response_json, status_code=200)
+         finally:
+             duration = perf_counter() - t0
+             logger.info(
+                 "Internal chat request finished: request_id=%s route_type=chatproxy duration=%.3fs",
+                 request_id,
+                 duration,
+             )
 
      if not OPENAI_REAL_API_KEY:
          raise HTTPException(status_code=500, detail="OPENAI_API_KEY is not configured")
@@ -259,6 +316,8 @@ async def handle_internal_chat_completions(
          "Content-Type": "application/json",
      }
 
+     upstream: httpx.Response | None = None
+     t0 = perf_counter()
      try:
          upstream = await http_client.post(
              OPENAI_UPSTREAM_CHAT_URL,
@@ -266,8 +325,25 @@ async def handle_internal_chat_completions(
              json=payload,
          )
      except httpx.HTTPError as exc:
-         logger.error("Upstream OpenAI call failed: %s", exc)
+         duration = perf_counter() - t0
+         logger.error(
+             "Upstream OpenAI call failed: request_id=%s model=%s username=%s duration=%.3fs error=%s",
+             request_id,
+             model,
+             username,
+             duration,
+             exc,
+         )
          raise HTTPException(status_code=502, detail="Upstream OpenAI request failed") from exc
+     else:
+         duration = perf_counter() - t0
+         logger.info(
+             "Upstream OpenAI call finished: request_id=%s model=%s status=%s duration=%.3fs",
+             request_id,
+             model,
+             upstream.status_code,
+             duration,
+         )
 
      response_json: dict[str, Any] | None = None
      content_type = upstream.headers.get("content-type", "")
@@ -281,7 +357,21 @@ async def handle_internal_chat_completions(
      usage = response_json.get("usage") or {}
      prompt_tokens = int(usage.get("prompt_tokens") or 0)
      completion_tokens = int(usage.get("completion_tokens") or 0)
-     total_tokens = int(usage.get("total_tokens") or (prompt_tokens + completion_tokens))
+     total_tokens = int(
+         usage.get("total_tokens") or (prompt_tokens + completion_tokens)
+     )
+
+     if (
+         prompt_tokens == 0
+         and completion_tokens == 0
+         and total_tokens == 0
+     ):
+         logger.warning(
+             "Upstream OpenAI response missing or zero usage: request_id=%s model=%s username=%s",
+             request_id,
+             model,
+             username,
+         )
 
      job_id = active_job_by_user.get(username)
 
@@ -302,4 +392,3 @@ async def handle_internal_chat_completions(
          status_code=upstream.status_code,
          media_type=content_type or None,
      )
-
src/web/static/dashboard.js CHANGED
@@ -24,11 +24,14 @@ async function refreshBilling() {
    document.getElementById("billingSummary").textContent =
      `总 tokens=${summary.total_tokens} | 总费用(USD)=${Number(
        summary.total_cost_usd,
-     ).toFixed(6)}`;
+     ).toFixed(6)}(仅统计计费模型,不含 SiliconFlowFree 等免费模型)`;
 
    const body = document.getElementById("billingBody");
    body.innerHTML = "";
    for (const r of rows.records) {
+     const cost = Number(r.cost_usd).toFixed(6);
+     const costLabel = cost === "0.000000" ? `${cost}(不计费模型)` : cost;
+
      const tr = document.createElement("tr");
      tr.innerHTML = `
        <td>${esc(r.created_at)}</td>
@@ -36,7 +39,7 @@ async function refreshBilling() {
        <td>${r.prompt_tokens}</td>
        <td>${r.completion_tokens}</td>
        <td>${r.total_tokens}</td>
-       <td>${Number(r.cost_usd).toFixed(6)}</td>
+       <td>${costLabel}</td>
      `;
      body.appendChild(tr);
    }
src/web/templates/dashboard.html CHANGED
@@ -120,6 +120,7 @@
      </thead>
      <tbody id="billingBody"></tbody>
    </table>
+   <div class="hint">说明:仅统计计费模型的使用情况,当前 SiliconFlowFree 等免费模型不会产生账单记录。</div>
  </section>
 </div>
@@ -147,4 +148,3 @@
  <script src="/static/dashboard.js"></script>
 </body>
 </html>
-