| import copy |
| import json |
| import math |
| import os |
| import PIL |
| from PIL import Image |
| from typing import Dict, List, Optional, Union |
|
|
| import numpy as np |
| import torch |
| from PIL import Image |
| import re |
| from torchvision.transforms.functional import to_tensor |
| from transformers import ( |
| AutoTokenizer, |
| AutoFeatureExtractor, |
| AutoImageProcessor, |
| AutoVideoProcessor, |
| Qwen2_5_VLProcessor, |
| Qwen2AudioProcessor, |
| WhisperFeatureExtractor, |
| ) |
| from transformers.audio_utils import AudioInput |
| from transformers.image_processing_utils import ( |
| BaseImageProcessor, |
| BatchFeature, |
| get_size_dict, |
| ) |
| from transformers.image_transforms import ( |
| convert_to_rgb, |
| get_resize_output_image_size, |
| resize, |
| to_channel_dimension_format, |
| ) |
| from transformers.image_utils import ( |
| ImageInput, |
| ) |
| from transformers.models.qwen2_5_vl.processing_qwen2_5_vl import ( |
| Qwen2_5_VLProcessorKwargs, |
| ) |
| from transformers.processing_utils import ( |
| ProcessingKwargs, ProcessorMixin, SpecificProcessorType, Unpack, |
| ) |
|
|
| from transformers.tokenization_utils_base import PreTokenizedInput, TextInput |
| from transformers.utils import TensorType, logging |
| from transformers.video_utils import VideoInput |
| from typing_extensions import Unpack |
|
|
| logger = logging.get_logger(__name__) |
|
|
class HyperCLOVAXOmniProcessorKwargs(ProcessingKwargs, total=False):
    """Keyword-argument schema for `HyperCLOVAXOmniProcessor.__call__`.

    Extends `ProcessingKwargs` with per-modality defaults; only the audio
    group carries non-empty defaults here.
    """
    _defaults = {
        "audio_kwargs": {
            # Raw-waveform sampling rate in Hz.
            "sample_rate": 16_000,
            # Chunk length (in seconds) used when splitting long audio.
            "chunk_unit": 80,
            # Minimum number of samples allowed per waveform / trailing chunk.
            "min_chunk_size": 1_600,
        },
        "images_kwargs": {
        },
        "videos_kwargs": {
        },
    }
|
|
class HyperCLOVAXOmniProcessor(ProcessorMixin):
    """Composite processor bundling audio/image/video sub-processors and a tokenizer."""

    # Sub-processor attribute names that ProcessorMixin saves/loads.
    attributes = [
        "audio_processor",
        "image_processor",
        "video_processor",
        "tokenizer",
    ]
    # Auto classes used to (de)serialize each component.
    audio_processor_class = "AutoFeatureExtractor"
    image_processor_class = "AutoImageProcessor"
    tokenizer_class = ("PreTrainedTokenizer", "PreTrainedTokenizerFast")
    video_processor_class = "AutoVideoProcessor"
|
    def __init__(
        self,
        audio_processor: Optional[AutoFeatureExtractor] = None,
        chat_template: Optional[str] = None,
        image_processor: Optional[AutoImageProcessor] = None,
        video_processor: Optional[AutoVideoProcessor] = None,
        tokenizer: AutoTokenizer = None,
        **kwargs,
    ):
        """Bundle per-modality processors and the tokenizer.

        Args:
            audio_processor: audio feature extractor, or None to disable audio.
            chat_template: chat template string; falls back to the tokenizer's.
            image_processor: image processor, or None to disable images.
            video_processor: video processor, or None to disable videos.
            tokenizer: the text tokenizer (required by ProcessorMixin).
        """
        # Fall back to the tokenizer's chat template when none was provided.
        if chat_template is None and hasattr(tokenizer, "chat_template"):
            chat_template = tokenizer.chat_template

        ProcessorMixin.__init__(
            self,
            audio_processor,
            image_processor,
            video_processor,
            tokenizer,
            chat_template=chat_template,
        )

        # Cache placeholder strings and special-token ids for each modality
        # whose processor is present.
        self.modalities = list()
        if self.audio_processor is not None:
            self.audio_placeholder = f'{self.audio_processor.audio_token}'
            self.discrete_audio_placeholder = f'{self.audio_processor.discrete_audio_token}'

            self.audio_token = self.audio_processor.audio_token
            # Prefer an id already registered on the tokenizer; otherwise
            # resolve the token string through the vocabulary.
            self.audio_token_id = (
                tokenizer.audio_token_id
                if getattr(tokenizer, "audio_token_id", None)
                else tokenizer.convert_tokens_to_ids(self.audio_processor.audio_token)
            )
            # NOTE(review): unlike the image/video branches below, "audio" is
            # never appended to self.modalities — confirm this is intentional
            # (self.modalities feeds _check_special_mm_tokens in __call__).

        if self.image_processor is not None:
            self.image_placeholder = f'{self.image_processor.image_token}'
            self.discrete_image_placeholder = f'{self.image_processor.discrete_image_token}'

            self.image_token = self.image_processor.image_token
            self.image_token_id = (
                tokenizer.image_token_id
                if getattr(tokenizer, "image_token_id", None)
                else tokenizer.convert_tokens_to_ids(self.image_processor.image_token)
            )
            # Map each supported (h, w) aspect-ratio pair to its special token.
            self.discrete_image_ratio_tokens = {
                tuple(_discrete_image_ratio): f'<|vision_ratio_{_discrete_image_ratio[0]}:{_discrete_image_ratio[1]}|>'
                for _discrete_image_ratio in self.image_processor.discrete_image_ratios
            }
            self.modalities.append("image")

        if self.video_processor is not None:
            self.video_placeholder = f'{self.video_processor.video_token}'

            self.video_token = self.video_processor.video_token
            self.video_token_id = (
                tokenizer.video_token_id
                if getattr(tokenizer, "video_token_id", None)
                else tokenizer.convert_tokens_to_ids(self.video_processor.video_token)
            )
            self.modalities.append("video")
