import json

import gradio as gr
import modelscope_studio.components.antd as antd
import modelscope_studio.components.antdx as antdx
import modelscope_studio.components.base as ms
import modelscope_studio.components.pro as pro
from exceptiongroup import ExceptionGroup
from langchain.chat_models import init_chat_model
from modelscope_studio.components.pro.multimodal_input import \
    MultimodalInputUploadConfig

from config import (bot_avatars, bot_config, default_locale,
                    default_mcp_config, default_mcp_prompts,
                    default_mcp_servers, default_theme, mcp_prompt_model,
                    model_options, model_options_map, primary_color,
                    user_config, welcome_config)
from env import api_key, internal_mcp_config
from mcp_client import generate_with_mcp, get_mcp_prompts
from tools.oss import file_path_to_oss_url


def merge_mcp_config(mcp_config1, mcp_config2):
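    """Shallow-merge two MCP configs; on name collisions, servers from
    mcp_config2 take precedence."""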
    return {
        "mcpServers": {
            **mcp_config1.get("mcpServers", {}),
            **mcp_config2.get("mcpServers", {})
        }
    }


def format_messages(messages, oss_cache):
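    """Convert chatbot-format messages into plain {role, content} chat
    messages, resolving file attachments to OSS URLs (memoized in oss_cache)
    and inlining the URLs as text."""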
    formatted_messages = []
    for message in messages:
        if message["role"] == "user":
            contents = ''
            for content in message["content"]:
                if content["type"] == "text":
                    contents += content["content"]
                elif content["type"] == "file":
                    files = []
                    for file_path in content["content"]:
                        file_url = oss_cache.get(
                            file_path, file_path_to_oss_url(file_path))
                        oss_cache[file_path] = file_url
                        files.append(file_url)
                    contents += f"\n\nAttachment links: [{','.join(files)}]\n\n"
            formatted_messages.append({"role": "user", "content": contents})
        elif message["role"] == "assistant":
            formatted_messages.append({
                "role": "assistant",
                "content": "\n".join([
                    content["content"] for content in message["content"]
                    if content["type"] == "text"
                ])
            })
    return formatted_messages


def format_chatbot_header(model, model_config):
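    """Build the chatbot header: the model name without its org prefix, plus
    the model's tag label in backticks when present."""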
    tag_label = model_config.get("tag", {}).get("label")
    model_name = model.split('/')[-1]
    if tag_label:
        return f"{model_name} `{tag_label}`"
    else:
        return model_name


async def submit(input_value, config_form_value, mcp_config_value,
                 mcp_servers_btn_value, chatbot_value, oss_state_value):
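    """Handle one chat turn: append the user message and an assistant
    placeholder, then stream text, MCP tool-call, and tool-result chunks into
    the last assistant message, yielding UI updates as they arrive."""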
    model_value = config_form_value.get("model", "")
    model = model_value.split(":")[0]
    model_config = next(
        (x for x in model_options if x["value"] == model_value))
    model_params = model_config.get("model_params", {})
    sys_prompt = config_form_value.get("sys_prompt", "")
    enabled_mcp_servers = [
        item["name"] for item in mcp_servers_btn_value["data_source"]
        if item.get("enabled") and not item.get("disabled")
    ]
    if input_value:
        chatbot_value.append({
            "role": "user",
            "content": [{
                "type": "text",
                "content": input_value["text"]
            }] + ([{
                "type": "file",
                "content": list(input_value["files"])
            }] if len(input_value["files"]) > 0 else []),
            "class_names": dict(content="user-message-content")
        })
    chatbot_value.append({
        "role": "assistant",
        "loading": True,
        "content": [],
        "header": format_chatbot_header(model, model_config),
        "avatar": bot_avatars.get(model.split("/")[0], None),
        "status": "pending"
    })
    yield gr.update(loading=True,
                    value=None), gr.update(disabled=True), gr.update(
                        value=chatbot_value,
                        bot_config=bot_config(
                            disabled_actions=['edit', 'retry', 'delete']),
                        user_config=user_config(
                            disabled_actions=['edit', 'delete']))
    try:
        prev_chunk_type = None
        tool_name = ""
        tool_args = ""
        tool_content = ""
        async for chunk in generate_with_mcp(
                format_messages(chatbot_value[:-1],
                                oss_state_value["oss_cache"]),
                mcp_config=merge_mcp_config(json.loads(mcp_config_value),
                                            internal_mcp_config),
                enabled_mcp_servers=enabled_mcp_servers,
                sys_prompt=sys_prompt,
                get_llm=lambda: init_chat_model(
                    model=model,
                    model_provider="openai",
                    api_key=api_key,
                    base_url="https://api-inference.modelscope.cn/v1/",
                    **model_params)):
            chatbot_value[-1]["loading"] = False
            current_content = chatbot_value[-1]["content"]
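            # Open a new content block whenever the chunk type changes, except
            # when tool_call_chunks resolve into their tool result, which is
            # appended to the same block.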
            if prev_chunk_type != chunk["type"] and not (
                    prev_chunk_type == "tool_call_chunks"
                    and chunk["type"] == "tool"):
                current_content.append({})
            prev_chunk_type = chunk["type"]
            if chunk["type"] == "content":
                current_content[-1]['type'] = "text"
                if not isinstance(current_content[-1].get("content"), str):
                    current_content[-1]['content'] = ''
                current_content[-1]['content'] += chunk['content']
            elif chunk["type"] == "tool":
                if not isinstance(current_content[-1].get("content"), str):
                    current_content[-1]['content'] = ''
                chunk_content = chunk["content"]
                current_content[-1][
                    "content"] += f'\n\n**🎯 结果**\n```\n{chunk_content}\n```'
                tool_name = ""
                tool_args = ""
                tool_content = ""
                current_content[-1]['options']["status"] = "done"
            elif chunk["type"] == "tool_call_chunks":
                current_content[-1]['type'] = "tool"
                current_content[-1]['editable'] = False
                current_content[-1]['copyable'] = False
                if not isinstance(current_content[-1].get("options"), dict):
                    current_content[-1]['options'] = {
                        "title": "",
                        "status": "pending"
                    }
                if chunk["next_tool"]:
                    tool_name += ' '
                    tool_content += f"**📝 参数**\n```json\n{tool_args}\n```\n\n"
                    tool_args = ""
                if chunk["name"]:
                    tool_name += chunk["name"]
                    current_content[-1]['options'][
                        "title"] = f"**🔧 调用 MCP 工具** `{tool_name}`"
                if chunk["content"]:
                    tool_args += chunk["content"]
                    current_content[-1][
                        'content'] = tool_content + f"**📝 参数**\n```json\n{tool_args}\n```"
            yield gr.skip(), gr.skip(), gr.update(value=chatbot_value)
    except ExceptionGroup as eg:
        e = eg.exceptions[0]
        chatbot_value[-1]["loading"] = False
        chatbot_value[-1]["content"] += [{
            "type": "text",
            "content":
            f'<span style="color: var(--color-red-500)">{str(e)}</span>'
        }]
        print('Error: ', e)
        raise gr.Error(str(e))
    except Exception as e:
        chatbot_value[-1]["loading"] = False
        chatbot_value[-1]["content"] += [{
            "type": "text",
            "content":
            f'<span style="color: var(--color-red-500)">{str(e)}</span>'
        }]
        print('Error: ', e)
        raise gr.Error(str(e))
    finally:
        chatbot_value[-1]["status"] = "done"
        yield gr.update(loading=False), gr.update(disabled=False), gr.update(
            value=chatbot_value,
            bot_config=bot_config(),
            user_config=user_config())


def cancel(chatbot_value):
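    """Stop the current generation: mark the last message as done and
    re-enable the input controls."""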
chatbot_value[-1]["loading"] = False | |
chatbot_value[-1]["status"] = "done" | |
chatbot_value[-1]["footer"] = "对话已暂停" | |
yield gr.update(loading=False), gr.update(disabled=False), gr.update( | |
value=chatbot_value, | |
bot_config=bot_config(), | |
user_config=user_config()) | |
async def retry(config_form_value, mcp_config_value, mcp_servers_btn_value, | |
chatbot_value, oss_state_value, e: gr.EventData): | |
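    """Regenerate from the retried message: truncate the history at its index
    and re-run submit() over the remaining context."""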
    index = e._data["payload"][0]["index"]
    chatbot_value = chatbot_value[:index]
    async for chunk in submit(None, config_form_value, mcp_config_value,
                              mcp_servers_btn_value, chatbot_value,
                              oss_state_value):
        yield chunk


def clear():
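    """Reset the chatbot to an empty conversation."""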
    return gr.update(value=None)


def select_welcome_prompt(input_value, e: gr.EventData):
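    """Copy the description of the clicked welcome prompt into the input
    box."""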
input_value["text"] = e._data["payload"][0]["value"]["description"] | |
return gr.update(value=input_value) | |
def select_model(e: gr.EventData): | |
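    """Toggle visibility of a dependent control based on the selected model's
    'thought' capability flag in the event payload."""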
    return gr.update(visible=e._data["payload"][1].get("thought", False))


async def reset_mcp_config(mcp_servers_btn_value):
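    """Restore the default MCP config, server toggles, welcome prompts, and
    persisted browser state."""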
mcp_servers_btn_value["data_source"] = default_mcp_servers | |
return gr.update(value=default_mcp_config), gr.update( | |
value=mcp_servers_btn_value), gr.update( | |
welcome_config=welcome_config(default_mcp_prompts)), gr.update( | |
value={ | |
"mcp_config": default_mcp_config, | |
"mcp_prompts": default_mcp_prompts, | |
"mcp_servers": default_mcp_servers | |
}) | |
def has_mcp_config_changed(old_config: dict, new_config: dict) -> bool: | |
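    """Report whether the MCP server config effectively changed: the server
    name set differs, an SSE server's URL changed, or a server pair is not
    both SSE (conservatively treated as changed)."""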
    old_servers = old_config.get("mcpServers", {})
    new_servers = new_config.get("mcpServers", {})
    if set(old_servers.keys()) != set(new_servers.keys()):
        return True
    for server_name in old_servers:
        old_server = old_servers[server_name]
        new_server = new_servers.get(server_name)
        if new_server is None:
            return True
        if old_server.get("type") == "sse" and new_server.get("type") == "sse":
            if old_server.get("url") != new_server.get("url"):
                return True
        else:
            return True
    return False


def save_mcp_config_wrapper(initial: bool):
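    """Build a save handler for the MCP config. If the config changed, the
    handler rebuilds the server toggle list and regenerates the welcome
    prompts via the LLM; manual saves (initial=False) also show a success
    toast."""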
    async def save_mcp_config(mcp_config_value, mcp_servers_btn_value,
                              browser_state_value):
        mcp_config = json.loads(mcp_config_value)
        prev_mcp_config = json.loads(browser_state_value["mcp_config"])
        browser_state_value["mcp_config"] = mcp_config_value
        if has_mcp_config_changed(prev_mcp_config, mcp_config):
            mcp_servers_btn_value["data_source"] = [
                {
                    "name": mcp_name,
                    "enabled": True
                } for mcp_name in mcp_config.get("mcpServers", {})
            ] + default_mcp_servers
            browser_state_value["mcp_servers"] = mcp_servers_btn_value[
                "data_source"]
            yield gr.update(
                welcome_config=welcome_config({}, loading=True)), gr.update(
                    value=mcp_servers_btn_value), gr.skip()
            if not initial:
                gr.Success("保存成功")
            prompts = await get_mcp_prompts(
                mcp_config=merge_mcp_config(mcp_config, internal_mcp_config),
                get_llm=lambda: init_chat_model(
                    model=mcp_prompt_model,
                    model_provider="openai",
                    api_key=api_key,
                    base_url="https://api-inference.modelscope.cn/v1/"))
            browser_state_value["mcp_prompts"] = prompts
            yield gr.update(
                welcome_config=welcome_config(prompts)), gr.skip(), gr.update(
                    value=browser_state_value)
        else:
            yield gr.skip(), gr.skip(), gr.update(value=browser_state_value)
            if not initial:
                gr.Success("保存成功")

    return save_mcp_config


def save_mcp_servers(mcp_servers_btn_value, browser_state_value):
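    """Persist the MCP server toggle list to browser storage."""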
browser_state_value["mcp_servers"] = mcp_servers_btn_value["data_source"] | |
return gr.update(value=browser_state_value) | |
def load(mcp_servers_btn_value, browser_state_value, url_mcp_config_value): | |
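    """Initialize the UI on page load: restore persisted browser state and
    merge in any MCP config passed via the studio_additional_params query
    parameter."""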
    if browser_state_value:
        mcp_servers_btn_value["data_source"] = browser_state_value[
            "mcp_servers"]
        try:
            url_mcp_config = json.loads(url_mcp_config_value)
        except (TypeError, json.JSONDecodeError):
            url_mcp_config = {}
        return gr.update(value=json.dumps(
            merge_mcp_config(json.loads(browser_state_value["mcp_config"]),
                             url_mcp_config),
            indent=4)), gr.update(welcome_config=welcome_config(
                browser_state_value["mcp_prompts"])), gr.update(
                    value=mcp_servers_btn_value)
    elif url_mcp_config_value:
        try:
            url_mcp_config = json.loads(url_mcp_config_value)
        except (TypeError, json.JSONDecodeError):
            url_mcp_config = {}
        return gr.update(value=json.dumps(merge_mcp_config(url_mcp_config, {}),
                                          indent=4)), gr.skip(), gr.skip()
    return gr.skip(), gr.skip(), gr.skip()


def lighten_color(hex_color, factor=0.2):
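    """Lighten a hex color toward white by `factor` (0 keeps the color, 1
    gives white); returns hex digits without the leading '#'."""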
    hex_color = hex_color.lstrip("#")
    # Parse the RGB components
    r = int(hex_color[0:2], 16)
    g = int(hex_color[2:4], 16)
    b = int(hex_color[4:6], 16)
    # Shift each channel toward white
    r = min(255, r + int((255 - r) * factor))
    g = min(255, g + int((255 - g) * factor))
    b = min(255, b + int((255 - b) * factor))
    # Convert back to hex
    return f"{r:02x}{g:02x}{b:02x}"
lighten_primary_color = lighten_color(primary_color, 0.4)

css = f"""
@media (max-width: 768px) {{
  .mcp-playground-header {{
    margin-top: 36px;
  }}
}}

.ms-gr-auto-loading-default-antd {{
  z-index: 1001 !important;
}}

.user-message-content {{
  background-color: #{lighten_primary_color};
}}
"""
js = "function init() { window.MODEL_OPTIONS_MAP=" + json.dumps( | |
model_options_map) + "}" | |
with gr.Blocks(css=css, js=js) as demo: | |
browser_state = gr.BrowserState( | |
{ | |
"mcp_config": default_mcp_config, | |
"mcp_prompts": default_mcp_prompts, | |
"mcp_servers": default_mcp_servers | |
}, | |
storage_key="mcp_config") | |
oss_state = gr.State({"oss_cache": {}}) | |
with ms.Application(), antdx.XProvider( | |
locale=default_locale, theme=default_theme), ms.AutoLoading(): | |
with antd.Badge.Ribbon(placement="start"): | |
with ms.Slot("text"): | |
with antd.Typography.Link(elem_style=dict(color="#fff"), | |
type="link", | |
href="https://modelscope.cn/mcp", | |
href_target="_blank"): | |
with antd.Flex(align="center", | |
gap=2, | |
elem_style=dict(padding=2)): | |
antd.Icon("ExportOutlined", | |
elem_style=dict(marginRight=4)) | |
ms.Text("前往") | |
antd.Image("./assets/modelscope-mcp.png", | |
preview=False, | |
width=20, | |
height=20) | |
ms.Text("ModelScope MCP 广场") | |
with ms.Div(elem_style=dict(overflow="hidden")): | |
with antd.Flex(justify="center", | |
gap="small", | |
align="center", | |
elem_classes="mcp-playground-header"): | |
with ms.Div(elem_style=dict(flexShrink=0, display='flex')): | |
antd.Image("./assets/logo.png", | |
preview=False, | |
elem_style=dict(backgroundColor="#fff"), | |
width=50, | |
height=50) | |
antd.Typography.Title("MCP Playground", | |
level=1, | |
elem_style=dict(fontSize=28, | |
margin=0)) | |
with antd.Tabs(): | |
with antd.Tabs.Item(label="实验场"): | |
with antd.Flex( | |
vertical=True, | |
gap="middle", | |
elem_style=dict( | |
height= | |
'calc(100vh - 46px - 16px - 50px - 16px - 16px - 21px - 16px)', | |
maxHeight=1500)): | |
with antd.Card( | |
elem_style=dict(flex=1, | |
height=0, | |
display="flex", | |
flexDirection="column"), | |
styles=dict(body=dict(flex=1, | |
height=0, | |
display='flex', | |
flexDirection='column'))): | |
chatbot = pro.Chatbot( | |
height=0, | |
bot_config=bot_config(), | |
user_config=user_config(), | |
welcome_config=welcome_config(default_mcp_prompts), | |
elem_style=dict(flex=1)) | |
with pro.MultimodalInput( | |
upload_config=MultimodalInputUploadConfig( | |
placeholder={ | |
"inline": { | |
"title": "上传文件", | |
"description": "拖拽文件到此处或点击录音按钮开始录音" | |
}, | |
"drop": { | |
"title": "将文件拖放到此处", | |
} | |
}, | |
title= | |
"上传附件(只对部分支持远程文件 URL 的 MCP Server 生效,文件个数上限: 10)", | |
multiple=True, | |
allow_paste_file=True, | |
allow_speech=True, | |
max_count=10)) as input: | |
with ms.Slot("prefix"): | |
with antd.Button(value=None, | |
variant="text", | |
color="default") as clear_btn: | |
with ms.Slot("icon"): | |
antd.Icon("ClearOutlined") | |
mcp_servers_btn = McpServersButton( | |
data_source=default_mcp_servers) | |
with antd.Tabs.Item(label="配置"): | |
with antd.Flex(vertical=True, gap="small"): | |
with antd.Card(): | |
config_form, mcp_config_confirm_btn, reset_mcp_config_btn, mcp_config = ConfigForm( | |
) | |
url_mcp_config = gr.Textbox(visible=False) | |
load_event = demo.load( | |
fn=load, | |
js= | |
"(mcp_servers_btn_value, browser_state_value) => [mcp_servers_btn_value, browser_state_value, decodeURIComponent(new URLSearchParams(window.location.search).get('studio_additional_params') || '') || null]", | |
inputs=[mcp_servers_btn, browser_state, url_mcp_config], | |
outputs=[mcp_config, chatbot, mcp_servers_btn]) | |
chatbot.welcome_prompt_select(fn=select_welcome_prompt, | |
inputs=[input], | |
outputs=[input], | |
queue=False) | |
retry_event = chatbot.retry( | |
fn=retry, | |
inputs=[config_form, mcp_config, mcp_servers_btn, chatbot, oss_state], | |
outputs=[input, clear_btn, chatbot]) | |
clear_btn.click(fn=clear, outputs=[chatbot], queue=False) | |
mcp_servers_btn.change(fn=save_mcp_servers, | |
inputs=[mcp_servers_btn, browser_state], | |
outputs=[browser_state]) | |
load_success_save_mcp_config_event = load_event.success( | |
fn=save_mcp_config_wrapper(initial=True), | |
inputs=[mcp_config, mcp_servers_btn, browser_state], | |
outputs=[chatbot, mcp_servers_btn, browser_state]) | |
save_mcp_config_event = mcp_config_confirm_btn.click( | |
fn=save_mcp_config_wrapper(initial=False), | |
inputs=[mcp_config, mcp_servers_btn, browser_state], | |
cancels=[load_success_save_mcp_config_event], | |
outputs=[chatbot, mcp_servers_btn, browser_state]) | |
reset_mcp_config_btn.click( | |
fn=reset_mcp_config, | |
inputs=[mcp_servers_btn], | |
outputs=[mcp_config, mcp_servers_btn, chatbot, browser_state], | |
cancels=[save_mcp_config_event, load_success_save_mcp_config_event]) | |
submit_event = input.submit(fn=submit, | |
inputs=[ | |
input, config_form, mcp_config, | |
mcp_servers_btn, chatbot, oss_state | |
], | |
outputs=[input, clear_btn, chatbot]) | |
input.cancel(fn=cancel, | |
inputs=[chatbot], | |
outputs=[input, clear_btn, chatbot], | |
cancels=[submit_event, retry_event], | |
queue=False) | |
demo.queue(default_concurrency_limit=100, max_size=100).launch(ssr_mode=False, | |
max_threads=100) | |