Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import yfinance as yf | |
| import numpy as np | |
| import pandas as pd | |
| from datetime import datetime | |
| from scipy.optimize import brentq | |
| from scipy.stats import norm, gaussian_kde | |
| from scipy.interpolate import splrep, BSpline | |
| from scipy.integrate import simps | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
# Page chrome: wide layout, title, subtitle and a one-paragraph description.
st.set_page_config(layout="wide", page_title="Forward-Looking Probability")
st.markdown("## Forward-Looking Market-Implied Probability Distribution")
st.markdown("#### Option-Based Price Forecasting Using Implied Volatility")

intro_text = (
    "This tool analyzes the implied probability distribution of a stock's future price using call option data. "
    "It calculates implied volatilities via the Black-Scholes model, derives a risk-neutral probability density function using "
    "the Breeden-Litzenberger formula, and then smooths the result with Kernel Density Estimation (KDE). "
    "A unified strike grid is used for the 3D surface, while 2D analysis focuses on individual expiration dates."
)
st.write(intro_text)

# Collapsed explainer: each entry is (render function, content), shown in order.
how_it_works_steps = (
    (st.write, "The analysis is based on the Black-Scholes model for European call options:"),
    (st.latex, r"C(S,K,T,r,\sigma)=S\Phi(d_1)-Ke^{-rT}\Phi(d_2)"),
    (st.latex, r"d_1=\frac{\ln\left(\frac{S}{K}\right)+(r+0.5\sigma^2)T}{\sigma\sqrt{T}}"),
    (st.latex, r"d_2=d_1-\sigma\sqrt{T}"),
    (st.write, "The risk-neutral probability density function (PDF) is derived using the Breeden-Litzenberger formula:"),
    (st.latex, r"\text{PDF}(K)=e^{rT}\frac{\partial^2C}{\partial K^2}"),
    (st.write, "The resulting PDF is then smoothed using Kernel Density Estimation (KDE)."),
)
with st.expander("How It Works", expanded=False):
    for render, content in how_it_works_steps:
        render(content)
# =============================================================================
# SIDEBAR - General Settings
# =============================================================================
st.sidebar.title("Parameters")

# Both percentage inputs share the same integer default, bounds and step.
pct_input_options = dict(value=10, min_value=1, max_value=100, step=1)

general_settings = st.sidebar.expander("General Settings", expanded=True)
with general_settings:
    ticker_input = st.text_input("Ticker Symbol", value="NVDA")
    lower_pct = st.number_input(
        "Price % Decrease",
        help="For 2D plots: lower threshold = current price * (1 - percentage/100)",
        **pct_input_options,
    )
    upper_pct = st.number_input(
        "Price % Increase",
        help="For 2D plots: upper threshold = current price * (1 + percentage/100)",
        **pct_input_options,
    )

# =============================================================================
# SIDEBAR - Advanced Settings
# =============================================================================
advanced_settings = st.sidebar.expander("Advanced Settings", expanded=True)
with advanced_settings:
    risk_free = st.number_input(
        "Risk-Free Rate",
        value=0.04,
        step=0.01,
        format="%.2f",
        help="The annualized risk-free rate used in option pricing.",
    )
    min_volume = st.number_input(
        "Minimum Volume",
        value=20,
        step=1,
        help="Minimum trading volume required for an option to be considered liquid.",
    )
    max_spread_ratio = st.number_input(
        "Max Spread Ratio",
        value=0.2,
        step=0.01,
        format="%.2f",
        help="Maximum acceptable ratio of bid-ask spread to ask price. Options exceeding this will be excluded.",
    )

# =============================================================================
# Run Analysis Button (placed outside the expanders)
# =============================================================================
run_analysis = st.sidebar.button("Run Analysis")
| # ============================================================================= | |
| # HELPER FUNCTIONS | |
| # ============================================================================= | |
def call_bs_price(S, K, T, r, sigma):
    """Return the Black-Scholes price of a European call option.

    Args:
        S: Current price of the underlying.
        K: Strike price.
        T: Time to expiry in years.
        r: Annualized risk-free rate (continuous compounding).
        sigma: Annualized volatility.

    Returns:
        The call price. For ``T <= 0`` this is the intrinsic value
        ``max(S - K, 0)``. For ``sigma <= 0`` it is the zero-volatility limit
        ``max(S - K * exp(-r * T), 0)``.
    """
    if T <= 0:
        return max(S - K, 0)
    if sigma <= 0:
        # A non-positive volatility would make d1/d2 divide by zero (NaN at the
        # money) or flip sign; use the deterministic limit of the formula.
        return max(S - K * np.exp(-r * T), 0.0)
    vol_sqrt_t = sigma * np.sqrt(T)
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / vol_sqrt_t
    d2 = d1 - vol_sqrt_t
    return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
