Step3 / app.py
Zenith Wang
Revert to stable Gradio 4.19.2 and redesign interface without MultimodalTextbox
284099f
raw
history blame
10.7 kB
import gradio as gr
import time
import base64
from openai import OpenAI
import os
from io import BytesIO
from PIL import Image
import re
# Configuration
# Endpoint of the StepFun OpenAI-compatible API.
BASE_URL = "https://api.stepfun.com/v1"

# API key read from the environment; empty string when the variable is unset.
STEP_API_KEY = os.getenv("STEP_API_KEY", "")
def image_to_base64(image):
    """Encode an image as a base64 text string.

    Args:
        image: A PIL image, a path to an existing file, or None.

    Returns:
        The base64-encoded data as a UTF-8 string. A PIL image is first
        serialized as PNG; a file path has its raw bytes encoded unchanged.
        Returns None when ``image`` is None, is a path that does not exist,
        or is any other kind of value.
    """
    if image is None:
        return None

    if isinstance(image, Image.Image):
        # In-memory image: serialize to PNG bytes before encoding.
        png_buffer = BytesIO()
        image.save(png_buffer, format="PNG")
        raw_bytes = png_buffer.getvalue()
    elif isinstance(image, str) and os.path.exists(image):
        # File path: encode the file contents as they are on disk.
        with open(image, "rb") as image_file:
            raw_bytes = image_file.read()
    else:
        return None

    return base64.b64encode(raw_bytes).decode('utf-8')
def extract_cot_and_answer(text):
    """Split a model response into its chain-of-thought and final answer.

    The reasoning is expected inside ``<reasoning>...</reasoning>`` tags.

    Args:
        text: The (possibly partial) response text accumulated so far.

    Returns:
        A ``(cot, answer)`` tuple of strings:

        * Closed reasoning block present: ``cot`` is the stripped content of
          the first block and ``answer`` is the text with every closed block
          removed, stripped.
        * Opening tag present but not yet closed (typical while streaming):
          ``cot`` is the stripped text after the tag and ``answer`` is the
          stripped text before it, so unfinished reasoning is never shown
          as the answer.
        * No reasoning tag at all: ``("", text)`` with ``text`` unchanged.
        * ``None`` or empty ``text``: ``("", "")``.
    """
    if not text:
        return "", ""

    reasoning_pattern = re.compile(r'<reasoning>(.*?)</reasoning>', re.DOTALL)
    match = reasoning_pattern.search(text)
    if match:
        cot = match.group(1).strip()
        # Drop the reasoning blocks; whatever remains is the final answer.
        answer = reasoning_pattern.sub('', text).strip()
        return cot, answer

    # No closed block. While streaming, the opening tag arrives well before
    # the closing one; treat everything after it as reasoning in progress.
    before, open_tag, partial_cot = text.partition('<reasoning>')
    if open_tag:
        return partial_cot.strip(), before.strip()

    # No reasoning tag: the whole response is the answer.
    return "", text
# Pieces of the chat-bubble layout used when a response contains reasoning.
# They are shared by the formatter and by the code that undoes the formatting
# when earlier turns are replayed to the API, so the two cannot drift apart.
_REASONING_HEADER = "💭 **Reasoning Process:**\n\n"
_ANSWER_SEPARATOR = "\n\n---\n\n📝 **Answer:**\n\n"
_GENERATING_PLACEHOLDER = "*Generating...*"
# Prefixes of locally generated status messages that are not model output.
_STATUS_PREFIXES = ("❌", "⚠️")


def _format_reply(cot, answer):
    """Render reasoning and answer as the markdown shown in the chat bubble."""
    if cot and answer:
        return f"{_REASONING_HEADER}{cot}{_ANSWER_SEPARATOR}{answer}"
    if cot:
        # Reasoning has arrived but the answer has not started yet.
        return f"{_REASONING_HEADER}{cot}{_ANSWER_SEPARATOR}{_GENERATING_PLACEHOLDER}"
    return answer


def _plain_assistant_text(displayed):
    """Undo ``_format_reply``, returning reasoning and answer without the wrapper."""
    if not displayed.startswith(_REASONING_HEADER):
        return displayed
    body = displayed[len(_REASONING_HEADER):]
    reasoning, _, answer = body.partition(_ANSWER_SEPARATOR)
    if answer == _GENERATING_PLACEHOLDER:
        answer = ""
    # Only the wrapper is removed; markdown inside the model's own text
    # (bold spans, horizontal rules, emoji) is left intact.
    return "\n\n".join(part for part in (reasoning.strip(), answer.strip()) if part)


