Ge-AI commited on
Commit
e7fae93
·
verified ·
1 Parent(s): fb55624

Update openai_ondemand_adapter.py

Browse files
Files changed (1) hide show
  1. openai_ondemand_adapter.py +225 -223
openai_ondemand_adapter.py CHANGED
@@ -7,7 +7,8 @@ import threading
7
  import logging
8
  import os
9
 
10
- CLAUDE_SYSTEM_PROMOT = """The assistant is Claude, created by Anthropic.
 
11
 
12
  Claude enjoys helping humans and sees its role as an intelligent and kind assistant to the people, with depth and wisdom that makes it more than a mere tool.
13
 
@@ -267,18 +268,16 @@ def format_openai_sse_delta(chunk_data_dict):
267
  """将数据块格式化为 OpenAI SSE (Server-Sent Events) 流格式"""
268
  return f"data: {json.dumps(chunk_data_dict, ensure_ascii=False)}\n\n"
269
 
270
- # --- chat_completions 辅助函数,用于流式请求的单个尝试 ---
271
- # 这个函数在 chat_completions 内部定义,或者在它可以访问 ONDEMAND_API_BASE, format_openai_sse_delta 等全局/闭包变量的地方定义
272
- def _execute_one_stream_attempt(apikey, session_id, query_str, endpoint_id, openai_model_name_for_response, attempt_num_logging):
273
  """
274
  执行一次流式请求尝试。
275
  返回: (generated_sse_strings, accumulated_text_content, api_error_occurred)
276
  generated_sse_strings: 此尝试生成的所有SSE事件字符串列表。
277
  accumulated_text_content: 从流中累积的纯文本内容。
