Spaces:
Running
on
Zero
Running
on
Zero
| import os | |
| import itertools | |
| import numpy as np | |
| import torch | |
| from PIL import Image, ImageOps | |
| import cv2 | |
| import psutil | |
| import subprocess | |
| import re | |
| import time | |
| import folder_paths | |
| from comfy.utils import common_upscale, ProgressBar | |
| import nodes | |
| from comfy.k_diffusion.utils import FolderOfImages | |
| from .logger import logger | |
| from .utils import BIGMAX, DIMMAX, calculate_file_hash, get_sorted_dir_files_from_directory,\ | |
| lazy_get_audio, hash_path, validate_path, strip_path, try_download_video, \ | |
| is_url, imageOrLatent, ffmpeg_path, ENCODE_ARGS, floatOrInt | |
# File extensions accepted by the upload pickers and path-extension widgets.
video_extensions = ['webm', 'mp4', 'mkv', 'gif', 'mov']

# Built-in load formats. Each entry may contain:
#   'target_rate': preferred fps for the model family
#   'dim': (downscale_ratio, min, default_width, default_height)
#   'frames': (divisor, remainder[, strict]) constraint on loaded frame count
VHSLoadFormats = {
    'None': {},
    'AnimateDiff': {'target_rate': 8, 'dim': (8,0,512,512)},
    'Mochi': {'target_rate': 24, 'dim': (16,0,848,480), 'frames':(6,1)},
    'LTXV': {'target_rate': 24, 'dim': (32,0,768,512), 'frames':(8,1)},
    'Hunyuan': {'target_rate': 24, 'dim': (16,0,848,480), 'frames':(4,1)},
    'Cosmos': {'target_rate': 24, 'dim': (16,0,1280,704), 'frames':(8,1)},
    'Wan': {'target_rate': 16, 'dim': (8,0,832,480), 'frames':(4,1)},
}
"""
External plugins may add additional formats to nodes.VHSLoadFormats
In addition to shorthand options, direct widget names will map a given dict to options.
Adding a third argument to a frames tuple can enable strict checks on number
of loaded frames, i.e (8,1,True)
"""
# Ensure the shared extension point exists even if no plugin has registered yet.
if not hasattr(nodes, 'VHSLoadFormats'):
    nodes.VHSLoadFormats = {}
def get_load_formats():
    """Build the widget spec for the 'format' input.

    Merges externally registered formats (nodes.VHSLoadFormats) with the
    built-ins; built-ins win on name collisions.
    """
    #TODO: check if merged formats have minimum version
    merged = dict(nodes.VHSLoadFormats)
    merged.update(VHSLoadFormats)
    return (list(merged.keys()),
            {'default': 'AnimateDiff', 'formats': merged})
def get_format(format):
    """Resolve a format name to its option dict.

    Built-in formats take priority over plugin-registered ones; an unknown
    name resolves to an empty dict.
    """
    try:
        return VHSLoadFormats[format]
    except KeyError:
        return nodes.VHSLoadFormats.get(format, {})
def is_gif(filename) -> bool:
    """Return True when the filename ends in a (lowercase) .gif extension."""
    parts = filename.split('.')
    if len(parts) <= 1:
        return False
    return parts[-1] == "gif"
def target_size(width, height, custom_width, custom_height, downscale_ratio=8) -> tuple[int, int]:
    """Compute the output resolution, rounded to a multiple of downscale_ratio.

    A zero custom dimension means "derive from the other while keeping aspect";
    both zero keeps the source size. Both dimensions are rounded to the nearest
    multiple of downscale_ratio (default 8 when None).
    """
    if downscale_ratio is None:
        downscale_ratio = 8
    if custom_width != 0 and custom_height != 0:
        # Both given: use them verbatim (aspect may change).
        width, height = custom_width, custom_height
    elif custom_width != 0:
        # Only width given: scale height proportionally.
        height = height * (custom_width / width)
        width = custom_width
    elif custom_height != 0:
        # Only height given: scale width proportionally.
        width = width * (custom_height / height)
        height = custom_height
    # else: both zero, keep source dimensions.

    def snap(dim):
        # Round to the nearest multiple of downscale_ratio.
        return int(dim / downscale_ratio + 0.5) * downscale_ratio

    return (snap(width), snap(height))
