| import json |
| import time |
| import math |
| import asyncio |
| from typing import List, Dict, Any, Callable, Union |
| from fastapi.responses import JSONResponse, StreamingResponse |
|
|
| from google.auth.transport.requests import Request as AuthRequest |
| from google.genai import types |
| from google import genai |
|
|
| |
| from models import OpenAIRequest, OpenAIMessage |
| from message_processing import deobfuscate_text, convert_to_openai_format, convert_chunk_to_openai, create_final_chunk |
| import config as app_config |
|
|
def create_openai_error_response(status_code: int, message: str, error_type: str) -> Dict[str, Any]:
    """Build an OpenAI-style error payload.

    Args:
        status_code: Value stored under the ``code`` key.
        message: Human-readable error description.
        error_type: Value stored under the ``type`` key.

    Returns:
        A dict with a single ``"error"`` key whose value holds ``message``,
        ``type``, ``code`` and a ``param`` entry that is always ``None``.
    """
    # Keyword order fixes the key order of the serialized payload.
    error_body: Dict[str, Any] = dict(
        message=message,
        type=error_type,
        code=status_code,
        param=None,
    )
    return {"error": error_body}
|
|
def create_generation_config(request: OpenAIRequest) -> Dict[str, Any]:
    """Translate the sampling options of ``request`` into a generation config dict.

    Each request attribute that is not ``None`` is copied under its
    corresponding config key. A ``safety_settings`` list with every listed
    harm category set to the ``"OFF"`` threshold is always added.

    Args:
        request: The incoming request whose optional sampling fields are read.

    Returns:
        The assembled config dictionary.
    """
    # (request attribute, config key) pairs, in the order the keys are emitted.
    field_map = (
        ("temperature", "temperature"),
        ("max_tokens", "max_output_tokens"),
        ("top_p", "top_p"),
        ("top_k", "top_k"),
        ("stop", "stop_sequences"),
        ("seed", "seed"),
        ("presence_penalty", "presence_penalty"),
        ("frequency_penalty", "frequency_penalty"),
        ("n", "candidate_count"),
    )

    generation_config: Dict[str, Any] = {}
    for request_attr, config_key in field_map:
        value = getattr(request, request_attr)
        if value is not None:
            generation_config[config_key] = value

    harm_categories = (
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_CIVIC_INTEGRITY",
    )
    generation_config["safety_settings"] = [
        types.SafetySetting(category=harm_category, threshold="OFF")
        for harm_category in harm_categories
    ]
    return generation_config
|
|
def _has_nonblank_text(obj) -> bool:
    """Return True when ``obj.text`` exists, is a ``str`` and is not only whitespace."""
    text = getattr(obj, "text", None)
    return isinstance(text, str) and bool(text.strip())


def is_response_valid(response):
    """Report whether ``response`` carries any usable text.

    Text is looked for, in order, on the response itself, on each candidate,
    and on each part of a candidate's content. The first non-blank string
    found makes the response valid.

    Args:
        response: The object to inspect; may be ``None``.

    Returns:
        ``True`` if non-blank text was found, ``False`` otherwise. A debug
        line is printed for every ``False`` result.
    """
    if response is None:
        print("DEBUG: Response is None, therefore invalid.")
        return False

    if _has_nonblank_text(response):
        return True

    for candidate in getattr(response, "candidates", None) or ():
        if _has_nonblank_text(candidate):
            return True
        # A missing or None ``content`` simply yields no parts to scan.
        parts = getattr(getattr(candidate, "content", None), "parts", None) or ()
        if any(_has_nonblank_text(part) for part in parts):
            return True

    print("DEBUG: Response is invalid, no usable text content found by is_response_valid.")
    return False
|
|
def _extract_fake_stream_text(response: Any) -> str:
    """Return the text of ``response``, falling back to its first candidate.

    ``response.text`` is used when it is non-empty. Otherwise the first
    candidate's ``text`` is tried, and finally the non-``None`` ``text``
    values of that candidate's content parts are concatenated. An empty
    string is returned when nothing is found.
    """
    text = getattr(response, "text", None)
    if text:
        return text

    candidates = getattr(response, "candidates", None)
    if not candidates:
        return ""

    candidate = candidates[0]
    text = getattr(candidate, "text", None)
    if text:
        return text

    parts = getattr(getattr(candidate, "content", None), "parts", None) or ()
    return "".join(
        part.text for part in parts if getattr(part, "text", None) is not None
    )