278
- api_error_occurred: 布尔值,指示此尝试是否遇到可恢复的API错误(例如,非200状态码但被处理为SSE错误事件
279
- 注意:网络超时等 requests.RequestException 会被直接抛出。
280
  """
281
- url = f"{ONDEMAND_API_BASE}/sessions/{session_id}/query"
282
  payload = {
283
  "query": query_str,
284
  "endpointId": endpoint_id,
@@ -286,26 +285,26 @@ def _execute_one_stream_attempt(apikey, session_id, query_str, endpoint_id, open
286
  "responseMode": "stream"
287
  }
288
  headers = {
289
- "apikey": apikey,
290
  "Content-Type": "application/json",
291
  "Accept": "text/event-stream"
292
  }
293
 
294
  generated_sse_strings = []
295
  accumulated_text_parts = []
296
- api_error_handled_as_sse = False # 标记是否已将API错误转换为SSE事件
297
 
298
- logging.info(f"【流式请求子尝试 {attempt_num_logging}】发送到 OnDemand: Session={session_id}, Endpoint={endpoint_id}, Key={keymgr.display_key(apikey)}")
299
 
300
  try:
301
  with requests.post(url, json=payload, headers=headers, stream=True, timeout=180) as resp:
302
  if resp.status_code != 200:
303
  api_error_handled_as_sse = True
304
  error_text = resp.text
305
- logging.error(f"【OnDemand流错误】请求失败 (子尝试 {attempt_num_logging})。状态码: {resp.status_code}, Session: {session_id}, 响应: {error_text[:500]}")
306
  error_payload = {
307
  "error": {
308
- "message": f"OnDemand API Error (Stream Init, Attempt {attempt_num_logging}): {resp.status_code} - {error_text[:200]}",
309
  "type": "on_demand_api_error",
310
  "code": resp.status_code
311
  }
@@ -315,31 +314,31 @@ def _execute_one_stream_attempt(apikey, session_id, query_str, endpoint_id, open
315
  return generated_sse_strings, "".join(accumulated_text_parts), api_error_handled_as_sse
316
 
317
  first_chunk_sent = False
318
- last_line_str = "" # 用于检查流是否以[DONE]结束
319
  for line_bytes in resp.iter_lines():
320
  if not line_bytes:
321
  continue
322
 
323
  line_str = line_bytes.decode("utf-8")
324
- last_line_str = line_str # 跟踪最后一行,以防流意外终止
325
 
326
  if line_str.startswith("data:"):
327
  data_part = line_str[len("data:"):].strip()
328
 
329
  if data_part == "[DONE]":
330
- logging.info(f"【OnDemand流】接收到 [DONE] 信号 (子尝试 {attempt_num_logging})。Session: {session_id}")
331
  generated_sse_strings.append("data: [DONE]\n\n")
332
  break
333
  elif data_part.startswith("[ERROR]:"):
334
- api_error_handled_as_sse = True # OnDemand流内错误
335
  error_json_str = data_part[len("[ERROR]:"):].strip()
336
- logging.warning(f"【OnDemand流】接收到错误事件 (子尝试 {attempt_num_logging}): {error_json_str}。Session: {session_id}")
337
  try:
338
  error_obj = json.loads(error_json_str)
339
  except json.JSONDecodeError:
340
  error_obj = {"message": error_json_str, "type": "on_demand_stream_error_format"}
341
  generated_sse_strings.append(format_openai_sse_delta({"error": error_obj}))
342
- generated_sse_strings.append("data: [DONE]\n\n") # 错误后也发送DONE
343
  break
344
  else:
345
  try:
@@ -358,7 +357,6 @@ def _execute_one_stream_attempt(apikey, session_id, query_str, endpoint_id, open
358
  choice_delta["content"] = delta_content
359
 
360
  if not choice_delta.get("content") and not choice_delta.get("role"):
361
- # 避免发送完全空的 delta 对象,除非它是第一个角色块
362
  if not (choice_delta.get("role") and not choice_delta.get("content")):
363
  continue
364
 
@@ -367,45 +365,30 @@ def _execute_one_stream_attempt(apikey, session_id, query_str, endpoint_id, open
367
  "object": "chat.completion.chunk",
368
  "created": int(time.time()),
369
  "model": openai_model_name_for_response,
370
- "choices": [{
371
- "delta": choice_delta,
372
- "index": 0,
373
- "finish_reason": None
374
- }]
375
  }
376
  generated_sse_strings.append(format_openai_sse_delta(openai_chunk))
377
  except json.JSONDecodeError:
378
- logging.warning(f"【OnDemand流】无法解析JSON数据块 (子尝试 {attempt_num_logging}): {data_part[:100]}... Session: {session_id}")
379
- # 可以选择忽略,或者也作为一种错误事件发送
380
- # generated_sse_strings.append(f"event: warning\ndata: Malformed JSON in stream: {data_part[:100]}\n\n")
381
  continue
382
 
383
- # 如果循环正常结束但最后一行不是 [DONE] 且没有API错误,补充一个 [DONE]
384
  if not last_line_str.startswith("data: [DONE]") and not api_error_handled_as_sse:
385
- logging.info(f"【OnDemand流】(子尝试 {attempt_num_logging}) 流迭代完成,补充发送 [DONE]。Session: {session_id}")
386
  generated_sse_strings.append("data: [DONE]\n\n")
387
 
388
- except requests.exceptions.RequestException as e:
389
- # 网络/请求级别错误,应由更上层的重试逻辑(如 with_valid_key_and_session)处理
390
- logging.error(f"【OnDemand流】请求过程中发生网络或请求异常 (子尝试 {attempt_num_logging}): {e}, Session: {session_id}", exc_info=False) # exc_info=False for brevity
391
- raise # 重要:重新抛出,让调用者处理API Key/网络层面的重试
392
- except Exception as e:
393
- # 此处捕获在流处理中发生的其他意外Python错误
394
- api_error_handled_as_sse = True # 将其视为一种API错误,以便返回错误信息给客户端
395
- logging.error(f"【OnDemand流】处理流时发生未知错误 (子尝试 {attempt_num_logging}): {e}, Session: {session_id}", exc_info=True)
396
  error_payload = {
397
- "error": {
398
- "message": f"Unknown error during streaming (Attempt {attempt_num_logging}): {str(e)}",
399
- "type": "unknown_streaming_error_in_attempt"
400
- }
401
  }
402
  generated_sse_strings.append(format_openai_sse_delta(error_payload))
403
  generated_sse_strings.append("data: [DONE]\n\n")
404
- # 不重新抛出,因为我们已经格式化了错误信息以便通过SSE发送
405
 
406
  return generated_sse_strings, "".join(accumulated_text_parts).strip(), api_error_handled_as_sse
407
 
408
-
409
  @app.route("/v1/chat/completions", methods=["POST"])
410
  def chat_completions():
411
  """处理聊天补全请求,模拟 OpenAI /v1/chat/completions 接口"""
@@ -422,166 +405,184 @@ def chat_completions():
422
  if not isinstance(messages, list) or not messages:
423
  return jsonify({"error": "'messages' must be a non-empty list."}), 400
424
 
425
- openai_model_name = request_data.get("model", "gpt-4o") # 默认为 gpt-4o
426
  target_endpoint_id = get_endpoint_id(openai_model_name)
427
  is_stream_request = bool(request_data.get("stream", False))
428
 
429
- # --- 构造发送给 OnDemand 的 query 字符串 ---
430
  formatted_query_parts = []
431
  for msg in messages:
432
  role = msg.get("role", "user").strip().capitalize()
433
- content = msg.get("content", "") # content可以是字符串或列表(例如包含图片时)
434
  content_string = ""
435
- if isinstance(content, list): # 处理OpenAI content为列表的情况 (通常用于多模态)
436
- # OnDemand的query字段可能只接受文本。这里简单拼接文本部分。
437
- # 您可能需要根据OnDemand API如何处理多模态输入来调整此逻辑。
438
  temp_parts = []
439
  for item in content:
440
  if isinstance(item, dict) and item.get("type") == "text":
441
  temp_parts.append(item.get("text", ""))
442
- # elif isinstance(item, dict) and item.get("type") == "image_url":
443
- # temp_parts.append("[Image Content Not Transmitted To Text-Only OnDemand Query]") # 示例
444
- # 按照用户原始代码逻辑处理 list content
445
- elif isinstance(item, dict): # 用户原始逻辑
446
- for k, v_item in item.items(): # 修改变量名 v -> v_item 避免与外层冲突
447
- content_string += f"{k}: {v_item}\n{k}: {v_item}" # 用户原始逻辑
448
- if not content_string and temp_parts: # 如果原始逻辑未产生字符串,但有文本部分
449
  content_string = "\n".join(filter(None, temp_parts))
450
-
451
  elif isinstance(content, str):
452
  content_string = content
453
 
454
  content_string = content_string.strip()
455
- if not content_string: # 跳过空内容的消息
456
  continue
457
- formatted_query_parts.append(f"<|{role}|>: {content_string}") # 使用用户指定的格式
458
 
459
  if not formatted_query_parts:
460
  return jsonify({"error": "No valid content found in 'messages'."}), 400
461
 
462
- start_prompt = CLAUDE_SYSTEM_PROMOT + "\n\n" + """下面是对话历史. 你是Assitant角色,请遵从User指令,并用中文尽可能详细的回复。注意,请直接回复! 请不要在开头提出"根据上下文及历史记录"相关的话语。\n"""
 
463
  final_query_to_ondemand = start_prompt + "\n".join(formatted_query_parts)
464
 
465
- # --- 结束构造 query ---
466
-
467
- # 内部函数,用于封装实际的API调用逻辑,方便重试和密钥管理
468
- def attempt_ondemand_request(current_apikey, current_session_id):
469
  if is_stream_request:
470
- return handle_stream_request(current_apikey, current_session_id, final_query_to_ondemand, target_endpoint_id, openai_model_name)
471
  else:
472
- return handle_non_stream_request(current_apikey, current_session_id, final_query_to_ondemand, target_endpoint_id, openai_model_name)
473
 
474
- # 装饰器/高阶函数,用于管理API密钥获取、会话创建和重试逻辑
475
- def with_valid_key_and_session(action_func):
476
- max_retries = len(ONDEMAND_APIKEYS) * 2 if ONDEMAND_APIKEYS else 1
477
- retries_count = 0
478
- last_exception_seen = None
479
 
480
- while retries_count < max_retries:
481
- selected_apikey = None
 
482
  try:
483
- selected_apikey = keymgr.get()
484
- logging.info(f"【请求处理】使用 API Key: {keymgr.display_key(selected_apikey)},准备创建新会话...")
485
- ondemand_session_id = create_session(selected_apikey)
486
- return action_func(selected_apikey, ondemand_session_id)
487
-
488
- except ValueError as ve:
489
- logging.critical(f"【请求处理】KeyManager 错误: {ve}")
490
- last_exception_seen = ve
491
- break
492
- except requests.HTTPError as http_err:
493
- last_exception_seen = http_err
494
- response = http_err.response
495
- logging.warning(f"【请求处理】HTTP 错误发生。状态码: {response.status_code if response else 'N/A'}, Key: {keymgr.display_key(selected_apikey) if selected_apikey else 'N/A'}")
496
- if selected_apikey and response is not None:
497
- if response.status_code in (401, 403, 429):
498
- keymgr.mark_bad(selected_apikey)
499
- retries_count += 1
500
- logging.info(f"【请求处理】尝试次数: {retries_count}/{max_retries}. 等待片刻后重试...")
 
 
 
 
 
 
501
  time.sleep(1)
502
- continue
503
- except requests.exceptions.Timeout as timeout_err: # 更明确地捕获 Timeout
504
- last_exception_seen = timeout_err # timeout_err 而不是字符串
505
- logging.warning(f"【请求处理】请求超时。Key: {keymgr.display_key(selected_apikey) if selected_apikey else 'N/A'}, Error: {timeout_err}")
506
- if selected_apikey:
507
- keymgr.mark_bad(selected_apikey)
508
- retries_count += 1
509
- logging.info(f"【请求处理】尝试次数: {retries_count}/{max_retries}. 等待片刻后重试...")
510
- time.sleep(1)
511
- continue
512
- except requests.exceptions.RequestException as req_ex: # 其他网络相关错误
513
- last_exception_seen = req_ex
514
- logging.warning(f"【请求处理】网络请求错误。Key: {keymgr.display_key(selected_apikey) if selected_apikey else 'N/A'}, Error: {req_ex}")
515
- if selected_apikey: # 对于一般网络错误,也可能标记key
516
- keymgr.mark_bad(selected_apikey)
517
- retries_count += 1
518
- logging.info(f"【请求处理】尝试次数: {retries_count}/{max_retries}. 等待片刻后重试...")
519
- time.sleep(1)
520
- continue
521
- except Exception as e:
522
- last_exception_seen = e
523
- logging.error(f"【请求处理】发生意外的严重错误: {e}", exc_info=True)
524
- if selected_apikey:
525
- keymgr.mark_bad(selected_apikey)
526
- retries_count += 1
527
- # break # 对于非常严重的未知错误,可以选择直接中断
528
 
529
- error_message = "All attempts to process the request failed after multiple retries."
530
- if last_exception_seen:
531
- error_message += f" Last known error: {str(last_exception_seen)}"
532
  logging.error(error_message)
533
- return jsonify({"error": "Failed to process request with OnDemand service after multiple retries. Please check service status or API keys."}), 503
534
 
535
- return with_valid_key_and_session(attempt_ondemand_request)
536
 
537
 
538
- def handle_stream_request(apikey, session_id, query_str, endpoint_id, openai_model_name_for_response):
539
- """处理流式聊天补全请求,包含空回复重试逻辑"""
540
  max_empty_response_retries = 5
541
- attempt_count = 0
 
 
 
542
 
543
  final_sse_strings_to_yield = []
544
 
545
- while attempt_count < max_empty_response_retries:
546
- attempt_count += 1
547
 
548
- # _execute_one_stream_attempt 可能会抛出 requests.RequestException (如超时、连接错误)
549
- # 这些异常会由 with_valid_key_and_session 捕获并处理 (可能更换key重试)
550
- sse_strings_this_attempt, accumulated_text_this_attempt, api_error_in_attempt = \
551
- _execute_one_stream_attempt(apikey, session_id, query_str, endpoint_id, openai_model_name_for_response, attempt_count)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
552
 
553
- final_sse_strings_to_yield = sse_strings_this_attempt # 保存当前尝试的结果,无论好坏
 
554
 
555
- if api_error_in_attempt:
556
- logging.warning(f"【流式请求】尝试 {attempt_count} OnDemand 服务返回错误或处理内部错误,将返回此错误信息给客户端。")
557
- break # 退出空回复重试循环,直接使用包含错误信息的 final_sse_strings_to_yield
558
 
559
- if accumulated_text_this_attempt:
560
- logging.info(f"【流式请求】尝试 {attempt_count} 成功获取非空内容。")
561
- break # 成功获取内容,退出空回复重试循环
562
-
563
- # 到这里说明内容为空,且没有API错误
564
- logging.warning(f"【流式请求】尝试 {attempt_count} 返回空内容。总共尝试次数 {max_empty_response_retries}。")
565
- if attempt_count >= max_empty_response_retries:
566
- logging.error(f"【流式请求】达到最大空回复重试次数 ({max_empty_response_retries})。将返回空回复错误。")
567
- # 构造一个表示空回复错误的SSE事件
568
- empty_error_payload = {
569
- "error": {
570
- "message": f"Model returned an empty stream after {max_empty_response_retries} retries.",
571
- "type": "empty_stream_error_after_retries",
572
- "code": "empty_response"
573
- }
574
- }
575
- final_sse_strings_to_yield = [format_openai_sse_delta(empty_error_payload), "data: [DONE]\n\n"]
576
- break # 退出循环,使用这个错误信息
577
-
578
- logging.info(f"【流式请求】空回复,将在1秒后重试。当前尝试 {attempt_count}/{max_empty_response_retries}")
579
- time.sleep(1) # 等待1秒再进行下一次空回复重试
580
 
581
- # 定义最终的生成器,用于Response对象
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
582
  def final_generator_for_response():
583
- if not final_sse_strings_to_yield: # 以防万一 final_sse_strings_to_yield 为空
584
- logging.error("【流式请求】final_sse_strings_to_yield 为空,这不应该发生。返回通用错误。")
585
  yield format_openai_sse_delta({"error": {"message": "Unexpected empty result in streaming.", "type": "internal_proxy_error"}})
586
  yield "data: [DONE]\n\n"
587
  else:
@@ -590,92 +591,92 @@ def handle_stream_request(apikey, session_id, query_str, endpoint_id, openai_mod
590
 
591
  return Response(final_generator_for_response(), content_type='text/event-stream')
592
 
593
-
594
- def handle_non_stream_request(apikey, session_id, query_str, endpoint_id, openai_model_name_for_response):
595
- """处理非流式聊天补全请求,包含空回复重试逻辑"""
596
- url = f"{ONDEMAND_API_BASE}/sessions/{session_id}/query"
597
- payload = {
598
- "query": query_str,
599
- "endpointId": endpoint_id,
600
- "pluginIds": [],
601
- "responseMode": "sync"
602
- }
603
- headers = {"apikey": apikey, "Content-Type": "application/json"}
604
-
605
  max_empty_response_retries = 5
606
- empty_response_retry_count = 0
 
 
 
 
 
607
 
608
- while empty_response_retry_count < max_empty_response_retries:
609
- empty_response_retry_count += 1
610
- logging.info(f"【同步请求】尝试 #{empty_response_retry_count}/{max_empty_response_retries}. Session={session_id}, Endpoint={endpoint_id}, Key={keymgr.display_key(apikey)}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
611
 
612
  try:
613
  resp = requests.post(url, json=payload, headers=headers, timeout=120)
614
- resp.raise_for_status() # 检查HTTP错误 (4xx, 5xx) - 这会被 with_valid_key_and_session 捕获
615
 
616
  response_json = resp.json()
617
  if "data" not in response_json or "answer" not in response_json["data"]:
618
- logging.error(f"【OnDemand同步错误】响应格式不符合预期 (尝试 {empty_response_retry_count})。Session: {session_id}, 响应: {str(response_json)[:500]}")
619
- # 这种格式错误不计为空回复重试,而是视为API行为异常,可能需要上层重试或失败
620
- # 为了简单起见,如果上层 with_valid_key_and_session 不处理这种 ValueError,这里我们直接返回错误
621
- # 或者可以抛出自定义异常让上层处理
622
  raise ValueError("OnDemand API sync response missing 'data.answer' field.")
623
 
624
  ai_response_content = response_json["data"]["answer"]
625
- if ai_response_content is None:
626
- ai_response_content = ""
627
 
628
- if ai_response_content.strip(): # 如果内容非空
629
- logging.info(f"【同步请求】尝试 {empty_response_retry_count} 成功获取非空内容。")
630
  openai_response_obj = {
631
- "id": "chatcmpl-" + str(uuid.uuid4())[:12],
632
- "object": "chat.completion",
633
- "created": int(time.time()),
634
  "model": openai_model_name_for_response,
635
- "choices": [{
636
- "index": 0,
637
- "message": {"role": "assistant", "content": ai_response_content},
638
- "finish_reason": "stop"
639
- }],
640
  "usage": {}
641
  }
642
  return jsonify(openai_response_obj)
643
- else: # 内容为空
644
- logging.warning(f"【同步请求】尝试 {empty_response_retry_count} 返回空回复。Session: {session_id}")
645
- if empty_response_retry_count >= max_empty_response_retries:
646
- logging.error(f"【同步请求】达到最大空回复重试次数 ({max_empty_response_retries})。将返回空回复错误。")
647
- # 返回一个表示错误的JSON响应
648
  return jsonify({
649
  "error": f"Model returned an empty response after {max_empty_response_retries} retries.",
650
- "id": "chatcmpl-" + str(uuid.uuid4())[:12],
651
- "object": "chat.completion", # 保持对象类型一致
652
- "created": int(time.time()),
653
  "model": openai_model_name_for_response,
654
- "choices": [{
655
- "index": 0,
656
- "message": {"role": "assistant", "content": ""}, # 空内容
657
- "finish_reason": "length" # 或 "stop", 或自定义 "empty_response"
658
- }],
659
  "usage": {}
660
- }), 500 # 使用 500 Internal Server Error 或 503 Service Unavailable
661
-
662
- logging.info(f"【同步请求】空回复,将在1秒后重试。当前尝试 {empty_response_retry_count}/{max_empty_response_retries}")
663
- time.sleep(1) # 等待1秒再进行下一次空回复重试
664
-
665
- except requests.exceptions.RequestException as e:
666
- # 网络/请求级别错误 (包括 resp.raise_for_status() 引发的 HTTPError)
667
- # 这些应由 with_valid_key_and_session 处理 (例如更换API Key重试)
668
- logging.warning(f"【同步请求】(尝试 {empty_response_retry_count}) 发生请求级错误: {e}. 将由上层处理重试。")
669
- raise # 重新抛出,让 with_valid_key_and_session 处理
670
- except (ValueError, KeyError, json.JSONDecodeError) as e:
671
- # 解析响应或响应结构错误
672
- logging.error(f"【同步请求】(尝试 {empty_response_retry_count}) 处理响应时出错: {e}", exc_info=True)
673
- # 这种错误通常不应通过简单的空回复重试解决,可能表明API响应格式已更改或存在问题
674
- # 直接返回错误给客户端,或者抛给上层
675
- return jsonify({"error": f"Error processing OnDemand sync response: {str(e)}"}), 502 # Bad Gateway
 
676
 
677
- # 如果循环结束仍未成功(理论上应该在循环内返回或抛出异常)
678
- logging.error(f"【同步请求】意外退出空回复重试循环。这不应该发生。")
679
  return jsonify({"error": "Unexpected error in non-stream handling after empty response retries."}), 500
680
 
681
 
@@ -715,6 +716,7 @@ def health_check():
715
 
716
  if __name__ == "__main__":
717
  log_format = '[%(asctime)s] %(levelname)s in %(module)s (%(funcName)s): %(message)s'
 
718
  logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO").upper(), format=log_format)
719
 
720
  if not PRIVATE_KEY:
 
7
  import logging
8
  import os
9
 
10
+ # Claude System Prompt (as provided by user)
11
+ CLAUDE_SYSTEM_PROMPT = """The assistant is Claude, created by Anthropic.
12
 
13
  Claude enjoys helping humans and sees its role as an intelligent and kind assistant to the people, with depth and wisdom that makes it more than a mere tool.
14
 
 
268
  """将数据块格式化为 OpenAI SSE (Server-Sent Events) 流格式"""
269
  return f"data: {json.dumps(chunk_data_dict, ensure_ascii=False)}\n\n"
270
 
271
+ def _execute_one_stream_attempt(apikey_for_attempt, session_id_for_attempt, query_str, endpoint_id, openai_model_name_for_response, current_attempt_num_logging):
 
 
272
  """
273
  执行一次流式请求尝试。
274
  返回: (generated_sse_strings, accumulated_text_content, api_error_occurred)
275
  generated_sse_strings: 此尝试生成的所有SSE事件字符串列表。
276
  accumulated_text_content: 从流中累积的纯文本内容。
277
+ api_error_occurred: 布尔值,指示此尝试是否遇到可直接转换为SSE错误事件的API错误
278
+ requests.RequestException (如超时) 会被直接抛出。
279
  """
280
+ url = f"{ONDEMAND_API_BASE}/sessions/{session_id_for_attempt}/query"
281
  payload = {
282
  "query": query_str,
283
  "endpointId": endpoint_id,
 
285
  "responseMode": "stream"
286
  }
287
  headers = {
288
+ "apikey": apikey_for_attempt,
289
  "Content-Type": "application/json",
290
  "Accept": "text/event-stream"
291
  }
292
 
293
  generated_sse_strings = []
294
  accumulated_text_parts = []
295
+ api_error_handled_as_sse = False
296
 
297
+ logging.info(f"【流式请求子尝试 {current_attempt_num_logging}】发送到 OnDemand: Session={session_id_for_attempt}, Endpoint={endpoint_id}, Key={keymgr.display_key(apikey_for_attempt)}")
298
 
299
  try:
300
  with requests.post(url, json=payload, headers=headers, stream=True, timeout=180) as resp:
301
  if resp.status_code != 200:
302
  api_error_handled_as_sse = True
303
  error_text = resp.text
304
+ logging.error(f"【OnDemand流错误】请求失败 (子尝试 {current_attempt_num_logging})。状态码: {resp.status_code}, Session: {session_id_for_attempt}, 响应: {error_text[:500]}")
305
  error_payload = {
306
  "error": {
307
+ "message": f"OnDemand API Error (Stream Init, Attempt {current_attempt_num_logging}): {resp.status_code} - {error_text[:200]}",
308
  "type": "on_demand_api_error",
309
  "code": resp.status_code
310
  }
 
314
  return generated_sse_strings, "".join(accumulated_text_parts), api_error_handled_as_sse
315
 
316
  first_chunk_sent = False
317
+ last_line_str = ""
318
  for line_bytes in resp.iter_lines():
319
  if not line_bytes:
320
  continue
321
 
322
  line_str = line_bytes.decode("utf-8")
323
+ last_line_str = line_str
324
 
325
  if line_str.startswith("data:"):
326
  data_part = line_str[len("data:"):].strip()
327
 
328
  if data_part == "[DONE]":
329
+ logging.info(f"【OnDemand流】接收到 [DONE] 信号 (子尝试 {current_attempt_num_logging})。Session: {session_id_for_attempt}")
330
  generated_sse_strings.append("data: [DONE]\n\n")
331
  break
332
  elif data_part.startswith("[ERROR]:"):
333
+ api_error_handled_as_sse = True
334
  error_json_str = data_part[len("[ERROR]:"):].strip()
335
+ logging.warning(f"【OnDemand流】接收到错误事件 (子尝试 {current_attempt_num_logging}): {error_json_str}。Session: {session_id_for_attempt}")
336
  try:
337
  error_obj = json.loads(error_json_str)
338
  except json.JSONDecodeError:
339
  error_obj = {"message": error_json_str, "type": "on_demand_stream_error_format"}
340
  generated_sse_strings.append(format_openai_sse_delta({"error": error_obj}))
341
+ generated_sse_strings.append("data: [DONE]\n\n")
342
  break
343
  else:
344
  try:
 
357
  choice_delta["content"] = delta_content
358
 
359
  if not choice_delta.get("content") and not choice_delta.get("role"):
 
360
  if not (choice_delta.get("role") and not choice_delta.get("content")):
361
  continue
362
 
 
365
  "object": "chat.completion.chunk",
366
  "created": int(time.time()),
367
  "model": openai_model_name_for_response,
368
+ "choices": [{"delta": choice_delta, "index": 0, "finish_reason": None}]
 
 
 
 
369
  }
370
  generated_sse_strings.append(format_openai_sse_delta(openai_chunk))
371
  except json.JSONDecodeError:
372
+ logging.warning(f"【OnDemand流】无法解析JSON (子尝试 {current_attempt_num_logging}): {data_part[:100]}... Session: {session_id_for_attempt}")
 
 
373
  continue
374
 
 
375
  if not last_line_str.startswith("data: [DONE]") and not api_error_handled_as_sse:
376
+ logging.info(f"【OnDemand流】(子尝试 {current_attempt_num_logging}) 流迭代完成,补充发送 [DONE]。Session: {session_id_for_attempt}")
377
  generated_sse_strings.append("data: [DONE]\n\n")
378
 
379
+ except requests.exceptions.RequestException: # Let specific RequestExceptions be handled by the caller
380
+ raise
381
+ except Exception as e: # Catch other unexpected errors during stream processing
382
+ api_error_handled_as_sse = True
383
+ logging.error(f"【OnDemand流】处理流时发生未知错误 (子尝试 {current_attempt_num_logging}): {e}, Session: {session_id_for_attempt}", exc_info=True)
 
 
 
384
  error_payload = {
385
+ "error": {"message": f"Unknown error during streaming (Attempt {current_attempt_num_logging}): {str(e)}", "type": "unknown_streaming_error_in_attempt"}
 
 
 
386
  }
387
  generated_sse_strings.append(format_openai_sse_delta(error_payload))
388
  generated_sse_strings.append("data: [DONE]\n\n")
 
389
 
390
  return generated_sse_strings, "".join(accumulated_text_parts).strip(), api_error_handled_as_sse
391
 
 
392
  @app.route("/v1/chat/completions", methods=["POST"])
393
  def chat_completions():
394
  """处理聊天补全请求,模拟 OpenAI /v1/chat/completions 接口"""
 
405
  if not isinstance(messages, list) or not messages:
406
  return jsonify({"error": "'messages' must be a non-empty list."}), 400
407
 
408
+ openai_model_name = request_data.get("model", "gpt-4o")
409
  target_endpoint_id = get_endpoint_id(openai_model_name)
410
  is_stream_request = bool(request_data.get("stream", False))
411
 
 
412
  formatted_query_parts = []
413
  for msg in messages:
414
  role = msg.get("role", "user").strip().capitalize()
415
+ content = msg.get("content", "")
416
  content_string = ""
417
+ if isinstance(content, list):
 
 
418
  temp_parts = []
419
  for item in content:
420
  if isinstance(item, dict) and item.get("type") == "text":
421
  temp_parts.append(item.get("text", ""))
422
+ elif isinstance(item, dict):
423
+ for k, v_item in item.items():
424
+ content_string += f"{k}: {v_item}\n{k}: {v_item}"
425
+ if not content_string and temp_parts:
 
 
 
426
  content_string = "\n".join(filter(None, temp_parts))
 
427
  elif isinstance(content, str):
428
  content_string = content
429
 
430
  content_string = content_string.strip()
431
+ if not content_string:
432
  continue
433
+ formatted_query_parts.append(f"<|{role}|>: {content_string}")
434
 
435
  if not formatted_query_parts:
436
  return jsonify({"error": "No valid content found in 'messages'."}), 400
437
 
438
+ # Use the globally defined CLAUDE_SYSTEM_PROMPT
439
+ start_prompt = CLAUDE_SYSTEM_PROMPT + "\n\n" + """下面是对话历史. 你是Assitant角色,请遵从User指令,并用中文尽可能详细的回复。注意,请直接回复! 请不要在开头提出"根据上下文及历史记录"相关的话语。\n"""
440
  final_query_to_ondemand = start_prompt + "\n".join(formatted_query_parts)
441
 
442
+ def attempt_ondemand_request_wrapper(current_apikey_from_wrapper, current_session_id_from_wrapper):
443
+ # This inner function is what with_valid_key_and_session calls.
444
+ # It receives the *initial* apikey and session_id.
 
445
  if is_stream_request:
446
+ return handle_stream_request(current_apikey_from_wrapper, current_session_id_from_wrapper, final_query_to_ondemand, target_endpoint_id, openai_model_name)
447
  else:
448
+ return handle_non_stream_request(current_apikey_from_wrapper, current_session_id_from_wrapper, final_query_to_ondemand, target_endpoint_id, openai_model_name)
449
 
450
+ def with_valid_key_and_session(action_func_to_wrap):
451
+ # This is the outer retry loop for API key/session issues for the *first* attempt of action_func_to_wrap
452
+ max_key_retries = len(ONDEMAND_APIKEYS) * 2 if ONDEMAND_APIKEYS else 1
453
+ key_retry_count = 0
454
+ last_exception_for_key_retry = None
455
 
456
+ while key_retry_count < max_key_retries:
457
+ key_retry_count += 1
458
+ selected_apikey_for_outer_retry = None
459
  try:
460
+ selected_apikey_for_outer_retry = keymgr.get()
461
+ logging.info(f"【请求处理 - Key轮换尝试 {key_retry_count}/{max_key_retries}】使用 API Key: {keymgr.display_key(selected_apikey_for_outer_retry)},准备创建新会话...")
462
+ ondemand_session_id_for_outer_retry = create_session(selected_apikey_for_outer_retry)
463
+
464
+ # Call the action_func_to_wrap (which is attempt_ondemand_request_wrapper)
465
+ # This action_func_to_wrap will then call handle_stream_request or handle_non_stream_request
466
+ # If handle_..._request fails its *first* attempt due to RequestException, it re-raises it here.
467
+ return action_func_to_wrap(selected_apikey_for_outer_retry, ondemand_session_id_for_outer_retry)
468
+
469
+ except ValueError as ve: # keymgr.get() failed
470
+ logging.critical(f"【请求处理 - Key轮换尝试 {key_retry_count}】KeyManager 错误: {ve}")
471
+ last_exception_for_key_retry = ve
472
+ break # Cannot get any key, fatal for this request.
473
+ except requests.exceptions.RequestException as http_err_outer: # Covers create_session failure or re-raised error from action_func's first attempt
474
+ last_exception_for_key_retry = http_err_outer
475
+ logging.warning(f"【请求处理 - Key轮换尝试 {key_retry_count}】HTTP/请求错误。Key: {keymgr.display_key(selected_apikey_for_outer_retry) if selected_apikey_for_outer_retry else 'N/A'}, Error: {http_err_outer}")
476
+ if selected_apikey_for_outer_retry: # If a key was involved in this failure
477
+ keymgr.mark_bad(selected_apikey_for_outer_retry)
478
+
479
+ if key_retry_count >= max_key_retries:
480
+ logging.error(f"【请求处理】所有Key轮换尝试均失败。最后错误: {last_exception_for_key_retry}")
481
+ break # Exhausted key retries
482
+
483
+ logging.info(f"【请求处理】Key轮换尝试 {key_retry_count} 失败,等待后重试下一个Key...")
484
  time.sleep(1)
485
+ continue # To the next iteration of the key_retry_count loop
486
+ except Exception as e_outer: # Other unexpected errors during the initial setup/call
487
+ last_exception_for_key_retry = e_outer
488
+ logging.error(f"【请求处理 - Key轮换尝试 {key_retry_count}】发生意外严重错误: {e_outer}", exc_info=True)
489
+ if selected_apikey_for_outer_retry:
490
+ keymgr.mark_bad(selected_apikey_for_outer_retry)
491
+ # For truly unexpected errors, might be better to fail fast
492
+ break # Break outer retry loop
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
493
 
494
+ error_message = "All attempts to process the request failed after multiple key/session retries."
495
+ if last_exception_for_key_retry:
496
+ error_message += f" Last known error during key/session phase: {str(last_exception_for_key_retry)}"
497
  logging.error(error_message)
498
+ return jsonify({"error": error_message}), 503
499
 
500
+ return with_valid_key_and_session(attempt_ondemand_request_wrapper)
501
 
502
 
503
def handle_stream_request(initial_apikey, initial_session_id, query_str, endpoint_id, openai_model_name_for_response):
    """Handle a streaming chat-completion request with empty-response retries.

    The first attempt uses the caller-supplied ``initial_apikey`` /
    ``initial_session_id``.  Every subsequent empty-response retry acquires a
    brand-new key/session pair via ``keymgr.get()`` / ``create_session()``.
    A request-level failure on the *initial* attempt is re-raised so the outer
    key-retry mechanism (which supplied the initial key) can handle it.

    Parameters mirror the OnDemand query API: ``query_str`` is the prompt,
    ``endpoint_id`` selects the backing model, and
    ``openai_model_name_for_response`` is echoed back in the OpenAI-shaped
    SSE payloads.

    Returns a Flask ``Response`` (``text/event-stream``) that replays the
    SSE event strings collected from the winning attempt.
    """
    max_empty_response_retries = 5
    empty_retry_attempt_num = 0  # counts total attempts, including the initial one

    current_apikey_for_attempt = initial_apikey
    current_session_id_for_attempt = initial_session_id

    final_sse_strings_to_yield = []

    while empty_retry_attempt_num < max_empty_response_retries:
        empty_retry_attempt_num += 1

        if empty_retry_attempt_num > 1:  # empty-response retry: fetch a fresh key/session
            logging.info(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session...")
            try:
                current_apikey_for_attempt = keymgr.get()
                current_session_id_for_attempt = create_session(current_apikey_for_attempt)
                logging.info(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】新Key/Session获取成功: Key={keymgr.display_key(current_apikey_for_attempt)}, Session={current_session_id_for_attempt}")
            except (ValueError, requests.exceptions.RequestException) as e_key_session:
                logging.warning(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session失败: {e_key_session}")
                # A ValueError means keymgr.get() itself failed (no key was obtained);
                # otherwise a key was obtained but create_session failed, so mark it bad.
                if current_apikey_for_attempt and not isinstance(e_key_session, ValueError):
                    keymgr.mark_bad(current_apikey_for_attempt)
                if empty_retry_attempt_num >= max_empty_response_retries:
                    final_sse_strings_to_yield = [
                        format_openai_sse_delta({"error": {"message": f"Failed to get new key/session for final empty stream retry. Error: {str(e_key_session)}", "type": "internal_proxy_error"}}),
                        "data: [DONE]\n\n"
                    ]
                    break  # retries exhausted: surface an SSE error payload
                time.sleep(1)
                current_apikey_for_attempt = None  # reset in case keymgr.get() failed
                continue  # next empty-response retry iteration

        # Label this attempt for log lines (initial attempt vs nth empty-response retry).
        log_attempt_str = "初始尝试" if empty_retry_attempt_num == 1 else f"空回复重试 {empty_retry_attempt_num-1}"

        try:
            sse_strings_this_attempt, accumulated_text_this_attempt, api_error_in_attempt = \
                _execute_one_stream_attempt(current_apikey_for_attempt, current_session_id_for_attempt, query_str, endpoint_id, openai_model_name_for_response, f"{log_attempt_str} (Overall attempt {empty_retry_attempt_num})")

            final_sse_strings_to_yield = sse_strings_this_attempt

            if api_error_in_attempt:
                # The attempt already converted the upstream error into SSE events;
                # pass them through as-is instead of retrying.
                logging.warning(f"【流式请求】({log_attempt_str}) OnDemand 服务返回错误或处理内部错误,将返回此错误信息。")
                break

            if accumulated_text_this_attempt:
                logging.info(f"【流式请求】({log_attempt_str}) 成功获取非空内容。")
                break

            logging.warning(f"【流式请求】({log_attempt_str}) 返回空内容。")
            if empty_retry_attempt_num >= max_empty_response_retries:
                logging.error(f"【流式请求】达到最大空回复重试次数 ({max_empty_response_retries})。")
                empty_error_payload = {
                    "error": {"message": f"Model returned an empty stream after {max_empty_response_retries} retries.", "type": "empty_stream_error_after_retries", "code": "empty_response"}
                }
                final_sse_strings_to_yield = [format_openai_sse_delta(empty_error_payload), "data: [DONE]\n\n"]
                break

            logging.info(f"【流式请求】空回复,将在1秒后重试下一个Key。当前总尝试 {empty_retry_attempt_num}/{max_empty_response_retries}")
            time.sleep(1)

        except requests.exceptions.RequestException as e_req:
            logging.warning(f"【流式请求】({log_attempt_str} using key {keymgr.display_key(current_apikey_for_attempt)}) 发生请求级错误: {e_req}")
            keymgr.mark_bad(current_apikey_for_attempt)
            if empty_retry_attempt_num == 1:
                # The initial key/session came from with_valid_key_and_session:
                # re-raise (bare, to preserve the traceback) so the outer
                # key-retry mechanism can swap the key.
                raise

            # A RequestException during an empty-response retry (attempt > 1).
            if empty_retry_attempt_num >= max_empty_response_retries:
                logging.error("【流式请求】在最后一次空回复重试时发生请求错误。")
                final_sse_strings_to_yield = [
                    format_openai_sse_delta({"error": {"message": f"Request failed on final empty stream retry attempt: {str(e_req)}", "type": "internal_proxy_error"}}),
                    "data: [DONE]\n\n"
                ]
                break  # retries exhausted
            time.sleep(1)
            # Loop continues: the next iteration acquires another fresh key/session.
            continue

    def final_generator_for_response():
        # Replay the SSE events collected by the winning (or final failed) attempt.
        if not final_sse_strings_to_yield:
            logging.error("【流式请求】final_sse_strings_to_yield 为空,返回通用错误。")
            yield format_openai_sse_delta({"error": {"message": "Unexpected empty result in streaming.", "type": "internal_proxy_error"}})
            yield "data: [DONE]\n\n"
        else:
            # NOTE(review): this else-branch body was elided in the diff view;
            # replaying the collected event strings is the only behavior
            # consistent with how final_sse_strings_to_yield is built above.
            yield from final_sse_strings_to_yield

    return Response(final_generator_for_response(), content_type='text/event-stream')
593
 
594
def handle_non_stream_request(initial_apikey, initial_session_id, query_str, endpoint_id, openai_model_name_for_response):
    """Handle a non-streaming (sync) chat-completion request with empty-response retries.

    The first attempt uses the caller-supplied ``initial_apikey`` /
    ``initial_session_id``; each empty-response retry acquires a fresh
    key/session pair.  A request-level failure on the *initial* attempt is
    re-raised so the outer key-retry mechanism can handle it.

    Returns a Flask JSON response: an OpenAI-shaped chat.completion object on
    success, or an error payload with an appropriate HTTP status
    (500 empty-after-retries, 502 parse error, 503 upstream/key failure).
    """
    max_empty_response_retries = 5
    empty_retry_attempt_num = 0  # counts total attempts, including the initial one

    current_apikey_for_attempt = initial_apikey
    current_session_id_for_attempt = initial_session_id

    # Query URL is tied to the session; it is rebuilt whenever the session changes.
    url = f"{ONDEMAND_API_BASE}/sessions/{current_session_id_for_attempt}/query"

    while empty_retry_attempt_num < max_empty_response_retries:
        empty_retry_attempt_num += 1

        if empty_retry_attempt_num > 1:  # empty-response retry: fetch a fresh key/session
            logging.info(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session...")
            try:
                current_apikey_for_attempt = keymgr.get()
                current_session_id_for_attempt = create_session(current_apikey_for_attempt)
                url = f"{ONDEMAND_API_BASE}/sessions/{current_session_id_for_attempt}/query"  # update URL for the new session
                logging.info(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】新Key/Session获取成功: Key={keymgr.display_key(current_apikey_for_attempt)}, Session={current_session_id_for_attempt}")
            except (ValueError, requests.exceptions.RequestException) as e_key_session:
                logging.warning(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session失败: {e_key_session}")
                # A ValueError means keymgr.get() failed (no key obtained); otherwise
                # create_session failed on a freshly obtained key, so mark it bad.
                if current_apikey_for_attempt and not isinstance(e_key_session, ValueError):
                    keymgr.mark_bad(current_apikey_for_attempt)
                if empty_retry_attempt_num >= max_empty_response_retries:
                    return jsonify({"error": f"Failed to get new key/session for final empty response retry. Error: {str(e_key_session)}"}), 503
                time.sleep(1)
                current_apikey_for_attempt = None  # reset in case keymgr.get() failed
                continue

        # Label this attempt for log lines (initial attempt vs nth empty-response retry).
        log_attempt_str = "初始尝试" if empty_retry_attempt_num == 1 else f"空回复重试 {empty_retry_attempt_num-1}"
        logging.info(f"【同步请求】({log_attempt_str}, 总尝试 {empty_retry_attempt_num}) Session={current_session_id_for_attempt}, Key={keymgr.display_key(current_apikey_for_attempt)}")

        payload = {"query": query_str, "endpointId": endpoint_id, "pluginIds": [], "responseMode": "sync"}
        headers = {"apikey": current_apikey_for_attempt, "Content-Type": "application/json"}

        try:
            resp = requests.post(url, json=payload, headers=headers, timeout=120)
            resp.raise_for_status()

            response_json = resp.json()
            if "data" not in response_json or "answer" not in response_json["data"]:
                logging.error(f"【OnDemand同步错误】响应格式不符合预期 ({log_attempt_str})。Session: {current_session_id_for_attempt}, 响应: {str(response_json)[:500]}")
                # Handled by the (ValueError, ...) clause below -> 502 to the client.
                raise ValueError("OnDemand API sync response missing 'data.answer' field.")

            ai_response_content = response_json["data"]["answer"]
            if ai_response_content is None:
                ai_response_content = ""  # normalize explicit null answers to empty

            if ai_response_content.strip():
                logging.info(f"【同步请求】({log_attempt_str}) 成功获取非空内容。")
                openai_response_obj = {
                    "id": "chatcmpl-" + str(uuid.uuid4())[:12], "object": "chat.completion", "created": int(time.time()),
                    "model": openai_model_name_for_response,
                    "choices": [{"index": 0, "message": {"role": "assistant", "content": ai_response_content}, "finish_reason": "stop"}],
                    "usage": {}
                }
                return jsonify(openai_response_obj)
            else:
                logging.warning(f"【同步请求】({log_attempt_str}) 返回空回复。")
                if empty_retry_attempt_num >= max_empty_response_retries:
                    logging.error(f"【同步请求】达到最大空回复重试次数 ({max_empty_response_retries})。")
                    return jsonify({
                        "error": f"Model returned an empty response after {max_empty_response_retries} retries.",
                        "id": "chatcmpl-" + str(uuid.uuid4())[:12], "object": "chat.completion", "created": int(time.time()),
                        "model": openai_model_name_for_response,
                        "choices": [{"index": 0, "message": {"role": "assistant", "content": ""}, "finish_reason": "length"}],
                        "usage": {}
                    }), 500
                logging.info(f"【同步请求】空回复,将在1秒后重试下一个Key。当前总尝试 {empty_retry_attempt_num}/{max_empty_response_retries}")
                time.sleep(1)

        except requests.exceptions.RequestException as e_req:
            logging.warning(f"【同步请求】({log_attempt_str} using key {keymgr.display_key(current_apikey_for_attempt)}) 发生请求级错误: {e_req}")
            keymgr.mark_bad(current_apikey_for_attempt)
            if empty_retry_attempt_num == 1:
                # Initial key came from the outer key-retry mechanism:
                # re-raise (bare, to preserve the traceback) and let it swap the key.
                raise
            if empty_retry_attempt_num >= max_empty_response_retries:
                logging.error("【同步请求】在最后一次空回复重试时发生请求错误。")
                return jsonify({"error": f"Request failed on final empty response retry attempt. Last error: {str(e_req)}"}), 503
            time.sleep(1)
            continue
        except (ValueError, KeyError, json.JSONDecodeError) as e_parse:
            # Malformed upstream payload: do not retry other keys, report upstream error.
            logging.error(f"【同步请求】({log_attempt_str}) 处理响应时出错: {e_parse}", exc_info=True)
            return jsonify({"error": f"Error processing OnDemand sync response: {str(e_parse)}"}), 502

    # Defensive: every loop path either returns or retries, so this should be unreachable.
    logging.error("【同步请求】意外退出空回复重试循环。")
    return jsonify({"error": "Unexpected error in non-stream handling after empty response retries."}), 500
681
 
682
 
 
716
 
717
  if __name__ == "__main__":
718
  log_format = '[%(asctime)s] %(levelname)s in %(module)s (%(funcName)s): %(message)s'
719
+ # Use LOG_LEVEL from env if set, otherwise default to INFO
720
  logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO").upper(), format=log_format)
721
 
722
  if not PRIVATE_KEY: