# PyVision / app.py
# stzhao's picture
# Update app.py
# 1e0f1ba verified
import sys
import os
import re
import json
import base64
from io import BytesIO
from PIL import Image
import argparse
# from vis_python_exe import PythonExecutor
from shared_vis_python_exe import PythonExecutor
from openai import OpenAI
from typing import Optional, Union
import gradio as gr
import markdown
from openai import AzureOpenAI
import zipfile
import tempfile
from datetime import datetime
def export_to_zip(images, conversations):
    """Bundle images and conversation data into a single ZIP archive.

    Args:
        images: Iterable of image objects exposing a PIL-style
            ``save(fp, format=...)`` method.
        conversations: JSON-serializable conversation data.

    Returns:
        Path of the generated ZIP file. The archive lives in a freshly
        created temporary directory and contains ``images/image_<i>.png``
        entries plus ``conversations.json``.
    """
    # Serialize first so that unserializable data fails before any file or
    # directory is created.
    conversations_json = json.dumps(conversations, ensure_ascii=False, indent=4)

    temp_dir = tempfile.mkdtemp()
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    zip_filename = os.path.join(temp_dir, f"export_{timestamp}.zip")

    with zipfile.ZipFile(zip_filename, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
        for i, img in enumerate(images):
            # Encode in memory: no intermediate files that could be left
            # behind if saving one of the images raises.
            buffer = BytesIO()
            img.save(buffer, format="PNG")
            zipf.writestr(f"images/image_{i}.png", buffer.getvalue())
        # ``writestr`` encodes str data as UTF-8.
        zipf.writestr("conversations.json", conversations_json)

    return zip_filename
def base64_to_image(
    base64_str: str,
    remove_prefix: bool = True,
    convert_mode: Optional[str] = "RGB"
) -> "Optional[Image.Image]":
    """Decode a Base64-encoded image string into a PIL image.

    Args:
        base64_str: Base64 payload, optionally preceded by a data-URL style
            header that ends with a comma (e.g. ``data:image/png;base64,``).
        remove_prefix: When true and the string contains a comma, everything
            up to and including the first comma is dropped before decoding.
        convert_mode: PIL mode passed to ``Image.convert``; a falsy value
            keeps the mode of the decoded image.

    Returns:
        The decoded image, or ``None`` if decoding or opening fails (the
        failure is printed, not raised).

    Example:
        >>> img = base64_to_image("iVBORw0KGg...", remove_prefix=False)
    """
    try:
        # 1. Strip the header. Split only once so that the payload is never
        #    truncated at a later comma.
        if remove_prefix and "," in base64_str:
            base64_str = base64_str.split(",", 1)[1]
        # 2. Decode the Base64 payload.
        image_data = base64.b64decode(base64_str)
        # 3. Open it as a PIL image.
        image = Image.open(BytesIO(image_data))
        # 4. Optional mode conversion.
        if convert_mode:
            image = image.convert(convert_mode)
        return image
    except Exception as e:
        # Deliberate best-effort boundary: any failure (bad Base64, unknown
        # image format, wrong input type) is reported and mapped to None.
        print(f"Base64解码失败: {str(e)}")
        return None
def encode_image(image):
    """Base64-encode an image and report its pixel dimensions.

    Args:
        image: Either a file path (``str`` or ``os.PathLike``) or an object
            with a PIL-style ``size`` attribute and ``save(fp, format=...)``
            method.

    Returns:
        A dict with the keys:
            - 'base64': Base64 string. For a path this is the file's raw
              bytes; for an image object it is the image re-encoded as PNG.
            - 'width': image width in pixels.
            - 'height': image height in pixels.
            - 'resolution': the string "<width>x<height>".
    """
    if isinstance(image, (str, os.PathLike)):
        # File path: read the size through a context manager so that the
        # handle opened by PIL is closed again, then encode the raw bytes.
        with Image.open(image) as img_obj:
            width, height = img_obj.size
        with open(image, "rb") as image_file:
            raw_bytes = image_file.read()
    else:
        # In-memory image object: serialize it to PNG first.
        width, height = image.size
        buffered = BytesIO()
        image.save(buffered, format='PNG')
        raw_bytes = buffered.getvalue()

    return {
        'base64': base64.b64encode(raw_bytes).decode('utf-8'),
        'width': width,
        'height': height,
        'resolution': f"{width}x{height}",
    }
def excute_codes(codes, messages, executor: PythonExecutor):
    """Run all non-empty code snippets through the executor in one batch.

    Args:
        codes: Sequence of code strings; empty strings are skipped.
        messages: Conversation so far, forwarded unchanged to the executor.
        executor: Object providing ``batch_apply(codes, messages)``.

    Returns:
        A ``(batch_results, no_code_idx)`` tuple: whatever
        ``executor.batch_apply`` returned for the non-empty snippets, and the
        positions in ``codes`` that held an empty string.
    """
    indexed = list(enumerate(codes))
    no_code_idx = [position for position, snippet in indexed if snippet == ""]
    runnable = [snippet for _, snippet in indexed if not (snippet == "")]
    return executor.batch_apply(runnable, messages), no_code_idx
def process_prompt_init(question, image, prompt_template, prompt_type):
    """Build the opening user message: the tagged input image plus the prompt.

    Args:
        question: Question text substituted for ``{query}`` in the template.
        image: Image accepted by ``encode_image``.
        prompt_template: Mapping from prompt type to a format string using
            the ``query``, ``width`` and ``height`` fields.
        prompt_type: Key selecting the template to use.

    Returns:
        A one-element message list holding the user turn.
    """
    template = prompt_template[prompt_type]
    encoded = encode_image(image)
    payload = encoded['base64']
    instruction = template.format(
        query=question,
        width=str(encoded['width']),
        height=str(encoded['height']),
    )
    # NOTE(review): the data URL always says image/jpeg, while encode_image
    # produces PNG bytes for in-memory images - confirm the consumer ignores
    # the declared MIME type.
    user_content = [
        {"type": "text", "text": "<image_clue_0>"},
        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{payload}"}},
        {"type": "text", "text": "</image_clue_0>\n\n"},
        {"type": "text", "text": instruction},
    ]
    return [{"role": "user", "content": user_content}]
def update_messages_with_excu_content(messages, images_result, text_result, error_result, image_clue_idx):
    """Append the interpreter feedback to the last message of the conversation.

    The input list and its last message are left untouched; a new list is
    returned whose final entry is an assistant message made of the previous
    last message's content followed by an ``<interpreter>...</interpreter>``
    section.

    Args:
        messages: Conversation; the last entry's content is extended.
        images_result: Base64 image strings produced by the run, or ``None``.
        text_result: Text output of the run.
        error_result: Error report; when not ``None`` only the error is
            reported and the other results are ignored.
        image_clue_idx: Index used to tag the next produced image.

    Returns:
        ``(new_messages, image_clue_idx)`` where the index has been advanced
        by the number of images that were attached.
    """
    previous_content = messages[-1]['content']
    closing_tag = [{"type": "text", "text": "</interpreter>\n"}]

    if error_result is not None:
        feedback = [{"type": "text", "text": f"<interpreter>{error_result}"}]
    else:
        feedback = [{"type": "text", "text": f"<interpreter>\nText Result:\n{text_result}\nImage Result:\n"}]
        if images_result is None:
            feedback.append({"type": "text", "text": "None"})
        else:
            # Each image is wrapped in its own numbered <image_clue_N> tags.
            for encoded in images_result:
                feedback.extend([
                    {"type": "text", "text": f"<image_clue_{image_clue_idx}>"},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded}"}},
                    {"type": "text", "text": f"</image_clue_{image_clue_idx}>"},
                ])
                image_clue_idx += 1

    new_messages = list(messages[:-1])
    new_messages.append({"role": "assistant", "content": previous_content + feedback + closing_tag})
    return new_messages, image_clue_idx
def update_messages_with_code(messages, generated_content):
    """Append an assistant turn whose text is re-terminated with ``</code>``.

    The list is modified in place and also returned.

    Args:
        messages: Conversation list to extend.
        generated_content: Model output that was cut at the ``</code>`` stop
            marker; the marker and a newline are added back.

    Returns:
        The same ``messages`` list.
    """
    closed_text = f"{generated_content}</code>\n"
    messages.append({"role": "assistant", "content": [{"type": "text", "text": closed_text}]})
    return messages
def update_messages_with_text(messages, generated_content):
    """Append a plain-text assistant turn to the conversation.

    The list is modified in place and also returned.

    Args:
        messages: Conversation list to extend.
        generated_content: Value formatted into the turn's text.

    Returns:
        The same ``messages`` list.
    """
    plain_text = f"{generated_content}"
    messages.append({"role": "assistant", "content": [{"type": "text", "text": plain_text}]})
    return messages
def call_chatgpt_api(model_name, messages, client, max_tokens=10000, stop=None, temperature=0.6):
    """Request one chat completion and report why generation stopped.

    Args:
        model_name: Model identifier passed to the API.
        messages: Conversation sent to the model.
        client: Object exposing ``chat.completions.create(...)``.
        max_tokens: Completion token limit.
        stop: Optional list of stop strings.
        temperature: Sampling temperature.

    Returns:
        ``(response_text, stop_reason)``. ``stop_reason`` is the first stop
        string found in the text, otherwise the API's finish reason; it is
        forced to ``"</code>"`` whenever the text contains ``"<code>"``.
        ``(None, None)`` is returned if anything raises (the error is
        printed).
    """
    try:
        completion = client.chat.completions.create(
            model=model_name,
            messages=messages,
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=1.0,
            stop=stop,
        )
        first_choice = completion.choices[0]
        response_text = first_choice.message.content

        # First stop marker that appears in the text, if any.
        matched_stop = None
        if stop:
            matched_stop = next((marker for marker in stop if marker in response_text), None)

        if matched_stop is not None:
            stop_reason = matched_stop
        else:
            stop_reason = first_choice.finish_reason

        # An opened code block always counts as a request to execute code.
        if "<code>" in response_text:
            stop_reason = "</code>"

        return response_text, stop_reason
    except Exception as e:
        print(f"API Error: {str(e)}")
        return None, None