| |
| @classmethod |
| def from_pretrained( |
| cls: type[SpecificProcessorType], |
| pretrained_model_name_or_path: Union[str, os.PathLike], |
| **kwargs, |
| ): |
| audio_processer_kwargs = kwargs.pop("audio_processor_kwargs", dict()) |
| iamge_processer_kwargs = kwargs.pop("image_processor_kwargs", dict()) |
| video_processer_kwargs = kwargs.pop("video_processor_kwargs", dict()) |
| |
| if "tokenizer" not in kwargs: |
| kwargs["tokenizer"] = AutoTokenizer.from_pretrained( |
| pretrained_model_name_or_path, |
| **kwargs, |
| ) |
| |
| audio_processor = None |
| try: |
| audio_processor = AutoFeatureExtractor.from_pretrained( |
| pretrained_model_name_or_path, |
| subfolder="audio", |
| **audio_processer_kwargs, |
| **kwargs, |
| ) |
| except Exception as ex: |
| pass |
| |
| image_processor = None |
| try: |
| image_processor = AutoImageProcessor.from_pretrained( |
| pretrained_model_name_or_path, |
| subfolder="image", |
| **iamge_processer_kwargs, |
| **kwargs, |
| ) |
| except Exception as ex: |
| pass |
| |
| video_processor = None |
| try: |
| video_processor = AutoVideoProcessor.from_pretrained( |
| pretrained_model_name_or_path, |
| subfolder="video", |
| **video_processer_kwargs, |
| **kwargs, |
| ) |
| except Exception as ex: |
| pass |
| |
| return super().from_pretrained( |
| pretrained_model_name_or_path=pretrained_model_name_or_path, |
| audio_processor=audio_processor, |
| image_processor=image_processor, |
| video_processor=video_processor, |
| **kwargs, |
| ) |
|
|
    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        *args,
        **kwargs,
    ):
        """Save the processor, then patch the emitted config files.

        After the standard ProcessorMixin save, this (best-effort) writes the
        chat template and multimodal placeholder tokens into
        tokenizer_config.json, and keeps audio_preprocessor_config.json in
        sync with the presence of an audio processor.
        """
        # Temporarily drop "audio_processor" from the *class-level* attribute
        # list when this instance has none, so ProcessorMixin does not try to
        # save a missing component.  NOTE(review): this mutates shared class
        # state; a concurrent save from another instance would race.
        original_attributes = list(self.__class__.attributes)
        try:
            audio_processor = getattr(self, "audio_processor", None)
            if audio_processor is None and "audio_processor" in self.__class__.attributes:
                self.__class__.attributes = [a for a in self.__class__.attributes if a != "audio_processor"]

            # Prefer the tokenizer's chat template (if set) over our own.
            try:
                tok = getattr(self, "tokenizer", None)
                ct = getattr(tok, "chat_template", None) if tok is not None else None
                if isinstance(ct, str) and ct:
                    self.chat_template = ct
            except Exception:
                pass

            self.register_for_auto_class()
            super().save_pretrained(save_directory, *args, **kwargs)
        finally:
            # Always restore the class attribute list.
            self.__class__.attributes = original_attributes

        # Best-effort: persist the chat template and declare the image/video
        # placeholder tokens in tokenizer_config.json.
        try:
            chat_template = getattr(self, "chat_template", None)
            tokenizer_cfg_path = os.path.join(save_directory, "tokenizer_config.json")
            if isinstance(chat_template, str) and chat_template and os.path.exists(tokenizer_cfg_path):
                with open(tokenizer_cfg_path, "r", encoding="utf-8") as f:
                    tokenizer_cfg = json.load(f)
                tokenizer_cfg["chat_template"] = chat_template

                # Keep any extra special tokens the tokenizer already declares;
                # only fill in the placeholders when absent.
                extra_map = getattr(getattr(self, "tokenizer", None), "extra_special_tokens", None)
                if not isinstance(extra_map, dict):
                    extra_map = {}

                extra_map.setdefault("image_token", "<|IMAGE_PAD|>")
                extra_map.setdefault("video_token", "<|VIDEO_PAD|>")
                tokenizer_cfg["extra_special_tokens"] = extra_map

                with open(tokenizer_cfg_path, "w", encoding="utf-8") as f:
                    json.dump(tokenizer_cfg, f, ensure_ascii=False, indent=2)
        except Exception:
            # Never fail the save because of config post-processing.
            pass

        # Keep audio_preprocessor_config.json consistent: write it when an
        # audio processor exists, remove a stale one otherwise.
        audio_config_path = os.path.join(save_directory, "audio_preprocessor_config.json")
        if getattr(self, "audio_processor", None) is not None:
            with open(audio_config_path, "w", encoding="utf-8") as f:
                json.dump(self.audio_processor.to_dict(), f, ensure_ascii=False, indent=2)
        elif os.path.exists(audio_config_path):
            os.remove(audio_config_path)
