"""FastAPI service exposing binary image classifiers over HTTP.

Model weights are downloaded from the Hugging Face Hub at startup and kept in
memory. Two endpoints are exposed: ``/predict`` scores one image URL and
``/batch_predict`` scores a list of image URLs. Every request must carry a
valid ``x-api-key`` header.
"""

import asyncio
import logging
import os
import secrets
from io import BytesIO
from typing import List, Optional

import httpx
import requests
import torch
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from huggingface_hub import hf_hub_download
from PIL import Image
from pydantic import BaseModel
from torchvision import transforms
from transformers import pipeline

from architecture.resnet import ResNet
from steps.preprocess import process_image

logger = logging.getLogger(__name__)

app = FastAPI()

# Value passed as ``size`` to ``process_image`` before inference.
image_size = 256

# Hugging Face token used to download the model weights.
hf_token = os.environ.get("api_read")

# Accepted API keys, read from the ``api_key`` environment variable.
# Several keys may be supplied separated by commas. When the variable is
# unset or empty the set is empty and every request is rejected.
VALID_API_KEYS = frozenset(
    key.strip()
    for key in (os.environ.get("api_key") or "").split(",")
    if key.strip()
)

# Timeout applied to each image download (matches httpx's documented default).
DOWNLOAD_TIMEOUT_SECONDS = 5.0

# Upper bound on images handled concurrently inside one batch request, so a
# large batch does not overload the machine.
MAX_CONCURRENT_IMAGES = 8


def _is_valid_api_key(candidate: Optional[str]) -> bool:
    """Return True when *candidate* exactly equals one of the configured keys.

    The comparison is an exact match (not a substring test) and uses
    ``secrets.compare_digest`` so that timing does not reveal how many
    leading characters matched.
    """
    if not candidate:
        return False
    candidate_bytes = candidate.encode("utf-8")
    return any(
        secrets.compare_digest(candidate_bytes, key.encode("utf-8"))
        for key in VALID_API_KEYS
    )


@app.middleware("http")
async def verify_api_key(request: Request, call_next):
    """Reject any request whose ``x-api-key`` header is missing or unknown.

    The 403 is returned as a response instead of raising ``HTTPException``:
    exceptions raised inside an HTTP middleware are not routed through
    FastAPI's exception handlers and would reach the client as a 500.
    """
    if not _is_valid_api_key(request.headers.get("x-api-key")):
        return JSONResponse(status_code=403, content={"detail": "Unauthorized"})
    return await call_next(request)


# Hugging Face Hub locations of the weights to load at startup.
models_locations = [
    # Disabled model, kept for reference:
    # {
    #     "repo_id": "TamisAI/category-lamp",
    #     "subfolder": "maison-jansen/palmtree-152-0005-32-256",
    #     "filename": "palmtree-jansen.pth",
    # },
    {
        "repo_id": "TamisAI/category-lamp",
        "subfolder": "maison-charles/corail-152-0001-32-256-L1",
        "filename": "maison-charles-corail-L1.pth",
    },
    {
        "repo_id": "TamisAI/category-lamp",
        "subfolder": "michel-armand/flamme-152-0001A-32-256-L1",
        "filename": "flamme-L1.pth",
    },
]

device = torch.device("cpu")


class PredictRequest(BaseModel):
    """Request body for ``/predict``."""

    # URL the image is downloaded from.
    imageUrl: str
    # Key into ``model_pipelines`` (the weight file name).
    modelName: str


torch.set_num_threads(8)

# Loaded models, keyed by weight file name.
model_pipelines = {}

# Reference ResNet instance; its class is reused to build each loaded model.
# NOTE(review): if ``ResNet`` is a plain class, ``ResNet(...)`` could be called
# directly in ``load_models`` and this instance dropped — confirm.
base_model = ResNet("resnet152", num_output_neurons=2).to(device)


@app.on_event("startup")
async def load_models():
    """Download every configured weight file and register the model.

    A failure on one model is printed and skipped so that the remaining
    models still load.
    """
    print(f"Loading models...{len(models_locations)}")
    for model_location in models_locations:
        try:
            print(f"Loading model: {model_location['filename']}")
            model_weight = hf_hub_download(
                repo_id=model_location["repo_id"],
                subfolder=model_location["subfolder"],
                filename=model_location["filename"],
                token=hf_token,
            )
            model = base_model.__class__("resnet152", num_output_neurons=2).to(device)
            model.load_state_dict(
                torch.load(model_weight, weights_only=True, map_location=device)
            )
            model.eval()
            model_pipelines[model_location["filename"]] = model
        except Exception as e:
            print(f"Error loading model {model_location['filename']}: {e}")
    print(f"Models loaded. {len(model_pipelines)}")


