"""Gradio demo: train (or reload from cache) a small Keras text classifier.

The dataset is a block of ``question→answer`` lines.  Each distinct answer is
a class; the model maps a tokenized question to the index of an answer.
Trained models are cached on disk under ``cache/`` keyed by a hash of the
dataset plus the hyper-parameters.
"""
import hashlib
import json
import os

# The star import is kept (the layer classes are referenced unqualified) and
# placed first so the explicit third-party imports below win on any name clash.
from keras.layers import *
import gradio as gr
import keras
import numpy as np
from keras.models import Model
from keras.preprocessing.text import Tokenizer, tokenizer_from_json
from keras.saving import load_model
from tensorflow.keras.optimizers import RMSprop

CACHE_DIR = "cache"
MODEL_SUFFIX = ".keras"
SEPARATOR = "→"
USAGE_HINT = "Dataset example:\nquestion→answer\nquestion→answer\netc."
MISSING_METADATA_HINT = (
    "No vocabulary is stored for this cached model; "
    "send the original dataset once to rebuild it."
)

# exist_ok avoids a FileExistsError on every start after the first one.
os.makedirs(CACHE_DIR, exist_ok=True)


def todset(text: str):
    """Parse ``question→answer`` lines into training labels.

    The text is lower-cased, and the two-character sequence ``\\n`` inside a
    question or answer is turned into a real newline.  Lines that do not
    contain the separator (blank lines, a trailing newline, stray text) are
    skipped instead of raising ``IndexError``.

    Returns:
        A ``(dset, responses)`` tuple: ``responses`` lists the distinct
        answers in order of first appearance, and ``dset`` maps each question
        to the index of its answer in ``responses`` (for a repeated question
        the last occurrence wins).
    """
    dset = {}
    responses = []
    response_ids = {}  # answer -> index, avoids a linear list.index() per line
    for raw_line in text.split("\n"):
        parts = raw_line.lower().split(SEPARATOR)
        if len(parts) < 2:
            continue
        question = parts[0].replace("\\n", "\n")
        answer = parts[1].replace("\\n", "\n")
        if answer not in response_ids:
            response_ids[answer] = len(responses)
            responses.append(answer)
        dset[question] = response_ids[answer]
    return (dset, responses)


def hash_str(data: str):
    """Return the hex MD5 digest of *data* (used as a cache key, not for security)."""
    return hashlib.md5(data.encode('utf-8')).hexdigest()


def _is_cached(name: str) -> bool:
    """Tell whether *name* is an entry of the cache directory.

    Membership in the directory listing is used (rather than joining the name
    into a path) so a user-supplied name cannot point outside the cache.
    """
    return name in os.listdir(CACHE_DIR)


def _metadata_path(model_name: str) -> str:
    """Return the path of the JSON sidecar stored next to a cached model."""
    return os.path.join(CACHE_DIR, model_name + ".json")


def _save_metadata(model_name: str, responses, tokenizer) -> None:
    """Persist the answers and tokenizer needed to use a cached model later."""
    payload = {"responses": responses, "tokenizer": tokenizer.to_json()}
    with open(_metadata_path(model_name), "w", encoding="utf-8") as handle:
        json.dump(payload, handle, ensure_ascii=False)


def _load_metadata(model_name: str):
    """Load ``(responses, tokenizer)`` for a cached model, or ``None`` if absent."""
    try:
        with open(_metadata_path(model_name), encoding="utf-8") as handle:
            payload = json.load(handle)
    except FileNotFoundError:
        return None
    return payload["responses"], tokenizer_from_json(payload["tokenizer"])


def _encode(tokenizer, text: str, length: int):
    """Tokenize *text* and zero-pad / truncate it to exactly *length* ids."""
    tokens = tokenizer.texts_to_sequences([text])[0]
    return (list(tokens) + [0] * length)[:length]


def _build_model(vocab_size, resps_len, emb_size, inp_len, kernels_count, kernel_size):
    """Build the (uncompiled) classifier.

    Embedding -> multi-head self-attention -> Gaussian noise -> three stacked
    Conv1D layers.  The flattened attention and convolution outputs feed a
    dense stack, and a second concatenation re-injects earlier features before
    the softmax over ``resps_len`` answers.
    """
    input_layer = Input(shape=(inp_len,))
    emb_layer = Embedding(
        input_dim=vocab_size, output_dim=emb_size, input_length=inp_len
    )(input_layer)
    attn_layer = MultiHeadAttention(num_heads=4, key_dim=128)(emb_layer, emb_layer, emb_layer)
    noise_layer = GaussianNoise(0.1)(attn_layer)
    # NOTE(review): input_shape=(64, 128) does not match the real input
    # (inp_len, emb_size); kept as in the original - confirm it can be dropped.
    conv1_layer = Conv1D(
        kernels_count, kernel_size, padding='same', activation='relu',
        strides=1, input_shape=(64, 128),
    )(noise_layer)
    conv2_layer = Conv1D(16, 4, padding='same', activation='relu', strides=1)(conv1_layer)
    conv3_layer = Conv1D(8, 2, padding='same', activation='relu', strides=1)(conv2_layer)
    # conv3 is flattened twice and both copies are concatenated; this mirrors
    # the original architecture and is kept so the network stays the same.
    flatten_layer = Flatten()(conv3_layer)
    attn_flatten_layer = Flatten()(attn_layer)
    conv1_flatten_layer = Flatten()(conv1_layer)
    conv2_flatten_layer = Flatten()(conv2_layer)
    conv3_flatten_layer = Flatten()(conv3_layer)
    concat1_layer = Concatenate()([
        flatten_layer, attn_flatten_layer, conv1_flatten_layer,
        conv2_flatten_layer, conv3_flatten_layer,
    ])
    dense1_layer = Dense(512, activation="linear")(concat1_layer)
    prelu1_layer = PReLU()(dense1_layer)
    dropout_layer = Dropout(0.3)(prelu1_layer)
    dense2_layer = Dense(256, activation="tanh")(dropout_layer)
    dense3_layer = Dense(256, activation="relu")(dense2_layer)
    dense4_layer = Dense(100, activation="tanh")(dense3_layer)
    concat2_layer = Concatenate()([
        dense4_layer, prelu1_layer, attn_flatten_layer, conv1_flatten_layer,
    ])
    output_layer = Dense(resps_len, activation="softmax")(concat2_layer)
    return Model(inputs=input_layer, outputs=output_layer)