async def fake_stream_generator(client_instance, model_name: str, prompt: Union[types.Content, List[types.Content]], current_gen_config: Dict[str, Any], request_obj: OpenAIRequest, is_auto_attempt: bool):
    """Create an async generator that imitates streaming over a non-streaming call.

    The generator starts one ``generate_content`` call as a background task,
    emits empty keep-alive chunks while it is pending, then slices the
    finished text into chunks and emits them as server-sent-event lines,
    followed by the chunk from ``create_final_chunk`` and ``data: [DONE]``.

    Args:
        client_instance: Client exposing ``aio.models.generate_content``.
        model_name: Model name passed to the API call.
        prompt: Contents passed to the API call.
        current_gen_config: Config passed to the API call.
        request_obj: Original request; ``request_obj.model`` is echoed in
            every chunk, and a model name ending in ``-encrypt-full`` makes
            the text go through ``deobfuscate_text``.
        is_auto_attempt: When true, failures are not written to the stream;
            they are only re-raised.

    Returns:
        The (not yet started) async generator.

    Raises:
        Nothing directly. While being iterated, the generator re-raises any
        exception from the API call, a ``ValueError`` for a blocked prompt,
        or a ``ValueError`` for a response without text.
    """
    response_id = f"chatcmpl-{int(time.time())}"

    async def fake_stream_inner():
        print(f"FAKE STREAMING: Making non-streaming request to Gemini API (Model: {model_name})")
        api_call_task = asyncio.create_task(
            client_instance.aio.models.generate_content(
                model=model_name, contents=prompt, config=current_gen_config
            )
        )
        try:
            while not api_call_task.done():
                keep_alive_data = {
                    "id": "chatcmpl-keepalive", "object": "chat.completion.chunk", "created": int(time.time()),
                    "model": request_obj.model, "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]
                }
                yield f"data: {json.dumps(keep_alive_data)}\n\n"
                # Wait for the interval OR for the call to finish, whichever
                # comes first, so a completed call is not held back by the
                # remainder of a keep-alive sleep.
                await asyncio.wait(
                    {api_call_task}, timeout=app_config.FAKE_STREAMING_INTERVAL_SECONDS
                )

            try:
                response = api_call_task.result()

                feedback = getattr(response, "prompt_feedback", None)
                block_reason = getattr(feedback, "block_reason", None)
                if block_reason:
                    block_message = f"Response blocked by safety filter: {block_reason}"
                    block_detail = getattr(feedback, "block_reason_message", None)
                    if block_detail:
                        block_message = f"Response blocked by safety filter: {block_detail} (Reason: {block_reason})"
                    print(f"DEBUG: {block_message} (in fake_stream_generator)")
                    raise ValueError(block_message)

                if not is_response_valid(response):
                    raise ValueError(f"Invalid/empty response in fake stream (no text content): {str(response)[:200]}")

                # Falls back to candidate/parts text when ``response.text``
                # exists but is empty, which is_response_valid still accepts.
                full_text = _extract_fake_stream_text(response)
                if request_obj.model.endswith("-encrypt-full"):
                    full_text = deobfuscate_text(full_text)

                # Roughly ten chunks, but never smaller than 20 characters.
                chunk_size = max(20, math.ceil(len(full_text) / 10))
                for start in range(0, len(full_text), chunk_size):
                    chunk_text = full_text[start:start + chunk_size]
                    delta_data = {
                        "id": response_id, "object": "chat.completion.chunk", "created": int(time.time()),
                        "model": request_obj.model, "choices": [{"index": 0, "delta": {"content": chunk_text}, "finish_reason": None}]
                    }
                    yield f"data: {json.dumps(delta_data)}\n\n"
                    await asyncio.sleep(0.05)
                yield create_final_chunk(request_obj.model, response_id)
                yield "data: [DONE]\n\n"
            except Exception as e:
                err_msg = f"Error in fake_stream_generator: {str(e)}"
                print(err_msg)
                err_resp = create_openai_error_response(500, err_msg, "server_error")
                json_payload_for_fake_stream_error = json.dumps(err_resp)
                print(f"DEBUG: Internal error in fake_stream_generator. JSON error for handler: {json_payload_for_fake_stream_error}")
                if not is_auto_attempt:
                    yield f"data: {json_payload_for_fake_stream_error}\n\n"
                    yield "data: [DONE]\n\n"
                # Always propagate; presumably an auto-mode caller uses this
                # to move on to its next attempt - confirm against the caller.
                raise e
        finally:
            # If the consumer closes or cancels the generator while the API
            # call is still in flight, do not leave the task running.
            if not api_call_task.done():
                api_call_task.cancel()

    return fake_stream_inner()
