Maaroufabousaleh
f
c49b21b
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, MinMaxScaler, RobustScaler, LabelEncoder, PowerTransformer
import json
import pickle
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
import os
class CryptoDataNormalizer:
    """
    Enhanced normalization pipeline for cryptocurrency features data with crypto-specific handling.

    Workflow:
      * ``fit`` groups columns into feature categories, fits one scaler per numeric
        category, label-encodes categorical columns, and records IQR outlier bounds
        and per-column fill values learned from the training data.
      * ``transform`` applies those fitted objects and returns a new DataFrame of
        normalized columns (plus optional engineered features); the input data is
        only read, never modified.
    """

    # Feature types that are IQR-clipped (when handle_outliers is on) and gap-filled
    # with forward fill before falling back to the training median.
    _OUTLIER_FEATURE_TYPES = ('crypto_specific_features', 'price_volume_features')

    # Scaled feature types, in the order they are fitted and emitted.
    _SCALED_FEATURE_TYPES = (
        'crypto_specific_features', 'price_volume_features', 'exchange_features',
        'performance_features', 'rank_diff_features', 'technical_features',
        'social_features', 'transaction_features', 'quality_features', 'numerical_features',
    )

    # Column prefixes with dedicated categories. Keyword-based categories must not claim
    # them: 'exchangePrices.x' contains "price" and 'rankDiffs.x' contains "rank".
    _STRUCTURED_PREFIXES = ('exchangePrices.', 'performance.', 'rankDiffs.')

    def __init__(self, preserve_symbol=True, handle_outliers=True, feature_engineering=True):
        """
        Args:
            preserve_symbol: copy the ``symbol`` column through ``transform`` unchanged.
            handle_outliers: IQR-clip crypto-specific and price/volume columns.
            feature_engineering: append engineered spread/momentum/ratio features.
        """
        self.scalers = {}
        self.encoders = {}
        self.feature_info = {}
        self.is_fitted = False
        self.preserve_symbol = preserve_symbol
        self.handle_outliers = handle_outliers
        self.feature_engineering = feature_engineering
        self.outlier_bounds = {}
        # Training-time NaN fill value (median, or 0 when unavailable) per column.
        self.fill_values = {}

    def _detect_outliers(self, df, column):
        """Return IQR bounds ``(Q1 - 1.5*IQR, Q3 + 1.5*IQR)`` for ``df[column]``."""
        q1 = df[column].quantile(0.25)
        q3 = df[column].quantile(0.75)
        iqr = q3 - q1
        return q1 - 1.5 * iqr, q3 + 1.5 * iqr

    def _handle_outliers(self, df, column, method='clip'):
        """
        Apply outlier handling to ``df[column]`` and return the resulting Series.

        Bounds are computed on first use and cached in ``self.outlier_bounds``;
        later calls reuse the cached bounds. ``method='clip'`` clips to the bounds,
        ``'remove'`` replaces out-of-range values with NaN, and anything else
        returns the column unchanged.
        """
        if column not in self.outlier_bounds:
            self.outlier_bounds[column] = self._detect_outliers(df, column)
        lower_bound, upper_bound = self.outlier_bounds[column]
        if method == 'clip':
            return df[column].clip(lower_bound, upper_bound)
        if method == 'remove':
            return df[column].where((df[column] >= lower_bound) & (df[column] <= upper_bound))
        return df[column]

    def _categorize_features(self, df):
        """
        Assign the columns of ``df`` to feature categories.

        Categories are resolved in a fixed order. Except for the prefix-based
        exchange/performance/rank-diff groups, each category only takes columns
        that no earlier category has claimed. Keyword categories match
        case-insensitively on the column name. The crypto-specific and
        price/volume keyword categories skip columns carrying one of the
        structured prefixes, so each such column lands in exactly one scaled
        category.

        Returns:
            dict of category name -> list of column names in ``df`` column order.
            ``id_features`` is a fixed list and may name columns absent from ``df``.
        """
        id_features = ['symbol', 'backup_id', '__index_level_0__', 'cg_id']
        timestamp_features = [col for col in df.columns if 'timestamp' in col.lower()]
        claimed = set(id_features) | set(timestamp_features)

        def take(predicate, allow_structured=True):
            """Collect unclaimed columns satisfying ``predicate`` and mark them claimed."""
            picked = [
                col for col in df.columns
                if col not in claimed
                and (allow_structured or not col.startswith(self._STRUCTURED_PREFIXES))
                and predicate(col)
            ]
            claimed.update(picked)
            return picked

        def has_keyword(keywords):
            """Predicate: the lower-cased column name contains any of ``keywords``."""
            return lambda col: any(keyword in col.lower() for keyword in keywords)

        def is_binary(col):
            """Bool dtype, at most two values drawn from {0, 1, True, False}, or 'stable'."""
            unique_vals = set(df[col].dropna().unique())
            return (df[col].dtype == bool
                    or (len(unique_vals) <= 2 and unique_vals.issubset({0, 1, True, False, np.nan}))
                    or col in ['stable'])

        def is_categorical(col):
            """Object/category dtype, or an int64/int32 column with fewer than 20 distinct values."""
            dtype = df[col].dtype
            return (dtype == 'object'
                    or dtype.name == 'category'
                    or (df[col].nunique() < 20 and dtype in ['int64', 'int32']))

        binary_features = take(is_binary)
        categorical_features = take(is_categorical)
        crypto_specific_features = take(has_keyword(['dominance', 'rank']), allow_structured=False)
        price_volume_features = take(has_keyword(['price', 'volume', 'marketcap', 'open']),
                                     allow_structured=False)

        # Prefix-based groups take every matching column (no "already claimed" check).
        exchange_features = [col for col in df.columns if col.startswith('exchangePrices.')]
        performance_features = [col for col in df.columns if col.startswith('performance.')]
        rank_diff_features = [col for col in df.columns if col.startswith('rankDiffs.')]
        claimed.update(exchange_features + performance_features + rank_diff_features)

        technical_features = take(has_keyword(
            ['rsi', 'macd', 'ema', 'sma', 'bb_', 'cci', 'mfi', 'atr', 'stoch', 'roc']))
        social_features = take(has_keyword(['social', 'sentiment', 'confidence', 'pos', 'neg', 'neu']))
        transaction_features = take(has_keyword(['transaction', 'tx_', 'gas', 'fees']))
        quality_features = take(has_keyword(['completeness', 'quality', 'correlation']))
        numerical_features = take(lambda col: pd.api.types.is_numeric_dtype(df[col]))

        return {
            'id_features': id_features,
            'timestamp_features': timestamp_features,
            'binary_features': binary_features,
            'categorical_features': categorical_features,
            'crypto_specific_features': crypto_specific_features,
            'price_volume_features': price_volume_features,
            'exchange_features': exchange_features,
            'performance_features': performance_features,
            'rank_diff_features': rank_diff_features,
            'technical_features': technical_features,
            'social_features': social_features,
            'transaction_features': transaction_features,
            'quality_features': quality_features,
            'numerical_features': numerical_features,
        }

    def _engineer_crypto_features(self, df, normalized_df):
        """
        Append crypto-specific engineered features computed from the raw ``df``.

        When the source columns exist, this adds:
          * exchange price spread/std relative to the mean exchange price;
          * short/long-term momentum from ``performance.*``;
          * rank stability from ``rankDiffs.*``;
          * average social sentiment and technical strength;
          * the volume/price ratio and the dominance-rank ratio.

        The exchange spread/std and volume/price ratios are kept only for rows
        whose denominator is a positive number; other rows get 0. Columns are
        assigned onto ``normalized_df``, which is also returned. When feature
        engineering is disabled, ``normalized_df`` is returned untouched.
        """
        if not self.feature_engineering:
            return normalized_df

        # Exchange price spread analysis
        exchange_cols = [col for col in df.columns if col.startswith('exchangePrices.')]
        if len(exchange_cols) > 1:
            exchange_prices = df[exchange_cols].replace([np.inf, -np.inf], np.nan)
            if exchange_prices.notna().any().any():
                price_mean = exchange_prices.mean(axis=1)
                valid_mask = price_mean.notna() & (price_mean > 0)
                if valid_mask.any():
                    spread = (exchange_prices.max(axis=1) - exchange_prices.min(axis=1)) / price_mean
                    rel_std = exchange_prices.std(axis=1) / price_mean
                    # Mask rows with a non-positive/missing mean instead of emitting inf.
                    normalized_df['exchange_price_spread'] = spread.where(valid_mask).fillna(0)
                    normalized_df['exchange_price_std'] = rel_std.where(valid_mask).fillna(0)

        # Performance momentum
        perf_short_cols = [col for col in df.columns if col.startswith('performance.')
                           and any(tf in col for tf in ['min1', 'min5', 'min15', 'hour'])]
        perf_long_cols = [col for col in df.columns if col.startswith('performance.')
                          and any(tf in col for tf in ['day', 'week', 'month'])]
        if perf_short_cols:
            short_perf = df[perf_short_cols].replace([np.inf, -np.inf], np.nan)
            normalized_df['short_term_momentum'] = short_perf.mean(axis=1).fillna(0)
        if perf_long_cols:
            long_perf = df[perf_long_cols].replace([np.inf, -np.inf], np.nan)
            normalized_df['long_term_momentum'] = long_perf.mean(axis=1).fillna(0)

        # Rank stability: 1 for no rank movement, shrinking as total |rank diff| grows.
        rank_diff_cols = [col for col in df.columns if col.startswith('rankDiffs.')]
        if rank_diff_cols:
            rank_diffs = df[rank_diff_cols].replace([np.inf, -np.inf], np.nan).fillna(0)
            normalized_df['rank_stability'] = 1 / (1 + rank_diffs.abs().sum(axis=1) + 1e-8)

        # Social sentiment aggregation
        social_sentiment_cols = [col for col in df.columns if 'social_sentiment' in col.lower()]
        if social_sentiment_cols:
            social_data = df[social_sentiment_cols].replace([np.inf, -np.inf], np.nan)
            normalized_df['avg_social_sentiment'] = social_data.mean(axis=1).fillna(0.5)  # Neutral sentiment

        # Technical strength
        tech_cols = [col for col in df.columns if any(tech in col.lower() for tech in ['rsi', 'macd', 'cci'])]
        if tech_cols:
            tech_data = df[tech_cols].replace([np.inf, -np.inf], np.nan)
            normalized_df['technical_strength'] = tech_data.mean(axis=1).fillna(0)

        # Volume-price relationship
        if 'volume' in df.columns and 'price' in df.columns:
            volume = df['volume'].replace([np.inf, -np.inf], np.nan)
            price = df['price'].replace([np.inf, -np.inf], np.nan)
            valid_mask = (price > 0) & price.notna() & volume.notna()
            if valid_mask.any():
                normalized_df['volume_price_ratio'] = (volume / price).where(valid_mask).fillna(0)

        # Market dominance relative to rank
        if 'dominance' in df.columns and 'rank' in df.columns:
            dominance = df['dominance'].replace([np.inf, -np.inf], np.nan).fillna(0)
            rank = df['rank'].replace([np.inf, -np.inf], np.nan).fillna(1000)  # High rank for unknown
            rank_reciprocal = 1 / (rank + 1e-8)
            # NOTE(review): dividing by the reciprocal equals dominance * (rank + 1e-8);
            # confirm this (rather than dominance / rank) is the intended ratio.
            normalized_df['dominance_rank_ratio'] = (dominance / rank_reciprocal).fillna(0)

        return normalized_df

    @staticmethod
    def _build_scalers():
        """Return a fresh, unfitted scaler for every scaled feature type."""
        return {
            'crypto_specific_features': RobustScaler(),  # Rank and dominance can have outliers
            'price_volume_features': RobustScaler(),  # Price and volume data often has outliers
            'exchange_features': StandardScaler(),  # Exchange prices should be similar
            'performance_features': StandardScaler(),  # Performance percentages
            'rank_diff_features': StandardScaler(),  # Rank differences are usually small integers
            'technical_features': StandardScaler(),  # Technical indicators are usually normalized
            'social_features': StandardScaler(),  # Sentiment scores
            'transaction_features': PowerTransformer(),  # Transaction data can be very skewed
            'quality_features': MinMaxScaler(),  # Quality scores are usually 0-1
            'numerical_features': PowerTransformer(),  # General numerical features
        }

    @staticmethod
    def _median_or_zero(series):
        """Return the median of ``series``, or 0 when it is undefined (all NaN / empty)."""
        median = series.median()
        return 0 if pd.isna(median) else median

    @staticmethod
    def _categorical_as_str(series):
        """Stringify a categorical column, mapping missing values (NaN/None) to ``'unknown'``."""
        # Fill before stringifying: astype(str) would turn NaN into the literal 'nan'.
        # astype(object) first so category-dtype columns accept the new 'unknown' value.
        return series.astype(object).where(series.notna(), 'unknown').astype(str)

    def fit(self, df):
        """
        Fit the normalizer on training data with crypto-specific preprocessing.

        State from any previous fit is discarded first. For each scaled feature
        category, the columns are cleaned and a fresh scaler is fitted. Cleaning
        means optional IQR clipping, +/-inf -> NaN, and category-specific NaN
        filling. A category whose cleaning or fit raises is skipped with a
        printed warning.

        Args:
            df: training DataFrame, or a single record as a dict.

        Returns:
            self, to allow chaining.
        """
        if isinstance(df, dict):
            df = pd.DataFrame([df])

        # Reset learned state so a re-fit never reuses bounds/scalers from older data
        # (_handle_outliers only computes bounds for columns it has not seen yet).
        self.scalers = {}
        self.encoders = {}
        self.outlier_bounds = {}
        self.fill_values = {}
        self.feature_info = self._categorize_features(df)

        for feature_type, scaler in self._build_scalers().items():
            existing_features = [col for col in self.feature_info[feature_type] if col in df.columns]
            if not existing_features:
                continue
            try:
                df_clean = df[existing_features].copy()
                if self.handle_outliers and feature_type in self._OUTLIER_FEATURE_TYPES:
                    for col in existing_features:
                        df_clean[col] = self._handle_outliers(df_clean, col)
                df_clean = df_clean.replace([np.inf, -np.inf], np.nan)
                if feature_type in self._OUTLIER_FEATURE_TYPES:
                    # Forward fill, then the training median (remembered for transform), then 0.
                    for col in existing_features:
                        fill = self._median_or_zero(df_clean[col])
                        self.fill_values[col] = fill
                        df_clean[col] = df_clean[col].ffill().fillna(fill)
                elif feature_type in ('performance_features', 'rank_diff_features'):
                    # Performance and rank diffs can be 0 when no change
                    df_clean = df_clean.fillna(0)
                elif feature_type == 'quality_features':
                    # Quality features default to a mid-range value
                    df_clean = df_clean.fillna(0.5)
                else:
                    # General strategy: training median, then 0 (remembered for transform)
                    for col in existing_features:
                        fill = self._median_or_zero(df_clean[col])
                        self.fill_values[col] = fill
                        df_clean[col] = df_clean[col].fillna(fill)
                df_clean = df_clean.replace([np.inf, -np.inf], 0)
                scaler.fit(df_clean)
            except Exception as e:
                print(f"Warning: Could not fit scaler for {feature_type}: {e}")
                continue
            self.scalers[feature_type] = scaler
            self.feature_info[f'{feature_type}_existing'] = existing_features

        for col in self.feature_info['categorical_features']:
            if col in df.columns:
                encoder = LabelEncoder()
                encoder.fit(self._categorical_as_str(df[col]))
                self.encoders[col] = encoder

        self.is_fitted = True
        return self

    def _add_timestamp_features(self, df, normalized_df):
        """Expand each timestamp column (parsed as epoch milliseconds) into scaled calendar and session flags."""
        for col in self.feature_info.get('timestamp_features', []):
            if col not in df.columns:
                continue
            ts = pd.to_datetime(df[col], unit='ms', errors='coerce')
            hour = ts.dt.hour
            day_of_week = ts.dt.dayofweek
            # Crypto markets are 24/7, so calendar position and trading-session flags are used
            normalized_df[f'{col}_hour'] = hour / 23.0
            normalized_df[f'{col}_day_of_week'] = day_of_week / 6.0
            normalized_df[f'{col}_month'] = (ts.dt.month - 1) / 11.0
            normalized_df[f'{col}_quarter'] = (ts.dt.quarter - 1) / 3.0
            normalized_df[f'{col}_is_weekend'] = (day_of_week >= 5).astype(int)
            normalized_df[f'{col}_is_asian_hours'] = ((hour >= 0) & (hour <= 8)).astype(int)
            normalized_df[f'{col}_is_european_hours'] = ((hour >= 8) & (hour <= 16)).astype(int)
            normalized_df[f'{col}_is_american_hours'] = ((hour >= 16) & (hour <= 24)).astype(int)

    def _add_categorical_features(self, df, normalized_df):
        """Label-encode fitted categorical columns as ``<col>_encoded``; unseen categories map to 0."""
        for col in self.feature_info.get('categorical_features', []):
            if col not in df.columns or col not in self.encoders:
                continue
            try:
                # Vectorized equivalent of encoder.transform per value: a class's label is
                # its position in the sorted ``classes_`` array.
                mapping = {str(label): idx for idx, label in enumerate(self.encoders[col].classes_)}
                encoded = self._categorical_as_str(df[col]).map(mapping)
                # Unknown categories get label 0 (the first class in sorted order).
                normalized_df[f'{col}_encoded'] = encoded.fillna(0).astype(int)
            except Exception:
                normalized_df[f'{col}_encoded'] = 0

    def _add_scaled_features(self, df, normalized_df):
        """
        Clean and scale every fitted feature type into ``<col>_<scaler>_scaled`` columns.

        If scaling a feature type raises, its columns are emitted instead as
        ``<col>_raw`` with inf/NaN replaced by 0.
        """
        # getattr: tolerate instances unpickled from versions without fill_values.
        fill_values = getattr(self, 'fill_values', {})
        for feature_type in self._SCALED_FEATURE_TYPES:
            if feature_type not in self.scalers:
                continue
            existing_features = self.feature_info.get(f'{feature_type}_existing', [])
            available_features = [col for col in existing_features if col in df.columns]
            if not available_features:
                continue
            scaler = self.scalers[feature_type]
            try:
                df_clean = df[available_features].copy()
                if self.handle_outliers and feature_type in self._OUTLIER_FEATURE_TYPES:
                    for col in available_features:
                        if col in self.outlier_bounds:
                            lower_bound, upper_bound = self.outlier_bounds[col]
                            df_clean[col] = df_clean[col].clip(lower_bound, upper_bound)
                df_clean = df_clean.replace([np.inf, -np.inf], np.nan)
                if feature_type in self._OUTLIER_FEATURE_TYPES:
                    # Forward/back fill within the batch, then the training median (0 if unknown)
                    for col in available_features:
                        df_clean[col] = df_clean[col].ffill().bfill().fillna(fill_values.get(col, 0))
                elif feature_type in ('performance_features', 'rank_diff_features'):
                    df_clean = df_clean.fillna(0)
                elif feature_type == 'quality_features':
                    df_clean = df_clean.fillna(0.5)
                else:
                    for col in available_features:
                        df_clean[col] = df_clean[col].fillna(fill_values.get(col, 0))
                df_clean = df_clean.replace([np.inf, -np.inf], 0)

                scaled_data = scaler.transform(df_clean)
                scaler_name = type(scaler).__name__.lower().replace('scaler', '').replace('transformer', '')
                for i, col in enumerate(available_features):
                    normalized_df[f'{col}_{scaler_name}_scaled'] = scaled_data[:, i]
            except Exception as e:
                print(f"Warning: Could not transform {feature_type}: {e}")
                for col in available_features:
                    normalized_df[f'{col}_raw'] = df[col].replace([np.inf, -np.inf], np.nan).fillna(0)

    @staticmethod
    def _final_cleanup(normalized_df):
        """Replace inf/NaN left in the output (sentiment columns -> 0.5, others -> 0; ``symbol`` untouched)."""
        normalized_df = normalized_df.replace([np.inf, -np.inf], np.nan)
        for col in normalized_df.columns:
            if col == 'symbol' or not normalized_df[col].isna().any():
                continue
            normalized_df[col] = normalized_df[col].fillna(0.5 if 'sentiment' in col.lower() else 0)

        # Explicit check (not assert, which is stripped under ``python -O``).
        problem = None
        if normalized_df.isnull().any().any():
            problem = "Still contains NaN values after cleanup"
        elif np.isinf(normalized_df.select_dtypes(include=[np.number])).any().any():
            problem = "Still contains infinite values after cleanup"
        if problem:
            print(f"Warning: {problem}")
            # Emergency cleanup (also fills a missing symbol with 0)
            normalized_df = normalized_df.fillna(0).replace([np.inf, -np.inf], 0)
        return normalized_df

    def transform(self, data):
        """
        Transform data using the fitted normalizers with crypto-specific handling.

        Args:
            data: DataFrame, or a single record as a dict.

        Returns:
            A new DataFrame indexed like the input. It holds the symbol (optional),
            timestamp, binary, encoded, scaled and engineered feature columns.

        Raises:
            ValueError: if called before ``fit``/``load``.
        """
        if not self.is_fitted:
            raise ValueError("Normalizer must be fitted before transform")
        df = pd.DataFrame([data]) if isinstance(data, dict) else data.copy()
        normalized_df = pd.DataFrame(index=df.index)

        if self.preserve_symbol and 'symbol' in df.columns:
            normalized_df['symbol'] = df['symbol']
        self._add_timestamp_features(df, normalized_df)
        # Binary features are kept as ints with NaN -> 0
        for col in self.feature_info.get('binary_features', []):
            if col in df.columns:
                normalized_df[col] = df[col].fillna(0).astype(int)
        self._add_categorical_features(df, normalized_df)
        self._add_scaled_features(df, normalized_df)
        normalized_df = self._engineer_crypto_features(df, normalized_df)
        return self._final_cleanup(normalized_df)

    def fit_transform(self, data):
        """Fit on ``data`` and return its transformed form."""
        return self.fit(data).transform(data)

    def get_feature_importance_info(self):
        """
        Return information about feature categories for model interpretation.

        ``total_features`` sums the base category lists only. The
        ``*_existing`` bookkeeping lists written by ``fit`` are excluded so
        columns are not counted twice.
        """
        return {
            'feature_categories': self.feature_info,
            'scalers_used': {k: type(v).__name__ for k, v in self.scalers.items()},
            'total_features': sum(
                len(features) for key, features in self.feature_info.items()
                if isinstance(features, list) and not key.endswith('_existing')
            ),
        }

    def save(self, filepath):
        """Pickle the fitted state (scalers, encoders, feature info, options, bounds, fill values) to ``filepath``."""
        with open(filepath, 'wb') as f:
            pickle.dump({
                'scalers': self.scalers,
                'encoders': self.encoders,
                'feature_info': self.feature_info,
                'is_fitted': self.is_fitted,
                'preserve_symbol': self.preserve_symbol,
                'handle_outliers': self.handle_outliers,
                'feature_engineering': self.feature_engineering,
                'outlier_bounds': self.outlier_bounds,
                'fill_values': self.fill_values,
            }, f)

    def load(self, filepath):
        """
        Load state written by ``save`` and return self.

        Keys missing from older files fall back to defaults. Uses ``pickle``:
        only load files from a trusted source.
        """
        with open(filepath, 'rb') as f:
            data = pickle.load(f)
        self.scalers = data['scalers']
        self.encoders = data['encoders']
        self.feature_info = data['feature_info']
        self.is_fitted = data['is_fitted']
        self.preserve_symbol = data.get('preserve_symbol', True)
        self.handle_outliers = data.get('handle_outliers', True)
        self.feature_engineering = data.get('feature_engineering', True)
        self.outlier_bounds = data.get('outlier_bounds', {})
        self.fill_values = data.get('fill_values', {})
        return self