def evaluate_single_data(model_name, data, client, executor, prompt_template, prompt_type):
    """Run the generate / execute / feed-back loop for one question-image pair.

    The model is queried with ``</code>`` as stop marker. While a reply ends
    in a code request, the code is extracted from the last ```` ```python ````
    fence, executed, and the outcome is appended to the conversation before
    the model is queried again. The first reply that does not request code is
    stored as the final assistant turn.

    Args:
        model_name: Model identifier passed to the API.
        data: Mapping with the keys ``"question"`` and ``"image"``.
        client: Chat-completions client.
        executor: Code executor forwarded to ``excute_codes``.
        prompt_template: Mapping of prompt types to format strings.
        prompt_type: Key selecting the template.

    Returns:
        The complete message list of the conversation.
    """
    messages = process_prompt_init(data["question"], data['image'], prompt_template, prompt_type)

    # Initial response.
    response_text, pred_stop_reason = call_chatgpt_api(
        model_name,
        messages,
        client,
        max_tokens=10000,
        stop=["</code>"]
    )

    code_execution_count = 0
    image_clue_idx = 1

    while pred_stop_reason == "</code>":
        # Record the model turn, then pull the code out of the last fence.
        messages = update_messages_with_code(messages, response_text)
        code_to_execute = response_text.split("```python")[-1].split("```")[0].strip()

        batch_results, _ = excute_codes([code_to_execute], messages, executor)
        try:
            exe_result = batch_results[0]
        except IndexError:
            # excute_codes drops empty snippets, so there may be no result.
            exe_result = None

        # Reset per iteration so that no value leaks in from a previous round
        # and every name is bound on every path.
        error_result = None
        text_result = None
        images_result = None

        if exe_result is None:
            text_result = "None"
        else:
            output, report = exe_result
            if report == "Done":
                try:
                    text_result = output['text']
                except (KeyError, IndexError, TypeError):
                    print("text result is none.")
                try:
                    images_result = output['images']
                except (KeyError, IndexError, TypeError):
                    print("image result is none.")
            else:
                error_result = report

        messages, image_clue_idx = update_messages_with_excu_content(
            messages, images_result, text_result, error_result, image_clue_idx
        )
        code_execution_count += 1
        print(f"Code Execution Count: {code_execution_count}")

        # Next part of the response.
        response_text, pred_stop_reason = call_chatgpt_api(
            model_name,
            messages,
            client,
            max_tokens=10000,
            stop=["</code>"]
        )

    messages = update_messages_with_text(messages, response_text)
    print("GPT-4.1 finish.")
    return messages
