"""Module for testing prompt tokenizers.""" import json import logging import unittest from copy import deepcopy from pathlib import Path from typing import Optional import pytest from datasets import load_dataset from transformers import AddedToken, AutoTokenizer, LlamaTokenizer from axolotl.prompt_strategies.alpaca_chat import NoSystemPrompter from axolotl.prompt_strategies.alpaca_w_system import ( InstructionWSystemPromptTokenizingStrategy, SystemDataPrompter, ) from axolotl.prompt_strategies.llama2_chat import ( Llama2ChatPrompter, LLama2ChatTokenizingStrategy, ) from axolotl.prompt_strategies.orpo.chat_template import load from axolotl.prompt_strategies.sharegpt import GlaiveShareGPTPromptTokenizingStrategy from axolotl.prompt_tokenizers import ( AlpacaPromptTokenizingStrategy, ShareGPTPromptTokenizingStrategy, ) from axolotl.prompters import AlpacaPrompter, PromptStyle, ShareGPTPrompterV2 from axolotl.utils.dict import DictDefault LOG = logging.getLogger("axolotl") test_data = { "multi_turn_sys": { "conversations": [ {"from": "system", "value": "lorem"}, {"from": "human", "value": "abc"}, {"from": "gpt", "value": "ipsum"}, {"from": "human", "value": "123"}, {"from": "gpt", "value": "sit"}, ] }, "single_turn_sys": { "conversations": [ {"from": "system", "value": "lorem"}, {"from": "human", "value": "abc"}, {"from": "gpt", "value": "ipsum"}, ] }, "single_turn_no_sys": { "conversations": [ {"from": "human", "value": "abc"}, {"from": "gpt", "value": "ipsum"}, ] }, "multi_turn_no_sys": { "conversations": [ {"from": "human", "value": "abc"}, {"from": "gpt", "value": "ipsum"}, {"from": "human", "value": "123"}, {"from": "gpt", "value": "sit"}, ] }, } def prompt_strat(conversation, tokenizer): "Helper function to create a prompt strategy for testing." prompter = ShareGPTPrompterV2(conversation=conversation) return ShareGPTPromptTokenizingStrategy( prompter, tokenizer, False, 2048, ) class TestPromptTokenizationStrategies(unittest.TestCase): """ Test class for prompt tokenization strategies. """ _caplog: Optional[pytest.LogCaptureFixture] = None @pytest.fixture(autouse=True) def inject_fixtures(self, caplog): self._caplog = caplog def setUp(self) -> None: # pylint: disable=duplicate-code self.tokenizer = AutoTokenizer.from_pretrained("huggyllama/llama-7b") self.tokenizer.add_special_tokens( { "bos_token": "", "eos_token": "", "unk_token": "", } ) def test_sharegpt_integration(self): with open( Path(__file__).parent / "fixtures/conversation.json", encoding="utf-8" ) as fin: data = fin.read() conversation = json.loads(data) with open( Path(__file__).parent / "fixtures/conversation.tokenized.json", encoding="utf-8", ) as fin: data = fin.read() tokenized_conversation = json.loads(data) prompter = ShareGPTPrompterV2() strat = ShareGPTPromptTokenizingStrategy( prompter, self.tokenizer, False, 2048, ) example = strat.tokenize_prompt(conversation) for fields in ["input_ids", "attention_mask", "labels"]: self.assertEqual(len(example[fields]), len(tokenized_conversation[fields])) self.assertEqual(example[fields], tokenized_conversation[fields]) def test_sharegpt_warnings_integration(self): with open( Path(__file__).parent / "fixtures/conversation.missingturns.json", encoding="utf-8", ) as fin: data = fin.read() conversation = json.loads(data) prompter = ShareGPTPrompterV2() strat = ShareGPTPromptTokenizingStrategy( prompter, self.tokenizer, False, 2048, ) with self._caplog.at_level(logging.WARNING): strat.tokenize_prompt(conversation) assert "assistant turn has empty text" in self._caplog.records[1].message def test_sharegpt_warnings_turns(self): conversation = { "conversations": [ {"from": "system", "value": "lorem"}, {"from": "gpt", "value": "ipsum"}, {"from": "human", "value": "dolor"}, {"from": "human", "value": "dolor"}, {"from": "gpt", "value": "sit"}, ] } prompter = ShareGPTPrompterV2() strat = ShareGPTPromptTokenizingStrategy( prompter, self.tokenizer, False, 2048, ) with self._caplog.at_level(logging.WARNING): strat.tokenize_prompt(conversation) assert ( "Role did not alternate between turns (gpt and human)" in self._caplog.records[0].message ) def test_sharegpt_llama(self): "Make sure the sharegpt/llama is tokenized and formatted correctly." strat = prompt_strat("llama-2", self.tokenizer) def tokenize(conv): return strat.tokenize_prompt(deepcopy(conv))["input_ids"] def decode(ids): return strat.tokenizer.decode(ids) # fmt: off # System message, multi-turn conversations mt_ids = tokenize(test_data['multi_turn_sys']) assert decode(mt_ids) == ' [INST] <>\nlorem\n<>\n\nabc [/INST] ipsum [INST] 123 [/INST] sit' assert mt_ids == [1, 518, 25580, 29962, 3532, 14816, 29903, 6778, 13, 29880, 3668, 13, 29966, 829, 14816, 29903, 6778, 13, 13, 10736, 518, 29914, 25580, 29962, 23421, 2, 1, 518, 25580, 29962, 29871, 29896, 29906, 29941, 518, 29914, 25580, 29962, 7845, 2] # System message, single-turn conversations st_ids = tokenize(test_data['single_turn_sys']) assert decode(st_ids) == ' [INST] <>\nlorem\n<>\n\nabc [/INST] ipsum' assert st_ids == [1, 518, 25580, 29962, 3532, 14816, 29903, 6778, 13, 29880, 3668, 13, 29966, 829, 14816, 29903, 6778, 13, 13, 10736, 518, 29914, 25580, 29962, 23421, 2] # No system message, single-turn ns_ids = tokenize(test_data['single_turn_no_sys']) assert decode(ns_ids) == ' [INST] abc [/INST] ipsum' assert ns_ids == [1, 518, 25580, 29962, 25638, 518, 29914, 25580, 29962, 23421, 2] # No system message, multi-turn ns_mt_ids = tokenize(test_data['multi_turn_no_sys']) assert decode(ns_mt_ids) == ' [INST] abc [/INST] ipsum [INST] 123 [/INST] sit' assert ns_mt_ids == [1, 518, 25580, 29962, 25638, 518, 29914, 25580, 29962, 23421, 2, 1, 518, 25580, 29962, 29871, 29896, 29906, 29941, 518, 29914, 25580, 29962, 7845, 2] # fmt: on def test_sharegpt_mistral(self): "Make sure the sharegpt/mistral is tokenized and formatted correctly." strat = prompt_strat("mistral", self.tokenizer) def tokenize(conv): return strat.tokenize_prompt(deepcopy(conv))["input_ids"] def decode(ids): return strat.tokenizer.decode(ids) # fmt: off # System message, multi-turn conversations mt_ids = tokenize(test_data['multi_turn_sys']) assert decode(mt_ids) == ' [INST] lorem\nabc [/INST] ipsum [INST] 123 [/INST] sit' assert mt_ids == [1, 518, 25580, 29962, 29871, 301, 3668, 13, 10736, 518, 29914, 25580, 29962, 23421, 2, 518, 25580, 29962, 29871, 29896, 29906, 29941, 518, 29914, 25580, 29962, 7845, 2] # System message, single-turn conversations st_ids = tokenize(test_data['single_turn_sys']) assert decode(st_ids) == ' [INST] lorem\nabc [/INST] ipsum' assert st_ids == [1, 518, 25580, 29962, 29871, 301, 3668, 13, 10736, 518, 29914, 25580, 29962, 23421, 2] # No system message, single-turn ns_ids = tokenize(test_data['single_turn_no_sys']) assert decode(ns_ids) == ' [INST] abc [/INST] ipsum' assert ns_ids == [1, 518, 25580, 29962, 25638, 518, 29914, 25580, 29962, 23421, 2] # No system message, multi-turn ns_mt_ids = tokenize(test_data['multi_turn_no_sys']) assert decode(ns_mt_ids) == ' [INST] abc [/INST] ipsum [INST] 123 [/INST] sit' assert ns_mt_ids == [1, 518, 25580, 29962, 25638, 518, 29914, 25580, 29962, 23421, 2, 518, 25580, 29962, 29871, 29896, 29906, 29941, 518, 29914, 25580, 29962, 7845, 2] # fmt: on def test_sharegpt_changes_roles(self): conversation = { "roles": ["USER", "CHARACTER"], "conversations": [ {"from": "system", "value": "lorem"}, {"from": "gpt", "value": "ipsum"}, {"from": "human", "value": "dolor"}, {"from": "gpt", "value": "sit"}, ], } prompter = ShareGPTPrompterV2() strat = ShareGPTPromptTokenizingStrategy( prompter, self.tokenizer, False, 2048, ) with self._caplog.at_level(logging.WARNING): res = strat.tokenize_prompt(conversation) assert "CHARACTER" in self.tokenizer.decode(res["input_ids"]) def test_sharegpt_assistant_label_ignore(self): conversation = { "roles": ["user", "assistant"], "conversations": [ {"from": "system", "value": "lorem"}, {"from": "gpt", "value": "ipsum"}, {"from": "human", "value": "dolor"}, {"from": "gpt", "value": "sit"}, ], } prompter = ShareGPTPrompterV2() strat = ShareGPTPromptTokenizingStrategy( prompter, self.tokenizer, False, 2048, ) with self._caplog.at_level(logging.WARNING): res = strat.tokenize_prompt(conversation) idx = res["input_ids"].index(20255) # assistant token assert res["labels"][idx] == -100 def test_glaive_tool_label_ignore(self): conversation = { "system": "SYSTEM: This is a system prompt", "chat": "USER: Can you book a flight for me from New York to London? ASSISTANT: I'm sorry, but I don't have the capability to book flights. <|endoftext|>", } prompter = ShareGPTPrompterV2() strat = GlaiveShareGPTPromptTokenizingStrategy( prompter, self.tokenizer, False, 2048, ) with self._caplog.at_level(logging.WARNING): res = strat.tokenize_prompt(conversation) idx = res["input_ids"].index(13566) # assistant token assert res["labels"][idx] == -100 def test_no_sys_prompt(self): """ tests the interface between the user and assistant parts """ prompter = NoSystemPrompter() # pylint: disable=duplicate-code strat = AlpacaPromptTokenizingStrategy( prompter, self.tokenizer, False, 2048, ) sample = { "instruction": "hello cruel. lorem ipsum dolor sit amet.", "output": "world!", } example = strat.tokenize_prompt(sample) world_idx = example["input_ids"].index(3186) assert example["labels"][world_idx] == 3186 assert example["labels"][world_idx - 1] == -100 def test_alpaca(self): """ tests the interface between the user and assistant parts """ # pylint: disable=duplicate-code prompter = AlpacaPrompter() strat = AlpacaPromptTokenizingStrategy( prompter, self.tokenizer, False, 2048, ) sample = {"instruction": "hello!", "output": "Hi! How can I help?"} example = strat.tokenize_prompt(sample) world_idx = example["input_ids"].index(6324) assert example["labels"][world_idx] == 6324 assert example["labels"][world_idx - 1] == -100 class InstructionWSystemPromptTokenizingStrategyTest(unittest.TestCase): """ Test class for prompt tokenization strategies with sys prompt from the dataset """ def setUp(self) -> None: # pylint: disable=duplicate-code self.tokenizer = AutoTokenizer.from_pretrained("huggyllama/llama-7b") self.tokenizer.add_special_tokens( { "bos_token": "", "eos_token": "", "unk_token": "", } ) def test_system_alpaca(self): prompter = SystemDataPrompter(PromptStyle.CHAT.value) strat = InstructionWSystemPromptTokenizingStrategy( prompter, self.tokenizer, False, 2048, ) sample = { "system": "use cot", "instruction": "hello!", "output": "Hi! How can I help?", } example = strat.tokenize_prompt(sample) assert example["input_ids"][0:5] == [ 1, 28962, 1254, 12665, 29901, ] # "SYSTEM:" assert example["input_ids"][5:7] == [671, 20118] # " use cot" assert example["input_ids"][8] == 11889 # USER class Llama2ChatTokenizationTest(unittest.TestCase): """ Test class for prompt tokenization strategies with sys prompt from the dataset """ def setUp(self) -> None: # pylint: disable=duplicate-code self.tokenizer = LlamaTokenizer.from_pretrained("NousResearch/Llama-2-7b-hf") # woraround because official Meta repos are not open def test_llama2_chat_integration(self): with open( Path(__file__).parent / "fixtures/conversation.json", encoding="utf-8" ) as fin: data = fin.read() conversation = json.loads(data) with open( Path(__file__).parent / "fixtures/conversation.tokenized_llama2chat.json", encoding="utf-8", ) as fin: data = fin.read() tokenized_conversation = json.loads(data) prompter = Llama2ChatPrompter() strat = LLama2ChatTokenizingStrategy( prompter, self.tokenizer, False, 4096, ) example = strat.tokenize_prompt(conversation) for fields in ["input_ids", "attention_mask", "labels"]: self.assertEqual(len(example[fields]), len(tokenized_conversation[fields])) self.assertEqual(example[fields], tokenized_conversation[fields]) def compare_with_transformers_integration(self): # this needs transformers >= v4.31.0 from transformers.models.llama.tokenization_llama import B_SYS, E_SYS from transformers.pipelines.conversational import Conversation # from transformers.models.llama.tokenization_llama import DEFAULT_SYSTEM_PROMPT # broken as of 23/7/20 # see https://github.com/huggingface/transformers/pull/24935 # pylint: disable=C0103 DEFAULT_SYSTEM_PROMPT = """\ You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe. Your answers should not include any harmful, unethical, racist, sexist, toxic, dangerous, or illegal content. Please ensure that your responses are socially unbiased and positive in nature. If a question does not make any sense, or is not factually coherent, explain why instead of answering something not correct. If you don't know the answer to a question, please don't share false information.""" with open( Path(__file__).parent / "fixtures/conversation.json", encoding="utf-8" ) as fin: data = fin.read() conversation = json.loads(data) with open( Path(__file__).parent / "fixtures/conversation.tokenized_llama2chat.json", encoding="utf-8", ) as fin: data = fin.read() tokenized_conversation = json.loads(data) user_input = [] answers = [] for msg in conversation["conversations"]: if msg["from"] == "human": user_input.append(msg["value"]) else: answers.append(msg["value"]) hf_conf = Conversation( text=user_input[-1], past_user_inputs=[B_SYS + DEFAULT_SYSTEM_PROMPT + E_SYS + user_input[0]] + user_input[1:-1], generated_responses=answers, ) # pylint: disable=W0212 hf_tokens = self.tokenizer._build_conversation_input_ids(hf_conf) self.assertEqual( hf_tokens, tokenized_conversation["input_ids"][: len(hf_tokens)] ) class OrpoTokenizationTest(unittest.TestCase): """test case for the ORPO tokenization""" def setUp(self) -> None: # pylint: disable=duplicate-code tokenizer = LlamaTokenizer.from_pretrained("mistralai/Mistral-7B-v0.1") tokenizer.add_special_tokens( { "eos_token": AddedToken( "<|im_end|>", rstrip=False, lstrip=False, normalized=False ) } ) tokenizer.add_tokens( [ AddedToken( "<|im_start|>", rstrip=False, lstrip=False, normalized=False ), ] ) self.tokenizer = tokenizer self.dataset = load_dataset( "argilla/ultrafeedback-binarized-preferences-cleaned", split="train" ).select([0]) def test_orpo_integration(self): strat = load( self.tokenizer, DictDefault({"train_on_inputs": False}), DictDefault({"chat_template": "chatml"}), ) res = strat.tokenize_prompt(self.dataset[0]) assert "rejected_input_ids" in res assert "rejected_labels" in res assert "input_ids" in res assert "labels" in res assert "prompt_attention_mask" in res assert len(res["rejected_input_ids"]) == len(res["rejected_labels"]) assert len(res["input_ids"]) == len(res["labels"]) assert len(res["input_ids"]) == len(res["prompt_attention_mask"]) assert res["rejected_labels"][0] == -100 assert res["rejected_input_ids"][-1] == res["rejected_labels"][-1] assert res["labels"][0] == -100 assert res["input_ids"][-1] == res["labels"][-1] assert res["prompt_attention_mask"][0] == 1 assert res["prompt_attention_mask"][-1] == 0 if __name__ == "__main__": unittest.main()