#!/usr/bin/env python
# coding: utf-8
import pandas as pd
from torch.utils.data import Dataset


class LanguageDataset(Dataset):
    def __init__(self, df, tokenizer):
        # Make sure the data is compatible: exactly one input column and one target column.
        if len(df.columns) != 2:
            raise ValueError("Dataset can only have two columns!")
        self.data = df.to_dict(orient='records')
        self.tokenizer = tokenizer
        # Set the sequence length to the smallest power of two that covers the longest row.
        self.max_length = smallest_square_length(df)
        self.labels = df.columns
    def __len__(self):
        return len(self.data)

    def __getitem__(self, i):
        X = self.data[i][self.labels[0]]
        Y = self.data[i][self.labels[1]]
        if str(type(self.tokenizer)) == "<class 'transformers.models.gpt2.tokenization_gpt2.GPT2Tokenizer'>":
            # Decoder-only (GPT-2): concatenate input and target into a single sequence.
            return self.tokenizer.encode_plus(X + ' | ' + Y,
                                              return_tensors='pt',
                                              max_length=self.max_length,
                                              padding='max_length',
                                              truncation=True)
        elif str(type(self.tokenizer)) == "<class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>":
            # Encoder-decoder (T5): tokenize input and target separately.
            input_tokens = self.tokenizer.encode_plus(
                X,
                max_length=self.max_length,
                padding='max_length',
                truncation=True,
                return_tensors='pt'
            )
            target_tokens = self.tokenizer.encode_plus(
                Y,
                max_length=self.max_length,
                padding='max_length',
                truncation=True,
                return_tensors='pt'
            )
            return {
                'input_ids': input_tokens['input_ids'].squeeze(),
                # 'attention_mask': input_tokens['attention_mask'].squeeze(),
                'labels': target_tokens['input_ids'].squeeze()
            }
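

# Minimal usage sketch (added illustration, not part of the original script). It assumes the
# Hugging Face `transformers` package is installed and that the 'gpt2' tokenizer can be
# downloaded; the column names 'input' and 'output' below are purely illustrative.
def _example_dataset_usage():
    from transformers import GPT2Tokenizer

    tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
    tokenizer.pad_token = tokenizer.eos_token  # GPT-2 has no pad token by default
    df = pd.DataFrame({'input': ['2 + 2'], 'output': ['4']})
    dataset = LanguageDataset(df, tokenizer)
    print(len(dataset), dataset[0]['input_ids'].shape)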


def smallest_square_length(df):
    """Return the smallest power of two at least as large as the longest entry (in characters) of either column."""
    col1 = df[df.columns[0]].astype(str).apply(len).max()
    col2 = df[df.columns[1]].astype(str).apply(len).max()
    max_length = max(col1, col2)
    # Round up to the next power of two.
    x = 2
    while x < max_length:
        x = x * 2
    return x
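

# Quick illustrative check (added; not part of the original script): for one column whose
# longest entry is 5 characters and another whose longest is 12, the padded length rounds
# up to the next power of two, i.e. 16.
def _example_smallest_square_length():
    df = pd.DataFrame({'a': ['hello'], 'b': ['twelve chars']})
    assert smallest_square_length(df) == 16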


def levenshtein_distance(str1, str2):
    """
    Computes the Levenshtein distance between two strings.

    Parameters:
        str1 (str): The first string.
        str2 (str): The second string.

    Returns:
        int: The Levenshtein distance between the two strings.
    """
    m, n = len(str1), len(str2)
    dp = [[0] * (n + 1) for _ in range(m + 1)]
    for i in range(m + 1):
        dp[i][0] = i
    for j in range(n + 1):
        dp[0][j] = j
    for i in range(1, m + 1):
        for j in range(1, n + 1):
            if str1[i - 1] == str2[j - 1]:
                dp[i][j] = dp[i - 1][j - 1]
            else:
                dp[i][j] = 1 + min(dp[i - 1][j], dp[i][j - 1], dp[i - 1][j - 1])
    return dp[m][n]
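

# Small sanity check (added illustration, not part of the original script):
# the classic "kitten" -> "sitting" example needs exactly 3 edits.
def _example_levenshtein():
    assert levenshtein_distance('kitten', 'sitting') == 3
    assert levenshtein_distance('abc', 'abc') == 0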


def grid_search(model, tokenizer, input_str, topK_values, topP_values, temperature_values, repetition_penalty_values, expected_output):
    """
    Conducts a grid search over specified hyperparameters to find the best text generation settings (GPT series).

    Parameters:
    - model: The pre-trained model used for text generation.
    - tokenizer: The tokenizer associated with the model.
    - input_str: The input string given to the model for text generation.
    - topK_values: A list of integer values for the topK sampling hyperparameter.
    - topP_values: A list of float values for the topP (nucleus) sampling hyperparameter.
    - temperature_values: A list of float values for the temperature setting of the model.
    - repetition_penalty_values: A list of float values for penalizing repetitions in the generated text.
    - expected_output: The expected output string against which generated texts are evaluated using the Levenshtein distance.

    Returns:
    - results: A pandas DataFrame containing each combination of hyperparameters, the generated output for that combination, and its Levenshtein distance from the expected output.

    Notes:
    - The function prints the best hyperparameters found so far during the search, based on the smallest Levenshtein distance.
    - The Levenshtein distance measures the number of edits required to transform one string into another.
    """
    results = pd.DataFrame(columns=['topK', 'topP', 'temperature', 'repetition_penalty', 'generated_output', 'levenshtein_distance'])
    min_distance = float('inf')
    for topK in topK_values:
        for topP in topP_values:
            for temperature in temperature_values:
                for repetition_penalty in repetition_penalty_values:
                    generated_output = model.generate_text(input_str, topK, topP, temperature, repetition_penalty)
                    distance = levenshtein_distance(generated_output, expected_output)
                    if distance < min_distance:
                        print(f'topK={topK}, topP={topP}, temperature={temperature}, repetition_penalty={repetition_penalty}, levenshtein_distance={distance}')
                        min_distance = distance
                    new_row = {'topK': topK,
                               'topP': topP,
                               'temperature': temperature,
                               'repetition_penalty': repetition_penalty,
                               'generated_output': generated_output,
                               'levenshtein_distance': distance
                               }
                    results.loc[len(results)] = new_row
    return results.sort_values(by='levenshtein_distance', ascending=True)
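

# Hedged usage sketch (added; not part of the original script). grid_search expects a wrapper
# object exposing generate_text(input_str, topK, topP, temperature, repetition_penalty), which
# is project-specific rather than a standard `transformers` API; `my_model`, `my_tokenizer`
# and the hyperparameter grids below are illustrative placeholders only.
def _example_grid_search(my_model, my_tokenizer):
    results = grid_search(
        model=my_model,
        tokenizer=my_tokenizer,
        input_str='2 + 2 =',
        topK_values=[10, 50],
        topP_values=[0.9, 0.95],
        temperature_values=[0.7, 1.0],
        repetition_penalty_values=[1.0, 1.2],
        expected_output='4',
    )
    print(results.head())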


def to_coreml(gpt_model, path=''):
    import torch

    # Pick the best available device: CUDA first, then Apple Silicon (MPS), then CPU.
    if torch.cuda.is_available():
        device = torch.device('cuda')
    elif torch.backends.mps.is_available():
        device = torch.device('mps')  # Apple Silicon
    else:
        device = torch.device('cpu')

    if path != '':
        lm_head_model = torch.load(path, map_location=device)
    else:
        lm_head_model = gpt_model.model
""" | |
Recreate the Core ML model from scratch using | |
coremltools' neural_network.NeuralNetworkBuilder | |
""" | |
import coremltools | |
import coremltools.models.datatypes as datatypes | |
from coremltools.models import neural_network as neural_network | |
from coremltools.models.utils import save_spec | |
import numpy as np | |
import torch | |
model_name = 'model' | |
model = lm_head_model.transformer | |
wte = model.wte.weight.data.cpu().numpy().transpose() # shape (768, 50257) /!\ i hate this | |
wpe = model.wpe.weight.data.cpu().numpy().transpose() # shape (768, 1024) | |
sequence_length = 128 | |
steps = model.config.n_layer | |
    # Build the network: token ids and position ids in, logits out.
    input_features = [
        ('input_ids', datatypes.Array(sequence_length)),
        ('position_ids', datatypes.Array(sequence_length)),
    ]
    output_features = [('output_logits', None)]
    builder = neural_network.NeuralNetworkBuilder(
        input_features,
        output_features,
        mode=None,
        disable_rank5_shape_mapping=True,
    )
    builder.add_expand_dims(
        name='input_ids_expanded_to_rank5',
        input_name='input_ids',
        output_name='input_ids_expanded_to_rank5',
        axes=(1, 2, 3, 4)
    )
    builder.add_expand_dims(
        name='position_ids_expanded_to_rank5',
        input_name='position_ids',
        output_name='position_ids_expanded_to_rank5',
        axes=(1, 2, 3, 4)
    )
    builder.add_embedding(
        name='token_embeddings',
        input_name='input_ids_expanded_to_rank5',
        output_name='token_embeddings',
        W=wte,
        b=None,
        input_dim=50257,
        output_channels=768,
        has_bias=False,
    )
    builder.add_embedding(
        name='positional_embeddings',
        input_name='position_ids_expanded_to_rank5',
        output_name='positional_embeddings',
        W=wpe,
        b=None,
        input_dim=1024,
        output_channels=768,
        has_bias=False,
    )
    # Output: (seq, 1, 768, 1, 1)
    builder.add_add_broadcastable(
        name='embeddings_addition',
        input_names=['token_embeddings', 'positional_embeddings'],
        output_name='0_previous_block'
    )
    # One iteration per transformer block.
    for i in range(steps):
        print(f'Converting block {i + 1}/{steps}')
        # First layer norm (ln_1), expressed as MVN followed by a learned scale/shift.
        ln_weight = model.h[i].ln_1.weight.data.cpu().numpy().reshape((1, 1, 768, 1, 1))
        ln_bias = model.h[i].ln_1.bias.data.cpu().numpy().reshape((1, 1, 768, 1, 1))
        ln_epsilon = model.h[i].ln_1.eps
        builder.add_mvn(
            name=f"{i}_block_ln_1",
            input_name=f"{i}_previous_block",
            output_name=f"{i}_block_ln_1",
            across_channels=True,
            normalize_variance=True,
            epsilon=ln_epsilon
        )
        builder.add_scale(
            name=f"{i}_block_ln_1_scaled",
            input_name=f"{i}_block_ln_1",
            output_name=f"{i}_block_ln_1_scaled",
            W=ln_weight,
            b=ln_bias,
            has_bias=True,
            shape_scale=[768],
            shape_bias=[768]
        )
        builder.add_transpose(
            name=f"{i}_block_ln_1_reshape",
            input_name=f"{i}_block_ln_1_scaled",
            output_name=f"{i}_block_ln_1_scaled_transposed",
            axes=(1, 0, 2, 3, 4)
        )
        # Fused QKV projection (GPT-2's Conv1D, expressed here as an inner product).
        conv_1D_bias = model.h[i].attn.c_attn.bias.data.cpu().numpy().reshape((1, 1, 2304, 1, 1))
        conv_1D_weights = model.h[i].attn.c_attn.weight.data.cpu().numpy().transpose().reshape((1, 768, 2304, 1, 1))
        builder.add_inner_product(
            name=f"{i}_block_attn_conv",
            input_name=f"{i}_block_ln_1_scaled_transposed",
            output_name=f"{i}_block_attn_conv",
            input_channels=768,
            output_channels=2304,
            W=conv_1D_weights,
            b=conv_1D_bias,
            has_bias=True
        )
        # Split the fused projection into query, key and value.
        builder.add_split(
            name=f"{i}_block_attn_qkv_split",
            input_name=f"{i}_block_attn_conv",
            output_names=[f"{i}_block_attn_q", f"{i}_block_attn_k", f"{i}_block_attn_v"]
        )
        # Reshape Q, K, V to (1, 1, seq, 12 heads, 64 dims) and move the head axis forward.
        builder.add_rank_preserving_reshape(
            name=f"{i}_block_attn_q_reshape",
            input_name=f"{i}_block_attn_q",
            output_name=f"{i}_block_attn_q_reshape",
            output_shape=(1, 1, sequence_length, 12, 64)
        )
        builder.add_transpose(
            name=f"{i}_block_attn_q_reshape_permuted",
            input_name=f"{i}_block_attn_q_reshape",
            output_name=f"{i}_block_attn_q_reshape_permuted",
            axes=(0, 1, 3, 2, 4)
        )
        builder.add_rank_preserving_reshape(
            name=f"{i}_block_attn_k_reshape",
            input_name=f"{i}_block_attn_k",
            output_name=f"{i}_block_attn_k_reshape",
            output_shape=(1, 1, sequence_length, 12, 64)
        )
        builder.add_transpose(
            name=f"{i}_block_attn_k_reshape_permuted",
            input_name=f"{i}_block_attn_k_reshape",
            output_name=f"{i}_block_attn_k_reshape_permuted",
            axes=(0, 1, 3, 4, 2)
        )
        builder.add_rank_preserving_reshape(
            name=f"{i}_block_attn_v_reshape",
            input_name=f"{i}_block_attn_v",
            output_name=f"{i}_block_attn_v_reshape",
            output_shape=(1, 1, sequence_length, 12, 64)
        )
        builder.add_transpose(
            name=f"{i}_block_attn_v_reshape_permuted",
            input_name=f"{i}_block_attn_v_reshape",
            output_name=f"{i}_block_attn_v_reshape_permuted",
            axes=(0, 1, 3, 2, 4)
        )
        # Attention scores: Q @ K^T, scaled by 1/sqrt(head_dim) = 1/8.
        builder.add_batched_mat_mul(
            name=f"{i}_block_attn_qv_matmul",
            input_names=[f"{i}_block_attn_q_reshape_permuted", f"{i}_block_attn_k_reshape_permuted"],
            output_name=f"{i}_block_attn_qv_matmul"
        )
        builder.add_scale(
            name=f"{i}_block_attn_qv_matmul_scaled",
            input_name=f"{i}_block_attn_qv_matmul",
            output_name=f"{i}_block_attn_qv_matmul_scaled",
            W=np.array(1 / 8),
            b=0,
            has_bias=False
        )
        # Causal mask: keep the lower triangle, push masked positions towards -inf before the softmax.
        nd = ns = sequence_length
        b = (model.h[i].attn.bias[:, :, ns - nd:ns, :ns]).unsqueeze(0)
        builder.add_scale(
            name=f"{i}_block_attn_bias",
            input_name=f"{i}_block_attn_qv_matmul_scaled",
            output_name=f"{i}_block_attn_bias",
            W=b,
            b=None,
            has_bias=False,
            shape_scale=[1, sequence_length, sequence_length]
        )
        bias_constant_0 = -1e4 * torch.logical_not(b)
        builder.add_bias(
            name=f"{i}_block_attn_afterbias",
            input_name=f"{i}_block_attn_bias",
            output_name=f"{i}_block_attn_afterbias",
            b=bias_constant_0,
            shape_bias=[1, sequence_length, sequence_length],
        )
        builder.add_squeeze(
            name=f"{i}_squeezit",
            input_name=f"{i}_block_attn_afterbias",
            output_name=f"{i}_squeezit",
            axes=[0, 1]
        )
        builder.add_softmax(
            name=f"{i}_block_attn_softmax",
            input_name=f"{i}_squeezit",
            output_name=f"{i}_block_attn_softmax",
        )
        builder.add_expand_dims(
            name=f"{i}_expandit",
            input_name=f"{i}_block_attn_softmax",
            output_name=f"{i}_expandit",
            axes=[0, 1]
        )
        # Attention output: softmax(QK^T / 8) @ V.
        builder.add_batched_mat_mul(
            name=f"{i}_block_full_attention",
            input_names=[f"{i}_expandit", f"{i}_block_attn_v_reshape_permuted"],
            output_name=f"{i}_block_full_attention"
        )
        # Merge the heads back to (1, 1, 1, seq, 768) and project with c_proj.
        builder.add_transpose(
            name=f"{i}_block_full_attention_merged_t",
            input_name=f"{i}_block_full_attention",
            output_name=f"{i}_block_full_attention_merged_t",
            axes=[0, 1, 3, 2, 4]
        )
        builder.add_rank_preserving_reshape(
            name=f"{i}_block_full_attention_merged",
            input_name=f"{i}_block_full_attention_merged_t",
            output_name=f"{i}_block_full_attention_merged",
            output_shape=[1, 1, 1, sequence_length, 768]
        )
        builder.add_transpose(
            name=f"{i}_block_attn_conv_proj_t",
            input_name=f"{i}_block_full_attention_merged",
            output_name=f"{i}_block_attn_conv_proj_t",
            axes=[0, 3, 4, 1, 2]
        )
        conv_1D_proj_bias = model.h[i].attn.c_proj.bias.data.cpu().numpy().reshape((1, 1, 768, 1, 1))
        conv_1D_proj_weights = model.h[i].attn.c_proj.weight.data.cpu().numpy().transpose().reshape((1, 768, 768, 1, 1))
        builder.add_inner_product(
            name=f"{i}_block_attn_conv_proj",
            input_name=f"{i}_block_attn_conv_proj_t",
            output_name=f"{i}_block_attn_conv_proj",
            input_channels=768,
            output_channels=768,
            W=conv_1D_proj_weights,
            b=conv_1D_proj_bias,
            has_bias=True
        )
        # First residual connection: x = x + attention(ln_1(x)).
        # Input: (seq, 1, 768, 1, 1), Output: (1, seq, 768, 1, 1)
        builder.add_transpose(
            name=f"{i}_previous_block_t",
            input_name=f"{i}_previous_block",
            output_name=f"{i}_previous_block_t",
            axes=[1, 0, 2, 3, 4]
        )
        builder.add_add_broadcastable(
            name=f"{i}_block_xa_sum",
            input_names=[f"{i}_previous_block_t", f"{i}_block_attn_conv_proj"],
            output_name=f"{i}_block_xa_sum",
        )
        # Second layer norm (ln_2) followed by the MLP, then the second residual connection.
        ln_2_weight = model.h[i].ln_2.weight.data.cpu().numpy().reshape((1, 1, 768, 1, 1))
        ln_2_bias = model.h[i].ln_2.bias.data.cpu().numpy().reshape((1, 1, 768, 1, 1))
        ln_2_epsilon = model.h[i].ln_2.eps
        builder.add_mvn(
            name=f"{i}_block_ln_2",
            input_name=f"{i}_block_xa_sum",
            output_name=f"{i}_block_ln_2",
            across_channels=True,
            normalize_variance=True,
            epsilon=ln_2_epsilon
        )
        builder.add_scale(
            name=f"{i}_block_ln_2_scaled",
            input_name=f"{i}_block_ln_2",
            output_name=f"{i}_block_ln_2_scaled",
            W=ln_2_weight,
            b=ln_2_bias,
            has_bias=True,
            shape_scale=[768],
            shape_bias=[768]
        )
        # MLP: 768 -> 3072 (c_fc), GELU, 3072 -> 768 (c_proj).
        mlp_conv_1D_fc_bias = model.h[i].mlp.c_fc.bias.data.cpu().numpy().reshape((1, 1, 3072, 1, 1))
        mlp_conv_1D_fc_weights = model.h[i].mlp.c_fc.weight.data.cpu().numpy().transpose().reshape((1, 768, 3072, 1, 1))
        builder.add_inner_product(
            name=f"{i}_block_mlp_conv_fc",
            input_name=f"{i}_block_ln_2_scaled",
            output_name=f"{i}_block_mlp_conv_fc",
            input_channels=768,
            output_channels=3072,
            W=mlp_conv_1D_fc_weights,
            b=mlp_conv_1D_fc_bias,
            has_bias=True
        )
        builder.add_gelu(
            name=f"{i}_block_mlp_gelu",
            input_name=f"{i}_block_mlp_conv_fc",
            output_name=f"{i}_block_mlp_gelu",
            mode='TANH_APPROXIMATION'
        )
        mlp_conv_1D_proj_bias = model.h[i].mlp.c_proj.bias.data.cpu().numpy().reshape((1, 1, 768, 1, 1))
        mlp_conv_1D_proj_weights = model.h[i].mlp.c_proj.weight.data.cpu().numpy().transpose().reshape((1, 3072, 768, 1, 1))
        builder.add_inner_product(
            name=f"{i}_block_mlp_conv_proj",
            input_name=f"{i}_block_mlp_gelu",
            output_name=f"{i}_block_mlp_conv_proj",
            input_channels=3072,
            output_channels=768,
            W=mlp_conv_1D_proj_weights,
            b=mlp_conv_1D_proj_bias,
            has_bias=True
        )
        # Second residual connection; the transposed result feeds the next block.
        builder.add_add_broadcastable(
            name=f"{i}_block_xm_sum",
            input_names=[f"{i}_block_xa_sum", f"{i}_block_mlp_conv_proj"],
            output_name=f"{i + 1}_previous_block_final"
        )
        builder.add_transpose(
            name=f"{i}_block_xm_sum_t",
            input_name=f"{i + 1}_previous_block_final",
            output_name=f"{i + 1}_previous_block",
            axes=[1, 0, 2, 3, 4]
        )
    # Final layer norm (ln_f) and the language-model head that maps back to the vocabulary.
    ln_f_weight = model.ln_f.weight.data.cpu().numpy().reshape((1, 1, 768, 1, 1))
    ln_f_bias = model.ln_f.bias.data.cpu().numpy().reshape((1, 1, 768, 1, 1))
    ln_f_epsilon = model.ln_f.eps
    builder.add_mvn(
        name="ln_f",
        input_name=f"{steps}_previous_block_final",
        output_name="ln_f",
        across_channels=True,
        normalize_variance=True,
        epsilon=ln_f_epsilon
    )
    builder.add_scale(
        name="ln_f_scaled",
        input_name="ln_f",
        output_name="ln_f_scaled",
        W=ln_f_weight,
        b=ln_f_bias,
        has_bias=True,
        shape_scale=[768],
        shape_bias=[768]
    )
    lm_head_weights = lm_head_model.lm_head.weight.data.cpu().numpy().reshape((1, 50257, 768, 1, 1))
    builder.add_inner_product(
        name="lm_head",
        input_name="ln_f_scaled",
        output_name="output_logits",
        input_channels=768,
        output_channels=50257,
        W=lm_head_weights,
        b=None,
        has_bias=False
    )
    # Compile the spec into an MLModel and save the spec to disk.
    mlmodel = coremltools.models.MLModel(builder.spec)
    save_spec(builder.spec, f'{model_name}-{sequence_length}-{steps}.mlmodel')
    return mlmodel
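

# Hedged usage sketch (added; not part of the original script). It assumes the conversion above
# was run with its defaults on a 12-layer GPT-2, so the spec was saved as 'model-128-12.mlmodel',
# and that this runs on macOS where Core ML inference is available.
def _example_run_coreml(mlmodel_path='model-128-12.mlmodel'):
    import numpy as np
    import coremltools

    mlmodel = coremltools.models.MLModel(mlmodel_path)
    seq_len = 128
    input_ids = np.zeros(seq_len, dtype=np.float64)      # token ids, padded to the fixed length
    position_ids = np.arange(seq_len, dtype=np.float64)  # positions 0..127
    out = mlmodel.predict({'input_ids': input_ids, 'position_ids': position_ids})
    print(out['output_logits'].shape)  # vocabulary-sized logits per position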