Spaces:
Sleeping
Sleeping
import gradio as gr | |
import numpy as np | |
from keras.models import Model | |
from keras.saving import load_model | |
from keras.layers import * | |
from keras.regularizers import L1 | |
from keras.constraints import Constraint | |
from tensorflow.keras.optimizers import RMSprop | |
from keras.preprocessing.text import Tokenizer | |
import keras.backend as K | |
import os | |
import hashlib | |
import keras | |
# Create the model cache directory. exist_ok=True keeps a second start of the
# script from raising FileExistsError when the directory is already there.
os.makedirs("cache", exist_ok=True)
class ValueConstraint(Constraint):
    """Weight constraint that clips every weight into [min_value, max_value]."""

    def __init__(self, min_value: float = -1, max_value: float = 1):
        """Store the lower and upper clipping bounds."""
        self.min_value = min_value
        self.max_value = max_value

    def __call__(self, w):
        """Return ``w`` with each element clipped to the configured bounds."""
        return K.clip(w, self.min_value, self.max_value)

    def get_config(self):
        """Return the constructor arguments.

        Without this the bounds are not part of the serialized config, so a
        saved model would be rebuilt with the default bounds.
        """
        return {"min_value": self.min_value, "max_value": self.max_value}
def todset(text: str):
    """Parse a ``question→answer`` dataset, one pair per line.

    The text is lower-cased, and a literal backslash-n sequence inside a
    question or answer is turned into a real newline. Lines that do not
    contain the "→" separator (for example the empty line left by a trailing
    newline) are skipped instead of raising ``IndexError``. Only the first two
    "→"-separated fields of a line are used.

    Returns:
        A tuple ``(dset, responses)`` where ``responses`` lists the distinct
        answers in order of first appearance and ``dset`` maps each question
        to the index of its answer in ``responses``. When a question occurs
        more than once, its last answer wins.
    """
    dset = {}
    # answer -> index; dicts keep insertion order, so the keys double as the
    # ordered list of distinct responses.
    response_index = {}
    for raw_line in text.split("\n"):
        fields = raw_line.lower().split("→")
        if len(fields) < 2:
            continue
        question = fields[0].replace("\\n", "\n")
        answer = fields[1].replace("\\n", "\n")
        dset[question] = response_index.setdefault(answer, len(response_index))
    return (dset, list(response_index))
def hash_str(data: str):
    """Return the hexadecimal MD5 digest of ``data`` encoded as UTF-8."""
    digest = hashlib.md5()
    digest.update(data.encode("utf-8"))
    return digest.hexdigest()
def _pad_tokens(tokens, length: int, left: bool):
    """Zero-pad or truncate a token id sequence to exactly ``length`` ids."""
    ids = list(tokens)
    if left:
        # Zeros go in front; an over-long sequence keeps its last ids.
        return ([0] * length + ids)[-length:]
    # Zeros go at the end; an over-long sequence keeps its first ids.
    return (ids + [0] * length)[:length]


def _build_model(vocab_size, resps_len, regularization, dropout, emb_size, inp_len, kernels_count, kernel_size, end_activation):
    """Assemble the uncompiled embedding/attention/convolution classifier."""
    input_layer = Input(shape=(inp_len,))
    emb_layer = Embedding(input_dim=vocab_size, output_dim=emb_size, input_length=inp_len)(input_layer)
    dropout1_layer = Dropout(dropout)(emb_layer)
    attn_layer = MultiHeadAttention(num_heads=4, key_dim=128)(dropout1_layer, dropout1_layer, dropout1_layer)
    noise_layer = GaussianNoise(0.1)(attn_layer)
    conv1_layer = Conv1D(kernels_count, kernel_size, padding='same', activation='relu', strides=1, input_shape=(64, 128), kernel_regularizer=L1(regularization))(noise_layer)
    conv2_layer = Conv1D(16, 4, padding='same', activation='relu', strides=1, kernel_regularizer=L1(regularization))(conv1_layer)
    conv3_layer = Conv1D(8, 2, padding='same', activation='relu', strides=1, kernel_regularizer=L1(regularization))(conv2_layer)
    # conv3 is flattened twice and therefore appears twice in the first
    # concatenation; kept as-is so the architecture stays unchanged.
    flatten_layer = Flatten()(conv3_layer)
    attn_flatten_layer = Flatten()(attn_layer)
    conv1_flatten_layer = Flatten()(conv1_layer)
    conv2_flatten_layer = Flatten()(conv2_layer)
    conv3_flatten_layer = Flatten()(conv3_layer)
    concat1_layer = Concatenate()([flatten_layer, attn_flatten_layer, conv1_flatten_layer, conv2_flatten_layer, conv3_flatten_layer])
    dropout2_layer = Dropout(dropout)(concat1_layer)
    dense1_layer = Dense(2048, activation="linear", kernel_regularizer=L1(regularization))(dropout2_layer)
    prelu1_layer = PReLU(alpha_constraint=ValueConstraint())(dense1_layer)
    dropout3_layer = Dropout(dropout)(prelu1_layer)
    dense2_layer = Dense(1024, activation="relu", kernel_regularizer=L1(regularization))(dropout3_layer)
    dropout4_layer = Dropout(dropout)(dense2_layer)
    dense3_layer = Dense(512, activation="relu", kernel_regularizer=L1(regularization))(dropout4_layer)
    dropout5_layer = Dropout(dropout)(dense3_layer)
    dense4_layer = Dense(256, activation="relu", kernel_regularizer=L1(regularization))(dropout5_layer)
    # Skip connections: earlier features are fed to the output layer as well.
    concat2_layer = Concatenate()([dense4_layer, prelu1_layer, attn_flatten_layer, conv1_flatten_layer])
    output_layer = Dense(resps_len, activation=end_activation, kernel_regularizer=L1(regularization))(concat2_layer)
    return Model(inputs=input_layer, outputs=output_layer)


