|
from fastapi import FastAPI ,Request ,Form, UploadFile, File |
|
from fastapi.responses import HTMLResponse, FileResponse,StreamingResponse,JSONResponse |
|
import os |
|
import io |
|
from PIL import ImageOps,Image ,ImageFilter |
|
|
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
import ast |
|
from server import * |
|
import cv2 |
|
from typing import Optional |
|
import base64 |
|
from fastapi import FastAPI, Request |
|
from slowapi import Limiter |
|
from slowapi.util import get_remote_address |
|
from fastapi.responses import JSONResponse |
|
|
|
|
|
|
|
# ASGI application object; every route, middleware and handler below is
# registered on it.
app = FastAPI()

# slowapi rate limiter that identifies callers by their remote address.
# The same object is exposed both as the module-level `limiter` (for use as a
# route decorator) and on `app.state.limiter`.
# NOTE(review): no route visible in this file currently applies
# `@limiter.limit(...)` - confirm whether rate limiting is meant to be active.
app.state.limiter = limiter = Limiter(key_func=get_remote_address)
|
|
|
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Forward each HTTP request to the next handler and return its response.

    Despite the name, no header is added here: the downstream response is
    returned exactly as produced by ``call_next``.
    """
    return await call_next(request)
|
|
|
|
|
@app.exception_handler(429)
async def rate_limit_exceeded(request: Request, exc):
    """Turn any HTTP 429 error into a fixed JSON explanation.

    Both ``request`` and ``exc`` are accepted because the exception-handler
    signature requires them; neither is inspected.
    """
    # NOTE: an earlier, disabled variant of this payload also carried
    # `"status": 0` next to "detail".
    payload = {"detail": "Too Many Requests ,please just try 3 times per hour"}
    return JSONResponse(content=payload, status_code=429)
|
|
|
|
|
|
|
|
|
@app.get('/')
def main():
    """Root endpoint: reply with a fixed greeting string."""
    greeting = "Hello From Background remover !"
    return greeting
|
|
|
''' |
|
#test limit |
|
@app.post("/items") |
|
@limiter.limit("3/hour") # must have parameter request |
|
async def read_items(request: Request, type_of_filters: str = Form(...)): |
|
return {"items": str(type_of_filters)} |
|
''' |
|
|
|
@app.post('/imageStep1')
async def image_step1(
    request: Request,
    image_file: UploadFile = File(...),
    background_image: Optional[UploadFile] = File(None),
    type_of_filters: str = Form(...),
    blur_radius: str = Form(...),
):
    """Run the first segmentation step on an uploaded image.

    Parameters
    ----------
    request:
        Incoming request; not read by the body.
        NOTE(review): presumably kept so a slowapi ``@limiter.limit``
        decorator can be attached - confirm.
    image_file:
        The image to process.
    background_image:
        Optional replacement background. When a file with a non-empty
        filename is supplied it is decoded and passed to the segmenter in
        place of ``type_of_filters``.
    type_of_filters:
        Value passed to the segmenter when no background image is uploaded.
    blur_radius:
        Integer blur radius, sent as a form string.

    Returns
    -------
    dict
        ``{"detail", "status"}`` when the segmenter reports status ``0``;
        otherwise ``{"message", "image_base64", "status"}`` where
        ``image_base64`` is the PNG-encoded result in base64.

    Raises
    ------
    HTTPException
        400 when ``blur_radius`` is not an integer (previously this surfaced
        as an unhandled ``ValueError`` / HTTP 500).
    """
    # Local import: HTTPException is not among this file's top-level imports.
    from fastapi import HTTPException

    # Validate the cheap scalar input before reading any uploaded bytes.
    try:
        radius = int(blur_radius)
    except ValueError as err:
        raise HTTPException(
            status_code=400, detail="blur_radius must be an integer"
        ) from err

    # An uploaded background image takes precedence over the filter value.
    if background_image and background_image.filename:
        background_bytes = await background_image.read()
        background_or_filter = Image.open(io.BytesIO(background_bytes))
    else:
        background_or_filter = type_of_filters

    image_bytes = await image_file.read()
    image = Image.open(io.BytesIO(image_bytes))

    output_step1 = SegmenterBackground().Back_step1(
        image, background_or_filter, radius
    )

    # The last element of the result is the status flag; 0 means the first
    # element is an error detail rather than an image.
    status = output_step1[-1]
    if status == 0:
        return {
            "detail": output_step1[0],
            "status": status,
        }

    # Encode the produced image as PNG entirely in memory.
    produced_image = output_step1[0]
    buffered = io.BytesIO()
    produced_image.save(buffered, format="PNG")
    encoded_img = base64.b64encode(buffered.getvalue()).decode("utf-8")

    return {
        "message": output_step1[1],
        "image_base64": encoded_img,
        "status": status,
    }
|
|
|
|
|
@app.post('/imageStep2')
async def image_step2(
    image_file: UploadFile = File(...),
    background_image: Optional[UploadFile] = File(None),
    type_of_filters: str = Form(...),
    things_replace: str = Form(...),
    blur_radius: str = Form(...),
):
    """Run the second segmentation step and return the result as a PNG.

    Parameters
    ----------
    image_file:
        The image to process.
    background_image:
        Optional replacement background. When a file with a non-empty
        filename is supplied it is decoded and passed to the segmenter in
        place of ``type_of_filters``.
    type_of_filters:
        Value passed to the segmenter when no background image is uploaded.
    things_replace:
        Python literal (parsed with ``ast.literal_eval``) forwarded to the
        segmenter.
    blur_radius:
        Integer blur radius, sent as a form string.

    Returns
    -------
    Response
        ``image/png`` attachment named ``tmp_processed_image.png``.

    Raises
    ------
    HTTPException
        400 when ``things_replace`` is not a valid Python literal or
        ``blur_radius`` is not an integer.
    """
    # Local imports: neither name is among this file's top-level imports.
    from fastapi import HTTPException
    from fastapi.responses import Response

    # literal_eval only evaluates literals, but malformed input still raises;
    # report that as a client error instead of an unhandled 500.
    try:
        things_replace = ast.literal_eval(things_replace)
    except (ValueError, TypeError, SyntaxError) as err:
        raise HTTPException(
            status_code=400,
            detail="things_replace must be a valid Python literal",
        ) from err

    try:
        blur_radius = int(blur_radius)
    except ValueError as err:
        raise HTTPException(
            status_code=400, detail="blur_radius must be an integer"
        ) from err

    # An uploaded background image takes precedence over the filter value.
    if background_image and background_image.filename:
        background_bytes = await background_image.read()
        background_or_filter = Image.open(io.BytesIO(background_bytes))
    else:
        background_or_filter = type_of_filters

    image_bytes = await image_file.read()
    image = Image.open(io.BytesIO(image_bytes))

    produced_image = SegmenterBackground().Back_step2(
        image, background_or_filter, things_replace, blur_radius
    )

    # Encode in memory rather than writing to a fixed /tmp path: a shared
    # filename let concurrent requests overwrite each other's output and left
    # the file behind after the response was sent.
    png_buffer = io.BytesIO()
    produced_image.save(png_buffer, format="PNG")

    # Advertise only a bare filename; the previous header exposed the
    # server-side "/tmp/..." path to the client.
    return Response(
        content=png_buffer.getvalue(),
        media_type='image/png',
        headers={
            "Content-Disposition": 'attachment; filename="tmp_processed_image.png"'
        },
    )
|
|
|
|
|
|
|
|
|
def _stream_then_cleanup(handle, workdir, chunk_size=1024 * 1024):
    """Yield ``handle``'s bytes in chunks, then close it and delete ``workdir``."""
    import shutil

    try:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            yield chunk
    finally:
        # Runs on normal exhaustion and when the generator is closed early.
        handle.close()
        shutil.rmtree(workdir, ignore_errors=True)