def implied_vol_call(price, S, K, T, r):
    """Solve for the Black-Scholes implied volatility of a call.

    Args:
        price: Observed option price to match.
        S: Current price of the underlying.
        K: Strike price.
        T: Time to expiry in years.
        r: Annualized risk-free rate.

    Returns:
        The volatility in ``[1e-9, 5.0]`` whose model price equals ``price``,
        or ``np.nan`` when ``T <= 0``, the price is not finite, or the root
        finder cannot bracket or converge on a solution.
    """
    if T <= 0 or not np.isfinite(price):
        return np.nan

    def pricing_error(iv):
        return call_bs_price(S, K, T, r, iv) - price

    try:
        return brentq(pricing_error, 1e-9, 5.0)
    except (ValueError, RuntimeError):
        # ValueError: the price is outside what volatilities in the bracket can
        # produce (no sign change). RuntimeError: no convergence. Anything else
        # (e.g. KeyboardInterrupt) is deliberately allowed to propagate.
        return np.nan
def build_pdf(K_grid, iv_spline_tck, S, T, r):
    """Derive a non-negative density over strikes from a fitted IV spline.

    The spline ``iv_spline_tck`` is evaluated on ``K_grid``, each strike is
    priced with ``call_bs_price``, and the price curve is differentiated twice
    with respect to strike. The result is scaled by ``exp(r * T)`` and negative
    values are clipped to zero.

    Returns:
        Array of density values, one per entry of ``K_grid``.
    """
    smile = BSpline(*iv_spline_tck)
    call_curve = []
    for strike, vol in zip(K_grid, smile(K_grid)):
        call_curve.append(call_bs_price(S, strike, T, r, vol))

    # Two successive numerical derivatives give d^2C/dK^2.
    slope = np.gradient(call_curve, K_grid)
    curvature = np.gradient(slope, K_grid)
    density = curvature * np.exp(r * T)
    return np.clip(density, 0, None)
def build_cdf(K_grid, pdf_vals):
    """Integrate a sampled PDF into a CDF normalized to end at 1.

    Each grid interval contributes its trapezoid area; this is what the
    previous two-point Simpson call evaluated to, but it no longer relies on
    ``scipy.integrate.simps``, which recent SciPy releases no longer provide.

    Args:
        K_grid: Increasing sample points (strikes).
        pdf_vals: Density values at ``K_grid`` (same length).

    Returns:
        Array the same length as ``K_grid`` starting at 0. It is divided by its
        final value when that value is positive; otherwise it is returned
        unnormalized. Empty input yields an empty array.
    """
    strikes = np.asarray(K_grid, dtype=float)
    density = np.asarray(pdf_vals, dtype=float)
    if strikes.size == 0:
        return np.zeros(0)

    interval_areas = 0.5 * (density[1:] + density[:-1]) * np.diff(strikes)
    cdf_vals = np.concatenate(([0.0], np.cumsum(interval_areas)))
    if cdf_vals[-1] > 0:
        cdf_vals /= cdf_vals[-1]
    return cdf_vals
| # Function to filter illiquid options | |
def filter_liquid_options(df, min_volume=20, max_spread_ratio=0.2):
    """Keep only rows of an option table that pass the liquidity checks.

    A row is kept when its bid-ask spread divided by the ask is below
    ``max_spread_ratio``, its bid is positive, and its volume is at least
    ``min_volume``. ``df`` must have ``bid``, ``ask`` and ``volume`` columns.
    """
    relative_spread = (df["ask"] - df["bid"]) / df["ask"]
    is_tight = relative_spread < max_spread_ratio
    has_bid = df["bid"] > 0
    is_traded = df["volume"] >= min_volume
    return df[is_tight & has_bid & is_traded]
