# Spaces: Runtime error (status banner captured with this file; not part of the program)
import os | |
from typing import Callable | |
from clu import checkpoint | |
from flax import linen as nn | |
import gradio as gr | |
from huggingface_hub import snapshot_download | |
import jax | |
import jax.numpy as jnp | |
import numpy as np | |
from PIL import Image | |
from invariant_slot_attention.configs.clevr_with_masks.equiv_transl_scale_v2 import get_config | |
from invariant_slot_attention.lib import utils | |
def load_model(config, checkpoint_dir):
    """Build the model described by `config` and restore its checkpoint.

    Args:
      config: Experiment config; only `config.model` is read here.
      checkpoint_dir: Directory passed to `checkpoint.Checkpoint`; the
        checkpoint at `<checkpoint_dir>/ckpt-0` is the one restored.

    Returns:
      A `(model, state, rng)` tuple: the built model, the train state returned
      by the checkpoint restore, and the PRNG key created from seed 42.
    """
    rng = jax.random.PRNGKey(42)
    model = utils.build_model_from_config(config.model)

    # Initialise the model once on an all-ones dummy clip so that a state
    # skeleton exists for the checkpoint to be restored into. The first key of
    # the split is unused; `rng` itself is handed on unchanged.
    _, init_rng, model_rng, dropout_rng = jax.random.split(rng, num=4)
    dummy_video = jnp.ones([1, 1, 128, 128, 3], jnp.float32)
    dummy_mask = jnp.ones(dummy_video.shape[:-1], jnp.int32)
    init_rngs = {
        "params": model_rng,
        "state_init": init_rng,
        "dropout": dropout_rng,
    }
    initial_vars = model.init(
        init_rngs,
        video=dummy_video,
        conditioning=None,
        padding_mask=dummy_mask,
    )

    # Separate the model params from the remaining state variables (e.g.
    # batchnorm stats). Note that `pop()` on a FrozenDict performs a deep copy.
    state_vars, initial_params = initial_vars.pop("params")  # pytype: disable=attribute-error
    # Intermediates are dropped: they should not be stored in the TrainState.
    state_vars = utils.filter_key_from_frozen_dict(
        state_vars, key="intermediates")

    state = utils.TrainState(
        step=1,
        opt_state=None,
        params=initial_params,
        rng=rng,
        variables=state_vars,
    )
    ckpt = checkpoint.Checkpoint(checkpoint_dir)
    state = ckpt.restore(state, checkpoint=checkpoint_dir + "/ckpt-0")
    return model, state, rng
def load_image(name):
    """Load `images/<name>.png` and prepare it as a model input.

    The image is cropped to the 192x192 window whose top-left corner is at
    (64, 29), resized to 128x128, reduced to its first three channels and
    scaled from [0, 255] to [0, 1].

    Args:
      name: File stem of the image inside the `images/` directory.

    Returns:
      A float32 `jnp` array holding the processed image.
    """
    # The context manager closes the underlying file handle, which
    # `Image.open` otherwise keeps open. All pixel access happens inside the
    # block so the data is read before the file is closed.
    with Image.open(f"images/{name}.png") as source:
        resized = source.crop((64, 29, 64 + 192, 29 + 192)).resize((128, 128))
        # Keep only the first three channels (drops e.g. an alpha channel).
        pixels = np.array(resized)[:, :, :3] / 255.
    return jnp.array(pixels, dtype=jnp.float32)
# Download only the files matching `clevr_isa_ts_v2*` from the `ondrejbiza/isa`
# hub repository, then build the model and restore its weights from that
# directory.
download_path = snapshot_download(
    repo_id="ondrejbiza/isa",
    allow_patterns="clevr_isa_ts_v2*",
)
checkpoint_dir = os.path.join(download_path, "clevr_isa_ts_v2")
model, state, rng = load_model(get_config(), checkpoint_dir)

# Derive the key that is later passed as the "state_init" rng on every
# forward pass of the demo.
rng, init_rng = jax.random.split(rng, num=2)
class DecoderWrapper(nn.Module):
    """Expose a model's decoder as a stand-alone Flax module.

    Attributes:
      decoder: Zero-argument factory that returns the decoder module.
    """
    decoder: Callable[[], nn.Module]

    # `self.decoder()` constructs a submodule inside `__call__`; Flax only
    # permits that in `setup()` or in a method wrapped in `@nn.compact`.
    # NOTE(review): the decoder is applied with the full model's `params`, so
    # its submodule name must match the one used inside the full model -
    # confirm against the restored checkpoint.
    @nn.compact
    def __call__(self, slots, train=False):
        """Instantiate the decoder and apply it to `slots`."""
        return self.decoder()(slots, train)


