# Hugging Face Spaces status banner (scraping residue, not code): "Spaces: Sleeping".
import asyncio
import json
import os
import time
import uuid

import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from starlette.requests import ClientDisconnect
app = FastAPI()
# ==========================================
# CONFIGURATION & KEY LOADING
# ==========================================
# Upstream Ollama-compatible proxy base URL (overridable via ENV).
BASE_URL = os.getenv("BASE_URL", "https://elysiadev11-proxyollma.hf.space")
# Single shared bearer token that clients of THIS proxy must present.
# NOTE(review): the fallback "olla" is a weak default — set MASTER_API_KEY in
# production.
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
# Default model mapping (Claude → MiniMax)
DEFAULT_MODEL_MAPPING = {
    # Opus models
    "claude-opus-4-7": "minimax-m2.7:cloud",
    "claude-opus-4-6": "minimax-m2.7:cloud",
    "claude-opus-4-5": "minimax-m2.7:cloud",
    "claude-opus-4-1": "minimax-m2.7:cloud",
    "claude-opus-4-20250514": "minimax-m2.7:cloud",
    # Sonnet models
    "claude-sonnet-4-6": "minimax-m2.7:cloud",
    "claude-sonnet-4-5": "minimax-m2.7:cloud",
    "claude-sonnet-4-20250514": "minimax-m2.7:cloud",
    # Haiku models
    "claude-haiku-4-5": "minimax-m2.7:cloud",
    "claude-haiku-4-5-20251001": "minimax-m2.7:cloud",
}


def load_model_mapping():
    """Return the Claude→Ollama model mapping, with ENV overrides applied.

    CLAUDE_MODEL_MAP is a comma-separated list of "claude-name:ollama-name"
    pairs. Each pair is split on the FIRST colon only, so Ollama tags that
    themselves contain a colon (e.g. "model:cloud") survive intact.

    Returns:
        dict[str, str]: merged mapping (ENV entries win over defaults).
    """
    mapping = DEFAULT_MODEL_MAPPING.copy()
    env_map = os.getenv("CLAUDE_MODEL_MAP")
    if env_map:
        for pair in env_map.split(","):
            if ":" not in pair:
                continue  # malformed entry — no separator, skip it
            # split(":", 1) after the "in" check always yields two parts;
            # the original's extra len(parts) == 2 guard was dead code.
            claude_model, ollama_model = (p.strip() for p in pair.split(":", 1))
            # Robustness fix: ignore entries with an empty side rather than
            # registering a "" key/value in the mapping.
            if claude_model and ollama_model:
                mapping[claude_model] = ollama_model
    return mapping
def map_model(claude_model: str) -> str:
    """Translate a Claude model name into the backing Ollama model name.

    Exact matches from the (env-overridable) mapping table win; otherwise
    the model family inferred from the name picks an env-configurable
    default, with the Sonnet default as the catch-all.
    """
    mapping = load_model_mapping()
    try:
        return mapping[claude_model]
    except KeyError:
        pass
    # No exact match — fall back by family keyword.
    lowered = claude_model.lower()
    if "opus" in lowered:
        return os.getenv("DEFAULT_OPUS_MODEL", "minimax-m2.7:cloud")
    if "haiku" in lowered:
        return os.getenv("DEFAULT_HAIKU_MODEL", "minimax-m2.7:cloud")
    # Everything else (sonnet and unknown names) takes the Sonnet default.
    return os.getenv("DEFAULT_SONNET_MODEL", "minimax-m2.7:cloud")
OLLAMA_KEYS = []
# Supports up to 100 API keys (OLLAMA_KEY_1 through OLLAMA_KEY_100).
for i in range(1, 101):
    key = os.getenv(f"OLLAMA_KEY_{i}")
    if key:
        OLLAMA_KEYS.append(key)
if not OLLAMA_KEYS:
    OLLAMA_KEYS.append("ollam")  # dummy key when the ENV is empty
