Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Real MCP with Gradio Agent - Stock Analysis Platform | |
| Comprehensive implementation with MCP server and Gradio interface | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Any, Optional | |
| import traceback | |
| # MCP and async imports | |
| from mcp.server import Server | |
| from mcp.server.models import InitializationOptions | |
| from mcp.server.stdio import stdio_server | |
| from mcp.types import Tool, TextContent | |
| import mcp.types as types | |
| # Data analysis imports | |
| import yfinance as yf | |
| import pandas as pd | |
| import numpy as np | |
| from dataclasses import dataclass | |
| # Gradio for web interface | |
| import gradio as gr | |
| import plotly.graph_objects as go | |
| import plotly.express as px | |
| from plotly.subplots import make_subplots | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
@dataclass
class StockAnalysis:
    """Data class for stock analysis results.

    The ``@dataclass`` decorator generates ``__init__``, ``__repr__`` and
    ``__eq__`` from the annotated fields below; without it the class could
    not be constructed with any of these values.
    """

    # Ticker symbol.
    symbol: str
    # Company display name.
    company_name: str
    # Most recent price.
    current_price: float
    # Year-to-date return.
    ytd_return: float
    # Price volatility.
    volatility: float
    # Overall investment score.
    investment_score: int
    # Recommendation label.
    recommendation: str
    # Risk level label.
    risk_level: str
    # Sector name.
    sector: str
    # Market capitalisation.
    market_cap: int
