Spaces:
Paused
Paused
# coding=utf-8 | |
# Copyright 2019 HuggingFace Inc. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
from __future__ import annotations | |
import copy | |
import os | |
import tempfile | |
from importlib import import_module | |
from math import isnan | |
from transformers import is_tf_available | |
from transformers.models.auto import get_values | |
from transformers.testing_utils import _tf_gpu_memory_limit, require_tf, slow | |
from ..test_modeling_tf_common import ids_tensor | |
if is_tf_available(): | |
import numpy as np | |
import tensorflow as tf | |
from transformers import ( | |
TF_MODEL_FOR_CAUSAL_LM_MAPPING, | |
TF_MODEL_FOR_IMAGE_CLASSIFICATION_MAPPING, | |
TF_MODEL_FOR_MASKED_LM_MAPPING, | |
TF_MODEL_FOR_MULTIPLE_CHOICE_MAPPING, | |
TF_MODEL_FOR_NEXT_SENTENCE_PREDICTION_MAPPING, | |
TF_MODEL_FOR_PRETRAINING_MAPPING, | |
TF_MODEL_FOR_QUESTION_ANSWERING_MAPPING, | |
TF_MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING, | |
TF_MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING, | |
TF_MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING, | |
TFSharedEmbeddings, | |
) | |
if _tf_gpu_memory_limit is not None: | |
gpus = tf.config.list_physical_devices("GPU") | |
for gpu in gpus: | |
# Restrict TensorFlow to only allocate x GB of memory on the GPUs | |
try: | |
tf.config.set_logical_device_configuration( | |
gpu, [tf.config.LogicalDeviceConfiguration(memory_limit=_tf_gpu_memory_limit)] | |
) | |
logical_gpus = tf.config.list_logical_devices("GPU") | |
print("Logical GPUs", logical_gpus) | |
except RuntimeError as e: | |
# Virtual devices must be set before GPUs have been initialized | |
print(e) | |
class TFCoreModelTesterMixin: | |
model_tester = None | |
all_model_classes = () | |
all_generative_model_classes = () | |
test_mismatched_shapes = True | |
test_resize_embeddings = True | |
test_head_masking = True | |
is_encoder_decoder = False | |
def _prepare_for_class(self, inputs_dict, model_class, return_labels=False) -> dict: | |
inputs_dict = copy.deepcopy(inputs_dict) | |
if model_class in get_values(TF_MODEL_FOR_MULTIPLE_CHOICE_MAPPING): | |
inputs_dict = { | |
k: tf.tile(tf.expand_dims(v, 1), (1, self.model_tester.num_choices) + (1,) * (v.ndim - 1)) | |
if isinstance(v, tf.Tensor) and v.ndim > 0 | |
else v | |
for k, v in inputs_dict.items() | |
} | |
if return_labels: | |
if model_class in get_values(TF_MODEL_FOR_MULTIPLE_CHOICE_MAPPING): | |
inputs_dict["labels"] = tf.ones(self.model_tester.batch_size, dtype=tf.int32) | |
elif model_class in get_values(TF_MODEL_FOR_QUESTION_ANSWERING_MAPPING): | |
inputs_dict["start_positions"] = tf.zeros(self.model_tester.batch_size, dtype=tf.int32) | |
inputs_dict["end_positions"] = tf.zeros(self.model_tester.batch_size, dtype=tf.int32) | |
elif model_class in [ | |
*get_values(TF_MODEL_FOR_SEQUENCE_CLASSIFICATION_MAPPING), | |
*get_values(TF_MODEL_FOR_IMAGE_CLASSIFICATION_MAPPING), | |
]: | |
inputs_dict["labels"] = tf.zeros(self.model_tester.batch_size, dtype=tf.int32) | |
elif model_class in get_values(TF_MODEL_FOR_NEXT_SENTENCE_PREDICTION_MAPPING): | |
inputs_dict["next_sentence_label"] = tf.zeros(self.model_tester.batch_size, dtype=tf.int32) | |
elif model_class in [ | |
*get_values(TF_MODEL_FOR_TOKEN_CLASSIFICATION_MAPPING), | |
*get_values(TF_MODEL_FOR_CAUSAL_LM_MAPPING), | |
*get_values(TF_MODEL_FOR_MASKED_LM_MAPPING), | |
*get_values(TF_MODEL_FOR_PRETRAINING_MAPPING), | |
*get_values(TF_MODEL_FOR_SEQ_TO_SEQ_CAUSAL_LM_MAPPING), | |
]: | |
inputs_dict["labels"] = tf.zeros( | |
(self.model_tester.batch_size, self.model_tester.seq_length), dtype=tf.int32 | |
) | |
return inputs_dict | |
def test_graph_mode(self): | |
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() | |
for model_class in self.all_model_classes[:2]: | |
inputs = self._prepare_for_class(inputs_dict, model_class) | |
model = model_class(config) | |
def run_in_graph_mode(): | |
return model(inputs) | |
outputs = run_in_graph_mode() | |
self.assertIsNotNone(outputs) | |
def test_xla_mode(self): | |
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() | |
for model_class in self.all_model_classes[:2]: | |
inputs = self._prepare_for_class(inputs_dict, model_class) | |
model = model_class(config) | |
def run_in_graph_mode(): | |
return model(inputs) | |
outputs = run_in_graph_mode() | |
self.assertIsNotNone(outputs) | |
def test_xla_fit(self): | |
# This is a copy of the test_keras_fit method, but we use XLA compilation instead of eager | |
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() | |
for model_class in self.all_model_classes[:2]: | |
model = model_class(config) | |
if getattr(model, "hf_compute_loss", None): | |
# Test that model correctly compute the loss with kwargs | |
prepared_for_class = self._prepare_for_class(inputs_dict.copy(), model_class, return_labels=True) | |
# Is there a better way to remove these decoder inputs? | |
prepared_for_class = { | |
key: val | |
for key, val in prepared_for_class.items() | |
if key not in ("head_mask", "decoder_head_mask", "cross_attn_head_mask", "decoder_input_ids") | |
} | |
possible_label_cols = { | |
"labels", | |
"label", | |
"label_ids", | |
"start_positions", | |
"start_position", | |
"end_positions", | |
"end_position", | |
"next_sentence_label", | |
} | |
label_names = possible_label_cols.intersection(set(prepared_for_class)) | |
self.assertGreater(len(label_names), 0, msg="No matching label names found!") | |
labels = {key: val for key, val in prepared_for_class.items() if key in label_names} | |
inputs_minus_labels = {key: val for key, val in prepared_for_class.items() if key not in label_names} | |
self.assertGreater(len(inputs_minus_labels), 0) | |
# Make sure it works with XLA! | |
model.compile(optimizer=tf.keras.optimizers.SGD(0.0), jit_compile=True) | |
# Make sure the model fits without crashing regardless of where we pass the labels | |
history = model.fit( | |
prepared_for_class, | |
validation_data=prepared_for_class, | |
steps_per_epoch=1, | |
validation_steps=1, | |
shuffle=False, | |
verbose=0, | |
) | |
loss = history.history["loss"][0] | |
self.assertTrue(not isnan(loss)) | |
val_loss = history.history["val_loss"][0] | |
self.assertTrue(not isnan(val_loss)) | |
# Now test it with separate labels, to make sure that path works in XLA too. | |
model = model_class(config) | |
model.compile(optimizer=tf.keras.optimizers.SGD(0.0), jit_compile=True) | |
history = model.fit( | |
inputs_minus_labels, | |
labels, | |
validation_data=(inputs_minus_labels, labels), | |
steps_per_epoch=1, | |
validation_steps=1, | |
shuffle=False, | |
verbose=0, | |
) | |
loss = history.history["loss"][0] | |
self.assertTrue(not isnan(loss)) | |
val_loss = history.history["val_loss"][0] | |
self.assertTrue(not isnan(val_loss)) | |
def test_saved_model_creation_extended(self): | |
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() | |
config.output_hidden_states = True | |
config.output_attentions = True | |
if hasattr(config, "use_cache"): | |
config.use_cache = True | |
encoder_seq_length = getattr(self.model_tester, "encoder_seq_length", self.model_tester.seq_length) | |
encoder_key_length = getattr(self.model_tester, "key_length", encoder_seq_length) | |
for model_class in self.all_model_classes[:2]: | |
class_inputs_dict = self._prepare_for_class(inputs_dict, model_class) | |
model = model_class(config) | |
model.build() | |
num_out = len(model(class_inputs_dict)) | |
for key in list(class_inputs_dict.keys()): | |
# Remove keys not in the serving signature, as the SavedModel will not be compiled to deal with them | |
if key not in model.input_signature: | |
del class_inputs_dict[key] | |
# Check it's a tensor, in case the inputs dict has some bools in it too | |
elif isinstance(class_inputs_dict[key], tf.Tensor) and class_inputs_dict[key].dtype.is_integer: | |
class_inputs_dict[key] = tf.cast(class_inputs_dict[key], tf.int32) | |
if set(class_inputs_dict.keys()) != set(model.input_signature.keys()): | |
continue # Some models have inputs that the preparation functions don't create, we skip those | |
with tempfile.TemporaryDirectory() as tmpdirname: | |
model.save_pretrained(tmpdirname, saved_model=True) | |
saved_model_dir = os.path.join(tmpdirname, "saved_model", "1") | |
model = tf.keras.models.load_model(saved_model_dir) | |
outputs = model(class_inputs_dict) | |
if self.is_encoder_decoder: | |
output_hidden_states = outputs["encoder_hidden_states"] | |
output_attentions = outputs["encoder_attentions"] | |
else: | |
output_hidden_states = outputs["hidden_states"] | |
output_attentions = outputs["attentions"] | |
self.assertEqual(len(outputs), num_out) | |
expected_num_layers = getattr( | |
self.model_tester, "expected_num_hidden_layers", self.model_tester.num_hidden_layers + 1 | |
) | |
self.assertEqual(len(output_hidden_states), expected_num_layers) | |
self.assertListEqual( | |
list(output_hidden_states[0].shape[-2:]), | |
[self.model_tester.seq_length, self.model_tester.hidden_size], | |
) | |
self.assertEqual(len(output_attentions), self.model_tester.num_hidden_layers) | |
self.assertListEqual( | |
list(output_attentions[0].shape[-3:]), | |
[self.model_tester.num_attention_heads, encoder_seq_length, encoder_key_length], | |
) | |
def test_mixed_precision(self): | |
tf.keras.mixed_precision.set_global_policy("mixed_float16") | |
# try/finally block to ensure subsequent tests run in float32 | |
try: | |
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() | |
for model_class in self.all_model_classes[:2]: | |
class_inputs_dict = self._prepare_for_class(inputs_dict, model_class) | |
model = model_class(config) | |
outputs = model(class_inputs_dict) | |
self.assertIsNotNone(outputs) | |
finally: | |
tf.keras.mixed_precision.set_global_policy("float32") | |
def test_train_pipeline_custom_model(self): | |
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() | |
# head_mask and decoder_head_mask has different shapes than other input args | |
if "head_mask" in inputs_dict: | |
del inputs_dict["head_mask"] | |
if "decoder_head_mask" in inputs_dict: | |
del inputs_dict["decoder_head_mask"] | |
if "cross_attn_head_mask" in inputs_dict: | |
del inputs_dict["cross_attn_head_mask"] | |
tf_main_layer_classes = { | |
module_member | |
for model_class in self.all_model_classes | |
for module in (import_module(model_class.__module__),) | |
for module_member_name in dir(module) | |
if module_member_name.endswith("MainLayer") | |
for module_member in (getattr(module, module_member_name),) | |
if isinstance(module_member, type) | |
and tf.keras.layers.Layer in module_member.__bases__ | |
and getattr(module_member, "_keras_serializable", False) | |
} | |
for main_layer_class in tf_main_layer_classes: | |
# T5MainLayer needs an embed_tokens parameter when called without the inputs_embeds parameter | |
if "T5" in main_layer_class.__name__: | |
# Take the same values than in TFT5ModelTester for this shared layer | |
shared = TFSharedEmbeddings(self.model_tester.vocab_size, self.model_tester.hidden_size, name="shared") | |
config.use_cache = False | |
main_layer = main_layer_class(config, embed_tokens=shared) | |
else: | |
main_layer = main_layer_class(config) | |
symbolic_inputs = { | |
name: tf.keras.Input(tensor.shape[1:], dtype=tensor.dtype) for name, tensor in inputs_dict.items() | |
} | |
if hasattr(self.model_tester, "num_labels"): | |
num_labels = self.model_tester.num_labels | |
else: | |
num_labels = 2 | |
X = tf.data.Dataset.from_tensor_slices( | |
(inputs_dict, np.ones((self.model_tester.batch_size, self.model_tester.seq_length, num_labels, 1))) | |
).batch(1) | |
hidden_states = main_layer(symbolic_inputs)[0] | |
outputs = tf.keras.layers.Dense(num_labels, activation="softmax", name="outputs")(hidden_states) | |
model = tf.keras.models.Model(inputs=symbolic_inputs, outputs=[outputs]) | |
model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["binary_accuracy"]) | |
model.fit(X, epochs=1) | |
with tempfile.TemporaryDirectory() as tmpdirname: | |
filepath = os.path.join(tmpdirname, "keras_model.h5") | |
model.save(filepath) | |
if "T5" in main_layer_class.__name__: | |
model = tf.keras.models.load_model( | |
filepath, | |
custom_objects={ | |
main_layer_class.__name__: main_layer_class, | |
"TFSharedEmbeddings": TFSharedEmbeddings, | |
}, | |
) | |
else: | |
model = tf.keras.models.load_model( | |
filepath, custom_objects={main_layer_class.__name__: main_layer_class} | |
) | |
assert isinstance(model, tf.keras.Model) | |
model(inputs_dict) | |
def test_graph_mode_with_inputs_embeds(self): | |
config, inputs_dict = self.model_tester.prepare_config_and_inputs_for_common() | |
for model_class in self.all_model_classes[:2]: | |
model = model_class(config) | |
inputs = copy.deepcopy(inputs_dict) | |
if not self.is_encoder_decoder: | |
input_ids = inputs["input_ids"] | |
del inputs["input_ids"] | |
else: | |
encoder_input_ids = inputs["input_ids"] | |
decoder_input_ids = inputs.get("decoder_input_ids", encoder_input_ids) | |
del inputs["input_ids"] | |
inputs.pop("decoder_input_ids", None) | |
if not self.is_encoder_decoder: | |
inputs["inputs_embeds"] = model.get_input_embeddings()(input_ids) | |
else: | |
inputs["inputs_embeds"] = model.get_input_embeddings()(encoder_input_ids) | |
inputs["decoder_inputs_embeds"] = model.get_input_embeddings()(decoder_input_ids) | |
inputs = self._prepare_for_class(inputs, model_class) | |
def run_in_graph_mode(): | |
return model(inputs) | |
outputs = run_in_graph_mode() | |
self.assertIsNotNone(outputs) | |
def _generate_random_bad_tokens(self, num_bad_tokens, model): | |
# special tokens cannot be bad tokens | |
special_tokens = [] | |
if model.config.bos_token_id is not None: | |
special_tokens.append(model.config.bos_token_id) | |
if model.config.pad_token_id is not None: | |
special_tokens.append(model.config.pad_token_id) | |
if model.config.eos_token_id is not None: | |
special_tokens.append(model.config.eos_token_id) | |
# create random bad tokens that are not special tokens | |
bad_tokens = [] | |
while len(bad_tokens) < num_bad_tokens: | |
token = tf.squeeze(ids_tensor((1, 1), self.model_tester.vocab_size), 0).numpy()[0] | |
if token not in special_tokens: | |
bad_tokens.append(token) | |
return bad_tokens | |
def _check_generated_ids(self, output_ids): | |
for token_id in output_ids[0].numpy().tolist(): | |
self.assertGreaterEqual(token_id, 0) | |
self.assertLess(token_id, self.model_tester.vocab_size) | |
def _check_match_tokens(self, generated_ids, bad_words_ids): | |
# for all bad word tokens | |
for bad_word_ids in bad_words_ids: | |
# for all slices in batch | |
for generated_ids_slice in generated_ids: | |
# for all word idx | |
for i in range(len(bad_word_ids), len(generated_ids_slice)): | |
# if tokens match | |
if generated_ids_slice[i - len(bad_word_ids) : i] == bad_word_ids: | |
return True | |
return False | |