def _build_api_messages(previous_turns, message, image_content):
    """Build the ``messages`` payload from earlier turns plus the current input."""
    messages = []
    # Earlier turns are replayed as text only; images are not re-sent.
    for user_entry, assistant_entry in previous_turns:
        if user_entry:
            # Strip the display-only upload marker.
            user_text = user_entry.replace("[Image uploaded] ", "").replace("[Image uploaded]", "")
            if user_text:
                messages.append({"role": "user", "content": user_text})
        # Skip local error/warning placeholders; they are not model output.
        if assistant_entry and not assistant_entry.startswith(_STATUS_PREFIXES):
            messages.append({"role": "assistant", "content": _plain_assistant_text(assistant_entry)})

    if image_content:
        # NOTE(review): the data URL always declares image/jpg, even though the
        # bytes may be PNG or another format. Left unchanged; confirm what the
        # API expects before altering it.
        current_content = [
            {"type": "image_url", "image_url": {"url": f"data:image/jpg;base64,{image_content}", "detail": "high"}}
        ]
        if message:
            current_content.append({"type": "text", "text": message})
        messages.append({"role": "user", "content": current_content})
    elif message:
        messages.append({"role": "user", "content": message})
    return messages


def call_step_api_stream(message, history, image=None):
    """Send the current turn to the Step API and stream the reply.

    This is a generator. Every yielded value is a tuple
    ``(history, cot, answer)``, where ``history`` is the list of
    ``[user_text, assistant_text]`` pairs (mutated in place) and ``cot`` /
    ``answer`` are the reasoning and answer extracted so far (empty strings
    when there is nothing to show).

    Args:
        message: The user's text; may be empty or None when only an image is sent.
        history: Existing list of ``[user, assistant]`` pairs; None is treated
            as an empty history.
        image: Optional image accepted by ``image_to_base64`` (PIL image or
            file path).

    Failures (missing API key, unreadable image with no text, client creation
    or request errors) are reported as a "❌ ..." assistant entry in
    ``history`` instead of being raised.
    """
    print(f"[DEBUG] Starting API call - Message: {message}, Has Image: {image is not None}")

    if history is None:
        history = []

    if not message and not image:
        print("[DEBUG] No message or image provided")
        yield history, "", ""
        return

    if not STEP_API_KEY:
        print("[DEBUG] API key not configured")
        error_msg = "❌ API key not configured. Please add STEP_API_KEY in Settings."
        history.append([message or "[Image]", error_msg])
        yield history, "", ""
        return

    print(f"[DEBUG] API Key exists: {bool(STEP_API_KEY)}")

    # Prepare the text shown in the chat and the encoded image, if any.
    display_message = message or ""
    image_content = None
    if image:
        try:
            image_content = image_to_base64(image)
            if image_content:
                display_message = f"[Image uploaded] {message}" if message else "[Image uploaded]"
                print(f"[DEBUG] Image processed successfully")
        except Exception as e:
            # Best effort: fall back to a text-only request when text exists.
            print(f"[DEBUG] Failed to process image: {e}")

    if not message and not image_content:
        # The image could not be encoded and there is no text, so there is
        # nothing to send; report it instead of issuing an empty request.
        print("[DEBUG] Image could not be processed and no text was provided")
        history.append(["[Image]", "❌ Failed to process the uploaded image."])
        yield history, "", ""
        return

    # Show the user's turn immediately, with an empty assistant slot.
    history.append([display_message, ""])
    yield history, "", ""

    # Every turn except the one just appended is replayed as context.
    messages = _build_api_messages(history[:-1], message, image_content)
    print(f"[DEBUG] Messages count: {len(messages)}")

    # Create the client.
    try:
        client = OpenAI(api_key=STEP_API_KEY, base_url=BASE_URL)
        print("[DEBUG] Client created successfully")
    except Exception as e:
        print(f"[DEBUG] Client initialization failed: {e}")
        history[-1][1] = f"❌ Client initialization failed: {str(e)}"
        yield history, "", ""
        return

    # Call the API and consume the stream.
    try:
        print("[DEBUG] Calling API...")
        response = client.chat.completions.create(
            model="step-3",
            messages=messages,
            temperature=0.7,
            max_tokens=2000,
            stream=True
        )
        print("[DEBUG] API call successful, processing stream...")

        full_response = ""
        current_cot = ""
        current_answer = ""
        chunk_count = 0

        for chunk in response:
            chunk_count += 1
            if not chunk.choices:
                continue
            content = getattr(chunk.choices[0].delta, 'content', None)
            if not content:
                continue

            full_response += content
            # Re-split the accumulated text so the panels track the stream.
            current_cot, current_answer = extract_cot_and_answer(full_response)
            history[-1][1] = _format_reply(current_cot, current_answer)

            # Push an update only every 5th chunk to limit UI refreshes; the
            # final yield after the loop delivers whatever was not yet shown.
            if chunk_count % 5 == 0:
                print(f"[DEBUG] Processed {chunk_count} chunks")
                yield history, current_cot, current_answer

        if not full_response:
            print("[DEBUG] No response content received")
            history[-1][1] = "⚠️ No response received from API"
            yield history, "", ""
        else:
            print(f"[DEBUG] Final response length: {len(full_response)} chars")
            # Final update with the complete response.
            yield history, current_cot, current_answer

    except Exception as e:
        print(f"[DEBUG] API request failed: {e}")
        import traceback
        traceback.print_exc()
        history[-1][1] = f"❌ API request failed: {str(e)}"
        yield history, "", ""
