# main.py # 主入口文件,负责启动 Gradio UI import gradio as gr from config import SCENE_CONFIGS, MODEL_CHOICES, MODE_CHOICES, EPISODE_CONFIGS from backend_api import submit_to_backend, get_task_status, get_task_result from logging_utils import log_access, log_submission, is_request_allowed from simulation import stream_simulation_results, convert_to_h264, create_final_video_from_oss_images from ui_components import update_history_display, update_scene_display, update_episode_display, update_log_display, get_scene_instruction from oss_utils import download_oss_file, get_user_tmp_dir, cleanup_user_tmp_dir, oss_file_exists, clean_oss_result_path import os from datetime import datetime SESSION_TASKS = {} # 添加版本检查函数 def get_version_info(): """获取Python、Gradio相关包的版本信息""" import sys # Python版本 python_version = f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" # Gradio版本 try: import gradio gradio_version = gradio.__version__ except: gradio_version = "Unknown" # Gradio Client版本 try: import gradio_client gradio_client_version = gradio_client.__version__ except: gradio_client_version = "Not installed" return python_version, gradio_version, gradio_client_version def display_version_info(): """显示版本信息""" python_version, gradio_version, gradio_client_version = get_version_info() # 在控制台输出版本信息 print("=" * 50) print("📦 Package Versions:") print(f" Python: {python_version}") print(f" Gradio: {gradio_version}") print(f" Gradio Client: {gradio_client_version}") print("=" * 50) # 返回格式化的版本信息字符串 version_info = f"""**📦 Package Versions:** - **Python**: `{python_version}` - **Gradio**: `{gradio_version}` - **Gradio Client**: `{gradio_client_version}`""" return version_info def run_simulation(scene, episode, model, mode, prompt, history, request: gr.Request): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") scene_desc = SCENE_CONFIGS.get(scene, {}).get("description", scene) user_ip = request.client.host if request else "unknown" session_id = request.session_hash if not is_request_allowed(user_ip): log_submission(scene, prompt, model, user_ip, "IP blocked temporarily") raise gr.Error("Too many requests from this IP. Please wait and try again one minute later.") # 提交任务到后端 submission_result = submit_to_backend(scene, episode, prompt, mode, model, user_ip) if submission_result.get("status") != "pending": log_submission(scene, prompt, model, user_ip, "Submission failed") raise gr.Error(f"Submission failed: {submission_result.get('message', 'unknown issue')}") try: task_id = submission_result["task_id"] SESSION_TASKS[session_id] = task_id gr.Info(f"Simulation started, task_id: {task_id}") import time time.sleep(5) status = get_task_status(task_id) # OSS上的结果文件夹路径,不再检查本地路径是否存在 result_folder = clean_oss_result_path(status.get("result", f"gradio_demo/tasks/{task_id}"), task_id) except Exception as e: log_submission(scene, prompt, model, user_ip, str(e)) raise gr.Error(f"error occurred when parsing submission result from backend: {str(e)}") # 流式输出视频片段(从OSS读取) try: for video_path in stream_simulation_results(result_folder, task_id, request): if video_path: yield video_path, history except Exception as e: log_submission(scene, prompt, model, user_ip, str(e)) raise gr.Error(f"流式输出过程中出错: {str(e)}") # 获取最终任务状态 status = get_task_status(task_id) if status.get("status") == "completed": try: # 从OSS上的所有图片拼接成最终视频(6帧每秒) gr.Info("Creating final video from all OSS images...") video_path = create_final_video_from_oss_images(result_folder, task_id, request, fps=6) gr.Info(f"Final video created successfully with 6 fps!") except Exception as e: print(f"Error creating final video from OSS images: {e}") log_submission(scene, prompt, model, user_ip, f"Final video creation failed: {str(e)}") video_path = None new_entry = { "timestamp": timestamp, "scene": scene, "model": model, "mode": mode, "prompt": prompt, "video_path": video_path } updated_history = history + [new_entry] if len(updated_history) > 10: updated_history = updated_history[:10] log_submission(scene, prompt, model, user_ip, "success") gr.Info("Simulation completed successfully!") yield None, updated_history elif status.get("status") == "failed": log_submission(scene, prompt, model, user_ip, status.get('result', 'backend error')) raise gr.Error(f"任务执行失败: {status.get('result', 'backend 未知错误')}") yield None, history elif status.get("status") == "terminated": log_submission(scene, prompt, model, user_ip, "terminated") # 对于终止的任务,不再检查本地文件 yield None, history else: log_submission(scene, prompt, model, user_ip, "missing task's status from backend") raise gr.Error("missing task's status from backend") yield None, history def cleanup_session(request: gr.Request): session_id = request.session_hash task_id = SESSION_TASKS.pop(session_id, None) from config import BACKEND_URL import requests if task_id: try: requests.post(f"{BACKEND_URL}/predict/terminate/{task_id}", timeout=3) except Exception: pass # 清理用户临时目录 cleanup_user_tmp_dir(session_id) def record_access(request: gr.Request): user_ip = request.client.host if request else "unknown" user_agent = request.headers.get("user-agent", "unknown") log_access(user_ip, user_agent) return update_log_display() custom_css = """ #simulation-panel { border-radius: 8px; padding: 20px; background: #f9f9f9; box-shadow: 0 2px 4px rgba(0,0,0,0.1); } #result-panel { border-radius: 8px; padding: 20px; background: #f0f8ff; } .dark #simulation-panel { background: #2a2a2a; } .dark #result-panel { background: #1a2a3a; } .history-container { max-height: 600px; overflow-y: auto; margin-top: 20px; } .history-accordion { margin-bottom: 10px; } .scene-preview { height: 400px; border: 1px solid #ddd; border-radius: 8px; overflow: hidden; } .version-info { background: #e8f4fd; border: 1px solid #bee5eb; border-radius: 6px; padding: 10px; margin: 10px 0; font-size: 0.9em; } .dark .version-info { background: #1a2a3a; border-color: #3a5a7a; color: #e0e0e0; } """ header_html = """
Institution Logo

