Spaces:
Paused
Paused
| import gradio as gr | |
| import networkx as nx | |
| import matplotlib.pyplot as plt | |
| import pandas as pd | |
| import random | |
| import copy | |
| import json | |
| import io | |
| # --- Local Imports (Must exist in your file system) --- | |
| import config | |
| from utils import calculate_metric | |
| import metric_visualizations | |
# --- Constants ---
# Short human-readable explanation for each metric, keyed by the metric name.
METRIC_DESCRIPTIONS = dict(
    [
        ("Density", "Ratio of actual connections to potential connections."),
        ("Cyclomatic Number", "Number of fundamental independent loops."),
        ("Global Efficiency", "Measure (0-1) of information flow ease."),
        ("Supportive Gain", "Efficiency provided specifically by 'Soft' edges."),
        ("Brittleness Ratio", "Balance of Supportive (Soft) vs Essential (Hard) edges."),
        ("Critical Vulnerability", "Checks if 'Hard' skeleton is connected."),
        ("Interdependence", "% of edges crossing between different agents."),
        ("Total Cycles", "Total count of feedback loops."),
        ("Modularity", "How well the system divides into isolated groups."),
        ("Functional Redundancy", "Avg number of agents per function."),
        ("Collaboration Ratio", "% of functions with shared authority."),
    ]
)
# --- State Management Class ---
class GraphState:
    """
    Acts as the 'self' from the Tkinter app.

    Holds the graph, its display positions, the agent palette, the undo/redo
    stacks and the named snapshots for the current session. All attributes
    are plain and public; the handler functions read and write them directly.
    """

    # Maximum number of graph copies kept on the undo stack.
    MAX_HISTORY = 10

    def __init__(self):
        self.G = nx.DiGraph()
        self.pos = {}  # node id -> (x, y) display position
        self.agents = config.DEFAULT_AGENTS.copy()  # agent name -> colour
        self.undo_stack = []
        self.redo_stack = []
        self.saved_snapshots = {}  # snapshot name -> graph copy, for comparison
        self.counter = 0  # next unique node id

    def save_to_history(self):
        """
        Push an isolated copy of the current graph onto the undo stack.

        The stack is trimmed to the newest MAX_HISTORY entries and the redo
        stack is emptied.
        """
        # A real deep copy is required: Graph.copy() only copies the attribute
        # dicts shallowly, so container attributes (e.g. the per-node 'agent'
        # list) would stay shared with the live graph and later in-place edits
        # would silently rewrite the history.
        self.undo_stack.append(copy.deepcopy(self.G))
        # Drop everything but the newest MAX_HISTORY entries (no-op when short).
        del self.undo_stack[:-self.MAX_HISTORY]
        self.redo_stack.clear()

    def get_node_choices(self):
        """
        Return a list of (label, node id) tuples for Dropdown choices.

        A node without a 'label' attribute is shown by the string form of its id.
        """
        return [(d.get('label', str(n)), n) for n, d in self.G.nodes(data=True)]
