| | """ |
| | Custom BPE Tokenizer for SLM v1. |
| | 16,384 vocabulary size optimized for conversational use. |
| | """ |
| |
|
import os
import json
from typing import List, Optional, Union

from tokenizers import Tokenizer, models, trainers, pre_tokenizers, processors, decoders
from tokenizers.normalizers import NFKC, Lowercase, Sequence
| |
|
| |
|
class SLMTokenizer:
    """Custom BPE tokenizer for the SLM model.

    Features:
    - 16,384 token vocabulary (memory efficient)
    - Special tokens for conversation format
    - Compatible with the model's embedding layer
    """

    # Special-token string literals. The trainer is given SPECIAL_TOKENS in
    # this order, so these tokens typically occupy the first vocabulary ids.
    PAD_TOKEN = "<|pad|>"
    BOS_TOKEN = "<|bos|>"
    EOS_TOKEN = "<|eos|>"
    UNK_TOKEN = "<|unk|>"
    USER_TOKEN = "<|user|>"
    ASSISTANT_TOKEN = "<|assistant|>"

    SPECIAL_TOKENS = [PAD_TOKEN, BOS_TOKEN, EOS_TOKEN, UNK_TOKEN, USER_TOKEN, ASSISTANT_TOKEN]

    def __init__(self, tokenizer: Optional[Tokenizer] = None):
        """Initialize tokenizer.

        Args:
            tokenizer: Pre-trained HuggingFace tokenizer object (or any
                object exposing the same ``encode``/``decode``/``token_to_id``
                interface). May be None; special-token ids are then unset.
        """
        self.tokenizer = tokenizer
        self._setup_special_token_ids()

    def _setup_special_token_ids(self):
        """Cache special token IDs as instance attributes for easy access."""
        if self.tokenizer is not None:
            self.pad_token_id = self.tokenizer.token_to_id(self.PAD_TOKEN)
            self.bos_token_id = self.tokenizer.token_to_id(self.BOS_TOKEN)
            self.eos_token_id = self.tokenizer.token_to_id(self.EOS_TOKEN)
            self.unk_token_id = self.tokenizer.token_to_id(self.UNK_TOKEN)
            self.user_token_id = self.tokenizer.token_to_id(self.USER_TOKEN)
            self.assistant_token_id = self.tokenizer.token_to_id(self.ASSISTANT_TOKEN)

    @classmethod
    def train(
        cls,
        files: List[str],
        vocab_size: int = 16384,
        min_frequency: int = 2,
        save_path: Optional[str] = None,
    ) -> "SLMTokenizer":
        """Train a new BPE tokenizer on the given files.

        Args:
            files: List of text file paths to train on
            vocab_size: Size of vocabulary (default 16,384)
            min_frequency: Minimum token frequency to include
            save_path: Optional path to save the trained tokenizer

        Returns:
            Trained SLMTokenizer instance
        """
        print(f"Training BPE tokenizer with vocab_size={vocab_size}...")
        print(f"Training files: {files}")

        tokenizer = Tokenizer(models.BPE(unk_token=cls.UNK_TOKEN))

        # NFKC unicode normalization; byte-level pre-tokenizer/decoder so any
        # input byte sequence is representable (no out-of-vocabulary bytes).
        tokenizer.normalizer = NFKC()
        tokenizer.pre_tokenizer = pre_tokenizers.ByteLevel(add_prefix_space=False)
        tokenizer.decoder = decoders.ByteLevel()

        trainer = trainers.BpeTrainer(
            vocab_size=vocab_size,
            min_frequency=min_frequency,
            special_tokens=cls.SPECIAL_TOKENS,
            show_progress=True,
        )

        tokenizer.train(files, trainer)

        # Wrap every encoded sequence (or pair) with BOS/EOS automatically.
        tokenizer.post_processor = processors.TemplateProcessing(
            single=f"{cls.BOS_TOKEN} $A {cls.EOS_TOKEN}",
            pair=f"{cls.BOS_TOKEN} $A {cls.EOS_TOKEN} {cls.BOS_TOKEN} $B {cls.EOS_TOKEN}",
            special_tokens=[
                (cls.BOS_TOKEN, tokenizer.token_to_id(cls.BOS_TOKEN)),
                (cls.EOS_TOKEN, tokenizer.token_to_id(cls.EOS_TOKEN)),
            ],
        )

        print(f"Tokenizer trained! Vocabulary size: {tokenizer.get_vocab_size()}")

        instance = cls(tokenizer)

        if save_path:
            instance.save(save_path)

        return instance

    @classmethod
    def from_file(cls, path: str) -> "SLMTokenizer":
        """Load a tokenizer from a saved file.

        Args:
            path: Path to the tokenizer.json file

        Returns:
            Loaded SLMTokenizer instance
        """
        tokenizer = Tokenizer.from_file(path)
        return cls(tokenizer)

    def save(self, path: str):
        """Save the tokenizer (plus a small config JSON) to disk.

        Args:
            path: Directory (a ``tokenizer.json`` is created inside it) or
                an explicit file path.
        """
        if os.path.isdir(path):
            save_path = os.path.join(path, "tokenizer.json")
        else:
            save_path = path
            parent = os.path.dirname(save_path)
            # BUG FIX: os.makedirs("") raises FileNotFoundError when the
            # target is a bare filename in the current directory.
            if parent:
                os.makedirs(parent, exist_ok=True)

        self.tokenizer.save(save_path)
        print(f"Tokenizer saved to: {save_path}")

        # BUG FIX: the previous str.replace("tokenizer.json", ...) produced
        # config_path == save_path whenever the filename was not exactly
        # "tokenizer.json", silently overwriting the tokenizer file with the
        # config. Always write the config next to the tokenizer file.
        config_path = os.path.join(os.path.dirname(save_path) or ".", "tokenizer_config.json")
        config = {
            "vocab_size": self.vocab_size,
            "pad_token": self.PAD_TOKEN,
            "bos_token": self.BOS_TOKEN,
            "eos_token": self.EOS_TOKEN,
            "unk_token": self.UNK_TOKEN,
            "user_token": self.USER_TOKEN,
            "assistant_token": self.ASSISTANT_TOKEN,
        }
        with open(config_path, "w") as f:
            json.dump(config, f, indent=2)
        print(f"Tokenizer config saved to: {config_path}")

    def encode(
        self,
        text: str,
        add_special_tokens: bool = True,
        max_length: Optional[int] = None,
        padding: bool = False,
        truncation: bool = False,
    ) -> List[int]:
        """Encode text to token IDs.

        Args:
            text: Input text string
            add_special_tokens: Whether to add BOS/EOS tokens
            max_length: Maximum sequence length
            padding: Whether to pad to max_length
            truncation: Whether to truncate to max_length

        Returns:
            List of token IDs
        """
        encoding = self.tokenizer.encode(text, add_special_tokens=add_special_tokens)
        ids = encoding.ids

        if truncation and max_length and len(ids) > max_length:
            ids = ids[:max_length]
            # Keep the sequence well-formed after truncation: it should
            # still terminate with EOS. BUG FIX: guard against empty ids
            # before indexing ids[-1].
            if add_special_tokens and ids and ids[-1] != self.eos_token_id:
                ids[-1] = self.eos_token_id

        if padding and max_length and len(ids) < max_length:
            ids = ids + [self.pad_token_id] * (max_length - len(ids))

        return ids

    def decode(self, ids: List[int], skip_special_tokens: bool = True) -> str:
        """Decode token IDs to text.

        Args:
            ids: List of token IDs
            skip_special_tokens: Whether to remove special tokens

        Returns:
            Decoded text string
        """
        return self.tokenizer.decode(ids, skip_special_tokens=skip_special_tokens)

    def encode_conversation(
        self,
        user_message: str,
        assistant_message: Optional[str] = None,
        max_length: Optional[int] = None,
    ) -> List[int]:
        """Encode a conversation turn.

        Format: <|bos|><|user|>message<|assistant|>response<|eos|>

        Args:
            user_message: The user's message
            assistant_message: Optional assistant response
            max_length: Maximum sequence length

        Returns:
            List of token IDs
        """
        if assistant_message:
            text = f"{self.USER_TOKEN}{user_message}{self.ASSISTANT_TOKEN}{assistant_message}"
        else:
            # Generation prompt: end with the assistant marker so the model
            # continues with the response.
            text = f"{self.USER_TOKEN}{user_message}{self.ASSISTANT_TOKEN}"

        return self.encode(text, add_special_tokens=True, max_length=max_length, truncation=True)

    @property
    def vocab_size(self) -> int:
        """Get vocabulary size."""
        return self.tokenizer.get_vocab_size()

    def get_vocab(self) -> dict:
        """Get the vocabulary as a dictionary."""
        return self.tokenizer.get_vocab()

    def __len__(self) -> int:
        """Return vocabulary size."""
        return self.vocab_size

    def __call__(
        self,
        text: Union[str, List[str]],
        max_length: Optional[int] = None,
        padding: bool = False,
        truncation: bool = False,
        return_tensors: Optional[str] = None,
    ) -> dict:
        """Tokenize text (HuggingFace-style interface).

        Args:
            text: Input text or list of texts
            max_length: Maximum sequence length
            padding: Whether to pad sequences
            truncation: Whether to truncate sequences
            return_tensors: If "pt", return PyTorch tensors

        Returns:
            Dictionary with input_ids and attention_mask
        """
        if isinstance(text, str):
            text = [text]

        all_ids = []
        for t in text:
            ids = self.encode(
                t,
                max_length=max_length,
                padding=padding,
                truncation=truncation,
            )
            all_ids.append(ids)

        if padding:
            # BUG FIX: with padding=True and max_length=None the original
            # returned ragged lists, which torch.tensor() rejects. Pad to
            # max_length when given, else to the longest sequence in batch.
            target = max_length if max_length else max((len(ids) for ids in all_ids), default=0)
            all_ids = [ids + [self.pad_token_id] * max(0, target - len(ids)) for ids in all_ids]

        # Mask is 1 for real tokens, 0 for padding (avoid shadowing builtin `id`).
        attention_mask = [[1 if tid != self.pad_token_id else 0 for tid in ids] for ids in all_ids]

        result = {
            "input_ids": all_ids,
            "attention_mask": attention_mask,
        }

        if return_tensors == "pt":
            import torch
            result["input_ids"] = torch.tensor(all_ids)
            result["attention_mask"] = torch.tensor(attention_mask)

        return result
|