|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
import torch.optim as optim |
|
from torch.utils.data import DataLoader, TensorDataset |
|
import numpy as np |
|
import matplotlib.pyplot as plt |
|
from tqdm import tqdm |
|
from sklearn.model_selection import train_test_split |
|
from sklearn.metrics import mean_squared_error, r2_score |
|
|
|
|
|
# Fix RNG seeds so results are reproducible across torch and numpy.
torch.manual_seed(42)

np.random.seed(42)
|
|
|
|
|
class HodgeDualLayer(nn.Module):
    """Layer inspired by Hodge duality.

    Maps the input forward into a target space and then back ("dual" map),
    returning both the forward image and the reconstruction in the source
    space.
    """

    def __init__(self, in_features, out_features):
        super(HodgeDualLayer, self).__init__()
        # Primary map into the target space.
        self.forward_map = nn.Linear(in_features, out_features)
        # Dual map back into the source space.
        self.dual_map = nn.Linear(out_features, in_features)

    def forward(self, x):
        image = self.forward_map(x)
        reconstruction = self.dual_map(image)
        return image, reconstruction
|
|
|
class MirrorSymmetryBlock(nn.Module):
    """Mirror-symmetry block.

    Runs two structurally identical branches in parallel and combines them
    additively plus a learnable, elementwise multiplicative "mirror" term.
    """

    def __init__(self, dim):
        super(MirrorSymmetryBlock, self).__init__()
        self.dim = dim

        def make_branch():
            # Linear -> LayerNorm -> GELU; both branches share the architecture.
            return nn.Sequential(
                nn.Linear(dim, dim),
                nn.LayerNorm(dim),
                nn.GELU(),
            )

        self.branch_a = make_branch()
        self.branch_b = make_branch()

        # Learnable scale on the product interaction between the branches.
        self.symmetry_preserving = nn.Parameter(torch.ones(1))

    def forward(self, x):
        out_a = self.branch_a(x)
        out_b = self.branch_b(x)

        # Sum of branches plus the scaled mirror (product) interaction.
        return out_a + out_b + self.symmetry_preserving * (out_a * out_b)
|
|
|
class ComplexStructureModule(nn.Module):
    """Module mimicking a complex-geometric structure.

    Treats the two halves of the feature vector as real/imaginary parts and
    applies a complex-linear transform (R + iI) to them.
    """

    def __init__(self, dim):
        super(ComplexStructureModule, self).__init__()
        self.real_transform = nn.Linear(dim, dim)
        self.imag_transform = nn.Linear(dim, dim)

    def forward(self, x):
        # Split the feature axis into "real" and "imaginary" halves.
        half = x.shape[1] // 2
        re, im = x[:, :half], x[:, half:]

        # (R + iI)(re + i*im) = (R re - I im) + i (I re + R im)
        out_re = self.real_transform(re) - self.imag_transform(im)
        out_im = self.imag_transform(re) + self.real_transform(im)

        return torch.cat([out_re, out_im], dim=1)
