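# Streamlit app: trains a recurrent model (LSTM or GRU) on the 'VALOR-LS-CF-N'
# time series read from two CSV files and charts the predictions against the
# observed training and test values.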
import streamlit as st
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
# Load the data from the two CSV files
file1 = 'PARCIAL-AGUA-_2_.csv'
file2 = 'PARCIAL-AGUA-_3_.csv'
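# Both CSVs are expected to provide a 'FECHA' (date) column and the target column 'VALOR-LS-CF-N'.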
data1 = pd.read_csv(file1)
data2 = pd.read_csv(file2)
# Convert the 'FECHA' column to datetime objects and keep only years >= 2007
data1['FECHA'] = pd.to_datetime(data1['FECHA'])
data2['FECHA'] = pd.to_datetime(data2['FECHA'])
filtered_data1 = data1[data1['FECHA'].dt.year >= 2007]
filtered_data2 = data2[data2['FECHA'].dt.year >= 2007]
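# Scale both series together to [0, 1]. Fitting the scaler on the combined data is a
# simplification that lets the test values influence the scaling.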
combined_values = np.concatenate([filtered_data1['VALOR-LS-CF-N'].values, filtered_data2['VALOR-LS-CF-N'].values]).reshape(-1, 1)
scaler = MinMaxScaler()
scaled_values = scaler.fit_transform(combined_values)
scaled_values1 = scaled_values[:len(filtered_data1)]
scaled_values2 = scaled_values[len(filtered_data1):]
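# Build supervised pairs with a sliding window: each x is a window of seq_length
# consecutive values and each y is the value that immediately follows it.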
def sliding_windows(data, seq_length):
    x, y = [], []
    for i in range(len(data) - seq_length):
        x.append(data[i:i + seq_length])
        y.append(data[i + seq_length])
    return np.array(x), np.array(y)
seq_length = 4
x_train, y_train = sliding_windows(scaled_values1, seq_length)
x_test, y_test = sliding_windows(scaled_values2, seq_length)
trainX = torch.Tensor(x_train)
trainY = torch.Tensor(y_train)
testX = torch.Tensor(x_test)
testY = torch.Tensor(y_test)
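# Tensor shapes: trainX/testX are (num_windows, seq_length, 1), matching the
# batch_first=True RNN input; trainY/testY are (num_windows, 1).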
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Zero-initialized hidden and cell states, created on the same device as the input
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        # Use only the output of the last time step for the prediction
        out = self.fc(out[:, -1, :])
        return out
# GRU class: mirrors the LSTM above but keeps a single hidden state (no cell state)
class GRU(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(GRU, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)  # Dropout for regularization
        # Note: relu and dropout are defined but not applied in forward()
        # Initialize the weights of the linear layer
        nn.init.xavier_normal_(self.fc.weight)

    def forward(self, x):
        # Zero-initialized hidden state on the same device as the input
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        # Propagate through the GRU layers
        out, _ = self.gru(x, h0)
        # Use only the output of the last time step for the prediction
        out = self.fc(out[:, -1, :])
        return out
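# Streamlit UI: the sidebar selects the model type, number of epochs, and learning rate.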
st.title('Predicción de Series de Tiempo')
st.sidebar.title('Parámetros del Modelo')
model_type = st.sidebar.selectbox('Selecciona el modelo', ('LSTM', 'GRU'))
num_epochs = st.sidebar.slider('Número de épocas', 100, 200)
learning_rate = st.sidebar.number_input('Tasa de aprendizaje', 0.001, 0.1, 0.01, 0.001)
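# Training runs full-batch (all windows at once) for the chosen number of epochs when the
# button is pressed; predictions for the train and test windows are then inverse-scaled
# and plotted with st.line_chart.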
if model_type == 'LSTM':
    input_size = 1
    hidden_size = 50
    num_layers = 2
    output_size = 1
    model = LSTM(input_size, hidden_size, num_layers, output_size)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    if st.sidebar.button('Entrenar y Predecir'):
        for epoch in range(num_epochs):
            model.train()
            outputs = model(trainX)
            optimizer.zero_grad()
            loss = criterion(outputs, trainY)
            loss.backward()
            optimizer.step()
            if (epoch + 1) % 100 == 0:
                st.write(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
        model.eval()
        train_predict = model(trainX)
        test_predict = model(testX)
        train_predict = scaler.inverse_transform(train_predict.detach().numpy().reshape(-1, 1))
        trainY_plot = scaler.inverse_transform(trainY.numpy().reshape(-1, 1))
        test_predict = scaler.inverse_transform(test_predict.detach().numpy().reshape(-1, 1))
        testY_plot = scaler.inverse_transform(testY.numpy().reshape(-1, 1))
        train_data = pd.DataFrame({
            'Fecha': filtered_data1['FECHA'].values[seq_length:seq_length + len(trainY)],
            'Datos de entrenamiento': trainY_plot.ravel(),
            'Predicciones de entrenamiento': train_predict.ravel()
        })
        test_data = pd.DataFrame({
            'Fecha': filtered_data2['FECHA'].values[seq_length:seq_length + len(testY)],
            'Datos de prueba': testY_plot.ravel(),
            'Predicciones de prueba': test_predict.ravel()
        })
        # Concatenate the data into a single table
        combined_data = pd.concat([train_data, test_data])
        # Set the date as the index
        combined_data.set_index('Fecha', inplace=True)
        # Show the chart in Streamlit
        st.line_chart(combined_data)
        # fig, ax = plt.subplots(figsize=(12, 6))
        # ax.plot(filtered_data1['FECHA'].values[seq_length:seq_length+len(trainY)], trainY_plot, label='Datos de entrenamiento')
        # ax.plot(filtered_data1['FECHA'].values[seq_length:seq_length+len(trainY)], train_predict, label='Predicciones de entrenamiento')
        # ax.plot(filtered_data2['FECHA'].values[seq_length:seq_length+len(testY)], testY_plot, label='Datos de prueba')
        # ax.plot(filtered_data2['FECHA'].values[seq_length:seq_length+len(testY)], test_predict, label='Predicciones de prueba')
        # ax.set_xlabel('Fecha')
        # ax.set_ylabel('VALOR-LS-CF-N')
        # ax.set_title('Predicciones con LSTM')
        # ax.legend()
        # ax.grid(True)
        # st.pyplot(fig)
else:
    input_size = 1
    hidden_size = 50
    num_layers = 2
    output_size = 1
    model = GRU(input_size, hidden_size, num_layers, output_size)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    if st.sidebar.button('Entrenar y Predecir'):
        for epoch in range(num_epochs):
            model.train()
            outputs = model(trainX)
            optimizer.zero_grad()
            loss = criterion(outputs, trainY)
            loss.backward()
            optimizer.step()
            if (epoch + 1) % 100 == 0:
                st.write(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
        model.eval()
        train_predict = model(trainX)
        test_predict = model(testX)
        train_predict = scaler.inverse_transform(train_predict.detach().numpy().reshape(-1, 1))
        trainY_plot = scaler.inverse_transform(trainY.numpy().reshape(-1, 1))
        test_predict = scaler.inverse_transform(test_predict.detach().numpy().reshape(-1, 1))
        testY_plot = scaler.inverse_transform(testY.numpy().reshape(-1, 1))
        train_data = pd.DataFrame({
            'Fecha': filtered_data1['FECHA'].values[seq_length:seq_length + len(trainY)],
            'Datos de entrenamiento': trainY_plot.ravel(),
            'Predicciones de entrenamiento': train_predict.ravel()
        })
        test_data = pd.DataFrame({
            'Fecha': filtered_data2['FECHA'].values[seq_length:seq_length + len(testY)],
            'Datos de prueba': testY_plot.ravel(),
            'Predicciones de prueba': test_predict.ravel()
        })
        # Concatenate the data into a single table
        combined_data = pd.concat([train_data, test_data])
        # Set the date as the index
        combined_data.set_index('Fecha', inplace=True)
        # Show the chart in Streamlit
        st.line_chart(combined_data)
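# To run locally (assuming the two CSV files sit next to this script): streamlit run app.py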