Ge-AI commited on
Commit
9b8516b
·
verified ·
1 Parent(s): e7fae93

Update openai_ondemand_adapter.py

Browse files
Files changed (1) hide show
  1. openai_ondemand_adapter.py +156 -167
openai_ondemand_adapter.py CHANGED
@@ -101,15 +101,13 @@ Claude always responds to the person in the language they use or request. If the
101
  Claude is now being connected with a person."""
102
 
103
  # ====== 读取 Huggingface Secret 配置的私有key =======
104
- # 用于保护此代理服务本身,防止未授权访问
105
- PRIVATE_KEY = os.environ.get("PRIVATE_KEY", "") # 如果在Huggingface Spaces运行,这里会读取Secrets
106
- SAFE_HEADERS = ["Authorization", "X-API-KEY"] # 允许传递私有key的请求头
107
 
108
  # 全局接口访问权限检查
109
  def check_private_key():
110
- # 根路径和favicon通常不需要鉴权
111
  if request.path in ["/", "/favicon.ico"]:
112
- return None # 显式返回 None 表示通过
113
 
114
  key_from_header = None
115
  for header_name in SAFE_HEADERS:
@@ -119,28 +117,22 @@ def check_private_key():
119
  key_from_header = key_from_header[len("Bearer "):].strip()
120
  break
121
 
122
- if not PRIVATE_KEY: # 如果没有设置 PRIVATE_KEY,则不进行鉴权 (方便本地测试)
123
  logging.warning("PRIVATE_KEY 未设置,服务将不进行鉴权!")
124
  return None
125
 
126
  if not key_from_header or key_from_header != PRIVATE_KEY:
127
  logging.warning(f"未授权访问尝试: Path={request.path}, IP={request.remote_addr}, Key Provided='{key_from_header[:10]}...'")
128
  return jsonify({"error": "Unauthorized. Correct 'Authorization: Bearer <PRIVATE_KEY>' or 'X-API-KEY: <PRIVATE_KEY>' header is required."}), 401
129
- return None # 鉴权通过
130
 
131
- # 应用所有API鉴权
132
  app = Flask(__name__)
133
  app.before_request(check_private_key)
134
 
135
- # ========== OnDemand API KEY池(从环境变量读取,每行一个KEY,用逗号分隔)==========
136
  ONDEMAND_APIKEYS_STR = os.environ.get("ONDEMAND_APIKEYS", "")
137
  ONDEMAND_APIKEYS = [key.strip() for key in ONDEMAND_APIKEYS_STR.split(',') if key.strip()]
 
138
 
139
- BAD_KEY_RETRY_INTERVAL = 600 # 标记为坏的KEY的重试间隔(秒),例如10分钟
140
- # SESSION_TIMEOUT 已移除,因为我们现在每次都用新会话
141
-
142
- # ========== OnDemand模型映射 ==========
143
- # 将 OpenAI 风格的模型名称映射到 OnDemand 服务的 endpointId
144
  MODEL_MAP = {
145
  "gpto3-mini": "predefined-openai-gpto3-mini",
146
  "gpt-4o": "predefined-openai-gpt4o",
@@ -154,106 +146,85 @@ MODEL_MAP = {
154
  "gemini-2.0-flash": "predefined-gemini-2.0-flash",
155
  }
156
  DEFAULT_ONDEMAND_MODEL = "predefined-openai-gpt4o"
157
- # ==========================================
158
 
159
  class KeyManager:
160
- """管理 OnDemand API 密钥池"""
161
  def __init__(self, key_list):
162
- self.key_list = list(key_list) # 存储可用的API密钥
163
- self.lock = threading.Lock() # 线程锁,用于同步访问密钥状态
164
- # 存储每个密钥的状态:是否被标记为“坏的”以及标记的时间戳
165
  self.key_status = {key: {"bad": False, "bad_ts": None} for key in self.key_list}
166
- self.idx = 0 # 用于轮询密钥的索引
167
 
168
  def display_key(self, key):
169
- """返回部分隐藏的密钥,用于日志输出"""
170
  if not key or len(key) < 10:
171
  return "INVALID_KEY_FORMAT"
172
  return f"{key[:6]}...{key[-4:]}"
173
 
174
  def get(self):
175
- """获取一个可用的API密钥"""
176
  with self.lock:
177
- if not self.key_list: # 如果密钥池为空
178
  logging.error("【KeyManager】API密钥池为空!无法提供密钥。")
179
  raise ValueError("API key pool is empty.")
180
 
181
  now = time.time()
182
  num_keys = len(self.key_list)
183
 
184
- for i in range(num_keys): # 尝试遍历所有密钥最多一次
185
  current_key_candidate = self.key_list[self.idx]
186
- self.idx = (self.idx + 1) % num_keys # 移动到下一个密钥,循环使用
187
-
188
  status = self.key_status[current_key_candidate]
189
 
190
- if not status["bad"]: # 如果密钥状态良好
191
  logging.info(f"【KeyManager】选择API KEY: {self.display_key(current_key_candidate)} [状态:正常]")
192
  return current_key_candidate
193
 
194
- # 如果密钥被标记为坏的,检查是否已达到重试时间
195
  if status["bad_ts"] and (now - status["bad_ts"] >= BAD_KEY_RETRY_INTERVAL):
196
  logging.info(f"【KeyManager】API KEY: {self.display_key(current_key_candidate)} 达到重试周期,恢复为正常。")
197
  status["bad"] = False
198
  status["bad_ts"] = None
199
  return current_key_candidate
200
 
201
- # 如果所有密钥都被标记为坏的,并且都未达到重试时间
202
- # 强制重置所有密钥状态并返回第一个,这是一种降级策略
203
  logging.warning("【KeyManager】所有API KEY均被标记为不良且未到重试时间。将强制重置所有KEY状态并尝试第一个。")
204
  for key_to_reset in self.key_list:
205
  self.key_status[key_to_reset]["bad"] = False
206
  self.key_status[key_to_reset]["bad_ts"] = None
207
  self.idx = 0
208
- if self.key_list: # 再次检查,以防在极小概率下key_list变空
209
  selected_key = self.key_list[0]
210
  logging.info(f"【KeyManager】强制选择API KEY: {self.display_key(selected_key)} [状态:强制重试]")
211
  return selected_key
212
- else: # 理论上不应该到这里,因为前面有检查
213
  logging.error("【KeyManager】在强制重试逻辑中发现密钥池为空!")
214
  raise ValueError("API key pool became empty during forced retry logic.")
215
 
216
-
217
  def mark_bad(self, key):
218
- """将指定的API密钥标记为“坏的”"""
219
  with self.lock:
220
  if key in self.key_status and not self.key_status[key]["bad"]:
221
  logging.warning(f"【KeyManager】禁用API KEY: {self.display_key(key)}。将在 {BAD_KEY_RETRY_INTERVAL // 60} 分钟后自动重试。")
222
  self.key_status[key]["bad"] = True
223
  self.key_status[key]["bad_ts"] = time.time()
224
 
225
- # 初始化 KeyManager
226
  if not ONDEMAND_APIKEYS:
227
  logging.warning("【启动警告】ONDEMAND_APIKEYS 环境变量未设置或为空。服务可能无法正常工作。")
228
  keymgr = KeyManager(ONDEMAND_APIKEYS)
229
 
230
- ONDEMAND_API_BASE = "https://api.on-demand.io/chat/v1" # OnDemand API 的基础URL
231
 
232
  def get_endpoint_id(openai_model_name):
233
- """根据用户提供的OpenAI模型名称,从MODEL_MAP中查找对应的OnDemand endpointId"""
234
  normalized_model_name = str(openai_model_name or "").lower().replace(" ", "")
235
  return MODEL_MAP.get(normalized_model_name, DEFAULT_ONDEMAND_MODEL)
236
 
237
  def create_session(apikey, external_user_id=None, plugin_ids=None):
238
- """
239
- 向 OnDemand API 创建一个新的会话。
240
- :param apikey: OnDemand API 密钥。
241
- :param external_user_id: 可选,外部用户ID。
242
- :param plugin_ids: 可选,插件ID列表。
243
- :return: 新创建的会话ID。
244
- :raises: requests.HTTPError 如果API调用失败。
245
- """
246
  url = f"{ONDEMAND_API_BASE}/sessions"
247
- payload = {"externalUserId": external_user_id or str(uuid.uuid4())} # 如果未提供,则生成UUID
248
- if plugin_ids is not None: # 通常聊天场景可能不需要插件
249
  payload["pluginIds"] = plugin_ids
250
 
251
  headers = {"apikey": apikey, "Content-Type": "application/json"}
252
 
253
  logging.info(f"【OnDemand】尝试创建新会话... URL: {url}, Key: {keymgr.display_key(apikey)}")
254
  try:
255
- resp = requests.post(url, json=payload, headers=headers, timeout=20) # 设置超时
256
- resp.raise_for_status() # 如果状态码不是2xx,则抛出HTTPError
257
  session_id = resp.json()["data"]["id"]
258
  logging.info(f"【OnDemand】新会话创建成功: {session_id}, Key: {keymgr.display_key(apikey)}")
259
  return session_id
@@ -265,17 +236,14 @@ def create_session(apikey, external_user_id=None, plugin_ids=None):
265
  raise
266
 
267
  def format_openai_sse_delta(chunk_data_dict):
268
- """将数据块格式化为 OpenAI SSE (Server-Sent Events) 流格式"""
269
  return f"data: {json.dumps(chunk_data_dict, ensure_ascii=False)}\n\n"
270
 
 
271
  def _execute_one_stream_attempt(apikey_for_attempt, session_id_for_attempt, query_str, endpoint_id, openai_model_name_for_response, current_attempt_num_logging):
272
  """
