# crypto_price_prediction.py
import os
import logging
import torch
import torch.nn as nn
from torch.optim.lr_scheduler import ReduceLROnPlateau
import numpy as np
import pandas as pd
import yfinance as yf
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import gradio as gr
from sklearn.preprocessing import MinMaxScaler
from datetime import datetime, timedelta
import joblib
import warnings
import ta
from tqdm import tqdm

warnings.filterwarnings('ignore')


class PriceScaler:
    """Wraps MinMaxScaler for 1-D price series (handles the 2-D reshape)."""

    def __init__(self):
        self.scaler = MinMaxScaler()

    def fit_transform(self, data):
        data_2d = np.array(data).reshape(-1, 1)
        return self.scaler.fit_transform(data_2d).flatten()

    def inverse_transform(self, data):
        data_2d = np.array(data).reshape(-1, 1)
        return self.scaler.inverse_transform(data_2d).flatten()


class CryptoPredictor(nn.Module):
    """Bidirectional LSTM with two heads: next-price regression and a
    sigmoid confidence score."""

    def __init__(self, input_dim, hidden_dim=128, num_layers=2, dropout=0.2):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0,
            bidirectional=True
        )
        self.bn = nn.BatchNorm1d(hidden_dim * 2)
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1)
        )
        self.confidence_fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        batch_size = x.size(0)
        # 2x num_layers because the LSTM is bidirectional.
        h0 = torch.zeros(self.num_layers * 2, batch_size, self.hidden_dim).to(x.device)
        c0 = torch.zeros(self.num_layers * 2, batch_size, self.hidden_dim).to(x.device)
        lstm_out, _ = self.lstm(x, (h0, c0))
        last_hidden = lstm_out[:, -1, :]
        normalized_hidden = self.bn(last_hidden)
        prediction = self.fc(normalized_hidden)
        confidence = self.confidence_fc(normalized_hidden)
        return prediction, confidence


class CryptoAnalyzer:
    def __init__(self, model_dir="models", cache_dir="cache"):
        self.scaler = MinMaxScaler()
        self.price_scaler = PriceScaler()
        self.model_dir = model_dir
        self.cache_dir = cache_dir
        os.makedirs(model_dir, exist_ok=True)
        os.makedirs(cache_dir, exist_ok=True)
        self.feature_columns = [
            'Open', 'High', 'Low', 'Close', 'Volume',
            'Returns', 'Volatility', 'MA5', 'MA20', 'RSI',
            'Price_Momentum', 'Volume_Momentum', 'MACD',
            'BB_upper', 'BB_lower', 'Stoch_K', 'Stoch_D', 'ADX', 'ATR'
        ]
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def get_data(self, symbol, days):
        # Fetch 30 extra days so the rolling indicators have enough history
        # before the requested window starts.
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days + 30)
        df = yf.download(f"{symbol}-USD", start=start_date, end=end_date, progress=False)
        if df.empty:
            raise ValueError(f"No data available for {symbol}")
        # Newer yfinance versions may return MultiIndex columns even for a
        # single ticker; flatten so the ta functions receive plain Series.
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)
        df['Returns'] = df['Close'].pct_change()
        df['Volatility'] = df['Returns'].rolling(window=20).std()
        df['MA5'] = df['Close'].rolling(window=5).mean()
        df['MA20'] = df['Close'].rolling(window=20).mean()
        df['RSI'] = ta.momentum.rsi(df['Close'])
        df['Price_Momentum'] = ta.momentum.roc(df['Close'])
        df['Volume_Momentum'] = ta.momentum.roc(df['Volume'])
        # ta.trend.macd returns the MACD line as a Series (not a DataFrame,
        # so no .iloc[:, 0] indexing is needed).
        df['MACD'] = ta.trend.macd(df['Close'])
        bollinger = ta.volatility.BollingerBands(df['Close'])
        df['BB_upper'] = bollinger.bollinger_hband()
        df['BB_lower'] = bollinger.bollinger_lband()
        stoch = ta.momentum.StochasticOscillator(df['High'], df['Low'], df['Close'])
        df['Stoch_K'] = stoch.stoch()
        df['Stoch_D'] = stoch.stoch_signal()
        df['ADX'] = ta.trend.adx(df['High'], df['Low'], df['Close'])
        df['ATR'] = ta.volatility.average_true_range(df['High'], df['Low'], df['Close'])
        df = df.dropna()
        return df.iloc[-days:]
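    # Shape sketch for prepare_data below (illustrative numbers, not from the
    # source): with 180 rows, the 19 feature columns above, and lookback=30,
    # X has shape (150, 30, 19) and y has shape (150,) -- each 30-day window
    # of scaled features is paired with the next day's scaled close.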
    def prepare_data(self, df, lookback):
        features = df[self.feature_columns].values
        scaled_features = self.scaler.fit_transform(features)
        close_prices = df['Close'].values
        scaled_close = self.price_scaler.fit_transform(close_prices)
        X, y = [], []
        for i in range(len(df) - lookback):
            X.append(scaled_features[i:(i + lookback)])
            y.append(scaled_close[i + lookback])
        X = torch.FloatTensor(np.array(X)).to(self.device)
        y = torch.FloatTensor(np.array(y)).reshape(-1).to(self.device)
        return X, y

    def get_model_path(self, symbol):
        return os.path.join(self.model_dir, f"{symbol.lower()}_model.pth")

    def get_scaler_path(self, symbol):
        return os.path.join(self.model_dir, f"{symbol.lower()}_scaler.pkl")

    def train_model(self, X, y, symbol):
        model = CryptoPredictor(X.shape[2]).to(self.device)
        criterion = nn.HuberLoss()
        optimizer = torch.optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
        scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)
        batch_size = max(1, min(32, len(X) // 4))
        dataset = torch.utils.data.TensorDataset(X, y)
        # Drop the final batch only when it would be a singleton, which would
        # crash BatchNorm1d in training mode.
        train_loader = torch.utils.data.DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=True,
            drop_last=(len(dataset) % batch_size == 1)
        )
        best_loss = float('inf')
        patience = 10
        patience_counter = 0
        model.train()
        with tqdm(range(50), desc=f"Training {symbol} model") as pbar:
            for epoch in pbar:
                total_loss = 0
                for batch_X, batch_y in train_loader:
                    optimizer.zero_grad()
                    predictions, _ = model(batch_X)
                    # Squeeze (batch, 1) -> (batch,) so the loss does not
                    # broadcast against the 1-D targets.
                    loss = criterion(predictions.squeeze(-1), batch_y)
                    loss.backward()
                    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                    optimizer.step()
                    total_loss += loss.item()
                avg_loss = total_loss / len(train_loader)
                scheduler.step(avg_loss)
                pbar.set_postfix({'loss': f'{avg_loss:.6f}'})
                if avg_loss < best_loss:
                    best_loss = avg_loss
                    patience_counter = 0
                    torch.save(model.state_dict(), self.get_model_path(symbol))
                else:
                    patience_counter += 1
                    if patience_counter >= patience:
                        break
        # Restore the best checkpoint rather than returning last-epoch weights.
        model.load_state_dict(torch.load(self.get_model_path(symbol), map_location=self.device))
        return model
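    # Standalone training sketch (illustrative, not from the source; relies on
    # get_data/prepare_data above and a reachable Yahoo Finance endpoint):
    #
    #   analyzer = CryptoAnalyzer()
    #   df = analyzer.get_data("ETH", days=365)
    #   X, y = analyzer.prepare_data(df, lookback=30)
    #   model = analyzer.train_model(X, y, "ETH")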
    def get_predictions(self, symbol, days, lookback):
        try:
            logging.info("Fetching data...")
            df = self.get_data(symbol, days)
            logging.info(f"Data fetched: {len(df)} rows.")

            logging.info("Preparing data...")
            X, y = self.prepare_data(df, lookback)
            logging.info(f"Data prepared. Features shape: {X.shape}, Targets shape: {y.shape}")

            model_path = self.get_model_path(symbol)
            if os.path.exists(model_path):
                logging.info("Loading existing model...")
                model = CryptoPredictor(X.shape[2]).to(self.device)
                model.load_state_dict(torch.load(model_path, map_location=self.device))
            else:
                logging.info("Training new model...")
                model = self.train_model(X, y, symbol)
                joblib.dump(self.scaler, self.get_scaler_path(symbol))

            model.eval()
            with torch.no_grad():
                logging.info("Generating predictions...")
                predictions, confidence = model(X)

            # Log raw predictions shape
            logging.info(f"Raw predictions shape: {predictions.shape}")

            # Ensure predictions are 2-D for inverse_transform
            # (PriceScaler returns a flat array).
            predictions_reshaped = predictions.cpu().numpy().reshape(-1, 1)
            logging.info(f"Reshaped predictions for inverse transform: {predictions_reshaped.shape}")
            predictions = self.price_scaler.inverse_transform(predictions_reshaped)

            # Ensure actual prices are 2-D for inverse_transform
            y_np_reshaped = y.cpu().numpy().reshape(-1, 1)
            logging.info(f"Reshaped actual prices for inverse transform: {y_np_reshaped.shape}")
            actual_prices = self.price_scaler.inverse_transform(y_np_reshaped)

            # Calculate metrics
            rmse = float(np.sqrt(np.mean((actual_prices - predictions) ** 2)))
            mape = float(np.mean(np.abs((actual_prices - predictions) / actual_prices)) * 100)
            r2 = float(1 - np.sum((actual_prices - predictions) ** 2)
                       / np.sum((actual_prices - actual_prices.mean()) ** 2))
            logging.info("Metrics calculated.")

            # Prepare date labels (targets start `lookback` rows into the frame)
            dates = df.index[lookback:].strftime('%Y-%m-%d').tolist()

            return {
                'dates': dates,
                'actual': actual_prices.tolist(),
                'predicted': predictions.tolist(),
                'confidence': confidence.cpu().numpy().flatten().tolist(),
                'rmse': rmse,
                'mape': mape,
                'r2': r2,
                'volatility': float(df['Volatility'].mean() * 100),
                'current_price': float(df['Close'].iloc[-1]),
                'volume': float(df['Volume'].iloc[-1]),
                'rsi': float(df['RSI'].iloc[-1]),
                'macd': float(df['MACD'].iloc[-1])
            }
        except Exception as e:
            logging.error(f"Error during predictions: {str(e)}")
            raise ValueError(f"Prediction failed: {str(e)}")
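# Usage sketch for the plotting helper below (illustrative values, not from
# the source; requires network access for the data download):
#
#   fig, summary = create_analysis_plots("BTC", days=180, lookback=30)
#   fig.show()
#   print(summary)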
def create_analysis_plots(symbol, days=180, lookback=30):
    try:
        analyzer = CryptoAnalyzer()
        predictions = analyzer.get_predictions(symbol, days, lookback)

        # Rows 2 and 3 are laid out here but only row 1 is populated below;
        # they remain placeholders for indicator and metric traces.
        fig = make_subplots(
            rows=3, cols=1,
            subplot_titles=(
                f"{symbol} Price Prediction with Confidence Bands",
                "Technical Indicators",
                "Model Performance Metrics"
            ),
            vertical_spacing=0.1,
            specs=[[{"secondary_y": True}],
                   [{"secondary_y": True}],
                   [{"secondary_y": True}]]
        )

        confidence_upper = np.array(predictions['predicted']) * (1 + np.array(predictions['confidence']))
        confidence_lower = np.array(predictions['predicted']) * (1 - np.array(predictions['confidence']))

        fig.add_trace(
            go.Scatter(
                x=predictions['dates'],
                y=predictions['actual'],
                name='Actual Price',
                line=dict(color='blue', width=2)
            ),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(
                x=predictions['dates'],
                y=predictions['predicted'],
                name='Predicted Price',
                line=dict(color='red', width=2)
            ),
            row=1, col=1
        )
        fig.add_trace(
            go.Scatter(
                x=predictions['dates'] + predictions['dates'][::-1],
                y=list(confidence_upper) + list(confidence_lower)[::-1],
                fill='toself',
                fillcolor='rgba(255,0,0,0.1)',
                line=dict(color='rgba(255,0,0,0)'),
                name='Confidence Band'
            ),
            row=1, col=1
        )

        fig.update_layout(
            height=1200,
            title_text=f"📈 {symbol} Price Analysis Dashboard",
            showlegend=True,
            template="plotly_dark",
            paper_bgcolor='rgba(0,0,0,0)',
            plot_bgcolor='rgba(0,0,0,0)',
            font=dict(size=12)
        )

        summary = f"""
### 📊 Analysis Summary for {symbol}

#### Current Market Status
- **Current Price:** ${predictions['current_price']:,.2f}
- **Predicted Next Price:** ${predictions['predicted'][-1]:,.2f}
- **Expected Change:** {((predictions['predicted'][-1] - predictions['current_price']) / predictions['current_price'] * 100):,.2f}%
- **24h Volume:** {predictions['volume']:,.0f}

#### Technical Indicators
- **RSI:** {predictions['rsi']:,.2f}
- **MACD:** {predictions['macd']:,.2f}
- **Volatility:** {predictions['volatility']:,.2f}%

#### Model Performance Metrics
- **R² Score:** {predictions['r2']:,.4f}
- **RMSE:** ${predictions['rmse']:,.2f}
- **MAPE:** {predictions['mape']:,.2f}%

#### Prediction Confidence
- **Average Confidence:** {np.mean(predictions['confidence']) * 100:,.2f}%
- **Trend Direction:** {'🔺 Upward' if predictions['predicted'][-1] > predictions['actual'][-1] else '🔻 Downward'}

> *Note: Past performance does not guarantee future results. This analysis is for informational purposes only.*
"""
        return fig, summary
    except Exception as e:
        fig = go.Figure()
        fig.add_annotation(
            text=str(e),
            xref="paper", yref="paper",
            x=0.5, y=0.5,
            showarrow=False
        )
        return fig, f"⚠️ Error: {str(e)}"


def create_interface():
    with gr.Blocks(theme=gr.themes.Soft()) as iface:
        gr.Markdown("""
        # 🚀 Advanced Cryptocurrency Price Prediction
        This app uses deep learning to predict cryptocurrency prices and provide comprehensive market analysis.

        ### Features:
        - Real-time price predictions
        - Technical indicators analysis
        - Confidence metrics
        - Performance visualization
        """)
        with gr.Row():
            with gr.Column(scale=1):
                crypto_input = gr.Dropdown(
                    choices=['BTC', 'ETH', 'BNB', 'XRP', 'ADA', 'SOL', 'DOT', 'DOGE'],
                    label="Select Cryptocurrency",
                    value="BTC"
                )
                custom_crypto = gr.Textbox(
                    label="Or enter custom symbol",
                    placeholder="e.g., MATIC"
                )
                with gr.Row():
                    days_slider = gr.Slider(
                        minimum=30, maximum=365, value=180, step=30,
                        label="Historical Days"
                    )
                    lookback_slider = gr.Slider(
                        minimum=7, maximum=60, value=30, step=1,
                        label="Lookback Period (Days)"
                    )
                submit_btn = gr.Button("📊 Generate Analysis", variant="primary")
            with gr.Column(scale=2):
                plot_output = gr.Plot(label="Analysis Plots")
        with gr.Row():
            analysis_output = gr.Markdown(label="Analysis Summary")
            error_output = gr.Markdown(visible=False)

        def handle_analysis(symbol, custom_symbol, days, lookback):
            try:
                final_symbol = custom_symbol.strip().upper() if custom_symbol else symbol
                # Slider values may arrive as floats; integer indexing needs ints.
                figure, summary = create_analysis_plots(final_symbol, int(days), int(lookback))
                return figure, summary, gr.update(visible=False, value="")
            except Exception as e:
                empty_fig = go.Figure()
                error_msg = f"⚠️ Error during analysis: {str(e)}"
                return empty_fig, "", gr.update(visible=True, value=error_msg)

        submit_btn.click(
            fn=handle_analysis,
            inputs=[crypto_input, custom_crypto, days_slider, lookback_slider],
            outputs=[plot_output, analysis_output, error_output]
        )
    return iface


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler('crypto_predictor.log'),
            logging.StreamHandler()
        ]
    )
    try:
        os.makedirs("models", exist_ok=True)
        os.makedirs("cache", exist_ok=True)
        iface = create_interface()
        iface.launch(
            share=False,
            server_name="0.0.0.0",
            server_port=7860,
            debug=True
        )
    except Exception as e:
        logging.error(f"Application failed to start: {str(e)}")
        raise
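# To run: `python crypto_price_prediction.py`, then open http://localhost:7860
# (the Gradio server port configured above).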