# Snapshot metadata from the hosting page: file size 8,479 bytes, revision id 3a85408.
import os
from datetime import datetime

import numpy as np
import torch
import torch.nn as nn
from matplotlib import pyplot as plt
from sklearn.metrics import root_mean_squared_error, r2_score
from torch.utils.data import DataLoader

from data_load import load_soil_data
from data_processing import process_spectra
from data_processing import preprocess_with_downsampling
from resnet1d_multitask import ResNet1D_MultiTask, get_model
# Sweep over several spectral downsampling window sizes. For each size the
# script loads and preprocesses the data, trains a fresh model, evaluates it
# on the test split, writes per-target metrics to a text file and saves the
# loss / metric curves as PNG figures.

# Downsampling window sizes to sweep over; one model is trained per size.
bin_sizes = [5, 10, 15, 20]
# Names of the available spectral preprocessing methods.
methods = ['Abs-SG0', 'Abs-SG0-SNV', 'Abs-SG1', 'Abs-SG1-SNV', 'Abs-SG2', 'Abs-SG2-SNV']
# Target columns to predict.
target_columns = ['pH.in.CaCl2', 'pH.in.H2O', 'OC', 'CaCO3', 'N', 'P', 'K', 'CEC']
# Maps a model output index to its target column name for reporting.
column_mapping = {i: col for i, col in enumerate(target_columns)}

# Settings shared by every run of the sweep (hoisted out of the loop).
model_name = 'C'  # can be changed to 'A' or 'B'
num_epochs = 50
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
criterion = nn.SmoothL1Loss()
results_dir = '../results3'