273
- 执行一次流式请求尝试。
274
- 返回: (generated_sse_strings, accumulated_text_content, api_error_occurred)
275
- generated_sse_strings: 此尝试生成的所有SSE事件字符串列表。
276
- accumulated_text_content: 从流中累积的纯文本内容。
277
- api_error_occurred: 布尔值,指示此尝试是否遇到可直接转换为SSE错误事件的API错误。
278
- requests.RequestException (如超时) 会被直接抛出。
279
  """
280
  url = f"{ONDEMAND_API_BASE}/sessions/{session_id_for_attempt}/query"
281
  payload = {
@@ -290,16 +258,15 @@ def _execute_one_stream_attempt(apikey_for_attempt, session_id_for_attempt, quer
290
  "Accept": "text/event-stream"
291
  }
292
 
293
- generated_sse_strings = []
294
  accumulated_text_parts = []
295
- api_error_handled_as_sse = False
296
 
297
  logging.info(f"【流式请求子尝试 {current_attempt_num_logging}】发送到 OnDemand: Session={session_id_for_attempt}, Endpoint={endpoint_id}, Key={keymgr.display_key(apikey_for_attempt)}")
298
 
299
  try:
300
  with requests.post(url, json=payload, headers=headers, stream=True, timeout=180) as resp:
301
  if resp.status_code != 200:
302
- api_error_handled_as_sse = True
303
  error_text = resp.text
304
  logging.error(f"【OnDemand流错误】请求失败 (子尝试 {current_attempt_num_logging})。状态码: {resp.status_code}, Session: {session_id_for_attempt}, 响应: {error_text[:500]}")
305
  error_payload = {
@@ -309,9 +276,9 @@ def _execute_one_stream_attempt(apikey_for_attempt, session_id_for_attempt, quer
309
  "code": resp.status_code
310
  }
311
  }
312
- generated_sse_strings.append(format_openai_sse_delta(error_payload))
313
- generated_sse_strings.append("data: [DONE]\n\n")
314
- return generated_sse_strings, "".join(accumulated_text_parts), api_error_handled_as_sse
315
 
316
  first_chunk_sent = False
317
  last_line_str = ""
@@ -327,19 +294,22 @@ def _execute_one_stream_attempt(apikey_for_attempt, session_id_for_attempt, quer
327
 
328
  if data_part == "[DONE]":
329
  logging.info(f"【OnDemand流】接收到 [DONE] 信号 (子尝试 {current_attempt_num_logging})。Session: {session_id_for_attempt}")
330
- generated_sse_strings.append("data: [DONE]\n\n")
331
- break
 
 
332
  elif data_part.startswith("[ERROR]:"):
333
- api_error_handled_as_sse = True
334
  error_json_str = data_part[len("[ERROR]:"):].strip()
335
  logging.warning(f"【OnDemand流】接收到错误事件 (子尝试 {current_attempt_num_logging}): {error_json_str}。Session: {session_id_for_attempt}")
336
  try:
337
  error_obj = json.loads(error_json_str)
338
  except json.JSONDecodeError:
339
  error_obj = {"message": error_json_str, "type": "on_demand_stream_error_format"}
340
- generated_sse_strings.append(format_openai_sse_delta({"error": error_obj}))
341
- generated_sse_strings.append("data: [DONE]\n\n")
342
- break
 
343
  else:
344
  try:
345
  event_data = json.loads(data_part)
@@ -367,31 +337,39 @@ def _execute_one_stream_attempt(apikey_for_attempt, session_id_for_attempt, quer
367
  "model": openai_model_name_for_response,
368
  "choices": [{"delta": choice_delta, "index": 0, "finish_reason": None}]
369
  }
370
- generated_sse_strings.append(format_openai_sse_delta(openai_chunk))
371
  except json.JSONDecodeError:
372
  logging.warning(f"【OnDemand流】无法解析JSON (子尝试 {current_attempt_num_logging}): {data_part[:100]}... Session: {session_id_for_attempt}")
373
  continue
374
 
375
- if not last_line_str.startswith("data: [DONE]") and not api_error_handled_as_sse:
376
- logging.info(f"【OnDemand流】(子尝试 {current_attempt_num_logging}) 流迭代完成,补充发送 [DONE]。Session: {session_id_for_attempt}")
377
- generated_sse_strings.append("data: [DONE]\n\n")
378
-
379
- except requests.exceptions.RequestException: # Let specific RequestExceptions be handled by the caller
380
- raise
381
- except Exception as e: # Catch other unexpected errors during stream processing
382
- api_error_handled_as_sse = True
 
 
 
 
 
 
 
383
  logging.error(f"【OnDemand流】处理流时发生未知错误 (子尝试 {current_attempt_num_logging}): {e}, Session: {session_id_for_attempt}", exc_info=True)
 
384
  error_payload = {
385
  "error": {"message": f"Unknown error during streaming (Attempt {current_attempt_num_logging}): {str(e)}", "type": "unknown_streaming_error_in_attempt"}
386
  }
387
- generated_sse_strings.append(format_openai_sse_delta(error_payload))
388
- generated_sse_strings.append("data: [DONE]\n\n")
389
 
390
- return generated_sse_strings, "".join(accumulated_text_parts).strip(), api_error_handled_as_sse
 
391
 
392
  @app.route("/v1/chat/completions", methods=["POST"])
393
  def chat_completions():
394
- """处理聊天补全请求,模拟 OpenAI /v1/chat/completions 接口"""
395
  try:
396
  request_data = request.json
397
  except Exception as e:
@@ -435,20 +413,20 @@ def chat_completions():
435
  if not formatted_query_parts:
436
  return jsonify({"error": "No valid content found in 'messages'."}), 400
437
 
438
- # Use the globally defined CLAUDE_SYSTEM_PROMPT
439
  start_prompt = CLAUDE_SYSTEM_PROMPT + "\n\n" + """下面是对话历史. 你是Assitant角色,请遵从User指令,并用中文尽可能详细的回复。注意,请直接回复! 请不要在开头提出"根据上下文及历史记录"相关的话语。\n"""
440
  final_query_to_ondemand = start_prompt + "\n".join(formatted_query_parts)
441
 
442
  def attempt_ondemand_request_wrapper(current_apikey_from_wrapper, current_session_id_from_wrapper):
443
- # This inner function is what with_valid_key_and_session calls.
444
- # It receives the *initial* apikey and session_id.
445
  if is_stream_request:
446
- return handle_stream_request(current_apikey_from_wrapper, current_session_id_from_wrapper, final_query_to_ondemand, target_endpoint_id, openai_model_name)
 
 
 
 
447
  else:
448
  return handle_non_stream_request(current_apikey_from_wrapper, current_session_id_from_wrapper, final_query_to_ondemand, target_endpoint_id, openai_model_name)
449
 
450
  def with_valid_key_and_session(action_func_to_wrap):
451
- # This is the outer retry loop for API key/session issues for the *first* attempt of action_func_to_wrap
452
  max_key_retries = len(ONDEMAND_APIKEYS) * 2 if ONDEMAND_APIKEYS else 1
453
  key_retry_count = 0
454
  last_exception_for_key_retry = None
@@ -460,60 +438,70 @@ def chat_completions():
460
  selected_apikey_for_outer_retry = keymgr.get()
461
  logging.info(f"【请求处理 - Key轮换尝试 {key_retry_count}/{max_key_retries}】使用 API Key: {keymgr.display_key(selected_apikey_for_outer_retry)},准备创建新会话...")
462
  ondemand_session_id_for_outer_retry = create_session(selected_apikey_for_outer_retry)
463
-
464
- # Call the action_func_to_wrap (which is attempt_ondemand_request_wrapper)
465
- # This action_func_to_wrap will then call handle_stream_request or handle_non_stream_request
466
- # If handle_..._request fails its *first* attempt due to RequestException, it re-raises it here.
467
  return action_func_to_wrap(selected_apikey_for_outer_retry, ondemand_session_id_for_outer_retry)
468
 
469
- except ValueError as ve: # keymgr.get() failed
470
  logging.critical(f"【请求处理 - Key轮换尝试 {key_retry_count}】KeyManager 错误: {ve}")
471
  last_exception_for_key_retry = ve
472
- break # Cannot get any key, fatal for this request.
473
- except requests.exceptions.RequestException as http_err_outer: # Covers create_session failure or re-raised error from action_func's first attempt
474
  last_exception_for_key_retry = http_err_outer
475
  logging.warning(f"【请求处理 - Key轮换尝试 {key_retry_count}】HTTP/请求错误。Key: {keymgr.display_key(selected_apikey_for_outer_retry) if selected_apikey_for_outer_retry else 'N/A'}, Error: {http_err_outer}")
476
- if selected_apikey_for_outer_retry: # If a key was involved in this failure
477
  keymgr.mark_bad(selected_apikey_for_outer_retry)
478
 
479
  if key_retry_count >= max_key_retries:
480
  logging.error(f"【请求处理】所有Key轮换尝试均失败。最后错误: {last_exception_for_key_retry}")
481
- break # Exhausted key retries
482
 
483
  logging.info(f"【请求处理】Key轮换尝试 {key_retry_count} 失败,等待后重试下一个Key...")
484
  time.sleep(1)
485
- continue # To the next iteration of the key_retry_count loop
486
- except Exception as e_outer: # Other unexpected errors during the initial setup/call
487
  last_exception_for_key_retry = e_outer
488
  logging.error(f"【请求处理 - Key轮换尝试 {key_retry_count}】发生意外严重错误: {e_outer}", exc_info=True)
489
  if selected_apikey_for_outer_retry:
490
  keymgr.mark_bad(selected_apikey_for_outer_retry)
491
- # For truly unexpected errors, might be better to fail fast
492
- break # Break outer retry loop
493
 
494
  error_message = "All attempts to process the request failed after multiple key/session retries."
495
  if last_exception_for_key_retry:
496
  error_message += f" Last known error during key/session phase: {str(last_exception_for_key_retry)}"
497
  logging.error(error_message)
498
- return jsonify({"error": error_message}), 503
 
 
 
 
 
 
 
 
 
 
 
499
 
500
  return with_valid_key_and_session(attempt_ondemand_request_wrapper)
501
 
502
-
503
  def handle_stream_request(initial_apikey, initial_session_id, query_str, endpoint_id, openai_model_name_for_response):
504
- """处理流式聊天补全请求,包含空回复重试逻辑 (每次空回复重试使用新Key/Session)"""
 
 
 
 
505
  max_empty_response_retries = 5
506
  empty_retry_attempt_num = 0
507
 
508
  current_apikey_for_attempt = initial_apikey
509
  current_session_id_for_attempt = initial_session_id
510
 
511
- final_sse_strings_to_yield = []
512
-
513
  while empty_retry_attempt_num < max_empty_response_retries:
514
  empty_retry_attempt_num += 1
 
 
515
 
516
- if empty_retry_attempt_num > 1: # This is an empty-response retry, get new key/session
517
  logging.info(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session...")
518
  try:
519
  current_apikey_for_attempt = keymgr.get()
@@ -521,95 +509,98 @@ def handle_stream_request(initial_apikey, initial_session_id, query_str, endpoin
521
  logging.info(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】新Key/Session获取成功: Key={keymgr.display_key(current_apikey_for_attempt)}, Session={current_session_id_for_attempt}")
522
  except (ValueError, requests.exceptions.RequestException) as e_key_session:
523
  logging.warning(f"【流式请求-空回复重��� {empty_retry_attempt_num-1}】获取新Key/Session失败: {e_key_session}")
524
- if current_apikey_for_attempt and not isinstance(e_key_session, ValueError): # If key was obtained but create_session failed
525
  keymgr.mark_bad(current_apikey_for_attempt)
526
  if empty_retry_attempt_num >= max_empty_response_retries:
527
- final_sse_strings_to_yield = [
528
- format_openai_sse_delta({"error": {"message": f"Failed to get new key/session for final empty stream retry. Error: {str(e_key_session)}", "type": "internal_proxy_error"}}),
529
- "data: [DONE]\n\n"
530
- ]
531
- break # Break empty retry loop
532
  time.sleep(1)
533
- current_apikey_for_attempt = None # Ensure it's reset if keymgr.get() failed
534
- continue # Next iteration of empty_retry_attempt_num loop
535
 
536
- # Log which attempt this is (1st overall, or nth empty-response retry)
537
  log_attempt_str = f"初始尝试" if empty_retry_attempt_num == 1 else f"空回复重试 {empty_retry_attempt_num-1}"
538
 
539
  try:
540
- sse_strings_this_attempt, accumulated_text_this_attempt, api_error_in_attempt = \
541
- _execute_one_stream_attempt(current_apikey_for_attempt, current_session_id_for_attempt, query_str, endpoint_id, openai_model_name_for_response, f"{log_attempt_str} (Overall attempt {empty_retry_attempt_num})")
542
-
543
- final_sse_strings_to_yield = sse_strings_this_attempt
544
-
545
- if api_error_in_attempt:
546
- logging.warning(f"【流式请求】({log_attempt_str}) OnDemand 服务返回错误或处理内部错误,将返回此错误信息。")
547
- break
548
-
549
- if accumulated_text_this_attempt:
550
- logging.info(f"【流式请求】({log_attempt_str}) 成功获取非空内容。")
551
- break
552
-
553
- logging.warning(f"【流式请求】({log_attempt_str}) 返回空内容。")
554
- if empty_retry_attempt_num >= max_empty_response_retries:
555
- logging.error(f"【流式请求】达到最大空回复重试次数 ({max_empty_response_retries})。")
556
- empty_error_payload = {
557
- "error": {"message": f"Model returned an empty stream after {max_empty_response_retries} retries.", "type": "empty_stream_error_after_retries", "code": "empty_response"}
558
- }
559
- final_sse_strings_to_yield = [format_openai_sse_delta(empty_error_payload), "data: [DONE]\n\n"]
560
- break
561
-
562
- logging.info(f"【流式请求】空回复,将在1秒后重试下一个Key。当前总尝试 {empty_retry_attempt_num}/{max_empty_response_retries}")
563
- time.sleep(1)
564
 
565
  except requests.exceptions.RequestException as e_req:
566
  logging.warning(f"【流式请求】({log_attempt_str} using key {keymgr.display_key(current_apikey_for_attempt)}) 发生请求级错误: {e_req}")
567
- keymgr.mark_bad(current_apikey_for_attempt)
568
- if empty_retry_attempt_num == 1: # This was the initial_apikey provided by with_valid_key_and_session
569
- raise e_req # Re-raise for the outer key retry mechanism
 
 
 
 
570
 
571
  # If it's an empty-response retry (attempt_num > 1) that failed with RequestException
572
  if empty_retry_attempt_num >= max_empty_response_retries:
573
  logging.error(f"【流式请求】在���后一次空回复重试时发生请求错误。")
574
- final_sse_strings_to_yield = [
575
- format_openai_sse_delta({"error": {"message": f"Request failed on final empty stream retry attempt: {str(e_req)}", "type": "internal_proxy_error"}}),
576
- "data: [DONE]\n\n"
577
- ]
578
- break # Break empty retry loop
579
  time.sleep(1)
580
- # Loop continues, will try to get another new key for the next empty-response retry
581
- continue
582
 
583
- def final_generator_for_response():
584
- if not final_sse_strings_to_yield:
585
- logging.error("【流式请求】final_sse_strings_to_yield 为空,返回通用错误。")
586
- yield format_openai_sse_delta({"error": {"message": "Unexpected empty result in streaming.", "type": "internal_proxy_error"}})
 
 
 
 
 
 
 
 
 
 
 
 
 
 
587
  yield "data: [DONE]\n\n"
588
- else:
589
- for sse_str in final_sse_strings_to_yield:
590
- yield sse_str
591
-
592
- return Response(final_generator_for_response(), content_type='text/event-stream')
 
 
 
 
593
 
594
  def handle_non_stream_request(initial_apikey, initial_session_id, query_str, endpoint_id, openai_model_name_for_response):
595
- """处理非流式聊天补全请求,包含空回复重试逻辑 (每次空回复重试使用新Key/Session)"""
596
  max_empty_response_retries = 5
597
- empty_retry_attempt_num = 0 # Counts total attempts including initial one
598
 
599
  current_apikey_for_attempt = initial_apikey
600
  current_session_id_for_attempt = initial_session_id
601
 
602
- url = f"{ONDEMAND_API_BASE}/sessions/{current_session_id_for_attempt}/query" # URL will change if session_id changes
603
 
604
  while empty_retry_attempt_num < max_empty_response_retries:
605
  empty_retry_attempt_num += 1
606
 
607
- if empty_retry_attempt_num > 1: # This is an empty-response retry, get new key/session
608
  logging.info(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session...")
609
  try:
610
  current_apikey_for_attempt = keymgr.get()
611
  current_session_id_for_attempt = create_session(current_apikey_for_attempt)
612
- url = f"{ONDEMAND_API_BASE}/sessions/{current_session_id_for_attempt}/query" # Update URL with new session
613
  logging.info(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】新Key/Session获取成功: Key={keymgr.display_key(current_apikey_for_attempt)}, Session={current_session_id_for_attempt}")
614
  except (ValueError, requests.exceptions.RequestException) as e_key_session:
615
  logging.warning(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session失败: {e_key_session}")
@@ -664,7 +655,8 @@ def handle_non_stream_request(initial_apikey, initial_session_id, query_str, end
664
 
665
  except requests.exceptions.RequestException as e_req:
666
  logging.warning(f"【同步请求】({log_attempt_str} using key {keymgr.display_key(current_apikey_for_attempt)}) 发生请求级错误: {e_req}")
667
- keymgr.mark_bad(current_apikey_for_attempt)
 
668
  if empty_retry_attempt_num == 1:
669
  raise e_req
670
  if empty_retry_attempt_num >= max_empty_response_retries:
@@ -676,13 +668,12 @@ def handle_non_stream_request(initial_apikey, initial_session_id, query_str, end
676
  logging.error(f"【同步请求】({log_attempt_str}) 处理响应时出错: {e_parse}", exc_info=True)
677
  return jsonify({"error": f"Error processing OnDemand sync response: {str(e_parse)}"}), 502
678
 
679
- logging.error(f"【同步请求】意外退出空回复重试循环。") # Should be unreachable
680
  return jsonify({"error": "Unexpected error in non-stream handling after empty response retries."}), 500
681
 
682
 
683
  @app.route("/v1/models", methods=["GET"])
684
  def list_models():
685
- """返回此代理支持的模型列表,模拟 OpenAI /v1/models 接口"""
686
  model_objects = []
687
  for model_key_alias in MODEL_MAP.keys():
688
  model_objects.append({
@@ -698,7 +689,6 @@ def list_models():
698
 
699
  @app.route("/", methods=["GET"])
700
  def health_check():
701
- """简单的健康检查端点或首页"""
702
  num_keys = len(ONDEMAND_APIKEYS)
703
  key_status_summary = {keymgr.display_key(k): ("OK" if not v["bad"] else f"BAD (since {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(v['bad_ts'])) if v['bad_ts'] else 'N/A'})") for k, v in keymgr.key_status.items()}
704
 
@@ -716,7 +706,6 @@ def health_check():
716
 
717
  if __name__ == "__main__":
718
  log_format = '[%(asctime)s] %(levelname)s in %(module)s (%(funcName)s): %(message)s'
719
- # Use LOG_LEVEL from env if set, otherwise default to INFO
720
  logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO").upper(), format=log_format)
721
 
722
  if not PRIVATE_KEY:
 
101
  Claude is now being connected with a person."""