# --- Global Initialization ---
# One GraphState is created at import time and shared by every handler
# function below, so all callers operate on the same graph.
# For multi-user isolation the state would have to be passed through the
# handler function arguments instead; a single global instance keeps this
# Space demo simple as long as heavy concurrent traffic is not expected.
session = GraphState()
# --- Core Logic Functions ---
def render_plot(vis_mode="None", edge_filter="ALL"):
    """
    Replaces the Canvas drawing logic.

    Draws the JSAT layer guides and the session graph with Matplotlib and
    returns the resulting Figure.

    Args:
        vis_mode: Overlay to highlight in yellow: "Cycles", "Interdependence"
            or "Modularity". Any other value draws no overlay.
        edge_filter: "ALL" to draw every edge, otherwise only edges whose
            'type' attribute equals this value are drawn.

    Returns:
        The Matplotlib Figure. It is closed in pyplot before being returned so
        that repeated renders do not accumulate open figures; the Figure
        object itself stays usable.
        NOTE(review): assumes the consumer renders the Figure via savefig and
        does not need the pyplot figure manager - confirm on the deployed
        Gradio version.
    """
    # Overlay name -> name of the metric_visualizations function providing it.
    # Resolved lazily so only the selected helper has to exist.
    highlight_getters = {
        "Cycles": "get_cycle_highlights",
        "Interdependence": "get_interdependence_highlights",
        "Modularity": "get_modularity_highlights",
    }

    graph = session.G
    fig, ax = plt.subplots(figsize=(12, 8))
    try:
        # 1. Draw Layer Backgrounds
        # Config layers are mapped to the Y-axis. Matplotlib Y increases upwards.
        for name, y_val in config.JSAT_LAYERS.items():
            ax.axhline(y=y_val, color='#e0e0e0', linestyle='--', zorder=0)
            ax.text(50, y_val + 10, name, color='grey', fontsize=10,
                    fontweight='bold', zorder=0)

        if graph.number_of_nodes() == 0:
            ax.text(0.5, 0.5, "Graph is Empty.\nUse 'Editor' tab to add nodes.",
                    ha='center', va='center', transform=ax.transAxes, color='grey')
            ax.axis('off')
            return fig

        # Display positions: prefer session.pos, fall back to the node's own
        # 'pos' attribute so a missing entry cannot abort the whole render.
        pos = {
            n: session.pos.get(n, d.get('pos', (0, 0)))
            for n, d in graph.nodes(data=True)
        }

        # 2. Filter and classify edges in one pass. An edge without a 'type'
        # is treated as config.EDGE_TYPE_HARD for BOTH filtering and drawing
        # (previously it passed the filter but was never drawn).
        hard_edges = []
        soft_edges = []
        for u, v, d in graph.edges(data=True):
            etype = d.get('type', config.EDGE_TYPE_HARD)
            if edge_filter != "ALL" and etype != edge_filter:
                continue
            if etype == 'soft':
                soft_edges.append((u, v))
            elif etype in ('hard', config.EDGE_TYPE_HARD):
                hard_edges.append((u, v))

        # 3. Collect Highlights (Visual Analytics)
        highlight_nodes = []
        highlight_edges = []
        getter_name = highlight_getters.get(vis_mode)
        if getter_name is not None:
            getter = getattr(metric_visualizations, getter_name)
            for item in getter(graph):
                highlight_nodes.extend(item.get('nodes', []))
                highlight_edges.extend(item.get('edges', []))

        # Draw Highlights Underneath
        if highlight_nodes:
            nx.draw_networkx_nodes(graph, pos, nodelist=highlight_nodes,
                                   node_color='yellow', node_size=900,
                                   alpha=0.5, ax=ax)
        if highlight_edges:
            nx.draw_networkx_edges(graph, pos, edgelist=highlight_edges,
                                   edge_color='yellow', width=6, alpha=0.5, ax=ax)

        # 4. Draw Standard Edges
        if hard_edges:
            nx.draw_networkx_edges(graph, pos, edgelist=hard_edges,
                                   edge_color='black', width=2,
                                   arrowstyle='-|>', ax=ax)
        if soft_edges:
            nx.draw_networkx_edges(graph, pos, edgelist=soft_edges,
                                   edge_color='grey', width=2, style='dashed',
                                   arrowstyle='-|>', ax=ax)

        # 5. Draw Nodes (Shapes and Colors based on Type/Agent)
        for n, d in graph.nodes(data=True):
            x, y = pos[n]
            lbl = d.get('label', str(n))
            ntype = d.get('type', 'Function')
            agents = d.get('agent', ['Unassigned'])
            if isinstance(agents, str):
                agents = [agents]

            # Colour comes from the first (primary) agent; an empty list is
            # treated like 'Unassigned' instead of raising IndexError.
            primary_agent = agents[0] if agents else 'Unassigned'
            color = session.agents.get(primary_agent, 'white')

            # Functions are squares, everything else is a circle.
            marker = 's' if ntype == 'Function' else 'o'

            # Manual scatter per node to handle mixed shapes/colors.
            ax.scatter(x, y, s=600, c=color, marker=marker, edgecolors='black',
                       linewidth=1.5, zorder=2)
            ax.text(x, y - 40, lbl, ha='center', va='top', fontsize=9,
                    fontweight='bold', zorder=3)

        ax.axis('off')
        fig.tight_layout()
        return fig
    finally:
        # Deregister the figure from pyplot's global registry; otherwise every
        # call leaves one more open figure behind for the process lifetime.
        plt.close(fig)