# Initialize per-key status.
# Round-robin cursor for load balancing across keys.
last_used_index = 0
# key_status maps the raw key string to its bookkeeping record.
# NOTE(review): duplicate key values in the ENV would collapse into one
# key_status entry while OLLAMA_KEYS keeps both — verify keys are unique.
key_status = {}
for idx, k in enumerate(OLLAMA_KEYS, 1):
    key_status[k] = {
        "index": idx,            # 1-based position, used in log lines
        "prefix": k[:8] + "...", # short identifier for the status endpoint
        "failures": 0,           # consecutive failure counter
        "success": 0,            # lifetime success counter
        "healthy": True,         # set False on 429 until the mass reset
        "in_use": False          # lock feature: 1 key = 1 request
    }
def log(msg):
    """Print *msg* to stdout prefixed with the current HH:MM:SS timestamp."""
    stamp = time.strftime('%H:%M:%S')
    print(f"[{stamp}] {msg}")
def get_and_lock_key(exclude_keys=None):
    """Round-robin + atomic lock: pick the next idle, healthy key.

    Scans at most one full lap starting at ``last_used_index``, marks the
    chosen key ``in_use`` and returns it; returns None when every key is
    busy/excluded. Contains no awaits, so under a single asyncio event
    loop the scan-and-mark is effectively atomic.

    Args:
        exclude_keys: keys already tried by this request (skipped).
    Returns:
        The locked key string, or None when nothing is available.
    """
    global last_used_index
    if exclude_keys is None:
        exclude_keys = set()
    # If every key is marked unhealthy, mass-reset the pool so callers can
    # recover instead of spinning forever.
    if not any(v["healthy"] for v in key_status.values()):
        log("⚠️ Semua API Key berstatus mati/unhealthy. Melakukan RESET MASSAL...")
        for v in key_status.values():
            v["failures"] = 0
            v["healthy"] = True
        last_used_index = 0
    # Round-robin scan from the cursor.
    for i in range(len(OLLAMA_KEYS)):
        idx = (last_used_index + i) % len(OLLAMA_KEYS)
        key = OLLAMA_KEYS[idx]
        status = key_status[key]
        if status["healthy"] and not status["in_use"] and key not in exclude_keys:
            # BUGFIX: advance the cursor PAST the chosen key. The original set
            # last_used_index = idx (pointing AT the key), so sequential
            # requests kept re-picking the same key once it was released
            # instead of rotating through the pool.
            last_used_index = (idx + 1) % len(OLLAMA_KEYS)
            status["in_use"] = True
            return key
    return None
def anthropic_error(error_type: str, message: str, status_code: int = 400):
    """Build a JSONResponse shaped like an Anthropic API error envelope."""
    payload = {
        "type": "error",
        "error": {"type": error_type, "message": message},
    }
    return JSONResponse(payload, status_code=status_code)
def anthropic_to_ollama(body: dict) -> dict:
    """Convert an Anthropic /v1/messages request body into Ollama chat format.

    Produces {"model", "messages", "stream", "options"} where sampling
    parameters are translated into Ollama "options" equivalents.

    Fixes vs. the original:
      * "system" may be a list of text blocks (valid per the Anthropic API),
        not only a string — both forms are now flattened;
      * a message without a "content" key no longer raises KeyError.
    """
    messages = []
    # Flatten the system prompt (string or list of text blocks).
    system = body.get("system")
    if system:
        if isinstance(system, list):
            system = "".join(
                block.get("text", "")
                for block in system
                if isinstance(block, dict) and block.get("type") == "text"
            )
        messages.append({"role": "system", "content": system})
    # Conversation turns: flatten content-block arrays to plain text.
    # Non-text blocks (tool_use, image, ...) are dropped because the Ollama
    # chat format only carries plain text here.
    for msg in body.get("messages", []):
        content = msg.get("content", "")
        if isinstance(content, list):
            content = "".join(
                block.get("text", "")
                for block in content
                if block.get("type") == "text"
            )
        messages.append({"role": msg["role"], "content": content})
    ollama_body = {
        "model": map_model(body.get("model", "claude-sonnet-4-6")),
        "messages": messages,
        "stream": body.get("stream", False),
        "options": {},
    }
    # Translate sampling parameters only when the caller supplied them.
    if "max_tokens" in body:
        ollama_body["options"]["num_predict"] = body["max_tokens"]
    if "temperature" in body:
        ollama_body["options"]["temperature"] = body["temperature"]
    if "top_p" in body:
        ollama_body["options"]["top_p"] = body["top_p"]
    if "top_k" in body:
        ollama_body["options"]["top_k"] = body["top_k"]
    return ollama_body
