Spaces:
Paused
Paused
| # Copyright 2020 The HuggingFace Team. All rights reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| import unittest | |
| from transformers import ( | |
| MODEL_FOR_CAUSAL_LM_MAPPING, | |
| TF_MODEL_FOR_CAUSAL_LM_MAPPING, | |
| TextGenerationPipeline, | |
| logging, | |
| pipeline, | |
| ) | |
| from transformers.testing_utils import ( | |
| CaptureLogger, | |
| is_pipeline_test, | |
| require_accelerate, | |
| require_tf, | |
| require_torch, | |
| require_torch_gpu, | |
| require_torch_or_tf, | |
| ) | |
| from .test_pipelines_common import ANY | |
@is_pipeline_test
@require_torch_or_tf
class TextGenerationPipelineTests(unittest.TestCase):
    """Tests for the ``text-generation`` pipeline (``TextGenerationPipeline``).

    The ``test_*`` methods build their own pipelines from small checkpoints,
    while ``get_test_pipeline`` / ``run_pipeline_test`` receive a model and
    tokenizer from the caller.

    Skip decorators guard every test that hard-requires an optional backend
    (PyTorch, TensorFlow, accelerate, a GPU) so that those tests are skipped
    rather than failing when the backend is not available.
    """

    # NOTE(review): presumably read by the shared pipeline test harness to
    # decide which architectures to feed into `get_test_pipeline` /
    # `run_pipeline_test` -- confirm against the harness.
    model_mapping = MODEL_FOR_CAUSAL_LM_MAPPING
    tf_model_mapping = TF_MODEL_FOR_CAUSAL_LM_MAPPING

    @require_torch
    def test_small_model_pt(self):
        """Check exact greedy outputs and output structure with the PyTorch tiny CTRL checkpoint."""
        text_generator = pipeline(task="text-generation", model="sshleifer/tiny-ctrl", framework="pt")
        # Using `do_sample=False` to force deterministic output
        outputs = text_generator("This is a test", do_sample=False)
        self.assertEqual(
            outputs,
            [
                {
                    "generated_text": (
                        "This is a test ☃ ☃ segmental segmental segmental 议议eski eski flutter flutter Lacy oscope."
                        " oscope. FiliFili@@"
                    )
                }
            ],
        )

        # Same deterministic setting for the list input, matching the TF test below.
        outputs = text_generator(["This is a test", "This is a second test"], do_sample=False)
        self.assertEqual(
            outputs,
            [
                [
                    {
                        "generated_text": (
                            "This is a test ☃ ☃ segmental segmental segmental 议议eski eski flutter flutter Lacy oscope."
                            " oscope. FiliFili@@"
                        )
                    }
                ],
                [
                    {
                        "generated_text": (
                            "This is a second test ☃ segmental segmental segmental 议议eski eski flutter flutter Lacy"
                            " oscope. oscope. FiliFili@@"
                        )
                    }
                ],
            ],
        )

        # Sampling is not deterministic, so only the structure of the result is checked.
        outputs = text_generator("This is a test", do_sample=True, num_return_sequences=2, return_tensors=True)
        self.assertEqual(
            outputs,
            [
                {"generated_token_ids": ANY(list)},
                {"generated_token_ids": ANY(list)},
            ],
        )

        # A pad token is configured before the batched (`batch_size=2`) call below.
        # NOTE(review): presumably because batching needs to pad the prompts -- confirm.
        text_generator.tokenizer.pad_token_id = text_generator.model.config.eos_token_id
        text_generator.tokenizer.pad_token = "<pad>"
        outputs = text_generator(
            ["This is a test", "This is a second test"],
            do_sample=True,
            num_return_sequences=2,
            batch_size=2,
            return_tensors=True,
        )
        self.assertEqual(
            outputs,
            [
                [
                    {"generated_token_ids": ANY(list)},
                    {"generated_token_ids": ANY(list)},
                ],
                [
                    {"generated_token_ids": ANY(list)},
                    {"generated_token_ids": ANY(list)},
                ],
            ],
        )

    @require_tf
    def test_small_model_tf(self):
        """Check exact greedy outputs with the TensorFlow tiny CTRL checkpoint."""
        text_generator = pipeline(task="text-generation", model="sshleifer/tiny-ctrl", framework="tf")
        # Using `do_sample=False` to force deterministic output
        outputs = text_generator("This is a test", do_sample=False)
        self.assertEqual(
            outputs,
            [
                {
                    "generated_text": (
                        "This is a test FeyFeyFey(Croatis.), s.), Cannes Cannes Cannes 閲閲Cannes Cannes Cannes 攵"
                        " please,"
                    )
                }
            ],
        )

        outputs = text_generator(["This is a test", "This is a second test"], do_sample=False)
        self.assertEqual(
            outputs,
            [
                [
                    {
                        "generated_text": (
                            "This is a test FeyFeyFey(Croatis.), s.), Cannes Cannes Cannes 閲閲Cannes Cannes Cannes 攵"
                            " please,"
                        )
                    }
                ],
                [
                    {
                        "generated_text": (
                            "This is a second test Chieftain Chieftain prefecture prefecture prefecture Cannes Cannes"
                            " Cannes 閲閲Cannes Cannes Cannes 攵 please,"
                        )
                    }
                ],
            ],
        )

    def get_test_pipeline(self, model, tokenizer, processor):
        """Build a ``TextGenerationPipeline`` and return it with two example prompts.

        Args:
            model: Model handed to the pipeline.
            tokenizer: Tokenizer handed to the pipeline.
            processor: Unused here; kept in the signature for the caller.

        Returns:
            A ``(pipeline, examples)`` tuple where ``examples`` is a list of two prompt strings.
        """
        text_generator = TextGenerationPipeline(model=model, tokenizer=tokenizer)
        return text_generator, ["This is a test", "Another test"]

    def test_stop_sequence_stopping_criteria(self):
        """Check that ``stop_sequence`` ends the generated text at the stop string."""
        prompt = """Hello I believe in"""
        text_generator = pipeline("text-generation", model="hf-internal-testing/tiny-random-gpt2")
        output = text_generator(prompt)
        self.assertEqual(
            output,
            [{"generated_text": "Hello I believe in fe fe fe fe fe fe fe fe fe fe fe fe"}],
        )

        output = text_generator(prompt, stop_sequence=" fe")
        self.assertEqual(output, [{"generated_text": "Hello I believe in fe"}])

    def run_pipeline_test(self, text_generator, _):
        """Run the generic checks against a caller-provided text-generation pipeline.

        Covers ``return_full_text`` (per call and as a pipeline default), list
        inputs with several returned sequences, rejected argument combinations,
        the empty prompt, and over-long prompts.

        Args:
            text_generator: The pipeline under test.
            _: Unused second argument supplied by the caller.
        """
        model = text_generator.model
        tokenizer = text_generator.tokenizer

        outputs = text_generator("This is a test")
        self.assertEqual(outputs, [{"generated_text": ANY(str)}])
        self.assertTrue(outputs[0]["generated_text"].startswith("This is a test"))

        outputs = text_generator("This is a test", return_full_text=False)
        self.assertEqual(outputs, [{"generated_text": ANY(str)}])
        self.assertNotIn("This is a test", outputs[0]["generated_text"])

        # Same checks with `return_full_text=False` set as the pipeline default;
        # a per-call `return_full_text=True` must override that default.
        text_generator = pipeline(task="text-generation", model=model, tokenizer=tokenizer, return_full_text=False)
        outputs = text_generator("This is a test")
        self.assertEqual(outputs, [{"generated_text": ANY(str)}])
        self.assertNotIn("This is a test", outputs[0]["generated_text"])

        outputs = text_generator("This is a test", return_full_text=True)
        self.assertEqual(outputs, [{"generated_text": ANY(str)}])
        self.assertTrue(outputs[0]["generated_text"].startswith("This is a test"))

        outputs = text_generator(["This is great !", "Something else"], num_return_sequences=2, do_sample=True)
        self.assertEqual(
            outputs,
            [
                [{"generated_text": ANY(str)}, {"generated_text": ANY(str)}],
                [{"generated_text": ANY(str)}, {"generated_text": ANY(str)}],
            ],
        )

        # The batched variant is only exercised when the tokenizer has a pad token.
        if text_generator.tokenizer.pad_token is not None:
            outputs = text_generator(
                ["This is great !", "Something else"], num_return_sequences=2, batch_size=2, do_sample=True
            )
            self.assertEqual(
                outputs,
                [
                    [{"generated_text": ANY(str)}, {"generated_text": ANY(str)}],
                    [{"generated_text": ANY(str)}, {"generated_text": ANY(str)}],
                ],
            )

        # These return-format options are expected to be rejected when combined.
        with self.assertRaises(ValueError):
            text_generator("test", return_full_text=True, return_text=True)
        with self.assertRaises(ValueError):
            text_generator("test", return_full_text=True, return_tensors=True)
        with self.assertRaises(ValueError):
            text_generator("test", return_text=True, return_tensors=True)

        # Empty prompt is slightly special
        # it requires BOS token to exist.
        # Special case for Pegasus which will always append EOS so will
        # work even without BOS.
        if (
            text_generator.tokenizer.bos_token_id is not None
            or "Pegasus" in tokenizer.__class__.__name__
            or "Git" in model.__class__.__name__
        ):
            outputs = text_generator("")
            self.assertEqual(outputs, [{"generated_text": ANY(str)}])
        else:
            with self.assertRaises((ValueError, AssertionError)):
                text_generator("")

        if text_generator.framework == "tf":
            # TF generation does not support max_new_tokens, and it's impossible
            # to control long generation with only max_length without
            # fancy calculation, dismissing tests for now.
            return

        # We don't care about infinite range models.
        # They already work.
        # Skip this test for XGLM, since it uses sinusoidal positional embeddings which are resized on-the-fly.
        EXTRA_MODELS_CAN_HANDLE_LONG_INPUTS = ["RwkvForCausalLM", "XGLMForCausalLM", "GPTNeoXForCausalLM"]
        if (
            tokenizer.model_max_length < 10000
            and text_generator.model.__class__.__name__ not in EXTRA_MODELS_CAN_HANDLE_LONG_INPUTS
        ):
            # Handling of large generations
            with self.assertRaises((RuntimeError, IndexError, ValueError, AssertionError)):
                text_generator("This is a test" * 500, max_new_tokens=20)

            # With the "hole" strategy the same over-long prompt must not raise.
            text_generator("This is a test" * 500, handle_long_generation="hole", max_new_tokens=20)

            # Hole strategy cannot work
            with self.assertRaises(ValueError):
                text_generator(
                    "This is a test" * 500,
                    handle_long_generation="hole",
                    max_new_tokens=tokenizer.model_max_length + 10,
                )

    @require_torch
    @require_accelerate
    @require_torch_gpu
    def test_small_model_pt_bloom_accelerate(self):
        """Check device placement, dtype and output of tiny BLOOM loaded with ``device_map="auto"``."""
        import torch

        # Classic `model_kwargs`
        pipe = pipeline(
            model="hf-internal-testing/tiny-random-bloom",
            model_kwargs={"device_map": "auto", "torch_dtype": torch.bfloat16},
        )
        self.assertEqual(pipe.model.device, torch.device(0))
        self.assertEqual(pipe.model.lm_head.weight.dtype, torch.bfloat16)
        out = pipe("This is a test")
        self.assertEqual(
            out,
            [
                {
                    "generated_text": (
                        "This is a test test test test test test test test test test test test test test test test"
                        " test"
                    )
                }
            ],
        )

        # Upgraded those two to real pipeline arguments (they just get sent for the model as they're unlikely to mean anything else.)
        pipe = pipeline(model="hf-internal-testing/tiny-random-bloom", device_map="auto", torch_dtype=torch.bfloat16)
        self.assertEqual(pipe.model.device, torch.device(0))
        self.assertEqual(pipe.model.lm_head.weight.dtype, torch.bfloat16)
        out = pipe("This is a test")
        self.assertEqual(
            out,
            [
                {
                    "generated_text": (
                        "This is a test test test test test test test test test test test test test test test test"
                        " test"
                    )
                }
            ],
        )

        # torch_dtype will be automatically set to float32 if not provided - check: https://github.com/huggingface/transformers/pull/20602
        pipe = pipeline(model="hf-internal-testing/tiny-random-bloom", device_map="auto")
        self.assertEqual(pipe.model.device, torch.device(0))
        self.assertEqual(pipe.model.lm_head.weight.dtype, torch.float32)
        out = pipe("This is a test")
        self.assertEqual(
            out,
            [
                {
                    "generated_text": (
                        "This is a test test test test test test test test test test test test test test test test"
                        " test"
                    )
                }
            ],
        )

    @require_torch
    @require_torch_gpu
    def test_small_model_fp16(self):
        """Smoke test: generation with a float16 model on device 0 must not raise."""
        import torch

        pipe = pipeline(model="hf-internal-testing/tiny-random-bloom", device=0, torch_dtype=torch.float16)
        pipe("This is a test")

    @require_torch
    @require_accelerate
    @require_torch_gpu
    def test_pipeline_accelerate_top_p(self):
        """Smoke test: ``top_p`` sampling with a float16, ``device_map="auto"`` model must not raise."""
        import torch

        pipe = pipeline(model="hf-internal-testing/tiny-random-bloom", device_map="auto", torch_dtype=torch.float16)
        pipe("This is a test", do_sample=True, top_p=0.5)

    def test_pipeline_length_setting_warning(self):
        """Check that the length warning is logged only when both ``max_length`` and ``max_new_tokens`` are set."""
        prompt = """Hello world"""
        text_generator = pipeline("text-generation", model="hf-internal-testing/tiny-random-gpt2")
        # The logger to capture depends on which framework's generation code runs.
        if text_generator.model.framework == "tf":
            logger = logging.get_logger("transformers.generation.tf_utils")
        else:
            logger = logging.get_logger("transformers.generation.utils")
        logger_msg = "Both `max_new_tokens`"  # The beginning of the message to be checked in this test

        # Both are set by the user -> log warning
        with CaptureLogger(logger) as cl:
            _ = text_generator(prompt, max_length=10, max_new_tokens=1)
        self.assertIn(logger_msg, cl.out)

        # The user only sets one -> no warning
        with CaptureLogger(logger) as cl:
            _ = text_generator(prompt, max_new_tokens=1)
        self.assertNotIn(logger_msg, cl.out)

        with CaptureLogger(logger) as cl:
            _ = text_generator(prompt, max_length=10)
        self.assertNotIn(logger_msg, cl.out)