import multiprocessing
import os
import re
from pathlib import Path
from typing import Dict, List, Optional

from datasets import Dataset, load_dataset
from transformers import AutoTokenizer

os.environ["TOKENIZERS_PARALLELISM"] = "false"

DATASET_NAME_PATTERN = re.compile(r"[^a-zA-Z0-9]")


def download_dataset(
    ds_name: str,
    ds_config: Optional[str] = None,
    ds_split: str = "train",
):
    """
    Download a dataset from the HuggingFace Hub and save it to disk as raw chunks.

    Args:
        ds_name (`str`):
            The name of the dataset to load.
        ds_config (`str`, *optional*, defaults to `None`):
            The configuration of the dataset to load.
        ds_split (`str`, *optional*, defaults to `"train"`):
            The split of the dataset to load.

    Returns:
        `int`: The number of rows in the dataset.
    """
    if ds_name == "wikipedia":
        ds = load_wikipedia(ds_name, ds_config)
    else:
        if ds_config == "":
            ds_config = None
        ds = load_dataset(ds_name, ds_config, split=ds_split)

    chunk_and_save_dataset(
        ds, ds_name=ds_name, ds_config=ds_config, suffix=f"_{ds_split}_raw"
    )

    return len(ds)


def load_wikipedia(ds_name, ds_config):
    """
    Stream the wikipedia dataset from the HuggingFace Hub, keeping only the `text`
    column.

    Args:
        ds_name (`str`):
            The name of the dataset to load. Must be `"wikipedia"`.
        ds_config (`str`, *optional*, defaults to `None`):
            The configuration of the dataset to load.

    Returns:
        `Dataset`: The dataset containing only the `text` column.
    """
    ds = load_dataset(ds_name, ds_config, streaming=True, split="train")

    def gen():
        for example in ds:
            yield {"text": example["text"]}

    return Dataset.from_generator(gen)


def chunk_and_save_dataset(
    ds: Dataset,
    chunk_size: int = 20_000,
    ds_name: Optional[str] = None,
    ds_config: Optional[str] = None,
    suffix: str = "",
):
    """
    Chunk a dataset into smaller datasets of size `chunk_size` and save each chunk
    as a parquet file. The dataset name is used to create a folder under `/data`.

    Args:
        ds (`Dataset`):
            The dataset to chunk.
        chunk_size (`int`, *optional*, defaults to `20_000`):
            The size of each chunk.
        ds_name (`str`, *optional*, defaults to `None`):
            The name of the dataset to load.
        ds_config (`str`, *optional*, defaults to `None`):
            The configuration of the dataset to load.
        suffix (`str`, *optional*, defaults to `""`):
            The suffix to add to each chunk file name.
    """
    if ds_config is None:
        ds_config = ""

    folder = Path("/data") / DATASET_NAME_PATTERN.sub("", ds_name + ds_config)
    folder.mkdir(exist_ok=True, parents=True)

    for chunk_num, start_idx in enumerate(range(0, len(ds), chunk_size)):
        end_idx = min(start_idx + chunk_size, len(ds))
        temp = ds.select(range(start_idx, end_idx))
        temp.to_parquet(str(folder / f"chunk_{chunk_num}{suffix}"))


def tokenize_dataset(
    ds_name: str,
    ds_config: Optional[str] = None,
    ds_split: str = "train",
    model_name: Optional[str] = None,
    opt_level: Optional[str] = None,
    column_name: str = "text",
    num2skip: int = 0,
    num2embed: int = -1,
):
    """
    Tokenize the raw chunks with the model's tokenizer, sort by length to minimize
    padding, and save the tokenized chunks to disk.

    Args:
        ds_name (`str`):
            The name of the dataset to load.
        ds_config (`str`, *optional*, defaults to `None`):
            The configuration of the dataset to load.
        ds_split (`str`, *optional*, defaults to `"train"`):
            The split of the dataset to load.
        model_name (`str`, *optional*, defaults to `None`):
            The name of the model to use for tokenization.
        opt_level (`str`, *optional*, defaults to `None`):
            The optimization level to use for tokenization.
        column_name (`str`, *optional*, defaults to `"text"`):
            The column name to use for tokenization.
        num2skip (`int`, *optional*, defaults to `0`):
            The number of rows to skip.
        num2embed (`int`, *optional*, defaults to `-1`):
            The number of rows to embed. Defaults to `-1`, which means all rows.
    """
    # TODO: option for controlling length for models that can go shorter/longer than 512
    if ds_config is None:
        ds_config = ""

    folder = Path("/data") / DATASET_NAME_PATTERN.sub("", ds_name + ds_config)
    files = list(map(str, folder.glob(f"chunk_*_{ds_split}_raw")))
    ds = load_dataset("parquet", data_files=files, split="train")

    if num2embed == -1:
        num2embed = len(ds)
    ds = ds.select(range(num2skip, num2skip + num2embed))

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    padding = "max_length" if opt_level == "O4" else False
    max_length = 512

    def tokenize(examples: Dict[str, List[str]]):
        tokenized = tokenizer(
            examples[column_name],
            truncation=True,
            padding=padding,
            max_length=max_length,
        )
        tokenized["length"] = [len(x) for x in tokenized["input_ids"]]
        return tokenized

    tds = ds.map(
        tokenize,
        batched=True,
        batch_size=1000,
        remove_columns=list(set(ds.column_names) - {column_name}),
        num_proc=multiprocessing.cpu_count(),
        desc="Tokenizing",
    )

    # sort to minimize padding
    if padding != "max_length":
        tds = tds.sort("length")

    chunk_and_save_dataset(
        tds, ds_name=ds_name, ds_config=ds_config, suffix=f"_{ds_split}_tokenized"
    )


def load_tokenized_dataset(
    ds_name: str,
    ds_config: Optional[str] = None,
    ds_split: str = "train",
):
    """
    Load a tokenized dataset from disk.

    Args:
        ds_name (`str`):
            The name of the dataset to load.
        ds_config (`str`, *optional*, defaults to `None`):
            The configuration of the dataset to load.
        ds_split (`str`, *optional*, defaults to `"train"`):
            The split of the dataset to load.

    Returns:
        `Dataset`: The tokenized dataset.
    """
    if ds_config is None:
        ds_config = ""

    folder = Path("/data") / DATASET_NAME_PATTERN.sub("", ds_name + ds_config)
    files = list(map(str, folder.glob(f"chunk_*_{ds_split}_tokenized")))
    return load_dataset("parquet", data_files=files, split="train")
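

# A minimal end-to-end usage sketch (not part of the original module): it wires the
# three steps together as download -> tokenize -> reload. The dataset ("imdb") and
# model ("sentence-transformers/all-MiniLM-L6-v2") below are illustrative choices,
# not assumptions from the source; any Hub dataset/model pair should work the same
# way, provided `/data` is writable.
if __name__ == "__main__":
    # 1. Download the raw split and save it as chunk_*_train_raw parquet files.
    num_rows = download_dataset("imdb", ds_config=None, ds_split="train")
    print(f"Downloaded {num_rows} rows")

    # 2. Tokenize the raw chunks and save them as chunk_*_train_tokenized files.
    #    Any opt_level other than "O4" skips max_length padding and sorts by length.
    tokenize_dataset(
        ds_name="imdb",
        ds_config=None,
        ds_split="train",
        model_name="sentence-transformers/all-MiniLM-L6-v2",
        opt_level=None,
        column_name="text",
    )

    # 3. Reload the tokenized dataset for downstream embedding.
    tds = load_tokenized_dataset("imdb", ds_config=None, ds_split="train")
    print(tds)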