import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from sklearn.metrics import mean_absolute_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping


def huber_loss(y_true, y_pred, delta=1.0):
    """Custom Huber loss: quadratic for errors within delta, linear beyond it."""
    error = y_true - y_pred
    is_small_error = tf.abs(error) <= delta
    small_error_loss = 0.5 * tf.square(error)
    big_error_loss = delta * (tf.abs(error) - 0.5 * delta)
    return tf.where(is_small_error, small_error_loss, big_error_loss)
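
# Quick sanity check (illustrative values, not from the original script):
# the custom loss should agree with tf.keras.losses.Huber, which uses the
# same piecewise definition.
_y_demo = tf.constant([0.0, 1.0, 4.0])
_p_demo = tf.constant([0.5, 1.0, 0.0])
print(tf.reduce_mean(huber_loss(_y_demo, _p_demo)).numpy())        # custom
print(tf.keras.losses.Huber(delta=1.0)(_y_demo, _p_demo).numpy())  # built-in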

def create_sequences(data, time_steps):
    """Build (X, y) pairs from a 2-D array of shape (n_samples, n_features).

    Each X window holds time_steps consecutive rows; y is the first feature
    of the row immediately after the window.
    """
    X, y = [], []
    for i in range(len(data) - time_steps):
        X.append(data[i:(i + time_steps), :])
        y.append(data[i + time_steps, 0])
    return np.array(X), np.array(y)
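
# Shape check on toy data (assumed values, for illustration only):
# 100 rows of 3 features with a 10-step window yield 90 windows.
_demo = np.random.rand(100, 3)
_Xw, _yw = create_sequences(_demo, time_steps=10)
print(_Xw.shape, _yw.shape)  # (90, 10, 3) (90,)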

def build_lstm_model(time_steps, input_size):
    """Build the improved LSTM model: two stacked 100-unit LSTM layers with
    dropout, followed by a 50-unit dense layer and a single-output head."""
    model = Sequential([
        LSTM(100, return_sequences=True, input_shape=(time_steps, input_size)),
        Dropout(0.2),
        LSTM(100),
        Dropout(0.2),
        Dense(50),
        Dense(1)
    ])

    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss=huber_loss)
    return model
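
# Example build under assumed dimensions (60-step windows, 5 features);
# the numbers are illustrative, not from the original script.
_demo_model = build_lstm_model(time_steps=60, input_size=5)
_demo_model.summary()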

def train_model(model, X_train, y_train, X_val, y_val, epochs=200, batch_size=32):
    """Train the LSTM model, stopping early once the validation loss stalls."""
    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)

    history = model.fit(
        X_train, y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_data=(X_val, y_val),
        callbacks=[early_stopping],
        verbose=1
    )

    return history
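
# Smoke test on synthetic data (assumed shapes, 2 epochs so it runs fast;
# not part of the original training setup).
_Xs = np.random.rand(64, 10, 3).astype("float32")
_ys = np.random.rand(64).astype("float32")
_smoke = build_lstm_model(time_steps=10, input_size=3)
train_model(_smoke, _Xs[:48], _ys[:48], _Xs[48:], _ys[48:], epochs=2)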

def add_brownian_noise(X):
    """Add zero-mean Gaussian noise (stddev 0.1) to X for data augmentation.

    Note: despite the name, this draws i.i.d. (white) noise at each step
    rather than a cumulative Brownian path.
    """
    brownian_noise = tf.random.normal(tf.shape(X), mean=0.0, stddev=0.1, dtype=tf.float32)
    return X + brownian_noise
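
# If a true Brownian (random-walk) perturbation is intended, one sketch is to
# accumulate white-noise increments along the time axis; axis=1 assumes inputs
# shaped (batch, time_steps, features). This helper is an assumption, not part
# of the original script.
def add_brownian_noise_walk(X, stddev=0.1):
    increments = tf.random.normal(tf.shape(X), mean=0.0, stddev=stddev, dtype=tf.float32)
    return X + tf.cumsum(increments, axis=1)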

def evaluate_model(model, X_test, y_test):
    """Report the model's loss on the test set."""
    result = model.evaluate(X_test, y_test, verbose=0)
    print("Test set loss:", result)

def print_model_info(model):
    """Print a short description of the model's architecture."""
    print("\nModel characteristics:")
    print("Number of layers:", len(model.layers))
    model.summary()

def calculate_accuracy(y_true, y_pred):
    """Report the mean absolute error between true and predicted values."""
    tasa_acierto = mean_absolute_error(y_true, y_pred)
    print("Mean absolute error (MAE):", tasa_acierto)

def plot_predictions(y_test, y_pred):
    """Plot predicted vs. actual values on the scaled price axis."""
    plt.figure(figsize=(10, 6))
    plt.plot(y_test, label='Actual Value')
    plt.plot(y_pred, label='Prediction', alpha=0.7)
    plt.title("Predictions vs. Actual Values")
    plt.xlabel("Index")
    plt.ylabel("Scaled Price")
    plt.legend()
    plt.show()
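
# Minimal sketch of the assumed upstream pipeline that defines the names used
# below (df, scaler, X, y, model, and the train/test splits). Everything here
# is a hypothetical reconstruction (a MinMaxScaler over a price DataFrame
# whose first column is the target), not confirmed by the original script.
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Hypothetical stand-in for the real price DataFrame loaded earlier.
df = pd.DataFrame(np.random.rand(500, 3), columns=["price", "f1", "f2"])

scaler = MinMaxScaler()
scaled = scaler.fit_transform(df.values)
X, y = create_sequences(scaled, time_steps=60)
split = int(len(X) * 0.8)
X_train, X_test = X[:split], X[split:]
y_train, y_test = y[:split], y[split:]
model = build_lstm_model(time_steps=X.shape[1], input_size=X.shape[2])
train_model(model, X_train, y_train, X_test, y_test)  # test set doubles as validation here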

# model, scaler, X, X_train, y_train, X_test, y_test are assumed to come from
# the data-preparation and training steps above.
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)

# The scaler was fit on all features, so pad the single predicted column with
# zeros before inverting, then keep only column 0 (the target).
train_predict = scaler.inverse_transform(np.hstack([train_predict, np.zeros((train_predict.shape[0], X.shape[2]-1))]))[:, 0]
test_predict = scaler.inverse_transform(np.hstack([test_predict, np.zeros((test_predict.shape[0], X.shape[2]-1))]))[:, 0]
y_train_inv = scaler.inverse_transform(np.hstack([y_train.reshape(-1, 1), np.zeros((y_train.shape[0], X.shape[2]-1))]))[:, 0]
y_test_inv = scaler.inverse_transform(np.hstack([y_test.reshape(-1, 1), np.zeros((y_test.shape[0], X.shape[2]-1))]))[:, 0]

# Root-mean-square error on the original (unscaled) price scale.
train_rmse = np.sqrt(np.mean((train_predict - y_train_inv)**2))
test_rmse = np.sqrt(np.mean((test_predict - y_test_inv)**2))

print(f"Training RMSE: {train_rmse}")
print(f"Test RMSE: {test_rmse}")