Spaces:
Running
Running
import json | |
import os | |
import cv2 | |
import numpy as np | |
import math | |
import heapq | |
import datetime | |
from moviepy.editor import ImageSequenceClip | |
import tqdm | |
from utils import get_scene_dir_path, OBJECT_PICTURE_DIR | |
class Video_Generator():
    """Render scheduled object movements of a scene as an annotated video.

    For every scheduled move, a walkable path between the object's current
    location and its target receptacle is searched on the scene's reachable
    positions, projected into image coordinates, and drawn step by step on
    the scene background together with a textual description panel.
    """

    def __init__(self):
        # Geometry and colours of the round call-out drawn around object icons.
        self.frame_radius = 20
        self.frame_thickness = 1
        self.frame_outline_color = (0, 0, 0)  # black
        self.frame_filling_color = (255, 255, 255)  # white
        # (pixel_a, pixel_b) -> number of times that segment has been drawn;
        # used to thicken frequently travelled segments.
        self.traj_count = {}

    def get_img_coord(self, points, camera_pose, image_width, image_height):
        """Project world-space points into pixel coordinates.

        Args:
            points: iterable of dicts with 'x', 'y' and 'z' keys.
            camera_pose: dict with 'position', 'rotation' and 'fieldOfView'.
            image_width: width in pixels of the target image.
            image_height: height in pixels of the target image.

        Returns:
            List of (x, y) pixel tuples, one per input point.
        """
        transformation_matrix = compute_transformation_matrix(camera_pose)
        camera_positions = world_to_camera(points, transformation_matrix)
        return project_to_2d(camera_positions, camera_pose, image_width, image_height)

    def draw_objframe(self, obj_type, point, background):
        """Draw a round call-out holding the object's icon next to ``point``.

        The drawing happens in place on ``background``. The icon is read from
        ``OBJECT_PICTURE_DIR/<obj_type>.png``; when that file does not exist
        ``Phone.png`` is used instead.

        Args:
            obj_type: object type name used to pick the icon file.
            point: (x, y) pixel position the call-out points at.
            background: BGR image (numpy array) that is modified in place.
        """
        image = background
        obj_path = os.path.join(OBJECT_PICTURE_DIR, '{}.png'.format(obj_type))
        if not os.path.exists(obj_path):
            obj_path = os.path.join(OBJECT_PICTURE_DIR, 'Phone.png')
        obj_img = cv2.imread(obj_path)

        # Call-out circle sits up and to the left of the pointed-at position.
        radius = self.frame_radius
        center = (int(point[0] - 1.2 * radius), int(point[1] - 1.2 * radius))
        cv2.circle(image, center, radius, self.frame_filling_color, -1)
        cv2.circle(image, center, radius, self.frame_outline_color, self.frame_thickness)

        # Two short lines from the circle rim meet at the pointed-at position.
        theta = np.pi / 8
        line_start1 = (int(center[0] + radius * np.sin(theta)),
                       int(center[1] + radius * np.cos(theta)))
        line_start2 = (int(center[0] + radius * np.cos(theta)),
                       int(center[1] + radius * np.sin(theta)))
        line_end = (int(center[0] + 1.2 * radius), int(center[1] + 1.2 * radius))
        cv2.line(image, line_start1, line_end, self.frame_outline_color, self.frame_thickness)
        cv2.line(image, line_start2, line_end, self.frame_outline_color, self.frame_thickness)
        cv2.circle(image, line_end, 3, (0, 0, 255), -1)

        # cv2.imread returns None when the file is missing or unreadable;
        # keep the empty call-out instead of crashing in cv2.resize.
        if obj_img is None:
            return

        icon = cv2.resize(obj_img, (radius, radius))
        icon_h, icon_w = icon.shape[:2]
        # Top-left corner of the icon in image coordinates (may be negative).
        top = center[1] - icon_h // 2
        left = center[0] - icon_w // 2
        # Clip the paste rectangle to the image bounds.
        row0, row1 = max(0, top), min(image.shape[0], top + icon_h)
        col0, col1 = max(0, left), min(image.shape[1], left + icon_w)
        if row0 >= row1 or col0 >= col1:
            # Icon lies completely outside the image: nothing to paste.
            return
        image[row0:row1, col0:col1] = icon[row0 - top:row1 - top, col0 - left:col1 - left]

    def add_description(self, activity_name, time, obj_list, receptacle_list, background):
        """Return ``background`` with a white text panel appended on the right.

        The panel is 300 px wide and lists the activity, the time and one
        ``object -> receptacle`` line per moved object.

        Args:
            activity_name: text shown under the 'activity:' label.
            time: value shown (via ``str``) under the 'time: ' label.
            obj_list: object ids; only the part before the first '|' is shown.
            receptacle_list: receptacle ids, paired with ``obj_list`` by index.
            background: image the panel is attached to; it is not modified.

        Returns:
            A new image of shape (height, width + 300, 3).
        """
        descrpition_width = 300
        description_bg = np.zeros((background.shape[0], descrpition_width, 3), np.uint8)
        # np.hstack allocates a new array, so ``background`` stays untouched.
        res = np.hstack((background, description_bg))

        font = cv2.FONT_HERSHEY_COMPLEX
        font_scale = 0.5
        thickness = 1
        line_type = 8
        text_y = 50
        line_interval = 30
        # The panel starts where the background ends horizontally, i.e. at its
        # width (shape[1]); all text x-offsets are relative to this column.
        panel_left = background.shape[1]

        cv2.rectangle(res, (panel_left, 0),
                      (panel_left + descrpition_width, background.shape[0]),
                      (255, 255, 255), -1)

        texts = ['activity:', activity_name, 'time: ', str(time), 'object movement: ']
        for i, text in enumerate(texts):
            if i % 2 == 0:
                # Even entries are labels.
                text_x = panel_left + 10
                font_color = (0, 0, 0)
            else:
                # Odd entries are values, indented and highlighted.
                text_x = panel_left + 30
                font_color = (0, 0, 255)
            cv2.putText(res, text, (text_x, text_y + i * line_interval),
                        font, font_scale, font_color, thickness, line_type)

        # Movement lines follow directly after the five header lines.
        start_line = 5
        arrow_x = panel_left + 120
        font_color = (0, 0, 255)
        for i, (obj_id, recep_id) in enumerate(zip(obj_list, receptacle_list)):
            obj = obj_id.split('|')[0]
            recep = recep_id.split('|')[0]
            baseline = text_y + (start_line + i) * line_interval
            # Right-align the object name against the arrow column.
            obj_text_size = cv2.getTextSize(obj, font, font_scale, thickness)[0][0]
            cv2.putText(res, obj, (arrow_x - 20 - obj_text_size, baseline),
                        font, font_scale, font_color, thickness, line_type)
            cv2.putText(res, '->', (arrow_x, baseline),
                        font, font_scale, font_color, thickness, line_type)
            cv2.putText(res, recep, (arrow_x + 40, baseline),
                        font, font_scale, font_color, thickness, line_type)
        return res

    def draw_traj(self, info, image):
        """Draw one trajectory step in place on ``image``.

        Args:
            info: dict with keys 'point' (current pixel position),
                'last_point' (previous position or None for the first step),
                'end' (True on the final step) and 'arrow' (True when an
                arrow head should be drawn on this segment).
            image: image that is modified in place.
        """
        last_point = info['last_point']
        point = info['point']
        radius = 3
        next_point = (int(point[0]), int(point[1]))

        if last_point is None:
            # First step of a path: only mark the starting position.
            cv2.circle(image, next_point, radius, (0, 0, 255), -1)
            return

        pre_point = (int(last_point[0]), int(last_point[1]))
        line_color = (0, 0, 0)
        arrowcolor = (0, 255, 0)
        arrow_thickness = 1

        # Count the segment in both directions so that a segment is treated
        # the same regardless of travel direction.
        forward = (pre_point, next_point)
        backward = (next_point, pre_point)
        self.traj_count[forward] = self.traj_count.get(forward, 0) + 1
        count = self.traj_count.get(backward, 0)
        self.traj_count[backward] = count + 1

        # Frequently used segments get thicker lines, capped at 5 px.
        step = 0.2
        line_thickness = min(int(1 + count * step), 5)

        cv2.line(image, pre_point, next_point, line_color, line_thickness)
        if info['arrow']:
            cv2.arrowedLine(image, pre_point, next_point, arrowcolor,
                            arrow_thickness, tipLength=1.5)
        if info['end']:
            cv2.circle(image, next_point, radius, (255, 255, 0), -1)

    def get_multiobj_image(self, draw_infos, background):
        """Build the frames for all object moves that share one timestamp.

        Frame ``k`` shows the k-th path point of every object that still has
        one, on top of all trajectory segments drawn so far.

        Args:
            draw_infos: list of dicts with keys 'activity', 'time', 'object',
                'receptacle' and 'points' (list of pixel positions).
            background: image used as the starting canvas; it is not modified.

        Returns:
            Tuple ``(frames, canvas)`` where ``frames`` is a list of RGB
            images and ``canvas`` is the background with all trajectories
            drawn (``background`` itself when ``draw_infos`` is empty).
        """
        image_list = []
        if not draw_infos:
            return image_list, background

        activity_name = draw_infos[0]['activity']
        time = draw_infos[0]['time']
        object_list = [info['object'] for info in draw_infos]
        receptacle_list = [info['receptacle'] for info in draw_infos]

        # image_infos[k] collects the k-th step of every object's path.
        image_infos = []
        for draw_info in draw_infos:
            obj = draw_info['object'].split('|')[0]
            points = draw_info['points']
            last_point = None
            for point_num, point in enumerate(points):
                if point_num >= len(image_infos):
                    image_infos.append([])
                image_infos[point_num].append({
                    'object': obj,
                    'point': point,
                    'last_point': last_point,
                    'end': point_num == len(points) - 1,
                    'arrow': point_num == len(points) // 3,
                })
                last_point = (point[0], point[1])

        image_with_traj = background.copy()
        for image_info in image_infos:
            # Trajectories accumulate on the shared canvas ...
            for info in image_info:
                self.draw_traj(info, image_with_traj)
            # ... while object call-outs are drawn on a per-frame copy only.
            image = image_with_traj.copy()
            for info in image_info:
                self.draw_objframe(info['object'], info['point'], image)
            image = self.add_description(activity_name, time, object_list,
                                         receptacle_list, image)
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image_list.append(image)
        return image_list, image_with_traj

    def get_receptacle_position_from_meta(self, receptacle_id, metadata):
        """Return the 'position' of the object whose id is ``receptacle_id``.

        Returns None when ``metadata['objects']`` has no such object.
        """
        for obj in metadata['objects']:
            if obj['objectId'] == receptacle_id:
                return obj['position']
        return None

    def get_distance(self, point1, point2):
        """Return the Euclidean distance of two points in the x/z plane."""
        return math.sqrt((point1['x'] - point2['x'])**2 + (point1['z'] - point2['z'])**2)

    def get_nearest_point(self, point, reachable_points):
        """Return the entry of ``reachable_points`` closest to ``point``.

        Distance is measured in the x/z plane; the first entry wins on ties.
        Returns None when ``reachable_points`` is empty.
        """
        return min(reachable_points,
                   key=lambda rp: self.get_distance(point, rp),
                   default=None)

    def get_path(self, start_position, end_position, reachable_positions):
        """Find a walkable path between two positions with A* search.

        Both positions are snapped to their nearest reachable point; two
        reachable points are neighbours when they are at most ~0.25 apart in
        the x/z plane (presumably the grid spacing of the reachable
        positions -- TODO confirm). Every step costs 1.

        Args:
            start_position: dict with 'x' and 'z' (world coordinates).
            end_position: dict with 'x' and 'z' (world coordinates).
            reachable_positions: list of dicts with 'x' and 'z'.

        Returns:
            ``[start_position, <reachable points...>, end_position]``. When
            there are no reachable positions, or the target cannot be reached
            from the start, the direct ``[start_position, end_position]`` is
            returned instead.
        """
        start_point = self.get_nearest_point(start_position, reachable_positions)
        target_point = self.get_nearest_point(end_position, reachable_positions)
        if start_point is None or target_point is None:
            return [start_position, end_position]

        def key_of(p):
            # Points are dicts (unhashable); identify them by x/z.
            return (p['x'], p['z'])

        start_key = key_of(start_point)
        target_key = key_of(target_point)

        # The running id breaks priority ties so dicts are never compared.
        point_id = 0
        open_list = [(0, point_id, start_point)]
        came_from = {start_key: None}
        cost_so_far = {start_key: 0}

        while open_list:
            current = heapq.heappop(open_list)[-1]
            current_key = key_of(current)
            if current_key == target_key:
                break
            new_cost = cost_so_far[current_key] + 1
            for next_point in reachable_positions:
                if self.get_distance(current, next_point) - 0.25 > 0.001:
                    continue
                next_key = key_of(next_point)
                if next_key not in cost_so_far or new_cost < cost_so_far[next_key]:
                    cost_so_far[next_key] = new_cost
                    # Heuristic: Manhattan distance to the *target*. It never
                    # exceeds the remaining step count, so paths stay shortest.
                    heuristic = (abs(next_point['x'] - target_point['x'])
                                 + abs(next_point['z'] - target_point['z']))
                    point_id += 1
                    heapq.heappush(open_list, (new_cost + heuristic, point_id, next_point))
                    came_from[next_key] = current

        if target_key not in came_from:
            # Target is not connected to the start on the reachable graph.
            return [start_position, end_position]

        # Walk the predecessor chain back from the target to the start.
        path = []
        current = target_point
        while current is not None:
            path.append(current)
            current = came_from[key_of(current)]
        path.reverse()
        return [start_position, *path, end_position]

    def init_obj_traj(self, metadata):
        """Map every pickupable object id to its initial receptacle id.

        The receptacle is the first entry of the object's 'parentReceptacles'
        that is not exactly 'Floor', or None when there is no such entry.
        ``metadata`` is not modified.
        """
        res = {}
        for obj in metadata['objects']:
            if not obj['pickupable']:
                continue
            # Filter into a new list instead of removing from the caller's data.
            parents = [p for p in (obj['parentReceptacles'] or []) if p != 'Floor']
            res[obj['objectId']] = parents[0] if parents else None
        return res

    def save_vedio(self, dynamic_info, background, camera_pose, metadata, reachable_positions, output_path):
        """Render all scheduled moves and write them to a video file.

        Args:
            dynamic_info: mapping of timestamp to a list of move dicts with
                keys 'object', 'receptacle' and 'activity'. Each move dict
                gets a 'time' key written to it.
            background: BGR image used as the initial canvas.
            camera_pose: camera description passed to ``get_img_coord``.
            metadata: scene metadata with an 'objects' list.
            reachable_positions: walkable positions used for path search.
            output_path: destination of the video (30 fps, libx264).
        """
        object_traj = self.init_obj_traj(metadata)
        self.traj_count = {}
        image_list = []
        # Pixel size the paths are projected to.
        image_width = 300
        image_height = 300

        for time, timeinfo in tqdm.tqdm(dynamic_info.items()):
            draw_infos = []
            for info in timeinfo:
                info['time'] = time
                target_object_id = info['object']
                target_receptacle_id = info['receptacle']
                target_object_receptacle = object_traj[target_object_id]
                if target_object_receptacle == target_receptacle_id:
                    # Object already is where it should go: nothing to draw.
                    continue

                if target_object_receptacle is None:
                    # No known receptacle: start from the object's own position.
                    for obj in metadata['objects']:
                        if obj['objectId'] == target_object_id:
                            start_position = obj['position']
                            break
                else:
                    start_position = self.get_receptacle_position_from_meta(
                        target_object_receptacle, metadata)
                end_position = self.get_receptacle_position_from_meta(
                    target_receptacle_id, metadata)

                # The path includes the start and end positions themselves.
                path = self.get_path(start_position, end_position, reachable_positions)
                img_path = self.get_img_coord(path, camera_pose, image_width, image_height)
                draw_infos.append({
                    'time': time,
                    'activity': info['activity'],
                    'object': target_object_id,
                    'receptacle': target_receptacle_id,
                    'points': img_path,
                })
                object_traj[target_object_id] = target_receptacle_id

            time_images, image_with_traj = self.get_multiobj_image(draw_infos, background)
            # Carry the drawn trajectories over to the next timestamp.
            background = image_with_traj
            image_list.extend(time_images)

        clip = ImageSequenceClip(image_list, fps=30)
        clip.write_videofile(str(output_path), fps=30, codec="libx264")

    def get_dynamic_info(self, schedules):
        """Flatten day schedules into a mapping of timestamp to moves.

        Args:
            schedules: mapping of day (date) to a list of activity dicts with
                keys 'activity', 'start_time' (time) and 'content' (list of
                move dicts).

        Returns:
            Dict mapping ``datetime.combine(day, start_time)`` to a list of
            move dicts, each a copy of the input entry extended with an
            'activity' key. The input entries are left unmodified.
        """
        res = {}
        for day, day_schedules in schedules.items():
            for activity in day_schedules:
                activity_name = activity['activity']
                time = datetime.datetime.combine(day, activity['start_time'])
                # Annotate copies so the caller's schedule is not altered.
                moves = [{**c, 'activity': activity_name} for c in activity['content']]
                res.setdefault(time, []).extend(moves)
        return res

    def generate(self, schedules, scene_file_name, vedio_path):
        """Render the first day of ``schedules`` for a scene into a video.

        Only the first key of ``schedules`` is rendered. The scene assets are
        loaded with ``read_scene(scene_file_name)`` and the result is written
        to ``vedio_path``.
        """
        metadata, camera_pose, background, reachable_points = read_scene(scene_file_name)
        first_day = list(schedules.keys())[0]
        schedules_filter = {first_day: schedules[first_day]}
        dynamic_info = self.get_dynamic_info(schedules_filter)
        self.save_vedio(dynamic_info, background, camera_pose, metadata,
                        reachable_points, vedio_path)
