import json import logging import uuid import asyncio from typing import Optional import gradio as gr from api import request_sse_stream_parsed, stop_chat from utils import contains_chinese, replace_chinese_punctuation logger = logging.getLogger(__name__) running_tasks = {} from typing import Optional # ========================= Gradio Integration ========================= def _init_render_state(): return { "agent_order": [], "agents": {}, # agent_id -> {"agent_name": str, "tool_call_order": [], "tools": {tool_call_id: {...}}} "current_agent_id": None, "errors": [], } def _append_show_text(tool_entry: dict, delta: str): existing = tool_entry.get("content", "") tool_entry["content"] = existing + delta def _is_empty_payload(value) -> bool: if value is None: return True if isinstance(value, str): stripped = value.strip() return stripped == "" or stripped in ("{}", "[]") if isinstance(value, (dict, list, tuple, set)): return len(value) == 0 return False def _render_markdown(state: dict) -> str: lines = [] emoji_cycle = ["๐Ÿง ", "๐Ÿ”Ž", "๐Ÿ› ๏ธ", "๐Ÿ“š", "๐Ÿค–", "๐Ÿงช", "๐Ÿ“", "๐Ÿงญ", "โš™๏ธ", "๐Ÿงฎ"] # Render errors first if any if state.get("errors"): lines.append("### โŒ Errors") for idx, err in enumerate(state["errors"], start=1): lines.append(f"- **Error {idx}**: {err}") lines.append("\n---\n") for idx, agent_id in enumerate(state.get("agent_order", [])): agent = state["agents"].get(agent_id, {}) agent_name = agent.get("agent_name", "unknown") emoji = emoji_cycle[idx % len(emoji_cycle)] lines.append(f"### {emoji} Agent: {agent_name}") for call_id in agent.get("tool_call_order", []): call = agent["tools"].get(call_id, {}) tool_name = call.get("tool_name", "unknown_tool") if tool_name in ("show_text", "message"): content = call.get("content", "") if content: lines.append(content) else: tool_input = call.get("input") tool_output = call.get("output") has_input = not _is_empty_payload(tool_input) has_output = not _is_empty_payload(tool_output) if not has_input and not has_output: # No parameters, only show tool name with emoji on separate line if tool_name == "Partial Summary": lines.append("\n๐Ÿ’กPartial Summary\n") else: lines.append(f"\n๐Ÿ”ง{tool_name}\n") else: # Show as collapsible details for any tool with input or output if tool_name == "Partial Summary": summary = f"๐Ÿ’ก{tool_name} ({call_id[:8]})" else: summary = f"๐Ÿ”ง{tool_name} ({call_id[:8]})" lines.append(f"\n
{summary}") if has_input: pretty = json.dumps(tool_input, ensure_ascii=False, indent=2) lines.append("\n**Input**:\n") lines.append(f"```json\n{pretty}\n```") if has_output: pretty = json.dumps(tool_output, ensure_ascii=False, indent=2) lines.append("\n**Output**:\n") lines.append(f"```json\n{pretty}\n```") lines.append("
\n") lines.append("\n---\n") return "\n".join(lines) if lines else "Waiting..." def _update_state_with_event(ui_state: dict, state: dict, message: dict): event = message.get("event") data = message.get("data", {}) if event == "job_started": chat_id = data.get("chat_id") state["chat_id"] = chat_id ui_state["chat_id"] = chat_id elif event == "start_of_agent": agent_id = data.get("agent_id") agent_name = data.get("agent_name", "unknown") if agent_id and agent_id not in state["agents"]: state["agents"][agent_id] = { "agent_name": agent_name, "tool_call_order": [], "tools": {} } state["agent_order"].append(agent_id) state["current_agent_id"] = agent_id elif event == "end_of_agent": # End marker, no special handling needed, keep structure state["current_agent_id"] = None elif event == "tool_call": tool_call_id = data.get("tool_call_id") tool_name = data.get("tool_name", "unknown_tool") agent_id = state.get("current_agent_id") or (state["agent_order"][-1] if state["agent_order"] else None) if not agent_id: return state agent = state["agents"].setdefault(agent_id, {"agent_name": "unknown", "tool_call_order": [], "tools": {}}) tools = agent["tools"] if tool_call_id not in tools: tools[tool_call_id] = {"tool_name": tool_name} agent["tool_call_order"].append(tool_call_id) entry = tools[tool_call_id] if tool_name == "show_text" and "delta_input" in data: delta = data.get("delta_input", {}).get("text", "") _append_show_text(entry, delta) elif tool_name == "show_text" and "tool_input" in data: ti = data.get("tool_input") text = "" if isinstance(ti,dict): text = ti.get("text", "") or ((ti.get('result') or {}).get("text") if isinstance(ti.get('result'),dict) else "") elif isinstance(ti,str): text = ti if text: _append_show_text(entry, text) else: # Distinguish between input and output: if "tool_input" in data: # Could be input (first time) or output with result (second time) ti = data["tool_input"] # If contains result, assign to output; otherwise assign to input if isinstance(ti, dict) and "result" in ti: entry["output"] = ti else: # Only update input if we don't already have valid input data, or if the new data is not empty if "input" not in entry or not _is_empty_payload(ti): entry["input"] = ti elif event == "message": # Same incremental text display as show_text, aggregated by message_id message_id = data.get("message_id") agent_id = state.get("current_agent_id") or (state["agent_order"][-1] if state["agent_order"] else None) if not agent_id: return state agent = state["agents"].setdefault(agent_id, {"agent_name": "unknown", "tool_call_order": [], "tools": {}}) tools = agent["tools"] if message_id not in tools: tools[message_id] = {"tool_name": "message"} agent["tool_call_order"].append(message_id) entry = tools[message_id] delta_content = (data.get("delta") or {}).get("content", "") if isinstance(delta_content, str) and delta_content: _append_show_text(entry, delta_content) elif event == "error": # Collect errors, display uniformly during rendering err_text = data.get("error") if isinstance(data, dict) else None if not err_text: try: err_text = json.dumps(data, ensure_ascii=False) except Exception: err_text = str(data) state.setdefault("errors", []).append(err_text) else: # Ignore heartbeat or other events pass return state def _spinner_markup(running: bool) -> str: if not running: return "" return ( "\n\n
" "
" "Generating..." "
\n\n" ) async def gradio_run(query: str, ui_state: Optional[dict]): query = replace_chinese_punctuation(query or "") if contains_chinese(query): warning_html = ( "
" "
๐Ÿ’ก Notice
" "
We only support English input for the time being. " "Please translate your question to English and try again.
" "
" ) yield ( warning_html, gr.update(interactive=True), gr.update(interactive=False), ui_state or {"chat_id": None} ) return state = _init_render_state() task_id = str(uuid.uuid4()) if ui_state is None: ui_state = {"chat_id": None} ui_state["task_id"] = task_id # Initial: disable Run, enable Stop, and show spinner at bottom of text yield ( _render_markdown(state) + _spinner_markup(True), gr.update(interactive=False), gr.update(interactive=True), ui_state ) try: current_task = asyncio.current_task() running_tasks[task_id] = current_task async for message in request_sse_stream_parsed(query): if current_task.cancelled(): break state = _update_state_with_event(ui_state, state, message) md = _render_markdown(state) yield ( md + _spinner_markup(True), gr.update(interactive=False), gr.update(interactive=True), ui_state ) except asyncio.CancelledError: cancelled_html = ( "
" "
๐Ÿ›‘ Task Cancelled
" "
The current task has been cancelled successfully.
" "
" ) existing_content = _render_markdown(state) final_content = cancelled_html + existing_content if existing_content and existing_content != "Waiting..." else cancelled_html yield ( final_content, gr.update(interactive=True), gr.update(interactive=False), ui_state ) return finally: if task_id in running_tasks: del running_tasks[task_id] # End: enable Run, disable Stop, remove spinner yield ( _render_markdown(state), gr.update(interactive=True), gr.update(interactive=False), ui_state ) async def stop_current(ui_state: Optional[dict]): if ui_state is None: ui_state = {} task_id = ui_state.get("task_id") if task_id and task_id in running_tasks: task = running_tasks[task_id] if task and not task.done(): task.cancel() logger.info(f"Task has been cancelled: {task_id}") chat_id = ui_state.get("chat_id") if chat_id: try: res = await stop_chat(chat_id) logger.info(f"Chat has been stopped: {chat_id}, res: {res}") except Exception as e: logger.error(f"Stop chat API call failed: {e}") return ( gr.update(interactive=True), gr.update(interactive=False), ) def build_demo(): custom_css = """ #log-view { border: 1px solid #ececec; padding: 12px; border-radius: 8px; scroll-behavior: smooth; } """ with gr.Blocks(css=custom_css, title="MiroMind Open-Source Deep Research") as demo: gr.HTML( "
MiroMind Open-Source Deep Research
" "
" "High performance in deep research and tool use.
" "Couple with framework: https://github.com/MiroMindAI/MiroThinker
" "
" "Model | " "Full Featured Website | " "Blog | " "Data
" ) with gr.Row(): inp = gr.Textbox(lines=3, label="Question (English only)") with gr.Row(): run_btn = gr.Button("Send") stop_btn = gr.Button("Stop", variant="stop", interactive=False) out_md = gr.Markdown("", elem_id="log-view") ui_state = gr.State({"chat_id": None}) # run: outputs -> markdown, run_btn(update), stop_btn(update), ui_state run_btn.click(fn=gradio_run, inputs=[inp, ui_state], outputs=[out_md, run_btn, stop_btn, ui_state]) # stop: outputs -> run_btn(update), stop_btn(update) stop_btn.click(fn=stop_current, inputs=[ui_state], outputs=[run_btn, stop_btn]) return demo if __name__ == "__main__": demo = build_demo() demo.launch(favicon_path="./favicon.ico")