import numpy as np
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Layer, Dense, Dropout, LayerNormalization, MultiHeadAttention
from tensorflow.keras.models import Sequential
from sklearn.preprocessing import LabelEncoder
import gradio as gr
import os


class EnhancedTransformerBlock(Layer):
    """Transformer encoder block with an additional single-head self-attention stage."""

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate

        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = Sequential([
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim),
        ])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.self_attention = MultiHeadAttention(num_heads=1, key_dim=embed_dim)

    def call(self, inputs, training=False):
        # Standard encoder sub-layers: multi-head attention and a feed-forward
        # network, each followed by dropout and a residual connection.
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)
        # Extra single-head self-attention pass; note that layernorm2 is reused here,
        # so its weights are shared between this and the previous residual branch.
        self_attn_output = self.self_attention(out2, out2)
        return self.layernorm2(out2 + self_attn_output)

    def get_config(self):
        # Expose the constructor arguments so the layer can be restored by
        # tf.keras.models.load_model via custom_objects.
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "num_heads": self.num_heads,
            "ff_dim": self.ff_dim,
            "rate": self.rate,
        })
        return config
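
# Illustrative sketch (not part of the app flow below): the block operates on 3-D
# inputs of shape (batch, seq_len, embed_dim), e.g.
#   block = EnhancedTransformerBlock(embed_dim=32, num_heads=2, ff_dim=64)
#   out = block(tf.random.uniform((2, 10, 32)))   # -> shape (2, 10, 32)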
sequence_length = 10

# Seed history of recent outcomes. Labels must match encoder.classes_ exactly
# (case-sensitive); otherwise encoder.transform() fails on unseen labels.
data = ["Single big", "Double big", "Double big", "Single small", "Single big",
        "Double small", "Single big", "Double small", "Single small", "Single small",
        "Single big", "Single small", "Triple", "Single big", "Double small",
        "Double small", "Double small", "Double small"]
encoder = LabelEncoder()
try:
    model = tf.keras.models.load_model("enhanced_adaptive_model.keras",
                                       custom_objects={'EnhancedTransformerBlock': EnhancedTransformerBlock})
    encoder.classes_ = np.load('label_encoder_classes.npy', allow_pickle=True)
except (OSError, ValueError):
    # Depending on the Keras version, a missing .keras file raises OSError or ValueError;
    # np.load raises FileNotFoundError, which is a subclass of OSError.
    print("Model or encoder classes file not found. Using a dummy model and encoder for demonstration.")

    encoder.classes_ = np.array(['Single small', 'Single big', 'Double small', 'Double big', 'Triple'])
    # Dummy stand-in sized to the number of classes and compiled so gradio_update() can
    # call fit(). It has no dropout, so the MC-dropout uncertainty will be close to zero.
    model = tf.keras.Sequential([tf.keras.layers.Input(shape=(sequence_length,)),
                                 tf.keras.layers.Dense(len(encoder.classes_), activation='softmax')])
    model.compile(optimizer='adam', loss='categorical_crossentropy')
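
# Note (assumption): the two artifacts loaded above are expected to come from a
# separate training script, roughly along the lines of
#   model.save("enhanced_adaptive_model.keras")
#   np.save("label_encoder_classes.npy", encoder.classes_)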
def update_data(data, new_outcome):
    # Append the newest outcome and drop the oldest once the buffer exceeds sequence_length.
    data.append(new_outcome)
    if len(data) > sequence_length:
        data.pop(0)
    return data
def enhanced_predict_next(model, data, sequence_length, encoder):
    last_sequence = data[-sequence_length:]
    last_sequence = np.array(encoder.transform(last_sequence)).reshape((1, sequence_length))

    # Monte Carlo dropout: run repeated stochastic forward passes (training=True keeps
    # dropout active) and treat the spread of the outputs as an uncertainty estimate.
    predictions = []
    for _ in range(100):
        prediction = model(last_sequence, training=True)
        predictions.append(prediction)

    mean_prediction = np.mean(predictions, axis=0)
    std_prediction = np.std(predictions, axis=0)

    predicted_label = encoder.inverse_transform([np.argmax(mean_prediction)])
    uncertainty = np.mean(std_prediction)

    return predicted_label[0], uncertainty
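
# Illustrative call (assumes `data` already holds at least `sequence_length` outcomes):
#   label, unc = enhanced_predict_next(model, data, sequence_length, encoder)
# `label` is the class with the highest mean probability; larger `unc` means the
# stochastic passes disagreed more.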
def gradio_predict(outcome):
    # Gradio callback: record the latest observed outcome, then predict the next one.
    global data

    if outcome not in encoder.classes_:
        return "Invalid outcome. Please try again."

    data = update_data(data, outcome)

    if len(data) < sequence_length:
        return f"Not enough data to make a prediction. Please enter {sequence_length - len(data)} more outcomes."

    predicted_next, uncertainty = enhanced_predict_next(model, data, sequence_length, encoder)
    return f'Predicted next outcome: {predicted_next} (Uncertainty: {uncertainty:.4f})'
def gradio_update(actual_next):
    # Gradio callback: fine-tune the model on one example, using the window observed
    # before the new outcome as the input and the new outcome itself as the target.
    global data, model

    if actual_next not in encoder.classes_:
        return "Invalid outcome. Please try again."

    # Capture the input window before appending, so the target is not leaked into the input.
    previous_window = data[-sequence_length:]
    data = update_data(data, actual_next)

    if len(previous_window) < sequence_length:
        return f"Not enough data to update the model. Please enter {sequence_length - len(previous_window)} more outcomes."

    encoded_actual_next = encoder.transform([actual_next])[0]
    new_X = np.array(encoder.transform(previous_window)).reshape((1, sequence_length))
    # Wrap the label in a list so the target has shape (1, num_classes) to match the one-sample batch.
    new_y = to_categorical([encoded_actual_next], num_classes=len(encoder.classes_))

    model.fit(new_X, new_y, epochs=1, verbose=0)
    return "Model updated with new data."
with gr.Blocks() as demo:
    gr.Markdown("## Enhanced Outcome Prediction Model")
    gr.Markdown(f"Enter a sequence of {sequence_length} outcomes to get started.")
    gr.Markdown(f"Valid outcomes: {', '.join(encoder.classes_)}")

    with gr.Row():
        outcome_input = gr.Textbox(label="Enter an outcome")
        predict_button = gr.Button("Predict Next")
        predicted_output = gr.Textbox(label="Prediction")

    with gr.Row():
        actual_input = gr.Textbox(label="Enter actual next outcome")
        update_button = gr.Button("Update Model")
        update_output = gr.Textbox(label="Update Status")

    predict_button.click(gradio_predict, inputs=outcome_input, outputs=predicted_output)
    update_button.click(gradio_update, inputs=actual_input, outputs=update_output)
demo.launch()