|
|
async def execute_gemini_call(
    current_client: Any,
    model_to_call: str,
    prompt_func: Callable[[List[OpenAIMessage]], Union[types.Content, List[types.Content]]],
    gen_config_for_call: Dict[str, Any],
    request_obj: OpenAIRequest,
    is_auto_attempt: bool = False
):
    """Run one model call for ``request_obj`` and wrap the outcome in a response.

    The prompt is built with ``prompt_func(request_obj.messages)``. Then:

    * non-streaming requests await ``generate_content`` and return a
      ``JSONResponse`` built by ``convert_to_openai_format``;
    * streaming requests with ``app_config.FAKE_STREAMING_ENABLED`` return a
      ``StreamingResponse`` around ``fake_stream_generator``;
    * other streaming requests return a ``StreamingResponse`` that relays
      ``generate_content_stream`` chunks, issuing one streaming call per
      requested candidate (``request_obj.n or 1``).

    Args:
        current_client: Client exposing ``aio.models``.
        model_to_call: Model name passed to the API.
        prompt_func: Builds the API contents from the request messages.
        gen_config_for_call: Config passed to the API.
        request_obj: The incoming request.
        is_auto_attempt: When true, streaming errors are not written to the
            stream; they are only re-raised.

    Returns:
        A ``JSONResponse`` or a ``StreamingResponse``.

    Raises:
        ValueError: In the non-streaming path, when the prompt feedback
            carries a block reason or the response has no text content.
    """
    prompt_payload = prompt_func(request_obj.messages)

    # Non-streaming: a single awaited call, validated before conversion.
    if not request_obj.stream:
        gemini_response = await current_client.aio.models.generate_content(
            model=model_to_call, contents=prompt_payload, config=gen_config_for_call
        )

        feedback = getattr(gemini_response, "prompt_feedback", None)
        block_reason = getattr(feedback, "block_reason", None)
        if block_reason:
            block_detail = getattr(feedback, "block_reason_message", None)
            if block_detail:
                block_message = f"Response blocked by safety filter: {block_detail} (Reason: {block_reason})"
            else:
                block_message = f"Response blocked by safety filter: {block_reason}"
            print(f"DEBUG: {block_message} (in execute_gemini_call non-streaming)")
            raise ValueError(block_message)

        if not is_response_valid(gemini_response):
            raise ValueError("Invalid/empty response from non-streaming Gemini call (no text content).")
        return JSONResponse(content=convert_to_openai_format(gemini_response, request_obj.model))

    # Streaming, simulated on top of a non-streaming call.
    if app_config.FAKE_STREAMING_ENABLED:
        fake_stream = await fake_stream_generator(
            current_client, model_to_call, prompt_payload, gen_config_for_call, request_obj,
            is_auto_attempt=is_auto_attempt,
        )
        return StreamingResponse(fake_stream, media_type="text/event-stream")

    # Streaming, relayed from the API. Both values are fixed at call time,
    # before the generator starts running.
    stream_id = f"chatcmpl-{int(time.time())}"
    candidate_total = request_obj.n or 1

    async def _stream_generator_inner_for_execute():
        try:
            for candidate_index in range(candidate_total):
                chunk_stream = await current_client.aio.models.generate_content_stream(
                    model=model_to_call, contents=prompt_payload, config=gen_config_for_call
                )
                async for chunk in chunk_stream:
                    yield convert_chunk_to_openai(chunk, request_obj.model, stream_id, candidate_index)
            yield create_final_chunk(request_obj.model, stream_id, candidate_total)
            yield "data: [DONE]\n\n"
        except Exception as stream_error:
            print(f"Streaming Error in _execute_gemini_call: {stream_error}")

            # Cap the message that goes into the error payload at 1024 characters.
            error_text = str(stream_error)
            if len(error_text) > 1024:
                error_text = error_text[:1024] + "..."

            error_json = json.dumps(
                create_openai_error_response(500, error_text, "server_error")
            )
            print(f"DEBUG: Internal error in _stream_generator_inner_for_execute. JSON error for handler: {error_json}")
            if not is_auto_attempt:
                yield f"data: {error_json}\n\n"
                yield "data: [DONE]\n\n"
            raise stream_error

    return StreamingResponse(_stream_generator_inner_for_execute(), media_type="text/event-stream")