MS-HNN / mshnn_model.py
FutureMa's picture
Create mshnn_model.py
20ffd89 verified
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
# 设置随机种子以确保结果可复现
torch.manual_seed(42)
np.random.seed(42)
# 模型定义
class HodgeDualLayer(nn.Module):
"""
实现霍奇对偶操作的层,灵感来自霍奇理论中的对偶性
"""
def __init__(self, in_features, out_features):
super(HodgeDualLayer, self).__init__()
self.forward_map = nn.Linear(in_features, out_features)
self.dual_map = nn.Linear(out_features, in_features)
def forward(self, x):
# 前向映射
y = self.forward_map(x)
# 对偶映射 (类似于霍奇对偶)
dual_x = self.dual_map(y)
return y, dual_x
class MirrorSymmetryBlock(nn.Module):
"""
镜像对称块:实现类似于镜像对称的结构
"""
def __init__(self, dim):
super(MirrorSymmetryBlock, self).__init__()
self.dim = dim
# 两个互为"镜像"的分支
self.branch_a = nn.Sequential(
nn.Linear(dim, dim), # 修改:保持输出维度与输入相同,以便残差连接
nn.LayerNorm(dim),
nn.GELU()
)
self.branch_b = nn.Sequential(
nn.Linear(dim, dim), # 修改:保持输出维度与输入相同,以便残差连接
nn.LayerNorm(dim),
nn.GELU()
)
# 对称性保持层
self.symmetry_preserving = nn.Parameter(torch.ones(1))
def forward(self, x):
a = self.branch_a(x)
b = self.branch_b(x)
# 通过对称操作连接两个分支
mirror_term = self.symmetry_preserving * (a * b)
return a + b + mirror_term
class ComplexStructureModule(nn.Module):
"""
模拟复几何结构的模块
"""
def __init__(self, dim):
super(ComplexStructureModule, self).__init__()
self.real_transform = nn.Linear(dim, dim)
self.imag_transform = nn.Linear(dim, dim)
def forward(self, x):
# 分离实部和虚部通道
mid_point = x.shape[1] // 2
real_part = x[:, :mid_point]
imag_part = x[:, mid_point:]
# 应用复几何变换
new_real = self.real_transform(real_part) - self.imag_transform(imag_part)
new_imag = self.imag_transform(real_part) + self.real_transform(imag_part)
# 合并实部和虚部
return torch.cat([new_real, new_imag], dim=1)
class MirrorSymmetryHodgeNetwork(nn.Module):
"""
基于镜像对称和霍奇理论概念的神经网络
"""
def __init__(self, input_dim, hidden_dim, output_dim, num_blocks=3):
super(MirrorSymmetryHodgeNetwork, self).__init__()
# 输入嵌入层
self.embedding = nn.Linear(input_dim, hidden_dim*2) # 双倍维度用于复结构
# 霍奇对偶层
self.hodge_dual = HodgeDualLayer(hidden_dim*2, hidden_dim*2)
# 镜像对称块
self.mirror_blocks = nn.ModuleList([
MirrorSymmetryBlock(hidden_dim*2) for _ in range(num_blocks)
])
# 复结构模块
self.complex_structure = ComplexStructureModule(hidden_dim)
# 输出映射
self.output_map = nn.Linear(hidden_dim*2, output_dim)
# 标度因子(代表霍奇结构中的度量)
self.scale_factor = nn.Parameter(torch.ones(1))
def forward(self, x):
# 初始嵌入
x = self.embedding(x)
# 应用霍奇对偶
primary, dual = self.hodge_dual(x)
# 残差连接
x = primary + self.scale_factor * dual
# 应用镜像对称块
for block in self.mirror_blocks:
x = x + block(x) # 残差连接
# 应用复结构变换
x = self.complex_structure(x)
# 输出层
return self.output_map(x)
# 基准模型 - 标准MLP
class BaselineMLP(nn.Module):
def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3):
super(BaselineMLP, self).__init__()
layers = [nn.Linear(input_dim, hidden_dim), nn.ReLU()]
for _ in range(num_layers - 1):
layers.extend([nn.Linear(hidden_dim, hidden_dim), nn.ReLU()])
layers.append(nn.Linear(hidden_dim, output_dim))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
# 生成具有对称性的合成数据
def generate_symmetric_data(n_samples=1000, input_dim=10, noise_level=0.1):
"""
生成具有对称性质的数据,适合测试镜像对称模型
"""
# 随机生成输入特征
X = np.random.randn(n_samples, input_dim)
# 创建符合对称性的目标变量
# 一半特征与另一半特征之间存在对称关系
mid = input_dim // 2
# 基础函数
y_base = np.sum(X[:, :mid]**2, axis=1) - np.sum(X[:, mid:]**2, axis=1)
# 添加一些镜像对称项
mirror_terms = np.sum(X[:, :mid] * X[:, mid:], axis=1)
# 添加复结构项 - 确保索引不会越界
if mid > 1: # 确保有足够的维度进行复结构计算
complex_terms = np.sum(X[:, :mid-1] * X[:, 1:mid] - X[:, mid:-1] * X[:, mid+1:], axis=1)
else:
complex_terms = np.zeros(n_samples)
# 组合各项,创建最终目标
y = y_base + 0.5 * mirror_terms + 0.3 * complex_terms
# 添加噪声
y += noise_level * np.random.randn(n_samples)
# 转换为张量
X_tensor = torch.FloatTensor(X)
y_tensor = torch.FloatTensor(y).reshape(-1, 1)
return X_tensor, y_tensor
# 训练函数
def train_model(model, train_loader, val_loader, epochs=100, lr=0.001, device='cpu'):
"""
训练模型并返回训练历史
"""
model.to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)
history = {
'train_loss': [],
'val_loss': [],
}
best_val_loss = float('inf')
best_model_state = None
for epoch in range(epochs):
# 训练阶段
model.train()
train_loss = 0.0
for X_batch, y_batch in train_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
# 前向传播
y_pred = model(X_batch)
loss = criterion(y_pred, y_batch)
# 反向传播和优化
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
train_loss /= len(train_loader)
history['train_loss'].append(train_loss)
# 验证阶段
model.eval()
val_loss = 0.0
with torch.no_grad():
for X_batch, y_batch in val_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
y_pred = model(X_batch)
loss = criterion(y_pred, y_batch)
val_loss += loss.item()
val_loss /= len(val_loader)
history['val_loss'].append(val_loss)
# 更新学习率
scheduler.step(val_loss)
# 保存最佳模型
if val_loss < best_val_loss:
best_val_loss = val_loss
best_model_state = model.state_dict().copy()
# 输出进度
if (epoch + 1) % 10 == 0:
print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}')
# 加载最佳模型权重
model.load_state_dict(best_model_state)
return model, history
# 评估函数
def evaluate_model(model, test_loader, device='cpu'):
"""
评估模型性能
"""
model.eval()
criterion = nn.MSELoss()
all_preds = []
all_targets = []
test_loss = 0.0
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
y_pred = model(X_batch)
loss = criterion(y_pred, y_batch)
test_loss += loss.item()
all_preds.append(y_pred.cpu().numpy())
all_targets.append(y_batch.cpu().numpy())
test_loss /= len(test_loader)
all_preds = np.vstack(all_preds)
all_targets = np.vstack(all_targets)
# 计算R2和RMSE
r2 = r2_score(all_targets, all_preds)
rmse = np.sqrt(mean_squared_error(all_targets, all_preds))
return {
'test_loss': test_loss,
'r2': r2,
'rmse': rmse,
'predictions': all_preds,
'targets': all_targets
}
# 绘制训练历史
def plot_training_history(history_mirror, history_baseline):
"""
绘制训练和验证损失的对比图
"""
plt.figure(figsize=(12, 5))
# 训练损失
plt.subplot(1, 2, 1)
plt.plot(history_mirror['train_loss'], label='Mirror Symmetry Model')
plt.plot(history_baseline['train_loss'], label='Baseline MLP')
plt.title('Training Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
# 验证损失
plt.subplot(1, 2, 2)
plt.plot(history_mirror['val_loss'], label='Mirror Symmetry Model')
plt.plot(history_baseline['val_loss'], label='Baseline MLP')
plt.title('Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.tight_layout()
plt.show()
# 绘制预测对比
def plot_predictions(mirror_results, baseline_results):
"""
绘制预测值与真实值的对比图
"""
plt.figure(figsize=(12, 5))
# 镜像对称模型的预测
plt.subplot(1, 2, 1)
plt.scatter(mirror_results['targets'], mirror_results['predictions'], alpha=0.5)
min_val = min(mirror_results['targets'].min(), mirror_results['predictions'].min())
max_val = max(mirror_results['targets'].max(), mirror_results['predictions'].max())
plt.plot([min_val, max_val], [min_val, max_val], 'r--')
plt.title(f'Mirror Symmetry Model\nR² = {mirror_results["r2"]:.4f}, RMSE = {mirror_results["rmse"]:.4f}')
plt.xlabel('True Values')
plt.ylabel('Predicted Values')
# 基准模型的预测
plt.subplot(1, 2, 2)
plt.scatter(baseline_results['targets'], baseline_results['predictions'], alpha=0.5)
min_val = min(baseline_results['targets'].min(), baseline_results['predictions'].min())
max_val = max(baseline_results['targets'].max(), baseline_results['predictions'].max())
plt.plot([min_val, max_val], [min_val, max_val], 'r--')
plt.title(f'Baseline MLP\nR² = {baseline_results["r2"]:.4f}, RMSE = {baseline_results["rmse"]:.4f}')
plt.xlabel('True Values')
plt.ylabel('Predicted Values')
plt.tight_layout()
plt.show()
# 主函数
def main():
# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# 超参数
input_dim = 10
hidden_dim = 64
output_dim = 1
batch_size = 32
epochs = 100
lr = 0.001
# 生成数据
X, y = generate_symmetric_data(n_samples=5000, input_dim=input_dim)
# 划分数据集
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)
# 创建数据加载器
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
test_loader = DataLoader(test_dataset, batch_size=batch_size)
# 实例化模型
mirror_model = MirrorSymmetryHodgeNetwork(input_dim, hidden_dim, output_dim)
baseline_model = BaselineMLP(input_dim, hidden_dim, output_dim)
# 训练镜像对称模型
print("Training Mirror Symmetry Hodge Network...")
mirror_model, history_mirror = train_model(
mirror_model, train_loader, val_loader,
epochs=epochs, lr=lr, device=device
)
# 训练基准模型
print("\nTraining Baseline MLP...")
baseline_model, history_baseline = train_model(
baseline_model, train_loader, val_loader,
epochs=epochs, lr=lr, device=device
)
# 评估两个模型
print("\nEvaluating models on test set...")
mirror_results = evaluate_model(mirror_model, test_loader, device)
baseline_results = evaluate_model(baseline_model, test_loader, device)
# 输出结果
print("\nMirror Symmetry Hodge Network Results:")
print(f"Test Loss: {mirror_results['test_loss']:.6f}")
print(f"R2 Score: {mirror_results['r2']:.6f}")
print(f"RMSE: {mirror_results['rmse']:.6f}")
print("\nBaseline MLP Results:")
print(f"Test Loss: {baseline_results['test_loss']:.6f}")
print(f"R2 Score: {baseline_results['r2']:.6f}")
print(f"RMSE: {baseline_results['rmse']:.6f}")
# 绘制结果
plot_training_history(history_mirror, history_baseline)
plot_predictions(mirror_results, baseline_results)
# 保存最佳模型
torch.save(mirror_model.state_dict(), 'mirror_symmetry_hodge_model.pth')
torch.save(baseline_model.state_dict(), 'baseline_mlp_model.pth')
print("\nModels saved successfully!")
if __name__ == "__main__":
main()