def process_message_to_sharegpt_format(message):
    """Convert a chat message list into a ShareGPT-style record.

    Every message is flattened to one string: text items are concatenated and
    each image item becomes an ``<image>`` placeholder while the decoded image
    is collected separately. The first message becomes the ``human`` turn.
    Every later message becomes a ``gpt`` turn; if it contains an
    ``<interpreter>`` marker, everything from the first marker onward is
    emitted as a following ``observation`` turn.

    Args:
        message: List of messages whose ``content`` is a list of
            ``{"type": "text", ...}`` / ``{"type": "image_url", ...}`` items.

    Returns:
        ``{"conversations": [...], "images": [...]}``.
    """
    sharegpt_images = []
    sharegpt_conversation = []

    for i, message_item in enumerate(message):
        text_parts = []
        for content_item in message_item['content']:
            content_type = content_item['type']
            if content_type == "text":
                text_parts.append(content_item['text'])
            elif content_type == "image_url":
                text_parts.append("<image>")
                image = base64_to_image(content_item['image_url']['url'])
                if image is not None:
                    sharegpt_images.append(image)
        whole_content = "".join(text_parts)

        if i == 0:
            sharegpt_conversation.append({"from": "human", "value": whole_content})
            continue

        # Split at the first marker only: execution output may itself contain
        # "<interpreter>", which must not break the two-way split.
        gpt_content, marker, observation_content = whole_content.partition("<interpreter>")
        sharegpt_conversation.append({"from": "gpt", "value": gpt_content})
        if marker:
            sharegpt_conversation.append({"from": "observation", "value": marker + observation_content})

    return {
        "conversations": sharegpt_conversation,
        "images": sharegpt_images
    }
def process_message(messages):
    """Render a conversation as one HTML string for display.

    Each message becomes a coloured box headed by its role ("User" for
    ``user``/``human``, "Assistant" for ``assistant``, the capitalized role
    otherwise). Text items are converted from Markdown to HTML; image items
    become ``<img>`` tags. Items of any other type are ignored.

    Args:
        messages: List of messages with ``role`` and a ``content`` list of
            ``text`` / ``image_url`` items.

    Returns:
        The HTML markup, wrapped in a div that forces black text.
    """
    from html import escape as escape_html

    box_open = (
        '<div style="background-color: {bg}; padding: 10px; margin: 10px 0; '
        'border-radius: 10px; color: black;"><strong>{label}:</strong><br>'
    )

    # Outer wrapper that sets the text colour for everything inside.
    fragments = ['<div style="color: black;">']

    for message_item in messages:
        role = message_item['role']
        content = message_item['content']

        # Pick the box colour and heading from the role.
        if role == "user" or role == "human":
            bg, label = "#f0f0f0", "User"
        elif role == "assistant":
            bg, label = "#e6f7ff", "Assistant"
        else:
            # Arbitrary role text ends up in markup, so escape it.
            bg, label = "#f9f9f9", escape_html(role.capitalize())
        fragments.append(box_open.format(bg=bg, label=label))

        for content_item in content:
            content_type = content_item['type']
            if content_type == "text":
                # Markdown -> HTML.
                html_text = markdown.markdown(content_item['text'], extensions=['fenced_code', 'codehilite'])
                fragments.append(f'<div style="color: black;">{html_text}</div>')
            elif content_type == "image_url":
                # Data URLs and remote URLs are rendered the same way; escape
                # the value so that it cannot break out of the attribute.
                src = escape_html(content_item['image_url']['url'], quote=True)
                fragments.append(f'<img src="{src}" style="max-width: 100%; margin: 10px 0;">')

        fragments.append('</div>')

    fragments.append('</div>')  # close the outer wrapper
    return "".join(fragments)
