from control_joints import *
from read_joints import *
from realsense import *
import os
import cv2
import numpy as np
import pyrealsense2 as rs  # provides rs.context() / rs.camera_info used below (may also be re-exported by the local realsense module)
import torch
import time
import argparse
import sys
import json

# Make the policy package importable from the current workspace
sys.path.insert(0, 'policy/ACT_DP_multitask')

from policy_anyrobot import ACTDiffusionPolicy
from t5_encoder import T5Embedder
from utils import convert_weight
import yaml


def get_image(cameras):
    """Grab the latest frame from every camera and return them as a dictionary."""
    obs_image = dict()
    for i in range(3):  # query each camera a few times so only the latest frame is kept
        for cam in cameras:
            color_image = cam.get_latest_image()  # BGR, 0-255, shape (H, W, 3)
            if color_image is not None:
                obs_image[cam.name] = color_image[:, :, ::-1]  # BGR -> RGB, 0-255, (H, W, 3)
                # Save the frame to disk for debugging
                filename = f"{cam.name}_image_{i}.png"
                cv2.imwrite(filename, color_image)
                print(f"Saved image: {filename}")
    return obs_image  # dictionary of the latest RGB image per camera


def get_observation(obs_image, joint):
    # for key, value in obs_image.items():
    #     print(f"Camera: {key}, Image shape: {value.shape}, Image: {value.max()}")  # debug: check image shapes
    observation = dict()
    observation['images'] = dict()
    observation['images']['cam_high'] = np.moveaxis(obs_image['head_camera'], -1, 0)          # RGB (H, W, C) -> (C, H, W)
    observation['images']['cam_left_wrist'] = np.moveaxis(obs_image['left_camera'], -1, 0)    # RGB (H, W, C) -> (C, H, W)
    observation['images']['cam_right_wrist'] = np.moveaxis(obs_image['right_camera'], -1, 0)  # RGB (H, W, C) -> (C, H, W)
    observation['qpos'] = joint
    return observation


def encode_obs(observations, camera_names):
    obs_img = []
    for camera_name in camera_names:  # e.g. ['cam_high', 'cam_left_wrist', 'cam_right_wrist']
        obs_img.append(observations['images'][camera_name])
    obs_img = np.stack(obs_img, axis=0)  # shape: (N_views, C, H, W), RGB, 0-255
    image_data = torch.from_numpy(obs_img).unsqueeze(0).float() / 255.0  # normalize to [0, 1] and add history dimension
    qpos_data = torch.from_numpy(np.array(observations['qpos'], dtype=np.float32)).unsqueeze(0)  # shape: (1, 14)
    return image_data, qpos_data  # no batch dimension


def get_model(config_file, ckpt_file, device):
    with open(config_file, "r", encoding="utf-8") as file:
        policy_config = json.load(file)
    print(f"Loading policy config from {config_file}")
    policy = ACTDiffusionPolicy(policy_config)

    print(f"Loading model from {ckpt_file}")
    checkpoint = torch.load(ckpt_file, weights_only=False)
    policy.load_state_dict(convert_weight(checkpoint["state_dict"]))
    policy.to(device)
    policy.eval()

    stats = checkpoint["stats"]
    print('Resetting observation normalization stats')
    policy.reset_obs(stats, norm_type=policy_config["norm_type"])
    camera_names = policy_config["camera_names"]
    return policy, camera_names


def get_language_encoder(device):
    MODEL_PATH = '/data/gjx/.cache/huggingface/hub/models--google--t5-v1_1-xxl/snapshots/3db67ab1af984cf10548a73467f0e5bca2aaaeb2'
    # MODEL_PATH = 'policy/weights/RDT/t5-v1_1-xxl'
    CONFIG_PATH = os.path.join('policy/ACT_DP_multitask', "base.yaml")
    with open(CONFIG_PATH, "r") as fp:
        config = yaml.safe_load(fp)
    text_embedder = T5Embedder(
        from_pretrained=MODEL_PATH,
        model_max_length=config["dataset"]["tokenizer_max_length"],
        device=device,
        use_offload_folder=None,
    )
    tokenizer, text_encoder = text_embedder.tokenizer, text_embedder.model
    return tokenizer, text_encoder


def get_language_embed(tokenizer, text_encoder, language_instruction, device):
    tokens = tokenizer(
        language_instruction,
        return_tensors="pt",
        padding="longest",
        truncation=True,
)["input_ids"].to(device) tokens = tokens.view(1, -1) with torch.no_grad(): pred = text_encoder(tokens).last_hidden_state.detach().cpu() return pred.squeeze(0).mean(0) # shape: (hidden_size) cpu tensor def test_code(): print("Testing the code...") # Dummy test function to ensure the script runs without errors # Dummy test function to ensure the script runs without errors # reader = JointReader(left_can="can_left_1", right_can="can_right_1") # ��ȡ�ؽڶ��� # cameras = [ # RealSenseCam("250122079815", "left_camera"), # RealSenseCam("048522073543", "head_camera"), # RealSenseCam("030522070109", "right_camera"), # ] # left_can, right_can = "can_left_2", "can_right_2" # ctx = rs.context() # if len(ctx.devices) > 0: # print("Found RealSense devices:") # for d in ctx.devices: # # ��ȡ�豸�����ƺ����к� # name = d.get_info(rs.camera_info.name) # serial_number = d.get_info(rs.camera_info.serial_number) # print(f"Device: {name}, Serial Number: {serial_number}") # else: # print("No Intel RealSense devices connected") # for cam in cameras: # cam.start() # for i in range(20): # print(f"Warm up: {i}", end="\r") # for cam in cameras: # color_image = cam.get_latest_image() # time.sleep(0.15) device = 'cpu' ckpt_dir = 'policy/ACT_DP_multitask/checkpoints/real_fintune_50_2000/act_dp' config_path = os.path.join(ckpt_dir, 'policy_config.json') ckpt_path = os.path.join(ckpt_dir, 'policy_lastest_seed_0.ckpt') policy, cameras_name = get_model(config_path, ckpt_path, device) instruction_file = 'instruction.txt' with open(instruction_file, 'r') as f: instruction = f.readline().strip().strip('"') print(f"Instruction: {instruction}") tokenizer, text_encoder = get_language_encoder(device) # ��ȡ���Ա����� task_emb = get_language_embed(tokenizer, text_encoder, instruction, device) # ��ȡ����Ƕ�� temsor D controller = ControlJoints(left_can=left_can, right_can=right_can) image = torch.rand(1, 3, 3, 480, 640) # Dummy image tensor qpos = torch.rand(1, 14) # Dummy joint position tensor actions = policy.get_action(qpos.float().to(device), image.float().to(device), task_emb.float().to(device)) print(f"Actions shape: {actions.shape}") if __name__ == "__main__": # test_code() # Uncomment to run the test function # print("Testing the done...") # exit() print(torch.backends.cudnn.enabled) print(torch.backends.cudnn.version()) print(torch.version.cuda) print(torch.__version__) # ������� parser = argparse.ArgumentParser(description="Deploy the action for a specific player") parser.add_argument("--ckpt_path", type=str, default='policy/ACT_DP_multitask/checkpoints/real_fintune_50_2000/act_dp') # ��ȡ�������� PLAYER player_value = os.getenv("PLAYER") # ��黷�������Ƿ������������ if player_value is None: raise ValueError("�������� PLAYER δ����") try: player_value = int(player_value) except ValueError: raise ValueError("�������� PLAYER ������һ������") # ���� PLAYER ��ִֵ�в�ͬ�IJ��� if player_value == 1: print("Player 1") cameras = [ RealSenseCam("337322073280", "left_camera"), RealSenseCam("337322074191", "head_camera"), RealSenseCam("337122072617", "right_camera"), ] left_can, right_can = "can_left_1", "can_right_1" elif player_value == 2: print("Player 2") cameras = [ RealSenseCam("250122079815", "left_camera"), RealSenseCam("048522073543", "head_camera"), RealSenseCam("030522070109", "right_camera"), ] left_can, right_can = "can_left_2", "can_right_2" else: raise ValueError("PLAYER ֵ��Ч�������� 1 �� 2") reader = JointReader(left_can=left_can, right_can=right_can) # ��ȡ�ؽڶ��� # ==== Get RGB ==== # ���������Ķ������ڹ����������ӵ� RealSense �豸 ctx = 
    # Check whether any devices are connected
    if len(ctx.devices) > 0:
        print("Found RealSense devices:")
        for d in ctx.devices:
            # Get the device name and serial number
            name = d.get_info(rs.camera_info.name)
            serial_number = d.get_info(rs.camera_info.serial_number)
            print(f"Device: {name}, Serial Number: {serial_number}")
    else:
        print("No Intel RealSense devices connected")

    # Start all cameras
    for cam in cameras:
        cam.start()

    # Warm up the cameras
    for i in range(10):
        print(f"Warm up: {i}", end="\r")
        for cam in cameras:
            color_image = cam.get_latest_image()
        time.sleep(0.15)

    # Load model
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # device = 'cpu'
    ckpt_dir = args.ckpt_path
    config_path = os.path.join(ckpt_dir, 'policy_config.json')
    # ckpt_path = os.path.join(ckpt_dir, 'policy_lastest_seed_0.ckpt')  # policy_epoch_600_seed_0.ckpt
    ckpt_path = os.path.join(ckpt_dir, 'policy_epoch_600_seed_0.ckpt')
    policy, camera_names = get_model(config_path, ckpt_path, device)  # load the policy
    print('camera_names:', camera_names)

    # Get instructions
    instruction_file = 'instruction.txt'
    with open(instruction_file, 'r') as f:
        instruction = f.readline().strip()
    print(f"Using instruction: {instruction}")

    print('Loading language tokenizer and encoder...')
    tokenizer, text_encoder = get_language_encoder(device)  # load the language tokenizer and encoder
    task_emb = get_language_embed(tokenizer, text_encoder, instruction, device)  # language embedding, tensor of shape (D,)

    # ==== Get Observation and Deploy Actions ====
    controller = ControlJoints(left_can=left_can, right_can=right_can)
    max_timestep = 600
    step = 0
    while step < max_timestep:
        obs_image = get_image(cameras)
        joint = reader.get_joint_value()
        observation = get_observation(obs_image, joint)
        image, qpos = encode_obs(observation, camera_names)
        actions = policy.get_action(qpos.float().to(device), image.float().to(device), task_emb.float().to(device))
        print(f"Step: {step}/{max_timestep}, Action: {actions.shape}")
        for action in actions[0:30]:
            # Execute each predicted action
            controller.control(action)
            step += 1
            # import pdb; pdb.set_trace()  # debugging: pause execution
            time.sleep(0.05)

    # joint = actions[-1]
    # obs = dict()
    # for i in range(3):
    #     for cam in cameras:
    #         color_image = cam.get_latest_image()
    #         if color_image is not None:
    #             # Save the image
    #             obs[cam] = color_image
    #             # filename = f"{cam.name}_image_{i}.png"
    #             # cv2.imwrite(filename, color_image)
    #             # print(f"Saved image: {filename}")

    # ==== Get Joint ====
    # reader = JointReader(left_can=left_can, right_can=right_can)
    # print(reader.get_joint_value())

    # ==== Deploy Action ====
    # controller = ControlJoints(left_can=left_can, right_can=right_can)
    # for i in range(10):
    #     positions = [0] * 14
    #     controller.control(positions)
    #     time.sleep(0.1)
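
    # Example invocation (a sketch): the PLAYER environment variable and the --ckpt_path flag
    # come from the argument handling above; the script name "deploy_real.py" is only
    # illustrative and should be replaced with this file's actual name.
    #
    #   PLAYER=1 python deploy_real.py --ckpt_path policy/ACT_DP_multitask/checkpoints/real_fintune_50_2000/act_dp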