| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | import os
|
| | import sys
|
| | import argparse
|
| | import math
|
| | import csv
|
| | from typing import List, Tuple
|
| |
|
# Fail fast with an actionable message when a third-party dependency is
# missing; each guard names the exact pip install command the user needs.
# The imported names (tqdm, pd, AutoTokenizer) are bound at module level
# and used throughout the rest of this script.
try:
    from tqdm import tqdm
except ImportError:
    print("tqdm not installed. Install with: pip install tqdm", file=sys.stderr)
    sys.exit(1)

try:
    import pandas as pd
except ImportError:
    print("pandas not installed. Install with: pip install pandas", file=sys.stderr)
    sys.exit(1)

try:
    from transformers import AutoTokenizer
except ImportError:
    print("transformers not installed. Install with: pip install transformers", file=sys.stderr)
    sys.exit(1)
|
| |
|
def load_texts(csv_path: str) -> List[str]:
    """Load one text sample per CSV row from ``csv_path``.

    The file is read with no header, full quoting, and all cells kept as
    strings. Only the first column is used, so a stray delimiter that makes
    pandas infer extra columns no longer crashes (the original
    ``df.columns = ["text"]`` rename raised ValueError in that case).

    Args:
        csv_path: Path to the CSV file (one quoted text per line).

    Returns:
        List of texts, coerced to ``str``. An empty file yields an empty
        list instead of raising ``EmptyDataError``.
    """
    try:
        df = pd.read_csv(csv_path, header=None, quoting=csv.QUOTE_ALL, dtype=str)
    except pd.errors.EmptyDataError:
        # Zero-row input is a valid (if useless) dataset; report it as empty.
        return []
    # Take the first column positionally; single-column files behave as before.
    return df.iloc[:, 0].astype(str).tolist()
|
| |
|
def compute_lengths(
    texts: List[str],
    tokenizer_name: str,
    max_length: int,
    batch_size: int = 64
) -> Tuple[List[int], List[int], int]:
    """Tokenize ``texts`` in batches and collect per-sample token counts.

    Returns:
        - lengths_with_special: token lengths when add_special_tokens=True
        - lengths_no_special: token lengths when add_special_tokens=False
        - special_tokens_to_add: tokenizer.num_special_tokens_to_add(pair=False)

    NOTE(review): ``max_length`` is accepted but not consulted here —
    truncation is disabled so the full, untruncated lengths are measured.
    """
    tok = AutoTokenizer.from_pretrained(tokenizer_name, use_fast=True)
    specials_per_seq = tok.num_special_tokens_to_add(pair=False)

    with_special: List[int] = []
    no_special: List[int] = []

    for start in tqdm(range(0, len(texts), batch_size), desc="Tokenizing", unit="batch"):
        chunk = texts[start:start + batch_size]
        # Encode each batch twice: once with and once without special tokens,
        # appending the per-sample lengths to the matching accumulator.
        for add_specials, bucket in ((True, with_special), (False, no_special)):
            encoded = tok(
                chunk,
                add_special_tokens=add_specials,
                truncation=False,
                return_attention_mask=False,
                return_token_type_ids=False
            )
            bucket.extend(len(ids) for ids in encoded["input_ids"])

    return with_special, no_special, specials_per_seq
|
| |
|
def summarize(lengths: List[int]):
    """Compute descriptive statistics over a list of token lengths.

    Returns a dict with count/mean/median/min/p90/p95/p99/max; an empty
    input yields all-zero values rather than raising.
    """
    import numpy as np

    data = np.array(lengths, dtype=int)
    if data.size == 0:
        # Keep the same keys (and key order) as the populated branch.
        return {
            "count": 0,
            "mean": 0.0,
            "median": 0.0,
            "min": 0,
            "p90": 0.0,
            "p95": 0.0,
            "p99": 0.0,
            "max": 0
        }
    return {
        "count": int(data.size),
        "mean": float(data.mean()),
        "median": float(np.median(data)),
        "min": int(data.min()),
        "p90": float(np.percentile(data, 90)),
        "p95": float(np.percentile(data, 95)),
        "p99": float(np.percentile(data, 99)),
        "max": int(data.max())
    }
|
| |
|
def estimate_windows(
    lengths_no_special: List[int],
    max_length: int,
    special_tokens_to_add: int,
    doc_stride: int
) -> Tuple[int, int]:
    """
    Estimate total number of sliding-window chunks required if we split long docs.
    We compute per-sample windows with content capacity = max_length - special_tokens_to_add.
    Overlap is applied on content tokens via doc_stride.
    Returns:
        (total_windows, num_samples_needing_chunking)
    """
    content_capacity = max_length - special_tokens_to_add
    if content_capacity <= 0:
        raise ValueError(f"Invalid content capacity: {content_capacity}. Check tokenizer specials and max_length.")

    # Advance per window; clamped to 1 so a stride >= capacity still terminates.
    step = max(content_capacity - doc_stride, 1)

    total_windows = 0
    need_chunking = 0
    for length in lengths_no_special:
        overflow = length - content_capacity
        if overflow <= 0:
            # Fits in a single window.
            total_windows += 1
            continue
        need_chunking += 1
        total_windows += 1 + math.ceil(overflow / step)

    return total_windows, need_chunking
