DynamicSceneGeneration / video_utils.py
wcx
Init commit
8f3b56b
import json
import os
import cv2
import numpy as np
import math
import heapq
import datetime
from moviepy.editor import ImageSequenceClip
import tqdm
from utils import get_scene_dir_path, OBJECT_PICTURE_DIR
class Video_Generator():
def __init__(self):
self.frame_radius = 20
self.frame_thickness = 1
self.frame_outline_color = (0,0,0) #black
self.frame_filling_color = (255, 255, 255) #white
self.traj_count = {}
def get_img_coord(self, points, camera_pose, image_width, image_height):
transformation_matrix = compute_transformation_matrix(camera_pose)
camera_positions = world_to_camera(points, transformation_matrix)
projects_points = project_to_2d(camera_positions, camera_pose, image_width, image_height)
return projects_points
def draw_objframe(self, obj_type, point, background):
image = background
obj_path = os.path.join(OBJECT_PICTURE_DIR, '{}.png'.format(obj_type))
if not os.path.exists(obj_path):
obj_path = os.path.join(OBJECT_PICTURE_DIR, 'Phone.png')
obj_img = cv2.imread(obj_path)
#draw frame
center = (int(point[0]-1.2*self.frame_radius), int(point[1]-1.2*self.frame_radius))
cv2.circle(image, center, self.frame_radius, self.frame_filling_color, -1)
cv2.circle(image, center, self.frame_radius, self.frame_outline_color, self.frame_thickness)
theta = np.pi/8
line_start1 = (int(center[0]+self.frame_radius*np.sin(theta)), int(center[1]+self.frame_radius*np.cos(theta)))
line_start2 = (int(center[0]+self.frame_radius*np.cos(theta)), int(center[1]+self.frame_radius*np.sin(theta)))
line_end = (int(center[0] + 1.2*self.frame_radius), int(center[1] + 1.2*self.frame_radius))
cv2.line(image, line_start1, line_end, self.frame_outline_color, self.frame_thickness)
cv2.line(image, line_start2, line_end, self.frame_outline_color, self.frame_thickness)
cv2.circle(image, line_end, 3, (0,0,255), -1)
#put object
obj_resized = cv2.resize(obj_img, (self.frame_radius, self.frame_radius))
x_start = max(0, obj_resized.shape[0]//2-center[1])
x_end = min(obj_resized.shape[0], obj_resized.shape[0]//2 + image.shape[0] - center[1])
y_start = max(0, obj_resized.shape[1]//2-center[0])
y_end = min(obj_resized.shape[1], obj_resized.shape[1]//2 + image.shape[1] - center[0])
img_x_start = max(0, center[1]-obj_resized.shape[0]//2)
img_x_end = min(image.shape[0], center[1]+obj_resized.shape[0]//2)
img_y_start = max(0, center[0]-obj_resized.shape[1]//2)
img_y_end = min(image.shape[1], center[0]+obj_resized.shape[1]//2)
image[img_x_start:img_x_end, img_y_start: img_y_end] = obj_resized[x_start:x_end, y_start:y_end]
def add_description(self, activity_name, time, obj_list, receptacle_list, background):
image = background.copy()
descrpition_width = 300
description_bg = np.zeros((background.shape[0], descrpition_width,3), np.uint8)
res = np.hstack((image, description_bg))
font = cv2.FONT_HERSHEY_COMPLEX
font_scale = 0.5
font_color = (0,0,0)
thickness = 1
line_type = 8
# text_size = cv2.getTextSize(text,font, font_scale, line_type)[0]
text_x = background.shape[0] + 10
text_y = descrpition_width//2 -50
text_y = 50
line_interval = 30
cv2.rectangle(res, (background.shape[1],0), (background.shape[1]+descrpition_width, background.shape[0]), (255,255,255),-1)
texts = ['activity:', activity_name, 'time: ', str(time), 'object movement: ']
for i, text in enumerate(texts):
if i%2==0:
text_x = background.shape[0] + 10
font_color = (0,0,0)
else:
text_x = background.shape[0] + 30
font_color = (0,0,255)
cv2.putText(res, text, (text_x, text_y + i*line_interval), font, font_scale, font_color,thickness, line_type)
start_line = 5
for i in range(len(obj_list)):
obj = obj_list[i].split('|')[0]
recep = receptacle_list[i].split('|')[0]
# obj_move_text = '{} -> {}'.format(obj, recep)
text_x = background.shape[0] + 120
font_color = (0,0,255)
obj_text_size = cv2.getTextSize(obj, font, font_scale, thickness)[0][0]
cv2.putText(res, obj, (text_x - 20 - obj_text_size, text_y + (start_line+i)*line_interval), font, font_scale, font_color,thickness, line_type)
cv2.putText(res, '->', (text_x, text_y + (start_line+i)*line_interval), font, font_scale, font_color,thickness, line_type)
cv2.putText(res, recep, (text_x + 40, text_y + (start_line+i)*line_interval), font, font_scale, font_color,thickness, line_type)
return res
def draw_traj(self, info, image):
last_point = info['last_point']
point = info['point']
is_end = info['end']
is_arrow = info['arrow']
radius = 3
next_point = (int(point[0]), int(point[1]))
if last_point is None:
start_color = (0,0,255)
end_color = (255, 255, 0)
cv2.circle(image, next_point, radius, start_color, -1)
return
pre_point = (int(last_point[0]), int(last_point[1]))
line_color = (0,0,0)
line_thickness = 1
arrowcolor = (0,255,0)
arrow_thickness = 1
#count
count = self.traj_count.get((pre_point, next_point),0)
self.traj_count[(pre_point, next_point)] = count + 1
count = self.traj_count.get((next_point, pre_point),0)
self.traj_count[(next_point, pre_point)] = count + 1
step = 0.2
line_thickness = min(int(1 + count * step), 5)
#draw
cv2.line(image, pre_point, next_point, line_color, line_thickness)
if is_arrow:
cv2.arrowedLine(image, pre_point, next_point,arrowcolor,arrow_thickness,tipLength=1.5)
if is_end:
end_color = (255, 255, 0)
cv2.circle(image, next_point, radius, end_color, -1)
def get_multiobj_image(self, draw_infos, background):
image_list = []
if len(draw_infos)<=0:
return image_list, background
activity_name = draw_infos[0]['activity']
time = draw_infos[0]['time']
object_list = [info['object'] for info in draw_infos]
receptacle_list = [info['receptacle'] for info in draw_infos]
image_infos = []
for draw_info in draw_infos:
obj = draw_info['object'].split('|')[0]
points = draw_info['points']
last_point = None
for point_num, point in enumerate(points):
if point_num >= len(image_infos):
image_infos.append([])
image_infos[point_num].append({
'object':obj,
'point':point,
'last_point':last_point,
'end':point_num == len(points)-1,
'arrow':point_num == len(points)//3
})
last_point = (point[0], point[1])
image_with_traj = background.copy()
for image_info in image_infos:
#draw traj
for info in image_info:
self.draw_traj(info, image_with_traj)
#draw obj with frame
image = image_with_traj.copy()
for info in image_info:
self.draw_objframe(info['object'], info['point'], image)
image = self.add_description(activity_name, time, object_list, receptacle_list, image)
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
image_list.append(image)
return image_list, image_with_traj
def get_receptacle_position_from_meta(self, receptacle_id, metadata):
position = None
objects = metadata['objects']
for obj in objects:
if obj['objectId'] == receptacle_id:
position = obj['position']
break
return position
def get_distance(self, point1, point2):
return math.sqrt((point1['x'] - point2['x'])**2 + (point1['z'] - point2['z'])**2)
def get_nearest_point(self, point, reachable_points):
min_distance = 100000
nearest_point = None
for rp in reachable_points:
distance = self.get_distance(point, rp)
if distance < min_distance:
min_distance = distance
nearest_point = rp
return nearest_point
def get_path(self, start_position, end_position, reachable_positions):
res = []
res.append(start_position)
start_point = self.get_nearest_point(start_position, reachable_positions)
target_point = self.get_nearest_point(end_position, reachable_positions)
point_id = 0
open_list = [(0, point_id, start_point)]
came_from = {tuple((start_point['x'], start_point['z'])):0}
cost_so_far = {tuple((start_point['x'], start_point['z'])):0}
while open_list:
current = heapq.heappop(open_list)[-1]
if current == target_point:
break
for next_point in reachable_positions:
dis = self.get_distance(current, next_point)
if dis - 0.25 > 0.001:
continue
new_cost = cost_so_far[tuple((current['x'], current['z']))] + 1
if tuple((next_point['x'], next_point['z'])) not in cost_so_far or new_cost < cost_so_far[tuple((next_point['x'], next_point['z']))]:
cost_so_far[tuple((next_point['x'], next_point['z']))] = new_cost
priority = new_cost + abs(next_point['x'] - current['x']) + abs(next_point['z'] - current['z'])
point_id += 1
heapq.heappush(open_list, (priority, point_id, next_point))
came_from[tuple((next_point['x'], next_point['z']))] = current
path = []
current = target_point
while current != start_point:
path.append(current)
current = came_from[tuple((current['x'], current['z']))]
path.append(start_point)
path.reverse()
res.extend(path)
res.append(end_position)
return res
def init_obj_traj(self, metadata):
res = {}
for obj in metadata['objects']:
if obj['pickupable']:
parentReceptacle = obj['parentReceptacles']
if parentReceptacle is not None and 'Floor' in parentReceptacle:
parentReceptacle.remove('Floor')
res[obj['objectId']] = parentReceptacle[0] if (parentReceptacle is not None and len(parentReceptacle)) >0 else None
return res
def save_vedio(self, dynamic_info, background, camera_pose, metadata, reachable_positions, output_path):
object_traj = self.init_obj_traj(metadata)
self.traj_count = {}
image_list = []
paths_list = []
for time,timeinfo in tqdm.tqdm(dynamic_info.items()):
draw_infos = []
for info in timeinfo:
info['time'] = time
target_object_id = info['object']
target_receptacle_id = info['receptacle']
target_object_receptacle = object_traj[target_object_id]
if target_object_receptacle == target_receptacle_id:
continue
if target_object_receptacle is None:
for obj in metadata['objects']:
if obj['objectId'] == target_object_id:
start_position = obj['position']
break
else:
start_position = self.get_receptacle_position_from_meta(target_object_receptacle, metadata)
end_position = self.get_receptacle_position_from_meta(target_receptacle_id, metadata)
path = self.get_path(start_position, end_position, reachable_positions) #path 包括start, end
image_width = 300
image_height = 300
img_path = self.get_img_coord(path, camera_pose, image_width, image_height)
paths_list.append(img_path)
draw_info = {
'time':time,
'activity':info['activity'],
'object':target_object_id,
'receptacle':target_receptacle_id,
'points':img_path,
}
draw_infos.append(draw_info)
object_traj[target_object_id] = target_receptacle_id
time_images,image_with_traj = self.get_multiobj_image(draw_infos, background)
background = image_with_traj
image_list.extend(time_images)
clip = ImageSequenceClip(image_list, fps=30)
clip.write_videofile(str(output_path), fps=30, codec="libx264")
def get_dynamic_info(self, schedules):
res = {}
for day, day_schedules in schedules.items():
for activity in day_schedules:
activity_name = activity['activity']
start_time = activity['start_time']
content = activity['content']
for c in content:
c['activity'] = activity_name
time = datetime.datetime.combine(day, start_time)
if time not in res:
res[time] = []
res[time].extend(content)
return res
def generate(self, schedules, scene_file_name, vedio_path):
metadata, camera_pose, background, reachable_points = read_scene(scene_file_name)
schekeys = list(schedules.keys())
schedules_filter = {}
schedules_filter[schekeys[0]] = schedules[schekeys[0]]
dynamic_info = self.get_dynamic_info(schedules_filter)
self.save_vedio(dynamic_info, background, camera_pose, metadata, reachable_points, vedio_path)
def read_scene(scene_file_name):
data_dir = get_scene_dir_path(scene_file_name)
metadata = json.load(open(os.path.join(data_dir, 'metadata.json'),'r',encoding='utf-8'))
camera_pose = json.load(open(os.path.join(data_dir, 'camera_pose.json'),'r',encoding='utf-8'))
background = cv2.imread(os.path.join(data_dir, 'background.png'))
reachable_points = json.load(open(os.path.join(data_dir, 'reachablePositions.json'),'r',encoding='utf-8'))
return metadata, camera_pose, background, reachable_points
def compute_transformation_matrix(camera_pose):
position = camera_pose['position']
rotation = camera_pose['rotation']
translation_matrix = np.array([
[1, 0, 0, -position['x']],
[0, 1, 0, -position['y']],
[0, 0, 1, -position['z']],
[0, 0, 0, 1]
])
theta_x = np.radians(rotation['x'])
rotation_matrix_x = np.array([
[1, 0, 0, 0],
[0, np.cos(theta_x), -np.sin(theta_x), 0],
[0, np.sin(theta_x), np.cos(theta_x), 0],
[0, 0, 0, 1]
])
transformation_matrix = np.dot(rotation_matrix_x, translation_matrix)
return transformation_matrix
def world_to_camera(positions, transformation_matrix):
camera_positions = []
for pos in positions:
world_pos = np.array([pos['x'], pos['y'], pos['z'], 1])
camera_pos = np.dot(transformation_matrix, world_pos)
camera_positions.append(camera_pos)
return camera_positions
def project_to_2d(camera_positions, camera_pose, image_width, image_height):
fov = camera_pose['fieldOfView']
aspect_ratio = image_width / image_height
f = 1 / np.tan(np.radians(fov) / 2)
projection_matrix = np.array([
[f / aspect_ratio, 0, 0, 0],
[0, f, 0, 0],
[0, 0, 1, 0]
])
projected_points = []
for pos in camera_positions:
projected = np.dot(projection_matrix, pos)
projected /= projected[2]
x = (projected[0] + 1) * image_width / 2
y = (1 - projected[1]) * image_height / 2
x = image_width - x
projected_points.append((x, y))
return projected_points