# Page-scrape residue (hosting status banner "Spaces: Sleeping"); not part of the program.
import streamlit as st | |
import pandas as pd | |
import numpy as np | |
import torch | |
import torch.nn as nn | |
import matplotlib.pyplot as plt | |
from sklearn.preprocessing import MinMaxScaler | |
# Función para cargar y filtrar datos
def load_and_filter_data(file, year):
    """Read a CSV and keep only the rows dated in ``year`` or later.

    Args:
        file: Path or file-like object handed to ``pd.read_csv``.
        year: Earliest calendar year (inclusive) to keep.

    Returns:
        The rows whose ``FECHA`` column, parsed as datetimes, falls in
        ``year`` or later. The ``FECHA`` column of the result holds the
        parsed datetimes.
    """
    frame = pd.read_csv(file)
    frame['FECHA'] = pd.to_datetime(frame['FECHA'])
    recent_enough = frame['FECHA'].dt.year >= year
    return frame[recent_enough]
# Función para crear ventanas deslizantes
def sliding_windows(data, seq_length):
    """Split a sequence into overlapping input windows and next-step targets.

    Args:
        data: Indexable, sliceable sequence (list or NumPy array).
        seq_length: Number of consecutive items in each input window.

    Returns:
        A pair ``(x, y)`` of NumPy arrays where ``x[i]`` is
        ``data[i:i + seq_length]`` and ``y[i]`` is ``data[i + seq_length]``.
        Both arrays are empty when ``data`` has no more than ``seq_length``
        items.
    """
    window_starts = range(len(data) - seq_length)
    inputs = [data[start:start + seq_length] for start in window_starts]
    targets = [data[start + seq_length] for start in window_starts]
    return np.array(inputs), np.array(targets)
# Clase LSTM
class LSTM(nn.Module):
    """Stacked LSTM whose last time-step output feeds a linear layer.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the hidden state of each LSTM layer.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of values produced for each input sequence.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs are indexed as (batch, time step, feature).
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return one prediction per sequence in the batch ``x``.

        Args:
            x: Tensor indexed as (batch, time step, feature).

        Returns:
            Tensor with one row per batch element and ``output_size`` columns.
        """
        # Build the zero initial states from x so they share its device and
        # dtype; plain torch.zeros() would always use the default device and
        # dtype and break once the model is moved or cast.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h0 = x.new_zeros(state_shape)
        c0 = x.new_zeros(state_shape)
        out, _ = self.lstm(x, (h0, c0))
        # Only the output of the final time step is used for the prediction.
        return self.fc(out[:, -1, :])
# Clase GRU
class GRU(nn.Module):
    """Stacked GRU whose last time-step output feeds a linear layer.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the hidden state of each GRU layer.
        num_layers: Number of stacked GRU layers.
        output_size: Number of values produced for each input sequence.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True: inputs are indexed as (batch, time step, feature).
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return one prediction per sequence in the batch ``x``.

        Args:
            x: Tensor indexed as (batch, time step, feature).

        Returns:
            Tensor with one row per batch element and ``output_size`` columns.
        """
        # Build the zero initial state from x so it shares its device and
        # dtype; plain torch.zeros() would always use the default device and
        # dtype and break once the model is moved or cast.
        h0 = x.new_zeros((self.num_layers, x.size(0), self.hidden_size))
        out, _ = self.gru(x, h0)
        # Only the output of the final time step is used for the prediction.
        return self.fc(out[:, -1, :])