|
| |
|
def histogram_counts(lengths_with_special: List[int], max_length: int) -> List[Tuple[str, int]]:
    """Bucket token lengths into quartile bins of ``max_length``.

    The original hardcoded bins [128, 256, 384, 512] ignored the
    ``max_length`` parameter entirely; bins are now the quartiles of
    ``max_length`` (identical output for the default max_length=512),
    plus a final overflow bucket for lengths above max_length.

    Args:
        lengths_with_special: token lengths including special tokens.
        max_length: maximum sequence length the histogram is scaled to.

    Returns:
        List of (label, count) tuples, low bins first, overflow last.
    """
    quartiles = [max_length * k // 4 for k in (1, 2, 3, 4)]
    # Dedupe and drop non-positive edges so tiny max_length values stay valid.
    bins = sorted({b for b in quartiles if b > 0})
    if not bins:
        bins = [1]

    result: List[Tuple[str, int]] = []
    prev = 0
    for b in bins:
        label = f"{prev+1:>4}-{b:>4}"
        result.append((label, sum(1 for L in lengths_with_special if prev < L <= b)))
        prev = b
    result.append((f">{bins[-1]}", sum(1 for L in lengths_with_special if L > bins[-1])))
    return result
|
| |
|
def main():
    """CLI entry point: tokenize the input CSV and report length statistics.

    Reads one quoted text per line, tokenizes with the configured HF
    tokenizer, then prints summary statistics, a length histogram, and a
    sliding-window chunking estimate to stdout.
    """
    parser = argparse.ArgumentParser(description="DeBERTa v3 base token-length analysis for train_clean.csv")
    parser.add_argument("--input_csv", default="train_clean.csv", help="Input CSV (one quoted text per line).")
    parser.add_argument("--tokenizer", default="microsoft/mdeberta-v3-base", help="HF tokenizer name.")
    parser.add_argument("--max_length", type=int, default=512, help="Max sequence length (incl. specials).")
    parser.add_argument("--doc_stride", type=int, default=128, help="Sliding window overlap on content tokens.")
    parser.add_argument("--batch_size", type=int, default=64, help="Batch size for tokenization.")
    args = parser.parse_args()

    # Resolve the CSV relative to this script, not the current working directory.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    input_path = os.path.join(script_dir, args.input_csv)

    if not os.path.isfile(input_path):
        print(f"Input file not found: {input_path}", file=sys.stderr)
        sys.exit(1)

    texts = load_texts(input_path)

    # Token lengths with and without special tokens; `specials` is the
    # per-sequence special-token count reported by the tokenizer.
    lengths_with_special, lengths_no_special, specials = compute_lengths(
        texts, args.tokenizer, args.max_length, batch_size=args.batch_size
    )

    stats_with = summarize(lengths_with_special)
    stats_no = summarize(lengths_no_special)

    # Count samples that would be truncated at max_length (specials included).
    exceed = sum(1 for L in lengths_with_special if L > args.max_length)
    total = len(lengths_with_special)
    frac = (exceed / total) * 100.0 if total else 0.0

    hist = histogram_counts(lengths_with_special, args.max_length)
    total_windows, need_chunking = estimate_windows(
        lengths_no_special, args.max_length, specials, args.doc_stride
    )

    print("\n=== Token Length Analysis (DeBERTa v3 base) ===")
    print(f"Tokenizer: {args.tokenizer}")
    print(f"max_length: {args.max_length} (includes {specials} special tokens per sequence)")
    print(f"doc_stride: {args.doc_stride} (applies to content tokens)")
    print(f"Samples: {total}")
    print(f"Exceeding max_length: {exceed} ({frac:.2f}%)")
    print(f"Samples needing chunking (by content capacity): {need_chunking}")

    print("\n-- Length stats WITH specials --")
    for k, v in stats_with.items():
        print(f"{k:>6}: {v}")

    print("\n-- Length stats WITHOUT specials --")
    for k, v in stats_no.items():
        print(f"{k:>6}: {v}")

    print("\n-- Histogram (WITH specials) --")
    for label, cnt in hist:
        print(f"{label}: {cnt}")

    # Capacity left for real content tokens once specials are reserved.
    content_capacity = args.max_length - specials
    print(f"\nContent capacity per window (tokens excluding specials): {content_capacity}")
    print(f"Estimated total windows if chunked with doc_stride={args.doc_stride}: {total_windows}")

    print("\nDone.")
|
| |
|
| | if __name__ == "__main__":
|
| | main()
|
| |
|