# try / app.py
# kimappl's picture
# Update app.py
# 4f7d51b verified
# app.py
import gradio as gr
import pandas as pd
import numpy as np
import plotly.express as px
import scipy.optimize as sco
from datetime import datetime, timedelta
import random
import requests
import time
def fetch_stock_data(tickers):
    """Fetch about one year of daily closing prices from Alpha Vantage.

    Each ticker is requested separately from the ``TIME_SERIES_DAILY``
    endpoint.  A ticker whose request fails, or whose response carries no
    daily series, is reported on stdout and skipped.

    Args:
        tickers: Iterable of ticker symbols to download.

    Returns:
        A DataFrame indexed by date (ascending) with one column of
        unadjusted closing prices per ticker that was fetched successfully.
        If nothing could be fetched, the result of
        ``generate_sample_data(tickers)`` is returned instead.
    """
    import os  # local import: only needed for the API-key override below

    # NOTE(security): this key is committed to source control.  Prefer
    # supplying your own via the ALPHAVANTAGE_API_KEY environment variable;
    # the literal is kept only as a backward-compatible fallback.
    api_key = os.environ.get("ALPHAVANTAGE_API_KEY", "Y86RZ52NQ8YVX7F6")
    base_url = "https://www.alphavantage.co/query"
    request_delay_seconds = 12  # Alpha Vantage has a rate limit
    request_timeout_seconds = 30  # never hang the UI on a stalled connection
    trading_days = 252  # roughly one year of daily bars
    all_data = {}

    for position, ticker in enumerate(tickers):
        # Throttle *between* requests: also after a failed one, but not
        # before the first request or after the last.
        if position:
            time.sleep(request_delay_seconds)

        try:
            params = {
                "function": "TIME_SERIES_DAILY",
                "symbol": ticker,
                "apikey": api_key,
                "outputsize": "full",
            }
            print(f"Fetching data for {ticker}...")
            response = requests.get(
                base_url, params=params, timeout=request_timeout_seconds
            )
            response.raise_for_status()
            data = response.json()

            if "Time Series (Daily)" in data:
                daily_data = data["Time Series (Daily)"]
                df = pd.DataFrame.from_dict(daily_data, orient='index')
                df = df.astype(float)
                # Real dates instead of strings, matching the backup data.
                df.index = pd.to_datetime(df.index)
                # '4. close' is the plain (unadjusted) close.  Sort first so
                # "most recent year" does not depend on the response order.
                closes = df['4. close'].sort_index()
                all_data[ticker] = closes.iloc[-trading_days:]
                print(f"Successfully fetched data for {ticker}")
            else:
                print(f"No data found for {ticker}")
                # Throttling and error diagnostics arrive under these keys.
                for message_key in ("Note", "Information", "Error Message"):
                    if message_key in data:
                        print("API Message:", data[message_key])
        except Exception as e:
            # Deliberate best-effort: one bad ticker must not abort the rest.
            print(f"Error fetching {ticker}: {str(e)}")
            continue

    if not all_data:
        print("No data received, using backup data")
        return generate_sample_data(tickers)

    # Combine all series; pandas aligns them on the date index.
    combined = pd.DataFrame(all_data)
    return combined.sort_index(ascending=True)
def generate_sample_data(tickers):
    """Generate synthetic price series to use when no real data is available.

    Each ticker gets 252 business days of prices ending today (or the last
    business day before it), starting near 100 and following normally
    distributed daily returns (mean 0.0001, standard deviation 0.02).

    The series for a ticker is reproducible across runs: the random stream
    is seeded from a CRC32 of the ticker, not from ``hash()``, which Python
    randomises per process for strings.  A private generator is used so the
    global NumPy random state is left untouched.

    Args:
        tickers: Iterable of ticker symbols; each becomes one column.

    Returns:
        A DataFrame of prices indexed by business-day dates, one column per
        ticker.
    """
    import zlib  # local import: only needed for the stable seed

    num_days = 252  # one year of trading days
    # Business days only, normalised to midnight so the index holds dates
    # rather than timestamps with the current time of day.
    dates = pd.bdate_range(end=pd.Timestamp.now().normalize(), periods=num_days)

    data = {}
    for ticker in tickers:
        seed = zlib.crc32(str(ticker).encode("utf-8"))
        rng = np.random.default_rng(seed)
        daily_returns = rng.normal(loc=0.0001, scale=0.02, size=num_days)
        data[ticker] = 100 * (1 + daily_returns).cumprod()
    return pd.DataFrame(data, index=dates)