def ollama_to_anthropic(ollama_response: dict, original_model: str) -> dict:
    """Convert a non-streaming Ollama chat response into an Anthropic message.

    The caller's original Claude model name is echoed back so clients see
    the model they asked for, not the backing Ollama model.
    """
    # Translate Ollama done_reason values into Anthropic stop_reason values;
    # anything unrecognized degrades to "end_turn".
    reason_map = {
        "stop": "end_turn",
        "length": "max_tokens",
        "eos": "end_turn",
        "load": "end_turn",
        "unload": "end_turn",
    }
    finish = reason_map.get(ollama_response.get("done_reason", "stop"), "end_turn")
    text = ollama_response.get("message", {}).get("content", "")
    usage = {
        "input_tokens": ollama_response.get("prompt_eval_count", 0),
        "output_tokens": ollama_response.get("eval_count", 0),
    }
    return {
        "id": f"msg_{uuid.uuid4().hex[:10]}",
        "type": "message",
        "role": "assistant",
        "content": [{"type": "text", "text": text}],
        "model": original_model,
        "stop_reason": finish,
        "stop_sequence": None,
        "usage": usage,
    }
async def stream_anthropic(ollama_stream, original_model: str):
    """Convert Ollama streaming lines into Anthropic-style SSE events.

    ``ollama_stream`` is an async iterator of SSE lines ("data: {...}") whose
    JSON payloads are Ollama chat chunks: incremental message.content text,
    then a final object with done=true carrying token counts.

    Fixes vs. the original:
      * every frame now carries the "event: <type>" line the Anthropic
        streaming format uses;
      * content_block_start now includes the mandatory empty "text" field in
        its content_block.
    """
    def sse(event_type: str, payload: dict) -> str:
        # Anthropic SSE frames are "event: <type>" + "data: <json>".
        return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"

    message_id = f"msg_{uuid.uuid4().hex[:10]}"
    # message_start: usage is unknown up-front, so it starts zeroed.
    yield sse("message_start", {
        'type': 'message_start',
        'message': {
            'id': message_id,
            'type': 'message',
            'role': 'assistant',
            'model': original_model,
            'content': [],
            'stop_reason': None,
            'stop_sequence': None,
            'usage': {'input_tokens': 0, 'output_tokens': 0}
        }
    })
    # content_block_start: the empty "text" field is required by the spec.
    yield sse("content_block_start", {
        'type': 'content_block_start',
        'index': 0,
        'content_block': {'type': 'text', 'text': ''}
    })
    input_tokens = 0
    output_tokens = 0
    stop_reason = "end_turn"
    async for line in ollama_stream:
        if not line.startswith("data: "):
            continue
        try:
            data = json.loads(line[6:])
        except json.JSONDecodeError:
            continue  # skip malformed frames (e.g. "[DONE]" markers)
        if data.get("done", False):
            # Final frame: capture usage and the upstream stop reason.
            input_tokens = data.get("prompt_eval_count", 0)
            output_tokens = data.get("eval_count", 0)
            stop_reason = data.get("done_reason", "stop")
            continue
        content = data.get("message", {}).get("content", "")
        if content:
            yield sse("content_block_delta", {
                'type': 'content_block_delta',
                'index': 0,
                'delta': {'type': 'text_delta', 'text': content}
            })
    yield sse("content_block_stop", {'type': 'content_block_stop', 'index': 0})
    # Translate the upstream done_reason to an Anthropic stop_reason.
    stop_reason_map = {
        "stop": "end_turn",
        "length": "max_tokens",
        "eos": "end_turn",
    }
    yield sse("message_delta", {
        'type': 'message_delta',
        'delta': {
            'stop_reason': stop_reason_map.get(stop_reason, "end_turn"),
            'stop_sequence': None
        },
        'usage': {'output_tokens': output_tokens}
    })
    yield sse("message_stop", {'type': 'message_stop'})
