MiniMax-M1 / app.py
sriting's picture
feat: support two tabs
13cd324
raw
history blame
25.9 kB
import base64
import gradio as gr
import json
import mimetypes
import os
import requests
import time
import modelscope_studio.components.legacy as legacy
import modelscope_studio.components.antd as antd
import modelscope_studio.components.antdx as antdx
import modelscope_studio.components.base as ms
import modelscope_studio.components.pro as pro
from modelscope_studio.components.pro.chatbot import (
ChatbotActionConfig, ChatbotBotConfig, ChatbotMarkdownConfig,
ChatbotPromptsConfig, ChatbotUserConfig, ChatbotWelcomeConfig)
from config import DEFAULT_PROMPTS, EXAMPLES, SystemPrompt
import re
MODEL_VERSION = os.environ['MODEL_VERSION']
API_URL = os.environ['API_URL']
API_KEY = os.environ['API_KEY']
SYSTEM_PROMPT = os.environ.get('SYSTEM_PROMPT')
MULTIMODAL_FLAG = os.environ.get('MULTIMODAL')
MODEL_CONTROL_DEFAULTS = json.loads(os.environ['MODEL_CONTROL_DEFAULTS'])
NAME_MAP = {
'system': os.environ.get('SYSTEM_NAME'),
'user': os.environ.get('USER_NAME'),
}
MODEL_NAME = 'MiniMax-M1'
def prompt_select(e: gr.EventData):
return gr.update(value=e._data["payload"][0]["value"]["description"])
def clear():
return gr.update(value=None)
def retry(chatbot_value, e: gr.EventData):
index = e._data["payload"][0]["index"]
chatbot_value = chatbot_value[:index]
yield gr.update(loading=True), gr.update(value=chatbot_value), gr.update(disabled=True)
for chunk in submit(None, chatbot_value):
yield chunk
def cancel(chatbot_value):
chatbot_value[-1]["loading"] = False
chatbot_value[-1]["status"] = "done"
chatbot_value[-1]["footer"] = "Chat completion paused"
return gr.update(value=chatbot_value), gr.update(loading=False), gr.update(disabled=False)
def add_name_for_message(message):
name = NAME_MAP.get(message['role'])
if name is not None:
message['name'] = name
def convert_content(content):
if isinstance(content, str):
return content
if isinstance(content, tuple):
return [{
'type': 'image_url',
'image_url': {
'url': encode_base64(content[0]),
},
}]
content_list = []
for key, val in content.items():
if key == 'text':
content_list.append({
'type': 'text',
'text': val,
})
elif key == 'files':
for f in val:
content_list.append({
'type': 'image_url',
'image_url': {
'url': encode_base64(f),
},
})
return content_list
def encode_base64(path):
guess_type = mimetypes.guess_type(path)[0]
if not guess_type.startswith('image/'):
raise gr.Error('not an image ({}): {}'.format(guess_type, path))
with open(path, 'rb') as handle:
data = handle.read()
return 'data:{};base64,{}'.format(
guess_type,
base64.b64encode(data).decode(),
)
def format_history(history):
"""Convert chatbot history format to API call format"""
messages = []
if SYSTEM_PROMPT is not None:
messages.append({
'role': 'system',
'content': SYSTEM_PROMPT,
})
for item in history:
if item["role"] == "user":
messages.append({
'role': 'user',
'content': convert_content(item["content"]),
})
elif item["role"] == "assistant":
# Extract reasoning content and main content
reasoning_content = ""
main_content = ""
if isinstance(item["content"], list):
for content_item in item["content"]:
if content_item.get("type") == "tool":
reasoning_content = content_item.get("content", "")
elif content_item.get("type") == "text":
main_content = content_item.get("content", "")
else:
main_content = item["content"]
messages.append({
'role': 'assistant',
'content': convert_content(main_content),
'reasoning_content': convert_content(reasoning_content),
})
return messages
def submit(sender_value, chatbot_value):
if sender_value is not None:
chatbot_value.append({
"role": "user",
"content": sender_value,
})
api_messages = format_history(chatbot_value)
for message in api_messages:
add_name_for_message(message)
chatbot_value.append({
"role": "assistant",
"content": [],
"loading": True,
"status": "pending"
})
yield {
sender: gr.update(value=None, loading=True),
clear_btn: gr.update(disabled=True),
chatbot: gr.update(value=chatbot_value)
}
try:
data = {
'model': MODEL_VERSION,
'messages': api_messages,
'stream': True,
'max_tokens': MODEL_CONTROL_DEFAULTS['tokens_to_generate'],
'temperature': MODEL_CONTROL_DEFAULTS['temperature'],
'top_p': MODEL_CONTROL_DEFAULTS['top_p'],
}
r = requests.post(
API_URL,
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(API_KEY),
},
data=json.dumps(data),
stream=True,
)
thought_done = False
start_time = time.time()
message_content = chatbot_value[-1]["content"]
# Reasoning content (tool type)
message_content.append({
"type": "tool",
"content": "",
"options": {
"title": "πŸ€” Thinking..."
}
})
# Main content (text type)
message_content.append({
"type": "text",
"content": "",
})
reasoning_start_time = None
reasoning_duration = None
for row in r.iter_lines():
if row.startswith(b'data:'):
data = json.loads(row[5:])
if 'choices' not in data:
raise gr.Error('request failed')
choice = data['choices'][0]
if 'delta' in choice:
delta = choice['delta']
reasoning_content = delta.get('reasoning_content', '')
content = delta.get('content', '')
chatbot_value[-1]["loading"] = False
# Handle reasoning content
if reasoning_content:
if reasoning_start_time is None:
reasoning_start_time = time.time()
message_content[-2]["content"] += reasoning_content
# Handle main content
if content:
message_content[-1]["content"] += content
if not thought_done:
thought_done = True
if reasoning_start_time is not None:
reasoning_duration = time.time() - reasoning_start_time
thought_cost_time = "{:.2f}".format(reasoning_duration)
else:
reasoning_duration = 0.0
thought_cost_time = "0.00"
message_content[-2]["options"] = {"title": f"End of Thought ({thought_cost_time}s)"}
yield {chatbot: gr.update(value=chatbot_value)}
elif 'message' in choice:
message_data = choice['message']
reasoning_content = message_data.get('reasoning_content', '')
main_content = message_data.get('content', '')
message_content[-2]["content"] = reasoning_content
message_content[-1]["content"] = main_content
if reasoning_content and main_content:
if reasoning_duration is None:
if reasoning_start_time is not None:
reasoning_duration = time.time() - reasoning_start_time
thought_cost_time = "{:.2f}".format(reasoning_duration)
else:
reasoning_duration = 0.0
thought_cost_time = "0.00"
else:
thought_cost_time = "{:.2f}".format(reasoning_duration)
message_content[-2]["options"] = {"title": f"End of Thought ({thought_cost_time}s)"}
chatbot_value[-1]["loading"] = False
yield {chatbot: gr.update(value=chatbot_value)}
chatbot_value[-1]["footer"] = "{:.2f}s".format(time.time() - start_time)
chatbot_value[-1]["status"] = "done"
yield {
clear_btn: gr.update(disabled=False),
sender: gr.update(loading=False),
chatbot: gr.update(value=chatbot_value),
}
except Exception as e:
chatbot_value[-1]["loading"] = False
chatbot_value[-1]["status"] = "done"
chatbot_value[-1]["content"] = "Request failed, please try again."
yield {
clear_btn: gr.update(disabled=False),
sender: gr.update(loading=False),
chatbot: gr.update(value=chatbot_value),
}
raise e
def remove_code_block(text):
# Try to match code blocks with language markers
patterns = [
r'```(?:html|HTML)\n([\s\S]+?)\n```', # Match ```html or ```HTML
r'```\n([\s\S]+?)\n```', # Match code blocks without language markers
r'```([\s\S]+?)```' # Match code blocks without line breaks
]
for pattern in patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
extracted = match.group(1).strip()
print("Successfully extracted code block:", extracted)
return extracted
# If no code block is found, check if the entire text is HTML
if text.strip().startswith('<!DOCTYPE html>') or text.strip().startswith('<html'):
print("Text appears to be raw HTML, using as is")
return text.strip()
print("No code block found in text:", text)
return text.strip()
def send_to_sandbox(code):
# Add a wrapper to inject necessary permissions
wrapped_code = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<script>
// Create a safe storage alternative
const safeStorage = {{
_data: {{}},
getItem: function(key) {{
return this._data[key] || null;
}},
setItem: function(key, value) {{
this._data[key] = value;
}},
removeItem: function(key) {{
delete this._data[key];
}},
clear: function() {{
this._data = {{}};
}}
}};
// Replace native localStorage
Object.defineProperty(window, 'localStorage', {{
value: safeStorage,
writable: false
}});
// Add error handling without using alert
window.onerror = function(message, source, lineno, colno, error) {{
console.error('Error:', message);
}};
</script>
</head>
<body>
{code}
</body>
</html>
"""
encoded_html = base64.b64encode(wrapped_code.encode('utf-8')).decode('utf-8')
data_uri = f"data:text/html;charset=utf-8;base64,{encoded_html}"
iframe = f'<iframe src="{data_uri}" width="100%" height="920px" sandbox="allow-scripts allow-same-origin allow-forms allow-popups allow-modals allow-presentation" allow="display-capture"></iframe>'
print("Generated iframe:", iframe)
return iframe
def select_example(example: dict):
return lambda: gr.update(value=example["description"])
def generate_code(query: str):
if not query:
return None, None, None, gr.update(active_key="empty"), gr.update(active_key="reasoning", visible=False)
print("Starting code generation with query:", query)
messages = [{
'role': 'system',
'content': SystemPrompt
}, {
'role': 'user',
'content': query
}]
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
data = {
'model': MODEL_VERSION,
'messages': messages,
'stream': True,
'max_tokens': MODEL_CONTROL_DEFAULTS['tokens_to_generate'],
'temperature': MODEL_CONTROL_DEFAULTS['temperature'],
'top_p': MODEL_CONTROL_DEFAULTS['top_p'],
}
print(f"Attempt {retry_count + 1}: Sending request to API with data:", json.dumps(data, indent=2))
r = requests.post(
API_URL,
headers={
'Content-Type': 'application/json',
'Authorization': 'Bearer {}'.format(API_KEY),
},
data=json.dumps(data),
stream=True,
timeout=60 # Set 60 seconds timeout
)
content = ""
reasoning_content = ""
for row in r.iter_lines():
if row.startswith(b'data:'):
data = json.loads(row[5:])
print("Received data from API:", json.dumps(data, indent=2))
if 'choices' not in data:
raise gr.Error('request failed')
choice = data['choices'][0]
if 'delta' in choice:
delta = choice['delta']
content += delta.get('content', '')
reasoning_content += delta.get('reasoning_content', '')
print("Current content:", content)
print("Current reasoning:", reasoning_content)
yield {
code_output: content,
reasoning_output: reasoning_content + "\n",
state_tab: gr.update(active_key="loading"),
output_tabs: gr.update(active_key="reasoning", visible=True)
}
elif 'message' in choice:
message_data = choice['message']
content = message_data.get('content', '')
reasoning_content = message_data.get('reasoning_content', '')
print("Final content:", content)
print("Final reasoning:", reasoning_content)
html_content = remove_code_block(content)
print("Extracted HTML:", html_content)
yield {
code_output: content,
reasoning_output: reasoning_content + "\n",
sandbox: send_to_sandbox(html_content),
state_tab: gr.update(active_key="render"),
output_tabs: gr.update(active_key="code", visible=True) # Switch to code tab when complete
}
# If successful, break out of retry loop
break
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
retry_count += 1
if retry_count == max_retries:
print(f"Failed after {max_retries} attempts:", str(e))
raise gr.Error(f"Request failed after {max_retries} attempts: {str(e)}")
print(f"Attempt {retry_count} failed, retrying...")
time.sleep(1) # Wait 1 second before retrying
except Exception as e:
print("Error occurred:", str(e))
raise gr.Error(str(e))
css = """
.output-loading {
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
width: 100%;
min-height: 680px;
height: calc(100vh - 200px);
}
.output-html {
display: flex;
flex-direction: column;
width: 100%;
min-height: 680px;
}
.output-html > iframe {
flex: 1;
}
.reasoning-box {
height: 300px;
overflow-y: auto;
background-color: #f5f5f5;
border-radius: 4px;
margin-bottom: 12px;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif;
font-size: 14px;
line-height: 1.2;
white-space: pre-wrap;
word-break: break-word;
width: 100%;
box-sizing: border-box;
scroll-behavior: smooth;
}
.reasoning-box .ms-markdown {
padding: 0 12px;
}
.reasoning-box::-webkit-scrollbar {
width: 6px;
}
.reasoning-box::-webkit-scrollbar-track {
background: #f1f1f1;
border-radius: 3px;
}
.reasoning-box::-webkit-scrollbar-thumb {
background: #888;
border-radius: 3px;
}
.reasoning-box::-webkit-scrollbar-thumb:hover {
background: #555;
}
.markdown-container {
height: 300px;
overflow-y: auto;
background-color: #f5f5f5;
border-radius: 4px;
margin-bottom: 12px;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif;
font-size: 14px;
line-height: 1.6;
white-space: pre-wrap;
word-break: break-word;
width: 100%;
box-sizing: border-box;
scroll-behavior: smooth;
}
"""
def scroll_to_bottom():
return """
function() {
setTimeout(() => {
const reasoningBox = document.querySelector('.reasoning-box');
if (reasoningBox) {
reasoningBox.scrollTop = reasoningBox.scrollHeight;
}
const markdownContainer = document.querySelector('.markdown-container');
if (markdownContainer) {
markdownContainer.scrollTop = markdownContainer.scrollHeight;
}
}, 100);
}
"""
with gr.Blocks(css=css) as demo, ms.Application(), antdx.XProvider():
with antd.Tabs() as tabs:
with antd.Tabs.Item(key="chat", label="Chatbot"):
with antd.Flex(vertical=True, gap="middle"):
chatbot = pro.Chatbot(
height="calc(100vh - 200px)",
markdown_config=ChatbotMarkdownConfig(allow_tags=["think"]),
welcome_config=ChatbotWelcomeConfig(
variant="borderless",
icon="./assets/minimax-logo.png",
title="Hello, I'm MiniMax-M1",
description="You can input text to get started.",
prompts=ChatbotPromptsConfig(
title="How can I help you today?",
styles={
"list": {
"width": '100%',
},
"item": {
"flex": 1,
},
},
items=DEFAULT_PROMPTS
)
),
user_config=ChatbotUserConfig(actions=["copy", "edit"]),
bot_config=ChatbotBotConfig(
header=MODEL_NAME,
avatar="./assets/minimax-logo.png",
actions=["copy", "retry"]
)
)
with antdx.Sender() as sender:
with ms.Slot("prefix"):
with antd.Button(value=None, color="default", variant="text") as clear_btn:
with ms.Slot("icon"):
antd.Icon("ClearOutlined")
clear_btn.click(fn=clear, outputs=[chatbot])
submit_event = sender.submit(
fn=submit,
inputs=[sender, chatbot],
outputs=[sender, chatbot, clear_btn]
)
sender.cancel(
fn=cancel,
inputs=[chatbot],
outputs=[chatbot, sender, clear_btn],
cancels=[submit_event],
queue=False
)
chatbot.retry(
fn=retry,
inputs=[chatbot],
outputs=[sender, chatbot, clear_btn]
)
chatbot.welcome_prompt_select(fn=prompt_select, outputs=[sender])
with antd.Tabs.Item(key="code", label="Code Playground (WebDev)"):
with antd.Row(gutter=[32, 12]):
with antd.Col(span=12):
with antd.Flex(vertical=True, gap="middle"):
code_input = antd.InputTextarea(
size="large",
allow_clear=True,
placeholder="Please enter what kind of application you want or choose an example below and click the button"
)
code_btn = antd.Button("Generate Code", type="primary", size="large")
with antd.Tabs(active_key="reasoning", visible=False) as output_tabs:
with antd.Tabs.Item(key="reasoning", label="πŸ€” Thinking Process"):
reasoning_output = legacy.Markdown(
elem_classes="reasoning-box"
)
with antd.Tabs.Item(key="code", label="πŸ’» Generated Code"):
code_output = legacy.Markdown(elem_classes="markdown-container")
antd.Divider("Examples")
# Examples
with antd.Flex(gap="small", wrap=True):
for example in EXAMPLES:
with antd.Card(
elem_style=dict(
flex="1 1 fit-content"),
hoverable=True) as example_card:
antd.Card.Meta(
title=example['title'],
description=example['description'])
example_card.click(
fn=select_example(
example),
outputs=[code_input])
with antd.Col(span=12):
with ms.Div(elem_classes="right_panel"):
gr.HTML('<div class="render_header"><span class="header_btn"></span><span class="header_btn"></span><span class="header_btn"></span></div>')
with ms.Slot("extra"):
with ms.Div(elem_id="output-container-extra"):
with antd.Button(
"Download HTML",
type="link",
href_target="_blank",
disabled=True,
) as download_btn:
with ms.Slot("icon"):
antd.Icon("DownloadOutlined")
download_content = gr.Text(visible=False)
view_code_btn = antd.Button(
"πŸ§‘β€πŸ’» View Code", type="primary")
with antd.Tabs(active_key="empty", render_tab_bar="() => null") as state_tab:
with antd.Tabs.Item(key="empty"):
empty = antd.Empty(description="empty input", elem_classes="right_content")
with antd.Tabs.Item(key="loading"):
loading = antd.Spin(True, tip="coding...", size="large", elem_classes="output-loading")
with antd.Tabs.Item(key="render"):
sandbox = gr.HTML(elem_classes="output-html")
code_btn.click(
generate_code,
inputs=[code_input],
outputs=[code_output, reasoning_output, sandbox, state_tab, output_tabs]
)
# Add auto-scroll functionality
reasoning_output.change(
fn=scroll_to_bottom,
inputs=[],
outputs=[],
)
code_output.change(
fn=scroll_to_bottom,
inputs=[],
outputs=[],
)
def on_tab_change(tab_key):
return gr.update(active_key=tab_key, visible=True)
output_tabs.change(
fn=on_tab_change,
inputs=[output_tabs],
outputs=[output_tabs],
)
if __name__ == '__main__':
demo.queue(default_concurrency_limit=50).launch(ssr_mode=False)