def _derive_output_paths(output_file):
    """
    Return ``(data_path, normalizer_path)`` for the requested output file.

    A ``.csv`` extension is swapped for ``.pkl`` because the normalized data is
    pickled; any other path is used as-is for the data. The normalizer always
    goes to ``<stem>_crypto_normalizer.pkl``, so it can never share a path with
    the data file (e.g. when ``output_file`` already ends in ``.pkl``).
    """
    root, ext = os.path.splitext(output_file)
    data_path = root + '.pkl' if ext == '.csv' else output_file
    return data_path, root + '_crypto_normalizer.pkl'


def normalize_crypto_data_file(input_file, output_file, save_normalizer=True, **kwargs):
    """
    Enhanced normalization function for crypto data.

    Loads ``input_file``, fits a ``CryptoDataNormalizer`` on it, pickles the
    normalized DataFrame and, optionally, the fitted normalizer.

    Args:
        input_file: path to a ``.parquet`` file; any other extension is read as
            JSON Lines (one JSON object per non-blank line).
        output_file: requested output path; a ``.csv`` suffix is replaced by ``.pkl``.
        save_normalizer: also write ``<stem>_crypto_normalizer.pkl``.
        **kwargs: forwarded to ``CryptoDataNormalizer``.

    Returns:
        ``(normalized_df, normalizer)``.
    """
    if input_file.endswith('.parquet'):
        df = pd.read_parquet(input_file)
        print(f"Loaded {len(df)} records with {len(df.columns)} features from parquet")
    else:
        with open(input_file, 'r', encoding='utf-8') as f:
            # Skip blank lines (e.g. a trailing newline) instead of failing in json.loads.
            records = [json.loads(line) for line in f if line.strip()]
        df = pd.DataFrame(records)
        print(f"Loaded {len(df)} records with {len(df.columns)} features from jsonl")

    normalizer = CryptoDataNormalizer(**kwargs)

    # Show feature categorization
    feature_info = normalizer._categorize_features(df)
    print("\nCrypto Feature Categorization:")
    for category, features in feature_info.items():
        if features:
            print(f" {category}: {len(features)} features")

    normalized_df = normalizer.fit_transform(df)
    print(f"\nNormalized to {len(normalized_df.columns)} features")
    print(f"Data shape: {normalized_df.shape}")

    importance_info = normalizer.get_feature_importance_info()
    print(f"\nScalers used: {importance_info['scalers_used']}")

    output_dir = os.path.dirname(output_file)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    pkl_output_file, normalizer_file = _derive_output_paths(output_file)
    normalized_df.to_pickle(pkl_output_file)
    print(f"Saved normalized data to {pkl_output_file}")

    if save_normalizer:
        normalizer.save(normalizer_file)
        print(f"Saved normalizer to {normalizer_file}")
    return normalized_df, normalizer
