| |
| """Fix GGUF newline crash by adding byte-fallback tokens to the tokenizer. |
| |
| Problem: The SentencePiece Unigram tokenizer was trained without byte_fallback=True, |
| so characters like the newline (\\n) have no token representation. llama.cpp crashes |
| when it encounters such a character because there is no byte-fallback token for it. |
| |
| Fix: |
| 1. Add 256 byte-fallback tokens (<0x00> .. <0xFF>) to tokenizer.json and register them in tokenizer_config.json (added_tokens_decoder) |
| 2. Resize model embeddings by 256 rows (e.g. 64000 -> 64256) |
| 3. Update config.json vocab_size to the old value + 256 |
| 4. Copy tokenizer.model for proper GGUF conversion |
| |
| Usage: |
| python scripts/fix_tokenizer_byte_fallback.py \ |
| --input outputs/hf_checkpoint-best \ |
| --output outputs/hf_checkpoint-best-fixed \ |
| --sp_model tokenizer/korean_sp/tokenizer.model |
| """ |
|
|
| import argparse |
| import json |
| import shutil |
| from pathlib import Path |
|
|
| import torch |
| from safetensors.torch import load_file, save_file |
|
|
|
|
# Number of byte-fallback tokens (one per possible byte value) and the
# naming pattern used for them: "<0x00>" .. "<0xFF>".
BYTE_FALLBACK_COUNT = 256
BYTE_TOKEN_TEMPLATE = "<0x{:02X}>"


def fix_tokenizer_json(input_path: Path, output_path: Path):
    """Enable byte fallback in a ``tokenizer.json`` and append the byte tokens.

    Reads the JSON at ``input_path``, sets ``model.byte_fallback`` to ``True``,
    appends ``BYTE_FALLBACK_COUNT`` entries ``["<0xNN>", 0.0]`` to
    ``model.vocab`` and registers the same tokens under ``added_tokens`` with
    consecutive ids starting at the original vocab length. The result is
    written to ``output_path`` as UTF-8.

    Args:
        input_path: Path of the tokenizer.json to read.
        output_path: Path the patched tokenizer.json is written to.

    Returns:
        ``(original_size, new_size)``: length of ``model.vocab`` before and
        after the byte tokens were appended.

    Raises:
        KeyError: If the file has no ``model`` or ``model.vocab`` entry.
        ValueError: If the vocab already contains a byte-fallback token
            (e.g. the file was already patched); appending again would
            create duplicate entries.
    """
    # Be explicit about UTF-8: the vocab may hold non-ASCII pieces and the
    # platform default encoding is not guaranteed to be UTF-8.
    with open(input_path, encoding="utf-8") as f:
        tok = json.load(f)

    model = tok["model"]
    # NOTE(review): assumes vocab is a list of [piece, score] pairs (Unigram
    # layout), which is what the append below requires.
    vocab = model["vocab"]
    original_size = len(vocab)

    byte_tokens = [BYTE_TOKEN_TEMPLATE.format(i) for i in range(BYTE_FALLBACK_COUNT)]

    # Refuse to patch twice: duplicated pieces would shift ids and make the
    # vocab size disagree with the resized embeddings.
    existing_pieces = {entry[0] for entry in vocab}
    already_present = [token for token in byte_tokens if token in existing_pieces]
    if already_present:
        raise ValueError(
            f"{input_path} already contains byte-fallback tokens "
            f"(e.g. {already_present[0]}); refusing to add them again"
        )

    previous_fallback = bool(model.get("byte_fallback", False))
    model["byte_fallback"] = True

    vocab.extend([token, 0.0] for token in byte_tokens)

    new_size = len(vocab)
    print(f" Vocab: {original_size} -> {new_size} (+{BYTE_FALLBACK_COUNT} byte tokens)")
    print(f" byte_fallback: {previous_fallback} -> True")

    # "added_tokens" may be missing or null; treat both as an empty list.
    added = tok.get("added_tokens") or []
    added.extend(
        {
            "id": original_size + offset,
            "content": token,
            "single_word": False,
            "lstrip": False,
            "rstrip": False,
            "normalized": False,
            "special": True,
        }
        for offset, token in enumerate(byte_tokens)
    )
    tok["added_tokens"] = added

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(tok, f, ensure_ascii=False, indent=2)

    return original_size, new_size
|
|
|
|
def fix_config_json(input_path: Path, output_path: Path, new_vocab_size: int) -> None:
    """Write a copy of ``config.json`` with ``vocab_size`` replaced.

    Args:
        input_path: Path of the config.json to read.
        output_path: Path the updated config is written to.
        new_vocab_size: Value stored under ``vocab_size`` in the output.

    Raises:
        KeyError: If the input config has no ``vocab_size`` entry.
    """
    # Read and write as UTF-8 explicitly instead of relying on the platform
    # default encoding.
    with open(input_path, encoding="utf-8") as f:
        config = json.load(f)

    # Indexing (not .get) on purpose: a config without vocab_size is not a
    # checkpoint this script can patch.
    old_size = config["vocab_size"]
    config["vocab_size"] = new_vocab_size
    print(f" config.json vocab_size: {old_size} -> {new_vocab_size}")

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)
|
|
|
|
def _append_mean_rows(weight: torch.Tensor, extra: int) -> torch.Tensor:
    """Return ``weight`` with ``extra`` rows appended (row mean plus small Gaussian noise)."""
    # Average and perturb in float32, then cast back, so a half-precision
    # checkpoint does not lose accuracy while reducing over all rows.
    mean_row = weight.float().mean(dim=0, keepdim=True)
    noise = torch.randn(extra, weight.shape[1], dtype=torch.float32) * 0.01
    new_rows = (mean_row + noise).to(weight.dtype)
    return torch.cat([weight, new_rows], dim=0)


