samgis-lisa-on-cuda / wrappers /fastapi_wrapper.py
alessandro trinca tornidor
[fix] handle multiple exceptions the right way
264ecd7
raw
history blame
11.2 kB
import json
import os
import pathlib
import uuid
from fastapi.templating import Jinja2Templates
import uvicorn
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import ValidationError
from samgis_lisa_on_cuda import PROJECT_ROOT_FOLDER, WORKDIR
from samgis_lisa_on_cuda.utilities.type_hints import ApiRequestBody, StringPromptApiRequestBody
from samgis_core.utilities.fastapi_logger import setup_logging
# Module-wide logger built by the samgis_core logging helper (called with debug=True).
app_logger = setup_logging(debug=True)
# FastAPI application instance on which every middleware, route and mount below is registered.
app = FastAPI()
@app.middleware("http")
async def request_middleware(request, call_next):
    """Tag every HTTP request with a unique id and log its start and end.

    A fresh UUID4 is bound to the logging context for the duration of the
    request and is sent back to the client in the ``X-Request-ID`` header.

    Args:
        request: the incoming HTTP request.
        call_next: callable that forwards the request down the stack and
            returns the response.

    Returns:
        The downstream response, or a generic 500 JSON response
        (``{"success": False}``) when the downstream call raises an
        ``Exception``; in both cases the ``X-Request-ID`` header is set.
    """
    request_id = str(uuid.uuid4())
    with app_logger.contextualize(request_id=request_id):
        app_logger.info("Request started")
        try:
            try:
                response = await call_next(request)
            except Exception as ex:
                app_logger.error(f"Request failed: {ex}")
                response = JSONResponse(content={"success": False}, status_code=500)
            # Only reached when 'response' is bound. Touching it from a 'finally'
            # clause would raise UnboundLocalError (hiding the real error) whenever
            # a BaseException such as asyncio.CancelledError escapes call_next.
            response.headers["X-Request-ID"] = request_id
            return response
        finally:
            app_logger.info("Request ended")
@app.post("/post_test_dictlist")
def post_test_dictlist2(request_input: ApiRequestBody) -> JSONResponse:
    """Echo back the parsed form of a dict-list prompt request.

    Args:
        request_input: request payload validated as ``ApiRequestBody``.

    Returns:
        A 200 JSON response whose content is the value returned by
        ``get_parsed_bbox_points_with_dictlist_prompt``.
    """
    from samgis_lisa_on_cuda.io.wrappers_helpers import get_parsed_bbox_points_with_dictlist_prompt

    parsed_request = get_parsed_bbox_points_with_dictlist_prompt(request_input)
    app_logger.info(f"request_body:{parsed_request}.")
    return JSONResponse(content=parsed_request, status_code=200)
@app.get("/health")
async def health() -> JSONResponse:
    """Liveness probe that also logs the installed package versions.

    The versions of ``samgis_core``, ``lisa-on-cuda`` and
    ``samgis-lisa-on-cuda`` are looked up one by one; a distribution that is
    not installed is logged and reported as ``"0.0.0"``. The versions are
    only written to the log, not returned to the client.

    Returns:
        A 200 JSON response with ``{"msg": "still alive..."}``.
    """
    import importlib.metadata
    from importlib.metadata import PackageNotFoundError

    def installed_version(distribution_name: str) -> str:
        # Resolve every package on its own: with a single shared try/except one
        # missing distribution left the other version variables unbound, so
        # building the log message raised NameError.
        try:
            return importlib.metadata.version(distribution_name)
        except PackageNotFoundError as pe:
            app_logger.error(f"pe:{pe}.")
            return "0.0.0"

    core_version = installed_version('samgis_core')
    lisa_on_cuda_version = installed_version('lisa-on-cuda')
    samgis_lisa_on_cuda_version = installed_version('samgis-lisa-on-cuda')

    msg = "still alive, "
    msg += f"""version:{samgis_lisa_on_cuda_version}, core version:{core_version},"""
    msg += f"""lisa-on-cuda version:{lisa_on_cuda_version},"""
    app_logger.info(msg)
    return JSONResponse(status_code=200, content={"msg": "still alive..."})
