# AI / main.py
# MatteoScript's picture
# Update main.py
# 7a6dee9 verified
import os
import time
import random
import asyncio
import json
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security.api_key import APIKeyHeader
from pydantic import BaseModel
from typing import List, Optional
from dotenv import load_dotenv
from starlette.responses import StreamingResponse
from openai import OpenAI
from typing import List, Optional, Dict, Any
import copy
# Run python-dotenv first so the os.getenv() lookups below can see its values.
load_dotenv()

# Base URL handed to the OpenAI client (Google Generative Language API,
# OpenAI-compatible path).
BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"

# Token that callers of this service must present (see API_KEY_NAME).
EXPECTED_API_KEY = os.getenv("API_HUGGINGFACE")

# Name of the request header that carries the caller's token.
API_KEY_NAME = "Authorization"

# Upstream keys read from API_GEMINI_1 .. API_GEMINI_5; an entry is None when
# the corresponding environment variable is not set.
API_KEYS = [os.getenv(f"API_GEMINI_{index}") for index in range(1, 6)]
# Pydantic models used to VALIDATE the request body
class ChatCompletionRequest(BaseModel):
    """Request body accepted by the chat-completions endpoint.

    Only the fields declared here are validated. Any other key sent by the
    client is kept on the model (``extra="allow"``) so it is forwarded
    unchanged when the model is dumped into the upstream call.
    """

    # Pydantic v2 configuration. The rest of this file already relies on the
    # v2 API (model_dump / model_validate), where the inner ``class Config``
    # is deprecated in favour of ``model_config``.
    model_config = {"extra": "allow"}

    # Upstream model name used when the client does not choose one.
    model: str = "gemini-2.0-flash"
    # No default: under Pydantic v2 the key must be present, but may be null.
    messages: Optional[Any]
    max_tokens: Optional[int] = 8196
    temperature: Optional[float] = 0.8
    # When true the endpoint answers with a streamed response.
    stream: Optional[bool] = False
    stream_options: Optional[Dict[str, Any]] = None
# FastAPI server
app = FastAPI(
    title="OpenAI-SDK-compatible API",
    description="Un wrapper FastAPI compatibile con le specifiche dell'API OpenAI.",
    version="1.0.0",
)

# CORS: every origin, method and header is accepted, credentials included.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# OpenAI client
def get_openai_client():
    """Build an OpenAI client that uses one randomly chosen configured key.

    Choosing the key at RANDOM spreads requests over several keys, which is
    how this module works around "Quota Exceeded" limits.

    Returns:
        An ``OpenAI`` client pointed at ``BASE_URL``.

    Raises:
        RuntimeError: If no entry of ``API_KEYS`` is set.
    """
    # os.getenv() yields None for unset variables; never pick one of those.
    configured_keys = [key for key in API_KEYS if key]
    if not configured_keys:
        raise RuntimeError("Nessuna chiave API Gemini configurata")
    return OpenAI(api_key=random.choice(configured_keys), base_url=BASE_URL)
# API key validation
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)


def verify_api_key(api_key: str = Depends(api_key_header)):
    """Validate the caller's key (for now an HF token kept in the environment).

    Args:
        api_key: Raw value of the header named by ``API_KEY_NAME``; falsy when
            the header is absent (``auto_error=False``).

    Returns:
        The header value, unchanged, when it equals ``Bearer <EXPECTED_API_KEY>``.

    Raises:
        HTTPException: 403 when the header is missing, when it does not match,
            or when no expected key is configured on the server.
    """
    import hmac  # only needed here; the file already uses local imports

    if not api_key:
        raise HTTPException(status_code=403, detail="API key mancante")
    # With API_HUGGINGFACE unset the expected value would be the literal
    # "Bearer None", which any caller could send. Refuse every key instead.
    if not EXPECTED_API_KEY:
        raise HTTPException(status_code=403, detail="API key non valida")
    expected = f"Bearer {EXPECTED_API_KEY}"
    # Constant-time comparison; bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(api_key.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=403, detail="API key non valida")
    return api_key
# Payload fix-up for messages whose content is None
def sanitize_messages(messages):
    """Replace missing or ``None`` message content with a single space.

    The original intent is to avoid Gemini API errors on ``content=None``.
    Messages are modified in place and the same container is returned.

    Args:
        messages: Iterable of chat messages, or a falsy value.

    Returns:
        ``messages`` itself. Falsy input is returned untouched.
    """
    if not messages:
        return messages
    for message in messages:
        # Entries that are not dicts are left as they are instead of raising
        # AttributeError on ``.get``.
        if isinstance(message, dict) and message.get("content") is None:
            message["content"] = " "
    return messages
