import argparse
import json
import os
import pickle
import warnings

import numpy as np
import pandas as pd
from sklearn.preprocessing import (
    LabelEncoder,
    MinMaxScaler,
    PowerTransformer,
    RobustScaler,
    StandardScaler,
)

# Silence noisy library warnings (pandas/sklearn deprecation chatter).
warnings.filterwarnings('ignore')

class CryptoDataNormalizer:
    """Normalization pipeline for cryptocurrency feature data.

    Categorizes columns into crypto-specific groups, scales each group with a
    suitable scaler, optionally clips outliers, and can derive engineered
    features such as exchange spreads, momentum, and rank stability.
    """

    def __init__(self, preserve_symbol=True, handle_outliers=True, feature_engineering=True):
        self.scalers = {}          # fitted scaler per feature category
        self.encoders = {}         # LabelEncoder per categorical column
        self.feature_info = {}     # column-name lists per category
        self.is_fitted = False
        self.preserve_symbol = preserve_symbol
        self.handle_outliers = handle_outliers
        self.feature_engineering = feature_engineering
        self.outlier_bounds = {}   # per-column (lower, upper) IQR bounds

    def _detect_outliers(self, df, column):
        """Compute outlier bounds for a column using the 1.5 * IQR rule."""
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        return lower_bound, upper_bound
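    # Worked example (made-up numbers): a column with Q1 = 10 and Q3 = 30 has
    # IQR = 20, so anything outside [10 - 1.5 * 20, 30 + 1.5 * 20] = [-20, 60]
    # is treated as an outlier.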

    def _handle_outliers(self, df, column, method='clip'):
        """Clip or mask outliers; bounds are computed once per column and reused."""
        if column not in self.outlier_bounds:
            lower_bound, upper_bound = self._detect_outliers(df, column)
            self.outlier_bounds[column] = (lower_bound, upper_bound)
        else:
            lower_bound, upper_bound = self.outlier_bounds[column]

        if method == 'clip':
            return df[column].clip(lower_bound, upper_bound)
        elif method == 'remove':
            # 'remove' masks outliers to NaN (the rows themselves are kept);
            # downstream fill logic decides how those NaNs are imputed.
            return df[column].where((df[column] >= lower_bound) & (df[column] <= upper_bound))
        return df[column]
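    # For example, with bounds (-20, 60) a value of 100 becomes 60 under
    # method='clip' and NaN under method='remove'.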

    def _categorize_features(self, df):
        """Categorize columns into crypto-specific feature groups.

        Each group is scaled separately in fit()/transform(); a column is
        assigned to exactly one group (first match wins).
        """
        id_features = ['symbol', 'backup_id', '__index_level_0__', 'cg_id']
        timestamp_features = [col for col in df.columns if 'timestamp' in col.lower()]
        categorized = set(id_features) | set(timestamp_features)

        # Binary flags: bools, {0, 1}-valued columns, and known flags ('stable').
        binary_features = []
        for col in df.columns:
            if col in categorized:
                continue
            unique_vals = set(df[col].dropna().unique())
            if (df[col].dtype == bool or
                    (len(unique_vals) <= 2 and unique_vals.issubset({0, 1, True, False})) or
                    col == 'stable'):
                binary_features.append(col)
        categorized.update(binary_features)

        # Object/category dtypes and low-cardinality integer columns get label-encoded.
        categorical_features = []
        for col in df.columns:
            if col in categorized:
                continue
            if (df[col].dtype == 'object' or
                    df[col].dtype.name == 'category' or
                    (df[col].nunique() < 20 and df[col].dtype in ['int64', 'int32'])):
                categorical_features.append(col)
        categorized.update(categorical_features)

        def match(keywords, prefix=False):
            """Collect the not-yet-categorized columns matching any keyword/prefix."""
            matched = []
            for col in df.columns:
                if col in categorized:
                    continue
                if prefix:
                    if any(col.startswith(k) for k in keywords):
                        matched.append(col)
                elif any(k in col.lower() for k in keywords):
                    matched.append(col)
            categorized.update(matched)
            return matched

        # Prefix-named groups are claimed before the keyword groups so that
        # e.g. 'exchangePrices.*' is not also swallowed by the generic 'price'
        # keyword, nor 'rankDiffs.*' by 'rank'.
        exchange_features = match(['exchangePrices.'], prefix=True)
        performance_features = match(['performance.'], prefix=True)
        rank_diff_features = match(['rankDiffs.'], prefix=True)
        crypto_specific_features = match(['dominance', 'rank'])
        price_volume_features = match(['price', 'volume', 'marketcap', 'open'])
        technical_features = match(['rsi', 'macd', 'ema', 'sma', 'bb_', 'cci',
                                    'mfi', 'atr', 'stoch', 'roc'])
        social_features = match(['social', 'sentiment', 'confidence', 'pos', 'neg', 'neu'])
        transaction_features = match(['transaction', 'tx_', 'gas', 'fees'])
        quality_features = match(['completeness', 'quality', 'correlation'])

        # Any remaining numeric column falls into the generic bucket.
        numerical_features = [col for col in df.columns
                              if col not in categorized and pd.api.types.is_numeric_dtype(df[col])]

        return {
            'id_features': id_features,
            'timestamp_features': timestamp_features,
            'binary_features': binary_features,
            'categorical_features': categorical_features,
            'crypto_specific_features': crypto_specific_features,
            'price_volume_features': price_volume_features,
            'exchange_features': exchange_features,
            'performance_features': performance_features,
            'rank_diff_features': rank_diff_features,
            'technical_features': technical_features,
            'social_features': social_features,
            'transaction_features': transaction_features,
            'quality_features': quality_features,
            'numerical_features': numerical_features
        }
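    # A sketch of the resulting grouping (hypothetical column names):
    #   symbol -> id, last_timestamp -> timestamp, stable -> binary,
    #   exchangePrices.binance -> exchange, performance.day -> performance,
    #   rankDiffs.hour24 -> rank_diff, rsi_14 -> technical, volume -> price_volume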

    def _engineer_crypto_features(self, df, normalized_df):
        """Derive crypto-specific engineered features from the raw frame."""
        if not self.feature_engineering:
            return normalized_df

        # Cross-exchange price dispersion: spread and relative std across venues.
        exchange_cols = [col for col in df.columns if col.startswith('exchangePrices.')]
        if len(exchange_cols) > 1:
            exchange_prices = df[exchange_cols].replace([np.inf, -np.inf], np.nan)
            if not exchange_prices.empty and exchange_prices.notna().any().any():
                price_mean = exchange_prices.mean(axis=1)
                price_max = exchange_prices.max(axis=1)
                price_min = exchange_prices.min(axis=1)
                price_std = exchange_prices.std(axis=1)

                # Only rows with a positive mean price get a ratio; masking also
                # keeps division-by-zero infinities from leaking past fillna.
                valid_mask = (price_mean > 0) & price_mean.notna()
                if valid_mask.any():
                    normalized_df['exchange_price_spread'] = (
                        ((price_max - price_min) / price_mean).where(valid_mask).fillna(0))
                    normalized_df['exchange_price_std'] = (
                        (price_std / price_mean).where(valid_mask).fillna(0))

        # Momentum: average short- vs long-horizon performance columns.
        perf_short_cols = [col for col in df.columns if col.startswith('performance.') and
                           any(timeframe in col for timeframe in ['min1', 'min5', 'min15', 'hour'])]
        perf_long_cols = [col for col in df.columns if col.startswith('performance.') and
                          any(timeframe in col for timeframe in ['day', 'week', 'month'])]

        if perf_short_cols:
            short_perf = df[perf_short_cols].replace([np.inf, -np.inf], np.nan)
            normalized_df['short_term_momentum'] = short_perf.mean(axis=1).fillna(0)
        if perf_long_cols:
            long_perf = df[perf_long_cols].replace([np.inf, -np.inf], np.nan)
            normalized_df['long_term_momentum'] = long_perf.mean(axis=1).fillna(0)

        # Rank stability: large absolute rank moves -> value near 0; no moves -> ~1.
        rank_diff_cols = [col for col in df.columns if col.startswith('rankDiffs.')]
        if rank_diff_cols:
            rank_diffs = df[rank_diff_cols].replace([np.inf, -np.inf], np.nan).fillna(0)
            normalized_df['rank_stability'] = 1 / (1 + rank_diffs.abs().sum(axis=1) + 1e-8)

        # Average social sentiment; 0.5 serves as the neutral default.
        social_sentiment_cols = [col for col in df.columns if 'social_sentiment' in col.lower()]
        if social_sentiment_cols:
            social_data = df[social_sentiment_cols].replace([np.inf, -np.inf], np.nan)
            normalized_df['avg_social_sentiment'] = social_data.mean(axis=1).fillna(0.5)

        # Composite technical strength from common oscillators.
        tech_cols = [col for col in df.columns if any(tech in col.lower() for tech in ['rsi', 'macd', 'cci'])]
        if tech_cols:
            tech_data = df[tech_cols].replace([np.inf, -np.inf], np.nan)
            normalized_df['technical_strength'] = tech_data.mean(axis=1).fillna(0)

        # Volume/price ratio, masked to rows with a positive, finite price.
        if 'volume' in df.columns and 'price' in df.columns:
            volume = df['volume'].replace([np.inf, -np.inf], np.nan)
            price = df['price'].replace([np.inf, -np.inf], np.nan)
            valid_mask = (price > 0) & price.notna() & volume.notna()
            if valid_mask.any():
                ratio = (volume / price).where(valid_mask)
                normalized_df['volume_price_ratio'] = ratio.fillna(0)

        # Dominance relative to reciprocal rank (missing rank defaults to 1000).
        if 'dominance' in df.columns and 'rank' in df.columns:
            dominance = df['dominance'].replace([np.inf, -np.inf], np.nan).fillna(0)
            rank = df['rank'].replace([np.inf, -np.inf], np.nan).fillna(1000)
            rank_reciprocal = 1 / (rank + 1e-8)
            normalized_df['dominance_rank_ratio'] = (dominance / rank_reciprocal).fillna(0)

        return normalized_df

    def fit(self, df):
        """Fit scalers, encoders, and outlier bounds on training data."""
        if isinstance(df, dict):
            df = pd.DataFrame([df])

        self.feature_info = self._categorize_features(df)

        # Scaler choice per category: RobustScaler for heavy-tailed price-like
        # data, PowerTransformer for skewed distributions, MinMaxScaler for
        # bounded quality scores, StandardScaler elsewhere.
        feature_types = {
            'crypto_specific_features': RobustScaler(),
            'price_volume_features': RobustScaler(),
            'exchange_features': StandardScaler(),
            'performance_features': StandardScaler(),
            'rank_diff_features': StandardScaler(),
            'technical_features': StandardScaler(),
            'social_features': StandardScaler(),
            'transaction_features': PowerTransformer(),
            'quality_features': MinMaxScaler(),
            'numerical_features': PowerTransformer()
        }

        for feature_type, scaler in feature_types.items():
            features = self.feature_info[feature_type]
            if not features:
                continue
            existing_features = [col for col in features if col in df.columns]
            if not existing_features:
                continue

            # Clip outliers for the heavy-tailed groups before fitting.
            df_clean = df.copy()
            if self.handle_outliers and feature_type in ['crypto_specific_features', 'price_volume_features']:
                for col in existing_features:
                    df_clean[col] = self._handle_outliers(df_clean, col)

            try:
                df_clean[existing_features] = df_clean[existing_features].replace([np.inf, -np.inf], np.nan)

                # Category-specific imputation before fitting the scaler.
                if feature_type in ['crypto_specific_features', 'price_volume_features']:
                    for col in existing_features:
                        df_clean[col] = df_clean[col].ffill().fillna(df_clean[col].median()).fillna(0)
                elif feature_type in ['performance_features', 'rank_diff_features']:
                    df_clean[existing_features] = df_clean[existing_features].fillna(0)
                elif feature_type == 'quality_features':
                    df_clean[existing_features] = df_clean[existing_features].fillna(0.5)
                else:
                    for col in existing_features:
                        df_clean[col] = df_clean[col].fillna(df_clean[col].median()).fillna(0)

                df_clean[existing_features] = df_clean[existing_features].replace([np.inf, -np.inf], 0)

                scaler.fit(df_clean[existing_features])
                self.scalers[feature_type] = scaler
                self.feature_info[f'{feature_type}_existing'] = existing_features

            except Exception as e:
                print(f"Warning: Could not fit scaler for {feature_type}: {e}")
                continue

        # Label encoders for categorical columns; NaN is encoded as 'unknown'.
        for col in self.feature_info['categorical_features']:
            if col in df.columns:
                self.encoders[col] = LabelEncoder()
                self.encoders[col].fit(df[col].fillna('unknown').astype(str))

        self.is_fitted = True
        return self
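    # Note: fit() learns outlier bounds and scaler statistics from the
    # training frame only; transform() reuses them, so inference-time data
    # is normalized consistently with training.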

    def transform(self, data):
        """Transform data with the fitted scalers and encoders."""
        if not self.is_fitted:
            raise ValueError("Normalizer must be fitted before transform")

        if isinstance(data, dict):
            df = pd.DataFrame([data])
        else:
            df = data.copy()

        normalized_df = pd.DataFrame(index=df.index)

        # Keep the raw symbol column for joins/grouping downstream.
        if self.preserve_symbol and 'symbol' in df.columns:
            normalized_df['symbol'] = df['symbol']

        # Expand millisecond timestamps into calendar features scaled to [0, 1],
        # plus weekend and non-overlapping UTC trading-session flags.
        for col in self.feature_info['timestamp_features']:
            if col in df.columns:
                ts = pd.to_datetime(df[col], unit='ms', errors='coerce')
                normalized_df[f'{col}_hour'] = ts.dt.hour / 23.0
                normalized_df[f'{col}_day_of_week'] = ts.dt.dayofweek / 6.0
                normalized_df[f'{col}_month'] = (ts.dt.month - 1) / 11.0
                normalized_df[f'{col}_quarter'] = (ts.dt.quarter - 1) / 3.0
                normalized_df[f'{col}_is_weekend'] = (ts.dt.dayofweek >= 5).astype(int)
                normalized_df[f'{col}_is_asian_hours'] = (ts.dt.hour < 8).astype(int)
                normalized_df[f'{col}_is_european_hours'] = ((ts.dt.hour >= 8) & (ts.dt.hour < 16)).astype(int)
                normalized_df[f'{col}_is_american_hours'] = (ts.dt.hour >= 16).astype(int)

        # Binary flags pass through as 0/1 integers.
        for col in self.feature_info['binary_features']:
            if col in df.columns:
                normalized_df[col] = df[col].fillna(0).astype(int)

        # Label-encode categoricals; labels unseen during fit map to 0.
        for col in self.feature_info['categorical_features']:
            if col in df.columns and col in self.encoders:
                try:
                    values = df[col].fillna('unknown').astype(str)
                    mapping = {cls: idx for idx, cls in enumerate(self.encoders[col].classes_)}
                    normalized_df[f'{col}_encoded'] = values.map(mapping).fillna(0).astype(int)
                except Exception:
                    normalized_df[f'{col}_encoded'] = 0

        feature_types = ['crypto_specific_features', 'price_volume_features', 'exchange_features',
                         'performance_features', 'rank_diff_features', 'technical_features',
                         'social_features', 'transaction_features', 'quality_features', 'numerical_features']

        for feature_type in feature_types:
            if feature_type not in self.scalers:
                continue
            existing_features = self.feature_info.get(f'{feature_type}_existing', [])
            available_features = [col for col in existing_features if col in df.columns]
            if not available_features:
                continue
            try:
                # Re-apply the outlier bounds learned during fit.
                df_clean = df.copy()
                if (self.handle_outliers and
                        feature_type in ['crypto_specific_features', 'price_volume_features']):
                    for col in available_features:
                        if col in self.outlier_bounds:
                            lower_bound, upper_bound = self.outlier_bounds[col]
                            df_clean[col] = df_clean[col].clip(lower_bound, upper_bound)

                df_clean[available_features] = df_clean[available_features].replace([np.inf, -np.inf], np.nan)

                # Same category-specific imputation as in fit().
                if feature_type in ['crypto_specific_features', 'price_volume_features']:
                    for col in available_features:
                        df_clean[col] = df_clean[col].ffill().bfill().fillna(0)
                elif feature_type in ['performance_features', 'rank_diff_features']:
                    df_clean[available_features] = df_clean[available_features].fillna(0)
                elif feature_type == 'quality_features':
                    df_clean[available_features] = df_clean[available_features].fillna(0.5)
                else:
                    df_clean[available_features] = df_clean[available_features].fillna(0)

                df_clean[available_features] = df_clean[available_features].replace([np.inf, -np.inf], 0)

                scaled_data = self.scalers[feature_type].transform(df_clean[available_features])

                # Suffix columns with the scaler that produced them, e.g. '_robust_scaled'.
                scaler_name = type(self.scalers[feature_type]).__name__.lower().replace('scaler', '').replace('transformer', '')
                for i, col in enumerate(available_features):
                    normalized_df[f'{col}_{scaler_name}_scaled'] = scaled_data[:, i]

            except Exception as e:
                print(f"Warning: Could not transform {feature_type}: {e}")
                # Fall back to the cleaned raw values so no columns are lost.
                for col in available_features:
                    if col in df.columns:
                        clean_col = df[col].replace([np.inf, -np.inf], np.nan).fillna(0)
                        normalized_df[f'{col}_raw'] = clean_col

        normalized_df = self._engineer_crypto_features(df, normalized_df)

        # Final cleanup: remaining NaNs get a sensible default (neutral 0.5 for
        # sentiment columns, 0 everywhere else; 'symbol' stays untouched).
        normalized_df = normalized_df.replace([np.inf, -np.inf], np.nan)
        for col in normalized_df.columns:
            if col == 'symbol' or not normalized_df[col].isna().any():
                continue
            if 'sentiment' in col.lower():
                normalized_df[col] = normalized_df[col].fillna(0.5)
            else:
                normalized_df[col] = normalized_df[col].fillna(0)

        # Sanity check; warn and force-clean if anything slipped through.
        try:
            assert not normalized_df.isnull().any().any(), "Still contains NaN values after cleanup"
            assert not np.isinf(normalized_df.select_dtypes(include=[np.number])).any().any(), "Still contains infinite values after cleanup"
        except AssertionError as e:
            print(f"Warning: {e}")
            normalized_df = normalized_df.fillna(0).replace([np.inf, -np.inf], 0)

        return normalized_df
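    # A sketch of the emitted column names (hypothetical inputs):
    #   'price'           -> 'price_robust_scaled'            (RobustScaler group)
    #   'performance.day' -> 'performance.day_standard_scaled'
    #   a 'timestamp' column additionally yields '_hour', '_day_of_week',
    #   '_month', '_quarter', '_is_weekend', and session-flag columns.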

    def fit_transform(self, data):
        """Fit and transform in one step."""
        return self.fit(data).transform(data)

    def get_feature_importance_info(self):
        """Return feature-category info for model interpretation."""
        return {
            'feature_categories': self.feature_info,
            'scalers_used': {k: type(v).__name__ for k, v in self.scalers.items()},
            # Skip the '*_existing' bookkeeping lists so columns are not double-counted.
            'total_features': sum(len(v) for k, v in self.feature_info.items()
                                  if isinstance(v, list) and not k.endswith('_existing'))
        }

    def save(self, filepath):
        """Persist the fitted normalizer state to a pickle file."""
        with open(filepath, 'wb') as f:
            pickle.dump({
                'scalers': self.scalers,
                'encoders': self.encoders,
                'feature_info': self.feature_info,
                'is_fitted': self.is_fitted,
                'preserve_symbol': self.preserve_symbol,
                'handle_outliers': self.handle_outliers,
                'feature_engineering': self.feature_engineering,
                'outlier_bounds': self.outlier_bounds
            }, f)

    def load(self, filepath):
        """Load a fitted normalizer saved by save()."""
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
        self.scalers = data['scalers']
        self.encoders = data['encoders']
        self.feature_info = data['feature_info']
        self.is_fitted = data['is_fitted']
        # Older pickles may predate these options; fall back to defaults.
        self.preserve_symbol = data.get('preserve_symbol', True)
        self.handle_outliers = data.get('handle_outliers', True)
        self.feature_engineering = data.get('feature_engineering', True)
        self.outlier_bounds = data.get('outlier_bounds', {})
        return self

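# Example usage (a minimal sketch; the DataFrames and their columns are
# hypothetical):
#
#   normalizer = CryptoDataNormalizer(handle_outliers=True)
#   train_norm = normalizer.fit_transform(train_df)   # fit on training data only
#   live_norm = normalizer.transform(live_df)         # reuse fitted scalers/bounds
#   normalizer.save('crypto_normalizer.pkl')
#   restored = CryptoDataNormalizer().load('crypto_normalizer.pkl')
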

def normalize_crypto_data_file(input_file, output_file, save_normalizer=True, **kwargs):
    """Normalize a crypto feature file (.parquet or .jsonl) end to end."""
    if input_file.endswith('.parquet'):
        df = pd.read_parquet(input_file)
        print(f"Loaded {len(df)} records with {len(df.columns)} features from parquet")
    else:
        data = []
        with open(input_file, 'r') as f:
            for line in f:
                data.append(json.loads(line.strip()))
        df = pd.DataFrame(data)
        print(f"Loaded {len(df)} records with {len(df.columns)} features from jsonl")

    normalizer = CryptoDataNormalizer(**kwargs)

    # Preview the categorization before fitting.
    feature_info = normalizer._categorize_features(df)
    print("\nCrypto Feature Categorization:")
    for category, features in feature_info.items():
        if features:
            print(f"  {category}: {len(features)} features")

    normalized_df = normalizer.fit_transform(df)

    print(f"\nNormalized to {len(normalized_df.columns)} features")
    print(f"Data shape: {normalized_df.shape}")

    importance_info = normalizer.get_feature_importance_info()
    print(f"\nScalers used: {importance_info['scalers_used']}")

    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Derive output names with splitext so a '.pkl' output path cannot make the
    # normalizer file collide with (and overwrite) the normalized-data file.
    base, _ = os.path.splitext(output_file)
    pkl_output_file = base + '.pkl'
    normalized_df.to_pickle(pkl_output_file)
    print(f"Saved normalized data to {pkl_output_file}")

    if save_normalizer:
        normalizer_file = base + '_crypto_normalizer.pkl'
        normalizer.save(normalizer_file)
        print(f"Saved normalizer to {normalizer_file}")

    return normalized_df, normalizer
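
# Example call (a sketch; the paths follow the CLI defaults below):
#
#   normalized_df, normalizer = normalize_crypto_data_file(
#       'data/merged/features/crypto_features.parquet',
#       'data/merged/features/norm/crypto_features_normalized.pkl',
#   )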

def main():
    parser = argparse.ArgumentParser(
        description="Enhanced normalization for cryptocurrency features with crypto-specific handling"
    )
    parser.add_argument('input', nargs='?', default='data/merged/features/crypto_features.parquet',
                        help='Input file (.parquet or .jsonl)')
    parser.add_argument('output', nargs='?', default='data/merged/features/norm/crypto_features_normalized.pkl',
                        help='Output PKL file for normalized features')
    parser.add_argument('--no-save-normalizer', action='store_true',
                        help='Do not save the normalizer pickle')
    parser.add_argument('--no-preserve-symbol', action='store_true',
                        help='Do not preserve the symbol column')
    parser.add_argument('--no-handle-outliers', action='store_true',
                        help='Do not handle outliers')
    parser.add_argument('--no-feature-engineering', action='store_true',
                        help='Do not create engineered features')
    parser.add_argument('--train', action='store_true',
                        help='Normalize the train file and save under train/norm/')

    args = parser.parse_args()

    kwargs = {
        'preserve_symbol': not args.no_preserve_symbol,
        'handle_outliers': not args.no_handle_outliers,
        'feature_engineering': not args.no_feature_engineering
    }

    if args.train:
        train_input = 'data/merged/train/crypto_features_train.parquet'
        train_norm_dir = 'data/merged/train/norm'
        os.makedirs(train_norm_dir, exist_ok=True)
        train_output = os.path.join(train_norm_dir, 'crypto_features_train_normalized.pkl')
        print(f"[INFO] Normalizing train file: {train_input} -> {train_output}")
        normalize_crypto_data_file(
            train_input,
            train_output,
            save_normalizer=not args.no_save_normalizer,
            **kwargs
        )
    else:
        print(f"[INFO] Enhanced crypto normalizing: {args.input} -> {args.output}")
        print(f"[INFO] Options: {kwargs}")
        normalize_crypto_data_file(
            args.input,
            args.output,
            save_normalizer=not args.no_save_normalizer,
            **kwargs
        )


if __name__ == "__main__":
    main()
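
# Example invocations (the script filename is hypothetical):
#   python normalize_crypto_features.py                        # default input/output
#   python normalize_crypto_features.py --train                # normalize the train split
#   python normalize_crypto_features.py in.parquet out.pkl --no-handle-outliers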