lixin4ever's picture
first commit (#1)
e52682b verified
raw
history blame
9.67 kB
# Copyright 2024 Alibaba DAMO Academy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import einops
import torch
import torch.nn as nn
import torch.nn.functional as F
from timm.models.regnet import RegStage
from timm.models.layers import LayerNorm, LayerNorm2d
from transformers import TRANSFORMERS_CACHE
def parse_snapshot_folder(repo_id, cache_dir=None, repo_type="model", revision="main"):
    """Build the path of a repo's snapshot folder inside the local hub cache.

    Nothing is downloaded and the returned folder is not checked for
    existence; the function only assembles the path.

    Args:
        repo_id: repo identifier; every "/" is replaced by "--" to form the
            cache folder name.
        cache_dir: cache root. Falls back to ``TRANSFORMERS_CACHE`` when None.
        repo_type: singular repo kind; pluralised with an "s" to form the
            cache folder prefix (e.g. "model" -> "models--...").
        revision: ref name or commit hash. If a file
            ``<repo_cache>/refs/<revision>`` exists, its content replaces
            ``revision`` (e.g. "main" -> commit sha).

    Returns:
        ``<cache_dir>/<repo_type>s--<object_id>/snapshots/<revision>``.
    """
    # 1. parse the downloaded cache folder
    if cache_dir is None:
        cache_dir = TRANSFORMERS_CACHE
    object_id = repo_id.replace("/", "--")
    repo_cache = os.path.join(cache_dir, f"{repo_type}s--{object_id}")

    # 2. resolve refs (for instance to convert main to the associated commit sha)
    revision_file = os.path.join(repo_cache, "refs", revision)
    if os.path.isfile(revision_file):
        with open(revision_file) as f:
            # Strip surrounding whitespace: a trailing newline in the ref file
            # would otherwise become part of the folder name.
            revision = f.read().strip()

    # 3. acquire the snapshot folder
    return os.path.join(repo_cache, "snapshots", revision)
def load_mm_projector(model_path, cache_dir=None, token=None):
    """Load ``mm_projector.bin`` from a local folder or a hub repo as float16.

    Args:
        model_path: a local directory containing ``mm_projector.bin``, or
            otherwise a hub repo id.
        cache_dir: cache root forwarded to ``parse_snapshot_folder`` and
            ``snapshot_download``.
        token: access token forwarded to ``snapshot_download``.

    Returns:
        dict mapping each key of the loaded file to its value cast to
        ``torch.float16`` (loaded onto CPU).
    """
    weights_name = 'mm_projector.bin'

    if os.path.exists(os.path.join(model_path, weights_name)):
        folder = model_path
    else:
        folder = parse_snapshot_folder(model_path, cache_dir=cache_dir, repo_type="model")
        if not os.path.exists(os.path.join(folder, weights_name)):
            # downloading from remote repo
            from huggingface_hub import snapshot_download
            # Use the folder reported by the download itself: the path computed
            # above predates the download, so on a cold cache (no refs file
            # yet) it points at "snapshots/main" instead of the commit folder.
            folder = snapshot_download(repo_id=model_path, cache_dir=cache_dir, token=token)

    # NOTE(security): torch.load unpickles the file. Only load projector
    # weights from repos you trust, or pass weights_only=True where the
    # installed torch version supports it.
    mm_projector_weights = torch.load(os.path.join(folder, weights_name), map_location='cpu')
    return {k: v.to(torch.float16) for k, v in mm_projector_weights.items()}
class IdentityMap(nn.Module):
    """Pass-through projector: returns its input unchanged."""

    def __init__(self):
        super().__init__()

    def forward(self, x, *args, **kwargs):
        """Return ``x`` itself; any extra arguments are accepted and ignored."""
        return x

    @property
    def config(self):
        """Dict describing this projector's type."""
        return dict(mm_projector_type='identity')
class SimpleResBlock(nn.Module):
    """LayerNorm followed by a Linear-GELU-Linear branch added back to the normalised input."""

    def __init__(self, channels):
        """Create the norm and the two-layer projection, all of width ``channels``."""
        super().__init__()
        self.pre_norm = nn.LayerNorm(channels)
        fc_in = nn.Linear(channels, channels)
        activation = nn.GELU()
        fc_out = nn.Linear(channels, channels)
        self.proj = nn.Sequential(fc_in, activation, fc_out)

    def forward(self, x):
        """Return ``norm(x) + proj(norm(x))``."""
        normed = self.pre_norm(x)
        # The skip connection carries the normalised tensor, not the raw input.
        branch = self.proj(normed)
        return normed + branch
def build_vision_projector(config, delay_load=False, **kwargs):
    """Instantiate the vision projector named by ``config.mm_projector_type``.

    Recognised types: ``mlp<N>x_gelu``, ``linear`` (also the default when the
    attribute is missing), ``identity``, ``stc_connector``, ``stp_connector``,
    ``stc_connector_v35``, ``spatial_conv`` and ``spatial_pool``.

    Args:
        config: object providing ``mm_hidden_size`` and ``hidden_size`` (read
            directly for the linear/MLP types; connectors receive the whole
            object).
        delay_load: accepted but not used here.
        **kwargs: accepted but not used here.

    Returns:
        The projector module.

    Raises:
        ValueError: if the projector type is not recognised.
    """
    projector_type = getattr(config, 'mm_projector_type', 'linear')

    mlp_spec = re.match(r'^mlp(\d+)x_gelu$', projector_type)
    if mlp_spec is not None:
        depth = int(mlp_spec.group(1))
        layers = [nn.Linear(config.mm_hidden_size, config.hidden_size)]
        for _ in range(depth - 1):
            layers += [nn.GELU(), nn.Linear(config.hidden_size, config.hidden_size)]
        return nn.Sequential(*layers)

    if projector_type == "linear":
        # NOTE: for both linear and mlp2x_gelu projector type, mean pooling is adopted to aggregate video features
        return nn.Linear(config.mm_hidden_size, config.hidden_size)

    if projector_type == 'identity':
        return IdentityMap()

    # Connector classes are looked up lazily so that only the selected one is touched.
    connector_factories = {
        "stc_connector": lambda: STCConnector(config),
        "stp_connector": lambda: STPConnector(config),
        "stc_connector_v35": lambda: STCConnectorV35(config),
        "spatial_conv": lambda: SpatialConv(config),
        "spatial_pool": lambda: SpatialPool(config),
    }
    factory = connector_factories.get(projector_type)
    if factory is None:
        raise ValueError(f'Unknown projector type: {projector_type}')
    return factory()
def build_audio_projector(config, delay_load=False, **kwargs):
    """Instantiate the audio projector named by ``config.mm_projector_a_type``.

    Recognised types: ``mlp<N>x_gelu``, ``linear`` (also the default when the
    attribute is missing) and ``identity``.

    Args:
        config: object providing ``mm_hidden_size_a`` (input width) and
            ``hidden_size_a`` (output width).
        delay_load: accepted but not used here.
        **kwargs: accepted but not used here.

    Returns:
        The projector module.

    Raises:
        ValueError: if the projector type is not recognised.
    """
    projector_type = getattr(config, 'mm_projector_a_type', 'linear')

    mlp_gelu_match = re.match(r'^mlp(\d+)x_gelu$', projector_type)
    if mlp_gelu_match:
        mlp_depth = int(mlp_gelu_match.group(1))
        modules = [nn.Linear(config.mm_hidden_size_a, config.hidden_size_a)]
        for _ in range(1, mlp_depth):
            modules.append(nn.GELU())
            modules.append(nn.Linear(config.hidden_size_a, config.hidden_size_a))
        return nn.Sequential(*modules)

    if projector_type == "linear":
        return nn.Linear(config.mm_hidden_size_a, config.hidden_size_a)

    if projector_type == 'identity':
        return IdentityMap()

    # Fail loudly, as the vision builder does, instead of silently returning
    # None and letting the caller crash later on a missing module.
    raise ValueError(f'Unknown projector type: {projector_type}')
def build_mlp(depth, hidden_size, output_hidden_size):
    """Stack ``depth`` Linear layers with a GELU between consecutive ones.

    The first layer maps ``hidden_size`` to ``output_hidden_size``; every
    further layer keeps ``output_hidden_size``. A ``depth`` of 1 or less
    yields just the first Linear layer.
    """
    layers = [nn.Linear(hidden_size, output_hidden_size)]
    for _ in range(depth - 1):
        layers.extend((nn.GELU(), nn.Linear(output_hidden_size, output_hidden_size)))
    return nn.Sequential(*layers)
class STCConnector(nn.Module):
    """Temporal Convolutional Vision-Language Connector.

    Pipeline: per-frame stage ``s1`` -> 3D strided-convolution ``sampler`` ->
    per-frame stage ``s2`` -> MLP ``readout``.
    """

    def __init__(self, config, downsample=(2, 2, 2), depth=4, mlp_depth=2):
        """Build the connector.

        Args:
            config: config object providing ``mm_hidden_size`` (encoder width)
                and ``hidden_size`` (connector and output width).
            downsample: (temporal, height, width) downsample rate, used as both
                kernel size and stride of the sampler convolution.
            depth: depth of the spatial interaction blocks; 0 replaces both
                stages with identity layers.
            mlp_depth: depth of the vision-language projector layers.
        """
        super().__init__()
        self.encoder_hidden_size = encoder_hidden_size = config.mm_hidden_size
        self.hidden_size = hidden_size = config.hidden_size
        self.output_hidden_size = output_hidden_size = config.hidden_size
        # TODO: make these as config arguments
        self.depth = depth
        self.mlp_depth = mlp_depth
        self.downsample = downsample

        if depth != 0:
            self.s1 = RegStage(
                depth=depth,
                in_chs=encoder_hidden_size,
                out_chs=hidden_size,
                stride=1,
                dilation=1,
                act_layer=nn.SiLU,
                norm_layer=LayerNorm2d,
            )
            sampler_in_chs = hidden_size
        else:
            self.s1 = nn.Identity()
            # With no first stage the features still carry the encoder width,
            # so the sampler itself has to perform the channel projection.
            sampler_in_chs = encoder_hidden_size

        self.sampler = nn.Sequential(
            nn.Conv3d(
                in_channels=sampler_in_chs,
                out_channels=hidden_size,
                kernel_size=downsample,
                stride=downsample,
                padding=1,
                bias=True
            ),
            nn.SiLU()
        )

        if depth != 0:
            self.s2 = RegStage(
                depth=depth,
                in_chs=hidden_size,
                out_chs=hidden_size,
                stride=1,
                dilation=1,
                act_layer=nn.SiLU,
                norm_layer=LayerNorm2d,
            )
        else:
            self.s2 = nn.Identity()

        self.readout = build_mlp(mlp_depth, hidden_size, output_hidden_size)

    def forward(self, x):
        """Aggregate tokens on the temporal and spatial dimensions.

        Args:
            x: input tokens [b, t, h, w, d] / [b, t, l, d]; in the 4-D form
                ``l`` is split into a square ``h x w`` grid.

        Returns:
            aggregated tokens [b, l, d]

        Raises:
            ValueError: if ``x`` is neither 4- nor 5-dimensional.
        """
        t = x.size(1)
        if x.ndim == 4:
            # Token axis is assumed to be a square grid; einops rejects the
            # rearrange if l is not a perfect square.
            hw = int(x.size(2) ** 0.5)
            x = einops.rearrange(x, "b t (h w) d -> b d t h w", h=hw, w=hw)
        elif x.ndim == 5:
            x = einops.rearrange(x, "b t h w d -> b d t h w")
        else:
            raise ValueError(
                f"Expected input of shape [b, t, l, d] or [b, t, h, w, d], got {x.ndim} dimensions"
            )

        # Fold time into the batch so the 2D stage processes each frame independently.
        x = einops.rearrange(x, "b d t h w -> (b t) d h w")
        # 1. the first stage of the adapter
        x = self.s1(x)
        x = einops.rearrange(x, "(b t) d h w -> b d t h w", t=t)
        # 2. downsampler
        x = self.sampler(x)
        # The sampler may change the number of frames; re-read it for the unfold below.
        new_t = x.size(2)
        # 3. the second stage of the adapter
        x = einops.rearrange(x, "b d t h w -> (b t) d h w")
        x = self.s2(x)
        x = einops.rearrange(x, "(b t) d h w -> b (t h w) d", t=new_t)
        x = self.readout(x)
        return x
class STPConnector(STCConnector):
    """STCConnector variant whose downsampler is 3D average pooling followed by SiLU."""

    def __init__(self, config, downsample=(2, 2, 2), depth=4, mlp_depth=2):
        """Build the parent connector, then swap its sampler for ``AvgPool3d(downsample)``.

        Args:
            config: config object, forwarded to ``STCConnector``.
            downsample: (temporal, height, width) pooling window.
            depth: depth of the spatial interaction blocks.
            mlp_depth: depth of the vision-language projector layers.
        """
        super().__init__(config=config, downsample=downsample, depth=depth, mlp_depth=mlp_depth)
        self.sampler = nn.Sequential(nn.AvgPool3d(downsample), nn.SiLU())
        if depth == 0 and self.encoder_hidden_size != self.hidden_size:
            # Pooling keeps the channel count, and with depth == 0 both stages
            # are identities, so features reach the readout at the encoder
            # width. Rebuild the readout so its first layer accepts that width.
            self.readout = build_mlp(mlp_depth, self.encoder_hidden_size, self.output_hidden_size)
class STCConnectorV35(STCConnector):
    """STCConnector variant whose sampler convolution uses no padding."""

    def __init__(self, config, downsample=(2, 2, 2), depth=4, mlp_depth=2):
        """Build the parent connector, then replace its sampler with a ``padding=0`` Conv3d.

        Args:
            config: config object, forwarded to ``STCConnector``.
            downsample: (temporal, height, width) kernel size and stride.
            depth: depth of the spatial interaction blocks.
            mlp_depth: depth of the vision-language projector layers.
        """
        super().__init__(config=config, downsample=downsample, depth=depth, mlp_depth=mlp_depth)
        # With depth == 0 the first stage is an identity, so the sampler sees
        # encoder-width features and must take that many input channels.
        sampler_in_chs = self.encoder_hidden_size if depth == 0 else self.hidden_size
        self.sampler = nn.Sequential(
            nn.Conv3d(
                in_channels=sampler_in_chs,
                out_channels=self.hidden_size,
                kernel_size=downsample,
                stride=downsample,
                padding=0,
                bias=True
            ),
            nn.SiLU()
        )
class SpatialConv(STCConnector):
    """STCConnector preset: downsample rate (1, 2, 2) and no interaction blocks (depth 0)."""

    def __init__(self, config, downsample=(1, 2, 2), depth=0, mlp_depth=2):
        """Forward all arguments unchanged to ``STCConnector``."""
        super().__init__(config, downsample, depth, mlp_depth)
class SpatialPool(STPConnector):
    """STPConnector preset: downsample rate (1, 2, 2) and no interaction blocks (depth 0)."""

    def __init__(self, config, downsample=(1, 2, 2), depth=0, mlp_depth=2):
        """Forward all arguments unchanged to ``STPConnector``."""
        super().__init__(config, downsample, depth, mlp_depth)