import logging
from typing import List, Optional

import torch
from PIL import Image
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration


logger = logging.getLogger(__name__)

class Qwen2VLForEmbedding(Qwen2VLForConditionalGeneration):
    """Qwen2-VL backbone repurposed as a multimodal embedding model: the hidden state
    of the final (left-padded) position is L2-normalized and returned as the embedding."""

    def __init__(self, config):
        super().__init__(config)

    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
        pixel_values: Optional[torch.Tensor] = None,
        pixel_values_videos: Optional[torch.FloatTensor] = None,
        image_grid_thw: Optional[torch.LongTensor] = None,
        video_grid_thw: Optional[torch.LongTensor] = None,
        rope_deltas: Optional[torch.LongTensor] = None,
    ): 
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if inputs_embeds is None:
            inputs_embeds = self.model.embed_tokens(input_ids)
            if pixel_values is not None:
                pixel_values = pixel_values.type(self.visual.get_dtype())
                image_embeds = self.visual(pixel_values, grid_thw=image_grid_thw)
                image_mask = (input_ids == self.config.image_token_id).unsqueeze(-1).expand_as(inputs_embeds)
                image_embeds = image_embeds.to(inputs_embeds.device, inputs_embeds.dtype)
                inputs_embeds = inputs_embeds.masked_scatter(image_mask, image_embeds)

            if pixel_values_videos is not None:
                pixel_values_videos = pixel_values_videos.type(self.visual.get_dtype())
                video_embeds = self.visual(pixel_values_videos, grid_thw=video_grid_thw)
                video_mask = (input_ids == self.config.video_token_id).unsqueeze(-1).expand_as(inputs_embeds)
                video_embeds = video_embeds.to(inputs_embeds.device, inputs_embeds.dtype)
                inputs_embeds = inputs_embeds.masked_scatter(video_mask, video_embeds)

            if attention_mask is not None:
                attention_mask = attention_mask.to(inputs_embeds.device)

        outputs = self.model(
            input_ids=None,
            position_ids=position_ids,
            attention_mask=attention_mask,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        hidden_states = outputs[0]
        # Last-token pooling: with left padding the final position is always a real token,
        # so its hidden state summarizes the full multimodal sequence.
        embeddings = hidden_states[:, -1, :]
        embeddings = torch.nn.functional.normalize(embeddings, dim=-1)
        return embeddings

    def set_processor(self, model_name_or_path, max_len=3072, eos_token_id=151643, min_image_token=64, max_image_token=2500):
        self.max_len = max_len
        self.eos_token_id = eos_token_id
        # Each visual token covers a 28x28-pixel patch, so the pixel budget is
        # expressed as (number of image tokens) * 28 * 28.
        self.processor = AutoProcessor.from_pretrained(
            model_name_or_path,
            min_pixels=min_image_token * 28 * 28,
            max_pixels=max_image_token * 28 * 28
        )
        # Last-token pooling in forward() relies on left padding.
        assert self.processor.tokenizer.padding_side == 'left'
    
    def prepare_text_input(self, image=None, text=None, q_or_c=None, task_instruction=None):
        assert q_or_c in ["query", "candidate", "q", "c"]
        
        prompt_template = "<|im_start|>system\n{}<|im_end|>\n<|im_start|>user\n{}<|im_end|>\n<|im_start|>assistant\n<|endoftext|>"
        
        if "q" in q_or_c:
            if task_instruction is None:
                system_prompt = "You are a helpful assistant."
                task_instruction_example_csr = "Represent the given image with the given query."
                print(f"""Warning: For optimal performance, UniSE-MLLM requires the task instruction to be specified in the query. For example, for the Composed Screenshot Retrieval task, you might use a specific instruction like: {task_instruction_example_csr}.""")
            else:
                system_prompt = task_instruction

            if image is None:
                user_prompt = text
            else:
                if text is not None:
                    user_prompt = f"Query:{text}<|vision_start|><|image_pad|><|vision_end|>"
                else:
                    user_prompt = "<|vision_start|><|image_pad|><|vision_end|>"
            text_input = prompt_template.format(system_prompt, user_prompt)
        else:
            # Candidate side: an image candidate overrides a text candidate if both are given.
            if text is not None:
                system_prompt = "Represent the given text."
                user_prompt = f"{text}"
            if image is not None:
                system_prompt = "Represent the given text-rich image, focusing on extracting and interpreting both its rich text content and visual features."
                user_prompt = "<|vision_start|><|image_pad|><|vision_end|>"
            text_input = prompt_template.format(system_prompt, user_prompt)
        return text_input

    def data_process(self, images=None, text=None, q_or_c=None, task_instruction=None):
        if images is not None:
            _is_list = isinstance(images, list)
        elif text is not None:
            _is_list = isinstance(text, list)
        else:
            raise ValueError("images and text cannot be both None.")
        
        assert q_or_c in ["query", "candidate", "q", "c"]

        if not _is_list:
            text_input = self.prepare_text_input(images, text, q_or_c, task_instruction)
            text_input = [text_input]

            if images is not None:
                images = Image.open(images).convert("RGB")
                images = [images]
                inputs = self.processor(images=images, text=text_input, return_tensors="pt", padding=True, truncation=True, max_length=self.max_len)
            else:
                inputs = self.processor(text=text_input, return_tensors="pt", padding=True, truncation=True, max_length=self.max_len)
            # If truncation hit max_len, force the last position to be EOS so that
            # last-token pooling still reads an EOS representation.
            if inputs.input_ids.size(-1) == self.max_len:
                inputs.input_ids[:, -1] = self.eos_token_id
            assert (inputs.input_ids[:, -1] == self.eos_token_id).all()
            assert (inputs.attention_mask[:, -1] == 1).all()

        else:
            if images is None:
                # Text-only batch (no images provided).
                text_input = [self.prepare_text_input(None, _text, q_or_c, task_instruction) for _text in text]
                inputs = self.processor(text=text_input, return_tensors="pt", padding=True, truncation=True, max_length=self.max_len)
            else:
                if text is None:
                    text = [None] * len(images)
                text_input = [self.prepare_text_input(_image, _text, q_or_c, task_instruction) for _image, _text in zip(images, text)]
                images = [Image.open(_image).convert("RGB") for _image in images]
                inputs = self.processor(images=images, text=text_input, return_tensors="pt", padding=True, truncation=True, max_length=self.max_len)
            if inputs.input_ids.size(-1) == self.max_len:
                inputs.input_ids[:, -1] = self.eos_token_id
            assert (inputs.input_ids[:, -1] == self.eos_token_id).all()
            assert (inputs.attention_mask[:, -1] == 1).all()

        inputs = inputs.to(self.device)

        return inputs
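

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): load the embedding model, attach the
    # processor, embed a text query and a screenshot candidate, and score them by
    # cosine similarity. The checkpoint name and image path below are placeholders,
    # not values shipped with this module.
    model = Qwen2VLForEmbedding.from_pretrained(
        "path/to/unise-mllm-checkpoint",  # hypothetical local path or hub id
        torch_dtype=torch.bfloat16,
    ).to("cuda")
    model.set_processor("path/to/unise-mllm-checkpoint")
    model.eval()

    with torch.no_grad():
        query_inputs = model.data_process(
            text="How do I change the notification settings?",
            q_or_c="query",
            task_instruction="Represent the given query for retrieving relevant screenshots.",
        )
        candidate_inputs = model.data_process(
            images=["example_screenshot.png"],  # placeholder image path
            q_or_c="candidate",
        )
        query_emb = model(**query_inputs)          # (1, hidden_size), L2-normalized
        candidate_emb = model(**candidate_inputs)  # (1, hidden_size), L2-normalized
        # Because both embeddings are unit-normalized, the dot product equals cosine similarity.
        score = (query_emb @ candidate_emb.T).item()
        print(f"similarity: {score:.4f}")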