# --- Interaction Functions ---
def add_node_fn(label, n_type, layer, agent):
    """
    Add a node to the session graph at a random x position on its layer.

    Args:
        label: Display label. Surrounding whitespace is stripped; it must be
            non-empty and not already used by another node, because the JSON
            export/load in this file key nodes by label.
        n_type: Node type stored in the 'type' attribute.
        layer: Layer name; its y coordinate is looked up in config.JSAT_LAYERS
            (0 when the layer is unknown).
        agent: Initial agent name; falls back to "Unassigned" when empty.

    Returns:
        (figure, node dropdown update, log message).
    """
    label = (label or "").strip()
    if not label:
        return render_plot(), update_node_dropdown(), "Error: Label required"
    if any(d.get('label') == label for _, d in session.G.nodes(data=True)):
        return render_plot(), update_node_dropdown(), f"Error: Label '{label}' already exists"

    # Only record history once the request is known to be valid.
    session.save_to_history()

    # Generate ID
    nid = session.counter
    session.counter += 1

    # Calculate position
    y = config.JSAT_LAYERS.get(layer, 0)
    x = random.randint(100, 900)

    session.G.add_node(nid, label=label, type=n_type, layer=layer,
                       agent=[agent or "Unassigned"], pos=(x, y))
    session.pos[nid] = (x, y)
    return render_plot(), update_node_dropdown(), f"Added {label}"
def add_edge_fn(u_id, v_id, e_type):
    """
    Connect two nodes with a directed edge of the given type.

    Args:
        u_id: Source node id (None when nothing is selected).
        v_id: Target node id (None when nothing is selected).
        e_type: Value stored in the edge's 'type' attribute.

    Returns:
        (figure, node dropdown update, log message). Nothing is modified and
        an error message is returned when a node is missing or both nodes
        have the same 'type'.
    """
    if u_id is None or v_id is None:
        return render_plot(), update_node_dropdown(), "Error: Select nodes"
    # A dropdown may still hold the id of a node that was deleted meanwhile;
    # indexing G.nodes with it would raise KeyError.
    if u_id not in session.G or v_id not in session.G:
        return render_plot(), update_node_dropdown(), "Error: Selected node no longer exists"

    # Enforce alternating type logic from Tkinter app
    t1 = session.G.nodes[u_id].get('type')
    t2 = session.G.nodes[v_id].get('type')
    if t1 == t2:
        return render_plot(), update_node_dropdown(), f"Error: Cannot connect {t1} to {t2}"

    session.save_to_history()
    session.G.add_edge(u_id, v_id, type=e_type)
    return render_plot(), update_node_dropdown(), "Connection Created"
def delete_node_fn(u_id):
    """
    Remove a node (and its incident edges) from the session graph.

    Args:
        u_id: Id of the node to delete, or None when nothing is selected.

    Returns:
        (figure, node dropdown update, log message).
    """
    if u_id is None:
        return render_plot(), update_node_dropdown(), "No node selected"
    # Guard against a stale selection (e.g. a second click on Delete):
    # remove_node raises for unknown nodes, and history must not be pushed
    # for an operation that changes nothing.
    if u_id not in session.G:
        return render_plot(), update_node_dropdown(), "Node no longer exists"

    session.save_to_history()
    session.G.remove_node(u_id)
    # Keep the position table in sync with the graph.
    session.pos.pop(u_id, None)
    return render_plot(), update_node_dropdown(), "Node Deleted"
