Spaces:
Running
Running
| import yfinance as yf | |
| import pandas as pd | |
| import numpy as np | |
| import torch | |
| from datetime import datetime, timedelta | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| import spaces | |
| import gc | |
| import time | |
| import random | |
| from chronos import ChronosPipeline | |
| from scipy.stats import skew, kurtosis | |
| from typing import Dict, Union, List | |
| # Global variable for model pipeline | |
| pipeline = None | |
| # --- ADVANCED UTILITIES & CONFIG --- | |
| # Sumber data Covariate eksternal | |
| COVARIATE_SOURCES = { | |
| 'market_indices': ['^GSPC', '^DJI', '^IXIC', '^VIX'], | |
| 'commodities': ['GC=F', 'CL=F'], | |
| } | |
| def clear_gpu_memory(): | |
| """Membersihkan cache memori GPU""" | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| gc.collect() | |
| def load_pipeline(): | |
| """ | |
| Memuat model Chronos-2 dengan konfigurasi GPU canggih. | |
| Menggunakan device_map="cuda" dan torch_dtype=torch.float16. | |
| """ | |
| global pipeline | |
| try: | |
| model_name = "amazon/chronos-2" | |
| if pipeline is None: | |
| clear_gpu_memory() | |
| print(f"Loading Chronos model: {model_name}...") | |
| # FIX 1: Menyederhanakan argumen untuk menghindari error 'input_patch_size' | |
| pipeline = ChronosPipeline.from_pretrained( | |
| model_name, | |
| device_map="cuda", | |
| torch_dtype=torch.float16, | |
| # Menghapus argumen yang mungkin memicu error konfigurasi | |
| ) | |
| pipeline.model = pipeline.model.eval() | |
| for param in pipeline.model.parameters(): | |
| param.requires_grad = False | |
| print(f"Chronos model {model_name} loaded successfully on CUDA") | |
| return pipeline | |
| except Exception as e: | |
| # Menampilkan error yang lebih spesifik | |
| print(f"Error loading pipeline on CUDA, trying CPU: {str(e)}") | |
| try: | |
| # Fallback ke CPU | |
| pipeline = ChronosPipeline.from_pretrained(model_name, device_map="cpu") | |
| pipeline.model = pipeline.model.eval() | |
| print(f"Chronos model {model_name} loaded successfully on CPU (performance degraded)") | |
| return pipeline | |
| except Exception as cpu_e: | |
| raise RuntimeError(f"Failed to load model {model_name} on both CUDA and CPU: {str(cpu_e)}") | |
| # ... (Fungsi-fungsi lain: retry_yfinance_request, fetch_enhanced_covariates, calculate_advanced_risk_metrics) | |
| def retry_yfinance_request(func, max_retries=3, initial_delay=1): | |
| """Mekanisme retry untuk permintaan yfinance dengan backoff eksponensial.""" | |
| for attempt in range(max_retries): | |
| try: | |
| result = func() | |
| if result is not None and not result.empty: | |
| return result | |
| if attempt == max_retries - 1: | |
| return None | |
| delay = min(8.0, initial_delay * (2 ** attempt) + random.uniform(0, 1)) | |
| time.sleep(delay) | |
| except Exception: | |
| if attempt == max_retries - 1: | |
| return None | |
| delay = min(8.0, initial_delay * (2 ** attempt) + random.uniform(0, 1)) | |
| time.sleep(delay) | |
| def fetch_enhanced_covariates(data: pd.DataFrame) -> pd.DataFrame: | |
| """Mengambil data covariate (Indeks Pasar) dan menggabungkannya.""" | |
| start_date = data.index.min().strftime('%Y-%m-%d') | |
| end_date = data.index.max().strftime('%Y-%m-%d') | |
| date_range = pd.date_range(start=start_date, end=end_date, freq='D') | |
| # 1. Reindex data asli ke range hari yang kontinu | |
| data_full = data.reindex(date_range) | |
| data_full['Close'] = data_full['Close'].fillna(method='ffill') | |
| data_full['Volume'] = data_full['Volume'].fillna(0) | |
| covariate_df = pd.DataFrame(index=date_range) | |
| # 2. Ambil data dari semua sumber covariate eksternal | |
| for source_key, symbols in COVARIATE_SOURCES.items(): | |
| for symbol in symbols: | |
| def fetch_covariate(): | |
| return yf.download(symbol, start=start_date, end=end_date, interval="1d", progress=False) | |
| cov_data = retry_yfinance_request(fetch_covariate) | |
| if cov_data is not None and not cov_data.empty: | |
| cov_data = cov_data['Close'].rename(f'cov_{symbol.replace("^", "_").replace("=", "_")}') | |
| cov_data = cov_data.reindex(date_range) | |
| covariate_df = covariate_df.merge(cov_data, left_index=True, right_index=True, how='left') | |
| # 3. Gabungkan dan imputasi | |
| final_df = data_full.merge(covariate_df, left_index=True, right_index=True, how='left') | |
| cov_cols = [col for col in final_df.columns if col.startswith('cov_') or col == 'Volume'] | |
| # Imputasi Covariates: Forward fill untuk harga/indeks, 0 untuk Volume | |
| final_df['Volume'] = final_df['Volume'].fillna(0) | |
| final_df[[col for col in cov_cols if col != 'Volume']] = final_df[[col for col in cov_cols if col != 'Volume']].fillna(method='ffill') | |
| final_df = final_df.dropna(subset=['Close'], how='all') | |
| # Ganti nama kolom sesuai format Chronos | |
| return final_df.rename(columns={'Close': 'target', 'Volume': 'cov_volume'}) | |
| def calculate_advanced_risk_metrics(df: pd.DataFrame, risk_free_rate: float = 0.05) -> Dict[str, Union[float, str]]: | |
| """Menghitung metrik risiko dan performa lanjutan (Sharpe Ratio, VaR, CVaR, Max Drawdown).""" | |
| if df.empty or 'Close' not in df.columns: | |
| return {"error": "Data historis tidak valid untuk perhitungan risiko."} | |
| try: | |
| df['Returns'] = df['Close'].pct_change() | |
| returns = df['Returns'].dropna() | |
| if returns.empty: | |
| return {"error": "Return historis tidak tersedia."} | |
| days_per_year = 252 | |
| annual_return = returns.mean() * days_per_year | |
| annual_vol = returns.std() * np.sqrt(days_per_year) | |
| sharpe_ratio = (annual_return - risk_free_rate) / annual_vol if annual_vol != 0 else 0 | |
| var_95 = np.percentile(returns, 5) * -1 | |
| cvar_95 = returns[returns < -var_95].mean() * -1 | |
| cumulative_returns = (1 + returns).cumprod() | |
| peak = cumulative_returns.expanding(min_periods=1).max() | |
| drawdown = (cumulative_returns / peak) - 1 | |
| max_drawdown = drawdown.min() | |
| skewness = skew(returns) | |
| kurtosis_val = kurtosis(returns) | |
| return { | |
| "Annual_Return": f"{annual_return*100:.2f}%", | |
| "Annual_Volatility": f"{annual_vol*100:.2f}%", | |
| "Sharpe_Ratio": f"{sharpe_ratio:.2f}", | |
| "Max_Drawdown": f"{max_drawdown*100:.2f}%", | |
| "VaR_95_Daily_Loss": f"{var_95*100:.2f}%", | |
| "CVaR_95_Avg_Loss": f"{cvar_95*100:.2f}%", | |
| "Skewness": f"{skewness:.2f}", | |
| "Kurtosis": f"{kurtosis_val:.2f}", | |
| } | |
| except Exception as e: | |
| return {"error": f"Risk calculation failed: {str(e)}"} | |
| def predict_technical_indicators_future(data: pd.DataFrame, price_prediction: np.ndarray) -> Dict[str, np.ndarray]: | |
| """Memprediksi MACD dan Bollinger Bands di masa depan berdasarkan prediksi harga.""" | |
| predictions = {} | |
| # Pastikan price_prediction tidak kosong sebelum diolah | |
| if price_prediction.size == 0: | |
| return {"MACD_Future": np.array([]), "MACD_Signal_Future": np.array([]), "BB_Upper_Future": np.array([]), "BB_Lower_Future": np.array([])} | |
| full_price_series = np.concatenate([data['Close'].values, price_prediction]) | |
| full_price_series = pd.Series(full_price_series) | |
| # MACD dan Signal Line Future | |
| def calculate_ema(prices, span): | |
| return prices.ewm(span=span, adjust=False).mean() | |
| ema_12_full = calculate_ema(full_price_series, 12) | |
| ema_26_full = calculate_ema(full_price_series, 26) | |
| macd_full = ema_12_full - ema_26_full | |
| macd_signal_full = calculate_ema(macd_full, 9) | |
| predictions['MACD_Future'] = macd_full.iloc[-len(price_prediction):].values | |
| predictions['MACD_Signal_Future'] = macd_signal_full.iloc[-len(price_prediction):].values | |
| # Bollinger Bands Future | |
| period = 20 | |
| std_dev = 2 | |
| middle_band_full = full_price_series.rolling(window=period).mean() | |
| std_full = full_price_series.rolling(window=period).std() | |
| upper_band_full = middle_band_full + (std_full * std_dev) | |
| lower_band_full = middle_band_full - (std_full * std_dev) | |
| predictions['BB_Upper_Future'] = upper_band_full.iloc[-len(price_prediction):].values | |
| predictions['BB_Lower_Future'] = lower_band_full.iloc[-len(price_prediction):].values | |
| return predictions | |
| def predict_prices(data, prediction_days=30): | |
| """Fungsi prediksi utama menggunakan Chronos-2 dengan enhanced covariates.""" | |
| # Default return structure for errors (Menggunakan np.array([]) yang aman) | |
| empty_result = { | |
| 'values': np.array([]), 'dates': pd.Series([], dtype='datetime64[ns]'), | |
| 'high_30d': 0, 'low_30d': 0, 'mean_30d': 0, 'change_pct': 0, | |
| 'q01': np.array([]), 'q09': np.array([]), | |
| 'future_macd': np.array([]), 'future_macd_signal': np.array([]), | |
| 'future_bb_upper': np.array([]), 'future_bb_lower': np.array([]), | |
| 'summary': 'Prediction failed due to model or data error.' | |
| } | |
| try: | |
| # 1. Load Model (Akan memanggil load_pipeline yang sudah diperbaiki) | |
| pipeline = load_pipeline() | |
| data_original = data.copy() | |
| # 2. Enhanced Data Preprocessing & Covariate | |
| data_enhanced = fetch_enhanced_covariates(data_original) | |
| context_df = data_enhanced.reset_index() | |
| context_df.columns = ['timestamp'] + [col for col in context_df.columns[1:]] | |
| context_df['id'] = 'stock_price' | |
| all_covariates = [col for col in context_df.columns if col not in ['timestamp', 'id', 'target']] | |
| # 3. Model Prediction | |
| with torch.no_grad(): | |
| pred_df = pipeline.predict_df( | |
| context_df, | |
| prediction_length=prediction_days, | |
| id_column="id", | |
| timestamp_column="timestamp", | |
| target="target", | |
| covariates=all_covariates, | |
| quantile_levels=[0.1, 0.5, 0.9] | |
| ) | |
| required_cols = ['target_0.1', 'target_0.5', 'target_0.9'] | |
| if pred_df.empty or not all(col in pred_df.columns for col in required_cols): | |
| missing = [col for col in required_cols if col not in pred_df.columns] | |
| raise RuntimeError(f"Prediction output incomplete. Missing: {missing}") | |
| q05_forecast = pred_df['target_0.5'].values.astype(np.float32) | |
| q09_forecast = pred_df['target_0.9'].values.astype(np.float32) | |
| q01_forecast = pred_df['target_0.1'].values.astype(np.float32) | |
| predicted_dates = pred_df['timestamp'] | |
| last_price = data_original['Close'].iloc[-1] | |
| # Proyeksi Indikator Teknikal Masa Depan | |
| future_indicators = predict_technical_indicators_future(data_original, q05_forecast) | |
| predicted_high = float(np.max(q05_forecast)) | |
| predicted_low = float(np.min(q05_forecast)) | |
| predicted_mean = float(np.mean(q05_forecast)) | |
| change_pct = ((predicted_mean - last_price) / last_price) * 100 if last_price != 0 else 0 | |
| # Menambahkan data teknikal prediksi ke hasil | |
| return { | |
| 'values': q05_forecast, | |
| 'dates': predicted_dates, | |
| 'high_30d': predicted_high, | |
| 'low_30d': predicted_low, | |
| 'mean_30d': predicted_mean, | |
| 'change_pct': change_pct, | |
| 'q01': q01_forecast, | |
| 'q09': q09_forecast, | |
| 'future_macd': future_indicators.get('MACD_Future', np.array([])), | |
| 'future_macd_signal': future_indicators.get('MACD_Signal_Future', np.array([])), | |
| 'future_bb_upper': future_indicators.get('BB_Upper_Future', np.array([])), | |
| 'future_bb_lower': future_indicators.get('BB_Lower_Future', np.array([])), | |
| 'summary': f"AI Model: Amazon Chronos-2 (Enhanced Covariates: {len(all_covariates)} features)\nExpected High: {predicted_high:.2f}\nExpected Low: {predicted_low:.2f}\nExpected Change: {change_pct:.2f}%" | |
| } | |
| except Exception as e: | |
| error_message = f'Model prediction failed: {e}' | |
| print(f"Error in prediction: {e}") | |
| empty_result['summary'] = error_message | |
| return empty_result | |
| # Memperbarui fungsi create_prediction_chart untuk menampilkan Quantile Bands (q01, q09) dan Future BB | |
| def create_prediction_chart(data, predictions): | |
| # Cek yang lebih aman untuk array kosong | |
| if not predictions['values'].size or not predictions['q01'].size: | |
| return go.Figure().update_layout(title="Prediction Failed: No Data Available") | |
| fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.05, | |
| row_heights=[0.7, 0.3], subplot_titles=('Price Forecast & Confidence Band', 'MACD Forecast')) | |
| # 1. Price Forecast (Row 1) | |
| fig.add_trace(go.Scatter(x=data.index, y=data['Close'].values, name='Historical Price', line=dict(color='blue', width=2)), row=1, col=1) | |
| # Upper/Lower Quantile Band (Confidence) | |
| fig.add_trace(go.Scatter(x=predictions['dates'], y=predictions['q09'], name='90% Upper Bound (Q0.9)', line=dict(color='lightcoral', width=0)), row=1, col=1) | |
| fig.add_trace(go.Scatter( | |
| x=predictions['dates'], y=predictions['q01'], name='90% Confidence Band', | |
| line=dict(color='lightcoral', width=0), fill='tonexty', fillcolor='rgba(255,182,193,0.3)' | |
| ), row=1, col=1) | |
| fig.add_trace(go.Scatter(x=predictions['dates'], y=predictions['values'], name='Median Forecast (Q0.5)', line=dict(color='red', width=3, dash='solid')), row=1, col=1) | |
| # Future Bollinger Bands | |
| if predictions['future_bb_upper'].size == predictions['dates'].size: | |
| fig.add_trace(go.Scatter(x=predictions['dates'], y=predictions['future_bb_upper'], name='BB Upper (Future)', line=dict(color='green', width=1, dash='dot')), row=1, col=1) | |
| fig.add_trace(go.Scatter(x=predictions['dates'], y=predictions['future_bb_lower'], name='BB Lower (Future)', line=dict(color='green', width=1, dash='dot')), row=1, col=1) | |
| last_hist_date = data.index[-1] | |
| last_hist_price = data['Close'].iloc[-1] | |
| fig.add_trace(go.Scatter(x=[last_hist_date], y=[last_hist_price], mode='markers', marker=dict(size=10, color='blue', symbol='circle'), name='Last Known Price'), row=1, col=1) | |
| # 2. MACD Forecast (Row 2) | |
| if predictions['future_macd'].size == predictions['dates'].size: | |
| # Perluas data historis MACD untuk charting yang lebih baik | |
| lookback_period = 60 | |
| macd_hist = data['Close'].ewm(span=12).mean() - data['Close'].ewm(span=26).mean() | |
| macd_signal_hist = macd_hist.ewm(span=9).mean() | |
| macd_full = np.concatenate([macd_hist.iloc[-lookback_period:].values, predictions['future_macd']]) | |
| macd_signal_full = np.concatenate([macd_signal_hist.iloc[-lookback_period:].values, predictions['future_macd_signal']]) | |
| macd_dates_full = pd.to_datetime(np.concatenate([data.index[-lookback_period:].values, predictions['dates']])) | |
| fig.add_trace(go.Scatter(x=macd_dates_full, y=macd_full, name='MACD Line', line=dict(color='blue', width=2)), row=2, col=1) | |
| fig.add_trace(go.Scatter(x=macd_dates_full, y=macd_signal_full, name='Signal Line', line=dict(color='red', width=1)), row=2, col=1) | |
| fig.add_vline(x=data.index[-1], line_width=1, line_dash="dash", line_color="gray", row=2, col=1) | |
| fig.add_vline(x=data.index[-1], line_width=1, line_dash="dash", line_color="gray", row=1, col=1) | |
| fig.update_layout( | |
| title=f'Advanced Price & Technical Forecast - Next {len(predictions["dates"])} Days (Chronos-2)', | |
| xaxis_title='Date', yaxis_title='Price (IDR)', hovermode='x unified', height=900, | |
| legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01) | |
| ) | |
| fig.update_yaxes(title_text="Price (IDR)", row=1, col=1) | |
| fig.update_yaxes(title_text="MACD Value", row=2, col=1) | |
| return fig | |
| # ... (Fungsi-fungsi lama lainnya seperti get_indonesian_stocks, calculate_technical_indicators, dll. tetap sama) | |
| def get_indonesian_stocks(): | |
| return { | |
| "BBCA.JK": "Bank Central Asia", "BBRI.JK": "Bank BRI", "BBNI.JK": "Bank BNI", | |
| "BMRI.JK": "Bank Mandiri", "TLKM.JK": "Telkom Indonesia", "UNVR.JK": "Unilever Indonesia", | |
| "ASII.JK": "Astra International", "INDF.JK": "Indofood Sukses Makmur", "KLBF.JK": "Kalbe Farma", | |
| "HMSP.JK": "HM Sampoerna", "GGRM.JK": "Gudang Garam", "ADRO.JK": "Adaro Energy", | |
| "PGAS.JK": "Perusahaan Gas Negara", "JSMR.JK": "Jasa Marga", "WIKA.JK": "Wijaya Karya", | |
| "PTBA.JK": "Tambang Batubara Bukit Asam", "ANTM.JK": "Aneka Tambang", "SMGR.JK": "Semen Indonesia", | |
| "INTP.JK": "Indocement Tunggal Prakasa", "ITMG.JK": "Indo Tambangraya Megah" | |
| } | |
| def calculate_technical_indicators(data): | |
| indicators = {} | |
| def calculate_rsi(prices, period=14): | |
| delta = prices.diff() | |
| gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() | |
| loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() | |
| rs = gain / loss | |
| rsi = 100 - (100 / (1 + rs)) | |
| return rsi | |
| rsi_series = calculate_rsi(data['Close']) | |
| indicators['rsi'] = {'current': rsi_series.iloc[-1], 'values': rsi_series} | |
| def calculate_macd(prices, fast=12, slow=26, signal=9): | |
| exp1 = prices.ewm(span=fast).mean() | |
| exp2 = prices.ewm(span=slow).mean() | |
| macd = exp1 - exp2 | |
| signal_line = macd.ewm(span=signal).mean() | |
| histogram = macd - signal_line | |
| return macd, signal_line, histogram | |
| macd, signal_line, histogram = calculate_macd(data['Close']) | |
| indicators['macd'] = {'macd': macd.iloc[-1], 'signal': signal_line.iloc[-1], 'histogram': histogram.iloc[-1], 'signal_text': 'BUY' if histogram.iloc[-1] > 0 else 'SELL', 'macd_values': macd, 'signal_values': signal_line} | |
| def calculate_bollinger_bands(prices, period=20, std_dev=2): | |
| sma = prices.rolling(window=period).mean() | |
| std = prices.rolling(window=period).std() | |
| upper_band = sma + (std * std_dev) | |
| lower_band = sma - (std * std_dev) | |
| return upper_band, sma, lower_band | |
| upper, middle, lower = calculate_bollinger_bands(data['Close']) | |
| current_price = data['Close'].iloc[-1] | |
| bb_position = (current_price - lower.iloc[-1]) / (upper.iloc[-1] - lower.iloc[-1]) | |
| indicators['bollinger'] = { | |
| 'upper': upper.iloc[-1], 'middle': middle.iloc[-1], 'lower': lower.iloc[-1], | |
| 'upper_values': upper, 'middle_values': middle, 'lower_values': lower, | |
| 'position': 'UPPER' if bb_position > 0.8 else 'LOWER' if bb_position < 0.2 else 'MIDDLE' | |
| } | |
| sma_20_series = data['Close'].rolling(20).mean() | |
| sma_50_series = data['Close'].rolling(50).mean() | |
| indicators['moving_averages'] = {'sma_20': sma_20_series.iloc[-1], 'sma_50': sma_50_series.iloc[-1], 'sma_200': data['Close'].rolling(200).mean().iloc[-1], 'ema_12': data['Close'].ewm(span=12).mean().iloc[-1], 'ema_26': data['Close'].ewm(span=26).mean().iloc[-1], 'sma_20_values': sma_20_series, 'sma_50_values': sma_50_series} | |
| indicators['volume'] = {'current': data['Volume'].iloc[-1], 'avg_20': data['Volume'].rolling(20).mean().iloc[-1], 'ratio': data['Volume'].iloc[-1] / data['Volume'].rolling(20).mean().iloc[-1]} | |
| # Tambahkan kolom indikator ke DataFrame input untuk digunakan nanti (di predict_technical_indicators_future) | |
| data['RSI'] = rsi_series | |
| data['MACD'] = macd | |
| data['MACD_Signal'] = signal_line | |
| return indicators | |
| def generate_trading_signals(data, indicators): | |
| signals = {} | |
| current_price = data['Close'].iloc[-1] | |
| buy_signals = 0 | |
| sell_signals = 0 | |
| signal_details = [] | |
| rsi = indicators['rsi']['current'] | |
| if rsi < 30: | |
| buy_signals += 1 | |
| signal_details.append(f"β RSI ({rsi:.1f}) - Oversold - BUY signal") | |
| elif rsi > 70: | |
| sell_signals += 1 | |
| signal_details.append(f"β RSI ({rsi:.1f}) - Overbought - SELL signal") | |
| else: | |
| signal_details.append(f"βͺ RSI ({rsi:.1f}) - Neutral") | |
| macd_hist = indicators['macd']['histogram'] | |
| if macd_hist > 0: | |
| buy_signals += 1 | |
| signal_details.append(f"β MACD Histogram ({macd_hist:.4f}) - Positive - BUY signal") | |
| else: | |
| sell_signals += 1 | |
| signal_details.append(f"β MACD Histogram ({macd_hist:.4f}) - Negative - SELL signal") | |
| bb_position = indicators['bollinger']['position'] | |
| if bb_position == 'LOWER': | |
| buy_signals += 1 | |
| signal_details.append(f"β Bollinger Bands - Near lower band - BUY signal") | |
| elif bb_position == 'UPPER': | |
| sell_signals += 1 | |
| signal_details.append(f"β Bollinger Bands - Near upper band - SELL signal") | |
| else: | |
| signal_details.append("βͺ Bollinger Bands - Middle position") | |
| sma_20 = indicators['moving_averages']['sma_20'] | |
| sma_50 = indicators['moving_averages']['sma_50'] | |
| if current_price > sma_20 > sma_50: | |
| buy_signals += 1 | |
| signal_details.append(f"β Price above MA(20,50) - Bullish - BUY signal") | |
| elif current_price < sma_20 < sma_50: | |
| sell_signals += 1 | |
| signal_details.append(f"β Price below MA(20,50) - Bearish - SELL signal") | |
| else: | |
| signal_details.append("βͺ Moving Averages - Mixed signals") | |
| volume_ratio = indicators['volume']['ratio'] | |
| if volume_ratio > 1.5: | |
| buy_signals += 0.5 | |
| signal_details.append(f"β High volume ({volume_ratio:.1f}x avg) - Strengthens BUY signal") | |
| elif volume_ratio < 0.5: | |
| sell_signals += 0.5 | |
| signal_details.append(f"β Low volume ({volume_ratio:.1f}x avg) - Weakens SELL signal") | |
| else: | |
| signal_details.append(f"βͺ Normal volume ({volume_ratio:.1f}x avg)") | |
| total_signals = buy_signals + sell_signals | |
| signal_strength = (buy_signals / max(total_signals, 1)) * 100 | |
| overall_signal = "BUY" if buy_signals > sell_signals else "SELL" if sell_signals > buy_signals else "HOLD" | |
| recent_high = data['High'].tail(20).max() | |
| recent_low = data['Low'].tail(20).min() | |
| signals = {'overall': overall_signal, 'strength': signal_strength, 'details': '\n'.join(signal_details), 'support': recent_low, 'resistance': recent_high, 'stop_loss': recent_low * 0.95 if overall_signal == "BUY" else recent_high * 1.05} | |
| return signals | |
| def get_fundamental_data(stock): | |
| try: | |
| info = stock.info | |
| history = stock.history(period="1d") | |
| fundamental_info = {'name': info.get('longName', 'N/A'), 'current_price': history['Close'].iloc[-1] if not history.empty else 0, 'market_cap': info.get('marketCap', 0), 'pe_ratio': info.get('forwardPE', 0), 'dividend_yield': info.get('dividendYield', 0) * 100 if info.get('dividendYield') else 0, 'volume': history['Volume'].iloc[-1] if not history.empty else 0, 'info': f"Sector: {info.get('sector', 'N/A')}\nIndustry: {info.get('industry', 'N/A')}\nMarket Cap: {info.get('marketCap', 0)}\n52 Week High: {info.get('fiftyTwoWeekHigh', 'N/A')}\n52 Week Low: {info.get('fiftyTwoWeekLow', 'N/A')}\nBeta: {info.get('beta', 'N/A')}\nEPS: {info.get('forwardEps', 'N/A')}\nBook Value: {info.get('bookValue', 'N/A')}\nPrice to Book: {info.get('priceToBook', 'N/A')}"} | |
| return fundamental_info | |
| except: | |
| return {'name': 'N/A', 'current_price': 0, 'market_cap': 0, 'pe_ratio': 0, 'dividend_yield': 0, 'volume': 0, 'info': 'Unable to fetch fundamental data'} | |
| def format_large_number(num): | |
| if num >= 1e12: | |
| return f"{num/1e12:.2f}T" | |
| elif num >= 1e9: | |
| return f"{num/1e9:.2f}B" | |
| elif num >= 1e6: | |
| return f"{num/1e6:.2f}M" | |
| elif num >= 1e3: | |
| return f"{num/1e3:.2f}K" | |
| else: | |
| return f"{num:.2f}" | |
| def create_price_chart(data, indicators): | |
| fig = make_subplots(rows=3, cols=1, shared_xaxes=True, vertical_spacing=0.05) | |
| fig.add_trace(go.Candlestick(x=data.index, open=data['Open'], high=data['High'], low=data['Low'], close=data['Close'], name='Price'), row=1, col=1) | |
| fig.add_trace(go.Scatter(x=data.index, y=indicators['moving_averages']['sma_20_values'], name='SMA 20', line=dict(color='orange')), row=1, col=1) | |
| fig.add_trace(go.Scatter(x=data.index, y=indicators['moving_averages']['sma_50_values'], name='SMA 50', line=dict(color='blue')), row=1, col=1) | |
| fig.add_trace(go.Scatter(x=data.index, y=indicators['rsi']['values'], name='RSI', line=dict(color='purple')), row=2, col=1) | |
| fig.add_trace(go.Scatter(x=data.index, y=indicators['macd']['macd_values'], name='MACD', line=dict(color='blue')), row=3, col=1) | |
| fig.add_trace(go.Scatter(x=data.index, y=indicators['macd']['signal_values'], name='Signal', line=dict(color='red')), row=3, col=1) | |
| fig.update_layout(title='Technical Analysis Dashboard', height=900, showlegend=True) | |
| return fig | |
| def create_technical_chart(data, indicators): | |
| fig = make_subplots(rows=2, cols=2, subplot_titles=('Bollinger Bands', 'Volume', 'Price vs MA', 'RSI Analysis')) | |
| fig.add_trace(go.Scatter(x=data.index, y=data['Close'], name='Price', line=dict(color='black')), row=1, col=1) | |
| fig.add_trace(go.Scatter(x=data.index, y=indicators['bollinger']['upper_values'], name='Upper Band', line=dict(color='red')), row=1, col=1) | |
| fig.add_trace(go.Scatter(x=data.index, y=indicators['bollinger']['lower_values'], name='Lower Band', line=dict(color='green'), fill='tonexty', fillcolor='rgba(0,255,0,0.1)'), row=1, col=1) | |
| fig.add_trace(go.Bar(x=data.index, y=data['Volume'], name='Volume', marker_color='lightblue'), row=1, col=2) | |
| fig.add_trace(go.Scatter(x=data.index, y=data['Close'], name='Price', line=dict(color='gray')), row=2, col=1) | |
| fig.add_trace(go.Scatter(x=data.index, y=indicators['moving_averages']['sma_20_values'], name='SMA 20', line=dict(color='orange', dash='dash')), row=2, col=1) | |
| fig.add_trace(go.Scatter(x=data.index, y=indicators['moving_averages']['sma_50_values'], name='SMA 50', line=dict(color='blue', dash='dash')), row=2, col=1) | |
| fig.add_trace(go.Scatter(x=data.index, y=indicators['rsi']['values'], name='RSI', line=dict(color='purple')), row=2, col=2) | |
| fig.add_hline(y=70, line_dash="dash", line_color="red", row=2, col=2) | |
| fig.add_hline(y=30, line_dash="dash", line_color="green", row=2, col=2) | |
| fig.update_layout(title='Technical Indicators Overview', height=800, showlegend=False, hovermode='x unified') | |
| return fig |