# Convert an OpenAI tools payload for Gemini (works around anyOf and empty "properties": {})
def _convert_parameters(params):
    """Build the ``parameters`` object of one converted function tool."""
    if not isinstance(params, dict):
        # e.g. "parameters": null -> treat as "no parameters declared".
        params = {}

    converted_properties = {}
    properties = params.get("properties")
    if isinstance(properties, dict):
        for prop_name, prop_value in properties.items():
            cleaned = clean_schema_property(prop_value)
            if cleaned:
                converted_properties[prop_name] = cleaned
    if not converted_properties:
        # An object without properties gets one placeholder string parameter.
        converted_properties = {"parameter": {"type": "string"}}

    converted_params = {"type": "object", "properties": converted_properties}

    required = params.get("required")
    if isinstance(required, list):
        # Properties can be dropped above; do not keep "required" entries that
        # point at properties which are no longer declared.
        required = [name for name in required if name in converted_properties]
    if required:
        converted_params["required"] = required
    return converted_params


def convert_openai_schema_for_gemini(tools_schema):
    """Rewrite an OpenAI ``tools`` list into the reduced form sent upstream.

    For every tool of type ``"function"`` only ``name``, ``description`` and a
    cleaned ``parameters`` object are kept. Tools of any other type are passed
    through unchanged, and function tools without a ``function`` definition
    are skipped.

    Args:
        tools_schema: List of tool dicts, a JSON string encoding such a list,
            or ``None``.

    Returns:
        A new list with the converted tools; ``[]`` when there are no tools.

    Raises:
        ValueError: If ``tools_schema`` is a string that is not valid JSON.
    """
    if isinstance(tools_schema, str):
        try:
            tools_schema = json.loads(tools_schema)
        except json.JSONDecodeError as exc:
            raise ValueError("Stringa JSON non valida fornita") from exc
    if not tools_schema:
        # None (or JSON null) used to fail with "NoneType is not iterable".
        return []

    converted_schema = []
    for tool in tools_schema:
        if not isinstance(tool, dict) or tool.get("type") != "function":
            converted_schema.append(tool)
            continue
        func_def = tool.get("function", {})
        if not func_def:
            continue
        converted_function = {
            "name": func_def.get("name", ""),
            "description": func_def.get("description", ""),
        }
        if "parameters" in func_def:
            converted_function["parameters"] = _convert_parameters(func_def["parameters"])
        converted_schema.append({"type": "function", "function": converted_function})
    return converted_schema
def _first_non_null_variant(variants):
    """Return the first dict in ``variants`` whose type is not "null", else None."""
    if not isinstance(variants, list):
        return None
    for variant in variants:
        if isinstance(variant, dict) and variant.get("type") != "null":
            return variant
    return None


def clean_schema_property(prop):
    """Return a simplified copy of one JSON-schema property.

    Rules applied, recursively where the value is itself a schema:

    * ``title`` and ``default`` are dropped.
    * ``anyOf`` / ``oneOf`` lists are collapsed to their first variant that is
      a dict and is not ``{"type": "null"}``; that variant's cleaned keys are
      merged in without overriding keys already collected. A ``oneOf`` given
      as a single dict is merged the same way.
    * ``properties`` and ``items`` are cleaned; properties whose cleaned
      schema is empty are removed.
    * Dicts inside any other list value are cleaned; everything else is
      copied as is.
    * An object left without properties receives a single placeholder
      string property named ``parameter``.

    Args:
        prop: A schema dict, or any other value.

    Returns:
        A new dict for dict input; non-dict input is returned unchanged.
    """
    if not isinstance(prop, dict):
        return prop

    result = {}
    for key, value in prop.items():
        if key in ("title", "default"):
            continue
        if key in ("anyOf", "oneOf"):
            if key == "oneOf" and isinstance(value, dict):
                variant = value
            else:
                # Same selection for both keywords: skipping null and non-dict
                # entries avoids picking {"type": "null"} or crashing on
                # ``.items()`` when the first oneOf entry is not a dict.
                variant = _first_non_null_variant(value)
            if variant is not None:
                for sub_key, sub_value in clean_schema_property(variant).items():
                    result.setdefault(sub_key, sub_value)
        elif key == "properties" and isinstance(value, dict):
            cleaned_props = {}
            for prop_name, prop_value in value.items():
                cleaned_prop = clean_schema_property(prop_value)
                if cleaned_prop:
                    cleaned_props[prop_name] = cleaned_prop
            result[key] = cleaned_props or {"parameter": {"type": "string"}}
        elif key == "items" and isinstance(value, dict):
            result[key] = clean_schema_property(value)
        elif isinstance(value, list):
            result[key] = [
                clean_schema_property(item) if isinstance(item, dict) else item
                for item in value
            ]
        else:
            result[key] = value

    # Objects must end up with at least one declared property.
    if result.get("type") == "object" and not result.get("properties"):
        result["properties"] = {"parameter": {"type": "string"}}
    return result
def convert_payload_for_gemini(payload: ChatCompletionRequest):
    """Return a copy of the request whose ``tools`` are rewritten for Gemini.

    Args:
        payload: A ``ChatCompletionRequest`` (anything exposing
            ``model_dump``) or a plain dict with the same keys.

    Returns:
        A new, re-validated ``ChatCompletionRequest``.

    Raises:
        ValueError: If ``payload`` is neither a model nor a dict.
    """
    if hasattr(payload, "model_dump"):
        # JSON round-trip so nested values come back as plain dicts and lists.
        payload_converted = json.loads(payload.model_dump_json())
    elif isinstance(payload, dict):
        payload_converted = payload.copy()
    else:
        raise ValueError("Formato payload non supportato")

    # "tools": null or [] carries nothing to convert; passing None on to the
    # converter used to fail with "NoneType is not iterable".
    if payload_converted.get("tools"):
        payload_converted["tools"] = convert_openai_schema_for_gemini(payload_converted["tools"])
    return ChatCompletionRequest.model_validate(payload_converted)
