import inspect
import json
import random
import tempfile
from pathlib import Path
from typing import Optional

import numpy as np
from huggingface_hub import hf_hub_download
from parameterized import parameterized

from transformers.models.auto.processing_auto import processor_class_from_name
from transformers.processing_utils import Unpack
from transformers.testing_utils import (
    check_json_file_has_correct_format,
    require_av,
    require_librosa,
    require_torch,
    require_vision,
)
from transformers.utils import is_torch_available, is_vision_available


global_rng = random.Random()

if is_vision_available():
    from PIL import Image

if is_torch_available():
    import torch

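# Remote media used by the chat-template tests below. A "videos" entry may be either a
# single video URL or a list of image URLs that are treated as the frames of one video.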
MODALITY_INPUT_DATA = {
    "images": [
        "http://images.cocodataset.org/val2017/000000039769.jpg",
        "http://images.cocodataset.org/val2017/000000039769.jpg",
    ],
    "videos": [
        "https://test-videos.co.uk/vids/bigbuckbunny/mp4/h264/720/Big_Buck_Bunny_720_10s_10MB.mp4",
        ["https://www.ilankelman.org/stopsigns/australia.jpg", "https://www.ilankelman.org/stopsigns/australia.jpg"],
    ],
    "audio": [
        "https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen2-Audio/audio/glass-breaking-151256.mp3",
        "https://qianwen-res.oss-cn-beijing.aliyuncs.com/Qwen2-Audio/audio/f2641_0_throatclearing.wav",
    ],
}


def prepare_image_inputs():
    """This function prepares a list of PIL images."""
    image_inputs = [np.random.randint(255, size=(3, 30, 400), dtype=np.uint8)]
    image_inputs = [Image.fromarray(np.moveaxis(x, 0, -1)) for x in image_inputs]
    return image_inputs


def floats_list(shape, scale=1.0, rng=None, name=None):
    """Creates a nested list of random floats with the given 2D shape."""
    if rng is None:
        rng = global_rng

    values = []
    for batch_idx in range(shape[0]):
        values.append([])
        for _ in range(shape[1]):
            values[-1].append(rng.random() * scale)

    return values


@require_torch
@require_vision
class ProcessorTesterMixin:
    processor_class = None
    text_input_name = "input_ids"
    images_input_name = "pixel_values"
    videos_input_name = "pixel_values_videos"
    audio_input_name = "input_features"

    @staticmethod
    def prepare_processor_dict():
        return {}

    def get_component(self, attribute, **kwargs):
        assert attribute in self.processor_class.attributes
        component_class_name = getattr(self.processor_class, f"{attribute}_class")
        if isinstance(component_class_name, tuple):
            component_class_name = component_class_name[0]

        component_class = processor_class_from_name(component_class_name)
        component = component_class.from_pretrained(self.tmpdirname, **kwargs)
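        # The padding tests below need a pad token, so give tokenizers a dummy one if they have none.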
        if "tokenizer" in attribute and not component.pad_token:
            component.pad_token = "[TEST_PAD]"
            if component.pad_token_id is None:
                component.pad_token_id = 0

        return component

    def prepare_components(self):
        components = {}
        for attribute in self.processor_class.attributes:
            component = self.get_component(attribute)
            components[attribute] = component

        return components

    def get_processor(self):
        components = self.prepare_components()
        processor = self.processor_class(**components, **self.prepare_processor_dict())
        return processor

    def prepare_text_inputs(self, batch_size: Optional[int] = None, modality: Optional[str] = None):
        if modality is not None:
            special_token_to_add = getattr(self, f"{modality}_token", "")
        else:
            special_token_to_add = ""

        if batch_size is None:
            return f"lower newer {special_token_to_add}"

        if batch_size < 1:
            raise ValueError("batch_size must be greater than 0")

        if batch_size == 1:
            return [f"lower newer {special_token_to_add}"]
        return [f"lower newer {special_token_to_add}", f" {special_token_to_add} upper older longer string"] + [
            f"lower newer {special_token_to_add}"
        ] * (batch_size - 2)

    @require_vision
    def prepare_image_inputs(self, batch_size: Optional[int] = None):
        """This function prepares a list of PIL images for testing."""
        if batch_size is None:
            return prepare_image_inputs()[0]
        if batch_size < 1:
            raise ValueError("batch_size must be greater than 0")
        return prepare_image_inputs() * batch_size

    @require_vision
    def prepare_video_inputs(self, batch_size: Optional[int] = None):
        """This function prepares a list of numpy videos."""
        video_input = [np.random.randint(255, size=(3, 30, 400), dtype=np.uint8)] * 8
        if batch_size is None:
            return video_input
        return [video_input] * batch_size

    def test_processor_to_json_string(self):
        processor = self.get_processor()
        obj = json.loads(processor.to_json_string())
        for key, value in self.prepare_processor_dict().items():
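            # The chat template is saved and tested separately, so skip it here.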
            if key != "chat_template":
                # JSON serialization turns integer dict keys into strings; convert them back before comparing.
                if (
                    isinstance(obj[key], dict)
                    and isinstance(list(obj[key].keys())[0], str)
                    and isinstance(list(value.keys())[0], int)
                ):
                    obj[key] = {int(k): v for k, v in obj[key].items()}
                self.assertEqual(obj[key], value)
                self.assertEqual(getattr(processor, key, None), value)

    def test_processor_from_and_save_pretrained(self):
        processor_first = self.get_processor()

        with tempfile.TemporaryDirectory() as tmpdirname:
            saved_files = processor_first.save_pretrained(tmpdirname)
            if len(saved_files) > 0:
                check_json_file_has_correct_format(saved_files[0])
            processor_second = self.processor_class.from_pretrained(tmpdirname)

            self.assertEqual(processor_second.to_dict(), processor_first.to_dict())

            for attribute in processor_first.attributes:
                attribute_first = getattr(processor_first, attribute)
                attribute_second = getattr(processor_second, attribute)
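                # Tokenizer repr embeds loading state that can legitimately change across a
                # save/load round trip, so only non-tokenizer attributes are compared here.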
                if "tokenizer" not in attribute:
                    self.assertEqual(repr(attribute_first), repr(attribute_second))

    def skip_processor_without_typed_kwargs(self, processor):
        # The kwargs tests below require `__call__` to annotate its `**kwargs` with a
        # TypedDict (`**kwargs: Unpack[...]`); skip processors that were not updated yet.
        is_kwargs_typed_dict = False
        call_signature = inspect.signature(processor.__call__)
        for param in call_signature.parameters.values():
            if param.kind == param.VAR_KEYWORD and param.annotation != param.empty:
                is_kwargs_typed_dict = (
                    hasattr(param.annotation, "__origin__") and param.annotation.__origin__ == Unpack
                )
        if not is_kwargs_typed_dict:
            self.skipTest(f"{self.processor_class} doesn't have typed kwargs.")

    def test_tokenizer_defaults_preserved_by_kwargs(self):
        if "image_processor" not in self.processor_class.attributes:
            self.skipTest(f"image_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_components["tokenizer"] = self.get_component("tokenizer", max_length=117, padding="max_length")
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)
        input_str = self.prepare_text_inputs(modality="image")
        image_input = self.prepare_image_inputs()
        inputs = processor(text=input_str, images=image_input, return_tensors="pt")
        self.assertEqual(inputs[self.text_input_name].shape[-1], 117)

    def test_image_processor_defaults_preserved_by_image_kwargs(self):
        """
        We use do_rescale=True, rescale_factor=-1 to ensure that image_processor kwargs are preserved in the processor.
        We then check that the mean of the pixel_values is less than or equal to 0 after processing.
        Since the original pixel_values are in [0, 255], this is a good indicator that the rescale_factor is indeed applied.
        """
        if "image_processor" not in self.processor_class.attributes:
            self.skipTest(f"image_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_components["image_processor"] = self.get_component(
            "image_processor", do_rescale=True, rescale_factor=-1
        )
        processor_components["tokenizer"] = self.get_component("tokenizer", max_length=117, padding="max_length")
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(modality="image")
        image_input = self.prepare_image_inputs()

        inputs = processor(text=input_str, images=image_input, return_tensors="pt")
        self.assertLessEqual(inputs[self.images_input_name][0][0].mean(), 0)

    def test_kwargs_overrides_default_tokenizer_kwargs(self):
        if "image_processor" not in self.processor_class.attributes:
            self.skipTest(f"image_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_components["tokenizer"] = self.get_component("tokenizer", padding="longest")
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)
        input_str = self.prepare_text_inputs(modality="image")
        image_input = self.prepare_image_inputs()
        inputs = processor(
            text=input_str, images=image_input, return_tensors="pt", max_length=112, padding="max_length"
        )
        self.assertEqual(inputs[self.text_input_name].shape[-1], 112)

    def test_kwargs_overrides_default_image_processor_kwargs(self):
        if "image_processor" not in self.processor_class.attributes:
            self.skipTest(f"image_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_components["image_processor"] = self.get_component(
            "image_processor", do_rescale=True, rescale_factor=1
        )
        processor_components["tokenizer"] = self.get_component("tokenizer", max_length=117, padding="max_length")
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(modality="image")
        image_input = self.prepare_image_inputs()

        inputs = processor(text=input_str, images=image_input, do_rescale=True, rescale_factor=-1, return_tensors="pt")
        self.assertLessEqual(inputs[self.images_input_name][0][0].mean(), 0)

    def test_unstructured_kwargs(self):
        if "image_processor" not in self.processor_class.attributes:
            self.skipTest(f"image_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_kwargs = self.prepare_processor_dict()
        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(modality="image")
        image_input = self.prepare_image_inputs()
        inputs = processor(
            text=input_str,
            images=image_input,
            return_tensors="pt",
            do_rescale=True,
            rescale_factor=-1,
            padding="max_length",
            max_length=76,
        )

        self.assertLessEqual(inputs[self.images_input_name][0][0].mean(), 0)
        self.assertEqual(inputs[self.text_input_name].shape[-1], 76)

    def test_unstructured_kwargs_batched(self):
        if "image_processor" not in self.processor_class.attributes:
            self.skipTest(f"image_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_kwargs = self.prepare_processor_dict()
        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(batch_size=2, modality="image")
        image_input = self.prepare_image_inputs(batch_size=2)
        inputs = processor(
            text=input_str,
            images=image_input,
            return_tensors="pt",
            do_rescale=True,
            rescale_factor=-1,
            padding="longest",
            max_length=76,
        )

        self.assertLessEqual(inputs[self.images_input_name][0][0].mean(), 0)
        self.assertTrue(
            len(inputs[self.text_input_name][0]) == len(inputs[self.text_input_name][1])
            and len(inputs[self.text_input_name][1]) < 76
        )

    def test_doubly_passed_kwargs(self):
        if "image_processor" not in self.processor_class.attributes:
            self.skipTest(f"image_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_kwargs = self.prepare_processor_dict()
        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = [self.prepare_text_inputs(modality="image")]
        image_input = self.prepare_image_inputs()
        with self.assertRaises(ValueError):
            _ = processor(
                text=input_str,
                images=image_input,
                images_kwargs={"do_rescale": True, "rescale_factor": -1},
                do_rescale=True,
                return_tensors="pt",
            )

    def test_structured_kwargs_nested(self):
        if "image_processor" not in self.processor_class.attributes:
            self.skipTest(f"image_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_kwargs = self.prepare_processor_dict()
        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(modality="image")
        image_input = self.prepare_image_inputs()

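        # Define the kwargs for each modality.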
        all_kwargs = {
            "common_kwargs": {"return_tensors": "pt"},
            "images_kwargs": {"do_rescale": True, "rescale_factor": -1},
            "text_kwargs": {"padding": "max_length", "max_length": 76},
        }

        inputs = processor(text=input_str, images=image_input, **all_kwargs)

        self.assertLessEqual(inputs[self.images_input_name][0][0].mean(), 0)
        self.assertEqual(inputs[self.text_input_name].shape[-1], 76)

    def test_structured_kwargs_nested_from_dict(self):
        if "image_processor" not in self.processor_class.attributes:
            self.skipTest(f"image_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_kwargs = self.prepare_processor_dict()
        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)
        input_str = self.prepare_text_inputs(modality="image")
        image_input = self.prepare_image_inputs()

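        # Define the kwargs for each modality.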
        all_kwargs = {
            "common_kwargs": {"return_tensors": "pt"},
            "images_kwargs": {"do_rescale": True, "rescale_factor": -1},
            "text_kwargs": {"padding": "max_length", "max_length": 76},
        }

        inputs = processor(text=input_str, images=image_input, **all_kwargs)
        self.assertLessEqual(inputs[self.images_input_name][0][0].mean(), 0)
        self.assertEqual(inputs[self.text_input_name].shape[-1], 76)

    @require_torch
    def test_tokenizer_defaults_preserved_by_kwargs_audio(self):
        if "feature_extractor" not in self.processor_class.attributes:
            self.skipTest(f"feature_extractor attribute not present in {self.processor_class}")

        feature_extractor = self.get_component("feature_extractor")
        tokenizer = self.get_component("tokenizer", max_length=300, padding="max_length")
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(tokenizer=tokenizer, feature_extractor=feature_extractor, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(batch_size=3, modality="audio")
        raw_speech = floats_list((3, 1000))
        raw_speech = [np.asarray(audio) for audio in raw_speech]
        inputs = processor(text=input_str, audio=raw_speech, return_tensors="pt")
        self.assertEqual(len(inputs[self.text_input_name][0]), 300)

    @require_torch
    def test_kwargs_overrides_default_tokenizer_kwargs_audio(self):
        if "feature_extractor" not in self.processor_class.attributes:
            self.skipTest(f"feature_extractor attribute not present in {self.processor_class}")

        feature_extractor = self.get_component("feature_extractor")
        tokenizer = self.get_component("tokenizer", max_length=117)
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(tokenizer=tokenizer, feature_extractor=feature_extractor, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(batch_size=3, modality="audio")
        raw_speech = floats_list((3, 1000))
        raw_speech = [np.asarray(audio) for audio in raw_speech]
        inputs = processor(text=input_str, audio=raw_speech, return_tensors="pt", max_length=300, padding="max_length")

        self.assertEqual(len(inputs[self.text_input_name][0]), 300)

    @require_torch
    def test_unstructured_kwargs_audio(self):
        if "feature_extractor" not in self.processor_class.attributes:
            self.skipTest(f"feature_extractor attribute not present in {self.processor_class}")

        feature_extractor = self.get_component("feature_extractor")
        tokenizer = self.get_component("tokenizer")
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(tokenizer=tokenizer, feature_extractor=feature_extractor, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(batch_size=3, modality="audio")
        raw_speech = floats_list((3, 1000))
        raw_speech = [np.asarray(audio) for audio in raw_speech]
        inputs = processor(text=input_str, audio=raw_speech, return_tensors="pt", max_length=300, padding="max_length")

        self.assertEqual(len(inputs[self.text_input_name][0]), 300)

    @require_torch
    def test_doubly_passed_kwargs_audio(self):
        if "feature_extractor" not in self.processor_class.attributes:
            self.skipTest(f"feature_extractor attribute not present in {self.processor_class}")

        feature_extractor = self.get_component("feature_extractor")
        tokenizer = self.get_component("tokenizer")
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(tokenizer=tokenizer, feature_extractor=feature_extractor, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(batch_size=3, modality="audio")
        raw_speech = floats_list((3, 1000))
        raw_speech = [np.asarray(audio) for audio in raw_speech]
        with self.assertRaises(ValueError):
            _ = processor(
                text=input_str,
                audio=raw_speech,
                text_kwargs={"padding": "max_length"},
                padding="max_length",
            )

    @require_torch
    @require_vision
    def test_structured_kwargs_audio_nested(self):
        if "feature_extractor" not in self.processor_class.attributes:
            self.skipTest(f"feature_extractor attribute not present in {self.processor_class}")

        feature_extractor = self.get_component("feature_extractor")
        tokenizer = self.get_component("tokenizer", max_length=117)
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(tokenizer=tokenizer, feature_extractor=feature_extractor, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(batch_size=3, modality="audio")
        raw_speech = floats_list((3, 1000))
        raw_speech = [np.asarray(audio) for audio in raw_speech]

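        # Define the kwargs for each modality.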
        all_kwargs = {
            "common_kwargs": {"return_tensors": "pt"},
            "text_kwargs": {"padding": "max_length", "max_length": 76},
            "audio_kwargs": {"padding": "max_length", "max_length": 300},
        }

        inputs = processor(text=input_str, audio=raw_speech, **all_kwargs)
        self.assertEqual(len(inputs[self.text_input_name][0]), 76)

    def test_tokenizer_defaults_preserved_by_kwargs_video(self):
        if "video_processor" not in self.processor_class.attributes:
            self.skipTest(f"video_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_components["tokenizer"] = self.get_component("tokenizer", max_length=117, padding="max_length")
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)
        input_str = self.prepare_text_inputs(modality="video")
        video_input = self.prepare_video_inputs()
        inputs = processor(text=input_str, videos=video_input, return_tensors="pt")
        self.assertEqual(inputs[self.text_input_name].shape[-1], 117)

    def test_video_processor_defaults_preserved_by_video_kwargs(self):
        """
        We use do_rescale=True, rescale_factor=-1 to ensure that video_processor kwargs are preserved in the processor.
        We then check that the mean of the video pixel values is less than or equal to 0 after processing.
        Since the original pixel values are in [0, 255], this is a good indicator that the rescale_factor is indeed applied.
        """
        if "video_processor" not in self.processor_class.attributes:
            self.skipTest(f"video_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_components["video_processor"] = self.get_component(
            "video_processor", do_rescale=True, rescale_factor=-1
        )
        processor_components["tokenizer"] = self.get_component("tokenizer", max_length=117, padding="max_length")
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(modality="video")
        video_input = self.prepare_video_inputs()

        inputs = processor(text=input_str, videos=video_input, return_tensors="pt")
        self.assertLessEqual(inputs[self.videos_input_name][0].mean(), 0)

    def test_kwargs_overrides_default_tokenizer_kwargs_video(self):
        if "video_processor" not in self.processor_class.attributes:
            self.skipTest(f"video_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_components["tokenizer"] = self.get_component("tokenizer", padding="longest")
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)
        input_str = self.prepare_text_inputs(modality="video")
        video_input = self.prepare_video_inputs()
        inputs = processor(
            text=input_str, videos=video_input, return_tensors="pt", max_length=112, padding="max_length"
        )
        self.assertEqual(inputs[self.text_input_name].shape[-1], 112)

    def test_kwargs_overrides_default_video_processor_kwargs(self):
        if "video_processor" not in self.processor_class.attributes:
            self.skipTest(f"video_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_components["video_processor"] = self.get_component(
            "video_processor", do_rescale=True, rescale_factor=1
        )
        processor_components["tokenizer"] = self.get_component("tokenizer", max_length=117, padding="max_length")
        processor_kwargs = self.prepare_processor_dict()

        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(modality="video")
        video_input = self.prepare_video_inputs()

        inputs = processor(text=input_str, videos=video_input, do_rescale=True, rescale_factor=-1, return_tensors="pt")
        self.assertLessEqual(inputs[self.videos_input_name][0].mean(), 0)

    def test_unstructured_kwargs_video(self):
        if "video_processor" not in self.processor_class.attributes:
            self.skipTest(f"video_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_kwargs = self.prepare_processor_dict()
        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(modality="video")
        video_input = self.prepare_video_inputs()
        inputs = processor(
            text=input_str,
            videos=video_input,
            return_tensors="pt",
            do_rescale=True,
            rescale_factor=-1,
            padding="max_length",
            max_length=76,
        )

        self.assertLessEqual(inputs[self.videos_input_name][0].mean(), 0)
        self.assertEqual(inputs[self.text_input_name].shape[-1], 76)

    def test_unstructured_kwargs_batched_video(self):
        if "video_processor" not in self.processor_class.attributes:
            self.skipTest(f"video_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_kwargs = self.prepare_processor_dict()
        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(batch_size=2, modality="video")
        video_input = self.prepare_video_inputs(batch_size=2)
        inputs = processor(
            text=input_str,
            videos=video_input,
            return_tensors="pt",
            do_rescale=True,
            rescale_factor=-1,
            padding="longest",
            max_length=76,
        )

        self.assertLessEqual(inputs[self.videos_input_name][0].mean(), 0)
        self.assertTrue(
            len(inputs[self.text_input_name][0]) == len(inputs[self.text_input_name][1])
            and len(inputs[self.text_input_name][1]) < 76
        )

    def test_doubly_passed_kwargs_video(self):
        if "video_processor" not in self.processor_class.attributes:
            self.skipTest(f"video_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_kwargs = self.prepare_processor_dict()
        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = [self.prepare_text_inputs(modality="video")]
        video_input = self.prepare_video_inputs()
        with self.assertRaises(ValueError):
            _ = processor(
                text=input_str,
                videos=video_input,
                videos_kwargs={"do_rescale": True, "rescale_factor": -1},
                do_rescale=True,
                return_tensors="pt",
            )

    def test_structured_kwargs_nested_video(self):
        if "video_processor" not in self.processor_class.attributes:
            self.skipTest(f"video_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_kwargs = self.prepare_processor_dict()
        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(modality="video")
        video_input = self.prepare_video_inputs()

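        # Define the kwargs for each modality.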
        all_kwargs = {
            "common_kwargs": {"return_tensors": "pt"},
            "videos_kwargs": {"do_rescale": True, "rescale_factor": -1},
            "text_kwargs": {"padding": "max_length", "max_length": 76},
        }

        inputs = processor(text=input_str, videos=video_input, **all_kwargs)

        self.assertLessEqual(inputs[self.videos_input_name][0].mean(), 0)
        self.assertEqual(inputs[self.text_input_name].shape[-1], 76)

    def test_structured_kwargs_nested_from_dict_video(self):
        if "video_processor" not in self.processor_class.attributes:
            self.skipTest(f"video_processor attribute not present in {self.processor_class}")
        processor_components = self.prepare_components()
        processor_kwargs = self.prepare_processor_dict()
        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)
        input_str = self.prepare_text_inputs(modality="video")
        video_input = self.prepare_video_inputs()

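        # Define the kwargs for each modality.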
        all_kwargs = {
            "common_kwargs": {"return_tensors": "pt"},
            "videos_kwargs": {"do_rescale": True, "rescale_factor": -1},
            "text_kwargs": {"padding": "max_length", "max_length": 76},
        }

        inputs = processor(text=input_str, videos=video_input, **all_kwargs)
        self.assertLessEqual(inputs[self.videos_input_name][0].mean(), 0)
        self.assertEqual(inputs[self.text_input_name].shape[-1], 76)

    def test_overlapping_text_image_kwargs_handling(self):
        if "image_processor" not in self.processor_class.attributes:
            self.skipTest(f"image_processor attribute not present in {self.processor_class}")

        processor_components = self.prepare_components()
        processor = self.processor_class(**processor_components)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(modality="image")
        image_input = self.prepare_image_inputs()

        with self.assertRaises(ValueError):
            _ = processor(
                text=input_str,
                images=image_input,
                return_tensors="pt",
                padding="max_length",
                text_kwargs={"padding": "do_not_pad"},
            )

    def test_overlapping_text_audio_kwargs_handling(self):
        """
        Checks that `padding` (or any other kwarg that overlaps between the audio feature
        extractor and the tokenizer) is passed only to the text component and is ignored
        for audio, for backward-compatibility purposes.
        """
        if "feature_extractor" not in self.processor_class.attributes:
            self.skipTest(f"feature_extractor attribute not present in {self.processor_class}")

        processor_components = self.prepare_components()
        processor_kwargs = self.prepare_processor_dict()
        processor = self.processor_class(**processor_components, **processor_kwargs)
        self.skip_processor_without_typed_kwargs(processor)

        input_str = self.prepare_text_inputs(batch_size=3, modality="audio")
        audio_lengths = [4000, 8000, 16000, 32000]
        raw_speech = [np.asarray(audio)[:length] for audio, length in zip(floats_list((3, 32_000)), audio_lengths)]

        # This should not raise an error: `padding=True` is forwarded to the tokenizer only.
        _ = processor(text=input_str, audio=raw_speech, padding=True, return_tensors="pt")

    def test_prepare_and_validate_optional_call_args(self):
        processor = self.get_processor()
        optional_call_args_name = getattr(processor, "optional_call_args", [])
        num_optional_call_args = len(optional_call_args_name)
        if num_optional_call_args == 0:
            self.skipTest("No optional call args")

        # Passing all optional call args should map each value to its name, in order.
        optional_call_args = processor.prepare_and_validate_optional_call_args(
            *(f"optional_{i}" for i in range(num_optional_call_args))
        )
        self.assertEqual(
            optional_call_args, {arg_name: f"optional_{i}" for i, arg_name in enumerate(optional_call_args_name)}
        )

        # Passing a single arg should map it to the first name only.
        optional_call_args = processor.prepare_and_validate_optional_call_args("optional_1")
        self.assertEqual(optional_call_args, {optional_call_args_name[0]: "optional_1"})

        # Passing no args should return an empty dict.
        optional_call_args = processor.prepare_and_validate_optional_call_args()
        self.assertEqual(optional_call_args, {})

        # Passing more args than there are optional call args should raise an error.
        with self.assertRaises(ValueError):
            processor.prepare_and_validate_optional_call_args(
                *(f"optional_{i}" for i in range(num_optional_call_args + 1))
            )

    def test_chat_template_save_loading(self):
        processor = self.processor_class.from_pretrained(self.tmpdirname)
        signature = inspect.signature(processor.__init__)
        if "chat_template" not in {*signature.parameters.keys()}:
            self.skipTest("Processor doesn't accept chat templates at input")

        existing_tokenizer_template = getattr(processor.tokenizer, "chat_template", None)
        processor.chat_template = "test template"
        with tempfile.TemporaryDirectory() as tmpdirname:
            processor.save_pretrained(tmpdirname, save_jinja_files=False)
            self.assertTrue(Path(tmpdirname, "chat_template.json").is_file())
            self.assertFalse(Path(tmpdirname, "chat_template.jinja").is_file())
            reloaded_processor = self.processor_class.from_pretrained(tmpdirname)
            self.assertEqual(processor.chat_template, reloaded_processor.chat_template)
            # In the legacy JSON format, the processor template does not leak into the
            # tokenizer: the reloaded tokenizer keeps its original template.
            self.assertEqual(getattr(reloaded_processor.tokenizer, "chat_template", None), existing_tokenizer_template)

        with tempfile.TemporaryDirectory() as tmpdirname:
            processor.save_pretrained(tmpdirname)
            self.assertTrue(Path(tmpdirname, "chat_template.jinja").is_file())
            self.assertFalse(Path(tmpdirname, "chat_template.json").is_file())
            self.assertFalse(Path(tmpdirname, "additional_chat_templates").is_dir())
            reloaded_processor = self.processor_class.from_pretrained(tmpdirname)
            self.assertEqual(processor.chat_template, reloaded_processor.chat_template)
            # When a single template is saved as a jinja file, the reloaded tokenizer
            # shares the processor's template.
            self.assertEqual(reloaded_processor.chat_template, reloaded_processor.tokenizer.chat_template)

        with tempfile.TemporaryDirectory() as tmpdirname:
            processor.chat_template = {"default": "a", "secondary": "b"}
            processor.save_pretrained(tmpdirname)
            self.assertTrue(Path(tmpdirname, "chat_template.jinja").is_file())
            self.assertFalse(Path(tmpdirname, "chat_template.json").is_file())
            self.assertTrue(Path(tmpdirname, "additional_chat_templates").is_dir())
            reloaded_processor = self.processor_class.from_pretrained(tmpdirname)
            self.assertEqual(processor.chat_template, reloaded_processor.chat_template)
            # Templates beyond "default" go into the additional_chat_templates directory
            # and should survive the round trip as well.
            self.assertEqual(reloaded_processor.chat_template, reloaded_processor.tokenizer.chat_template)

        # Saving multiple templates in the legacy JSON format is not supported.
        with self.assertRaises(ValueError):
            with tempfile.TemporaryDirectory() as tmpdirname:
                processor.chat_template = {"default": "a", "secondary": "b"}
                processor.save_pretrained(tmpdirname, save_jinja_files=False)

    @require_torch
    def _test_apply_chat_template(
        self,
        modality: str,
        batch_size: int,
        return_tensors: str,
        input_name: str,
        processor_name: str,
        input_data: list[str],
    ):
        processor = self.get_processor()
        if processor.chat_template is None:
            self.skipTest("Processor has no chat template")

        if processor_name not in self.processor_class.attributes:
            self.skipTest(f"{processor_name} attribute not present in {self.processor_class}")

        # Fast (torch-backed) processors don't support numpy outputs, so always request torch tensors.
        if getattr(processor, processor_name).__class__.__name__.endswith("Fast"):
            return_tensors = "pt"

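        # Build identical text-only conversations (no media yet) for the whole batch.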
        batch_messages = [
            [
                {
                    "role": "user",
                    "content": [{"type": "text", "text": "Describe this."}],
                },
            ]
        ] * batch_size

        # Applying the template without tokenization should return one prompt per conversation.
        formatted_prompt = processor.apply_chat_template(batch_messages, add_generation_prompt=True, tokenize=False)
        self.assertEqual(len(formatted_prompt), batch_size)

        # Tokenizing through the template should match tokenizing the formatted prompt directly;
        # skip special tokens if the template already inserted a BOS token.
        formatted_prompt_tokenized = processor.apply_chat_template(
            batch_messages, add_generation_prompt=True, tokenize=True, return_tensors=return_tensors
        )
        add_special_tokens = True
        if processor.tokenizer.bos_token is not None and formatted_prompt[0].startswith(processor.tokenizer.bos_token):
            add_special_tokens = False
        tok_output = processor.tokenizer(
            formatted_prompt, return_tensors=return_tensors, add_special_tokens=add_special_tokens
        )
        expected_output = tok_output.input_ids
        self.assertListEqual(expected_output.tolist(), formatted_prompt_tokenized.tolist())

        # Tokenizer kwargs such as padding and truncation are forwarded through the template call.
        tokenized_prompt_100 = processor.apply_chat_template(
            batch_messages,
            add_generation_prompt=True,
            tokenize=True,
            padding="max_length",
            truncation=True,
            return_tensors=return_tensors,
            max_length=100,
        )
        self.assertEqual(len(tokenized_prompt_100[0]), 100)

        # `return_dict=True` should give a full encoding with attention masks.
        out_dict_text = processor.apply_chat_template(
            batch_messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors=return_tensors,
        )
        self.assertTrue(all(key in out_dict_text for key in ["input_ids", "attention_mask"]))
        self.assertEqual(len(out_dict_text["input_ids"]), batch_size)
        self.assertEqual(len(out_dict_text["attention_mask"]), batch_size)

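        # Now add media to each conversation and check that the processor returns the
        # corresponding modality outputs.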
        for idx, url in enumerate(input_data[:batch_size]):
            batch_messages[idx][0]["content"] = [batch_messages[idx][0]["content"][0], {"type": modality, "url": url}]

        out_dict = processor.apply_chat_template(
            batch_messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors=return_tensors,
            num_frames=4,
        )
        # Resolve e.g. "images_input_name" to the actual output key, such as "pixel_values".
        input_name = getattr(self, input_name)
        self.assertTrue(input_name in out_dict)
        self.assertEqual(len(out_dict["input_ids"]), batch_size)
        self.assertEqual(len(out_dict["attention_mask"]), batch_size)
        self.assertEqual(len(out_dict[input_name]), batch_size)

        # Every returned value should be of the requested tensor type.
        return_tensor_to_type = {"pt": torch.Tensor, "np": np.ndarray, None: list}
        for k in out_dict:
            self.assertIsInstance(out_dict[k], return_tensor_to_type[return_tensors])

        # `continue_final_message=True` should keep the final assistant text as the open end of the prompt.
        assistant_message = {
            "role": "assistant",
            "content": [{"type": "text", "text": "It is the sound of"}],
        }
        for idx, url in enumerate(input_data[:batch_size]):
            batch_messages[idx] = batch_messages[idx] + [assistant_message]
        continue_prompt = processor.apply_chat_template(batch_messages, continue_final_message=True, tokenize=False)
        for prompt in continue_prompt:
            self.assertTrue(prompt.endswith("It is the sound of"))

    @require_librosa
    @parameterized.expand([(1, "np"), (1, "pt"), (2, "np"), (2, "pt")])
    def test_apply_chat_template_audio(self, batch_size: int, return_tensors: str):
        self._test_apply_chat_template(
            "audio", batch_size, return_tensors, "audio_input_name", "feature_extractor", MODALITY_INPUT_DATA["audio"]
        )

    @require_av
    @parameterized.expand([(1, "pt"), (2, "pt")])
    def test_apply_chat_template_video(self, batch_size: int, return_tensors: str):
        self._test_apply_chat_template(
            "video", batch_size, return_tensors, "videos_input_name", "video_processor", MODALITY_INPUT_DATA["videos"]
        )

    @parameterized.expand([(1, "np"), (1, "pt"), (2, "np"), (2, "pt")])
    def test_apply_chat_template_image(self, batch_size: int, return_tensors: str):
        self._test_apply_chat_template(
            "image", batch_size, return_tensors, "images_input_name", "image_processor", MODALITY_INPUT_DATA["images"]
        )

    @require_torch
    def test_apply_chat_template_video_frame_sampling(self):
        processor = self.get_processor()

        if processor.chat_template is None:
            self.skipTest("Processor has no chat template")

        signature = inspect.signature(processor.__call__)
        if "videos" not in {*signature.parameters.keys()} or (
            signature.parameters.get("videos") is not None
            and signature.parameters["videos"].annotation == inspect._empty
        ):
            self.skipTest("Processor doesn't accept videos at input")

        messages = [
            [
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "video",
                            "url": "https://test-videos.co.uk/vids/bigbuckbunny/mp4/h264/720/Big_Buck_Bunny_720_10s_10MB.mp4",
                        },
                        {"type": "text", "text": "What is shown in this video?"},
                    ],
                },
            ]
        ]

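        # Sampling with an explicit `num_frames` should return exactly that many frames.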
        num_frames = 3
        out_dict_with_video = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            num_frames=num_frames,
            return_tensors="pt",
        )
        self.assertTrue(self.videos_input_name in out_dict_with_video)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name]), 1)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name][0]), num_frames)

        # Sampling by `video_fps` should return fps * duration frames (1 fps on this 10s clip -> 10 frames).
        video_fps = 1
        out_dict_with_video = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            video_fps=video_fps,
            return_tensors="pt",
        )
        self.assertTrue(self.videos_input_name in out_dict_with_video)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name]), 1)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name][0]), video_fps * 10)

        # Passing both `video_fps` and `num_frames` is ambiguous and has to raise an error.
        with self.assertRaises(ValueError):
            out_dict_with_video = processor.apply_chat_template(
                messages,
                add_generation_prompt=True,
                tokenize=True,
                return_dict=True,
                video_fps=video_fps,
                num_frames=num_frames,
            )

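        # With no sampling arguments every frame of the 10s/30fps clip is loaded (300 frames).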
        out_dict_with_video = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
        )
        self.assertTrue(self.videos_input_name in out_dict_with_video)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name]), 1)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name][0]), 300)

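        # A video may also be given as a list of image URLs, each loaded as one frame.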
        messages[0][0]["content"][0] = {
            "type": "video",
            "url": [
                "https://www.ilankelman.org/stopsigns/australia.jpg",
                "https://www.ilankelman.org/stopsigns/australia.jpg",
            ],
        }
        out_dict_with_video = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
        )
        self.assertTrue(self.videos_input_name in out_dict_with_video)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name]), 1)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name][0]), 2)

    @require_av
    @require_torch
    def test_apply_chat_template_video_special_processing(self):
        """
        Tests that models can use their own preprocessing to preprocess conversations.
        """
        processor = self.get_processor()
        if processor.chat_template is None:
            self.skipTest("Processor has no chat template")

        signature = inspect.signature(processor.__call__)
        if "videos" not in {*signature.parameters.keys()} or (
            signature.parameters.get("videos") is not None
            and signature.parameters["videos"].annotation == inspect._empty
        ):
            self.skipTest("Processor doesn't accept videos at input")

        video_file_path = hf_hub_download(
            repo_id="raushan-testing-hf/videos-test", filename="sample_demo_1.mp4", repo_type="dataset"
        )
        messages = [
            [
                {
                    "role": "user",
                    "content": [
                        {"type": "video", "path": video_file_path},
                        {"type": "text", "text": "What is shown in this video?"},
                    ],
                },
            ]
        ]

        def _process_messages_for_chat_template(
            conversation,
            batch_images,
            batch_videos,
            batch_video_metadata,
            **chat_template_kwargs,
        ):
            # Always return a fixed dummy conversation, regardless of the input.
            new_msg = [
                [
                    {
                        "role": "user",
                        "content": [
                            {"type": "video"},
                            {"type": "text", "text": "Dummy prompt for preprocess testing"},
                        ],
                    },
                ]
            ]
            return new_msg

        processor._process_messages_for_chat_template = _process_messages_for_chat_template
        out_dict_with_video = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="pt",
        )
        self.assertTrue(self.videos_input_name in out_dict_with_video)

        # Check with `in` because we don't know how each template formats the prompt around it.
        formatted_text = processor.batch_decode(out_dict_with_video["input_ids"], skip_special_tokens=True)[0]
        self.assertTrue("Dummy prompt for preprocess testing" in formatted_text)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name]), 1)
        self.assertEqual(len(out_dict_with_video[self.videos_input_name][0]), 243)

    @require_librosa
    @require_av
    def test_chat_template_audio_from_video(self):
        processor = self.get_processor()
        if processor.chat_template is None:
            self.skipTest("Processor has no chat template")

        signature = inspect.signature(processor.__call__)
        if "videos" not in {*signature.parameters.keys()} or (
            signature.parameters.get("videos") is not None
            and signature.parameters["videos"].annotation == inspect._empty
        ):
            self.skipTest(f"{self.processor_class} does not support video inputs")

        if "feature_extractor" not in self.processor_class.attributes:
            self.skipTest(f"feature_extractor attribute not present in {self.processor_class}")

        video_file_path = hf_hub_download(
            repo_id="raushan-testing-hf/videos-test", filename="sample_demo_1.mp4", repo_type="dataset"
        )
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "video", "path": video_file_path},
                    {"type": "text", "text": "Which of these animals is making the sound?"},
                ],
            },
            {
                "role": "assistant",
                "content": [{"type": "text", "text": "It is a cow."}],
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Tell me all about this animal."},
                ],
            },
        ]

        formatted_prompt = processor.apply_chat_template([messages], add_generation_prompt=True, tokenize=False)
        self.assertEqual(len(formatted_prompt), 1)

        out_dict = processor.apply_chat_template(
            messages,
            add_generation_prompt=True,
            tokenize=True,
            return_dict=True,
            return_tensors="np",
            load_audio_from_video=True,
        )
        self.assertTrue(self.audio_input_name in out_dict)
        self.assertTrue(self.videos_input_name in out_dict)

        # The output should contain the text inputs along with one audio track and one video.
        self.assertEqual(len(out_dict["input_ids"]), 1)
        self.assertEqual(len(out_dict["attention_mask"]), 1)
        self.assertEqual(len(out_dict[self.audio_input_name]), 1)
        self.assertEqual(len(out_dict[self.videos_input_name]), 1)