NTO-TCP-HF / src /api /image_prep_api.py
ishworrsubedii's picture
fix: import issue
7956aa1
"""
project @ NTO-TCP-HF
created @ 2024-10-28
author @ github.com/ishworrsubedii
"""
import base64
import os
import time
from io import BytesIO
import cv2
import numpy as np
import replicate
import requests
from PIL import Image
from fastapi import APIRouter, UploadFile, File, HTTPException
from fastapi.responses import JSONResponse
from src.components.auto_crop import crop_transparent_image
from src.components.color_extraction import ColorExtractionRMBG
from src.components.title_des_gen import NecklaceProductListing
from src.utils.logger import logger
preprocessing_router = APIRouter()
rmbg: str = os.getenv("RMBG")
enhancer: str = os.getenv("ENHANCER")
prod_listing_api_key: str = os.getenv("PROD_LISTING_API_KEY")
color_extraction_rmbg = ColorExtractionRMBG()
product_listing_obj = NecklaceProductListing(prod_listing_api_key)
def replicate_bg(input):
output = replicate.run(
rmbg,
input=input
)
return output
def replicate_enhancer(input):
output = replicate.run(
enhancer,
input=input
)
return output
@preprocessing_router.post("/rem_bg")
async def remove_background(image: UploadFile = File(...)):
logger.info("-" * 50)
logger.info(">>> REMOVE BACKGROUND STARTED <<<")
start_time = time.time()
try:
image_bytes = await image.read()
image = Image.open(BytesIO(image_bytes)).convert("RGB")
logger.info(">>> IMAGE LOADED SUCCESSFULLY <<<")
except Exception as e:
logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500, content={"error": f"Error reading image: {str(e)}", "code": 500})
try:
act_img_base_64 = BytesIO()
image.save(act_img_base_64, format="WEBP")
image_bytes_ = base64.b64encode(act_img_base_64.getvalue()).decode("utf-8")
image_data_uri = f"data:image/WEBP;base64,{image_bytes_}"
logger.info(">>> IMAGE ENCODING COMPLETED <<<")
except Exception as e:
logger.error(f">>> IMAGE ENCODING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error converting image to base64: {str(e)}", "code": 500})
try:
output = replicate_bg({"image": image_data_uri})
logger.info(">>> BACKGROUND REMOVAL COMPLETED <<<")
except Exception as e:
logger.error(f">>> BACKGROUND REMOVAL ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error running background removal: {str(e)}", "code": 500})
try:
response = requests.get(output)
base_64 = base64.b64encode(response.content).decode('utf-8')
base64_prefix = "data:image/WEBP;base64,"
total_inference_time = round((time.time() - start_time), 2)
response = {
"output": f"{base64_prefix}{base_64}",
"inference_time": total_inference_time,
"code": 200
}
logger.info(">>> RESPONSE PREPARATION COMPLETED <<<")
logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
logger.info("-" * 50)
return JSONResponse(content=response, status_code=200)
except Exception as e:
logger.error(f">>> RESPONSE PROCESSING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error processing response: {str(e)}", "code": 500})
@preprocessing_router.post("/upscale_image")
async def upscale_image(image: UploadFile = File(...), scale: int = 1):
logger.info("-" * 50)
logger.info(">>> IMAGE UPSCALING STARTED <<<")
start_time = time.time()
try:
image_bytes = await image.read()
image = Image.open(BytesIO(image_bytes)).convert("RGBA")
logger.info(">>> IMAGE LOADED SUCCESSFULLY <<<")
except Exception as e:
logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500, content={"error": f"Error reading image: {str(e)}", "code": 500})
try:
act_img_base_64 = BytesIO()
image.save(act_img_base_64, format="PNG")
image_bytes_ = base64.b64encode(act_img_base_64.getvalue()).decode("utf-8")
image_data_uri = f"data:image/png;base64,{image_bytes_}"
logger.info(">>> IMAGE ENCODING COMPLETED <<<")
except Exception as e:
logger.error(f">>> IMAGE ENCODING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error converting image to base64: {str(e)}", "code": 500})
try:
input = {
"image": image_data_uri,
"scale": scale,
"face_enhance": False
}
output = replicate_enhancer(input)
logger.info(">>> IMAGE ENHANCEMENT COMPLETED <<<")
except Exception as e:
logger.error(f">>> IMAGE ENHANCEMENT ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error running image enhancement: {str(e)}", "code": 500})
try:
response = requests.get(output)
base_64 = base64.b64encode(response.content).decode('utf-8')
base64_prefix = image_data_uri.split(",")[0] + ","
total_inference_time = round((time.time() - start_time), 2)
response = {
"output": f"{base64_prefix}{base_64}",
"inference_time": total_inference_time,
"code": 200
}
logger.info(">>> RESPONSE PREPARATION COMPLETED <<<")
logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
logger.info("-" * 50)
return JSONResponse(content=response, status_code=200)
except Exception as e:
logger.error(f">>> RESPONSE PROCESSING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error processing response: {str(e)}", "code": 500})
@preprocessing_router.post("/crop_transparent")
async def crop_transparent(image: UploadFile):
logger.info("-" * 50)
logger.info(">>> CROP TRANSPARENT STARTED <<<")
start_time = time.time()
try:
if not image.content_type == "image/png":
logger.error(">>> INVALID FILE TYPE: NOT PNG <<<")
return JSONResponse(status_code=400,
content={"error": "Only PNG files are supported", "code": 400})
except Exception as e:
logger.error(f">>> FILE TYPE CHECK ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error checking file type: {str(e)}", "code": 500})
try:
contents = await image.read()
cropped_image_bytes, metadata = crop_transparent_image(contents)
logger.info(">>> IMAGE CROPPING COMPLETED <<<")
except Exception as e:
logger.error(f">>> IMAGE CROPPING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error cropping image: {str(e)}", "code": 500})
try:
base64_image = base64.b64encode(cropped_image_bytes).decode('utf-8')
base64_prefix = "data:image/png;base64,"
total_inference_time = round((time.time() - start_time), 2)
logger.info(">>> RESPONSE PREPARATION COMPLETED <<<")
logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
logger.info("-" * 50)
return JSONResponse(content={
"status": "success",
"code": 200,
"data": {
"image": f"{base64_prefix}{base64_image}",
"metadata": metadata,
"inference_time": total_inference_time
}
}, status_code=200)
except Exception as e:
logger.error(f">>> RESPONSE PROCESSING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error processing response: {str(e)}", "code": 500})
@preprocessing_router.post("/background_replace")
async def bg_replace(image: UploadFile = File(...), bg_image: UploadFile = File(...)):
logger.info("-" * 50)
logger.info(">>> BACKGROUND REPLACE STARTED <<<")
start_time = time.time()
try:
image_bytes = await image.read()
bg_bytes = await bg_image.read()
image = Image.open(BytesIO(image_bytes)).convert("RGBA")
bg_image = Image.open(BytesIO(bg_bytes)).convert("RGB")
logger.info(">>> IMAGES LOADED SUCCESSFULLY <<<")
except Exception as e:
logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error reading images: {str(e)}", "code": 500})
try:
width, height = bg_image.size
background = Image.fromarray(np.array(bg_image)).resize((width, height))
orig_img = Image.fromarray(np.array(image)).resize((width, height))
background.paste(orig_img, (0, 0), mask=orig_img)
logger.info(">>> IMAGE PROCESSING COMPLETED <<<")
except Exception as e:
logger.error(f">>> IMAGE PROCESSING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error processing images: {str(e)}", "code": 500})
try:
act_img_base_64 = BytesIO()
background.save(act_img_base_64, format="WEBP")
image_bytes_ = base64.b64encode(act_img_base_64.getvalue()).decode("utf-8")
image_data_uri = f"data:image/webp;base64,{image_bytes_}"
total_inference_time = round((time.time() - start_time), 2)
logger.info(">>> RESPONSE PREPARATION COMPLETED <<<")
logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
logger.info("-" * 50)
return JSONResponse(content={
"output": image_data_uri,
"code": 200,
"inference_time": total_inference_time
}, status_code=200)
except Exception as e:
logger.error(f">>> RESPONSE PROCESSING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error creating response: {str(e)}", "code": 500})
@preprocessing_router.post("/rem_bg_color_extraction")
async def remove_background_color_extraction(image: UploadFile = File(...),
hex_color: str = "#FFFFFF",
threshold: int = 30):
logger.info("-" * 50)
logger.info(">>> COLOR EXTRACTION STARTED <<<")
start_time = time.time()
try:
image_bytes = await image.read()
image = Image.open(BytesIO(image_bytes)).convert("RGBA")
image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
logger.info(">>> IMAGE LOADED SUCCESSFULLY <<<")
except Exception as e:
logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error reading image: {str(e)}", "code": 500})
try:
result = color_extraction_rmbg.extract_color(image, hex_color, threshold)
result = Image.fromarray(cv2.cvtColor(result, cv2.COLOR_RGB2BGRA)).convert("RGBA")
logger.info(">>> COLOR EXTRACTION COMPLETED <<<")
except Exception as e:
logger.error(f">>> COLOR EXTRACTION ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error extracting colors: {str(e)}", "code": 500})
try:
act_img_base_64 = BytesIO()
result.save(act_img_base_64, format="PNG")
image_bytes_ = base64.b64encode(act_img_base_64.getvalue()).decode("utf-8")
image_data_uri = f"data:image/png;base64,{image_bytes_}"
total_inference_time = round((time.time() - start_time), 2)
logger.info(">>> RESPONSE PREPARATION COMPLETED <<<")
logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
logger.info("-" * 50)
return JSONResponse(content={
"output": image_data_uri,
"code": 200,
"inference_time": total_inference_time
}, status_code=200)
except Exception as e:
logger.error(f">>> RESPONSE PROCESSING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error creating response: {str(e)}", "code": 500})
@preprocessing_router.post("/title_description_generator")
async def product_title_description_generator(image: UploadFile = File(...)):
logger.info("-" * 50)
logger.info(">>> TITLE DESCRIPTION GENERATION STARTED <<<")
start_time = time.time()
try:
image_bytes = await image.read()
image = Image.open(BytesIO(image_bytes)).convert("RGB")
logger.info(">>> IMAGE LOADED SUCCESSFULLY <<<")
except Exception as e:
logger.error(f">>> IMAGE LOADING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error reading image: {str(e)}", "code": 500})
try:
result = product_listing_obj.gen_title_desc(image=image)
title = result.split("Title:")[1].split("Description:")[0]
description = result.split("Description:")[1]
logger.info(">>> TITLE AND DESCRIPTION GENERATION COMPLETED <<<")
except Exception as e:
logger.error(">>> TITLE DESCRIPTION GENERATION ERROR <<<")
return JSONResponse(status_code=500,
content={"error": "Please make sure the image is clear and necklaces are visible",
"code": 500})
try:
total_inference_time = round((time.time() - start_time), 2)
logger.info(">>> RESPONSE PREPARATION COMPLETED <<<")
logger.info(f">>> TOTAL INFERENCE TIME: {total_inference_time}s <<<")
logger.info(">>> REQUEST COMPLETED SUCCESSFULLY <<<")
logger.info("-" * 50)
return JSONResponse(content={
"code": 200,
"title": title,
"description": description,
"inference_time": total_inference_time
}, status_code=200)
except Exception as e:
logger.error(f">>> RESPONSE PROCESSING ERROR: {str(e)} <<<")
return JSONResponse(status_code=500,
content={"error": f"Error creating response: {str(e)}", "code": 500})