Spaces:
Running
Running
import os | |
import time | |
import random | |
import asyncio | |
import json | |
from fastapi import FastAPI, HTTPException, Depends, File, UploadFile, Form | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.security.api_key import APIKeyHeader | |
from pydantic import BaseModel | |
from typing import List, Optional | |
from dotenv import load_dotenv | |
from starlette.responses import StreamingResponse | |
from openai import OpenAI | |
from typing import List, Optional, Dict, Any | |
import io | |
import copy | |
from pathlib import Path | |
from pydub import AudioSegment | |
# Load variables from a local .env file into the process environment.
load_dotenv()

# Base URL of the Gemini OpenAI-compatible endpoint used by the chat client.
BASE_URL = "https://generativelanguage.googleapis.com/v1beta/openai/"

# Token callers must present as "Bearer <token>" in the header named below.
EXPECTED_API_KEY = os.getenv("API_HUGGINGFACE")
API_KEY_NAME = "Authorization"

# Pool of Gemini keys (API_GEMINI_1 .. API_GEMINI_5); an unset variable
# yields a None entry in the list.
API_KEYS = [os.getenv(f"API_GEMINI_{index}") for index in range(1, 6)]

# Groq endpoint, Whisper model and default segment length (in minutes)
# used by the audio transcription code.
GROQ_BASE_URL = "https://api.groq.com/openai/v1"
WHISPER_MODEL = "whisper-large-v3-turbo"
SEGMENT_MINUTES = 50

# Only the first Groq key is enabled; API_GROQ_2 .. API_GROQ_5 are not read.
GROQ_API_KEYS = [os.getenv("API_GROQ_1")]
# Pydantic model that validates the request body
class ChatCompletionRequest(BaseModel):
    """Body of a chat-completion request.

    Fields that are not declared here are accepted and kept on the model
    (``extra="allow"``), so they are part of the output of ``model_dump()``.
    """

    # Pydantic v2 style configuration; replaces the deprecated nested
    # ``class Config`` while keeping the same ``extra = "allow"`` setting.
    model_config = {"extra": "allow"}

    model: str = "gemini-2.0-flash"
    # No default value is declared for this field.
    messages: Optional[Any]
    temperature: Optional[float] = 0.8
    stream: Optional[bool] = False
    stream_options: Optional[Dict[str, Any]] = None
# FastAPI application
app = FastAPI(
    title="OpenAI-SDK-compatible API",
    version="1.0.0",
    description="Un wrapper FastAPI compatibile con le specifiche dell'API OpenAI.",
)

# CORS: every origin, method and header is allowed, with credentials enabled.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
# OpenAI client
def get_openai_client():
    """Return an OpenAI client bound to one randomly chosen Gemini API key.

    Picking the key at random spreads requests over the configured keys,
    which is meant to work around per-key "Quota Exceeded" limits.

    NOTE: a function with the same name is defined again later in this
    module; at call time the later definition is the one in effect.

    Returns:
        An ``OpenAI`` client that targets ``BASE_URL``.

    Raises:
        RuntimeError: if no entry of ``API_KEYS`` is set.
    """
    # os.getenv() yields None for unset variables; never hand those to the SDK.
    configured_keys = [key for key in API_KEYS if key]
    if not configured_keys:
        raise RuntimeError("Nessuna chiave API Gemini configurata")
    return OpenAI(api_key=random.choice(configured_keys), base_url=BASE_URL)
# API key validation
api_key_header = APIKeyHeader(name=API_KEY_NAME, auto_error=False)


def verify_api_key(api_key: str = Depends(api_key_header)):
    """Validate the caller's API key (for now the expected token comes from the environment).

    Args:
        api_key: raw value of the header read by ``api_key_header``.

    Returns:
        The header value, unchanged, when it equals
        ``"Bearer <EXPECTED_API_KEY>"``.

    Raises:
        HTTPException: 403 when the header is missing, when it does not
            match, or when no expected key is configured on the server.
    """
    import hmac  # local import: only needed for the constant-time comparison

    if not api_key:
        raise HTTPException(status_code=403, detail="API key mancante")

    # With the environment variable unset the expected value would be the
    # literal "Bearer None", which any caller could send: reject everything.
    if not EXPECTED_API_KEY:
        raise HTTPException(status_code=403, detail="API key non valida")

    expected = f"Bearer {EXPECTED_API_KEY}"
    # Constant-time comparison so that timing does not reveal how much of
    # the key matched. Bytes are used because compare_digest() rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(api_key.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(status_code=403, detail="API key non valida")
    return api_key