| # ============================================================================= | |
| # 3D ANALYSIS FUNCTION (CALLS ONLY) | |
| # ============================================================================= | |
def compute_3d_pdf(data_ticker, current_price, r, min_volume, max_spread_ratio):
    """Build a 3D surface of smoothed call-implied PDFs across expiries.

    Args:
        data_ticker: Object exposing ``options`` (expiry date strings in
            ``%Y-%m-%d`` format) and ``option_chain(exp_date)`` whose result
            has a ``calls`` table with strike/lastPrice/bid/ask/volume columns.
        current_price: Spot price used for pricing and to size the strike grid.
        r: Annualized risk-free rate.
        min_volume: Minimum volume passed to ``filter_liquid_options``.
        max_spread_ratio: Maximum spread ratio passed to
            ``filter_liquid_options``.

    Returns:
        ``(fig, valid_expiries)``: the Plotly surface figure and the expiry
        strings that had usable call data.

    Raises:
        ValueError: If no expiry has any call data left after filtering.
    """
    # np.trapz is deprecated in favour of np.trapezoid on newer NumPy. Resolve
    # the integrator up front so a missing name cannot be swallowed by the
    # broad KDE fallback below and silently disable smoothing.
    trapezoid = np.trapezoid if hasattr(np, "trapezoid") else np.trapz

    # Read the clock once so the plotted day counts and the pricing maturities
    # always describe the same instant.
    now = datetime.now()

    valid_expiries = []
    days_list = []
    calls_data_dict = {}

    # First pass: collect calls data from valid expiries.
    for exp_date in data_ticker.options:
        try:
            expiry_dt = datetime.strptime(exp_date, "%Y-%m-%d")
        except (TypeError, ValueError):
            continue
        days_forward = (expiry_dt - now).days
        if days_forward < 1:
            continue
        try:
            chain = data_ticker.option_chain(exp_date)
        except Exception:
            # Best effort: one expiry that fails to download is skipped.
            continue
        calls_df = chain.calls[['strike', 'lastPrice', 'bid', 'ask', 'volume']].dropna().copy()
        calls_df = filter_liquid_options(calls_df, min_volume, max_spread_ratio)
        calls_df = calls_df[calls_df['lastPrice'] > 0].sort_values('strike')
        if calls_df.empty:
            continue
        valid_expiries.append(exp_date)
        days_list.append(days_forward)
        calls_data_dict[exp_date] = calls_df

    if not valid_expiries:
        raise ValueError("No valid expiries with call data.")

    K_grid_3d = np.linspace(current_price * 0.25, current_price * 3, 300)
    pdf_list = []

    # Second pass: compute smoothed PDF for each expiry. Every expiry appends
    # exactly one row so pdf_list stays aligned with days_list.
    for exp_date, days_forward in zip(valid_expiries, days_list):
        T_val = days_forward / 365.0
        calls_df = calls_data_dict[exp_date]
        iv_vals = [
            implied_vol_call(price, current_price, strike, T_val, r)
            for price, strike in zip(calls_df['lastPrice'], calls_df['strike'])
        ]
        calls_df = calls_df.assign(iv=iv_vals).dropna(subset=['iv'])
        if calls_df.empty:
            pdf_list.append(np.zeros_like(K_grid_3d))
            continue

        try:
            iv_spline_tck = splrep(calls_df['strike'].values, calls_df['iv'].values, s=10, k=3)
        except Exception:
            # The spline fit fails e.g. with too few strikes; show a flat row.
            pdf_list.append(np.zeros_like(K_grid_3d))
            continue

        pdf_raw = build_pdf(K_grid_3d, iv_spline_tck, current_price, T_val, r)
        try:
            kde = gaussian_kde(K_grid_3d, weights=pdf_raw)
            pdf_smooth = kde(K_grid_3d)
            area = trapezoid(pdf_smooth, K_grid_3d)
            if area > 0:
                pdf_smooth /= area
        except Exception:
            # Smoothing is optional; fall back to the raw density.
            pdf_smooth = pdf_raw
        pdf_list.append(pdf_smooth)

    pdf_matrix = np.array(pdf_list)
    days_array = np.array(days_list)
    TT, KK = np.meshgrid(days_array, K_grid_3d, indexing='ij')

    fig = go.Figure(data=[go.Surface(
        x=KK,
        y=TT,
        z=pdf_matrix,
        colorscale='Viridis',
        opacity=0.8
    )])
    fig.update_layout(
        scene=dict(
            xaxis_title='Strike',
            yaxis_title='Days to Expiry',
            zaxis_title='PDF'
        ),
        title="3D Smoothed Implied PDF Across Expiries",
        width=900,
        height=700
    )
    fig.add_annotation(
        x=0.98, y=0.98, xref="paper", yref="paper",
        text=f"Current Price: {current_price:.2f}",
        showarrow=False,
        align="right",
        font=dict(size=12),
        bordercolor="black",
        borderwidth=1,
        opacity=0.8
    )
    return fig, valid_expiries