def cv_frame_generator(video, force_rate, frame_load_cap, skip_first_frames,
                       select_every_nth, meta_batch=None, unique_id=None):
    """Decode frames from a video file using OpenCV.

    First yields a metadata tuple
    (width, height, fps, duration, total_frames, target_frame_time,
    yieldable_frames), then yields frames one at a time as float32 RGB numpy
    arrays scaled to [0, 1]. Sending a non-None value into the generator stops
    iteration early while still running cleanup.

    Raises ValueError if the video cannot be opened.
    """
    video_cap = cv2.VideoCapture(video)
    if not video_cap.isOpened() or not video_cap.grab():
        raise ValueError(f"{video} could not be loaded with cv.")

    # extract video metadata
    fps = video_cap.get(cv2.CAP_PROP_FPS)
    width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(video_cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total_frames / fps

    # BUGFIX: removed an unconditional `width = 0` that clobbered the probed
    # width and forced the decode-based fallback on every video. The fallback
    # is only needed when the container doesn't report dimensions.
    if width <= 0 or height <= 0:
        _, frame = video_cap.retrieve()
        height, width, _ = frame.shape

    # set video_cap to look at start_index frame
    total_frame_count = 0
    total_frames_evaluated = -1
    frames_added = 0
    base_frame_time = 1 / fps
    prev_frame = None

    if force_rate == 0:
        target_frame_time = base_frame_time
    else:
        target_frame_time = 1 / force_rate

    # Estimate how many frames will actually be produced (0 = unknown).
    if total_frames > 0:
        if force_rate != 0:
            yieldable_frames = int(total_frames / fps * force_rate)
        else:
            yieldable_frames = total_frames
        if select_every_nth:
            yieldable_frames //= select_every_nth
        if frame_load_cap != 0:
            yieldable_frames = min(frame_load_cap, yieldable_frames)
    else:
        yieldable_frames = 0
    yield (width, height, fps, duration, total_frames, target_frame_time, yieldable_frames)

    pbar = ProgressBar(yieldable_frames)
    time_offset = target_frame_time
    try:
        while video_cap.isOpened():
            if time_offset < target_frame_time:
                is_returned = video_cap.grab()
                # if didn't return frame, video has ended
                if not is_returned:
                    break
                time_offset += base_frame_time
            if time_offset < target_frame_time:
                continue
            time_offset -= target_frame_time

            # if not at start_index, skip doing anything with frame
            total_frame_count += 1
            if total_frame_count <= skip_first_frames:
                continue
            else:
                total_frames_evaluated += 1

            # if should not be selected, skip doing anything with frame
            if total_frames_evaluated % select_every_nth != 0:
                continue

            # opencv loads images in BGR format (yuck), so need to convert to RGB for ComfyUI use
            # follow up: can videos ever have an alpha channel?
            # To my testing: No. opencv has no support for alpha
            unused, frame = video_cap.retrieve()
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # convert frame to comfyui's expected format
            # TODO: frame contains no exif information. Check if opencv2 has already applied
            frame = np.array(frame, dtype=np.float32)
            # The torch tensor shares memory with the numpy array, so this
            # scales `frame` to [0,1] in place without a copy.
            torch.from_numpy(frame).div_(255)

            if prev_frame is not None:
                inp = yield prev_frame
                if inp is not None:
                    # ensure the finally block is called
                    return
            prev_frame = frame
            frames_added += 1
            if pbar is not None:
                pbar.update_absolute(frames_added, yieldable_frames)
            # if cap exists and we've reached it, stop processing frames
            if frame_load_cap > 0 and frames_added >= frame_load_cap:
                break
    finally:
        # BUGFIX: restored the finally block (referenced by the comment above)
        # so the capture handle is released even on early generator close.
        video_cap.release()
    if meta_batch is not None:
        meta_batch.inputs.pop(unique_id)
        meta_batch.has_closed_inputs = True
    if prev_frame is not None:
        yield prev_frame
def ffmpeg_frame_generator(video, force_rate, frame_load_cap, start_time,
                           custom_width, custom_height, downscale_ratio=8,
                           meta_batch=None, unique_id=None):
    """Decode frames by streaming rawvideo out of an ffmpeg subprocess.

    First yields an info tuple (src_width, src_height, fps, duration,
    estimated_total_frames, target_frame_time, yieldable_frames, out_width,
    out_height, alpha), then yields frames as float32 numpy arrays in [0, 1]
    (RGBA when the source has alpha, otherwise RGB).

    Raises Exception if ffmpeg fails or the stream metadata can't be parsed.
    """
    # Probe pass: copying a single frame to the null muxer makes ffmpeg print
    # stream metadata on stderr without a full decode.
    args_dummy = [ffmpeg_path, "-i", video, '-c', 'copy', '-frames:v', '1', "-f", "null", "-"]
    size_base = None
    fps_base = None
    try:
        dummy_res = subprocess.run(args_dummy, stdout=subprocess.DEVNULL,
                                   stderr=subprocess.PIPE, check=True)
    except subprocess.CalledProcessError as e:
        raise Exception("An error occurred in the ffmpeg subprocess:\n" \
                + e.stderr.decode(*ENCODE_ARGS))
    lines = dummy_res.stderr.decode(*ENCODE_ARGS)
    for line in lines.split('\n'):
        match = re.search("^ *Stream .* Video.*, ([1-9]|\\d{2,})x(\\d+)", line)
        if match is not None:
            size_base = [int(match.group(1)), int(match.group(2))]
            fps_match = re.search(", ([\\d\\.]+) fps", line)
            if fps_match:
                fps_base = float(fps_match.group(1))
            else:
                fps_base = 1
            # Pixel formats with alpha are reported as yuva.../rgba variants.
            alpha = re.search("(yuva|rgba)", line) is not None
            break
    else:
        raise Exception("Failed to parse video/image information. FFMPEG output:\n" + lines)

    durs_match = re.search("Duration: (\\d+:\\d+:\\d+\\.\\d+),", lines)
    if durs_match:
        durs = durs_match.group(1).split(':')
        # BUGFIX: an hour is 3600 seconds; was multiplied by 360, undercounting
        # duration (and thus yieldable_frames) for videos over an hour.
        duration = int(durs[0])*3600 + int(durs[1])*60 + float(durs[2])
    else:
        duration = 0

    if start_time > 0:
        if start_time > 4:
            # Fast (pre-input) seek to 4s before the target, then an accurate
            # (post-input) seek for the remainder.
            post_seek = ['-ss', '4']
            pre_seek = ['-ss', str(start_time - 4)]
        else:
            post_seek = ['-ss', str(start_time)]
            pre_seek = []
    else:
        pre_seek = []
        post_seek = []
    args_all_frames = [ffmpeg_path, "-v", "error", "-an"] + pre_seek + \
            ["-i", video, "-pix_fmt", "rgba" if alpha else "rgb24"] + post_seek

    vfilters = []
    if force_rate != 0:
        vfilters.append("fps=fps=" + str(force_rate))
    if custom_width != 0 or custom_height != 0:
        size = target_size(size_base[0], size_base[1], custom_width,
                           custom_height, downscale_ratio=downscale_ratio)
        ar = float(size[0]) / float(size[1])
        # BUGFIX: aspect change is detected by comparing source width to
        # ar * source height; the previous size_base[0]*ar - size_base[1]
        # was nonzero even when the aspect ratio was unchanged.
        if abs(size_base[0] - ar * size_base[1]) >= 1:
            # Aspect ratio is changed. Crop to new aspect ratio before scale
            vfilters.append(f"crop=if(gt({ar}\\,a)\\,iw\\,ih*{ar}):if(gt({ar}\\,a)\\,iw/{ar}\\,ih)")
        size_arg = ':'.join(map(str, size))
        vfilters.append(f"scale={size_arg}")
    else:
        size = size_base
    if len(vfilters) > 0:
        args_all_frames += ["-vf", ",".join(vfilters)]

    yieldable_frames = (force_rate or fps_base) * duration
    if frame_load_cap > 0:
        args_all_frames += ["-frames:v", str(frame_load_cap)]
        yieldable_frames = min(yieldable_frames, frame_load_cap)
    yield (size_base[0], size_base[1], fps_base, duration, fps_base * duration,
           1/(force_rate or fps_base), yieldable_frames, size[0], size[1], alpha)

    args_all_frames += ["-f", "rawvideo", "-"]
    pbar = ProgressBar(yieldable_frames)
    try:
        # NOTE(review): stderr is not piped here, so the BrokenPipeError
        # handler's proc.stderr.read() below would fail on a None stderr —
        # confirm whether stderr=subprocess.PIPE was intended.
        with subprocess.Popen(args_all_frames, stdout=subprocess.PIPE) as proc:
            # Manually buffer enough bytes for an image
            bpi = size[0] * size[1] * (4 if alpha else 3)
            current_bytes = bytearray(bpi)
            current_offset = 0
            prev_frame = None
            while True:
                bytes_read = proc.stdout.read(bpi - current_offset)
                if bytes_read is None:  # sleep to wait for more data
                    time.sleep(.1)
                    continue
                if len(bytes_read) == 0:  # EOF
                    break
                # BUGFIX: the slice end must be offset + length; slicing to
                # len(bytes_read) alone corrupted (and could grow) the buffer
                # whenever a frame arrived in more than one partial read.
                current_bytes[current_offset:current_offset + len(bytes_read)] = bytes_read
                current_offset += len(bytes_read)
                if current_offset == bpi:
                    if prev_frame is not None:
                        yield prev_frame
                        pbar.update(1)
                    prev_frame = np.array(current_bytes, dtype=np.float32).reshape(size[1], size[0], 4 if alpha else 3) / 255.0
                    current_offset = 0
    except BrokenPipeError as e:
        raise Exception("An error occured in the ffmpeg subprocess:\n" \
                + proc.stderr.read().decode(*ENCODE_ARGS))
    if meta_batch is not None:
        meta_batch.inputs.pop(unique_id)
        meta_batch.has_closed_inputs = True
    if prev_frame is not None:
        yield prev_frame
#Python 3.12 adds an itertools.batched, but it's easily replicated for legacy support
def batched(it, n):
    """Yield successive n-sized tuples drawn from iterator *it*; the final tuple may be shorter."""
    chunk = tuple(itertools.islice(it, n))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(it, n))