# CLI function
import argparse
def main():
    """Command-line entry point: parse options and normalize either the given file or the train split."""
    parser = argparse.ArgumentParser(
        description="Enhanced normalization for cryptocurrency features with crypto-specific handling"
    )
    parser.add_argument('input', nargs='?', default='data/merged/features/crypto_features.parquet',
                        help='Input file (.parquet or .jsonl)')
    parser.add_argument('output', nargs='?', default='data/merged/features/norm/crypto_features_normalized.pkl',
                        help='Output PKL file for normalized features')
    parser.add_argument('--no-save-normalizer', action='store_true',
                        help='Do not save the normalizer pickle')
    parser.add_argument('--no-preserve-symbol', action='store_true',
                        help='Do not preserve symbol column')
    parser.add_argument('--no-handle-outliers', action='store_true',
                        help='Do not handle outliers')
    parser.add_argument('--no-feature-engineering', action='store_true',
                        help='Do not create engineered features')
    parser.add_argument('--train', action='store_true',
                        help='Normalize the train file and save under train/norm/')
    opts = parser.parse_args()

    # Each --no-* switch turns off the matching normalizer option.
    normalizer_options = dict(
        preserve_symbol=not opts.no_preserve_symbol,
        handle_outliers=not opts.no_handle_outliers,
        feature_engineering=not opts.no_feature_engineering,
    )

    if not opts.train:
        source_path, target_path = opts.input, opts.output
        print(f"[INFO] Enhanced crypto normalizing: {source_path} -> {target_path}")
        print(f"[INFO] Options: {normalizer_options}")
    else:
        # Fixed train-split locations; the output directory is created up front.
        source_path = 'data/merged/train/crypto_features_train.parquet'
        norm_dir = 'data/merged/train/norm'
        os.makedirs(norm_dir, exist_ok=True)
        target_path = os.path.join(norm_dir, 'crypto_features_train_normalized.pkl')
        print(f"[INFO] Normalizing train file: {source_path} -> {target_path}")

    normalize_crypto_data_file(
        source_path,
        target_path,
        save_normalizer=not opts.no_save_normalizer,
        **normalizer_options
    )
# Run the CLI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()