| # ============================================================================= | |
| # 2D ANALYSIS FUNCTION (CALLS ONLY) | |
| # ============================================================================= | |
def _threshold_probabilities(K_grid, cdf, lower_thresh, upper_thresh):
    """Read P(below lower), P(between) and P(above upper) off a normalized CDF."""
    if cdf[-1] <= 0:
        # Degenerate all-zero density: there is no mass to attribute.
        return 0.0, 0.0, 0.0
    # np.interp clamps thresholds outside the grid to the CDF end values.
    cdf_lower = float(np.interp(lower_thresh, K_grid, cdf))
    cdf_upper = float(np.interp(upper_thresh, K_grid, cdf))
    return cdf_lower, cdf_upper - cdf_lower, 1.0 - cdf_upper


def _build_pdf_cdf_figure(K_grid, pdf_smooth, cdf, current_price):
    """Create the side-by-side PDF / CDF figure with the current price marked."""
    fig = make_subplots(rows=1, cols=2, subplot_titles=("Smoothed PDF", "Smoothed CDF"))
    for col, series, label in ((1, pdf_smooth, "PDF"), (2, cdf, "CDF")):
        fig.add_trace(go.Scatter(
            x=K_grid, y=series, mode='lines', name=label, line=dict(color='blue')
        ), row=1, col=col)
        fig.add_vline(x=current_price, line=dict(color='red', dash='dash'), row=1, col=col)
        fig.update_xaxes(title_text="Strike", row=1, col=col)
        fig.update_yaxes(title_text=label, row=1, col=col)
    fig.update_layout(title_text="2D Analysis: PDF and CDF")
    fig.add_annotation(
        x=0.98, y=0.98, xref="paper", yref="paper",
        text=f"Current Price: {current_price:.2f}",
        showarrow=False,
        align="right",
        font=dict(size=12),
        borderwidth=1,
        opacity=0.8
    )
    return fig


def _build_threshold_figure(K_grid, pdf_smooth, current_price, lower_thresh,
                            upper_thresh, p_below, p_between, p_above):
    """Create the PDF figure shaded by threshold region, with probability text."""
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=K_grid, y=pdf_smooth, mode='lines', name='PDF', line=dict(color='blue')
    ))
    fig.add_vline(
        x=lower_thresh,
        line=dict(color='orange', dash='dash'),
        annotation_text=f'Lower: {lower_thresh:.2f}',
        annotation_position="top left",
        annotation_xshift=10,
        annotation_yshift=-10
    )
    fig.add_vline(
        x=upper_thresh,
        line=dict(color='purple', dash='dash'),
        annotation_text=f'Upper: {upper_thresh:.2f}',
        annotation_position="top right",
        annotation_xshift=-10,
        annotation_yshift=-10
    )
    fig.add_vline(
        x=current_price,
        line=dict(color='red', dash='dash'),
        annotation_text=f'Current: {current_price:.2f}',
        annotation_position="bottom right",
        annotation_xshift=-10,
        annotation_yshift=10
    )

    # Shade the three regions with one filled trace each.
    region_fills = (
        (K_grid < lower_thresh, 'lightblue'),
        ((K_grid >= lower_thresh) & (K_grid <= upper_thresh), 'lightgrey'),
        (K_grid > upper_thresh, 'lightcoral'),
    )
    for mask, color in region_fills:
        fig.add_trace(go.Scatter(
            x=K_grid[mask], y=pdf_smooth[mask], mode='lines',
            fill='tozeroy', line=dict(color=color), showlegend=False
        ))

    fig.update_layout(
        title="Threshold Probability Plot",
        xaxis_title="Strike",
        yaxis_title="PDF"
    )
    annotation_text = (
        f"Probability below {lower_thresh:.2f} is {p_below:.2%}<br>"
        f"Probability between {lower_thresh:.2f} and {upper_thresh:.2f} is {p_between:.2%}<br>"
        f"Probability above {upper_thresh:.2f} is {p_above:.2%}"
    )
    fig.add_annotation(
        x=0.75, y=0.5, xref="paper", yref="paper",
        text=annotation_text,
        showarrow=False,
        align="left",
        font=dict(size=12),
        borderwidth=1,
        opacity=0.8
    )
    fig.add_annotation(
        x=0.98, y=0.98, xref="paper", yref="paper",
        text=f"Current Price: {current_price:.2f}",
        showarrow=False,
        align="right",
        font=dict(size=12),
        bordercolor="black",
        borderwidth=1,
        opacity=0.8
    )
    return fig


