Spaces:
Runtime error
Runtime error
# coding=utf-8
# Copyright 2021 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" | |
A subclass of `Trainer` specific to Question-Answering tasks | |
""" | |
import math | |
import time | |
from typing import Dict, List, Optional | |
from torch.utils.data import Dataset | |
from transformers import Seq2SeqTrainer, is_torch_tpu_available | |
from transformers.trainer_utils import PredictionOutput, speed_metrics | |
if is_torch_tpu_available(check_device=False): | |
import torch_xla.core.xla_model as xm | |
import torch_xla.debug.metrics as met | |
class QuestionAnsweringSeq2SeqTrainer(Seq2SeqTrainer):
    """`Seq2SeqTrainer` that post-processes generated output before scoring it.

    The raw output of the evaluation/prediction loop is handed to
    ``post_process_function`` and the result of that call is what
    ``compute_metrics`` receives, instead of metrics being computed inside the
    loop itself.

    Args:
        eval_examples: Examples passed as the first argument to
            ``post_process_function`` during :meth:`evaluate` when the caller
            does not supply ``eval_examples`` explicitly.
        post_process_function: Callable invoked as
            ``post_process_function(examples, dataset, output)`` in
            :meth:`evaluate` and as
            ``post_process_function(examples, dataset, output, "predict")`` in
            :meth:`predict`. In :meth:`predict` its result must expose
            ``predictions`` and ``label_ids`` attributes.
        *args, **kwargs: Forwarded unchanged to ``Seq2SeqTrainer.__init__``.
    """

    def __init__(self, *args, eval_examples=None, post_process_function=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.eval_examples = eval_examples
        self.post_process_function = post_process_function

    def _qa_gen_kwargs(self, gen_kwargs: Dict) -> Dict:
        """Return a copy of ``gen_kwargs`` with defaults taken from ``self.args``.

        ``max_length`` falls back to ``self.args.generation_max_length`` unless
        the caller gave ``max_length`` or ``max_new_tokens``; ``num_beams``
        falls back to ``self.args.generation_num_beams``.
        """
        resolved = dict(gen_kwargs)
        # Do not inject max_length next to an explicit max_new_tokens: both
        # express a length limit and should not be sent together.
        if resolved.get("max_length") is None and resolved.get("max_new_tokens") is None:
            resolved["max_length"] = self.args.generation_max_length
        if resolved.get("num_beams") is None:
            resolved["num_beams"] = self.args.generation_num_beams
        return resolved

    def _qa_run_loop(self, dataloader, description: str, ignore_keys, metric_key_prefix: str):
        """Run the evaluation/prediction loop with in-loop metrics disabled.

        ``self.compute_metrics`` is set to ``None`` for the duration of the loop
        (and restored afterwards, even on error) because metrics are computed by
        the caller after post-processing. Speed metrics are added to
        ``output.metrics`` before the loop output is returned.
        """
        saved_compute_metrics = self.compute_metrics
        self.compute_metrics = None
        start_time = time.time()
        loop = self.prediction_loop if self.args.use_legacy_prediction_loop else self.evaluation_loop
        try:
            output = loop(
                dataloader,
                description=description,
                # No point gathering the predictions if there are no metrics,
                # otherwise we defer to self.args.prediction_loss_only.
                prediction_loss_only=True if saved_compute_metrics is None else None,
                ignore_keys=ignore_keys,
                metric_key_prefix=metric_key_prefix,
            )
        finally:
            self.compute_metrics = saved_compute_metrics

        # Shift the start time by the reported JIT compilation time so that it
        # is not counted in the speed metrics.
        jit_key = f"{metric_key_prefix}_jit_compilation_time"
        if jit_key in output.metrics:
            start_time += output.metrics[jit_key]

        total_batch_size = self.args.eval_batch_size * self.args.world_size
        output.metrics.update(
            speed_metrics(
                metric_key_prefix,
                start_time,
                num_samples=output.num_samples,
                num_steps=math.ceil(output.num_samples / total_batch_size),
            )
        )
        return output

    @staticmethod
    def _qa_prefix_metrics(metrics: Dict, metric_key_prefix: str) -> Dict:
        """Rename, in place, every key lacking ``metric_key_prefix + '_'``."""
        prefix = f"{metric_key_prefix}_"
        # Iterate over a snapshot of the keys because the dict is mutated.
        for key in list(metrics.keys()):
            if not key.startswith(prefix):
                metrics[f"{prefix}{key}"] = metrics.pop(key)
        return metrics

    def evaluate(
        self,
        eval_dataset: Optional[Dataset] = None,
        eval_examples=None,
        ignore_keys: Optional[List[str]] = None,
        metric_key_prefix: str = "eval",
        **gen_kwargs,
    ) -> Dict[str, float]:
        """Run evaluation and return the metrics dictionary.

        Args:
            eval_dataset: Dataset to evaluate; defaults to ``self.eval_dataset``.
            eval_examples: Examples given to ``post_process_function``; defaults
                to ``self.eval_examples``.
            ignore_keys: Forwarded to the evaluation loop.
            metric_key_prefix: Prefix applied to every metric name.
            **gen_kwargs: Generation arguments; missing ``max_length`` /
                ``num_beams`` are filled from ``self.args``.

        Returns:
            The loop metrics, merged with the post-processed metrics when
            ``post_process_function`` and ``compute_metrics`` are both set and
            ``self.args.should_save`` is true.
        """
        self._gen_kwargs = self._qa_gen_kwargs(gen_kwargs)

        eval_dataset = self.eval_dataset if eval_dataset is None else eval_dataset
        eval_dataloader = self.get_eval_dataloader(eval_dataset)
        eval_examples = self.eval_examples if eval_examples is None else eval_examples

        output = self._qa_run_loop(eval_dataloader, "Evaluation", ignore_keys, metric_key_prefix)

        if self.post_process_function is not None and self.compute_metrics is not None and self.args.should_save:
            # Only the main node writes the results by default.
            eval_preds = self.post_process_function(eval_examples, eval_dataset, output)
            metrics = self._qa_prefix_metrics(self.compute_metrics(eval_preds), metric_key_prefix)
            metrics.update(output.metrics)
        else:
            metrics = output.metrics

        if self.args.should_log:
            # Only the main node logs the results by default.
            self.log(metrics)

        # `xm` and `met` are only imported when torch_xla is available, so the
        # same check guards their use; otherwise enabling a debug option on a
        # non-TPU host would raise NameError here.
        if (self.args.tpu_metrics_debug or self.args.debug) and is_torch_tpu_available(check_device=False):
            # tpu-comment: Logging debug metrics for PyTorch/XLA (compile, execute times, ops, etc.)
            xm.master_print(met.metrics_report())

        self.control = self.callback_handler.on_evaluate(self.args, self.state, self.control, metrics)
        return metrics

    def predict(
        self, predict_dataset, predict_examples, ignore_keys=None, metric_key_prefix: str = "test", **gen_kwargs
    ):
        """Run prediction and, when possible, post-process and score the output.

        Args:
            predict_dataset: Dataset to run prediction on.
            predict_examples: Examples given to ``post_process_function``.
            ignore_keys: Forwarded to the prediction loop.
            metric_key_prefix: Prefix applied to every metric name.
            **gen_kwargs: Generation arguments; missing ``max_length`` /
                ``num_beams`` are filled from ``self.args``, as in
                :meth:`evaluate`.

        Returns:
            The raw loop output when ``post_process_function`` or
            ``compute_metrics`` is ``None``; otherwise a ``PredictionOutput``
            built from the post-processed predictions and the merged metrics.
        """
        self._gen_kwargs = self._qa_gen_kwargs(gen_kwargs)

        predict_dataloader = self.get_test_dataloader(predict_dataset)
        output = self._qa_run_loop(predict_dataloader, "Prediction", ignore_keys, metric_key_prefix)

        if self.post_process_function is None or self.compute_metrics is None:
            return output

        predictions = self.post_process_function(predict_examples, predict_dataset, output, "predict")
        metrics = self._qa_prefix_metrics(self.compute_metrics(predictions), metric_key_prefix)
        metrics.update(output.metrics)
        return PredictionOutput(predictions=predictions.predictions, label_ids=predictions.label_ids, metrics=metrics)