|
import sys |
|
import os |
|
import re |
|
import json |
|
import base64 |
|
from io import BytesIO |
|
from PIL import Image |
|
import argparse |
|
|
|
from shared_vis_python_exe import PythonExecutor |
|
from openai import OpenAI |
|
from typing import Optional, Union |
|
import gradio as gr |
|
import markdown |
|
from openai import AzureOpenAI |
|
import zipfile |
|
import tempfile |
|
from datetime import datetime |
|
|
|
def export_to_zip(images, conversations): |
|
""" |
|
将图像和对话数据导出为ZIP文件 |
|
|
|
Args: |
|
images: 提取的图像列表 |
|
conversations: 对话JSON数据 |
|
|
|
Returns: |
|
生成的ZIP文件路径 |
|
""" |
|
|
|
temp_dir = tempfile.mkdtemp() |
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
zip_filename = os.path.join(temp_dir, f"export_{timestamp}.zip") |
|
|
|
|
|
with zipfile.ZipFile(zip_filename, 'w') as zipf: |
|
|
|
for i, img in enumerate(images): |
|
img_path = os.path.join(temp_dir, f"image_{i}.png") |
|
img.save(img_path) |
|
zipf.write(img_path, f"images/image_{i}.png") |
|
os.remove(img_path) |
|
|
|
|
|
json_path = os.path.join(temp_dir, "conversations.json") |
|
with open(json_path, 'w', encoding='utf-8') as f: |
|
json.dump(conversations, f, ensure_ascii=False, indent=4) |
|
zipf.write(json_path, "conversations.json") |
|
os.remove(json_path) |
|
|
|
return zip_filename |
|
|
|
def base64_to_image( |
|
base64_str: str, |
|
remove_prefix: bool = True, |
|
convert_mode: Optional[str] = "RGB" |
|
) -> Union[Image.Image, None]: |
|
""" |
|
将Base64编码的图片字符串转换为PIL Image对象 |
|
|
|
Args: |
|
base64_str: Base64编码的图片字符串(可带data:前缀) |
|
remove_prefix: 是否自动去除"...") |
|
>>> img = base64_to_image("iVBORw0KGg...", remove_prefix=False) |
|
""" |
|
try: |
|
|
|
if remove_prefix and "," in base64_str: |
|
base64_str = base64_str.split(",")[1] |
|
|
|
|
|
image_data = base64.b64decode(base64_str) |
|
|
|
|
|
image = Image.open(BytesIO(image_data)) |
|
|
|
|
|
if convert_mode: |
|
image = image.convert(convert_mode) |
|
|
|
return image |
|
|
|
except (base64.binascii.Error, OSError, Exception) as e: |
|
print(f"Base64解码失败: {str(e)}") |
|
return None |
|
|
|
def encode_image(image): |
|
""" |
|
将PIL.Image对象或图像文件路径转换为base64编码字符串,并获取分辨率信息 |
|
|
|
参数: |
|
image: 可以是PIL.Image对象或图像文件路径 |
|
|
|
返回: |
|
包含以下键的字典: |
|
- 'base64': base64编码的字符串 |
|
- 'width': 图片宽度(像素) |
|
- 'height': 图片高度(像素) |
|
- 'resolution': 字符串形式的"宽度x高度" |
|
""" |
|
img_obj = None |
|
|
|
if isinstance(image, str): |
|
|
|
img_obj = Image.open(image) |
|
with open(image, "rb") as image_file: |
|
base64_str = base64.b64encode(image_file.read()).decode('utf-8') |
|
else: |
|
|
|
img_obj = image |
|
buffered = BytesIO() |
|
image.save(buffered, format='PNG') |
|
base64_str = base64.b64encode(buffered.getvalue()).decode('utf-8') |
|
|
|
|
|
width, height = img_obj.size |
|
|
|
return { |
|
'base64': base64_str, |
|
'width': width, |
|
'height': height |
|
} |
|
|
|
def excute_codes(codes, messages, executor: PythonExecutor): |
|
no_code_idx = [] |
|
codes_use = [] |
|
for i, code in enumerate(codes): |
|
if code == "": |
|
no_code_idx.append(i) |
|
else: |
|
codes_use.append(code) |
|
batch_results = executor.batch_apply(codes_use, messages) |
|
return batch_results, no_code_idx |
|
|
|
def process_prompt_init(question, image, prompt_template, prompt_type): |
|
prompt_prefix = prompt_template[prompt_type] |
|
|
|
img_result = encode_image(image) |
|
image_base64 = img_result['base64'] |
|
width = img_result['width'] |
|
height = img_result['height'] |
|
question_with_options = question |
|
|
|
messages = [ |
|
{ |
|
"role": "user", |
|
"content": [{"type": "text", "text": "<image_clue_0>"}] + [{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64}"}}] + [{"type": "text", "text": "</image_clue_0>\n\n"}] + [{"type": "text", "text": prompt_prefix.format(query=question_with_options, width=str(width), height=str(height))}] |
|
} |
|
] |
|
|
|
return messages |
|
|
|
def update_messages_with_excu_content(messages, images_result, text_result, error_result, image_clue_idx): |
|
if error_result is None: |
|
new_messages = [] |
|
image_content = [] |
|
for message_item in messages[:-1]: |
|
new_messages.append(message_item) |
|
|
|
assistant_message_item = messages[-1]['content'] |
|
interpreter_message_text_prefix = [{"type": "text", "text": f"<interpreter>\nText Result:\n{text_result}\nImage Result:\n"}] |
|
if images_result is not None: |
|
|
|
for image_base64_item in images_result: |
|
interpreter_message_images = [{"type": "text", "text": f"<image_clue_{image_clue_idx}>"}] + [{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_base64_item}"}}] + [{"type": "text", "text": f"</image_clue_{image_clue_idx}>"}] |
|
image_content += interpreter_message_images |
|
image_clue_idx += 1 |
|
else: |
|
image_content = [{"type": "text", "text": "None"}] |
|
interpreter_message_text_profill = [{"type": "text", "text": "</interpreter>\n"}] |
|
|
|
assistant_message_item = assistant_message_item + interpreter_message_text_prefix + image_content + interpreter_message_text_profill |
|
new_messages.append({"role": "assistant", "content": assistant_message_item}) |
|
else: |
|
new_messages = [] |
|
for message_item in messages[:-1]: |
|
new_messages.append(message_item) |
|
|
|
assistant_message_item = messages[-1]['content'] |
|
interpreter_message_text_prefix = [{"type": "text", "text": f"<interpreter>{error_result}"}] |
|
interpreter_message_text_profill = [{"type": "text", "text": "</interpreter>\n"}] |
|
|
|
assistant_message_item = assistant_message_item + interpreter_message_text_prefix + interpreter_message_text_profill |
|
new_messages.append({"role": "assistant", "content": assistant_message_item}) |
|
|
|
return new_messages, image_clue_idx |
|
|
|
|
|
|
|
def update_messages_with_code(messages, generated_content): |
|
message_item = { |
|
"role": "assistant", |
|
"content": [{"type": "text", "text": f"{generated_content}</code>\n"}] |
|
} |
|
|
|
messages.append(message_item) |
|
return messages |
|
|
|
def update_messages_with_text(messages, generated_content): |
|
message_item = { |
|
"role": "assistant", |
|
"content": [{"type": "text", "text": f"{generated_content}"}] |
|
} |
|
|
|
messages.append(message_item) |
|
return messages |
|
|
|
def call_chatgpt_api(model_name, messages, client, max_tokens=10000, stop=None, temperature=0.6): |
|
"""Call ChatGPT API with the given messages""" |
|
try: |
|
response = client.chat.completions.create( |
|
model=model_name, |
|
messages=messages, |
|
max_tokens=max_tokens, |
|
temperature=temperature, |
|
top_p=1.0, |
|
stop=stop |
|
) |
|
|
|
response_text = response.choices[0].message.content |
|
|
|
|
|
stop_reason = None |
|
if stop and any(s in response_text for s in stop): |
|
for s in stop: |
|
if s in response_text: |
|
stop_reason = s |
|
break |
|
else: |
|
stop_reason = response.choices[0].finish_reason |
|
|
|
if "<code>" in response_text: |
|
stop_reason = "</code>" |
|
|
|
return response_text, stop_reason |
|
|
|
except Exception as e: |
|
print(f"API Error: {str(e)}") |
|
return None, None |
|
|
|
def evaluate_single_data(model_name, data, client, executor, prompt_template, prompt_type): |
|
|
|
messages = process_prompt_init(data["question"], data['image'], prompt_template, prompt_type) |
|
|
|
|
|
response_text, pred_stop_reason = call_chatgpt_api( |
|
model_name, |
|
messages, |
|
client, |
|
max_tokens=10000, |
|
stop=["</code>"] |
|
) |
|
|
|
|
|
final_response = response_text |
|
code_execution_count = 0 |
|
image_clue_idx = 1 |
|
|
|
while True: |
|
|
|
if pred_stop_reason == "</code>": |
|
|
|
messages = update_messages_with_code(messages, response_text) |
|
code_to_execute = response_text.split("```python")[-1].split("```")[0].strip() |
|
|
|
|
|
exe_result = excute_codes([code_to_execute], messages, executor)[0][0] |
|
if exe_result is None: |
|
text_result = "None" |
|
images_result = None |
|
else: |
|
output, report = exe_result |
|
if report == "Done": |
|
error_result = None |
|
try: |
|
text_result = exe_result[0]['text'] |
|
except: |
|
text_result = None |
|
print("text result is none.") |
|
try: |
|
images_result = exe_result[0]['images'] |
|
except: |
|
images_result = None |
|
print("image result is none.") |
|
else: |
|
error_result = report |
|
text_result = None |
|
images_result = None |
|
|
|
messages, new_image_clue_idx = update_messages_with_excu_content(messages, images_result, text_result, error_result, image_clue_idx) |
|
image_clue_idx = new_image_clue_idx |
|
|
|
code_execution_count += 1 |
|
print(f"Code Execution Count: {code_execution_count}") |
|
|
|
|
|
response_text, pred_stop_reason = call_chatgpt_api( |
|
model_name, |
|
messages, |
|
client, |
|
max_tokens=10000, |
|
stop=["</code>"] |
|
) |
|
|
|
|
|
|
|
else: |
|
final_response = response_text |
|
messages = update_messages_with_text(messages, response_text) |
|
print("GPT-4.1 finish.") |
|
break |
|
|
|
return messages |
|
|
|
def process_message_to_sharegpt_format(message): |
|
sharegpt_images = [] |
|
sharegpt_conversation = [] |
|
image_idx = 0 |
|
|
|
for i, message_item in enumerate(message): |
|
role = message_item['role'] |
|
|
|
content_list = message_item['content'] |
|
whole_content = "" |
|
for content_item in content_list: |
|
content_type = content_item['type'] |
|
if content_type == "text": |
|
content_value = content_item['text'] |
|
whole_content += content_value |
|
elif content_type == "image_url": |
|
content_value = content_item['image_url']['url'] |
|
whole_content += "<image>" |
|
|
|
image = base64_to_image(content_value) |
|
if image: |
|
|
|
|
|
sharegpt_images.append(image) |
|
image_idx += 1 |
|
|
|
if i == 0: |
|
sharegpt_conversation.append({"from": "human", "value": whole_content}) |
|
continue |
|
|
|
if "<interpreter>" in whole_content: |
|
gpt_content, observation_content = whole_content.split("<interpreter>", -1) |
|
sharegpt_conversation.append({"from": "gpt", "value": gpt_content}) |
|
sharegpt_conversation.append({"from": "observation", "value": "<interpreter>"+observation_content}) |
|
elif i != 0: |
|
sharegpt_conversation.append({"from": "gpt", "value": whole_content}) |
|
|
|
sharegpt_data_item = { |
|
"conversations": sharegpt_conversation, |
|
"images": sharegpt_images |
|
} |
|
|
|
return sharegpt_data_item |
|
|
|
def process_message(messages): |
|
|
|
html_output = '<div style="color: black;">' |
|
|
|
for message_item in messages: |
|
role = message_item['role'] |
|
content = message_item['content'] |
|
|
|
|
|
if role == "user" or role == "human": |
|
html_output += f'<div style="background-color: #f0f0f0; padding: 10px; margin: 10px 0; border-radius: 10px; color: black;"><strong>User:</strong><br>' |
|
elif role == "assistant": |
|
html_output += f'<div style="background-color: #e6f7ff; padding: 10px; margin: 10px 0; border-radius: 10px; color: black;"><strong>Assistant:</strong><br>' |
|
else: |
|
html_output += f'<div style="background-color: #f9f9f9; padding: 10px; margin: 10px 0; border-radius: 10px; color: black;"><strong>{role.capitalize()}:</strong><br>' |
|
|
|
|
|
for content_item in content: |
|
content_type = content_item['type'] |
|
|
|
if content_type == "text": |
|
|
|
md_text = content_item['text'] |
|
html_text = markdown.markdown(md_text, extensions=['fenced_code', 'codehilite']) |
|
|
|
|
|
html_output += f'<div style="color: black;">{html_text}</div>' |
|
|
|
elif content_type == "image_url": |
|
content_value = content_item['image_url']['url'] |
|
|
|
if content_value.startswith("data:"): |
|
html_output += f'<img src="{content_value}" style="max-width: 100%; margin: 10px 0;">' |
|
else: |
|
html_output += f'<img src="{content_value}" style="max-width: 100%; margin: 10px 0;">' |
|
|
|
html_output += '</div>' |
|
|
|
html_output += '</div>' |
|
return html_output |
|
|
|
def pyvision_chat(model_name, client_type, api_key, base_url, question, image): |
|
print("done!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if client_type == "Azure": |
|
client = AzureOpenAI( |
|
api_key=api_key, |
|
api_version="2025-04-01-preview", |
|
azure_endpoint=base_url |
|
) |
|
else: |
|
client = OpenAI(api_key=api_key, base_url=base_url) |
|
executor = PythonExecutor() |
|
|
|
prompt_template = json.load(open("./prompt_template_vis.json", "r", encoding="utf-8")) |
|
prompt_type = 'vistool_with_img_info_v2' |
|
|
|
data = { |
|
"question": question, |
|
"image": image, |
|
} |
|
|
|
|
|
messages = evaluate_single_data(model_name, data, client, executor, prompt_template, prompt_type) |
|
html_output = process_message(messages) |
|
|
|
|
|
json_str = json.dumps(messages, ensure_ascii=False, indent=4) |
|
|
|
sharegpt_data_item = process_message_to_sharegpt_format(messages) |
|
|
|
return html_output, sharegpt_data_item['images'], sharegpt_data_item['conversations'] |
|
|
|
|
|
def create_demo(): |
|
with gr.Blocks(title="PyVision's Online Demo", css="div.prose * {color: black !important;}") as demo: |
|
gr.Markdown("# PyVision's Online Demo") |
|
gr.Markdown("Upload an image and ask a question to get a response via PyVision's dynamic tooling ability.") |
|
gr.Markdown("[Project Page](https://agent-x.space/pyvision/), [Research Paper](https://arxiv.org/abs/2507.07998), [Inference Code](https://github.com/agents-x-project/PyVision)") |
|
|
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
model_name = gr.Dropdown( |
|
label="Model Selection", |
|
choices=["gpt-4.1"], |
|
value="gpt-4.1" |
|
) |
|
client_type = gr.Dropdown(label="Client Type Selection", choices=["Azure", "OpenAI"], value="Azure") |
|
api_key = gr.Textbox(label="API Key (optional)", type="password", value=os.getenv("AZURE_OPENAI_API_KEY")) |
|
base_url = gr.Textbox(label="Base URL (optional)", value=os.getenv("AZURE_OPENAI_ENDPOINT")) |
|
image_input = gr.Image(label="Upload Image", type="pil") |
|
question = gr.Textbox(label="Question", placeholder="Ask a question about the image...") |
|
submit_btn = gr.Button("Submit") |
|
|
|
with gr.Row(): |
|
output = gr.HTML(label="Response") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
current_images = gr.State([]) |
|
current_json = gr.State(None) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_submit(model, client_type, key, url, q, img): |
|
html, images, json_data = pyvision_chat(model, client_type, key, url, q, img) |
|
return html, images, json_data |
|
|
|
submit_btn.click( |
|
fn=handle_submit, |
|
inputs=[model_name, client_type, api_key, base_url, question, image_input], |
|
outputs=[output, current_images, current_json] |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
examples = [ |
|
|
|
|
|
|
|
|
|
[ |
|
"./examples/2.png", |
|
"What is the diagnosis for the abnormality seen in this image?\nA. Pulmonary embolism.\nB. Tuberculosis.\nC. COVID-19 infection.\nD. Influenza.", |
|
], |
|
[ |
|
"./examples/3.png", |
|
"What is the color of the liquid contained in the glass on the table?\nA. The color of the liquid contained in the glass on the table is green.\nB. The color of the liquid contained in the glass on the table is transparent.\nC. The color of the liquid contained in the glass on the table is white.\nD. The color of the liquid contained in the glass on the table is orange.", |
|
], |
|
[ |
|
"./examples/4.png", |
|
"Is the dog on the left or right side of the bicycle?\nA. The dog is on the right side of the bicycle.\nB. The dog is on the left side of the bicycle.", |
|
], |
|
[ |
|
"./examples/5.png", |
|
"Is the kid with black shirt on the left or right side of the kid with blue shirt?\nA. The kid with black shirt is on the left side of the kid with blue shirt.\nB. The kid with black shirt is on the right side of the kid with blue shirt.", |
|
], |
|
[ |
|
"./examples/6.png", |
|
"What can be observed in this image?\nA. Nerve entrapment.\nB. Musculoskeletal abnormality.\nC. Arteriovenous anomaly.\nD. Renal cyst.", |
|
], |
|
[ |
|
"./examples/7.png", |
|
"What is the specific stage of cancer depicted in the image? A)Stage Ib, B)Stage IIIb, C)Stage IIc, D)Stage IIIa", |
|
], |
|
[ |
|
"./examples/8.png", |
|
"A gymnast jotted down the number of cartwheels she did each day. What is the mode of the numbers?", |
|
], |
|
[ |
|
"./examples/9.png", |
|
"Does Virginia have the highest value in the USA ?", |
|
], |
|
[ |
|
"./examples/10.png", |
|
"AB is the diameter of ⊙O, PA is tangent to ⊙O at point A, and PO intersects ⊙O at point C; connect BC, if ∠P = 40.0, then ∠B is equal to ()", |
|
], |
|
[ |
|
"./examples/11.png", |
|
"How many single-color paths go from C to A?", |
|
], |
|
[ |
|
"./examples/12.png", |
|
"There is a numerical converter, the principle of which is shown in the following diagram: When the input x=16, the output y equals.", |
|
], |
|
[ |
|
"./examples/13.png", |
|
"As shown in Figure 1, it is a right-angled triangular paper piece, $$ \angle A=30^{ \circ }$$, $$BC=\quantity{4}{cm}$$, it is folded so that point $$C$$ lands on point $$C'$$ on the hypotenuse, with the fold line being $$BD$$, as shown in Figure 2. Then, Figure 2 is folded along $$DE$$, so that point $$A$$ lands on point $$A'$$ on the extension of $$DC'$$, as shown in Figure 3. The length of the fold line $$DE$$ is ___.", |
|
], |
|
[ |
|
"./examples/14.png", |
|
"As shown in the figure, in the 'Pascal's Triangle', the numbers above the diagonal line $$AB$$, indicated by the arrows, form a zigzag sequence: $$1$$, $$2$$, $$3$$, $$3$$, $$6$$, $$4$$, $$10$$, $$\cdots$$, let the sum of the first $$n$$ terms of this sequence be $$S_{n}$$, then $$S_{16}=$$ ___.", |
|
], |
|
[ |
|
"./examples/15.png", |
|
"What do satellite 1, 2, 3 separately monitor in ?\nA. Earth's energy balance, Earth's water cycle, Earth's surface\nB. Earth's water cycle, Earth's energy balance, Earth's surface\nC. Earth's surface, Earth's water cycle, Earth's energy balance\nD. Earth's surface, Earth's energy balance, Earth's water cycle", |
|
], |
|
[ |
|
"./examples/16.png", |
|
"This is a background image. Based on it, design a poster, rendering some text on it, including title, subtitle, and some slogan, making it a aesthetic and hormanized poster.", |
|
], |
|
[ |
|
"./examples/17.png", |
|
"Fill the exact green shape shown in the question grid. Choose the only option set whose pieces perfectly tile the shape without gaps or overlap.", |
|
], |
|
[ |
|
"./examples/18.png", |
|
"Consider a string of \(L=2.00 \mathrm{~m}\) attached to an adjustable-frequency string vibrator as shown in the figure. The waves produced by the vibrator travel down the string and are reflected by the fixed boundary condition at the pulley. The string, which has a linear mass density of \(\mu=0.006 \mathrm{~kg} / \mathrm{m}\), is passed over a frictionless pulley of a negligible mass, and the tension is provided by a 2.00-kg hanging mass.\n(a) What is the velocity of the waves on the string?\n(b) Draw a sketch of the first three normal modes of the standing waves that can be produced on the string and label each with the wavelength.\n(c) List the frequencies that the string vibrator must be tuned to in order to produce the first three normal modes of the standing waves.", |
|
], |
|
[ |
|
"./examples/19.png", |
|
"The adventure starts at the green square. By following travel leftward 1 step, where do you conclude?", |
|
], |
|
] |
|
|
|
gr.Examples( |
|
examples, |
|
[image_input, question], |
|
label="Click any example to try it out!" |
|
) |
|
|
|
gr.Markdown(""" |
|
### Tips |
|
1. We have set the API in this space, but if you want try run this demo on more data, please duplicate this space and set your own API. |
|
2. It may take 2~5 min. |
|
""") |
|
|
|
return demo |
|
|
|
|
|
if __name__ == "__main__": |
|
demo = create_demo() |
|
demo.launch() |
|
|
|
|
|
|