Spaces:
Running
Running
| """HuggingFace Space: ghostloop control panel. | |
| Live URL: https://huggingface.co/spaces/Ghostgim/ghostloop-demo | |
| A no-code interface to ghostloop. Visitors pick a robot profile, then | |
| drive it via: | |
| - Per-primitive dispatch buttons (auto-generated from the profile's | |
| registry, no JSON typing required). | |
| - Virtual joystick (D-pad style) for mobile bases / quadrupeds / | |
| drones. | |
| - Free-form Intent dispatch (advanced: JSON args). | |
| - Live intervention controls: Pause, Resume, Emergency Stop. | |
| - Live trace pane that updates after every dispatch. | |
| - Live state pane showing the backend snapshot. | |
| The runtime is held per-session so the trace accumulates across | |
| button clicks. Switch profiles to reset. | |
| Backend is MockBackend so the Space runs on the free CPU tier without | |
| any sim install. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| from typing import Any | |
| import gradio as gr | |
| from ghostloop import ( | |
| Intent, | |
| InterventionGate, | |
| InterventionState, | |
| LivePolicyController, | |
| PolicyPipeline, | |
| Runtime, | |
| ) | |
| from ghostloop.profiles import ( | |
| build_runtime_from_profile, | |
| franka_arm, | |
| humanoid_demo, | |
| spot_quadruped, | |
| stretch_mobile_arm, | |
| tello_drone, | |
| turtlebot_base, | |
| ) | |
| PRESETS = { | |
| "franka_arm (7-DOF arm)": franka_arm, | |
| "spot (Boston Dynamics quadruped)": spot_quadruped, | |
| "tello (quadcopter drone)": tello_drone, | |
| "stretch (mobile arm)": stretch_mobile_arm, | |
| "humanoid_demo (stationary humanoid)": humanoid_demo, | |
| "turtlebot (wheeled mobile base)": turtlebot_base, | |
| } | |
| DEFAULT_PROFILE = "franka_arm (7-DOF arm)" | |
| # --------------------------------------------------------------------------- | |
| # Per-session runtime state (persisted via gr.State). | |
| # --------------------------------------------------------------------------- | |
| def make_session(profile_label: str) -> dict[str, Any]: | |
| """Construct a fresh runtime session for a profile selection.""" | |
| factory = PRESETS[profile_label] | |
| profile = factory() | |
| runtime = build_runtime_from_profile(profile) | |
| controller = LivePolicyController( | |
| policy=lambda state: Intent("emit_event", {"kind": "noop"}), | |
| fallback_policy=lambda state: Intent( | |
| "stop" if "stop" in runtime.registry.names() else "emit_event", | |
| {"kind": "fallback"}, | |
| ), | |
| ) | |
| # Add the InterventionGate to the front of the pipeline so pause | |
| # affects future dispatches. | |
| runtime.policy_pipeline = PolicyPipeline( | |
| gates=[InterventionGate(controller=controller), *runtime.policy_pipeline.gates] | |
| ) | |
| return { | |
| "profile_label": profile_label, | |
| "profile": profile, | |
| "runtime": runtime, | |
| "controller": controller, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Default-args lookup so primitive buttons launch with sensible values. | |
| # --------------------------------------------------------------------------- | |
| DEFAULT_ARGS: dict[str, dict[str, Any]] = { | |
| # Arm primitives. | |
| "move_to": {"x": 0.4, "y": 0.0, "z": 0.5}, | |
| "scan": {"radius": 0.3}, | |
| "pick": {"object_id": "widget-7"}, | |
| "place": {}, | |
| "set_joint": {"joint_name": "shoulder", "angle": 0.5, "duration": 1.0}, | |
| "set_gripper": {"state": "open", "force": 0.0}, | |
| # Mobile base. | |
| "drive": {"linear_x": 0.2, "angular_z": 0.0}, | |
| "stop": {}, | |
| "goto": {"x": 1.0, "y": 0.0, "theta": 0.0}, | |
| "rotate": {"dtheta": 1.57}, | |
| # Quadruped. | |
| "sit": {}, | |
| "stand": {}, | |
| "lie_down": {}, | |
| "walk_to": {"x": 1.5, "y": 0.0, "theta": 0.0}, | |
| # Humanoid. | |
| "wave": {"hand": "right"}, | |
| "look_at": {"x": 1.0, "y": 0.0, "z": 1.5}, | |
| "point_at": {"x": 1.0, "y": 0.0, "z": 1.5}, | |
| "nod": {"direction": "yes"}, | |
| # Aerial. | |
| "takeoff": {"altitude": 1.0}, | |
| "land": {}, | |
| "fly_to": {"x": 1.0, "y": 0.0, "z": 1.5, "yaw": 0.0}, | |
| "hover": {"seconds": 2.0}, | |
| # Sensing. | |
| "sense": {"modality": "rgb"}, | |
| "scan_360": {}, | |
| "take_photo": {}, | |
| "read_battery": {}, | |
| "wait": {"seconds": 1.0}, | |
| "emit_event": {"kind": "note", "message": "demo event"}, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Render helpers. Keep all formatting in one place. | |
| # --------------------------------------------------------------------------- | |
| def _render_profile_summary(session: dict[str, Any]) -> str: | |
| profile = session["profile"] | |
| runtime = session["runtime"] | |
| return ( | |
| f"### `{profile.name}` (`{profile.morphology}`)\n\n" | |
| f"**Backend:** `{runtime.backend.name}` (mock) · " | |
| f"**Workspace:** `{profile.workspace_bounds}` · " | |
| f"**Max velocity:** `{profile.max_velocity}` m/s · " | |
| f"**HITL primitives:** `{profile.hitl_primitives}`" | |
| ) | |
| def _render_primitives_list(session: dict[str, Any]) -> str: | |
| runtime = session["runtime"] | |
| return "\n".join( | |
| f"- **`{name}`**: {runtime.registry.get(name).description}" | |
| for name in runtime.registry.names() | |
| ) | |
| def _render_gates_list(session: dict[str, Any]) -> str: | |
| runtime = session["runtime"] | |
| return "\n".join( | |
| f"- {g.__class__.__name__}" for g in runtime.policy_pipeline.gates | |
| ) | |
| def _render_state(session: dict[str, Any]) -> str: | |
| runtime = session["runtime"] | |
| return f"```json\n{json.dumps(runtime.backend.snapshot(), indent=2)}\n```" | |
| def _render_trace(session: dict[str, Any]) -> str: | |
| runtime = session["runtime"] | |
| if not runtime.trace.events: | |
| return "_(no events yet. Dispatch a primitive to see the trace.)_" | |
| rows = ["| step | intent | decision | gate | result | reason |", | |
| "|---:|---|:---:|---|:---:|---|"] | |
| for ev in runtime.trace.events[-12:]: | |
| decision_label = {"allow": "OK", "deny": "BLOCKED", "escalate": "WARN"}.get( | |
| ev.decision.action.value, "?" | |
| ) | |
| result_label = {"ok": "OK", "blocked": "BLOCKED", "error": "ERR"}.get( | |
| ev.result.status.value, "?" | |
| ) | |
| args_compact = json.dumps(ev.intent.args, separators=(",", ":")) | |
| if len(args_compact) > 40: | |
| args_compact = args_compact[:37] + "..." | |
| reason = (ev.decision.reason or ev.result.message or "")[:80] | |
| rows.append( | |
| f"| {ev.step} | `{ev.intent.name}` {args_compact} | " | |
| f"`{decision_label}` | `{ev.decision.gate_name or ''}` | " | |
| f"`{result_label}` | {reason} |" | |
| ) | |
| n = len(runtime.trace.events) | |
| if n > 12: | |
| rows.append(f"\n_showing last 12 of {n} events_") | |
| return "\n".join(rows) | |
| def _render_intervention_state(session: dict[str, Any]) -> str: | |
| state = session["controller"].state | |
| label = { | |
| InterventionState.RUNNING: "RUNNING", | |
| InterventionState.PAUSED: "PAUSED", | |
| InterventionState.SWAPPING: "SWAPPING", | |
| InterventionState.EMERGENCY_STOP: "STOPPED", | |
| }.get(state, "UNKNOWN") | |
| return f"### Intervention: `{state.value}` [{label}]" | |
| def _render_instructions(session: dict[str, Any]) -> str: | |
| return session["profile"].instructions or "(no instructions block)" | |
| def _all_outputs(session: dict[str, Any]) -> tuple: | |
| return ( | |
| _render_profile_summary(session), | |
| _render_primitives_list(session), | |
| _render_gates_list(session), | |
| _render_instructions(session), | |
| _render_state(session), | |
| _render_trace(session), | |
| _render_intervention_state(session), | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Action handlers. | |
| # --------------------------------------------------------------------------- | |
| def select_profile(profile_label: str): | |
| session = make_session(profile_label) | |
| names = session["runtime"].registry.names() | |
| # Up to 12 primitive buttons; pad with empty updates for the rest. | |
| btn_updates = [] | |
| for i in range(12): | |
| if i < len(names): | |
| btn_updates.append(gr.update(value=names[i], visible=True, interactive=True)) | |
| else: | |
| btn_updates.append(gr.update(visible=False)) | |
| return (session, *_all_outputs(session), *btn_updates) | |
| def dispatch_primitive(session: dict[str, Any], primitive_name: str): | |
| if not primitive_name: | |
| return session, *_all_outputs(session) | |
| args = dict(DEFAULT_ARGS.get(primitive_name, {})) | |
| session["runtime"].step(Intent(primitive_name, args)) | |
| return session, *_all_outputs(session) | |
| def dispatch_custom(session: dict[str, Any], primitive_name: str, args_json: str): | |
| if not primitive_name.strip(): | |
| return session, *_all_outputs(session) | |
| try: | |
| args = json.loads(args_json) if args_json.strip() else {} | |
| if not isinstance(args, dict): | |
| raise ValueError("args must be a JSON object") | |
| except (ValueError, json.JSONDecodeError): | |
| return session, *_all_outputs(session) | |
| if primitive_name not in session["runtime"].registry.names(): | |
| return session, *_all_outputs(session) | |
| session["runtime"].step(Intent(primitive_name, args)) | |
| return session, *_all_outputs(session) | |
| def dispatch_drive(session: dict[str, Any], linear_x: float, angular_z: float): | |
| """Joystick handler. Emits drive(linear_x, angular_z) for mobile / quad.""" | |
| runtime = session["runtime"] | |
| name = None | |
| if "drive" in runtime.registry.names(): | |
| name = "drive" | |
| args = {"linear_x": float(linear_x), "angular_z": float(angular_z)} | |
| elif "fly_to" in runtime.registry.names(): | |
| # Drone: interpret as relative flight. | |
| name = "fly_to" | |
| args = {"x": float(linear_x), "y": 0.0, "z": 1.0, "yaw": float(angular_z)} | |
| elif "walk_to" in runtime.registry.names(): | |
| name = "walk_to" | |
| args = {"x": float(linear_x), "y": 0.0, "theta": float(angular_z)} | |
| elif "move_to" in runtime.registry.names(): | |
| # Arm: interpret as a delta on x. | |
| name = "move_to" | |
| args = {"x": float(linear_x), "y": 0.0, "z": 0.5} | |
| if name is None: | |
| return session, *_all_outputs(session) | |
| runtime.step(Intent(name, args)) | |
| return session, *_all_outputs(session) | |
| def pause_runtime(session: dict[str, Any]): | |
| session["controller"].pause(operator="ui_visitor", reason="UI pause button") | |
| return session, *_all_outputs(session) | |
| def resume_runtime(session: dict[str, Any]): | |
| session["controller"].resume(operator="ui_visitor", reason="UI resume button") | |
| return session, *_all_outputs(session) | |
| def emergency_stop(session: dict[str, Any]): | |
| runtime = session["runtime"] | |
| stop_intent_name = ( | |
| "stop" if "stop" in runtime.registry.names() else | |
| "land" if "land" in runtime.registry.names() else | |
| "lie_down" if "lie_down" in runtime.registry.names() else | |
| "emit_event" | |
| ) | |
| session["controller"].emergency_stop( | |
| stop_intent=Intent(stop_intent_name, {}), | |
| operator="ui_visitor", reason="UI E-STOP button", | |
| ) | |
| return session, *_all_outputs(session) | |
| def clear_trace(session: dict[str, Any]): | |
| session["runtime"].trace.events.clear() | |
| return session, *_all_outputs(session) | |
| # --------------------------------------------------------------------------- | |
| # UI. | |
| # --------------------------------------------------------------------------- | |
| with gr.Blocks( | |
| title="ghostloop control panel", | |
| theme=gr.themes.Soft(primary_hue="teal"), | |
| css=""" | |
| .gl-pad-button { min-height: 56px; font-weight: 600; } | |
| .gl-estop button { background: #DC2626 !important; color: white !important; font-weight: 700; } | |
| .gl-pause button { background: #F59E0B !important; color: white !important; } | |
| .gl-resume button { background: #10B981 !important; color: white !important; } | |
| """, | |
| ) as demo: | |
| gr.Markdown(""" | |
| # ghostloop · control panel | |
| Pick a robot profile, then drive it through the safety pipeline. Every | |
| dispatch goes through `Geofence + ForceCap + ActionSmoothing + RateLimit | |
| + HITL`. Try sending a `move_to` outside the workspace and watch the | |
| geofence reject it. | |
| Sister to **[GhostLM](https://github.com/joemunene-by/GhostLM)** · `pip install ghostloop` · [GitHub](https://github.com/joemunene-by/ghostloop) · [PyPI](https://pypi.org/project/ghostloop/) | |
| """) | |
| session_state = gr.State() | |
| # ---------- Profile picker ---------- | |
| with gr.Row(): | |
| profile_dd = gr.Dropdown( | |
| label="Robot profile", | |
| choices=list(PRESETS.keys()), | |
| value=DEFAULT_PROFILE, | |
| scale=4, | |
| ) | |
| summary_md = gr.Markdown() | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| gr.Markdown("### Available primitives") | |
| primitives_md = gr.Markdown() | |
| with gr.Column(scale=1): | |
| gr.Markdown("### Active safety gates") | |
| gates_md = gr.Markdown() | |
| with gr.Accordion("Robot instructions (LLM system prompt)", open=False): | |
| instructions_md = gr.Markdown() | |
| # ---------- Quick-dispatch buttons ---------- | |
| gr.Markdown("## Dispatch a primitive (one click, uses sensible defaults)") | |
| primitive_buttons: list[gr.Button] = [] | |
| with gr.Row(): | |
| for _ in range(6): | |
| primitive_buttons.append(gr.Button("", variant="primary", elem_classes="gl-pad-button")) | |
| with gr.Row(): | |
| for _ in range(6): | |
| primitive_buttons.append(gr.Button("", variant="primary", elem_classes="gl-pad-button")) | |
| # ---------- Joystick (D-pad) ---------- | |
| gr.Markdown("## Virtual joystick: drive / walk / fly / move") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| pass | |
| with gr.Column(scale=1): | |
| joy_forward = gr.Button("FORWARD", elem_classes="gl-pad-button") | |
| with gr.Column(scale=1): | |
| pass | |
| with gr.Row(): | |
| joy_left = gr.Button("LEFT", elem_classes="gl-pad-button") | |
| joy_stop = gr.Button("STOP", elem_classes="gl-pad-button") | |
| joy_right = gr.Button("RIGHT", elem_classes="gl-pad-button") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| pass | |
| with gr.Column(scale=1): | |
| joy_back = gr.Button("BACK", elem_classes="gl-pad-button") | |
| with gr.Column(scale=1): | |
| pass | |
| # ---------- Intervention controls ---------- | |
| gr.Markdown("## Live intervention") | |
| intervention_md = gr.Markdown() | |
| with gr.Row(): | |
| pause_btn = gr.Button("Pause", elem_classes="gl-pause") | |
| resume_btn = gr.Button("Resume", elem_classes="gl-resume") | |
| estop_btn = gr.Button("EMERGENCY STOP", elem_classes="gl-estop") | |
| # ---------- Live trace + state ---------- | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| gr.Markdown("## Live trace (most recent first)") | |
| trace_md = gr.Markdown() | |
| clear_trace_btn = gr.Button("Clear trace") | |
| with gr.Column(scale=1): | |
| gr.Markdown("## Current backend state") | |
| state_md = gr.Markdown() | |
| # ---------- Advanced: free-form Intent ---------- | |
| with gr.Accordion("Advanced: free-form Intent (JSON args)", open=False): | |
| with gr.Row(): | |
| adv_name = gr.Textbox(label="Primitive", value="move_to", scale=1) | |
| adv_args = gr.Textbox( | |
| label="args JSON", | |
| value='{"x": 0.4, "y": 0.0, "z": 0.5}', | |
| lines=2, scale=3, | |
| ) | |
| adv_btn = gr.Button("runtime.step(intent)", variant="secondary") | |
| # ---------- Try-this hints ---------- | |
| gr.Markdown(""" | |
| ### Try this: | |
| 1. Pick the **`franka_arm`** profile, then click **`move_to`**. It dispatches with `(0.4, 0, 0.5)` and lands inside the workspace (allowed). Now expand the **Advanced** accordion and dispatch `move_to` with `{"x": 5.0, "y": 0, "z": 0}`. The GeofenceGate rejects it. | |
| 2. Switch to **`spot`**, then drive with the joystick. Each press emits `walk_to(linear_x, 0, angular_z)` through the safety pipeline. | |
| 3. Hit **EMERGENCY STOP**. Try clicking another primitive: it gets denied. Hit **Resume** to recover. | |
| Every dispatch is recorded in the trace pane below. That's the same `TraceEvent` shape the library exports for replay, diff, query, energy ledger, and LLM-judge scoring. | |
| """) | |
| # ---------- Wiring ---------- | |
| profile_outputs = [ | |
| session_state, | |
| summary_md, primitives_md, gates_md, instructions_md, | |
| state_md, trace_md, intervention_md, | |
| *primitive_buttons, | |
| ] | |
| standard_outputs = [ | |
| session_state, | |
| summary_md, primitives_md, gates_md, instructions_md, | |
| state_md, trace_md, intervention_md, | |
| ] | |
| profile_dd.change(select_profile, inputs=[profile_dd], outputs=profile_outputs) | |
| demo.load(select_profile, inputs=[profile_dd], outputs=profile_outputs) | |
| for btn in primitive_buttons: | |
| btn.click( | |
| dispatch_primitive, inputs=[session_state, btn], outputs=standard_outputs, | |
| ) | |
| joy_forward.click( | |
| lambda s: dispatch_drive(s, 0.2, 0.0), | |
| inputs=[session_state], outputs=standard_outputs, | |
| ) | |
| joy_back.click( | |
| lambda s: dispatch_drive(s, -0.2, 0.0), | |
| inputs=[session_state], outputs=standard_outputs, | |
| ) | |
| joy_left.click( | |
| lambda s: dispatch_drive(s, 0.0, 0.5), | |
| inputs=[session_state], outputs=standard_outputs, | |
| ) | |
| joy_right.click( | |
| lambda s: dispatch_drive(s, 0.0, -0.5), | |
| inputs=[session_state], outputs=standard_outputs, | |
| ) | |
| joy_stop.click( | |
| lambda s: dispatch_drive(s, 0.0, 0.0), | |
| inputs=[session_state], outputs=standard_outputs, | |
| ) | |
| pause_btn.click(pause_runtime, inputs=[session_state], outputs=standard_outputs) | |
| resume_btn.click(resume_runtime, inputs=[session_state], outputs=standard_outputs) | |
| estop_btn.click(emergency_stop, inputs=[session_state], outputs=standard_outputs) | |
| clear_trace_btn.click(clear_trace, inputs=[session_state], outputs=standard_outputs) | |
| adv_btn.click( | |
| dispatch_custom, | |
| inputs=[session_state, adv_name, adv_args], | |
| outputs=standard_outputs, | |
| ) | |
| if __name__ == "__main__": | |
| import os | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=int(os.environ.get("PORT", 7860)), | |
| show_error=True, | |
| ) | |