def resize_embeddings(input_path: Path, output_path: Path,
                      old_vocab: int, new_vocab: int, tie_embeddings: bool):
    """Grow the embedding (and, if untied, lm_head) matrices for the new tokens.

    Loads the safetensors file at ``input_path``, appends
    ``new_vocab - old_vocab`` rows to ``model.embed_tokens.weight`` and writes
    the full state dict to ``output_path``. Each new row is the mean of the
    existing rows plus noise scaled by 0.01.

    When ``tie_embeddings`` is true, a stored ``lm_head.weight`` is dropped
    from the state dict; otherwise it is grown the same way as the
    embedding matrix.

    Args:
        input_path: Source ``.safetensors`` file.
        output_path: Destination ``.safetensors`` file.
        old_vocab: Number of rows the matrices are expected to have now.
        new_vocab: Number of rows they must have afterwards.
        tie_embeddings: Whether the model ties lm_head to the embeddings.

    Raises:
        ValueError: If ``new_vocab`` is not larger than ``old_vocab``, or a
            matrix does not currently have exactly ``old_vocab`` rows.
        KeyError: If ``model.embed_tokens.weight`` is missing.
    """
    extra = new_vocab - old_vocab
    if extra <= 0:
        raise ValueError(
            f"new_vocab ({new_vocab}) must be larger than old_vocab ({old_vocab})"
        )

    print(f" Loading model weights from {input_path} ...")
    state_dict = load_file(str(input_path))

    embed_key = "model.embed_tokens.weight"
    lm_head_key = "lm_head.weight"

    if embed_key not in state_dict:
        raise KeyError(f"{embed_key} not found in state_dict. Keys: {list(state_dict.keys())[:10]}")

    embed = state_dict[embed_key]
    print(f" embed_tokens shape: {embed.shape}")

    # The new token ids start at old_vocab, so the appended rows only line up
    # with them when the matrix has exactly old_vocab rows to begin with.
    if embed.shape[0] != old_vocab:
        raise ValueError(
            f"{embed_key} has {embed.shape[0]} rows but old_vocab is {old_vocab}; "
            "refusing to resize"
        )

    new_embed = _append_mean_rows(embed, extra)
    state_dict[embed_key] = new_embed
    print(f" embed_tokens resized: {embed.shape} -> {new_embed.shape}")

    lm_head = state_dict.get(lm_head_key)
    if tie_embeddings:
        # With tied weights the head is derived from the embeddings, so a
        # stored copy of the old size is removed rather than resized.
        if lm_head is not None:
            del state_dict[lm_head_key]
            print(f" lm_head removed (tie_word_embeddings=True)")
    elif lm_head is not None:
        if lm_head.shape[0] != old_vocab:
            raise ValueError(
                f"{lm_head_key} has {lm_head.shape[0]} rows but old_vocab is {old_vocab}; "
                "refusing to resize"
            )
        new_lm = _append_mean_rows(lm_head, extra)
        state_dict[lm_head_key] = new_lm
        print(f" lm_head resized: {lm_head.shape} -> {new_lm.shape}")
    else:
        print(f" WARNING: {lm_head_key} not found; only embed_tokens was resized")

    print(f" Saving to {output_path} ...")
    # Tag the file as a PyTorch checkpoint. NOTE(review): Hugging Face loaders
    # are understood to check this metadata entry - confirm against the
    # transformers version in use.
    save_file(state_dict, str(output_path), metadata={"format": "pt"})
