File size: 2,328 Bytes
8ce0841 0019848 c2d251e 0019848 9c47c14 c2d251e 0019848 9c47c14 c2d251e 0019848 c2d251e 0019848 9c47c14 c2d251e 0019848 c2d251e 0019848 c2d251e 0019848 c2d251e 0019848 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# api/routes.py
import json
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.responses import StreamingResponse, JSONResponse
from api.auth import verify_app_secret
from api.models import ChatRequest, ImageResponseModel, ChatCompletionResponse
from api.utils import process_gizai_stream_response, process_gizai_non_stream_response, GizAI
from api.logger import setup_logger
logger = setup_logger(__name__)
router = APIRouter()
@router.options("/v1/chat/completions")
@router.options("/api/v1/chat/completions")
async def gizai_chat_completions_options():
return Response(
status_code=200,
headers={
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "POST, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization",
},
)
@router.get("/v1/models")
@router.get("/api/v1/models")
async def list_gizai_models():
return {"object": "list", "data": GizAI.models}
@router.post("/v1/chat/completions")
@router.post("/api/v1/chat/completions")
async def gizai_chat_completions(
request: ChatRequest, app_secret: str = Depends(verify_app_secret)
):
logger.info("Entering GizAI chat_completions route")
logger.info(f"Processing chat completion request for model: {request.model}")
model = GizAI.get_model(request.model)
if model not in GizAI.models:
raise HTTPException(
status_code=400,
detail=f"Model {request.model} is not supported. Supported models are: {', '.join(GizAI.models)}",
)
if request.stream:
if GizAI.is_image_model(model):
raise HTTPException(status_code=400, detail="Image generation does not support streaming.")
logger.info("Streaming response")
return StreamingResponse(process_gizai_stream_response(request, model), media_type="text/event-stream")
else:
logger.info("Non-streaming response")
response = await process_gizai_non_stream_response(request, model)
return response
@router.route('/')
@router.route('/healthz')
@router.route('/ready')
@router.route('/alive')
@router.route('/status')
@router.get("/health")
def health_check(request: Request):
return Response(content=json.dumps({"status": "ok"}), media_type="application/json")
|