|
|
    def __call__(
        self,
        text: Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]] = None,
        audios: AudioInput | None = None,
        images: ImageInput | None = None,
        videos: VideoInput | None = None,
        **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
    ) -> BatchFeature:
        """Prepare text plus optional audio/image/video inputs for the model.

        Every modality placeholder found in `text` (continuous and, where
        applicable, discrete variants) is expanded into a run of special
        tokens whose length is derived from the preprocessed features; the
        expanded text is then tokenized.

        Args:
            text: a string or list of strings containing modality placeholders.
            audios: raw waveform(s); a flat list of np.ndarray is treated as a
                single sample and wrapped into a batch of one.
            images: PIL image(s); a flat list is treated as a single sample.
            videos: video array(s); a flat list is treated as a single sample.
            **kwargs: per-modality kwargs merged via
                `HyperCLOVAXOmniProcessorKwargs`.

        Returns:
            [`BatchFeature`] with tokenizer outputs (`input_ids`,
            `attention_mask`, ...) plus, per provided modality, continuous
            features (e.g. `audio_values`, `pixel_values`,
            `pixel_values_videos`, grids/masks) and discrete features
            (`discrete_audio_values`, `discrete_pixel_values`,
            `discrete_image_ratios`, query lengths). `discrete_image_ratios`
            is returned without tensor conversion.
        """
        output_kwargs = self._merge_kwargs(
            HyperCLOVAXOmniProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )

        # Normalize text to a mutable list; placeholders are replaced below,
        # so deep-copy to avoid mutating the caller's list.
        if text is None:
            pass
        else:
            if isinstance(text, str):
                text = [text, ]
            text = copy.deepcopy(text)

        # --- Audio preprocessing (continuous + discrete codec paths). ---
        audio_inputs = dict()
        discrete_audio_inputs = dict()
        if (
            audios is not None
            and self.audio_processor is not None
        ):
            # A flat list of waveforms is one sample -> wrap as a batch of one.
            if (
                len(audios) > 0
                and isinstance(audios[0], np.ndarray)
            ):
                audios = [audios, ]

            audio_inputs = self._process_continuous_audio(
                audios=audios,
            )
            discrete_audio_inputs = self._process_discrete_audio(
                audios=audios,
                **output_kwargs["audio_kwargs"],
            )

        # --- Image preprocessing. ---
        image_inputs, image_grid_thw = dict(), list()
        discrete_image_inputs, discrete_image_ratios = dict(), list()
        if (
            images is not None
            and self.image_processor is not None
        ):
            if (
                len(images) > 0
                and isinstance(images[0], PIL.Image.Image)
            ):
                images = [images, ]

            # Run the image processor per sample, collecting each output key
            # into a list, then stack tensor-valued keys into a batch dim.
            image_inputs = dict()
            for _images in images:
                _image_inputs = self.image_processor(
                    images=_images,
                    **output_kwargs["images_kwargs"],
                )
                _image_grid_thw = _image_inputs["image_grid_thw"]
                for _k, _v in _image_inputs.items():
                    if _k not in image_inputs:
                        image_inputs[_k] = list()
                    image_inputs[_k].append(_v)
                image_grid_thw.append(_image_grid_thw)
            for _k, _v in image_inputs.items():
                if isinstance(_v[0], torch.Tensor):
                    image_inputs[_k] = torch.stack(_v, dim=0)

            discrete_image_inputs = self._process_discrete_images(
                images=images,
            )
            discrete_image_ratios = discrete_image_inputs["discrete_image_ratios"]

        # --- Video preprocessing (same per-sample collect-and-stack shape). ---
        video_inputs, video_grid_thw = dict(), list()
        if (
            videos is not None
            and self.video_processor is not None
        ):
            if (
                len(videos) > 0
                and isinstance(videos[0], np.ndarray)
            ):
                videos = [videos, ]

            video_inputs = dict()
            video_grid_thw = list()
            for _videos in videos:
                _video_inputs = self.video_processor(
                    videos=_videos,
                    **output_kwargs["videos_kwargs"],
                )
                _video_grid_thw = _video_inputs["video_grid_thw"]
                for _k, _v in _video_inputs.items():
                    if _k not in video_inputs:
                        video_inputs[_k] = list()
                    video_inputs[_k].append(_v)
                video_grid_thw.append(_video_grid_thw)
            video_inputs = {
                _k: torch.stack(_v, dim=0)
                if isinstance(_v[0], torch.Tensor) else _v
                for _k, _v in video_inputs.items()
            }

        # --- Expand audio placeholders into token runs. ---
        if (
            text is not None
            and audio_inputs
        ):
            for _sample_idx, (_text_before, _audio_query_lengths, _discrete_audio_query_lengths) in enumerate(zip(
                text, audio_inputs["audio_query_lengths"], discrete_audio_inputs["discrete_audio_query_lengths"],
            )):
                _find_iters = list(re.finditer(re.escape(self.audio_placeholder), _text_before, re.DOTALL))
                if len(_find_iters) > 0:
                    _text_after = ""
                    _prev_end_idx = 0
                    for _idx, _continuous_audio_match in enumerate(_find_iters):
                        _cur_start_idx = _continuous_audio_match.start()
                        _inplace_str = self.get_audio_token_replacement(
                            audio_query_length=_audio_query_lengths[_idx],
                            include_boundary_tokens=True,
                            tokenize=False,
                        )

                        # If a discrete placeholder precedes this continuous
                        # one, prepend its replacement.
                        # NOTE(review): _discrete_audio_match.start() is
                        # relative to the sliced string but is used as an
                        # absolute index below — for _prev_end_idx > 0 this
                        # looks off by that offset; confirm.
                        _discrete_audio_match = re.search(re.escape(self.discrete_audio_placeholder), _text_before[_prev_end_idx:_continuous_audio_match.start()])
                        if _discrete_audio_match:
                            _cur_start_idx = _discrete_audio_match.start()
                            _discrete_inplace_str = self.get_discrete_audio_token_replacement(
                                discrete_audio_query_length=_discrete_audio_query_lengths[_idx],
                                include_boundary_tokens=True,
                                tokenize=False,
                            )
                            _inplace_str = f'{_discrete_inplace_str}{_inplace_str}'

                        _text_after += _text_before[_prev_end_idx:_cur_start_idx]
                        _text_after += _inplace_str
                        _prev_end_idx = _continuous_audio_match.end()
                    _text_after += _text_before[_prev_end_idx:]
                    text[_sample_idx] = _text_after

        # --- Expand image placeholders into token runs. ---
        if (
            text is not None
            and image_inputs
        ):
            for _sample_idx, (_text_before, _image_grid_thw, _discrete_image_ratios) in enumerate(zip(
                text, image_inputs["image_grid_thw"], discrete_image_inputs["discrete_image_ratios"],
            )):
                _find_iters = list(re.finditer(re.escape(self.image_placeholder), _text_before, re.DOTALL))
                if len(_find_iters) > 0:
                    _text_after = ""
                    _prev_end_idx = 0
                    for _idx, _continuous_image_match in enumerate(_find_iters):
                        _cur_start_idx = _continuous_image_match.start()
                        _inplace_str = self.get_image_token_replacement(
                            image_grid_thw=_image_grid_thw[_idx],
                            include_boundary_tokens=True,
                            tokenize=False,
                        )

                        # Same slice-relative .start() caveat as the audio
                        # branch above — confirm intended behavior.
                        _discrete_image_match = re.search(re.escape(self.discrete_image_placeholder), _text_before[_prev_end_idx:_continuous_image_match.start()])
                        if _discrete_image_match:
                            _cur_start_idx = _discrete_image_match.start()
                            _discrete_inplace_str = self.get_discrete_image_token_replacement(
                                discrete_image_ratio=_discrete_image_ratios[_idx],
                                include_boundary_tokens=True,
                                tokenize=False,
                            )
                            _inplace_str = f'{_discrete_inplace_str}{_inplace_str}'

                        _text_after += _text_before[_prev_end_idx:_cur_start_idx]
                        _text_after += _inplace_str
                        _prev_end_idx = _continuous_image_match.end()
                    _text_after += _text_before[_prev_end_idx:]
                    text[_sample_idx] = _text_after

        # --- Expand video placeholders (no discrete variant). ---
        if (
            text is not None
            and video_inputs
        ):
            for _sample_idx, (_text_before, _video_grid_thw) in enumerate(zip(
                text, video_inputs["video_grid_thw"]
            )):
                _find_iters = list(re.finditer(re.escape(self.video_placeholder), _text_before, re.DOTALL))
                if len(_find_iters) > 0:
                    _text_after = ""
                    _prev_end_idx = 0
                    for _idx, _continuous_video_match in enumerate(_find_iters):
                        _cur_start_idx = _continuous_video_match.start()
                        _inplace_str = self.get_video_token_replacement(
                            video_grid_thw=_video_grid_thw[_idx],
                            include_boundary_tokens=True,
                            tokenize=False,
                        )
                        _text_after += _text_before[_prev_end_idx:_cur_start_idx]
                        _text_after += _inplace_str
                        _prev_end_idx = _continuous_video_match.end()
                    _text_after += _text_before[_prev_end_idx:]
                    text[_sample_idx] = _text_after

        # --- Tokenize the expanded text. ---
        return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)
        return_mm_token_type_ids = output_kwargs["text_kwargs"].pop("return_mm_token_type_ids", False)
        text_inputs = dict()
        if text is not None:
            text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"], return_tensors=None)
            self._check_special_mm_tokens(
                text,
                text_inputs,
                modalities=self.modalities,
            )

        if return_mm_token_type_ids:
            # NOTE(review): this reads text_inputs["input_ids"] before the
            # `if text_inputs:` guard below, so it would KeyError when text is
            # None; it also uses image_processor.image_token_id rather than
            # self.image_token_id — confirm that attribute exists.
            array_ids = np.array(text_inputs["input_ids"])
            mm_token_type_ids = np.zeros_like(text_inputs["input_ids"])
            mm_token_type_ids[array_ids == self.image_processor.image_token_id] = 1
            if text_inputs:
                text_inputs["mm_token_type_ids"] = mm_token_type_ids.tolist()

        # --- Assemble outputs; ratio lists are kept untensorized. ---
        data = {
            **text_inputs,
            **image_inputs,
            **video_inputs,
            **discrete_image_inputs,
            **audio_inputs,
            **discrete_audio_inputs,
        }
        _tensorable_data, _untensorable_data = dict(), dict()
        for _k, _v in data.items():
            if _k in [
                "discrete_image_ratios",
            ]:
                _untensorable_data[_k] = _v
            else:
                _tensorable_data[_k] = _v
        model_inputs = BatchFeature(data=_tensorable_data, tensor_type=return_tensors)
        model_inputs.update(_untensorable_data)
        return model_inputs
