import logging
from typing import List, Optional, Tuple, Union

import numpy as np
import torch
from diffusers import EulerAncestralDiscreteScheduler, LCMScheduler
from diffusers.utils import BaseOutput
from diffusers.utils.torch_utils import randn_tensor

from src.eunms import Epsilon_Update_Type

# Module-level logger. The original code referenced an undefined `logger`
# inside the step methods, which raised NameError whenever the
# "scale_model_input was not called" warning path was reached.
logger = logging.getLogger(__name__)

# Example of how a fixed noise list for `set_noise_list` can be produced:
# g_cpu = torch.Generator().manual_seed(7865)
# noise = [randn_tensor((1, 4, 64, 64), dtype=torch.float16, device=torch.device("cuda:0"), generator=g_cpu) for i in range(4)]
# for i, n in enumerate(noise):
#     torch.save(n, f"noise_{i}.pt")


class EulerAncestralDiscreteSchedulerOutput(BaseOutput):
    """
    Output class for the scheduler's `step` function output.

    Args:
        prev_sample (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)` for images):
            Computed sample `(x_{t-1})` of previous timestep. `prev_sample` should be used as next model input in the
            denoising loop.
        pred_original_sample (`torch.FloatTensor` of shape `(batch_size, num_channels, height, width)` for images):
            The predicted denoised sample `(x_{0})` based on the model output from the current timestep.
            `pred_original_sample` can be used to preview progress or for guidance.
    """

    prev_sample: torch.FloatTensor
    pred_original_sample: Optional[torch.FloatTensor] = None


