Unique3D / custum_3d_diffusion /custum_modules /attention_processors.py
Wuvin's picture
init
37aeb5b
from typing import Any, Dict, Optional
import torch
from diffusers.models.attention_processor import Attention
def construct_pix2pix_attention(hidden_states_dim, norm_type="none"):
if norm_type == "layernorm":
norm = torch.nn.LayerNorm(hidden_states_dim)
else:
norm = torch.nn.Identity()
attention = Attention(
query_dim=hidden_states_dim,
heads=8,
dim_head=hidden_states_dim // 8,
bias=True,
)
# NOTE: xformers 0.22 does not support batchsize >= 4096
attention.xformers_not_supported = True # hacky solution
return norm, attention
class ExtraAttnProc(torch.nn.Module):
def __init__(
self,
chained_proc,
enabled=False,
name=None,
mode='extract',
with_proj_in=False,
proj_in_dim=768,
target_dim=None,
pixel_wise_crosspond=False,
norm_type="none", # none or layernorm
crosspond_effect_on="all", # all or first
crosspond_chain_pos="parralle", # before or parralle or after
simple_3d=False,
views=4,
) -> None:
super().__init__()
self.enabled = enabled
self.chained_proc = chained_proc
self.name = name
self.mode = mode
self.with_proj_in=with_proj_in
self.proj_in_dim = proj_in_dim
self.target_dim = target_dim or proj_in_dim
self.hidden_states_dim = self.target_dim
self.pixel_wise_crosspond = pixel_wise_crosspond
self.crosspond_effect_on = crosspond_effect_on
self.crosspond_chain_pos = crosspond_chain_pos
self.views = views
self.simple_3d = simple_3d
if self.with_proj_in and self.enabled:
self.in_linear = torch.nn.Linear(self.proj_in_dim, self.target_dim, bias=False)
if self.target_dim == self.proj_in_dim:
self.in_linear.weight.data = torch.eye(proj_in_dim)
else:
self.in_linear = None
if self.pixel_wise_crosspond and self.enabled:
self.crosspond_norm, self.crosspond_attention = construct_pix2pix_attention(self.hidden_states_dim, norm_type=norm_type)
def do_crosspond_attention(self, hidden_states: torch.FloatTensor, other_states: torch.FloatTensor):
hidden_states = self.crosspond_norm(hidden_states)
batch, L, D = hidden_states.shape
assert hidden_states.shape == other_states.shape, f"got {hidden_states.shape} and {other_states.shape}"
# to -> batch * L, 1, D
hidden_states = hidden_states.reshape(batch * L, 1, D)
other_states = other_states.reshape(batch * L, 1, D)
hidden_states_catted = other_states
hidden_states = self.crosspond_attention(
hidden_states,
encoder_hidden_states=hidden_states_catted,
)
return hidden_states.reshape(batch, L, D)
def __call__(
self, attn: Attention, hidden_states, encoder_hidden_states=None, attention_mask=None,
ref_dict: dict = None, mode=None, **kwargs
) -> Any:
if not self.enabled:
return self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
if encoder_hidden_states is None:
encoder_hidden_states = hidden_states
assert ref_dict is not None
if (mode or self.mode) == 'extract':
ref_dict[self.name] = hidden_states
hidden_states1 = self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
if self.pixel_wise_crosspond and self.crosspond_chain_pos == "after":
ref_dict[self.name] = hidden_states1
return hidden_states1
elif (mode or self.mode) == 'inject':
ref_state = ref_dict.pop(self.name)
if self.with_proj_in:
ref_state = self.in_linear(ref_state)
B, L, D = ref_state.shape
if hidden_states.shape[0] == B:
modalities = 1
views = 1
else:
modalities = hidden_states.shape[0] // B // self.views
views = self.views
if self.pixel_wise_crosspond:
if self.crosspond_effect_on == "all":
ref_state = ref_state[:, None].expand(-1, modalities * views, -1, -1).reshape(-1, *ref_state.shape[-2:])
if self.crosspond_chain_pos == "before":
hidden_states = hidden_states + self.do_crosspond_attention(hidden_states, ref_state)
hidden_states1 = self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
if self.crosspond_chain_pos == "parralle":
hidden_states1 = hidden_states1 + self.do_crosspond_attention(hidden_states, ref_state)
if self.crosspond_chain_pos == "after":
hidden_states1 = hidden_states1 + self.do_crosspond_attention(hidden_states1, ref_state)
return hidden_states1
else:
assert self.crosspond_effect_on == "first"
# hidden_states [B * modalities * views, L, D]
# ref_state [B, L, D]
ref_state = ref_state[:, None].expand(-1, modalities, -1, -1).reshape(-1, ref_state.shape[-2], ref_state.shape[-1]) # [B * modalities, L, D]
def do_paritial_crosspond(hidden_states, ref_state):
first_view_hidden_states = hidden_states.view(-1, views, hidden_states.shape[1], hidden_states.shape[2])[:, 0] # [B * modalities, L, D]
hidden_states2 = self.do_crosspond_attention(first_view_hidden_states, ref_state) # [B * modalities, L, D]
hidden_states2_padded = torch.zeros_like(hidden_states).reshape(-1, views, hidden_states.shape[1], hidden_states.shape[2])
hidden_states2_padded[:, 0] = hidden_states2
hidden_states2_padded = hidden_states2_padded.reshape(-1, hidden_states.shape[1], hidden_states.shape[2])
return hidden_states2_padded
if self.crosspond_chain_pos == "before":
hidden_states = hidden_states + do_paritial_crosspond(hidden_states, ref_state)
hidden_states1 = self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs) # [B * modalities * views, L, D]
if self.crosspond_chain_pos == "parralle":
hidden_states1 = hidden_states1 + do_paritial_crosspond(hidden_states, ref_state)
if self.crosspond_chain_pos == "after":
hidden_states1 = hidden_states1 + do_paritial_crosspond(hidden_states1, ref_state)
return hidden_states1
elif self.simple_3d:
B, L, C = encoder_hidden_states.shape
mv = self.views
encoder_hidden_states = encoder_hidden_states.reshape(B // mv, mv, L, C)
ref_state = ref_state[:, None]
encoder_hidden_states = torch.cat([encoder_hidden_states, ref_state], dim=1)
encoder_hidden_states = encoder_hidden_states.reshape(B // mv, 1, (mv+1) * L, C)
encoder_hidden_states = encoder_hidden_states.repeat(1, mv, 1, 1).reshape(-1, (mv+1) * L, C)
return self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
else:
ref_state = ref_state[:, None].expand(-1, modalities * views, -1, -1).reshape(-1, ref_state.shape[-2], ref_state.shape[-1])
encoder_hidden_states = torch.cat([encoder_hidden_states, ref_state], dim=1)
return self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
else:
raise NotImplementedError("mode or self.mode is required to be 'extract' or 'inject'")
def add_extra_processor(model: torch.nn.Module, enable_filter=lambda x:True, **kwargs):
return_dict = torch.nn.ModuleDict()
proj_in_dim = kwargs.get('proj_in_dim', False)
kwargs.pop('proj_in_dim', None)
def recursive_add_processors(name: str, module: torch.nn.Module):
for sub_name, child in module.named_children():
if "ref_unet" not in (sub_name + name):
recursive_add_processors(f"{name}.{sub_name}", child)
if isinstance(module, Attention):
new_processor = ExtraAttnProc(
chained_proc=module.get_processor(),
enabled=enable_filter(f"{name}.processor"),
name=f"{name}.processor",
proj_in_dim=proj_in_dim if proj_in_dim else module.cross_attention_dim,
target_dim=module.cross_attention_dim,
**kwargs
)
module.set_processor(new_processor)
return_dict[f"{name}.processor".replace(".", "__")] = new_processor
for name, module in model.named_children():
recursive_add_processors(name, module)
return return_dict
def switch_extra_processor(model, enable_filter=lambda x:True):
def recursive_add_processors(name: str, module: torch.nn.Module):
for sub_name, child in module.named_children():
recursive_add_processors(f"{name}.{sub_name}", child)
if isinstance(module, ExtraAttnProc):
module.enabled = enable_filter(name)
for name, module in model.named_children():
recursive_add_processors(name, module)
class multiviewAttnProc(torch.nn.Module):
def __init__(
self,
chained_proc,
enabled=False,
name=None,
hidden_states_dim=None,
chain_pos="parralle", # before or parralle or after
num_modalities=1,
views=4,
base_img_size=64,
) -> None:
super().__init__()
self.enabled = enabled
self.chained_proc = chained_proc
self.name = name
self.hidden_states_dim = hidden_states_dim
self.num_modalities = num_modalities
self.views = views
self.base_img_size = base_img_size
self.chain_pos = chain_pos
self.diff_joint_attn = True
def __call__(
self,
attn: Attention,
hidden_states: torch.FloatTensor,
encoder_hidden_states: Optional[torch.FloatTensor] = None,
attention_mask: Optional[torch.FloatTensor] = None,
**kwargs
) -> torch.Tensor:
if not self.enabled:
return self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
B, L, C = hidden_states.shape
mv = self.views
hidden_states = hidden_states.reshape(B // mv, mv, L, C).reshape(-1, mv * L, C)
hidden_states = self.chained_proc(attn, hidden_states, encoder_hidden_states, attention_mask, **kwargs)
return hidden_states.reshape(B // mv, mv, L, C).reshape(-1, L, C)
def add_multiview_processor(model: torch.nn.Module, enable_filter=lambda x:True, **kwargs):
return_dict = torch.nn.ModuleDict()
def recursive_add_processors(name: str, module: torch.nn.Module):
for sub_name, child in module.named_children():
if "ref_unet" not in (sub_name + name):
recursive_add_processors(f"{name}.{sub_name}", child)
if isinstance(module, Attention):
new_processor = multiviewAttnProc(
chained_proc=module.get_processor(),
enabled=enable_filter(f"{name}.processor"),
name=f"{name}.processor",
hidden_states_dim=module.inner_dim,
**kwargs
)
module.set_processor(new_processor)
return_dict[f"{name}.processor".replace(".", "__")] = new_processor
for name, module in model.named_children():
recursive_add_processors(name, module)
return return_dict
def switch_multiview_processor(model, enable_filter=lambda x:True):
def recursive_add_processors(name: str, module: torch.nn.Module):
for sub_name, child in module.named_children():
recursive_add_processors(f"{name}.{sub_name}", child)
if isinstance(module, Attention):
processor = module.get_processor()
if isinstance(processor, multiviewAttnProc):
processor.enabled = enable_filter(f"{name}.processor")
for name, module in model.named_children():
recursive_add_processors(name, module)
class NNModuleWrapper(torch.nn.Module):
def __init__(self, module):
super().__init__()
self.module = module
def forward(self, *args, **kwargs):
return self.module(*args, **kwargs)
def __getattr__(self, name: str):
try:
return super().__getattr__(name)
except AttributeError:
return getattr(self.module, name)
class AttnProcessorSwitch(torch.nn.Module):
def __init__(
self,
proc_dict: dict,
enabled_proc="default",
name=None,
switch_name="default_switch",
):
super().__init__()
self.proc_dict = torch.nn.ModuleDict({k: (v if isinstance(v, torch.nn.Module) else NNModuleWrapper(v)) for k, v in proc_dict.items()})
self.enabled_proc = enabled_proc
self.name = name
self.switch_name = switch_name
self.choose_module(enabled_proc)
def choose_module(self, enabled_proc):
self.enabled_proc = enabled_proc
assert enabled_proc in self.proc_dict.keys()
def __call__(
self,
*args,
**kwargs
) -> torch.FloatTensor:
used_proc = self.proc_dict[self.enabled_proc]
return used_proc(*args, **kwargs)
def add_switch(model: torch.nn.Module, module_filter=lambda x:True, switch_dict_fn=lambda x: {"default": x}, switch_name="default_switch", enabled_proc="default"):
return_dict = torch.nn.ModuleDict()
def recursive_add_processors(name: str, module: torch.nn.Module):
for sub_name, child in module.named_children():
if "ref_unet" not in (sub_name + name):
recursive_add_processors(f"{name}.{sub_name}", child)
if isinstance(module, Attention):
processor = module.get_processor()
if module_filter(processor):
proc_dict = switch_dict_fn(processor)
new_processor = AttnProcessorSwitch(
proc_dict=proc_dict,
enabled_proc=enabled_proc,
name=f"{name}.processor",
switch_name=switch_name,
)
module.set_processor(new_processor)
return_dict[f"{name}.processor".replace(".", "__")] = new_processor
for name, module in model.named_children():
recursive_add_processors(name, module)
return return_dict
def change_switch(model: torch.nn.Module, switch_name="default_switch", enabled_proc="default"):
def recursive_change_processors(name: str, module: torch.nn.Module):
for sub_name, child in module.named_children():
recursive_change_processors(f"{name}.{sub_name}", child)
if isinstance(module, Attention):
processor = module.get_processor()
if isinstance(processor, AttnProcessorSwitch) and processor.switch_name == switch_name:
processor.choose_module(enabled_proc)
for name, module in model.named_children():
recursive_change_processors(name, module)
########## Hack: Attention fix #############
from diffusers.models.attention import Attention
def forward(
self,
hidden_states: torch.FloatTensor,
encoder_hidden_states: Optional[torch.FloatTensor] = None,
attention_mask: Optional[torch.FloatTensor] = None,
**cross_attention_kwargs,
) -> torch.Tensor:
r"""
The forward method of the `Attention` class.
Args:
hidden_states (`torch.Tensor`):
The hidden states of the query.
encoder_hidden_states (`torch.Tensor`, *optional*):
The hidden states of the encoder.
attention_mask (`torch.Tensor`, *optional*):
The attention mask to use. If `None`, no mask is applied.
**cross_attention_kwargs:
Additional keyword arguments to pass along to the cross attention.
Returns:
`torch.Tensor`: The output of the attention layer.
"""
# The `Attention` class can call different attention processors / attention functions
# here we simply pass along all tensors to the selected processor class
# For standard processors that are defined here, `**cross_attention_kwargs` is empty
return self.processor(
self,
hidden_states,
encoder_hidden_states=encoder_hidden_states,
attention_mask=attention_mask,
**cross_attention_kwargs,
)
Attention.forward = forward