Ge-AI commited on
Commit
6de98b8
·
verified ·
1 Parent(s): 9b8516b

Update openai_ondemand_adapter.py

Browse files
Files changed (1) hide show
  1. openai_ondemand_adapter.py +291 -209
openai_ondemand_adapter.py CHANGED
@@ -238,13 +238,7 @@ def create_session(apikey, external_user_id=None, plugin_ids=None):
238
  def format_openai_sse_delta(chunk_data_dict):
239
  return f"data: {json.dumps(chunk_data_dict, ensure_ascii=False)}\n\n"
240
 
241
- # Modified: This function is now a generator and returns accumulated text + error flag at the end.
242
  def _execute_one_stream_attempt(apikey_for_attempt, session_id_for_attempt, query_str, endpoint_id, openai_model_name_for_response, current_attempt_num_logging):
243
- """
244
- Executes one streaming request attempt.
245
- Yields SSE event strings.
246
- Returns a tuple: (accumulated_text_content, api_error_yielded_flag)
247
- """
248
  url = f"{ONDEMAND_API_BASE}/sessions/{session_id_for_attempt}/query"
249
  payload = {
250
  "query": query_str,
@@ -259,113 +253,125 @@ def _execute_one_stream_attempt(apikey_for_attempt, session_id_for_attempt, quer
259
  }
260
 
261
  accumulated_text_parts = []
262
- api_error_yielded = False # Flag to indicate if an API error was processed and yielded as SSE
263
-
264
- logging.info(f"【流式请求子尝试 {current_attempt_num_logging}】发送到 OnDemand: Session={session_id_for_attempt}, Endpoint={endpoint_id}, Key={keymgr.display_key(apikey_for_attempt)}")
 
265
 
266
- try:
267
- with requests.post(url, json=payload, headers=headers, stream=True, timeout=180) as resp:
268
- if resp.status_code != 200:
269
- api_error_yielded = True
270
- error_text = resp.text
271
- logging.error(f"【OnDemand错误】请求失败 (子尝试 {current_attempt_num_logging})。状态码: {resp.status_code}, Session: {session_id_for_attempt}, 响应: {error_text[:500]}")
272
- error_payload = {
273
- "error": {
274
- "message": f"OnDemand API Error (Stream Init, Attempt {current_attempt_num_logging}): {resp.status_code} - {error_text[:200]}",
275
- "type": "on_demand_api_error",
276
- "code": resp.status_code
277
- }
278
- }
279
- yield format_openai_sse_delta(error_payload)
280
- yield "data: [DONE]\n\n"
281
- return "".join(accumulated_text_parts).strip(), api_error_yielded # Return after yielding error
282
 
283
- first_chunk_sent = False
284
- last_line_str = ""
285
- for line_bytes in resp.iter_lines():
286
- if not line_bytes:
287
- continue
288
-
289
- line_str = line_bytes.decode("utf-8")
290
- last_line_str = line_str
291
-
292
- if line_str.startswith("data:"):
293
- data_part = line_str[len("data:"):].strip()
294
-
295
- if data_part == "[DONE]":
296
- logging.info(f"【OnDemand流】接收到 [DONE] 信号 (子尝试 {current_attempt_num_logging})。Session: {session_id_for_attempt}")
297
  yield "data: [DONE]\n\n"
298
- # No break here, let the function return after the loop
299
  return "".join(accumulated_text_parts).strip(), api_error_yielded
300
-
301
- elif data_part.startswith("[ERROR]:"):
302
- api_error_yielded = True
303
- error_json_str = data_part[len("[ERROR]:"):].strip()
304
- logging.warning(f"【OnDemand流】接收到错误事件 (子尝试 {current_attempt_num_logging}): {error_json_str}。Session: {session_id_for_attempt}")
305
- try:
306
- error_obj = json.loads(error_json_str)
307
- except json.JSONDecodeError:
308
- error_obj = {"message": error_json_str, "type": "on_demand_stream_error_format"}
309
- yield format_openai_sse_delta({"error": error_obj})
310
- yield "data: [DONE]\n\n"
311
- return "".join(accumulated_text_parts).strip(), api_error_yielded # Return after yielding error
312
-
313
- else:
314
- try:
315
- event_data = json.loads(data_part)
316
- if event_data.get("eventType") == "fulfillment":
317
- delta_content = event_data.get("answer", "")
318
- if delta_content is None: delta_content = ""
319
- accumulated_text_parts.append(delta_content)
320
-
321
- choice_delta = {}
322
- if not first_chunk_sent:
323
- choice_delta["role"] = "assistant"
324
- choice_delta["content"] = delta_content
325
- first_chunk_sent = True
326
- else:
327
- choice_delta["content"] = delta_content
328
-
329
- if not choice_delta.get("content") and not choice_delta.get("role"):
330
- if not (choice_delta.get("role") and not choice_delta.get("content")):
331
- continue
332
-
333
- openai_chunk = {
334
- "id": "chatcmpl-" + str(uuid.uuid4())[:12],
335
- "object": "chat.completion.chunk",
336
- "created": int(time.time()),
337
- "model": openai_model_name_for_response,
338
- "choices": [{"delta": choice_delta, "index": 0, "finish_reason": None}]
339
- }
340
- yield format_openai_sse_delta(openai_chunk) # Yield immediately
341
- except json.JSONDecodeError:
342
- logging.warning(f"【OnDemand流】无法解析JSON (子尝试 {current_attempt_num_logging}): {data_part[:100]}... Session: {session_id_for_attempt}")
343
- continue
344
-
345
- # This part is reached if the loop finishes without an explicit [DONE] or [ERROR] from the stream data itself.
346
- # This might happen if the stream just ends.
347
- if not api_error_yielded: # If no error was yielded, and no [DONE] was in data, yield a [DONE]
348
- if not last_line_str.startswith("data: [DONE]"): # Check if last processed line was not already DONE
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
349
  logging.info(f"【OnDemand流】(子尝试 {current_attempt_num_logging}) 流迭代完成,补充发送 [DONE]。Session: {session_id_for_attempt}")
350
  yield "data: [DONE]\n\n"
 
 
 
 
 
 
 
351
 
352
- except requests.exceptions.RequestException:
353
- # Network/request level errors before or during streaming
354
- # These should be caught by the caller (handle_stream_request) to decide on retries
355
- logging.error(f"【OnDemand流】请求过程中发生网络或请求异常 (子尝试 {current_attempt_num_logging}): Session: {session_id_for_attempt}", exc_info=False)
356
- raise # Re-raise for the caller to handle
357
-
358
- except Exception as e:
359
- # Unexpected Python errors during stream processing
360
- logging.error(f"【OnDemand流】处理流时发生未知错误 (子尝试 {current_attempt_num_logging}): {e}, Session: {session_id_for_attempt}", exc_info=True)
361
- api_error_yielded = True # Mark that an error occurred and we are yielding an SSE for it
362
- error_payload = {
363
- "error": {"message": f"Unknown error during streaming (Attempt {current_attempt_num_logging}): {str(e)}", "type": "unknown_streaming_error_in_attempt"}
364
- }
365
- yield format_openai_sse_delta(error_payload)
366
- yield "data: [DONE]\n\n"
367
 
368
- return "".join(accumulated_text_parts).strip(), api_error_yielded
 
369
 
370
 
371
  @app.route("/v1/chat/completions", methods=["POST"])
@@ -418,7 +424,6 @@ def chat_completions():
418
 
419
  def attempt_ondemand_request_wrapper(current_apikey_from_wrapper, current_session_id_from_wrapper):
420
  if is_stream_request:
421
- # Pass the generator directly to Response
422
  return Response(
423
  handle_stream_request(current_apikey_from_wrapper, current_session_id_from_wrapper, final_query_to_ondemand, target_endpoint_id, openai_model_name),
424
  content_type='text/event-stream'
@@ -446,10 +451,22 @@ def chat_completions():
446
  break
447
  except requests.exceptions.RequestException as http_err_outer:
448
  last_exception_for_key_retry = http_err_outer
449
- logging.warning(f"【请求处理 - Key轮换尝试 {key_retry_count}】HTTP/请求错误。Key: {keymgr.display_key(selected_apikey_for_outer_retry) if selected_apikey_for_outer_retry else 'N/A'}, Error: {http_err_outer}")
450
- if selected_apikey_for_outer_retry:
451
- keymgr.mark_bad(selected_apikey_for_outer_retry)
 
 
452
 
 
 
 
 
 
 
 
 
 
 
453
  if key_retry_count >= max_key_retries:
454
  logging.error(f"【请求处理】所有Key轮换尝试均失败。最后错误: {last_exception_for_key_retry}")
455
  break
@@ -460,36 +477,26 @@ def chat_completions():
460
  except Exception as e_outer:
461
  last_exception_for_key_retry = e_outer
462
  logging.error(f"【请求处理 - Key轮换尝试 {key_retry_count}】发生意外严重错误: {e_outer}", exc_info=True)
463
- if selected_apikey_for_outer_retry:
464
  keymgr.mark_bad(selected_apikey_for_outer_retry)
465
  break
466
 
467
- error_message = "All attempts to process the request failed after multiple key/session retries."
468
- if last_exception_for_key_retry:
469
- error_message += f" Last known error during key/session phase: {str(last_exception_for_key_retry)}"
470
- logging.error(error_message)
471
- # For stream requests, if with_valid_key_and_session fails, we can't return jsonify directly
472
- # This part might need adjustment if the action_func_to_wrap for stream is expected to return a Response object
473
- # However, if action_func_to_wrap (attempt_ondemand_request_wrapper) for stream returns a Response,
474
- # then this jsonify will only be hit if create_session or keymgr.get fails repeatedly.
475
  if is_stream_request:
476
- # Construct a generator that yields an error SSE
477
  def error_stream_gen():
478
- yield format_openai_sse_delta({"error": {"message": error_message, "type": "proxy_setup_error", "code": 503}})
479
  yield "data: [DONE]\n\n"
480
- return Response(error_stream_gen(), content_type='text/event-stream', status=503)
481
  else:
482
- return jsonify({"error": error_message}), 503
483
 
484
  return with_valid_key_and_session(attempt_ondemand_request_wrapper)
485
 
486
- # Modified: This function is now a generator that uses `yield from`
487
  def handle_stream_request(initial_apikey, initial_session_id, query_str, endpoint_id, openai_model_name_for_response):
488
- """
489
- Handles streaming chat completion requests with empty response retries.
490
- Each empty response retry uses a new API key and session.
491
- Yields SSE event strings directly.
492
- """
493
  max_empty_response_retries = 5
494
  empty_retry_attempt_num = 0
495
 
@@ -499,7 +506,7 @@ def handle_stream_request(initial_apikey, initial_session_id, query_str, endpoin
499
  while empty_retry_attempt_num < max_empty_response_retries:
500
  empty_retry_attempt_num += 1
501
  accumulated_text_this_attempt = ""
502
- api_error_in_attempt = False
503
 
504
  if empty_retry_attempt_num > 1:
505
  logging.info(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session...")
@@ -509,10 +516,23 @@ def handle_stream_request(initial_apikey, initial_session_id, query_str, endpoin
509
  logging.info(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】新Key/Session获取成功: Key={keymgr.display_key(current_apikey_for_attempt)}, Session={current_session_id_for_attempt}")
510
  except (ValueError, requests.exceptions.RequestException) as e_key_session:
511
  logging.warning(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session失败: {e_key_session}")
 
 
 
 
512
  if current_apikey_for_attempt and not isinstance(e_key_session, ValueError):
513
- keymgr.mark_bad(current_apikey_for_attempt)
 
 
 
 
 
 
514
  if empty_retry_attempt_num >= max_empty_response_retries:
515
- yield format_openai_sse_delta({"error": {"message": f"Failed to get new key/session for final empty stream retry. Error: {str(e_key_session)}", "type": "internal_proxy_error"}})
 
 
 
516
  yield "data: [DONE]\n\n"
517
  return
518
  time.sleep(1)
@@ -522,8 +542,6 @@ def handle_stream_request(initial_apikey, initial_session_id, query_str, endpoin
522
  log_attempt_str = f"初始尝试" if empty_retry_attempt_num == 1 else f"空回复重试 {empty_retry_attempt_num-1}"
523
 
524
  try:
525
- # Yield from the sub-generator; result_tuple will be (accumulated_text, api_error_yielded_flag)
526
- # This is where the true streaming to the client happens chunk by chunk.
527
  result_tuple = yield from _execute_one_stream_attempt(
528
  current_apikey_for_attempt,
529
  current_session_id_for_attempt,
@@ -533,64 +551,74 @@ def handle_stream_request(initial_apikey, initial_session_id, query_str, endpoin
533
  f"{log_attempt_str} (Overall attempt {empty_retry_attempt_num})"
534
  )
535
  accumulated_text_this_attempt = result_tuple[0]
536
- api_error_in_attempt = result_tuple[1]
537
 
538
- except requests.exceptions.RequestException as e_req:
539
- logging.warning(f"【流式请求】({log_attempt_str} using key {keymgr.display_key(current_apikey_for_attempt)}) 发生请求级错误: {e_req}")
540
- if current_apikey_for_attempt: # Ensure key is marked bad if one was used
541
- keymgr.mark_bad(current_apikey_for_attempt)
 
 
 
 
 
 
 
 
 
 
 
542
 
543
  if empty_retry_attempt_num == 1:
544
- # This was the initial_apikey. Re-raise for the outer key retry mechanism in with_valid_key_and_session.
545
- # The with_valid_key_and_session will then try a new key for the *entire* operation.
546
  raise e_req
547
 
548
- # If it's an empty-response retry (attempt_num > 1) that failed with RequestException
549
  if empty_retry_attempt_num >= max_empty_response_retries:
550
- logging.error(f"【流式请求】在最后一次空回复重试时发生求错误。")
551
- yield format_openai_sse_delta({"error": {"message": f"Request failed on final empty stream retry attempt: {str(e_req)}", "type": "internal_proxy_error"}})
 
 
552
  yield "data: [DONE]\n\n"
553
  return
554
  time.sleep(1)
555
- continue # To the next iteration of the empty_retry_attempt_num loop (will try new key/session)
556
 
557
- # Check results after _execute_one_stream_attempt has finished for this attempt
558
  if api_error_in_attempt:
559
- logging.warning(f"【流式请求】({log_attempt_str}) OnDemand 服务返回错误或处理内部错误,已将错误信息流式传输。")
560
- # Error already yielded by _execute_one_stream_attempt, so we just stop.
561
  return
562
 
563
  if accumulated_text_this_attempt:
564
  logging.info(f"【流式请求】({log_attempt_str}) 成功获取非空内容。")
565
- # Content already yielded by _execute_one_stream_attempt. We are done.
566
  return
567
 
568
- # If we reach here, content was empty and no API error was yielded by _execute_one_stream_attempt
569
  logging.warning(f"【流式请求】({log_attempt_str}) 返回空内容。")
570
  if empty_retry_attempt_num >= max_empty_response_retries:
571
- logging.error(f"【流式请求】达到最大空回复重试次数 ({max_empty_response_retries})。")
 
 
572
  yield format_openai_sse_delta({
573
- "error": {"message": f"Model returned an empty stream after {max_empty_response_retries} retries.", "type": "empty_stream_error_after_retries", "code": "empty_response"}
574
  })
575
  yield "data: [DONE]\n\n"
576
  return
577
 
578
  logging.info(f"【流式请求】空回复,将在1秒后重试下一个Key。当前总尝试 {empty_retry_attempt_num}/{max_empty_response_retries}")
579
  time.sleep(1)
580
- # Fallback if loop finishes unexpectedly (shouldn't happen with current logic)
581
- logging.error("【流式请求】意外退出空回复重试循环。")
582
- yield format_openai_sse_delta({"error": {"message": "Unexpected error in stream handling.", "type": "internal_proxy_error"}})
 
 
583
  yield "data: [DONE]\n\n"
584
 
585
 
586
  def handle_non_stream_request(initial_apikey, initial_session_id, query_str, endpoint_id, openai_model_name_for_response):
587
- max_empty_response_retries = 5
588
  empty_retry_attempt_num = 0
589
 
590
  current_apikey_for_attempt = initial_apikey
591
  current_session_id_for_attempt = initial_session_id
592
 
593
- url = f"{ONDEMAND_API_BASE}/sessions/{current_session_id_for_attempt}/query"
594
 
595
  while empty_retry_attempt_num < max_empty_response_retries:
596
  empty_retry_attempt_num += 1
@@ -600,76 +628,130 @@ def handle_non_stream_request(initial_apikey, initial_session_id, query_str, end
600
  try:
601
  current_apikey_for_attempt = keymgr.get()
602
  current_session_id_for_attempt = create_session(current_apikey_for_attempt)
603
- url = f"{ONDEMAND_API_BASE}/sessions/{current_session_id_for_attempt}/query"
604
  logging.info(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】新Key/Session获取成功: Key={keymgr.display_key(current_apikey_for_attempt)}, Session={current_session_id_for_attempt}")
605
  except (ValueError, requests.exceptions.RequestException) as e_key_session:
606
  logging.warning(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session失败: {e_key_session}")
 
 
 
 
607
  if current_apikey_for_attempt and not isinstance(e_key_session, ValueError):
608
- keymgr.mark_bad(current_apikey_for_attempt)
 
 
 
 
 
 
609
  if empty_retry_attempt_num >= max_empty_response_retries:
610
- return jsonify({"error": f"Failed to get new key/session for final empty response retry. Error: {str(e_key_session)}"}), 503
 
 
 
611
  time.sleep(1)
612
  current_apikey_for_attempt = None
613
  continue
614
 
615
  log_attempt_str = f"初始尝试" if empty_retry_attempt_num == 1 else f"空回复重试 {empty_retry_attempt_num-1}"
616
- logging.info(f"【同步请求】({log_attempt_str}, 总尝试 {empty_retry_attempt_num}) Session={current_session_id_for_attempt}, Key={keymgr.display_key(current_apikey_for_attempt)}")
617
 
618
- payload = { "query": query_str, "endpointId": endpoint_id, "pluginIds": [], "responseMode": "sync" }
619
- headers = {"apikey": current_apikey_for_attempt, "Content-Type": "application/json"}
620
 
621
- try:
622
- resp = requests.post(url, json=payload, headers=headers, timeout=120)
623
- resp.raise_for_status()
624
-
625
- response_json = resp.json()
626
- if "data" not in response_json or "answer" not in response_json["data"]:
627
- logging.error(f"【OnDemand同步错误响应格式不符合预期 ({log_attempt_str})。Session: {current_session_id_for_attempt}, 响应: {str(response_json)[:500]}")
628
- raise ValueError("OnDemand API sync response missing 'data.answer' field.")
 
 
 
 
 
 
629
 
630
- ai_response_content = response_json["data"]["answer"]
631
- if ai_response_content is None: ai_response_content = ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
632
 
633
- if ai_response_content.strip():
634
- logging.info(f"【同步请求】({log_attempt_str}) 成功获取非空内容。")
635
- openai_response_obj = {
636
- "id": "chatcmpl-" + str(uuid.uuid4())[:12], "object": "chat.completion", "created": int(time.time()),
637
- "model": openai_model_name_for_response,
638
- "choices": [{"index": 0, "message": {"role": "assistant", "content": ai_response_content}, "finish_reason": "stop"}],
639
- "usage": {}
640
- }
641
- return jsonify(openai_response_obj)
642
- else:
643
- logging.warning(f"【同步请求】({log_attempt_str}) 返回空回复。")
644
- if empty_retry_attempt_num >= max_empty_response_retries:
645
- logging.error(f"【同步请求】达到最大空回复重试次数 ({max_empty_response_retries})。")
646
- return jsonify({
647
- "error": f"Model returned an empty response after {max_empty_response_retries} retries.",
648
  "id": "chatcmpl-" + str(uuid.uuid4())[:12], "object": "chat.completion", "created": int(time.time()),
649
  "model": openai_model_name_for_response,
650
- "choices": [{"index": 0, "message": {"role": "assistant", "content": ""}, "finish_reason": "length"}],
651
- "usage": {}
652
- }), 500
653
- logging.info(f"【同步请求】空回复,将在1秒后重试下一个Key。当前总尝试 {empty_retry_attempt_num}/{max_empty_response_retries}")
654
- time.sleep(1)
655
-
656
- except requests.exceptions.RequestException as e_req:
657
- logging.warning(f"【同步请求】({log_attempt_str} using key {keymgr.display_key(current_apikey_for_attempt)}) 发生请求级错误: {e_req}")
658
- if current_apikey_for_attempt: # Ensure key is marked bad
659
- keymgr.mark_bad(current_apikey_for_attempt)
660
- if empty_retry_attempt_num == 1:
661
- raise e_req
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
662
  if empty_retry_attempt_num >= max_empty_response_retries:
663
- logging.error(f"【同步请求】在最后一次空回复重试时发生求错误。")
664
- return jsonify({"error": f"Request failed on final empty response retry attempt. Last error: {str(e_req)}"}), 503
665
- time.sleep(1)
666
- continue
667
- except (ValueError, KeyError, json.JSONDecodeError) as e_parse:
668
- logging.error(f"【同步请求】({log_attempt_str}) 处理响应时出错: {e_parse}", exc_info=True)
669
- return jsonify({"error": f"Error processing OnDemand sync response: {str(e_parse)}"}), 502
670
-
671
- logging.error(f"【同步请求】意外退出空回复重试循环。")
672
- return jsonify({"error": "Unexpected error in non-stream handling after empty response retries."}), 500
 
 
 
 
 
 
 
 
673
 
674
 
675
  @app.route("/v1/models", methods=["GET"])
 
238
  def format_openai_sse_delta(chunk_data_dict):
239
  return f"data: {json.dumps(chunk_data_dict, ensure_ascii=False)}\n\n"
240
 
 
241
  def _execute_one_stream_attempt(apikey_for_attempt, session_id_for_attempt, query_str, endpoint_id, openai_model_name_for_response, current_attempt_num_logging):
 
 
 
 
 
242
  url = f"{ONDEMAND_API_BASE}/sessions/{session_id_for_attempt}/query"
243
  payload = {
244
  "query": query_str,
 
253
  }
254
 
255
  accumulated_text_parts = []
256
+ api_error_yielded = False
257
+
258
+ max_500_retries_for_this_call = 5
259
+ current_500_retry_count = 0
260
 
261
+ while current_500_retry_count < max_500_retries_for_this_call:
262
+ current_500_retry_count += 1
263
+ if current_500_retry_count > 1:
264
+ logging.info(f"【流式请求子尝试 {current_attempt_num_logging} - 500错误重试 {current_500_retry_count-1}/{max_500_retries_for_this_call-1}】Key: {keymgr.display_key(apikey_for_attempt)}")
265
+ else:
266
+ logging.info(f"【流请求子尝试 {current_attempt_num_logging}】发送到 OnDemand: Session={session_id_for_attempt}, Endpoint={endpoint_id}, Key={keymgr.display_key(apikey_for_attempt)}")
 
 
 
 
 
 
 
 
 
 
267
 
268
+ try:
269
+ with requests.post(url, json=payload, headers=headers, stream=True, timeout=180) as resp:
270
+ if resp.status_code == 500:
271
+ logging.warning(f"【OnDemand流错误】(子尝试 {current_attempt_num_logging}, 500重试 {current_500_retry_count}) 收到500错误。Session: {session_id_for_attempt}")
272
+ if current_500_retry_count >= max_500_retries_for_this_call:
273
+ logging.error(f"【OnDemand流错误】(子尝试 {current_attempt_num_logging}) 达到500错误最大重试次数。将错误传递给上层。")
274
+ api_error_yielded = True
275
+ error_payload = {"error": {"message": f"OnDemand API persistent 500 error after {max_500_retries_for_this_call} retries (Attempt {current_attempt_num_logging}).",
276
+ "type": "on_demand_persistent_500_error", "code": 500}}
277
+ yield format_openai_sse_delta(error_payload)
 
 
 
 
278
  yield "data: [DONE]\n\n"
 
279
  return "".join(accumulated_text_parts).strip(), api_error_yielded
280
+ time.sleep(1)
281
+ continue
282
+
283
+ if resp.status_code != 200:
284
+ api_error_yielded = True
285
+ error_text = resp.text
286
+ logging.error(f"【OnDemand流错误】请求失败 (子尝试 {current_attempt_num_logging})。状态码: {resp.status_code}, Session: {session_id_for_attempt}, 响应: {error_text[:500]}")
287
+ error_payload = {
288
+ "error": {
289
+ "message": f"OnDemand API Error (Stream Init, Attempt {current_attempt_num_logging}): {resp.status_code} - {error_text[:200]}",
290
+ "type": "on_demand_api_error",
291
+ "code": resp.status_code
292
+ }
293
+ }
294
+ yield format_openai_sse_delta(error_payload)
295
+ yield "data: [DONE]\n\n"
296
+ return "".join(accumulated_text_parts).strip(), api_error_yielded
297
+
298
+ first_chunk_sent = False
299
+ last_line_str = ""
300
+ for line_bytes in resp.iter_lines():
301
+ if not line_bytes:
302
+ continue
303
+ line_str = line_bytes.decode("utf-8")
304
+ last_line_str = line_str
305
+
306
+ if line_str.startswith("data:"):
307
+ data_part = line_str[len("data:"):].strip()
308
+ if data_part == "[DONE]":
309
+ logging.info(f"【OnDemand流】接收到 [DONE] 信号 (子尝试 {current_attempt_num_logging})。Session: {session_id_for_attempt}")
310
+ yield "data: [DONE]\n\n"
311
+ return "".join(accumulated_text_parts).strip(), api_error_yielded
312
+ elif data_part.startswith("[ERROR]:"):
313
+ api_error_yielded = True
314
+ error_json_str = data_part[len("[ERROR]:"):].strip()
315
+ logging.warning(f"【OnDemand流】接收到错误事件 (子尝试 {current_attempt_num_logging}): {error_json_str}。Session: {session_id_for_attempt}")
316
+ try:
317
+ error_obj = json.loads(error_json_str)
318
+ except json.JSONDecodeError:
319
+ error_obj = {"message": error_json_str, "type": "on_demand_stream_error_format"}
320
+ yield format_openai_sse_delta({"error": error_obj})
321
+ yield "data: [DONE]\n\n"
322
+ return "".join(accumulated_text_parts).strip(), api_error_yielded
323
+ else:
324
+ try:
325
+ event_data = json.loads(data_part)
326
+ if event_data.get("eventType") == "fulfillment":
327
+ delta_content = event_data.get("answer", "")
328
+ if delta_content is None: delta_content = ""
329
+ accumulated_text_parts.append(delta_content)
330
+ choice_delta = {}
331
+ if not first_chunk_sent:
332
+ choice_delta["role"] = "assistant"
333
+ choice_delta["content"] = delta_content
334
+ first_chunk_sent = True
335
+ else:
336
+ choice_delta["content"] = delta_content
337
+ if not choice_delta.get("content") and not choice_delta.get("role"):
338
+ if not (choice_delta.get("role") and not choice_delta.get("content")):
339
+ continue
340
+ openai_chunk = {
341
+ "id": "chatcmpl-" + str(uuid.uuid4())[:12],
342
+ "object": "chat.completion.chunk",
343
+ "created": int(time.time()),
344
+ "model": openai_model_name_for_response,
345
+ "choices": [{"delta": choice_delta, "index": 0, "finish_reason": None}]
346
+ }
347
+ yield format_openai_sse_delta(openai_chunk)
348
+ except json.JSONDecodeError:
349
+ logging.warning(f"【OnDemand流】无法解析JSON (子尝试 {current_attempt_num_logging}): {data_part[:100]}... Session: {session_id_for_attempt}")
350
+ continue
351
+
352
+ if not api_error_yielded and not last_line_str.startswith("data: [DONE]"):
353
  logging.info(f"【OnDemand流】(子尝试 {current_attempt_num_logging}) 流迭代完成,补充发送 [DONE]。Session: {session_id_for_attempt}")
354
  yield "data: [DONE]\n\n"
355
+ return "".join(accumulated_text_parts).strip(), api_error_yielded
356
+
357
+ except requests.exceptions.RequestException as e_req_inner:
358
+ logging.error(f"【OnDemand流】(子尝试 {current_attempt_num_logging}) 请求时发生异常: {e_req_inner}, Key: {keymgr.display_key(apikey_for_attempt)}")
359
+ if current_500_retry_count >= max_500_retries_for_this_call or (hasattr(e_req_inner, 'response') and e_req_inner.response is not None and e_req_inner.response.status_code != 500):
360
+ raise e_req_inner
361
+ time.sleep(1)
362
 
363
+ except Exception as e_inner_unknown:
364
+ logging.error(f"【OnDemand流】处理流时发生未知错误 (子尝试 {current_attempt_num_logging}): {e_inner_unknown}, Session: {session_id_for_attempt}", exc_info=True)
365
+ api_error_yielded = True
366
+ error_payload = {
367
+ "error": {"message": f"Unknown error during streaming (Attempt {current_attempt_num_logging}): {str(e_inner_unknown)}", "type": "unknown_streaming_error_in_attempt"}
368
+ }
369
+ yield format_openai_sse_delta(error_payload)
370
+ yield "data: [DONE]\n\n"
371
+ return "".join(accumulated_text_parts).strip(), api_error_yielded
 
 
 
 
 
 
372
 
373
+ logging.error(f"【OnDemand流】(子尝试 {current_attempt_num_logging}) 500错误重试循环意外结束。")
374
+ raise requests.exceptions.RequestException(f"Exhausted internal 500 retries for attempt {current_attempt_num_logging} without success or specific error propagation.")
375
 
376
 
377
  @app.route("/v1/chat/completions", methods=["POST"])
 
424
 
425
  def attempt_ondemand_request_wrapper(current_apikey_from_wrapper, current_session_id_from_wrapper):
426
  if is_stream_request:
 
427
  return Response(
428
  handle_stream_request(current_apikey_from_wrapper, current_session_id_from_wrapper, final_query_to_ondemand, target_endpoint_id, openai_model_name),
429
  content_type='text/event-stream'
 
451
  break
452
  except requests.exceptions.RequestException as http_err_outer:
453
  last_exception_for_key_retry = http_err_outer
454
+ status_code_from_exc = None
455
+ if hasattr(http_err_outer, 'response') and http_err_outer.response is not None:
456
+ status_code_from_exc = http_err_outer.response.status_code
457
+
458
+ logging.warning(f"【请求处理 - Key轮换尝试 {key_retry_count}】HTTP/请求错误。Status: {status_code_from_exc}, Key: {keymgr.display_key(selected_apikey_for_outer_retry) if selected_apikey_for_outer_retry else 'N/A'}, Error: {http_err_outer}")
459
 
460
+ if selected_apikey_for_outer_retry:
461
+ if status_code_from_exc == 524:
462
+ logging.info(f"【KeyManager】Key {keymgr.display_key(selected_apikey_for_outer_retry)} not marked bad due to 524 error.")
463
+ elif status_code_from_exc == 500:
464
+ logging.info(f"【KeyManager】Key {keymgr.display_key(selected_apikey_for_outer_retry)} not marked bad due to 500 error.")
465
+ elif status_code_from_exc and ((400 <= status_code_from_exc < 500) or status_code_from_exc in [502, 503]): # Explicitly list codes that mark bad, excluding 500, 524
466
+ keymgr.mark_bad(selected_apikey_for_outer_retry)
467
+ elif not status_code_from_exc :
468
+ keymgr.mark_bad(selected_apikey_for_outer_retry)
469
+
470
  if key_retry_count >= max_key_retries:
471
  logging.error(f"【请求处理】所有Key轮换尝试均失败。最后错误: {last_exception_for_key_retry}")
472
  break
 
477
  except Exception as e_outer:
478
  last_exception_for_key_retry = e_outer
479
  logging.error(f"【请求处理 - Key轮换尝试 {key_retry_count}】发生意外严重错误: {e_outer}", exc_info=True)
480
+ if selected_apikey_for_outer_retry:
481
  keymgr.mark_bad(selected_apikey_for_outer_retry)
482
  break
483
 
484
+ error_message = "重试次数过多,请检查上下文长度! 或联系管理员!"
485
+ error_code_str = "max_retries_check_context_contact_admin"
486
+
487
+ logging.error(f"【请求处理】所有Key/Session获取尝试失败。最终错误: {error_message} Last underlying exception: {last_exception_for_key_retry}")
488
+
 
 
 
489
  if is_stream_request:
 
490
  def error_stream_gen():
491
+ yield format_openai_sse_delta({"error": {"message": error_message, "type": "proxy_max_retries_exceeded", "code": error_code_str}})
492
  yield "data: [DONE]\n\n"
493
+ return Response(error_stream_gen(), content_type='text/event-stream', status=500)
494
  else:
495
+ return jsonify({"error": error_message, "code": error_code_str}), 500
496
 
497
  return with_valid_key_and_session(attempt_ondemand_request_wrapper)
498
 
 
499
  def handle_stream_request(initial_apikey, initial_session_id, query_str, endpoint_id, openai_model_name_for_response):
 
 
 
 
 
500
  max_empty_response_retries = 5
501
  empty_retry_attempt_num = 0
502
 
 
506
  while empty_retry_attempt_num < max_empty_response_retries:
507
  empty_retry_attempt_num += 1
508
  accumulated_text_this_attempt = ""
509
+ api_error_in_attempt = False
510
 
511
  if empty_retry_attempt_num > 1:
512
  logging.info(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session...")
 
516
  logging.info(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】新Key/Session获取成功: Key={keymgr.display_key(current_apikey_for_attempt)}, Session={current_session_id_for_attempt}")
517
  except (ValueError, requests.exceptions.RequestException) as e_key_session:
518
  logging.warning(f"【流式请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session失败: {e_key_session}")
519
+ status_code_from_exc_retry_setup = None
520
+ if hasattr(e_key_session, 'response') and e_key_session.response is not None:
521
+ status_code_from_exc_retry_setup = e_key_session.response.status_code
522
+
523
  if current_apikey_for_attempt and not isinstance(e_key_session, ValueError):
524
+ if status_code_from_exc_retry_setup == 524:
525
+ logging.info(f"【KeyManager】Key {keymgr.display_key(current_apikey_for_attempt)} not marked bad for 524 error during key/session acquisition for stream retry.")
526
+ elif status_code_from_exc_retry_setup == 500:
527
+ logging.info(f"【KeyManager】Key {keymgr.display_key(current_apikey_for_attempt)} not marked bad for 500 error during key/session acquisition for stream retry.")
528
+ else:
529
+ keymgr.mark_bad(current_apikey_for_attempt)
530
+
531
  if empty_retry_attempt_num >= max_empty_response_retries:
532
+ final_error_message = "重试次数过多,请检查上下文长度! 或联系管理员!"
533
+ final_error_code = "max_retries_check_context_contact_admin"
534
+ logging.error(f"【流式请求】无法为最终空回复重试获取新Key/Session。错误: {e_key_session}")
535
+ yield format_openai_sse_delta({"error": {"message": final_error_message, "type": "proxy_final_retry_setup_failed", "code": final_error_code, "details": str(e_key_session)}})
536
  yield "data: [DONE]\n\n"
537
  return
538
  time.sleep(1)
 
542
  log_attempt_str = f"初始尝试" if empty_retry_attempt_num == 1 else f"空回复重试 {empty_retry_attempt_num-1}"
543
 
544
  try:
 
 
545
  result_tuple = yield from _execute_one_stream_attempt(
546
  current_apikey_for_attempt,
547
  current_session_id_for_attempt,
 
551
  f"{log_attempt_str} (Overall attempt {empty_retry_attempt_num})"
552
  )
553
  accumulated_text_this_attempt = result_tuple[0]
554
+ api_error_in_attempt = result_tuple[1]
555
 
556
+ except requests.exceptions.RequestException as e_req:
557
+ log_key_display = keymgr.display_key(current_apikey_for_attempt) if current_apikey_for_attempt else "N/A"
558
+ status_code_from_exc_stream = None
559
+ if hasattr(e_req, 'response') and e_req.response is not None:
560
+ status_code_from_exc_stream = e_req.response.status_code
561
+
562
+ logging.warning(f"【流式请求】({log_attempt_str} using key {log_key_display}) 发生请求级错误: {e_req}, Status: {status_code_from_exc_stream}")
563
+
564
+ if current_apikey_for_attempt:
565
+ if status_code_from_exc_stream == 524:
566
+ logging.info(f"【KeyManager】Key {log_key_display} not marked bad for 524 error during stream attempt.")
567
+ elif status_code_from_exc_stream == 500:
568
+ logging.info(f"【KeyManager】Key {log_key_display} not marked bad for 500 error during stream attempt.")
569
+ else:
570
+ keymgr.mark_bad(current_apikey_for_attempt)
571
 
572
  if empty_retry_attempt_num == 1:
 
 
573
  raise e_req
574
 
 
575
  if empty_retry_attempt_num >= max_empty_response_retries:
576
+ final_error_message = "重试次数过多,检查上下文长度! 或联系管理员!"
577
+ final_error_code = "max_retries_check_context_contact_admin"
578
+ logging.error(f"【流式请求】在最后一次空回复重试时发生请求错误: {e_req}")
579
+ yield format_openai_sse_delta({"error": {"message": final_error_message, "type": "proxy_final_retry_request_failed", "code": final_error_code, "details": str(e_req)}})
580
  yield "data: [DONE]\n\n"
581
  return
582
  time.sleep(1)
583
+ continue
584
 
 
585
  if api_error_in_attempt:
586
+ logging.warning(f"【流式请求】({log_attempt_str}) 子尝试已处理流式传输API错误。")
 
587
  return
588
 
589
  if accumulated_text_this_attempt:
590
  logging.info(f"【流式请求】({log_attempt_str}) 成功获取非空内容。")
 
591
  return
592
 
 
593
  logging.warning(f"【流式请求】({log_attempt_str}) 返回空内容。")
594
  if empty_retry_attempt_num >= max_empty_response_retries:
595
+ final_error_message = "重试次数过多,请检查上下文长度! 或联系管理员!"
596
+ final_error_code = "max_retries_check_context_contact_admin"
597
+ logging.error(f"【流式请求】达到最大空回复重试次数 ({max_empty_response_retries})。将返回指定错误。")
598
  yield format_openai_sse_delta({
599
+ "error": {"message": final_error_message, "type": "max_empty_retries_exceeded", "code": final_error_code}
600
  })
601
  yield "data: [DONE]\n\n"
602
  return
603
 
604
  logging.info(f"【流式请求】空回复,将在1秒后重试下一个Key。当前总尝试 {empty_retry_attempt_num}/{max_empty_response_retries}")
605
  time.sleep(1)
606
+
607
+ final_fallback_error_message = "重试次数过多,请检查上下文长度! 或联系管理员!"
608
+ final_fallback_error_code = "max_retries_check_context_contact_admin_fallback"
609
+ logging.error(f"【流式请求】意外退出空回复重试循环。返回最终错误。")
610
+ yield format_openai_sse_delta({"error": {"message": final_fallback_error_message, "type": "internal_proxy_error_unexpected_exit", "code": final_fallback_error_code}})
611
  yield "data: [DONE]\n\n"
612
 
613
 
614
def handle_non_stream_request(initial_apikey, initial_session_id, query_str, endpoint_id, openai_model_name_for_response):
    """Handle a non-streaming (sync) chat completion by proxying to the OnDemand API.

    Two nested retry loops:
      * outer loop — retries with a *fresh* key/session when the answer comes
        back empty (up to ``max_empty_response_retries`` total attempts);
      * inner loop — retries the *same* key/session on HTTP 500 responses
        (up to ``max_500_retries_for_this_call`` attempts).

    Args:
        initial_apikey: API key already acquired by the caller; used for the first attempt.
        initial_session_id: OnDemand session id created by the caller; used for the first attempt.
        query_str: User query to send to the OnDemand endpoint.
        endpoint_id: OnDemand endpoint id (mapped from the requested OpenAI model).
        openai_model_name_for_response: Model name echoed back in the OpenAI-style response.

    Returns:
        A Flask response: an OpenAI-style ``chat.completion`` JSON on success,
        or a JSON error payload with HTTP 500 once every retry is exhausted.

    Raises:
        requests.exceptions.RequestException: propagated to the caller for
            terminal request-level failures (non-500 errors, exhausted 500
            retries, or malformed responses) so the caller's error handling
            can take over.
    """
    max_empty_response_retries = 5
    empty_retry_attempt_num = 0

    current_apikey_for_attempt = initial_apikey
    current_session_id_for_attempt = initial_session_id

    # Defined before the loop so the post-loop empty check always has a value.
    ai_response_content = ""

    while empty_retry_attempt_num < max_empty_response_retries:
        empty_retry_attempt_num += 1

        # NOTE(review): the first attempt uses the caller-supplied key/session;
        # retries discard them and acquire fresh ones. This guard reconstructs
        # lines elided in the source view — confirm against the full file.
        if empty_retry_attempt_num > 1:
            try:
                current_apikey_for_attempt = keymgr.get()
                current_session_id_for_attempt = create_session(current_apikey_for_attempt)
                logging.info(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】新Key/Session获取成功: Key={keymgr.display_key(current_apikey_for_attempt)}, Session={current_session_id_for_attempt}")
            except (ValueError, requests.exceptions.RequestException) as e_key_session:
                logging.warning(f"【同步请求-空回复重试 {empty_retry_attempt_num-1}】获取新Key/Session失败: {e_key_session}")
                status_code_from_exc_retry_setup_ns = None
                if hasattr(e_key_session, 'response') and e_key_session.response is not None:
                    status_code_from_exc_retry_setup_ns = e_key_session.response.status_code

                # ValueError means the key pool is exhausted (no key to blame).
                # 524/500 are treated as transient upstream faults, so the key
                # is only marked bad for other request-level failures.
                if current_apikey_for_attempt and not isinstance(e_key_session, ValueError):
                    if status_code_from_exc_retry_setup_ns == 524:
                        logging.info(f"【KeyManager】Key {keymgr.display_key(current_apikey_for_attempt)} not marked bad for 524 error during key/session acquisition for non-stream retry.")
                    elif status_code_from_exc_retry_setup_ns == 500:
                        logging.info(f"【KeyManager】Key {keymgr.display_key(current_apikey_for_attempt)} not marked bad for 500 error during key/session acquisition for non-stream retry.")
                    else:
                        keymgr.mark_bad(current_apikey_for_attempt)

                if empty_retry_attempt_num >= max_empty_response_retries:
                    final_error_message = "重试次数过多,请检查上下文长度! 或联系管理员!"
                    final_error_code = "max_retries_check_context_contact_admin"
                    logging.error(f"【同步请求】无法为最终空回复重试获取新Key/Session。错误: {e_key_session}")
                    return jsonify({"error": final_error_message, "code": final_error_code, "details": str(e_key_session)}), 500
                time.sleep(1)
                current_apikey_for_attempt = None
                continue

        log_attempt_str = f"初始尝试" if empty_retry_attempt_num == 1 else f"空回复重试 {empty_retry_attempt_num-1}"

        max_500_retries_for_this_call = 5
        current_500_retry_count = 0

        # Reset for each key/session so stale content from a previous attempt
        # cannot satisfy the post-loop non-empty check.
        ai_response_content = ""

        while current_500_retry_count < max_500_retries_for_this_call:
            current_500_retry_count += 1
            if current_500_retry_count > 1:
                logging.info(f"【同步请求】({log_attempt_str}, 总尝试 {empty_retry_attempt_num}, 500错误重试 {current_500_retry_count-1}/{max_500_retries_for_this_call-1}) Session={current_session_id_for_attempt}, Key={keymgr.display_key(current_apikey_for_attempt)}")
            else:
                logging.info(f"【同步请求】({log_attempt_str}, 总尝试 {empty_retry_attempt_num}) Session={current_session_id_for_attempt}, Key={keymgr.display_key(current_apikey_for_attempt)}")

            url = f"{ONDEMAND_API_BASE}/sessions/{current_session_id_for_attempt}/query"
            payload = {"query": query_str, "endpointId": endpoint_id, "pluginIds": [], "responseMode": "sync"}
            headers = {"apikey": current_apikey_for_attempt, "Content-Type": "application/json"}

            try:
                resp = requests.post(url, json=payload, headers=headers, timeout=120)

                if resp.status_code == 500:
                    logging.warning(f"【OnDemand同步错误】({log_attempt_str}, 500重试 {current_500_retry_count}) 收到500错误。")
                    if current_500_retry_count >= max_500_retries_for_this_call:
                        logging.error(f"【OnDemand同步错误】({log_attempt_str}) 达到500错误最大重试次数。将错误传递给上层。")
                        # Raises HTTPError (a RequestException) -> handled below.
                        resp.raise_for_status()
                    time.sleep(1)
                    continue

                resp.raise_for_status()

                response_json = resp.json()
                if "data" not in response_json or "answer" not in response_json["data"]:
                    logging.error(f"【OnDemand同步错误】响应格式不符合预期 ({log_attempt_str})。Session: {current_session_id_for_attempt}, 响应: {str(response_json)[:500]}")
                    raise ValueError(f"OnDemand API sync response missing 'data.answer' field on attempt {empty_retry_attempt_num}, 500-retry {current_500_retry_count}.")

                ai_response_content = response_json["data"]["answer"]
                if ai_response_content is None:
                    ai_response_content = ""

                if ai_response_content.strip():
                    logging.info(f"【同步请求】({log_attempt_str}, 500重试 {current_500_retry_count}) 成功获取非空内容。")
                    openai_response_obj = {
                        "id": "chatcmpl-" + str(uuid.uuid4())[:12], "object": "chat.completion", "created": int(time.time()),
                        "model": openai_model_name_for_response,
                        "choices": [{"index": 0, "message": {"role": "assistant", "content": ai_response_content}, "finish_reason": "stop"}],
                        "usage": {}
                    }
                    return jsonify(openai_response_obj)  # SUCCESS

                # Empty answer: leave the 500-retry loop and let the outer loop
                # decide whether to retry with a fresh key/session.
                logging.warning(f"【同步请求】({log_attempt_str}, 500重试 {current_500_retry_count}) 返回空回复。")
                break

            except requests.exceptions.RequestException as e_req:
                log_key_display_sync = keymgr.display_key(current_apikey_for_attempt) if current_apikey_for_attempt else "N/A"
                status_code_from_exc_sync = None
                if hasattr(e_req, 'response') and e_req.response is not None:
                    status_code_from_exc_sync = e_req.response.status_code

                logging.warning(f"【同步请求】({log_attempt_str}, 500重试 {current_500_retry_count} using key {log_key_display_sync}) 发生请求级错误: {e_req}, Status: {status_code_from_exc_sync}")

                # Non-500 errors and exhausted 500-retries are terminal for this
                # key/session: propagate to the caller's error handling.
                # (The original raised e_req identically in both branches of an
                # `if empty_retry_attempt_num == 1` check; collapsed here.)
                if current_500_retry_count >= max_500_retries_for_this_call or status_code_from_exc_sync != 500:
                    raise e_req

                time.sleep(1)  # transient 500 -> retry the same key/session

            except (ValueError, KeyError, json.JSONDecodeError) as e_parse:
                logging.error(f"【同步请求】({log_attempt_str}, 500重试 {current_500_retry_count}) 处理响应或格式时出错: {e_parse}", exc_info=True)
                # Re-wrap parse/format failures as RequestException so the
                # caller's existing handler sees one exception type.
                if empty_retry_attempt_num == 1 and current_500_retry_count == 1:
                    raise requests.exceptions.RequestException(f"Response format error on first attempt: {e_parse}") from e_parse
                raise requests.exceptions.RequestException(f"Response format error during retry: {e_parse}") from e_parse

        # After the 500-retry loop: non-empty content already returned above,
        # so only the empty case needs handling (the original's
        # `if ai_response_content.strip(): pass` branch was dead code).
        if not ai_response_content.strip():
            if empty_retry_attempt_num >= max_empty_response_retries:
                final_error_message = "重试次数过多,检查上下文长度! 或联系管理员!"
                final_error_code = "max_retries_check_context_contact_admin"
                logging.error(f"【同步请求】达到最大空回复重试次数 ({max_empty_response_retries})。将返回指定错误。")
                return jsonify({
                    "error": final_error_message,
                    "id": "chatcmpl-" + str(uuid.uuid4())[:12], "object": "chat.completion", "created": int(time.time()),
                    "model": openai_model_name_for_response,
                    "choices": [{"index": 0, "message": {"role": "assistant", "content": ""}, "finish_reason": "length"}],
                    "usage": {}, "code": final_error_code
                }), 500
            logging.info(f"【同步请求】空回复(在500-重试循环之后),准备进行下一个空回复尝试。当前总尝试 {empty_retry_attempt_num}/{max_empty_response_retries}")
            time.sleep(1)
            # Outer loop continues with a new key/session.

    # Defensive fallback: the loop should always return or raise before
    # falling through, but never leave the endpoint without a response.
    final_fallback_error_message = "重试次数过多,请检查上下文长度! 或联系管理员!"
    final_fallback_error_code = "max_retries_check_context_contact_admin_fallback"
    logging.error(f"【同步请求】意外退出空回复重试循环。返回最终错误。")
    return jsonify({"error": final_fallback_error_message, "code": final_fallback_error_code}), 500
757
  @app.route("/v1/models", methods=["GET"])