def create_agent_fn(name, color):
    """
    Register (or recolour) an agent in the session palette.

    Returns a (log message, agent dropdown update) pair; when no name is
    given nothing is stored and the dropdown is left as it is.
    """
    if name:
        session.agents[name] = color
        # Hand the refreshed agent names to the dropdown.
        agent_names = list(session.agents.keys())
        return f"Created agent {name}", gr.Dropdown(choices=agent_names)
    return "Name required", gr.update()
def assign_agent_fn(node_id, agent_name):
    """
    Toggle an agent on a node.

    If the agent is already assigned it is removed, otherwise it is added
    (replacing the "Unassigned" placeholder). A node left with no agent
    falls back to ["Unassigned"].

    Args:
        node_id: Id of the node to edit, or None when nothing is selected.
        agent_name: Name of the agent to toggle.

    Returns:
        (figure, log message).
    """
    # Also covers a stale dropdown value pointing at a deleted node.
    if node_id is None or node_id not in session.G:
        return render_plot(), "Select a node"
    if not agent_name:
        return render_plot(), "Select an agent"

    session.save_to_history()

    stored = session.G.nodes[node_id].get('agent', [])
    # Work on a fresh list: the stored list may be shared with graph copies
    # (Graph.copy() is shallow for attribute values), so mutating it in place
    # would also change those copies.
    current = [stored] if isinstance(stored, str) else list(stored)

    # Logic from Tkinter: Toggle agent
    if agent_name in current:
        current.remove(agent_name)
    else:
        if "Unassigned" in current:
            current.remove("Unassigned")
        current.append(agent_name)

    if not current:
        current = ["Unassigned"]

    session.G.nodes[node_id]['agent'] = current
    return render_plot(), f"Agents for node {node_id}: {current}"
def update_node_dropdown():
    """
    Build a Dropdown update listing the current graph nodes as (label, id) choices.
    """
    return gr.Dropdown(choices=session.get_node_choices())
def calculate_stats_fn():
    """
    Build the Markdown statistics report for the session graph.

    Each metric is computed independently; a metric that raises is reported
    as an error line and does not stop the remaining metrics.
    """
    metric_names = (
        "Density", "Cyclomatic Number", "Global Efficiency",
        "Supportive Gain", "Brittleness Ratio", "Interdependence",
        "Total Cycles", "Modularity",
    )
    sections = ["### Network Statistics\n"]
    for metric_name in metric_names:
        try:
            value = calculate_metric(session.G, metric_name)
            description = METRIC_DESCRIPTIONS.get(metric_name, "")
            entry = f"**{metric_name}**: {value}\n> *{description}*\n\n"
        except Exception as e:
            entry = f"**{metric_name}**: Error ({str(e)})\n\n"
        sections.append(entry)
    return "".join(sections)
def snapshot_fn(name):
    """
    Store a frozen copy of the session graph under the given name.

    Args:
        name: Snapshot name; an existing snapshot with the same name is
            overwritten.

    Returns:
        (log message, update carrying the list of snapshot names).
    """
    if not name:
        return "Enter a name", gr.update()
    # Deep copy so the snapshot is really frozen: Graph.copy() shares
    # container attributes (such as the per-node 'agent' list) with the live
    # graph, so later edits would leak into the saved snapshot.
    session.saved_snapshots[name] = copy.deepcopy(session.G)
    return f"Saved snapshot: {name}", gr.update(choices=list(session.saved_snapshots.keys()))