def _new_http_client() -> httpx.AsyncClient:
    """Build the HTTP client used for image downloads.

    Redirects are followed, which keeps the behaviour ``/predict`` had when
    it downloaded with ``requests``.
    """
    return httpx.AsyncClient(
        timeout=DOWNLOAD_TIMEOUT_SECONDS, follow_redirects=True
    )


async def _download_image(client: httpx.AsyncClient, image_url: str) -> Image.Image:
    """Fetch *image_url* with *client* and return it as a decoded PIL image.

    Raises whatever httpx or PIL raise on network errors, non-success HTTP
    status codes, or undecodable content; callers translate that into their
    own error shape.
    """
    # NOTE(review): the URL is caller-supplied and fetched from this server;
    # consider restricting the allowed hosts if callers are not fully trusted.
    response = await client.get(image_url)
    response.raise_for_status()
    image = Image.open(BytesIO(response.content))
    # ``Image.open`` is lazy; force decoding here so a corrupt file is
    # reported as a download error rather than failing later.
    image.load()
    return image


def _predict_confidence(model, image: Image.Image) -> float:
    """Run *model* on *image* and return the rounded softmax score.

    The returned value is the second softmax output (column 1) of the first
    batch row, rounded to two decimals.
    NOTE(review): column 1 is presumably the "match" class — confirm.
    """
    # NOTE(review): this is synchronous CPU work; when called from an async
    # endpoint it occupies the event loop for the duration of the inference.
    processed_image = process_image(image, size=image_size)
    image_tensor = transforms.ToTensor()(processed_image).unsqueeze(0).to(device)
    with torch.no_grad():
        outputs = model(image_tensor)
        probabilities = torch.nn.functional.softmax(outputs, dim=1)
    return round(probabilities[0, 1].item(), 2)


@app.post("/predict")
async def predict(request: PredictRequest):
    """Score a single image URL with the requested model.

    Returns ``{"confidence": <float>}``.

    Raises:
        HTTPException: 404 when the model is not loaded, 400 when the image
            cannot be downloaded or decoded.
    """
    # Check the model first so no download happens for a request that
    # cannot succeed.
    model = model_pipelines.get(request.modelName)
    if model is None:
        raise HTTPException(status_code=404, detail="Model not found")

    # Download without blocking the event loop.
    try:
        async with _new_http_client() as client:
            image = await _download_image(client, request.imageUrl)
    except Exception as exc:
        logger.warning("could not load image %s: %s", request.imageUrl, exc)
        raise HTTPException(status_code=400, detail="Invalid image URL") from exc

    confidence = _predict_confidence(model, image)
    logger.info("confidence: %s", confidence)

    return JSONResponse(content={"confidence": confidence})


class BatchPredictRequest(BaseModel):
    """Request body for ``/batch_predict``."""

    # URLs the images are downloaded from.
    imageUrls: List[str]
    # Key into ``model_pipelines`` (the weight file name).
    modelName: str


@app.post("/batch_predict")
async def batch_predict(request: BatchPredictRequest):
    """Score every image URL in the request with the requested model.

    Returns ``{"results": [...]}`` with one entry per input URL, in input
    order. Each entry is ``{"imageUrl", "confidence"}`` on success or
    ``{"imageUrl", "error"}`` on failure; one bad image does not fail the
    batch.

    Raises:
        HTTPException: 404 when the model is not loaded.
    """
    model = model_pipelines.get(request.modelName)
    if model is None:
        raise HTTPException(status_code=404, detail="Model not found")

    semaphore = asyncio.Semaphore(MAX_CONCURRENT_IMAGES)

    async def process_single_image(client, image_url):
        async with semaphore:
            try:
                image = await _download_image(client, image_url)
            except Exception:
                return {"imageUrl": image_url, "error": "Invalid image URL"}

            try:
                confidence = _predict_confidence(model, image)
            except Exception:
                # Log the traceback and report the failure for this image
                # only, instead of aborting the whole batch.
                logger.exception("prediction failed for %s", image_url)
                return {"imageUrl": image_url, "error": "Prediction failed"}

            return {"imageUrl": image_url, "confidence": confidence}

    # One client (and connection pool) is shared by the whole batch.
    async with _new_http_client() as client:
        results = await asyncio.gather(
            *(process_single_image(client, url) for url in request.imageUrls)
        )

    return JSONResponse(content={"results": results})