def train(message: str = "", epochs: int = 16, learning_rate: float = 0.001,
          emb_size: int = 128, input_len: int = 16, kernels_count: int = 8,
          kernel_size: int = 8, data: str = ""):
    """Answer *message* with a model trained on *data* (or loaded from cache).

    Args:
        message: Question to classify.  ``"!getmodelhash"`` returns the cache
            name instead, but only when *data* already names a cached model.
        epochs: Number of training epochs for a newly built model.
        learning_rate: RMSprop learning rate.
        emb_size: Embedding dimension.
        input_len: Number of token ids each question is padded/truncated to.
        kernels_count: Filters of the first convolution.
        kernel_size: Kernel size of the first convolution.
        data: Either a multi-line ``question→answer`` dataset, or the file
            name of a model already present in the cache directory.

    Returns:
        The predicted answer, the cache name (see *message*), or a usage hint
        when *data* is neither a dataset nor a cached model name.
    """
    inp_len = input_len
    if SEPARATOR in data and "\n" in data:
        dset, responses = todset(data)
        tokenizer = Tokenizer()
        tokenizer.fit_on_texts(list(dset.keys()))
        key_parts = (
            hash_str(data), epochs, learning_rate, emb_size,
            inp_len, kernels_count, kernel_size,
        )
        data_hash = "_".join(str(part) for part in key_parts) + MODEL_SUFFIX
    elif data.endswith(MODEL_SUFFIX) and _is_cached(data):
        data_hash = data
        # NOTE(review): as in the original, the hash is only handed out when
        # the caller already supplied it - confirm whether a dataset request
        # should be able to ask for it too.
        if message == "!getmodelhash":
            return data_hash
        # Cache names end in ..._<inp_len>_<kernels_count>_<kernel_size>.keras
        inp_len = int(data_hash.split("_")[-3])
        # A bare model name carries no vocabulary or answers, so they are
        # restored from the sidecar written when the model was trained.
        metadata = _load_metadata(data_hash)
        if metadata is None:
            return MISSING_METADATA_HINT
        responses, tokenizer = metadata
    else:
        return USAGE_HINT

    model_path = os.path.join(CACHE_DIR, data_hash)
    if _is_cached(data_hash):
        model = load_model(model_path)
    else:
        model = _build_model(
            vocab_size=len(tokenizer.word_index) + 1,
            resps_len=len(responses),
            emb_size=emb_size,
            inp_len=inp_len,
            kernels_count=kernels_count,
            kernel_size=kernel_size,
        )
        X = np.array([np.array(_encode(tokenizer, key, inp_len)) for key in dset])
        y = np.array(list(dset.values()))
        model.compile(
            optimizer=RMSprop(learning_rate=learning_rate),
            loss="sparse_categorical_crossentropy",
            metrics=["accuracy",],
        )
        # Honour the requested epoch count: it is part of the cache key, so a
        # hard-coded value would cache identical models under different names.
        model.fit(X, y, epochs=epochs, batch_size=8, workers=4, use_multiprocessing=True)
        model.save(model_path)

    # Written on the dataset path only (also back-fills models cached earlier).
    if data_hash != data and not os.path.exists(_metadata_path(data_hash)):
        _save_metadata(data_hash, responses, tokenizer)

    prediction = model.predict(np.array([_encode(tokenizer, message, inp_len),]))[0]
    keras.backend.clear_session()  # release graph state between requests
    return responses[np.argmax(prediction)]


if __name__ == "__main__":
    iface = gr.Interface(
        fn=train,
        inputs=[
            "text",
            gr.inputs.Slider(1, 64, default=32, step=1, label="Epochs"),
            gr.inputs.Slider(0.00000001, 0.1, default=0.001, step=0.00000001,
                             label="Learning rate"),
            gr.inputs.Slider(1, 256, default=100, step=1, label="Embedding size"),
            gr.inputs.Slider(1, 128, default=16, step=1, label="Input Length"),
            gr.inputs.Slider(1, 128, default=64, step=1, label="Convolution kernel count"),
            gr.inputs.Slider(1, 16, default=8, step=1, label="Convolution kernel size"),
            "text",
        ],
        outputs="text",
    )
    iface.launch()