102
 
103
  # ====== 读取 Huggingface Secret 配置的私有key =======
104
+ PRIVATE_KEY = os.environ.get("PRIVATE_KEY", "")
105
+ SAFE_HEADERS = ["Authorization", "X-API-KEY"]
 
106
 
107
  # 全局接口访问权限检查
108
  def check_private_key():
 
109
  if request.path in ["/", "/favicon.ico"]:
110
+ return None
111
 
112
  key_from_header = None
113
  for header_name in SAFE_HEADERS:
 
117
  key_from_header = key_from_header[len("Bearer "):].strip()
118
  break
119
 
120
+ if not PRIVATE_KEY:
121
  logging.warning("PRIVATE_KEY 未设置,服务将不进行鉴权!")
122
  return None
123
 
124
  if not key_from_header or key_from_header != PRIVATE_KEY:
125
  logging.warning(f"未授权访问尝试: Path={request.path}, IP={request.remote_addr}, Key Provided='{key_from_header[:10]}...'")
126
  return jsonify({"error": "Unauthorized. Correct 'Authorization: Bearer <PRIVATE_KEY>' or 'X-API-KEY: <PRIVATE_KEY>' header is required."}), 401
127
+ return None
128
 
 
129
  app = Flask(__name__)
130
  app.before_request(check_private_key)
