| | import os
|
| | import re
|
| | import numpy as np
|
| | import pandas as pd
|
| | from glob import glob
|
| | from sklearn.preprocessing import StandardScaler
|
| |
|
| |
|
| | class DamageCalculator:
|
| |
|
| | @staticmethod
|
| | def compute_freeze_thaw_damage(FN, FT, a1=0.002, b1=1.0, c1=0.02):
|
| | return a1 * (FN ** b1) * np.exp(c1 * FT)
|
| |
|
| | @staticmethod
|
| | def compute_chemical_damage(pH, a2=0.01, b2=1.5):
|
| | return a2 * np.abs(pH - 7.0) ** b2
|
| |
|
| | @staticmethod
|
| | def compute_thermal_damage(T, T0=100.0, a3=0.0003, b3=1.2):
|
| | if T < T0:
|
| | return 0.0
|
| | return a3 * ((T - T0) ** b3)
|
| |
|
| | @staticmethod
|
| | def compute_total_damage(pH, FN, FT, T):
|
| | D_ft = DamageCalculator.compute_freeze_thaw_damage(FN, FT)
|
| | D_ch = DamageCalculator.compute_chemical_damage(pH)
|
| | D_th = DamageCalculator.compute_thermal_damage(T)
|
| |
|
| | D_total = 1.0 - (1.0 - D_ft) * (1.0 - D_ch) * (1.0 - D_th)
|
| | return np.clip(D_total, 0.0, 0.99)
|
| |
|
| | @staticmethod
|
| | def compute_lambda(D0):
|
| | return 1.0 - D0
|
| |
|
| |
|
| | class CrackDataLoader:
|
| |
|
| | def __init__(self, base_path, stress_type="major"):
|
| | self.base_path = base_path
|
| | self.stress_type = stress_type
|
| |
|
| | if stress_type == "major":
|
| | self.data_dir = os.path.join(base_path, "major_principal_stress")
|
| | else:
|
| | self.data_dir = os.path.join(base_path, "minor_principal_stress")
|
| |
|
| | self.scaler_X = StandardScaler()
|
| | self.scaler_y = StandardScaler()
|
| | self.damage_calculator = DamageCalculator()
|
| |
|
| | def parse_filename(self, filename):
|
| | pattern = r'(\d+)-(\d+)-(\d+)-(\d+)'
|
| | match = re.search(pattern, filename)
|
| |
|
| | if match:
|
| | pH = int(match.group(1))
|
| | FN = int(match.group(2))
|
| | FT = int(match.group(3))
|
| | T = int(match.group(4))
|
| |
|
| | return {
|
| | 'pH': pH,
|
| | 'FN': FN,
|
| | 'FT': FT,
|
| | 'T': T
|
| | }
|
| | else:
|
| | raise ValueError(f"Cannot parse filename: {filename}")
|
| |
|
| | def load_single_csv(self, csv_path):
|
| | data = pd.read_csv(csv_path, header=None, names=['angle', 'count'])
|
| | angles = data['angle'].values
|
| | counts = data['count'].values
|
| | return angles, counts
|
| |
|
| | def load_all_data(self, phase="both"):
|
| | X_list = []
|
| | y_list = []
|
| | damage_list = []
|
| |
|
| | if phase == "both":
|
| | subdirs = ["unstable_development", "peak_stress"]
|
| | elif phase == "early":
|
| | subdirs = ["unstable_development"]
|
| | elif phase == "peak":
|
| | subdirs = ["peak_stress"]
|
| | else:
|
| | raise ValueError(f"Unknown phase: {phase}")
|
| |
|
| | for subdir in subdirs:
|
| | subdir_path = os.path.join(self.data_dir, subdir)
|
| |
|
| | if not os.path.exists(subdir_path):
|
| | print(f"Warning: Directory does not exist {subdir_path}")
|
| | continue
|
| |
|
| | phase_code = 0 if "unstable" in subdir else 1
|
| |
|
| | csv_files = glob(os.path.join(subdir_path, "*.csv"))
|
| |
|
| | print(f"Loading {len(csv_files)} files from {subdir}...")
|
| |
|
| | for csv_file in csv_files:
|
| | try:
|
| | params = self.parse_filename(os.path.basename(csv_file))
|
| |
|
| | angles, counts = self.load_single_csv(csv_file)
|
| |
|
| | D0 = DamageCalculator.compute_total_damage(
|
| | params['pH'], params['FN'], params['FT'], params['T']
|
| | )
|
| | lambda_coef = DamageCalculator.compute_lambda(D0)
|
| |
|
| | features = np.array([
|
| | params['pH'],
|
| | params['FN'],
|
| | params['FT'],
|
| | params['T'],
|
| | phase_code
|
| | ], dtype=np.float32)
|
| |
|
| | X_list.append(features)
|
| | y_list.append(counts)
|
| | damage_list.append({'D0': D0, 'lambda': lambda_coef})
|
| |
|
| | except Exception as e:
|
| | print(f"Skipping file {csv_file}: {e}")
|
| | continue
|
| |
|
| | if len(X_list) == 0:
|
| | raise ValueError("No data loaded successfully!")
|
| |
|
| | X = np.array(X_list)
|
| |
|
| | y_length = len(y_list[0])
|
| | y_padded = []
|
| |
|
| | for y_sample in y_list:
|
| | if len(y_sample) < y_length:
|
| | y_sample = np.pad(y_sample, (0, y_length - len(y_sample)), 'constant')
|
| | elif len(y_sample) > y_length:
|
| | y_sample = y_sample[:y_length]
|
| | y_padded.append(y_sample)
|
| |
|
| | y = np.array(y_padded)
|
| |
|
| | angles, _ = self.load_single_csv(csv_files[0])
|
| | angle_bins = angles[:y_length]
|
| |
|
| | print(f"\nData loading complete:")
|
| | print(f" Samples: {X.shape[0]}")
|
| | print(f" Input features: {X.shape[1]} (pH, FN, FT, T, phase)")
|
| | print(f" Output dimension: {y.shape[1]} (angle bins)")
|
| | print(f" Angle range: {angle_bins[0]:.1f} - {angle_bins[-1]:.1f}")
|
| | print(f" Total cracks range: {y.sum(axis=1).min():.0f} - {y.sum(axis=1).max():.0f}")
|
| |
|
| | return X, y, angle_bins, damage_list
|
| |
|
| | def create_synthetic_data(self, n_samples=100, output_dim=72):
|
| | pH_values = [1, 3, 5, 7]
|
| | FN_values = [5, 10, 20, 40]
|
| | FT_values = [10, 20, 30, 40]
|
| | T_values = [25, 300, 600, 900]
|
| | phase_values = [0, 1]
|
| |
|
| | X_list = []
|
| | y_list = []
|
| |
|
| | for _ in range(n_samples):
|
| | pH = np.random.choice(pH_values)
|
| | FN = np.random.choice(FN_values)
|
| | FT = np.random.choice(FT_values)
|
| | T = np.random.choice(T_values)
|
| | phase = np.random.choice(phase_values)
|
| |
|
| | D0 = DamageCalculator.compute_total_damage(pH, FN, FT, T)
|
| |
|
| | if self.stress_type == "major":
|
| | peak_angle = 90.0 + np.random.normal(0, 10)
|
| | spread = 15.0 + D0 * 20.0
|
| | else:
|
| | peak_angle = 45.0 + np.random.normal(0, 15)
|
| | spread = 20.0 + D0 * 25.0
|
| |
|
| | angles = np.linspace(0, 175, output_dim)
|
| | distribution = np.exp(-0.5 * ((angles - peak_angle) / spread) ** 2)
|
| | distribution = distribution * (100 + D0 * 200) * (1 + 0.5 * phase)
|
| | distribution = distribution + np.random.normal(0, 5, output_dim)
|
| | distribution = np.maximum(distribution, 0)
|
| |
|
| | X_list.append([pH, FN, FT, T, phase])
|
| | y_list.append(distribution)
|
| |
|
| | X = np.array(X_list, dtype=np.float32)
|
| | y = np.array(y_list, dtype=np.float32)
|
| | angle_bins = np.linspace(0, 175, output_dim)
|
| |
|
| | return X, y, angle_bins
|
| |
|
| | def normalize_data(self, X_train, y_train, X_test=None, y_test=None):
|
| | X_train_norm = self.scaler_X.fit_transform(X_train)
|
| | y_train_norm = self.scaler_y.fit_transform(y_train)
|
| |
|
| | if X_test is not None and y_test is not None:
|
| | X_test_norm = self.scaler_X.transform(X_test)
|
| | y_test_norm = self.scaler_y.transform(y_test)
|
| | return X_train_norm, y_train_norm, X_test_norm, y_test_norm
|
| | else:
|
| | return X_train_norm, y_train_norm
|
| |
|
| | def denormalize_output(self, y_norm):
|
| | return self.scaler_y.inverse_transform(y_norm)
|
| |
|
| | def get_statistics(self, X, y):
|
| | stats = {
|
| | 'n_samples': X.shape[0],
|
| | 'input_dim': X.shape[1],
|
| | 'output_dim': y.shape[1],
|
| | 'pH_range': (X[:, 0].min(), X[:, 0].max()),
|
| | 'FN_range': (X[:, 1].min(), X[:, 1].max()),
|
| | 'FT_range': (X[:, 2].min(), X[:, 2].max()),
|
| | 'T_range': (X[:, 3].min(), X[:, 3].max()),
|
| | 'total_cracks_range': (y.sum(axis=1).min(), y.sum(axis=1).max()),
|
| | 'total_cracks_mean': y.sum(axis=1).mean(),
|
| | 'total_cracks_std': y.sum(axis=1).std(),
|
| | }
|
| |
|
| | D0_values = []
|
| | for i in range(X.shape[0]):
|
| | D0 = DamageCalculator.compute_total_damage(X[i, 0], X[i, 1], X[i, 2], X[i, 3])
|
| | D0_values.append(D0)
|
| |
|
| | stats['D0_range'] = (min(D0_values), max(D0_values))
|
| | stats['D0_mean'] = np.mean(D0_values)
|
| |
|
| | return stats
|
| |
|