import os

import gradio as gr
import numpy as np
import torch
from cryptography.fernet import Fernet
from openvino.inference_engine import IECore

from customs.utils import rgb2rggb, rggb2rgb, CV72fillCurve, rggb2rgb_np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
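# Note: `device` is only referenced by a commented-out torch path in denoise_real();
# inference itself runs on CPU through the OpenVINO models loaded below.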

CONFIG = {
    "noise_levels": [4, 6, 8, 10, 12],
    "raw_images": [
        "data/RAW/noisy/4Card_Gain160_E30.npy",
        "data/RAW/noisy/4Card_Gain180_E30.npy",
        "data/RAW/noisy/4Card_Gain200_E30.npy",
        "data/RAW/noisy/4Card_Gain220_E30.npy",
    ],
    "weights": [
        "customs/weights/model_ir_0.xml.encrypted",
        "customs/weights/model_ir_1.xml.encrypted",
        "customs/weights/model_ir_2.xml.encrypted",
        "customs/weights/model_ir_3.xml.encrypted",
    ],
    "SIDD_model_weights": "customs/weights/model_ir_SIDD.xml.encrypted",
}
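
# Index convention (descriptive note): the Gradio sliders below run from 1 to 4,
# so `noise_level - 1` indexes "noise_levels" / "raw_images" and
# `denoise_level - 1` indexes "weights". "SIDD_model_weights" is a single
# reference model trained on the SIDD dataset.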


def main():
    with gr.Blocks() as demo:
        create_text("Raw Image Denoiser", size=10)
        create_text(
            "Data Detail : Images are collected from an imx678 image sensor, and their noise composition and distribution are analyzed",
            size=5,
        )
        create_text("Model Detail : ", size=5)
        create_text(
            "Synthesis Data : We analyze the sensor noise and apply it synthetically to clean images", size=3
        )
        create_text(
            "Our Denoiser : Our model architecture trained on SD images with synthesized noise",
            size=3,
        )
        create_text(
            "SIDD Denoiser : Our model architecture trained on the SIDD dataset", size=3
        )
        create_text(
            "Community : For any questions, please contact us (tim.liu@liteon.com)", size=3
        )
        # SNR comparison figure
        with gr.Row():
            with gr.Column(scale=1):  # Empty column to create space on the left
                pass
            with gr.Column(scale=2):  # Column containing the image
                gr.Image(label="SNR", value="customs/SNR.png", height=260)
            with gr.Column(scale=1):  # Empty column to create space on the right
                pass
        with gr.Tab("Synthesis"):
            with gr.Column():
                with gr.Row():
                    image1 = gr.Image(label="Your Input Image")
                    with gr.Column():
                        noise_level1 = create_slider("noise level")
                        denoise_level1 = create_slider("denoise level")
                        use_synthesis = gr.Checkbox(label="Use synthesis", value=True)
                        image_button1 = gr.Button("Inference")
                # create_text("SIDD Denoiser : Our model architecture is trained to SIDD dataset")
                image_input1 = [
                    image1,
                    noise_level1,
                    denoise_level1,
                    use_synthesis,
                ]
                with gr.Row():
                    SynthesisNoise1 = gr.Image(label="Synthesis noise")
                    OurDenoise1 = gr.Image(label="Our denoiser result")
                with gr.Row():
                    SIDDDenoise1 = gr.Image(label="SIDD denoiser result")
                examples1 = gr.Examples(
                    examples=[
                        ["data/RGB/4Card.png"],
                        ["data/RGB/Color.png"],
                        ["data/RGB/Focus.png"],
                    ],
                    inputs=image_input1,
                )
                image_output1 = [SynthesisNoise1, OurDenoise1, SIDDDenoise1]
        with gr.Tab("Real"):
            with gr.Column():
                with gr.Row():
                    with gr.Column():
                        noise_level2 = create_slider("noise level")
                        denoise_level2 = create_slider("denoise level")
                        image_button2 = gr.Button("Inference")
                        image_input2 = [noise_level2, denoise_level2]
                    RealRow = gr.Image(label="Real noise")
                with gr.Row():
                    OurDenoise2 = gr.Image(label="Our denoiser result")
                    SIDDDenoise2 = gr.Image(label="SIDD denoiser result")
                image_output2 = [RealRow, OurDenoise2, SIDDDenoise2]
        image_button1.click(
            denoise_synthesis, inputs=image_input1, outputs=image_output1
        )
        image_button2.click(denoise_real, inputs=image_input2, outputs=image_output2)
    demo.launch()


def decrypt_model(encrypted_file_path, decrypted_file_path):
    """Decrypt an encrypted IR model file with the Fernet key."""
    # Read the key from IRModelKey.txt if it exists; otherwise fall back to the
    # IRModelKey environment variable.
    if os.path.exists("IRModelKey.txt"):
        with open("IRModelKey.txt", "rb") as file:
            key = file.read()
    else:
        key = os.getenv("IRModelKey")
    cipher_suite = Fernet(key)
    with open(encrypted_file_path, "rb") as file:
        encrypted_data = file.read()
    decrypted_data = cipher_suite.decrypt(encrypted_data)
    with open(decrypted_file_path, "wb") as file:
        file.write(decrypted_data)
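
# For reference, a minimal sketch of the assumed encryption counterpart that
# produced the *.encrypted files with the same Fernet key (illustrative only,
# not part of the app):
#
#     from cryptography.fernet import Fernet
#     key = Fernet.generate_key()              # stored as IRModelKey.txt / env IRModelKey
#     cipher = Fernet(key)
#     with open("model_ir_0.xml", "rb") as f:
#         encrypted = cipher.encrypt(f.read())
#     with open("model_ir_0.xml.encrypted", "wb") as f:
#         f.write(encrypted)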


class IEModel:
    """Class for inference of models in the Inference Engine format"""

    def __init__(self, exec_net, inputs_info, input_key, output_key, switch_rb=True):
        self.net = exec_net
        self.inputs_info = inputs_info
        self.input_key = input_key
        self.output_key = output_key
        self.reqs_ids = []
        self.switch_rb = switch_rb

    def _preprocess(self, img):
        _, _, h, w = self.get_input_shape()
        # HWC -> NCHW with a batch dimension of 1
        img = np.expand_dims(img.transpose(2, 0, 1), axis=0)
        return img

    def forward(self, img):
        """Performs forward pass of the wrapped IE model"""
        res = self.net.infer(inputs={self.input_key: self._preprocess(img)})
        return np.copy(res[self.output_key])

    def forward_async(self, img):
        req_id = len(self.reqs_ids)
        self.net.start_async(
            request_id=req_id, inputs={self.input_key: self._preprocess(img)}
        )
        self.reqs_ids.append(req_id)

    def grab_all_async(self):
        outputs = []
        for req_id in self.reqs_ids:
            self.net.requests[req_id].wait(-1)
            res = self.net.requests[req_id].output_blobs[self.output_key].buffer
            outputs.append(np.copy(res))
        self.reqs_ids = []
        return outputs

    def get_input_shape(self):
        """Returns an input shape of the wrapped IE model"""
        return self.inputs_info[self.input_key].input_data.shape


def load_ie_model(
    model_xml, device, plugin_dir, cpu_extension="", num_reqs=1, **kwargs
):
    """Loads a model in the Inference Engine format"""
    ie = IECore()
    if cpu_extension and "CPU" in device:
        ie.add_extension(cpu_extension, "CPU")
    # Read IR
    net = ie.read_network(model_xml, os.path.splitext(model_xml)[0] + ".bin")
    assert (
        len(net.input_info) == 1 or len(net.input_info) == 2
    ), "Supports topologies with only 1 or 2 inputs"
    assert (
        len(net.outputs) == 1 or len(net.outputs) == 4 or len(net.outputs) == 5
    ), "Supports topologies with only 1, 4 or 5 outputs"
    input_blob = next(iter(net.input_info))
    out_blob = next(iter(net.outputs))
    net.batch_size = 1
    # Loading model to the plugin
    exec_net = ie.load_network(
        network=net, device_name=device, num_requests=num_reqs
    )
    model = IEModel(exec_net, net.input_info, input_blob, out_blob, **kwargs)
    return model
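
# Usage sketch (assumption: an HxWxC float32 image in [0, 1], matching the
# preprocessing in IEModel._preprocess, which adds the batch dimension and
# converts HWC to NCHW):
#
#     model = load_ie_model("customs/weights/model_ir_0_decrypted.xml", "CPU", None)
#     denoised = model.forward(rggb_image)   # returns the raw NCHW network output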


# Decrypt every encrypted IR model (both the .xml and the .bin file) before loading.
for encrypted_xml in CONFIG["weights"] + [CONFIG["SIDD_model_weights"]]:
    base = encrypted_xml.replace(".xml.encrypted", "")
    decrypt_model(base + ".xml.encrypted", base + "_decrypted.xml")
    decrypt_model(base + ".bin.encrypted", base + "_decrypted.bin")

# Load the decrypted OpenVINO IR models (all inference runs on CPU).
denoiseModelList = [
    load_ie_model(weight.split(".")[0] + "_decrypted.xml", "CPU", None, "")
    for weight in CONFIG["weights"]
]
SIDD_model = load_ie_model(
    CONFIG["SIDD_model_weights"].split(".")[0] + "_decrypted.xml", "CPU", None, ""
)


def denoise_synthesis(image, noise_level=1, denoise_level=1, use_synthesis=True):
    # # Equivalent numpy implementation, kept for reference (needs cv2 and the *_np helpers):
    # rgb = np.transpose(image, (2, 0, 1))[np.newaxis, :]
    # # rgb is not 1080 x 1920, resize it; tested at 360 x 640
    # rgb = cv2.resize(rgb[0].transpose(1, 2, 0), (1920, 1080)).transpose(2, 0, 1)[np.newaxis, :]
    # rggb = rgb2rggb_np(np.transpose(rgb.squeeze(0), (1, 2, 0))) / 255  # Normalize to [0, 1]
    # if use_synthesis:
    #     noiseImage = CV72fillCurve_np(rggb, CONFIG["noise_levels"][noise_level - 1], CONFIG["noise_levels"][noise_level - 1] + 1)
    #     rgb = rggb2rgb_np(noiseImage)
    #     rgb = np.clip(rgb, 0, 1)
    #
    # The torch implementation below is faster than the numpy version above.
    rgb = torch.tensor(image).float().permute(2, 0, 1).unsqueeze(0)  # float needed for bilinear resize
    # The input is not necessarily 1080 x 1920, so resize it (tested at 360 x 640).
    rgb = torch.nn.functional.interpolate(
        rgb, size=(1080, 1920), mode="bilinear", align_corners=False
    )
    rggb = rgb2rggb(rgb.squeeze(0).permute(1, 2, 0)) / 255  # Normalize to [0, 1]
    if use_synthesis:
        rggb = CV72fillCurve(
            rggb,
            CONFIG["noise_levels"][noise_level - 1],
            CONFIG["noise_levels"][noise_level - 1] + 1,
        )
    rgb = rggb2rgb(rggb)
    rgb = rgb.clamp_(0, 1).cpu().numpy()  # In-place clipping
    noiseImage = rggb.numpy()
    output = denoiseModelList[denoise_level - 1].forward(noiseImage)
    SIDDOutput = SIDD_model.forward(noiseImage)
    return (
        rgb,
        RGGB2RGBNumpy(output.squeeze().transpose(1, 2, 0)),
        RGGB2RGBNumpy(SIDDOutput.squeeze().transpose(1, 2, 0)),
    )


def denoise_real(noise_level=1, denoise_level=1):
    # Load a pre-captured noisy RAW (RGGB) image and normalize the 16-bit values to [0, 1].
    noiseImage = (
        np.load(CONFIG["raw_images"][noise_level - 1]).astype(np.float32) / 65535.0
    )
    # noiseImage = torch.from_numpy(noiseImage).permute(2, 0, 1).to(device).unsqueeze(0)
    output = denoiseModelList[denoise_level - 1].forward(noiseImage)
    SIDDOutput = SIDD_model.forward(noiseImage)
    return (
        RGGB2RGBNumpy(noiseImage),
        RGGB2RGBNumpy(output.squeeze().transpose(1, 2, 0)),
        RGGB2RGBNumpy(SIDDOutput.squeeze().transpose(1, 2, 0)),
    )


def create_slider(label):
    return gr.Slider(
        minimum=1, maximum=4, value=1, step=1, interactive=True, label=label
    )


def create_text(text, size=3, color="black"):
    gr.Markdown(f"<font size={size} color={color}>{text}</font>")


def RGGB2RGBNumpy(numpyInput):
    # rggb2rgb_np operates directly on numpy arrays.
    output = rggb2rgb_np(numpyInput)
    # Clip to the valid [0, 1] range.
    output = np.clip(output, 0, 1)
    return output


if __name__ == "__main__":
    main()