# b3rian's picture
# Upload folder using huggingface_hub
# 51e944e verified
from fastapi import FastAPI, File, UploadFile, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware import Middleware
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from starlette.responses import JSONResponse
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
from typing import List, Callable, Optional
from enum import Enum
import numpy as np
from PIL import Image
import os
import io
import time
import tensorflow as tf
import uvicorn
import logging
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
import datetime
# =================== Configuration ===================
class Settings(BaseSettings):
    """Application configuration.

    Each field carries a default and, being a ``BaseSettings`` model, can be
    overridden through the environment or the ``.env`` file named in
    ``model_config``.
    """

    # Models directory setting (default "models").
    models_dir: str = "models"
    # Origins handed to the CORS middleware; "*" allows any origin.
    allowed_origins: list[str] = ["*"]
    # Title and version passed to the FastAPI application.
    app_name: str = "Image Classifier API"
    app_version: str = "1.1.0"
    # Level name handed to logging.basicConfig.
    log_level: str = "INFO"
    # When true, HTTPSRedirectMiddleware is appended to the middleware stack.
    enable_https_redirect: bool = False

    # Pydantic v2 style configuration; the nested ``class Config`` form is
    # deprecated and emits a deprecation warning at import time.
    model_config = {"env_file": ".env"}


settings = Settings()
# =================== Logging Setup ===================
# Level names are case-sensitive in the logging module, so normalize the
# configured value: LOG_LEVEL=info would otherwise raise ValueError at startup.
logging.basicConfig(
    level=settings.log_level.upper(),
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Module-level logger shared by every handler in this file.
logger = logging.getLogger(__name__)
# =================== Model Registry ===================
# The MODEL_DIR environment variable takes precedence; otherwise fall back to
# the configured ``settings.models_dir`` (default "models") so that the value
# declared in Settings is actually honored instead of being dead configuration.
MODEL_DIR = os.getenv("MODEL_DIR", settings.models_dir)
resnet_model = os.path.join(MODEL_DIR, "resnet50_imagenet.keras")
efficientnet_model = os.path.join(MODEL_DIR, "efficientnet.keras")

# Per-model inference recipe, keyed by the public model name:
#   path        - location of the saved .keras file
#   preprocess  - function applied to the float32 image array before inference
#   decode      - function turning raw predictions into (id, label, score) tuples
#   input_size  - spatial size the image is resized to (both entries are square,
#                 so the height/width ordering does not matter here)
MODEL_REGISTRY = {
    "efficientnet": {
        "path": efficientnet_model,
        "preprocess": tf.keras.applications.efficientnet_v2.preprocess_input,
        "decode": tf.keras.applications.efficientnet_v2.decode_predictions,
        "input_size": (480, 480),
    },
    "resnet": {
        "path": resnet_model,
        "preprocess": tf.keras.applications.resnet50.preprocess_input,
        "decode": tf.keras.applications.resnet50.decode_predictions,
        "input_size": (224, 224),
    },
}
# =================== Custom Exceptions ===================
class ModelNotFoundError(Exception):
    """Signals that the requested model is not among the loaded models."""
class InvalidImageError(Exception):
    """Signals that uploaded bytes could not be turned into a model input."""
# =================== Model Loading ===================
@lru_cache(maxsize=None)
def load_model(model_path: str, input_size: tuple) -> tf.keras.Model:
    """Load a Keras model from disk and run one warm-up prediction.

    Results are memoized by ``lru_cache`` per ``(model_path, input_size)``
    pair, so repeated calls with the same arguments return the same object.

    Args:
        model_path: Path of the saved model file.
        input_size: Spatial dimensions of the model input; the warm-up batch
            has shape ``(1, *input_size, 3)``.

    Returns:
        The loaded model.

    Raises:
        RuntimeError: If loading or the warm-up prediction fails. The original
            exception is attached as ``__cause__``.
    """
    try:
        model = tf.keras.models.load_model(model_path)
        # Warm up the model with a float32 batch, the same dtype that
        # preprocess_image produces for real requests.
        dummy_input = np.zeros((1, *input_size, 3), dtype="float32")
        _ = model.predict(dummy_input)
    except Exception as e:
        logger.error(f"Failed to load model from {model_path}: {str(e)}")
        # Chain explicitly so the root cause stays visible in tracebacks.
        raise RuntimeError(f"Failed to load model from {model_path}: {str(e)}") from e
    else:
        logger.info(f"Successfully loaded model from {model_path}")
        return model
# Initialize models with error handling
# Registry name -> loaded model. A model that fails to load is logged and
# left out, so the service still starts with whichever models are usable.
models = {}
for name in MODEL_REGISTRY:
    config = MODEL_REGISTRY[name]
    try:
        models[name] = load_model(config["path"], config["input_size"])
    except Exception as e:
        logger.error(f"Could not load model {name}: {str(e)}")
# =================== FastAPI Setup ===================
# Middleware stack handed to FastAPI: CORS is always present, the HTTPS
# redirect is added only when enabled in the settings.
middleware = [
    Middleware(
        CORSMiddleware,
        allow_headers=["*"],
        allow_methods=["*"],
        allow_origins=settings.allowed_origins,
    ),
]
if settings.enable_https_redirect:
    middleware += [Middleware(HTTPSRedirectMiddleware)]
# The ASGI application; title and version come from the settings, the
# remaining metadata is fixed.
app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    description="FastAPI backend for AI Image Classifier with multiple Keras models",
    contact={"name": "Your Name", "email": "your.email@example.com"},
    license_info={"name": "MIT"},
    openapi_tags=[
        {"name": "predictions", "description": "Operations with image predictions"},
    ],
    middleware=middleware,
)
# =================== Request Logging Middleware ===================
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log the method, URL and handling time of every HTTP request.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request down the stack and
            returns the response.

    Returns:
        The response produced by ``call_next``, unchanged.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the system clock is adjusted mid-request.
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = (time.perf_counter() - start_time) * 1000
    logger.info(
        f"Request: {request.method} {request.url} completed in {process_time:.2f}ms"
    )
    return response