@app.post("/post_test_string")
def post_test_string(request_input: StringPromptApiRequestBody) -> JSONResponse:
    """Echo back the parsed form of a string prompt request plus the model precision.

    Args:
        request_input: request payload validated as ``StringPromptApiRequestBody``.

    Returns:
        A 200 JSON response containing the parsed request, extended with a
        ``"content"`` entry: a copy of the parsed request with the
        ``"precision"`` taken from the default ``lisa_on_cuda`` arguments.
    """
    from lisa_on_cuda.utils import app_helpers
    from samgis_lisa_on_cuda.io.wrappers_helpers import get_parsed_bbox_points_with_string_prompt

    request_body = get_parsed_bbox_points_with_string_prompt(request_input)
    app_logger.info(f"request_body:{request_body}.")

    # An empty argument list makes parse_args return its defaults.
    default_args = app_helpers.parse_args([])

    # Snapshot the parsed request before nesting it under "content".
    echoed_payload = dict(request_body)
    echoed_payload["precision"] = str(default_args.precision)
    request_body["content"] = echoed_payload

    return JSONResponse(content=request_body, status_code=200)
@app.post("/infer_lisa")
def infer_lisa(request_input: StringPromptApiRequestBody) -> JSONResponse:
    """Run a LISA prediction for a string prompt request.

    Args:
        request_input: request payload validated as ``StringPromptApiRequestBody``.

    Returns:
        On success a 200 JSON response ``{"body": <json string>}`` where the
        JSON string encodes ``duration_run`` (seconds, measured from before
        the request is parsed) and the prediction ``output``. When parsing
        the request raises a pydantic ``ValidationError`` a 422 JSON response
        ``{"msg": "Error - Unprocessable Entity"}`` is returned.

    Raises:
        HTTPException: status 500 when anything fails during the prediction.
    """
    import subprocess
    import time

    from samgis_lisa_on_cuda.prediction_api import lisa
    from samgis_lisa_on_cuda.io.wrappers_helpers import get_parsed_bbox_points_with_string_prompt, get_source_name

    app_logger.info("starting lisa inference request...")
    try:
        time_start_run = time.time()
        body_request = get_parsed_bbox_points_with_string_prompt(request_input)
        app_logger.info(f"lisa body_request:{body_request}.")
        app_logger.info(f"lisa module:{lisa}.")
        try:
            source_name = get_source_name(request_input.source_type)
            app_logger.info(f"source_name = {source_name}.")
            output = lisa.lisa_predict(
                bbox=body_request["bbox"], prompt=body_request["prompt"], zoom=body_request["zoom"],
                source=body_request["source"], source_name=source_name
            )
            duration_run = time.time() - time_start_run
            app_logger.info(f"duration_run:{duration_run}.")
            body = {
                "duration_run": duration_run,
                "output": output
            }
            return JSONResponse(status_code=200, content={"body": json.dumps(body)})
        except Exception as inference_exception:
            # Dump the content of the working folders to help debugging the failure.
            # The listed paths are project constants, not request data.
            for folder_label, folder in (("project_root", PROJECT_ROOT_FOLDER), ("workdir", WORKDIR)):
                folder_content = subprocess.run(
                    f"ls -l {folder}/", shell=True, universal_newlines=True, stdout=subprocess.PIPE
                )
                app_logger.error(f"{folder_label} folder 'ls -l' command output: {folder_content.stdout}.")
            app_logger.error(f"inference error:{inference_exception}.")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error on inference"
            ) from inference_exception
    except ValidationError as va1:
        app_logger.error(f"validation error: {str(va1)}.")
        # pydantic's ValidationError cannot be built from a bare message, so
        # 'raise ValidationError("Unprocessable Entity")' ended in a TypeError.
        # Answer with the same 422 payload used for request validation errors.
        return JSONResponse(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            content={"msg": "Error - Unprocessable Entity"}
        )