# Candidate S&P 500 tickers. The list is intentionally short because of API
# rate limits: Apple, Microsoft, Google, Amazon, Tesla.
SP500_TICKERS = ['AAPL', 'MSFT', 'GOOGL', 'AMZN', 'TSLA']
def calculate_portfolio_metrics(weights, returns, periods_per_year=252):
    """Compute annualised return, volatility and Sharpe ratio of a portfolio.

    Args:
        weights: Array-like of portfolio weights (list, tuple or ndarray),
            one per column of ``returns`` and in the same order.
        returns: DataFrame of per-period asset returns, one column per asset.
        periods_per_year: Scaling factor used to annualise the per-period
            mean and covariance (252 by default).

    Returns:
        A ``(portfolio_return, portfolio_volatility, sharpe_ratio)`` tuple.
        The Sharpe ratio is return divided by volatility, with no risk-free
        rate subtracted.  It is reported as ``0.0`` when volatility is zero,
        so callers never receive inf or NaN from a division by zero.
    """
    # Accept plain lists/tuples as well as arrays.
    weights = np.asarray(weights, dtype=float)
    annual_mean = np.asarray(returns.mean(), dtype=float) * periods_per_year
    annual_cov = np.asarray(returns.cov(), dtype=float) * periods_per_year

    portfolio_return = float(np.dot(weights, annual_mean))
    variance = float(np.dot(weights, np.dot(annual_cov, weights)))
    # Rounding can push a near-zero variance slightly negative; clamp it so
    # the square root cannot produce NaN.
    portfolio_volatility = float(np.sqrt(max(variance, 0.0)))

    if portfolio_volatility > 0:
        sharpe_ratio = portfolio_return / portfolio_volatility
    else:
        sharpe_ratio = 0.0
    return portfolio_return, portfolio_volatility, sharpe_ratio
def optimize_portfolio(returns, max_volatility):
    """Find long-only weights that maximise the Sharpe ratio under a risk cap.

    The optimisation uses SLSQP with weights bounded to ``[0, 1]``, a
    constraint that they sum to 1, and a constraint that annualised
    portfolio volatility stays at or below ``max_volatility``.

    If the solver does not converge (for example because no long-only
    portfolio meets the cap), the minimum-variance long-only portfolio is
    returned instead, as the closest available match to the requested risk
    level.

    Args:
        returns: DataFrame of daily asset returns, one column per asset.
        max_volatility: Upper bound on annualised portfolio volatility.

    Returns:
        A 1-D ndarray of non-negative weights summing to 1, in the column
        order of ``returns``.

    Raises:
        ValueError: If ``returns`` has no columns.
    """
    num_assets = len(returns.columns)
    if num_assets == 0:
        raise ValueError("Cannot optimize a portfolio with no assets")

    # Annualised statistics are loop-invariant: compute them once rather
    # than on every objective and constraint evaluation.
    annual_mean = np.asarray(returns.mean(), dtype=float) * 252
    annual_cov = np.asarray(returns.cov(), dtype=float) * 252

    def variance(w):
        return float(np.dot(w, np.dot(annual_cov, w)))

    def volatility(w):
        # Clamp tiny negative values caused by rounding before the sqrt.
        return float(np.sqrt(max(variance(w), 0.0)))

    def negative_sharpe(w):
        vol = volatility(w)
        if vol <= 0:
            return 0.0  # avoid division by zero inside the solver
        return -float(np.dot(w, annual_mean)) / vol

    fully_invested = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
    risk_cap = {'type': 'ineq', 'fun': lambda w: max_volatility - volatility(w)}
    bounds = tuple((0, 1) for _ in range(num_assets))
    equal_weights = np.full(num_assets, 1.0 / num_assets)

    result = sco.minimize(
        negative_sharpe,
        equal_weights,
        method='SLSQP',
        bounds=bounds,
        constraints=(fully_invested, risk_cap),
    )

    if not result.success:
        print(
            f"Sharpe optimization did not converge ({result.message}); "
            "falling back to the minimum-variance portfolio"
        )
        result = sco.minimize(
            variance,
            equal_weights,
            method='SLSQP',
            bounds=bounds,
            constraints=(fully_invested,),
        )

    weights = np.asarray(result.x, dtype=float)
    if not np.all(np.isfinite(weights)):
        return equal_weights

    # SLSQP satisfies bounds and constraints only up to a tolerance; tidy the
    # result so callers get clean, fully-invested, long-only weights.
    weights = np.clip(weights, 0.0, 1.0)
    total = weights.sum()
    if total <= 0:
        return equal_weights
    return weights / total
def simulate_investment(weights, mu, years, initial_investment=10000):
    """Project the value of an investment after ``years`` years.

    The weighted expected annual return (dot product of ``weights`` and
    ``mu``) is scaled linearly by ``years``; growth is not compounded.

    Args:
        weights: Portfolio weights, one per asset.
        mu: Expected annual return of each asset, in the same order.
        years: Projection horizon in years.
        initial_investment: Starting amount.

    Returns:
        The projected value of the investment.
    """
    annual_portfolio_return = np.dot(weights, mu)
    total_growth = annual_portfolio_return * years
    growth_factor = 1 + total_growth
    return initial_investment * growth_factor