@app.post('/Video')
async def Video(
    video_file: UploadFile = File(...),
    background_image: Optional[UploadFile] = File(None),
    kind_back: str = Form(...),
    type_of_filters: str = Form(...),
    blur_radius: str = Form(...),
):
    """Process an uploaded video with the segmenter and stream the result.

    Parameters
    ----------
    video_file:
        The video to process; its bytes are written to a temporary ``.avi``
        file that is handed to the segmenter by path.
    background_image:
        Optional replacement background. When a file with a non-empty
        filename is supplied it is decoded and passed to the segmenter in
        place of ``type_of_filters``.
    kind_back:
        Python literal (parsed with ``ast.literal_eval``) forwarded to the
        segmenter.
    type_of_filters:
        Value passed to the segmenter when no background image is uploaded.
    blur_radius:
        Integer blur radius, sent as a form string.

    Returns
    -------
    StreamingResponse
        The produced file streamed with media type ``video/avi``.

    Raises
    ------
    HTTPException
        400 when ``kind_back`` is not a valid Python literal or
        ``blur_radius`` is not an integer.
    """
    # Local imports: none of these names are among the file's top-level imports.
    import shutil
    import tempfile

    from fastapi import HTTPException

    try:
        blur_radius = int(blur_radius)
    except ValueError as err:
        raise HTTPException(
            status_code=400, detail="blur_radius must be an integer"
        ) from err

    # literal_eval only evaluates literals, but malformed input still raises;
    # report that as a client error instead of an unhandled 500.
    try:
        kind_back = ast.literal_eval(kind_back)
    except (ValueError, TypeError, SyntaxError) as err:
        raise HTTPException(
            status_code=400,
            detail="kind_back must be a valid Python literal",
        ) from err

    # An uploaded background image takes precedence over the filter value.
    if background_image and background_image.filename:
        background_bytes = await background_image.read()
        background_or_filter = Image.open(io.BytesIO(background_bytes))
    else:
        background_or_filter = type_of_filters

    # Each request gets its own scratch directory. The previous fixed /tmp
    # paths let concurrent requests overwrite each other's input and output,
    # and nothing ever removed them.
    workdir = tempfile.mkdtemp(prefix="video_")
    input_path_toWrite = os.path.join(workdir, "tmp_imput.avi")
    output_path = os.path.join(workdir, "tmp_output.avi")

    try:
        with open(input_path_toWrite, 'wb') as f:
            f.write(await video_file.read())

        SegmenterBackground().Back_video(
            input_path_toWrite,
            output_path,
            background_or_filter,
            kind_back,
            blur_radius,
        )

        # Open before responding so a missing output still fails the request
        # itself rather than breaking the stream after headers are sent.
        output_handle = open(output_path, "rb")
    except BaseException:
        # Includes task cancellation: never leave the scratch directory behind.
        shutil.rmtree(workdir, ignore_errors=True)
        raise

    # The generator closes the handle and removes the scratch directory once
    # streaming ends; the original passed a bare file object that was never
    # closed.
    return StreamingResponse(
        _stream_then_cleanup(output_handle, workdir),
        media_type="video/avi",
    )
|
|
|
|
|
if __name__ == "__main__":
    # Direct execution: serve `app` with uvicorn on every interface, port 8000.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)