class StockAnalyzer:
    """Advanced stock analysis engine.

    Downloads price history through ``yfinance``, keeps it in a short-lived
    in-memory cache, and derives technical indicators and a 0-100
    investment score from it.
    """

    def __init__(self):
        # Maps "<symbol>_<period>" to a (DataFrame, fetch time) pair.
        self.cache = {}
        self.cache_timeout = 300  # 5 minutes

    def _is_fresh(self, timestamp: datetime, now: datetime) -> bool:
        """Return True while a cache entry stamped ``timestamp`` is still usable.

        ``total_seconds()`` is required here: ``timedelta.seconds`` only holds
        the sub-day remainder, so an entry that is one day and ten seconds old
        would otherwise look ten seconds old and be served as fresh.
        """
        return (now - timestamp).total_seconds() < self.cache_timeout

    def get_stock_data(self, symbol: str, period: str = "1y") -> Optional[pd.DataFrame]:
        """Get stock data with caching.

        Args:
            symbol: Ticker symbol passed to ``yf.Ticker``.
            period: History period string passed to ``Ticker.history``.

        Returns:
            The price history frame (possibly empty), or ``None`` when the
            download raised. The cached frame object itself is returned, so
            callers share it.
        """
        cache_key = f"{symbol}_{period}"
        current_time = datetime.now()

        cached = self.cache.get(cache_key)
        if cached is not None:
            data, timestamp = cached
            if self._is_fresh(timestamp, current_time):
                return data

        try:
            data = yf.Ticker(symbol).history(period=period)
        except Exception as e:
            logger.error(f"Error fetching data for {symbol}: {e}")
            return None

        self.cache[cache_key] = (data, current_time)
        return data

    def calculate_technical_indicators(self, data: pd.DataFrame) -> Dict:
        """Calculate technical indicators.

        Adds MA20/MA50/MA200, RSI, Bollinger Band and MACD columns to
        ``data`` in place and returns the latest value of the key ones.

        Args:
            data: Price history with at least a ``Close`` column.

        Returns:
            ``{}`` for an empty frame; otherwise a dict with ``rsi``, ``macd``,
            ``macd_signal``, ``ma20``, ``ma50`` and ``current_price``. Values
            are NaN when the frame is shorter than the indicator's window.
        """
        if data.empty:
            return {}

        close = data['Close']

        # Moving averages
        data['MA20'] = close.rolling(window=20).mean()
        data['MA50'] = close.rolling(window=50).mean()
        data['MA200'] = close.rolling(window=200).mean()

        # RSI (14-period, simple rolling mean of gains and losses)
        delta = close.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        data['RSI'] = 100 - (100 / (1 + rs))

        # Bollinger Bands (20-period mean +/- 2 standard deviations)
        data['BB_Middle'] = close.rolling(window=20).mean()
        bb_std = close.rolling(window=20).std()
        data['BB_Upper'] = data['BB_Middle'] + (bb_std * 2)
        data['BB_Lower'] = data['BB_Middle'] - (bb_std * 2)

        # MACD (12/26 EMA difference with a 9-period signal line)
        exp1 = close.ewm(span=12).mean()
        exp2 = close.ewm(span=26).mean()
        data['MACD'] = exp1 - exp2
        data['MACD_Signal'] = data['MACD'].ewm(span=9).mean()

        # The frame is known to be non-empty here, so the last row always exists.
        return {
            'rsi': data['RSI'].iloc[-1],
            'macd': data['MACD'].iloc[-1],
            'macd_signal': data['MACD_Signal'].iloc[-1],
            'ma20': data['MA20'].iloc[-1],
            'ma50': data['MA50'].iloc[-1],
            'current_price': data['Close'].iloc[-1]
        }

    def calculate_investment_score(self, symbol: str) -> Dict:
        """Calculate comprehensive investment score.

        Combines YTD return, RSI/MACD, P/E, revenue growth, profit margin,
        volatility and max drawdown into a score clamped to 0-100, then maps
        it to a recommendation and a risk level.

        Args:
            symbol: Ticker symbol to analyse.

        Returns:
            A dict of metrics on success, or ``{'error': message}`` when no
            YTD data exists or any step raises.
        """
        try:
            stock = yf.Ticker(symbol)
            info = stock.info

            # NOTE(review): the YTD window is pinned to calendar year 2025,
            # matching the "YTD 2025" labels used elsewhere in this file.
            ytd_start = datetime(2025, 1, 1)
            ytd_data = stock.history(start=ytd_start.strftime("%Y-%m-%d"))
            if ytd_data.empty:
                return {'error': f'No YTD data available for {symbol}'}

            # YTD return in percent, first close to latest close.
            first_close = ytd_data['Close'].iloc[0]
            last_close = ytd_data['Close'].iloc[-1]
            ytd_return = ((last_close - first_close) / first_close) * 100

            # One year of history drives volatility and drawdown.
            year_data = self.get_stock_data(symbol, "1y")
            volatility = 0
            max_drawdown = 0
            if year_data is not None and not year_data.empty:
                returns = year_data['Close'].pct_change().dropna()
                volatility = returns.std() * np.sqrt(252) * 100  # Annualized volatility
                rolling_max = year_data['Close'].expanding().max()
                drawdown = (year_data['Close'] - rolling_max) / rolling_max
                max_drawdown = drawdown.min() * 100

            technical = self.calculate_technical_indicators(year_data) if year_data is not None else {}

            # Fundamental metrics; "or 0" also covers keys present with None.
            pe_ratio = info.get('trailingPE', 0) or 0
            forward_pe = info.get('forwardPE', 0) or 0
            peg_ratio = info.get('pegRatio', 0) or 0
            roe = info.get('returnOnEquity', 0) or 0
            profit_margin = info.get('profitMargins', 0) or 0
            revenue_growth = info.get('revenueGrowth', 0) or 0

            score = 50  # Base score

            # YTD performance
            if ytd_return > 25:
                score += 25
            elif ytd_return > 15:
                score += 20
            elif ytd_return > 5:
                score += 15
            elif ytd_return > 0:
                score += 10
            elif ytd_return > -10:
                score += 5
            else:
                score -= 15

            # RSI: a NaN value matches no branch and leaves the score unchanged.
            rsi = technical.get('rsi', 50)
            if 30 <= rsi <= 70:  # Not oversold or overbought
                score += 12
            elif rsi < 30:  # Oversold - potential buy
                score += 8
            elif rsi > 70:  # Overbought - caution
                score -= 5

            # MACD above its signal line counts as bullish.
            macd = technical.get('macd', 0)
            macd_signal = technical.get('macd_signal', 0)
            if macd > macd_signal:
                score += 8
            else:
                score -= 3

            # Valuation
            if pe_ratio and 8 < pe_ratio < 20:
                score += 15
            elif pe_ratio and pe_ratio < 8:
                score += 20  # Very undervalued
            elif pe_ratio and 20 < pe_ratio < 30:
                score += 5
            elif pe_ratio and pe_ratio > 35:
                score -= 10

            # Growth and profitability
            if revenue_growth and revenue_growth > 0.20:
                score += 15
            elif revenue_growth and revenue_growth > 0.10:
                score += 10
            elif revenue_growth and revenue_growth > 0.05:
                score += 5

            if profit_margin and profit_margin > 0.15:
                score += 5
            elif profit_margin and profit_margin > 0.10:
                score += 3

            # Risk adjustment
            if volatility < 15:
                score += 5
            elif volatility > 35:
                score -= 10

            if max_drawdown > -15:
                score += 5
            elif max_drawdown < -30:
                score -= 8

            # Ensure score bounds
            score = max(0, min(100, score))

            if volatility < 15:
                risk_level = "Low"
            elif volatility < 25:
                risk_level = "Medium"
            else:
                risk_level = "High"

            if score >= 80:
                recommendation = "Strong Buy"
            elif score >= 70:
                recommendation = "Buy"
            elif score >= 60:
                recommendation = "Hold"
            elif score >= 50:
                recommendation = "Weak Hold"
            else:
                recommendation = "Sell"

            return {
                'symbol': symbol.upper(),
                'company_name': info.get('longName', 'N/A'),
                'current_price': last_close,
                'ytd_return': ytd_return,
                'volatility': volatility,
                'max_drawdown': max_drawdown,
                'pe_ratio': pe_ratio,
                'forward_pe': forward_pe,
                'peg_ratio': peg_ratio,
                'roe': roe * 100 if roe else 0,
                'profit_margin': profit_margin * 100 if profit_margin else 0,
                'revenue_growth': revenue_growth * 100 if revenue_growth else 0,
                'investment_score': score,
                'recommendation': recommendation,
                'risk_level': risk_level,
                'sector': info.get('sector', 'N/A'),
                'industry': info.get('industry', 'N/A'),
                # "or 0" keeps this numeric when the key exists with a None value.
                'market_cap': info.get('marketCap', 0) or 0,
                'technical_indicators': technical,
                'analysis_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
        except Exception as e:
            logger.error(f"Error calculating investment score for {symbol}: {e}")
            return {'error': f'Error analyzing {symbol}: {str(e)}'}
| # Initialize the stock analyzer (module-level instance used by the MCP tool handler and the Gradio callbacks below) | |
| analyzer = StockAnalyzer() | |
| # MCP Server Setup (this Server instance is the one run over stdio by run_mcp_server) | |
| server = Server("stock-analysis-mcp") | |
@server.list_tools()
async def handle_list_tools() -> List[Tool]:
    """List available MCP tools.

    Registered through ``@server.list_tools()`` so the server advertises
    these tools to clients; without the decorator the function is never
    called by the server.

    Returns:
        Four ``Tool`` descriptors: two taking a single ``symbol`` string and
        two taking a ``symbols`` array of strings.
    """

    def single_symbol_schema() -> Dict[str, Any]:
        # Fresh dict per tool so no schema object is shared between tools.
        return {
            "type": "object",
            "properties": {
                "symbol": {"type": "string", "description": "Stock symbol (e.g., AAPL)"}
            },
            "required": ["symbol"]
        }

    def symbol_list_schema(description: str) -> Dict[str, Any]:
        return {
            "type": "object",
            "properties": {
                "symbols": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": description
                }
            },
            "required": ["symbols"]
        }

    return [
        Tool(
            name="get_stock_price",
            description="Get current stock price and basic info",
            inputSchema=single_symbol_schema()
        ),
        Tool(
            name="analyze_stock_comprehensive",
            description="Comprehensive stock analysis with technical and fundamental metrics",
            inputSchema=single_symbol_schema()
        ),
        Tool(
            name="compare_stocks_ytd",
            description="Compare multiple stocks for YTD 2025 performance",
            inputSchema=symbol_list_schema("List of stock symbols to compare")
        ),
        Tool(
            name="get_market_sector_analysis",
            description="Analyze stocks by sector performance",
            inputSchema=symbol_list_schema("List of stock symbols to analyze by sector")
        )
    ]
