# Face morphing pipeline: generate a morph transition video between a sequence of face images.
import cv2 | |
import time | |
import numpy as np | |
from tqdm import tqdm | |
from scipy.spatial import Delaunay | |
from concurrent.futures import ProcessPoolExecutor | |
from src.process_images import get_images_and_landmarks | |
def morph(image_files, duration, frame_rate, output, guideline, is_dlib):
    """Build a morph video from a sequence of images.

    Loads the images and their landmarks, generates the morph sequence
    between each consecutive pair of images in parallel worker processes,
    and writes all resulting frames to a video file.
    """
    # Get the list of images and landmarks
    images_list, landmarks_list = get_images_and_landmarks(image_files, is_dlib)

    sequence_time = time.time()
    print("Generating morph sequence...", end="\n\n")

    # Fan the pairwise morph jobs out to worker processes, remembering each
    # job's position so frames can be reassembled in the original order.
    with ProcessPoolExecutor() as executor:
        futures = []
        for idx in range(1, len(images_list)):
            src_image = images_list[idx - 1]
            src_landmarks = landmarks_list[idx - 1]
            dst_image = images_list[idx]
            dst_landmarks = landmarks_list[idx]

            # Triangulate the destination landmarks; the same vertex indices
            # are applied to the source landmarks inside the worker.
            triangles = Delaunay(dst_landmarks).simplices

            job = executor.submit(
                generate_morph_sequence,
                duration, frame_rate,
                src_image, dst_image,
                src_landmarks, dst_landmarks,
                triangles, guideline,
            )
            futures.append((idx, job))

        # Collect results back in pair order (result() blocks until done).
        ordered_sequences = [None] * (len(images_list) - 1)
        for idx, job in futures:
            ordered_sequences[idx - 1] = job.result()

    # Flatten the per-pair sequences into one frame list for the video.
    video_frames = []
    for sequence_frames in ordered_sequences:
        video_frames.extend(sequence_frames)

    print(f"Total time taken to generate morph sequence: {time.time() - sequence_time:.2f} seconds", end="\n\n")

    # Write the frames to a video file
    write_frames_to_video(video_frames, frame_rate, output)
def generate_morph_sequence(duration, frame_rate, image1, image2, landmarks1, landmarks2, tri, guideline):
    """Generate the morph frames for the transition between two images.

    Args:
        duration: Length of the transition in seconds.
        frame_rate: Frames per second of the output video.
        image1, image2: Source and destination images (same dimensions).
        landmarks1, landmarks2: Corresponding landmark point lists for the
            two images (same length, index-aligned).
        tri: Triangle vertex-index triples from a Delaunay triangulation.
        guideline: If True, draw the triangle edges on each frame.

    Returns:
        List of morphed frames as uint8 arrays.
    """
    num_frames = int(duration * frame_rate)

    # Convert to float32 once, outside the frame loop, for blending
    # precision (previously recomputed on every iteration).
    image1_float = np.float32(image1)
    image2_float = np.float32(image2)

    morphed_frames = []
    for frame in range(num_frames):
        # alpha runs 0 -> 1 across the sequence. Guard the single-frame
        # case, which previously raised ZeroDivisionError.
        alpha = frame / (num_frames - 1) if num_frames > 1 else 0.0

        # Compute the intermediate landmarks at time alpha.
        landmarks = [
            ((1 - alpha) * p1[0] + alpha * p2[0],
             (1 - alpha) * p1[1] + alpha * p2[1])
            for p1, p2 in zip(landmarks1, landmarks2)
        ]

        # Allocate space for the final output frame.
        morphed_frame = np.zeros_like(image1_float)
        for x, y, z in tri:
            t1 = [landmarks1[x], landmarks1[y], landmarks1[z]]
            t2 = [landmarks2[x], landmarks2[y], landmarks2[z]]
            t = [landmarks[x], landmarks[y], landmarks[z]]

            # Morph one triangle at a time.
            morph_triangle(image1_float, image2_float, morphed_frame, t1, t2, t, alpha)

            if guideline:
                # Draw the triangle edges as a face-landmark guideline.
                points = [(int(t[i][0]), int(t[i][1])) for i in range(3)]
                for i in range(3):
                    # image, (x1, y1), (x2, y2), color, thickness, lineType, shift
                    cv2.line(morphed_frame, points[i], points[(i + 1) % 3], (255, 255, 255), 1, 8, 0)

        # NOTE(review): frames are converted BGR->RGB here but are later
        # written with cv2.VideoWriter, which expects BGR input — confirm
        # the upstream loader's color order makes this correct.
        morphed_frame = cv2.cvtColor(np.uint8(morphed_frame), cv2.COLOR_BGR2RGB)
        morphed_frames.append(morphed_frame)

    return morphed_frames
