Spaces:
Sleeping
Sleeping
| """ | |
| app.py — Data Analyst Duo MCP (no OpenAI) Gradio Space | |
| Shows preview table, stats, corr, plus full JSON histories, with rule-based interpretation. | |
| """ | |
| import os | |
| import uuid | |
| import logging | |
| import datetime | |
| import pandas as pd | |
| import numpy as np | |
| import gradio as gr | |
| # ─── Logging ────────────────────────────────────────────── | |
# Configure the root logger once at import time: INFO and above, with each
# record showing timestamp, level, logger name, and message.
logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.INFO,
)

# Shared application logger used by the agents in this module.
logger = logging.getLogger("DataAnalystDuo")
| # ─── MCP Core ────────────────────────────────────────────── | |
class MCPMessage:
    """Envelope for a single message passed between agents.

    On construction every message receives a fresh UUID4 string as ``id``
    and an ISO-formatted ``timestamp`` taken from ``datetime.now()``
    (naive local time, no timezone attached).
    """

    def __init__(self, sender, message_type, content):
        """Store the sender name, message type, and payload as given."""
        self.id = str(uuid.uuid4())
        self.sender, self.message_type, self.content = sender, message_type, content
        self.timestamp = datetime.datetime.now().isoformat()

    def to_dict(self):
        """Return the message as a plain dict.

        Keys appear in the order id, sender, message_type, content,
        timestamp. ``content`` is the same object held by the message,
        not a copy.
        """
        field_names = ("id", "sender", "message_type", "content", "timestamp")
        return {field: getattr(self, field) for field in field_names}
class MCPTool:
    """A named, described callable that an agent can register and invoke."""

    def __init__(self, name, description, func):
        """Remember the tool's name, its description, and the wrapped callable."""
        self.name, self.description, self.func = name, description, func

    def execute(self, params):
        """Call the wrapped function with ``params`` and return its result."""
        handler = self.func
        return handler(params)
class MCPAgent:
    """Base class for agents that exchange ``MCPMessage`` objects with peers.

    An agent keeps a registry of tools, a registry of connected peers, a
    FIFO queue of received-but-unprocessed messages, and a history of every
    message it has sent or received. Subclasses implement
    ``handle_message`` to react to queued messages.
    """

    def __init__(self, name, description):
        self.name = name
        self.description = description
        self.tools = {}    # tool name -> tool object
        self.peers = {}    # peer name -> peer agent
        self.queue = []    # received messages waiting for process()
        self.history = []  # entries: {"type": "sent" | "received", "message": dict}

    def register_tool(self, tool):
        """Register ``tool`` under its ``name``, replacing any tool of that name."""
        self.tools[tool.name] = tool

    def connect(self, peer):
        """Make ``peer`` addressable from this agent (one direction only)."""
        self.peers[peer.name] = peer

    def send_message(self, to, mtype, content):
        """Create a message and deliver it to the peer named ``to``.

        The message is recorded in this agent's history and placed on the
        peer's queue via ``peer.receive``; the peer does not act on it until
        its own ``process`` runs.

        Returns:
            The sent message as a dict.

        Raises:
            ValueError: if ``to`` is not a connected peer.
        """
        if to not in self.peers:
            raise ValueError(f"Peer {to} not found")
        msg = MCPMessage(self.name, mtype, content)
        self.history.append({"type": "sent", "message": msg.to_dict()})
        self.peers[to].receive(msg)
        # ASCII arrow: the previous separator was a mis-encoded character.
        # %-style args defer string formatting until the record is emitted.
        logger.info("%s -> %s: %s", self.name, to, mtype)
        # A fresh dict, so callers cannot mutate the history entry through it.
        return msg.to_dict()

    def receive(self, msg):
        """Queue an incoming message and record it in the history."""
        self.queue.append(msg)
        self.history.append({"type": "received", "message": msg.to_dict()})
        logger.info("%s received %s from %s", self.name, msg.message_type, msg.sender)

    def process(self):
        """Handle queued messages in arrival order until the queue is empty."""
        while self.queue:
            msg = self.queue.pop(0)
            self.handle_message(msg)

    def handle_message(self, message):
        """React to one received message; subclasses must override this."""
        raise NotImplementedError

    def get_history(self):
        """Return the history list itself (not a copy)."""
        return self.history