131
 
 
132
  ONDEMAND_APIKEYS_STR = os.environ.get("ONDEMAND_APIKEYS", "")
133
  ONDEMAND_APIKEYS = [key.strip() for key in ONDEMAND_APIKEYS_STR.split(',') if key.strip()]
134
+ BAD_KEY_RETRY_INTERVAL = 600
135
 
 
 
 
 
 
136
  MODEL_MAP = {
137
  "gpto3-mini": "predefined-openai-gpto3-mini",
138
  "gpt-4o": "predefined-openai-gpt4o",
 
146
  "gemini-2.0-flash": "predefined-gemini-2.0-flash",
147
  }
148
  DEFAULT_ONDEMAND_MODEL = "predefined-openai-gpt4o"
 
149
 
150
  class KeyManager:
 
151
  def __init__(self, key_list):
152
+ self.key_list = list(key_list)
153
+ self.lock = threading.Lock()
 
154
  self.key_status = {key: {"bad": False, "bad_ts": None} for key in self.key_list}
155
+ self.idx = 0
156
 
157
  def display_key(self, key):
 
158
  if not key or len(key) < 10:
159
  return "INVALID_KEY_FORMAT"
160
  return f"{key[:6]}...{key[-4:]}"
161
 
162
  def get(self):
 
163
  with self.lock:
