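"""Utilities for embedding long Persian texts with HooshvareLab/bert-fa-zwnj-base.

ModelUtils downloads the base model and tokenizer and caches them on disk;
Preprocess splits a text into 512-token chunks, runs them through the model,
and returns a mean-pooled embedding as a JSON-encoded list of floats.
"""
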
import json
import os
from typing import Optional

import torch
from torch import Tensor
from transformers import (
    AutoModel,
    AutoTokenizer,
    BatchEncoding,
    PreTrainedTokenizerBase,
)

class ModelUtils:
    def __init__(self, model_root):
        self.model_root = model_root
        self.model_path = os.path.join(model_root, "model")
        self.tokenizer_path = os.path.join(model_root, "tokenizer")

    def download_model(self):
        BASE_MODEL = "HooshvareLab/bert-fa-zwnj-base"
        tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)
        model = AutoModel.from_pretrained(BASE_MODEL)
        tokenizer.save_pretrained(self.tokenizer_path)
        model.save_pretrained(self.model_path)

    def make_dirs(self):
        if not os.path.isdir(self.model_root):
            os.mkdir(self.model_root)
        if not os.path.isdir(self.model_path):
            os.mkdir(self.model_path)
        if not os.path.isdir(self.tokenizer_path):
            os.mkdir(self.tokenizer_path)
class Preprocess:
    def __init__(self, model_root):
        self.model_root = model_root
        self.model_path = os.path.join(model_root, "model")
        self.tokenizer_path = os.path.join(model_root, "tokenizer")
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    def vectorize(self, text):
        model = AutoModel.from_pretrained(self.model_path).to(self.device)
        tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_path)
        # chunk_size=510 leaves room for the [CLS] and [SEP] tokens added later;
        # stride=510 makes the chunks non-overlapping
        ids, masks = self.transform_single_text(
            text, tokenizer, 510, stride=510, minimal_chunk_length=0, maximal_text_length=None
        )
        tokens = {"input_ids": ids.to(self.device), "attention_mask": masks.to(self.device)}
        with torch.no_grad():
            output = model(**tokens)
        last_hidden_states = output.last_hidden_state
        # alternative pooling: take the first ([CLS]) token embedding of each chunk,
        # last_hidden_states[:, 0, :], instead of mean pooling
        # mean-pool over tokens, then over chunks, giving a single <hidden_size> vector
        mean_pooled_embedding = last_hidden_states.mean(axis=1)
        result = mean_pooled_embedding.mean(axis=0).flatten().cpu().numpy()
        # convert the embedding to a JSON-encoded list of floats
        json_data = json.dumps(result.tolist())
        return json_data

    def transform_list_of_texts(
        self,
        texts: list[str],
        tokenizer: PreTrainedTokenizerBase,
        chunk_size: int,
        stride: int,
        minimal_chunk_length: int,
        maximal_text_length: Optional[int] = None,
    ) -> tuple[list[Tensor], list[Tensor]]:
        model_inputs = [
            self.transform_single_text(text, tokenizer, chunk_size, stride, minimal_chunk_length, maximal_text_length)
            for text in texts
        ]
        input_ids = [model_input[0] for model_input in model_inputs]
        attention_mask = [model_input[1] for model_input in model_inputs]
        return input_ids, attention_mask

    def transform_single_text(
        self,
        text: str,
        tokenizer: PreTrainedTokenizerBase,
        chunk_size: int,
        stride: int,
        minimal_chunk_length: int,
        maximal_text_length: Optional[int],
    ) -> tuple[Tensor, Tensor]:
        """Transforms the entire text into stacked input_ids and attention_mask tensors for a BERT model."""
        if maximal_text_length:
            tokens = self.tokenize_text_with_truncation(text, tokenizer, maximal_text_length)
        else:
            tokens = self.tokenize_whole_text(text, tokenizer)
        input_id_chunks, mask_chunks = self.split_tokens_into_smaller_chunks(tokens, chunk_size, stride, minimal_chunk_length)
        self.add_special_tokens_at_beginning_and_end(input_id_chunks, mask_chunks)
        self.add_padding_tokens(input_id_chunks, mask_chunks)
        input_ids, attention_mask = self.stack_tokens_from_all_chunks(input_id_chunks, mask_chunks)
        return input_ids, attention_mask

    def tokenize_whole_text(self, text: str, tokenizer: PreTrainedTokenizerBase) -> BatchEncoding:
        """Tokenizes the entire text without truncation and without special tokens."""
        tokens = tokenizer(text, add_special_tokens=False, truncation=False, return_tensors="pt")
        return tokens

    def tokenize_text_with_truncation(
        self, text: str, tokenizer: PreTrainedTokenizerBase, maximal_text_length: int
    ) -> BatchEncoding:
        """Tokenizes the text with truncation to maximal_text_length and without special tokens."""
        tokens = tokenizer(
            text, add_special_tokens=False, max_length=maximal_text_length, truncation=True, return_tensors="pt"
        )
        return tokens

    def split_tokens_into_smaller_chunks(
        self,
        tokens: BatchEncoding,
        chunk_size: int,
        stride: int,
        minimal_chunk_length: int,
    ) -> tuple[list[Tensor], list[Tensor]]:
        """Splits tokens into overlapping chunks with the given size and stride."""
        input_id_chunks = self.split_overlapping(tokens["input_ids"][0], chunk_size, stride, minimal_chunk_length)
        mask_chunks = self.split_overlapping(tokens["attention_mask"][0], chunk_size, stride, minimal_chunk_length)
        return input_id_chunks, mask_chunks

    def add_special_tokens_at_beginning_and_end(self, input_id_chunks: list[Tensor], mask_chunks: list[Tensor]) -> None:
        """
        Adds the special CLS token (token id = 101) at the beginning and
        the SEP token (token id = 102) at the end of each chunk, together
        with corresponding attention-mask entries equal to 1.
        """
        for i in range(len(input_id_chunks)):
            # note: 101/102 are the CLS/SEP ids of the standard BERT vocabulary;
            # tokenizer.cls_token_id and tokenizer.sep_token_id are safer in general
            input_id_chunks[i] = torch.cat([Tensor([101]), input_id_chunks[i], Tensor([102])])
            # attention mask of 1 for the two added special tokens
            mask_chunks[i] = torch.cat([Tensor([1]), mask_chunks[i], Tensor([1])])

    def add_padding_tokens(self, input_id_chunks: list[Tensor], mask_chunks: list[Tensor]) -> None:
        """Adds padding tokens (token id = 0) at the end so that every chunk has exactly 512 tokens."""
        for i in range(len(input_id_chunks)):
            # get required padding length
            pad_len = 512 - input_id_chunks[i].shape[0]
            # pad only if the chunk is shorter than the required size
            if pad_len > 0:
                input_id_chunks[i] = torch.cat([input_id_chunks[i], Tensor([0] * pad_len)])
                mask_chunks[i] = torch.cat([mask_chunks[i], Tensor([0] * pad_len)])

    def stack_tokens_from_all_chunks(self, input_id_chunks: list[Tensor], mask_chunks: list[Tensor]) -> tuple[Tensor, Tensor]:
        """Reshapes the chunk lists into tensors compatible with BERT model input."""
        input_ids = torch.stack(input_id_chunks)
        attention_mask = torch.stack(mask_chunks)
        return input_ids.long(), attention_mask.int()

    def split_overlapping(self, tensor: Tensor, chunk_size: int, stride: int, minimal_chunk_length: int) -> list[Tensor]:
        """Helper function for dividing a 1-dimensional tensor into overlapping chunks."""
        self.check_split_parameters_consistency(chunk_size, stride, minimal_chunk_length)
        result = [tensor[i : i + chunk_size] for i in range(0, len(tensor), stride)]
        if len(result) > 1:
            # ignore chunks with fewer than minimal_chunk_length tokens
            result = [x for x in result if len(x) >= minimal_chunk_length]
        return result

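    # For example, split_overlapping on a 1200-token tensor with chunk_size=510 and
    # stride=510 yields chunks of 510, 510 and 180 tokens (starting at positions
    # 0, 510 and 1020); after adding [CLS]/[SEP] and padding, each becomes a
    # 512-token model input.
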
    def check_split_parameters_consistency(self, chunk_size: int, stride: int, minimal_chunk_length: int) -> None:
        if chunk_size > 510:
            raise RuntimeError("Size of each chunk cannot be bigger than 510!")
        if minimal_chunk_length > chunk_size:
            raise RuntimeError("Minimal length cannot be bigger than size!")
        if stride > chunk_size:
            raise RuntimeError(
                "Stride cannot be bigger than size! Chunks must overlap or be near each other!"
            )
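
# Minimal usage sketch. The "./bert-fa" directory and the sample sentence are
# illustrative assumptions, not part of the original module.
if __name__ == "__main__":
    model_root = "./bert-fa"

    # one-time setup: create the directory layout and cache the base model locally
    utils = ModelUtils(model_root)
    utils.make_dirs()
    utils.download_model()

    # embed a (possibly long) Persian text into a single mean-pooled vector,
    # returned as a JSON-encoded list of floats
    preprocess = Preprocess(model_root)
    embedding_json = preprocess.vectorize("این یک متن آزمایشی است.")
    print(len(json.loads(embedding_json)))  # hidden size (768 for a BERT-base model)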