| import re |
| import types |
| import io |
| import torch |
| import os |
| from PIL import Image |
| import argparse |
| from qwen_vl_utils import fetch_image |
|
|
| from transformers import ( |
| ProcessorMixin, |
| SiglipImageProcessor, |
| BatchFeature, |
| Qwen2VLImageProcessor, |
| PreTrainedTokenizer, |
| AutoImageProcessor, |
| CLIPImageProcessor, |
| ) |
|
|
| from .utils import ( |
| process_anyres_image, |
| preprocess_image_ovis, |
| ovis_template_process, |
| BLACK_IMG_ENV, |
| DEFAULT_IM_END_TOKEN, |
| DEFAULT_IM_START_TOKEN, |
| DEFAULT_IMAGE_TOKEN, |
| DEFAULT_VI_END_TOKEN, |
| DEFAULT_VI_START_TOKEN, |
| DEFAULT_VIDEO_TOKEN, |
| IMAGE_TOKEN_INDEX, |
| SEQ_MAX_LEN, |
| IGNORE_INDEX, |
| ) |
|
|
| siglip_processor_config = { |
| "do_normalize": True, |
| "do_rescale": True, |
| "do_resize": True, |
| "image_mean": [ |
| 0.5, |
| 0.5, |
| 0.5 |
| ], |
| "image_processor_type": "SiglipImageProcessor", |
| "image_std": [ |
| 0.5, |
| 0.5, |
| 0.5 |
| ], |
| "processor_class": "SiglipProcessor", |
| "resample": 3, |
| "rescale_factor": 0.00392156862745098, |
| "size": { |
| "height": 384, |
| "width": 384 |
| } |
| } |
|
|
| qwen2vl_processor_config = { |
| "min_pixels": 3136, |
| "max_pixels": 12845056, |
| "patch_size": 14, |
| "temporal_patch_size": 2, |
| "merge_size": 2, |
| "image_mean": [ |
| 0.48145466, |
| 0.4578275, |
| 0.40821073 |
| ], |
| "image_std": [ |
| 0.26862954, |
| 0.26130258, |
| 0.27577711 |
| ], |
| "image_processor_type": "Qwen2VLImageProcessor", |
| "processor_class": "Qwen2VLProcessor" |
| } |
|
|
| aimv2_processor_config = { |
| "crop_size": { |
| "height": 448, |
| "width": 448 |
| }, |
| "do_center_crop": True, |
| "do_convert_rgb": True, |
| "do_normalize": True, |
| "do_rescale": True, |
| "do_resize": True, |
| "image_mean": [ |
| 0.48145466, |
| 0.4578275, |
| 0.40821073 |
| ], |
| "image_processor_type": "CLIPImageProcessor", |
| "image_std": [ |
| 0.26862954, |
| 0.26130258, |
| 0.27577711 |
| ], |
| "resample": 3, |
| "rescale_factor": 0.00392156862745098, |
| "size": { |
| "shortest_edge": 448 |
| } |
| } |
|
|
|
|
class ValleyProcessor(ProcessorMixin):
    """Processor for Valley multimodal models.

    Bundles a tokenizer with three image processors (SigLIP, Qwen2-VL and
    AIMv2/CLIP, built from the hard-coded configs above) and dispatches
    preprocessing through one of three conversation formats:
    "qwen2", "qwen3" or "ovis2" (see ``process_mode``).
    """

    attributes = ["tokenizer"]
    # Optional settings ProcessorMixin will persist/restore alongside the
    # processor config.
    optional_attributes = [
        "max_pixels",
        "min_pixels",
        "anyres",
        "only_crop_single_image",
        "grid_pinpoints",
        "use_special_start_end_token",
        "only_navit",
        "chat_template",
        "process_mode",
    ]
    tokenizer_class = "AutoTokenizer"

    def __init__(self, tokenizer=None, chat_template=None, **kwargs):
        super().__init__(tokenizer=tokenizer, chat_template=chat_template, **kwargs)
        # Placeholder image used when a sample carries no real images.
        self.black_img = BLACK_IMG_ENV
        # Image processors are built from in-file config dicts so no extra
        # preprocessor_config.json files are required at load time.
        self.siglip_image_processor = SiglipImageProcessor.from_dict(siglip_processor_config)
        self.qwen2vl_image_processor = Qwen2VLImageProcessor.from_dict(qwen2vl_processor_config)
        self.aimv2_image_processor = CLIPImageProcessor.from_dict(aimv2_processor_config)
        self.anyres = kwargs.get("anyres", True)  # any-resolution tiling toggle
        self.grid_pinpoints = kwargs.get("grid_pinpoints", "(1x1),...,(3x3)")
        # When several images share one sample, restrict each to a single crop.
        self.only_crop_single_image = kwargs.get("only_crop_single_image", True)
        self.use_special_start_end_token = kwargs.get("use_special_start_end_token", True)
        self.only_navit = kwargs.get("only_navit", False)
        self.process_mode = kwargs.get("process_mode", "qwen3")

        # AIMv2 crop edge (448 per aimv2_processor_config) used by the ovis2 path.
        self.aimv2_crop_size = self.aimv2_image_processor.size["shortest_edge"]