@app.post("/infer_samgis")
def infer_samgis(request_input: ApiRequestBody) -> JSONResponse:
    """Run a plain samgis (samexporter) prediction for a dict-list prompt request.

    Args:
        request_input: request payload validated as ``ApiRequestBody``.

    Returns:
        On success a 200 JSON response ``{"body": <json string>}`` where the
        JSON string encodes ``duration_run`` (seconds, measured from before
        the request is parsed) and the prediction ``output``. When parsing
        the request raises a pydantic ``ValidationError`` a 422 JSON response
        ``{"msg": "Error - Unprocessable Entity"}`` is returned.

    Raises:
        HTTPException: status 500 when anything fails during the prediction.
    """
    import subprocess
    import time

    from samgis_lisa_on_cuda.prediction_api import predictors
    from samgis_lisa_on_cuda.io.wrappers_helpers import get_parsed_bbox_points_with_dictlist_prompt, get_source_name

    app_logger.info("starting plain samgis inference request...")
    try:
        time_start_run = time.time()
        body_request = get_parsed_bbox_points_with_dictlist_prompt(request_input)
        app_logger.info(f"body_request:{body_request}.")
        try:
            source_name = get_source_name(request_input.source_type)
            app_logger.info(f"source_name = {source_name}.")
            output = predictors.samexporter_predict(
                bbox=body_request["bbox"], prompt=body_request["prompt"], zoom=body_request["zoom"],
                source=body_request["source"], source_name=source_name
            )
            duration_run = time.time() - time_start_run
            app_logger.info(f"duration_run:{duration_run}.")
            body = {
                "duration_run": duration_run,
                "output": output
            }
            return JSONResponse(status_code=200, content={"body": json.dumps(body)})
        except Exception as inference_exception:
            # Dump the content of the working folders to help debugging the failure.
            # The listed paths are project constants, not request data.
            for folder_label, folder in (("project_root", PROJECT_ROOT_FOLDER), ("workdir", WORKDIR)):
                folder_content = subprocess.run(
                    f"ls -l {folder}/", shell=True, universal_newlines=True, stdout=subprocess.PIPE
                )
                app_logger.error(f"{folder_label} folder 'ls -l' command output: {folder_content.stdout}.")
            app_logger.error(f"inference error:{inference_exception}.")
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Internal server error on inference"
            ) from inference_exception
    except ValidationError as va1:
        app_logger.error(f"validation error: {str(va1)}.")
        # pydantic's ValidationError cannot be built from a bare message, so
        # 'raise ValidationError("Unprocessable Entity")' ended in a TypeError.
        # Answer with the same 422 payload used for request validation errors.
        return JSONResponse(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            content={"msg": "Error - Unprocessable Entity"}
        )
