# app.py import gradio as gr import pandas as pd import numpy as np import plotly.express as px import scipy.optimize as sco from datetime import datetime, timedelta import random import requests import time def fetch_stock_data(tickers): """Fetch real stock data using Alpha Vantage API""" API_KEY = "Y86RZ52NQ8YVX7F6" BASE_URL = "https://www.alphavantage.co/query" all_data = {} for ticker in tickers: try: # Use TIME_SERIES_DAILY for daily data params = { "function": "TIME_SERIES_DAILY", "symbol": ticker, "apikey": API_KEY, "outputsize": "full" } print(f"Fetching data for {ticker}...") response = requests.get(BASE_URL, params=params) response.raise_for_status() data = response.json() if "Time Series (Daily)" in data: daily_data = data["Time Series (Daily)"] # Convert to DataFrame df = pd.DataFrame.from_dict(daily_data, orient='index') df = df.astype(float) # Use adjusted close price all_data[ticker] = df['4. close'].iloc[:252] # Get last year of data print(f"Successfully fetched data for {ticker}") else: print(f"No data found for {ticker}") if "Note" in data: print("API Message:", data["Note"]) # Add delay between requests (Alpha Vantage has a rate limit) time.sleep(12) # 12 second delay between requests except Exception as e: print(f"Error fetching {ticker}: {str(e)}") continue if not all_data: print("No data received, using backup data") return generate_sample_data(tickers) # Combine all data and align dates df = pd.DataFrame(all_data) df = df.sort_index(ascending=True) return df def generate_sample_data(tickers): """Generate sample data as backup""" dates = pd.date_range(end=datetime.now(), periods=252) # One year of trading days data = {} for ticker in tickers: # Generate realistic-looking price data np.random.seed(hash(ticker) % 2**32) returns = np.random.normal(loc=0.0001, scale=0.02, size=252) price = 100 * (1 + returns).cumprod() data[ticker] = price return pd.DataFrame(data, index=dates) # Updated S&P 500 Stock List (reduced number due to API rate limits) SP500_TICKERS = [ 'AAPL', # Apple 'MSFT', # Microsoft 'GOOGL', # Google 'AMZN', # Amazon 'TSLA' # Tesla ] def calculate_portfolio_metrics(weights, returns): portfolio_return = np.sum(returns.mean() * weights) * 252 portfolio_volatility = np.sqrt(np.dot(weights.T, np.dot(returns.cov() * 252, weights))) sharpe_ratio = portfolio_return / portfolio_volatility return portfolio_return, portfolio_volatility, sharpe_ratio def optimize_portfolio(returns, max_volatility): num_assets = len(returns.columns) args = (returns,) constraints = ( {'type': 'eq', 'fun': lambda x: np.sum(x) - 1}, # Sum of weights must be 1 {'type': 'ineq', 'fun': lambda x: max_volatility - np.sqrt(np.dot(x.T, np.dot(returns.cov() * 252, x)))} ) bounds = tuple((0, 1) for _ in range(num_assets)) result = sco.minimize( lambda weights, returns: -calculate_portfolio_metrics(weights, returns)[2], num_assets * [1. / num_assets,], args=args, method='SLSQP', bounds=bounds, constraints=constraints ) return result.x def simulate_investment(weights, mu, years, initial_investment=10000): projected_return = np.dot(weights, mu) * years return initial_investment * (1 + projected_return) def output_results(risk_level): try: # Select random tickers (reduced number due to API rate limits) selected_tickers = random.sample(SP500_TICKERS, min(len(SP500_TICKERS), 3)) # Fetch real stock data stocks_df = fetch_stock_data(selected_tickers) if stocks_df.empty: raise ValueError("No stock data received") returns = stocks_df.pct_change().dropna() # Set risk thresholds risk_thresholds = {"Low": 0.15, "Medium": 0.25, "High": 0.35} max_volatility = risk_thresholds.get(risk_level, 0.25) # Calculate optimal portfolio optimized_weights = optimize_portfolio(returns, max_volatility) mu = returns.mean() * 252 portfolio_return, portfolio_volatility, sharpe_ratio = calculate_portfolio_metrics(optimized_weights, returns) # Format metrics expected_annual_return = f'{(portfolio_return * 100):.2f}%' annual_volatility = f'{(portfolio_volatility * 100):.2f}%' sharpe_ratio_str = f'{sharpe_ratio:.2f}' # Create visualizations weights_df = pd.DataFrame({ 'Ticker': selected_tickers, 'Weight': [f'{w:.2%}' for w in optimized_weights] }) # Correlation matrix correlation_matrix = returns.corr() fig_corr = px.imshow( correlation_matrix, text_auto=True, title='Stock Correlation Matrix', color_continuous_scale='RdBu' ) # Cumulative returns cumulative_returns = (1 + returns).cumprod() fig_cum_returns = px.line( cumulative_returns, title='Cumulative Returns of Individual Stocks' ) # Investment projection projected_1yr = simulate_investment(optimized_weights, mu, 1) projected_5yr = simulate_investment(optimized_weights, mu, 5) projected_10yr = simulate_investment(optimized_weights, mu, 10) projection_df = pd.DataFrame({ "Years": [1, 5, 10], "Projected Value": [projected_1yr, projected_5yr, projected_10yr] }) fig_simulation = px.line( projection_df, x='Years', y='Projected Value', title='Projected $10,000 Investment Growth' ) return ( fig_cum_returns, weights_df, fig_corr, expected_annual_return, annual_volatility, sharpe_ratio_str, fig_simulation ) except Exception as e: print(f"Error in output_results: {str(e)}") return None, None, None, f"Error: {str(e)}", "Error", "Error", None # Create Gradio interface with gr.Blocks(theme=gr.themes.Soft()) as app: gr.Markdown("## Investment Portfolio Generator") gr.Markdown("Select your risk level to generate a balanced portfolio based on S&P 500 stocks.") with gr.Row(): risk_level = gr.Radio( ["Low", "Medium", "High"], label="Select Your Risk Level", value="Medium" ) btn = gr.Button("Generate Portfolio") with gr.Row(): expected_annual_return = gr.Textbox(label="Expected Annual Return") annual_volatility = gr.Textbox(label="Annual Volatility") sharpe_ratio = gr.Textbox(label="Sharpe Ratio") with gr.Row(): fig_cum_returns = gr.Plot(label="Cumulative Returns") weights_df = gr.DataFrame(label="Portfolio Weights") with gr.Row(): fig_corr = gr.Plot(label="Correlation Matrix") fig_simulation = gr.Plot(label="Investment Projection") btn.click( output_results, inputs=[risk_level], outputs=[ fig_cum_returns, weights_df, fig_corr, expected_annual_return, annual_volatility, sharpe_ratio, fig_simulation ] ) if __name__ == "__main__": app.launch()