# ---------------------------------- Chat Completion functions ---------------------------------------
# API call (no streaming)
def call_api_sync(params: ChatCompletionRequest, max_retries: int = 5):
    """Run a non-streaming chat completion, retrying on 429 errors.

    Each attempt builds a fresh client (and so may use a different API key),
    sanitises the messages and converts the payload before the upstream call.

    Args:
        params: The validated request.
        max_retries: How many times a failed attempt whose error text
            contains "429" is retried (after a 2 second pause) before the
            error is raised. The retry used to be unbounded recursion.

    Returns:
        The response object returned by the OpenAI client.

    Raises:
        Exception: Whatever the preparation or the upstream call raised, once
            the error is not a 429 or the retries are exhausted.
    """
    attempts_left = max_retries
    while True:
        try:
            client = get_openai_client()
            if params.messages:
                params.messages = sanitize_messages(params.messages)
            params = convert_payload_for_gemini(params)
            print(params)
            response_format = getattr(params, 'response_format', None)
            # NOTE(review): response_format is an extra field, so it presumably
            # arrives as a dict; getattr(..., 'type') would then never match
            # and parse() would never run - confirm before relying on it.
            if response_format and getattr(response_format, 'type', None) == 'json_schema':
                return client.beta.chat.completions.parse(**params.model_dump())
            return client.chat.completions.create(**params.model_dump())
        except Exception as e:
            if "429" not in str(e) or attempts_left <= 0:
                # Bare raise keeps the original traceback.
                raise
            attempts_left -= 1
        time.sleep(2)
# API call (with streaming)
async def _resp_async_generator(params: ChatCompletionRequest, max_retries: int = 5):
    """Stream a chat completion as ``data: ...`` events, retrying on 429.

    The messages are sanitised and the payload converted BEFORE the upstream
    request is made; previously the request was sent first, so neither step
    had any effect on what was sent.

    Args:
        params: The validated request.
        max_retries: How many times an attempt that failed with an error text
            containing "429" is retried after a 2 second pause. Retries only
            happen while no chunk has been emitted, so already-sent output is
            never replayed.

    Yields:
        One ``data: <json>`` event per upstream chunk followed by
        ``data: [DONE]``, or a single ``data: {"error": ...}`` event when the
        call fails.
    """
    attempts_left = max_retries
    prepared = False
    while True:
        emitted = False
        try:
            if not prepared:
                if params.messages:
                    params.messages = sanitize_messages(params.messages)
                params = convert_payload_for_gemini(params)
                prepared = True
            # A new client per attempt, so a retry may use a different key.
            client = get_openai_client()
            response = client.chat.completions.create(**params.model_dump())
            for chunk in response:
                chunk_data = chunk.to_dict() if hasattr(chunk, "to_dict") else chunk
                emitted = True
                yield f"data: {json.dumps(chunk_data)}\n\n"
                await asyncio.sleep(0.01)
            yield "data: [DONE]\n\n"
            return
        except Exception as e:
            can_retry = "429" in str(e) and not emitted and attempts_left > 0
            if not can_retry:
                error_data = {"error": str(e)}
                yield f"data: {json.dumps(error_data)}\n\n"
                return
            attempts_left -= 1
        await asyncio.sleep(2)
# ---------------------------------- API endpoints ---------------------------------------
@app.get("/")
def read_general():
    """Serve the fixed welcome payload at the root path."""
    welcome = {"response": "Benvenuto"}
    return welcome
@app.get("/health")
async def health_check():
    """Return a constant success message."""
    status = {"message": "success"}
    return status
@app.post("/v1/chat/completions", dependencies=[Depends(verify_api_key)])
async def chat_completions(req: ChatCompletionRequest):
    """Handle a chat-completion request, streamed or not.

    Args:
        req: The validated request body.

    Returns:
        The upstream response object when ``req.stream`` is falsy, otherwise
        a ``StreamingResponse`` producing ``data: ...`` events.

    Raises:
        HTTPException: 400 when no messages were supplied; 500 for any other
            failure, with the error text as detail.
    """
    try:
        print(req)
        if not req.messages:
            raise HTTPException(status_code=400, detail="Nessun messaggio fornito")
        if not req.stream:
            # call_api_sync blocks (network call, time.sleep on retry); run it
            # in the default executor so the event loop stays responsive.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(None, call_api_sync, req)
        # The generator emits "data: ...\n\n" frames, i.e. server-sent events.
        return StreamingResponse(_resp_async_generator(req), media_type="text/event-stream")
    except HTTPException:
        # Keep deliberate HTTP errors (such as the 400 above) from being
        # rewrapped as a 500 by the generic handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # uvicorn is imported only when the module is executed as a script.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000, "reload": True}
    uvicorn.run("main:app", **server_options)