|
|
    def _process_continuous_audio(
        self,
        audios: Union[List[np.ndarray], List[List[np.ndarray]]],
        sample_rate: int = 16_000,
        chunk_unit: int = 80,
        min_chunk_size: int = 1_600,
        return_tensors: Optional[bool] = None,
    ):
        """Extract continuous (Whisper-style) audio features per sample.

        Each waveform is split into 30-second chunks and run through
        `self.audio_processor`; the per-chunk query length is derived from two
        stride-2 reductions of the valid frame count.

        Note: `sample_rate`, `chunk_unit`, `min_chunk_size` and
        `return_tensors` are currently unused here (kept for interface
        symmetry with `_process_discrete_audio`).

        Returns:
            dict with stacked "audio_values", "audio_masks" and
            "audio_query_lengths" (one entry per sample).
        """
        # Promote a flat list of waveforms to a single-sample batch.
        if (
            len(audios) > 0
            and isinstance(audios[0], np.ndarray)
        ):
            audios = [audios, ]

        audio_values, audio_masks, audio_query_lengths = list(), list(), list()
        for _audios in audios:
            _audio_values, _audio_masks, _audio_query_lengths = list(), list(), list()
            if len(_audios) == 0:
                # Empty sample: emit zero-length placeholders with the feature
                # extractor's fixed mel/time dimensions (128 mels x 3000 frames).
                _audio_values = torch.zeros(0, 128, 3000)
                _audio_masks = torch.zeros(0, 3000)
                _audio_query_lengths = [0, ]

            else:
                for _audio in _audios:
                    # Split into 30-second chunks at the extractor's sampling rate.
                    chunks = []
                    for i in range(0, len(_audio), 30 * self.audio_processor.sampling_rate):
                        chunks.append(_audio[i : i + 30 * self.audio_processor.sampling_rate])
                    num_of_chunks = len(chunks)
                    preprocess_results = self.audio_processor(
                        chunks,
                        sampling_rate=self.audio_processor.sampling_rate,
                        return_attention_mask=True,
                        padding="max_length"
                    )

                    _audio_value = preprocess_results.input_features
                    _audio_mask = preprocess_results.attention_mask
                    if isinstance(_audio_value, list):
                        _audio_value = np.array(_audio_value)
                    if isinstance(_audio_mask, list):
                        _audio_mask = np.array(_audio_mask)

                    # Valid frames -> two stride-2 downsamplings give the
                    # number of audio query tokens.
                    # NOTE(review): sums the mask over *all* chunks at once —
                    # confirm this matches the per-chunk length accounting
                    # used elsewhere.
                    input_lengths = int(_audio_mask.sum())
                    input_lengths = (input_lengths - 1) // 2 + 1
                    output_lengths = (input_lengths - 2) // 2 + 1

                    _audio_values.append(torch.Tensor(_audio_value))
                    _audio_masks.append(torch.Tensor(_audio_mask))
                    _audio_query_lengths.append(output_lengths)

                _audio_values = torch.cat(_audio_values, dim=0)
                _audio_masks = torch.cat(_audio_masks, dim=0)
                _audio_query_lengths = torch.tensor(_audio_query_lengths)
            audio_values.append(_audio_values)
            audio_masks.append(_audio_masks)
            audio_query_lengths.append(_audio_query_lengths)

        # Stack per-sample results into a leading batch dimension.
        audio_values = torch.stack(audio_values, dim=0)
        audio_masks = torch.stack(audio_masks, dim=0)
        audio_query_lengths = torch.stack(audio_query_lengths, dim=0)
        return {
            "audio_values": audio_values,
            "audio_masks": audio_masks,
            "audio_query_lengths": audio_query_lengths,
        }