# Patch payloads whose messages carry content=None
def sanitize_messages(messages):
    """Replace a missing or None 'content' with a single space to avoid Gemini API errors.

    The messages are updated in place. Falsy input (None, empty list) is
    returned untouched; otherwise the same list object is returned.
    """
    if messages:
        for entry in messages:
            if entry.get('content') is None:
                entry['content'] = " "
    return messages
# Convert an OpenAI tools payload for Gemini (works around the anomalies
# noted for anyOf and for an empty "properties": {})
def convert_openai_schema_for_gemini(tools_schema):
    """Rewrite an OpenAI ``tools`` list into the shape sent to Gemini.

    Args:
        tools_schema: list of tool dicts, or a JSON string encoding one.
            ``None`` / an empty list are returned unchanged.

    Returns:
        A new list in which:
        * tools whose "type" is not "function" are passed through as-is;
        * function tools without a "function" definition are dropped;
        * each parameter property is cleaned with ``clean_schema_property``;
        * a "parameters" object that ends up with no properties receives a
          placeholder string property named "parameter".

    Raises:
        ValueError: if ``tools_schema`` is a string that is not valid JSON.
    """
    if isinstance(tools_schema, str):
        try:
            tools_schema = json.loads(tools_schema)
        except json.JSONDecodeError as exc:
            # Keep the original decoding error attached as the cause.
            raise ValueError("Stringa JSON non valida fornita") from exc

    # Nothing to convert (None, empty list, JSON "null"): hand it back as-is
    # instead of failing while trying to iterate over it.
    if not tools_schema:
        return tools_schema

    converted_schema = []
    for tool in tools_schema:
        if tool.get("type") != "function":
            converted_schema.append(tool)
            continue

        func_def = tool.get("function", {})
        if not func_def:
            continue

        converted_function = {
            "name": func_def.get("name", ""),
            "description": func_def.get("description", ""),
        }

        if "parameters" in func_def:
            params = func_def["parameters"]
            properties = {}
            for prop_name, prop_value in params.get("properties", {}).items():
                cleaned = clean_schema_property(prop_value)
                if cleaned:
                    properties[prop_name] = cleaned

            converted_params = {
                "type": "object",
                # Placeholder when no usable property is left.
                "properties": properties or {"parameter": {"type": "string"}},
            }
            if "required" in params:
                converted_params["required"] = params["required"]
            converted_function["parameters"] = converted_params

        converted_schema.append({"type": "function", "function": converted_function})
    return converted_schema
def clean_schema_property(prop):
    """Recursively simplify one JSON-schema property.

    Non-dict input is returned unchanged. For a dict, a new dict is built:

    * "title" and "default" are dropped;
    * "anyOf" is replaced by the keys of its first dict entry whose type is
      not "null";
    * "oneOf" is replaced by the keys of its first entry (or of the value
      itself when it is a dict);
    * "properties" and "items" are cleaned recursively;
    * dicts inside any other list value are cleaned, other values are copied;
    * an "object" left without properties receives a placeholder string
      property named "parameter".

    Keys merged from "anyOf"/"oneOf" never overwrite a key that is already
    in the result; keys processed later in ``prop`` may still overwrite them.
    """
    if not isinstance(prop, dict):
        return prop

    result = {}

    def merge_missing(variant):
        # A variant that does not clean to a dict (e.g. a bare string or a
        # boolean schema) has no keys to contribute; skip it instead of
        # failing on ``.items()``.
        cleaned_variant = clean_schema_property(variant)
        if isinstance(cleaned_variant, dict):
            for k, v in cleaned_variant.items():
                result.setdefault(k, v)

    for key, value in prop.items():
        if key in ("title", "default"):
            continue

        if key == "anyOf":
            if isinstance(value, list):
                for item in value:
                    if isinstance(item, dict) and item.get("type") != "null":
                        merge_missing(item)
                        break  # only the first non-null variant is used
        elif key == "oneOf":
            if isinstance(value, list) and value:
                merge_missing(value[0])
            elif isinstance(value, dict):
                merge_missing(value)
        elif key == "properties" and isinstance(value, dict):
            new_props = {}
            for prop_name, prop_value in value.items():
                cleaned_prop = clean_schema_property(prop_value)
                if cleaned_prop:
                    new_props[prop_name] = cleaned_prop
            result[key] = new_props or {"parameter": {"type": "string"}}
        elif key == "items" and isinstance(value, dict):
            result[key] = clean_schema_property(value)
        elif isinstance(value, list):
            result[key] = [
                clean_schema_property(item) if isinstance(item, dict) else item
                for item in value
            ]
        else:
            result[key] = value

    # An object schema must not be left with missing or empty properties.
    if result.get("type") == "object" and not result.get("properties"):
        result["properties"] = {"parameter": {"type": "string"}}
    return result
