import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (
    Dense, Dropout, Input, LayerNormalization, MultiHeadAttention,
    GlobalAveragePooling1D, Embedding, Layer, LSTM, Bidirectional, Conv1D
)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import tensorflow as tf
import optuna
import gradio as gr

# Combined data set (unchanged)
data = [
    "Double big 12", "Single big 11", "Single big 13", "Double big 12", "Double small 10",
    # ... (rest of the data)
]

# Encode the string labels as integer class indices
encoder = LabelEncoder()
encoded_data = encoder.fit_transform(data)

# Build sliding-window sequences: each sample is `sequence_length` consecutive
# encoded labels, and the target is the label that immediately follows the window.
sequence_length = 10
X, y = [], []
for i in range(len(encoded_data) - sequence_length):
    X.append(encoded_data[i:i + sequence_length])
    y.append(encoded_data[i + sequence_length])

X = np.array(X)
y = np.array(y)
y = to_categorical(y, num_classes=len(encoder.classes_))

# X stays as (num_samples, sequence_length) integer indices; the Embedding
# layer inside the model maps each index to a dense vector.
X = X.reshape((X.shape[0], X.shape[1]))

print(f'Input shape: {X.shape}')
print(f'Output shape: {y.shape}')
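# Illustrative sketch (not part of the original pipeline): how the sliding
# window above maps an already-encoded label stream to (X, y) pairs, on toy data.
_toy = [0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2]
_toy_X = [_toy[i:i + sequence_length] for i in range(len(_toy) - sequence_length)]
_toy_y = [_toy[i + sequence_length] for i in range(len(_toy) - sequence_length)]
# _toy_X == [[0, 1, 2, 0, 1, 2, 0, 1, 2, 0], [1, 2, 0, 1, 2, 0, 1, 2, 0, 1]]
# _toy_y == [1, 2]   (each target is the label right after its window)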
class TransformerBlock(Layer):
    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super(TransformerBlock, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate
        self.att = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.ffn = Sequential([
            Dense(ff_dim, activation="relu"),
            Dense(embed_dim),
        ])
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, inputs, training=False):
        # Self-attention sub-layer with residual connection and layer norm
        attn_output = self.att(inputs, inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        # Position-wise feed-forward sub-layer with residual connection
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        # Required so the custom layer can be serialized on model.save() and
        # restored via custom_objects on load_model().
        config = super(TransformerBlock, self).get_config()
        config.update({
            'embed_dim': self.embed_dim,
            'num_heads': self.num_heads,
            'ff_dim': self.ff_dim,
            'rate': self.rate,
        })
        return config


def build_model(trial):
    # Hyperparameters sampled by Optuna
    embed_dim = trial.suggest_int('embed_dim', 128, 512, step=64)
    num_heads = trial.suggest_int('num_heads', 4, 16, step=4)
    ff_dim = trial.suggest_int('ff_dim', 256, 1024, step=128)
    rate = trial.suggest_float('dropout', 0.1, 0.5, step=0.1)
    num_transformer_blocks = trial.suggest_int('num_transformer_blocks', 2, 6)
    lstm_units = trial.suggest_int('lstm_units', 64, 256, step=64)

    inputs = Input(shape=(sequence_length,))
    embedding_layer = Embedding(input_dim=len(encoder.classes_), output_dim=embed_dim)
    x = embedding_layer(inputs)

    for _ in range(num_transformer_blocks):
        transformer_block = TransformerBlock(embed_dim, num_heads, ff_dim, rate)
        x = transformer_block(x)

    x = Conv1D(256, 3, activation='relu')(x)
    x = Bidirectional(LSTM(lstm_units, return_sequences=True))(x)
    x = GlobalAveragePooling1D()(x)
    x = Dropout(rate)(x)
    x = Dense(ff_dim, activation="relu")(x)
    x = Dropout(rate)(x)
    outputs = Dense(len(encoder.classes_), activation="softmax")(x)

    model = Model(inputs=inputs, outputs=outputs)
    optimizer = Adam(learning_rate=trial.suggest_float('lr', 1e-5, 1e-2, log=True))
    model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
    return model


# Split data into train, validation, and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)


def objective(trial):
    model = build_model(trial)
    early_stopping = EarlyStopping(monitor='val_loss', patience=15, restore_best_weights=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10, min_lr=1e-6)
    history = model.fit(
        X_train, y_train,
        epochs=150,
        batch_size=64,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping, reduce_lr],
        verbose=0
    )
    val_accuracy = max(history.history['val_accuracy'])
    return val_accuracy


study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=100)

best_trial = study.best_trial
print(f'Best hyperparameters: {best_trial.params}')

# Retrain a fresh model with the best hyperparameters
best_model = build_model(best_trial)
early_stopping = EarlyStopping(monitor='val_loss', patience=25, restore_best_weights=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=15, min_lr=1e-6)
history = best_model.fit(
    X_train, y_train,
    epochs=1000,
    batch_size=64,
    validation_data=(X_val, y_val),
    callbacks=[early_stopping, reduce_lr],
    verbose=2
)

# Evaluate on test set
test_loss, test_accuracy = best_model.evaluate(X_test, y_test, verbose=0)
print(f'Test accuracy: {test_accuracy:.4f}')


def predict_next(model, data, sequence_length, encoder):
    # Encode the most recent window of outcomes and predict the next label
    last_sequence = data[-sequence_length:]
    last_sequence_encoded = np.array(encoder.transform(last_sequence)).reshape((1, sequence_length))
    prediction = model.predict(last_sequence_encoded)
    predicted_label = encoder.inverse_transform([np.argmax(prediction)])
    return predicted_label[0]


def update_data(data, new_outcome):
    # Append the new outcome; drop the oldest one so the rolling buffer does not grow
    data.append(new_outcome)
    if len(data) > sequence_length:
        data.pop(0)
    return data


def retrain_model(model, X, y, epochs=20):
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=5, min_lr=1e-6)
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
    model.fit(
        X_train, y_train,
        epochs=epochs,
        batch_size=64,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping, reduce_lr],
        verbose=0
    )
    return model


# Interactive component
def gradio_predict(outcome):
    global data, X, y, best_model
    if outcome not in encoder.classes_:
        return "Invalid outcome. Please try again."
    data = update_data(data, outcome)
    if len(data) < sequence_length:
        return "Not enough data to make a prediction."
    predicted_next = predict_next(best_model, data, sequence_length, encoder)
    return f'Predicted next outcome: {predicted_next}'


def gradio_update(actual_next):
    global data, X, y, best_model
    if actual_next not in encoder.classes_:
        return "Invalid outcome. Please try again."
    data = update_data(data, actual_next)
    # A full training sample needs the window preceding the new outcome plus the outcome itself.
    if len(data) < sequence_length + 1:
        return "Not enough data to update the model."
    encoded_actual_next = encoder.transform([actual_next])[0]
    # Input: the sequence_length outcomes that precede actual_next; target: actual_next.
    new_sequence = np.array(encoder.transform(data[-(sequence_length + 1):-1])).reshape((1, sequence_length))
    new_X = np.append(X, new_sequence, axis=0)
    new_y = np.append(y, to_categorical([encoded_actual_next], num_classes=len(encoder.classes_)), axis=0)
    X, y = new_X, new_y
    best_model = retrain_model(best_model, new_X, new_y, epochs=20)
    return "Model updated with new data."
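# Illustrative sketch (not part of the original script): the helpers above can
# also be exercised directly, without going through the Gradio callbacks.
if len(data) >= sequence_length:
    _example_prediction = predict_next(best_model, data, sequence_length, encoder)
    print(f'Example direct prediction from the last {sequence_length} outcomes: {_example_prediction}')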
# Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("## Advanced Outcome Prediction with Enhanced Transformer")
    with gr.Row():
        outcome_input = gr.Textbox(label="Current Outcome")
        predict_button = gr.Button("Predict Next")
        predicted_output = gr.Textbox(label="Predicted Next Outcome")
    with gr.Row():
        actual_input = gr.Textbox(label="Actual Next Outcome")
        update_button = gr.Button("Update Model")
        update_output = gr.Textbox(label="Update Status")

    predict_button.click(gradio_predict, inputs=outcome_input, outputs=predicted_output)
    update_button.click(gradio_update, inputs=actual_input, outputs=update_output)

# launch() blocks until the interface is closed; the code below runs afterwards.
demo.launch()

# Save the model for future use
best_model.save("enhanced_transformer_model.h5")
print("Model saved as enhanced_transformer_model.h5")

# Loading the model for later use; the custom layer must be passed via custom_objects
loaded_model = tf.keras.models.load_model(
    "enhanced_transformer_model.h5",
    custom_objects={'TransformerBlock': TransformerBlock}
)


# Function to test the loaded model
def test_loaded_model(test_outcome):
    global data
    if test_outcome not in encoder.classes_:
        return "Invalid outcome. Test prediction aborted."
    data = update_data(data, test_outcome)
    if len(data) >= sequence_length:
        predicted_next = predict_next(loaded_model, data, sequence_length, encoder)
        return f'Predicted next outcome with loaded model: {predicted_next}'
    else:
        return "Not enough data to make a prediction."


# Adding testing functionality to the Gradio interface
with gr.Blocks() as demo:
    gr.Markdown("## Advanced Outcome Prediction with Enhanced Transformer")
    with gr.Row():
        outcome_input = gr.Textbox(label="Current Outcome")
        predict_button = gr.Button("Predict Next")
        predicted_output = gr.Textbox(label="Predicted Next Outcome")
    with gr.Row():
        actual_input = gr.Textbox(label="Actual Next Outcome")
        update_button = gr.Button("Update Model")
        update_output = gr.Textbox(label="Update Status")
    with gr.Row():
        test_input = gr.Textbox(label="Test Outcome for Loaded Model")
        test_button = gr.Button("Test Loaded Model")
        test_output = gr.Textbox(label="Loaded Model Prediction")

    predict_button.click(gradio_predict, inputs=outcome_input, outputs=predicted_output)
    update_button.click(gradio_update, inputs=actual_input, outputs=update_output)
    test_button.click(test_loaded_model, inputs=test_input, outputs=test_output)

demo.launch()
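# Illustrative sketch (assumption, not in the original script): the saved model
# is only usable together with the fitted LabelEncoder, so it can help to
# persist the encoder as well. The file name "label_encoder.pkl" is arbitrary.
import pickle

with open("label_encoder.pkl", "wb") as f:
    pickle.dump(encoder, f)

# Later, in a fresh session, the encoder can be restored with:
# with open("label_encoder.pkl", "rb") as f:
#     encoder = pickle.load(f)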