|
|
| def _process_discrete_audio( |
| self, |
| audios: Union[List[np.ndarray], List[List[np.ndarray]]], |
| sample_rate: int = 16_000, |
| chunk_unit: int = 80, |
| min_chunk_size: int = 1_600, |
| return_tensors: Optional[bool] = None, |
| ): |
| """Discrete Audio Preprocessing""" |
| if ( |
| len(audios) > 0 |
| and isinstance(audios[0], np.ndarray) |
| ): |
| audios = [audios, ] |
| |
| discrete_audio_values, discrete_audio_query_lengths = list(), list() |
| for _audios in audios: |
| _discrete_audio_values, _discrete_audio_query_lengths = list(), list() |
| for _audio in _audios: |
| audio_length = len(_audio) |
| max_audio_length = 600 * sample_rate |
| audio_duration_sec = audio_length / sample_rate |
| |
| if audio_length < min_chunk_size: |
| raise ValueError(f"Discrete audio too short: {audio_length}") |
| if np.isnan(_audio).any() or np.isinf(_audio).any(): |
| raise ValueError("Discrete audio contains NaN/Inf") |
| if audio_length > max_audio_length: |
| raise ValueError(f"Discrete audio too long: {audio_length} samples = ({audio_duration_sec:.2f}s > 600s)") |
| |
| audio_min, audio_max = _audio.min().item(), _audio.max().item() |
| if audio_min < -100.0 or audio_max > 100.0: |
| raise ValueError(f"Discrete audio values out of range: min {audio_min}, max {audio_max}") |
|
|
| _audio_query_length = None |
| if audio_length > chunk_unit * sample_rate: |
| total_code_len = 0 |
| chunk_size = chunk_unit * sample_rate |
| for start in range(0, audio_length, chunk_size): |
| end = min(start + chunk_size, audio_length) |
| if end < audio_length and audio_length - end < min_chunk_size: |
| end = audio_length |
| chunk_len = end - start |
| mel_len = chunk_len // 160 |
| after_conv1 = (mel_len + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1 |
| code_len = (after_conv1 + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1 |
| total_code_len += code_len |
| if end >= audio_length: |
| break |
| _audio_query_length = total_code_len |
| |
| else: |
| mel_len = audio_length // 160 |
| after_conv1 = (mel_len + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1 |
| code_len = (after_conv1 + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1 |
| _audio_query_length = code_len |
|
|
| _discrete_audio_values.append(torch.tensor(_audio)) |
| _discrete_audio_query_lengths.append(_audio_query_length) |
| |
| _discrete_audio_values = _discrete_audio_values = torch.stack(_discrete_audio_values, dim=0) |
| _discrete_audio_query_lengths = torch.tensor(_discrete_audio_query_lengths) |
| discrete_audio_values.append(_discrete_audio_values) |
| discrete_audio_query_lengths.append(_discrete_audio_query_lengths) |
|
|
| discrete_audio_values = torch.stack(discrete_audio_values, dim=0) |
| discrete_audio_query_lengths = torch.stack(discrete_audio_query_lengths, dim=0) |
| return { |
| "discrete_audio_values": discrete_audio_values, |
| "discrete_audio_query_lengths": discrete_audio_query_lengths, |
| } |
|
|
| def _process_discrete_images( |
| self, |
| images: Union[List[PIL.Image.Image], List[List[PIL.Image.Image]]], |
| return_tensors: Optional[bool] = None, |
| ): |
| """Discrete Image Preprocessing""" |
| if ( |
| len(images) > 0 |
| and isinstance(images[0], PIL.Image.Image) |
| ): |
| images = [images, ] |
|
|
| discrete_pixel_values, image_ratios = list(), list() |
| for _images in images: |
| _discrete_pixel_values, _image_ratios = list(), list() |
| for _image in _images: |
| w, h = _image.size |
| _img_ratio = self._find_best_ratio_token([h, w]) |
| _discrete_pixel_value = _image.resize((384, 384), Image.BICUBIC) |
| _discrete_pixel_tensor = to_tensor(_discrete_pixel_value) |
| _discrete_pixel_tensor = _discrete_pixel_tensor.squeeze(dim=0) |
| _discrete_pixel_values.append(_discrete_pixel_tensor) |
| _img_ratio = torch.tensor(_img_ratio) |
| _image_ratios.append(_img_ratio) |
| _discrete_pixel_values = torch.stack(_discrete_pixel_values, dim=0) |
| _image_ratios = torch.stack(_image_ratios, dim=0) |
| discrete_pixel_values.append(_discrete_pixel_values) |
| image_ratios.append(_image_ratios) |
|
|
| discrete_pixel_values = torch.stack(discrete_pixel_values, dim=0) |
| image_ratios = torch.stack(image_ratios, dim=0) |
| return { |
| "discrete_pixel_values": discrete_pixel_values, |
| "discrete_image_ratios": image_ratios, |
| } |
|
|
| def _find_best_ratio_token( |
| self, |
| original_size: List[int], |
| ): |
| """Find the best ratio token based on original_size""" |
| base_ratios = list(self.discrete_image_ratio_tokens.keys()) |
| vision_aspect_ratios = [r for ratio in base_ratios for r in [ratio, ratio[::-1]]][1:] |
|
|
| if not isinstance(original_size, list) or len(original_size) != 2: |
| return self.discrete_image_ratio_tokens[(1, 1)] |
|
|
| h, w = original_size |
| if h == 0 or w == 0: |
| return self.discrete_image_ratio_tokens[(1, 1)] |
|
|
| ratios = [i / j for i, j in vision_aspect_ratios] |
| best_size_idx = np.argmin([abs(w / h - r) for r in ratios]) |
| i, j = vision_aspect_ratios[best_size_idx] |
| |
| return (i, j) |
| |
| def get_num_audio_tokens( |
| self, |
| audio_masks: torch.Tensor, |
| **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs], |
| ) -> int: |
| kwargs = self._merge_kwargs( |
| HyperCLOVAXOmniProcessorKwargs, |
| tokenizer_init_kwargs=self.tokenizer.init_kwargs, |
| **kwargs, |
| ) |
| |
| def _compute_num_audio_tokens(audio_mask: torch.Tensor,): |
| """ |
| audio_mask: shape (N, ) |
| """ |
| input_length = (int(audio_mask.sum()) - 1) // 2 + 1 |
| num_audio_tokens = (input_length - 2) // 2 + 1 |
| return num_audio_tokens |
| |
| if len(audio_masks.shape) == 1: |
| num_audio_tokens = _compute_num_audio_tokens(audio_mask=audio_masks) |
| else: |
| num_audio_tokens = sum([ |
| _compute_num_audio_tokens(audio_mask=_audio_mask) |
| for _audio_mask in audio_masks |
| ]) |
| |
| return num_audio_tokens |
| |
| def get_num_discrete_audio_tokens( |
| self, |
| discrete_audio_values: Optional[torch.Tensor] = None, |
| **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs], |
| ) -> int: |
| kwargs = self._merge_kwargs( |
| HyperCLOVAXOmniProcessorKwargs, |
| tokenizer_init_kwargs=self.tokenizer.init_kwargs, |
| **kwargs, |
| ) |
| |
| audio_length = len(discrete_audio_values) |
| num_audio_tokens = 0 |
| chunk_size = kwargs["audio_kwargs"].get("chunk_unit", 80) * kwargs["audio_kwargs"].get("sample_rate", 16_000) |
| for _start in range(0, audio_length, chunk_size): |
| _end = min(_start + chunk_size, audio_length) |
| _chunked_length = _end - _start |
| _num_mel_frames = _chunked_length // 160 |
| _num_mel_frames_conv1 = (_num_mel_frames + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1 |
| _num_audio_tokens = (_num_mel_frames_conv1 + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1 |
| num_audio_tokens += _num_audio_tokens |
| |
| return num_audio_tokens |
| |
| def get_num_image_tokens( |
| self, |
| image_width: Optional[int] = None, |
| image_height: Optional[int] = None, |
| pixel_values: Optional[torch.Tensor] = None, |
| **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs], |
| ) -> int: |
| kwargs = self._merge_kwargs( |
| HyperCLOVAXOmniProcessorKwargs, |
| tokenizer_init_kwargs=self.tokenizer.init_kwargs, |
| **kwargs, |
| ) |
| |
| image_processor_merge_size = 2 |
| if self.image_processor is not None: |
| image_processor_merge_size = getattr(self.image_processor, "merge_size", 2) |
|
|
| num_image_tokens = None |
| if pixel_values is None: |
| images_kwargs = Qwen2_5_VLProcessorKwargs._defaults.get("images_kwargs", {}) |
| images_kwargs.update(kwargs["images_kwargs"]) |
| num_image_patches = self.image_processor.get_number_of_image_patches( |
| image_height, image_width, images_kwargs, |
| ) |
| num_image_tokens = num_image_patches // (image_processor_merge_size ** 2) |
| elif len(pixel_values.shape) == 2: |
| num_image_tokens = pixel_values.shape[0] // (image_processor_merge_size ** 2) |
| else: |
| num_image_tokens = sum([ |
| _pixel_values.shape[0] // (image_processor_merge_size ** 2) |
| for _pixel_values in pixel_values |
| ]) |
| |
| return num_image_tokens |
| |
| def get_num_discrete_image_tokens( |
| self, |
| **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs], |
| ) -> int: |
| kwargs = self._merge_kwargs( |
| HyperCLOVAXOmniProcessorKwargs, |
| tokenizer_init_kwargs=self.tokenizer.init_kwargs, |
| **kwargs, |
| ) |
| discrete_token_size = self.image_processor.discrete_token_size |
| num_image_tokens = discrete_token_size ** 2 + discrete_token_size |
| |
| return num_image_tokens |
| |
| def get_num_video_tokens( |
| self, |
| image_width: Optional[int] = None, |
| image_height: Optional[int] = None, |
| num_frames: Optional[int] = None, |
| pixel_values_videos: Optional[torch.Tensor] = None, |
| **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs], |
| ) -> int: |
| kwargs = self._merge_kwargs( |
| HyperCLOVAXOmniProcessorKwargs, |
| tokenizer_init_kwargs=self.tokenizer.init_kwargs, |
| **kwargs, |
| ) |
| |
| video_processor_merge_size = 2 |
| if self.video_processor is not None: |
| video_processor_merge_size = getattr(self.video_processor, "merge_size", 2) |
| |
| if not pixel_values_videos: |
| videos_kwargs = Qwen2_5_VLProcessorKwargs._defaults.get("videos_kwargs", {}) |
| videos_kwargs.update(kwargs["videos_kwargs"]) |
| num_video_patches = self.video_processor.get_num_of_video_patches( |
| num_frames, image_height, image_width, videos_kwargs, |
| ) |
| num_video_tokens = num_video_patches // (video_processor_merge_size ** 2) |
| elif len(pixel_values_videos.shape) == 2: |
| num_video_tokens = pixel_values_videos.shape[0] // (video_processor_merge_size ** 2) |
| else: |
| num_video_tokens = sum([ |
| _pixel_values_videos.shape[0] // (video_processor_merge_size ** 2) |
| for _pixel_values_videos in pixel_values_videos |
| ]) |
| |
| return num_video_tokens |
| |
| def get_audio_token_replacement( |
| self, |
| audio_query_length: int, |
| include_boundary_tokens: Optional[bool] = True, |
| tokenize: Optional[bool] = False, |
| ): |
| replacement = self.audio_processor.audio_token * int(audio_query_length) |
| if include_boundary_tokens: |
| replacement = f'{self.audio_processor.audio_start_token}{replacement}{self.audio_processor.audio_end_token}' |
| if tokenize: |
| replacement = self.tokenizer.encode(replacement) |
| return replacement |
| |
| def get_discrete_audio_token_replacement( |
| self, |
| discrete_audio_query_length: Optional[int] = None, |
| include_boundary_tokens: Optional[bool] = True, |
| tokenize: Optional[bool] = False, |
| ): |
| replacement = self.audio_processor.discrete_audio_token * int(discrete_audio_query_length) |
| if include_boundary_tokens: |
| replacement = f'{self.audio_processor.discrete_audio_start_token}{replacement}{self.audio_processor.discrete_audio_end_token}' |
| if tokenize: |
| replacement = self.tokenizer.encode(replacement) |
| return replacement |
| |
| def get_image_token_replacement( |
| self, |
| image_grid_thw: List[int], |
| include_boundary_tokens: Optional[bool] = True, |
| tokenize: Optional[bool] = False, |
| ): |
| merge_length = self.image_processor.merge_size ** 2 |
| discrete_token_size = self.image_processor.discrete_token_size |
| _num_image_tokens = image_grid_thw.prod() // merge_length |
| replacement = self.image_processor.image_token * int(_num_image_tokens) |
| if include_boundary_tokens: |
| replacement = f'{self.image_processor.image_start_token}{replacement}{self.image_processor.image_end_token}' |
| if tokenize: |
| replacement = self.tokenizer.encode(replacement) |
| return replacement |
| |
| def get_discrete_image_token_replacement( |
| self, |
| discrete_image_ratio: Optional[List[int]] = None, |
| include_boundary_tokens: Optional[bool] = True, |
| tokenize: Optional[bool] = False, |
| ): |
| discrete_token_size = self.image_processor.discrete_token_size |
| _row_str = f'{(self.image_processor.discrete_image_token * discrete_token_size)}{self.image_processor.vision_eol_token}' |
| _discrete_image_ratio_token = self.discrete_image_ratio_tokens[(discrete_image_ratio[0], discrete_image_ratio[0])] |
| replacement = f'{_discrete_image_ratio_token}{(_row_str * discrete_token_size)}' |
| if include_boundary_tokens: |
| replacement = f'{self.image_processor.discrete_image_start_token}{replacement}{self.image_processor.discrete_image_end_token}' |
| if tokenize: |
| replacement = self.tokenizer.encode(replacement) |
| return replacement |
| |
| def get_video_token_replacement( |
| self, |
| video_grid_thw: List[int], |
| include_boundary_tokens: Optional[bool] = True, |
| tokenize: Optional[bool] = False, |
| ): |
| merge_length = self.video_processor.merge_size ** 2 |
| _num_video_tokens = video_grid_thw.prod() // merge_length |
| replacement = self.video_processor.video_token * int(_num_video_tokens) |
| if include_boundary_tokens: |
| replacement = f'{self.video_processor.video_start_token}{replacement}{self.video_processor.video_end_token}' |
| if tokenize: |
| replacement = self.tokenizer.encode(replacement) |
| return replacement |