def convert_payload_for_gemini(payload: ChatCompletionRequest):
    """Return a new ChatCompletionRequest adapted for the Gemini endpoint.

    The payload (a model exposing ``model_dump`` or a plain dict) is turned
    into a dict, the "metadata" and "store" keys are removed, and any
    "tools" entry is rewritten with ``convert_openai_schema_for_gemini``.

    Raises:
        ValueError: if ``payload`` is neither a model nor a dict.
    """
    if hasattr(payload, "model_dump"):
        # Round-trip through JSON so that only plain JSON types remain.
        raw = json.loads(payload.model_dump_json())
    elif isinstance(payload, dict):
        raw = payload.copy()
    else:
        raise ValueError("Formato payload non supportato")

    for dropped_key in ("metadata", "store"):
        raw.pop(dropped_key, None)

    if "tools" in raw:
        raw["tools"] = convert_openai_schema_for_gemini(raw["tools"])

    return ChatCompletionRequest.model_validate(raw)
# ---------------------------------- Chat Completion helpers ---------------------------------------
# Call the API (no streaming)
def call_api_sync(params: ChatCompletionRequest, max_retries: int = 10):
    """Run a non-streaming chat completion, retrying on 429 errors.

    Args:
        params: the request; its messages are sanitized in place and the
            payload is converted with ``convert_payload_for_gemini``.
        max_retries: how many times a failure whose message contains "429"
            is retried (2 seconds apart) before the error is re-raised.

    Returns:
        The response object returned by the OpenAI SDK.

    Raises:
        Exception: any error raised while preparing or sending the request,
            including the last 429 once the retry budget is exhausted.
    """
    if params.messages:
        params.messages = sanitize_messages(params.messages)
    params = convert_payload_for_gemini(params)
    print('------------------------------------------------------- INPUT ---------------------------------------------------------------')
    print(params)

    request_kwargs = params.model_dump()
    response_format = getattr(params, 'response_format', None)
    # NOTE(review): after the JSON round-trip in convert_payload_for_gemini
    # an extra "response_format" field looks like a plain dict, for which
    # getattr(..., 'type') is None - confirm whether the parse() branch is
    # ever expected to be taken.
    use_parse = bool(response_format) and getattr(response_format, 'type', None) == 'json_schema'

    attempt = 0
    while True:
        try:
            # A fresh client per attempt, so that a retry may use another key.
            client = get_openai_client()
            if use_parse:
                response = client.beta.chat.completions.parse(**request_kwargs)
            else:
                response = client.chat.completions.create(**request_kwargs)
            break
        except Exception as e:
            # Bounded loop instead of unbounded recursion: give up after
            # max_retries and surface the original error with its traceback.
            if "429" not in str(e) or attempt >= max_retries:
                raise
            attempt += 1
            time.sleep(2)

    print('------------------------------------------------------- OUTPUT ---------------------------------------------------------------')
    print(response)
    print("")
    return response
# Call the API (with streaming)
async def _resp_async_generator(params: ChatCompletionRequest, max_retries: int = 10):
    """Stream a chat completion as ``data: ...`` events, retrying on 429 errors.

    Args:
        params: the request; its messages are sanitized in place and the
            payload is converted with ``convert_payload_for_gemini`` before
            the request is sent.
        max_retries: how many times a failure whose message contains "429"
            is retried (2 seconds apart). A retry happens only while no
            chunk has been emitted yet, so chunks are never sent twice.

    Yields:
        One ``data: <json>`` event per chunk, then ``data: [DONE]``.
        On failure a single ``data: {"error": ...}`` event is emitted
        instead and the stream ends.
    """
    try:
        # Sanitize and convert BEFORE sending, so that the cleaned payload
        # is the one that actually reaches the API.
        if params.messages:
            params.messages = sanitize_messages(params.messages)
        params = convert_payload_for_gemini(params)
        print('------------------------------------------------------- INPUT ---------------------------------------------------------------')
        print(params.model_dump_json(indent=4))
        request_kwargs = params.model_dump()
    except Exception as e:
        error_data = {"error": str(e)}
        yield f"data: {json.dumps(error_data)}\n\n"
        return

    attempt = 0
    while True:
        chunks_sent = 0
        try:
            # A fresh client per attempt, so that a retry may use another key.
            client = get_openai_client()
            response = client.chat.completions.create(**request_kwargs)
            content_parts = []
            for chunk in response:
                chunk_data = chunk.to_dict() if hasattr(chunk, "to_dict") else chunk
                if chunk.choices and chunk.choices[0].delta:
                    chunk_content = chunk.choices[0].delta.content
                    if chunk_content:
                        content_parts.append(chunk_content)
                yield f"data: {json.dumps(chunk_data)}\n\n"
                chunks_sent += 1
                # Hand control back to the event loop between chunks.
                await asyncio.sleep(0.01)
            yield "data: [DONE]\n\n"
            print('------------------------------------------------------- OUTPUT ---------------------------------------------------------------')
            print("".join(content_parts))
            return
        except Exception as e:
            can_retry = "429" in str(e) and chunks_sent == 0 and attempt < max_retries
            if not can_retry:
                error_data = {"error": str(e)}
                yield f"data: {json.dumps(error_data)}\n\n"
                return
            attempt += 1
            await asyncio.sleep(2)