# =================== Error Handlers ===================
@app.exception_handler(ModelNotFoundError)
async def model_not_found_handler(request, exc):
    """Turn a ModelNotFoundError into a 404 JSON response carrying its message."""
    payload = {"message": str(exc)}
    return JSONResponse(content=payload, status_code=404)
@app.exception_handler(InvalidImageError)
async def invalid_image_handler(request, exc):
    """Turn an InvalidImageError into a 400 JSON response carrying its message."""
    payload = {"message": str(exc)}
    return JSONResponse(content=payload, status_code=400)
# =================== Response Schemas ===================
# One classification result. Documented with comments rather than a docstring
# so the generated schema description stays unchanged.
class Prediction(BaseModel):
    # Class label as returned to the client.
    label: str
    # Required score, validated to lie between 0.0 and 100.0 inclusive.
    confidence: float = Field(default=..., ge=0.0, le=100.0)
# Response body of the /predict endpoint.
class ApiResponse(BaseModel):
    # "model_version" starts with pydantic's protected "model_" prefix, which
    # triggers a namespace-conflict warning on pydantic v2 releases that
    # protect that prefix. The field name is part of the public API, so the
    # check is disabled here instead of renaming the field.
    model_config = {"protected_namespaces": ()}

    # Top predictions for the uploaded image.
    predictions: List[Prediction]
    # Name of the model that produced the predictions.
    model_version: str
    # Duration of the inference call, in seconds.
    inference_time: float
    # ISO-8601 timestamp string of when the response was built.
    timestamp: str
# Response body of the /health endpoint.
class HealthCheckResponse(BaseModel):
    # Service status string.
    status: str
    # Names of the models that are currently loaded.
    models_loaded: list[str]
    # ISO-8601 timestamp string of when the response was built.
    timestamp: str
# =================== Model Name Enum ===================
# Closed set of model names accepted by the /predict endpoint. The values
# match the keys of MODEL_REGISTRY; mixing in ``str`` makes each member
# compare equal to its plain string value.
class ModelName(str, Enum):
    efficientnet = "efficientnet"
    resnet = "resnet"
