MTPw / tokenizer.py
teszenofficial's picture
Upload 4 files
77d62e8 verified
import sentencepiece as spm
import os
import json
class MTPTokenizer:
"""Tokenizer using SentencePiece BPE"""
def __init__(self, model_path=None):
self.sp = None
self.model_path = model_path
if model_path and os.path.exists(model_path):
self.load(model_path)
def train(self, corpus_path, vocab_size=4000, model_prefix='mtp_tokenizer'):
"""Train SentencePiece BPE tokenizer on corpus"""
# Extract text from JSONL corpus
texts = []
with open(corpus_path, 'r', encoding='utf-8') as f:
for line in f:
data = json.loads(line)
if 'instruction' in data:
texts.append(data['instruction'])
if 'response' in data:
texts.append(data['response'])
# Save temporary text file
temp_file = 'temp_corpus.txt'
with open(temp_file, 'w', encoding='utf-8') as f:
f.write('\n'.join(texts))
# Calculate optimal vocab size based on corpus
total_chars = sum(len(text) for text in texts)
max_vocab = min(vocab_size, int(total_chars * 0.15)) # Heuristic: ~15% of chars
print(f" β†’ Corpus stats: {len(texts)} texts, {total_chars} characters")
print(f" β†’ Adjusted vocab size: {max_vocab} (requested: {vocab_size})")
# Train SentencePiece with adjusted parameters
try:
spm.SentencePieceTrainer.train(
input=temp_file,
model_prefix=model_prefix,
vocab_size=max_vocab,
model_type='bpe',
pad_id=0,
unk_id=1,
bos_id=2,
eos_id=3,
character_coverage=1.0,
normalization_rule_name='identity',
num_threads=4,
split_digits=True,
allow_whitespace_only_pieces=False,
byte_fallback=False,
max_sentencepiece_length=16
)
except RuntimeError as e:
if "Vocabulary size too high" in str(e):
# Extract suggested max from error and retry
import re
match = re.search(r'value <= (\d+)', str(e))
if match:
suggested_max = int(match.group(1))
print(f" β†’ Retrying with vocab size: {suggested_max}")
spm.SentencePieceTrainer.train(
input=temp_file,
model_prefix=model_prefix,
vocab_size=suggested_max,
model_type='bpe',
pad_id=0,
unk_id=1,
bos_id=2,
eos_id=3,
character_coverage=1.0,
normalization_rule_name='identity',
num_threads=4,
split_digits=True,
allow_whitespace_only_pieces=False,
byte_fallback=False,
max_sentencepiece_length=16
)
else:
raise
else:
raise
# Clean up
os.remove(temp_file)
# Load the trained model
self.model_path = f"{model_prefix}.model"
self.load(self.model_path)
print(f"βœ“ Tokenizer trained: {self.vocab_size()} tokens")
print(f"βœ“ Model saved: {self.model_path}")
def load(self, model_path):
"""Load trained tokenizer"""
self.sp = spm.SentencePieceProcessor()
self.sp.load(model_path)
self.model_path = model_path
def encode(self, text):
"""Encode text to token IDs"""
if self.sp is None:
raise ValueError("Tokenizer not loaded. Train or load a model first.")
return self.sp.encode_as_ids(text)
def decode(self, ids):
"""Decode token IDs to text"""
if self.sp is None:
raise ValueError("Tokenizer not loaded. Train or load a model first.")
return self.sp.decode_ids(ids)
def vocab_size(self):
"""Get vocabulary size"""
if self.sp is None:
return 0
return self.sp.get_piece_size()
def bos_id(self):
"""Beginning of sentence token ID"""
return self.sp.bos_id()
def eos_id(self):
"""End of sentence token ID"""
return self.sp.eos_id()
def pad_id(self):
"""Padding token ID"""
return self.sp.pad_id()
def unk_id(self):
"""Unknown token ID"""
return self.sp.unk_id()