# neural-chatbot-constructor / chatbot_constructor.py
# ierhon's picture
# Set the maximum epoch amount to 128
# 6109b5b verified
import gradio as gr
import numpy as np
from keras.models import Model
from keras.saving import load_model
from keras.layers import *
from keras.regularizers import L1
from keras.constraints import Constraint
from tensorflow.keras.optimizers import RMSprop
from keras.preprocessing.text import Tokenizer
import keras.backend as K
import os
import hashlib
import keras
# Make sure the model cache directory exists. exist_ok=True keeps a restart
# (when "cache" is already present) from failing with FileExistsError.
os.makedirs("cache", exist_ok=True)
def todset(text: str):
    """Parse ``question→answer`` lines into a classification dataset.

    The text is split on newlines and lowercased. On each line the part
    before the first ``→`` is the question and the part between the first
    and second ``→`` (or the end of the line) is the answer. The two-character
    sequence backslash + ``n`` inside either part is turned into a real
    newline. Lines that contain no ``→`` (blank lines, a trailing newline)
    are skipped instead of raising ``IndexError``.

    Args:
        text: Raw dataset, one ``question→answer`` pair per line.

    Returns:
        A ``(dset, responses)`` tuple. ``responses`` lists the distinct
        answers in order of first appearance; ``dset`` maps each question to
        the index of its answer in ``responses``. When a question occurs
        more than once, the last occurrence wins.
    """
    dset = {}
    responses = []
    response_ids = {}  # answer -> index in `responses`, for O(1) lookup
    for raw_line in text.split("\n"):
        fields = raw_line.lower().split("→")
        if len(fields) < 2:
            # No separator on this line: nothing to learn from it.
            continue
        question = fields[0].replace("\\n", "\n")
        answer = fields[1].replace("\\n", "\n")
        if answer not in response_ids:
            response_ids[answer] = len(responses)
            responses.append(answer)
        dset[question] = response_ids[answer]
    return (dset, responses)
def hash_str(data: str):
    """Return the hexadecimal MD5 digest of ``data`` encoded as UTF-8."""
    digest = hashlib.md5()
    digest.update(data.encode("utf-8"))
    return digest.hexdigest()
def _pad_tokens(tokens, length: int, left: bool = False):
    """Pad with zeros or truncate ``tokens`` to exactly ``length`` ids.

    With ``left`` set, zeros go in front and the last ``length`` ids are
    kept; otherwise zeros go behind and the first ``length`` ids are kept.
    """
    padding = [0] * length
    if left:
        return (padding + list(tokens))[-length:]
    return (list(tokens) + padding)[:length]


def _cache_file_name(data, regularization, dropout, learning_rate, epochs,
                     emb_size, inp_len, kernels_count, kernel_size,
                     left_padding, end_activation):
    """Build the cache file name from the dataset hash and hyperparameters."""
    fields = [hash_str(data), regularization, dropout, learning_rate, epochs,
              emb_size, inp_len, kernels_count, kernel_size, left_padding]
    if end_activation is not None:
        fields.append(end_activation)
    return "_".join(str(field) for field in fields) + ".keras"


def _input_len_from_cache_name(name: str, default: int):
    """Read the input length back out of a name made by ``_cache_file_name``.

    The input length is the seventh underscore-separated field. Counting
    from the front keeps the lookup independent of the trailing fields (an
    activation such as ``log_softmax`` itself contains an underscore).
    Falls back to ``default`` when the name does not follow that layout.
    """
    try:
        return int(name.split("_")[6])
    except (IndexError, ValueError):
        return default


def _encode_dataset(dset, tokenizer, inp_len, left_padding):
    """Turn the question->class mapping into padded ``(X, y)`` numpy arrays."""
    sequences = tokenizer.texts_to_sequences(list(dset))
    X = np.array([_pad_tokens(seq, inp_len, left_padding) for seq in sequences])
    y = np.array(list(dset.values()))
    return X, y