| # ========================================== | |
| # ENDPOINTS | |
| # ========================================== | |
def root():
    """Status payload: total keys loaded plus per-key health/lock counters.

    NOTE(review): no @app.get route decorator is visible in this chunk —
    presumably registered elsewhere or lost in extraction; confirm.
    """
    overview = {}
    for v in key_status.values():
        overview[v["prefix"]] = {
            "status": "BUSY" if v["in_use"] else "IDLE",
            "healthy": v["healthy"],
            "success": v["success"],
            "failures": v["failures"],
        }
    return {
        "status": "ok",
        "total_keys_loaded": len(OLLAMA_KEYS),
        "keys_status": overview,
    }
async def list_models(request: Request):
    """Proxy the upstream Ollama /api/tags list as an OpenAI-style model list.

    NOTE(review): no route decorator is visible in this chunk — presumably
    registered (e.g. as GET /v1/models) elsewhere; confirm.
    """
    # Bearer auth against the single master key.
    presented = request.headers.get("Authorization", "").replace("Bearer ", "")
    if presented != MASTER_API_KEY:
        return JSONResponse(
            {"error": {"type": "authentication_error", "message": "Unauthorized"}},
            status_code=401
        )
    # Always queries with the first key — health/rotation is not applied here.
    async with httpx.AsyncClient(timeout=30.0) as client:
        try:
            upstream = await client.get(
                f"{BASE_URL}/api/tags",
                headers={"Authorization": f"Bearer {OLLAMA_KEYS[0]}"}
            )
            if upstream.status_code != 200:
                return JSONResponse(
                    {"error": {"type": "api_error", "message": "Failed to fetch models"}},
                    status_code=upstream.status_code
                )
            now = int(time.time())
            # Re-shape Ollama tag entries into OpenAI "model" objects.
            data = [
                {
                    "id": entry.get("name", entry.get("model", "")),
                    "object": "model",
                    "created": now,
                    "owned_by": "ollama",
                }
                for entry in upstream.json().get("models", [])
            ]
            return {"object": "list", "data": data}
        except Exception as e:
            log(f"Error fetching models: {e}")
            return JSONResponse(
                {"error": {"type": "api_error", "message": str(e)}},
                status_code=500
            )