164
+ if not self.key_list:
165
  logging.error("【KeyManager】API密钥池为空!无法提供密钥。")
166
  raise ValueError("API key pool is empty.")
167
 
168
  now = time.time()
169
  num_keys = len(self.key_list)
170
 
171
+ for i in range(num_keys):
172
  current_key_candidate = self.key_list[self.idx]
173
+ self.idx = (self.idx + 1) % num_keys
 
174
  status = self.key_status[current_key_candidate]
175
 
176
+ if not status["bad"]:
177
  logging.info(f"【KeyManager】选择API KEY: {self.display_key(current_key_candidate)} [状态:正常]")
178
  return current_key_candidate
179
 
 
180
  if status["bad_ts"] and (now - status["bad_ts"] >= BAD_KEY_RETRY_INTERVAL):
181
  logging.info(f"【KeyManager】API KEY: {self.display_key(current_key_candidate)} 达到重试周期,恢复为正常。")
182
  status["bad"] = False
183
  status["bad_ts"] = None
184
  return current_key_candidate
185
 
 
 
186
  logging.warning("【KeyManager】所有API KEY均被标记为不良且未到重试时间。将强制重置所有KEY状态并尝试第一个。")
187
  for key_to_reset in self.key_list:
188
  self.key_status[key_to_reset]["bad"] = False
189
  self.key_status[key_to_reset]["bad_ts"] = None
190
  self.idx = 0
191
+ if self.key_list:
192
  selected_key = self.key_list[0]
193
  logging.info(f"【KeyManager】强制选择API KEY: {self.display_key(selected_key)} [状态:强制重试]")
194
  return selected_key
195
+ else:
196
  logging.error("【KeyManager】在强制重试逻辑中发现密钥池为空!")
197
  raise ValueError("API key pool became empty during forced retry logic.")
198
 
 
199
  def mark_bad(self, key):
 
200
  with self.lock:
201
  if key in self.key_status and not self.key_status[key]["bad"]:
202
  logging.warning(f"【KeyManager】禁用API KEY: {self.display_key(key)}。将在 {BAD_KEY_RETRY_INTERVAL // 60} 分钟后自动重试。")
203
  self.key_status[key]["bad"] = True
204
  self.key_status[key]["bad_ts"] = time.time()
205
 
 
206
  if not ONDEMAND_APIKEYS:
207
  logging.warning("【启动警告】ONDEMAND_APIKEYS 环境变量未设置或为空。服务可能无法正常工作。")
208
  keymgr = KeyManager(ONDEMAND_APIKEYS)
209
 
210
+ ONDEMAND_API_BASE = "https://api.on-demand.io/chat/v1"
211
 