def get_openai_client():
    """Return an OpenAI client bound to one randomly chosen Gemini API key.

    Picking the key at random spreads requests over the configured keys,
    which is meant to work around per-key "Quota Exceeded" limits.

    NOTE: a function with the same name is also defined earlier in this
    module; being the later definition, this one is in effect at call time.

    Returns:
        An ``OpenAI`` client that targets ``BASE_URL``.

    Raises:
        RuntimeError: if no entry of ``API_KEYS`` is set.
    """
    # os.getenv() yields None for unset variables; never hand those to the SDK.
    configured_keys = [key for key in API_KEYS if key]
    if not configured_keys:
        raise RuntimeError("Nessuna chiave API Gemini configurata")
    return OpenAI(api_key=random.choice(configured_keys), base_url=BASE_URL)
# API Whisper Audio: | |
FORMAT_ALIASES = { | |
"mpeg": "mp3", | |
"x-wav": "wav", | |
"vnd.wave": "wav", | |
"x-m4a": "m4a", | |
"x-aac": "aac", | |
} | |
def _detect_format(upload_file: UploadFile) -> str: | |
"""Rileva il formato audio dal MIME-type o dall'estensione, con alias safe.""" | |
if upload_file.content_type and upload_file.content_type.startswith("audio/"): | |
fmt = upload_file.content_type.split("/", 1)[1] | |
else: | |
fmt = Path(upload_file.filename).suffix.lstrip(".").lower() | |
return FORMAT_ALIASES.get(fmt, fmt) | |
def _split_audio_to_mp3_chunks(audio_bytes: bytes, input_format: str, minutes: int): | |
""" Converte (se serve) e splitta. Lascia che ffmpeg auto-rilevi il formato passando format=None: è più sicuro e ignora alias sbagliati. """ | |
try: | |
audio = AudioSegment.from_file(io.BytesIO(audio_bytes)) | |
except Exception: | |
audio = AudioSegment.from_file(io.BytesIO(audio_bytes), format=input_format) | |
chunk_len_ms = minutes * 60 * 1000 | |
for start_ms in range(0, len(audio), chunk_len_ms): | |
chunk = audio[start_ms : start_ms + chunk_len_ms] | |
buf = io.BytesIO() | |
chunk.export(buf, format="mp3") | |
yield buf.getvalue() | |
def _transcribe_chunk(chunk_bytes: bytes,
                      model: str,
                      language: str,
                      response_format: str = "json") -> str:
    """Transcribe one MP3 chunk through ``call_whisper_api`` and return its text.

    The API result may be a plain string, an object exposing ``.text`` or a
    dict with a "text" key; a dict without that key yields an empty string.
    """
    upload = io.BytesIO(chunk_bytes)
    # Give the in-memory stream a file name (presumably used by the SDK to
    # label the uploaded file - TODO confirm).
    upload.name = "chunk.mp3"

    result = call_whisper_api(
        upload,
        model=model,
        language=language,
        response_format=response_format,
    )

    if isinstance(result, str):
        return result
    return result.text if hasattr(result, "text") else result.get("text", "")
def get_whisper_client():
    """Return an OpenAI client bound to one randomly chosen Groq API key.

    Returns:
        An ``OpenAI`` client that targets ``GROQ_BASE_URL``.

    Raises:
        RuntimeError: if no entry of ``GROQ_API_KEYS`` is set.
    """
    # os.getenv() yields None for unset variables; never hand those to the SDK.
    configured_keys = [key for key in GROQ_API_KEYS if key]
    if not configured_keys:
        raise RuntimeError("Nessuna chiave API Groq configurata")
    return OpenAI(api_key=random.choice(configured_keys), base_url=GROQ_BASE_URL)