def compare_fn(selected_snapshots):
    """
    Build a comparison table with one row per selected snapshot.

    Args:
        selected_snapshots: Iterable of snapshot names (may be empty or None).

    Returns:
        A DataFrame with the snapshot name, node/edge counts and one column
        per compared metric. Empty when nothing is selected. Names without a
        saved snapshot are skipped, and a metric that fails is shown as an
        "Error (...)" cell, mirroring the statistics report, instead of
        aborting the whole table.
    """
    if not selected_snapshots:
        return pd.DataFrame()

    metrics = ("Density", "Global Efficiency", "Total Cycles", "Modularity")
    rows = []
    for name in selected_snapshots:
        g = session.saved_snapshots.get(name)
        if g is None:
            # Stale selection: the snapshot is not (or no longer) stored.
            continue
        row = {"Snapshot": name, "Nodes": g.number_of_nodes(), "Edges": g.number_of_edges()}
        for m in metrics:
            try:
                row[m] = calculate_metric(g, m)
            except Exception as e:
                row[m] = f"Error ({e})"
        rows.append(row)
    return pd.DataFrame(rows)
def export_json_fn():
    """
    Serialise the session graph to JSON text (mimics the Save JSON logic).

    Nodes are keyed by label with a combined "<Layer><Type>" type string,
    edges refer to nodes by label, and every known agent lists the labels of
    the nodes it is assigned to under "Authority".
    """
    graph = session.G
    authority_by_agent = {agent_name: [] for agent_name in session.agents}
    node_entries = {}

    for node_id, attrs in graph.nodes(data=True):
        node_label = attrs.get('label', f"Node_{node_id}")
        compact_layer = attrs.get('layer', "Base Environment").replace(" ", "")
        node_kind = attrs.get('type', "Function")
        node_entries[node_label] = {
            "Type": f"{compact_layer}{node_kind}",
            "UserData": node_label,
        }

        owners = attrs.get('agent', ["Unassigned"])
        owners = owners if isinstance(owners, list) else [owners]
        for owner in owners:
            # Only agents known to the session are exported.
            if owner in authority_by_agent:
                authority_by_agent[owner].append(node_label)

    edge_entries = [
        {
            "Source": graph.nodes[src].get('label', str(src)),
            "Target": graph.nodes[dst].get('label', str(dst)),
            "UserData": {"type": attrs.get('type', config.EDGE_TYPE_HARD)},
        }
        for src, dst, attrs in graph.edges(data=True)
    ]

    payload = {
        "GraphData": {
            "Nodes": node_entries,
            "Edges": edge_entries,
            "Agents": {
                agent_name: {"Authority": labels}
                for agent_name, labels in authority_by_agent.items()
            },
        }
    }
    # Returned as a string so it can be shown in a Textbox.
    return json.dumps(payload, indent=4)
def load_json_fn(json_str):
    """
    Replace the session graph with the one described by a JSON document.

    The document is parsed completely into a new graph first; the session is
    only touched once parsing succeeded, so an invalid document leaves the
    current graph intact. The previous graph is pushed onto the undo stack.

    Args:
        json_str: JSON text with a top-level "GraphData" object containing
            "Nodes" (label -> {"Type": ...}), "Edges" (list of
            {"Source", "Target", "UserData": {"type": ...}}) and "Agents"
            (name -> {"Authority": [labels]}).

    Returns:
        (figure, node dropdown update, log message).
    """
    try:
        data = json.loads(json_str)["GraphData"]

        new_graph = nx.DiGraph()
        new_pos = {}
        label_to_id = {}

        # Load Nodes
        for nid, (lbl, props) in enumerate(data.get("Nodes", {}).items()):
            combined_type = props.get("Type", "")
            # Basic Parse logic
            ntype = "Function" if "Function" in combined_type else "Resource"
            # Find layer: first configured layer whose space-less name occurs
            # in the combined type string.
            layer = "Distributed Work"
            for layer_name in config.LAYER_ORDER:
                if layer_name.replace(" ", "") in combined_type:
                    layer = layer_name
                    break

            # Position logic
            y = config.JSAT_LAYERS.get(layer, 0)
            x = random.randint(100, 900)
            new_graph.add_node(nid, label=lbl, type=ntype, layer=layer,
                               agent=[], pos=(x, y))
            new_pos[nid] = (x, y)
            label_to_id[lbl] = nid

        # Load Edges (edges naming an unknown label are skipped)
        for e in data.get("Edges", []):
            u = label_to_id.get(e["Source"])
            v = label_to_id.get(e["Target"])
            if u is not None and v is not None:
                etype = e.get("UserData", {}).get("type", "hard")
                new_graph.add_edge(u, v, type=etype)

        # Load Agents and restore their node assignments from "Authority".
        agents_section = data.get("Agents", {})
        for ag_name, ag_data in agents_section.items():
            if ag_name == "Unassigned":
                continue  # placeholder; applied below to nodes left without agent
            authority = ag_data.get("Authority", []) if isinstance(ag_data, dict) else []
            for lbl in authority:
                nid = label_to_id.get(lbl)
                if nid is None:
                    continue
                assigned = new_graph.nodes[nid]['agent']
                if ag_name not in assigned:
                    assigned.append(ag_name)
        for _, attrs in new_graph.nodes(data=True):
            if not attrs['agent']:
                attrs['agent'] = ["Unassigned"]
    except Exception as e:
        return render_plot(), update_node_dropdown(), f"Error loading JSON: {str(e)}"

    # Commit: everything parsed, now swap the session state in one go.
    session.save_to_history()
    for ag_name in agents_section:
        if ag_name not in session.agents:
            session.agents[ag_name] = "#999999"  # Default color if unknown
    session.G = new_graph
    session.pos = new_pos
    session.counter = new_graph.number_of_nodes()
    return render_plot(), update_node_dropdown(), "JSON Loaded Successfully"
