Spaces:
Sleeping
Sleeping
import pandas as pd | |
import numpy as np | |
from sklearn.base import BaseEstimator, TransformerMixin | |
from scipy.fftpack import fft | |
from scipy.signal import welch | |
import pywt | |
#from _config import config | |
class ExtractFeatures(BaseEstimator, TransformerMixin): | |
def __init__(self, window_length, window_step_size, data_frequency, selected_domains=None, include_magnitude=False, features_label_columns=None): | |
self.window_length = window_length | |
self.window_step_size = window_step_size | |
self.data_frequency = data_frequency | |
self.selected_domains = selected_domains | |
self.include_magnitude = include_magnitude | |
self.features_label_columns = features_label_columns #if label_columns else ["arousal", "valence"] # Default to arousal and valence if not specified | |
def fit(self, X, y=None): | |
return self | |
def transform(self, X): | |
features_list = [] | |
if 'groupid' in X.columns: # Check for groupid column | |
for groupid in X['groupid'].unique(): # Iterate over unique group IDs | |
temp = X[X['groupid'] == groupid] # Filter rows by group ID | |
temp_ex = temp[['accel_time', 'x', 'y', 'z']].copy() # Keep only the necessary columns (accel_time can be removed if unused) | |
windows = self._window_data(temp_ex[['x', 'y', 'z']]) # Create windows of data | |
for window in windows: | |
features = self._extract_features_from_window(window) # Extract features from each window | |
features['groupid'] = groupid # Add groupid to the features | |
# Dynamically add emotion labels to the features | |
for label in self.features_label_columns: | |
features[label] = temp[label].iloc[0] | |
features_list.append(pd.DataFrame([features])) # Convert dictionary to DataFrame | |
else: # In case there's no groupid, calculate features without it | |
windows = self._window_data(X[['x', 'y', 'z']]) | |
for window in windows: | |
features = self._extract_features_from_window(window) | |
features_list.append(pd.DataFrame([features])) | |
all_features = pd.concat(features_list, ignore_index=True) | |
# Export features to CSV | |
window_length_str = str(self.window_length) | |
window_step_size_str = str(self.window_step_size) | |
if self.selected_domains is None: # All features calculated if domains are not selected | |
domain_str = "all_features" | |
else: | |
domain_str = "_".join(self.selected_domains) | |
file_name = f"features_window_{window_length_str}_step_{window_step_size_str}_{domain_str}.csv" | |
all_features.to_csv(file_name, index=False) | |
print("All features extracted successfully.") | |
return all_features | |
# Time Domain Features | |
def _calculate_magnitude(self, window): | |
return np.sqrt(window[:, 0]**2 + window[:, 1]**2 + window[:, 2]**2) | |
def _window_data(self, data): # Function to create windows of the data | |
window_samples = int(self.window_length * self.data_frequency) # Number of samples in each window 60sec * 25Hz = 1500 samples | |
step_samples = int(self.window_step_size * self.data_frequency) # Number of samples to move the window | |
windows = [data[i:i + window_samples] for i in range(0, len(data) - window_samples + 1, step_samples)] # Create windows | |
return np.array(windows) | |
def _extract_features_from_window(self, window): #DONE Mehrere domains gleichzeitig berechnen | |
all_features = {} | |
if self.selected_domains is None or 'time_domain' in self.selected_domains: | |
all_features.update(self._extract_time_domain_features(window)) | |
if self.selected_domains is None or 'spatial' in self.selected_domains: | |
all_features.update(self._extract_spatial_features(window)) | |
if self.selected_domains is None or 'frequency' in self.selected_domains: | |
all_features.update(self._extract_frequency_domain_features(window)) | |
if self.selected_domains is None or 'statistical' in self.selected_domains: | |
all_features.update(self._extract_statistical_features(window)) | |
if self.selected_domains is None or 'wavelet' in self.selected_domains: | |
all_features.update(self._extract_wavelet_features(window)) | |
return all_features | |
def _extract_time_domain_features(self, window): | |
features = { | |
'mean_x': np.mean(window[:, 0]), | |
'mean_y': np.mean(window[:, 1]), | |
'mean_z': np.mean(window[:, 2]), | |
'std_x': np.std(window[:, 0]), | |
'std_y': np.std(window[:, 1]), | |
'std_z': np.std(window[:, 2]), | |
'variance_x': np.var(window[:, 0]), | |
'variance_y': np.var(window[:, 1]), | |
'variance_z': np.var(window[:, 2]), | |
'rms_x': np.sqrt(np.mean(window[:, 0]**2)), | |
'rms_y': np.sqrt(np.mean(window[:, 1]**2)), | |
'rms_z': np.sqrt(np.mean(window[:, 2]**2)), | |
'max_x': np.max(window[:, 0]), | |
'max_y': np.max(window[:, 1]), | |
'max_z': np.max(window[:, 2]), | |
'min_x': np.min(window[:, 0]), | |
'min_y': np.min(window[:, 1]), | |
'min_z': np.min(window[:, 2]), | |
'peak_to_peak_x': np.ptp(window[:, 0]), | |
'peak_to_peak_y': np.ptp(window[:, 1]), | |
'peak_to_peak_z': np.ptp(window[:, 2]), | |
'skewness_x': pd.Series(window[:, 0]).skew(), | |
'skewness_y': pd.Series(window[:, 1]).skew(), | |
'skewness_z': pd.Series(window[:, 2]).skew(), | |
'kurtosis_x': pd.Series(window[:, 0]).kurt(), | |
'kurtosis_y': pd.Series(window[:, 1]).kurt(), | |
'kurtosis_z': pd.Series(window[:, 2]).kurt(), | |
'zero_crossing_rate_x': np.sum(np.diff(np.sign(window[:, 0])) != 0), | |
'zero_crossing_rate_y': np.sum(np.diff(np.sign(window[:, 1])) != 0), | |
'zero_crossing_rate_z': np.sum(np.diff(np.sign(window[:, 2])) != 0), | |
'sma' : np.sum(np.abs(window[:, 0])) + np.sum(np.abs(window[:, 1])) + np.sum(np.abs(window[:, 2])), #Signal Magnitude Area | |
} | |
# print(f"Time domain features extracted successfully.") | |
# Additional features for Magnitude (xyz in one vector) | |
if self.include_magnitude: | |
magnitude = self._calculate_magnitude(window) | |
features['mean_magnitude'] = np.mean(magnitude) | |
features['std_magnitude'] = np.std(magnitude) | |
features['variance_magnitude'] = np.var(magnitude) | |
features['rms_magnitude'] = np.sqrt(np.mean(magnitude**2)) | |
features['max_magnitude'] = np.max(magnitude) | |
features['min_magnitude'] = np.min(magnitude) | |
features['peak_to_peak_magnitude'] = np.ptp(magnitude) | |
features['skewness_magnitude'] = pd.Series(magnitude).skew() | |
features['kurtosis_magnitude'] = pd.Series(magnitude).kurt() | |
features['zero_crossing_rate_magnitude'] = np.sum(np.diff(np.sign(magnitude)) != 0) | |
# print(f"Additional time domain features for magnitude extracted successfully.") | |
return features | |
# Spatial Features | |
def _extract_spatial_features(self, window): | |
features = {} | |
# Euclidean Norm (Magnitude) | |
magnitude = self._calculate_magnitude(window) | |
features['euclidean_norm'] = np.mean(magnitude) # or np.linalg.norm for each window | |
# Tilt Angles (Pitch and Roll) | |
pitch = np.arctan2(window[:, 1], np.sqrt(window[:, 0]**2 + window[:, 2]**2)) * (180 / np.pi) | |
roll = np.arctan2(window[:, 0], np.sqrt(window[:, 1]**2 + window[:, 2]**2)) * (180 / np.pi) | |
features['mean_pitch'] = np.mean(pitch) | |
features['mean_roll'] = np.mean(roll) | |
# Correlation between Axes | |
features['correlation_xy'] = np.corrcoef(window[:, 0], window[:, 1])[0, 1] | |
features['correlation_xz'] = np.corrcoef(window[:, 0], window[:, 2])[0, 1] | |
features['correlation_yz'] = np.corrcoef(window[:, 1], window[:, 2])[0, 1] | |
# print(f"Spatial features extracted successfully.") | |
return features | |
# Frequency Domain Features | |
def _extract_frequency_domain_features(self, window): | |
n = len(window) | |
freq_values = np.fft.fftfreq(n, d=1/self.data_frequency)[:n // 2] | |
fft_values = fft(window, axis=0) | |
fft_magnitude = np.abs(fft_values)[:n // 2] | |
features = {} | |
# Spectral Entropy | |
def spectral_entropy(signal): | |
psd = np.square(signal) | |
psd_norm = psd / np.sum(psd) | |
return -np.sum(psd_norm * np.log(psd_norm + 1e-10)) | |
for i, axis in enumerate(['x', 'y', 'z']): | |
# Dominant Frequency | |
dominant_frequency = freq_values[np.argmax(fft_magnitude[:, i])] | |
features[f'dominant_frequency_{axis}'] = dominant_frequency | |
# Spectral Entropy | |
entropy = spectral_entropy(fft_magnitude[:, i]) | |
features[f'spectral_entropy_{axis}'] = entropy | |
# Power Spectral Density (PSD) and Energy | |
f, psd_values = welch(window[:, i], fs=self.data_frequency, nperseg=n) | |
features[f'psd_mean_{axis}'] = np.mean(psd_values) | |
features[f'energy_{axis}'] = np.sum(psd_values**2) | |
# Bandwidth (frequency range containing significant portion of the energy) | |
cumulative_energy = np.cumsum(psd_values) | |
total_energy = cumulative_energy[-1] | |
low_cutoff_idx = np.argmax(cumulative_energy > 0.1 * total_energy) | |
high_cutoff_idx = np.argmax(cumulative_energy > 0.9 * total_energy) | |
bandwidth = f[high_cutoff_idx] - f[low_cutoff_idx] | |
features[f'bandwidth_{axis}'] = bandwidth | |
# Spectral Centroid (Center of mass of the spectrum) | |
spectral_centroid = np.sum(f * psd_values) / np.sum(psd_values) | |
features[f'spectral_centroid_{axis}'] = spectral_centroid | |
if self.include_magnitude: | |
# Magnitude-based Frequency Domain Features | |
magnitude = self._calculate_magnitude(window) | |
fft_magnitude_mag = np.abs(fft(magnitude))[:n // 2] | |
# Dominant Frequency for Magnitude | |
features['dominant_frequency_magnitude'] = freq_values[np.argmax(fft_magnitude_mag)] | |
# Spectral Entropy for Magnitude | |
features['spectral_entropy_magnitude'] = spectral_entropy(fft_magnitude_mag) | |
# Power Spectral Density and Energy for Magnitude | |
f, psd_values_mag = welch(magnitude, fs=self.data_frequency, nperseg=n) | |
features['psd_mean_magnitude'] = np.mean(psd_values_mag) | |
features['energy_magnitude'] = np.sum(psd_values_mag**2) | |
# Bandwidth for Magnitude | |
cumulative_energy_mag = np.cumsum(psd_values_mag) | |
total_energy_mag = cumulative_energy_mag[-1] | |
low_cutoff_idx_mag = np.argmax(cumulative_energy_mag > 0.1 * total_energy_mag) | |
high_cutoff_idx_mag = np.argmax(cumulative_energy_mag > 0.9 * total_energy_mag) | |
bandwidth_mag = f[high_cutoff_idx_mag] - f[low_cutoff_idx_mag] | |
features['bandwidth_magnitude'] = bandwidth_mag | |
# Spectral Centroid for Magnitude | |
features['spectral_centroid_magnitude'] = np.sum(f * psd_values_mag) / np.sum(psd_values_mag) | |
# print(f"Frequency domain features extracted successfully.") | |
return features | |
def _extract_statistical_features(self, window): | |
features = { | |
'25th_percentile_x': np.percentile(window[:, 0], 25), | |
'25th_percentile_y': np.percentile(window[:, 1], 25), | |
'25th_percentile_z': np.percentile(window[:, 2], 25), | |
'75th_percentile_x': np.percentile(window[:, 0], 75), | |
'75th_percentile_y': np.percentile(window[:, 1], 75), | |
'75th_percentile_z': np.percentile(window[:, 2], 75), | |
} | |
if self.include_magnitude: | |
magnitude = self._calculate_magnitude(window) | |
features['25th_percentile_magnitude'] = np.percentile(magnitude, 25) | |
features['75th_percentile_magnitude'] = np.percentile(magnitude, 75) | |
# print(f"Statistical features extracted successfully.") | |
return features | |
def _extract_wavelet_features(self, window, wavelet='db1'): | |
coeffs = pywt.wavedec(window, wavelet, axis=0, level=3) | |
features = { | |
'wavelet_energy_approx_x': np.sum(coeffs[0][:, 0]**2), | |
'wavelet_energy_approx_y': np.sum(coeffs[0][:, 1]**2), | |
'wavelet_energy_approx_z': np.sum(coeffs[0][:, 2]**2), | |
} | |
if self.include_magnitude: | |
magnitude = self._calculate_magnitude(window) | |
coeffs_magnitude = pywt.wavedec(magnitude, wavelet, level=3) | |
features['wavelet_energy_approx_magnitude'] = np.sum(coeffs_magnitude[0]**2) | |
# print(f"Wavelet features extracted successfully.") | |
return features |