|
|
|
class MirrorSymmetryHodgeNetwork(nn.Module):
    """Network based on mirror-symmetry and Hodge-theory concepts.

    Pipeline: embed -> Hodge-dual mixing -> residual mirror-symmetry blocks
    -> complex-structure transform -> linear output head.
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_blocks=3):
        super(MirrorSymmetryHodgeNetwork, self).__init__()

        # Embed into a doubled hidden space so the features can later be
        # split into real/imaginary halves of size hidden_dim each.
        width = hidden_dim * 2
        self.embedding = nn.Linear(input_dim, width)

        self.hodge_dual = HodgeDualLayer(width, width)

        self.mirror_blocks = nn.ModuleList(
            MirrorSymmetryBlock(width) for _ in range(num_blocks)
        )

        self.complex_structure = ComplexStructureModule(hidden_dim)

        self.output_map = nn.Linear(width, output_dim)

        # Learnable weight on the dual contribution.
        self.scale_factor = nn.Parameter(torch.ones(1))

    def forward(self, x):
        h = self.embedding(x)

        # Mix the primary image with its scaled dual reconstruction.
        primary, dual = self.hodge_dual(h)
        h = primary + self.scale_factor * dual

        # Residual connections around each mirror-symmetry block.
        for block in self.mirror_blocks:
            h = h + block(h)

        h = self.complex_structure(h)

        return self.output_map(h)
|
|
|
|
|
class BaselineMLP(nn.Module):
    """Plain ReLU multi-layer perceptron used as a comparison baseline."""

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3):
        super(BaselineMLP, self).__init__()

        # num_layers hidden Linear+ReLU stages followed by a linear head.
        dims = [input_dim] + [hidden_dim] * num_layers
        layers = []
        for d_in, d_out in zip(dims[:-1], dims[1:]):
            layers += [nn.Linear(d_in, d_out), nn.ReLU()]
        layers.append(nn.Linear(hidden_dim, output_dim))

        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)
|
|
|
|
|
def generate_symmetric_data(n_samples=1000, input_dim=10, noise_level=0.1):
    """Generate synthetic data with mirror-symmetric structure.

    The target combines three terms plus Gaussian noise:
      * a signature-like quadratic form: ||first half||^2 - ||second half||^2,
      * a "mirror" coupling between corresponding coordinates of the halves,
      * a "complex" coupling between adjacent coordinates within each half.

    Args:
        n_samples: number of rows to generate.
        input_dim: feature dimension; the feature vector is split at
            input_dim // 2.
        noise_level: standard deviation of the additive Gaussian noise.

    Returns:
        (X_tensor, y_tensor): float32 tensors of shapes (n_samples, input_dim)
        and (n_samples, 1).
    """
    X = np.random.randn(n_samples, input_dim)

    mid = input_dim // 2

    # Signature-like quadratic form over the two halves.
    y_base = np.sum(X[:, :mid]**2, axis=1) - np.sum(X[:, mid:]**2, axis=1)

    # Mirror coupling. Pair only the first `mid` coordinates of the second
    # half: for odd input_dim, `X[:, mid:]` is one column wider than
    # `X[:, :mid]`, and the original elementwise product either raised a
    # shape error or broadcast silently to a wrong result.
    mirror_terms = np.sum(X[:, :mid] * X[:, mid:2 * mid], axis=1)

    # Adjacent-coordinate coupling within each half. Sum each half-product
    # separately: for odd input_dim the two products have different widths
    # (mid-1 vs. mid), so subtracting them elementwise before summing was
    # invalid. For even input_dim this is mathematically identical to the
    # original sum-of-differences.
    if mid > 1:
        first_half_adj = np.sum(X[:, :mid - 1] * X[:, 1:mid], axis=1)
        second_half_adj = np.sum(X[:, mid:-1] * X[:, mid + 1:], axis=1)
        complex_terms = first_half_adj - second_half_adj
    else:
        complex_terms = np.zeros(n_samples)

    y = y_base + 0.5 * mirror_terms + 0.3 * complex_terms

    # Additive observation noise.
    y += noise_level * np.random.randn(n_samples)

    X_tensor = torch.FloatTensor(X)
    y_tensor = torch.FloatTensor(y).reshape(-1, 1)

    return X_tensor, y_tensor
|
|
|
|
|
def train_model(model, train_loader, val_loader, epochs=100, lr=0.001, device='cpu'):
    """Train `model` with Adam + MSE and restore the best-validation weights.

    Args:
        model: nn.Module to train (moved to `device` in place).
        train_loader: DataLoader yielding (X, y) training batches.
        val_loader: DataLoader yielding (X, y) validation batches.
        epochs: number of training epochs.
        lr: initial learning rate; halved on a val-loss plateau (patience 5).
        device: torch device string or object.

    Returns:
        (model, history): the model with its best validation-loss weights
        loaded, and a dict with per-epoch 'train_loss' / 'val_loss' lists.
    """
    model.to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

    history = {
        'train_loss': [],
        'val_loss': [],
    }

    best_val_loss = float('inf')
    best_model_state = None

    for epoch in range(epochs):
        # --- training pass ---
        model.train()
        train_loss = 0.0

        for X_batch, y_batch in train_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)

            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        train_loss /= len(train_loader)
        history['train_loss'].append(train_loss)

        # --- validation pass (no gradients) ---
        model.eval()
        val_loss = 0.0

        with torch.no_grad():
            for X_batch, y_batch in val_loader:
                X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                y_pred = model(X_batch)
                loss = criterion(y_pred, y_batch)
                val_loss += loss.item()

        val_loss /= len(val_loader)
        history['val_loss'].append(val_loss)

        scheduler.step(val_loss)

        # Snapshot the best weights. state_dict() returns tensors that alias
        # the live parameters, so a shallow .copy() would be silently
        # overwritten by later optimizer steps — clone each tensor instead.
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = {
                k: v.detach().clone() for k, v in model.state_dict().items()
            }

        if (epoch + 1) % 10 == 0:
            print(f'Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}')

    # Restore the best checkpoint; guard against epochs == 0, in which case
    # no snapshot exists and load_state_dict(None) would crash.
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return model, history
|
|
|
|
|
def evaluate_model(model, test_loader, device='cpu'):
    """Evaluate `model` on the test set.

    Returns a dict with the mean per-batch MSE ('test_loss'), R^2 ('r2'),
    RMSE ('rmse'), and the stacked 'predictions' / 'targets' numpy arrays.
    """
    model.eval()
    criterion = nn.MSELoss()

    preds, targets = [], []
    total_loss = 0.0

    with torch.no_grad():
        for X_batch, y_batch in test_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            total_loss += criterion(y_pred, y_batch).item()

            preds.append(y_pred.cpu().numpy())
            targets.append(y_batch.cpu().numpy())

    preds = np.vstack(preds)
    targets = np.vstack(targets)

    return {
        'test_loss': total_loss / len(test_loader),
        'r2': r2_score(targets, preds),
        'rmse': np.sqrt(mean_squared_error(targets, preds)),
        'predictions': preds,
        'targets': targets,
    }
|
|
|
|
|
def plot_training_history(history_mirror, history_baseline):
    """Plot side-by-side training and validation loss curves for both models."""
    plt.figure(figsize=(12, 5))

    panels = [('train_loss', 'Training Loss'), ('val_loss', 'Validation Loss')]
    for idx, (key, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, idx)
        plt.plot(history_mirror[key], label='Mirror Symmetry Model')
        plt.plot(history_baseline[key], label='Baseline MLP')
        plt.title(title)
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.legend()

    plt.tight_layout()
    plt.show()
|
|
|
|
|
def plot_predictions(mirror_results, baseline_results):
    """Scatter predicted vs. true values for both models with a y = x line."""
    plt.figure(figsize=(12, 5))

    panels = [
        (1, 'Mirror Symmetry Model', mirror_results),
        (2, 'Baseline MLP', baseline_results),
    ]
    for idx, label, res in panels:
        plt.subplot(1, 2, idx)
        plt.scatter(res['targets'], res['predictions'], alpha=0.5)
        lo = min(res['targets'].min(), res['predictions'].min())
        hi = max(res['targets'].max(), res['predictions'].max())
        plt.plot([lo, hi], [lo, hi], 'r--')  # ideal-prediction diagonal
        plt.title(f'{label}\nR² = {res["r2"]:.4f}, RMSE = {res["rmse"]:.4f}')
        plt.xlabel('True Values')
        plt.ylabel('Predicted Values')

    plt.tight_layout()
    plt.show()
|
|
|
|
|
def main():
    """End-to-end experiment: generate data, train both models, evaluate, plot, save."""
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Hyperparameters.
    input_dim, hidden_dim, output_dim = 10, 64, 1
    batch_size, epochs, lr = 32, 100, 0.001

    # Synthetic dataset with a 70/15/15 train/val/test split.
    X, y = generate_symmetric_data(n_samples=5000, input_dim=input_dim)
    X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
    X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size)
    test_loader = DataLoader(TensorDataset(X_test, y_test), batch_size=batch_size)

    mirror_model = MirrorSymmetryHodgeNetwork(input_dim, hidden_dim, output_dim)
    baseline_model = BaselineMLP(input_dim, hidden_dim, output_dim)

    print("Training Mirror Symmetry Hodge Network...")
    mirror_model, history_mirror = train_model(
        mirror_model, train_loader, val_loader,
        epochs=epochs, lr=lr, device=device
    )

    print("\nTraining Baseline MLP...")
    baseline_model, history_baseline = train_model(
        baseline_model, train_loader, val_loader,
        epochs=epochs, lr=lr, device=device
    )

    print("\nEvaluating models on test set...")
    mirror_results = evaluate_model(mirror_model, test_loader, device)
    baseline_results = evaluate_model(baseline_model, test_loader, device)

    # Report metrics for both models in the same format.
    for name, res in [("Mirror Symmetry Hodge Network", mirror_results),
                      ("Baseline MLP", baseline_results)]:
        print(f"\n{name} Results:")
        print(f"Test Loss: {res['test_loss']:.6f}")
        print(f"R2 Score: {res['r2']:.6f}")
        print(f"RMSE: {res['rmse']:.6f}")

    plot_training_history(history_mirror, history_baseline)
    plot_predictions(mirror_results, baseline_results)

    # Persist the trained weights.
    torch.save(mirror_model.state_dict(), 'mirror_symmetry_hodge_model.pth')
    torch.save(baseline_model.state_dict(), 'baseline_mlp_model.pth')

    print("\nModels saved successfully!")
|
|
# Script entry point: run the full experiment when executed directly.
if __name__ == "__main__":
    main()