Spaces:
Runtime error
Runtime error
| from io import BytesIO | |
| import cv2 | |
| import numpy as np | |
| import torch | |
| from PIL import Image | |
| from ..log import log | |
| from ..utils import EASINGS, apply_easing, pil2tensor | |
| from .transform import MTB_TransformImage | |
| def hex_to_rgb(hex_color: str, bgr: bool = False): | |
| hex_color = hex_color.lstrip("#") | |
| if bgr: | |
| return tuple(int(hex_color[i : i + 2], 16) for i in (4, 2, 0)) | |
| return tuple(int(hex_color[i : i + 2], 16) for i in (0, 2, 4)) | |
| class MTB_BatchFloatMath: | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": { | |
| "reverse": ("BOOLEAN", {"default": False}), | |
| "operation": ( | |
| ["add", "sub", "mul", "div", "pow", "abs"], | |
| {"default": "add"}, | |
| ), | |
| } | |
| } | |
| RETURN_TYPES = ("FLOATS",) | |
| CATEGORY = "mtb/utils" | |
| FUNCTION = "execute" | |
| def execute(self, reverse: bool, operation: str, **kwargs: list[float]): | |
| res: list[float] = [] | |
| vals = list(kwargs.values()) | |
| if reverse: | |
| vals = vals[::-1] | |
| ref_count = len(vals[0]) | |
| for v in vals: | |
| if len(v) != ref_count: | |
| raise ValueError( | |
| f"All values must have the same length (current: {len(v)}, ref: {ref_count}" | |
| ) | |
| match operation: | |
| case "add": | |
| for i in range(ref_count): | |
| result = sum(v[i] for v in vals) | |
| res.append(result) | |
| case "sub": | |
| for i in range(ref_count): | |
| result = vals[0][i] - sum(v[i] for v in vals[1:]) | |
| res.append(result) | |
| case "mul": | |
| for i in range(ref_count): | |
| result = vals[0][i] * vals[1][i] | |
| res.append(result) | |
| case "div": | |
| for i in range(ref_count): | |
| result = vals[0][i] / vals[1][i] | |
| res.append(result) | |
| case "pow": | |
| for i in range(ref_count): | |
| result: float = vals[0][i] ** vals[1][i] | |
| res.append(result) | |
| case "abs": | |
| for i in range(ref_count): | |
| result = abs(vals[0][i]) | |
| res.append(result) | |
| case _: | |
| log.info(f"For now this mode ({operation}) is not implemented") | |
| return (res,) | |
| class MTB_BatchFloatNormalize: | |
| """Normalize the values in the list of floats""" | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": {"floats": ("FLOATS",)}, | |
| } | |
| RETURN_TYPES = ("FLOATS",) | |
| RETURN_NAMES = ("normalized_floats",) | |
| CATEGORY = "mtb/batch" | |
| FUNCTION = "execute" | |
| def execute( | |
| self, | |
| floats: list[float], | |
| ): | |
| min_value = min(floats) | |
| max_value = max(floats) | |
| normalized_floats = [ | |
| (x - min_value) / (max_value - min_value) for x in floats | |
| ] | |
| log.debug(f"Floats: {floats}") | |
| log.debug(f"Normalized Floats: {normalized_floats}") | |
| return (normalized_floats,) | |
| class MTB_BatchTimeWrap: | |
| """Remap a batch using a time curve (FLOATS)""" | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": { | |
| "target_count": ("INT", {"default": 25, "min": 2}), | |
| "frames": ("IMAGE",), | |
| "curve": ("FLOATS",), | |
| }, | |
| } | |
| RETURN_TYPES = ("IMAGE", "FLOATS") | |
| RETURN_NAMES = ("image", "interpolated_floats") | |
| CATEGORY = "mtb/batch" | |
| FUNCTION = "execute" | |
| def execute( | |
| self, target_count: int, frames: torch.Tensor, curve: list[float] | |
| ): | |
| """Apply time warping to a list of video frames based on a curve.""" | |
| log.debug(f"Input frames shape: {frames.shape}") | |
| log.debug(f"Curve: {curve}") | |
| total_duration = sum(curve) | |
| log.debug(f"Total duration: {total_duration}") | |
| B, H, W, C = frames.shape | |
| log.debug(f"Batch Size: {B}") | |
| normalized_times = np.linspace(0, 1, target_count) | |
| interpolated_curve = np.interp( | |
| normalized_times, np.linspace(0, 1, len(curve)), curve | |
| ).tolist() | |
| log.debug(f"Interpolated curve: {interpolated_curve}") | |
| interpolated_frame_indices = [ | |
| (B - 1) * value for value in interpolated_curve | |
| ] | |
| log.debug(f"Interpolated frame indices: {interpolated_frame_indices}") | |
| rounded_indices = [ | |
| int(round(idx)) for idx in interpolated_frame_indices | |
| ] | |
| rounded_indices = np.clip(rounded_indices, 0, B - 1) | |
| # Gather frames based on interpolated indices | |
| warped_frames = [] | |
| for index in rounded_indices: | |
| warped_frames.append(frames[index].unsqueeze(0)) | |
| warped_tensor = torch.cat(warped_frames, dim=0) | |
| log.debug(f"Warped frames shape: {warped_tensor.shape}") | |
| return (warped_tensor, interpolated_curve) | |
| class MTB_BatchMake: | |
| """Simply duplicates the input frame as a batch""" | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": { | |
| "image": ("IMAGE",), | |
| "count": ("INT", {"default": 1}), | |
| } | |
| } | |
| RETURN_TYPES = ("IMAGE",) | |
| FUNCTION = "generate_batch" | |
| CATEGORY = "mtb/batch" | |
| def generate_batch(self, image: torch.Tensor, count): | |
| if len(image.shape) == 3: | |
| image = image.unsqueeze(0) | |
| return (image.repeat(count, 1, 1, 1),) | |
| class MTB_BatchShape: | |
| """Generates a batch of 2D shapes with optional shading (experimental)""" | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": { | |
| "count": ("INT", {"default": 1}), | |
| "shape": ( | |
| ["Box", "Circle", "Diamond", "Tube"], | |
| {"default": "Circle"}, | |
| ), | |
| "image_width": ("INT", {"default": 512}), | |
| "image_height": ("INT", {"default": 512}), | |
| "shape_size": ("INT", {"default": 100}), | |
| "color": ("COLOR", {"default": "#ffffff"}), | |
| "bg_color": ("COLOR", {"default": "#000000"}), | |
| "shade_color": ("COLOR", {"default": "#000000"}), | |
| "thickness": ("INT", {"default": 5}), | |
| "shadex": ("FLOAT", {"default": 0.0}), | |
| "shadey": ("FLOAT", {"default": 0.0}), | |
| }, | |
| } | |
| RETURN_TYPES = ("IMAGE",) | |
| FUNCTION = "generate_shapes" | |
| CATEGORY = "mtb/batch" | |
| def generate_shapes( | |
| self, | |
| count, | |
| shape, | |
| image_width, | |
| image_height, | |
| shape_size, | |
| color, | |
| bg_color, | |
| shade_color, | |
| thickness, | |
| shadex, | |
| shadey, | |
| ): | |
| log.debug(f"COLOR: {color}") | |
| log.debug(f"BG_COLOR: {bg_color}") | |
| log.debug(f"SHADE_COLOR: {shade_color}") | |
| # Parse color input to BGR tuple for OpenCV | |
| color = hex_to_rgb(color) | |
| bg_color = hex_to_rgb(bg_color) | |
| shade_color = hex_to_rgb(shade_color) | |
| res = [] | |
| for x in range(count): | |
| # Initialize an image canvas | |
| canvas = np.full( | |
| (image_height, image_width, 3), bg_color, dtype=np.uint8 | |
| ) | |
| mask = np.zeros((image_height, image_width), dtype=np.uint8) | |
| # Compute the center point of the shape | |
| center = (image_width // 2, image_height // 2) | |
| if shape == "Box": | |
| half_size = shape_size // 2 | |
| top_left = (center[0] - half_size, center[1] - half_size) | |
| bottom_right = (center[0] + half_size, center[1] + half_size) | |
| cv2.rectangle(mask, top_left, bottom_right, 255, -1) | |
| elif shape == "Circle": | |
| cv2.circle(mask, center, shape_size // 2, 255, -1) | |
| elif shape == "Diamond": | |
| pts = np.array( | |
| [ | |
| [center[0], center[1] - shape_size // 2], | |
| [center[0] + shape_size // 2, center[1]], | |
| [center[0], center[1] + shape_size // 2], | |
| [center[0] - shape_size // 2, center[1]], | |
| ] | |
| ) | |
| cv2.fillPoly(mask, [pts], 255) | |
| elif shape == "Tube": | |
| cv2.ellipse( | |
| mask, | |
| center, | |
| (shape_size // 2, shape_size // 2), | |
| 0, | |
| 0, | |
| 360, | |
| 255, | |
| thickness, | |
| ) | |
| # Color the shape | |
| canvas[mask == 255] = color | |
| # Apply shading effects to a separate shading canvas | |
| shading = np.zeros_like(canvas, dtype=np.float32) | |
| shading[:, :, 0] = shadex * np.linspace(0, 1, image_width) | |
| shading[:, :, 1] = shadey * np.linspace( | |
| 0, 1, image_height | |
| ).reshape(-1, 1) | |
| shading_canvas = cv2.addWeighted( | |
| canvas.astype(np.float32), 1, shading, 1, 0 | |
| ).astype(np.uint8) | |
| # Apply shading only to the shape area using the mask | |
| canvas[mask == 255] = shading_canvas[mask == 255] | |
| res.append(canvas) | |
| return (pil2tensor(res),) | |
| class MTB_BatchFloatFill: | |
| """Fills a batch float with a single value until it reaches the target length""" | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": { | |
| "floats": ("FLOATS",), | |
| "direction": (["head", "tail"], {"default": "tail"}), | |
| "value": ("FLOAT", {"default": 0.0}), | |
| "count": ("INT", {"default": 1}), | |
| } | |
| } | |
| FUNCTION = "fill_floats" | |
| RETURN_TYPES = ("FLOATS",) | |
| CATEGORY = "mtb/batch" | |
| def fill_floats(self, floats, direction, value, count): | |
| size = len(floats) | |
| if size > count: | |
| raise ValueError( | |
| f"Size ({size}) is less then target count ({count})" | |
| ) | |
| rem = count - size | |
| if direction == "tail": | |
| floats = floats + [value] * rem | |
| else: | |
| floats = [value] * rem + floats | |
| return (floats,) | |
| class MTB_BatchFloatAssemble: | |
| """Assembles mutiple batches of floats into a single stream (batch)""" | |
| def INPUT_TYPES(cls): | |
| return {"required": {"reverse": ("BOOLEAN", {"default": False})}} | |
| RETURN_TYPES = ("FLOATS",) | |
| CATEGORY = "mtb/batch" | |
| FUNCTION = "assemble_floats" | |
| def assemble_floats(self, reverse: bool, **kwargs: list[float]): | |
| res: list[float] = [] | |
| if reverse: | |
| for x in reversed(kwargs.values()): | |
| if x: | |
| res += x | |
| else: | |
| for x in kwargs.values(): | |
| if x: | |
| res += x | |
| return (res,) | |
| class MTB_BatchFloat: | |
| """Generates a batch of float values with interpolation""" | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": { | |
| "mode": ( | |
| ["Single", "Steps"], | |
| {"default": "Steps"}, | |
| ), | |
| "count": ("INT", {"default": 2}), | |
| "min": ("FLOAT", {"default": 0.0, "step": 0.001}), | |
| "max": ("FLOAT", {"default": 1.0, "step": 0.001}), | |
| "easing": ( | |
| [ | |
| "Linear", | |
| "Sine In", | |
| "Sine Out", | |
| "Sine In/Out", | |
| "Quart In", | |
| "Quart Out", | |
| "Quart In/Out", | |
| "Cubic In", | |
| "Cubic Out", | |
| "Cubic In/Out", | |
| "Circ In", | |
| "Circ Out", | |
| "Circ In/Out", | |
| "Back In", | |
| "Back Out", | |
| "Back In/Out", | |
| "Elastic In", | |
| "Elastic Out", | |
| "Elastic In/Out", | |
| "Bounce In", | |
| "Bounce Out", | |
| "Bounce In/Out", | |
| ], | |
| {"default": "Linear"}, | |
| ), | |
| } | |
| } | |
| FUNCTION = "set_floats" | |
| RETURN_TYPES = ("FLOATS",) | |
| CATEGORY = "mtb/batch" | |
| def set_floats(self, mode, count, min, max, easing): | |
| if mode == "Steps" and count == 1: | |
| raise ValueError( | |
| "Steps mode requires at least a count of 2 values" | |
| ) | |
| keyframes = [] | |
| if mode == "Single": | |
| keyframes = [min] * count | |
| return (keyframes,) | |
| for i in range(count): | |
| normalized_step = i / (count - 1) | |
| eased_step = apply_easing(normalized_step, easing) | |
| eased_value = min + (max - min) * eased_step | |
| keyframes.append(eased_value) | |
| return (keyframes,) | |
| class MTB_BatchMerge: | |
| """Merges multiple image batches with different frame counts""" | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": { | |
| "fusion_mode": ( | |
| ["add", "multiply", "average"], | |
| {"default": "average"}, | |
| ), | |
| "fill": (["head", "tail"], {"default": "tail"}), | |
| } | |
| } | |
| RETURN_TYPES = ("IMAGE",) | |
| FUNCTION = "merge_batches" | |
| CATEGORY = "mtb/batch" | |
| def merge_batches(self, fusion_mode: str, fill: str, **kwargs): | |
| images = kwargs.values() | |
| max_frames = max(img.shape[0] for img in images) | |
| adjusted_images = [] | |
| for img in images: | |
| frame_count = img.shape[0] | |
| if frame_count < max_frames: | |
| fill_frame = img[0] if fill == "head" else img[-1] | |
| fill_frames = fill_frame.repeat( | |
| max_frames - frame_count, 1, 1, 1 | |
| ) | |
| adjusted_batch = ( | |
| torch.cat((fill_frames, img), dim=0) | |
| if fill == "head" | |
| else torch.cat((img, fill_frames), dim=0) | |
| ) | |
| else: | |
| adjusted_batch = img | |
| adjusted_images.append(adjusted_batch) | |
| # Merge the adjusted batches | |
| merged_image = None | |
| for img in adjusted_images: | |
| if merged_image is None: | |
| merged_image = img | |
| else: | |
| if fusion_mode == "add": | |
| merged_image += img | |
| elif fusion_mode == "multiply": | |
| merged_image *= img | |
| elif fusion_mode == "average": | |
| merged_image = (merged_image + img) / 2 | |
| return (merged_image,) | |
| class MTB_Batch2dTransform: | |
| """Transform a batch of images using a batch of keyframes""" | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": { | |
| "image": ("IMAGE",), | |
| "border_handling": ( | |
| ["edge", "constant", "reflect", "symmetric"], | |
| {"default": "edge"}, | |
| ), | |
| "constant_color": ("COLOR", {"default": "#000000"}), | |
| }, | |
| "optional": { | |
| "x": ("FLOATS",), | |
| "y": ("FLOATS",), | |
| "zoom": ("FLOATS",), | |
| "angle": ("FLOATS",), | |
| "shear": ("FLOATS",), | |
| }, | |
| } | |
| RETURN_TYPES = ("IMAGE",) | |
| FUNCTION = "transform_batch" | |
| CATEGORY = "mtb/batch" | |
| def get_num_elements( | |
| self, param: None | torch.Tensor | list[torch.Tensor] | list[float] | |
| ) -> int: | |
| if isinstance(param, torch.Tensor): | |
| return torch.numel(param) | |
| elif isinstance(param, list): | |
| return len(param) | |
| return 0 | |
| def transform_batch( | |
| self, | |
| image: torch.Tensor, | |
| border_handling: str, | |
| constant_color: str, | |
| x: list[float] | None = None, | |
| y: list[float] | None = None, | |
| zoom: list[float] | None = None, | |
| angle: list[float] | None = None, | |
| shear: list[float] | None = None, | |
| ): | |
| if all( | |
| self.get_num_elements(param) <= 0 | |
| for param in [x, y, zoom, angle, shear] | |
| ): | |
| raise ValueError( | |
| "At least one transform parameter must be provided" | |
| ) | |
| keyframes: dict[str, list[float]] = { | |
| "x": [], | |
| "y": [], | |
| "zoom": [], | |
| "angle": [], | |
| "shear": [], | |
| } | |
| default_vals = {"x": 0, "y": 0, "zoom": 1.0, "angle": 0, "shear": 0} | |
| if x and self.get_num_elements(x) > 0: | |
| keyframes["x"] = x | |
| if y and self.get_num_elements(y) > 0: | |
| keyframes["y"] = y | |
| if zoom and self.get_num_elements(zoom) > 0: | |
| # some easing types like elastic can pull back... maybe it should abs the value? | |
| keyframes["zoom"] = [max(x, 0.00001) for x in zoom] | |
| if angle and self.get_num_elements(angle) > 0: | |
| keyframes["angle"] = angle | |
| if shear and self.get_num_elements(shear) > 0: | |
| keyframes["shear"] = shear | |
| for name, values in keyframes.items(): | |
| count = len(values) | |
| if count > 0 and count != image.shape[0]: | |
| raise ValueError( | |
| f"Length of {name} values ({count}) must match number of images ({image.shape[0]})" | |
| ) | |
| if count == 0: | |
| keyframes[name] = [default_vals[name]] * image.shape[0] | |
| transformer = MTB_TransformImage() | |
| res = [ | |
| transformer.transform( | |
| image[i].unsqueeze(0), | |
| keyframes["x"][i], | |
| keyframes["y"][i], | |
| keyframes["zoom"][i], | |
| keyframes["angle"][i], | |
| keyframes["shear"][i], | |
| border_handling, | |
| constant_color, | |
| )[0] | |
| for i in range(image.shape[0]) | |
| ] | |
| return (torch.cat(res, dim=0),) | |
| class MTB_BatchFloatFit: | |
| """Fit a list of floats using a source and target range""" | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": { | |
| "values": ("FLOATS", {"forceInput": True}), | |
| "clamp": ("BOOLEAN", {"default": False}), | |
| "auto_compute_source": ("BOOLEAN", {"default": False}), | |
| "source_min": ("FLOAT", {"default": 0.0, "step": 0.01}), | |
| "source_max": ("FLOAT", {"default": 1.0, "step": 0.01}), | |
| "target_min": ("FLOAT", {"default": 0.0, "step": 0.01}), | |
| "target_max": ("FLOAT", {"default": 1.0, "step": 0.01}), | |
| "easing": ( | |
| EASINGS, | |
| {"default": "Linear"}, | |
| ), | |
| } | |
| } | |
| FUNCTION = "fit_range" | |
| RETURN_TYPES = ("FLOATS",) | |
| CATEGORY = "mtb/batch" | |
| DESCRIPTION = "Fit a list of floats using a source and target range" | |
| def fit_range( | |
| self, | |
| values: list[float], | |
| clamp: bool, | |
| auto_compute_source: bool, | |
| source_min: float, | |
| source_max: float, | |
| target_min: float, | |
| target_max: float, | |
| easing: str, | |
| ): | |
| if auto_compute_source: | |
| source_min = min(values) | |
| source_max = max(values) | |
| from .graph_utils import MTB_FitNumber | |
| res = [] | |
| fit_number = MTB_FitNumber() | |
| for value in values: | |
| (transformed_value,) = fit_number.set_range( | |
| value, | |
| clamp, | |
| source_min, | |
| source_max, | |
| target_min, | |
| target_max, | |
| easing, | |
| ) | |
| res.append(transformed_value) | |
| return (res,) | |
| class MTB_PlotBatchFloat: | |
| """Plot floats""" | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": { | |
| "width": ("INT", {"default": 768}), | |
| "height": ("INT", {"default": 768}), | |
| "point_size": ("INT", {"default": 4}), | |
| "seed": ("INT", {"default": 1}), | |
| "start_at_zero": ("BOOLEAN", {"default": False}), | |
| } | |
| } | |
| RETURN_TYPES = ("IMAGE",) | |
| RETURN_NAMES = ("plot",) | |
| FUNCTION = "plot" | |
| CATEGORY = "mtb/batch" | |
| def plot( | |
| self, | |
| width: int, | |
| height: int, | |
| point_size: int, | |
| seed: int, | |
| start_at_zero: bool, | |
| interactive_backend: bool = False, | |
| **kwargs, | |
| ): | |
| import matplotlib | |
| # NOTE: This is for notebook usage or tests, i.e not exposed to comfy that should always use Agg | |
| if not interactive_backend: | |
| matplotlib.use("Agg") | |
| import matplotlib.pyplot as plt | |
| fig, ax = plt.subplots(figsize=(width / 100, height / 100), dpi=100) | |
| fig.set_edgecolor("black") | |
| fig.patch.set_facecolor("#2e2e2e") | |
| # Setting background color and grid | |
| ax.set_facecolor("#2e2e2e") # Dark gray background | |
| ax.grid(color="gray", linestyle="-", linewidth=0.5, alpha=0.5) | |
| # Finding global min and max across all lists for scaling the plot | |
| all_values = [value for values in kwargs.values() for value in values] | |
| global_min = min(all_values) | |
| global_max = max(all_values) | |
| y_padding = 0.05 * (global_max - global_min) | |
| ax.set_ylim(global_min - y_padding, global_max + y_padding) | |
| max_length = max(len(values) for values in kwargs.values()) | |
| if start_at_zero: | |
| x_values = np.linspace(0, max_length - 1, max_length) | |
| else: | |
| x_values = np.linspace(1, max_length, max_length) | |
| ax.set_xlim(1, max_length) # Set X-axis limits | |
| np.random.seed(seed) | |
| colors = np.random.rand(len(kwargs), 3) # Generate random RGB values | |
| for color, (label, values) in zip(colors, kwargs.items()): | |
| ax.plot(x_values[: len(values)], values, label=label, color=color) | |
| ax.legend( | |
| title="Legend", | |
| title_fontsize="large", | |
| fontsize="medium", | |
| edgecolor="black", | |
| loc="best", | |
| ) | |
| # Setting labels and title | |
| ax.set_xlabel("Time", fontsize="large", color="white") | |
| ax.set_ylabel("Value", fontsize="large", color="white") | |
| ax.set_title( | |
| "Plot of Values over Time", fontsize="x-large", color="white" | |
| ) | |
| # Adjusting tick colors to be visible on dark background | |
| ax.tick_params(colors="white") | |
| # Changing color of the axes border | |
| for _, spine in ax.spines.items(): | |
| spine.set_edgecolor("white") | |
| # Rendering the plot into a NumPy array | |
| buf = BytesIO() | |
| plt.savefig(buf, format="png", bbox_inches="tight") | |
| buf.seek(0) | |
| image = Image.open(buf) | |
| plt.close(fig) # Closing the figure to free up memory | |
| return (pil2tensor(image),) | |
| def draw_point(self, image, point, color, point_size): | |
| x, y = point | |
| y = image.shape[0] - 1 - y # Invert Y-coordinate | |
| half_size = point_size // 2 | |
| x_start, x_end = ( | |
| max(0, x - half_size), | |
| min(image.shape[1], x + half_size + 1), | |
| ) | |
| y_start, y_end = ( | |
| max(0, y - half_size), | |
| min(image.shape[0], y + half_size + 1), | |
| ) | |
| image[y_start:y_end, x_start:x_end] = color | |
| def draw_line(self, image, start, end, color): | |
| x1, y1 = start | |
| x2, y2 = end | |
| # Invert Y-coordinate | |
| y1 = image.shape[0] - 1 - y1 | |
| y2 = image.shape[0] - 1 - y2 | |
| dx = x2 - x1 | |
| dy = y2 - y1 | |
| is_steep = abs(dy) > abs(dx) | |
| if is_steep: | |
| x1, y1 = y1, x1 | |
| x2, y2 = y2, x2 | |
| swapped = False | |
| if x1 > x2: | |
| x1, x2 = x2, x1 | |
| y1, y2 = y2, y1 | |
| swapped = True | |
| dx = x2 - x1 | |
| dy = y2 - y1 | |
| error = int(dx / 2.0) | |
| y = y1 | |
| ystep = None | |
| if y1 < y2: | |
| ystep = 1 | |
| else: | |
| ystep = -1 | |
| for x in range(x1, x2 + 1): | |
| coord = (y, x) if is_steep else (x, y) | |
| image[coord] = color | |
| error -= abs(dy) | |
| if error < 0: | |
| y += ystep | |
| error += dx | |
| if swapped: | |
| image[(x1, y1)] = color | |
| image[(x2, y2)] = color | |
| DEFAULT_INTERPOLANT = lambda t: t * t * t * (t * (t * 6 - 15) + 10) | |
| class MTB_BatchShake: | |
| """Applies a shaking effect to batches of images.""" | |
| def INPUT_TYPES(cls): | |
| return { | |
| "required": { | |
| "images": ("IMAGE",), | |
| "position_amount_x": ("FLOAT", {"default": 1.0}), | |
| "position_amount_y": ("FLOAT", {"default": 1.0}), | |
| "rotation_amount": ("FLOAT", {"default": 10.0}), | |
| "frequency": ("FLOAT", {"default": 1.0, "min": 0.005}), | |
| "frequency_divider": ("FLOAT", {"default": 1.0, "min": 0.005}), | |
| "octaves": ("INT", {"default": 1, "min": 1}), | |
| "seed": ("INT", {"default": 0}), | |
| }, | |
| } | |
| RETURN_TYPES = ("IMAGE", "FLOATS", "FLOATS", "FLOATS") | |
| RETURN_NAMES = ("image", "pos_x", "pos_y", "rot") | |
| FUNCTION = "apply_shake" | |
| CATEGORY = "mtb/batch" | |
| # def interpolant(self, t): | |
| # return t * t * t * (t * (t * 6 - 15) + 10) | |
| def generate_perlin_noise_2d( | |
| self, shape, res, tileable=(False, False), interpolant=None | |
| ): | |
| """Generate a 2D numpy array of perlin noise. | |
| Args: | |
| shape: The shape of the generated array (tuple of two ints). | |
| This must be a multple of res. | |
| res: The number of periods of noise to generate along each | |
| axis (tuple of two ints). Note shape must be a multiple of | |
| res. | |
| tileable: If the noise should be tileable along each axis | |
| (tuple of two bools). Defaults to (False, False). | |
| interpolant: The interpolation function, defaults to | |
| t*t*t*(t*(t*6 - 15) + 10). | |
| Returns | |
| ------- | |
| A numpy array of shape shape with the generated noise. | |
| Raises | |
| ------ | |
| ValueError: If shape is not a multiple of res. | |
| """ | |
| interpolant = interpolant or DEFAULT_INTERPOLANT | |
| delta = (res[0] / shape[0], res[1] / shape[1]) | |
| d = (shape[0] // res[0], shape[1] // res[1]) | |
| grid = ( | |
| np.mgrid[0 : res[0] : delta[0], 0 : res[1] : delta[1]].transpose( | |
| 1, 2, 0 | |
| ) | |
| % 1 | |
| ) | |
| # Gradients | |
| angles = 2 * np.pi * np.random.rand(res[0] + 1, res[1] + 1) | |
| gradients = np.dstack((np.cos(angles), np.sin(angles))) | |
| if tileable[0]: | |
| gradients[-1, :] = gradients[0, :] | |
| if tileable[1]: | |
| gradients[:, -1] = gradients[:, 0] | |
| gradients = gradients.repeat(d[0], 0).repeat(d[1], 1) | |
| g00 = gradients[: -d[0], : -d[1]] | |
| g10 = gradients[d[0] :, : -d[1]] | |
| g01 = gradients[: -d[0], d[1] :] | |
| g11 = gradients[d[0] :, d[1] :] | |
| # Ramps | |
| n00 = np.sum(np.dstack((grid[:, :, 0], grid[:, :, 1])) * g00, 2) | |
| n10 = np.sum(np.dstack((grid[:, :, 0] - 1, grid[:, :, 1])) * g10, 2) | |
| n01 = np.sum(np.dstack((grid[:, :, 0], grid[:, :, 1] - 1)) * g01, 2) | |
| n11 = np.sum( | |
| np.dstack((grid[:, :, 0] - 1, grid[:, :, 1] - 1)) * g11, 2 | |
| ) | |
| # Interpolation | |
| t = interpolant(grid) | |
| n0 = n00 * (1 - t[:, :, 0]) + t[:, :, 0] * n10 | |
| n1 = n01 * (1 - t[:, :, 0]) + t[:, :, 0] * n11 | |
| return np.sqrt(2) * ((1 - t[:, :, 1]) * n0 + t[:, :, 1] * n1) | |
| def generate_fractal_noise_2d( | |
| self, | |
| shape, | |
| res, | |
| octaves=1, | |
| persistence=0.5, | |
| lacunarity=2, | |
| tileable=(True, True), | |
| interpolant=None, | |
| ): | |
| """Generate a 2D numpy array of fractal noise. | |
| Args: | |
| shape: The shape of the generated array (tuple of two ints). | |
| This must be a multiple of lacunarity**(octaves-1)*res. | |
| res: The number of periods of noise to generate along each | |
| axis (tuple of two ints). Note shape must be a multiple of | |
| (lacunarity**(octaves-1)*res). | |
| octaves: The number of octaves in the noise. Defaults to 1. | |
| persistence: The scaling factor between two octaves. | |
| lacunarity: The frequency factor between two octaves. | |
| tileable: If the noise should be tileable along each axis | |
| (tuple of two bools). Defaults to (True,True). | |
| interpolant: The, interpolation function, defaults to | |
| t*t*t*(t*(t*6 - 15) + 10). | |
| Returns | |
| ------- | |
| A numpy array of fractal noise and of shape shape generated by | |
| combining several octaves of perlin noise. | |
| Raises | |
| ------ | |
| ValueError: If shape is not a multiple of | |
| (lacunarity**(octaves-1)*res). | |
| """ | |
| interpolant = interpolant or DEFAULT_INTERPOLANT | |
| noise = np.zeros(shape) | |
| frequency = 1 | |
| amplitude = 1 | |
| for _ in range(octaves): | |
| noise += amplitude * self.generate_perlin_noise_2d( | |
| shape, | |
| (frequency * res[0], frequency * res[1]), | |
| tileable, | |
| interpolant, | |
| ) | |
| frequency *= lacunarity | |
| amplitude *= persistence | |
| return noise | |
| def fbm(self, x, y, octaves): | |
| # noise_2d = self.generate_fractal_noise_2d((256, 256), (8, 8), octaves) | |
| # Now, extract a single noise value based on x and y, wrapping indices if necessary | |
| x_idx = int(x) % 256 | |
| y_idx = int(y) % 256 | |
| return self.noise_pattern[x_idx, y_idx] | |
| def apply_shake( | |
| self, | |
| images, | |
| position_amount_x, | |
| position_amount_y, | |
| rotation_amount, | |
| frequency, | |
| frequency_divider, | |
| octaves, | |
| seed, | |
| ): | |
| # Rehash | |
| np.random.seed(seed) | |
| self.position_offset = np.random.uniform(-1e3, 1e3, 3) | |
| self.rotation_offset = np.random.uniform(-1e3, 1e3, 3) | |
| self.noise_pattern = self.generate_perlin_noise_2d( | |
| (512, 512), (32, 32), (True, True) | |
| ) | |
| # Assuming frame count is derived from the first dimension of images tensor | |
| frame_count = images.shape[0] | |
| frequency = frequency / frequency_divider | |
| # Generate shaking parameters for each frame | |
| x_translations = [] | |
| y_translations = [] | |
| rotations = [] | |
| for frame_num in range(frame_count): | |
| time = frame_num * frequency | |
| x_idx = (self.position_offset[0] + frame_num) % 256 | |
| y_idx = (self.position_offset[1] + frame_num) % 256 | |
| np_position = np.array( | |
| [ | |
| self.fbm(x_idx, time, octaves), | |
| self.fbm(y_idx, time, octaves), | |
| ] | |
| ) | |
| # np_position = np.array( | |
| # [ | |
| # self.fbm(self.position_offset[0] + frame_num, time, octaves), | |
| # self.fbm(self.position_offset[1] + frame_num, time, octaves), | |
| # ] | |
| # ) | |
| # np_rotation = self.fbm(self.rotation_offset[2] + frame_num, time, octaves) | |
| rot_idx = (self.rotation_offset[2] + frame_num) % 256 | |
| np_rotation = self.fbm(rot_idx, time, octaves) | |
| x_translations.append(np_position[0] * position_amount_x) | |
| y_translations.append(np_position[1] * position_amount_y) | |
| rotations.append(np_rotation * rotation_amount) | |
| # Convert lists to tensors | |
| # x_translations = torch.tensor(x_translations, dtype=torch.float32) | |
| # y_translations = torch.tensor(y_translations, dtype=torch.float32) | |
| # rotations = torch.tensor(rotations, dtype=torch.float32) | |
| # Create an instance of Batch2dTransform | |
| transform = MTB_Batch2dTransform() | |
| log.debug( | |
| f"Applying shaking with parameters: \nposition {position_amount_x}, {position_amount_y}\nrotation {rotation_amount}\nfrequency {frequency}\noctaves {octaves}" | |
| ) | |
| # Apply shaking transformations to images | |
| shaken_images = transform.transform_batch( | |
| images, | |
| border_handling="edge", # Assuming edge handling as default | |
| constant_color="#000000", # Assuming black as default constant color | |
| x=x_translations, | |
| y=y_translations, | |
| angle=rotations, | |
| )[0] | |
| return (shaken_images, x_translations, y_translations, rotations) | |
| __nodes__ = [ | |
| MTB_BatchFloat, | |
| MTB_Batch2dTransform, | |
| MTB_BatchShape, | |
| MTB_BatchMake, | |
| MTB_BatchFloatAssemble, | |
| MTB_BatchFloatFill, | |
| MTB_BatchFloatNormalize, | |
| MTB_BatchMerge, | |
| MTB_BatchShake, | |
| MTB_PlotBatchFloat, | |
| MTB_BatchTimeWrap, | |
| MTB_BatchFloatFit, | |
| MTB_BatchFloatMath, | |
| ] | |