def pyvision_chat(model_name, client_type, api_key, base_url, question, image):
    """Answer one question about one image and prepare the results for the UI.

    Args:
        model_name: Model identifier passed to the API.
        client_type: ``"Azure"`` selects ``AzureOpenAI``; any other value
            selects ``OpenAI``.
        api_key: API key handed to the client.
        base_url: Azure endpoint or OpenAI base URL handed to the client.
        question: Question text.
        image: Input image (anything ``encode_image`` accepts).

    Returns:
        ``(html_output, images, conversations)``: the rendered conversation,
        plus the images and conversation turns of its ShareGPT-style form.
    """
    print("done!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

    # Build the API client from the caller-supplied credentials.
    if client_type == "Azure":
        client = AzureOpenAI(
            api_key=api_key,
            api_version="2025-04-01-preview",
            azure_endpoint=base_url
        )
    else:
        client = OpenAI(api_key=api_key, base_url=base_url)

    executor = PythonExecutor()

    # Use a context manager so that the template file is closed again.
    with open("./prompt_template_vis.json", "r", encoding="utf-8") as template_file:
        prompt_template = json.load(template_file)
    prompt_type = 'vistool_with_img_info_v2'

    data = {
        "question": question,
        "image": image,
    }

    # Run the full generate/execute loop for this single data point.
    messages = evaluate_single_data(model_name, data, client, executor, prompt_template, prompt_type)
    html_output = process_message(messages)
    sharegpt_data_item = process_message_to_sharegpt_format(messages)
    return html_output, sharegpt_data_item['images'], sharegpt_data_item['conversations']
# Gradio interface
def create_demo():
    """Build the Gradio Blocks UI of the PyVision demo.

    The page offers model/client selection, optional API credentials, an
    image upload, a question box and a set of clickable examples. Submitting
    runs ``pyvision_chat`` and shows the rendered conversation.

    Returns:
        The assembled ``gr.Blocks`` application (not yet launched).
    """
    with gr.Blocks(title="PyVision's Online Demo", css="div.prose * {color: black !important;}") as demo:
        gr.Markdown("# PyVision's Online Demo")
        gr.Markdown("Upload an image and ask a question to get a response via PyVision's dynamic tooling ability.")
        gr.Markdown("[Project Page](https://agent-x.space/pyvision/), [Research Paper](https://arxiv.org/abs/2507.07998), [Inference Code](https://github.com/agents-x-project/PyVision)")

        with gr.Row():
            with gr.Column(scale=1):
                model_name = gr.Dropdown(
                    label="Model Selection",
                    choices=["gpt-4.1"],
                    value="gpt-4.1"
                )
                client_type = gr.Dropdown(label="Client Type Selection", choices=["Azure", "OpenAI"], value="Azure")
                # Do NOT pre-fill the key from the environment: a component's
                # initial value is sent to every visitor's browser, masked or
                # not. The server-side key is applied in handle_submit.
                api_key = gr.Textbox(label="API Key (optional)", type="password")
                base_url = gr.Textbox(label="Base URL (optional)", value=os.getenv("AZURE_OPENAI_ENDPOINT"))
                image_input = gr.Image(label="Upload Image", type="pil")
                question = gr.Textbox(label="Question", placeholder="Ask a question about the image...")
                submit_btn = gr.Button("Submit")

        with gr.Row():
            output = gr.HTML(label="Response")

        # State holding the images and conversation of the latest run.
        # NOTE(review): nothing reads these at the moment; presumably they are
        # kept for the disabled gallery / JSON viewer / "Export to ZIP"
        # feature (see export_to_zip) - confirm before removing.
        current_images = gr.State([])
        current_json = gr.State(None)

        # Submit handler.
        def handle_submit(model, client_type, key, url, q, img):
            if img is None:
                raise gr.Error("Please upload an image before submitting.")
            if not key:
                # No user key: use the server credentials. The endpoint is
                # pinned to the server's own one so that the secret is never
                # sent to a user-supplied URL.
                key = os.getenv("AZURE_OPENAI_API_KEY")
                url = os.getenv("AZURE_OPENAI_ENDPOINT")
            html, images, json_data = pyvision_chat(model, client_type, key, url, q, img)
            return html, images, json_data

        submit_btn.click(
            fn=handle_submit,
            inputs=[model_name, client_type, api_key, base_url, question, image_input],
            outputs=[output, current_images, current_json]
        )

        # Examples. LaTeX backslashes are doubled so that they reach the UI
        # literally ("\a" in a plain string would be a BEL character).
        examples = [
            # [
            #     "./examples/1.png",
            #     "From the information on that advertising board, what is the type of this shop?\nA. The shop is a yoga studio.\nB. The shop is a cafe.\nC. The shop is a seven-eleven.\nD. The shop is a milk tea shop.",
            # ],
            [
                "./examples/2.png",
                "What is the diagnosis for the abnormality seen in this image?\nA. Pulmonary embolism.\nB. Tuberculosis.\nC. COVID-19 infection.\nD. Influenza.",
            ],
            [
                "./examples/3.png",
                "What is the color of the liquid contained in the glass on the table?\nA. The color of the liquid contained in the glass on the table is green.\nB. The color of the liquid contained in the glass on the table is transparent.\nC. The color of the liquid contained in the glass on the table is white.\nD. The color of the liquid contained in the glass on the table is orange.",
            ],
            [
                "./examples/4.png",
                "Is the dog on the left or right side of the bicycle?\nA. The dog is on the right side of the bicycle.\nB. The dog is on the left side of the bicycle.",
            ],
            [
                "./examples/5.png",
                "Is the kid with black shirt on the left or right side of the kid with blue shirt?\nA. The kid with black shirt is on the left side of the kid with blue shirt.\nB. The kid with black shirt is on the right side of the kid with blue shirt.",
            ],
            [
                "./examples/6.png",
                "What can be observed in this image?\nA. Nerve entrapment.\nB. Musculoskeletal abnormality.\nC. Arteriovenous anomaly.\nD. Renal cyst.",
            ],
            [
                "./examples/7.png",
                "What is the specific stage of cancer depicted in the image? A)Stage Ib, B)Stage IIIb, C)Stage IIc, D)Stage IIIa",
            ],
            [
                "./examples/8.png",
                "A gymnast jotted down the number of cartwheels she did each day. What is the mode of the numbers?",
            ],
            [
                "./examples/9.png",
                "Does Virginia have the highest value in the USA ?",
            ],
            [
                "./examples/10.png",
                "AB is the diameter of ⊙O, PA is tangent to ⊙O at point A, and PO intersects ⊙O at point C; connect BC, if ∠P = 40.0, then ∠B is equal to ()",
            ],
            [
                "./examples/11.png",
                "How many single-color paths go from C to A?",
            ],
            [
                "./examples/12.png",
                "There is a numerical converter, the principle of which is shown in the following diagram: When the input x=16, the output y equals.",
            ],
            [
                "./examples/13.png",
                "As shown in Figure 1, it is a right-angled triangular paper piece, $$ \\angle A=30^{ \\circ }$$, $$BC=\\quantity{4}{cm}$$, it is folded so that point $$C$$ lands on point $$C'$$ on the hypotenuse, with the fold line being $$BD$$, as shown in Figure 2. Then, Figure 2 is folded along $$DE$$, so that point $$A$$ lands on point $$A'$$ on the extension of $$DC'$$, as shown in Figure 3. The length of the fold line $$DE$$ is ___.",
            ],
            [
                "./examples/14.png",
                "As shown in the figure, in the 'Pascal's Triangle', the numbers above the diagonal line $$AB$$, indicated by the arrows, form a zigzag sequence: $$1$$, $$2$$, $$3$$, $$3$$, $$6$$, $$4$$, $$10$$, $$\\cdots$$, let the sum of the first $$n$$ terms of this sequence be $$S_{n}$$, then $$S_{16}=$$ ___.",
            ],
            [
                "./examples/15.png",
                "What do satellite 1, 2, 3 separately monitor in ?\nA. Earth's energy balance, Earth's water cycle, Earth's surface\nB. Earth's water cycle, Earth's energy balance, Earth's surface\nC. Earth's surface, Earth's water cycle, Earth's energy balance\nD. Earth's surface, Earth's energy balance, Earth's water cycle",
            ],
            [
                "./examples/16.png",
                "This is a background image. Based on it, design a poster, rendering some text on it, including title, subtitle, and some slogan, making it a aesthetic and hormanized poster.",
            ],
            [
                "./examples/17.png",
                "Fill the exact green shape shown in the question grid. Choose the only option set whose pieces perfectly tile the shape without gaps or overlap.",
            ],
            [
                "./examples/18.png",
                "Consider a string of \\(L=2.00 \\mathrm{~m}\\) attached to an adjustable-frequency string vibrator as shown in the figure. The waves produced by the vibrator travel down the string and are reflected by the fixed boundary condition at the pulley. The string, which has a linear mass density of \\(\\mu=0.006 \\mathrm{~kg} / \\mathrm{m}\\), is passed over a frictionless pulley of a negligible mass, and the tension is provided by a 2.00-kg hanging mass.\n(a) What is the velocity of the waves on the string?\n(b) Draw a sketch of the first three normal modes of the standing waves that can be produced on the string and label each with the wavelength.\n(c) List the frequencies that the string vibrator must be tuned to in order to produce the first three normal modes of the standing waves.",
            ],
            [
                "./examples/19.png",
                "The adventure starts at the green square. By following travel leftward 1 step, where do you conclude?",
            ],
        ]

        gr.Examples(
            examples,
            [image_input, question],
            label="Click any example to try it out!"
        )

        gr.Markdown("""
### Tips
1. We have set the API in this space, but if you want try run this demo on more data, please duplicate this space and set your own API.
2. It may take 2~5 min.
""")

    return demo
# Create and launch the app when the file is run as a script.
if __name__ == "__main__":
    demo = create_demo()
    demo.launch()