def compute_2d_pdf(exp_date, data_ticker, current_price, r, lower_pct, upper_pct, min_volume, max_spread_ratio):
    """Compute the call-implied PDF/CDF and threshold probabilities for one expiry.

    Args:
        exp_date: Expiry date string in ``%Y-%m-%d`` format.
        data_ticker: Object exposing ``option_chain(exp_date)`` whose result
            has a ``calls`` table with strike/lastPrice/bid/ask/volume columns.
        current_price: Spot price used for pricing and for the thresholds.
        r: Annualized risk-free rate.
        lower_pct: Lower threshold is ``current_price * (1 - lower_pct / 100)``.
        upper_pct: Upper threshold is ``current_price * (1 + upper_pct / 100)``.
        min_volume: Minimum volume passed to ``filter_liquid_options``.
        max_spread_ratio: Maximum spread ratio passed to
            ``filter_liquid_options``.

    Returns:
        A dict with the strike grid, smoothed PDF, CDF, thresholds, the three
        probabilities, both figures and the days to expiry; or ``None`` when
        the expiry cannot be parsed, is less than one day away, has no usable
        call data, or the IV spline cannot be fitted.
    """
    # np.trapz is deprecated in favour of np.trapezoid on newer NumPy. Resolve
    # the integrator up front so a missing name cannot be swallowed by the
    # broad KDE fallback below and silently disable smoothing.
    trapezoid = np.trapezoid if hasattr(np, "trapezoid") else np.trapz

    try:
        expiry_dt = datetime.strptime(exp_date, "%Y-%m-%d")
    except (TypeError, ValueError):
        return None
    days_forward = (expiry_dt - datetime.now()).days
    if days_forward < 1:
        return None
    T_val = days_forward / 365.0

    try:
        chain = data_ticker.option_chain(exp_date)
    except Exception:
        # Best effort: an expiry whose chain cannot be fetched is skipped.
        return None
    calls_df = chain.calls[['strike', 'lastPrice', 'bid', 'ask', 'volume']].dropna().copy()
    calls_df = filter_liquid_options(calls_df, min_volume, max_spread_ratio)
    calls_df = calls_df[calls_df['lastPrice'] > 0].sort_values('strike')
    if calls_df.empty:
        return None

    iv_list = [
        implied_vol_call(price, current_price, strike, T_val, r)
        for price, strike in zip(calls_df['lastPrice'], calls_df['strike'])
    ]
    calls_df = calls_df.assign(iv=iv_list).dropna(subset=['iv'])
    if calls_df.empty:
        return None

    strikes = calls_df['strike'].values
    ivs = calls_df['iv'].values
    try:
        iv_spline_tck = splrep(strikes, ivs, s=10, k=3)
    except Exception:
        # The spline fit fails e.g. with too few strikes.
        return None

    # The 2D grid spans only the strikes that actually produced an IV.
    K_grid_2d = np.linspace(strikes.min(), strikes.max(), 300)
    pdf_raw = build_pdf(K_grid_2d, iv_spline_tck, current_price, T_val, r)
    try:
        kde = gaussian_kde(K_grid_2d, weights=pdf_raw)
        pdf_smooth = kde(K_grid_2d)
        area = trapezoid(pdf_smooth, K_grid_2d)
        if area > 0:
            pdf_smooth /= area
    except Exception:
        # Smoothing is optional; fall back to the raw density.
        pdf_smooth = pdf_raw
    cdf = build_cdf(K_grid_2d, pdf_smooth)

    lower_thresh = current_price * (1 - lower_pct / 100)
    upper_thresh = current_price * (1 + upper_pct / 100)
    # Integrating masked slices of the PDF drops the grid interval straddling
    # each threshold, so the three numbers would not add up to 100%. Reading
    # them off the normalized CDF keeps them consistent.
    p_below, p_between, p_above = _threshold_probabilities(
        K_grid_2d, cdf, lower_thresh, upper_thresh
    )

    fig_pdf_cdf = _build_pdf_cdf_figure(K_grid_2d, pdf_smooth, cdf, current_price)
    fig_threshold = _build_threshold_figure(
        K_grid_2d, pdf_smooth, current_price, lower_thresh, upper_thresh,
        p_below, p_between, p_above
    )

    return {
        "K_grid_2d": K_grid_2d,
        "pdf_smooth": pdf_smooth,
        "cdf": cdf,
        "lower_thresh": lower_thresh,
        "upper_thresh": upper_thresh,
        "p_below": p_below,
        "p_between": p_between,
        "p_above": p_above,
        "fig_pdf_cdf": fig_pdf_cdf,
        "fig_threshold": fig_threshold,
        "days_to_exp": days_forward
    }