# Decoder-only module used by the demo to re-render edited slots.
decoder_model = DecoderWrapper(decoder=model.decoder)
with gr.Blocks() as demo:
    # Number of slots the UI state is sized for; also the slider's upper bound.
    NUM_SLOTS = 11

    # Per-session state. Handlers must never mutate these arrays in place or
    # hand the same array object to two states: the stored objects are passed
    # back to the handlers as-is, so aliasing would silently link the states.
    local_slots = gr.State(np.zeros((NUM_SLOTS, 64), dtype=np.float32))
    local_orig_pos = gr.State(np.zeros((NUM_SLOTS, 2), dtype=np.float32))
    local_orig_scale = gr.State(np.zeros((NUM_SLOTS, 2), dtype=np.float32))
    local_pos = gr.State(np.zeros((NUM_SLOTS, 2), dtype=np.float32))
    local_scale = gr.State(np.ones((NUM_SLOTS, 2), dtype=np.float32))
    local_probs = gr.State(np.zeros((NUM_SLOTS, 128, 128), dtype=np.float32))

    with gr.Row():
        gr_choose_image = gr.Dropdown(
            [f"img{i}" for i in range(1, 9)],
            label="CLEVR Image",
            info="Start by picking an image from the CLEVR dataset.",
        )

    with gr.Row():
        with gr.Column():
            with gr.Row():
                with gr.Column():
                    gr_image_1 = gr.Image(
                        type="numpy", shape=(112, 112), source="canvas", label="Decoding")
                with gr.Column():
                    gr_image_2 = gr.Image(
                        type="numpy", shape=(112, 112), source="canvas", label="Segmentation")

        with gr.Column():
            gr_slot_slider = gr.Slider(
                1, NUM_SLOTS, value=1, step=1, label="Slot Index",
                info="Change slot index to see the segmentation mask, position and scale of each slot.")

            # NOTE(review): variable names and labels are crossed on purpose or
            # by accident - `gr_x_slider`/`gr_sx_slider` are wired to column 0
            # and labelled "Y"/"Height", `gr_y_slider`/`gr_sy_slider` to
            # column 1 and labelled "X"/"Width". Presumably the model stores
            # (y, x); confirm before renaming anything.
            gr_y_slider = gr.Slider(-1, 1, value=0, step=0.01, label="X")
            gr_x_slider = gr.Slider(-1, 1, value=0, step=0.01, label="Y")
            gr_sy_slider = gr.Slider(0.5, 1.5, value=1., step=0.1, label="Width Multiplier")
            gr_sx_slider = gr.Slider(0.5, 1.5, value=1., step=0.1, label="Height Multiplier")

            with gr.Row():
                with gr.Column():
                    gr_button_render = gr.Button(
                        "Render", variant="primary",
                        info="Render a new image with altered positions and scales.")
                with gr.Column():
                    gr_button_reset = gr.Button("Reset", info="Reset slot statistics.")

    def _slot_index(idx):
        """Convert the 1-based slider value into a 0-based integer index."""
        return int(idx) - 1

    def _with_entry(values, idx, column, val):
        """Return a copy of `values` with `[slot, column]` replaced by `val`."""
        updated = np.array(values, copy=True)
        updated[_slot_index(idx), column] = val
        return updated

    def update_image_and_segmentation(name, idx):
        """Run the full model on the chosen image and reset all slot state.

        Returns the decoded image, the mask of slot `idx`, the slider values
        for that slot, and the fresh contents of every state variable.
        """
        idx = _slot_index(idx)
        img_input = load_image(name)
        out = model.apply(
            {"params": state.params, **state.variables},
            video=img_input[None, None],
            rngs={"state_init": init_rng},
            train=False)

        probs = np.array(nn.softmax(
            out["outputs"]["segmentation_logits"][0, 0, :, :, :, 0], axis=0))
        img = np.clip(np.array(out["outputs"]["video"][0, 0]), 0, 1)

        # The last four entries of each slot vector are split off as position
        # (two values) and scale (two values). Each piece is copied so the
        # states do not share memory with one another.
        slots_ = np.array(out["states"])
        slots = slots_[0, 0, :, :-4].copy()
        pos = slots_[0, 0, :, -4:-2].copy()
        scale = slots_[0, 0, :, -2:].copy()

        return (
            (img * 255).astype(np.uint8),
            (probs[idx] * 255).astype(np.uint8),
            float(pos[idx, 0]),
            float(pos[idx, 1]),
            # Scale multipliers are reset below, so the sliders must follow.
            1.,
            1.,
            probs,
            slots,
            pos,
            np.ones((NUM_SLOTS, 2), dtype=np.float32),
            # Independent copy: editing `local_pos` must not change the
            # positions that "Reset" restores.
            pos.copy(),
            scale,
        )

    gr_choose_image.change(
        fn=update_image_and_segmentation,
        inputs=[gr_choose_image, gr_slot_slider],
        outputs=[gr_image_1, gr_image_2, gr_x_slider, gr_y_slider,
                 gr_sx_slider, gr_sy_slider, local_probs, local_slots,
                 local_pos, local_scale, local_orig_pos, local_orig_scale]
    )

    def update_sliders(idx, local_probs, local_pos, local_scale):
        """Show the mask, position and scale of the newly selected slot."""
        idx = _slot_index(idx)
        return (
            (local_probs[idx] * 255).astype(np.uint8),
            float(local_pos[idx, 0]),
            float(local_pos[idx, 1]),
            float(local_scale[idx, 0]),
            float(local_scale[idx, 1]),
        )

    gr_slot_slider.release(
        fn=update_sliders,
        inputs=[gr_slot_slider, local_probs, local_pos, local_scale],
        outputs=[gr_image_2, gr_x_slider, gr_y_slider, gr_sx_slider, gr_sy_slider]
    )

    def update_pos_x(idx, val, local_pos):
        """Store `val` in column 0 of the selected slot's position."""
        return _with_entry(local_pos, idx, 0, val)

    def update_pos_y(idx, val, local_pos):
        """Store `val` in column 1 of the selected slot's position."""
        return _with_entry(local_pos, idx, 1, val)

    def update_scale_x(idx, val, local_scale):
        """Store `val` in column 0 of the selected slot's scale multiplier."""
        return _with_entry(local_scale, idx, 0, val)

    def update_scale_y(idx, val, local_scale):
        """Store `val` in column 1 of the selected slot's scale multiplier."""
        return _with_entry(local_scale, idx, 1, val)

    gr_x_slider.release(
        fn=update_pos_x,
        inputs=[gr_slot_slider, gr_x_slider, local_pos],
        outputs=local_pos
    )
    gr_y_slider.release(
        fn=update_pos_y,
        inputs=[gr_slot_slider, gr_y_slider, local_pos],
        outputs=local_pos
    )
    gr_sx_slider.release(
        fn=update_scale_x,
        inputs=[gr_slot_slider, gr_sx_slider, local_scale],
        outputs=local_scale
    )
    gr_sy_slider.release(
        fn=update_scale_y,
        inputs=[gr_slot_slider, gr_sy_slider, local_scale],
        outputs=local_scale
    )

    def render(idx, local_slots, local_pos, local_scale, local_orig_scale):
        """Decode the edited slots and return image, selected mask and all masks."""
        idx = _slot_index(idx)
        # Re-assemble full slot vectors; the scale state holds multipliers that
        # are applied to the scales produced by the model.
        slots = jnp.array(np.concatenate(
            [local_slots, local_pos, local_scale * local_orig_scale], axis=-1))
        out = decoder_model.apply(
            {"params": state.params, **state.variables},
            slots=slots[None, None],
            train=False
        )
        probs = np.array(nn.softmax(
            out["segmentation_logits"][0, 0, :, :, :, 0], axis=0))
        image = np.clip(np.array(out["video"][0, 0]), 0, 1)
        return (image * 255).astype(np.uint8), (probs[idx] * 255).astype(np.uint8), probs

    gr_button_render.click(
        fn=render,
        inputs=[gr_slot_slider, local_slots, local_pos, local_scale, local_orig_scale],
        outputs=[gr_image_1, gr_image_2, local_probs]
    )

    def reset(idx, local_orig_pos):
        """Restore the model's positions and unit scale multipliers."""
        idx = _slot_index(idx)
        return (
            np.copy(local_orig_pos),
            np.ones((NUM_SLOTS, 2), dtype=np.float32),
            float(local_orig_pos[idx, 0]),
            float(local_orig_pos[idx, 1]),
            1.,
            1.,
        )

    gr_button_reset.click(
        fn=reset,
        inputs=[gr_slot_slider, local_orig_pos],
        outputs=[local_pos, local_scale, gr_x_slider, gr_y_slider, gr_sx_slider, gr_sy_slider]
    )

demo.launch()