| # ─── ComputeAgent ────────────────────────────────────────── | |
class ComputeAgent(MCPAgent):
    """Agent that loads a CSV into a DataFrame and computes numeric summaries.

    It answers three request types — data load, statistics, correlation —
    by running the matching registered tool and sending the result back to
    the requester. Any other message type is ignored.
    """

    # Dataset used when the request carries no URL.
    DEFAULT_URL = "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/diamonds.csv"

    # The URL comes from a public text box and pandas.read_csv also opens
    # local filesystem paths, so only web URLs are accepted.
    # NOTE(review): this does not block URLs that point at internal hosts.
    ALLOWED_URL_PREFIXES = ("http://", "https://")

    # request message type -> (tool to run, reply message type)
    _ROUTES = {
        "request_data_load": ("load_dataset", "data_load_result"),
        "request_statistics": ("compute_statistics", "statistics_result"),
        "request_correlation": ("compute_correlation", "correlation_result"),
    }

    def __init__(self):
        super().__init__("ComputeAgent", "Loads & computes data")
        self.df = None  # set by a successful _load
        self.register_tool(MCPTool("load_dataset", "Load CSV", self._load))
        self.register_tool(MCPTool("compute_statistics", "Stats", self._stats))
        self.register_tool(MCPTool("compute_correlation", "Corr", self._corr))

    def _load(self, params):
        """Read the CSV at ``params["url"]`` into ``self.df``.

        A missing, ``None``, or blank URL falls back to ``DEFAULT_URL``.

        Returns:
            On success: status, row count, column names, and the first five
            rows as a list of records. On failure: status "error" plus a
            message; ``self.df`` keeps its previous value.
        """
        url = ((params or {}).get("url") or "").strip() or self.DEFAULT_URL
        if not url.lower().startswith(self.ALLOWED_URL_PREFIXES):
            logger.warning("Rejected non-HTTP(S) data source: %r", url)
            return {"status": "error", "message": "Only http:// and https:// URLs are supported"}
        try:
            self.df = pd.read_csv(url)
            return {
                "status": "success",
                "rows": self.df.shape[0],
                "columns": list(self.df.columns),
                "preview": self.df.head(5).to_dict(orient="records"),
            }
        except Exception as e:
            # Boundary handler: network, HTTP, and parse errors all become an
            # error payload instead of crashing the message loop.
            logger.exception("Load failed")
            return {"status": "error", "message": str(e)}

    def _numeric_frame(self):
        """Return the numeric-dtype columns of the loaded DataFrame."""
        return self.df.select_dtypes(include=[np.number])

    def _stats(self, params):
        """Return ``describe()`` statistics for the numeric columns."""
        if self.df is None:
            return {"status": "error", "message": "No data loaded"}
        numeric = self._numeric_frame()
        if numeric.shape[1] == 0:
            # describe() raises on a frame with no columns; report an empty
            # result for datasets that have no numeric data.
            return {"status": "success", "statistics": {}}
        return {"status": "success", "statistics": numeric.describe().to_dict()}

    def _corr(self, params):
        """Return the pairwise correlation matrix of the numeric columns."""
        if self.df is None:
            return {"status": "error", "message": "No data loaded"}
        matrix = self._numeric_frame().corr().to_dict()
        return {"status": "success", "correlation_matrix": matrix}

    def handle_message(self, m):
        """Run the tool mapped to the request type and reply to the sender."""
        route = self._ROUTES.get(m.message_type)
        if route is None:
            # Acks and interpretation replies need no action here.
            return
        tool_name, reply_type = route
        result = self.tools[tool_name].execute(m.content)
        self.send_message(m.sender, reply_type, result)
| # ─── InterpretAgent ──────────────────────────────────────── | |
class InterpretAgent(MCPAgent):
    """Agent that turns statistics and correlation results into text insights.

    It stores the latest data-load, statistics, and correlation payloads it
    receives, replies to each with an acknowledgement or interpretation, and
    assembles a markdown report on request.
    """

    def __init__(self):
        super().__init__("InterpretAgent", "Generates insights from stats & corr")
        self.data_info = None  # last "data_load_result" payload
        self.stats = None      # last "statistics_result" payload
        self.corr = None       # last "correlation_result" payload
        self.register_tool(MCPTool("interpret_statistics", "Rule-based stats insights", self._int_stats))
        self.register_tool(MCPTool("interpret_correlation", "Rule-based corr insights", self._int_corr))

    @staticmethod
    def _is_missing(value):
        """Return True for ``None`` or NaN (NaN is the only value != itself)."""
        return value is None or value != value

    @staticmethod
    def _bullets(items):
        """Render ``items`` as a markdown bullet list ending in a newline."""
        if not items:
            # Avoid emitting a lone "- " bullet when there is nothing to list.
            return "_No insights available._\n"
        return "- " + "\n- ".join(items) + "\n"

    def _int_stats(self, params):
        """Summarise the three numeric columns with the widest max-min range.

        Columns lacking a usable mean, min, or max are skipped. Returns an
        empty insight list when no statistics have been received yet.
        """
        stats = (self.stats or {}).get("statistics", {})
        usable = {
            col: vals
            for col, vals in stats.items()
            if not any(self._is_missing(vals.get(k)) for k in ("mean", "min", "max"))
        }
        top3 = sorted(
            usable,
            key=lambda col: usable[col]["max"] - usable[col]["min"],
            reverse=True,
        )[:3]
        insights = [
            f"{col}: mean={usable[col]['mean']:.2f}, "
            f"range=[{usable[col]['min']:.2f}, {usable[col]['max']:.2f}]"
            for col in top3
        ]
        return {"status": "success", "insights": insights, "summary": "Top 3 columns by range"}

    def _int_corr(self, params):
        """List the three column pairs with the largest absolute correlation.

        Each unordered pair is counted once (the matrix holds both a-vs-b
        and b-vs-a), and missing/NaN correlations are skipped. Returns an
        empty insight list when no correlation matrix has been received yet.
        """
        cm = (self.corr or {}).get("correlation_matrix", {})
        seen = set()
        pairs = []
        for c1, row in cm.items():
            for c2, val in row.items():
                pair_key = frozenset((c1, c2))
                if c1 == c2 or pair_key in seen or self._is_missing(val):
                    continue
                seen.add(pair_key)
                pairs.append((c1, c2, val))
        top3 = sorted(pairs, key=lambda p: abs(p[2]), reverse=True)[:3]
        insights = [f"{c1} vs {c2}: corr={corr:.2f}" for c1, c2, corr in top3]
        return {"status": "success", "insights": insights, "summary": "Top 3 correlated pairs"}

    def handle_message(self, m):
        """Store incoming results and reply; build the report when asked."""
        if m.message_type == "data_load_result":
            self.data_info = m.content
            self.send_message(m.sender, "ack", {"status": "loaded"})
        elif m.message_type == "statistics_result":
            self.stats = m.content
            res = self.tools["interpret_statistics"].execute({})
            self.send_message(m.sender, "statistics_interpretation", res)
        elif m.message_type == "correlation_result":
            self.corr = m.content
            res = self.tools["interpret_correlation"].execute({})
            self.send_message(m.sender, "correlation_interpretation", res)
        elif m.message_type == "request_report":
            # Assemble a simple markdown report from the stored payloads.
            stats_insights = self.tools["interpret_statistics"].execute({})["insights"]
            corr_insights = self.tools["interpret_correlation"].execute({})["insights"]
            report_md = (
                "## Analysis Report\n"
                "### Stats Insights\n" + self._bullets(stats_insights)
                + "### Corr Insights\n" + self._bullets(corr_insights)
            )
            self.send_message(m.sender, "report_result", {"status": "success", "report_md": report_md})
