# proxycf / app.py
import os
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, Response, StreamingResponse
from starlette.requests import ClientDisconnect
import time
import json
import asyncio
import uuid
app = FastAPI()
# ==========================================
# CONFIGURATION & KEY LOADING
# ==========================================
BASE_URL = os.getenv("BASE_URL", "https://elysiadev11-proxyollma.hf.space")
MASTER_API_KEY = os.getenv("MASTER_API_KEY", "olla")
# Default model mapping (Claude → MiniMax)
DEFAULT_MODEL_MAPPING = {
# Opus models
"claude-opus-4-7": "minimax-m2.7:cloud",
"claude-opus-4-6": "minimax-m2.7:cloud",
"claude-opus-4-5": "minimax-m2.7:cloud",
"claude-opus-4-1": "minimax-m2.7:cloud",
"claude-opus-4-20250514": "minimax-m2.7:cloud",
# Sonnet models
"claude-sonnet-4-6": "minimax-m2.7:cloud",
"claude-sonnet-4-5": "minimax-m2.7:cloud",
"claude-sonnet-4-20250514": "minimax-m2.7:cloud",
# Haiku models
"claude-haiku-4-5": "minimax-m2.7:cloud",
"claude-haiku-4-5-20251001": "minimax-m2.7:cloud",
}
# Load model-mapping overrides from the environment
def load_model_mapping():
    """Merge overrides from the CLAUDE_MODEL_MAP env var into the defaults."""
    mapping = DEFAULT_MODEL_MAPPING.copy()
    env_map = os.getenv("CLAUDE_MODEL_MAP")
    if env_map:
        for pair in env_map.split(","):
            if ":" in pair:
                claude_model, ollama_model = pair.split(":", 1)
                mapping[claude_model.strip()] = ollama_model.strip()
    return mapping
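# Example override (hypothetical model names, for illustration only):
#   CLAUDE_MODEL_MAP="claude-sonnet-4-6:llama3.1:70b,claude-haiku-4-5:qwen2.5:7b"
# Pairs are comma-separated; split(":", 1) keeps any colon inside the Ollama tag intact.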
def map_model(claude_model: str) -> str:
"""Map Claude model name to Ollama model"""
model_mapping = load_model_mapping()
# Try exact match first
if claude_model in model_mapping:
return model_mapping[claude_model]
# Fallback based on model family
if "opus" in claude_model.lower():
return os.getenv("DEFAULT_OPUS_MODEL", "minimax-m2.7:cloud")
if "haiku" in claude_model.lower():
return os.getenv("DEFAULT_HAIKU_MODEL", "minimax-m2.7:cloud")
# Default to Sonnet model
return os.getenv("DEFAULT_SONNET_MODEL", "minimax-m2.7:cloud")
OLLAMA_KEYS = []
# Supports up to 100 API keys (OLLAMA_KEY_1 through OLLAMA_KEY_100)
for i in range(1, 101):
key = os.getenv(f"OLLAMA_KEY_{i}")
if key:
OLLAMA_KEYS.append(key)
if not OLLAMA_KEYS:
OLLAMA_KEYS.append("ollam") # Dummy key jika ENV kosong
# Initialize per-key status
# Round-Robin Index for load balancing
last_used_index = 0
key_status = {}
for idx, k in enumerate(OLLAMA_KEYS, 1):
key_status[k] = {
"index": idx,
"prefix": k[:8] + "...",
"failures": 0,
"success": 0,
"healthy": True,
"in_use": False # Fitur Lock: 1 Key = 1 Request
}
def log(msg):
print(f"[{time.strftime('%H:%M:%S')}] {msg}")
def get_and_lock_key(exclude_keys=None):
"""
Round-Robin + Atomic Lock: Pilih key berurutan dari last_used_index.
Ini memastikan burst request terdistribusi merata ke semua key.
"""
global last_used_index
if exclude_keys is None:
exclude_keys = set()
    # Are all keys dead? If so, reset them all
    if not any(v["healthy"] for v in key_status.values()):
        log("⚠️ All API keys are dead/unhealthy. Performing a MASS RESET...")
for v in key_status.values():
v["failures"] = 0
v["healthy"] = True
last_used_index = 0
    # Round-robin: scan keys sequentially starting from last_used_index
for i in range(len(OLLAMA_KEYS)):
idx = (last_used_index + i) % len(OLLAMA_KEYS)
key = OLLAMA_KEYS[idx]
if key_status[key]["healthy"] and not key_status[key]["in_use"] and key not in exclude_keys:
last_used_index = idx
key_status[key]["in_use"] = True
return key
return None
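# Caller contract (sketch): the handlers below poll this function until it
# returns a key, make the upstream call, and always clear
# key_status[key]["in_use"] in a finally block. The check-and-set above is
# race-free only because it runs without awaits on a single event loop.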
def anthropic_error(error_type: str, message: str, status_code: int = 400):
"""Format error in Anthropic style"""
return JSONResponse(
{
"type": "error",
"error": {
"type": error_type,
"message": message
}
},
status_code=status_code
)
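# For example, anthropic_error("authentication_error", "Unauthorized", 401)
# returns a 401 response with the body:
#   {"type": "error", "error": {"type": "authentication_error", "message": "Unauthorized"}}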
def anthropic_to_ollama(body: dict) -> dict:
"""Convert Anthropic request to Ollama format"""
# Build messages array
messages = []
# Add system message if exists
if body.get("system"):
messages.append({
"role": "system",
"content": body["system"]
})
# Add conversation messages
for msg in body.get("messages", []):
        # Handle content blocks (Anthropic allows a string or a list of blocks)
content = msg["content"]
if isinstance(content, list):
# Extract text from content blocks
text_content = ""
for block in content:
if block.get("type") == "text":
text_content += block.get("text", "")
content = text_content
messages.append({
"role": msg["role"],
"content": content
})
# Map model
ollama_model = map_model(body.get("model", "claude-sonnet-4-6"))
    # Build the native Ollama /api/chat request
ollama_body = {
"model": ollama_model,
"messages": messages,
"stream": body.get("stream", False),
"options": {}
}
# Add optional parameters
if "max_tokens" in body:
ollama_body["options"]["num_predict"] = body["max_tokens"]
if "temperature" in body:
ollama_body["options"]["temperature"] = body["temperature"]
if "top_p" in body:
ollama_body["options"]["top_p"] = body["top_p"]
if "top_k" in body:
ollama_body["options"]["top_k"] = body["top_k"]
return ollama_body
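# A sample conversion (abridged, using the default mapping): the Anthropic request
#   {"model": "claude-sonnet-4-6", "system": "Be brief.", "max_tokens": 64,
#    "messages": [{"role": "user", "content": [{"type": "text", "text": "Hi"}]}]}
# becomes
#   {"model": "minimax-m2.7:cloud", "stream": False,
#    "messages": [{"role": "system", "content": "Be brief."},
#                 {"role": "user", "content": "Hi"}],
#    "options": {"num_predict": 64}}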
def ollama_to_anthropic(ollama_response: dict, original_model: str) -> dict:
"""Convert Ollama response to Anthropic format"""
message = ollama_response.get("message", {})
# Map stop reasons
stop_reason_map = {
"stop": "end_turn",
"length": "max_tokens",
"eos": "end_turn",
"load": "end_turn",
"unload": "end_turn",
}
done_reason = ollama_response.get("done_reason", "stop")
return {
"id": f"msg_{uuid.uuid4().hex[:10]}",
"type": "message",
"role": "assistant",
"content": [
{
"type": "text",
"text": message.get("content", "")
}
],
"model": original_model,
"stop_reason": stop_reason_map.get(done_reason, "end_turn"),
"stop_sequence": None,
"usage": {
"input_tokens": ollama_response.get("prompt_eval_count", 0),
"output_tokens": ollama_response.get("eval_count", 0)
}
}
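# For example, an /api/chat reply with done_reason "length" maps to
# stop_reason "max_tokens", and prompt_eval_count / eval_count become
# input_tokens / output_tokens in the Anthropic usage block.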
async def stream_anthropic(ollama_stream, original_model: str):
"""Convert Ollama streaming to Anthropic SSE format"""
message_id = f"msg_{uuid.uuid4().hex[:10]}"
# Send message_start
message_start_data = {
'type': 'message_start',
'message': {
'id': message_id,
'type': 'message',
'role': 'assistant',
'model': original_model,
'content': [],
'stop_reason': None,
'stop_sequence': None,
'usage': {'input_tokens': 0, 'output_tokens': 0}
}
}
yield f"data: {json.dumps(message_start_data)}\n\n"
# Send content_block_start
content_block_start_data = {
'type': 'content_block_start',
'index': 0,
'content_block': {'type': 'text'}
}
yield f"data: {json.dumps(content_block_start_data)}\n\n"
input_tokens = 0
output_tokens = 0
stop_reason = "end_turn"
    # Stream content: Ollama's native /api/chat emits newline-delimited JSON;
    # SSE-style "data: "-prefixed lines are tolerated as well
    async for line in ollama_stream:
        line = line.strip()
        if line.startswith("data: "):
            line = line[6:]
        if not line:
            continue
        try:
            data = json.loads(line)
            if data.get("done", False):
                input_tokens = data.get("prompt_eval_count", 0)
                output_tokens = data.get("eval_count", 0)
                stop_reason = data.get("done_reason", "stop")
                continue
            message = data.get("message", {})
            content = message.get("content", "")
            if content:
                # Forward the chunk as an Anthropic text_delta
                content_block_delta_data = {
                    'type': 'content_block_delta',
                    'index': 0,
                    'delta': {
                        'type': 'text_delta',
                        'text': content
                    }
                }
                yield f"event: content_block_delta\ndata: {json.dumps(content_block_delta_data)}\n\n"
        except json.JSONDecodeError:
            continue
    # Send content_block_stop
    content_block_stop_data = {
        'type': 'content_block_stop',
        'index': 0
    }
    yield f"event: content_block_stop\ndata: {json.dumps(content_block_stop_data)}\n\n"
    # Map the Ollama done_reason to an Anthropic stop_reason
    stop_reason_map = {
        "stop": "end_turn",
        "length": "max_tokens",
        "eos": "end_turn",
    }
    # Send message_delta with the final stop reason and token usage
    message_delta_data = {
        'type': 'message_delta',
        'delta': {
            'stop_reason': stop_reason_map.get(stop_reason, "end_turn"),
            'stop_sequence': None
        },
        'usage': {'input_tokens': input_tokens, 'output_tokens': output_tokens}
    }
    yield f"event: message_delta\ndata: {json.dumps(message_delta_data)}\n\n"
    # Send message_stop
    message_stop_data = {'type': 'message_stop'}
    yield f"event: message_stop\ndata: {json.dumps(message_stop_data)}\n\n"
# ==========================================
# ENDPOINTS
# ==========================================
@app.get("/")
def root():
return {
"status": "ok",
"total_keys_loaded": len(OLLAMA_KEYS),
"keys_status": {
v["prefix"]: {
"status": "BUSY" if v["in_use"] else "IDLE",
"healthy": v["healthy"],
"success": v["success"],
"failures": v["failures"]
} for v in key_status.values()
}
}
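# Note: "/" requires no auth and exposes only 8-character key prefixes.
# Quick check (hypothetical local host): curl http://localhost:7860/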
@app.get("/v1/models")
async def list_models(request: Request):
# Validate auth
auth_key = request.headers.get("Authorization", "").replace("Bearer ", "")
if auth_key != MASTER_API_KEY:
return JSONResponse(
{"error": {"type": "authentication_error", "message": "Unauthorized"}},
status_code=401
)
# Proxy to Ollama /api/tags
async with httpx.AsyncClient(timeout=30.0) as client:
try:
resp = await client.get(
f"{BASE_URL}/api/tags",
headers={"Authorization": f"Bearer {OLLAMA_KEYS[0]}"}
)
if resp.status_code != 200:
return JSONResponse(
{"error": {"type": "api_error", "message": "Failed to fetch models"}},
status_code=resp.status_code
)
ollama_data = resp.json()
# Convert to OpenAI format
models = []
created_time = int(time.time())
for model in ollama_data.get("models", []):
models.append({
"id": model.get("name", model.get("model", "")),
"object": "model",
"created": created_time,
"owned_by": "ollama"
})
return {"object": "list", "data": models}
except Exception as e:
log(f"Error fetching models: {e}")
return JSONResponse(
{"error": {"type": "api_error", "message": str(e)}},
status_code=500
)
@app.post("/v1/messages")
async def anthropic_chat(request: Request):
# Validate auth
    # Accept Anthropic-style x-api-key as well as Authorization: Bearer
    auth_key = request.headers.get("x-api-key") or request.headers.get("Authorization", "").replace("Bearer ", "")
if auth_key != MASTER_API_KEY:
return anthropic_error("authentication_error", "Unauthorized", 401)
try:
body = await request.json()
except ClientDisconnect:
log("Client kabur sebelum proxy selesai membaca request body.")
return Response(status_code=499)
except json.JSONDecodeError:
return anthropic_error("invalid_request_error", "Invalid JSON", 400)
is_stream = body.get("stream", False)
original_model = body.get("model", "claude-sonnet-4-6")
# Convert to Ollama format
ollama_body = anthropic_to_ollama(body)
# ==========================================
    # NON-STREAM LOGIC
# ==========================================
if not is_stream:
tried_keys = set()
for attempt in range(len(OLLAMA_KEYS)):
if len(tried_keys) >= len(OLLAMA_KEYS):
tried_keys.clear()
key = None
log("Menunggu API Key idle (Antrean Non-Stream)...")
# Antrean Tanpa Batas Waktu
while True:
if await request.is_disconnected():
log("Client membatalkan request saat mengantre (Non-Stream).")
return Response(status_code=499)
# Gunakan fungsi Atomic Lock
key = get_and_lock_key(exclude_keys=tried_keys)
if key:
break # Langsung keluar loop, key SUDAH DIKUNCI
await asyncio.sleep(0.5) # Cek tiap setengah detik
ki = key_status[key]
tried_keys.add(key)
log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{BASE_URL}/v1/chat/completions",
json=ollama_body,
headers={"Authorization": f"Bearer {key}"}
)
if resp.status_code == 200:
ki["success"] += 1
ki["failures"] = 0
                        # Convert the response to Anthropic format
                        ollama_response = resp.json()
                        anthropic_response = ollama_to_anthropic(ollama_response, original_model)
                        # The finally block below releases the key lock
                        return JSONResponse(anthropic_response)
elif resp.status_code == 429:
ki["failures"] += 1
ki["healthy"] = False
log(f"RATE LIMIT: key#{ki['index']} - Skip ke key berikutnya.")
continue
else:
ki["failures"] += 1
continue
except Exception as e:
ki["failures"] += 1
log(f"Error Non-Stream: {e}")
continue
finally:
ki["in_use"] = False # SELALU LEPAS KUNCI
log(f"RELEASE: key#{ki['index']} (Non-Stream)")
return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)
# ==========================================
    # STREAMING LOGIC
# ==========================================
async def stream_generator():
current_body = ollama_body.copy()
generated_text_buffer = ""
tried_keys = set()
for attempt in range(len(OLLAMA_KEYS)):
if len(tried_keys) >= len(OLLAMA_KEYS):
tried_keys.clear()
key = None
            if attempt == 0:
                log("Waiting for an idle API key (new stream queue)...")
            else:
                log(f"Waiting for an idle API key (fallback attempt {attempt})...")
            # Wait indefinitely until a key frees up
            while True:
                if await request.is_disconnected():
                    log("Client cancelled the request while queued for a stream.")
                    return
                # Use the atomic lock helper
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # Exit the loop immediately; the key is already locked
                await asyncio.sleep(0.5)
ki = key_status[key]
tried_keys.add(key)
log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")
if generated_text_buffer:
log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
messages = current_body.get("messages", [])
if messages and messages[-1].get("role") == "assistant":
messages[-1]["content"] = generated_text_buffer
else:
messages.append({"role": "assistant", "content": generated_text_buffer})
current_body["messages"] = messages
try:
custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
async with httpx.AsyncClient(timeout=custom_timeout) as client:
async with client.stream(
"POST", f"{BASE_URL}/v1/chat/completions",
json=current_body, headers={"Authorization": f"Bearer {key}"}
) as response:
if response.status_code == 429:
ki["failures"] += 1
ki["healthy"] = False
log(f"STREAM 429: key#{ki['index']} - Switching key...")
continue
if response.status_code != 200:
ki["failures"] += 1
log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
continue
                        stream_interrupted = False
                        try:
                            # Convert the Ollama stream to Anthropic SSE, mirroring each
                            # text delta into the buffer so an interrupted stream can
                            # resume. Caveat: a resumed stream re-emits message_start,
                            # which strict Anthropic clients may reject.
                            async for chunk in stream_anthropic(response.aiter_lines(), original_model):
                                for sse_line in chunk.splitlines():
                                    if sse_line.startswith("data: "):
                                        try:
                                            evt = json.loads(sse_line[6:])
                                            if evt.get("type") == "content_block_delta":
                                                generated_text_buffer += evt["delta"].get("text", "")
                                        except json.JSONDecodeError:
                                            pass
                                yield chunk
                            ki["success"] += 1
                            ki["failures"] = 0
                            return
                        except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError):
                            log(f"STREAM INTERRUPTED: key#{ki['index']}. Buffering...")
                            ki["failures"] += 1
                            stream_interrupted = True
if not stream_interrupted:
return
except Exception as e:
ki["failures"] += 1
log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
continue
finally:
                # ALWAYS release the lock
ki["in_use"] = False
log(f"STREAM RELEASE: key#{ki['index']}")
yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"
return StreamingResponse(stream_generator(), media_type="text/event-stream")
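# A hypothetical smoke test (host/port are assumptions; HF Spaces commonly maps 7860):
#   curl -N http://localhost:7860/v1/messages \
#     -H "Authorization: Bearer $MASTER_API_KEY" \
#     -H "Content-Type: application/json" \
#     -d '{"model": "claude-sonnet-4-6", "max_tokens": 128, "stream": true,
#          "messages": [{"role": "user", "content": "Hello"}]}'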
@app.post("/v1/chat/completions")
async def chat(req: Request):
auth_key = req.headers.get("Authorization", "").replace("Bearer ", "")
if auth_key != MASTER_API_KEY:
return JSONResponse({"error": "Unauthorized"}, status_code=401)
    # Catch ClientDisconnect in case the client bails before the body is read
try:
body = await req.json()
except ClientDisconnect:
log("Client kabur sebelum proxy selesai membaca request body.")
return Response(status_code=499)
except json.JSONDecodeError:
return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
is_stream = body.get("stream", False)
# ==========================================
    # NON-STREAM LOGIC
# ==========================================
if not is_stream:
tried_keys = set()
for attempt in range(len(OLLAMA_KEYS)):
if len(tried_keys) >= len(OLLAMA_KEYS):
tried_keys.clear()
key = None
log("Menunggu API Key idle (Antrean Non-Stream)...")
# Antrean Tanpa Batas Waktu
while True:
if await req.is_disconnected():
log("Client membatalkan request saat mengantre (Non-Stream).")
return Response(status_code=499)
# Gunakan fungsi Atomic Lock
key = get_and_lock_key(exclude_keys=tried_keys)
if key:
break # Langsung keluar loop, key SUDAH DIKUNCI
await asyncio.sleep(0.5) # Cek tiap setengah detik
ki = key_status[key]
tried_keys.add(key)
log(f"LOCK ACQUIRED: key#{ki['index']} (Non-Stream)")
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{BASE_URL}/v1/chat/completions",
json=body,
headers={"Authorization": f"Bearer {key}"}
)
if resp.status_code == 200:
ki["success"] += 1
ki["failures"] = 0
return Response(content=resp.content, media_type=resp.headers.get("content-type"))
elif resp.status_code == 429:
ki["failures"] += 1
ki["healthy"] = False
log(f"RATE LIMIT: key#{ki['index']} - Skip ke key berikutnya.")
continue
else:
ki["failures"] += 1
continue
except Exception as e:
ki["failures"] += 1
log(f"Error Non-Stream: {e}")
continue
finally:
ki["in_use"] = False # SELALU LEPAS KUNCI
log(f"RELEASE: key#{ki['index']} (Non-Stream)")
return JSONResponse({"error": "All keys failed after multiple attempts"}, status_code=500)
# ==========================================
    # STREAMING LOGIC (seamless fallback + queue)
# ==========================================
async def stream_generator():
current_body = body.copy()
current_body["messages"] = [msg.copy() for msg in body.get("messages", [])]
generated_text_buffer = ""
tried_keys = set()
for attempt in range(len(OLLAMA_KEYS)):
if len(tried_keys) >= len(OLLAMA_KEYS):
tried_keys.clear()
key = None
            if attempt == 0:
                log("Waiting for an idle API key (new stream queue)...")
            else:
                log(f"Waiting for an idle API key (fallback attempt {attempt})...")
            # Wait indefinitely until a key frees up
            while True:
                if await req.is_disconnected():
                    log("Client cancelled the request while queued for a stream.")
                    return
                # Use the atomic lock helper
                key = get_and_lock_key(exclude_keys=tried_keys)
                if key:
                    break  # Exit the loop immediately; the key is already locked
                await asyncio.sleep(0.5)
ki = key_status[key]
tried_keys.add(key)
log(f"STREAM LOCK ACQUIRED: key#{ki['index']}")
if generated_text_buffer:
log(f"Resuming stream. Injecting {len(generated_text_buffer)} chars.")
messages = current_body.get("messages", [])
if messages and messages[-1].get("role") == "assistant":
messages[-1]["content"] = generated_text_buffer
else:
messages.append({"role": "assistant", "content": generated_text_buffer})
current_body["messages"] = messages
try:
custom_timeout = httpx.Timeout(connect=15.0, read=None, write=15.0, pool=10.0)
async with httpx.AsyncClient(timeout=custom_timeout) as client:
async with client.stream(
"POST", f"{BASE_URL}/v1/chat/completions",
json=current_body, headers={"Authorization": f"Bearer {key}"}
) as response:
if response.status_code == 429:
ki["failures"] += 1
ki["healthy"] = False
log(f"STREAM 429: key#{ki['index']} - Switching key...")
continue
if response.status_code != 200:
ki["failures"] += 1
log(f"STREAM ERR {response.status_code}: key#{ki['index']} - Switching key...")
continue
stream_interrupted = False
try:
async for chunk in response.aiter_lines():
if chunk:
if chunk.startswith("data: "):
data_str = chunk[6:]
if data_str.strip() == "[DONE]":
ki["success"] += 1
ki["failures"] = 0
yield chunk + "\n\n"
return
try:
data_json = json.loads(data_str)
if "choices" in data_json and len(data_json["choices"]) > 0:
delta = data_json["choices"][0].get("delta", {})
content = delta.get("content", "")
if content:
generated_text_buffer += content
except json.JSONDecodeError:
pass
yield chunk + "\n\n"
                        except (httpx.ReadTimeout, httpx.ReadError, httpx.RemoteProtocolError):
                            log(f"STREAM INTERRUPTED: key#{ki['index']}. Buffering...")
ki["failures"] += 1
stream_interrupted = True
if not stream_interrupted:
return
except Exception as e:
ki["failures"] += 1
log(f"STREAM EXCEPTION: key#{ki['index']} - {e}")
continue
finally:
                # ALWAYS release the lock
ki["in_use"] = False
log(f"STREAM RELEASE: key#{ki['index']}")
yield f"data: {json.dumps({'error': 'Stream failed completely'})}\n\ndata: [DONE]\n\n"
return StreamingResponse(stream_generator(), media_type="text/event-stream")
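# To run locally, a typical invocation would be (assuming uvicorn is installed;
# the actual Space entrypoint may differ):
#   uvicorn app:app --host 0.0.0.0 --port 7860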