def batched_vae_encode(images, vae, frames_per_batch):
    """Encode frames through the VAE in memory-bounded batches, yielding latents one at a time."""
    for chunk in batched(images, frames_per_batch):
        tensor_chunk = torch.from_numpy(np.array(chunk))
        encoded = vae.encode(tensor_chunk)
        yield from encoded.numpy()
def resized_cv_frame_gen(custom_width, custom_height, downscale_ratio, **kwargs):
    """Wrap cv_frame_generator, optionally rescaling frames to a target size.

    Yields an extended info tuple (cv info + new_width, new_height, False for
    alpha), then the (possibly rescaled) frames.
    """
    gen = cv_frame_generator(**kwargs)
    info = next(gen)
    width, height = info[0], info[1]
    # Batch size keeps each rescale batch near a fixed pixel budget.
    frames_per_batch = (1920 * 1080 * 16) // (width * height) or 1
    meta_batch = kwargs.get('meta_batch', None)
    if meta_batch is not None:
        frames_per_batch = min(frames_per_batch, meta_batch.frames_per_batch)

    wants_resize = custom_width != 0 or custom_height != 0 or downscale_ratio is not None
    if not wants_resize:
        yield (*info, info[0], info[1], False)
        yield from gen
        return

    new_size = target_size(width, height, custom_width, custom_height, downscale_ratio)
    yield (*info, new_size[0], new_size[1], False)
    if new_size[0] == width and new_size[1] == height:
        # Target matches source after rounding; pass frames through untouched.
        yield from gen
        return

    def rescale(batch):
        # Stack the batch into one tensor, upscale, and return numpy frames.
        stacked = torch.from_numpy(np.fromiter(batch, np.dtype((np.float32, (height, width, 3)))))
        stacked = common_upscale(stacked.movedim(-1, 1), new_size[0], new_size[1], "lanczos", "center")
        return stacked.movedim(1, -1).numpy()

    yield from itertools.chain.from_iterable(map(rescale, batched(gen, frames_per_batch)))