def train(message: str = "", regularization: float = 0.0001, dropout: float = 0.1, learning_rate: float = 0.001, epochs: int = 16, emb_size: int = 128, input_len: int = 16, kernels_count: int = 8, kernel_size: int = 8, left_padding: bool = True, end_activation: str = "softmax", data: str = ""):
    """Train (or load from ``cache/``) a response classifier and answer ``message``.

    Args:
        message: Text to classify. The special value ``"!getmodelhash"``
            returns the cache file name when ``data`` names a cached model.
        regularization: L1 factor applied to the convolution and dense kernels.
        dropout: Rate shared by all dropout layers.
        learning_rate: RMSprop learning rate.
        epochs: Number of training epochs.
        emb_size: Embedding output dimension.
        input_len: Number of token ids fed to the model.
        kernels_count: Filter count of the first convolution.
        kernel_size: Kernel size of the first convolution.
        left_padding: Pad with zeros on the left instead of the right.
        end_activation: Activation of the output layer.
        data: Either a multi-line ``question→answer`` dataset or the name of
            a file inside ``cache/``.

    Returns:
        The predicted response, the cache file name, or a usage hint when
        ``data`` is neither a dataset nor a cached file name.
    """
    data_hash = None
    if "→" not in data or "\n" not in data:
        if data not in os.listdir("cache"):
            return "Dataset example:\nquestion→answer\nquestion→answer\netc."
        data_hash = data  # data is the file name of a cached model
        # Answer before the dataset is parsed: a file name is not a dataset,
        # so parsing it first made this return unreachable.
        if message == "!getmodelhash":
            return data_hash
    # NOTE(review): when `data` is a cache file name there is no dataset text,
    # so neither the vocabulary nor the responses can be rebuilt here and the
    # prediction below cannot succeed - persisting them next to the model
    # would be needed to make this path usable.
    dset, responses = todset(data)
    resps_len = len(responses)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(list(dset.keys()))
    vocab_size = len(tokenizer.word_index) + 1
    if data_hash is None:
        inp_len = input_len
        settings = [regularization, dropout, learning_rate, epochs, emb_size, inp_len, kernels_count, kernel_size, left_padding]
        data_hash = hash_str(data) + "_" + "_".join(str(value) for value in settings) + "_" + end_activation + ".keras"
    else:
        # File name layout: hash_reg_dropout_lr_epochs_emb_inplen_kernels_
        # kernelsize_leftpadding_activation.keras. Index from the front because
        # the activation name itself may contain "_" (e.g. "log_softmax").
        fields = data_hash.split("_")
        inp_len = int(fields[6])
        left_padding = fields[9] == "True"
    model_path = os.path.join("cache", data_hash)
    if data_hash in os.listdir("cache"):
        # The saved model references the custom constraint class, so hand it
        # to the loader explicitly.
        model = load_model(model_path, custom_objects={"ValueConstraint": ValueConstraint})
    else:
        model = _build_model(vocab_size, resps_len, regularization, dropout, emb_size, inp_len, kernels_count, kernel_size, end_activation)
        questions = list(dset)
        sequences = tokenizer.texts_to_sequences(questions)
        X = np.array([np.array(_pad_tokens(seq, inp_len, left_padding)) for seq in sequences])
        y = np.array([dset[question] for question in questions])
        model.compile(optimizer=RMSprop(learning_rate=learning_rate), loss="sparse_categorical_crossentropy", metrics=["accuracy",])
        model.fit(X, y, epochs=epochs, batch_size=8, workers=4, use_multiprocessing=True)
        model.save(model_path)
    tokens = tokenizer.texts_to_sequences([message,])[0]
    # Pad the query the same way the training samples were padded.
    prediction = model.predict(np.array([_pad_tokens(tokens, inp_len, left_padding),]))[0]
    K.clear_session()
    return responses[np.argmax(prediction)]
if __name__ == "__main__":
    # Input widgets, listed in the same order as the parameters of train().
    train_inputs = [
        "text",
        gr.inputs.Slider(0, 0.01, default=0.0001, step=1e-8, label="Regularization L1"),
        gr.inputs.Slider(0, 0.5, default=0.1, step=1e-8, label="Dropout"),
        gr.inputs.Slider(1e-8, 0.01, default=0.001, step=1e-8, label="Learning rate"),
        gr.inputs.Slider(1, 64, default=32, step=1, label="Epochs"),
        gr.inputs.Slider(1, 256, default=100, step=1, label="Embedding size"),
        gr.inputs.Slider(1, 128, default=16, step=1, label="Input Length"),
        gr.inputs.Slider(1, 128, default=64, step=1, label="Convolution kernel count"),
        gr.inputs.Slider(1, 16, default=8, step=1, label="Convolution kernel size"),
        gr.inputs.Checkbox(False, label="Use left padding"),
        # Preselect "softmax" (train()'s own default) so that an untouched
        # radio group does not hand None to train(), which concatenates the
        # activation name into the cache file name.
        gr.inputs.Radio(['softmax', 'sigmoid', 'linear', 'softplus', 'exponential', 'log_softmax'], default="softmax", label="Output activation function"),
        "text",
    ]
    iface = gr.Interface(fn=train, inputs=train_inputs, outputs="text")
    iface.launch()