# Copyright 2020 The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import unittest from transformers import ( MODEL_FOR_CAUSAL_LM_MAPPING, TF_MODEL_FOR_CAUSAL_LM_MAPPING, TextGenerationPipeline, logging, pipeline, ) from transformers.testing_utils import ( CaptureLogger, is_pipeline_test, require_accelerate, require_tf, require_torch, require_torch_gpu, require_torch_or_tf, ) from .test_pipelines_common import ANY @is_pipeline_test @require_torch_or_tf class TextGenerationPipelineTests(unittest.TestCase): model_mapping = MODEL_FOR_CAUSAL_LM_MAPPING tf_model_mapping = TF_MODEL_FOR_CAUSAL_LM_MAPPING @require_torch def test_small_model_pt(self): text_generator = pipeline(task="text-generation", model="sshleifer/tiny-ctrl", framework="pt") # Using `do_sample=False` to force deterministic output outputs = text_generator("This is a test", do_sample=False) self.assertEqual( outputs, [ { "generated_text": ( "This is a test ☃ ☃ segmental segmental segmental 议议eski eski flutter flutter Lacy oscope." " oscope. FiliFili@@" ) } ], ) outputs = text_generator(["This is a test", "This is a second test"]) self.assertEqual( outputs, [ [ { "generated_text": ( "This is a test ☃ ☃ segmental segmental segmental 议议eski eski flutter flutter Lacy oscope." " oscope. FiliFili@@" ) } ], [ { "generated_text": ( "This is a second test ☃ segmental segmental segmental 议议eski eski flutter flutter Lacy" " oscope. oscope. FiliFili@@" ) } ], ], ) outputs = text_generator("This is a test", do_sample=True, num_return_sequences=2, return_tensors=True) self.assertEqual( outputs, [ {"generated_token_ids": ANY(list)}, {"generated_token_ids": ANY(list)}, ], ) text_generator.tokenizer.pad_token_id = text_generator.model.config.eos_token_id text_generator.tokenizer.pad_token = "" outputs = text_generator( ["This is a test", "This is a second test"], do_sample=True, num_return_sequences=2, batch_size=2, return_tensors=True, ) self.assertEqual( outputs, [ [ {"generated_token_ids": ANY(list)}, {"generated_token_ids": ANY(list)}, ], [ {"generated_token_ids": ANY(list)}, {"generated_token_ids": ANY(list)}, ], ], ) @require_tf def test_small_model_tf(self): text_generator = pipeline(task="text-generation", model="sshleifer/tiny-ctrl", framework="tf") # Using `do_sample=False` to force deterministic output outputs = text_generator("This is a test", do_sample=False) self.assertEqual( outputs, [ { "generated_text": ( "This is a test FeyFeyFey(Croatis.), s.), Cannes Cannes Cannes 閲閲Cannes Cannes Cannes 攵" " please," ) } ], ) outputs = text_generator(["This is a test", "This is a second test"], do_sample=False) self.assertEqual( outputs, [ [ { "generated_text": ( "This is a test FeyFeyFey(Croatis.), s.), Cannes Cannes Cannes 閲閲Cannes Cannes Cannes 攵" " please," ) } ], [ { "generated_text": ( "This is a second test Chieftain Chieftain prefecture prefecture prefecture Cannes Cannes" " Cannes 閲閲Cannes Cannes Cannes 攵 please," ) } ], ], ) def get_test_pipeline(self, model, tokenizer, processor): text_generator = TextGenerationPipeline(model=model, tokenizer=tokenizer) return text_generator, ["This is a test", "Another test"] def test_stop_sequence_stopping_criteria(self): prompt = """Hello I believe in""" text_generator = pipeline("text-generation", model="hf-internal-testing/tiny-random-gpt2") output = text_generator(prompt) self.assertEqual( output, [{"generated_text": "Hello I believe in fe fe fe fe fe fe fe fe fe fe fe fe"}], ) output = text_generator(prompt, stop_sequence=" fe") self.assertEqual(output, [{"generated_text": "Hello I believe in fe"}]) def run_pipeline_test(self, text_generator, _): model = text_generator.model tokenizer = text_generator.tokenizer outputs = text_generator("This is a test") self.assertEqual(outputs, [{"generated_text": ANY(str)}]) self.assertTrue(outputs[0]["generated_text"].startswith("This is a test")) outputs = text_generator("This is a test", return_full_text=False) self.assertEqual(outputs, [{"generated_text": ANY(str)}]) self.assertNotIn("This is a test", outputs[0]["generated_text"]) text_generator = pipeline(task="text-generation", model=model, tokenizer=tokenizer, return_full_text=False) outputs = text_generator("This is a test") self.assertEqual(outputs, [{"generated_text": ANY(str)}]) self.assertNotIn("This is a test", outputs[0]["generated_text"]) outputs = text_generator("This is a test", return_full_text=True) self.assertEqual(outputs, [{"generated_text": ANY(str)}]) self.assertTrue(outputs[0]["generated_text"].startswith("This is a test")) outputs = text_generator(["This is great !", "Something else"], num_return_sequences=2, do_sample=True) self.assertEqual( outputs, [ [{"generated_text": ANY(str)}, {"generated_text": ANY(str)}], [{"generated_text": ANY(str)}, {"generated_text": ANY(str)}], ], ) if text_generator.tokenizer.pad_token is not None: outputs = text_generator( ["This is great !", "Something else"], num_return_sequences=2, batch_size=2, do_sample=True ) self.assertEqual( outputs, [ [{"generated_text": ANY(str)}, {"generated_text": ANY(str)}], [{"generated_text": ANY(str)}, {"generated_text": ANY(str)}], ], ) with self.assertRaises(ValueError): outputs = text_generator("test", return_full_text=True, return_text=True) with self.assertRaises(ValueError): outputs = text_generator("test", return_full_text=True, return_tensors=True) with self.assertRaises(ValueError): outputs = text_generator("test", return_text=True, return_tensors=True) # Empty prompt is slighly special # it requires BOS token to exist. # Special case for Pegasus which will always append EOS so will # work even without BOS. if ( text_generator.tokenizer.bos_token_id is not None or "Pegasus" in tokenizer.__class__.__name__ or "Git" in model.__class__.__name__ ): outputs = text_generator("") self.assertEqual(outputs, [{"generated_text": ANY(str)}]) else: with self.assertRaises((ValueError, AssertionError)): outputs = text_generator("") if text_generator.framework == "tf": # TF generation does not support max_new_tokens, and it's impossible # to control long generation with only max_length without # fancy calculation, dismissing tests for now. return # We don't care about infinite range models. # They already work. # Skip this test for XGLM, since it uses sinusoidal positional embeddings which are resized on-the-fly. if tokenizer.model_max_length < 10000 and "XGLM" not in tokenizer.__class__.__name__: # Handling of large generations with self.assertRaises((RuntimeError, IndexError, ValueError, AssertionError)): text_generator("This is a test" * 500, max_new_tokens=20) outputs = text_generator("This is a test" * 500, handle_long_generation="hole", max_new_tokens=20) # Hole strategy cannot work with self.assertRaises(ValueError): text_generator( "This is a test" * 500, handle_long_generation="hole", max_new_tokens=tokenizer.model_max_length + 10, ) @require_torch @require_accelerate @require_torch_gpu def test_small_model_pt_bloom_accelerate(self): import torch # Classic `model_kwargs` pipe = pipeline( model="hf-internal-testing/tiny-random-bloom", model_kwargs={"device_map": "auto", "torch_dtype": torch.bfloat16}, ) self.assertEqual(pipe.model.device, torch.device(0)) self.assertEqual(pipe.model.lm_head.weight.dtype, torch.bfloat16) out = pipe("This is a test") self.assertEqual( out, [ { "generated_text": ( "This is a test test test test test test test test test test test test test test test test" " test" ) } ], ) # Upgraded those two to real pipeline arguments (they just get sent for the model as they're unlikely to mean anything else.) pipe = pipeline(model="hf-internal-testing/tiny-random-bloom", device_map="auto", torch_dtype=torch.bfloat16) self.assertEqual(pipe.model.device, torch.device(0)) self.assertEqual(pipe.model.lm_head.weight.dtype, torch.bfloat16) out = pipe("This is a test") self.assertEqual( out, [ { "generated_text": ( "This is a test test test test test test test test test test test test test test test test" " test" ) } ], ) # torch_dtype will be automatically set to float32 if not provided - check: https://github.com/huggingface/transformers/pull/20602 pipe = pipeline(model="hf-internal-testing/tiny-random-bloom", device_map="auto") self.assertEqual(pipe.model.device, torch.device(0)) self.assertEqual(pipe.model.lm_head.weight.dtype, torch.float32) out = pipe("This is a test") self.assertEqual( out, [ { "generated_text": ( "This is a test test test test test test test test test test test test test test test test" " test" ) } ], ) @require_torch @require_torch_gpu def test_small_model_fp16(self): import torch pipe = pipeline(model="hf-internal-testing/tiny-random-bloom", device=0, torch_dtype=torch.float16) pipe("This is a test") @require_torch @require_accelerate @require_torch_gpu def test_pipeline_accelerate_top_p(self): import torch pipe = pipeline(model="hf-internal-testing/tiny-random-bloom", device_map="auto", torch_dtype=torch.float16) pipe("This is a test", do_sample=True, top_p=0.5) def test_pipeline_length_setting_warning(self): prompt = """Hello world""" text_generator = pipeline("text-generation", model="hf-internal-testing/tiny-random-gpt2") if text_generator.model.framework == "tf": logger = logging.get_logger("transformers.generation.tf_utils") else: logger = logging.get_logger("transformers.generation.utils") logger_msg = "Both `max_new_tokens`" # The beggining of the message to be checked in this test # Both are set by the user -> log warning with CaptureLogger(logger) as cl: _ = text_generator(prompt, max_length=10, max_new_tokens=1) self.assertIn(logger_msg, cl.out) # The user only sets one -> no warning with CaptureLogger(logger) as cl: _ = text_generator(prompt, max_new_tokens=1) self.assertNotIn(logger_msg, cl.out) with CaptureLogger(logger) as cl: _ = text_generator(prompt, max_length=10) self.assertNotIn(logger_msg, cl.out)