async def anthropic_chat(request: Request):
    """Anthropic-compatible messages handler backed by the Ollama upstream.

    Flow: master-key auth -> parse body -> convert to Ollama format ->
    non-stream (single JSON reply) or stream (Anthropic SSE), with key
    rotation, per-key locking, and unbounded queueing for an idle key.

    NOTE(review): no @app.post route decorator is visible in this chunk —
    presumably registered elsewhere or lost in extraction; confirm.
    """
    # Validate auth: clients authenticate with the single MASTER_API_KEY.
    auth_key = request.headers.get("Authorization", "").replace("Bearer ", "")
    if auth_key != MASTER_API_KEY:
        return anthropic_error("authentication_error", "Unauthorized", 401)
    try:
        body = await request.json()
    except ClientDisconnect:
        # Client hung up before the body was fully read.
        log("Client kabur sebelum proxy selesai membaca request body.")
        return Response(status_code=499)  # nginx-style "client closed request"
    except json.JSONDecodeError:
        return anthropic_error("invalid_request_error", "Invalid JSON", 400)
    is_stream = body.get("stream", False)
    original_model = body.get("model", "claude-sonnet-4-6")
    # Convert to Ollama format.
    # NOTE(review): this Ollama-format body is POSTed to /v1/chat/completions
    # (an OpenAI-style path) and the reply is parsed as Ollama-native JSON —
    # this only works if the upstream proxy speaks that hybrid; confirm.
    ollama_body = anthropic_to_ollama(body)
    # ==========================================
    # NON-STREAM LOGIC
    # ==========================================
    if not is_stream:
        tried_keys = set()
        for attempt in range(len(OLLAMA_KEYS)):
            # Every key tried once already: clear exclusions for another lap.
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()
            key = None
            log("Menunggu API Key idle (Antrean Non-Stream)...")
            # Unbounded queue: wait until a key is free (or the client leaves).
            while True:
                if await request.is_disconnected():
                    log("Client membatalkan request saat mengantre (Non-Stream).")
                    return Response(status_code=499)
                # Atomic pick-and-lock; returns None when no key is idle.
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # key is already LOCKED at this point
                await asyncio.sleep(0.5)  # poll every half second
            ki = key_status[key]
            tried_keys.add(key)
            log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")
            try:
                async with httpx.AsyncClient(timeout=120.0) as client:
                    resp = await client.post(
                        f"{BASE_URL}/v1/chat/completions",
                        json=ollama_body,
                        headers={"Authorization": f"Bearer {key}"}
                    )
                    if resp.status_code == 200:
                        ki["success"] += 1
                        ki["failures"] = 0
                        # Convert the upstream reply to Anthropic format.
                        ollama_response = resp.json()
                        anthropic_response = ollama_to_anthropic(ollama_response, original_model)
                        # Early release (the finally below releases again —
                        # harmless, just a duplicate RELEASE log line).
                        ki["in_use"] = False
                        log(f"RELEASE: key#{ki['index']} (Non-Stream)")
                        return JSONResponse(anthropic_response)
                    elif resp.status_code == 429:
                        # Rate limited: mark unhealthy and rotate keys.
                        ki["failures"] += 1
                        ki["healthy"] = False
                        log(f"RATE LIMIT: key#{ki['index']} - Skip ke key berikutnya.")
                        continue
                    else:
                        ki["failures"] += 1
                        continue
            except Exception as e:
                ki["failures"] += 1
                log(f"Error Non-Stream: {e}")
                continue
            finally:
                ki["in_use"] = False  # ALWAYS release the lock
                log(f"RELEASE: key#{ki['index']} (Non-Stream)")
        return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)
    # ==========================================
    # STREAMING LOGIC
    # ==========================================
    async def stream_generator():
        # Shallow copy: the "messages" list is still shared with ollama_body,
        # but only current_body is used from here on.
        current_body = ollama_body.copy()
        # NOTE(review): intended to buffer text already sent so a fallback key
        # can resume — but nothing ever appends to it in THIS generator
        # (unlike the pass-through handler below), so the resume injection
        # never fires and a retry restarts the Anthropic event sequence.
        generated_text_buffer = ""
        tried_keys = set()
        for attempt in range(len(OLLAMA_KEYS)):
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()
            key = None
            if attempt == 0:
                log("Menunggu API Key idle (Antrean Stream Baru)...")
            else:
                log(f"Menunggu API Key idle (Antrean Fallback ke-{attempt})...")
            # Unbounded queue for an idle key.
            while True:
                if await request.is_disconnected():
                    log("Client membatalkan request saat mengantre stream.")
                    return
                # Atomic pick-and-lock.
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # key is already LOCKED
                await asyncio.sleep(0.5)
            ki = key_status[key]
            tried_keys.add(key)
            log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")
            if generated_text_buffer:
                # Resume: replay the partial assistant answer to the new key.
                log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
                messages = current_body.get("messages", [])
                if messages and messages[-1].get("role") == "assistant":
                    messages[-1]["content"] = generated_text_buffer
                else:
                    messages.append({"role": "assistant", "content": generated_text_buffer})
                current_body["messages"] = messages
            try:
                # read=None: no read timeout — generation may stall for long.
                custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
                async with httpx.AsyncClient(timeout=custom_timeout) as client:
                    async with client.stream(
                        "POST", f"{BASE_URL}/v1/chat/completions",
                        json=current_body, headers={"Authorization": f"Bearer {key}"}
                    ) as response:
                        if response.status_code == 429:
                            ki["failures"] += 1
                            ki["healthy"] = False
                            log(f"STREAM 429: key#{ki['index']} - Switching key...")
                            continue
                        if response.status_code != 200:
                            ki["failures"] += 1
                            log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
                            continue
                        stream_interrupted = False
                        try:
                            # Re-encode the upstream stream as Anthropic SSE.
                            async for chunk in stream_anthropic(response.aiter_lines(), original_model):
                                yield chunk
                            ki["success"] += 1
                            ki["failures"] = 0
                            return
                        except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError) as e:
                            # Mid-stream drop: fall through to the next key.
                            log(f"STREAM PUTUS: key#{ki['index']}. Buffering...")
                            ki["failures"] += 1
                            stream_interrupted = True
                        if not stream_interrupted:
                            return
            except Exception as e:
                ki["failures"] += 1
                log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
                continue
            finally:
                # ALWAYS release the lock
                ki["in_use"] = False
                log(f"STREAM RELEASE: key#{ki['index']}")
        yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"
    return StreamingResponse(stream_generator(), media_type="text/event-stream")