def call_whisper_api(audio_file: io.BytesIO,
                     model: str = WHISPER_MODEL,
                     language: str = "it",
                     response_format: str = "json",
                     max_retries: int = 10):
    """Send one audio stream to the transcription API, retrying on 429 errors.

    Args:
        audio_file: in-memory audio stream to upload.
        model: transcription model name.
        language: language code passed to the API.
        response_format: response format passed to the API.
        max_retries: how many times a failure whose message contains "429"
            is retried (2 seconds apart) before the error is re-raised.

    Returns:
        Whatever ``client.audio.transcriptions.create`` returns.

    Raises:
        Exception: any non-429 error, or the last 429 once the retry budget
            is exhausted.
    """
    # Remember where the stream starts: a failed attempt may already have
    # consumed it, and a retry must upload the same bytes again.
    start_position = audio_file.tell()
    attempt = 0
    while True:
        try:
            audio_file.seek(start_position)
            client = get_whisper_client()
            return client.audio.transcriptions.create(
                file=audio_file,
                model=model,
                language=language,
                response_format=response_format
            )
        except Exception as e:
            # Bounded loop instead of unbounded recursion.
            if "429" not in str(e) or attempt >= max_retries:
                raise
            attempt += 1
            time.sleep(2)
# ---------------------------------- API methods ---------------------------------------
def read_general():
    """Return the static welcome payload."""
    welcome = {"response": "Benvenuto"}
    return welcome
async def health_check():
    """Return the static payload that reports success."""
    status = dict(message="success")
    return status
async def chat_completions(req: ChatCompletionRequest):
    """Serve a chat-completion request, streaming or not.

    Args:
        req: the validated request body.

    Returns:
        The SDK response for a non-streaming request, or a
        ``StreamingResponse`` fed by ``_resp_async_generator`` when
        ``req.stream`` is true.

    Raises:
        HTTPException: 400 when no message is provided; 500 when the
            upstream call fails.
    """
    # Raised outside the try block, so the 400 is not rewrapped as a 500.
    if not req.messages:
        raise HTTPException(status_code=400, detail="Nessun messaggio fornito")

    if req.stream:
        # The generator emits "data: ...\n\n" events, i.e. server-sent
        # events, whose media type is text/event-stream.
        return StreamingResponse(_resp_async_generator(req), media_type="text/event-stream")

    try:
        # call_api_sync blocks (network I/O, time.sleep on retries): run it
        # in the default executor to keep the event loop responsive.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, call_api_sync, req)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
async def audio_transcriptions_endpoint(
        file: UploadFile = File(...),
        model: str = Form(WHISPER_MODEL),
        language: str = Form("it"),
        response_format: str = Form("text"),
        segment_minutes: int = Form(SEGMENT_MINUTES)):
    """Transcribe an uploaded audio file, splitting it into MP3 segments.

    Args:
        file: the uploaded audio file.
        model: transcription model name.
        language: language code passed to the transcription API.
        response_format: response format passed to the transcription API.
        segment_minutes: maximum length of each segment, in minutes.

    Returns:
        A dict with the model, the language, the number of segments, the
        segment length and the segment texts joined by blank lines.

    Raises:
        HTTPException: 400 when ``segment_minutes`` is not positive; 500
            when decoding or transcription fails or no audio is produced.
    """
    if segment_minutes <= 0:
        raise HTTPException(status_code=400, detail="segment_minutes deve essere maggiore di zero")

    try:
        raw_bytes = await file.read()
        input_fmt = _detect_format(file) or "mp3"

        def transcribe_all():
            # Segments are transcribed as they are produced, so they are
            # not all held in memory before the first request is sent.
            return [
                _transcribe_chunk(chunk, model, language, response_format)
                for chunk in _split_audio_to_mp3_chunks(raw_bytes, input_fmt, segment_minutes)
            ]

        # Decoding and the API calls are blocking: run them in the default
        # executor to keep the event loop responsive.
        loop = asyncio.get_running_loop()
        transcripts = await loop.run_in_executor(None, transcribe_all)
        if not transcripts:
            raise ValueError("Audio vuoto o formato non riconosciuto")

        final_text = "\n\n".join(transcripts)
        return {
            "model": model,
            "language": language,
            "segments": len(transcripts),
            "segment_minutes": segment_minutes,
            "text": final_text,
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
if __name__ == "__main__":
    # Script entry point: serve "main:app" on all interfaces, port 8000,
    # with auto-reload enabled.
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )