import logging
import subprocess
import sys

import numpy as np
import spacy
import tokenizers
from tokenizers.models import BPE
from tokenizers.normalizers import NFKC
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.trainers import BpeTrainer
from transformers import PreTrainedTokenizerFast


class SegmentationTokenizer:
    def __init__(
        self,
        vocab_size=32_768,
        min_frequency=2,
        max_length=1024
    ):
        self.max_length = max_length

        # Raw BPE tokenizer with NFKC normalization and whitespace pre-tokenization.
        self.raw_tokenizer = tokenizers.Tokenizer(
            BPE(unk_token="[UNK]")
        )
        self.raw_tokenizer.normalizer = NFKC()
        self.raw_tokenizer.pre_tokenizer = Whitespace()

        self.trainer = BpeTrainer(
            vocab_size=vocab_size,
            min_frequency=min_frequency,
            special_tokens=["[PAD]", "[UNK]", "[CLS]", "[SEP]", "[MASK]"]
        )

        # Hugging Face wrapper around the trained tokenizer; populated by .load().
        self._hf_tokenizer = None

    def build_iterator(self, dataset, batch_size=1024):
        # item["text"] is assumed to be a list of text lines; join them with
        # single newlines and collapse empty lines.
        batch = []
        for item in dataset:
            batch.append("\n".join(item["text"]).replace("\n\n", "\n"))
            if len(batch) == batch_size:
                yield batch
                batch = []
        if batch:
            yield batch

    def train_from_iterator(self, iterator):
        self.raw_tokenizer.train_from_iterator(
            iterator, trainer=self.trainer
        )

    def save(self, path):
        self.raw_tokenizer.save(path)

    def load(self, tokenizer_path):
        self._hf_tokenizer = PreTrainedTokenizerFast(
            tokenizer_file=tokenizer_path,
            unk_token="[UNK]",
            pad_token="[PAD]",
            cls_token="[CLS]",
            sep_token="[SEP]",
            mask_token="[MASK]"
        )
        return self

    def compute_unk_rate(self, corpus):
        if self._hf_tokenizer is None:
            raise RuntimeError("Tokenizer not loaded. Call .load() first.")

        unk_id = self._hf_tokenizer.convert_tokens_to_ids("[UNK]")

        total_tokens = 0
        unk_tokens = 0

        for text in corpus:
            enc = self._hf_tokenizer(
                text,
                add_special_tokens=False
            )["input_ids"]

            total_tokens += len(enc)
            unk_tokens += sum(1 for t in enc if t == unk_id)

        return unk_tokens / total_tokens if total_tokens > 0 else 0.0

    def __call__(
        self,
        text,
        return_tensors="pt",
        padding=True,
        truncation=True
    ):
        """
        text: str or List[str]
        returns: dict with input_ids and attention_mask (torch.long)
        """
        if self._hf_tokenizer is None:
            raise RuntimeError("Tokenizer not loaded. Call .load() first.")

        enc = self._hf_tokenizer(
            text,
            padding="max_length" if padding else False,
            truncation=truncation,
            max_length=self.max_length,
            return_tensors=return_tensors
        )

        return {
            "input_ids": enc["input_ids"],
            "attention_mask": enc["attention_mask"]
        }

    @property
    def vocab_size(self):
        if self._hf_tokenizer is None:
            raise RuntimeError("Tokenizer not loaded.")
        return self._hf_tokenizer.vocab_size

    def __repr__(self):
        return f"<SegmentationTokenizer vocab_size={self.trainer.vocab_size}>"


class SentenceSegmenter:
    def __init__(
        self,
        max_sentences: int,
        spacy_model: str = "es_core_news_sm",
        logger: logging.Logger | None = None
    ):
        self.max_sentences = max_sentences
        self.logger = self._get_logger(logger)
        self.nlp = self._build_model(spacy_model, logger=self.logger)

    @staticmethod
    def _build_model(sentence_tokenizer_model: str, logger: logging.Logger) -> spacy.language.Language:
        """
        Load the pre-trained sentence tokenizer model, downloading it first if it
        is not installed, and attach the newline segmentation component.
        :param sentence_tokenizer_model: Name of the spaCy model to load.
        :param logger: Logger used to report download progress and failures.
        :return: The spaCy language model.
        """
        try:
            spacy_model = spacy.load(sentence_tokenizer_model)
        except OSError:
            result = subprocess.run(
                [sys.executable, "-m", "spacy", "download", sentence_tokenizer_model],
                capture_output=True,
                text=True
            )

            if result.returncode != 0:
                logger.error(f"[BEAST-Tokenizer]: Downloading {sentence_tokenizer_model} failed.")
                raise RuntimeError(f"[BEAST-Tokenizer]: Error while downloading '{sentence_tokenizer_model}'")

            spacy_model = spacy.load(sentence_tokenizer_model)
            logger.info('[BEAST-Tokenizer]: Successfully downloaded the pre-trained sentence tokenizer model.')

        if 'parser' not in spacy_model.pipe_names:
            logger.error('[BEAST-Tokenizer]: The spaCy model needs a parser component.')
            raise RuntimeError('[BEAST-Tokenizer]: The spaCy model needs a parser component.')

        # Split sentences on explicit newlines before the dependency parser runs.
        spacy_model.add_pipe("newline_segmenter_keep_exact", before="parser")

        return spacy_model

    @staticmethod
    def _get_logger(logger):
        if logger is None:
            logger = logging.getLogger(__name__)
            logger.addHandler(logging.NullHandler())
        return logger

    def __call__(self, texts: list[str]) -> dict:
        sentences = []
        sentence_candidates = []
        sentence_boundaries = []
        sentence_masking = []

        for article in texts:
            doc = self.nlp(article)
            for idx, sent in enumerate(doc.sents):
                if idx == 0:
                    # First sentence of an article: candidate and article boundary.
                    sentence_candidates.append(1)
                    sentence_boundaries.append(1)
                elif sent.text.endswith("\n"):
                    # Sentence ends at an explicit newline: candidate, not a boundary.
                    sentence_candidates.append(1)
                    sentence_boundaries.append(0)
                else:
                    sentence_candidates.append(0)
                    sentence_boundaries.append(0)

                sentences.append(sent.text.replace('\n', '').strip())
                sentence_masking.append(1)

                if len(sentences) >= self.max_sentences:
                    self.logger.warning(f"Maximum number of sentences reached: {self.max_sentences}")
                    break

            if len(sentences) >= self.max_sentences:
                break

        # Pad every output up to max_sentences so all returned arrays share one length.
        while len(sentences) < self.max_sentences:
            sentences.append("")
            sentence_candidates.append(0)
            sentence_boundaries.append(0)
            sentence_masking.append(0)

        return {
            "sentences": sentences,
            "sentence_candidates": np.array(sentence_candidates, dtype=np.int8),
            "sentence_boundaries": np.array(sentence_boundaries, dtype=np.int8),
            "sentence_mask": np.array(sentence_masking, dtype=np.int8)
        }


@spacy.Language.component("newline_segmenter_keep_exact")
def newline_segmenter_keep_exact(doc):
    # Force a sentence start on the token that follows each newline token, so
    # explicit line breaks always become sentence boundaries.
    for token in doc[:-1]:
        if token.text == "\n":
            doc[token.i + 1].is_sent_start = True
    return doc
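

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the library code above). It assumes a
# tiny in-memory corpus shaped like the records build_iterator expects (dicts
# with a "text" field holding a list of lines), that torch is installed for
# return_tensors="pt", and that the "es_core_news_sm" spaCy model can be
# loaded or downloaded. The file name "toy_tokenizer.json" is made up for the
# example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    toy_dataset = [
        {"text": ["La primera oración del artículo.", "La segunda oración."]},
        {"text": ["Otra línea de ejemplo.", "Y una más para terminar."]},
    ]

    # Train a small BPE vocabulary, persist it, and reload it as a HF tokenizer.
    tok = SegmentationTokenizer(vocab_size=1_000, min_frequency=1, max_length=64)
    tok.train_from_iterator(tok.build_iterator(toy_dataset, batch_size=2))
    tok.save("toy_tokenizer.json")
    tok.load("toy_tokenizer.json")

    batch = tok(["La primera oración del artículo.", "Otra línea de ejemplo."])
    print(batch["input_ids"].shape, batch["attention_mask"].shape)
    print("UNK rate:", tok.compute_unk_rate(["La primera oración del artículo."]))

    # Segment one article into padded, fixed-length sentence arrays.
    segmenter = SentenceSegmenter(max_sentences=8)
    out = segmenter(["Primera oración. Segunda oración.\nTercera oración."])
    print(out["sentences"])
    print(out["sentence_candidates"], out["sentence_boundaries"], out["sentence_mask"])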