|
|
|
import gradio as gr |
|
import pandas as pd |
|
import numpy as np |
|
import plotly.express as px |
|
import scipy.optimize as sco |
|
from datetime import datetime, timedelta |
|
import random |
|
import requests |
|
import time |
|
|
|
def fetch_stock_data(tickers): |
|
"""Fetch real stock data using Alpha Vantage API""" |
|
API_KEY = "Y86RZ52NQ8YVX7F6" |
|
BASE_URL = "https://www.alphavantage.co/query" |
|
all_data = {} |
|
|
|
for ticker in tickers: |
|
try: |
|
|
|
params = { |
|
"function": "TIME_SERIES_DAILY", |
|
"symbol": ticker, |
|
"apikey": API_KEY, |
|
"outputsize": "full" |
|
} |
|
|
|
print(f"Fetching data for {ticker}...") |
|
response = requests.get(BASE_URL, params=params) |
|
response.raise_for_status() |
|
data = response.json() |
|
|
|
if "Time Series (Daily)" in data: |
|
daily_data = data["Time Series (Daily)"] |
|
|
|
df = pd.DataFrame.from_dict(daily_data, orient='index') |
|
df = df.astype(float) |
|
|
|
all_data[ticker] = df['4. close'].iloc[:252] |
|
print(f"Successfully fetched data for {ticker}") |
|
else: |
|
print(f"No data found for {ticker}") |
|
if "Note" in data: |
|
print("API Message:", data["Note"]) |
|
|
|
|
|
time.sleep(12) |
|
|
|
except Exception as e: |
|
print(f"Error fetching {ticker}: {str(e)}") |
|
continue |
|
|
|
if not all_data: |
|
print("No data received, using backup data") |
|
return generate_sample_data(tickers) |
|
|
|
|
|
df = pd.DataFrame(all_data) |
|
df = df.sort_index(ascending=True) |
|
return df |
|
|
|
def generate_sample_data(tickers): |
|
"""Generate sample data as backup""" |
|
dates = pd.date_range(end=datetime.now(), periods=252) |
|
data = {} |
|
|
|
for ticker in tickers: |
|
|
|
np.random.seed(hash(ticker) % 2**32) |
|
returns = np.random.normal(loc=0.0001, scale=0.02, size=252) |
|
price = 100 * (1 + returns).cumprod() |
|
data[ticker] = price |
|
|
|
return pd.DataFrame(data, index=dates) |
|
|
|
|
|
SP500_TICKERS = [ |
|
'AAPL', |
|
'MSFT', |
|
'GOOGL', |
|
'AMZN', |
|
'TSLA' |
|
] |
|
|
|
def calculate_portfolio_metrics(weights, returns): |
|
portfolio_return = np.sum(returns.mean() * weights) * 252 |
|
portfolio_volatility = np.sqrt(np.dot(weights.T, np.dot(returns.cov() * 252, weights))) |
|
sharpe_ratio = portfolio_return / portfolio_volatility |
|
return portfolio_return, portfolio_volatility, sharpe_ratio |
|
|
|
def optimize_portfolio(returns, max_volatility): |
|
num_assets = len(returns.columns) |
|
args = (returns,) |
|
constraints = ( |
|
{'type': 'eq', 'fun': lambda x: np.sum(x) - 1}, |
|
{'type': 'ineq', 'fun': lambda x: max_volatility - np.sqrt(np.dot(x.T, np.dot(returns.cov() * 252, x)))} |
|
) |
|
bounds = tuple((0, 1) for _ in range(num_assets)) |
|
|
|
result = sco.minimize( |
|
lambda weights, returns: -calculate_portfolio_metrics(weights, returns)[2], |
|
num_assets * [1. / num_assets,], |
|
args=args, |
|
method='SLSQP', |
|
bounds=bounds, |
|
constraints=constraints |
|
) |
|
return result.x |
|
|
|
def simulate_investment(weights, mu, years, initial_investment=10000): |
|
projected_return = np.dot(weights, mu) * years |
|
return initial_investment * (1 + projected_return) |
|
|
|
def output_results(risk_level): |
|
try: |
|
|
|
selected_tickers = random.sample(SP500_TICKERS, min(len(SP500_TICKERS), 3)) |
|
|
|
|
|
stocks_df = fetch_stock_data(selected_tickers) |
|
|
|
if stocks_df.empty: |
|
raise ValueError("No stock data received") |
|
|
|
returns = stocks_df.pct_change().dropna() |
|
|
|
|
|
risk_thresholds = {"Low": 0.15, "Medium": 0.25, "High": 0.35} |
|
max_volatility = risk_thresholds.get(risk_level, 0.25) |
|
|
|
|
|
optimized_weights = optimize_portfolio(returns, max_volatility) |
|
mu = returns.mean() * 252 |
|
portfolio_return, portfolio_volatility, sharpe_ratio = calculate_portfolio_metrics(optimized_weights, returns) |
|
|
|
|
|
expected_annual_return = f'{(portfolio_return * 100):.2f}%' |
|
annual_volatility = f'{(portfolio_volatility * 100):.2f}%' |
|
sharpe_ratio_str = f'{sharpe_ratio:.2f}' |
|
|
|
|
|
weights_df = pd.DataFrame({ |
|
'Ticker': selected_tickers, |
|
'Weight': [f'{w:.2%}' for w in optimized_weights] |
|
}) |
|
|
|
|
|
correlation_matrix = returns.corr() |
|
fig_corr = px.imshow( |
|
correlation_matrix, |
|
text_auto=True, |
|
title='Stock Correlation Matrix', |
|
color_continuous_scale='RdBu' |
|
) |
|
|
|
|
|
cumulative_returns = (1 + returns).cumprod() |
|
fig_cum_returns = px.line( |
|
cumulative_returns, |
|
title='Cumulative Returns of Individual Stocks' |
|
) |
|
|
|
|
|
projected_1yr = simulate_investment(optimized_weights, mu, 1) |
|
projected_5yr = simulate_investment(optimized_weights, mu, 5) |
|
projected_10yr = simulate_investment(optimized_weights, mu, 10) |
|
|
|
projection_df = pd.DataFrame({ |
|
"Years": [1, 5, 10], |
|
"Projected Value": [projected_1yr, projected_5yr, projected_10yr] |
|
}) |
|
|
|
fig_simulation = px.line( |
|
projection_df, |
|
x='Years', |
|
y='Projected Value', |
|
title='Projected $10,000 Investment Growth' |
|
) |
|
|
|
return ( |
|
fig_cum_returns, |
|
weights_df, |
|
fig_corr, |
|
expected_annual_return, |
|
annual_volatility, |
|
sharpe_ratio_str, |
|
fig_simulation |
|
) |
|
|
|
except Exception as e: |
|
print(f"Error in output_results: {str(e)}") |
|
return None, None, None, f"Error: {str(e)}", "Error", "Error", None |
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Soft()) as app: |
|
gr.Markdown("## Investment Portfolio Generator") |
|
gr.Markdown("Select your risk level to generate a balanced portfolio based on S&P 500 stocks.") |
|
|
|
with gr.Row(): |
|
risk_level = gr.Radio( |
|
["Low", "Medium", "High"], |
|
label="Select Your Risk Level", |
|
value="Medium" |
|
) |
|
|
|
btn = gr.Button("Generate Portfolio") |
|
|
|
with gr.Row(): |
|
expected_annual_return = gr.Textbox(label="Expected Annual Return") |
|
annual_volatility = gr.Textbox(label="Annual Volatility") |
|
sharpe_ratio = gr.Textbox(label="Sharpe Ratio") |
|
|
|
with gr.Row(): |
|
fig_cum_returns = gr.Plot(label="Cumulative Returns") |
|
weights_df = gr.DataFrame(label="Portfolio Weights") |
|
|
|
with gr.Row(): |
|
fig_corr = gr.Plot(label="Correlation Matrix") |
|
fig_simulation = gr.Plot(label="Investment Projection") |
|
|
|
btn.click( |
|
output_results, |
|
inputs=[risk_level], |
|
outputs=[ |
|
fig_cum_returns, |
|
weights_df, |
|
fig_corr, |
|
expected_annual_return, |
|
annual_volatility, |
|
sharpe_ratio, |
|
fig_simulation |
|
] |
|
) |
|
|
|
if __name__ == "__main__": |
|
app.launch() |