| | """Dummy Gradio entrypoint with layout similar to original RoboMME UI.""" |
| |
|
| | import logging |
| | import os |
| | import sys |
| |
|
| | |
| | os.environ["GRADIO_SSR_MODE"] = "false" |
| |
|
| | import gradio as gr |
| | import numpy as np |
| |
|
| |
|
| | PHASE_DEMO_VIDEO = "demo_video" |
| | PHASE_ACTION_KEYPOINT = "action_keypoint" |
| |
|
| | DUMMY_TASKS = [ |
| | "Peg Insertion Side", |
| | "Pick Cube and Place", |
| | "Stack Green Block", |
| | "Open Cabinet Door", |
| | ] |
| |
|
| | CSS = """ |
| | #loading_overlay_group { |
| | position: fixed !important; |
| | inset: 0 !important; |
| | z-index: 9999 !important; |
| | background: rgba(255, 255, 255, 0.92) !important; |
| | text-align: center !important; |
| | } |
| | |
| | #loading_overlay_group > div { |
| | min-height: 100%; |
| | display: flex; |
| | align-items: center; |
| | justify-content: center; |
| | } |
| | """ |
| |
|
| |
|
| | def _setup_logging() -> logging.Logger: |
| | """Configure terminal logging for runtime debugging.""" |
| | level_name = os.getenv("LOG_LEVEL", "INFO").upper() |
| | level = getattr(logging, level_name, logging.INFO) |
| | try: |
| | sys.stdout.reconfigure(line_buffering=True) |
| | except Exception: |
| | pass |
| | logging.basicConfig( |
| | level=level, |
| | format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", |
| | stream=sys.stdout, |
| | force=True, |
| | ) |
| | |
| | logging.getLogger("httpx").setLevel(logging.WARNING) |
| | logging.getLogger("uvicorn.access").setLevel(logging.INFO) |
| | logger = logging.getLogger("robomme.app") |
| | logger.info("Logging initialized with LOG_LEVEL=%s", level_name) |
| | return logger |
| |
|
| |
|
| | LOGGER = _setup_logging() |
| |
|
| |
|
| | def _task_goal(task_name: str) -> str: |
| | return f"Dummy goal for {task_name}" |
| |
|
| |
|
| | def _task_hint(task_name: str) -> str: |
| | return ( |
| | f"[DUMMY] Current task: {task_name}\n" |
| | "1) Select an action option.\n" |
| | "2) Click image when action needs coordinates.\n" |
| | "3) Press EXECUTE to simulate one step." |
| | ) |
| |
|
| |
|
| | def _task_actions(task_name: str) -> list[str]: |
| | return [ |
| | f"{task_name}: Move", |
| | f"{task_name}: Grasp", |
| | f"{task_name}: Place", |
| | "Reset Arm Pose", |
| | ] |
| |
|
| |
|
| | def _dummy_frame(task_name: str, step: int) -> np.ndarray: |
| | seed = sum(ord(ch) for ch in task_name) + step * 31 |
| | r = 80 + (seed % 120) |
| | g = 80 + ((seed * 3) % 120) |
| | b = 80 + ((seed * 7) % 120) |
| | frame = np.zeros((360, 640, 3), dtype=np.uint8) |
| | frame[:, :] = [r, g, b] |
| | frame[20:340, 20:620] = [min(r + 25, 255), min(g + 25, 255), min(b + 25, 255)] |
| | frame[170:190, 40:600] = [245, 245, 245] |
| | return frame |
| |
|
| |
|
| | def _build_task_updates(task_name: str, step: int, phase: str) -> tuple: |
| | in_video_phase = phase == PHASE_DEMO_VIDEO |
| | log_text = ( |
| | f"[DUMMY] Loaded task: {task_name}\n" |
| | + ("[DUMMY] Demo video phase. Click 'Skip Demo Video' to continue." if in_video_phase else "[DUMMY] Action phase ready.") |
| | ) |
| | return ( |
| | _task_goal(task_name), |
| | gr.update(choices=_task_actions(task_name), value=None), |
| | "", |
| | log_text, |
| | _task_hint(task_name), |
| | f"Episode progress: {step}/5 (dummy)", |
| | _dummy_frame(task_name, step), |
| | gr.update(visible=in_video_phase), |
| | gr.update(visible=not in_video_phase), |
| | gr.update(visible=not in_video_phase), |
| | phase, |
| | ) |
| |
|
| |
|
| | def create_dummy_demo() -> gr.Blocks: |
| | """Build a dummy app that mimics original layout without ManiSkill.""" |
| |
|
| | def on_task_change(task_name: str): |
| | LOGGER.debug("on_task_change(task_name=%s)", task_name) |
| | task = task_name if task_name in DUMMY_TASKS else DUMMY_TASKS[0] |
| | step = 0 |
| | return (step,) + _build_task_updates(task, step, PHASE_DEMO_VIDEO) |
| |
|
| | def skip_video(task_name: str, step: int): |
| | LOGGER.debug("skip_video(task_name=%s, step=%s)", task_name, step) |
| | task = task_name if task_name in DUMMY_TASKS else DUMMY_TASKS[0] |
| | return _build_task_updates(task, step, PHASE_ACTION_KEYPOINT) |
| |
|
| | def on_reference_action(task_name: str): |
| | LOGGER.debug("on_reference_action(task_name=%s)", task_name) |
| | actions = _task_actions(task_name) |
| | return ( |
| | gr.update(value=actions[0]), |
| | "[DUMMY] Filled with reference action.", |
| | ) |
| |
|
| | def on_map_click(evt: gr.SelectData): |
| | if evt is None or evt.index is None: |
| | LOGGER.debug("on_map_click received empty event") |
| | return "" |
| | x, y = evt.index |
| | LOGGER.debug("on_map_click(x=%s, y=%s)", x, y) |
| | return f"({x}, {y})" |
| |
|
| | def execute_step(task_name: str, action_name: str, coords_text: str, step: int): |
| | LOGGER.info( |
| | "execute_step(task_name=%s, action=%s, coords=%s, step=%s)", |
| | task_name, |
| | action_name, |
| | coords_text, |
| | step, |
| | ) |
| | task = task_name if task_name in DUMMY_TASKS else DUMMY_TASKS[0] |
| | next_step = min(step + 1, 5) |
| | action = action_name or "No action selected" |
| | coords = coords_text.strip() if coords_text else "N/A" |
| | log = ( |
| | f"[DUMMY] Execute step {next_step}/5\n" |
| | f"- task: {task}\n" |
| | f"- action: {action}\n" |
| | f"- coords: {coords}" |
| | ) |
| | progress = f"Episode progress: {next_step}/5 (dummy)" |
| | return next_step, _dummy_frame(task, next_step), log, progress |
| |
|
| | def restart_episode(task_name: str): |
| | LOGGER.info("restart_episode(task_name=%s)", task_name) |
| | task = task_name if task_name in DUMMY_TASKS else DUMMY_TASKS[0] |
| | step = 0 |
| | return (step,) + _build_task_updates(task, step, PHASE_DEMO_VIDEO) |
| |
|
| | def next_task(current_task: str): |
| | LOGGER.info("next_task(current_task=%s)", current_task) |
| | try: |
| | idx = DUMMY_TASKS.index(current_task) |
| | except ValueError: |
| | idx = 0 |
| | nxt = DUMMY_TASKS[(idx + 1) % len(DUMMY_TASKS)] |
| | step = 0 |
| | return (nxt, step) + _build_task_updates(nxt, step, PHASE_DEMO_VIDEO) |
| |
|
| | with gr.Blocks(title="Oracle Planner Interface") as demo: |
| | demo.theme = gr.themes.Soft() |
| | demo.css = CSS |
| |
|
| | step_state = gr.State(0) |
| | ui_phase_state = gr.State(PHASE_DEMO_VIDEO) |
| |
|
| | gr.Markdown("## RoboMME Human Evaluation", elem_id="header_title") |
| | with gr.Row(): |
| | with gr.Column(scale=1): |
| | header_task_box = gr.Dropdown( |
| | choices=DUMMY_TASKS, |
| | value=DUMMY_TASKS[0], |
| | label="Current Task", |
| | show_label=True, |
| | interactive=True, |
| | elem_id="header_task", |
| | ) |
| | with gr.Column(scale=2): |
| | header_goal_box = gr.Textbox( |
| | value=_task_goal(DUMMY_TASKS[0]), |
| | label="Goal", |
| | show_label=True, |
| | interactive=False, |
| | lines=1, |
| | elem_id="header_goal", |
| | ) |
| |
|
| | with gr.Column(visible=False, elem_id="loading_overlay_group") as loading_overlay: |
| | gr.Markdown("### Logging in and setting up environment... Please wait.") |
| |
|
| | with gr.Column(visible=True, elem_id="main_interface_root") as main_interface: |
| | with gr.Row(elem_id="main_layout_row"): |
| | with gr.Column(scale=5): |
| | with gr.Column(elem_classes=["native-card"], elem_id="media_card"): |
| | with gr.Column(visible=True, elem_id="video_phase_group") as video_phase_group: |
| | video_display = gr.Video( |
| | label="Demonstration Video", |
| | interactive=False, |
| | elem_id="demo_video", |
| | autoplay=False, |
| | show_label=True, |
| | value=None, |
| | visible=True, |
| | ) |
| | skip_video_btn = gr.Button("Skip Demo Video", variant="secondary") |
| |
|
| | with gr.Column(visible=False, elem_id="action_phase_group") as action_phase_group: |
| | img_display = gr.Image( |
| | label="Keypoint Selection", |
| | interactive=False, |
| | type="numpy", |
| | elem_id="live_obs", |
| | show_label=True, |
| | buttons=[], |
| | sources=[], |
| | value=_dummy_frame(DUMMY_TASKS[0], 0), |
| | ) |
| |
|
| | with gr.Column(scale=4): |
| | with gr.Column(visible=False, elem_id="control_panel_group") as control_panel_group: |
| | with gr.Row(elem_id="right_top_row", equal_height=False): |
| | with gr.Column(scale=3, elem_id="right_action_col"): |
| | with gr.Column(elem_classes=["native-card"], elem_id="action_selection_card"): |
| | options_radio = gr.Radio( |
| | choices=_task_actions(DUMMY_TASKS[0]), |
| | label=" Action Selection", |
| | type="value", |
| | show_label=True, |
| | elem_id="action_radio", |
| | ) |
| | coords_box = gr.Textbox( |
| | label="Coords", |
| | value="", |
| | interactive=False, |
| | show_label=False, |
| | visible=False, |
| | elem_id="coords_box", |
| | ) |
| |
|
| | with gr.Column(scale=2, elem_id="right_log_col"): |
| | with gr.Column(elem_classes=["native-card"], elem_id="log_card"): |
| | log_output = gr.Textbox( |
| | value="[DUMMY] Demo video phase. Click 'Skip Demo Video' to continue.", |
| | lines=4, |
| | max_lines=None, |
| | show_label=True, |
| | interactive=False, |
| | elem_id="log_output", |
| | label="System Log", |
| | ) |
| |
|
| | with gr.Row(elem_id="action_buttons_row"): |
| | with gr.Column(elem_classes=["native-card", "native-button-card"], elem_id="exec_btn_card"): |
| | exec_btn = gr.Button("EXECUTE", variant="stop", size="lg", elem_id="exec_btn") |
| |
|
| | with gr.Column( |
| | elem_classes=["native-card", "native-button-card"], |
| | elem_id="reference_btn_card", |
| | ): |
| | reference_action_btn = gr.Button( |
| | "Ground Truth Action", |
| | variant="secondary", |
| | interactive=True, |
| | elem_id="reference_action_btn", |
| | ) |
| |
|
| | with gr.Column( |
| | elem_classes=["native-card", "native-button-card"], |
| | elem_id="restart_episode_btn_card", |
| | ): |
| | restart_episode_btn = gr.Button( |
| | "restart episode", |
| | variant="secondary", |
| | interactive=True, |
| | elem_id="restart_episode_btn", |
| | ) |
| |
|
| | with gr.Column( |
| | elem_classes=["native-card", "native-button-card"], |
| | elem_id="next_task_btn_card", |
| | ): |
| | next_task_btn = gr.Button( |
| | "change episode", |
| | variant="primary", |
| | interactive=True, |
| | elem_id="next_task_btn", |
| | ) |
| |
|
| | with gr.Accordion( |
| | "Task Hint", |
| | open=False, |
| | visible=True, |
| | elem_classes=["native-card"], |
| | elem_id="task_hint_card", |
| | ): |
| | task_hint_display = gr.Textbox( |
| | value=_task_hint(DUMMY_TASKS[0]), |
| | lines=8, |
| | max_lines=16, |
| | show_label=False, |
| | interactive=True, |
| | elem_id="task_hint_display", |
| | ) |
| |
|
| | progress_info_box = gr.Textbox(value="Episode progress: 0/5 (dummy)", visible=False) |
| |
|
| | header_task_box.change( |
| | fn=on_task_change, |
| | inputs=[header_task_box], |
| | outputs=[ |
| | step_state, |
| | header_goal_box, |
| | options_radio, |
| | coords_box, |
| | log_output, |
| | task_hint_display, |
| | progress_info_box, |
| | img_display, |
| | video_phase_group, |
| | action_phase_group, |
| | control_panel_group, |
| | ui_phase_state, |
| | ], |
| | ) |
| |
|
| | skip_video_btn.click( |
| | fn=skip_video, |
| | inputs=[header_task_box, step_state], |
| | outputs=[ |
| | header_goal_box, |
| | options_radio, |
| | coords_box, |
| | log_output, |
| | task_hint_display, |
| | progress_info_box, |
| | img_display, |
| | video_phase_group, |
| | action_phase_group, |
| | control_panel_group, |
| | ui_phase_state, |
| | ], |
| | ) |
| |
|
| | img_display.select( |
| | fn=on_map_click, |
| | outputs=[coords_box], |
| | ) |
| |
|
| | reference_action_btn.click( |
| | fn=on_reference_action, |
| | inputs=[header_task_box], |
| | outputs=[options_radio, log_output], |
| | ) |
| |
|
| | exec_btn.click( |
| | fn=execute_step, |
| | inputs=[header_task_box, options_radio, coords_box, step_state], |
| | outputs=[step_state, img_display, log_output, progress_info_box], |
| | ) |
| |
|
| | restart_episode_btn.click( |
| | fn=restart_episode, |
| | inputs=[header_task_box], |
| | outputs=[ |
| | step_state, |
| | header_goal_box, |
| | options_radio, |
| | coords_box, |
| | log_output, |
| | task_hint_display, |
| | progress_info_box, |
| | img_display, |
| | video_phase_group, |
| | action_phase_group, |
| | control_panel_group, |
| | ui_phase_state, |
| | ], |
| | ) |
| |
|
| | next_task_btn.click( |
| | fn=next_task, |
| | inputs=[header_task_box], |
| | outputs=[ |
| | header_task_box, |
| | step_state, |
| | header_goal_box, |
| | options_radio, |
| | coords_box, |
| | log_output, |
| | task_hint_display, |
| | progress_info_box, |
| | img_display, |
| | video_phase_group, |
| | action_phase_group, |
| | control_panel_group, |
| | ui_phase_state, |
| | ], |
| | ) |
| |
|
| | demo.load( |
| | fn=on_task_change, |
| | inputs=[header_task_box], |
| | outputs=[ |
| | step_state, |
| | header_goal_box, |
| | options_radio, |
| | coords_box, |
| | log_output, |
| | task_hint_display, |
| | progress_info_box, |
| | img_display, |
| | video_phase_group, |
| | action_phase_group, |
| | control_panel_group, |
| | ui_phase_state, |
| | ], |
| | ) |
| |
|
| | return demo |
| |
|
| |
|
| | demo = create_dummy_demo() |
| |
|
| | |
| | _original_launch = demo.launch |
| |
|
| |
|
| | def _patched_launch(**kwargs): |
| | kwargs.setdefault("ssr_mode", False) |
| | kwargs.setdefault("show_error", True) |
| | kwargs.setdefault("debug", True) |
| | kwargs.setdefault("quiet", False) |
| | LOGGER.info("Launching app with kwargs=%s", kwargs) |
| | return _original_launch(**kwargs) |
| |
|
| |
|
| | demo.launch = _patched_launch |
| |
|
| |
|
| | if __name__ == "__main__": |
| | LOGGER.info("Starting app.py entrypoint") |
| | demo.launch( |
| | server_name="0.0.0.0", |
| | server_port=int(os.getenv("PORT", "7860")), |
| | ssr_mode=False, |
| | show_error=True, |
| | debug=True, |
| | quiet=False, |
| | ) |
| |
|