import gradio as gr
import torch
from viscy.translation.engine import VSUNet
from huggingface_hub import hf_hub_download
from numpy.typing import ArrayLike
import numpy as np
from skimage import exposure
from skimage.transform import resize
from skimage.util import invert
import cmap


class VSGradio:
    """Wrap a ``VSUNet`` checkpoint so it can be driven from a Gradio callback.

    The model is loaded once at construction time, moved to the GPU when one
    is available (CPU otherwise) and put in evaluation mode.
    """

    def __init__(self, model_config, model_ckpt_path):
        """Store the configuration, pick a device and load the model.

        Args:
            model_config: Passed through as ``model_config`` to
                ``VSUNet.load_from_checkpoint``.
            model_ckpt_path: Path of the checkpoint file to load.

        Raises:
            Exception: Whatever ``load_model`` re-raises when loading fails.
        """
        self.model_config = model_config
        self.model_ckpt_path = model_ckpt_path
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")
        self.model = None
        self.load_model()

    def load_model(self):
        """Load the checkpoint into ``self.model`` and switch it to eval mode.

        Raises:
            Exception: Any loading error is printed and then re-raised so that
                the application does not start without a model.
        """
        try:
            # Load the model checkpoint and move it to the correct device
            # (GPU or CPU).
            print(f"Loading model from checkpoint: {self.model_ckpt_path}")
            self.model = VSUNet.load_from_checkpoint(
                self.model_ckpt_path,
                architecture="UNeXt2_2D",
                model_config=self.model_config,
            )
            self.model.to(self.device)
            self.model.eval()
            print("Model loaded successfully and set to evaluation mode")
        except Exception as e:
            print(f"Error loading model: {e}")
            raise

    def normalize_fov(self, input: ArrayLike):
        """Normalize a field of view to zero mean and unit variance.

        Args:
            input: Image values; statistics are computed over all elements.

        Returns:
            ``(input - mean) / std``. When every element is identical the
            spread is zero and the centred image (all zeros) is returned
            instead, so the result never contains NaN/inf caused by a
            division by zero.
        """
        mean = np.mean(input)
        std = np.std(input)
        centered = input - mean
        # A constant image has no spread: dividing by std would yield NaN/inf
        # and poison the model input. The min/max comparison also catches the
        # case where rounding leaves a tiny non-zero std for constant data.
        if np.size(centered) and (std == 0 or np.min(input) == np.max(input)):
            return np.zeros_like(centered)
        return centered / std

    def preprocess_image_standard(self, input: ArrayLike):
        """Return ``input`` after ``exposure.equalize_adapthist``."""
        input = exposure.equalize_adapthist(input)
        return input

    def downscale_image(self, inp: ArrayLike, scale_factor: float):
        """Downscales the image by the given scaling factor.

        Args:
            inp: Image whose ``shape`` unpacks into ``(height, width)``.
            scale_factor: Multiplier applied to both dimensions.

        Returns:
            The image resized (with anti-aliasing) to the truncated scaled
            height and width.
        """
        height, width = inp.shape
        new_height = int(height * scale_factor)
        new_width = int(width * scale_factor)
        return resize(inp, (new_height, new_width), anti_aliasing=True)

    def predict(self, inp, scaling_factor: float):
        """Run virtual staining on one image and return two colored outputs.

        Args:
            inp: Input image, or ``None`` when nothing was provided.
                NOTE(review): the three ``unsqueeze`` calls below suggest a
                2D (height, width) image is expected -- confirm with the UI
                component that feeds this callback.
            scaling_factor: Factor by which the normalized image is rescaled
                before it is passed to the model.

        Returns:
            ``(nuc_rgb, mem_rgb)``: the first and second output channels of
            the model, resized back to the input shape and colored green and
            magenta respectively. ``(None, None)`` when ``inp`` is ``None``.
            If anything raises, the error is printed and a black
            300x300x3 ``uint8`` image is returned for both outputs.
        """
        try:
            if inp is None:
                print("Error: Input image is None")
                return None, None

            # Normalize the input, remember its shape, then rescale it.
            inp = self.normalize_fov(inp)
            original_shape = inp.shape
            inp = apply_rescale_image(inp, scaling_factor)

            # Convert the input to a tensor
            inp = torch.from_numpy(np.array(inp).astype(np.float32))
            test_dict = dict(
                index=None,
                # Three leading singleton axes are added before the image axes.
                source=inp.unsqueeze(0).unsqueeze(0).unsqueeze(0).to(self.device),
            )

            with torch.inference_mode():
                self.model.on_predict_start()  # Necessary preprocessing for the model
                pred = (
                    self.model.predict_step(test_dict, 0, 0).cpu().numpy()
                )  # Move output back to CPU for post-processing

            # Split the model output into its two predicted channels.
            nuc_pred = pred[0, 0, 0]
            mem_pred = pred[0, 1, 0]

            # Resize predictions back to the original image size
            nuc_pred = resize(nuc_pred, original_shape, anti_aliasing=True)
            mem_pred = resize(mem_pred, original_shape, anti_aliasing=True)

            green_colormap = cmap.Colormap("green")
            magenta_colormap = cmap.Colormap("magenta")
            nuc_rgb = apply_colormap(nuc_pred, green_colormap)
            mem_rgb = apply_colormap(mem_pred, magenta_colormap)

            return nuc_rgb, mem_rgb
        except Exception as e:
            # Deliberate best-effort boundary: the UI gets placeholder images
            # instead of an unhandled exception.
            print(f"Error during prediction: {e}")
            empty_img = np.zeros((300, 300, 3), dtype=np.uint8)
            return empty_img, empty_img


