Encoder + GRU for time series

#711
by vinben007 - opened

Hi everyone,

I have a time series of size (2000, 300, 3) representing 2000 data points, 300 time steps, and 3 input features (current, voltage, and temperature), and I want to predict health indicators related to battery degradation, so my output has size (2000, 3). I am currently using a GRU on one dataset, but on another dataset the data is quite sparse, and I believe a Transformer, or at least for now a Transformer Encoder followed by a GRU, would do the job. Essentially, the GRU's output predicts those 3 health indicators, and my loss function is simply the mean squared error between the predicted and true values.
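To make the shapes concrete, here is a minimal placeholder for the GRU head (this is not my actual GRU_Dense module, just a stand-in with the same input/output shapes and made-up layer sizes):

import torch
import torch.nn as nn

class SimpleGRUHead(nn.Module):
    # Placeholder GRU + dense head: (batch, 300, 3) -> (batch, 3)
    def __init__(self, num_features=3, hidden_size=32, num_outputs=3):
        super().__init__()
        self.gru = nn.GRU(num_features, hidden_size, batch_first=True)
        self.dense = nn.Linear(hidden_size, num_outputs)

    def forward(self, x):           # x: (batch, seq_len, num_features)
        _, h_n = self.gru(x)        # h_n: (num_layers, batch, hidden_size)
        return self.dense(h_n[-1])  # (batch, num_outputs)

x = torch.randn(2000, 300, 3)       # current, voltage, temperature over 300 time steps
y = torch.randn(2000, 3)            # 3 health indicators
loss = nn.MSELoss()(SimpleGRUHead()(x), y)  # scalar mean squared error, as in my setup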

I am using PyTorch, and the implementation below is adapted from an NLP Transformer tutorial that I am trying to fit to my time series. I was wondering if anyone could help me, because it does not work.

Here is my code. I believe I still have some work to do on the positional encoding, which I need to adapt to my time series (see the sketch after the module classes below).

These are my module classes:

import math
import torch
import torch.nn as nn

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Ensure that the model dimension (d_model) is divisible by the number of heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        # Initialize dimensions
        self.d_model = d_model # Model's dimension
        self.num_heads = num_heads # Number of attention heads
        self.d_k = d_model // num_heads # Dimension of each head's key, query, and value
        
        # Linear layers for transforming inputs
        self.W_q = nn.Linear(d_model, d_model) # Query transformation
        self.W_k = nn.Linear(d_model, d_model) # Key transformation
        self.W_v = nn.Linear(d_model, d_model) # Value transformation
        self.W_o = nn.Linear(d_model, d_model) # Output transformation
        
    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        # Calculate attention scores
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        
        # Apply mask if provided (useful for preventing attention to certain parts like padding)
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
        
        # Softmax is applied to obtain attention probabilities
        attn_probs = torch.softmax(attn_scores, dim=-1)
        
        # Multiply by values to obtain the final output
        output = torch.matmul(attn_probs, V)
        return output
        
    def split_heads(self, x):
        # Reshape the input to have num_heads for multi-head attention
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
        
    def combine_heads(self, x):
        # Combine the multiple heads back to original shape
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        
    def forward(self, Q, K, V, mask=None):
        # Apply linear transformations and split heads
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        
        # Perform scaled dot-product attention
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        
        # Combine heads and apply output transformation
        output = self.W_o(self.combine_heads(attn_output))
        return output 

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()
        
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        self.register_buffer('pe', pe.unsqueeze(0))
        
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        # Self-attention with a residual connection and layer norm
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        # Position-wise feed-forward with a residual connection and layer norm
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

class MultiLayerEncoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers, max_seq_length, dropout, num_features=3):
        super(MultiLayerEncoder, self).__init__()
        # Project the 3 raw features (current, voltage, temperature) to d_model once here,
        # rather than inside every EncoderLayer: after the first layer the input already
        # has d_model features, so a per-layer nn.Linear(3, d_model) would break the stack.
        self.encoder_embedding = nn.Linear(num_features, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)
        self.dropout = nn.Dropout(dropout)
        self.encoder_layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        ])

    def generate_mask(self, x):
        # Padding mask: a time step is ignored when all of its features are zero.
        # Shape (batch_size, 1, 1, seq_length), broadcastable over heads and query positions.
        return (x.abs().sum(dim=-1) != 0).unsqueeze(1).unsqueeze(2)

    def forward(self, x):
        # x: (batch_size, seq_length, num_features)
        mask = self.generate_mask(x)
        x = self.dropout(self.positional_encoding(self.encoder_embedding(x)))
        # Pass through each encoder layer
        for layer in self.encoder_layers:
            x = layer(x, mask)
        return x
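On the positional encoding I mentioned above: since my inputs are regularly sampled sensor readings rather than word tokens, one option I am considering is replacing the sinusoidal encoding with a learned per-time-step embedding. A minimal sketch (the class name and initialization are only placeholders):

class LearnedPositionalEncoding(nn.Module):
    # Drop-in alternative to the sinusoidal PositionalEncoding above:
    # one trainable d_model-dimensional vector per time step.
    def __init__(self, d_model, max_seq_length):
        super().__init__()
        self.pe = nn.Parameter(torch.zeros(1, max_seq_length, d_model))
        nn.init.normal_(self.pe, std=0.02)

    def forward(self, x):                  # x: (batch, seq_len, d_model)
        return x + self.pe[:, :x.size(1)]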

This is how I set it up and call it in my training code:

    # d_model must be divisible by num_heads, and max_seq_length must cover the 300 time steps
    encoder = MultiLayerEncoder(d_model=64, num_heads=8, d_ff=64, num_layers=5, max_seq_length=300, dropout=0.1).to(device)
    # num_features passed to GRU_Dense should match the encoder output size (d_model)
    feature_extractor = GRU_Dense(gru_dense_para, num_features=num_features, device=device).to(device)

    # Still quite unsure about this part
    # Loop over the named parameters (weights and biases) of feature_extractor and categorize them
    GRU_name, GRU_params, Dense_name, Dense_params = [], [], [], []
    for name, param in feature_extractor.named_parameters():
        if param.requires_grad:
            if 'linear_relu_stack' in name:
                Dense_name += [name]
                Dense_params += [param]
            elif 'gru' in name:
                GRU_name += [name]
                GRU_params += [param]
        else:
            print('no need')
            
    # Group parameters for the GRU-Dense model and the encoder
    GRUDense_params = [
        {"params": GRU_params, "lr": init_lr},
        {"params": Dense_params, "lr": init_lr},
        ]
    Encoder_params = [
        {"params": encoder.parameters(), "lr": init_lr},
        ]
    # Set models to training mode
    encoder.train()
    feature_extractor.train()

    # Initialize optimizers and schedulers (reduce the learning rate when the loss stops improving)
    Encoder_optimizer = torch.optim.AdamW(Encoder_params)
    Encoder_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer=Encoder_optimizer, mode='min', verbose=True, min_lr=1e-4)
    GRUDense_optimizer = torch.optim.AdamW(GRUDense_params)
    GRUDense_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer=GRUDense_optimizer, mode='min', verbose=True, min_lr=1e-4)
    GRUDense_loss_function = nn.MSELoss() # mean squared error loss
    # Training loop
    GRUDense_loss_list = []
    GRUDense_min_loss = float('inf')
    for epoch in iterator:

        # Reset gradients
        Encoder_optimizer.zero_grad()
        GRUDense_optimizer.zero_grad()
        GPoptimizer.zero_grad() # GP model optimizer (defined elsewhere in my script)

        train_x_label.requires_grad_(True)

        # Forward pass: Transformer encoder followed by the GRU-Dense feature extractor,
        # using only the labeled training data
        encoder_output = encoder(train_x_label)              # train_x_label: (train_label_num, 300, 3) -> (train_label_num, 300, d_model)
        GRUDense_output = feature_extractor(encoder_output)  # GRU-Dense output: (train_label_num, 3)

        # Compute the loss between the predictions and the labeled training targets
        GRUDense_loss = GRUDense_loss_function(GRUDense_output, train_y_label) # train_y_label is the truth, GRUDense_output the prediction
        GRUDense_loss_list.append(GRUDense_loss.item())

        # Saliency Map implementation - start

        # Backward pass to compute gradients
        GRUDense_loss.backward(retain_graph=True)
        # Update the encoder and GRU-Dense parameters so the predictions fit the truth labels (train_y_label)
        GRUDense_optimizer.step()
        Encoder_optimizer.step()
        GRUDense_scheduler.step(GRUDense_loss.item())
        Encoder_scheduler.step(GRUDense_loss.item())
        # Checkpointing: every 50 epochs, save the model and optimizer states if the loss is at its minimum
        if (epoch + 1) % 50 == 0 and GRUDense_loss.item() < GRUDense_min_loss:
            GRUDense_min_loss = GRUDense_loss.item()
            GRUDense_check_point = {
                'epoch': epoch,
                'model': feature_extractor.state_dict(),
                'optimizer': GRUDense_optimizer.state_dict(),
                'lr_schedule': GRUDense_scheduler.state_dict()
            }
            torch.save(GRUDense_check_point, os.path.join(result_log_paths[0], 'ckpt_feature_extractor_{}th.pth'.format(epoch+1)))
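As a sanity check before the full run, something like the following should confirm that the encoder and the GRU head agree on shapes (it uses the placeholder head from the first sketch instead of my real GRU_Dense, and the hyperparameters are examples only):

# d_model divisible by num_heads, max_seq_length covering the 300 time steps,
# and the GRU input size equal to d_model
encoder = MultiLayerEncoder(d_model=64, num_heads=8, d_ff=64,
                            num_layers=5, max_seq_length=300, dropout=0.1)
head = SimpleGRUHead(num_features=64, hidden_size=32, num_outputs=3)

x = torch.randn(16, 300, 3)
z = encoder(x)              # expected: (16, 300, 64)
y_hat = head(z)             # expected: (16, 3)
print(z.shape, y_hat.shape)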
Hugging Face H4 org

Hi!
This question is completely unrelated to the Open LLM Leaderboard - I think you will have better luck on the HuggingFace forums or on StackOverflow.

Best of luck with your problem!

clefourrier changed discussion status to closed