|
|
|
|
def main():
    """Command-line entry point: build a byte-fallback-enabled copy of a checkpoint.

    Parses ``--input``, ``--output`` and the optional ``--sp_model``, then:

    1. patches ``tokenizer.json`` (byte tokens + ``byte_fallback``),
    2. rewrites ``config.json`` with the enlarged ``vocab_size``,
    3. resizes the weights in ``model.safetensors``,
    4. copies the remaining tokenizer/config files and registers the byte
       tokens in ``tokenizer_config.json``.

    Returns:
        ``0`` on success, ``1`` when the inputs are missing or inconsistent.
    """
    parser = argparse.ArgumentParser(description="Fix tokenizer byte-fallback for GGUF")
    parser.add_argument("--input", type=Path, required=True, help="Input HF checkpoint dir")
    parser.add_argument("--output", type=Path, required=True, help="Output fixed HF checkpoint dir")
    parser.add_argument("--sp_model", type=Path, default=None,
                        help="SentencePiece .model file to copy (for GGUF conversion)")
    args = parser.parse_args()

    input_dir = args.input
    output_dir = args.output

    if not input_dir.exists():
        print(f"ERROR: Input directory not found: {input_dir}")
        return 1

    # Check every required file up front so a missing one is reported before
    # anything is written to the output directory.
    required = ["config.json", "tokenizer.json", "model.safetensors"]
    missing = [name for name in required if not (input_dir / name).is_file()]
    if missing:
        print(f"ERROR: Missing required file(s) in {input_dir}: {', '.join(missing)}")
        return 1

    # Patching in place would overwrite the source checkpoint and then fail
    # in the copy step (source and destination would be the same file).
    if output_dir.resolve() == input_dir.resolve():
        print(f"ERROR: --output must be a different directory than --input: {input_dir}")
        return 1

    output_dir.mkdir(parents=True, exist_ok=True)

    with open(input_dir / "config.json", encoding="utf-8") as f:
        config = json.load(f)
    old_vocab = config["vocab_size"]
    new_vocab = old_vocab + BYTE_FALLBACK_COUNT
    tie_embeddings = config.get("tie_word_embeddings", False)

    print(f"=== Byte-Fallback Fix ===")
    print(f"Input: {input_dir}")
    print(f"Output: {output_dir}")
    print(f"Old vocab: {old_vocab}, New vocab: {new_vocab}")
    print(f"tie_word_embeddings: {tie_embeddings}")
    print()

    print("[1/4] Fixing tokenizer.json ...")
    tokenizer_old, _tokenizer_new = fix_tokenizer_json(
        input_dir / "tokenizer.json",
        output_dir / "tokenizer.json",
    )
    # tokenizer.json numbers the byte tokens from its own vocab length, while
    # the embedding rows and tokenizer_config.json ids below are numbered from
    # config.json's vocab_size. Both bases must agree for the ids to match.
    if tokenizer_old != old_vocab:
        print(f"ERROR: tokenizer.json has {tokenizer_old} vocab entries but "
              f"config.json vocab_size is {old_vocab}; byte-token ids would not "
              f"match the embedding rows. Output at {output_dir} is incomplete.")
        return 1

    print("[2/4] Fixing config.json ...")
    fix_config_json(
        input_dir / "config.json",
        output_dir / "config.json",
        new_vocab,
    )

    print("[3/4] Resizing embeddings ...")
    resize_embeddings(
        input_dir / "model.safetensors",
        output_dir / "model.safetensors",
        old_vocab, new_vocab, tie_embeddings,
    )

    print("[4/4] Copying remaining files ...")
    # Optional files are copied only when present; special_tokens_map.json is
    # included so the output directory keeps the tokenizer's special-token map.
    for fname in ["tokenizer_config.json", "generation_config.json", "special_tokens_map.json"]:
        src = input_dir / fname
        if src.exists():
            shutil.copy2(src, output_dir / fname)
            print(f" Copied {fname}")

    # Prefer the explicitly given SentencePiece model; say so when the given
    # path is wrong instead of silently falling back.
    sp_model = args.sp_model
    if sp_model is not None and not sp_model.exists():
        print(f" WARNING: --sp_model not found: {sp_model}")
        sp_model = None

    if sp_model is not None:
        shutil.copy2(sp_model, output_dir / "tokenizer.model")
        print(f" Copied tokenizer.model from {sp_model}")
    elif (input_dir / "tokenizer.model").exists():
        shutil.copy2(input_dir / "tokenizer.model", output_dir / "tokenizer.model")
        print(f" Copied tokenizer.model from input dir")

    # Register the byte tokens in the copied tokenizer_config.json, using the
    # same ids as the new embedding rows (old_vocab .. old_vocab + 255).
    tc_path = output_dir / "tokenizer_config.json"
    if tc_path.exists():
        with open(tc_path, encoding="utf-8") as f:
            tc = json.load(f)
        # The key may be missing or null; treat both as an empty mapping.
        added_tokens_decoder = tc.get("added_tokens_decoder") or {}
        for i in range(BYTE_FALLBACK_COUNT):
            added_tokens_decoder[str(old_vocab + i)] = {
                "content": BYTE_TOKEN_TEMPLATE.format(i),
                "lstrip": False,
                "normalized": False,
                "rstrip": False,
                "single_word": False,
                "special": True,
            }
        tc["added_tokens_decoder"] = added_tokens_decoder
        with open(tc_path, "w", encoding="utf-8") as f:
            json.dump(tc, f, indent=2)
        print(f" Updated tokenizer_config.json with {BYTE_FALLBACK_COUNT} byte tokens")

    print()
    print(f"=== Done! Fixed checkpoint at: {output_dir} ===")
    print(f"Next: python outputs/llama.cpp/convert_hf_to_gguf.py {output_dir} --outfile outputs/gguf/frankenstallm-3b-f16.gguf --outtype f16")
    return 0
|
|
|
|
# Script entry point: run main() and hand its return value to the
# interpreter as the process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)
|
|