def load_video(meta_batch=None, unique_id=None, memory_limit_mb=None, vae=None,
               generator=resized_cv_frame_gen, format='None', **kwargs):
    """Core video-loading routine shared by all loader nodes.

    Pulls frames from `generator`, optionally VAE-encodes them in batches,
    enforces a memory budget, and applies format-mandated frame-count
    constraints. Returns (images-or-{"samples": latents}, frame_count,
    lazy audio, video_info dict).
    """
    if 'force_size' in kwargs:
        kwargs.pop('force_size')
        logger.warn("force_size has been removed. Did you reload the webpage after updating?")
    format = get_format(format)
    kwargs['video'] = strip_path(kwargs['video'])
    if vae is not None:
        downscale_ratio = getattr(vae, "downscale_ratio", 8)
    else:
        downscale_ratio = format.get('dim', (1,))[0]
    if meta_batch is None or unique_id not in meta_batch.inputs:
        gen = generator(meta_batch=meta_batch, unique_id=unique_id, downscale_ratio=downscale_ratio, **kwargs)
        (width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha) = next(gen)

        if meta_batch is not None:
            # First batch: cache the open generator and its metadata for reuse.
            meta_batch.inputs[unique_id] = (gen, width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha)
            if yieldable_frames:
                meta_batch.total_frames = min(meta_batch.total_frames, yieldable_frames)
    else:
        # Subsequent batches: resume the generator cached above.
        (gen, width, height, fps, duration, total_frames, target_frame_time, yieldable_frames, new_width, new_height, alpha) = meta_batch.inputs[unique_id]

    memory_limit = None
    if memory_limit_mb is not None:
        # BUGFIX: was `memory_limit *= 2 ** 20`, which raised a TypeError
        # because memory_limit is None here; convert MB -> bytes from the arg.
        memory_limit = memory_limit_mb * 2 ** 20
    else:
        #TODO: verify if garbage collection should be performed here.
        #leaves ~128 MB unreserved for safety
        try:
            memory_limit = (psutil.virtual_memory().available + psutil.swap_memory().free) - 2 ** 27
        except Exception:
            logger.warn("Failed to calculate available memory. Memory load limit has been disabled")
            memory_limit = BIGMAX
    if vae is not None:
        #space required to load as f32, exist as latent with wiggle room, decode to f32
        max_loadable_frames = int(memory_limit//(width*height*3*(4+4+1/10)))
    else:
        #TODO: use better estimate for when vae is not None
        #Consider completely ignoring for load_latent case?
        max_loadable_frames = int(memory_limit//(width*height*3*(.1)))
    if meta_batch is not None:
        if 'frames' in format:
            # frames_per_batch must satisfy the format's (divisor, remainder) rule.
            if meta_batch.frames_per_batch % format['frames'][0] != format['frames'][1]:
                error = (meta_batch.frames_per_batch - format['frames'][1]) % format['frames'][0]
                suggested = meta_batch.frames_per_batch - error
                if error > format['frames'][0] / 2:
                    suggested += format['frames'][0]
                raise RuntimeError(f"The chosen frames per batch is incompatible with the selected format. Try {suggested}")
        if meta_batch.frames_per_batch > max_loadable_frames:
            raise RuntimeError(f"Meta Batch set to {meta_batch.frames_per_batch} frames but only {max_loadable_frames} can fit in memory")
        gen = itertools.islice(gen, meta_batch.frames_per_batch)
    else:
        # Keep a handle on the unclamped generator to detect memory overflow below.
        original_gen = gen
        gen = itertools.islice(gen, max_loadable_frames)

    frames_per_batch = (1920 * 1080 * 16) // (width * height) or 1
    if vae is not None:
        gen = batched_vae_encode(gen, vae, frames_per_batch)
        vw, vh = new_width//downscale_ratio, new_height//downscale_ratio
        channels = getattr(vae, 'latent_channels', 4)
        images = torch.from_numpy(np.fromiter(gen, np.dtype((np.float32, (channels, vh, vw)))))
    else:
        #Some minor wizardry to eliminate a copy and reduce max memory by a factor of ~2
        images = torch.from_numpy(np.fromiter(gen, np.dtype((np.float32, (new_height, new_width, 4 if alpha else 3)))))
    if meta_batch is None and memory_limit is not None:
        # If the unclamped generator still has frames, the limit was hit mid-video.
        try:
            next(original_gen)
            raise RuntimeError(f"Memory limit hit after loading {len(images)} frames. Stopping execution.")
        except StopIteration:
            pass
    if len(images) == 0:
        raise RuntimeError("No frames generated")
    if 'frames' in format and len(images) % format['frames'][0] != format['frames'][1]:
        err_msg = f"The number of frames loaded {len(images)}, does not match the requirements of the currently selected format."
        if len(format['frames']) > 2 and format['frames'][2]:
            # Strict mode: the third tuple element forbids truncation.
            raise RuntimeError(err_msg)
        div, mod = format['frames'][:2]
        frames = (len(images) - mod) // div * div + mod
        images = images[:frames]
        #Commenting out log message since it's displayed in UI. consider further
        #logger.warn(err_msg + f" Output has been truncated to {len(images)} frames.")
    if 'start_time' in kwargs:
        start_time = kwargs['start_time']
    else:
        start_time = kwargs['skip_first_frames'] * target_frame_time
    target_frame_time *= kwargs.get('select_every_nth', 1)
    #Setup lambda for lazy audio capture
    audio = lazy_get_audio(kwargs['video'], start_time, kwargs['frame_load_cap']*target_frame_time)
    #Adjust target_frame_time for select_every_nth
    video_info = {
        "source_fps": fps,
        "source_frame_count": total_frames,
        "source_duration": duration,
        "source_width": width,
        "source_height": height,
        "loaded_fps": 1/target_frame_time,
        "loaded_frame_count": len(images),
        "loaded_duration": len(images) * target_frame_time,
        "loaded_width": new_width,
        "loaded_height": new_height,
    }
    if vae is None:
        return (images, len(images), audio, video_info)
    else:
        return ({"samples": images}, len(images), audio, video_info)
class LoadVideoUpload:
    """ComfyUI node: load a video picked from the input directory (OpenCV backend)."""
    def INPUT_TYPES(s):
        # 'video' offers every file in the input dir with a known video extension.
        input_dir = folder_paths.get_input_directory()
        files = []
        for f in os.listdir(input_dir):
            if os.path.isfile(os.path.join(input_dir, f)):
                file_parts = f.split('.')
                if len(file_parts) > 1 and (file_parts[-1].lower() in video_extensions):
                    files.append(f)
        return {"required": {
                    "video": (sorted(files),),
                    "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}),
                    "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                    "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                    "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}),
                    "skip_first_frames": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}),
                    "select_every_nth": ("INT", {"default": 1, "min": 1, "max": BIGMAX, "step": 1}),
                },
                "optional": {
                    "meta_batch": ("VHS_BatchManager",),
                    "vae": ("VAE",),
                    "format": get_load_formats(),
                },
                "hidden": {
                    # force_size is retired; kept hidden for old workflows.
                    "force_size": "STRING",
                    "unique_id": "UNIQUE_ID"
                },
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    RETURN_TYPES = (imageOrLatent, "INT", "AUDIO", "VHS_VIDEOINFO")
    RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info")
    FUNCTION = "load_video"

    def load_video(self, **kwargs):
        # Resolve the annotated upload name to a real filesystem path, then
        # delegate to the shared module-level load_video.
        kwargs['video'] = folder_paths.get_annotated_filepath(strip_path(kwargs['video']))
        return load_video(**kwargs)

    def IS_CHANGED(s, video, **kwargs):
        # Hash file contents so re-uploading under the same name busts the cache.
        image_path = folder_paths.get_annotated_filepath(video)
        return calculate_file_hash(image_path)

    def VALIDATE_INPUTS(s, video):
        if not folder_paths.exists_annotated_filepath(video):
            return "Invalid video file: {}".format(video)
        return True
class LoadVideoPath:
    """ComfyUI node: load a video from an arbitrary path or URL (OpenCV backend)."""
    def INPUT_TYPES(s):
        return {
            "required": {
                "video": ("STRING", {"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
                "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}),
                "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}),
                "skip_first_frames": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1}),
                "select_every_nth": ("INT", {"default": 1, "min": 1, "max": BIGMAX, "step": 1}),
            },
            "optional": {
                "meta_batch": ("VHS_BatchManager",),
                "vae": ("VAE",),
                "format": get_load_formats(),
            },
            "hidden": {
                # force_size is retired; kept hidden for old workflows.
                "force_size": "STRING",
                "unique_id": "UNIQUE_ID"
            },
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    RETURN_TYPES = (imageOrLatent, "INT", "AUDIO", "VHS_VIDEOINFO")
    RETURN_NAMES = ("IMAGE", "frame_count", "audio", "video_info")
    FUNCTION = "load_video"

    def load_video(self, **kwargs):
        if kwargs['video'] is None or validate_path(kwargs['video']) != True:
            raise Exception("video is not a valid path: " + kwargs['video'])
        # URLs are fetched to a local file first; fall back to the raw URL on failure.
        if is_url(kwargs['video']):
            kwargs['video'] = try_download_video(kwargs['video']) or kwargs['video']
        return load_video(**kwargs)

    def IS_CHANGED(s, video, **kwargs):
        return hash_path(video)

    def VALIDATE_INPUTS(s, video):
        return validate_path(video, allow_none=True)
class LoadVideoFFmpegUpload:
    """ComfyUI node: load an uploaded video via ffmpeg (supports alpha -> mask)."""
    def INPUT_TYPES(s):
        # 'video' offers every file in the input dir with a known video extension.
        input_dir = folder_paths.get_input_directory()
        files = []
        for f in os.listdir(input_dir):
            if os.path.isfile(os.path.join(input_dir, f)):
                file_parts = f.split('.')
                if len(file_parts) > 1 and (file_parts[-1].lower() in video_extensions):
                    files.append(f)
        return {"required": {
                    "video": (sorted(files),),
                    "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}),
                    "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                    "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                    "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}),
                    "start_time": ("FLOAT", {"default": 0, "min": 0, "max": BIGMAX, "step": .001}),
                },
                "optional": {
                    "meta_batch": ("VHS_BatchManager",),
                    "vae": ("VAE",),
                    "format": get_load_formats(),
                },
                "hidden": {
                    # force_size is retired; kept hidden for old workflows.
                    "force_size": "STRING",
                    "unique_id": "UNIQUE_ID"
                },
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    RETURN_TYPES = (imageOrLatent, "MASK", "AUDIO", "VHS_VIDEOINFO")
    RETURN_NAMES = ("IMAGE", "mask", "audio", "video_info")
    FUNCTION = "load_video"

    def load_video(self, **kwargs):
        kwargs['video'] = folder_paths.get_annotated_filepath(strip_path(kwargs['video']))
        image, _, audio, video_info = load_video(**kwargs, generator=ffmpeg_frame_generator)
        # BUGFIX: when a VAE is connected, load_video returns a latent dict and
        # image.size(3) would crash. Guard like LoadVideoFFmpegPath does.
        if isinstance(image, dict):
            return (image, None, audio, video_info)
        # RGBA input: split alpha into an inverted mask, return RGB frames.
        if image.size(3) == 4:
            return (image[:,:,:,:3], 1-image[:,:,:,3], audio, video_info)
        # No alpha: return an empty (all-zero) placeholder mask.
        return (image, torch.zeros(image.size(0), 64, 64, device="cpu"), audio, video_info)

    def IS_CHANGED(s, video, **kwargs):
        # Hash file contents so re-uploading under the same name busts the cache.
        image_path = folder_paths.get_annotated_filepath(video)
        return calculate_file_hash(image_path)

    def VALIDATE_INPUTS(s, video):
        if not folder_paths.exists_annotated_filepath(video):
            return "Invalid video file: {}".format(video)
        return True
class LoadVideoFFmpegPath:
    """ComfyUI node: load a video from a path or URL via ffmpeg (supports alpha -> mask)."""
    def INPUT_TYPES(s):
        return {
            "required": {
                "video": ("STRING", {"placeholder": "X://insert/path/here.mp4", "vhs_path_extensions": video_extensions}),
                "force_rate": (floatOrInt, {"default": 0, "min": 0, "max": 60, "step": 1, "disable": 0}),
                "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, 'disable': 0}),
                "frame_load_cap": ("INT", {"default": 0, "min": 0, "max": BIGMAX, "step": 1, "disable": 0}),
                "start_time": ("FLOAT", {"default": 0, "min": 0, "max": BIGMAX, "step": .001}),
            },
            "optional": {
                "meta_batch": ("VHS_BatchManager",),
                "vae": ("VAE",),
                "format": get_load_formats(),
            },
            "hidden": {
                # force_size is retired; kept hidden for old workflows.
                "force_size": "STRING",
                "unique_id": "UNIQUE_ID"
            },
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    RETURN_TYPES = (imageOrLatent, "MASK", "AUDIO", "VHS_VIDEOINFO")
    RETURN_NAMES = ("IMAGE", "mask", "audio", "video_info")
    FUNCTION = "load_video"

    def load_video(self, **kwargs):
        if kwargs['video'] is None or validate_path(kwargs['video']) != True:
            raise Exception("video is not a valid path: " + kwargs['video'])
        # URLs are fetched to a local file first; fall back to the raw URL on failure.
        if is_url(kwargs['video']):
            kwargs['video'] = try_download_video(kwargs['video']) or kwargs['video']
        image, _, audio, video_info = load_video(**kwargs, generator=ffmpeg_frame_generator)
        # VAE path returns a latent dict; no mask can be derived from latents.
        if isinstance(image, dict):
            return (image, None, audio, video_info)
        # RGBA input: split alpha into an inverted mask, return RGB frames.
        if image.size(3) == 4:
            return (image[:,:,:,:3], 1-image[:,:,:,3], audio, video_info)
        # No alpha: return an empty (all-zero) placeholder mask.
        return (image, torch.zeros(image.size(0), 64, 64, device="cpu"), audio, video_info)

    def IS_CHANGED(s, video, **kwargs):
        return hash_path(video)

    def VALIDATE_INPUTS(s, video):
        return validate_path(video, allow_none=True)
class LoadImagePath:
    """ComfyUI node: load a single still image from a path via the ffmpeg generator."""
    def INPUT_TYPES(s):
        return {
            "required": {
                "image": ("STRING", {"placeholder": "X://insert/path/here.png", "vhs_path_extensions": list(FolderOfImages.IMG_EXTENSIONS)}),
                "custom_width": ("INT", {"default": 0, "min": 0, "max": DIMMAX, "step": 8, 'disable': 0}),
                "custom_height": ("INT", {"default": 0, "min": 0, "max": DIMMAX, "step": 8, 'disable': 0}),
            },
            "optional": {
                "vae": ("VAE",),
            },
            "hidden": {
                # force_size is retired; kept hidden for old workflows.
                "force_size": "STRING",
            },
        }

    CATEGORY = "Video Helper Suite 🎥🅥🅗🅢"
    RETURN_TYPES = (imageOrLatent, "MASK")
    RETURN_NAMES = ("IMAGE", "mask")
    FUNCTION = "load_image"

    def load_image(self, **kwargs):
        if kwargs['image'] is None or validate_path(kwargs['image']) != True:
            raise Exception("image is not a valid path: " + kwargs['image'])
        # Reuse the video pipeline: treat the image as a one-frame video.
        kwargs.update({'video': kwargs['image'], 'force_rate': 0, 'frame_load_cap': 0,
                       'start_time': 0})
        kwargs.pop('image')
        image, _, _, _ = load_video(**kwargs, generator=ffmpeg_frame_generator)
        # VAE path returns a latent dict; no mask can be derived from latents.
        if isinstance(image, dict):
            return (image, None)
        # RGBA input: split alpha into an inverted mask, return RGB frames.
        if image.size(3) == 4:
            return (image[:,:,:,:3], 1-image[:,:,:,3])
        # No alpha: return an empty (all-zero) placeholder mask.
        return (image, torch.zeros(image.size(0), 64, 64, device="cpu"))

    def IS_CHANGED(s, image, **kwargs):
        return hash_path(image)

    def VALIDATE_INPUTS(s, image):
        return validate_path(image, allow_none=True)