"""Streamlit app that trains an LSTM or GRU on a time series and plots predictions.

The first CSV file provides the training series and the second the test series.
Both are filtered to rows dated 2007 or later, scaled with a shared
MinMaxScaler, cut into fixed-length sliding windows, and fed to the model
selected in the sidebar.
"""
import streamlit as st
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

# Load the data from the two CSV files.
file1 = 'PARCIAL-AGUA-_2_.csv'
file2 = 'PARCIAL-AGUA-_3_.csv'
data1 = pd.read_csv(file1)
data2 = pd.read_csv(file2)

# Convert the 'FECHA' column to datetime objects and keep rows from 2007 onwards.
data1['FECHA'] = pd.to_datetime(data1['FECHA'])
data2['FECHA'] = pd.to_datetime(data2['FECHA'])
filtered_data1 = data1[data1['FECHA'].dt.year >= 2007]
filtered_data2 = data2[data2['FECHA'].dt.year >= 2007]

# NOTE(review): the scaler is fitted on the training and test values together,
# so test-set min/max influence the training inputs. Kept as-is to preserve the
# current results; consider fitting on the training file only.
combined_values = np.concatenate([
    filtered_data1['VALOR-LS-CF-N'].values,
    filtered_data2['VALOR-LS-CF-N'].values,
]).reshape(-1, 1)
scaler = MinMaxScaler()
scaled_values = scaler.fit_transform(combined_values)
scaled_values1 = scaled_values[:len(filtered_data1)]
scaled_values2 = scaled_values[len(filtered_data1):]


def sliding_windows(data, seq_length):
    """Split *data* into overlapping input windows and their next-step targets.

    Args:
        data: Indexable sequence (here a 2-D scaled array) of observations.
        seq_length: Number of consecutive observations in each input window.

    Returns:
        A pair ``(x, y)`` of NumPy arrays where ``x[i]`` is
        ``data[i:i + seq_length]`` and ``y[i]`` is ``data[i + seq_length]``.
        Both are empty when ``len(data) <= seq_length``.
    """
    n_windows = len(data) - seq_length
    x = [data[i:i + seq_length] for i in range(n_windows)]
    y = [data[i + seq_length] for i in range(n_windows)]
    return np.array(x), np.array(y)


seq_length = 4
x_train, y_train = sliding_windows(scaled_values1, seq_length)
x_test, y_test = sliding_windows(scaled_values2, seq_length)

trainX = torch.tensor(x_train, dtype=torch.float32)
trainY = torch.tensor(y_train, dtype=torch.float32)
testX = torch.tensor(x_test, dtype=torch.float32)
testY = torch.tensor(y_test, dtype=torch.float32)


class LSTM(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step."""

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return one prediction per sequence in the batch-first input *x*."""
        # Zero initial hidden/cell states, created on the same device and dtype
        # as the input so the model also works when moved off the CPU.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)
        c0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)
        out, _ = self.lstm(x, (h0, c0))
        # Only the output of the last time step feeds the linear head.
        return self.fc(out[:, -1, :])


class GRU(nn.Module):
    """Stacked GRU followed by a linear layer applied to the last time step."""

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        # NOTE(review): relu and dropout are defined but not applied in
        # forward(); kept so the module's attributes stay unchanged.
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)
        # Xavier-normal initialisation of the linear layer's weights.
        nn.init.xavier_normal_(self.fc.weight)

    def forward(self, x):
        """Return one prediction per sequence in the batch-first input *x*."""
        # Zero initial hidden state on the input's device and dtype.
        h0 = torch.zeros(
            self.num_layers, x.size(0), self.hidden_size,
            device=x.device, dtype=x.dtype,
        )
        out, _ = self.gru(x, h0)
        # Only the output of the last time step feeds the linear head.
        return self.fc(out[:, -1, :])


def _train_model(model, criterion, optimizer, num_epochs):
    """Run full-batch training on ``trainX``/``trainY`` and report the loss every 100 epochs."""
    model.train()
    for epoch in range(num_epochs):
        outputs = model(trainX)
        optimizer.zero_grad()
        loss = criterion(outputs, trainY)
        loss.backward()
        optimizer.step()
        if (epoch + 1) % 100 == 0:
            st.write(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')


def _to_original_scale(values):
    """Undo the MinMax scaling of a tensor and return a flat NumPy array."""
    column = values.detach().cpu().numpy().reshape(-1, 1)
    return scaler.inverse_transform(column).ravel()


def _build_prediction_table(model):
    """Return a date-indexed DataFrame of actual and predicted values for both series."""
    model.eval()
    # No gradients are needed for inference.
    with torch.no_grad():
        train_predict = model(trainX)
        test_predict = model(testX)

    # The first `seq_length` dates have no target, so they are skipped.
    train_data = pd.DataFrame({
        'Fecha': filtered_data1['FECHA'].values[seq_length:seq_length + len(trainY)],
        'Datos de entrenamiento': _to_original_scale(trainY),
        'Predicciones de entrenamiento': _to_original_scale(train_predict),
    })
    test_data = pd.DataFrame({
        'Fecha': filtered_data2['FECHA'].values[seq_length:seq_length + len(testY)],
        'Datos de prueba': _to_original_scale(testY),
        'Predicciones de prueba': _to_original_scale(test_predict),
    })

    # Concatenate both tables and index them by date for plotting.
    table = pd.concat([train_data, test_data])
    table.set_index('Fecha', inplace=True)
    return table


st.title('Predicción de Series de Tiempo')

st.sidebar.title('Parámetros del Modelo')
model_type = st.sidebar.selectbox('Selecciona el modelo', ('LSTM', 'GRU'))
num_epochs = st.sidebar.slider('Número de épocas', 100, 200)
# The step is 0.001, so show three decimals; the default float format shows two.
learning_rate = st.sidebar.number_input(
    'Tasa de aprendizaje', 0.001, 0.1, 0.01, 0.001, format='%.3f'
)

# Both architectures share the same hyperparameters and training procedure.
input_size = 1
hidden_size = 50
num_layers = 2
output_size = 1
model_class = LSTM if model_type == 'LSTM' else GRU
model = model_class(input_size, hidden_size, num_layers, output_size)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

if st.sidebar.button('Entrenar y Predecir'):
    _train_model(model, criterion, optimizer, num_epochs)
    combined_data = _build_prediction_table(model)
    # Show the chart in Streamlit.
    st.line_chart(combined_data)