async def chat(req: Request):
    """OpenAI-style pass-through chat-completions handler.

    Unlike anthropic_chat above, the request body is forwarded as-is and the
    upstream reply/stream is relayed untouched, with key rotation, per-key
    locking, and seamless mid-stream fallback (text relayed so far is
    replayed to the next key as a partial assistant message).

    NOTE(review): no route decorator is visible in this chunk — presumably
    registered elsewhere or lost in extraction; confirm.
    """
    auth_key = req.headers.get("Authorization", "").replace("Bearer ", "")
    if auth_key != MASTER_API_KEY:
        return JSONResponse({"error": "Unauthorized"}, status_code=401)
    # Catch the case where the client disconnects mid-read (ClientDisconnect).
    try:
        body = await req.json()
    except ClientDisconnect:
        log("Client kabur sebelum proxy selesai membaca request body.")
        return Response(status_code=499)  # nginx-style "client closed request"
    except json.JSONDecodeError:
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
    is_stream = body.get("stream", False)
    # ==========================================
    # NON-STREAM LOGIC
    # ==========================================
    if not is_stream:
        tried_keys = set()
        for attempt in range(len(OLLAMA_KEYS)):
            # Every key tried once already: clear exclusions for another lap.
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()
            key = None
            log("Menunggu API Key idle (Antrean Non-Stream)...")
            # Unbounded queue: wait until a key is free (or the client leaves).
            while True:
                if await req.is_disconnected():
                    log("Client membatalkan request saat mengantre (Non-Stream).")
                    return Response(status_code=499)
                # Atomic pick-and-lock; returns None when no key is idle.
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # key is already LOCKED at this point
                await asyncio.sleep(0.5)  # poll every half second
            ki = key_status[key]
            tried_keys.add(key)
            log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")
            try:
                async with httpx.AsyncClient(timeout=120.0) as client:
                    resp = await client.post(
                        f"{BASE_URL}/v1/chat/completions",
                        json=body,
                        headers={"Authorization": f"Bearer {key}"}
                    )
                    if resp.status_code == 200:
                        ki["success"] += 1
                        ki["failures"] = 0
                        # Relay the upstream body and content-type verbatim.
                        return Response(content=resp.content, media_type=resp.headers.get("content-type"))
                    elif resp.status_code == 429:
                        # Rate limited: mark unhealthy and rotate keys.
                        ki["failures"] += 1
                        ki["healthy"] = False
                        log(f"RATE LIMIT: key#{ki['index']} - Skip ke key berikutnya.")
                        continue
                    else:
                        ki["failures"] += 1
                        continue
            except Exception as e:
                ki["failures"] += 1
                log(f"Error Non-Stream: {e}")
                continue
            finally:
                ki["in_use"] = False  # ALWAYS release the lock
                log(f"RELEASE: key#{ki['index']} (Non-Stream)")
        return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)
    # ==========================================
    # STREAMING LOGIC (Seamless Fallback + Queue)
    # ==========================================
    async def stream_generator():
        # Copy the body AND each message dict so resume-injection below does
        # not mutate the caller's original request.
        current_body = body.copy()
        current_body["messages"] = [msg.copy() for msg in body.get("messages", [])]
        # Text already relayed to the client; replayed to the next key on a
        # mid-stream failure so generation continues where it stopped.
        generated_text_buffer = ""
        tried_keys = set()
        for attempt in range(len(OLLAMA_KEYS)):
            if len(tried_keys) >= len(OLLAMA_KEYS):
                tried_keys.clear()
            key = None
            if attempt == 0:
                log("Menunggu API Key idle (Antrean Stream Baru)...")
            else:
                log(f"Menunggu API Key idle (Antrean Fallback ke-{attempt})...")
            # Unbounded queue for an idle key.
            while True:
                if await req.is_disconnected():
                    log("Client membatalkan request saat mengantre stream.")
                    return
                # Atomic pick-and-lock.
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # key is already LOCKED
                await asyncio.sleep(0.5)
            ki = key_status[key]
            tried_keys.add(key)
            log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")
            if generated_text_buffer:
                # Resume: hand the partial assistant answer to the new key.
                log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
                messages = current_body.get("messages", [])
                if messages and messages[-1].get("role") == "assistant":
                    messages[-1]["content"] = generated_text_buffer
                else:
                    messages.append({"role": "assistant", "content": generated_text_buffer})
                current_body["messages"] = messages
            try:
                # read=None: no read timeout — generation may stall for long.
                custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
                async with httpx.AsyncClient(timeout=custom_timeout) as client:
                    async with client.stream(
                        "POST", f"{BASE_URL}/v1/chat/completions",
                        json=current_body, headers={"Authorization": f"Bearer {key}"}
                    ) as response:
                        if response.status_code == 429:
                            ki["failures"] += 1
                            ki["healthy"] = False
                            log(f"STREAM 429: key#{ki['index']} - Switching key...")
                            continue
                        if response.status_code != 200:
                            ki["failures"] += 1
                            log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
                            continue
                        stream_interrupted = False
                        try:
                            async for chunk in response.aiter_lines():
                                if chunk:
                                    if chunk.startswith("data: "):
                                        data_str = chunk[6:]
                                        if data_str.strip() == "[DONE]":
                                            # Clean end of stream: relay the
                                            # terminator and stop.
                                            ki["success"] += 1
                                            ki["failures"] = 0
                                            yield chunk + "\n\n"
                                            return
                                        try:
                                            # OpenAI-style delta: accumulate
                                            # relayed text for possible resume.
                                            data_json = json.loads(data_str)
                                            if "choices" in data_json and len(data_json["choices"]) > 0:
                                                delta = data_json["choices"][0].get("delta", {})
                                                content = delta.get("content", "")
                                                if content:
                                                    generated_text_buffer += content
                                        except json.JSONDecodeError:
                                            pass
                                    # Relay every non-empty line as SSE.
                                    yield chunk + "\n\n"
                        except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError) as e:
                            # Mid-stream drop: switch key and resume.
                            log(f"STREAM PUTUS: key#{ki['index']}. Buffering...")
                            ki["failures"] += 1
                            stream_interrupted = True
                        if not stream_interrupted:
                            return
            except Exception as e:
                ki["failures"] += 1
                log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
                continue
            finally:
                # ALWAYS release the lock
                ki["in_use"] = False
                log(f"STREAM RELEASE: key#{ki['index']}")
        yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"
    return StreamingResponse(stream_generator(), media_type="text/event-stream")