def read_scene(scene_file_name):
    """Load the assets of one scene from its data directory.

    The directory is resolved with ``get_scene_dir_path(scene_file_name)``
    and must contain 'metadata.json', 'camera_pose.json', 'background.png'
    and 'reachablePositions.json'.

    Returns:
        Tuple ``(metadata, camera_pose, background, reachable_points)``: the
        three parsed JSON documents plus the background as returned by
        ``cv2.imread``.
    """
    data_dir = get_scene_dir_path(scene_file_name)

    def load_json(file_name):
        # The context manager closes the handle even when parsing fails.
        with open(os.path.join(data_dir, file_name), 'r', encoding='utf-8') as f:
            return json.load(f)

    metadata = load_json('metadata.json')
    camera_pose = load_json('camera_pose.json')
    background = cv2.imread(os.path.join(data_dir, 'background.png'))
    reachable_points = load_json('reachablePositions.json')
    return metadata, camera_pose, background, reachable_points
def compute_transformation_matrix(camera_pose):
    """Build the 4x4 world-to-camera matrix for ``camera_pose``.

    The matrix first translates by the negated camera 'position' and then
    rotates about the x axis by ``rotation['x']`` degrees. Only the x
    component of 'rotation' is used.

    Args:
        camera_pose: dict with 'position' ({'x', 'y', 'z'}) and 'rotation'
            (at least an 'x' entry, in degrees).

    Returns:
        4x4 numpy array equal to ``rotation_x @ translation``.
    """
    cam_position = camera_pose['position']
    angle_x = np.radians(camera_pose['rotation']['x'])
    cos_x, sin_x = np.cos(angle_x), np.sin(angle_x)

    # Homogeneous translation moving the camera to the origin.
    translate = np.eye(4)
    translate[:3, 3] = [-cam_position['x'], -cam_position['y'], -cam_position['z']]

    # Homogeneous rotation about the x axis.
    rotate_x = np.eye(4)
    rotate_x[1, 1] = cos_x
    rotate_x[1, 2] = -sin_x
    rotate_x[2, 1] = sin_x
    rotate_x[2, 2] = cos_x

    return np.dot(rotate_x, translate)
