from typing import List
import modelscope_studio.components.antd as antd
import modelscope_studio.components.base as ms
import gradio as gr
from config import max_mcp_server_count


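def McpServersButton(data_source: List[dict]):
    """Build the MCP servers toolbar button and its settings modal.

    Each item in ``data_source`` is a dict read via the keys ``name``,
    ``enabled``, ``disabled`` and ``internal``. Returns the ``gr.State``
    holding ``{"data_source": [...]}``.
    """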
    state = gr.State({"data_source": data_source})
    with antd.Button(value=None, variant="text",
                     color="primary") as mcp_servers_btn:
        with ms.Slot("icon"):
            antd.Icon("ToolOutlined")
    with antd.Modal(
            width=420,
            footer=False,
            centered=True,
            styles=dict(footer=dict(display="none"))) as mcp_servers_modal:
        with ms.Slot("title"):
            with antd.Flex(gap="small", align="center"):
                ms.Text("MCP Servers")
                mcp_servers_switch = antd.Switch(True)
                antd.Typography.Text(
                    f"Max MCP Server connections: {max_mcp_server_count}",
                    type="secondary",
                    elem_style=dict(fontSize=12, fontWeight="normal"))
        with antd.List(
                data_source=data_source,
                pagination=dict(pageSize=10,
                                hideOnSinglePage=True)) as mcp_servers_list:
            # Map each list item onto the props of the child components
            # addressed by as_item ("text", "tag", "switch").
            with ms.Slot(
                    "renderItem",
                    params_mapping=(
                        "(item) => ({"
                        " text: { value: item.name, disabled: item.disabled },"
                        " tag: { style: { display: item.internal ? undefined: 'none' } },"
                        " switch: { value: item.enabled, mcp: item.name, disabled: item.disabled }"
                        "})")
            ):
                with antd.List.Item():
                    with antd.Flex(justify="space-between",
                                   elem_style=dict(width="100%")):
                        with antd.Flex(gap="small"):
                            antd.Typography.Text(as_item="text")
                            antd.Tag("Official example", color="green", as_item="tag")
                        mcp_server_switch = antd.Switch(as_item="switch")

    def change_mcp_servers_switch(mcp_servers_switch_value, state_value):
        # Master switch: turning it off marks every server as disabled.
        state_value["data_source"] = [
            {**item, "disabled": not mcp_servers_switch_value}
            for item in state_value["data_source"]
        ]
        return gr.update(value=state_value)

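    def change_mcp_server_switch(state_value, e: gr.EventData):
        # Per-item switch: the event carries the server name (the "mcp" prop
        # set in params_mapping) and the new switch value as the first
        # payload entry.
        mcp = e._data["component"]["mcp"]
        enabled = e._data["payload"][0]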

        state_value["data_source"] = [
            {**item, "enabled": enabled} if item["name"] == mcp else item
            for item in state_value["data_source"]
        ]
        return gr.update(value=state_value)

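    def apply_state_change(state_value):
        """Enforce the max_mcp_server_count cap and derive the UI updates.

        Servers enabled beyond the cap are switched off. Unless an enabled
        server is currently disabled, reaching the cap disables every
        server that is not enabled so that no more can be turned on.
        """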
        has_tool_use = False
        disabled_tool_use = False
        enabled_server_count = 0
        for item in state_value["data_source"]:
            if item.get("enabled"):
                if enabled_server_count >= max_mcp_server_count:
                    item["enabled"] = False
                else:
                    enabled_server_count += 1
                    if item.get("disabled"):
                        disabled_tool_use = True
                    else:
                        has_tool_use = True

        if not disabled_tool_use:
            for item in state_value["data_source"]:
                if enabled_server_count >= max_mcp_server_count:
                    item["disabled"] = not item.get("enabled", False)
                else:
                    item["disabled"] = False

        # One update per output: list, button, master switch, state.
        footer = ("No MCP Server available"
                  if len(state_value["data_source"]) == 0 else "")
        return (
            gr.update(data_source=state_value["data_source"], footer=footer),
            gr.update(color="primary" if has_tool_use else "default"),
            gr.update(value=not disabled_tool_use),
            gr.update(value=state_value),
        )

    mcp_servers_btn.click(fn=lambda: gr.update(open=True),
                          outputs=[mcp_servers_modal],
                          queue=False)
    mcp_servers_switch.change(fn=change_mcp_servers_switch,
                              inputs=[mcp_servers_switch, state],
                              outputs=[state])
    mcp_server_switch.change(fn=change_mcp_server_switch,
                             inputs=[state],
                             outputs=[state])
    state.change(
        fn=apply_state_change,
        inputs=[state],
        outputs=[mcp_servers_list, mcp_servers_btn, mcp_servers_switch, state],
        queue=False)
    mcp_servers_modal.cancel(fn=lambda: gr.update(open=False),
                             outputs=[mcp_servers_modal],
                             queue=False)
    return state