🤖 InternNav Model Inference Demo

Model trained on InternNav framework

GitHub HuggingFace
""" with gr.Blocks(title="InternNav Model Inference Demo", css=custom_css) as demo: gr.HTML(header_html) # 添加版本信息显示 version_display = gr.Markdown( value="", elem_classes=["version-info"], visible=True ) history_state = gr.State([]) with gr.Row(): with gr.Column(elem_id="simulation-panel"): gr.Markdown("### Simulation Settings") with gr.Row(): scene_dropdown = gr.Dropdown( label="Choose a scene", choices=list(SCENE_CONFIGS.keys()), value="demo1", interactive=True ) episode_dropdown = gr.Dropdown( label="Select Start Position", choices=list(EPISODE_CONFIGS.keys()), value="episode_1", interactive=True ) with gr.Row(): scene_preview = gr.Model3D( elem_classes=["scene-preview"], camera_position=(90.0, 120, 20000.0) ) fps_preview = gr.Image(label="FPS Preview") scene_description = gr.Markdown("### Scene preview") prompt_input = gr.Textbox( label="Navigation Prompt", value="Walk past the left side of the bed and stop in the doorway.", placeholder="e.g.: 'Walk past the left side of the bed and stop in the doorway.'", lines=2, max_lines=4 ) model_dropdown = gr.Dropdown( label="Chose a pretrained model", choices=MODEL_CHOICES, value=MODEL_CHOICES[0], interactive=True ) mode_dropdown = gr.Dropdown( label="Select Mode", choices=MODE_CHOICES, value=MODE_CHOICES[0], interactive=True ) scene_dropdown.change( fn=lambda scene: [update_scene_display(scene)[0], update_scene_display(scene)[1], get_scene_instruction(scene)], inputs=scene_dropdown, outputs=[scene_description, scene_preview, prompt_input] ).then( update_episode_display, inputs=[scene_dropdown, episode_dropdown], outputs=[fps_preview] ) episode_dropdown.change( update_episode_display, inputs=[scene_dropdown, episode_dropdown], outputs=[fps_preview] ) submit_btn = gr.Button("Start Navigation Simulation", variant="primary") with gr.Column(elem_id="result-panel"): gr.Markdown("### Latest Simulation Result") video_output = gr.Video( label="Live", interactive=False, format="mp4", autoplay=True, streaming=True ) with gr.Column() as history_container: gr.Markdown("### History") gr.Markdown("#### History will be reset after refresh") history_slots = [] for i in range(10): with gr.Column(visible=False) as slot: with gr.Accordion(visible=False, open=False) as accordion: video = gr.Video(interactive=False) detail_md = gr.Markdown() history_slots.append((slot, accordion, video, detail_md)) gr.Examples( examples=[ ["demo1", '1', "rdp", "vlnPE", "Walk past the left side of the bed and stop in the doorway."], ["demo2", '1', "rdp", "vlnPE", "Walk through the bathroom, past the sink and toilet. Stop in front of the counter with the two suitcase."], ["demo3", '1', "rdp", "vlnPE", "Do a U-turn. Walk forward through the kitchen, heading to the black door. Walk out of the door and take a right onto the deck. Walk out on to the deck and stop."], ["demo4", '1', "rdp", "vlnPE", "Walk out of bathroom and stand on white bath mat."], ["demo5", '1', "rdp", "vlnPE", "Walk straight through the double wood doors, follow the red carpet straight to the next doorway and stop where the carpet splits off."] ], inputs=[scene_dropdown, episode_dropdown, model_dropdown, mode_dropdown, prompt_input], label="Navigation Task Examples" ) submit_btn.click( fn=run_simulation, inputs=[scene_dropdown, episode_dropdown, model_dropdown, mode_dropdown, prompt_input, history_state], outputs=[video_output, history_state], queue=True, api_name="run_simulation" ).then( fn=update_history_display, inputs=history_state, outputs=[comp for slot in history_slots for comp in slot], queue=True ) demo.load( fn=lambda: [display_version_info(), update_scene_display("demo1")[0], update_scene_display("demo1")[1]], outputs=[version_display, scene_description, scene_preview] ).then( fn=lambda: update_episode_display("demo1", "episode_1"), outputs=[fps_preview] ) demo.load( fn=record_access, inputs=None, outputs=None, queue=False ) demo.queue(default_concurrency_limit=8) demo.unload(fn=cleanup_session) if __name__ == "__main__": # 在启动时显示版本信息 display_version_info() demo.launch( server_name="0.0.0.0", server_port=7860, # Hugging Face Space默认端口 share=False, debug=False, # 生产环境建议关闭debug allowed_paths=["./assets", "./logs", "./tmp"] # 添加临时目录到允许路径 )