"""Inference endpoint handler that upscales base64-encoded images with Real-ESRGAN."""
import torch
from PIL import Image
from io import BytesIO
from realesrgan import RealESRGANer
from typing import Dict, List, Any, Optional
import os
from pathlib import Path
from basicsr.archs.rrdbnet_arch import RRDBNet
import numpy as np
import cv2
import PIL
import base64

# Location of the x4plus weights loaded at start-up.
_MODEL_PATH = "/workspace/real-esrgan/weights/Real-ESRGAN-x4plus.pth"

# Requests are rejected unless width * height is strictly below this value.
_MAX_PIXELS = 1400 * 1400

# Supported PIL modes mapped to the OpenCV conversion that yields the
# BGR/BGRA channel order handed to the upscaler.  Doubles as the list of
# accepted modes.
_PIL_TO_OPENCV = {
    "RGB": cv2.COLOR_RGB2BGR,
    "RGBA": cv2.COLOR_RGBA2BGRA,
    # Grey values are replicated into every channel, so the channel order
    # of the 3-channel result is irrelevant.
    "L": cv2.COLOR_GRAY2BGR,
}


class EndpointHandler:
    """Callable handler: takes a request dict, returns an upscaled PNG as base64.

    The request is either ``{"inputs": {...}}`` or the inner dict directly.
    The inner dict must contain ``"image"`` (base64-encoded image bytes) and
    may contain ``"outscale"`` (default 3).
    """

    def __init__(self, path=""):
        """Build the Real-ESRGAN upsampler.

        Args:
            path: Accepted for interface compatibility; not used. The weights
                are always read from ``_MODEL_PATH``.
        """
        self.model = RealESRGANer(
            scale=4,
            model_path=_MODEL_PATH,
            model=RRDBNet(
                num_in_ch=3,
                num_out_ch=3,
                num_feat=64,
                num_block=23,
                num_grow_ch=32,
                scale=4,
            ),
            tile=0,
            tile_pad=10,
            half=True,
        )

    def __call__(self, data: Any) -> Dict[str, Optional[str]]:
        """Upscale the image contained in ``data``.

        Args:
            data: Request dict (see the class docstring). It is not modified.

        Returns:
            ``{"out_image": <base64 PNG>, "error": None}`` on success, or
            ``{"out_image": None, "error": <message>}`` on failure. Exceptions
            are never propagated to the caller.
        """
        try:
            # Read with .get() so the caller's dict is left untouched.
            inputs = data.get("inputs", data)
            outscale = float(inputs.get("outscale", 3))

            image = Image.open(BytesIO(base64.b64decode(inputs["image"])))
            in_size, in_mode = image.size, image.mode

            self._validate(in_size, in_mode, outscale)

            # debug
            print(f"image.size: {in_size}, image.mode: {in_mode}, outscale: {outscale}")

            # PIL uses RGB(A); convert to the BGR(A) order passed to the model.
            opencv_image = cv2.cvtColor(np.array(image), _PIL_TO_OPENCV[in_mode])

            # The second value returned by enhance() is not needed here.
            output, _ = self.model.enhance(opencv_image, outscale=outscale)

            # debug
            print(f"output.shape: {output.shape}")

            return {"out_image": self._encode_png(self._to_rgb(output)), "error": None}

        # handle errors
        except (AssertionError, ValueError) as e:
            # ValueError covers failed validation, a non-numeric outscale and
            # malformed base64 (binascii.Error subclasses ValueError).
            # AssertionError is still caught in case a dependency raises one.
            print(f"{type(e).__name__}: {e}")
            return {"out_image": None, "error": str(e)}
        except KeyError as e:
            print(f"KeyError: {e}")
            return {"out_image": None, "error": f"Missing key: {e}"}
        except PIL.UnidentifiedImageError as e:
            print(f"PIL.UnidentifiedImageError: {e}")
            return {"out_image": None, "error": "Invalid image format"}
        except Exception as e:
            # Last-resort boundary: hide internal details from the client.
            print(f"Exception: {e}")
            return {"out_image": None, "error": "An unexpected error occurred"}

    @staticmethod
    def _validate(in_size, in_mode, outscale) -> None:
        """Raise ``ValueError`` if the request violates mode/size/scale limits.

        Explicit raises are used instead of ``assert`` so the checks remain
        active when Python runs with ``-O``.
        """
        if in_mode not in _PIL_TO_OPENCV:
            raise ValueError(f"Unsupported image mode: {in_mode}")
        pixel_count = in_size[0] * in_size[1]
        if not pixel_count < _MAX_PIXELS:
            raise ValueError(
                f"Image is too large: {in_size}: {pixel_count} is greater than {_MAX_PIXELS}"
            )
        # Written as a chained comparison so that NaN is rejected too.
        if not (1 < outscale <= 10):
            raise ValueError(f"Outscale must be between 1 and 10: {outscale}")

    @staticmethod
    def _to_rgb(output):
        """Convert the model output array to RGB/RGBA channel order."""
        out_shape = output.shape
        if len(out_shape) != 3:
            # No channel axis: expand to three channels.
            return cv2.cvtColor(output, cv2.COLOR_GRAY2RGB)
        if out_shape[2] == 3:
            return cv2.cvtColor(output, cv2.COLOR_BGR2RGB)
        if out_shape[2] == 4:
            return cv2.cvtColor(output, cv2.COLOR_BGRA2RGBA)
        # Any other channel count is passed through unchanged.
        return output

    @staticmethod
    def _encode_png(array) -> str:
        """Serialise an image array to a base64-encoded PNG string."""
        img_byte_arr = BytesIO()
        Image.fromarray(array).save(img_byte_arr, format='PNG')
        return base64.b64encode(img_byte_arr.getvalue()).decode()