def clear_all():
    """Return reset values for the chat, image, message, reasoning and answer widgets.

    The order matches the outputs wired to the Clear button: an empty chat
    history, no image, and three empty strings.
    """
    empty_history = []
    no_image = None
    blank = ""
    return empty_history, no_image, blank, blank, blank
# Build the Gradio interface
with gr.Blocks(title="Step-3", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🤖 Step-3
    Hello, I am Step-3!
    """)

    with gr.Row():
        with gr.Column(scale=2):
            # Conversation view
            chatbot = gr.Chatbot(
                height=500,
                show_label=False,
                elem_id="chatbot",
                bubble_full_width=False
            )
            with gr.Row():
                with gr.Column(scale=6):
                    # Text input
                    msg = gr.Textbox(
                        placeholder="Type your message here...",
                        show_label=False,
                        lines=2,
                        max_lines=4,
                        container=False,
                        elem_id="msg"
                    )
                with gr.Column(scale=2):
                    # Image upload; the handler receives a file path
                    image_input = gr.Image(
                        label="Upload Image",
                        type="filepath",
                        height=80,
                        scale=1
                    )
                with gr.Column(scale=1):
                    send_btn = gr.Button("Send", variant="primary", scale=1)
                    clear_btn = gr.Button("Clear", scale=1)

        with gr.Column(scale=1):
            # Chain-of-thought display
            gr.Markdown("### 💭 Chain of Thought")
            cot_display = gr.Textbox(
                label="Reasoning Process",
                lines=10,
                max_lines=15,
                show_label=False,
                interactive=False,
                show_copy_button=True
            )
            gr.Markdown("### 📝 Final Answer")
            answer_display = gr.Textbox(
                label="Answer",
                lines=10,
                max_lines=15,
                show_label=False,
                interactive=False,
                show_copy_button=True
            )

    # The first step of each submit chain clears the input widgets. A chained
    # step reads component values as they are after that step ran, so the
    # submitted text and image are stashed in State for the streaming step.
    # Feeding `msg` and `image_input` to it directly would always hand the
    # API call an empty message and no image.
    pending_msg = gr.State("")
    pending_image = gr.State(None)

    # Event handling
    def on_submit(message, history, image):
        """Clear the input widgets and stash the submitted values.

        Returns values for (msg, chatbot, image_input, pending_msg,
        pending_image). With nothing to send, the widgets are left unchanged
        and the stashed values are empty, so the streaming step does nothing.
        """
        if message or image:
            return "", history, None, message, image
        return message, history, image, message, image

    submit_inputs = [msg, chatbot, image_input]
    submit_outputs = [msg, chatbot, image_input, pending_msg, pending_image]
    stream_inputs = [pending_msg, chatbot, pending_image]
    stream_outputs = [chatbot, cot_display, answer_display]

    # Pressing Enter in the textbox and clicking Send run the same two steps.
    for trigger in (msg.submit, send_btn.click):
        trigger(
            on_submit,
            submit_inputs,
            submit_outputs,
            queue=False
        ).then(
            call_step_api_stream,
            stream_inputs,
            stream_outputs
        )

    clear_btn.click(
        clear_all,
        None,
        [chatbot, image_input, msg, cot_display, answer_display]
    )

    # Footer
    gr.Markdown("""
    ---
    <div style="text-align: center;">
    <img src="https://huggingface.co/stepfun-ai/step3/resolve/main/figures/stepfun-logo.png" alt="StepFun Logo" style="height: 40px; margin: 10px;">
    <br>
    Powered by <a href="https://www.stepfun.com/" target="_blank">StepFun</a>
    </div>
    """)
# Launch the app when run as a script
if __name__ == "__main__":
    key_status = 'Set' if STEP_API_KEY else 'Not set'
    print(f"[DEBUG] Starting app with API key: {key_status}")
    print(f"[DEBUG] Base URL: {BASE_URL}")

    # Enable the request queue, capped at 10 pending requests.
    demo.queue(max_size=10)

    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": False,
    }
    demo.launch(**launch_options)