kadirnar's picture
Upload 98 files
e7d5680 verified
raw
history blame
4.6 kB
import os
import subprocess
import cv2
from imageio_ffmpeg import get_ffmpeg_exe
from mmengine.logging import print_log
from moviepy.editor import VideoFileClip
from scenedetect import FrameTimecode
def iterate_files(folder_path):
for root, dirs, files in os.walk(folder_path):
# root contains the current directory path
# dirs contains the list of subdirectories in the current directory
# files contains the list of files in the current directory
# Process files in the current directory
for file in files:
file_path = os.path.join(root, file)
# print("File:", file_path)
yield file_path
# Process subdirectories and recursively call the function
for subdir in dirs:
subdir_path = os.path.join(root, subdir)
# print("Subdirectory:", subdir_path)
iterate_files(subdir_path)
def iterate_folders(folder_path):
for root, dirs, files in os.walk(folder_path):
for subdir in dirs:
subdir_path = os.path.join(root, subdir)
yield subdir_path
# print("Subdirectory:", subdir_path)
iterate_folders(subdir_path)
def clone_folder_structure(root_src, root_dst, verbose=False):
src_path_list = iterate_folders(root_src)
src_relpath_list = [os.path.relpath(x, root_src) for x in src_path_list]
os.makedirs(root_dst, exist_ok=True)
dst_path_list = [os.path.join(root_dst, x) for x in src_relpath_list]
for folder_path in dst_path_list:
os.makedirs(folder_path, exist_ok=True)
if verbose:
print(f"Create folder: '{folder_path}'")
def count_files(root, suffix=".mp4"):
files_list = iterate_files(root)
cnt = len([x for x in files_list if x.endswith(suffix)])
return cnt
def check_mp4_integrity(file_path, verbose=True, logger=None):
try:
VideoFileClip(file_path)
if verbose:
print_log(f"The MP4 file '{file_path}' is intact.", logger=logger)
return True
except Exception as e:
if verbose:
print_log(f"Error: {e}", logger=logger)
print_log(f"The MP4 file '{file_path}' is not intact.", logger=logger)
return False
def count_frames(video_path):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"Error: Could not open video file '{video_path}'")
return
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
print(f"Total frames in the video '{video_path}': {total_frames}")
cap.release()
def split_video(
sample_path,
scene_list,
save_dir,
target_fps=30,
min_seconds=1,
max_seconds=10,
shorter_size=512,
verbose=False,
logger=None,
):
FFMPEG_PATH = get_ffmpeg_exe()
save_path_list = []
for idx, scene in enumerate(scene_list):
s, t = scene # FrameTimecode
fps = s.framerate
max_duration = FrameTimecode(timecode="00:00:00", fps=fps)
max_duration.frame_num = round(fps * max_seconds)
duration = min(max_duration, t - s)
if duration.get_frames() < round(min_seconds * fps):
continue
# save path
fname = os.path.basename(sample_path)
fname_wo_ext = os.path.splitext(fname)[0]
# TODO: fname pattern
save_path = os.path.join(save_dir, f"{fname_wo_ext}_scene-{idx}.mp4")
# ffmpeg cmd
cmd = [FFMPEG_PATH]
# Only show ffmpeg output for the first call, which will display any
# errors if it fails, and then break the loop. We only show error messages
# for the remaining calls.
# cmd += ['-v', 'error']
# input path
cmd += ["-i", sample_path]
# clip to cut
cmd += ["-nostdin", "-y", "-ss", str(s.get_seconds()), "-t", str(duration.get_seconds())]
# target fps
# cmd += ['-vf', 'select=mod(n\,2)']
cmd += ["-r", f"{target_fps}"]
# aspect ratio
cmd += ["-vf", f"scale='if(gt(iw,ih),-2,{shorter_size})':'if(gt(iw,ih),{shorter_size},-2)'"]
# cmd += ['-vf', f"scale='if(gt(iw,ih),{shorter_size},trunc(ow/a/2)*2)':-2"]
cmd += ["-map", "0", save_path]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout, stderr = proc.communicate()
if verbose:
stdout = stdout.decode("utf-8")
print_log(stdout, logger=logger)
save_path_list.append(sample_path)
print_log(f"Video clip saved to '{save_path}'", logger=logger)
return save_path_list