import os
import base64
import concurrent.futures
from io import BytesIO

from PIL import Image
import dashscope
from dashscope import MultiModalConversation

from MobileAgent.api import inference_chat
from MobileAgent.prompt_no_input import get_action_prompt, get_reflect_prompt, get_memory_prompt, get_process_prompt
from MobileAgent.chat import init_action_chat, init_reflect_chat, init_memory_chat, add_response, add_response_two_image

API_url = os.environ.get('url')
token = os.environ.get('token')
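# Environment variables this module relies on (inferred from usage below):
#   url   - endpoint of the OpenAI-compatible chat API called by inference_chat
#   token - auth token for that endpoint
#   qwen  - DashScope API key used by process_image for qwen-vl-plus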

def base64_to_pil(base64_string):
    # Strip an optional "data:image/...;base64," prefix before decoding.
    if base64_string.startswith('data:image'):
        base64_string = base64_string.split(',')[-1]
    image_data = base64.b64decode(base64_string)
    image_stream = BytesIO(image_data)
    pil_image = Image.open(image_stream)
    return pil_image

def process_image(image, query):
    # Caption one cropped icon image with Qwen-VL via DashScope.
    dashscope.api_key = os.environ.get('qwen')
    # DashScope expects a file:// URI; an absolute path keeps relative
    # locations such as "temp/..." resolvable.
    image = "file://" + os.path.abspath(image)
    messages = [{
        'role': 'user',
        'content': [
            {'image': image},
            {'text': query},
        ]
    }]
    response = MultiModalConversation.call(model="qwen-vl-plus", messages=messages)
    try:
        response = response['output']['choices'][0]['message']['content'][0]["text"]
    except Exception:
        # Fall back to a generic caption if the call failed or returned an
        # unexpected payload.
        response = "This is an icon."
    return response

if not os.path.exists("screenshot"):
    os.mkdir("screenshot")
if not os.path.exists("temp"):
    os.mkdir("temp")

def mobile_agent_infer(json_data):
    task = json_data["task"]

    if task == "caption":
        # Decode each base64-encoded icon crop to a local PNG file.
        query = json_data["query"]
        images = json_data["images"]
        local_images = []
        for image in images:
            image_name = image["image_name"]
            image_file = base64_to_pil(image["image_file"])
            image_path = "temp/" + image_name
            image_file.save(image_path, "PNG")
            local_images.append(image_path)

        # Caption all icons in parallel; icon_map keys are 1-based and follow
        # the order in which the images arrived.
        icon_map = {}
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = {executor.submit(process_image, image, query): i for i, image in enumerate(local_images)}
            for future in concurrent.futures.as_completed(futures):
                i = futures[future]
                icon_map[i + 1] = future.result()
        output = {"icon_map": icon_map}
        return output

    elif task == "planning":
        instruction = json_data["instruction"]
        thought_history = json_data["thought_history"]
        summary_history = json_data["summary_history"]
        action_history = json_data["action_history"]
        completed_requirements = json_data["completed_requirements"]
        add_info = json_data["add_info"]

        prompt_planning = get_process_prompt(instruction, thought_history, summary_history, action_history,
                                             completed_requirements, add_info)
        chat_planning = init_memory_chat()
        chat_planning = add_response("user", prompt_planning, chat_planning)
        output_planning = inference_chat(chat_planning, 'gpt-4-turbo', API_url, token)
        output = {"planning": output_planning}
        return output

    elif task == "decision":
        # Save the current screenshot so it can be attached to the chat.
        screenshot_file = base64_to_pil(json_data["screenshot_file"])
        image_path = "screenshot/screenshot_local.png"
        screenshot_file.save(image_path, "PNG")

        instruction = json_data["instruction"]
        perception_infos = json_data["perception_infos"]
        width = json_data["width"]
        height = json_data["height"]
        summary_history = json_data["summary_history"]
        action_history = json_data["action_history"]
        summary = json_data["summary"]
        action = json_data["action"]
        add_info = json_data["add_info"]
        error_flag = json_data["error_flag"]
        completed_requirements = json_data["completed_requirements"]
        memory = json_data["memory"]
        memory_switch = json_data["memory_switch"]
        insight = json_data["insight"]

        prompt_action = get_action_prompt(instruction, perception_infos, width, height, summary_history,
                                          action_history, summary, action, add_info, error_flag,
                                          completed_requirements, memory)
        chat_action = init_action_chat()
        chat_action = add_response("user", prompt_action, chat_action, image_path)
        output_action = inference_chat(chat_action, 'gpt-4o', API_url, token)
        if output_action == "No token":
            output = {"decision": "No token", "memory": None}
            return output

        # When the memory switch is on, extend the same chat with a memory
        # prompt and record the model's answer alongside the decision.
        chat_action = add_response("assistant", output_action, chat_action)
        output_memory = None
        if memory_switch:
            prompt_memory = get_memory_prompt(insight)
            chat_action = add_response("user", prompt_memory, chat_action)
            output_memory = inference_chat(chat_action, 'gpt-4o', API_url, token)
        output = {"decision": output_action, "memory": output_memory}
        return output

    elif task == "reflection":
        # Save both the current and the previous screenshot; reflection
        # compares the two to judge whether the last action succeeded.
        screenshot_file = base64_to_pil(json_data["screenshot_file"])
        image_path = "screenshot/screenshot_local.png"
        screenshot_file.save(image_path, "PNG")
        last_screenshot_file = base64_to_pil(json_data["last_screenshot_file"])
        last_image_path = "screenshot/last_screenshot_local.png"
        last_screenshot_file.save(last_image_path, "PNG")

        instruction = json_data["instruction"]
        last_perception_infos = json_data["last_perception_infos"]
        perception_infos = json_data["perception_infos"]
        width = json_data["width"]
        height = json_data["height"]
        summary = json_data["summary"]
        action = json_data["action"]
        add_info = json_data["add_info"]

        prompt_reflect = get_reflect_prompt(instruction, last_perception_infos, perception_infos, width, height,
                                            summary, action, add_info)
        chat_reflect = init_reflect_chat()
        chat_reflect = add_response_two_image("user", prompt_reflect, chat_reflect, [last_image_path, image_path])
        output_reflect = inference_chat(chat_reflect, 'gpt-4o', API_url, token)
        output = {"reflection": output_reflect}
        return output

    else:
        output = {"error": "The task must be one of \"caption\", \"planning\", \"decision\", or \"reflection\"."}
        return output
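
# A minimal local smoke test, kept as a sketch: the payload shape mirrors the
# "caption" branch above, but the file name "icon.png" and the query text are
# hypothetical. Real callers send these fields over the Space's API; running
# this directly assumes the "qwen" environment variable is configured.
if __name__ == "__main__":
    with open("icon.png", "rb") as f:
        icon_b64 = base64.b64encode(f.read()).decode("utf-8")
    demo_request = {
        "task": "caption",
        "query": "Describe this icon in one sentence.",
        "images": [{"image_name": "icon_1.png", "image_file": icon_b64}],
    }
    print(mobile_agent_infer(demo_request))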