# Función para entrenar el modelo
def train_model(model, criterion, optimizer, trainX, trainY, num_epochs):
    """Fit ``model`` on the whole training set for ``num_epochs`` iterations.

    Each epoch runs one forward pass over ``trainX``, computes the loss
    against ``trainY``, backpropagates and applies one optimizer step.
    The loss is written to the Streamlit page after every 100th epoch.

    Args:
        model: Module to train; updated in place.
        criterion: Callable returning the loss for (outputs, targets).
        optimizer: Optimizer bound to ``model``'s parameters.
        trainX: Training inputs passed to ``model``.
        trainY: Training targets passed to ``criterion``.
        num_epochs: Number of training iterations.
    """
    for epoch in range(num_epochs):
        model.train()
        predictions = model(trainX)
        optimizer.zero_grad()
        loss = criterion(predictions, trainY)
        loss.backward()
        optimizer.step()

        # Progress is reported only on every 100th completed epoch.
        if (epoch + 1) % 100 != 0:
            continue
        st.write(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
# Función para predecir y graficar resultados
def _to_original_units(scaler, tensor):
    """Undo ``scaler`` on ``tensor`` and return the values as a flat array."""
    column = tensor.detach().cpu().numpy().reshape(-1, 1)
    return scaler.inverse_transform(column).ravel()


def predict_and_plot(model, trainX, trainY, testX, testY, scaler, filtered_data1, filtered_data2, seq_length):
    """Predict on the train and test windows and draw them on one line chart.

    Predictions and targets are mapped back through ``scaler`` before being
    plotted against the ``FECHA`` values of the corresponding data frame.

    Args:
        model: Trained module used to produce the predictions.
        trainX: Training input windows.
        trainY: Training targets (scaled).
        testX: Test input windows.
        testY: Test targets (scaled).
        scaler: Fitted scaler exposing ``inverse_transform``.
        filtered_data1: Data frame with a ``FECHA`` column for the training series.
        filtered_data2: Data frame with a ``FECHA`` column for the test series.
        seq_length: Window length used to build the inputs; the first
            ``seq_length`` dates have no target and are skipped.
    """
    model.eval()
    # Inference only: skip autograd bookkeeping for the two forward passes.
    with torch.no_grad():
        train_predict = model(trainX)
        test_predict = model(testX)

    train_data = pd.DataFrame({
        'Fecha': filtered_data1['FECHA'].values[seq_length:seq_length + len(trainY)],
        'Datos de entrenamiento': _to_original_units(scaler, trainY),
        'Predicciones de entrenamiento': _to_original_units(scaler, train_predict),
    })
    test_data = pd.DataFrame({
        'Fecha': filtered_data2['FECHA'].values[seq_length:seq_length + len(testY)],
        'Datos de prueba': _to_original_units(scaler, testY),
        'Predicciones de prueba': _to_original_units(scaler, test_predict),
    })

    # The two frames have different value columns, so after concatenation
    # each series is NaN on the other frame's rows and is drawn separately.
    combined_data = pd.concat([train_data, test_data]).set_index('Fecha')
    st.line_chart(combined_data)
def main():
    """Render the Streamlit page: load data, build windows, train and plot.

    Reads the two hard-coded CSV files, scales the ``VALOR-LS-CF-N`` column,
    builds sliding windows (first file for training, second for testing) and,
    when the sidebar button is pressed, trains the selected model and plots
    its predictions.
    """
    st.title('Predicción de Series de Tiempo')
    st.sidebar.title('Parámetros del Modelo')

    # TODO: support forecasting on user-supplied / future data:
    #   1. Add a control to upload a CSV file.
    #   2. Read that file and keep it.
    #   3. Use it for training and testing (via a helper function or filter).
    file1 = 'PARCIAL-AGUA-_2_.csv'
    file2 = 'PARCIAL-AGUA-_3_.csv'
    year_filter = 2007
    seq_length = 4
    value_column = 'VALOR-LS-CF-N'

    data1 = load_and_filter_data(file1, year_filter)
    data2 = load_and_filter_data(file2, year_filter)

    # NOTE(review): the scaler is fit on train and test values together, so
    # the test range influences the scaling — confirm this is intended.
    combined_values = np.concatenate(
        [data1[value_column].values, data2[value_column].values]
    ).reshape(-1, 1)
    scaler = MinMaxScaler()
    scaled_values = scaler.fit_transform(combined_values)
    scaled_values1 = scaled_values[:len(data1)]
    scaled_values2 = scaled_values[len(data1):]

    x_train, y_train = sliding_windows(scaled_values1, seq_length)
    x_test, y_test = sliding_windows(scaled_values2, seq_length)

    model_type = st.sidebar.selectbox('Selecciona el modelo', ('LSTM', 'GRU'))
    num_epochs = st.sidebar.slider('Número de épocas', 100, 500, 200)
    learning_rate = 0.01
    # TODO: optionally expose the learning rate as a sidebar number input
    # (range 0.001-0.1, default 0.01, step 0.001).

    input_size = 1
    hidden_size = 50
    num_layers = 2
    output_size = 1

    if st.sidebar.button('Entrenar y Predecir'):
        # Without at least one window per series the network cannot run.
        if len(x_train) == 0 or len(x_test) == 0:
            st.error(
                f'No hay suficientes datos para crear ventanas de longitud {seq_length}.'
            )
            return

        trainX = torch.tensor(x_train, dtype=torch.float32)
        trainY = torch.tensor(y_train, dtype=torch.float32)
        testX = torch.tensor(x_test, dtype=torch.float32)
        testY = torch.tensor(y_test, dtype=torch.float32)

        # Build the model only when training is requested; Streamlit reruns
        # this function on every widget interaction.
        model_cls = LSTM if model_type == 'LSTM' else GRU
        model = model_cls(input_size, hidden_size, num_layers, output_size)
        criterion = nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

        train_model(model, criterion, optimizer, trainX, trainY, num_epochs)
        predict_and_plot(model, trainX, trainY, testX, testY, scaler, data1, data2, seq_length)


if __name__ == "__main__":
    main()