"""
Data pipeline for domain sequence CLM pre-training.
Pipeline:
1. Raw events -> DomainTokenizerBuilder.tokenize_sequence() -> token strings
2. Token strings -> HF tokenizer -> token IDs (variable length)
3. Token ID sequences -> pack into fixed-length blocks (group_texts pattern)
4. Packed blocks -> DataCollatorForLanguageModeling -> {input_ids, labels, attention_mask}
Packing follows the official HF run_clm.py pattern: concatenate all tokenized
sequences, split into fixed-length blocks. Zero padding waste, 100% token utilization.
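
Example (an illustrative sketch; user_sequences, builder and hf_tokenizer stand for
the caller's event data, a fitted DomainTokenizerBuilder, and an HF tokenizer that
maps each whitespace-separated domain token to one ID):

    >>> from transformers import DataCollatorForLanguageModeling
    >>> dataset = prepare_clm_dataset(user_sequences, builder, hf_tokenizer, block_size=512)
    >>> collator = DataCollatorForLanguageModeling(tokenizer=hf_tokenizer, mlm=False)
    >>> batch = collator([dataset[i] for i in range(4)])  # input_ids, labels, attention_mask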
"""

import logging
from typing import Any, Dict, List, Sequence

from datasets import Dataset as HFDataset

from ..tokenizers.domain_tokenizer import DomainTokenizerBuilder

logger = logging.getLogger(__name__)


def tokenize_user_sequences(
    user_sequences: Sequence[Sequence[Dict[str, Any]]],
    builder: DomainTokenizerBuilder,
    hf_tokenizer,
    add_bos: bool = True,
    add_eos: bool = True,
) -> List[List[int]]:
    """Tokenize user event sequences into per-user lists of token IDs."""
    all_token_ids: List[List[int]] = []
    for events in user_sequences:
        # Step 1: raw events -> domain token strings (with BOS/EOS markers if requested).
        token_strings = builder.tokenize_sequence(events, add_bos=add_bos, add_eos=add_eos)
        # Step 2: token strings -> token IDs. Joining on whitespace assumes the HF
        # tokenizer maps each whitespace-separated domain token to exactly one ID.
        token_text = " ".join(token_strings)
        encoding = hf_tokenizer(token_text, add_special_tokens=False)
        all_token_ids.append(encoding["input_ids"])
    return all_token_ids


def pack_sequences(token_id_sequences: List[List[int]], block_size: int = 512) -> HFDataset:
    """Pack variable-length token ID sequences into fixed-length blocks.

    Follows the official HF run_clm.py pattern: concatenate all sequences into
    one long stream, split it into fixed-length blocks, and drop the remainder.
    The blocks contain no padding; the only waste is the final partial block,
    which is dropped.
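
    Example:
        >>> ds = pack_sequences([[1, 2, 3], [4, 5, 6, 7]], block_size=3)
        >>> ds["input_ids"]
        [[1, 2, 3], [4, 5, 6]]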
"""
concatenated = []
for seq in token_id_sequences:
concatenated.extend(seq)
total_tokens = len(concatenated)
n_blocks = total_tokens // block_size
dropped = total_tokens - n_blocks * block_size
if n_blocks == 0:
raise ValueError(
f"Not enough tokens ({total_tokens}) to form even one block of size {block_size}. "
f"Reduce block_size or add more data."
)
logger.info(f"Packing: {total_tokens:,} tokens -> {n_blocks:,} blocks of {block_size} "
f"({dropped} tokens dropped, {dropped/total_tokens*100:.1f}% waste)")
packed = [concatenated[i * block_size : (i + 1) * block_size] for i in range(n_blocks)]
return HFDataset.from_dict({"input_ids": packed})


def prepare_clm_dataset(
    user_sequences: Sequence[Sequence[Dict[str, Any]]],
    builder: DomainTokenizerBuilder,
    hf_tokenizer,
    block_size: int = 512,
    add_bos: bool = True,
    add_eos: bool = True,
) -> HFDataset:
    """Full pipeline: user event sequences -> packed CLM training dataset.

    Example:
        >>> dataset = prepare_clm_dataset(user_sequences, builder, hf_tokenizer, block_size=512)
        >>> collator = DataCollatorForLanguageModeling(tokenizer=hf_tokenizer, mlm=False)
        >>> trainer = Trainer(model=model, train_dataset=dataset, data_collator=collator, ...)
    """
    token_id_sequences = tokenize_user_sequences(
        user_sequences, builder, hf_tokenizer, add_bos=add_bos, add_eos=add_eos,
    )
    total_tokens = sum(len(seq) for seq in token_id_sequences)
    avg_tokens = total_tokens / max(len(token_id_sequences), 1)
    logger.info(
        f"Tokenized {len(token_id_sequences)} user sequences -> "
        f"{total_tokens:,} tokens (avg {avg_tokens:.1f} tokens/sequence)"
    )
    return pack_sequences(token_id_sequences, block_size=block_size)
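

if __name__ == "__main__":
    # Illustrative smoke test for the packing step only (a sketch, not part of the
    # training pipeline); run via `python -m <package>.data_pipeline` so the
    # relative import above resolves. Synthetic IDs stand in for tokenized sequences.
    logging.basicConfig(level=logging.INFO)
    fake_token_ids = [list(range(100)), list(range(100, 250)), list(range(250, 300))]
    demo = pack_sequences(fake_token_ids, block_size=64)
    print(f"{len(demo)} blocks of {len(demo[0]['input_ids'])} tokens each")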