import unittest

import numpy as np
from huggingface_hub import hf_hub_download

from transformers import is_torch_available, is_vision_available
from transformers.image_processing_utils import get_size_dict
from transformers.image_utils import SizeDict
from transformers.processing_utils import VideosKwargs
from transformers.testing_utils import (
    require_av,
    require_cv2,
    require_decord,
    require_torch,
    require_torchvision,
    require_vision,
)
from transformers.video_utils import group_videos_by_shape, make_batched_videos, reorder_videos
|
|
if is_torch_available():
    import torch

if is_vision_available():
    import PIL.Image

    from transformers import BaseVideoProcessor
    from transformers.video_utils import VideoMetadata, load_video
|
|
def get_random_video(height, width, num_frames=8, return_torch=False):
    random_frame = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
    video = np.array(([random_frame] * num_frames))
    if return_torch:
        # Torch videos are returned in channels-first (num_frames, channels, height, width) format
        return torch.from_numpy(video).permute(0, 3, 1, 2)
    return video
|
|
@require_vision
@require_torchvision
class BaseVideoProcessorTester(unittest.TestCase):
    """
    Tests that the `transforms` can be applied to a 4-dim array directly, i.e. to a whole video.
    """
|
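    # Conventions used by the tests below: numpy videos are (num_frames, height, width, channels) uint8 arrays,
    # while torch videos built with `get_random_video(..., return_torch=True)` are channels-first
    # (num_frames, channels, height, width).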
    def test_make_batched_videos_pil(self):
        # A single PIL image is converted to a list containing one single-frame video
        video = get_random_video(16, 32)
        pil_image = PIL.Image.fromarray(video[0])
        videos_list = make_batched_videos(pil_image)
        self.assertIsInstance(videos_list, list)
        self.assertIsInstance(videos_list[0], np.ndarray)
        self.assertEqual(videos_list[0].shape, (1, 16, 32, 3))
        self.assertTrue(np.array_equal(videos_list[0][0], np.array(pil_image)))

        # A list of PIL frames is converted to a list containing one video
        video = get_random_video(16, 32)
        pil_video = [PIL.Image.fromarray(frame) for frame in video]
        videos_list = make_batched_videos(pil_video)
        self.assertIsInstance(videos_list, list)
        self.assertIsInstance(videos_list[0], np.ndarray)
        self.assertEqual(videos_list[0].shape, (8, 16, 32, 3))
        self.assertTrue(np.array_equal(videos_list[0], video))

        # A nested list of PIL frames is converted to a list of videos
        video = get_random_video(16, 32)
        pil_video = [PIL.Image.fromarray(frame) for frame in video]
        videos = [pil_video, pil_video]
        videos_list = make_batched_videos(videos)
        self.assertIsInstance(videos_list, list)
        self.assertIsInstance(videos_list[0], np.ndarray)
        self.assertEqual(videos_list[0].shape, (8, 16, 32, 3))
        self.assertTrue(np.array_equal(videos_list[0], video))
|
    def test_make_batched_videos_numpy(self):
        # A single frame array is converted to a list containing one single-frame video
        video = get_random_video(16, 32)[0]
        videos_list = make_batched_videos(video)
        self.assertIsInstance(videos_list, list)
        self.assertIsInstance(videos_list[0], np.ndarray)
        self.assertEqual(videos_list[0].shape, (1, 16, 32, 3))
        self.assertTrue(np.array_equal(videos_list[0][0], video))

        # A 4-dim array is converted to a list containing one video
        video = get_random_video(16, 32)
        videos_list = make_batched_videos(video)
        self.assertIsInstance(videos_list, list)
        self.assertIsInstance(videos_list[0], np.ndarray)
        self.assertEqual(videos_list[0].shape, (8, 16, 32, 3))
        self.assertTrue(np.array_equal(videos_list[0], video))

        # A list of 4-dim arrays is kept as a list of videos
        video = get_random_video(16, 32)
        videos = [video, video]
        videos_list = make_batched_videos(videos)
        self.assertIsInstance(videos_list, list)
        self.assertIsInstance(videos_list[0], np.ndarray)
        self.assertEqual(videos_list[0].shape, (8, 16, 32, 3))
        self.assertTrue(np.array_equal(videos_list[0], video))
|
    @require_torch
    def test_make_batched_videos_torch(self):
        # A single frame tensor is converted to a list containing one single-frame video
        video = get_random_video(16, 32)[0]
        torch_video = torch.from_numpy(video)
        videos_list = make_batched_videos(torch_video)
        self.assertIsInstance(videos_list, list)
        self.assertIsInstance(videos_list[0], np.ndarray)
        self.assertEqual(videos_list[0].shape, (1, 16, 32, 3))
        self.assertTrue(np.array_equal(videos_list[0][0], video))

        # A 4-dim tensor is converted to a list containing one video, keeping the tensor type
        video = get_random_video(16, 32)
        torch_video = torch.from_numpy(video)
        videos_list = make_batched_videos(torch_video)
        self.assertIsInstance(videos_list, list)
        self.assertIsInstance(videos_list[0], torch.Tensor)
        self.assertEqual(videos_list[0].shape, (8, 16, 32, 3))
        self.assertTrue(np.array_equal(videos_list[0], video))

        # A list of 4-dim tensors is kept as a list of videos
        video = get_random_video(16, 32)
        torch_video = torch.from_numpy(video)
        videos = [torch_video, torch_video]
        videos_list = make_batched_videos(videos)
        self.assertIsInstance(videos_list, list)
        self.assertIsInstance(videos_list[0], torch.Tensor)
        self.assertEqual(videos_list[0].shape, (8, 16, 32, 3))
        self.assertTrue(np.array_equal(videos_list[0], video))
|
    def test_resize(self):
        video_processor = BaseVideoProcessor(model_init_kwargs=VideosKwargs)
        video = get_random_video(16, 32, return_torch=True)

        # All frames of the video are resized at once
        size_dict = SizeDict(**get_size_dict((8, 8), param_name="size"))
        resized_video = video_processor.resize(video, size=size_dict)
        self.assertIsInstance(resized_video, torch.Tensor)
        self.assertEqual(resized_video.shape, (8, 3, 8, 8))
|
    def test_normalize(self):
        video_processor = BaseVideoProcessor(model_init_kwargs=VideosKwargs)
        array = torch.randn(4, 3, 16, 32)
        mean = [0.1, 0.5, 0.9]
        std = [0.2, 0.4, 0.6]

        # Mean and std are broadcast over the channel dimension of the channels-first video
        expected = (array - torch.tensor(mean)[:, None, None]) / torch.tensor(std)[:, None, None]
        normalized_array = video_processor.normalize(array, mean, std)
        torch.testing.assert_close(normalized_array, expected)
|
    def test_center_crop(self):
        video_processor = BaseVideoProcessor(model_init_kwargs=VideosKwargs)
        video = get_random_video(16, 32, return_torch=True)

        # Crop sizes smaller than, equal to, and larger than the input on one or both dimensions
        crop_sizes = [8, (8, 64), 20, (32, 64)]
        for size in crop_sizes:
            size_dict = SizeDict(**get_size_dict(size, default_to_square=True, param_name="crop_size"))
            cropped_video = video_processor.center_crop(video, size_dict)
            self.assertIsInstance(cropped_video, torch.Tensor)

            expected_size = (size, size) if isinstance(size, int) else size
            self.assertEqual(cropped_video.shape, (8, 3, *expected_size))
|
    def test_convert_to_rgb(self):
        video_processor = BaseVideoProcessor(model_init_kwargs=VideosKwargs)
        video = get_random_video(20, 20, return_torch=True)

        # A single-channel (grayscale) video is expanded to 3 channels
        rgb_video = video_processor.convert_to_rgb(video[:, :1])
        self.assertEqual(rgb_video.shape, (8, 3, 20, 20))

        # A 4-channel (RGBA) video is converted down to 3 channels
        rgb_video = video_processor.convert_to_rgb(torch.cat([video, video[:, :1]], dim=1))
        self.assertEqual(rgb_video.shape, (8, 3, 20, 20))
|
    def test_group_and_reorder_videos(self):
        """Tests that videos can be grouped by frame size and number of frames"""
        video_1 = get_random_video(20, 20, num_frames=3, return_torch=True)
        video_2 = get_random_video(20, 20, num_frames=5, return_torch=True)

        # Videos with the same spatial size but a different number of frames go into separate groups
        grouped_videos, grouped_videos_index = group_videos_by_shape([video_1, video_2])
        self.assertEqual(len(grouped_videos), 2)

        regrouped_videos = reorder_videos(grouped_videos, grouped_videos_index)
        self.assertEqual(len(regrouped_videos), 2)
        self.assertEqual(video_1.shape, regrouped_videos[0].shape)

        # Videos with different spatial sizes go into separate groups
        video_3 = get_random_video(15, 20, num_frames=3, return_torch=True)
        grouped_videos, grouped_videos_index = group_videos_by_shape([video_1, video_3])
        self.assertEqual(len(grouped_videos), 2)

        regrouped_videos = reorder_videos(grouped_videos, grouped_videos_index)
        self.assertEqual(len(regrouped_videos), 2)
        self.assertEqual(video_1.shape, regrouped_videos[0].shape)

        # Three videos that all differ in spatial size or frame count give three groups
        grouped_videos, grouped_videos_index = group_videos_by_shape([video_1, video_2, video_3])
        self.assertEqual(len(grouped_videos), 3)

        regrouped_videos = reorder_videos(grouped_videos, grouped_videos_index)
        self.assertEqual(len(regrouped_videos), 3)
        self.assertEqual(video_1.shape, regrouped_videos[0].shape)

        # Identical videos share a group, while reordering still returns one video per input
        grouped_videos, grouped_videos_index = group_videos_by_shape([video_1, video_1, video_3])
        self.assertEqual(len(grouped_videos), 2)

        regrouped_videos = reorder_videos(grouped_videos, grouped_videos_index)
        self.assertEqual(len(regrouped_videos), 3)
        self.assertEqual(video_1.shape, regrouped_videos[0].shape)

        # All videos identical: a single group, still three videos after reordering
        grouped_videos, grouped_videos_index = group_videos_by_shape([video_1, video_1, video_1])
        self.assertEqual(len(grouped_videos), 1)

        regrouped_videos = reorder_videos(grouped_videos, grouped_videos_index)
        self.assertEqual(len(regrouped_videos), 3)
        self.assertEqual(video_1.shape, regrouped_videos[0].shape)
|
|
@require_vision
@require_av
class LoadVideoTester(unittest.TestCase):
    def test_load_video_url(self):
        video, _ = load_video(
            "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4",
        )
        self.assertEqual(video.shape, (243, 360, 640, 3))  # the full 243-frame video is loaded when no sampling arguments are passed
|
    def test_load_video_local(self):
        video_file_path = hf_hub_download(
            repo_id="raushan-testing-hf/videos-test", filename="sample_demo_1.mp4", repo_type="dataset"
        )
        video, _ = load_video(video_file_path)
        self.assertEqual(video.shape, (243, 360, 640, 3))
|
    @require_decord
    @require_torchvision
    @require_cv2
    def test_load_video_backend_url(self):
        video, _ = load_video(
            "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4",
            backend="decord",
        )
        self.assertEqual(video.shape, (243, 360, 640, 3))

        # The `opencv` and `torchvision` backends cannot read directly from a URL and should raise an error
        with self.assertRaises(ValueError):
            video, _ = load_video(
                "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4",
                backend="opencv",
            )
        with self.assertRaises(ValueError):
            video, _ = load_video(
                "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4",
                backend="torchvision",
            )
|
    @require_decord
    @require_torchvision
    @require_cv2
    def test_load_video_backend_local(self):
        video_file_path = hf_hub_download(
            repo_id="raushan-testing-hf/videos-test", filename="sample_demo_1.mp4", repo_type="dataset"
        )
        video, metadata = load_video(video_file_path, backend="decord")
        self.assertEqual(video.shape, (243, 360, 640, 3))
        self.assertIsInstance(metadata, VideoMetadata)

        video, metadata = load_video(video_file_path, backend="opencv")
        self.assertEqual(video.shape, (243, 360, 640, 3))
        self.assertIsInstance(metadata, VideoMetadata)

        video, metadata = load_video(video_file_path, backend="torchvision")
        self.assertEqual(video.shape, (243, 360, 640, 3))
        self.assertIsInstance(metadata, VideoMetadata)
|
    def test_load_video_num_frames(self):
        video, _ = load_video(
            "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4",
            num_frames=16,
        )
        self.assertEqual(video.shape, (16, 360, 640, 3))

        video, _ = load_video(
            "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4",
            num_frames=22,
        )
        self.assertEqual(video.shape, (22, 360, 640, 3))
|
    def test_load_video_fps(self):
        video, _ = load_video(
            "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4", fps=1
        )
        self.assertEqual(video.shape, (9, 360, 640, 3))

        video, _ = load_video(
            "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4", fps=2
        )
        self.assertEqual(video.shape, (19, 360, 640, 3))

        # `fps` and `num_frames` are mutually exclusive sampling arguments, so passing both should raise an error
        with self.assertRaises(ValueError):
            video, _ = load_video(
                "https://huggingface.co/datasets/raushan-testing-hf/videos-test/resolve/main/sample_demo_1.mp4",
                fps=1,
                num_frames=10,
            )
|
|