# =============================================================================
# MAIN RUN (only run when the button is clicked)
# =============================================================================
if run_analysis:
    with st.spinner("Running analysis, please wait..."):
        # Latest close of the ticker; any fetch problem ends this script run.
        try:
            data_ticker = yf.Ticker(ticker_input)
            hist_data = data_ticker.history(period="1d")
            if hist_data.empty:
                st.error("No price data found.")
                st.stop()
            current_price = hist_data["Close"].iloc[-1]
        except Exception as fetch_error:
            st.error(f"Error fetching data: {fetch_error}")
            st.stop()

        st.write(f"Current Price: {round(current_price, 2)}")
        r = risk_free

        # Surface across all expiries; the returned expiry list is not used.
        try:
            fig3d, valid_expiries_3d = compute_3d_pdf(
                data_ticker, current_price, r, min_volume, max_spread_ratio
            )
        except Exception as surface_error:
            st.error(f"3D analysis error: {surface_error}")
            st.stop()

        # Per-expiry analysis; expiries that return None are left out.
        per_expiry = (
            (
                exp_date,
                compute_2d_pdf(
                    exp_date, data_ticker, current_price, r,
                    lower_pct, upper_pct, min_volume, max_spread_ratio
                ),
            )
            for exp_date in data_ticker.options
        )
        results_2d = {exp_date: res for exp_date, res in per_expiry if res is not None}
        if not results_2d:
            st.error("No valid expirations for 2D analysis.")
            st.stop()

        # Keep the results in session state so they survive widget reruns.
        st.session_state.analysis_data = {
            "current_price": current_price,
            "expirations": list(results_2d.keys()),
            "results": results_2d,
            "fig3d": fig3d
        }
# =============================================================================
# DISPLAY RESULTS (if analysis data exists)
# =============================================================================
if "analysis_data" not in st.session_state:
    st.info("Click 'Run Analysis' to start.")
else:
    ad = st.session_state.analysis_data
    st.write(f"**Current Price:** {round(ad['current_price'], 2)}")

    st.markdown("## 3D Probability Surface")
    st.plotly_chart(ad["fig3d"], use_container_width=True)

    st.markdown("## 2D Plots for Selected Expiration Date")
    chosen = st.selectbox("Choose expiration date:", options=ad["expirations"])
    res2d = ad["results"][chosen]
    # Show both stored figures for the chosen expiry, in a fixed order.
    for figure_key in ("fig_pdf_cdf", "fig_threshold"):
        st.plotly_chart(res2d[figure_key], use_container_width=True)
    st.write("The 2D plots use calls data only. The 3D surface uses a unified strike grid.")

# CSS injected on every run to hide Streamlit's main menu and footer.
hide_streamlit_style = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
</style>
"""
st.markdown(hide_streamlit_style, unsafe_allow_html=True)