# Iterate over the sizes themselves: `for j in len(bin_sizes)` raises
# TypeError because an int is not iterable.
for bin_size in bin_sizes:
    # ------------------------------------------------------------------
    # Data loading and preprocessing
    # ------------------------------------------------------------------
    X_train, X_test, y_train, y_test, wavelengths = load_soil_data('../LUCAS.2009_abs.csv', target_columns)
    # Make sure the spectra are shaped (n_samples, n_wavelengths).
    X_train, X_test = X_train.squeeze(), X_test.squeeze()
    X_train = process_spectra(X_train, methods[5])
    X_test = process_spectra(X_test, methods[5])
    # The second return value of the downsampling helper is not needed here.
    X_train, _ = preprocess_with_downsampling(X_train, wavelengths, bin_size)
    X_test, _ = preprocess_with_downsampling(X_test, wavelengths, bin_size)
    # Add a channel axis: (n_samples, 1, n_wavelengths).
    X_train = X_train.reshape(X_train.shape[0], 1, X_train.shape[1])
    X_test = X_test.reshape(X_test.shape[0], 1, X_test.shape[1])

    # Sanity-check the shapes before building the datasets.
    print("X_train shape:", X_train.shape)
    print("y_train shape:", y_train.shape)
    print("X_test shape:", X_test.shape)
    print("y_test shape:", y_test.shape)
    assert X_train.shape[0] == y_train.shape[0], "Mismatch in number of samples between X_train and y_train"
    assert X_test.shape[0] == y_test.shape[0], "Mismatch in number of samples between X_test and y_test"

    # Data loaders.
    train_dataset = torch.utils.data.TensorDataset(
        torch.tensor(X_train, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.float32),
    )
    test_dataset = torch.utils.data.TensorDataset(
        torch.tensor(X_test, dtype=torch.float32),
        torch.tensor(y_test, dtype=torch.float32),
    )
    train_loader = DataLoader(train_dataset, batch_size=256, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=256, shuffle=False)

    # ------------------------------------------------------------------
    # Model, optimizer and learning-rate schedule (fresh for every run)
    # ------------------------------------------------------------------
    model = get_model(model_name)
    model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-5)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.81)

    # Per-epoch history used for the plots below.
    train_losses = []
    test_losses = []
    train_rmse = []
    train_r2 = []

    # ------------------------------------------------------------------
    # Training loop
    # ------------------------------------------------------------------
    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        all_preds = []
        all_targets = []
        for batch_x, batch_y in train_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            # Forward pass and loss.
            outputs = model(batch_x)
            loss = criterion(outputs, batch_y)
            # Backward pass and parameter update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            # Keep predictions and targets for the epoch-level metrics.
            all_preds.append(outputs.cpu().detach().numpy())
            all_targets.append(batch_y.cpu().detach().numpy())
        train_losses.append(total_loss / len(train_loader))
        # Adjust the learning rate once per epoch.
        scheduler.step()

        # Epoch-level training RMSE and R² over all targets.
        all_preds = np.concatenate(all_preds, axis=0)
        all_targets = np.concatenate(all_targets, axis=0)
        train_rmse.append(root_mean_squared_error(all_targets, all_preds))
        train_r2.append(r2_score(all_targets, all_preds))

        # Evaluate on the test set after every epoch.
        model.eval()
        test_loss = 0
        with torch.no_grad():
            for batch_x, batch_y in test_loader:
                batch_x, batch_y = batch_x.to(device), batch_y.to(device)
                test_outputs = model(batch_x)
                loss = criterion(test_outputs, batch_y)
                test_loss += loss.item()
        test_losses.append(test_loss / len(test_loader))
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_losses[-1]:.4f}, Test Loss: {test_losses[-1]:.4f}')
        #if epoch % 20 == 0: torch.save(model.state_dict(), f'models/now.pth')

    # ------------------------------------------------------------------
    # Final evaluation on the test set
    # ------------------------------------------------------------------
    model.eval()
    total_test_loss = 0
    test_preds = []
    test_targets = []
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            batch_x, batch_y = batch_x.to(device), batch_y.to(device)
            test_outputs = model(batch_x)
            test_loss = criterion(test_outputs, batch_y)
            total_test_loss += test_loss.item()
            # Collect predictions and ground truth.
            test_preds.append(test_outputs.cpu().numpy())
            test_targets.append(batch_y.cpu().numpy())
    # Mean loss over test batches.
    avg_test_loss = total_test_loss / len(test_loader)
    print(f'Average Test Loss: {avg_test_loss:.4f}')
    # Stack the per-batch arrays into (n_samples, n_targets).
    test_preds = np.concatenate(test_preds, axis=0)
    test_targets = np.concatenate(test_targets, axis=0)

    # ------------------------------------------------------------------
    # Per-target metrics, written to a timestamped results file
    # ------------------------------------------------------------------
    current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
    results_file = f'{results_dir}/metrics_model{model_name}_{current_time}.txt'
    # Make sure the results directory exists.
    os.makedirs(results_dir, exist_ok=True)
    # Explicit UTF-8: the metric lines contain '²', which not every platform
    # default encoding can represent.
    with open(results_file, 'w', encoding='utf-8') as f:
        f.write(f"Results for Model {model_name} generated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("-" * 50 + "\n")
        for i in range(test_targets.shape[1]):
            target_i = test_targets[:, i]
            pred_i = test_preds[:, i]
            # root_mean_squared_error already returns the RMSE; taking
            # another square root would report sqrt(RMSE) instead.
            rmse_i = root_mean_squared_error(target_i, pred_i)
            r2_i = r2_score(target_i, pred_i)
            result_line = f'Indicator {i + 1} ({column_mapping[i]}) - RMSE: {rmse_i:.4f}, R²: {r2_i:.4f}'
            print(result_line)
            # Persist the metrics so the "saved to" message below is true.
            f.write(result_line + '\n')
        f.write("\nAverage Test Loss: {:.4f}\n".format(avg_test_loss))

    # ------------------------------------------------------------------
    # Figures
    # ------------------------------------------------------------------
    # Training vs. test loss curves.
    plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(test_losses, label='Test Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title(f'Training and Test Loss over Epochs (Model {model_name})')
    plt.legend()
    plt.grid(True)
    plt.savefig(f'{results_dir}/loss_curves_model{model_name}_{current_time}_{bin_size}.png', dpi=300, bbox_inches='tight')
    plt.show()
    # Release the figure so figures do not pile up across the sweep.
    plt.close()

    # Training metrics (left) and test loss (right).
    plt.figure(figsize=(12, 4))
    plt.subplot(1, 2, 1)
    plt.plot(train_rmse, label='Mean_Training RMSE')
    plt.plot(train_r2, label='Mean_Training R2')
    plt.xlabel('Epoch')
    plt.ylabel('Metric')
    plt.title(f'Training Metrics (Model {model_name})')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(test_losses, label='Test Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title(f'Test Loss (Model {model_name})')
    plt.legend()
    plt.tight_layout()
    plt.savefig(f'{results_dir}/training_metrics_model{model_name}_{current_time}_{bin_size}.png', dpi=300, bbox_inches='tight')
    plt.show()
    plt.close()

    print(f"\nResults have been saved to: {results_file}")
    print(f"Figures have been saved to: ../results3/")