212
  def get_endpoint_id(openai_model_name):
 
213
  normalized_model_name = str(openai_model_name or "").lower().replace(" ", "")
214
  return MODEL_MAP.get(normalized_model_name, DEFAULT_ONDEMAND_MODEL)
215
 
216
  def create_session(apikey, external_user_id=None, plugin_ids=None):
 
 
 
 
 
 
 
 
217
  url = f"{ONDEMAND_API_BASE}/sessions"
218
+ payload = {"externalUserId": external_user_id or str(uuid.uuid4())}
219
+ if plugin_ids is not None:
220
  payload["pluginIds"] = plugin_ids
221
 
222
  headers = {"apikey": apikey, "Content-Type": "application/json"}
223
 
224
  logging.info(f"【OnDemand】尝试创建新会话... URL: {url}, Key: {keymgr.display_key(apikey)}")
225
  try:
226
+ resp = requests.post(url, json=payload, headers=headers, timeout=20)
227
+ resp.raise_for_status()
228
  session_id = resp.json()["data"]["id"]
229
  logging.info(f"【OnDemand】新会话创建成功: {session_id}, Key: {keymgr.display_key(apikey)}")
230
  return session_id
 
236
  raise
237
 
238
  def format_openai_sse_delta(chunk_data_dict):
 
239
  return f"data: {json.dumps(chunk_data_dict, ensure_ascii=False)}\n\n"
240
 
241
+ # Modified: This function is now a generator and returns accumulated text + error flag at the end.
242
  def _execute_one_stream_attempt(apikey_for_attempt, session_id_for_attempt, query_str, endpoint_id, openai_model_name_for_response, current_attempt_num_logging):
243
  """
244
+ Executes one streaming request attempt.
245
+ Yields SSE event strings.
246
+ Returns a tuple: (accumulated_text_content, api_error_yielded_flag)
 
 
 
247
  """
248
  url = f"{ONDEMAND_API_BASE}/sessions/{session_id_for_attempt}/query"
249
  payload = {
 
258
  "Accept": "text/event-stream"
259
  }
260
 
 
261
  accumulated_text_parts = []
262
+ api_error_yielded = False # Flag to indicate if an API error was processed and yielded as SSE
263
 
264
  logging.info(f"【流式请求子尝试 {current_attempt_num_logging}】发送到 OnDemand: Session={session_id_for_attempt}, Endpoint={endpoint_id}, Key={keymgr.display_key(apikey_for_attempt)}")
265
 
266
  try:
267
  with requests.post(url, json=payload, headers=headers, stream=True, timeout=180) as resp:
268
  if resp.status_code != 200:
269
+ api_error_yielded = True
270
  error_text = resp.text
271
  logging.error(f"【OnDemand流错误】请求失败 (子尝试 {current_attempt_num_logging})。状态码: {resp.status_code}, Session: {session_id_for_attempt}, 响应: {error_text[:500]}")
272
  error_payload = {
 
276
  "code": resp.status_code
277
  }
278
  }
279
+ yield format_openai_sse_delta(error_payload)
280
+ yield "data: [DONE]\n\n"
281
+ return "".join(accumulated_text_parts).strip(), api_error_yielded # Return after yielding error
282
 
283
  first_chunk_sent = False
284
  last_line_str = ""
 
294
 
295
  if data_part == "[DONE]":
296
  logging.info(f"【OnDemand流】接收到 [DONE] 信号 (子尝试 {current_attempt_num_logging})。Session: {session_id_for_attempt}")
297
+ yield "data: [DONE]\n\n"
298
+ # No break here, let the function return after the loop
299
+ return "".join(accumulated_text_parts).strip(), api_error_yielded
300
+
301
  elif data_part.startswith("[ERROR]:"):
302
+ api_error_yielded = True
303
  error_json_str = data_part[len("[ERROR]:"):].strip()
304
  logging.warning(f"【OnDemand流】接收到错误事件 (子尝试 {current_attempt_num_logging}): {error_json_str}。Session: {session_id_for_attempt}")
305
  try:
306
  error_obj = json.loads(error_json_str)
307
  except json.JSONDecodeError:
308
  error_obj = {"message": error_json_str, "type": "on_demand_stream_error_format"}
309
+ yield format_openai_sse_delta({"error": error_obj})
310
+ yield "data: [DONE]\n\n"
311
+ return "".join(accumulated_text_parts).strip(), api_error_yielded # Return after yielding error
312
+
313
  else:
314
  try:
315
  event_data = json.loads(data_part)
 
337
  "model": openai_model_name_for_response,
338
  "choices": [{"delta": choice_delta, "index": 0, "finish_reason": None}]
339
  }
340
+ yield format_openai_sse_delta(openai_chunk) # Yield immediately
341
  except json.JSONDecodeError:
342
  logging.warning(f"【OnDemand流】无法解析JSON (子尝试 {current_attempt_num_logging}): {data_part[:100]}... Session: {session_id_for_attempt}")
343
  continue
344
 
345
+ # This part is reached if the loop finishes without an explicit [DONE] or [ERROR] from the stream data itself.
346
+ # This might happen if the stream just ends.
347
+ if not api_error_yielded: # If no error was yielded, and no [DONE] was in data, yield a [DONE]
348
+ if not last_line_str.startswith("data: [DONE]"): # Check if last processed line was not already DONE
349
+ logging.info(f"【OnDemand流】(子尝试 {current_attempt_num_logging}) 流迭代完成,补充发送 [DONE]。Session: {session_id_for_attempt}")
350
+ yield "data: [DONE]\n\n"
351
+
352
+ except requests.exceptions.RequestException:
353
+ # Network/request level errors before or during streaming
354
+ # These should be caught by the caller (handle_stream_request) to decide on retries
355
+ logging.error(f"【OnDemand流】请求过程中发生网络或请求异常 (子尝试 {current_attempt_num_logging}): Session: {session_id_for_attempt}", exc_info=False)
356
+ raise # Re-raise for the caller to handle
357
+
358
+ except Exception as e:
359
+ # Unexpected Python errors during stream processing
360
  logging.error(f"【OnDemand流】处理流时发生未知错误 (子尝试 {current_attempt_num_logging}): {e}, Session: {session_id_for_attempt}", exc_info=True)
361
+ api_error_yielded = True # Mark that an error occurred and we are yielding an SSE for it
362
  error_payload = {
363
  "error": {"message": f"Unknown error during streaming (Attempt {current_attempt_num_logging}): {str(e)}", "type": "unknown_streaming_error_in_attempt"}
364
  }
365
+ yield format_openai_sse_delta(error_payload)
366
+ yield "data: [DONE]\n\n"
367
 
368
+ return "".join(accumulated_text_parts).strip(), api_error_yielded
369
+
370
 
371
  @app.route("/v1/chat/completions", methods=["POST"])
372
  def chat_completions():
 
373
  try:
374
  request_data = request.json
375
  except Exception as e:
 
413
  if not formatted_query_parts:
