custom_css = """ """ custom_html = custom_css + """

The Sound of Water: Inferring Physical Properties from Pouring Liquids

Project Page | Github | Paper | Data Models

""" tips = """


Please give us a 🌟 on Github if you like our work! Tips to get better results:
""" import os import sys import gradio as gr import torch import numpy as np import matplotlib.pyplot as plt plt.rcParams["font.family"] = "serif" import decord import PIL, PIL.Image import librosa from IPython.display import Markdown, display import pandas as pd import shared.utils as su import sound_of_water.audio_pitch.model as audio_models import sound_of_water.data.audio_loader as audio_loader import sound_of_water.data.audio_transforms as at import sound_of_water.data.csv_loader as csv_loader def read_html_file(file): with open(file) as f: return f.read() def define_axes(figsize=(13, 4), width_ratios=[0.22, 0.78]): fig, axes = plt.subplots( 1, 2, figsize=figsize, width_ratios=width_ratios, layout="constrained", ) return fig, axes def show_frame_and_spectrogram(frame, spectrogram, visualise_args, axes=None): """Shows the frame and spectrogram side by side.""" if axes is None: fig, axes = define_axes() else: assert len(axes) == 2 ax = axes[0] ax.imshow(frame, aspect="auto") ax.set_title("Example frame") ax.set_xticks([]) ax.set_yticks([]) ax = axes[1] audio_loader.show_logmelspectrogram( S=spectrogram, ax=ax, show=False, sr=visualise_args["sr"], n_fft=visualise_args["n_fft"], hop_length=visualise_args["hop_length"], ) def scatter_pitch(ax, t, f, s=60, marker="o", color="limegreen", label="Pitch"): """Scatter plot of pitch.""" ax.scatter(t, f, color=color, label=label, s=s, marker=marker) ax.set_xlabel("Time (s)") ax.set_ylabel("Frequency (Hz)") ax.legend(loc="upper left") def load_frame(video_path, video_backend="decord"): if video_backend == "decord": vr = decord.VideoReader(video_path, num_threads=1) frame = PIL.Image.fromarray(vr[0].asnumpy()) elif video_backend == "torchvision": import torchvision.io as tio video, _, _ = tio.read_video(video_path, pts_unit="sec") frame = video[0] frame = PIL.Image.fromarray(frame.numpy()) else: raise ValueError(f"Unknown video backend: {video_backend}") frame = audio_loader.crop_or_pad_to_size(frame, size=(270, 480)) return frame def load_spectrogram(video_path): y = audio_loader.load_audio_clips( audio_path=video_path, clips=None, load_entire=True, cut_to_clip_len=False, **aload_args, )[0] S = audio_loader.librosa_harmonic_spectrogram_db( y, sr=visualise_args["sr"], n_fft=visualise_args["n_fft"], hop_length=visualise_args["hop_length"], n_mels=visualise_args['n_mels'], ) return S # Load audio visualise_args = { "sr": 16000, "n_fft": 400, "hop_length": 320, "n_mels": 64, "margin": 16., "C": 340 * 100., "audio_output_fps": 49., "w_max": 100., "n_bins": 64, } aload_args = { "sr": 16000, "clip_len": None, "backend": "decord", } cfg_backbone = { "name": "Wav2Vec2WithTimeEncoding", "args": dict(), } backbone = getattr(audio_models, cfg_backbone["name"])( **cfg_backbone["args"], ) cfg_model = { "name": "WavelengthWithTime", "args": { "axial": True, "axial_bins": 64, "radial": True, "radial_bins": 64, "freeze_backbone": True, "train_backbone_modules": [6, 7, 8, 9, 10, 11], "act": "softmax", "criterion": "kl_div", } } def load_model(): model = getattr(audio_models, cfg_model["name"])( backbone=backbone, **cfg_model["args"], ) su.misc.num_params(model) # Load the model weights from trained checkpoint # NOTE: Be sure to set the correct path to the checkpoint su.log.print_update("[:::] Loading checkpoint ", color="cyan", fillchar=".", pos="left") # ckpt_dir = "/work/piyush/pretrained_checkpoints/SoundOfWater" ckpt_dir = "./checkpoints" ckpt_path = os.path.join( ckpt_dir, "dsr9mf13_ep100_step12423_real_finetuned_with_cosupervision.pth", ) assert 
    print("Loading checkpoint from: ", ckpt_path)
    ckpt = torch.load(ckpt_path, map_location="cpu")
    msg = model.load_state_dict(ckpt)
    print(msg)

    return model


# Define audio transforms
cfg_transform = {
    "audio": {
        "wave": [
            {
                "name": "AddNoise",
                "args": {"noise_level": 0.001},
                "augmentation": True,
            },
            {
                "name": "ChangeVolume",
                "args": {"volume_factor": [0.8, 1.2]},
                "augmentation": True,
            },
            {
                "name": "Wav2Vec2WaveformProcessor",
                "args": {
                    "model_name": "facebook/wav2vec2-base-960h",
                    "sr": 16000,
                },
            },
        ],
        "spec": None,
    },
}

audio_transform = at.define_audio_transforms(
    cfg_transform,
    augment=False,
)

# Define audio pipeline arguments
apipe_args = {
    "spec_args": None,
    "stack": True,
}


def load_audio_tensor(video_path):
    # Load and transform input audio
    audio = audio_loader.load_and_process_audio(
        audio_path=video_path,
        clips=None,
        load_entire=True,
        cut_to_clip_len=False,
        audio_transform=audio_transform,
        aload_args=aload_args,
        apipe_args=apipe_args,
    )[0]
    return audio


def get_model_output(audio, model):
    with torch.no_grad():
        NS = audio.shape[-1]
        duration = NS / 16000
        t = torch.tensor([[0, duration]]).unsqueeze(0)
        x = audio.unsqueeze(0)
        z_audio = model.backbone(x, t)[0][0].cpu()
        y_audio = model(x, t)["axial"][0][0].cpu()
    return z_audio, y_audio


def show_output(frame, S, y_audio, z_audio):
    # duration = S.shape[-1] / visualise_args["sr"]
    # print(S.shape, y_audio.shape, z_audio.shape)
    duration = librosa.get_duration(
        S=S,
        sr=visualise_args["sr"],
        n_fft=visualise_args["n_fft"],
        hop_length=visualise_args["hop_length"],
    )
    timestamps = np.linspace(0., duration, 25)

    # Get timestamps at evaluation frames
    n_frames = len(y_audio)
    timestamps_eval = librosa.frames_to_time(
        np.arange(n_frames),
        sr=visualise_args["sr"],
        n_fft=visualise_args["n_fft"],
        hop_length=visualise_args["hop_length"],
    )

    # Get predicted wavelengths (expectation over wavelength bins)
    # and convert them to frequencies
    wavelengths = y_audio @ torch.linspace(
        0, visualise_args["w_max"], visualise_args["n_bins"],
    )
    f_pred = visualise_args["C"] / wavelengths

    # Keep predictions only at the evenly spaced visualisation timestamps
    indices = su.misc.find_nearest_indices(timestamps_eval, timestamps)
    f_pred = f_pred[indices]
    # print(timestamps, f_pred)

    # Show the predicted pitch overlaid on the spectrogram
    fig, axes = define_axes()
    show_frame_and_spectrogram(frame, S, visualise_args, axes=axes)
    scatter_pitch(
        axes[1], timestamps, f_pred,
        color="white", label="Estimated pitch", marker="o", s=70,
    )
    axes[1].set_title("Predicted pitch overlaid on the spectrogram")
    # plt.show()

    # Convert the figure to a PIL Image and return it
    from PIL import Image

    # Draw the figure to a canvas
    canvas = fig.canvas
    canvas.draw()

    # Get the RGB buffer from the figure
    w, h = fig.canvas.get_width_height()
    buf = canvas.tostring_rgb()

    # Create a PIL image from the RGB data
    image = Image.frombytes("RGB", (w, h), buf)
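
    # Physics behind the estimates below (a sketch of the idea, not necessarily
    # the exact formulas inside `su.physics`): the container acts roughly like
    # a pipe that is open at the top and closed at the water surface, so the
    # fundamental wavelength relates to the remaining air column as l ≈ λ / 4.
    # As the container fills, l shrinks and the pitch f = C / λ rises.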

    # Estimate physical properties (reported in cm)
    l_pred = su.physics.estimate_length_of_air_column(wavelengths)
    l_pred_mean = l_pred.mean().item()
    l_pred_mean = np.round(l_pred_mean, 2)
    H_pred = su.physics.estimate_cylinder_height(wavelengths)
    H_pred = np.round(H_pred, 2)
    R_pred = su.physics.estimate_cylinder_radius(wavelengths)
    R_pred = np.round(R_pred, 2)
    # print(f"Estimated length: {l_pred_mean} cm, Estimated height: {H_pred} cm, Estimated radius: {R_pred} cm")

    df_show = pd.DataFrame({
        "Physical Property": [
            "Container height",
            "Container radius",
            "Length of air column (mean)",
        ],
        "Estimated Value (cm)": [H_pred, R_pred, l_pred_mean],
    })

    tsne_image = su.visualize.show_temporal_tsne(
        z_audio.detach().numpy(),
        timestamps_eval,
        show=False,
        figsize=(6, 5),
        title="Temporal t-SNE of latent features",
        return_as_pil=True,
    )

    return image, df_show, tsne_image
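

# --- Example usage (sketch) ---------------------------------------------------
# A minimal sketch of how the helpers above chain together for a single video.
# The function name and wiring here are illustrative only; the actual Gradio
# interface for this demo is defined separately and may differ.
def _example_pipeline(video_path):
    model = load_model()
    model.eval()

    frame = load_frame(video_path)                     # first frame for display
    S = load_spectrogram(video_path)                   # log-mel spectrogram
    audio = load_audio_tensor(video_path)              # transformed waveform

    z_audio, y_audio = get_model_output(audio, model)  # latent + axial prediction
    return show_output(frame, S, y_audio, z_audio)     # (pitch plot, table, t-SNE)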