"""
data/tokenize_extra.py — parallel tokenization of the large korean_extra/ datasets.

Auto-detects three input formats — the HuggingFace datasets on-disk format
(arrow), parquet and jsonl — tokenizes the text with a SentencePiece tokenizer
and stores the result as uint16 memmap (.bin) files. Large inputs of 881 GB or
more are processed in a streaming, chunked fashion.

The output is a flat numpy uint16 array that is fully compatible with
PackedDataset / TextDataset in data/dataset.py.

Usage examples:
    # Single directory
    python data/tokenize_extra.py \
        --input_dir data/korean_extra/fineweb2_edu_ko \
        --output data/fineweb2_train.bin \
        --num_proc 8

    # Process every sub-directory of korean_extra/ in one run
    python data/tokenize_extra.py \
        --input_dir data/korean_extra \
        --auto_scan \
        --output_dir data \
        --num_proc 8

    # Verify the result
    python -c "
    import numpy as np
    d = np.memmap('data/fineweb2_train.bin', dtype='uint16', mode='r')
    print(f'total tokens: {len(d):,}')
    "
"""
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import math |
| import multiprocessing as mp |
| import os |
| import struct |
| import sys |
| import time |
| from pathlib import Path |
| from typing import Generator, Iterable, Iterator |
|
|
| import numpy as np |
| from tqdm import tqdm |
|
|
| |
| |
| |
| try: |
| import sentencepiece as spm |
| except ImportError: |
| print( |
| "ERROR: sentencepiece ν¨ν€μ§κ° μ€μΉλμ΄ μμ§ μμ΅λλ€.\n" |
| " pip install sentencepiece λ‘ μ€μΉ ν μ¬μ€ννμΈμ.", |
| file=sys.stderr, |
| ) |
| sys.exit(1) |
|
|
| |
| |
| |
| try: |
| import datasets as hf_datasets |
| except ImportError: |
| print( |
| "ERROR: datasets ν¨ν€μ§κ° μ€μΉλμ΄ μμ§ μμ΅λλ€.\n" |
| " pip install datasets λ‘ μ€μΉ ν μ¬μ€ννμΈμ.", |
| file=sys.stderr, |
| ) |
| sys.exit(1) |
|
|
|
|
| |
| |
| |
|
|
# Largest token id that fits in the uint16 output format.
UINT16_MAX = 65535
# Documents that tokenize to fewer tokens than this are dropped by the worker.
MIN_TOKENS = 100
# Documents that tokenize to more tokens than this are truncated by the worker.
MAX_TOKENS = 32_768
# Minimum fraction of Hangul syllables for a text to pass the language filter.
HANGUL_RE_THRESHOLD = 0.10
# Minimum allocation / growth slack (in tokens) used by MemmapWriter.
CHUNK_TOKENS = 500_000
# NOTE(review): not referenced anywhere in the visible code; the real EOS id is
# read from the SentencePiece model instead.
EOS_TOKEN_PLACEHOLDER = 1

# Inclusive code-point range treated as Hangul by the language filter
# (U+AC00..U+D7A3).
_HANGUL_START = 0xAC00
_HANGUL_END = 0xD7A3
|
|
|
|
def _has_enough_korean_or_english(text: str) -> bool:
    """Language filter: accept text that is mostly Korean or mostly English.

    Returns True when the share of Hangul syllables is at least
    HANGUL_RE_THRESHOLD, or when the share of ASCII letters is at least 0.30.
    Returns False otherwise (for example text that is only Chinese or
    Japanese) and for empty input.
    """
    if not text:
        return False

    length = len(text)

    n_hangul = sum(_HANGUL_START <= ord(c) <= _HANGUL_END for c in text)
    if n_hangul / length >= HANGUL_RE_THRESHOLD:
        return True

    # Only scan for ASCII letters when the Hangul test did not already pass.
    n_latin = sum(c.isascii() and c.isalpha() for c in text)
    return n_latin / length >= 0.30
