"""Interactive Gradio demo for a slot-based model on CLEVR images.

The script downloads a checkpoint, restores the model, and serves a UI where
an image is decoded into slots whose positions and scales can be edited and
re-rendered through the model's decoder.
"""

import os
from typing import Callable

from clu import checkpoint
from flax import linen as nn
import gradio as gr
from huggingface_hub import snapshot_download
import jax
import jax.numpy as jnp
import numpy as np
from PIL import Image

from invariant_slot_attention.configs.clevr_with_masks.equiv_transl_scale_v2 import get_config
from invariant_slot_attention.lib import utils

# Sizes used consistently by the model inputs and the UI state below.
NUM_SLOTS = 11
SLOT_DIM = 64
IMAGE_SIZE = 128


def load_model(config, checkpoint_dir):
    """Build the model from `config` and restore its weights.

    Args:
        config: Experiment config; `config.model` is passed to
            `utils.build_model_from_config`.
        checkpoint_dir: Directory that contains the `ckpt-0` checkpoint.

    Returns:
        A `(model, state, rng)` tuple: the built model, the restored
        `utils.TrainState`, and the PRNG key the state was created with.
    """
    rng = jax.random.PRNGKey(42)

    # Initialize model
    model = utils.build_model_from_config(config.model)

    def init_model(rng):
        rng, init_rng, model_rng, dropout_rng = jax.random.split(rng, num=4)

        init_conditioning = None
        init_inputs = jnp.ones([1, 1, IMAGE_SIZE, IMAGE_SIZE, 3], jnp.float32)
        initial_vars = model.init(
            {"params": model_rng, "state_init": init_rng, "dropout": dropout_rng},
            video=init_inputs, conditioning=init_conditioning,
            padding_mask=jnp.ones(init_inputs.shape[:-1], jnp.int32))

        # Split into state variables (e.g. for batchnorm stats) and model params.
        # Note that `pop()` on a FrozenDict performs a deep copy.
        state_vars, initial_params = initial_vars.pop("params")  # pytype: disable=attribute-error

        # Filter out intermediates (we don't want to store these in the TrainState).
        state_vars = utils.filter_key_from_frozen_dict(
            state_vars, key="intermediates")
        return state_vars, initial_params

    state_vars, initial_params = init_model(rng)

    # The freshly initialized state only serves as a template for `restore`.
    opt_state = None
    state = utils.TrainState(
        step=1, opt_state=opt_state, params=initial_params, rng=rng,
        variables=state_vars)

    ckpt = checkpoint.Checkpoint(checkpoint_dir)
    state = ckpt.restore(state, checkpoint=os.path.join(checkpoint_dir, "ckpt-0"))

    return model, state, rng


def load_image(name):
    """Load `images/<name>.png`, crop and resize it, and scale it to [0, 1].

    Args:
        name: File stem of the image inside the local `images` directory.

    Returns:
        A float32 `jnp` array of shape `(IMAGE_SIZE, IMAGE_SIZE, 3)`.
    """
    # The context manager closes the file handle once the pixels are read.
    with Image.open(f"images/{name}.png") as raw:
        cropped = raw.crop((64, 29, 64 + 192, 29 + 192))
        resized = cropped.resize((IMAGE_SIZE, IMAGE_SIZE))
        # Keep only the first three channels (drops alpha if present).
        pixels = np.array(resized)[:, :, :3] / 255.
    return jnp.array(pixels, dtype=jnp.float32)


def _slot_index(slot_number):
    """Convert the 1-based slot slider value into a 0-based array index."""
    return int(slot_number) - 1


def _to_uint8(array):
    """Scale an array with values in [0, 1] to uint8 for display."""
    return (array * 255).astype(np.uint8)


def _with_slot_value(values, slot_number, column, value):
    """Return a copy of `values` with one entry of the chosen slot replaced.

    A copy is returned (instead of writing in place) so that arrays stored in
    other `gr.State` objects are never modified through a shared reference.
    """
    updated = np.copy(values)
    updated[_slot_index(slot_number), column] = value
    return updated


download_path = snapshot_download(repo_id="ondrejbiza/isa", allow_patterns="clevr_isa_ts_v2*")
checkpoint_dir = os.path.join(download_path, "clevr_isa_ts_v2")

model, state, rng = load_model(get_config(), checkpoint_dir)

