Spaces:
Runtime error
Runtime error
#!/usr/bin/env python3 | |
# Portions Copyright (c) Meta Platforms, Inc. and affiliates. | |
# All rights reserved. | |
# This source code is licensed under the license found in the | |
# LICENSE file in the root directory of this source tree. | |
import math | |
import torch | |
import torch.nn as nn | |
import torchaudio | |
import logging | |
from .models.multimodal_preprocessors import SimpleTokenizer | |
from PIL import Image | |
from pytorchvideo import transforms as pv_transforms | |
from pytorchvideo.data.clip_sampling import ConstantClipsPerVideoSampler | |
from pytorchvideo.data.encoded_video import EncodedVideo | |
from torchvision import transforms | |
from torchvision.transforms._transforms_video import NormalizeVideo | |
DEFAULT_AUDIO_FRAME_SHIFT_MS = 10 # in milliseconds | |
BPE_PATH = "bpe/bpe_simple_vocab_16e6.txt.gz" | |
def waveform2melspec(waveform, sample_rate, num_mel_bins, target_length): | |
# Based on https://github.com/YuanGongND/ast/blob/d7d8b4b8e06cdaeb6c843cdb38794c1c7692234c/src/dataloader.py#L102 | |
waveform -= waveform.mean() | |
fbank = torchaudio.compliance.kaldi.fbank( | |
waveform, | |
htk_compat=True, | |
sample_frequency=sample_rate, | |
use_energy=False, | |
window_type="hanning", | |
num_mel_bins=num_mel_bins, | |
dither=0.0, | |
frame_length=25, | |
frame_shift=DEFAULT_AUDIO_FRAME_SHIFT_MS, | |
) | |
# Convert to [mel_bins, num_frames] shape | |
fbank = fbank.transpose(0, 1) | |
# Pad to target_length | |
n_frames = fbank.size(1) | |
p = target_length - n_frames | |
# if p is too large (say >20%), flash a warning | |
if abs(p) / n_frames > 0.2: | |
logging.warning( | |
"Large gap between audio n_frames(%d) and " | |
"target_length (%d). Is the audio_target_length " | |
"setting correct?", | |
n_frames, | |
target_length, | |
) | |
# cut and pad | |
if p > 0: | |
fbank = torch.nn.functional.pad(fbank, (0, p), mode="constant", value=0) | |
elif p < 0: | |
fbank = fbank[:, 0:target_length] | |
# Convert to [1, mel_bins, num_frames] shape, essentially like a 1 | |
# channel image | |
fbank = fbank.unsqueeze(0) | |
return fbank | |
def get_clip_timepoints(clip_sampler, duration): | |
# Read out all clips in this video | |
all_clips_timepoints = [] | |
is_last_clip = False | |
end = 0.0 | |
while not is_last_clip: | |
start, end, _, _, is_last_clip = clip_sampler(end, duration, annotation=None) | |
all_clips_timepoints.append((start, end)) | |
return all_clips_timepoints | |
def load_and_transform_vision_data(image_paths, device): | |
if image_paths is None: | |
return None | |
image_ouputs = [] | |
for image_path in image_paths: | |
data_transform = transforms.Compose( | |
[ | |
transforms.Resize( | |
224, interpolation=transforms.InterpolationMode.BICUBIC | |
), | |
transforms.CenterCrop(224), | |
transforms.ToTensor(), | |
transforms.Normalize( | |
mean=(0.48145466, 0.4578275, 0.40821073), | |
std=(0.26862954, 0.26130258, 0.27577711), | |
), | |
] | |
) | |
with open(image_path, "rb") as fopen: | |
image = Image.open(fopen).convert("RGB") | |
image = data_transform(image).to(device) | |
image_ouputs.append(image) | |
return torch.stack(image_ouputs, dim=0) | |
def load_and_transform_thermal_data(thermal_paths, device): | |
if thermal_paths is None: | |
return None | |
thermal_ouputs = [] | |
for thermal_path in thermal_paths: | |
data_transform = transforms.Compose( | |
[ | |
transforms.Resize( | |
224, interpolation=transforms.InterpolationMode.BICUBIC | |
), | |
transforms.CenterCrop(224), | |
transforms.ToTensor(), | |
] | |
) | |
with open(thermal_path, "rb") as fopen: | |
thermal = Image.open(fopen).convert("L") | |
thermal = data_transform(thermal).to(device) | |
thermal_ouputs.append(thermal) | |
return torch.stack(thermal_ouputs, dim=0) | |
def load_and_transform_text(text, device): | |
if text is None: | |
return None | |
tokenizer = SimpleTokenizer(bpe_path=BPE_PATH) | |
tokens = [tokenizer(t).unsqueeze(0).to(device) for t in text] | |
tokens = torch.cat(tokens, dim=0) | |
return tokens | |
def load_and_transform_audio_data( | |
audio_paths, | |
device, | |
num_mel_bins=128, | |
target_length=204, | |
sample_rate=16000, | |
clip_duration=2, | |
clips_per_video=3, | |
mean=-4.268, | |
std=9.138, | |
): | |
if audio_paths is None: | |
return None | |
audio_outputs = [] | |
clip_sampler = ConstantClipsPerVideoSampler( | |
clip_duration=clip_duration, clips_per_video=clips_per_video | |
) | |
for audio_path in audio_paths: | |
waveform, sr = torchaudio.load(audio_path) | |
if sample_rate != sr: | |
waveform = torchaudio.functional.resample( | |
waveform, orig_freq=sr, new_freq=sample_rate | |
) | |
all_clips_timepoints = get_clip_timepoints( | |
clip_sampler, waveform.size(1) / sample_rate | |
) | |
all_clips = [] | |
for clip_timepoints in all_clips_timepoints: | |
waveform_clip = waveform[ | |
:, | |
int(clip_timepoints[0] * sample_rate) : int( | |
clip_timepoints[1] * sample_rate | |
), | |
] | |
waveform_melspec = waveform2melspec( | |
waveform_clip, sample_rate, num_mel_bins, target_length | |
) | |
all_clips.append(waveform_melspec) | |
normalize = transforms.Normalize(mean=mean, std=std) | |
all_clips = [normalize(ac).to(device) for ac in all_clips] | |
all_clips = torch.stack(all_clips, dim=0) | |
audio_outputs.append(all_clips) | |
return torch.stack(audio_outputs, dim=0) | |
def get_clip_timepoints(clip_sampler, duration): | |
# Read out all clips in this video | |
all_clips_timepoints = [] | |
is_last_clip = False | |
end = 0.0 | |
while not is_last_clip: | |
start, end, _, _, is_last_clip = clip_sampler(end, duration, annotation=None) | |
all_clips_timepoints.append((start, end)) | |
return all_clips_timepoints | |
def crop_boxes(boxes, x_offset, y_offset): | |
""" | |
Peform crop on the bounding boxes given the offsets. | |
Args: | |
boxes (ndarray or None): bounding boxes to peform crop. The dimension | |
is `num boxes` x 4. | |
x_offset (int): cropping offset in the x axis. | |
y_offset (int): cropping offset in the y axis. | |
Returns: | |
cropped_boxes (ndarray or None): the cropped boxes with dimension of | |
`num boxes` x 4. | |
""" | |
cropped_boxes = boxes.copy() | |
cropped_boxes[:, [0, 2]] = boxes[:, [0, 2]] - x_offset | |
cropped_boxes[:, [1, 3]] = boxes[:, [1, 3]] - y_offset | |
return cropped_boxes | |
def uniform_crop(images, size, spatial_idx, boxes=None, scale_size=None): | |
""" | |
Perform uniform spatial sampling on the images and corresponding boxes. | |
Args: | |
images (tensor): images to perform uniform crop. The dimension is | |
`num frames` x `channel` x `height` x `width`. | |
size (int): size of height and weight to crop the images. | |
spatial_idx (int): 0, 1, or 2 for left, center, and right crop if width | |
is larger than height. Or 0, 1, or 2 for top, center, and bottom | |
crop if height is larger than width. | |
boxes (ndarray or None): optional. Corresponding boxes to images. | |
Dimension is `num boxes` x 4. | |
scale_size (int): optinal. If not None, resize the images to scale_size before | |
performing any crop. | |
Returns: | |
cropped (tensor): images with dimension of | |
`num frames` x `channel` x `size` x `size`. | |
cropped_boxes (ndarray or None): the cropped boxes with dimension of | |
`num boxes` x 4. | |
""" | |
assert spatial_idx in [0, 1, 2] | |
ndim = len(images.shape) | |
if ndim == 3: | |
images = images.unsqueeze(0) | |
height = images.shape[2] | |
width = images.shape[3] | |
if scale_size is not None: | |
if width <= height: | |
width, height = scale_size, int(height / width * scale_size) | |
else: | |
width, height = int(width / height * scale_size), scale_size | |
images = torch.nn.functional.interpolate( | |
images, | |
size=(height, width), | |
mode="bilinear", | |
align_corners=False, | |
) | |
y_offset = int(math.ceil((height - size) / 2)) | |
x_offset = int(math.ceil((width - size) / 2)) | |
if height > width: | |
if spatial_idx == 0: | |
y_offset = 0 | |
elif spatial_idx == 2: | |
y_offset = height - size | |
else: | |
if spatial_idx == 0: | |
x_offset = 0 | |
elif spatial_idx == 2: | |
x_offset = width - size | |
cropped = images[:, :, y_offset : y_offset + size, x_offset : x_offset + size] | |
cropped_boxes = crop_boxes(boxes, x_offset, y_offset) if boxes is not None else None | |
if ndim == 3: | |
cropped = cropped.squeeze(0) | |
return cropped, cropped_boxes | |
class SpatialCrop(nn.Module): | |
""" | |
Convert the video into 3 smaller clips spatially. Must be used after the | |
temporal crops to get spatial crops, and should be used with | |
-2 in the spatial crop at the slowfast augmentation stage (so full | |
frames are passed in here). Will return a larger list with the | |
3x spatial crops as well. | |
""" | |
def __init__(self, crop_size: int = 224, num_crops: int = 3): | |
super().__init__() | |
self.crop_size = crop_size | |
if num_crops == 3: | |
self.crops_to_ext = [0, 1, 2] | |
self.flipped_crops_to_ext = [] | |
elif num_crops == 1: | |
self.crops_to_ext = [1] | |
self.flipped_crops_to_ext = [] | |
else: | |
raise NotImplementedError("Nothing else supported yet") | |
def forward(self, videos): | |
""" | |
Args: | |
videos: A list of C, T, H, W videos. | |
Returns: | |
videos: A list with 3x the number of elements. Each video converted | |
to C, T, H', W' by spatial cropping. | |
""" | |
assert isinstance(videos, list), "Must be a list of videos after temporal crops" | |
assert all([video.ndim == 4 for video in videos]), "Must be (C,T,H,W)" | |
res = [] | |
for video in videos: | |
for spatial_idx in self.crops_to_ext: | |
res.append(uniform_crop(video, self.crop_size, spatial_idx)[0]) | |
if not self.flipped_crops_to_ext: | |
continue | |
flipped_video = transforms.functional.hflip(video) | |
for spatial_idx in self.flipped_crops_to_ext: | |
res.append(uniform_crop(flipped_video, self.crop_size, spatial_idx)[0]) | |
return res | |
def load_and_transform_video_data( | |
video_paths, | |
device, | |
clip_duration=2, | |
clips_per_video=5, | |
sample_rate=16000, | |
): | |
if video_paths is None: | |
return None | |
video_outputs = [] | |
video_transform = transforms.Compose( | |
[ | |
pv_transforms.ShortSideScale(224), | |
NormalizeVideo( | |
mean=(0.48145466, 0.4578275, 0.40821073), | |
std=(0.26862954, 0.26130258, 0.27577711), | |
), | |
] | |
) | |
clip_sampler = ConstantClipsPerVideoSampler( | |
clip_duration=clip_duration, clips_per_video=clips_per_video | |
) | |
frame_sampler = pv_transforms.UniformTemporalSubsample(num_samples=clip_duration) | |
for video_path in video_paths: | |
video = EncodedVideo.from_path( | |
video_path, | |
decoder="decord", | |
decode_audio=False, | |
**{"sample_rate": sample_rate}, | |
) | |
all_clips_timepoints = get_clip_timepoints(clip_sampler, video.duration) | |
all_video = [] | |
for clip_timepoints in all_clips_timepoints: | |
# Read the clip, get frames | |
clip = video.get_clip(clip_timepoints[0], clip_timepoints[1]) | |
if clip is None: | |
raise ValueError("No clip found") | |
video_clip = frame_sampler(clip["video"]) | |
video_clip = video_clip / 255.0 # since this is float, need 0-1 | |
all_video.append(video_clip) | |
all_video = [video_transform(clip) for clip in all_video] | |
all_video = SpatialCrop(224, num_crops=3)(all_video) | |
all_video = torch.stack(all_video, dim=0) | |
video_outputs.append(all_video) | |
return torch.stack(video_outputs, dim=0).to(device) | |