def world_to_camera(positions, transformation_matrix):
    """Transform world positions into camera space.

    Args:
        positions: iterable of dicts with 'x', 'y' and 'z' keys.
        transformation_matrix: 4x4 matrix applied to each homogeneous
            position ``[x, y, z, 1]``.

    Returns:
        List with one length-4 numpy array per input position.
    """
    return [
        np.dot(transformation_matrix, np.array([p['x'], p['y'], p['z'], 1]))
        for p in positions
    ]
def project_to_2d(camera_positions, camera_pose, image_width, image_height):
    """Project camera-space positions onto the image plane.

    Each position is multiplied by a 3x4 perspective matrix derived from
    ``camera_pose['fieldOfView']`` (degrees), divided by its third
    component, and mapped from the [-1, 1] range to pixels. The x axis is
    mirrored (``image_width - x``) and the y axis is flipped.

    Args:
        camera_positions: iterable of length-4 homogeneous positions.
        camera_pose: dict providing 'fieldOfView'.
        image_width: width of the target image in pixels.
        image_height: height of the target image in pixels.

    Returns:
        List of (x, y) pixel tuples, one per input position.
    """
    focal = 1 / np.tan(np.radians(camera_pose['fieldOfView']) / 2)
    aspect_ratio = image_width / image_height

    projection = np.zeros((3, 4))
    projection[0, 0] = focal / aspect_ratio
    projection[1, 1] = focal
    projection[2, 2] = 1

    def to_pixel(cam_pos):
        ndc = np.dot(projection, cam_pos)
        # Perspective divide by the third component.
        ndc /= ndc[2]
        pixel_x = image_width - (ndc[0] + 1) * image_width / 2
        pixel_y = (1 - ndc[1]) * image_height / 2
        return (pixel_x, pixel_y)

    return [to_pixel(cam_pos) for cam_pos in camera_positions]