def _build_model(vocab_size, resps_len, inp_len, emb_size, dropout,
                 regularization, kernels_count, kernel_size, end_activation):
    """Assemble the (uncompiled) classifier network.

    Embedding -> dropout -> multi-head self-attention -> Gaussian noise ->
    three stacked Conv1D layers; the flattened attention and convolution
    outputs are concatenated and fed through a dense stack, whose result is
    concatenated with earlier features before the output layer of
    ``resps_len`` units.
    """
    input_layer = Input(shape=(inp_len,))
    emb_layer = Embedding(input_dim=vocab_size, output_dim=emb_size, input_length=inp_len)(input_layer)
    dropout1_layer = Dropout(dropout)(emb_layer)
    attn_layer = MultiHeadAttention(num_heads=4, key_dim=128)(dropout1_layer, dropout1_layer, dropout1_layer)
    noise_layer = GaussianNoise(0.1)(attn_layer)
    # NOTE(review): input_shape on a layer that is called on an existing
    # tensor appears to have no effect - confirm before removing it.
    conv1_layer = Conv1D(kernels_count, kernel_size, padding='same', activation='relu', strides=1, input_shape=(64, 128), kernel_regularizer=L1(regularization))(noise_layer)
    conv2_layer = Conv1D(16, 4, padding='same', activation='relu', strides=1, kernel_regularizer=L1(regularization))(conv1_layer)
    conv3_layer = Conv1D(8, 2, padding='same', activation='relu', strides=1, kernel_regularizer=L1(regularization))(conv2_layer)
    # The last convolution is flattened twice and therefore enters the
    # concatenation twice; kept as-is so the architecture is unchanged.
    flatten_layer = Flatten()(conv3_layer)
    attn_flatten_layer = Flatten()(attn_layer)
    conv1_flatten_layer = Flatten()(conv1_layer)
    conv2_flatten_layer = Flatten()(conv2_layer)
    conv3_flatten_layer = Flatten()(conv3_layer)
    concat1_layer = Concatenate()([flatten_layer, attn_flatten_layer, conv1_flatten_layer, conv2_flatten_layer, conv3_flatten_layer])
    dropout2_layer = Dropout(dropout)(concat1_layer)
    dense1_layer = Dense(1024, activation="linear", kernel_regularizer=L1(regularization))(dropout2_layer)
    prelu1_layer = PReLU()(dense1_layer)
    dropout3_layer = Dropout(dropout)(prelu1_layer)
    dense2_layer = Dense(512, activation="relu", kernel_regularizer=L1(regularization))(dropout3_layer)
    dropout4_layer = Dropout(dropout)(dense2_layer)
    dense3_layer = Dense(512, activation="relu", kernel_regularizer=L1(regularization))(dropout4_layer)
    dropout5_layer = Dropout(dropout)(dense3_layer)
    dense4_layer = Dense(256, activation="relu", kernel_regularizer=L1(regularization))(dropout5_layer)
    concat2_layer = Concatenate()([dense4_layer, prelu1_layer, attn_flatten_layer, conv1_flatten_layer])
    # No activation selected -> default to softmax.
    activation = end_activation if end_activation is not None else "softmax"
    output_layer = Dense(resps_len, activation=activation, kernel_regularizer=L1(regularization))(concat2_layer)
    return Model(inputs=input_layer, outputs=output_layer)