414
  return jsonify({"error": "No valid content found in 'messages'."}), 400
415
 
 
416
  start_prompt = CLAUDE_SYSTEM_PROMPT + "\n\n" + """下面是对话历史. 你是Assitant角色,请遵从User指令,并用中文尽可能详细的回复。注意,请直接回复! 请不要在开头提出"根据上下文及历史记录"相关的话语。\n"""
417
  final_query_to_ondemand = start_prompt + "\n".join(formatted_query_parts)
418
 
419
  def attempt_ondemand_request_wrapper(current_apikey_from_wrapper, current_session_id_from_wrapper):
 
 
420
  if is_stream_request:
421
+ # Pass the generator directly to Response
422
+ return Response(
423
+ handle_stream_request(current_apikey_from_wrapper, current_session_id_from_wrapper, final_query_to_ondemand, target_endpoint_id, openai_model_name),
424
+ content_type='text/event-stream'
425
+ )
426
  else:
427
  return handle_non_stream_request(current_apikey_from_wrapper, current_session_id_from_wrapper, final_query_to_ondemand, target_endpoint_id, openai_model_name)
428
 
429
  def with_valid_key_and_session(action_func_to_wrap):
 
430
  max_key_retries = len(ONDEMAND_APIKEYS) * 2 if ONDEMAND_APIKEYS else 1
431
  key_retry_count = 0
432
  last_exception_for_key_retry = None
 
438
  selected_apikey_for_outer_retry = keymgr.get()
439
  logging.info(f"【请求处理 - Key轮换尝试 {key_retry_count}/{max_key_retries}】使用 API Key: {keymgr.display_key(selected_apikey_for_outer_retry)},准备创建新会话...")
440
  ondemand_session_id_for_outer_retry = create_session(selected_apikey_for_outer_retry)
 
 
 
 
441
  return action_func_to_wrap(selected_apikey_for_outer_retry, ondemand_session_id_for_outer_retry)
442
 
443
+ except ValueError as ve:
444
  logging.critical(f"【请求处理 - Key轮换尝试 {key_retry_count}】KeyManager 错误: {ve}")
445
  last_exception_for_key_retry = ve
446
+ break
447
+ except requests.exceptions.RequestException as http_err_outer:
448
  last_exception_for_key_retry = http_err_outer
449
  logging.warning(f"【请求处理 - Key轮换尝试 {key_retry_count}】HTTP/请求错误。Key: {keymgr.display_key(selected_apikey_for_outer_retry) if selected_apikey_for_outer_retry else 'N/A'}, Error: {http_err_outer}")
450
+ if selected_apikey_for_outer_retry:
451
  keymgr.mark_bad(selected_apikey_for_outer_retry)
452
 
453
  if key_retry_count >= max_key_retries:
454
  logging.error(f"【请求处理】所有Key轮换尝试均失败。最后错误: {last_exception_for_key_retry}")
455
+ break
456
 
457
  logging.info(f"【请求处理】Key轮换尝试 {key_retry_count} 失败,等待后重试下一个Key...")
458
  time.sleep(1)
459
+ continue
460
+ except Exception as e_outer:
461
  last_exception_for_key_retry = e_outer
462
  logging.error(f"【请求处理 - Key轮换尝试 {key_retry_count}】发生意外严重错误: {e_outer}", exc_info=True)
463
  if selected_apikey_for_outer_retry:
464
  keymgr.mark_bad(selected_apikey_for_outer_retry)
465
+ break
 
466
 
467
  error_message = "All attempts to process the request failed after multiple key/session retries."
468
  if last_exception_for_key_retry:
469
  error_message += f" Last known error during key/session phase: {str(last_exception_for_key_retry)}"
470
  logging.error(error_message)
471
+ # For stream requests, if with_valid_key_and_session fails, we can't return jsonify directly
472
+ # This part might need adjustment if the action_func_to_wrap for stream is expected to return a Response object
473
+ # However, if action_func_to_wrap (attempt_ondemand_request_wrapper) for stream returns a Response,
474
+ # then this jsonify will only be hit if create_session or keymgr.get fails repeatedly.
475
+ if is_stream_request:
476
+ # Construct a generator that yields an error SSE
477
+ def error_stream_gen():
478
+ yield format_openai_sse_delta({"error": {"message": error_message, "type": "proxy_setup_error", "code": 503}})
479
+ yield "data: [DONE]\n\n"
480
+ return Response(error_stream_gen(), content_type='text/event-stream', status=503)
481
+ else:
482
+ return jsonify({"error": error_message}), 503
483
 
484
  return with_valid_key_and_session(attempt_ondemand_request_wrapper)
485
 
