from typing import Optional, Tuple

import torch
import torch.nn.functional as F
from einops import rearrange

from .attn_mask import RadialAttention, MaskMap


def fill_radial_cache(radial_cache, nb_layers, lat_t, lat_h, lat_w):
    # Invalidate any previously cached mask so it is rebuilt for the new latent
    # dimensions, then create one sparse-attention processor per transformer layer.
    MaskMap._log_mask = None
    for i in range(nb_layers):
        radial_cache[i] = WanSparseAttnProcessor2_0(i, lat_t, lat_h, lat_w)


class WanSparseAttnProcessor2_0:
    # Class-level defaults shared by every layer's processor.
    mask_map = None
    dense_timestep = 0        # denoising steps earlier than this use dense attention
    dense_block = 0           # transformer blocks earlier than this use dense attention
    decay_factor = 1.0
    sparse_type = "radial"    # "radial" or "dense"
    use_sage_attention = True

    def __init__(self, layer_idx, lat_t, lat_h, lat_w):
        self.layer_idx = layer_idx
        # Number of video tokens after the 2x2 spatial patch embedding (hence // 4).
        self.mask_map = MaskMap(video_token_num=lat_t * lat_h * lat_w // 4, num_frame=lat_t)

    def __call__(
        self,
        qkv_list,
        timestep_no=0,
    ) -> torch.Tensor:
        query, key, value = qkv_list

        batch_size = query.shape[0]

        # RadialAttention expects tokens flattened across the batch: (b * s, heads, head_dim).
        query = rearrange(query, "b s h d -> (b s) h d")
        key = rearrange(key, "b s h d -> (b s) h d")
        value = rearrange(value, "b s h d -> (b s) h d")

        # Early denoising steps and early transformer blocks fall back to dense attention;
        # everything else uses the sparse radial mask.
        if timestep_no < self.dense_timestep or self.layer_idx < self.dense_block or self.sparse_type == "dense":
            hidden_states = RadialAttention(
                query=query, key=key, value=value, mask_map=self.mask_map, sparsity_type="dense", block_size=128, decay_factor=self.decay_factor, model_type="wan", pre_defined_mask=None, use_sage_attention=self.use_sage_attention
            )
        else:
            hidden_states = RadialAttention(
                query=query, key=key, value=value, mask_map=self.mask_map, sparsity_type="radial", block_size=128, decay_factor=self.decay_factor, model_type="wan", pre_defined_mask=None, use_sage_attention=self.use_sage_attention
            )

        # Restore the (batch, seq, heads, head_dim) layout.
        hidden_states = rearrange(hidden_states, "(b s) h d -> b s h d", b=batch_size)

        return hidden_states
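
# Usage sketch (hypothetical values, not part of this module): `radial_cache` is assumed
# to be a per-layer dict consumed by the model's attention layers, and the latent sizes
# come from the video latents fed to the transformer.
#
#   radial_cache = {}
#   fill_radial_cache(radial_cache, nb_layers=40, lat_t=21, lat_h=60, lat_w=104)
#   out = radial_cache[0]([query, key, value], timestep_no=0)  # q/k/v: (b, s, heads, head_dim)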