def train(
    message: str = "",
    regularization: float = 0.0001,
    dropout: float = 0.1,
    learning_rate: float = 0.001,
    epochs: int = 16,
    emb_size: int = 100,
    input_len: int = 16,
    kernels_count: int = 64,
    kernel_size: int = 4,
    left_padding: bool = False,
    end_activation: str = "softmax",
    data: str = "",
):
    """Train (or load from cache) a response classifier and answer ``message``.

    A model is trained on the ``question→answer`` pairs in ``data`` and
    saved under ``cache/`` with a file name derived from the dataset hash
    and all hyperparameters; a later call with the same settings loads that
    file instead of training again.

    Args:
        message: Text to answer. The special value ``"!getmodelhash"``
            makes the function return the cache file name instead.
        regularization: L1 factor for the convolution and dense kernels.
        dropout: Rate used by every dropout layer.
        learning_rate: RMSprop learning rate.
        epochs: Number of training epochs.
        emb_size: Embedding dimension.
        input_len: Number of token ids fed to the model per message.
        kernels_count: Filter count of the first convolution.
        kernel_size: Kernel size of the first convolution.
        left_padding: Pad/truncate token sequences on the left instead of
            the right. Applied to training samples and to ``message`` alike.
        end_activation: Output layer activation; ``None`` means softmax.
        data: Dataset text, or the name of a file inside ``cache/``.

    Returns:
        The usage example when ``data`` lacks ``→`` or a newline and is not
        a cached file name; the cache file name for ``"!getmodelhash"``;
        otherwise the response with the highest predicted score.
    """
    data_hash = None
    if "→" not in data or "\n" not in data:
        if data not in os.listdir("cache"):
            return "Data example:\nquestion→answer\nquestion→answer\netc."
        # NOTE(review): here `data` is only a cache file name, yet the
        # vocabulary and the response list below are still derived from
        # `data`. The dataset-derived state would have to be stored next to
        # the model for this mode to produce answers.
        data_hash = data

    dset, responses = todset(data)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(list(dset))
    vocab_size = len(tokenizer.word_index) + 1  # +1: id 0 is the padding value

    inp_len = input_len
    if data_hash is None:
        data_hash = _cache_file_name(
            data, regularization, dropout, learning_rate, epochs, emb_size,
            inp_len, kernels_count, kernel_size, left_padding, end_activation,
        )
        if message == "!getmodelhash":
            return data_hash
    else:
        # The stored model dictates the sequence length, not the UI value.
        inp_len = _input_len_from_cache_name(data_hash, input_len)

    if data_hash in os.listdir("cache"):
        model = load_model("cache/" + data_hash)
    else:
        model = _build_model(
            vocab_size, len(responses), inp_len, emb_size, dropout,
            regularization, kernels_count, kernel_size, end_activation,
        )
        X, y = _encode_dataset(dset, tokenizer, inp_len, left_padding)
        model.compile(optimizer=RMSprop(learning_rate=learning_rate), loss="sparse_categorical_crossentropy", metrics=["accuracy",])
        model.fit(X, y, epochs=epochs, batch_size=8, workers=4, use_multiprocessing=True)
        model.save(f"cache/{data_hash}")

    tokens = tokenizer.texts_to_sequences([message,])[0]
    # Encode the message exactly like the training samples (same padding side).
    prediction = model.predict(np.array([_pad_tokens(tokens, inp_len, left_padding),]))[0]
    K.clear_session()
    return responses[np.argmax(prediction)]
if __name__ == "__main__":
    # Build the web form. The components are listed in the same order as
    # the positional parameters of train(): message, the hyperparameters,
    # and finally the dataset text.
    slider = gr.components.Slider
    controls = [
        "text",
        slider(0, 0.01, value=0.0001, step=1e-8, label="Regularization L1"),
        slider(0, 0.5, value=0.1, step=1e-8, label="Dropout"),
        slider(1e-8, 0.01, value=0.001, step=1e-8, label="Learning rate"),
        slider(1, 128, value=16, step=1, label="Epochs"),
        slider(1, 256, value=88, step=1, label="Embedding size"),
        slider(1, 128, value=16, step=1, label="Input Length"),
        slider(1, 128, value=64, step=1, label="Convolution kernel count"),
        slider(1, 16, value=2, step=1, label="Convolution kernel size"),
        gr.components.Checkbox(False, label="Use left padding"),
        gr.components.Radio(
            ['softmax', 'sigmoid', 'linear', 'softplus', 'exponential', 'log_softmax'],
            label="Output activation function",
        ),
        "text",
    ]
    iface = gr.Interface(fn=train, inputs=controls, outputs="text")
    iface.launch()