|
|
|
|
| |
| |
| |
|
|
class SPTokenizer:
    """Small wrapper around a SentencePiece model that loads it on first use."""

    def __init__(self, model_path: str) -> None:
        # The processor is created lazily by _ensure_loaded().
        self._sp: spm.SentencePieceProcessor | None = None
        self._model_path = model_path

    def _ensure_loaded(self) -> None:
        """Load the model from ``self._model_path`` unless already loaded."""
        if self._sp is not None:
            return
        processor = spm.SentencePieceProcessor()
        processor.Load(self._model_path)
        self._sp = processor

    @property
    def eos_id(self) -> int:
        """EOS token id reported by the loaded model."""
        self._ensure_loaded()
        return self._sp.eos_id()

    @property
    def vocab_size(self) -> int:
        """Number of pieces in the loaded model."""
        self._ensure_loaded()
        return self._sp.GetPieceSize()

    def encode(self, text: str) -> list[int]:
        """Encode *text* into a list of token ids."""
        self._ensure_loaded()
        return self._sp.EncodeAsIds(text)
|
|
|
|
| |
| |
| |
|
|
| def _detect_format(input_dir: Path) -> str: |
| """ |
| λλ ν 리 λ΄μ©μ λ³΄κ³ ν¬λ§·μ μλ κ°μ§νλ€. |
| |
| λ°νκ°: |
| "hf_arrow" β HuggingFace datasets disk ν¬λ§· (dataset_info.json μ‘΄μ¬) |
| "parquet" β .parquet νμΌμ΄ μμ |
| "jsonl" β .jsonl λλ .json νμΌμ΄ μμ |
| "unknown" β μ μ μμ |
| """ |
| if not input_dir.is_dir(): |
| raise NotADirectoryError(f"μ
λ ₯ κ²½λ‘κ° λλ ν λ¦¬κ° μλλλ€: {input_dir}") |
|
|
| |
| if (input_dir / "dataset_info.json").exists(): |
| return "hf_arrow" |
| if (input_dir / "state.json").exists(): |
| return "hf_arrow" |
| |
| for child in input_dir.iterdir(): |
| if child.is_dir() and (child / "dataset_info.json").exists(): |
| return "hf_arrow" |
|
|
| |
| parquets = list(input_dir.rglob("*.parquet")) |
| if parquets: |
| return "parquet" |
|
|
| |
| jsonls = list(input_dir.rglob("*.jsonl")) + list(input_dir.rglob("*.json")) |
| if jsonls: |
| return "jsonl" |
|
|
| return "unknown" |
|
|
|
|
def _iter_hf_arrow(
    input_dir: Path,
    text_col: str,
    num_proc: int,
) -> Iterator[str]:
    """Stream the text column of a HuggingFace datasets on-disk directory.

    Args:
        input_dir: directory previously written by ``save_to_disk``.
        text_col: preferred text column name; resolved via _resolve_text_col.
        num_proc: accepted for signature compatibility; not used here.

    Yields:
        The value of the resolved text column for every row.

    Raises:
        RuntimeError: if the directory cannot be loaded or holds no split.
    """
    print(f" [포맷] HuggingFace arrow (disk): {input_dir}")
    try:
        ds = hf_datasets.load_from_disk(str(input_dir))
    except Exception as exc:
        raise RuntimeError(
            f"HF arrow 포맷 로드 실패: {input_dir}\n원인: {exc}"
        ) from exc

    # load_from_disk() returns a DatasetDict when the directory holds several
    # splits; pick one split so that column_names / len / iteration below
    # operate on rows instead of on split names.
    if isinstance(ds, hf_datasets.DatasetDict):
        splits = list(ds.keys())
        if not splits:
            raise RuntimeError(f"HF arrow 포맷 로드 실패: {input_dir}\n원인: split 없음")
        chosen = "train" if "train" in splits else splits[0]
        print(f" DatasetDict 감지. splits={splits}, '{chosen}' split 사용.")
        ds = ds[chosen]

    col = _resolve_text_col(list(ds.column_names), text_col)
    print(f" 텍스트 컬럼: '{col}', 총 행 수: {len(ds):,}")

    for row in ds:
        yield row[col]
|
|
|
|
def _iter_parquet(input_dir: Path, text_col: str) -> Iterator[str]:
    """Stream the text column of every ``*.parquet`` file below *input_dir*.

    Files are visited in sorted path order. pyarrow is used when available;
    otherwise the files are read through ``datasets.load_dataset`` in
    streaming mode.

    Yields:
        pyarrow path: the cell value, with null/empty values yielded as "".
        Fallback path: the raw column value of each row.
    """
    files = sorted(input_dir.rglob("*.parquet"))

    try:
        import pyarrow.parquet as pq
    except ImportError:
        pq = None

    if pq is None:
        print(" [경고] pyarrow 미설치, datasets로 parquet 로드 시도...")
        print(f" [포맷] parquet ({len(files)} 파일): {input_dir}")
        ds = hf_datasets.load_dataset(
            "parquet",
            data_files={"train": [str(f) for f in files]},
            split="train",
            streaming=True,
        )
        # A streaming dataset may not know its columns up front
        # (column_names can be None), so resolve the column from the first row.
        col = None
        for row in ds:
            if col is None:
                col = _resolve_text_col(list(row.keys()), text_col)
                print(f" 텍스트 컬럼: '{col}'")
            yield row[col]
        return

    print(f" [포맷] parquet ({len(files)} 파일): {input_dir}")
    for fpath in files:
        pf = pq.ParquetFile(str(fpath))
        # Schemas may differ between files, so resolve the column per file.
        col = _resolve_text_col(list(pf.schema_arrow.names), text_col)
        for batch in pf.iter_batches(batch_size=1000, columns=[col]):
            for val in batch.column(col):
                yield val.as_py() or ""
|
|
|
|
| def _iter_jsonl(input_dir: Path, text_col: str) -> Iterator[str]: |
| """jsonl / json νμΌμμ ν
μ€νΈλ₯Ό μ€νΈλ¦¬λ°νλ€.""" |
| files = sorted(input_dir.rglob("*.jsonl")) + sorted(input_dir.rglob("*.json")) |
| |
| print(f" [ν¬λ§·] jsonl ({len(files)} νμΌ): {input_dir}") |
| for fpath in files: |
| try: |
| with open(fpath, "r", encoding="utf-8", errors="replace") as fh: |
| for line in fh: |
| line = line.strip() |
| if not line: |
| continue |
| try: |
| obj = json.loads(line) |
| except json.JSONDecodeError: |
| continue |
| if isinstance(obj, str): |
| yield obj |
| elif isinstance(obj, dict): |
| text = ( |
| obj.get(text_col) |
| or obj.get("text") |
| or obj.get("content") |
| or obj.get("document") |
| or "" |
| ) |
| yield str(text) |
| except Exception as exc: |
| print(f" [κ²½κ³ ] νμΌ μ½κΈ° μ€ν¨: {fpath} β {exc}", file=sys.stderr) |
|
|
|
|
| def _resolve_text_col(columns: list[str], preferred: str) -> str: |
| """ |
| μ§μ λ 컬λΌμ΄ μμ κ²½μ°, μΌλ°μ μΈ ν
μ€νΈ μ»¬λΌ μ΄λ¦μ μμλλ‘ νμνλ€. |
| """ |
| if preferred in columns: |
| return preferred |
| for candidate in ("text", "content", "document", "body", "passage"): |
| if candidate in columns: |
| print( |
| f" [INFO] μ»¬λΌ '{preferred}' λ―Έμ‘΄μ¬ β '{candidate}' μ¬μ©. " |
| f"(μ 체 컬λΌ: {columns[:10]})" |
| ) |
| return candidate |
| |
| print( |
| f" [κ²½κ³ ] ν
μ€νΈ 컬λΌμ μ°Ύμ§ λͺ»ν¨. 첫 λ²μ§Έ μ»¬λΌ '{columns[0]}' μ¬μ©.", |
| file=sys.stderr, |
| ) |
| return columns[0] |
|
|
|
|
def get_text_iterator(
    input_dir: Path,
    text_col: str,
    num_proc: int,
) -> tuple[str, Iterator[str]]:
    """Detect the format of *input_dir* and return a matching text iterator.

    Returns:
        ``(fmt, iterator)`` where *fmt* is the detected format string
        ("hf_arrow", "parquet" or "jsonl").

    Raises:
        NotADirectoryError: propagated from _detect_format.
        RuntimeError: if the format is not recognized.
    """
    fmt = _detect_format(input_dir)
    if fmt == "hf_arrow":
        return fmt, _iter_hf_arrow(input_dir, text_col, num_proc)
    if fmt == "parquet":
        return fmt, _iter_parquet(input_dir, text_col)
    if fmt == "jsonl":
        return fmt, _iter_jsonl(input_dir, text_col)
    raise RuntimeError(
        f"지원하지 않는 포맷이거나 인식할 수 없습니다: {input_dir}\n"
        f"지원 포맷: HuggingFace arrow, parquet, jsonl"
    )
|
|
|
|
| |
| |
| |
|
|
| |
# Per-process tokenizer state; assigned by _worker_init inside each pool worker
# and read by _worker_tokenize_batch.
_g_sp: SPTokenizer | None = None
_g_model_path: str = ""
|
|
|
|
def _worker_init(model_path: str) -> None:
    """Pool initializer: build this process's tokenizer and load its model."""
    global _g_sp, _g_model_path
    _g_model_path, _g_sp = model_path, SPTokenizer(model_path)
    # Load eagerly so the first batch does not pay the model-loading cost.
    _g_sp._ensure_loaded()
|
|
|
|
def _worker_tokenize_batch(texts: list[str]) -> list[list[int]]:
    """Tokenize a batch of texts and apply the quality filters.

    A text is dropped when it is empty or not a ``str``, fails the
    Korean/English ratio filter, raises during encoding, or encodes to fewer
    than MIN_TOKENS tokens. Token lists longer than MAX_TOKENS are truncated.

    Returns:
        The token-id lists of the texts that passed every filter, in input
        order.

    Raises:
        RuntimeError: if the per-process tokenizer was never initialized.
    """
    tokenizer = _g_sp
    if tokenizer is None:
        # Without this check the AttributeError raised by every encode call
        # would be swallowed below and all documents silently discarded.
        raise RuntimeError(
            "_worker_init() must be called before _worker_tokenize_batch()"
        )

    results: list[list[int]] = []
    for text in texts:
        if not text or not isinstance(text, str):
            continue
        if not _has_enough_korean_or_english(text):
            continue
        try:
            ids = tokenizer.encode(text)
        except Exception:
            # Deliberate best effort: one un-encodable document is skipped
            # rather than aborting the whole batch.
            continue
        if len(ids) < MIN_TOKENS:
            continue
        if len(ids) > MAX_TOKENS:
            ids = ids[:MAX_TOKENS]
        results.append(ids)
    return results
|
|
|
|
| |
| |
| |
|
|
class MemmapWriter:
    """Append-only writer of uint16 tokens backed by a numpy memmap file.

    The file is created with a provisional size, re-mapped with a larger size
    whenever a write would not fit, and cut down to the number of tokens
    actually written when finalize() is called.
    """

    def __init__(self, path: Path, initial_size: int = CHUNK_TOKENS) -> None:
        self.path = path
        path.parent.mkdir(parents=True, exist_ok=True)
        # Never allocate less than one chunk.
        self._alloc = initial_size if initial_size > CHUNK_TOKENS else CHUNK_TOKENS
        self._mm = self._map("w+")
        self._pos = 0

    def _map(self, mode: str) -> np.memmap:
        """Map ``self.path`` as uint16 with the current allocation size."""
        return np.memmap(
            str(self.path), dtype="uint16", mode=mode, shape=(self._alloc,)
        )

    def write(self, tokens: Iterable[int]) -> int:
        """Append *tokens* and return how many tokens were written."""
        chunk = np.asarray(list(tokens), dtype=np.uint16)
        count = len(chunk)
        if count == 0:
            return 0

        end = self._pos + count
        if end > self._alloc:
            # Grow: at least double, and always leave one chunk of headroom.
            grown = max(self._alloc * 2, end + CHUNK_TOKENS)
            self._mm.flush()
            del self._mm
            self._alloc = grown
            self._mm = self._map("r+")

        self._mm[self._pos : end] = chunk
        self._pos = end
        return count

    def finalize(self) -> int:
        """Flush, truncate the file to the written size and return that size.

        The returned value is the total number of tokens written.
        """
        self._mm.flush()
        del self._mm

        written_bytes = self._pos * 2  # two bytes per uint16 token
        with open(str(self.path), "r+b") as fh:
            fh.truncate(written_bytes)
        return self._pos
|
|
|
|
| |
| |
| |
|
|
def tokenize_directory(
    input_dir: Path,
    output_path: Path,
    tokenizer_path: str,
    text_col: str = "text",
    num_proc: int = 8,
    batch_size: int = 512,
    eos_between_docs: bool = True,
    val_split: float = 0.002,
    seed: int = 42,
) -> dict:
    """Tokenize one directory and store the tokens in .bin file(s).

    Args:
        input_dir: input directory (format is auto-detected).
        output_path: output .bin path for the training set. When *val_split*
            is positive, a validation file is written next to it: "train" in
            the file name is replaced by "val", or "_val" is appended to the
            stem when the stem does not contain "train".
        tokenizer_path: path of the SentencePiece .model file.
        text_col: text column name.
        num_proc: number of parallel worker processes.
        batch_size: per-worker batch size; ``batch_size * num_proc`` texts are
            buffered before each dispatch to the pool.
        eos_between_docs: append the EOS token after every document.
        val_split: probability of routing a document to the validation file
            (0 disables the validation file).
        seed: seed of the RNG that drives the train/val routing.

    Returns:
        Statistics dict with keys total_docs, skipped, total_tokens,
        train_tokens, val_tokens, elapsed_s, train_path and val_path.
        Token counts include the EOS tokens.
    """
    t_start = time.time()

    # Tokenizer metadata is read once in the parent; workers load their own copy.
    sp_main = SPTokenizer(tokenizer_path)
    eos_id = sp_main.eos_id
    vocab_size = sp_main.vocab_size
    print(f" 토크나이저: {tokenizer_path}")
    print(f" vocab_size={vocab_size:,}, eos_id={eos_id}")
    # The largest id is vocab_size - 1, so a 65536-piece vocabulary still fits.
    if vocab_size - 1 > UINT16_MAX:
        print(
            f" [경고] vocab_size({vocab_size}) > {UINT16_MAX} "
            f"→ uint16 오버플로 가능. 65535 이하 id만 안전.",
            file=sys.stderr,
        )

    fmt, text_iter = get_text_iterator(input_dir, text_col, num_proc)
    print(f" 포맷: {fmt}")

    # Derive the validation path from the training path.
    train_path = output_path
    val_path: Path | None = None
    if val_split > 0:
        stem = output_path.stem
        if "train" in stem:
            val_path = output_path.parent / output_path.name.replace("train", "val")
        else:
            val_path = output_path.with_name(stem + "_val" + output_path.suffix)

    print(f" 출력(train): {train_path}")
    if val_path:
        print(f" 출력(val): {val_path}")

    writer = MemmapWriter(train_path)
    val_writer: MemmapWriter | None = MemmapWriter(val_path) if val_path else None

    pool = mp.Pool(
        processes=num_proc,
        initializer=_worker_init,
        initargs=(tokenizer_path,),
    )

    total_docs = 0  # documents that passed the filters and were written
    skipped = 0     # documents dropped by the worker-side filters
    total_toks = 0  # tokens written, excluding EOS (progress display only)

    rng = np.random.default_rng(seed)

    def _submit_batch(batch_texts: list[str]) -> None:
        """Tokenize one buffered batch in the pool and write the results."""
        nonlocal total_docs, skipped, total_toks

        # Split the buffer into roughly one sub-batch per worker.
        sub_size = max(1, len(batch_texts) // num_proc)
        sub_batches = [
            batch_texts[i : i + sub_size]
            for i in range(0, len(batch_texts), sub_size)
        ]

        kept = 0
        # pool.map preserves order, so output order follows input order.
        for results in pool.map(_worker_tokenize_batch, sub_batches):
            for ids in results:
                kept += 1
                total_toks += len(ids)
                ids_out = ids + [eos_id] if eos_between_docs else ids
                # Route whole documents to train or val at random.
                if val_writer is not None and rng.random() < val_split:
                    val_writer.write(ids_out)
                else:
                    writer.write(ids_out)

        total_docs += kept
        skipped += len(batch_texts) - kept

    batch_buf: list[str] = []
    pbar = tqdm(desc=f"토큰화 [{input_dir.name}]", unit="doc", dynamic_ncols=True)

    try:
        for text in text_iter:
            batch_buf.append(text)
            if len(batch_buf) >= batch_size * num_proc:
                _submit_batch(batch_buf)
                pbar.update(len(batch_buf))
                pbar.set_postfix(
                    tokens=f"{total_toks:,}",
                    docs=f"{total_docs:,}",
                    refresh=False,
                )
                batch_buf = []

        # Flush the final partial batch.
        if batch_buf:
            _submit_batch(batch_buf)
            pbar.update(len(batch_buf))

        pool.close()
    except BaseException:
        # Do not leave worker processes behind on error or Ctrl-C.
        pool.terminate()
        raise
    finally:
        pbar.close()
        pool.join()
        # Always truncate the outputs to what was really written so that a
        # failed run does not leave zero-padded files behind.
        train_tokens = writer.finalize()
        val_tokens = val_writer.finalize() if val_writer else 0

    elapsed = time.time() - t_start
    total_toks_with_eos = train_tokens + val_tokens

    print()
    print(f" 완료: {elapsed:.1f}초")
    print(f" 처리 문서: {total_docs:,}")
    print(f" 총 토큰(EOS 포함): {total_toks_with_eos:,}")
    print(f" train: {train_tokens:,} ({train_tokens*2/1e9:.2f} GB)")
    if val_tokens:
        print(f" val: {val_tokens:,} ({val_tokens*2/1e9:.2f} GB)")
    throughput = total_toks_with_eos / elapsed if elapsed > 0 else 0
    print(f" 처리율: {throughput/1e6:.2f} M token/s")

    return {
        "total_docs": total_docs,
        "skipped": skipped,
        "total_tokens": total_toks_with_eos,
        "train_tokens": train_tokens,
        "val_tokens": val_tokens,
        "elapsed_s": elapsed,
        "train_path": str(train_path),
        "val_path": str(val_path) if val_path else None,
    }
|
|
|
|
| |
| |
| |
|
|
def auto_scan_and_tokenize(
    root_dir: Path,
    output_dir: Path,
    tokenizer_path: str,
    text_col: str,
    num_proc: int,
    batch_size: int,
    val_split: float,
    seed: int,
    eos_between_docs: bool = True,
) -> list[dict]:
    """Tokenize every direct sub-directory of *root_dir*, one after another.

    For each sub-directory the training file is written to
    ``output_dir/korean_extra_{subdir_name}_train.bin``.

    Args:
        root_dir: directory whose direct children are processed in sorted order.
        output_dir: directory receiving the .bin files.
        tokenizer_path, text_col, num_proc, batch_size, val_split, seed,
        eos_between_docs: forwarded unchanged to tokenize_directory.

    Returns:
        One dict per sub-directory: the statistics of tokenize_directory plus
        a "source" key, or ``{"source": ..., "error": ...}`` when processing
        that sub-directory failed.

    Raises:
        RuntimeError: if *root_dir* has no sub-directory.
    """
    children = sorted(p for p in root_dir.iterdir() if p.is_dir())
    if not children:
        raise RuntimeError(f"서브디렉토리가 없습니다: {root_dir}")

    print(f"자동 스캔: {len(children)}개 서브디렉토리 발견")
    for ch in children:
        print(f" - {ch.name}")
    print()

    all_stats = []
    for child in children:
        print("=" * 60)
        print(f"처리 중: {child}")
        print("=" * 60)
        safe_name = child.name.replace("/", "_").replace(" ", "_")
        out_path = output_dir / f"korean_extra_{safe_name}_train.bin"
        try:
            stats = tokenize_directory(
                input_dir=child,
                output_path=out_path,
                tokenizer_path=tokenizer_path,
                text_col=text_col,
                num_proc=num_proc,
                batch_size=batch_size,
                eos_between_docs=eos_between_docs,
                val_split=val_split,
                seed=seed,
            )
            stats["source"] = child.name
            all_stats.append(stats)
        except Exception as exc:
            # Batch boundary: record the failure and continue with the next
            # sub-directory instead of aborting the whole scan.
            print(f" [오류] {child.name} 처리 실패: {exc}", file=sys.stderr)
            all_stats.append({"source": child.name, "error": str(exc)})
        print()

    return all_stats


# Default SentencePiece model; also used as the fallback in main().
_DEFAULT_TOKENIZER_PATH = (
    "/PROJECT/0325120031_A/ghong/taketimes/llm-bang"
    "/tokenizer/korean_64k.model"
)


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse and validate the command line.

    Args:
        argv: argument list to parse; ``None`` (the default) parses
            ``sys.argv[1:]``.

    Returns:
        The parsed namespace. In single-directory mode ``output`` is always
        set: when ``--output`` is omitted it defaults to
        ``korean_extra_{input_name}_train.bin`` inside ``--output_dir`` if
        given, otherwise two levels above ``--input_dir``.

    Exits with an argparse error when ``--auto_scan`` is used without
    ``--output_dir``.
    """
    parser = argparse.ArgumentParser(
        description=(
            "korean_extra/ 대용량 데이터셋을 병렬 토큰화하여 uint16 memmap(.bin) 로 저장. "
            "HuggingFace arrow, parquet, jsonl 포맷 자동 감지."
        ),
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Input
    parser.add_argument(
        "--input_dir",
        required=True,
        help="토큰화할 디렉토리 경로. --auto_scan 시에는 루트 디렉토리.",
    )
    parser.add_argument(
        "--auto_scan",
        action="store_true",
        help=(
            "input_dir 의 직접 자식 디렉토리를 모두 순차 처리. "
            "이 경우 --output_dir 을 지정해야 함."
        ),
    )
    parser.add_argument(
        "--text_col",
        default="text",
        help="텍스트 컬럼 이름 (arrow/parquet/jsonl). 자동 추정 가능.",
    )

    # Output
    out_group = parser.add_mutually_exclusive_group()
    out_group.add_argument(
        "--output",
        default=None,
        help="출력 .bin 파일 경로 (단일 디렉토리 처리 시 사용).",
    )
    out_group.add_argument(
        "--output_dir",
        default=None,
        help="출력 .bin 파일들을 저장할 디렉토리 (--auto_scan 시 사용).",
    )

    # Tokenizer
    parser.add_argument(
        "--tokenizer",
        default=_DEFAULT_TOKENIZER_PATH,
        help="SentencePiece .model 파일 경로.",
    )

    # Processing options
    parser.add_argument(
        "--num_proc",
        type=int,
        default=8,
        help="병렬 워커 수 (multiprocessing.Pool).",
    )
    parser.add_argument(
        "--batch_size",
        type=int,
        default=512,
        help="워커당 배치 크기 (문서 수).",
    )
    parser.add_argument(
        "--val_split",
        type=float,
        default=0.002,
        help="검증 분리 비율 (0.0 이면 val 파일 미생성).",
    )
    parser.add_argument(
        "--seed",
        type=int,
        default=42,
        help="재현성 시드.",
    )
    parser.add_argument(
        "--no_eos",
        action="store_true",
        help="문서 사이에 EOS 토큰을 삽입하지 않는다.",
    )

    args = parser.parse_args(argv)

    if args.auto_scan and args.output_dir is None:
        parser.error("--auto_scan 사용 시 --output_dir 을 지정해야 합니다.")

    if not args.auto_scan and args.output is None:
        input_path = Path(args.input_dir)
        # Honor --output_dir in single-directory mode instead of ignoring it.
        base_dir = (
            Path(args.output_dir)
            if args.output_dir is not None
            else input_path.parent.parent
        )
        args.output = str(base_dir / f"korean_extra_{input_path.name}_train.bin")
        print(f"[INFO] --output 미지정 → 자동 설정: {args.output}")

    return args


def main() -> None:
    """Command-line entry point: run auto-scan or single-directory tokenization."""
    args = parse_args()

    tokenizer_path = args.tokenizer
    if not Path(tokenizer_path).exists():
        fallback = Path(_DEFAULT_TOKENIZER_PATH)
        if str(fallback) != tokenizer_path and fallback.exists():
            # Make the substitution visible instead of switching models silently.
            print(
                f"[경고] 토크나이저 파일 없음: {tokenizer_path} → 기본 경로 사용: {fallback}",
                file=sys.stderr,
            )
            tokenizer_path = str(fallback)
        else:
            print(
                f"ERROR: 토크나이저 파일을 찾을 수 없습니다: {tokenizer_path}",
                file=sys.stderr,
            )
            sys.exit(1)

    print("=" * 60)
    print(" LLM-Bang tokenize_extra.py")
    print("=" * 60)
    print(f" 입력: {args.input_dir}")
    print(f" 토크나이저: {tokenizer_path}")
    print(f" num_proc: {args.num_proc}")
    print(f" batch_size: {args.batch_size}")
    print(f" val_split: {args.val_split}")
    print(f" seed: {args.seed}")
    print(f" eos: {not args.no_eos}")
    print()

    if args.auto_scan:
        stats_list = auto_scan_and_tokenize(
            root_dir=Path(args.input_dir),
            output_dir=Path(args.output_dir),
            tokenizer_path=tokenizer_path,
            text_col=args.text_col,
            num_proc=args.num_proc,
            batch_size=args.batch_size,
            val_split=args.val_split,
            seed=args.seed,
            # Forward --no_eos; it used to be ignored in auto-scan mode.
            eos_between_docs=not args.no_eos,
        )
        print("=" * 60)
        print(" 전체 요약")
        print("=" * 60)
        grand_train = 0
        grand_val = 0
        for s in stats_list:
            if "error" in s:
                print(f" {s['source']:40s} ERROR: {s['error']}")
            else:
                t = s.get("train_tokens", 0)
                v = s.get("val_tokens", 0)
                grand_train += t
                grand_val += v
                print(
                    f" {s['source']:40s} "
                    f"train={t:>14,} val={v:>12,} "
                    f"({s['elapsed_s']:.0f}s)"
                )
        print("-" * 60)
        print(
            f" {'합계':40s} "
            f"train={grand_train:>14,} val={grand_val:>12,}"
        )
        print(
            f"\n 총 토큰: {grand_train + grand_val:,} "
            f"({(grand_train + grand_val) * 2 / 1e9:.2f} GB)"
        )

    else:
        stats = tokenize_directory(
            input_dir=Path(args.input_dir),
            output_path=Path(args.output),
            tokenizer_path=tokenizer_path,
            text_col=args.text_col,
            num_proc=args.num_proc,
            batch_size=args.batch_size,
            eos_between_docs=not args.no_eos,
            val_split=args.val_split,
            seed=args.seed,
        )
        print()
        print("=" * 60)
        print(" 결과 요약")
        print("=" * 60)
        print(f" train .bin : {stats['train_path']}")
        if stats.get("val_path"):
            print(f" val .bin : {stats['val_path']}")
        print(f" train 토큰 : {stats['train_tokens']:,}")
        print(f" val 토큰 : {stats['val_tokens']:,}")
        print(f" 처리 문서 : {stats['total_docs']:,}")
        print(f" 소요 시간 : {stats['elapsed_s']:.1f}초")

        # Sanity check: the output must be readable as a uint16 memmap.
        print()
        print(" [검증] memmap 로드 테스트...")
        try:
            d = np.memmap(stats["train_path"], dtype="uint16", mode="r")
            print(f" memmap shape: {d.shape} dtype: {d.dtype}")
            print(f" 첫 10 토큰: {d[:10].tolist()}")
        except Exception as exc:
            # e.g. an empty output file cannot be memory-mapped; warn only.
            print(f" [경고] memmap 로드 실패: {exc}", file=sys.stderr)
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|