| """Misc functions and modules for Cosmos-Embed1.""" |
|
|
| import functools |
| from logging import getLogger |
| from typing import Callable, Optional, Protocol |
|
|
| import torch |
| import torch.distributed as dist |
| import torch.nn as nn |
|
|
| logger = getLogger(__file__) |
|


def get_rank(group: Optional[dist.ProcessGroup] = None) -> int:
    """Get the rank of the worker within the process group (commonly one process per GPU).

    Args:
        group: optional process group to query; defaults to the global group.

    Returns:
        rank (int): The rank of the worker, or 0 if torch.distributed is
            unavailable or uninitialized.
    """
    rank = 0
    if dist.is_available() and dist.is_initialized():
        rank = dist.get_rank(group)
    return rank


def barrier() -> None:
    """Synchronize all workers; a no-op if torch.distributed is not initialized."""
    if dist.is_available() and dist.is_initialized():
        dist.barrier()


def rank0_first(func: Callable) -> Callable:
    """Run the function on rank 0 first, then on all other ranks."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Rank 0 executes first while the other ranks wait at the barrier.
        if get_rank() == 0:
            result = func(*args, **kwargs)
        barrier()
        # Remaining ranks execute only after rank 0 has finished.
        if get_rank() != 0:
            result = func(*args, **kwargs)
        return result

    return wrapper
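

# Illustrative sketch of how `rank0_first` might be used; `fetch_weights` and
# the cache path below are hypothetical, not part of this module. Rank 0
# populates a shared cache first, then the other ranks run the same function.
#
#     @rank0_first
#     def fetch_weights(cache_dir: str) -> str:
#         ...  # download into `cache_dir` if missing, return the local path
#
#     path = fetch_weights("/shared/cache")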


def add_docstring(docstring: str):
    """Decorator that attaches `docstring` to the decorated function."""

    def decorator(func):
        func.__doc__ = docstring
        return func

    return decorator


INIT_DOCSTRING = """
Constructor for encoding module.

Args:
    embed_dim: size of embedding vectors, e.g. x.shape[3].
    max_len: maximum length of the temporal sequence, e.g. x.shape[1].
"""


FORWARD_DOCSTRING = """
Forward function.

Args:
    x (`torch.Tensor`): rank 4 tensor to add spatio-temporal encodings to.

Returns:
    `torch.Tensor` of rank 4.
"""


class EncodingProtocol(Protocol):
    """Structural interface implemented by the spatio-temporal encoding modules below."""

    def __init__(self, embed_dim: int, max_len: int) -> None: ...

    def forward(self, x: torch.Tensor) -> torch.Tensor: ...


def interpolate_temp_pos_embed(temp_embed: torch.Tensor, num_frames: int) -> torch.Tensor:
    """Linearly interpolates a temporal encoding from `temp_embed.shape[0]` to `num_frames`."""
    # (t, d) -> (1, d, t) so that 1D interpolation runs over the frame axis.
    temp_embed_resized = temp_embed.permute(1, 0).unsqueeze(0)
    temp_embed_resized = nn.functional.interpolate(
        temp_embed_resized,
        size=num_frames,
        mode="linear",
        align_corners=False,
    )
    # (1, d, num_frames) -> (num_frames, d).
    return temp_embed_resized.squeeze(0).permute(1, 0)
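

# Quick shape sketch (hypothetical sizes, for illustration only): resizing an
# 8-frame table to 16 frames leaves the embedding width unchanged.
#
#     temp_embed = torch.zeros(8, 256)                      # (frames, embed_dim)
#     resized = interpolate_temp_pos_embed(temp_embed, 16)  # -> (16, 256)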


class TemporalParameterEncoding(nn.Module, EncodingProtocol):
    """Learned per-frame temporal encoding, interpolated to the input frame count."""

    @add_docstring(INIT_DOCSTRING)
    def __init__(self, embed_dim: int, max_len: int) -> None:
        super().__init__()
        self.embed_dim = embed_dim
        self.max_len = max_len
        self.temp_embed = nn.Parameter(torch.zeros(self.max_len, self.embed_dim))
        nn.init.trunc_normal_(self.temp_embed, std=0.02)

    @add_docstring(FORWARD_DOCSTRING)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        _, t, _, _ = x.shape
        if t != self.temp_embed.shape[0]:
            logger.debug(f"Interpolating temporal encodings from {self.temp_embed.shape[0]} to {t}.")
            temp_embed = interpolate_temp_pos_embed(self.temp_embed, t)
        else:
            temp_embed = self.temp_embed
        # (t, d) -> (1, t, 1, d) so the encoding broadcasts over batch and tokens.
        temp_embed = temp_embed.unsqueeze(0).unsqueeze(2)
        return x + temp_embed


def create_neighbor_weight_matrix(num_tokens: int, device: torch.device, dtype: torch.dtype) -> torch.Tensor:
    """Builds a (num_tokens, num_tokens) matrix with weights[i, j] = 2 ** -|i - j|,
    i.e. each token weights its neighbors with exponentially decaying influence."""
    indices = torch.arange(num_tokens, dtype=dtype, device=device)
    abs_diff = torch.abs(indices.unsqueeze(0) - indices.unsqueeze(1))
    weights = 1.0 / (2.0**abs_diff)
    return weights
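

# For example, with num_tokens=3 (values are exact):
#
#     create_neighbor_weight_matrix(3, torch.device("cpu"), torch.float32)
#     # tensor([[1.0000, 0.5000, 0.2500],
#     #         [0.5000, 1.0000, 0.5000],
#     #         [0.2500, 0.5000, 1.0000]])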


def compute_t_adj(x: torch.Tensor, weights: torch.Tensor) -> torch.Tensor:
    """Mixes each token embedding with its neighbors: `x` has shape
    (batch, frames, tokens, dim), `weights` has shape (tokens, tokens),
    and the einsum contracts over the token axis."""
    return torch.einsum("bfnd,nk->bfkd", x, weights)


def token_propagation(x: torch.Tensor, num_tokens: int) -> torch.Tensor:
    """Apply the neighboring token propagation update.

    The `+ t_adj - t_adj.detach()` term leaves the forward value of `x`
    unchanged but routes gradients through the neighbor-weighted mixture,
    so each token's encoding also receives gradient from its neighbors.
    """
    weights = create_neighbor_weight_matrix(num_tokens, x.device, x.dtype)
    t_adj = compute_t_adj(x, weights)
    return x + t_adj - t_adj.detach()
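

# Straight-through sketch (illustrative shapes): the output equals `x`
# numerically, while the backward pass mixes gradients across token positions.
#
#     x = torch.randn(1, 4, 3, 8, requires_grad=True)  # (batch, frames, tokens, dim)
#     y = token_propagation(x, num_tokens=3)
#     torch.testing.assert_close(y, x)  # forward pass is the identity
#     y.sum().backward()                # x.grad picks up neighbor contributions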


class NeighboringTokenPropagationEncoding(TemporalParameterEncoding):
    """
    Neighboring Token Propagation method, inspired by Momentor (https://arxiv.org/abs/2402.11435).
    """

    @add_docstring(FORWARD_DOCSTRING)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        _, t, q, _ = x.shape
        if t != self.temp_embed.shape[0]:
            logger.debug(f"Interpolating temporal encodings from {self.temp_embed.shape[0]} to {t}.")
            temp_embed = interpolate_temp_pos_embed(self.temp_embed, t)
        else:
            temp_embed = self.temp_embed
        temp_embed = temp_embed.unsqueeze(0).unsqueeze(2)

        if self.training:
            # Training only: reroute gradients through the q token positions
            # (the einsum broadcasts temp_embed's singleton token axis); the
            # added values cancel in the forward pass.
            temp_embed = token_propagation(temp_embed, q)
        return x + temp_embed


class EncodingFactory(nn.Module):
    """Instantiates the requested encoding module by name."""

    def __init__(self, encoding_type: str, embed_dim: int, max_len: int) -> None:
        super().__init__()
        encodings = {
            "temporal_parameter": TemporalParameterEncoding,
            "neighboring_token_propagation": NeighboringTokenPropagationEncoding,
        }
        if encoding_type not in encodings:
            raise ValueError(f"Unknown encoding_type: {encoding_type!r}. Expected one of {sorted(encodings)}.")
        self.encoding = encodings[encoding_type](embed_dim=embed_dim, max_len=max_len)

    @add_docstring(FORWARD_DOCSTRING)
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.encoding(x)
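

# Minimal usage sketch (assumed, illustrative shapes): build an encoding via
# the factory and add it to a rank-4 batch of per-frame token features.
if __name__ == "__main__":
    encoder = EncodingFactory("neighboring_token_propagation", embed_dim=64, max_len=8)
    x = torch.randn(2, 12, 4, 64)  # (batch, frames, tokens, embed_dim)
    y = encoder(x)  # temporal table is interpolated from 8 to 12 frames
    print(y.shape)  # torch.Size([2, 12, 4, 64])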