Jean Garcia-Gathright
added ernie files
a02c788
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
from transformers import (
AutoTokenizer,
AutoModel,
AutoConfig,
TFAutoModelForSequenceClassification,
)
from tensorflow import keras
from sklearn.model_selection import train_test_split
import logging
import time
from .models import Models, ModelsByFamily # noqa: F401
from .split_strategies import ( # noqa: F401
SplitStrategy,
SplitStrategies,
RegexExpressions
)
from .aggregation_strategies import ( # noqa: F401
AggregationStrategy,
AggregationStrategies
)
from .helper import (
get_features,
softmax,
remove_dir,
make_dir,
copy_dir
)
AUTOSAVE_PATH = './ernie-autosave/'
def clean_autosave():
remove_dir(AUTOSAVE_PATH)
class SentenceClassifier:
def __init__(self,
model_name=Models.BertBaseUncased,
model_path=None,
max_length=64,
labels_no=2,
tokenizer_kwargs=None,
model_kwargs=None):
self._loaded_data = False
self._model_path = None
if model_kwargs is None:
model_kwargs = {}
model_kwargs['num_labels'] = labels_no
if tokenizer_kwargs is None:
tokenizer_kwargs = {}
tokenizer_kwargs['max_len'] = max_length
if model_path is not None:
self._load_local_model(model_path)
else:
self._load_remote_model(model_name, tokenizer_kwargs, model_kwargs)
@property
def model(self):
return self._model
@property
def tokenizer(self):
return self._tokenizer
def load_dataset(self,
dataframe=None,
validation_split=0.1,
random_state=None,
stratify=None,
csv_path=None,
read_csv_kwargs=None):
if dataframe is None and csv_path is None:
raise ValueError
if csv_path is not None:
dataframe = pd.read_csv(csv_path, **read_csv_kwargs)
sentences = list(dataframe[dataframe.columns[0]])
labels = dataframe[dataframe.columns[1]].values
(
training_sentences,
validation_sentences,
training_labels,
validation_labels
) = train_test_split(
sentences,
labels,
test_size=validation_split,
shuffle=True,
random_state=random_state,
stratify=stratify
)
self._training_features = get_features(
self._tokenizer, training_sentences, training_labels)
self._training_size = len(training_sentences)
self._validation_features = get_features(
self._tokenizer,
validation_sentences,
validation_labels
)
self._validation_split = len(validation_sentences)
logging.info(f'training_size: {self._training_size}')
logging.info(f'validation_split: {self._validation_split}')
self._loaded_data = True
def fine_tune(self,
epochs=4,
learning_rate=2e-5,
epsilon=1e-8,
clipnorm=1.0,
optimizer_function=keras.optimizers.Adam,
optimizer_kwargs=None,
loss_function=keras.losses.SparseCategoricalCrossentropy,
loss_kwargs=None,
accuracy_function=keras.metrics.SparseCategoricalAccuracy,
accuracy_kwargs=None,
training_batch_size=32,
validation_batch_size=64,
**kwargs):
if not self._loaded_data:
raise Exception('Data has not been loaded.')
if optimizer_kwargs is None:
optimizer_kwargs = {
'learning_rate': learning_rate,
'epsilon': epsilon,
'clipnorm': clipnorm
}
optimizer = optimizer_function(**optimizer_kwargs)
if loss_kwargs is None:
loss_kwargs = {'from_logits': True}
loss = loss_function(**loss_kwargs)
if accuracy_kwargs is None:
accuracy_kwargs = {'name': 'accuracy'}
accuracy = accuracy_function(**accuracy_kwargs)
self._model.compile(optimizer=optimizer, loss=loss, metrics=[accuracy])
training_features = self._training_features.shuffle(
self._training_size).batch(training_batch_size).repeat(-1)
validation_features = self._validation_features.batch(
validation_batch_size)
training_steps = self._training_size // training_batch_size
if training_steps == 0:
training_steps = self._training_size
logging.info(f'training_steps: {training_steps}')
validation_steps = self._validation_split // validation_batch_size
if validation_steps == 0:
validation_steps = self._validation_split
logging.info(f'validation_steps: {validation_steps}')
for i in range(epochs):
self._model.fit(training_features,
epochs=1,
validation_data=validation_features,
steps_per_epoch=training_steps,
validation_steps=validation_steps,
**kwargs)
# The fine-tuned model does not have the same input interface
# after being exported and loaded again.
self._reload_model()
def predict_one(
self,
text,
split_strategy=None,
aggregation_strategy=None
):
return next(
self.predict([text],
batch_size=1,
split_strategy=split_strategy,
aggregation_strategy=aggregation_strategy))
def predict(
self,
texts,
batch_size=32,
split_strategy=None,
aggregation_strategy=None
):
if split_strategy is None:
yield from self._predict_batch(texts, batch_size)
else:
if aggregation_strategy is None:
aggregation_strategy = AggregationStrategies.Mean
split_indexes = [0]
sentences = []
for text in texts:
new_sentences = split_strategy.split(text, self.tokenizer)
if not new_sentences:
continue
split_indexes.append(split_indexes[-1] + len(new_sentences))
sentences.extend(new_sentences)
predictions = list(self._predict_batch(sentences, batch_size))
for i, split_index in enumerate(split_indexes[:-1]):
stop_index = split_indexes[i + 1]
yield aggregation_strategy.aggregate(
predictions[split_index:stop_index]
)
def dump(self, path):
if self._model_path:
copy_dir(self._model_path, path)
else:
self._dump(path)
def _dump(self, path):
make_dir(path)
make_dir(path + '/tokenizer')
self._model.save_pretrained(path)
self._tokenizer.save_pretrained(path + '/tokenizer')
self._config.save_pretrained(path + '/tokenizer')
def _predict_batch(self, sentences: list, batch_size: int):
sentences_number = len(sentences)
if batch_size > sentences_number:
batch_size = sentences_number
for i in range(0, sentences_number, batch_size):
input_ids_list = []
attention_mask_list = []
stop_index = i + batch_size
stop_index = stop_index if stop_index < sentences_number \
else sentences_number
for j in range(i, stop_index):
features = self._tokenizer.encode_plus(
sentences[j],
add_special_tokens=True,
max_length=self._tokenizer.model_max_length
)
input_ids, _, attention_mask = (
features['input_ids'],
features['token_type_ids'],
features['attention_mask']
)
input_ids = self._list_to_padded_array(features['input_ids'])
attention_mask = self._list_to_padded_array(
features['attention_mask'])
input_ids_list.append(input_ids)
attention_mask_list.append(attention_mask)
input_dict = {
'input_ids': np.array(input_ids_list),
'attention_mask': np.array(attention_mask_list)
}
logit_predictions = self._model.predict_on_batch(input_dict)
yield from (
[softmax(logit_prediction)
for logit_prediction in logit_predictions[0]]
)
def _list_to_padded_array(self, items):
array = np.array(items)
padded_array = np.zeros(self._tokenizer.model_max_length, dtype=np.int)
padded_array[:array.shape[0]] = array
return padded_array
def _get_temporary_path(self, name=''):
return f'{AUTOSAVE_PATH}{name}/{int(round(time.time() * 1000))}'
def _reload_model(self):
self._model_path = self._get_temporary_path(
name=self._get_model_family())
self._dump(self._model_path)
self._load_local_model(self._model_path)
def _load_local_model(self, model_path):
try:
self._tokenizer = AutoTokenizer.from_pretrained(
model_path + '/tokenizer')
self._config = AutoConfig.from_pretrained(
model_path + '/tokenizer')
# Old models didn't use to have a tokenizer folder
except OSError:
self._tokenizer = AutoTokenizer.from_pretrained(model_path)
self._config = AutoConfig.from_pretrained(model_path)
self._model = TFAutoModelForSequenceClassification.from_pretrained(
model_path,
from_pt=False
)
def _get_model_family(self):
model_family = ''.join(self._model.name[2:].split('_')[:2])
return model_family
def _load_remote_model(self, model_name, tokenizer_kwargs, model_kwargs):
do_lower_case = False
if 'uncased' in model_name.lower():
do_lower_case = True
tokenizer_kwargs.update({'do_lower_case': do_lower_case})
self._tokenizer = AutoTokenizer.from_pretrained(
model_name, **tokenizer_kwargs)
self._config = AutoConfig.from_pretrained(model_name)
temporary_path = self._get_temporary_path()
make_dir(temporary_path)
# TensorFlow model
try:
self._model = TFAutoModelForSequenceClassification.from_pretrained(
model_name,
from_pt=False
)
# PyTorch model
except TypeError:
try:
self._model = \
TFAutoModelForSequenceClassification.from_pretrained(
model_name,
from_pt=True
)
# Loading a TF model from a PyTorch checkpoint is not supported
# when using a model identifier name
except OSError:
model = AutoModel.from_pretrained(model_name)
model.save_pretrained(temporary_path)
self._model = \
TFAutoModelForSequenceClassification.from_pretrained(
temporary_path,
from_pt=True
)
# Clean the model's last layer if the provided properties are different
clean_last_layer = False
for key, value in model_kwargs.items():
if not hasattr(self._model.config, key):
clean_last_layer = True
break
if getattr(self._model.config, key) != value:
clean_last_layer = True
break
if clean_last_layer:
try:
getattr(self._model, self._get_model_family()
).save_pretrained(temporary_path)
self._model = self._model.__class__.from_pretrained(
temporary_path,
from_pt=False,
**model_kwargs
)
# The model is itself the main layer
except AttributeError:
# TensorFlow model
try:
self._model = self._model.__class__.from_pretrained(
model_name,
from_pt=False,
**model_kwargs
)
# PyTorch Model
except (OSError, TypeError):
model = AutoModel.from_pretrained(model_name)
model.save_pretrained(temporary_path)
self._model = self._model.__class__.from_pretrained(
temporary_path,
from_pt=True,
**model_kwargs
)
remove_dir(temporary_path)
assert self._tokenizer and self._model