from typing import Dict

from transformers import EvalPrediction, HfArgumentParser, TrainingArguments, is_torch_available
from transformers.testing_utils import (
    TestCasePlus,
    execute_subprocess_async,
    get_torch_dist_unique_port,
    require_torch_multi_gpu,
    require_torch_neuroncore,
)
from transformers.training_args import ParallelMode
from transformers.utils import logging


logger = logging.get_logger(__name__)


if is_torch_available():
    import torch
    from torch import nn
    from torch.utils.data import Dataset

    from transformers import Trainer
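
    # Minimal fixtures used by the distributed checks below: a dataset of plain
    # integers, a collator that tensorizes them, and a model that echoes its
    # inputs back as predictions.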

    class DummyDataset(Dataset):
        def __init__(self, length: int = 101):
            self.length = length

        def __len__(self):
            return self.length

        def __getitem__(self, i) -> int:
            return i

    class DummyDataCollator:
        def __call__(self, features):
            return {"input_ids": torch.tensor(features), "labels": torch.tensor(features)}

    class DummyModel(nn.Module):
        def __init__(self):
            super().__init__()
            # Give the model some (unused) parameters, otherwise DistributedDataParallel
            # can complain about a module with nothing to synchronize.
            self.fc = nn.Linear(120, 80)

        def forward(self, input_ids, labels=None):
            # With labels, return a constant dummy loss plus the inputs as "predictions";
            # without labels, just echo the inputs.
            if labels is not None:
                return torch.tensor(0.0, device=input_ids.device), input_ids
            else:
                return input_ids
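
    # Taken together: the collator turns the raw integers into input_ids/labels
    # tensors, and the model echoes input_ids back, so after the distributed
    # gather the predictions should read 0, 1, 2, ... in dataset order.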


class TestTrainerDistributedNeuronCore(TestCasePlus):
    @require_torch_neuroncore
    def test_trainer(self):
        distributed_args = f"""--nproc_per_node=2
            --master_port={get_torch_dist_unique_port()}
            {self.test_file_dir}/test_trainer_distributed.py
        """.split()
        output_dir = self.get_auto_remove_tmp_dir()
        args = f"--output_dir {output_dir}".split()
        cmd = ["torchrun"] + distributed_args + args
        execute_subprocess_async(cmd, env=self.get_env())
        # A successful return here is the test passing: any error in the launched
        # script would have raised an exception in the subprocess call above.


class TestTrainerDistributed(TestCasePlus):
    @require_torch_multi_gpu
    def test_trainer(self):
        distributed_args = f"""--nproc_per_node={torch.cuda.device_count()}
            --master_port={get_torch_dist_unique_port()}
            {self.test_file_dir}/test_trainer_distributed.py
        """.split()
        output_dir = self.get_auto_remove_tmp_dir()
        args = f"--output_dir {output_dir}".split()
        cmd = ["torchrun"] + distributed_args + args
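        # The assembled command is, illustratively (port and paths vary per run):
        #   torchrun --nproc_per_node=<gpu count> --master_port=<unique port> \
        #       <test dir>/test_trainer_distributed.py --output_dir <tmp dir>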
        execute_subprocess_async(cmd, env=self.get_env())


if __name__ == "__main__":
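    # This block runs in each worker process when the tests above re-launch this
    # file under torchrun. An illustrative manual invocation (the path is an
    # assumption, adjust it to your checkout):
    #
    #   torchrun --nproc_per_node=2 ./tests/test_trainer_distributed.py --output_dir /tmp/output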
    parser = HfArgumentParser((TrainingArguments,))
    training_args = parser.parse_args_into_dataclasses()[0]

    logger.warning(
        f"Process rank: {training_args.local_rank}, device: {training_args.device}, n_gpu: {training_args.n_gpu}, "
        f"distributed training: {training_args.parallel_mode != ParallelMode.NOT_DISTRIBUTED}"
    )

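    # What the distributed run verifies: evaluation and prediction gather all
    # samples back from every rank, in the original order (this matters most for
    # predict), including dataset lengths that are not divisible by the number
    # of processes.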
    for dataset_length in [101, 40, 7]:
        dataset = DummyDataset(dataset_length)

        def compute_metrics(p: EvalPrediction) -> Dict:
            sequential = list(range(len(dataset)))
            success = p.predictions.tolist() == sequential and p.label_ids.tolist() == sequential
            if not success and training_args.local_rank == 0:
                logger.warning(
                    "Predictions and/or labels do not match expected results:\n - predictions: "
                    f"{p.predictions.tolist()}\n - labels: {p.label_ids.tolist()}\n - expected: {sequential}"
                )
            return {"success": success}

        trainer = Trainer(
            model=DummyModel(),
            args=training_args,
            data_collator=DummyDataCollator(),
            eval_dataset=dataset,
            compute_metrics=compute_metrics,
        )
        metrics = trainer.evaluate()
        logger.info(metrics)
        if metrics["eval_success"] is not True:
            logger.error(metrics)
            exit(1)

        p = trainer.predict(dataset)
        logger.info(p.metrics)
        if p.metrics["test_success"] is not True:
            logger.error(p.metrics)
            exit(1)

        # Re-run the same evaluation and prediction checks with accumulation:
        # eval_accumulation_steps=2 makes the eval loop offload the accumulated
        # prediction tensors to the CPU every two steps.
        trainer.args.eval_accumulation_steps = 2

        metrics = trainer.evaluate()
        logger.info(metrics)
        if metrics["eval_success"] is not True:
            logger.error(metrics)
            exit(1)

        p = trainer.predict(dataset)
        logger.info(p.metrics)
        if p.metrics["test_success"] is not True:
            logger.error(p.metrics)
            exit(1)

        trainer.args.eval_accumulation_steps = None