def output_results(risk_level):
    """Build every output shown in the UI for the chosen risk level.

    Picks up to three random tickers from ``SP500_TICKERS``, downloads their
    prices, optimises the portfolio under the volatility cap mapped from
    ``risk_level``, and prepares the figures and summary strings.

    Args:
        risk_level: ``"Low"``, ``"Medium"`` or ``"High"``.  Any other value
            uses the 0.25 volatility cap.

    Returns:
        A 7-tuple, in this order: cumulative-returns figure, weights
        DataFrame, correlation figure, expected annual return string, annual
        volatility string, Sharpe ratio string, projection figure.  If
        anything fails, the error is printed and
        ``(None, None, None, "Error: <message>", "Error", "Error", None)``
        is returned instead of raising.
    """
    try:
        # Select random tickers (reduced number due to API rate limits)
        sample_size = min(len(SP500_TICKERS), 3)
        selected_tickers = random.sample(SP500_TICKERS, sample_size)

        # Fetch real stock data
        stocks_df = fetch_stock_data(selected_tickers)
        if stocks_df.empty:
            raise ValueError("No stock data received")

        returns = stocks_df.pct_change().dropna()
        if len(returns) < 2:
            # A covariance matrix needs at least two return observations.
            raise ValueError("Not enough price history to compute returns")

        # The fetch step skips tickers it could not download, so the columns
        # (not the requested tickers) define which assets the weights cover.
        portfolio_tickers = list(returns.columns)

        # Map the risk level to a cap on annualised volatility
        risk_thresholds = {"Low": 0.15, "Medium": 0.25, "High": 0.35}
        max_volatility = risk_thresholds.get(risk_level, 0.25)

        # Calculate optimal portfolio
        optimized_weights = optimize_portfolio(returns, max_volatility)
        mu = returns.mean() * 252
        portfolio_return, portfolio_volatility, sharpe_ratio = (
            calculate_portfolio_metrics(optimized_weights, returns)
        )

        # Format metrics
        expected_annual_return = f'{(portfolio_return * 100):.2f}%'
        annual_volatility = f'{(portfolio_volatility * 100):.2f}%'
        sharpe_ratio_str = f'{sharpe_ratio:.2f}'

        # Weights table, one row per asset actually in the portfolio
        weights_df = pd.DataFrame({
            'Ticker': portfolio_tickers,
            'Weight': [f'{w:.2%}' for w in optimized_weights]
        })

        # Correlation matrix
        correlation_matrix = returns.corr()
        fig_corr = px.imshow(
            correlation_matrix,
            text_auto=True,
            title='Stock Correlation Matrix',
            color_continuous_scale='RdBu'
        )

        # Cumulative returns
        cumulative_returns = (1 + returns).cumprod()
        fig_cum_returns = px.line(
            cumulative_returns,
            title='Cumulative Returns of Individual Stocks'
        )

        # Investment projection
        horizons = [1, 5, 10]
        projection_df = pd.DataFrame({
            "Years": horizons,
            "Projected Value": [
                simulate_investment(optimized_weights, mu, years)
                for years in horizons
            ]
        })
        fig_simulation = px.line(
            projection_df,
            x='Years',
            y='Projected Value',
            title='Projected $10,000 Investment Growth'
        )

        return (
            fig_cum_returns,
            weights_df,
            fig_corr,
            expected_annual_return,
            annual_volatility,
            sharpe_ratio_str,
            fig_simulation
        )
    except Exception as e:
        # Top-level UI boundary: report the problem in the text boxes rather
        # than letting the callback raise.
        print(f"Error in output_results: {str(e)}")
        return None, None, None, f"Error: {str(e)}", "Error", "Error", None
# Build the Gradio UI: a risk selector and button on top, then the three
# summary metrics, then two rows of plots/tables filled in by output_results.
with gr.Blocks(theme=gr.themes.Soft()) as app:
    gr.Markdown("## Investment Portfolio Generator")
    gr.Markdown("Select your risk level to generate a balanced portfolio based on S&P 500 stocks.")

    # Inputs
    with gr.Row():
        risk_level = gr.Radio(
            choices=["Low", "Medium", "High"],
            label="Select Your Risk Level",
            value="Medium",
        )
        btn = gr.Button(value="Generate Portfolio")

    # Headline metrics
    with gr.Row():
        expected_annual_return = gr.Textbox(label="Expected Annual Return")
        annual_volatility = gr.Textbox(label="Annual Volatility")
        sharpe_ratio = gr.Textbox(label="Sharpe Ratio")

    # Price history and allocation
    with gr.Row():
        fig_cum_returns = gr.Plot(label="Cumulative Returns")
        weights_df = gr.DataFrame(label="Portfolio Weights")

    # Diversification and projection
    with gr.Row():
        fig_corr = gr.Plot(label="Correlation Matrix")
        fig_simulation = gr.Plot(label="Investment Projection")

    # The order of `outputs` must match the tuple returned by output_results.
    btn.click(
        fn=output_results,
        inputs=[risk_level],
        outputs=[
            fig_cum_returns,
            weights_df,
            fig_corr,
            expected_annual_return,
            annual_volatility,
            sharpe_ratio,
            fig_simulation,
        ],
    )

# Start the web server only when run as a script.
if __name__ == "__main__":
    app.launch()