| # ─── Orchestration ───────────────────────────────────────── | |
class DataAnalystDuo:
    """Wires a ComputeAgent and an InterpretAgent together and runs one analysis."""

    def __init__(self):
        self.C = ComputeAgent()
        self.I = InterpretAgent()
        # Connections are one-directional, so link both ways.
        self.C.connect(self.I)
        self.I.connect(self.C)

    @staticmethod
    def _first_content(history, message_type):
        """Return the content of the first history entry of ``message_type``.

        Returns an empty dict when no such entry exists, so a missing
        message does not raise ``StopIteration`` in the caller.
        """
        return next(
            (
                entry["message"]["content"]
                for entry in history
                if entry["message"]["message_type"] == message_type
            ),
            {},
        )

    def run(self, url):
        """Drive the load -> statistics -> correlation -> report exchange.

        Args:
            url: CSV location forwarded to the compute agent.

        Returns:
            A 6-tuple: preview DataFrame, statistics payload, correlation
            payload, compute-agent history, interpret-agent history, and the
            report markdown. Payloads are empty dicts and the report an empty
            string if the corresponding message was never produced.
        """
        # Each request is followed by one processing round per agent: the
        # receiver handles the request, then the requester handles the reply.
        self.I.send_message("ComputeAgent", "request_data_load", {"url": url})
        self.C.process()
        self.I.process()
        self.I.send_message("ComputeAgent", "request_statistics", {})
        self.C.process()
        self.I.process()
        self.I.send_message("ComputeAgent", "request_correlation", {})
        self.C.process()
        self.I.process()
        self.C.send_message("InterpretAgent", "request_report", {})
        self.I.process()
        self.C.process()

        hist_c = self.C.get_history()
        hist_i = self.I.get_history()
        load = self._first_content(hist_c, "data_load_result")
        stats = self._first_content(hist_c, "statistics_result")
        corr = self._first_content(hist_c, "correlation_result")
        report = self._first_content(hist_i, "report_result")

        preview_df = pd.DataFrame(load.get("preview", []))
        return preview_df, stats, corr, hist_c, hist_i, report.get("report_md", "")
| # ─── Gradio app ──────────────────────────────────────────── | |
def run_analysis(url: str):
    """Run one full analysis of ``url`` on a freshly built agent pair.

    A new ``DataAnalystDuo`` is created per call, so no agent state or
    history carries over between requests. Returns whatever
    ``DataAnalystDuo.run`` returns.
    """
    duo = DataAnalystDuo()
    return duo.run(url)
# Single-input Gradio interface. The six outputs are listed in the same
# order as the values returned by run_analysis.
demo = gr.Interface(
    title="Data Analyst Duo",
    description="Paste any CSV URL (e.g. diamonds.csv) to see data + stats + insights + report",
    fn=run_analysis,
    inputs=[gr.Textbox(placeholder="https://...", label="CSV URL")],
    outputs=[
        gr.Dataframe(label="Preview (first 5 rows)"),
        # Four JSON panels, created in display order.
        *(
            gr.JSON(label=panel_label)
            for panel_label in (
                "Statistics",
                "Correlation Matrix",
                "Compute History",
                "Interpret History",
            )
        ),
        gr.Markdown(label="Analysis Report"),
    ],
)
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT, defaulting to 7860.
    port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=port, share=True)