# --- Layout Construction ---
# NOTE(review): several UI label strings below look mis-encoded (probably
# emoji damaged by a wrong charset); they are kept exactly as found - confirm
# the intended characters against the original file.
with gr.Blocks(title="Interactive JSAT", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# πΈοΈ Interactive JSAT Graph Builder")
    with gr.Row():
        # LEFT COLUMN: Visualization
        with gr.Column(scale=2):
            plot_output = gr.Plot(label="Network Architecture")
            log_output = gr.Textbox(label="System Log", value="Ready.", interactive=False)
            with gr.Row():
                vis_mode = gr.Radio(["None", "Cycles", "Interdependence", "Modularity"], label="Visual Analytics Overlay", value="None")
                edge_filter = gr.Radio(["ALL", "hard", "soft"], label="Show Edges", value="ALL")
        # RIGHT COLUMN: Controls (Tabs)
        with gr.Column(scale=1):
            # --- TAB 1: EDITOR ---
            with gr.Tab("π Editor"):
                gr.Markdown("### Add Node")
                with gr.Row():
                    n_lbl = gr.Textbox(label="Label", placeholder="F1")
                    n_type = gr.Dropdown(["Function", "Resource"], label="Type", value="Function")
                with gr.Row():
                    n_layer = gr.Dropdown(config.LAYER_ORDER, label="Layer", value="Distributed Work")
                    n_agent = gr.Dropdown(list(session.agents.keys()), label="Initial Agent", value="Unassigned")
                btn_add_n = gr.Button("β Create Node", variant="primary")
                gr.Markdown("### Connections")
                with gr.Row():
                    # These dropdowns update dynamically
                    src_drop = gr.Dropdown(label="Source", choices=[])
                    tgt_drop = gr.Dropdown(label="Target", choices=[])
                e_type = gr.Radio(["hard", "soft"], label="Constraint", value="hard")
                btn_add_e = gr.Button("π Connect", variant="secondary")
                gr.Markdown("### Management")
                del_node_drop = gr.Dropdown(label="Select Node to Delete", choices=[])
                btn_del = gr.Button("ποΈ Delete Node", variant="stop")
            # --- TAB 2: AGENTS ---
            with gr.Tab("π₯ Agents"):
                gr.Markdown("### Create New Agent")
                with gr.Row():
                    new_ag_name = gr.Textbox(label="Name")
                    new_ag_col = gr.ColorPicker(label="Color", value="#00ff00")
                btn_create_ag = gr.Button("Save Agent")
                gr.Markdown("### Assign to Node")
                with gr.Row():
                    node_assign_drop = gr.Dropdown(label="Node", choices=[])
                    agent_assign_drop = gr.Dropdown(label="Agent", choices=list(session.agents.keys()))
                btn_assign = gr.Button("Toggle Assignment")
            # --- TAB 3: ANALYTICS ---
            with gr.Tab("π Analytics"):
                stats_box = gr.Markdown("Click 'Calculate' to see metrics...")
                btn_stats = gr.Button("Calculate Metrics")
            # --- TAB 4: COMPARE ---
            with gr.Tab("βοΈ Compare"):
                snap_name = gr.Textbox(label="Snapshot Name")
                btn_snap = gr.Button("Save Snapshot")
                snap_select = gr.CheckboxGroup(label="Select Snapshots to Compare", choices=[])
                btn_compare = gr.Button("Generate Comparison Table")
                compare_table = gr.Dataframe(label="Comparison Matrix")
            # --- TAB 5: I/O ---
            with gr.Tab("πΎ I/O"):
                btn_export = gr.Button("Generate JSON")
                json_out = gr.Textbox(label="JSON Output", lines=5, show_copy_button=True)
                gr.Markdown("---")
                json_in = gr.Textbox(label="Paste JSON Here", lines=5)
                btn_import = gr.Button("Load from JSON")

    # --- Event Wiring ---
    # Every dropdown that lists graph nodes. They are always refreshed
    # together so none of them can keep offering a node that was deleted or
    # miss one that was added/imported.
    node_dropdowns = [src_drop, tgt_drop, del_node_drop, node_assign_drop]

    def _refresh_node_dropdowns():
        """Return one fresh node-choice update per entry of node_dropdowns."""
        return [update_node_dropdown() for _ in node_dropdowns]

    def _refresh_agent_dropdowns():
        """Return fresh agent-name updates for the two agent dropdowns."""
        names = list(session.agents.keys())
        return gr.Dropdown(choices=names), gr.Dropdown(choices=names)

    # Initialization
    demo.load(render_plot, None, plot_output)
    demo.load(_refresh_node_dropdowns, None, node_dropdowns)

    # Visualization Triggers
    vis_mode.change(render_plot, [vis_mode, edge_filter], plot_output)
    edge_filter.change(render_plot, [vis_mode, edge_filter], plot_output)

    # Editor Actions
    btn_add_n.click(add_node_fn, [n_lbl, n_type, n_layer, n_agent], [plot_output, src_drop, log_output]) \
        .then(_refresh_node_dropdowns, None, node_dropdowns)
    btn_add_e.click(add_edge_fn, [src_drop, tgt_drop, e_type], [plot_output, src_drop, log_output])
    # Deleting must also refresh the "Assign to Node" dropdown, which was
    # previously left offering the deleted node.
    btn_del.click(delete_node_fn, [del_node_drop], [plot_output, del_node_drop, log_output]) \
        .then(_refresh_node_dropdowns, None, node_dropdowns)

    # Agent Actions
    btn_create_ag.click(create_agent_fn, [new_ag_name, new_ag_col], [log_output, n_agent]) \
        .then(lambda: gr.Dropdown(choices=list(session.agents.keys())), None, agent_assign_drop)
    btn_assign.click(assign_agent_fn, [node_assign_drop, agent_assign_drop], [plot_output, log_output])

    # Analytics
    btn_stats.click(calculate_stats_fn, None, stats_box)

    # Comparison
    btn_snap.click(snapshot_fn, snap_name, [log_output, snap_select])
    btn_compare.click(compare_fn, snap_select, compare_table)

    # I/O
    btn_export.click(export_json_fn, None, json_out)
    # Importing replaces all nodes and may register new agents, so every
    # node dropdown and both agent dropdowns are refreshed afterwards.
    btn_import.click(load_json_fn, json_in, [plot_output, src_drop, log_output]) \
        .then(_refresh_node_dropdowns, None, node_dropdowns) \
        .then(_refresh_agent_dropdowns, None, [n_agent, agent_assign_drop])

if __name__ == "__main__":
    demo.launch()