import json import os import cv2 import numpy as np import math import heapq import datetime from moviepy.editor import ImageSequenceClip import tqdm from utils import get_scene_dir_path, OBJECT_PICTURE_DIR class Video_Generator(): def __init__(self): self.frame_radius = 20 self.frame_thickness = 1 self.frame_outline_color = (0,0,0) #black self.frame_filling_color = (255, 255, 255) #white self.traj_count = {} def get_img_coord(self, points, camera_pose, image_width, image_height): transformation_matrix = compute_transformation_matrix(camera_pose) camera_positions = world_to_camera(points, transformation_matrix) projects_points = project_to_2d(camera_positions, camera_pose, image_width, image_height) return projects_points def draw_objframe(self, obj_type, point, background): image = background obj_path = os.path.join(OBJECT_PICTURE_DIR, '{}.png'.format(obj_type)) if not os.path.exists(obj_path): obj_path = os.path.join(OBJECT_PICTURE_DIR, 'Phone.png') obj_img = cv2.imread(obj_path) #draw frame center = (int(point[0]-1.2*self.frame_radius), int(point[1]-1.2*self.frame_radius)) cv2.circle(image, center, self.frame_radius, self.frame_filling_color, -1) cv2.circle(image, center, self.frame_radius, self.frame_outline_color, self.frame_thickness) theta = np.pi/8 line_start1 = (int(center[0]+self.frame_radius*np.sin(theta)), int(center[1]+self.frame_radius*np.cos(theta))) line_start2 = (int(center[0]+self.frame_radius*np.cos(theta)), int(center[1]+self.frame_radius*np.sin(theta))) line_end = (int(center[0] + 1.2*self.frame_radius), int(center[1] + 1.2*self.frame_radius)) cv2.line(image, line_start1, line_end, self.frame_outline_color, self.frame_thickness) cv2.line(image, line_start2, line_end, self.frame_outline_color, self.frame_thickness) cv2.circle(image, line_end, 3, (0,0,255), -1) #put object obj_resized = cv2.resize(obj_img, (self.frame_radius, self.frame_radius)) x_start = max(0, obj_resized.shape[0]//2-center[1]) x_end = min(obj_resized.shape[0], obj_resized.shape[0]//2 + image.shape[0] - center[1]) y_start = max(0, obj_resized.shape[1]//2-center[0]) y_end = min(obj_resized.shape[1], obj_resized.shape[1]//2 + image.shape[1] - center[0]) img_x_start = max(0, center[1]-obj_resized.shape[0]//2) img_x_end = min(image.shape[0], center[1]+obj_resized.shape[0]//2) img_y_start = max(0, center[0]-obj_resized.shape[1]//2) img_y_end = min(image.shape[1], center[0]+obj_resized.shape[1]//2) image[img_x_start:img_x_end, img_y_start: img_y_end] = obj_resized[x_start:x_end, y_start:y_end] def add_description(self, activity_name, time, obj_list, receptacle_list, background): image = background.copy() descrpition_width = 300 description_bg = np.zeros((background.shape[0], descrpition_width,3), np.uint8) res = np.hstack((image, description_bg)) font = cv2.FONT_HERSHEY_COMPLEX font_scale = 0.5 font_color = (0,0,0) thickness = 1 line_type = 8 # text_size = cv2.getTextSize(text,font, font_scale, line_type)[0] text_x = background.shape[0] + 10 text_y = descrpition_width//2 -50 text_y = 50 line_interval = 30 cv2.rectangle(res, (background.shape[1],0), (background.shape[1]+descrpition_width, background.shape[0]), (255,255,255),-1) texts = ['activity:', activity_name, 'time: ', str(time), 'object movement: '] for i, text in enumerate(texts): if i%2==0: text_x = background.shape[0] + 10 font_color = (0,0,0) else: text_x = background.shape[0] + 30 font_color = (0,0,255) cv2.putText(res, text, (text_x, text_y + i*line_interval), font, font_scale, font_color,thickness, line_type) start_line = 5 for i in range(len(obj_list)): obj = obj_list[i].split('|')[0] recep = receptacle_list[i].split('|')[0] # obj_move_text = '{} -> {}'.format(obj, recep) text_x = background.shape[0] + 120 font_color = (0,0,255) obj_text_size = cv2.getTextSize(obj, font, font_scale, thickness)[0][0] cv2.putText(res, obj, (text_x - 20 - obj_text_size, text_y + (start_line+i)*line_interval), font, font_scale, font_color,thickness, line_type) cv2.putText(res, '->', (text_x, text_y + (start_line+i)*line_interval), font, font_scale, font_color,thickness, line_type) cv2.putText(res, recep, (text_x + 40, text_y + (start_line+i)*line_interval), font, font_scale, font_color,thickness, line_type) return res def draw_traj(self, info, image): last_point = info['last_point'] point = info['point'] is_end = info['end'] is_arrow = info['arrow'] radius = 3 next_point = (int(point[0]), int(point[1])) if last_point is None: start_color = (0,0,255) end_color = (255, 255, 0) cv2.circle(image, next_point, radius, start_color, -1) return pre_point = (int(last_point[0]), int(last_point[1])) line_color = (0,0,0) line_thickness = 1 arrowcolor = (0,255,0) arrow_thickness = 1 #count count = self.traj_count.get((pre_point, next_point),0) self.traj_count[(pre_point, next_point)] = count + 1 count = self.traj_count.get((next_point, pre_point),0) self.traj_count[(next_point, pre_point)] = count + 1 step = 0.2 line_thickness = min(int(1 + count * step), 5) #draw cv2.line(image, pre_point, next_point, line_color, line_thickness) if is_arrow: cv2.arrowedLine(image, pre_point, next_point,arrowcolor,arrow_thickness,tipLength=1.5) if is_end: end_color = (255, 255, 0) cv2.circle(image, next_point, radius, end_color, -1) def get_multiobj_image(self, draw_infos, background): image_list = [] if len(draw_infos)<=0: return image_list, background activity_name = draw_infos[0]['activity'] time = draw_infos[0]['time'] object_list = [info['object'] for info in draw_infos] receptacle_list = [info['receptacle'] for info in draw_infos] image_infos = [] for draw_info in draw_infos: obj = draw_info['object'].split('|')[0] points = draw_info['points'] last_point = None for point_num, point in enumerate(points): if point_num >= len(image_infos): image_infos.append([]) image_infos[point_num].append({ 'object':obj, 'point':point, 'last_point':last_point, 'end':point_num == len(points)-1, 'arrow':point_num == len(points)//3 }) last_point = (point[0], point[1]) image_with_traj = background.copy() for image_info in image_infos: #draw traj for info in image_info: self.draw_traj(info, image_with_traj) #draw obj with frame image = image_with_traj.copy() for info in image_info: self.draw_objframe(info['object'], info['point'], image) image = self.add_description(activity_name, time, object_list, receptacle_list, image) image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) image_list.append(image) return image_list, image_with_traj def get_receptacle_position_from_meta(self, receptacle_id, metadata): position = None objects = metadata['objects'] for obj in objects: if obj['objectId'] == receptacle_id: position = obj['position'] break return position def get_distance(self, point1, point2): return math.sqrt((point1['x'] - point2['x'])**2 + (point1['z'] - point2['z'])**2) def get_nearest_point(self, point, reachable_points): min_distance = 100000 nearest_point = None for rp in reachable_points: distance = self.get_distance(point, rp) if distance < min_distance: min_distance = distance nearest_point = rp return nearest_point def get_path(self, start_position, end_position, reachable_positions): res = [] res.append(start_position) start_point = self.get_nearest_point(start_position, reachable_positions) target_point = self.get_nearest_point(end_position, reachable_positions) point_id = 0 open_list = [(0, point_id, start_point)] came_from = {tuple((start_point['x'], start_point['z'])):0} cost_so_far = {tuple((start_point['x'], start_point['z'])):0} while open_list: current = heapq.heappop(open_list)[-1] if current == target_point: break for next_point in reachable_positions: dis = self.get_distance(current, next_point) if dis - 0.25 > 0.001: continue new_cost = cost_so_far[tuple((current['x'], current['z']))] + 1 if tuple((next_point['x'], next_point['z'])) not in cost_so_far or new_cost < cost_so_far[tuple((next_point['x'], next_point['z']))]: cost_so_far[tuple((next_point['x'], next_point['z']))] = new_cost priority = new_cost + abs(next_point['x'] - current['x']) + abs(next_point['z'] - current['z']) point_id += 1 heapq.heappush(open_list, (priority, point_id, next_point)) came_from[tuple((next_point['x'], next_point['z']))] = current path = [] current = target_point while current != start_point: path.append(current) current = came_from[tuple((current['x'], current['z']))] path.append(start_point) path.reverse() res.extend(path) res.append(end_position) return res def init_obj_traj(self, metadata): res = {} for obj in metadata['objects']: if obj['pickupable']: parentReceptacle = obj['parentReceptacles'] if parentReceptacle is not None and 'Floor' in parentReceptacle: parentReceptacle.remove('Floor') res[obj['objectId']] = parentReceptacle[0] if (parentReceptacle is not None and len(parentReceptacle)) >0 else None return res def save_vedio(self, dynamic_info, background, camera_pose, metadata, reachable_positions, output_path): object_traj = self.init_obj_traj(metadata) self.traj_count = {} image_list = [] paths_list = [] for time,timeinfo in tqdm.tqdm(dynamic_info.items()): draw_infos = [] for info in timeinfo: info['time'] = time target_object_id = info['object'] target_receptacle_id = info['receptacle'] target_object_receptacle = object_traj[target_object_id] if target_object_receptacle == target_receptacle_id: continue if target_object_receptacle is None: for obj in metadata['objects']: if obj['objectId'] == target_object_id: start_position = obj['position'] break else: start_position = self.get_receptacle_position_from_meta(target_object_receptacle, metadata) end_position = self.get_receptacle_position_from_meta(target_receptacle_id, metadata) path = self.get_path(start_position, end_position, reachable_positions) #path 包括start, end image_width = 300 image_height = 300 img_path = self.get_img_coord(path, camera_pose, image_width, image_height) paths_list.append(img_path) draw_info = { 'time':time, 'activity':info['activity'], 'object':target_object_id, 'receptacle':target_receptacle_id, 'points':img_path, } draw_infos.append(draw_info) object_traj[target_object_id] = target_receptacle_id time_images,image_with_traj = self.get_multiobj_image(draw_infos, background) background = image_with_traj image_list.extend(time_images) clip = ImageSequenceClip(image_list, fps=30) clip.write_videofile(str(output_path), fps=30, codec="libx264") def get_dynamic_info(self, schedules): res = {} for day, day_schedules in schedules.items(): for activity in day_schedules: activity_name = activity['activity'] start_time = activity['start_time'] content = activity['content'] for c in content: c['activity'] = activity_name time = datetime.datetime.combine(day, start_time) if time not in res: res[time] = [] res[time].extend(content) return res def generate(self, schedules, scene_file_name, vedio_path): metadata, camera_pose, background, reachable_points = read_scene(scene_file_name) schekeys = list(schedules.keys()) schedules_filter = {} schedules_filter[schekeys[0]] = schedules[schekeys[0]] dynamic_info = self.get_dynamic_info(schedules_filter) self.save_vedio(dynamic_info, background, camera_pose, metadata, reachable_points, vedio_path) def read_scene(scene_file_name): data_dir = get_scene_dir_path(scene_file_name) metadata = json.load(open(os.path.join(data_dir, 'metadata.json'),'r',encoding='utf-8')) camera_pose = json.load(open(os.path.join(data_dir, 'camera_pose.json'),'r',encoding='utf-8')) background = cv2.imread(os.path.join(data_dir, 'background.png')) reachable_points = json.load(open(os.path.join(data_dir, 'reachablePositions.json'),'r',encoding='utf-8')) return metadata, camera_pose, background, reachable_points def compute_transformation_matrix(camera_pose): position = camera_pose['position'] rotation = camera_pose['rotation'] translation_matrix = np.array([ [1, 0, 0, -position['x']], [0, 1, 0, -position['y']], [0, 0, 1, -position['z']], [0, 0, 0, 1] ]) theta_x = np.radians(rotation['x']) rotation_matrix_x = np.array([ [1, 0, 0, 0], [0, np.cos(theta_x), -np.sin(theta_x), 0], [0, np.sin(theta_x), np.cos(theta_x), 0], [0, 0, 0, 1] ]) transformation_matrix = np.dot(rotation_matrix_x, translation_matrix) return transformation_matrix def world_to_camera(positions, transformation_matrix): camera_positions = [] for pos in positions: world_pos = np.array([pos['x'], pos['y'], pos['z'], 1]) camera_pos = np.dot(transformation_matrix, world_pos) camera_positions.append(camera_pos) return camera_positions def project_to_2d(camera_positions, camera_pose, image_width, image_height): fov = camera_pose['fieldOfView'] aspect_ratio = image_width / image_height f = 1 / np.tan(np.radians(fov) / 2) projection_matrix = np.array([ [f / aspect_ratio, 0, 0, 0], [0, f, 0, 0], [0, 0, 1, 0] ]) projected_points = [] for pos in camera_positions: projected = np.dot(projection_matrix, pos) projected /= projected[2] x = (projected[0] + 1) * image_width / 2 y = (1 - projected[1]) * image_height / 2 x = image_width - x projected_points.append((x, y)) return projected_points