def apply_colormap(prediction, colormap: cmap.Colormap):
    """Apply a colormap to a single-channel prediction image.

    The prediction is rescaled to ``[0, 1]``, passed through ``colormap``,
    multiplied by 255 and cast to ``uint8``. The channel layout of the result
    is whatever calling ``colormap`` on an array produces.
    """
    # Ensure the prediction is within the valid range [0, 1]
    prediction = exposure.rescale_intensity(prediction, out_range=(0, 1))
    rgb_image = colormap(prediction)
    rgb_image_uint8 = (rgb_image * 255).astype(np.uint8)
    return rgb_image_uint8


def merge_images(nuc_rgb: ArrayLike, mem_rgb: ArrayLike) -> ArrayLike:
    """Merge nucleus and membrane images into a single RGB image.

    The merge is the element-wise maximum of the two inputs.
    """
    return np.maximum(nuc_rgb, mem_rgb)


def apply_image_adjustments(image, invert_image: bool, gamma_factor: float):
    """Optionally invert ``image``, gamma-correct it and return it as uint8.

    Args:
        image: Image to adjust.
        invert_image: When true, the image is inverted first
            (``invert(..., signed_float=False)``).
        gamma_factor: Gamma passed to ``exposure.adjust_gamma``.

    Returns:
        The adjusted image with intensities rescaled to ``(0, 255)`` and cast
        to ``uint8``.
    """
    if invert_image:
        image = invert(image, signed_float=False)
    image = exposure.adjust_gamma(image, gamma_factor)
    return exposure.rescale_intensity(image, out_range=(0, 255)).astype(np.uint8)


def apply_rescale_image(image, scaling_factor: float):
    """Resize the first two axes of ``image`` by ``scaling_factor``.

    Args:
        image: Array with at least two dimensions.
        scaling_factor: Positive multiplier; converted with ``float()`` so
            numeric strings and ints are accepted.

    Returns:
        The anti-aliased resized image. Each target dimension is truncated to
        an integer and clamped to at least one pixel so that a small factor
        never requests an empty output.

    Raises:
        ValueError: If ``scaling_factor`` is not strictly positive.
    """
    scaling_factor = float(scaling_factor)
    if not scaling_factor > 0:
        raise ValueError(
            f"scaling_factor must be positive, got {scaling_factor}"
        )
    new_height = max(1, int(image.shape[0] * scaling_factor))
    new_width = max(1, int(image.shape[1] * scaling_factor))
    return resize(image, (new_height, new_width), anti_aliasing=True)


def clear_outputs(image):
    """Return ``image`` unchanged followed by two ``None`` values.

    NOTE(review): presumably wired to a UI event so that a new input keeps
    the input component and blanks two output components -- confirm at the
    call site.
    """
    return image, None, None
load_css(file_path): with open(file_path, "r") as file: return file.read() if __name__ == "__main__": try: print("Downloading model checkpoint...") model_ckpt_path = hf_hub_download( repo_id="compmicro-czb/VSCyto2D", filename="epoch=399-step=23200.ckpt" ) print(f"Model downloaded successfully to: {model_ckpt_path}") model_config = { "in_channels": 1, "out_channels": 2, "encoder_blocks": [3, 3, 9, 3], "dims": [96, 192, 384, 768], "decoder_conv_blocks": 2, "stem_kernel_size": [1, 2, 2], "in_stack_depth": 1, "pretraining": False, } print("Initializing VSGradio...") vsgradio = VSGradio(model_config, model_ckpt_path) print(f"VSGradio initialized successfully! Using device: {vsgradio.device}") # Initialize the Gradio app using Blocks with gr.Blocks(css=load_css("style.css")) as demo: # Title and description gr.HTML( """
""" ) gr.HTML( """Model: VSCyto2D
Input: label-free image (e.g., QPI or phase contrast).
Output: Virtual staining of nucleus and membrane.
Note: The model works well with QPI, and sometimes generalizes to phase contrast and DIC.
It was trained primarily on HEK293T, BJ5, and A549 cells imaged at 20x.
We continue to diagnose and improve generalization
Check out our preprint: Liu et al., Robust virtual staining of landmark organelles
For training your own model and analyzing large amounts of data, use our GitHub repository.