import os import subprocess import cv2 from imageio_ffmpeg import get_ffmpeg_exe from mmengine.logging import print_log from moviepy.editor import VideoFileClip from scenedetect import FrameTimecode def iterate_files(folder_path): for root, dirs, files in os.walk(folder_path): # root contains the current directory path # dirs contains the list of subdirectories in the current directory # files contains the list of files in the current directory # Process files in the current directory for file in files: file_path = os.path.join(root, file) # print("File:", file_path) yield file_path # Process subdirectories and recursively call the function for subdir in dirs: subdir_path = os.path.join(root, subdir) # print("Subdirectory:", subdir_path) iterate_files(subdir_path) def iterate_folders(folder_path): for root, dirs, files in os.walk(folder_path): for subdir in dirs: subdir_path = os.path.join(root, subdir) yield subdir_path # print("Subdirectory:", subdir_path) iterate_folders(subdir_path) def clone_folder_structure(root_src, root_dst, verbose=False): src_path_list = iterate_folders(root_src) src_relpath_list = [os.path.relpath(x, root_src) for x in src_path_list] os.makedirs(root_dst, exist_ok=True) dst_path_list = [os.path.join(root_dst, x) for x in src_relpath_list] for folder_path in dst_path_list: os.makedirs(folder_path, exist_ok=True) if verbose: print(f"Create folder: '{folder_path}'") def count_files(root, suffix=".mp4"): files_list = iterate_files(root) cnt = len([x for x in files_list if x.endswith(suffix)]) return cnt def check_mp4_integrity(file_path, verbose=True, logger=None): try: VideoFileClip(file_path) if verbose: print_log(f"The MP4 file '{file_path}' is intact.", logger=logger) return True except Exception as e: if verbose: print_log(f"Error: {e}", logger=logger) print_log(f"The MP4 file '{file_path}' is not intact.", logger=logger) return False def count_frames(video_path): cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print(f"Error: Could not open video file '{video_path}'") return total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) print(f"Total frames in the video '{video_path}': {total_frames}") cap.release() def split_video( sample_path, scene_list, save_dir, target_fps=30, min_seconds=1, max_seconds=10, shorter_size=512, verbose=False, logger=None, ): FFMPEG_PATH = get_ffmpeg_exe() save_path_list = [] for idx, scene in enumerate(scene_list): s, t = scene # FrameTimecode fps = s.framerate max_duration = FrameTimecode(timecode="00:00:00", fps=fps) max_duration.frame_num = round(fps * max_seconds) duration = min(max_duration, t - s) if duration.get_frames() < round(min_seconds * fps): continue # save path fname = os.path.basename(sample_path) fname_wo_ext = os.path.splitext(fname)[0] # TODO: fname pattern save_path = os.path.join(save_dir, f"{fname_wo_ext}_scene-{idx}.mp4") # ffmpeg cmd cmd = [FFMPEG_PATH] # Only show ffmpeg output for the first call, which will display any # errors if it fails, and then break the loop. We only show error messages # for the remaining calls. # cmd += ['-v', 'error'] # input path cmd += ["-i", sample_path] # clip to cut cmd += ["-nostdin", "-y", "-ss", str(s.get_seconds()), "-t", str(duration.get_seconds())] # target fps # cmd += ['-vf', 'select=mod(n\,2)'] cmd += ["-r", f"{target_fps}"] # aspect ratio cmd += ["-vf", f"scale='if(gt(iw,ih),-2,{shorter_size})':'if(gt(iw,ih),{shorter_size},-2)'"] # cmd += ['-vf', f"scale='if(gt(iw,ih),{shorter_size},trunc(ow/a/2)*2)':-2"] cmd += ["-map", "0", save_path] proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) stdout, stderr = proc.communicate() if verbose: stdout = stdout.decode("utf-8") print_log(stdout, logger=logger) save_path_list.append(sample_path) print_log(f"Video clip saved to '{save_path}'", logger=logger) return save_path_list