@server.call_tool()
async def handle_call_tool(name: str, arguments: Dict[str, Any]) -> List[types.TextContent]:
    """Handle MCP tool calls.

    Registered through ``@server.call_tool()`` so the server routes tool
    invocations here.

    Args:
        name: Tool name, one of the names returned by ``handle_list_tools``.
        arguments: Tool arguments; ``symbol`` (str) or ``symbols`` (list of
            str) depending on the tool.

    Returns:
        A single-element list holding a ``TextContent`` whose text is either
        a JSON document with the result or a plain ``Error: ...`` message.
        Exceptions are logged and reported as text rather than raised.
    """
    try:
        if name == "get_stock_price":
            symbol = arguments.get("symbol", "").upper()
            if not symbol:
                return [TextContent(type="text", text="Error: Symbol is required")]

            stock = yf.Ticker(symbol)
            info = stock.info
            hist = stock.history(period="2d")
            if hist.empty:
                return [TextContent(type="text", text=f"Error: No data found for {symbol}")]

            # Convert numpy scalars to built-in types up front: json.dumps
            # rejects numpy integers (the volume column), which previously
            # made this tool fail on every call.
            current_price = float(hist['Close'].iloc[-1])
            prev_close = float(hist['Close'].iloc[-2]) if len(hist) > 1 else current_price
            change = current_price - prev_close
            change_percent = (change / prev_close) * 100

            result = {
                "symbol": symbol,
                "company_name": info.get('longName', 'N/A'),
                "current_price": round(current_price, 2),
                "change": round(change, 2),
                "change_percent": round(change_percent, 2),
                "previous_close": round(prev_close, 2),
                "market_cap": info.get('marketCap', 0),
                "volume": int(hist['Volume'].iloc[-1]),
                "sector": info.get('sector', 'N/A')
            }
            return [TextContent(type="text", text=json.dumps(result, indent=2))]

        elif name == "analyze_stock_comprehensive":
            symbol = arguments.get("symbol", "").upper()
            if not symbol:
                return [TextContent(type="text", text="Error: Symbol is required")]

            analysis = analyzer.calculate_investment_score(symbol)
            return [TextContent(type="text", text=json.dumps(analysis, indent=2))]

        elif name == "compare_stocks_ytd":
            symbols = arguments.get("symbols", [])
            if not symbols:
                return [TextContent(type="text", text="Error: Symbols list is required")]

            # Symbols whose analysis failed are silently left out.
            comparisons = []
            for symbol in symbols:
                analysis = analyzer.calculate_investment_score(symbol)
                if 'error' not in analysis:
                    comparisons.append(analysis)

            # Sort by investment score, best first.
            comparisons.sort(key=lambda x: x.get('investment_score', 0), reverse=True)

            result = {
                "comparison_results": comparisons,
                "winner": comparisons[0] if comparisons else None,
                "analysis_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
            return [TextContent(type="text", text=json.dumps(result, indent=2))]

        elif name == "get_market_sector_analysis":
            symbols = arguments.get("symbols", [])
            if not symbols:
                return [TextContent(type="text", text="Error: Symbols list is required")]

            # Group successful analyses by their sector label.
            sector_data = {}
            for symbol in symbols:
                analysis = analyzer.calculate_investment_score(symbol)
                if 'error' not in analysis:
                    sector = analysis.get('sector', 'Unknown')
                    sector_data.setdefault(sector, []).append(analysis)

            # Calculate sector averages
            sector_summary = {}
            for sector, stocks in sector_data.items():
                avg_score = sum(s['investment_score'] for s in stocks) / len(stocks)
                avg_ytd = sum(s['ytd_return'] for s in stocks) / len(stocks)
                sector_summary[sector] = {
                    "average_score": round(avg_score, 1),
                    "average_ytd_return": round(avg_ytd, 2),
                    "stock_count": len(stocks),
                    "stocks": [s['symbol'] for s in stocks]
                }

            result = {
                "sector_analysis": sector_summary,
                "detailed_stocks": sector_data,
                "analysis_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }
            return [TextContent(type="text", text=json.dumps(result, indent=2))]

        else:
            return [TextContent(type="text", text=f"Error: Unknown tool '{name}'")]

    except Exception as e:
        error_msg = f"Error executing tool '{name}': {str(e)}"
        logger.error(error_msg)
        return [TextContent(type="text", text=error_msg)]
| # Gradio Interface Functions | |
def create_stock_chart(symbol: str):
    """Create interactive stock chart.

    Builds a two-row Plotly figure from six months of history: candlesticks
    with MA20/MA50 overlays on top and a volume bar chart underneath.

    Args:
        symbol: Ticker symbol to chart.

    Returns:
        The Plotly figure, or ``None`` when no data is available or chart
        construction raises (the error is logged).
    """
    try:
        data = analyzer.get_stock_data(symbol, "6mo")
        if data is None or data.empty:
            return None

        # row_heights is applied top to bottom, so the price panel gets 70%.
        # The deprecated row_width argument is applied bottom to top, which
        # gave the volume panel the larger share.
        fig = make_subplots(
            rows=2, cols=1,
            shared_xaxes=True,
            vertical_spacing=0.1,
            subplot_titles=(f'{symbol.upper()} Stock Price', 'Volume'),
            row_heights=[0.7, 0.3]
        )

        # Candlestick chart
        candles = go.Candlestick(
            x=data.index,
            open=data['Open'],
            high=data['High'],
            low=data['Low'],
            close=data['Close'],
            name="Price"
        )
        fig.add_trace(candles, row=1, col=1)

        # Moving averages, drawn only when there are enough rows for the window.
        # The averages are stored as columns on the (cached) frame.
        for window, column, color in ((20, 'MA20', 'orange'), (50, 'MA50', 'blue')):
            if len(data) >= window:
                data[column] = data['Close'].rolling(window=window).mean()
                fig.add_trace(
                    go.Scatter(x=data.index, y=data[column], name=column, line=dict(color=color)),
                    row=1, col=1
                )

        # Volume
        fig.add_trace(
            go.Bar(x=data.index, y=data['Volume'], name='Volume', marker_color='lightblue'),
            row=2, col=1
        )

        fig.update_layout(
            title=f'{symbol.upper()} - Stock Analysis',
            xaxis_rangeslider_visible=False,
            height=600,
            showlegend=True
        )
        return fig

    except Exception as e:
        logger.error(f"Error creating chart for {symbol}: {e}")
        return None
def _format_optional_ratio(value) -> str:
    """Format a ratio to one decimal place, or 'N/A' when it is missing/zero."""
    return f"{value:.1f}" if value else 'N/A'


def analyze_single_stock(symbol: str) -> tuple:
    """Analyze a single stock and return results.

    Args:
        symbol: Ticker symbol entered by the user.

    Returns:
        A ``(markdown_text, chart, metrics_dataframe)`` tuple. On missing
        input or any failure the first element is a message and the other
        two are ``None``.
    """
    if not symbol:
        return "Please enter a stock symbol", None, None

    try:
        analysis = analyzer.calculate_investment_score(symbol.upper())
        if 'error' in analysis:
            return f"Error: {analysis['error']}", None, None

        # The "value or N/A" choice must be made outside the replacement
        # field: written after the colon it becomes part of the format spec
        # and raises ValueError on every call.
        pe_text = _format_optional_ratio(analysis['pe_ratio'])
        forward_pe_text = _format_optional_ratio(analysis['forward_pe'])
        # Market cap may be None when the data source has no value for it.
        market_cap_billions = (analysis['market_cap'] or 0) / 1e9

        # Create formatted analysis text
        analysis_text = f"""
# 📊 Stock Analysis for {analysis['symbol']}
## 🏢 Company Information
- **Company**: {analysis['company_name']}
- **Sector**: {analysis['sector']}
- **Industry**: {analysis['industry']}
- **Market Cap**: ${market_cap_billions:.2f}B
## 💰 Current Performance
- **Current Price**: ${analysis['current_price']:.2f}
- **YTD 2025 Return**: {analysis['ytd_return']:+.2f}%
- **Investment Score**: {analysis['investment_score']}/100
## 📈 Investment Recommendation
- **Recommendation**: {analysis['recommendation']}
- **Risk Level**: {analysis['risk_level']}
- **Volatility**: {analysis['volatility']:.1f}%
## 🔍 Fundamental Metrics
- **P/E Ratio**: {pe_text}
- **Forward P/E**: {forward_pe_text}
- **ROE**: {analysis['roe']:.1f}%
- **Profit Margin**: {analysis['profit_margin']:.1f}%
- **Revenue Growth**: {analysis['revenue_growth']:.1f}%
## 📊 Technical Indicators
- **RSI**: {analysis['technical_indicators'].get('rsi', 0):.1f}
- **MACD**: {analysis['technical_indicators'].get('macd', 0):.3f}
---
*Analysis Date: {analysis['analysis_date']}*
"""

        # Create chart
        chart = create_stock_chart(symbol)

        # Create comparison data for table
        ytd_return = analysis['ytd_return']
        if ytd_return > 10:
            ytd_label = 'Strong'
        elif ytd_return > 0:
            ytd_label = 'Moderate'
        else:
            ytd_label = 'Weak'

        comparison_df = pd.DataFrame([{
            'Metric': 'Investment Score',
            'Value': f"{analysis['investment_score']}/100",
            'Interpretation': analysis['recommendation']
        }, {
            'Metric': 'YTD Return',
            'Value': f"{ytd_return:+.2f}%",
            'Interpretation': ytd_label
        }, {
            'Metric': 'Risk Level',
            'Value': analysis['risk_level'],
            'Interpretation': f"Volatility: {analysis['volatility']:.1f}%"
        }])

        return analysis_text, chart, comparison_df

    except Exception as e:
        error_msg = f"Error analyzing {symbol}: {str(e)}"
        logger.error(error_msg)
        return error_msg, None, None
def compare_multiple_stocks(symbols_input: str) -> tuple:
    """Compare multiple stocks.

    Args:
        symbols_input: Comma-separated ticker symbols. Case and surrounding
            whitespace are ignored; repeated symbols are analysed once.

    Returns:
        A ``(markdown_text, bar_chart, comparison_dataframe)`` tuple. On
        invalid input or any failure the first element is a message and the
        other two are ``None``.
    """
    if not symbols_input:
        return "Please enter stock symbols separated by commas", None, None

    try:
        # dict.fromkeys drops duplicates while keeping the entered order, so a
        # repeated ticker is neither fetched nor listed twice.
        symbols = list(dict.fromkeys(
            s.strip().upper() for s in symbols_input.split(',') if s.strip()
        ))
        if len(symbols) < 2:
            return "Please enter at least 2 stock symbols for comparison", None, None

        # Symbols whose analysis failed are left out of the comparison.
        comparisons = []
        for symbol in symbols:
            analysis = analyzer.calculate_investment_score(symbol)
            if 'error' not in analysis:
                comparisons.append(analysis)

        if not comparisons:
            return "No valid stock data found for the provided symbols", None, None

        # Sort by investment score, best first.
        comparisons.sort(key=lambda x: x['investment_score'], reverse=True)

        # Create comparison text (top five only; zip stops at the shorter sequence).
        comparison_text = f"# 🏆 Stock Comparison Results\n\n"
        comparison_text += f"**Analysis of {len(comparisons)} stocks:**\n\n"

        rank_emojis = ["🥇", "🥈", "🥉", "4️⃣", "5️⃣"]
        for rank_emoji, stock in zip(rank_emojis, comparisons):
            comparison_text += f"""
## {rank_emoji} {stock['symbol']} - {stock['company_name']}
- **Score**: {stock['investment_score']}/100
- **Recommendation**: {stock['recommendation']}
- **YTD Return**: {stock['ytd_return']:+.2f}%
- **Current Price**: ${stock['current_price']:.2f}
- **Sector**: {stock['sector']}
- **Risk Level**: {stock['risk_level']}
"""

        # Create comparison DataFrame
        rows = []
        for rank, stock in enumerate(comparisons, start=1):
            company = stock['company_name']
            if len(company) > 30:
                company = company[:30] + '...'
            rows.append({
                'Rank': rank,
                'Symbol': stock['symbol'],
                'Company': company,
                'Score': stock['investment_score'],
                'YTD Return %': f"{stock['ytd_return']:+.2f}",
                'Price': f"${stock['current_price']:.2f}",
                'Recommendation': stock['recommendation'],
                'Sector': stock['sector']
            })
        comparison_df = pd.DataFrame(rows)

        # Create comparison chart. The palette is cycled so every bar gets a
        # colour even when more than five stocks are compared.
        palette = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd']
        bar_colors = [palette[i % len(palette)] for i in range(len(comparisons))]

        fig = go.Figure()
        fig.add_trace(go.Bar(
            x=[s['symbol'] for s in comparisons],
            y=[s['investment_score'] for s in comparisons],
            text=[f"{s['investment_score']}" for s in comparisons],
            textposition='auto',
            marker_color=bar_colors
        ))
        fig.update_layout(
            title='Investment Score Comparison',
            xaxis_title='Stock Symbol',
            yaxis_title='Investment Score (0-100)',
            height=400
        )

        return comparison_text, fig, comparison_df

    except Exception as e:
        error_msg = f"Error comparing stocks: {str(e)}"
        logger.error(error_msg)
        return error_msg, None, None
| # Create Gradio Interface | |
| def create_gradio_app(): | |
| """Build and return the Gradio Blocks app with single-stock, comparison and MCP-tool tabs""" | |
| with gr.Blocks(title="🚀 MCP Stock Analysis Agent", theme=gr.themes.Soft()) as app: | |
| gr.Markdown(""" | |
| # 🚀 Real MCP with Gradio Agent - Stock Analysis Platform | |
| Advanced stock analysis powered by MCP (Model Context Protocol) with comprehensive technical and fundamental analysis. | |
| ## Features: | |
| - 📊 Real-time stock data analysis | |
| - 🎯 AI-powered investment scoring | |
| - 📈 Technical indicator analysis | |
| - 🏆 Multi-stock comparison | |
| - 📉 Interactive charts and visualizations | |
| """) | |
| with gr.Tabs(): | |
| # Single Stock Analysis Tab | |
| # Outputs here are filled by analyze_single_stock (text, chart, metrics table) | |
| with gr.Tab("📊 Single Stock Analysis"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| stock_input = gr.Textbox( | |
| label="Stock Symbol", | |
| placeholder="Enter symbol (e.g., AAPL, MSFT, GOOGL)", | |
| value="AAPL" | |
| ) | |
| analyze_btn = gr.Button("🔍 Analyze Stock", variant="primary") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| analysis_output = gr.Markdown(label="Analysis Results") | |
| with gr.Column(scale=1): | |
| metrics_table = gr.Dataframe( | |
| label="Key Metrics", | |
| headers=["Metric", "Value", "Interpretation"] | |
| ) | |
| stock_chart = gr.Plot(label="Stock Chart") | |
| # Stock Comparison Tab | |
| # Outputs here are filled by compare_multiple_stocks (text, chart, table) | |
| with gr.Tab("🏆 Stock Comparison"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| stocks_input = gr.Textbox( | |
| label="Stock Symbols (comma-separated)", | |
| placeholder="Enter symbols (e.g., AAPL, MSFT, GOOGL, TSLA)", | |
| value="AAPL, MSFT, GOOGL" | |
| ) | |
| compare_btn = gr.Button("🔍 Compare Stocks", variant="primary") | |
| comparison_output = gr.Markdown(label="Comparison Results") | |
| comparison_chart = gr.Plot(label="Comparison Chart") | |
| comparison_table = gr.Dataframe( | |
| label="Detailed Comparison", | |
| headers=["Rank", "Symbol", "Company", "Score", "YTD Return %", "Price", "Recommendation", "Sector"] | |
| ) | |
| # MCP Tools Tab | |
| # Lets the user pick one of the four tool names and run it through handle_call_tool | |
| with gr.Tab("🛠️ MCP Tools"): | |
| gr.Markdown(""" | |
| ## Available MCP Tools: | |
| 1. **get_stock_price** - Get current stock price and basic info | |
| 2. **analyze_stock_comprehensive** - Comprehensive analysis with scoring | |
| 3. **compare_stocks_ytd** - Compare multiple stocks for YTD performance | |
| 4. **get_market_sector_analysis** - Analyze stocks by sector | |
| These tools can be called programmatically via the MCP protocol. | |
| """) | |
| with gr.Row(): | |
| mcp_tool_select = gr.Dropdown( | |
| choices=["get_stock_price", "analyze_stock_comprehensive", "compare_stocks_ytd", "get_market_sector_analysis"], | |
| label="Select MCP Tool", | |
| value="get_stock_price" | |
| ) | |
| mcp_symbol_input = gr.Textbox( | |
| label="Symbol/Parameters", | |
| placeholder="AAPL or AAPL,MSFT,GOOGL for comparison", | |
| value="AAPL" | |
| ) | |
| mcp_execute_btn = gr.Button("⚡ Execute MCP Tool", variant="secondary") | |
| mcp_output = gr.JSON(label="MCP Tool Response") | |
| # Event handlers | |
| analyze_btn.click( | |
| fn=analyze_single_stock, | |
| inputs=[stock_input], | |
| outputs=[analysis_output, stock_chart, metrics_table] | |
| ) | |
| compare_btn.click( | |
| fn=compare_multiple_stocks, | |
| inputs=[stocks_input], | |
| outputs=[comparison_output, comparison_chart, comparison_table] | |
| ) | |
| def execute_mcp_tool(tool_name, params): | |
| """Execute MCP tool from Gradio interface""" | |
| try: | |
| # Single-symbol tools receive the stripped text; list tools receive it split on commas | |
| if tool_name == "get_stock_price": | |
| arguments = {"symbol": params.strip()} | |
| elif tool_name == "analyze_stock_comprehensive": | |
| arguments = {"symbol": params.strip()} | |
| elif tool_name in ["compare_stocks_ytd", "get_market_sector_analysis"]: | |
| # NOTE(review): empty pieces (e.g. from a trailing comma) are kept in the list | |
| symbols = [s.strip() for s in params.split(',')] | |
| arguments = {"symbols": symbols} | |
| else: | |
| return {"error": f"Unknown tool: {tool_name}"} | |
| # Simulate MCP tool execution | |
| # The async handler is driven to completion on a new event loop created for this call | |
| # NOTE(review): loop.close() is not in a finally block, so it is skipped if run_until_complete raises | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| result = loop.run_until_complete(handle_call_tool(tool_name, arguments)) | |
| loop.close() | |
| # Parse the result | |
| # Text that is not valid JSON (such as an "Error: ..." message) is wrapped in a dict instead | |
| if result and len(result) > 0: | |
| response_text = result[0].text | |
| try: | |
| return json.loads(response_text) | |
| except json.JSONDecodeError: | |
| return {"response": response_text} | |
| else: | |
| return {"error": "No response from MCP tool"} | |
| except Exception as e: | |
| return {"error": f"Error executing MCP tool: {str(e)}"} | |
| mcp_execute_btn.click( | |
| fn=execute_mcp_tool, | |
| inputs=[mcp_tool_select, mcp_symbol_input], | |
| outputs=[mcp_output] | |
| ) | |
| # Add footer | |
| gr.Markdown(""" | |
| --- | |
| ### 🔧 Technical Details: | |
| - **MCP Protocol**: Model Context Protocol for tool integration | |
| - **Data Source**: Yahoo Finance API via yfinance | |
| - **Analysis Engine**: Custom investment scoring algorithm | |
| - **Visualization**: Plotly interactive charts | |
| - **Interface**: Gradio web framework | |
| *This platform provides educational analysis and should not be considered financial advice.* | |
| """) | |
| return app | |
| # MCP Server Runner | |
async def run_mcp_server():
    """Run the MCP server over stdio until the streams close.

    Builds the initialization options for the module-level ``server`` and
    hands the stdio read/write streams to ``server.run``.
    """
    # Imported locally so this block does not depend on the file-level imports.
    from mcp.server import NotificationOptions

    logger.info("Starting MCP Stock Analysis Server...")

    # get_capabilities() requires both arguments; calling it with none raises
    # TypeError before the server can start.
    init_options = InitializationOptions(
        server_name="stock-analysis-mcp",
        server_version="1.0.0",
        capabilities=server.get_capabilities(
            notification_options=NotificationOptions(),
            experimental_capabilities={},
        ),
    )

    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, init_options)
| # Enhanced Portfolio Analysis | |
class PortfolioAnalyzer:
    """Advanced portfolio analysis with risk metrics."""

    def __init__(self):
        # Reuse the module-level analyzer so its price cache is shared.
        self.analyzer = analyzer

    def calculate_portfolio_metrics(self, symbols: List[str], weights: Optional[List[float]] = None) -> Dict:
        """Calculate comprehensive portfolio metrics.

        Symbols without price data are dropped together with their weight,
        and the remaining weights are re-normalised to sum to 1, so every
        return series, weight and label refers to the same ticker.

        Args:
            symbols: Ticker symbols in the portfolio.
            weights: Optional weights, one per symbol. Equal weights are used
                when omitted or empty.

        Returns:
            A dict with annualised return and volatility (percent), Sharpe
            ratio (return / volatility, no risk-free rate), max drawdown,
            5th-percentile daily return (``var_95``), weighted score,
            correlation matrix, per-stock analyses and the weights actually
            used; or ``{'error': message}`` on invalid input or failure.
        """
        try:
            if not symbols:
                return {'error': 'No symbols provided for portfolio analysis'}
            if not weights:
                weights = [1.0 / len(symbols)] * len(symbols)  # Equal weights
            elif len(weights) != len(symbols):
                return {'error': 'Number of weights must match number of symbols'}

            held_symbols = []     # symbols that actually have price data
            held_weights = []     # their raw weights, same order
            returns_data = []     # their daily return series, same order
            portfolio_data = []   # successful per-stock analyses
            score_weights = []    # raw weights matching portfolio_data

            for symbol, weight in zip(symbols, weights):
                data = self.analyzer.get_stock_data(symbol, "1y")
                if data is None or data.empty:
                    continue
                returns_data.append(data['Close'].pct_change().dropna())
                held_symbols.append(symbol)
                held_weights.append(weight)

                # Individual stock analysis
                analysis = self.analyzer.calculate_investment_score(symbol)
                if 'error' not in analysis:
                    portfolio_data.append(analysis)
                    score_weights.append(weight)

            if not returns_data:
                return {'error': 'No valid data for portfolio analysis'}

            total_weight = sum(held_weights)
            if total_weight <= 0:
                return {'error': 'Portfolio weights must sum to a positive value'}
            held_weights = [w / total_weight for w in held_weights]  # Normalize weights

            # One column of daily returns per held symbol.
            portfolio_returns = pd.DataFrame(returns_data).T
            portfolio_returns.columns = held_symbols

            # Portfolio daily returns
            weighted_returns = (portfolio_returns * held_weights).sum(axis=1)

            # Portfolio metrics
            portfolio_return = weighted_returns.mean() * 252 * 100  # Annualized return
            portfolio_volatility = weighted_returns.std() * np.sqrt(252) * 100  # Annualized volatility
            sharpe_ratio = portfolio_return / portfolio_volatility if portfolio_volatility > 0 else 0

            # Portfolio max drawdown
            cumulative_returns = (1 + weighted_returns).cumprod()
            rolling_max = cumulative_returns.expanding().max()
            drawdown = (cumulative_returns - rolling_max) / rolling_max
            max_drawdown = drawdown.min() * 100

            # Risk metrics
            var_95 = np.percentile(weighted_returns, 5) * 100  # 5% VaR

            # Correlation matrix
            correlation_matrix = portfolio_returns.corr().to_dict()

            # Weighted portfolio score over the stocks that have a score,
            # normalised by their own weight total.
            scored_total = sum(score_weights)
            if scored_total > 0:
                portfolio_score = sum(
                    stock['investment_score'] * weight
                    for stock, weight in zip(portfolio_data, score_weights)
                ) / scored_total
            else:
                portfolio_score = 0

            return {
                'portfolio_return': portfolio_return,
                'portfolio_volatility': portfolio_volatility,
                'sharpe_ratio': sharpe_ratio,
                'max_drawdown': max_drawdown,
                'var_95': var_95,
                'portfolio_score': portfolio_score,
                'correlation_matrix': correlation_matrix,
                'individual_stocks': portfolio_data,
                'weights': dict(zip(held_symbols, held_weights)),
                'analysis_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            }

        except Exception as e:
            logger.error(f"Error in portfolio analysis: {e}")
            return {'error': f'Portfolio analysis error: {str(e)}'}
| # Enhanced Gradio Interface with Portfolio Analysis | |
| def create_enhanced_gradio_app(): | |
| """Create enhanced Gradio interface with portfolio analysis""" | |
| portfolio_analyzer = PortfolioAnalyzer() | |
| def analyze_portfolio(symbols_input: str, weights_input: str = "") -> tuple: | |
| """Analyze a portfolio of stocks""" | |
| try: | |
| if not symbols_input: | |
| return "Please enter stock symbols", None, None, None | |
| symbols = [s.strip().upper() for s in symbols_input.split(',') if s.strip()] | |
| # Parse weights if provided | |
| weights = None | |
| if weights_input.strip(): | |
| try: | |
| weights = [float(w.strip()) for w in weights_input.split(',')] | |
| if len(weights) != len(symbols): | |
| return "Number of weights must match number of symbols", None, None, None | |
| except ValueError: | |
| return "Invalid weights format. Use comma-separated numbers (e.g., 0.4, 0.3, 0.3)", None, None, None | |
| # Analyze portfolio | |
| portfolio_analysis = portfolio_analyzer.calculate_portfolio_metrics(symbols, weights) | |
| if 'error' in portfolio_analysis: | |
| return f"Error: {portfolio_analysis['error']}", None, None, None | |
| # Create analysis text | |
| analysis_text = f""" | |
| # 📊 Portfolio Analysis Results | |
| ## 🏦 Portfolio Overview | |
| - **Number of Holdings**: {len(symbols)} | |
| - **Portfolio Score**: {portfolio_analysis['portfolio_score']:.1f}/100 | |
| - **Analysis Date**: {portfolio_analysis['analysis_date']} | |
| ## 📈 Performance Metrics | |
| - **Expected Annual Return**: {portfolio_analysis['portfolio_return']:+.2f}% | |
| - **Annual Volatility**: {portfolio_analysis['portfolio_volatility']:.2f}% | |
| - **Sharpe Ratio**: {portfolio_analysis['sharpe_ratio']:.2f} | |
| - **Maximum Drawdown**: {portfolio_analysis['max_drawdown']:.2f}% | |
| - **Value at Risk (95%)**: {portfolio_analysis['var_95']:.2f}% | |
| ## 🏭 Portfolio Composition | |
| """ | |
| for symbol, weight in portfolio_analysis['weights'].items(): | |
| analysis_text += f"- **{symbol}**: {weight:.1%}\n" | |
| analysis_text += "\n## 📊 Individual Stock Performance\n" | |
| for stock in portfolio_analysis['individual_stocks']: | |
| weight = portfolio_analysis['weights'].get(stock['symbol'], 0) | |
| analysis_text += f""" | |
| ### {stock['symbol']} - {stock['company_name']} ({weight:.1%}) | |
| - **Score**: {stock['investment_score']}/100 | **YTD**: {stock['ytd_return']:+.2f}% | |
| - **Price**: ${stock['current_price']:.2f} | **Sector**: {stock['sector']} | |
| """ | |
| # Create portfolio composition chart | |
| fig_composition = go.Figure(data=[go.Pie( | |
| labels=list(portfolio_analysis['weights'].keys()), | |
| values=list(portfolio_analysis['weights'].values()), | |
| hole=0.3 | |
| )]) | |
| fig_composition.update_layout(title="Portfolio Composition", height=400) | |
| # Create performance comparison chart | |
| stocks_data = portfolio_analysis['individual_stocks'] | |
| fig_performance = go.Figure() | |
| fig_performance.add_trace(go.Bar( | |
| x=[s['symbol'] for s in stocks_data], | |
| y=[s['ytd_return'] for s in stocks_data], | |
| name='YTD Return %', | |
| text=[f"{s['ytd_return']:+.1f}%" for s in stocks_data], | |
| textposition='auto' | |
| )) | |
| fig_performance.update_layout( | |
| title='Individual Stock YTD Performance', | |
| xaxis_title='Stock Symbol', | |
| yaxis_title='YTD Return (%)', | |
| height=400 | |
| ) | |
| # Create portfolio metrics table | |
| metrics_df = pd.DataFrame([ | |
| {'Metric': 'Portfolio Score', 'Value': f"{portfolio_analysis['portfolio_score']:.1f}/100"}, | |
| {'Metric': 'Expected Return', 'Value': f"{portfolio_analysis['portfolio_return']:+.2f}%"}, | |
| {'Metric': 'Volatility', 'Value': f"{portfolio_analysis['portfolio_volatility']:.2f}%"}, | |
| {'Metric': 'Sharpe Ratio', 'Value': f"{portfolio_analysis['sharpe_ratio']:.2f}"}, | |
| {'Metric': 'Max Drawdown', 'Value': f"{portfolio_analysis['max_drawdown']:.2f}%"}, | |
| {'Metric': 'VaR (95%)', 'Value': f"{portfolio_analysis['var_95']:.2f}%"} | |
| ]) | |
| return analysis_text, fig_composition, fig_performance, metrics_df | |
| except Exception as e: | |
| error_msg = f"Error analyzing portfolio: {str(e)}" | |
| logger.error(error_msg) | |
| return error_msg, None, None, None | |
| with gr.Blocks(title="🚀 Advanced MCP Stock Analysis Agent", theme=gr.themes.Soft()) as app: | |
| gr.Markdown(""" | |
| # 🚀 Advanced MCP Stock Analysis Agent | |
| **Real Model Context Protocol (MCP) implementation with comprehensive stock analysis** | |
| Features: Real-time data • AI scoring • Technical analysis • Portfolio optimization • Risk metrics | |
| """) | |
| with gr.Tabs(): | |
| # Single Stock Analysis Tab | |
| with gr.Tab("📊 Stock Analysis"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| stock_input = gr.Textbox( | |
| label="📈 Stock Symbol", | |
| placeholder="AAPL, MSFT, GOOGL, etc.", | |
| value="AAPL" | |
| ) | |
| analyze_btn = gr.Button("🔍 Analyze Stock", variant="primary", size="lg") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| analysis_output = gr.Markdown() | |
| with gr.Column(scale=1): | |
| metrics_table = gr.Dataframe(label="📊 Key Metrics") | |
| stock_chart = gr.Plot(label="📈 Interactive Chart") | |
| # Portfolio Analysis Tab | |
| with gr.Tab("🏦 Portfolio Analysis"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| portfolio_symbols = gr.Textbox( | |
| label="📊 Portfolio Symbols (comma-separated)", | |
| placeholder="AAPL, MSFT, GOOGL, TSLA, NVDA", | |
| value="AAPL, MSFT, GOOGL" | |
| ) | |
| portfolio_weights = gr.Textbox( | |
| label="⚖️ Weights (optional, comma-separated)", | |
| placeholder="0.4, 0.3, 0.3 (leave empty for equal weights)", | |
| value="" | |
| ) | |
| portfolio_btn = gr.Button("🔍 Analyze Portfolio", variant="primary", size="lg") | |
| portfolio_output = gr.Markdown() | |
| with gr.Row(): | |
| portfolio_composition = gr.Plot(label="🥧 Portfolio Composition") | |
| portfolio_performance = gr.Plot(label="📊 Performance Comparison") | |
| portfolio_metrics = gr.Dataframe(label="📈 Portfolio Metrics") | |
| # Stock Comparison Tab | |
| with gr.Tab("🏆 Stock Comparison"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| stocks_input = gr.Textbox( | |
| label="🔍 Stock Symbols (comma-separated)", | |
| placeholder="AAPL, MSFT, GOOGL, TSLA, NVDA", | |
| value="AAPL, MSFT, GOOGL" | |
| ) | |
| compare_btn = gr.Button("⚡ Compare Stocks", variant="primary", size="lg") | |
| comparison_output = gr.Markdown() | |
| comparison_chart = gr.Plot(label="📊 Comparison Chart") | |
| comparison_table = gr.Dataframe(label="📋 Detailed Comparison") | |
| # MCP Server Tools Tab | |
| with gr.Tab("🛠️ MCP Server"): | |
| gr.Markdown(""" | |
| ## 🔧 MCP (Model Context Protocol) Tools | |
| This tab demonstrates the MCP server capabilities: | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| mcp_tool_select = gr.Dropdown( | |
| choices=[ | |
| "get_stock_price", | |
| "analyze_stock_comprehensive", | |
| "compare_stocks_ytd", | |
| "get_market_sector_analysis" | |
| ], | |
| label="🛠️ Select MCP Tool", | |
| value="analyze_stock_comprehensive" | |
| ) | |
| mcp_symbol_input = gr.Textbox( | |
| label="📊 Parameters", | |
| placeholder="AAPL or AAPL,MSFT,GOOGL", | |
| value="AAPL" | |
| ) | |
| mcp_execute_btn = gr.Button("⚡ Execute MCP Tool", variant="secondary") | |
| mcp_output = gr.JSON(label="📋 MCP Response") | |
| gr.Markdown(""" | |
| ### 📡 MCP Server Information: | |
| - **Server Name**: stock-analysis-mcp | |
| - **Version**: 1.0.0 | |
| - **Protocol**: stdio | |
| - **Tools**: 4 available tools for stock analysis | |
| """) | |
| # Event handlers | |
| analyze_btn.click( | |
| fn=analyze_single_stock, | |
| inputs=[stock_input], | |
| outputs=[analysis_output, stock_chart, metrics_table] | |
| ) | |
| portfolio_btn.click( | |
| fn=analyze_portfolio, | |
| inputs=[portfolio_symbols, portfolio_weights], | |
| outputs=[portfolio_output, portfolio_composition, portfolio_performance, portfolio_metrics] | |
| ) | |
| compare_btn.click( | |
| fn=compare_multiple_stocks, | |
| inputs=[stocks_input], | |
| outputs=[comparison_output, comparison_chart, comparison_table] | |
| ) | |
def execute_mcp_tool(tool_name, params):
    """Execute an MCP tool from the Gradio interface.

    Args:
        tool_name: Name of the MCP tool to run (one of the dropdown choices).
        params: Raw parameter text. A single symbol for the single-stock
            tools, or comma-separated symbols for the multi-stock tools.

    Returns:
        A dict for the JSON output component. On success this is the tool's
        parsed JSON response (wrapped as ``{"response": ...}`` when the
        response is not a JSON object) plus ``_mcp_tool`` and
        ``_execution_time`` metadata. On failure it carries an ``error``
        message instead of raising.
    """

    def _stamp(payload):
        """Add the tool name and an ISO timestamp to a response dict."""
        payload["_mcp_tool"] = tool_name
        payload["_execution_time"] = datetime.now().isoformat()
        return payload

    try:
        # Treat a missing value the same as an empty textbox.
        raw_params = params or ""

        if tool_name in ("get_stock_price", "analyze_stock_comprehensive"):
            symbol = raw_params.strip()
            if not symbol:
                return _stamp({"error": "No stock symbol provided"})
            arguments = {"symbol": symbol}
        elif tool_name in ("compare_stocks_ytd", "get_market_sector_analysis"):
            symbols = [s.strip() for s in raw_params.split(",") if s.strip()]
            if not symbols:
                return _stamp({"error": "No stock symbols provided"})
            arguments = {"symbols": symbols}
        else:
            return {"error": f"Unknown tool: {tool_name}"}

        # asyncio.run creates a private event loop and always closes it,
        # even when the tool call raises, so no loop is leaked per click.
        result = asyncio.run(handle_call_tool(tool_name, arguments))

        if not result:
            return {"error": "No response from MCP tool"}

        response_text = result[0].text
        try:
            parsed_result = json.loads(response_text)
        except json.JSONDecodeError:
            # Plain-text response: pass it through unparsed.
            return _stamp({"response": response_text})

        # Only JSON objects can take the metadata keys directly; arrays and
        # scalars are wrapped so the real result is not lost to a TypeError.
        if not isinstance(parsed_result, dict):
            parsed_result = {"response": parsed_result}
        return _stamp(parsed_result)
    except Exception as e:
        # UI boundary: report the failure in the JSON panel instead of raising.
        return _stamp({"error": f"Error executing MCP tool: {str(e)}"})
| mcp_execute_btn.click( | |
| fn=execute_mcp_tool, | |
| inputs=[mcp_tool_select, mcp_symbol_input], | |
| outputs=[mcp_output] | |
| ) | |
| # Footer | |
| gr.Markdown(""" | |
| --- | |
| ## 🚀 System Architecture | |
| **MCP Server**: Implements Model Context Protocol for tool integration | |
| **Analysis Engine**: Advanced scoring algorithm with 15+ metrics | |
| **Data Pipeline**: Real-time Yahoo Finance integration | |
| **Risk Engine**: Portfolio optimization and risk analytics | |
| **Visualization**: Interactive Plotly charts and dashboards | |
| *Educational platform - not financial advice. Always consult professionals.* | |
| """) | |
| return app | |
| # Main execution functions | |
def main(argv=None):
    """Parse command-line options and run the application.

    Args:
        argv: Optional list of argument strings. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``, so ``main()`` behaves
            exactly as before; passing a list allows programmatic use.

    Modes:
        mcp: run the MCP server only.
        gradio: launch the Gradio web interface only.
        both (default): print a startup banner, then launch the Gradio
            interface; MCP tools are invoked on demand from the UI.
    """
    import argparse

    parser = argparse.ArgumentParser(description="MCP Stock Analysis Agent")
    parser.add_argument("--mode", choices=["mcp", "gradio", "both"], default="both",
                        help="Run mode: mcp (server only), gradio (web interface), or both")
    parser.add_argument("--port", type=int, default=7860, help="Gradio server port")
    parser.add_argument("--share", action="store_true", help="Share Gradio interface publicly")
    args = parser.parse_args(argv)

    if args.mode == "mcp":
        # Run MCP server only
        asyncio.run(run_mcp_server())
        return

    if args.mode == "both":
        print("🚀 Starting MCP Stock Analysis Agent...")
        print("📊 MCP Server will run in background")
        print(f"🌐 Gradio Interface will be available at http://localhost:{args.port}")

    # Both remaining modes start the Gradio interface the same way
    # (in "both" mode the MCP server runs on-demand).
    app = create_enhanced_gradio_app()
    app.launch(server_port=args.port, share=args.share)
| if __name__ == "__main__": | |
| main() |