|
|
|
|
| def preprocess_images_siglip(self, images) -> torch.FloatTensor: |
| if isinstance(images[0], str): |
| images_pil = [Image.open(img).convert("RGB") for img in images] |
| elif isinstance(images[0], Image.Image): |
| images_pil = [img.convert("RGB") for img in images] |
| elif isinstance(images[0], bytes): |
| images_pil = [Image.open(io.BytesIO(img)).convert("RGB") for img in images] |
| else: |
| raise ValueError("unsupported type") |
|
|
| processed_images = [] |
| have_multi_images = len(images_pil) > 1 |
| for img in images_pil: |
| if self.anyres: |
| if not self.only_crop_single_image or not have_multi_images: |
| image = process_anyres_image(img, self.siglip_image_processor, self.grid_pinpoints) |
| else: |
| image = [self.siglip_image_processor(img, return_tensors="pt")["pixel_values"][0]] |
| else: |
| image = self.siglip_image_processor(img, return_tensors="pt")["pixel_values"][0] |
| |
| processed_images.append(image) |
|
|
| if not self.anyres: |
| return torch.stack(processed_images, dim=0) |
| else: |
| return [torch.stack(img, dim=0) for img in processed_images] |
|
|
| def preprocess_images_qwen2vl(self, images) -> dict: |
| if isinstance(images[0], str): |
| images_pil = [Image.open(img).convert("RGB") for img in images] |
| elif isinstance(images[0], Image.Image): |
| images_pil = [img.convert("RGB") for img in images] |
| elif isinstance(images[0], bytes): |
| images_pil = [Image.open(io.BytesIO(img)).convert("RGB") for img in images] |
| else: |
| raise ValueError("unsupported type") |
|
|
| image_sizes = [[x.size for x in images_pil]] |
| data_dict_qwen2vl = self.qwen2vl_image_processor( |
| [fetch_image({"image": img}) for img in images_pil], |
| return_tensors="pt" |
| ) |
|
|
| data_dict_qwen2vl["image_sizes"] = image_sizes |
|
|
| return data_dict_qwen2vl |
|
|
| def preprocess_multimodal(self, conversations): |
| for sentence in conversations: |
| if sentence["role"] == "system": |
| continue |
| segs = re.split(DEFAULT_IMAGE_TOKEN, sentence["content"]) |
| if self.use_special_start_end_token: |
| sentence["content"] = (DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN).join(segs) |
| else: |
| sentence["content"] = DEFAULT_IMAGE_TOKEN.join(segs) |
|
|
| return conversations |
|
|
| def preprocess_images_aimv2(self, images) -> torch.FloatTensor: |
| processed_images = [] |
| image_sizes_list = [] |
| have_multi_images = len(images) > 1 |
| for image_file in images: |
| if isinstance(image_file, str): |
| img = Image.open(image_file).convert("RGB") |
| elif isinstance(image_file, Image.Image): |
| img = image_file.convert("RGB") |
| elif isinstance(image_file, bytes): |
| img = Image.open(io.BytesIO(image_file)).convert("RGB") |
| else: |
| raise ValueError("unsupported type") |
| image_sizes_list.append(img.size) |
| if self.anyres: |
| if not self.only_crop_single_image or not have_multi_images: |
| img, ovis_image_placeholders = preprocess_image_ovis(img, image_processor=self.aimv2_image_processor, crop_size=self.aimv2_crop_size, max_partition=9) |
| else: |
| img, ovis_image_placeholders = preprocess_image_ovis(img, image_processor=self.aimv2_image_processor, crop_size=self.aimv2_crop_size, max_partition=1) |
| else: |
| img, ovis_image_placeholders = preprocess_image_ovis(img, image_processor=self.aimv2_image_processor, crop_size=self.aimv2_crop_size, max_partition=1) |
| img = (img, ovis_image_placeholders) |
| processed_images.append(img) |
|
|
| if not self.anyres: |
| return [(img[0], img[1]) for img in processed_images], [image_sizes_list] |
| else: |
| return [(torch.cat(img[0], dim=0), img[1]) for img in processed_images], [image_sizes_list] |
|
|
|
|
| def preprocess_qwen2( |
| self, |
| conversations, |
| tokenizer: PreTrainedTokenizer, |
| has_image: bool = False, |
| inference: bool = False, |
| only_mask_system: bool = False, |
| ) -> dict: |
| conv = types.SimpleNamespace( |
| system="You are a helpful assistant.", |
| roles=("user", "assistant"), |
| version="qwen2", |
| offset=0, |
| sep="<|im_start|>", |
| sep2="<|im_end|>\n", |
| ) |
|
|
| |
| assert conversations[0]["role"] == "system" |
| if conversations[0]["content"] == None: |
| conversations[0]["content"] = conv.system |
| |
| |
| for j, sentence in enumerate(conversations[1:]): |
| role = sentence["role"] |
| assert role == conv.roles[j % 2], "The conversation sequence is incorrect." |
| |
| conversation_str = tokenizer.apply_chat_template(conversations, tokenize=False, add_generation_prompt=inference) |
| |
| |
| rounds = conversation_str.split(conv.sep2) |
| input_ids_ = torch.tensor([], dtype=torch.int64) |
| targets_ = torch.tensor([], dtype=torch.int64) |
| for i, rou in enumerate(rounds): |
| if rou == "": |
| continue |
| if (not inference) or (i < (len(rounds) - 1)): |
| rou += conv.sep2 |
| if has_image: |
| cur_input_ids_ = self.tokenizer_image_token(rou, tokenizer, return_tensors='pt') |
| input_ids_ = torch.cat([input_ids_, cur_input_ids_], dim=0) |
| if only_mask_system: |
| mask_len = len(self.tokenizer_image_token(re.sub(rf'{conv.roles[0]}\n[\s\S]*', f'{conv.roles[0]}:', rou), |
| tokenizer)) |
| else: |
| mask_len = len(self.tokenizer_image_token(re.sub(rf'{conv.roles[1]}\n[\s\S]*', f'{conv.roles[1]}:', rou), |
| tokenizer)) |
| targets_ = torch.cat([targets_, torch.tensor([-100] * mask_len), cur_input_ids_[mask_len:]], dim=0) |
| else: |
| cur_input_ids_ = tokenizer(rou, return_tensors='pt')["input_ids"][0, :] |
| input_ids_ = torch.cat([input_ids_, cur_input_ids_], dim=0) |
| mask_len = len(tokenizer(re.sub(rf'{conv.roles[1]}\n[\s\S]*', rf'{conv.roles[1]}:', rou))["input_ids"][:]) |
| targets_ = torch.cat([targets_, torch.tensor([-100] * mask_len), cur_input_ids_[mask_len:]], dim=0) |
| |
| return {"input_ids": input_ids_, "labels": targets_} |
|
|
|
|
| def preprocess_qwen3( |
| self, |
| conversations, |
| tokenizer: PreTrainedTokenizer, |
| has_image: bool = False, |
| inference: bool = False, |
| only_mask_system: bool = False, |
| enable_thinking: bool = False, |
| ) -> dict: |
| conv = types.SimpleNamespace( |
| system="You are a helpful assistant.", |
| roles=("user", "assistant"), |
| version="qwen3", |
| offset=0, |
| sep="<|im_start|>", |
| sep2="<|im_end|>\n", |
| ) |
| |
|
|
| |
|
|
| assert conversations[0]["role"] == "system" |
| if conversations[0]["content"] == None: |
| conversations[0]["content"] = conv.system |
| |
| |
| |
| |
| |
| for j, sentence in enumerate(conversations[1:]): |
| role = sentence["role"] |
| assert role == conv.roles[j % 2], "The conversation sequence is incorrect." |
| |
| conversation_str = tokenizer.apply_chat_template(conversations, tokenize=False, add_generation_prompt=inference, enable_thinking=enable_thinking) |
| |
| |
| rounds = conversation_str.split(conv.sep2) |
| input_ids_ = torch.tensor([], dtype=torch.int64) |
| targets_ = torch.tensor([], dtype=torch.int64) |
| for i, rou in enumerate(rounds): |
| if rou == "": |
| continue |
| if (not inference) or (i < (len(rounds) - 1)): |
| rou += conv.sep2 |
| if has_image: |
| cur_input_ids_ = self.tokenizer_image_token(rou, tokenizer, return_tensors='pt') |
| input_ids_ = torch.cat([input_ids_, cur_input_ids_], dim=0) |
| if only_mask_system: |
| mask_len = len(self.tokenizer_image_token(re.sub(rf'{conv.roles[0]}\n[\s\S]*', f'{conv.roles[0]}:', rou), |
| tokenizer)) |
| else: |
| mask_len = len(self.tokenizer_image_token(re.sub(rf'{conv.roles[1]}\n[\s\S]*', f'{conv.roles[1]}:', rou), |
| tokenizer)) |
| targets_ = torch.cat([targets_, torch.tensor([-100] * mask_len), cur_input_ids_[mask_len:]], dim=0) |
| else: |
| cur_input_ids_ = tokenizer(rou, return_tensors='pt')["input_ids"][0, :] |
| input_ids_ = torch.cat([input_ids_, cur_input_ids_], dim=0) |
| mask_len = len(tokenizer(re.sub(rf'{conv.roles[1]}\n[\s\S]*', rf'{conv.roles[1]}:', rou))["input_ids"][:]) |
| targets_ = torch.cat([targets_, torch.tensor([-100] * mask_len), cur_input_ids_[mask_len:]], dim=0) |
| |
| return {"input_ids": input_ids_, "labels": targets_} |
|
|
|
|
| def preprocess_ovis2( |
| self, |
| source, |
| tokenizer: PreTrainedTokenizer, |
| has_image: bool = False, |
| inference: bool = False, |
| only_mask_system: bool = False, |
| video_len: int = 0, |
| ): |
| |
| judge_format = "from" in source[0].keys() |
|
|
| if judge_format: |
| if source[-1]["from"] == "gpt": |
| source = source[:-1] |
|
|
| roles = {"human": 'user', "gpt": 'assistant'} |
| input_ids = [] |
| labels = [] |
| messages = "<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n" |
| for message in source: |
| if message["from"] == "human": |
| user = message["value"] |
| if '<image>' not in user and '<video>' not in user: |
| messages += f"<|im_start|>{roles['human']}\n" + user + "<|im_end|>\n" |
|
|
| if '<image>' in user: |
| |
| |
| |
| |
| messages += f"<|im_start|>{roles['human']}\n" + user + "<|im_end|>\n" |
|
|
| if '<video>' in user: |
| user = user.replace('<video>', '\n'.join(['<image>'] * video_len) + '\n') |
| messages += f"<|im_start|>{roles['human']}\n" + user + "<|im_end|>\n" |
| |
|
|
| elif message["from"] == "gpt": |
| assistant = message["value"] |
| messages += f"<|im_start|>{roles['gpt']}\n" + assistant + "<|im_end|>\n" |
| if inference: |
| messages += f"<|im_start|>{roles['gpt']}\n" |
| else: |
| messages = messages[:-1] |
|
|
| messages = messages.split('<image>') |
| messages = [tokenizer.encode(m) for m in messages] |
| for m in messages[:-1]: |
| input_ids += m |
| input_ids += [IMAGE_TOKEN_INDEX] |
| input_ids += messages[-1] |
|
|
| |
| head_id = tokenizer.encode(f'<|im_start|>{roles["gpt"]}\n') |
| last_id = None |
| for i, id in enumerate(input_ids): |
| if input_ids[i:i+len(head_id)] == head_id: |
| last_id = i+len(head_id) |
| if i+len(head_id) > len(input_ids): |
| break |
| |
| assert last_id != None |
| labels = len(input_ids) * [IGNORE_INDEX] |
| labels[last_id:] = input_ids[last_id:] |
| return {"input_ids": torch.tensor(input_ids), "labels": torch.tensor(labels)} |
|
|
| else: |
| if source[-1]["role"] == "assistant": |
| source = source[:-1] |
|
|
| input_ids = [] |
| labels = [] |
| messages = "<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n" |
| for message in source: |
| if message["role"] == "user": |
| user = message["value"] |
| if '<image>' not in user and '<video>' not in user: |
| messages += f"<|im_start|>user\n" + user + "<|im_end|>\n" |
|
|
| if '<image>' in user: |
| |
| |
| |
| |
| messages += f"<|im_start|>user\n" + user + "<|im_end|>\n" |
|
|
| if '<video>' in user: |
| user = user.replace('<video>', '\n'.join(['<image>'] * video_len) + '\n') |
| messages += f"<|im_start|>user\n" + user + "<|im_end|>\n" |
| |
| elif message["role"] == "assistant": |
| assistant = message["value"] |
| messages += f"<|im_start|>assistant\n" + assistant + "<|im_end|>\n" |
| if inference: |
| messages += f"<|im_start|>assistant\n" |
| else: |
| messages = messages[:-1] |
|
|
| messages = messages.split('<image>') |
| messages = [tokenizer.encode(m) for m in messages] |
| for m in messages[:-1]: |
| input_ids += m |
| input_ids += [IMAGE_TOKEN_INDEX] |
| input_ids += messages[-1] |
|
|
| |
| head_id = tokenizer.encode(f'<|im_start|>assistant\n') |
| last_id = None |
| for i, id in enumerate(input_ids): |
| if input_ids[i:i+len(head_id)] == head_id: |
| last_id = i+len(head_id) |
| if i+len(head_id) > len(input_ids): |
| break |
| |
| assert last_id != None |
| labels = len(input_ids) * [IGNORE_INDEX] |
| labels[last_id:] = input_ids[last_id:] |
| return {"input_ids": torch.tensor(input_ids), "labels": torch.tensor(labels)} |
|
|
|
|
| def tokenizer_image_token( |
| self, |
| prompt, |
| tokenizer, |
| image_token_index=IMAGE_TOKEN_INDEX, |
| return_tensors=None, |
| ): |
| def split_with_token(string, token): |
| result = string.split(token) |
| for i in range(len(result) - 1): |
| result.insert(i * 2 + 1, token) |
| return result |
|
|
| if len(prompt) > SEQ_MAX_LEN: |
| raise ValueError("sequence is too long !!!") |
|
|
| prompt_chunks = split_with_token(prompt, DEFAULT_IMAGE_TOKEN) |
| input_ids, offset = ([tokenizer.bos_token_id], 1) if getattr(tokenizer,'bos_token',None) else ([], 0) |
| token2index = {DEFAULT_IMAGE_TOKEN: image_token_index} |
| for chunk in prompt_chunks: |
| if chunk in token2index: |
| input_ids.append(token2index[chunk]) |
| else: |
| chunk_ids = tokenizer(chunk).input_ids |
| if chunk_ids[0] != getattr(tokenizer,'bos_token_id', None): |
| offset = 0 |
| input_ids.extend(chunk_ids[offset:]) |
|
|
| if return_tensors is not None: |
| if return_tensors == "pt": |
| return torch.tensor(input_ids, dtype=torch.long) |
| raise ValueError(f"Unsupported tensor type: {return_tensors}") |
| return input_ids |
|
|
|
|
|
|
| def __call__(self, messages, inference=True, **kwargs) -> BatchFeature: |
| |
| |
| |
| process_mode = self.process_mode |
| if process_mode == "ovis2": |
| video_len = kwargs.get('video_len', 0) |
| |
|
|
| if "images" not in messages or not messages["images"] or not messages["images"][0]: |
| images = [self.black_img] |
| elif type(messages["images"]) == str: |
| images = [messages["images"]] |
| else: |
| images = messages["images"] |
|
|
| conversations = messages["conversations"] |
| |
| |
| if "role" in conversations[0]: |
| new_conversations = [] |
| for conversation in conversations: |
| if conversation["role"] == "system": |
| new_conversations.append({"from": "system", "value": conversation["content"]}) |
| elif conversation["role"] == "user": |
| new_conversations.append({"from": "human", "value": conversation["content"]}) |
| elif conversation["role"] == "assistant": |
| new_conversations.append({"from": "gpt", "value": conversation["content"]}) |
| conversations = new_conversations |
|
|
| |
| first_conv = conversations[1] if conversations[0]["from"] == "system" else conversations[0] |
| if images and "<image>" not in first_conv["value"]: |
| image_token = "\n".join(["<image>"] * len(images)) |
| first_conv["value"] = f"{image_token}\n{first_conv['value']}" |
|
|
| data_dict = self.preprocess_ovis2(conversations, self.tokenizer, has_image=True, only_mask_system=False, inference=inference, video_len=video_len) |
| data_dict['images'], data_dict['image_sizes'] = self.preprocess_images_aimv2(images) |
| data_dict = ovis_template_process(data_dict) |
| |
| data_dict['images'] = [data_dict['images']] |
| data_dict['input_ids'] = data_dict['input_ids'].unsqueeze(0) |
| return BatchFeature(data={**data_dict}) |
| |
| elif process_mode == "qwen2" or process_mode == "qwen3": |
| max_pixels=kwargs.get("max_pixels", self.max_pixels) |
| min_pixels=kwargs.get("min_pixels", self.min_pixels) |
| if max_pixels is not None: |
| self.qwen2vl_image_processor.max_pixels = max_pixels |
| if min_pixels is not None: |
| self.qwen2vl_image_processor.min_pixels = min_pixels |
|
|
| |
| if "images" not in messages or not messages["images"] or not messages["images"][0]: |
| images = [self.black_img] |
| elif type(messages["images"]) == str: |
| images = [messages["images"]] |
| else: |
| images = messages["images"] |
|
|
| |
| conversations = messages["conversations"] |
| if conversations[0]["role"] != "system": |
| conversations = [{"role":"system", "content": None}] + conversations |
| |
| |
| assert conversations[1]["role"] == "user" |
| if images and "<image>" not in conversations[1]["content"]: |
| image_token = " ".join(["<image>"] * len(images)) |
| conversations[1]["content"] = f"{image_token}\n{conversations[1]['content']}" |
| |
| |
| if inference: |
| assert conversations[-1]["role"] == "user", "the last message should be assistant if inference=True" |
| |
| |
| if self.only_navit: |
| precessed_images_siglip = None |
| else: |
| precessed_images_siglip = self.preprocess_images_siglip(images) |
| processed_data_dict_qwen2vl = self.preprocess_images_qwen2vl(images) |
| source = self.preprocess_multimodal(conversations) |
| if process_mode == "qwen2": |
| data_dict = self.preprocess_qwen2(source, self.tokenizer, has_image=True, only_mask_system=False, inference=inference) |
| if process_mode == "qwen3": |
| |
| enable_thinking = kwargs.get("enable_thinking", True) |
| data_dict = self.preprocess_qwen3(source, self.tokenizer, has_image=True, only_mask_system=False, inference=inference, enable_thinking=enable_thinking) |
| |
| data_dict["input_ids"] = data_dict["input_ids"].unsqueeze(0) |
| data_dict["labels"] = data_dict["labels"].unsqueeze(0) |
| data_dict["images"] = [precessed_images_siglip] |
|
|
| return BatchFeature(data={**data_dict, **processed_data_dict_qwen2vl}) |
| else: |
| raise ValueError(f"Unsupported process mode: {process_mode}") |
|
|
| def batch_decode(self, *args, **kwargs): |
| """ |
| This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please |
| refer to the docstring of this method for more information. |
| """ |
| return self.tokenizer.batch_decode(*args, **kwargs) |
|
|
|
|
| def decode(self, *args, **kwargs): |
| """ |
| This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to |
| the docstring of this method for more information. |
| """ |
| return self.tokenizer.decode(*args, **kwargs) |
|
|
|
|