# =================== Image Preprocessing ===================
def preprocess_image(
    image_bytes: bytes,
    target_size: tuple,
    preprocess_func: Callable[[np.ndarray], np.ndarray]
) -> np.ndarray:
    """Decode raw image bytes into a single-image batch ready for inference.

    The image is converted to RGB, resized to ``target_size``, cast to
    float32, passed through ``preprocess_func`` and given a leading batch
    axis.

    Args:
        image_bytes: Encoded image file contents.
        target_size: Size passed to ``Image.resize``.
            NOTE(review): PIL expects (width, height); the registry sizes are
            square, so the order is not observable today.
        preprocess_func: Model-specific preprocessing applied to the array.

    Returns:
        The preprocessed array with an added leading axis of length 1.

    Raises:
        InvalidImageError: If any step fails. The original exception is
            attached as ``__cause__``.
    """
    try:
        # The context manager closes the decoder's image once the converted
        # and resized copy has been produced.
        with Image.open(io.BytesIO(image_bytes)) as raw_image:
            image = raw_image.convert("RGB").resize(target_size)
        # np.array always copies, so preprocess_func may modify it in place.
        image_array = np.array(image, dtype="float32")
        image_array = preprocess_func(image_array)
        return np.expand_dims(image_array, axis=0)
    except Exception as e:
        logger.error(f"Image preprocessing failed: {str(e)}")
        # Chain explicitly so the underlying decoding error is preserved.
        raise InvalidImageError(f"Invalid image file: {str(e)}") from e
# =================== Async Prediction ===================
# Worker threads used to keep blocking model.predict calls off the event loop.
executor = ThreadPoolExecutor(max_workers=4)


async def async_predict(model: tf.keras.Model, input_tensor: np.ndarray):
    """Run ``model.predict(input_tensor)`` on the thread pool and await it.

    Args:
        model: Loaded model to run.
        input_tensor: Batch handed to ``model.predict``.

    Returns:
        Whatever ``model.predict`` returns for ``input_tensor``.
    """
    # Inside a coroutine the running loop is the right handle;
    # get_event_loop() is the legacy accessor and is deprecated for this use.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, model.predict, input_tensor)
# =================== Inference Endpoint ===================
@app.post("/predict", response_model=ApiResponse, tags=["predictions"])
async def predict(
    request: Request,
    file: UploadFile = File(...),
    model_name: ModelName = Query(..., description="Choose model for inference")
):
    """Classify an uploaded image with the chosen model and return the top 3 labels.

    Responds with 404 when the chosen model is not loaded, 400 when the upload
    cannot be processed as an image, and 500 for any other failure.
    """
    # Use the plain string value everywhere: formatting the enum member itself
    # in an f-string renders differently across Python versions
    # ("resnet" vs "ModelName.resnet").
    selected = model_name.value
    if selected not in models:
        logger.error(f"Model '{selected}' not found in loaded models")
        raise ModelNotFoundError(
            f"Model '{selected}' not available. Available options: {list(models.keys())}"
        )
    try:
        model = models[selected]
        config = MODEL_REGISTRY[selected]
        contents = await file.read()
        # Preprocess
        input_tensor = preprocess_image(contents, config["input_size"], config["preprocess"])
        # Inference, timed with a monotonic clock so clock adjustments cannot
        # distort the reported duration.
        start = time.perf_counter()
        predictions = await async_predict(model, input_tensor)
        elapsed = time.perf_counter() - start
        # Decode predictions
        decoded = config["decode"](predictions, top=3)[0]
        results = [
            {"label": label.replace("_", " "), "confidence": round(float(score * 100), 2)}
            for (_, label, score) in decoded
        ]
        # utcnow() is deprecated; take an aware UTC time and drop tzinfo so the
        # ISO string keeps the exact format clients already receive.
        timestamp = (
            datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()
        )
        return {
            "predictions": results,
            "model_version": selected,
            "inference_time": round(elapsed, 4),
            "timestamp": timestamp,
        }
    except InvalidImageError:
        # Let the registered 400 handler deal with it.
        raise
    except Exception as e:
        logger.error(f"Inference error: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Inference error: {str(e)}") from e
# =================== Health Check Endpoints ===================
@app.get("/", include_in_schema=False)
def root():
    """Return a fixed message showing that the service is up."""
    greeting = {"message": "Image Classifier API is running."}
    return greeting
@app.get("/health", response_model=HealthCheckResponse, tags=["health"])
async def health_check():
    """Report service status, the names of the loaded models and the current UTC time."""
    # utcnow() is deprecated; take an aware UTC time and drop tzinfo so the
    # ISO string keeps the exact format clients already receive.
    now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    return {
        "status": "healthy",
        "models_loaded": list(models.keys()),
        "timestamp": now_utc.isoformat(),
    }