#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
#
import os
import pytest
import shutil
import tempfile

from unittest.mock import MagicMock

from tlt.datasets import dataset_factory
from tlt.models import model_factory

try:
    from tlt.datasets.text_classification.hf_custom_text_classification_dataset import HFCustomTextClassificationDataset
except ModuleNotFoundError:
    print("WARNING: Unable to import HFCustomTextClassificationDataset.")
def test_pyt_text_classification(model_name, dataset_name, extra_layers, correct_num_layers, test_inc):
    """
    Tests basic transfer learning functionality for PyTorch text classification models using a Hugging Face dataset
    """
    framework = 'pytorch'
    output_dir = tempfile.mkdtemp()

    # Get the dataset
    dataset = dataset_factory.get_dataset(output_dir, 'text_classification', framework, dataset_name,
                                          'huggingface', split=["train"], shuffle_files=False)

    # Get the model
    model = model_factory.get_model(model_name, framework)

    # Preprocess the dataset
    dataset.preprocess(model_name, batch_size=32)
    dataset.shuffle_split(train_pct=0.02, val_pct=0.01, seed=6)
    assert dataset._validation_type == 'shuffle_split'

    # Evaluate before training
    pretrained_metrics = model.evaluate(dataset)
    assert len(pretrained_metrics) > 0

    # Train
    train_history = model.train(dataset, output_dir=output_dir, epochs=1, do_eval=False, extra_layers=extra_layers)
    assert train_history is not None and isinstance(train_history, dict)
    assert 'Loss' in train_history
    assert 'Acc' in train_history
    assert 'train_runtime' in train_history
    assert 'train_samples_per_second' in train_history
    classifier_layer = getattr(model._model, "classifier")
    try:
        # If extra_layers were given, the classifier is a Sequential layer built from the given inputs
        n_layers = len(classifier_layer)
    except TypeError:
        # If not given, the classifier is just a single Linear layer
        n_layers = 1
    assert n_layers == correct_num_layers

    # Evaluate
    trained_metrics = model.evaluate(dataset)
    assert trained_metrics['eval_loss'] <= pretrained_metrics['eval_loss']
    assert trained_metrics['eval_accuracy'] >= pretrained_metrics['eval_accuracy']

    # Export the saved model
    saved_model_dir = model.export(output_dir)
    assert os.path.isdir(saved_model_dir)
    assert os.path.isfile(os.path.join(saved_model_dir, "model.pt"))

    # Reload the saved model
    reload_model = model_factory.get_model(model_name, framework)
    reload_model.load_from_directory(saved_model_dir)

    # Evaluate
    reload_metrics = reload_model.evaluate(dataset)
    assert reload_metrics['eval_accuracy'] == trained_metrics['eval_accuracy']

    # Ensure we get 'NotImplementedError' for graph_optimization
    with pytest.raises(NotImplementedError):
        model.optimize_graph(os.path.join(saved_model_dir, 'optimized'))

    # Quantization
    if test_inc:
        inc_output_dir = os.path.join(output_dir, "quantized", model_name)
        os.makedirs(inc_output_dir, exist_ok=True)
        model.quantize(inc_output_dir, dataset)
        assert os.path.exists(os.path.join(inc_output_dir, "model.pt"))

    # Delete the temp output directory
    if os.path.exists(output_dir) and os.path.isdir(output_dir):
        shutil.rmtree(output_dir)
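

# NOTE: assumed parametrization so the test runs standalone; the model name is an
# illustrative placeholder, not necessarily what the original suite used.
@pytest.mark.parametrize('model_name', ['distilbert-base-uncased'])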
def test_custom_dataset_workflow(model_name):
    """
    Tests the full workflow for PyTorch text classification using a custom dataset mock
    """
    model = model_factory.get_model(model_name, framework='pytorch', use_case="text_classification")
    output_dir = tempfile.mkdtemp()
    os.environ["TORCH_HOME"] = output_dir

    mock_dataset = MagicMock()
    mock_dataset.__class__ = HFCustomTextClassificationDataset
    mock_dataset.validation_subset = ['fun', 'terrible']
    mock_dataset.train_subset = ['fun', 'happy', 'boring', 'terrible']
    mock_dataset.class_names = ['good', 'bad']

    # Preprocess the dataset and split to get small subsets for training and validation
    mock_dataset.shuffle_split(train_pct=0.1, val_pct=0.1, shuffle_files=False)
    mock_dataset.preprocess(model_name, batch_size=32)

    # Train for 1 epoch
    history = model.train(mock_dataset, output_dir=output_dir, epochs=1, seed=10, do_eval=False)
    assert history is not None

    # Evaluate
    model.evaluate(mock_dataset)

    # Export the saved model
    saved_model_dir = model.export(output_dir)
    assert os.path.isdir(saved_model_dir)
    assert os.path.isfile(os.path.join(saved_model_dir, "model.pt"))

    # Reload the saved model
    reload_model = model_factory.get_model(model_name, 'pytorch')
    reload_model.load_from_directory(saved_model_dir)

    # Evaluate
    metrics = reload_model.evaluate(mock_dataset)
    assert len(metrics) > 0

    # Delete the temp output directory
    if os.path.exists(output_dir) and os.path.isdir(output_dir):
        shutil.rmtree(output_dir)
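

# NOTE: assumed parametrization so the test runs standalone; the model/dataset
# values are illustrative placeholders.
@pytest.mark.parametrize('model_name,dataset_name',
                         [('distilbert-base-uncased', 'imdb')])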
def test_initial_checkpoints(model_name, dataset_name):
    """
    Tests resuming PyTorch text classification training from saved checkpoints
    """
    framework = 'pytorch'
    output_dir = tempfile.mkdtemp()
    checkpoint_dir = os.path.join(output_dir, model_name + '_checkpoints')

    # Get the dataset
    dataset = dataset_factory.get_dataset(output_dir, 'text_classification', framework, dataset_name,
                                          'huggingface', split=["train"], shuffle_files=False)

    # Get the model
    model = model_factory.get_model(model_name, framework)
    assert model._generate_checkpoints is True

    dataset.preprocess(model_name, batch_size=32)
    dataset.shuffle_split(train_pct=0.01, val_pct=0.01, seed=10)

    # Train
    model.train(dataset, output_dir=output_dir, epochs=2, do_eval=False)
    trained_metrics = model.evaluate(dataset)
# Delete the model and train a brand new model but instead we resume training from checkpoints | |
del model | |
model = model_factory.get_model(model_name, framework) | |
model.train(dataset, output_dir=output_dir, epochs=2, do_eval=False, | |
initial_checkpoints=os.path.join(checkpoint_dir, 'checkpoint.pt')) | |
improved_metrics = model.evaluate(dataset) | |
assert improved_metrics['eval_loss'] < trained_metrics['eval_loss'] | |
assert improved_metrics['eval_accuracy'] > trained_metrics['eval_accuracy'] | |
# Delete the temp output directory | |
if os.path.exists(output_dir) and os.path.isdir(output_dir): | |
shutil.rmtree(output_dir) | |
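

# NOTE: assumed parametrization so the test runs standalone; the model/dataset
# values are illustrative placeholders.
@pytest.mark.parametrize('model_name,dataset_name',
                         [('distilbert-base-uncased', 'imdb')])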
def test_freeze_bert(model_name, dataset_name):
    """
    Tests that freeze_layer makes every parameter in the named layer non-trainable
    """
    framework = 'pytorch'
    output_dir = tempfile.mkdtemp()

    # Get the dataset
    dataset = dataset_factory.get_dataset(output_dir, 'text_classification', framework, dataset_name,
                                          'huggingface', split=["train"], shuffle_files=False)

    # Get the model
    model = model_factory.get_model(model_name, framework)

    dataset.preprocess(model_name, batch_size=32)
    dataset.shuffle_split(train_pct=0.01, val_pct=0.01, seed=10)

    # Train
    model.train(dataset, output_dir=output_dir, epochs=1, do_eval=False)

    # Freeze the feature layers
    layer_name = "features"
    model.freeze_layer(layer_name)

    # Check that everything in the layer is frozen (not trainable)
    for (name, module) in model._model.named_children():
        if name == layer_name:
            for param in module.parameters():
                assert param.requires_grad is False

    # Delete the temp output directory
    if os.path.exists(output_dir) and os.path.isdir(output_dir):
        shutil.rmtree(output_dir)
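

# NOTE: assumed parametrization so the test runs standalone; the model/dataset
# values are illustrative placeholders.
@pytest.mark.parametrize('model_name,dataset_name',
                         [('distilbert-base-uncased', 'imdb')])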
def test_unfreeze_bert(model_name, dataset_name):
    """
    Tests that unfreeze_layer makes every parameter in the named layer trainable
    """
    framework = 'pytorch'
    output_dir = tempfile.mkdtemp()

    # Get the dataset
    dataset = dataset_factory.get_dataset(output_dir, 'text_classification', framework, dataset_name,
                                          'huggingface', split=["train"], shuffle_files=False)

    # Get the model
    model = model_factory.get_model(model_name, framework)

    dataset.preprocess(model_name, batch_size=32)
    dataset.shuffle_split(train_pct=0.01, val_pct=0.01, seed=10)

    # Train
    model.train(dataset, output_dir=output_dir, epochs=1, do_eval=False)

    # Unfreeze the feature layers
    layer_name = "features"
    model.unfreeze_layer(layer_name)

    # Check that everything in the layer is unfrozen (trainable)
    for (name, module) in model._model.named_children():
        if name == layer_name:
            for param in module.parameters():
                assert param.requires_grad is True

    # Delete the temp output directory
    if os.path.exists(output_dir) and os.path.isdir(output_dir):
        shutil.rmtree(output_dir)
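

# NOTE: assumed parametrization so the test runs standalone. The assertions in this
# test check distilbert-base-uncased parameter counts, so that model name is
# required; the dataset name is an illustrative placeholder.
@pytest.mark.parametrize('model_name,dataset_name',
                         [('distilbert-base-uncased', 'imdb')])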
def test_list_layers_bert(model_name, dataset_name):
    """
    Tests that list_layers prints the model's layers and their trainable parameter counts
    """
    import io
    import unittest.mock as mock

    framework = 'pytorch'
    output_dir = tempfile.mkdtemp()

    # Get the model
    model = model_factory.get_model(model_name, framework)

    # Get the dataset
    dataset = dataset_factory.get_dataset(output_dir, 'text_classification', framework, dataset_name,
                                          'huggingface', split=["train"], shuffle_files=False)

    dataset.preprocess(model_name, batch_size=32)
    dataset.shuffle_split(train_pct=0.01, val_pct=0.01, seed=10)

    # Train
    model.train(dataset, output_dir=output_dir, epochs=1, do_eval=False)

    # Mock stdout and stderr to capture the function's output
    stdout = io.StringIO()
    stderr = io.StringIO()
    with mock.patch('sys.stdout', stdout), mock.patch('sys.stderr', stderr):
        model.list_layers(verbose=True)

    # Assert the function printed the correct output for the trainable layers
    output = stdout.getvalue().strip()
    assert 'distilbert' in output
    assert 'embeddings: 23835648/23835648 parameters are trainable' in output
    assert 'transformer: 42527232/42527232 parameters are trainable' in output
    assert 'pre_classifier: 590592/590592 parameters are trainable' in output
    # 1538 = 768 * 2 + 2, the two-class classification head
    assert 'classifier: 1538/1538 parameters are trainable' in output
    assert 'dropout: 0/0 parameters are trainable' in output
    assert 'Total Trainable Parameters: 66955010/66955010' in output

    # Delete the temp output directory
    if os.path.exists(output_dir) and os.path.isdir(output_dir):
        shutil.rmtree(output_dir)