neural-chatbot-constructor / chatbot_constructor.py
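"""Gradio app that builds, trains and queries a small Keras text classifier.

Each request turns a "question→answer" dataset into a classification problem
(one class per distinct answer), trains a convolution/attention model on it,
caches the trained model under cache/, and replies to the submitted message
with the answer whose class the model predicts.
"""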
import gradio as gr
import numpy as np
from keras.models import Model
from keras.saving import load_model
from keras.layers import *
from keras.regularizers import L1
from tensorflow.keras.optimizers import RMSprop
from keras.preprocessing.text import Tokenizer
import keras.backend as K
import os
import hashlib
import keras

os.makedirs("cache", exist_ok=True)  # trained models are cached here, keyed by data and hyperparameters

def todset(text: str):
    # Each dataset line has the form "question→answer"; blank or malformed lines
    # (for example a trailing newline) are skipped, and a literal "\n" in either
    # side is unescaped back into a real newline.
    lines = [x.rstrip("\n").lower().split("→") for x in text.split("\n") if "→" in x]
    lines = [(x[0].replace("\\n", "\n"), x[1].replace("\\n", "\n")) for x in lines]
    responses = []
    for i in lines:
        if i[1] not in responses:
            responses.append(i[1])
    dset = {}
    for sample in lines:
        dset[sample[0]] = responses.index(sample[1])
    return (dset, responses)
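
# Illustrative example (not part of the app): the two-line dataset
#   "hi→hello\nhow are you→fine"
# yields dset == {"hi": 0, "how are you": 1} and responses == ["hello", "fine"].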

def hash_str(data: str):
    return hashlib.md5(data.encode('utf-8')).hexdigest()

def train(message: str = "", regularization: float = 0.0001, dropout: float = 0.1, learning_rate: float = 0.001, epochs: int = 16, emb_size: int = 100, input_len: int = 16, kernels_count: int = 64, kernel_size: int = 4, left_padding: bool = False, end_activation: str = "softmax", data: str = ""):
    data_hash = None
    if "→" not in data or "\n" not in data:
        if data in os.listdir("cache"): # data = filename
            data_hash = data # set the hash to the file name
        else:
            return "Data example:\nquestion→answer\nquestion→answer\netc."
    dset, responses = todset(data)
    resps_len = len(responses)
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(list(dset.keys()))
    vocab_size = len(tokenizer.word_index) + 1  # +1 so index 0 stays free for padding
    inp_len = input_len
    if data_hash is None:
        # Build a cache key from the MD5 of the data plus every hyperparameter,
        # so a trained model is only reused when nothing has changed.
        if end_activation is not None:
            data_hash = hash_str(data)+"_"+str(regularization)+"_"+str(dropout)+"_"+str(learning_rate)+"_"+str(epochs)+"_"+str(emb_size)+"_"+str(inp_len)+"_"+str(kernels_count)+"_"+str(kernel_size)+"_"+str(left_padding)+"_"+end_activation+".keras"
        else:
            data_hash = hash_str(data)+"_"+str(regularization)+"_"+str(dropout)+"_"+str(learning_rate)+"_"+str(epochs)+"_"+str(emb_size)+"_"+str(inp_len)+"_"+str(kernels_count)+"_"+str(kernel_size)+"_"+str(left_padding)+".keras"
        if message == "!getmodelhash":
            return data_hash
    else:
        inp_len = int(data_hash.split("_")[-3])  # recover the input length encoded in the cached file name
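    # Hypothetical cache key for illustration, using the defaults above:
    #   "<md5 of data>_0.0001_0.1_0.001_16_100_16_64_4_False_softmax.keras"
    # (MD5 of the data, then regularization, dropout, learning rate, epochs,
    #  embedding size, input length, kernel count, kernel size, padding flag, activation).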
    if data_hash in os.listdir("cache"):
        model = load_model("cache/"+data_hash)
    else:
        input_layer = Input(shape=(inp_len,))
        emb_layer = Embedding(input_dim=vocab_size, output_dim=emb_size, input_length=inp_len)(input_layer)
        dropout1_layer = Dropout(dropout)(emb_layer)
        # Self-attention over the embedded sequence (query = value = key).
        attn_layer = MultiHeadAttention(num_heads=4, key_dim=128)(dropout1_layer, dropout1_layer, dropout1_layer)
        noise_layer = GaussianNoise(0.1)(attn_layer)
        conv1_layer = Conv1D(kernels_count, kernel_size, padding='same', activation='relu', strides=1, kernel_regularizer=L1(regularization))(noise_layer)
        conv2_layer = Conv1D(16, 4, padding='same', activation='relu', strides=1, kernel_regularizer=L1(regularization))(conv1_layer)
        conv3_layer = Conv1D(8, 2, padding='same', activation='relu', strides=1, kernel_regularizer=L1(regularization))(conv2_layer)
        flatten_layer = Flatten()(conv3_layer)
        attn_flatten_layer = Flatten()(attn_layer)
        conv1_flatten_layer = Flatten()(conv1_layer)
        conv2_flatten_layer = Flatten()(conv2_layer)
        conv3_flatten_layer = Flatten()(conv3_layer)
        concat1_layer = Concatenate()([flatten_layer, attn_flatten_layer, conv1_flatten_layer, conv2_flatten_layer, conv3_flatten_layer])
        dropout2_layer = Dropout(dropout)(concat1_layer)
        dense1_layer = Dense(1024, activation="linear", kernel_regularizer=L1(regularization))(dropout2_layer)
        prelu1_layer = PReLU()(dense1_layer)
        dropout3_layer = Dropout(dropout)(prelu1_layer)
        dense2_layer = Dense(512, activation="relu", kernel_regularizer=L1(regularization))(dropout3_layer)
        dropout4_layer = Dropout(dropout)(dense2_layer)
        dense3_layer = Dense(512, activation="relu", kernel_regularizer=L1(regularization))(dropout4_layer)
        dropout5_layer = Dropout(dropout)(dense3_layer)
        dense4_layer = Dense(256, activation="relu", kernel_regularizer=L1(regularization))(dropout5_layer)
        # Skip connections: feed earlier features into the classifier head alongside the dense output.
        concat2_layer = Concatenate()([dense4_layer, prelu1_layer, attn_flatten_layer, conv1_flatten_layer])
        if end_activation is not None:
            output_layer = Dense(resps_len, activation=end_activation, kernel_regularizer=L1(regularization))(concat2_layer)
        else:
            output_layer = Dense(resps_len, activation="softmax", kernel_regularizer=L1(regularization))(concat2_layer)
        model = Model(inputs=input_layer, outputs=output_layer)
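        # Resulting graph (sketch): tokens -> embedding -> self-attention -> noise ->
        # three stacked Conv1D blocks; the flattened attention and convolution outputs
        # are concatenated, run through the dense stack, re-concatenated with earlier
        # features, and projected onto one output unit per known response.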
        X = []
        y = []
        if left_padding:
            for key in dset:
                tokens = tokenizer.texts_to_sequences([key,])[0]
                X.append(np.array(([0,]*inp_len+list(tokens))[-inp_len:]))
                y.append(dset[key])
        else:
            for key in dset:
                tokens = tokenizer.texts_to_sequences([key,])[0]
                X.append(np.array((list(tokens)+[0,]*inp_len)[:inp_len]))
                y.append(dset[key])
        X = np.array(X)
        y = np.array(y)
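        # Illustration: with inp_len = 4, the token sequence [3, 7] becomes
        # [0, 0, 3, 7] under left padding and [3, 7, 0, 0] otherwise; y holds the
        # response index of each question for sparse_categorical_crossentropy.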
        model.compile(optimizer=RMSprop(learning_rate=learning_rate), loss="sparse_categorical_crossentropy", metrics=["accuracy",])
        model.fit(X, y, epochs=epochs, batch_size=8, workers=4, use_multiprocessing=True)
        model.save(f"cache/{data_hash}")
    tokens = tokenizer.texts_to_sequences([message,])[0]
    # Pad the message the same way the training data was padded.
    padded = ([0,]*inp_len+list(tokens))[-inp_len:] if left_padding else (list(tokens)+[0,]*inp_len)[:inp_len]
    prediction = model.predict(np.array([padded,]))[0]
    K.clear_session()
    return responses[np.argmax(prediction)]
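
# Programmatic usage sketch (assumption: called outside the Gradio UI; the dataset
# string and hyperparameters below are made-up examples):
#   reply = train(message="hi", data="hi→hello\nbye→goodbye", epochs=32)
#   print(reply)  # expected to print "hello" once the tiny model fits both examples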

if __name__ == "__main__":
    iface = gr.Interface(fn=train,
                         inputs=["text",  # message
                                 gr.components.Slider(0, 0.01, value=0.0001, step=1e-8, label="Regularization L1"),
                                 gr.components.Slider(0, 0.5, value=0.1, step=1e-8, label="Dropout"),
                                 gr.components.Slider(1e-8, 0.01, value=0.001, step=1e-8, label="Learning rate"),
                                 gr.components.Slider(1, 128, value=16, step=1, label="Epochs"),
                                 gr.components.Slider(1, 256, value=88, step=1, label="Embedding size"),
                                 gr.components.Slider(1, 128, value=16, step=1, label="Input Length"),
                                 gr.components.Slider(1, 128, value=64, step=1, label="Convolution kernel count"),
                                 gr.components.Slider(1, 16, value=2, step=1, label="Convolution kernel size"),
                                 gr.components.Checkbox(False, label="Use left padding"),
                                 gr.components.Radio(['softmax', 'sigmoid', 'linear', 'softplus', 'exponential', 'log_softmax'], label="Output activation function"),
                                 "text"],  # data: "question→answer" lines
                         outputs="text")
    iface.launch()
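    # launch() serves the interface on Gradio's default local address
    # (http://127.0.0.1:7860) unless configured otherwise.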