from typing import List
import torch
import torch.nn as nn
from torch.utils.checkpoint import checkpoint
from model.open_clip import CLIP, tokenize

### pretrained model path
# _VITH14 = dict(
# laion2b_s32b_b79k=_pcfg(hf_hub='laion/CLIP-ViT-H-14-laion2B-s32B-b79K/'),
# )


class FrozenOpenCLIPEmbedder(nn.Module):
    """
    Uses the OpenCLIP transformer encoder for text.
    The visual tower is deleted in __init__, so only the text branch is kept.
    """
LAYERS = [
#"pooled",
"last",
"penultimate"
    ]

    def __init__(self, embed_dim, vision_cfg, text_cfg, layer="last"):
super().__init__()
assert layer in self.LAYERS
# model, _, _ = open_clip.create_model_and_transforms(arch, device=torch.device('cpu'), pretrained=version)
model = CLIP(embed_dim, dict(vision_cfg), dict(text_cfg))
del model.visual
self.model = model
self.layer = layer
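        # layer_idx counts how many final transformer blocks are skipped:
        # 0 -> use the last block's output, 1 -> stop after the penultimate block.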
if self.layer == "last":
self.layer_idx = 0
elif self.layer == "penultimate":
self.layer_idx = 1
else:
            raise NotImplementedError()

    def forward(self, tokens):
z = self.encode_with_transformer(tokens)
        return z

    def encode_with_transformer(self, text):
x = self.model.token_embedding(text) # [batch_size, n_ctx, d_model]
x = x + self.model.positional_embedding
x = x.permute(1, 0, 2) # NLD -> LND
x = self.text_transformer_forward(x, attn_mask=self.model.attn_mask)
x = x.permute(1, 0, 2) # LND -> NLD
x = self.model.ln_final(x)
        return x

    def text_transformer_forward(self, x: torch.Tensor, attn_mask=None):
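        # Run the residual attention blocks, stopping layer_idx blocks before the
        # end so that "penultimate" returns the second-to-last layer's states.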
for i, r in enumerate(self.model.transformer.resblocks):
if i == len(self.model.transformer.resblocks) - self.layer_idx:
break
if self.model.transformer.grad_checkpointing and not torch.jit.is_scripting():
x = checkpoint(r, x, attn_mask)
else:
x = r(x, attn_mask=attn_mask)
        return x

    def encode(self, text: List[str]) -> torch.Tensor:
# convert a batch of text to tensor
tokens = tokenize(text)
# move tensor to model device
tokens = tokens.to(next(self.model.parameters()).device)
return self(tokens)
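

# --- Usage sketch (illustrative, not part of the upstream module) ------------
# A minimal example of how this embedder might be driven. The config keys and
# the ViT-H/14-style sizes below are placeholder assumptions; they must match
# whatever this repo's model.open_clip.CLIP actually expects.
#
#   text_cfg = dict(context_length=77, vocab_size=49408,
#                   width=1024, heads=16, layers=24)
#   vision_cfg = dict(image_size=224, layers=32, width=1280, patch_size=14)
#   embedder = FrozenOpenCLIPEmbedder(embed_dim=1024,
#                                     vision_cfg=vision_cfg,
#                                     text_cfg=text_cfg,
#                                     layer="penultimate")
#   embedder.eval().requires_grad_(False)  # keep the text encoder frozen
#   z = embedder.encode(["a photo of a cat"])  # -> [1, 77, 1024] hidden states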