File size: 6,582 Bytes
afda258 fe70bc2 3f69f5c afda258 fe70bc2 afda258 6554f07 afda258 65044cf 2f1cbf2 afda258 5d4c633 afda258 37335e4 afda258 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
from transformers import AutoTokenizer, AutoModel
import clip
import skimage.io as io
import PIL.Image
from transformers import GPT2Tokenizer, GPT2LMHeadModel, AdamW, get_linear_schedule_with_warmup
import os
import pickle
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.nn import functional as F
import pandas as pd
from tqdm import tqdm
from PIL import Image
from typing import Tuple
import numpy as np
import time
import json
import nltk
# Import-time side effect: request the NLTK 'punkt' resource whenever this
# module is loaded. NOTE(review): no use of nltk is visible in this part of
# the file -- confirm it is still needed before keeping the download here.
nltk.download('punkt')
class Adapter(nn.Module):
    """Stack of linear layers with an activation between consecutive layers.

    ``sizes`` lists the layer widths: ``sizes[0]`` is the input width and each
    following entry is the output width of one ``nn.Linear``. An instance of
    ``act`` follows every linear layer except the last one.
    """

    def __init__(self, sizes: Tuple[int, ...], bias=True, act=nn.Tanh):
        """Build the layer stack.

        Args:
            sizes: Layer widths, input first.
            bias: Forwarded to every ``nn.Linear``.
            act: Zero-argument callable returning an activation module.
        """
        super().__init__()
        # One (in, out) pair per linear layer.
        widths = list(zip(sizes, sizes[1:]))
        stack = []
        for position, (fan_in, fan_out) in enumerate(widths, start=1):
            stack.append(nn.Linear(fan_in, fan_out, bias=bias))
            # No activation after the final projection.
            if position != len(widths):
                stack.append(act())
        self.model = nn.Sequential(*stack)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the result."""
        return self.model(x)
class ClipGPT2Model(nn.Module):
    """GPT-2 language model fed with a projected image-feature prefix.

    An ``Adapter`` maps one image feature vector to ``img_feature_length``
    vectors of GPT-2 embedding width; these are placed in front of the text
    token embeddings before GPT-2 is called.
    """

    def __init__(self, img_feature_length, img_feature_size=512):
        """Load pretrained ``'gpt2'`` and build the projection network.

        Args:
            img_feature_length: Number of prefix positions the image feature
                is expanded into.
            img_feature_size: Width of the incoming image feature vector.
        """
        super().__init__()
        self.img_feature_length = img_feature_length
        self.gpt = GPT2LMHeadModel.from_pretrained('gpt2')
        # Embedding width is read from the token-embedding matrix.
        self.gpt_embedding_size = self.gpt.transformer.wte.weight.shape[1]
        prefix_width = self.gpt_embedding_size * img_feature_length
        self.clip_project = Adapter(
            (img_feature_size, prefix_width // 2, prefix_width)
        )

    def get_dummy_token(self,
                        batch_size: int,
                        device: torch.device) -> torch.Tensor:
        """Return a ``(batch_size, img_feature_length)`` int64 tensor of zeros."""
        shape = (batch_size, self.img_feature_length)
        return torch.zeros(*shape, dtype=torch.int64, device=device)

    def forward(self,
                tokens: torch.Tensor,
                feature: torch.Tensor,
                mask=None,
                labels=None):
        """Run GPT-2 on the image prefix followed by the token embeddings.

        Args:
            tokens: Token ids to embed; assumed to be ``(batch, seq)`` --
                TODO confirm against callers.
            feature: Image features passed to the projection network.
            mask: Forwarded to GPT-2 as ``attention_mask``.
            labels: Only compared against ``None``. When it is not ``None``
                the labels handed to GPT-2 are rebuilt as zero ids for the
                prefix positions followed by ``tokens``; the passed value
                itself is not used.

        Returns:
            Whatever ``self.gpt`` returns for the concatenated embeddings.
        """
        text_embeds = self.gpt.transformer.wte(tokens)
        prefix_embeds = self.clip_project(feature).view(
            -1, self.img_feature_length, self.gpt_embedding_size
        )
        joint_embeds = torch.cat((prefix_embeds, text_embeds), dim=1)
        if labels is not None:
            prefix_ids = self.get_dummy_token(tokens.shape[0], tokens.device)
            labels = torch.cat((prefix_ids, tokens), dim=1)
        return self.gpt(
            inputs_embeds=joint_embeds, labels=labels, attention_mask=mask
        )
def generate_beam(
    model,
    tokenizer,
    beam_size: int = 10,
    prompt=None,
    embed=None,
    entry_length=76,
    temperature=0.9,
    stop_token: str = ".",
):
    """Generate text with beam search, ranked by length-normalised log-prob.

    Decoding starts from ``embed`` when it is given, otherwise from the
    embeddings of the tokenized ``prompt``. Each step re-runs ``model.gpt`` on
    the full embedding sequence of every beam.

    Args:
        model: Object exposing ``gpt`` (callable with ``inputs_embeds`` and
            returning an object with ``.logits``), ``gpt.transformer.wte``,
            ``eval()`` and ``parameters()``.
        tokenizer: Object exposing ``encode(str)`` and ``decode(ids)``.
        beam_size: Number of beams kept per step.
        prompt: Text to start from; used only when ``embed`` is ``None``.
        embed: Precomputed start embeddings; assumed to have batch size 1 --
            TODO confirm against callers.
        entry_length: Maximum number of generated tokens.
        temperature: Logit divisor; non-positive values fall back to 1.0.
        stop_token: Text whose first encoded id ends a beam.

    Returns:
        Decoded beam texts, best score first. In prompt mode each text
        includes the prompt tokens followed by the generated tokens.

    Raises:
        ValueError: If neither ``prompt`` nor ``embed`` is provided.
    """
    if embed is None and prompt is None:
        raise ValueError("generate_beam requires either `prompt` or `embed`")

    model.eval()
    stop_token_index = tokenizer.encode(stop_token)[0]
    tokens = None
    scores = None
    # Number of prompt ids stored at the front of `tokens` (0 in embed mode).
    prompt_length = 0
    device = next(model.parameters()).device
    # seq_lengths counts generated tokens only, one counter per beam.
    seq_lengths = torch.ones(beam_size, device=device)
    is_stopped = torch.zeros(beam_size, device=device, dtype=torch.bool)

    with torch.no_grad():
        if embed is not None:
            generated = embed
        else:
            tokens = torch.tensor(tokenizer.encode(prompt))
            tokens = tokens.unsqueeze(0).to(device)
            prompt_length = tokens.shape[1]
            generated = model.gpt.transformer.wte(tokens)

        for _ in range(entry_length):
            outputs = model.gpt(inputs_embeds=generated)
            logits = outputs.logits
            logits = logits[:, -1, :] / (temperature if temperature > 0 else 1.0)
            logits = logits.softmax(-1).log()

            if scores is None:
                # First step: fan the single start sequence out into beams.
                scores, next_tokens = logits.topk(beam_size, -1)
                generated = generated.expand(beam_size, *generated.shape[1:])
                next_tokens, scores = next_tokens.permute(1, 0), scores.squeeze(0)
                if tokens is None:
                    tokens = next_tokens
                else:
                    tokens = tokens.expand(beam_size, *tokens.shape[1:])
                    tokens = torch.cat((tokens, next_tokens), dim=1)
            else:
                # A finished beam may only continue with id 0 at zero cost,
                # so its score and length stay frozen.
                logits[is_stopped] = float("-inf")
                logits[is_stopped, 0] = 0
                scores_sum = scores[:, None] + logits
                seq_lengths[~is_stopped] += 1
                scores_sum_average = scores_sum / seq_lengths[:, None]
                # Pick the best (beam, token) pairs from the flattened table.
                scores_sum_average, next_tokens = scores_sum_average.view(-1).topk(
                    beam_size, -1
                )
                next_tokens_source = next_tokens // scores_sum.shape[1]
                seq_lengths = seq_lengths[next_tokens_source]
                next_tokens = next_tokens % scores_sum.shape[1]
                next_tokens = next_tokens.unsqueeze(1)
                tokens = tokens[next_tokens_source]
                tokens = torch.cat((tokens, next_tokens), dim=1)
                generated = generated[next_tokens_source]
                scores = scores_sum_average * seq_lengths
                is_stopped = is_stopped[next_tokens_source]

            # squeeze(1) keeps the beam axis even when beam_size == 1.
            next_token_embed = model.gpt.transformer.wte(next_tokens.squeeze(1)).view(
                generated.shape[0], 1, -1
            )
            generated = torch.cat((generated, next_token_embed), dim=1)
            is_stopped = is_stopped | next_tokens.eq(stop_token_index).squeeze(1)
            if is_stopped.all():
                break

    scores = scores / seq_lengths
    output_list = tokens.cpu().numpy()
    # `tokens` starts with the prompt ids (if any) while seq_lengths counts
    # generated ids only, so the cut-off must be shifted by the prompt length;
    # otherwise the generated tail is truncated in prompt mode.
    output_texts = [
        tokenizer.decode(output[: prompt_length + int(length)])
        for output, length in zip(output_list, seq_lengths)
    ]
    order = scores.argsort(descending=True)
    return [output_texts[i] for i in order]
def generate_caption_clipgpt(img):
    """Caption one image with the CLIP-prefix GPT-2 model on CPU.

    Every call rebuilds ``ClipGPT2Model``, loads its weights from
    ``model_train_best_run_clipGPT.pt``, and loads CLIP ``ViT-B/32`` and the
    GPT-2 tokenizer. The printed time covers only the steps from image
    conversion to beam-search decoding, not the model loading.

    Args:
        img: Array accepted by ``PIL.Image.fromarray`` -- presumably an
            H x W x 3 uint8 image; confirm against callers.

    Returns:
        The first caption in the list returned by ``generate_beam``.
    """
    prefix_length = 10
    device = torch.device('cpu')

    model = ClipGPT2Model(prefix_length)
    weights = torch.load('model_train_best_run_clipGPT.pt', map_location=device)
    model.load_state_dict(weights)
    model = model.eval().to(device)

    clip_model, preprocess = clip.load('ViT-B/32', device, jit=False)
    tokenizer = GPT2Tokenizer.from_pretrained("gpt2")

    start_time = time.time()
    image = preprocess(PIL.Image.fromarray(img)).unsqueeze(0).to(device)
    with torch.no_grad():
        prefix = clip_model.encode_image(image).to(device, dtype=torch.float32)
        prefix_embed = model.clip_project(prefix).reshape(1, prefix_length, -1)
    candidates = generate_beam(model, tokenizer, embed=prefix_embed)
    beam_caption = candidates[0]
    elapsed = time.time() - start_time

    print("--- Time taken to generate: %s seconds ---" % elapsed)
    return beam_caption
|