# Show VLAD clustering for set of example images or a user image """ User input: - Domain: Indoor, Aerial, or Urban - Image: Image to be clustered - Cluster numbers (to visualize) - Pixel coordinates (to pick further clusters) - A unique cache ID (to store the DINO forward passes) There are example images for each domain. Output: - All images with cluster assignments Some Gradio links: - Controlling layout - https://www.gradio.app/guides/quickstart#blocks-more-flexibility-and-control - Data state (persistence) - https://www.gradio.app/guides/interface-state - https://www.gradio.app/docs/state - Layout control - https://www.gradio.app/guides/controlling-layout - https://www.gradio.app/guides/blocks-and-event-listeners """ # A markdown string shown at the top of the app header_markdown = """ # AnyLoc Demo \| [Website](https://anyloc.github.io/) \| \ [GitHub](https://github.com/AnyLoc/AnyLoc) \| \ [YouTube](https://youtu.be/ITo8rMInatk) \| This space contains a collection of demos for AnyLoc. Each demo is a \ self-contained application in the tabs below. The following \ applications are included 1. **GeM t-SNE Projection**: Upload a set of images and see where \ they land on a t-SNE projection of GeM descriptors from many \ domains. This can be used to guide domain selection (from a few \ representative images). 2. **Cluster Visualization**: This visualizes the VLAD cluster \ assignments for the patch descriptors. You need to select the \ domain for loading VLAD cluster centers (vocabulary). We do **not** save any images uploaded to the demo. Some errors may \ leave a log. We do not collect any information about the user. The \ example images are attributed in the respective tabs. 🥳 Thanks to HuggingFace for providing a free GPU for this demo. """ # %% import os import gradio as gr import numpy as np import cv2 as cv import torch from torch import nn from torch.nn import functional as F from torchvision import transforms as tvf from torchvision.transforms import functional as T from PIL import Image import matplotlib.pyplot as plt from sklearn.manifold import TSNE import distinctipy as dipy import joblib from typing import Literal, List import gradio as gr import time import glob import shutil import matplotlib.pyplot as plt from copy import deepcopy # DINOv2 imports from utilities import DinoV2ExtractFeatures from utilities import VLAD device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # %% # Configurations T1 = Literal["query", "key", "value", "token"] T2 = Literal["aerial", "indoor", "urban"] DOMAINS = ["aerial", "indoor", "urban"] T3 = Literal["dinov2_vits14", "dinov2_vitb14", "dinov2_vitl14", "dinov2_vitg14"] _ex = lambda x: os.path.realpath(os.path.expanduser(x)) dino_model: T3 = "dinov2_vitg14" desc_layer: int = 31 desc_facet: T1 = "value" num_c: int = 8 cache_dir: str = _ex("./cache") # Directory containing program cache max_img_size: int = 1024 # Image resolution (max dim/size) max_num_imgs: int = 16 # Max number of images to upload share: bool = False # Share application using .gradio link # Verify inputs assert os.path.isdir(cache_dir), "Cache directory not found" # %% # Model and transforms print("Loading DINO model") # extractor = None # FIXME: For quick testing only extractor = DinoV2ExtractFeatures(dino_model, desc_layer, desc_facet, device=device) print("DINO model loaded") # VLAD path (directory) ext_s = f"{dino_model}/l{desc_layer}_{desc_facet}_c{num_c}" vc_dir = os.path.join(cache_dir, "vocabulary", ext_s) assert os.path.isdir(vc_dir), f"VLAD directory: {vc_dir} not found" # GeM path (cache) gem_cf = os.path.join(cache_dir, "gem_cache", "result_dino_v2.gz") assert os.path.isfile(gem_cf), f"GeM cache: {gem_cf} not found" gem_cache = joblib.load(gem_cf) assert gem_cache["model"]["type"] == dino_model assert gem_cache["model"]["layer"] == desc_layer assert gem_cache["model"]["facet"] == desc_facet fig = plt.figure() # Main figure fig.clear() # Base image transformations base_tf = tvf.Compose([ tvf.ToTensor(), tvf.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) # %% # Get VLAD object def get_vlad_clusters(domain, pr = gr.Progress()): dm: T2 = str(domain).lower() assert dm in DOMAINS, "Invalid domain" # Load VLAD cluster centers pr(0, desc="Loading VLAD clusters") c_centers_file = os.path.join(vc_dir, dm, "c_centers.pt") if not os.path.isfile(c_centers_file): return f"Cluster centers not found for: {domain}", None c_centers = torch.load(c_centers_file) pr(0.5) num_c = c_centers.shape[0] desc_dim = c_centers.shape[1] vlad = VLAD(num_c, desc_dim, cache_dir=os.path.dirname(c_centers_file)) vlad.fit(None) # Restore the cache pr(1) return f"VLAD clusters loaded for: {domain}", vlad # %% # Get VLAD descriptors @torch.no_grad() def get_descs(imgs_batch, pr = gr.Progress()): imgs_batch: List[np.ndarray] = imgs_batch pr(0, desc="Extracting descriptors") patch_descs = [] for i, img in enumerate(imgs_batch): if img is None: print(f"Image {i+1} is None") continue # Convert to PIL image pil_img = Image.fromarray(img) img_pt = base_tf(pil_img).to(device) if max(img_pt.shape[-2:]) > max_img_size: print(f"Image {i+1}: {img_pt.shape[-2:]}, outside") c, h, w = img_pt.shape # Maintain aspect ratio if h == max(img_pt.shape[-2:]): w = int(w * max_img_size / h) h = max_img_size else: h = int(h * max_img_size / w) w = max_img_size img_pt = T.resize(img_pt, (h, w), interpolation=T.InterpolationMode.BICUBIC) pil_img = pil_img.resize((w, h)) # Backup # Make image patchable c, h, w = img_pt.shape h_new, w_new = (h // 14) * 14, (w // 14) * 14 img_pt = tvf.CenterCrop((h_new, w_new))(img_pt)[None, ...] # Extract descriptors ret = extractor(img_pt).cpu() # [1, n_p, d] patch_descs.append({"img": pil_img, "descs": ret}) pr((i+1) / len(imgs_batch)) pr(1.0) return patch_descs, \ f"Descriptors extracted for {len(imgs_batch)} images" # %% # Assign VLAD clusters (descriptor assignment) def assign_vlad(patch_descs, vlad, pr = gr.Progress()): vlad: VLAD = vlad img_patch_descs = [pd["descs"] for pd in patch_descs] pr(0, desc="Assigning VLAD clusters") desc_assignments = [] # List[Tensor;shape=('h', 'w');int] for i, qu_desc in enumerate(img_patch_descs): # Residual vectors; 'n' could differ (based on img sizes) res = vlad.generate_res_vec(qu_desc[0]) # ['n', n_c, d] img = patch_descs[i]["img"] h, w, c = np.array(img).shape h_p, w_p = h // 14, w // 14 h_new, w_new = h_p * 14, w_p * 14 assert h_p * w_p == res.shape[0], "Residual incorrect!" # Descriptor assignments da = res.abs().sum(dim=2).argmin(dim=1).reshape(h_p, w_p) da = F.interpolate(da[None, None, ...].to(float), (h_new, w_new), mode="nearest")[0, 0].to(da.dtype) desc_assignments.append(da) pr((i+1) / len(img_patch_descs)) pr(1.0) return desc_assignments, "VLAD clusters assigned" # %% # Cluster assignments to images def get_ca_images(desc_assignments, patch_descs, alpha, pr = gr.Progress()): if desc_assignments is None or len(desc_assignments) == 0: if not 0 <= alpha <= 1: return None, f"Invalid alpha value: {alpha} (should be "\ "between 0 and 1)" return None, "First load the images" c_colors = dipy.get_colors(num_c, rng=928, colorblind_type="Deuteranomaly") np_colors = (np.array(c_colors) * 255).astype(np.uint8) # Get images with clusters pil_imgs = [pd["img"] for pd in patch_descs] res_imgs = [] # List[PIL.Image] pr(0, desc="Generating cluster assignment images") for i, pil_img in enumerate(pil_imgs): # Descriptor assignment image: [h, w, 3] da: torch.Tensor = desc_assignments[i] # ['h', 'w'] da_img = np.zeros((*da.shape, 3), dtype=np.uint8) for c in range(num_c): da_img[da == c] = np_colors[c] # Background image: [h, w, 3] img_np = np.array(pil_img, dtype=np.uint8) h, w, c = np.array(img_np).shape h_p, w_p = (h // 14), (w // 14) h_new, w_new = h_p * 14, w_p * 14 img_np = F.interpolate(torch.tensor(img_np)\ .permute(2, 0, 1)[None, ...], (h_new, w_new), mode='nearest')[0].permute(1, 2, 0).numpy() res_img = cv.addWeighted(img_np, 1 - alpha, da_img, alpha, 0.) res_imgs.append(Image.fromarray(res_img)) pr((i+1) / len(pil_imgs)) pr(1.0) return res_imgs, "Cluster assignment images generated" # %% # Get GeM descriptors from cache def get_gem_descs_cache(use_d, pr = gr.Progress()): use_d: List[str] = use_d if len(use_d) == 0: return "Select at least one domain", None else: use_d = [d.lower() for d in use_d] indoor_datasets = ["baidu_datasets", "gardens", "17places"] urban_datasets = ["pitts30k", "st_lucia", "Oxford"] aerial_datasets = ["Tartan_GNSS_test_rotated", "Tartan_GNSS_test_notrotated", "VPAir"] pr(0, desc="Loading GeM descriptors from cache") gem_descs = { "labels": [], "descs": [], } for i, ds in enumerate(gem_cache["data"]): # GeM descriptors from data: n_desc, desc_dim d: np.ndarray = gem_cache["data"][ds]["descriptors"] if ds in indoor_datasets and "indoor" in use_d: gem_descs["labels"].extend(["indoor"] * d.shape[0]) elif ds in urban_datasets and "urban" in use_d: gem_descs["labels"].extend(["urban"] * d.shape[0]) elif ds in aerial_datasets and "aerial" in use_d: gem_descs["labels"].extend(["aerial"] * d.shape[0]) else: continue gem_descs["descs"].append(d) pr((i+1) / len(gem_cache["data"])) gem_descs["descs"] = np.concatenate(gem_descs["descs"], axis=0) pr(1.0) return "GeM descriptors loaded from cache", gem_descs # %% # Get GeM pooled features of the uploaded images def get_add_gem_descs(imgs_batch, gem_descs, pr = gr.Progress()): imgs_batch: List[np.ndarray] = imgs_batch gem_descs: dict = gem_descs pr(0, desc="Extracting GeM descriptors") num_imgs_extracted = 0 for i, img in enumerate(imgs_batch): if img is None: print(f"Image {i+1} is None") continue # Convert to PIL image pil_img = Image.fromarray(img) img_pt = base_tf(pil_img).to(device) if max(img_pt.shape[-2:]) > max_img_size: print(f"Image {i+1}: {img_pt.shape[-2:]}, outside") c, h, w = img_pt.shape # Maintain aspect ratio if h == max(img_pt.shape[-2:]): w = int(w * max_img_size / h) h = max_img_size else: h = int(h * max_img_size / w) w = max_img_size img_pt = T.resize(img_pt, (h, w), interpolation=T.InterpolationMode.BICUBIC) pil_img = pil_img.resize((w, h)) # Backup # Make image patchable c, h, w = img_pt.shape h_new, w_new = (h // 14) * 14, (w // 14) * 14 img_pt = tvf.CenterCrop((h_new, w_new))(img_pt)[None, ...] # Extract descriptors ret = extractor(img_pt).cpu() # [1, n_p, d] # Get the GeM pooled descriptor x = torch.mean(ret**3, dim=-2) g_res = x.to(torch.complex64) ** (1/3) g_res = torch.abs(g_res) * torch.sign(x) # [1, d] g_res = g_res.numpy() # Add to state gem_descs["labels"].append(f"Image{i+1}") gem_descs["descs"] = np.concatenate([gem_descs["descs"], g_res]) num_imgs_extracted += 1 pr((i+1) / len(imgs_batch)) pr(1.0) gem_descs["num_uimgs"] = num_imgs_extracted return gem_descs, "GeM descriptors extracted" # %% # Apply tSNE to the GeM descriptors def get_tsne_fm_gem(gem_descs, pr = gr.Progress()): pr(0, desc="Applying tSNE to GeM descriptors") desc_all: np.ndarray = gem_descs["descs"] # [n, d_dim] labels_all: List[str] = gem_descs["labels"] # [n] # tSNE projection tsne = TSNE(n_components=2, random_state=30, perplexity=50, learning_rate=200, init='random') desc_2d = tsne.fit_transform(desc_all) # Result tsne_pts = { "labels": labels_all, "pts": desc_2d, "num_uimgs": gem_descs["num_uimgs"], # Number of user imgs } pr(1.0) return tsne_pts, "tSNE projection done" # %% # Plot tSNE to matplotlib figure def plot_tsne(tsne_pts): colors = { "aerial": (80/255, 0/255, 80/255), "indoor": ( 0/255, 76/255, 204/255), "urban": ( 0/255, 204/255, 0/255), } ni = int(tsne_pts["num_uimgs"]) # Custom colors for user images ucs = dipy.get_colors(ni, exclude_colors=list(colors.values())\ .extend([(0, 0, 0), (1, 1, 1)]), colorblind_type="Deuteranomaly") for i in range(ni): colors[f"Image{i+1}"] = ucs[i] fig.clear() gs = fig.add_gridspec(1, 1) ax = fig.add_subplot(gs[0, 0]) ax.set_title("tSNE Projection") for i, domain in enumerate(list(colors.keys())): pts = tsne_pts["pts"][np.array(tsne_pts["labels"]) == domain] if domain.startswith("Image"): m = "x" else: m = "o" ax.scatter(pts[:, 0], pts[:, 1], label=domain, marker=m, color=colors[domain]) # Put legend at the bottom of axis ax.legend() ax.set_xticks([]) ax.set_yticks([]) fig.set_tight_layout(True) # fig.set_tight_layout(True) return fig, "tSNE plot created" # %% print("Interface build started") # Tab for VLAD cluster assignment visualization def tab_cluster_viz(): d_vals = [k.title() for k in DOMAINS] domain = gr.Radio(d_vals, value=d_vals[0], label="Domain", info="The domain of images (for loading VLAD vocabulary)") nimg_s = gr.Number(2, label="How many images?", precision=0, info=f"Between '1' and '{max_num_imgs}' images. Press "\ "enter/return to register") with gr.Row(): # Dynamic row (images in columns) imgs = [gr.Image(label=f"Image {i+1}", visible=True) \ for i in range(int(nimg_s.value))] + \ [gr.Image(visible=False) \ for _ in range(max_num_imgs - int(nimg_s.value))] for i, img in enumerate(imgs): # Set image as "input" img.change(lambda _: None, img) with gr.Row(): # Dynamic row of output (cluster) images imgs2 = [gr.Image(label=f"VLAD Clusters {i+1}", visible=False) for i in range(max_num_imgs)] nimg_s.submit(var_num_img, nimg_s, imgs) blend_alpha = gr.Number(0.4, label="Blending alpha", info="Weight for cluster centers (between 0 and 1). "\ "Higher (close to 1) means greater emphasis on cluster "\ "visibility. Lower (closer to 0) will show the "\ "underlying image more. "\ "Press enter/return to register") bttn1 = gr.Button("Click Me!") # Cluster assignment gr.Markdown("### Status strings") out_msg1 = gr.Markdown("Select domain and upload images") out_msg2 = gr.Markdown("For descriptor extraction") out_msg3 = gr.Markdown("Followed by VLAD assignment") out_msg4 = gr.Markdown("Followed by cluster images") # ---- Utility functions ---- # A wrapper to batch the images def batch_images(data): sv = int(data[nimg_s]) images: List[np.ndarray] = [data[imgs[k]] \ for k in range(sv)] return images # A wrapper to unbatch images (and pad to max) def unbatch_images(imgs_batch, nimg): ret = [gr.Image.update(visible=False) \ for _ in range(max_num_imgs)] if imgs_batch is None or len(imgs_batch) == 0: return ret for i in range(nimg): # nimg only to match input layout if i < len(imgs_batch): img_np = np.array(imgs_batch[i]) else: img_np = None ret[i] = gr.Image.update(img_np, visible=True) return ret # ---- Examples ---- # Two images from each domain gr.Examples( [ ["Aerial", 2, "ex_aerial_nardo-air_db-42.png", "ex_aerial_nardo-air_qu-42.png",], ["Indoor", 2, "ex_indoor_17places_db-75.jpg", "ex_indoor_17places_qu-75.jpg"], ["Urban", 2, "ex_urban_oxford_db-75.png", "ex_urban_oxford_qu-75.png"],], [domain, nimg_s, *imgs], ) # ---- Main pipeline ---- # Get the VLAD cluster assignment images on click bttn1.click(get_vlad_clusters, domain, [out_msg1, vlad])\ .then(batch_images, {nimg_s, *imgs, imgs_batch}, imgs_batch)\ .then(get_descs, imgs_batch, [patch_descs, out_msg2])\ .then(assign_vlad, [patch_descs, vlad], [desc_assignments, out_msg3])\ .then(get_ca_images, [desc_assignments, patch_descs, blend_alpha], [imgs_batch, out_msg4])\ .then(unbatch_images, [imgs_batch, nimg_s], imgs2) # If the blending changes now, update the cluster images only blend_alpha.submit(get_ca_images, [desc_assignments, patch_descs, blend_alpha], [imgs_batch, out_msg4])\ .then(unbatch_images, [imgs_batch, nimg_s], imgs2) # Tab for GeM t-SNE projection plot def tab_gem_tsne(): d_vals = [k.title() for k in DOMAINS] dms = gr.CheckboxGroup(d_vals, value=d_vals, label="Domains", info="The domains to use for the t-SNE projection") nimg_s = gr.Number(2, label="How many images?", precision=0, info=f"Between '1' and '{max_num_imgs}' images. Press "\ "enter/return to register") with gr.Row(): # Dynamic row (images in columns) imgs = [gr.Image(label=f"Image {i+1}", visible=True) \ for i in range(int(nimg_s.value))] + \ [gr.Image(visible=False) \ for _ in range(max_num_imgs - int(nimg_s.value))] for i, img in enumerate(imgs): # Set image as "input" img.change(lambda _: None, img) nimg_s.submit(var_num_img, nimg_s, imgs) tsne_plot = gr.Plot(None, label="tSNE Plot") out_msg1 = gr.Markdown("Select domains") out_msg2 = gr.Markdown("Upload images") out_msg3 = gr.Markdown("Wait for tSNE plots") # A wrapper to batch the images def batch_images(data): sv = int(data[nimg_s]) # images: List[np.ndarray] = [data[imgs[k]] \ # for k in range(sv)] images: List[np.ndarray] = [] for k in range(sv): img = data[imgs[k]] if img is None: return None, f"Image {k+1} is None!" images.append(img) return images, "Images batched" bttn1 = gr.Button("Click Me!") # ---- Examples ---- gr.Examples( [ ["./ex_dining_room.jpeg", "./ex_city_road.jpeg"], ["./ex_manhattan_aerial.jpeg", "./ex_city_road.jpeg"], ["./ex_dining_room.jpeg", "./ex_manhattan_aerial.jpeg"], ], [*imgs], ) # ---- Main pipeline ---- # Get the tSNE plot bttn1.click(get_gem_descs_cache, dms, [out_msg1, gem_descs])\ .then(batch_images, {nimg_s, *imgs, imgs_batch}, [imgs_batch, out_msg2])\ .then(get_add_gem_descs, [imgs_batch, gem_descs], [gem_descs, out_msg2])\ .then(get_tsne_fm_gem, gem_descs, [tsne_pts, out_msg3])\ .then(plot_tsne, tsne_pts, [tsne_plot, out_msg3]) # Build the interface with gr.Blocks() as demo: # Main header gr.Markdown(header_markdown) # ---- Helper functions ---- # Variable number of input images (show/hide UI image array) def var_num_img(s): n = int(s) # Slider (string) value as int assert 1 <= n <= max_num_imgs, f"Invalid num of images: {n}!" return [gr.Image.update(label=f"Image {i+1}", visible=True) \ for i in range(n)] \ + [gr.Image.update(visible=False) \ for _ in range(max_num_imgs - n)] # ---- State declarations ---- vlad = gr.State() # VLAD object desc_assignments = gr.State() # Cluster assignments imgs_batch = gr.State() # Images as batch patch_descs = gr.State() # Patch descriptors gem_descs = gr.State() # GeM descriptors (of each state) tsne_pts = gr.State() # tSNE points # ---- All UI elements ---- with gr.Tab("GeM t-SNE Projection"): gr.Markdown( """ ## GeM t-SNE Projection Select the domains (toggle visibility) for t-SNE plot. \ Enter the number of images to upload and upload images. \ Then click the button to get the t-SNE plot. You can also directly click on one of the examples (at \ the bottom) to load the data and then click the button \ to get the t-SNE plot. The examples have the following images - [Manhattan aerial view](https://www.crushpixel.com/stock-photo/aerial-view-midtown-manhattan-849717.html) - [Dining room](https://homesfeed.com/formal-dining-room-sets-for-8/) - [City road](https://pxhere.com/en/photo/824211) """) tab_gem_tsne() with gr.Tab("Cluster Visualization"): gr.Markdown( """ ## Cluster Visualizations Select the domain for the images (all should be from the \ same domain). Enter the number of images to upload. \ Upload the images. Then click the button to get the \ cluster assignment images. You can also directly click on one of the examples (at \ the bottom) to load the data and then click the button \ to get the cluster assignment images. - The `aerial` example is from the Tartan Air dataset - The `indoor` example is from the 17Places dataset - The `urban` example is from the Oxford dataset """) tab_cluster_viz() print("Interface build completed") # %% # Deploy application demo.queue().launch(share=share) print("Application deployment ended, exiting...")