import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import os


class DataHandler:
    def __init__(self, data_path='data/Fraud.csv'):
        self.data_path = data_path
        self.scaler = MinMaxScaler()

    def haversine_distance(self, lat1, lon1, lat2, lon2):
        """Calculate the haversine (great-circle) distance in km between two points given in decimal degrees."""
        R = 6371  # Earth radius in km
        lat1, lon1, lat2, lon2 = map(np.radians, [lat1, lon1, lat2, lon2])
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
        c = 2 * np.arcsin(np.sqrt(a))
        return R * c

    def engineer_features(self, df):
        """Apply feature engineering to the dataset."""
        print("Starting feature engineering...")

        # Parse datetime columns
        df['trans_date_trans_time'] = pd.to_datetime(df['trans_date_trans_time'])
        df['dob'] = pd.to_datetime(df['dob'])

        # Time features
        df['Hour_of_Day'] = df['trans_date_trans_time'].dt.hour
        df['Day_of_Week'] = df['trans_date_trans_time'].dt.dayofweek

        # Customer age at transaction time, in years
        df['Age'] = (df['trans_date_trans_time'] - df['dob']).dt.days / 365.25

        # Sort by card and time so the velocity windows only look backwards
        df = df.sort_values(['cc_num', 'unix_time']).reset_index(drop=True)

        # Velocity features: transactions per card in the last 1 hour and 24 hours
        print("Calculating velocity features (this may take 2-3 minutes)...")
        df['Txns_Last_1Hr'] = 0
        df['Txns_Last_24Hr'] = 0

        # Process card by card so each pairwise time matrix stays small
        velocity_results = []
        for cc_num, group in df.groupby('cc_num'):
            group = group.sort_values('unix_time').reset_index(drop=True)
            times = group['unix_time'].values
            # Pairwise time differences via broadcasting
            time_matrix = times[:, np.newaxis] - times[np.newaxis, :]
            # Count strictly earlier transactions in each window (hence time_matrix > 0)
            count_1hr = np.sum((time_matrix > 0) & (time_matrix <= 3600), axis=1)
            count_24hr = np.sum((time_matrix > 0) & (time_matrix <= 86400), axis=1)
            group['Txns_Last_1Hr'] = count_1hr
            group['Txns_Last_24Hr'] = count_24hr
            velocity_results.append(group)

        # Combine the per-card results and restore chronological order
        df = pd.concat(velocity_results, ignore_index=True)
        df = df.sort_values('unix_time').reset_index(drop=True)
        print(f"Velocity features calculated. Means: 1Hr={df['Txns_Last_1Hr'].mean():.2f}, 24Hr={df['Txns_Last_24Hr'].mean():.2f}")
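
        # Note: the broadcast above allocates an n-by-n matrix per card, so memory grows
        # quadratically with a card's transaction count. An equivalent O(n log n) sketch
        # (an assumption, not part of the original pipeline) using np.searchsorted on
        # each card's sorted `times` array:
        #     left = np.searchsorted(times, times - 3600, side='left')
        #     right = np.searchsorted(times, times, side='left')   # strictly earlier txns
        #     count_1hr = right - left  # same backward window [t - 3600, t) as above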

        # Geospatial feature: distance between customer and merchant locations
        print("Calculating geospatial features...")
        df['Haversine_Distance'] = self.haversine_distance(
            df['lat'], df['long'], df['merch_lat'], df['merch_long']
        )

        # Target encoding for merchant and category
        print("Applying target encoding...")
        df['Merchant_Fraud_Rate'] = df.groupby('merchant')['is_fraud'].transform('mean')
        df['Category_Fraud_Rate'] = df.groupby('category')['is_fraud'].transform('mean')
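        # Caveat: these rates are computed over the full dataset, so each row's own label
        # leaks into its feature. With a real train/test split, out-of-fold target
        # encoding (fitting the per-group means on training folds only) avoids this.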

        # Select features for modeling
        feature_cols = ['amt', 'Age', 'Hour_of_Day', 'Day_of_Week',
                        'Txns_Last_1Hr', 'Txns_Last_24Hr', 'Haversine_Distance',
                        'Merchant_Fraud_Rate', 'Category_Fraud_Rate', 'city_pop']

        # Scale features to the [0, 1] range
        print("Scaling features...")
        scaled_features = self.scaler.fit_transform(df[feature_cols])
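        # Caveat: fit_transform over the full dataset bakes test-set statistics into the
        # scaling; with a real split, fit the scaler on the training portion only and
        # reuse it at inference time.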
        scaled_df = pd.DataFrame(scaled_features,
                                 columns=[f'Scaled_{col}' for col in feature_cols])

        # Combine with the original frame
        df = pd.concat([df.reset_index(drop=True), scaled_df], axis=1)

        print(f"Feature engineering complete. Final shape: {df.shape}")
        return df

    def load_and_process_data(self):
        """Load and process the entire dataset."""
        print(f"Loading data from {self.data_path}...")
        df = pd.read_csv(self.data_path)
        print(f"Loaded {len(df)} rows")

        # Engineer features
        df = self.engineer_features(df)

        # Save processed data, creating the output directory if needed
        output_path = 'data/processed_data.csv'
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        print(f"Saving processed data to {output_path}...")
        df.to_csv(output_path, index=False)
        print("Data processing complete!")
        return df

    def stream_transactions(self, processed_data, batch_size=100):
        """Generator that yields transactions in batches of batch_size rows."""
        for i in range(0, len(processed_data), batch_size):
            yield processed_data.iloc[i:i + batch_size]


if __name__ == "__main__":
    # Test the data handler
    handler = DataHandler()
    df = handler.load_and_process_data()
    print("\nFirst few rows of processed data:")
    print(df.head())
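
    # Sanity-check the haversine helper (hypothetical coordinates): New York to
    # Los Angeles should land near the known great-circle distance of roughly 3,940 km.
    print(f"NYC -> LA: {handler.haversine_distance(40.71, -74.01, 34.05, -118.24):.0f} km")

    # Smoke-test the streaming generator: pull the first batch and check its size.
    first_batch = next(handler.stream_transactions(df, batch_size=100))
    print(f"First streamed batch: {len(first_batch)} rows")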