rng, init_rng = jax.random.split(rng, num=2)


class DecoderWrapper(nn.Module):
    """Wraps the model's decoder factory so the decoder can be applied alone."""

    # Zero-argument callable that constructs the decoder module.
    decoder: Callable[[], nn.Module]

    @nn.compact
    def __call__(self, slots, train=False):
        return self.decoder()(slots, train)


decoder_model = DecoderWrapper(decoder=model.decoder)

with gr.Blocks() as demo:

    # Per-session state: slot features, original and edited positions/scales,
    # and the per-slot segmentation probabilities of the last decode.
    local_slots = gr.State(np.zeros((NUM_SLOTS, SLOT_DIM), dtype=np.float32))
    local_orig_pos = gr.State(np.zeros((NUM_SLOTS, 2), dtype=np.float32))
    local_orig_scale = gr.State(np.zeros((NUM_SLOTS, 2), dtype=np.float32))
    local_pos = gr.State(np.zeros((NUM_SLOTS, 2), dtype=np.float32))
    local_scale = gr.State(np.ones((NUM_SLOTS, 2), dtype=np.float32))
    local_probs = gr.State(np.zeros((NUM_SLOTS, IMAGE_SIZE, IMAGE_SIZE), dtype=np.float32))

    # NOTE(review): the Row/Column nesting below was reconstructed from a
    # source whose indentation was lost - confirm it matches the intended layout.
    with gr.Row():
        gr_choose_image = gr.Dropdown(
            [f"img{i}" for i in range(1, 9)],
            label="CLEVR Image",
            info="Start by a picking an image from the CLEVR dataset."
        )

    with gr.Row():

        with gr.Column():

            with gr.Row():
                with gr.Column():
                    gr_image_1 = gr.Image(type="numpy", shape=(112, 112), source="canvas", label="Decoding")
                with gr.Column():
                    gr_image_2 = gr.Image(type="numpy", shape=(112, 112), source="canvas", label="Segmentation")

        with gr.Column():

            gr_slot_slider = gr.Slider(1, NUM_SLOTS, value=1, step=1, label="Slot Index",
                                       info="Change slot index too see the segmentation mask, position and scale of each slot.")

            # NOTE(review): `gr_y_slider` is labelled "X" and `gr_x_slider` is
            # labelled "Y" (likewise for the scale sliders); presumably column 0
            # of the position/scale arrays is the vertical axis - confirm.
            gr_y_slider = gr.Slider(-1, 1, value=0, step=0.01, label="X")
            gr_x_slider = gr.Slider(-1, 1, value=0, step=0.01, label="Y")
            gr_sy_slider = gr.Slider(0.5, 1.5, value=1., step=0.1, label="Width Multiplier")
            gr_sx_slider = gr.Slider(0.5, 1.5, value=1., step=0.1, label="Height Multiplier")

            with gr.Row():
                with gr.Column():
                    gr_button_render = gr.Button("Render", variant="primary", info="Render a new image with altered positions and scales.")
                with gr.Column():
                    gr_button_reset = gr.Button("Reset", info="Reset slot statistics.")

    def update_image_and_segmentation(name, idx):
        """Run the full model on the chosen image and refresh all UI state."""
        idx = _slot_index(idx)
        img_input = load_image(name)

        out = model.apply(
            {"params": state.params, **state.variables},
            video=img_input[None, None],
            rngs={"state_init": init_rng},
            train=False)

        probs = np.array(nn.softmax(out["outputs"]["segmentation_logits"][0, 0, :, :, :, 0], axis=0))
        img = np.array(out["outputs"]["video"][0, 0])
        img = np.clip(img, 0, 1)

        # The last four entries of each slot vector hold position (2) and
        # scale (2); the rest are the slot features fed back to the decoder.
        slots_ = np.array(out["states"])
        slots = slots_[0, 0, :, :-4]
        pos = slots_[0, 0, :, -4: -2]
        scale = slots_[0, 0, :, -2:]

        # The editable and the original positions must be distinct arrays;
        # otherwise editing one would silently change the other and "Reset"
        # could no longer restore the original values.
        return (
            _to_uint8(img),
            _to_uint8(probs[idx]),
            float(pos[idx, 0]),
            float(pos[idx, 1]),
            probs,
            slots,
            np.copy(pos),
            np.ones((NUM_SLOTS, 2), dtype=np.float32),
            np.copy(pos),
            scale,
            # Keep the scale sliders in sync with the freshly reset scales.
            1.,
            1.,
        )

    gr_choose_image.change(
        fn=update_image_and_segmentation,
        inputs=[gr_choose_image, gr_slot_slider],
        outputs=[gr_image_1, gr_image_2, gr_x_slider, gr_y_slider, local_probs,
                 local_slots, local_pos, local_scale, local_orig_pos, local_orig_scale,
                 gr_sx_slider, gr_sy_slider]
    )

    def update_sliders(idx, local_probs, local_pos, local_scale):
        """Show the mask, position and scale of the newly selected slot."""
        idx = _slot_index(idx)  # 1-indexing to 0-indexing
        return (
            _to_uint8(local_probs[idx]),
            float(local_pos[idx, 0]),
            float(local_pos[idx, 1]),
            float(local_scale[idx, 0]),
            float(local_scale[idx, 1]),
        )

    gr_slot_slider.release(
        fn=update_sliders,
        inputs=[gr_slot_slider, local_probs, local_pos, local_scale],
        outputs=[gr_image_2, gr_x_slider, gr_y_slider, gr_sx_slider, gr_sy_slider]
    )

    def update_pos_x(idx, val, local_pos):
        """Store the slider value in column 0 of the selected slot's position."""
        return _with_slot_value(local_pos, idx, 0, val)

    def update_pos_y(idx, val, local_pos):
        """Store the slider value in column 1 of the selected slot's position."""
        return _with_slot_value(local_pos, idx, 1, val)

    def update_scale_x(idx, val, local_scale):
        """Store the slider value in column 0 of the selected slot's scale."""
        return _with_slot_value(local_scale, idx, 0, val)

    def update_scale_y(idx, val, local_scale):
        """Store the slider value in column 1 of the selected slot's scale."""
        return _with_slot_value(local_scale, idx, 1, val)

    gr_x_slider.release(
        fn=update_pos_x,
        inputs=[gr_slot_slider, gr_x_slider, local_pos],
        outputs=local_pos
    )
    gr_y_slider.release(
        fn=update_pos_y,
        inputs=[gr_slot_slider, gr_y_slider, local_pos],
        outputs=local_pos
    )
    gr_sx_slider.release(
        fn=update_scale_x,
        inputs=[gr_slot_slider, gr_sx_slider, local_scale],
        outputs=local_scale
    )
    gr_sy_slider.release(
        fn=update_scale_y,
        inputs=[gr_slot_slider, gr_sy_slider, local_scale],
        outputs=local_scale
    )

    def render(idx, local_slots, local_pos, local_scale, local_orig_scale):
        """Decode the edited slots into a new image and segmentation."""
        idx = _slot_index(idx)

        # `local_scale` is a multiplier applied on top of the original scale.
        slots = np.concatenate([local_slots, local_pos, local_scale * local_orig_scale], axis=-1)
        slots = jnp.array(slots)

        out = decoder_model.apply(
            {"params": state.params, **state.variables},
            slots=slots[None, None],
            train=False
        )

        probs = np.array(nn.softmax(out["segmentation_logits"][0, 0, :, :, :, 0], axis=0))
        image = np.array(out["video"][0, 0])
        image = np.clip(image, 0, 1)
        return _to_uint8(image), _to_uint8(probs[idx]), probs

    gr_button_render.click(
        fn=render,
        inputs=[gr_slot_slider, local_slots, local_pos, local_scale, local_orig_scale],
        outputs=[gr_image_1, gr_image_2, local_probs]
    )

    def reset(idx, local_orig_pos):
        """Restore original positions and unit scale multipliers."""
        idx = _slot_index(idx)
        return (
            np.copy(local_orig_pos),
            np.ones((NUM_SLOTS, 2), dtype=np.float32),
            float(local_orig_pos[idx, 0]),
            float(local_orig_pos[idx, 1]),
            1.,
            1.,
        )

    gr_button_reset.click(
        fn=reset,
        inputs=[gr_slot_slider, local_orig_pos],
        outputs=[local_pos, local_scale, gr_x_slider, gr_y_slider, gr_sx_slider, gr_sy_slider]
    )

demo.launch()