486
+ # Modified: This function is now a generator that uses `yield from`
487
def handle_stream_request(initial_apikey, initial_session_id, query_str, endpoint_id, openai_model_name_for_response):
    """Stream a chat completion to the client as SSE, retrying on empty replies.

    This is a generator. Each attempt delegates to ``_execute_one_stream_attempt``
    via ``yield from`` so SSE chunks reach the client the moment they arrive; the
    sub-generator's *return value* is the tuple ``(accumulated_text,
    api_error_yielded)`` describing that attempt. Up to ``max_empty_response_retries``
    total attempts are made, and every retry after the first acquires a brand-new
    API key and OnDemand session.

    Args:
        initial_apikey: API key for the first attempt (chosen by the outer key rotation).
        initial_session_id: OnDemand session id paired with ``initial_apikey``.
        query_str: Full prompt text forwarded to the OnDemand query endpoint.
        endpoint_id: OnDemand ``endpointId`` resolved from the requested model.
        openai_model_name_for_response: Model name echoed back in OpenAI-style chunks.

    Yields:
        SSE event strings (``data: ...`` frames), always terminated by a
        ``[DONE]`` frame before the generator finishes.
    """
    max_empty_response_retries = 5
    empty_retry_attempt_num = 0

    current_apikey_for_attempt = initial_apikey
    current_session_id_for_attempt = initial_session_id

    while empty_retry_attempt_num < max_empty_response_retries:
        empty_retry_attempt_num += 1
        accumulated_text_this_attempt = ""
        api_error_in_attempt = False

        # From the second attempt onward, rotate to a fresh key + session so an
        # empty reply from one backend key doesn't keep repeating.
        if empty_retry_attempt_num > 1:
            logging.info(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session...")
            try:
                current_apikey_for_attempt = keymgr.get()
                # NOTE(review): this line was elided in the diff view; reconstructed
                # from the identical pattern in handle_non_stream_request.
                current_session_id_for_attempt = create_session(current_apikey_for_attempt)
                logging.info(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】新Key/Session获取成功: Key={keymgr.display_key(current_apikey_for_attempt)}, Session={current_session_id_for_attempt}")
            except (ValueError, requests.exceptions.RequestException) as e_key_session:
                logging.warning(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session失败: {e_key_session}")
                # A ValueError means the key pool itself is exhausted — don't blame the key.
                if current_apikey_for_attempt and not isinstance(e_key_session, ValueError):
                    keymgr.mark_bad(current_apikey_for_attempt)
                if empty_retry_attempt_num >= max_empty_response_retries:
                    yield format_openai_sse_delta({"error": {"message": f"Failed to get new key/session for final empty stream retry. Error: {str(e_key_session)}", "type": "internal_proxy_error"}})
                    yield "data: [DONE]\n\n"
                    return
                time.sleep(1)
                current_apikey_for_attempt = None
                continue

        log_attempt_str = "初始尝试" if empty_retry_attempt_num == 1 else f"空回复重试 {empty_retry_attempt_num-1}"

        try:
            # True streaming happens here: every chunk the sub-generator yields is
            # forwarded to the client immediately. Its return value summarises the
            # attempt: (accumulated_text, api_error_yielded_flag).
            result_tuple = yield from _execute_one_stream_attempt(
                current_apikey_for_attempt,
                current_session_id_for_attempt,
                query_str,
                endpoint_id,
                openai_model_name_for_response,
                f"{log_attempt_str} (Overall attempt {empty_retry_attempt_num})"
            )
            accumulated_text_this_attempt = result_tuple[0]
            api_error_in_attempt = result_tuple[1]

        except requests.exceptions.RequestException as e_req:
            logging.warning(f"【流式请求】({log_attempt_str} using key {keymgr.display_key(current_apikey_for_attempt)}) 发生请求级错误: {e_req}")
            if current_apikey_for_attempt:  # Ensure the key in play is marked bad
                keymgr.mark_bad(current_apikey_for_attempt)

            if empty_retry_attempt_num == 1:
                # This was the initial key/session supplied by the caller. Re-raise so
                # with_valid_key_and_session can retry the *entire* operation with a
                # new key before any SSE bytes have been committed.
                raise e_req

            # A request-level failure during an empty-response retry (attempt > 1):
            # we have already streamed nothing useful, so either give up or rotate.
            if empty_retry_attempt_num >= max_empty_response_retries:
                logging.error("【流式请求】在最后一次空回复重试时发生请求错误。")
                yield format_openai_sse_delta({"error": {"message": f"Request failed on final empty stream retry attempt: {str(e_req)}", "type": "internal_proxy_error"}})
                yield "data: [DONE]\n\n"
                return
            time.sleep(1)
            continue  # Next loop iteration fetches a new key/session

        # Inspect the attempt's outcome now that the sub-generator has finished.
        if api_error_in_attempt:
            # The sub-generator already streamed the error payload + [DONE]; stop here.
            logging.warning(f"【流式请求】({log_attempt_str}) OnDemand 服务返回错误或处理内部错误,已将错误信息流式传输。")
            return

        if accumulated_text_this_attempt:
            # Non-empty content was already streamed chunk-by-chunk; we are done.
            logging.info(f"【流式请求】({log_attempt_str}) 成功获取非空内容。")
            return

        # Empty reply with no error: retry with a fresh key/session, or give up.
        logging.warning(f"【流式请求】({log_attempt_str}) 返回空内容。")
        if empty_retry_attempt_num >= max_empty_response_retries:
            logging.error(f"【流式请求】达到最大空回复重试次数 ({max_empty_response_retries})。")
            yield format_openai_sse_delta({
                "error": {"message": f"Model returned an empty stream after {max_empty_response_retries} retries.", "type": "empty_stream_error_after_retries", "code": "empty_response"}
            })
            yield "data: [DONE]\n\n"
            return

        logging.info(f"【流式请求】空回复,将在1秒后重试下一个Key。当前总尝试 {empty_retry_attempt_num}/{max_empty_response_retries}")
        time.sleep(1)

    # Fallback if the loop exits without returning (shouldn't happen with the logic above).
    logging.error("【流式请求】意外退出空回复重试循环。")
    yield format_openai_sse_delta({"error": {"message": "Unexpected error in stream handling.", "type": "internal_proxy_error"}})
    yield "data: [DONE]\n\n"
 
586
  def handle_non_stream_request(initial_apikey, initial_session_id, query_str, endpoint_id, openai_model_name_for_response):
 
587
  max_empty_response_retries = 5
588
+ empty_retry_attempt_num = 0
589
 
590
  current_apikey_for_attempt = initial_apikey
591
  current_session_id_for_attempt = initial_session_id
592
 
593
+ url = f"{ONDEMAND_API_BASE}/sessions/{current_session_id_for_attempt}/query"
594
 
595
  while empty_retry_attempt_num < max_empty_response_retries:
596
  empty_retry_attempt_num += 1
597
 
598
+ if empty_retry_attempt_num > 1:
599
  logging.info(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session...")
600
  try:
601
  current_apikey_for_attempt = keymgr.get()
602
  current_session_id_for_attempt = create_session(current_apikey_for_attempt)
603
+ url = f"{ONDEMAND_API_BASE}/sessions/{current_session_id_for_attempt}/query"
604
  logging.info(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】新Key/Session获取成功: Key={keymgr.display_key(current_apikey_for_attempt)}, Session={current_session_id_for_attempt}")
605
  except (ValueError, requests.exceptions.RequestException) as e_key_session:
606
  logging.warning(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session失败: {e_key_session}")
 
655
 
656
  except requests.exceptions.RequestException as e_req:
657
  logging.warning(f"【同步请求】({log_attempt_str} using key {keymgr.display_key(current_apikey_for_attempt)}) 发生请求级错误: {e_req}")
658
+ if current_apikey_for_attempt: # Ensure key is marked bad
659
+ keymgr.mark_bad(current_apikey_for_attempt)
660
  if empty_retry_attempt_num == 1:
661
  raise e_req
662
  if empty_retry_attempt_num >= max_empty_response_retries:
 
668
  logging.error(f"【同步请求】({log_attempt_str}) 处理响应时出错: {e_parse}", exc_info=True)
669
  return jsonify({"error": f"Error processing OnDemand sync response: {str(e_parse)}"}), 502
670
 
671
+ logging.error(f"【同步请求】意外退出空回复重试循环。")
672
  return jsonify({"error": "Unexpected error in non-stream handling after empty response retries."}), 500
673
 
674
 
675
  @app.route("/v1/models", methods=["GET"])
676
  def list_models():
 
677
  model_objects = []
678
  for model_key_alias in MODEL_MAP.keys():
679
  model_objects.append({
 
689
 
690
  @app.route("/", methods=["GET"])
691
  def health_check():
 
692
  num_keys = len(ONDEMAND_APIKEYS)
693
  key_status_summary = {keymgr.display_key(k): ("OK" if not v["bad"] else f"BAD (since {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(v['bad_ts'])) if v['bad_ts'] else 'N/A'})") for k, v in keymgr.key_status.items()}
694
 
 
706
 
707
  if __name__ == "__main__":
708
  log_format = '[%(asctime)s] %(levelname)s in %(module)s (%(funcName)s): %(message)s'
 
709
  logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO").upper(), format=log_format)
710
 
711
  if not PRIVATE_KEY: