# Backg / app.py
# LapStore
# cleaned
# 26e9f88
from fastapi import FastAPI ,Request ,Form, UploadFile, File
from fastapi.responses import HTMLResponse, FileResponse,StreamingResponse,JSONResponse
import os
import io
from PIL import ImageOps,Image ,ImageFilter
#from transformers import pipeline
import matplotlib.pyplot as plt
import numpy as np
import ast
from server import *
import cv2
from typing import Optional
import base64
from fastapi import FastAPI, Request
from slowapi import Limiter
from slowapi.util import get_remote_address
from fastapi.responses import JSONResponse
# Local development URL: http://localhost:8000
app = FastAPI()
# Rate limiter whose key function is slowapi's get_remote_address.
limiter = Limiter(key_func=get_remote_address)
# Attach the limiter to the application state.
# NOTE(review): presumably this is where slowapi looks the limiter up - confirm.
# No route in this file is currently decorated with @limiter.limit (the only
# use is inside a disabled string block), so no limit appears to be enforced.
app.state.limiter = limiter
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Forward every HTTP request to the next handler unchanged.

    Despite its name, this middleware does not attach any header: the
    downstream response is returned exactly as received.
    """
    return await call_next(request)
@app.exception_handler(429)
async def rate_limit_exceeded(request: Request, exc):
    """Answer HTTP 429 errors with a JSON body carrying a fixed message.

    ``request`` and ``exc`` are required by the handler signature but are
    not inspected.
    """
    payload = {"detail": "Too Many Requests ,please just try 3 times per hour"}
    return JSONResponse(content=payload, status_code=429)
'''
return JSONResponse(
status_code=429,
content={
"detail": "Too Many Requests ,please just try 3 times per hour",
"status": 0
})
'''
# Root route
@app.get('/')
def main():
    """Return a fixed greeting string for ``GET /``."""
    greeting = "Hello From Background remover !"
    return greeting
'''
#test limit
@app.post("/items")
@limiter.limit("3/hour") # must have parameter request
async def read_items(request: Request, type_of_filters: str = Form(...)):
return {"items": str(type_of_filters)}
'''
@app.post('/imageStep1')
async def image_step1(
    request: Request,
    image_file: UploadFile = File(...),
    background_image: Optional[UploadFile] = File(None),
    type_of_filters: str = Form(...),
    blur_radius: str = Form(...),
):
    """Run the first background-processing step on an uploaded image.

    Parameters:
        request: The incoming request. Not used in the body; kept so the
            signature stays compatible (a rate-limit decorator would need it).
        image_file: The image to process.
        background_image: Optional replacement background. When a file with a
            non-empty filename is supplied it is passed to the segmenter
            instead of ``type_of_filters``.
        type_of_filters: Filter name forwarded to the segmenter when no
            background image is uploaded.
        blur_radius: Integer blur radius, sent as a form string.

    Returns:
        ``{"detail": ..., "status": 0}`` when the last element of the
        segmenter result is ``0``; otherwise a dict with ``"message"``,
        ``"image_base64"`` (PNG, base64-encoded) and ``"status"``.

    Raises:
        HTTPException: 400 when ``blur_radius`` is not an integer.
    """
    # Local import keeps this block self-contained; fastapi is already a
    # dependency of this module.
    from fastapi import HTTPException

    # Validate client input up front so a malformed value yields a 400
    # instead of an unhandled ValueError (HTTP 500).
    try:
        radius = int(blur_radius)
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="blur_radius must be an integer"
        ) from exc

    # An uploaded background image takes precedence over the named filter.
    if background_image and background_image.filename:
        background_bytes = await background_image.read()
        background_or_filter = Image.open(io.BytesIO(background_bytes))
    else:
        background_or_filter = type_of_filters

    image_bytes = await image_file.read()
    image = Image.open(io.BytesIO(image_bytes))

    # NOTE(review): SegmenterBackground presumably comes from the star import
    # of ``server``; the result is indexed as a sequence whose last element is
    # a status flag - confirm against that module.
    result = SegmenterBackground().Back_step1(image, background_or_filter, radius)
    status = result[-1]
    if status == 0:
        return {
            "detail": result[0],
            "status": status,
        }

    # Encode the produced image as base64 PNG so it can travel inside JSON.
    produced_image = result[0]
    png_buffer = io.BytesIO()
    produced_image.save(png_buffer, format="PNG")
    encoded_img = base64.b64encode(png_buffer.getvalue()).decode("utf-8")

    return {
        "message": result[1],
        "image_base64": encoded_img,
        "status": status,
    }
@app.post('/imageStep2')
async def image_step2(
    image_file: UploadFile = File(...),
    background_image: Optional[UploadFile] = File(None),
    type_of_filters: str = Form(...),
    things_replace: str = Form(...),
    blur_radius: str = Form(...),
):
    """Run the second background-processing step and return the PNG result.

    Parameters:
        image_file: The image to process.
        background_image: Optional replacement background. When a file with a
            non-empty filename is supplied it is passed to the segmenter
            instead of ``type_of_filters``.
        type_of_filters: Filter name forwarded to the segmenter when no
            background image is uploaded.
        things_replace: A Python literal (parsed with ``ast.literal_eval``)
            forwarded to the segmenter.
        blur_radius: Integer blur radius, sent as a form string.

    Returns:
        A ``image/png`` attachment response containing the processed image.

    Raises:
        HTTPException: 400 when ``things_replace`` is not a valid Python
            literal or ``blur_radius`` is not an integer.
    """
    # Local import keeps this block self-contained; fastapi is already a
    # dependency of this module.
    from fastapi import HTTPException, Response

    # Malformed form fields are client errors: report 400 rather than letting
    # the parse exception surface as an HTTP 500.
    try:
        targets = ast.literal_eval(things_replace)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as exc:
        raise HTTPException(
            status_code=400, detail="things_replace must be a valid Python literal"
        ) from exc
    try:
        radius = int(blur_radius)
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="blur_radius must be an integer"
        ) from exc

    # An uploaded background image takes precedence over the named filter.
    if background_image and background_image.filename:
        background_bytes = await background_image.read()
        background_or_filter = Image.open(io.BytesIO(background_bytes))
    else:
        background_or_filter = type_of_filters

    image_bytes = await image_file.read()
    image = Image.open(io.BytesIO(image_bytes))

    produced_image = SegmenterBackground().Back_step2(
        image, background_or_filter, targets, radius
    )

    # Encode in memory instead of writing to a fixed /tmp path: a shared file
    # name lets concurrent requests overwrite each other's output and leaves
    # the file behind after the response is sent.
    png_buffer = io.BytesIO()
    produced_image.save(png_buffer, format="PNG")

    # Offer the image as a download. The filename is a bare name; a directory
    # path does not belong in Content-Disposition.
    return Response(
        content=png_buffer.getvalue(),
        media_type="image/png",
        headers={
            "Content-Disposition": 'attachment; filename="tmp_processed_image.png"'
        },
    )
@app.post('/Video')
async def Video(
    video_file: UploadFile = File(...),
    background_image: Optional[UploadFile] = File(None),
    kind_back: str = Form(...),
    type_of_filters: str = Form(...),
    blur_radius: str = Form(...),
):
    """Process an uploaded video and stream the result back.

    Parameters:
        video_file: The video to process.
        background_image: Optional replacement background. When a file with a
            non-empty filename is supplied it is passed to the segmenter
            instead of ``type_of_filters``.
        kind_back: A Python literal (parsed with ``ast.literal_eval``)
            forwarded to the segmenter.
        type_of_filters: Filter name forwarded to the segmenter when no
            background image is uploaded.
        blur_radius: Integer blur radius, sent as a form string.

    Returns:
        A streaming response with media type ``video/avi`` carrying the bytes
        of the file the segmenter wrote.

    Raises:
        HTTPException: 400 when ``kind_back`` is not a valid Python literal or
            ``blur_radius`` is not an integer.
    """
    # Local imports keep this block self-contained; all are stdlib or already
    # a dependency of this module.
    import shutil
    import tempfile

    from fastapi import HTTPException

    # Malformed form fields are client errors: report 400 rather than letting
    # the parse exception surface as an HTTP 500.
    try:
        radius = int(blur_radius)
    except ValueError as exc:
        raise HTTPException(
            status_code=400, detail="blur_radius must be an integer"
        ) from exc
    try:
        targets = ast.literal_eval(kind_back)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as exc:
        raise HTTPException(
            status_code=400, detail="kind_back must be a valid Python literal"
        ) from exc

    # An uploaded background image takes precedence over the named filter.
    if background_image and background_image.filename:
        background_bytes = await background_image.read()
        background_or_filter = Image.open(io.BytesIO(background_bytes))
    else:
        background_or_filter = type_of_filters

    # Each request gets its own scratch directory. Fixed /tmp file names would
    # let concurrent requests overwrite each other's input and output.
    work_dir = tempfile.mkdtemp(prefix="video_")
    input_path = os.path.join(work_dir, "tmp_imput.avi")
    output_path = os.path.join(work_dir, "tmp_output.avi")

    try:
        with open(input_path, "wb") as upload_target:
            upload_target.write(await video_file.read())

        # NOTE(review): this call is synchronous inside an async handler and
        # will block the event loop while it runs - consider a worker thread.
        SegmenterBackground().Back_video(
            input_path, output_path, background_or_filter, targets, radius
        )

        # Open eagerly so a missing output file still fails before any
        # response headers are sent.
        result_file = open(output_path, "rb")
    except BaseException:
        # Covers cancellation too; never leave the scratch directory behind.
        shutil.rmtree(work_dir, ignore_errors=True)
        raise

    def _stream_then_cleanup(chunk_size=1024 * 1024):
        """Yield the result in fixed-size chunks, then close and clean up."""
        try:
            with result_file:
                for chunk in iter(lambda: result_file.read(chunk_size), b""):
                    yield chunk
        finally:
            shutil.rmtree(work_dir, ignore_errors=True)

    return StreamingResponse(_stream_then_cleanup(), media_type="video/avi")
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000. uvicorn is imported here so that importing this module
    # does not require it.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)