@app.exception_handler(RequestValidationError)
async def request_validation_exception_handler(request: Request, exc: RequestValidationError) -> JSONResponse:
    """Log a request validation failure and answer with a generic 422 message.

    Args:
        request: the request that failed validation; its headers and query
            parameters are written to the error log.
        exc: the validation error; its error list and body are logged.

    Returns:
        A 422 JSON response with ``{"msg": "Error - Unprocessable Entity"}``.
    """
    app_logger.error(f"exception errors: {exc.errors()}.")
    app_logger.error(f"exception body: {exc.body}.")

    logged_headers = dict(request.headers.items())
    app_logger.error(f'request header: {logged_headers}.')
    logged_query_params = dict(request.query_params.items())
    app_logger.error(f'request query params: {logged_query_params}.')

    error_payload = {"msg": "Error - Unprocessable Entity"}
    return JSONResponse(content=error_payload, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
    """Log an ``HTTPException`` and answer with a generic 500 message.

    The status code and detail carried by the exception are not forwarded to
    the client: the response is always the same 500 payload.

    Args:
        request: the request being served; its headers and query parameters
            are written to the error log.
        exc: the raised exception, logged through its string form.

    Returns:
        A 500 JSON response with ``{"msg": "Error - Internal Server Error"}``.
    """
    app_logger.error(f"exception: {str(exc)}.")

    logged_headers = dict(request.headers.items())
    app_logger.error(f'request header: {logged_headers}.')
    logged_query_params = dict(request.query_params.items())
    app_logger.error(f'request query params: {logged_query_params}.')

    error_payload = {"msg": "Error - Internal Server Error"}
    return JSONResponse(content=error_payload, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
# Optional output folder, enabled by setting the WRITE_TMP_ON_DISK environment variable.
# When set, the folder is exposed as static files under /vis_output together with
# an HTML page listing its content.
write_tmp_on_disk = os.getenv("WRITE_TMP_ON_DISK", "")
app_logger.info(f"write_tmp_on_disk:{write_tmp_on_disk}.")
if write_tmp_on_disk:
    try:
        path_write_tmp_on_disk = pathlib.Path(write_tmp_on_disk)
        try:
            # Best effort: remove a stale file/symlink sitting at the folder path.
            path_write_tmp_on_disk.unlink(missing_ok=True)
        except OSError as err:
            # IsADirectoryError and PermissionError are OSError subclasses, so this
            # single clause covers the same failures; they are only logged.
            app_logger.error(f"{err} while removing old write_tmp_on_disk:{write_tmp_on_disk}.")
            app_logger.error(f"is file?{path_write_tmp_on_disk.is_file()}.")
            app_logger.error(f"is symlink?{path_write_tmp_on_disk.is_symlink()}.")
            app_logger.error(f"is folder?{path_write_tmp_on_disk.is_dir()}.")
        os.makedirs(write_tmp_on_disk, exist_ok=True)
        app.mount("/vis_output", StaticFiles(directory=write_tmp_on_disk), name="vis_output")
    except RuntimeError as rerr:
        app_logger.error(f"{rerr} while loading the folder write_tmp_on_disk:{write_tmp_on_disk}...")
        raise
    templates = Jinja2Templates(directory=WORKDIR / "static")

    @app.get("/vis_output", response_class=HTMLResponse)
    def list_files(request: Request):
        """Render an HTML page listing the files stored in the output folder.

        Args:
            request: the incoming request; its URL is the prefix of every file link.

        Returns:
            The rendered ``list_files.html`` template with the sorted file URLs.
        """
        files = os.listdir(write_tmp_on_disk)
        # str(request.url) is the public way to get the URL text
        # (previously read from the private '_url' attribute).
        base_url = str(request.url)
        files_paths = sorted(f"{base_url}/{f}" for f in files)
        app_logger.info(f"files_paths:{files_paths}.")
        return templates.TemplateResponse(
            "list_files.html", {"request": request, "files": files_paths}
        )
# important: the index() function and the app.mount MUST be at the end
# samgis.html
app.mount(
    "/samgis",
    StaticFiles(directory=WORKDIR / "static" / "dist", html=True),
    name="samgis",
)


@app.get("/samgis")
async def samgis() -> FileResponse:
    """Return the built ``samgis.html`` page as an HTML file response."""
    samgis_page = WORKDIR / "static" / "dist" / "samgis.html"
    return FileResponse(path=samgis_page, media_type="text/html")
# lisa.html
app.mount(
    "/lisa",
    StaticFiles(directory=WORKDIR / "static" / "dist", html=True),
    name="lisa",
)


@app.get("/lisa")
async def lisa() -> FileResponse:
    """Return the built ``lisa.html`` page as an HTML file response."""
    lisa_page = WORKDIR / "static" / "dist" / "lisa.html"
    return FileResponse(path=lisa_page, media_type="text/html")
# index.html (lisa.html copy)
app.mount(
    "/",
    StaticFiles(directory=WORKDIR / "static" / "dist", html=True),
    name="index",
)


@app.get("/")
async def index() -> FileResponse:
    """Return the built ``index.html`` page as an HTML file response."""
    index_page = WORKDIR / "static" / "dist" / "index.html"
    return FileResponse(path=index_page, media_type="text/html")
# Script entry point: serve the application with uvicorn on every interface, port 7860.
if __name__ == '__main__':
    try:
        uvicorn.run(host="0.0.0.0", port=7860, app=app)
    except Exception as e:
        # Embed the exception in the message: it used to be passed as an extra
        # positional argument with no placeholder, so it never reached the log line.
        app_logger.error(f"e:{e}.")
        # A bare raise re-raises the active exception with its traceback untouched.
        raise