class MyEulerAncestralDiscreteScheduler(EulerAncestralDiscreteScheduler):
    """
    Euler-ancestral scheduler that replaces freshly sampled ancestral noise
    with a caller-supplied, per-step noise list (`set_noise_list`) so the
    sampling trajectory is fully deterministic and invertible. It adds:

    - `inv_step`: an (approximate) inverse of `step`, for inversion.
    - `step_and_update_noise`: a forward step that re-estimates the stored
      noise so the step reproduces a given expected output.
    - `add_noise_off_schedule`: noising at timesteps outside the current
      inference schedule.
    """

    def set_noise_list(self, noise_list):
        # Pre-drawn ancestral noise tensors, indexed by `self.step_index`.
        self.noise_list = noise_list

    def get_noise_to_remove(self):
        """Return the ancestral-noise term (`noise * sigma_up`) of the current step."""
        sigma_from = self.sigmas[self.step_index]
        sigma_to = self.sigmas[self.step_index + 1]
        sigma_up = (sigma_to**2 * (sigma_from**2 - sigma_to**2) / sigma_from**2) ** 0.5

        # NOTE: the original return line ended in a stray trailing backslash
        # (line continuation); it has been removed.
        return self.noise_list[self.step_index] * sigma_up

    def scale_model_input(
        self, sample: torch.FloatTensor, timestep: Union[float, torch.FloatTensor]
    ) -> torch.FloatTensor:
        """
        Ensures interchangeability with schedulers that need to scale the denoising model input depending on the
        current timestep. Scales the denoising model input by `(sigma**2 + 1) ** 0.5` to match the Euler algorithm.

        Args:
            sample (`torch.FloatTensor`):
                The input sample.
            timestep (`int`, *optional*):
                The current timestep in the diffusion chain.

        Returns:
            `torch.FloatTensor`:
                A scaled input sample.
        """
        # Initialize the step index here so `self.step_index` is valid even
        # when this is called before `step` (the base class relies on it).
        self._init_step_index(timestep.view((1)))
        return EulerAncestralDiscreteScheduler.scale_model_input(self, sample, timestep)

    def _raise_if_integer_timestep(self, timestep) -> None:
        # Shared validation for step / step_and_update_noise / inv_step.
        if isinstance(timestep, (int, torch.IntTensor, torch.LongTensor)):
            raise ValueError(
                (
                    "Passing integer indices (e.g. from `enumerate(timesteps)`) as timesteps to"
                    " `EulerDiscreteScheduler.step()` is not supported. Make sure to pass"
                    " one of the `scheduler.timesteps` as a timestep."
                ),
            )

        if not self.is_scale_input_called:
            logger.warning(
                "The `scale_model_input` function should be called before `step` to ensure correct denoising. "
                "See `StableDiffusionPipeline` for a usage example."
            )

    def _predict_original_sample(
        self,
        sample: torch.FloatTensor,
        sigma: torch.FloatTensor,
        model_output: torch.FloatTensor,
    ) -> torch.FloatTensor:
        # Predicted x_0 from the model output, depending on prediction_type.
        if self.config.prediction_type == "epsilon":
            return sample - sigma * model_output
        if self.config.prediction_type == "v_prediction":
            # model_output * c_out + sample * c_skip
            return model_output * (-sigma / (sigma**2 + 1) ** 0.5) + (sample / (sigma**2 + 1))
        if self.config.prediction_type == "sample":
            raise NotImplementedError("prediction_type not implemented yet: sample")
        raise ValueError(
            f"prediction_type given as {self.config.prediction_type} must be one of `epsilon`, or `v_prediction`"
        )

    def _ancestral_sigmas(self) -> Tuple[torch.FloatTensor, torch.FloatTensor]:
        # Standard ancestral split of the move sigma_from -> sigma_to into a
        # deterministic part (sigma_down) and a noise magnitude (sigma_up).
        sigma_from = self.sigmas[self.step_index]
        sigma_to = self.sigmas[self.step_index + 1]
        sigma_up = (sigma_to**2 * (sigma_from**2 - sigma_to**2) / sigma_from**2) ** 0.5
        sigma_down = (sigma_to**2 - sigma_up**2) ** 0.5
        return sigma_up, sigma_down

    def step(
        self,
        model_output: torch.FloatTensor,
        timestep: Union[float, torch.FloatTensor],
        sample: torch.FloatTensor,
        generator: Optional[torch.Generator] = None,
        return_dict: bool = True,
    ) -> Union[EulerAncestralDiscreteSchedulerOutput, Tuple]:
        """
        Predict the sample from the previous timestep by reversing the SDE. This function propagates the diffusion
        process from the learned model outputs (most often the predicted noise).

        Args:
            model_output (`torch.FloatTensor`):
                The direct output from learned diffusion model.
            timestep (`float`):
                The current discrete timestep in the diffusion chain.
            sample (`torch.FloatTensor`):
                A current instance of a sample created by the diffusion process.
            generator (`torch.Generator`, *optional*):
                A random number generator. Unused here: the ancestral noise is
                taken from the pre-set `noise_list` instead of being sampled.
            return_dict (`bool`):
                Whether or not to return a
                [`~schedulers.scheduling_euler_ancestral_discrete.EulerAncestralDiscreteSchedulerOutput`] or tuple.

        Returns:
            [`~schedulers.scheduling_euler_ancestral_discrete.EulerAncestralDiscreteSchedulerOutput`] or `tuple`:
                If return_dict is `True`,
                [`~schedulers.scheduling_euler_ancestral_discrete.EulerAncestralDiscreteSchedulerOutput`] is returned,
                otherwise a tuple is returned where the first element is the sample tensor.
        """
        self._raise_if_integer_timestep(timestep)
        self._init_step_index(timestep.view((1)))

        sigma = self.sigmas[self.step_index]

        # Upcast to avoid precision issues when computing prev_sample
        sample = sample.to(torch.float32)

        # 1. compute predicted original sample (x_0) from sigma-scaled predicted noise
        pred_original_sample = self._predict_original_sample(sample, sigma, model_output)

        sigma_up, sigma_down = self._ancestral_sigmas()

        # 2. Convert to an ODE derivative.
        # NOTE(review): deliberately uses the raw model output rather than the
        # stock (sample - pred_original_sample) / sigma derivative.
        derivative = model_output
        dt = sigma_down - sigma
        prev_sample = sample + derivative * dt

        # Deterministic ancestral noise from the pre-set noise list.
        prev_sample = prev_sample + self.noise_list[self.step_index] * sigma_up

        # Cast sample back to model compatible dtype
        prev_sample = prev_sample.to(model_output.dtype)

        # upon completion increase step index by one
        self._step_index += 1

        if not return_dict:
            return (prev_sample,)

        return EulerAncestralDiscreteSchedulerOutput(
            prev_sample=prev_sample, pred_original_sample=pred_original_sample
        )

    def step_and_update_noise(
        self,
        model_output: torch.FloatTensor,
        timestep: Union[float, torch.FloatTensor],
        sample: torch.FloatTensor,
        expected_prev_sample: torch.FloatTensor,
        update_epsilon_type=Epsilon_Update_Type.OVERRIDE,
        generator: Optional[torch.Generator] = None,
        return_dict: bool = True,
    ) -> Union[EulerAncestralDiscreteSchedulerOutput, Tuple]:
        """
        Same as `step`, but additionally re-estimates the stored noise for this
        step so that the step output matches `expected_prev_sample`.

        Args:
            model_output (`torch.FloatTensor`):
                The direct output from learned diffusion model.
            timestep (`float`):
                The current discrete timestep in the diffusion chain.
            sample (`torch.FloatTensor`):
                A current instance of a sample created by the diffusion process.
            expected_prev_sample (`torch.FloatTensor`):
                The sample this step is expected to produce; used to solve for
                the required ancestral noise.
            update_epsilon_type:
                `Epsilon_Update_Type.OVERRIDE` replaces the stored noise with
                the exact required noise; any other value nudges the stored
                noise toward it with a small gradient loop.
            generator (`torch.Generator`, *optional*):
                A random number generator. Unused (see `step`).
            return_dict (`bool`):
                Whether or not to return a
                [`~schedulers.scheduling_euler_ancestral_discrete.EulerAncestralDiscreteSchedulerOutput`] or tuple.

        Returns:
            [`~schedulers.scheduling_euler_ancestral_discrete.EulerAncestralDiscreteSchedulerOutput`] or `tuple`:
                If return_dict is `True`,
                [`~schedulers.scheduling_euler_ancestral_discrete.EulerAncestralDiscreteSchedulerOutput`] is returned,
                otherwise a tuple is returned where the first element is the sample tensor.
        """
        self._raise_if_integer_timestep(timestep)
        self._init_step_index(timestep.view((1)))

        sigma = self.sigmas[self.step_index]

        # Upcast to avoid precision issues when computing prev_sample
        sample = sample.to(torch.float32)

        # 1. compute predicted original sample (x_0) from sigma-scaled predicted noise
        pred_original_sample = self._predict_original_sample(sample, sigma, model_output)

        sigma_up, sigma_down = self._ancestral_sigmas()

        # 2. Convert to an ODE derivative (raw model output — see `step`).
        derivative = model_output
        dt = sigma_down - sigma
        prev_sample = sample + derivative * dt

        if sigma_up > 0:
            # Noise that would make this step land exactly on the expectation.
            req_noise = (expected_prev_sample - prev_sample) / sigma_up
            if update_epsilon_type == Epsilon_Update_Type.OVERRIDE:
                self.noise_list[self.step_index] = req_noise
            else:
                # Nudge the stored noise toward req_noise: gradient of
                # ||n - req_noise|| is the unit direction, applied with a
                # fixed step size of 1.8 for 10 iterations (as originally).
                for _ in range(10):
                    n = self.noise_list[self.step_index].detach().clone().requires_grad_(True)
                    loss = torch.norm(n - req_noise.detach())
                    loss.backward()
                    self.noise_list[self.step_index] -= n.grad.detach() * 1.8

        prev_sample = prev_sample + self.noise_list[self.step_index] * sigma_up

        # Cast sample back to model compatible dtype
        prev_sample = prev_sample.to(model_output.dtype)

        # upon completion increase step index by one
        self._step_index += 1

        if not return_dict:
            return (prev_sample,)

        return EulerAncestralDiscreteSchedulerOutput(
            prev_sample=prev_sample, pred_original_sample=pred_original_sample
        )

    def inv_step(
        self,
        model_output: torch.FloatTensor,
        timestep: Union[float, torch.FloatTensor],
        sample: torch.FloatTensor,
        generator: Optional[torch.Generator] = None,
        return_dict: bool = True,
    ) -> Union[EulerAncestralDiscreteSchedulerOutput, Tuple]:
        """
        Inverse of `step`: move a sample one step *up* the noise schedule by
        undoing both the deterministic update and the stored ancestral noise.

        Args:
            model_output (`torch.FloatTensor`):
                The direct output from learned diffusion model.
            timestep (`float`):
                The current discrete timestep in the diffusion chain.
            sample (`torch.FloatTensor`):
                A current instance of a sample created by the diffusion process.
            generator (`torch.Generator`, *optional*):
                A random number generator. Unused (see `step`).
            return_dict (`bool`):
                Whether or not to return a
                [`~schedulers.scheduling_euler_ancestral_discrete.EulerAncestralDiscreteSchedulerOutput`] or tuple.

        Returns:
            [`~schedulers.scheduling_euler_ancestral_discrete.EulerAncestralDiscreteSchedulerOutput`] or `tuple`:
                If return_dict is `True`,
                [`~schedulers.scheduling_euler_ancestral_discrete.EulerAncestralDiscreteSchedulerOutput`] is returned,
                otherwise a tuple is returned where the first element is the sample tensor.
        """
        self._raise_if_integer_timestep(timestep)
        self._init_step_index(timestep.view((1)))

        sigma = self.sigmas[self.step_index]

        # Upcast to avoid precision issues when computing prev_sample
        sample = sample.to(torch.float32)

        # 1. compute predicted original sample (x_0) from sigma-scaled predicted noise
        pred_original_sample = self._predict_original_sample(sample, sigma, model_output)

        # NOTE(review): the inverse direction intentionally uses different
        # formulas from `_ancestral_sigmas`: abs() guards the sign of
        # (sigma_from^2 - sigma_to^2), and sigma_down = sigma_to^2 / sigma_from
        # replaces (sigma_to^2 - sigma_up^2) ** 0.5.
        sigma_from = self.sigmas[self.step_index]
        sigma_to = self.sigmas[self.step_index + 1]
        sigma_up = (sigma_to**2 * (sigma_from**2 - sigma_to**2).abs() / sigma_from**2) ** 0.5
        sigma_down = sigma_to**2 / sigma_from

        # 2. Convert to an ODE derivative (raw model output — see `step`),
        # then step in the opposite direction and remove the stored noise.
        derivative = model_output
        dt = sigma_down - sigma
        prev_sample = sample - derivative * dt
        prev_sample = prev_sample - self.noise_list[self.step_index] * sigma_up

        # Cast sample back to model compatible dtype
        prev_sample = prev_sample.to(model_output.dtype)

        # upon completion increase step index by one
        self._step_index += 1

        if not return_dict:
            return (prev_sample,)

        return EulerAncestralDiscreteSchedulerOutput(
            prev_sample=prev_sample, pred_original_sample=pred_original_sample
        )

    def get_all_sigmas(self) -> torch.FloatTensor:
        """Return the full (reversed) sigma schedule with a trailing 0.0."""
        sigmas = np.array(((1 - self.alphas_cumprod) / self.alphas_cumprod) ** 0.5)
        sigmas = np.concatenate([sigmas[::-1], [0.0]]).astype(np.float32)
        return torch.from_numpy(sigmas)

    def add_noise_off_schedule(
        self,
        original_samples: torch.FloatTensor,
        noise: torch.FloatTensor,
        timesteps: torch.FloatTensor,
    ) -> torch.FloatTensor:
        """Noise `original_samples` at an arbitrary training timestep, even if
        it is not part of the current inference schedule.

        Assumes `timesteps` holds a single scalar timestep (uses `.item()`)."""
        # Make sure sigmas and timesteps have the same device and dtype as original_samples
        sigmas = self.get_all_sigmas()
        sigmas = sigmas.to(device=original_samples.device, dtype=original_samples.dtype)
        if original_samples.device.type == "mps" and torch.is_floating_point(timesteps):
            # mps does not support float64
            timesteps = timesteps.to(original_samples.device, dtype=torch.float32)
        else:
            timesteps = timesteps.to(original_samples.device)

        # `sigmas` is reversed, so index from the end of the training schedule.
        # Was hard-coded to 1000; use the configured value (1000 by default).
        step_indices = self.config.num_train_timesteps - int(timesteps.item())

        sigma = sigmas[step_indices].flatten()
        while len(sigma.shape) < len(original_samples.shape):
            sigma = sigma.unsqueeze(-1)

        noisy_samples = original_samples + noise * sigma
        return noisy_samples