def morph_triangle(image1, image2, morphed_image, t1, t2, t, alpha):
    """Warp one triangle from each source image and blend them into the output.

    t1 and t2 are the triangle's corners in image1 and image2, t its corners
    in the intermediate frame; the alpha-blended patch is composited into
    morphed_image in place, which is also returned.
    """
    # Bounding rectangles of the intermediate and the two source triangles.
    r = cv2.boundingRect(np.float32([t]))
    r1 = cv2.boundingRect(np.float32([t1]))
    r2 = cv2.boundingRect(np.float32([t2]))

    # Express a triangle's corners relative to its bounding rectangle origin.
    def to_local(tri, rect):
        return [(tri[i][0] - rect[0], tri[i][1] - rect[1]) for i in range(3)]

    t_rect = to_local(t, r)
    t1_rect = to_local(t1, r1)
    t2_rect = to_local(t2, r2)

    # Mask selecting only the pixels inside the intermediate triangle.
    mask = np.zeros((r[3], r[2], 3), dtype=np.float32)
    cv2.fillConvexPoly(mask, np.int32(t_rect), (1.0, 1.0, 1.0), 16, 0)

    # Crop each source triangle's bounding rectangle from its image.
    patch1 = image1[r1[1]:r1[1] + r1[3], r1[0]:r1[0] + r1[2]]
    patch2 = image2[r2[1]:r2[1] + r2[3], r2[0]:r2[0] + r2[2]]

    # Warp both patches onto the intermediate triangle's geometry.
    size = (r[2], r[3])
    warped1 = apply_affine_transform(patch1, t1_rect, t_rect, size)
    warped2 = apply_affine_transform(patch2, t2_rect, t_rect, size)

    # Cross-dissolve the warped patches, then composite through the mask so
    # only pixels inside the triangle are written to the destination.
    blended = warped1 * (1 - alpha) + warped2 * alpha
    region = morphed_image[r[1]:r[1] + r[3], r[0]:r[0] + r[2]]
    morphed_image[r[1]:r[1] + r[3], r[0]:r[0] + r[2]] = region * (1 - mask) + blended * mask
    return morphed_image
def apply_affine_transform(img, src, dst, size):
    """Warp *img* with the affine map taking the *src* triangle to *dst*.

    *size* is the (width, height) of the output patch. Border pixels are
    replicated so the warped patch has no black fringe.
    """
    matrix = cv2.getAffineTransform(np.float32(src), np.float32(dst))
    warped = cv2.warpAffine(
        img,
        matrix,
        (size[0], size[1]),
        None,
        flags=cv2.INTER_LINEAR,
        borderMode=cv2.BORDER_REPLICATE,
    )
    return warped
def write_frames_to_video(frames, frame_rate, output):
    """Write the frames to *output* as an mp4 video.

    A 2-pixel border is cropped from every side of each frame to remove
    black edge artifacts introduced by the warping.

    Args:
        frames: Non-empty list of same-sized (H, W, C) frame arrays.
        frame_rate: Frames per second of the output video.
        output: Path of the video file to create.

    Raises:
        ValueError: If *frames* is empty, or the frames are too small to crop.
    """
    if not frames:
        # Previously crashed with a bare IndexError on frames[0].
        raise ValueError("No frames to write")

    # Get the height and width of the frames.
    height, width, _ = frames[0].shape

    # Cut the outside pixels to remove the black border.
    pad = 2
    new_height = height - pad * 2
    new_width = width - pad * 2
    if new_height <= 0 or new_width <= 0:
        raise ValueError(f"Frames of size {width}x{height} are too small to crop {pad} pixels per side")

    # Initialize the video writer.
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output, fourcc, frame_rate, (new_width, new_height))
    try:
        print("Writing frames to video...")
        for frame in tqdm(frames):
            # Drop the pad-wide border on every side.
            cut_frame = frame[pad:new_height + pad, pad:new_width + pad]
            out.write(cut_frame)
    finally:
        # Release the writer even if a frame fails to encode, so the file
        # handle is not leaked and the container is finalized.
        out.release()
    print(f"Video saved at: {output}")