| | import os |
| | import cv2 |
| | import shutil |
| | import argparse |
| | import subprocess |
| | import numpy as np |
| | from pathlib import Path |
| |
|
| | from merge_videos_utils import combine_adaptive |
| |
|
| | def split_video_frames( |
| | video_path, |
| | output_path1, |
| | output_path2, |
| | split_frame=49, |
| | cross_frames=3, |
| | ): |
| | """ |
| | Splits a video into two parts with overlapping frames. |
| | |
| | Video 1: frames [0 .. split_frame-1] |
| | Video 2: frames [split_frame-cross_frames .. end] |
| | |
| | Example: |
| | split_frame=49, cross_frames=3 |
| | Video1: 0–48 |
| | Video2: 46–end |
| | |
| | """ |
| |
|
| | cap = cv2.VideoCapture(video_path) |
| | if not cap.isOpened(): |
| | print(f"Error opening video file {video_path}") |
| | return False |
| |
|
| | fps = cap.get(cv2.CAP_PROP_FPS) or 30.0 |
| | width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| | height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| | total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
| |
|
| | |
| | if split_frame <= 0 or split_frame >= total_frames: |
| | print("Invalid split_frame value.") |
| | cap.release() |
| | return False |
| |
|
| | if cross_frames < 0: |
| | cross_frames = 0 |
| |
|
| | v1_start = 0 |
| | v1_end = split_frame |
| |
|
| | v2_start = max(0, split_frame - cross_frames) |
| | v2_end = total_frames |
| |
|
| | fourcc = cv2.VideoWriter_fourcc(*"mp4v") |
| | out1 = cv2.VideoWriter(output_path1, fourcc, fps, (width, height)) |
| | out2 = cv2.VideoWriter(output_path2, fourcc, fps, (width, height)) |
| |
|
| | frame_idx = 0 |
| | written1 = 0 |
| | written2 = 0 |
| |
|
| | while True: |
| | ret, frame = cap.read() |
| | if not ret: |
| | break |
| |
|
| | if v1_start <= frame_idx < v1_end: |
| | out1.write(frame) |
| | written1 += 1 |
| |
|
| | if v2_start <= frame_idx < v2_end: |
| | out2.write(frame) |
| | written2 += 1 |
| |
|
| | frame_idx += 1 |
| |
|
| | cap.release() |
| | out1.release() |
| | out2.release() |
| |
|
| | print( |
| | f"Total frames: {total_frames}\n" |
| | f"Video1: [{v1_start}..{v1_end-1}] → {written1} frames\n" |
| | f"Video2: [{v2_start}..{v2_end-1}] → {written2} frames\n" |
| | f"Overlap frames: {max(0, v1_end - v2_start)}" |
| | ) |
| |
|
| | return True |
| |
|
| |
|
| | def split_mask_npz(mask_path, output_path1, output_path2, split_frame=49, cross_frames=3): |
| | """ |
| | Splits a mask npz file into two parts with overlapping frames. |
| | |
| | Assumes the npz contains a 'mask' key with shape: |
| | - [T, H, W] or |
| | - [T, 1, H, W] or generally [T, ...] |
| | |
| | mask1: frames [0 .. split_frame-1] |
| | mask2: frames [split_frame-cross_frames .. end] |
| | """ |
| |
|
| | try: |
| | data = np.load(mask_path) |
| | if "mask" not in data: |
| | print(f"Key 'mask' not found in {mask_path}") |
| | return False |
| |
|
| | mask = data["mask"] |
| | if mask.ndim < 1: |
| | print(f"Invalid mask shape {mask.shape}") |
| | return False |
| |
|
| | total_frames = mask.shape[0] |
| |
|
| | |
| | if split_frame <= 0 or split_frame >= total_frames: |
| | print(f"Invalid split_frame={split_frame} for total_frames={total_frames}") |
| | return False |
| |
|
| | if cross_frames < 0: |
| | cross_frames = 0 |
| |
|
| | v1_start, v1_end = 0, split_frame |
| | v2_start = max(0, split_frame - cross_frames) |
| | v2_end = total_frames |
| |
|
| | mask1 = mask[v1_start:v1_end] |
| | mask2 = mask[v2_start:v2_end] |
| |
|
| | np.savez_compressed(output_path1, mask=mask1) |
| | np.savez_compressed(output_path2, mask=mask2) |
| |
|
| | print( |
| | f"Split mask {mask_path} into {output_path1} and {output_path2}\n" |
| | f"Original shape: {mask.shape}\n" |
| | f"mask1 frames: [{v1_start}..{v1_end-1}] -> shape {mask1.shape}\n" |
| | f"mask2 frames: [{v2_start}..{v2_end-1}] -> shape {mask2.shape}\n" |
| | f"Overlap frames: {max(0, v1_end - v2_start)} (requested {cross_frames})" |
| | ) |
| | return True |
| |
|
| | except Exception as e: |
| | print(f"Failed to split mask {mask_path}: {e}") |
| | return False |
| |
|
| |
|
| | def process_split(output_anchor_path, cross_frames, split_frame=49): |
| | """ |
| | Main function to process the output_anchor folder. |
| | """ |
| | if not os.path.exists(output_anchor_path): |
| | print(f"Path {output_anchor_path} does not exist.") |
| | return |
| |
|
| | parent_dir = os.path.dirname(output_anchor_path) |
| | base_name = os.path.basename(output_anchor_path) |
| | |
| | dir_part1 = os.path.join(parent_dir, f"{base_name}_part1") |
| | dir_part2 = os.path.join(parent_dir, f"{base_name}_part2") |
| | |
| | if os.path.exists(dir_part1): |
| | shutil.rmtree(dir_part1) |
| | if os.path.exists(dir_part2): |
| | shutil.rmtree(dir_part2) |
| | |
| | os.makedirs(dir_part1) |
| | os.makedirs(dir_part2) |
| | |
| | target_subfolders = ['videos', 'masked_videos', 'masks', 'captions'] |
| | |
| | for sub in target_subfolders: |
| | src_sub = os.path.join(output_anchor_path, sub) |
| | dst_sub1 = os.path.join(dir_part1, sub) |
| | dst_sub2 = os.path.join(dir_part2, sub) |
| | |
| | os.makedirs(dst_sub1, exist_ok=True) |
| | os.makedirs(dst_sub2, exist_ok=True) |
| | |
| | if not os.path.exists(src_sub): |
| | continue |
| | |
| | for filename in os.listdir(src_sub): |
| | src_file = os.path.join(src_sub, filename) |
| | dst_file1 = os.path.join(dst_sub1, filename) |
| | dst_file2 = os.path.join(dst_sub2, filename) |
| | |
| | if filename.endswith('.mp4'): |
| | split_video_frames(src_file, dst_file1, dst_file2, split_frame, cross_frames) |
| | elif filename.endswith('.npz'): |
| | split_mask_npz(src_file, dst_file1, dst_file2, split_frame, cross_frames) |
| | elif filename.endswith('.txt'): |
| | |
| | shutil.copy(src_file, dst_file1) |
| | shutil.copy(src_file, dst_file2) |
| | else: |
| | pass |
| |
|
| |
|
| | def merge_crop_outputs(Unique_identifier): |
| | """ |
| | Merges split crop outputs back into the main crops folder. |
| | """ |
| | logs = [] |
| | logs.append("\n[MERGE] Merging split crop outputs...") |
| | crops_dir_path = Path(f"/app/{Unique_identifier}/crops") |
| | crops_part1_path = Path(f"/app/{Unique_identifier}/crops_part1") |
| | crops_part2_path = Path(f"/app/{Unique_identifier}/crops_part2") |
| | |
| | if crops_dir_path.exists(): |
| | for crop_folder in crops_dir_path.iterdir(): |
| | if crop_folder.is_dir() and crop_folder.name.lower() != "previews": |
| | crop_name = crop_folder.name |
| | |
| | |
| | part1_out_dir = crops_part1_path / crop_name / "output" |
| | part2_out_dir = crops_part2_path / crop_name / "output" |
| | dest_out_dir = crop_folder / "output" |
| | |
| | dest_out_dir.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | part1_video = None |
| | if part1_out_dir.exists(): |
| | videos = list(part1_out_dir.glob("*_vsr.mp4")) |
| | if videos: |
| | part1_video = videos[0] |
| | elif videos := list(part1_out_dir.glob("*_out.mp4")): |
| | part1_video = videos[0] |
| | |
| | part2_video = None |
| | if part2_out_dir.exists(): |
| | |
| | videos = list(part2_out_dir.glob("*_vsr.mp4")) |
| | if videos: |
| | part2_video = videos[0] |
| | elif videos := list(part2_out_dir.glob("*_out.mp4")): |
| | part2_video = videos[0] |
| | |
| | if part1_video and part2_video: |
| | |
| | dest_filename = part1_video.name |
| | dest_video_path = dest_out_dir / dest_filename |
| | combine_adaptive(str(part1_video), str(part2_video), str(dest_video_path)) |
| | else: |
| | logs.append(f"[MERGE] ⚠️ Missing parts for {crop_name}: Part1={part1_video is not None}, Part2={part2_video is not None}") |
| | return logs |
| |
|