# coding=utf-8 # Copyright 2021 The OpenAI Team Authors and The HuggingFace Team. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ TF 2.0 CLIP model.""" from __future__ import annotations import math from dataclasses import dataclass from typing import Any, Optional, Tuple, Union import numpy as np import tensorflow as tf from ...activations_tf import get_tf_activation from ...modeling_tf_outputs import TFBaseModelOutput, TFBaseModelOutputWithPooling # Public API from ...modeling_tf_utils import ( TFModelInputType, TFPreTrainedModel, get_initializer, keras_serializable, unpack_inputs, ) from ...tf_utils import check_embeddings_within_bounds, shape_list, stable_softmax from ...utils import ( ModelOutput, add_start_docstrings, add_start_docstrings_to_model_forward, logging, replace_return_docstrings, ) from .configuration_clip import CLIPConfig, CLIPTextConfig, CLIPVisionConfig logger = logging.get_logger(__name__) _CHECKPOINT_FOR_DOC = "openai/clip-vit-base-patch32" TF_CLIP_PRETRAINED_MODEL_ARCHIVE_LIST = [ "openai/clip-vit-base-patch32", # See all CLIP models at https://huggingface.co/models?filter=clip ] LARGE_NEGATIVE = -1e8 # Copied from transformers.models.bart.modeling_tf_bart._expand_mask def _expand_mask(mask: tf.Tensor, tgt_len: Optional[int] = None): """ Expands attention_mask from `[bsz, seq_len]` to `[bsz, 1, tgt_seq_len, src_seq_len]`. """ src_len = shape_list(mask)[1] tgt_len = tgt_len if tgt_len is not None else src_len one_cst = tf.constant(1.0) mask = tf.cast(mask, dtype=one_cst.dtype) expanded_mask = tf.tile(mask[:, None, None, :], (1, 1, tgt_len, 1)) return (one_cst - expanded_mask) * LARGE_NEGATIVE # contrastive loss function, adapted from # https://sachinruk.github.io/blog/pytorch/pytorch%20lightning/loss%20function/gpu/2021/03/07/CLIP.html def contrastive_loss(logits: tf.Tensor) -> tf.Tensor: return tf.math.reduce_mean( tf.keras.metrics.sparse_categorical_crossentropy( y_true=tf.range(shape_list(logits)[0]), y_pred=logits, from_logits=True ) ) def clip_loss(similarity: tf.Tensor) -> tf.Tensor: caption_loss = contrastive_loss(similarity) image_loss = contrastive_loss(tf.transpose(similarity)) return (caption_loss + image_loss) / 2.0 @dataclass class TFCLIPOutput(ModelOutput): """ Args: loss (`tf.Tensor` of shape `(1,)`, *optional*, returned when `return_loss` is `True`): Contrastive loss for image-text similarity. logits_per_image:(`tf.Tensor` of shape `(image_batch_size, text_batch_size)`): The scaled dot product scores between `image_embeds` and `text_embeds`. This represents the image-text similarity scores. logits_per_text:(`tf.Tensor` of shape `(text_batch_size, image_batch_size)`): The scaled dot product scores between `text_embeds` and `image_embeds`. This represents the text-image similarity scores. text_embeds(`tf.Tensor` of shape `(batch_size, output_dim`): The text embeddings obtained by applying the projection layer to the pooled output of [`TFCLIPTextModel`]. image_embeds(`tf.Tensor` of shape `(batch_size, output_dim`): The image embeddings obtained by applying the projection layer to the pooled output of [`TFCLIPVisionModel`]. text_model_output([`~modeling_tf_utils.TFBaseModelOutputWithPooling`]): The output of the [`TFCLIPTextModel`]. vision_model_output([`~modeling_tf_utils.TFBaseModelOutputWithPooling`]): The output of the [`TFCLIPVisionModel`]. """ loss: tf.Tensor | None = None logits_per_image: tf.Tensor = None logits_per_text: tf.Tensor = None text_embeds: tf.Tensor = None image_embeds: tf.Tensor = None text_model_output: TFBaseModelOutputWithPooling = None vision_model_output: TFBaseModelOutputWithPooling = None def to_tuple(self) -> Tuple[Any]: return tuple( self[k] if k not in ["text_model_output", "vision_model_output"] else getattr(self, k).to_tuple() for k in self.keys() ) class TFCLIPVisionEmbeddings(tf.keras.layers.Layer): def __init__(self, config: CLIPVisionConfig, **kwargs): super().__init__(**kwargs) self.embed_dim = config.hidden_size self.image_size = config.image_size self.patch_size = config.patch_size self.num_patches = (self.image_size // self.patch_size) ** 2 self.num_positions = self.num_patches + 1 self.config = config self.patch_embedding = tf.keras.layers.Conv2D( filters=self.embed_dim, kernel_size=self.patch_size, strides=self.patch_size, padding="valid", data_format="channels_last", use_bias=False, kernel_initializer=get_initializer(self.config.initializer_range * self.config.initializer_factor), name="patch_embedding", ) def build(self, input_shape: tf.TensorShape = None): factor = self.config.initializer_factor self.class_embedding = self.add_weight( shape=(self.embed_dim,), initializer=get_initializer(self.embed_dim**-0.5 * factor), trainable=True, name="class_embedding", ) with tf.name_scope("position_embedding"): self.position_embedding = self.add_weight( shape=(self.num_positions, self.embed_dim), initializer=get_initializer(self.config.initializer_range * factor), trainable=True, name="embeddings", ) super().build(input_shape) def call(self, pixel_values: tf.Tensor) -> tf.Tensor: """`pixel_values` is expected to be of NCHW format.""" batch_size, num_channels, height, width = shape_list(pixel_values) # When running on CPU, `tf.nn.conv2d` doesn't support `NCHW` format. # So change the input format from `NCHW` to `NHWC`. # shape = (batch_size, in_height, in_width, in_channels=num_channels) pixel_values = tf.transpose(pixel_values, perm=(0, 2, 3, 1)) patch_embeds = self.patch_embedding(pixel_values) # Change the 2D spatial dimensions to a single temporal dimension. # shape = (batch_size, num_patches, out_channels=embed_dim) patch_embeds = tf.reshape(tensor=patch_embeds, shape=(batch_size, self.num_patches, -1)) # add the [CLS] token to the embedded patch tokens class_embeds = tf.broadcast_to(self.class_embedding, shape=(batch_size, 1, self.embed_dim)) embeddings = tf.concat((class_embeds, patch_embeds), axis=1) embeddings = embeddings + self.position_embedding return embeddings class TFCLIPTextEmbeddings(tf.keras.layers.Layer): def __init__(self, config: CLIPTextConfig, **kwargs): super().__init__(**kwargs) self.embed_dim = config.hidden_size self.config = config def build(self, input_shape: tf.TensorShape = None): with tf.name_scope("token_embedding"): self.weight = self.add_weight( shape=(self.config.vocab_size, self.embed_dim), initializer=get_initializer(self.config.initializer_factor * self.config.initializer_range), trainable=True, name="weight", ) with tf.name_scope("position_embedding"): self.position_embedding = self.add_weight( shape=(self.config.max_position_embeddings, self.embed_dim), initializer=get_initializer(self.config.initializer_factor * self.config.initializer_range), trainable=True, name="embeddings", ) super().build(input_shape) def call( self, input_ids: tf.Tensor = None, position_ids: tf.Tensor = None, inputs_embeds: tf.Tensor = None, ) -> tf.Tensor: """ Applies embedding based on inputs tensor. Returns: final_embeddings (`tf.Tensor`): output embedding tensor. """ if input_ids is None and inputs_embeds is None: raise ValueError("You have to specify either input_ids or inputs_embeds") if inputs_embeds is None: check_embeddings_within_bounds(input_ids, self.config.vocab_size) inputs_embeds = tf.gather(params=self.weight, indices=input_ids) input_shape = shape_list(inputs_embeds)[:-1] if position_ids is None: position_ids = tf.expand_dims(tf.range(start=0, limit=input_shape[-1]), axis=0) position_embeds = tf.gather(params=self.position_embedding, indices=position_ids) position_embeds = tf.tile(input=position_embeds, multiples=(input_shape[0], 1, 1)) final_embeddings = inputs_embeds + position_embeds return final_embeddings class TFCLIPAttention(tf.keras.layers.Layer): """Multi-headed attention from 'Attention Is All You Need' paper""" def __init__(self, config: CLIPConfig, **kwargs): super().__init__(**kwargs) self.embed_dim = config.hidden_size self.num_attention_heads = config.num_attention_heads self.attention_head_size = self.embed_dim // self.num_attention_heads if self.attention_head_size * self.num_attention_heads != self.embed_dim: raise ValueError( f"embed_dim must be divisible by num_heads (got `embed_dim`: {self.embed_dim} and `num_heads`:" f" {self.num_attention_heads})." ) factor = config.initializer_factor in_proj_std = (self.embed_dim**-0.5) * ((2 * config.num_hidden_layers) ** -0.5) * factor out_proj_std = (self.embed_dim**-0.5) * factor self.sqrt_att_head_size = math.sqrt(self.attention_head_size) self.q_proj = tf.keras.layers.Dense( units=self.embed_dim, kernel_initializer=get_initializer(in_proj_std), name="q_proj" ) self.k_proj = tf.keras.layers.Dense( units=self.embed_dim, kernel_initializer=get_initializer(in_proj_std), name="k_proj" ) self.v_proj = tf.keras.layers.Dense( units=self.embed_dim, kernel_initializer=get_initializer(in_proj_std), name="v_proj" ) self.dropout = tf.keras.layers.Dropout(rate=config.attention_dropout) self.out_proj = tf.keras.layers.Dense( units=self.embed_dim, kernel_initializer=get_initializer(out_proj_std), name="out_proj" ) # copied from transformers.models.bert.modeling_tf_bert.TFBertSelfAttention.transpose_for_scores def transpose_for_scores(self, tensor: tf.Tensor, batch_size: int) -> tf.Tensor: # Reshape from [batch_size, seq_length, all_head_size] to [batch_size, seq_length, num_attention_heads, attention_head_size] tensor = tf.reshape(tensor=tensor, shape=(batch_size, -1, self.num_attention_heads, self.attention_head_size)) # Transpose the tensor from [batch_size, seq_length, num_attention_heads, attention_head_size] to [batch_size, num_attention_heads, seq_length, attention_head_size] return tf.transpose(tensor, perm=[0, 2, 1, 3]) def call( self, hidden_states: tf.Tensor, attention_mask: tf.Tensor, causal_attention_mask: tf.Tensor, output_attentions: bool, training: bool = False, ) -> Tuple[tf.Tensor]: """Input shape: Batch x Time x Channel""" batch_size = shape_list(hidden_states)[0] mixed_query_layer = self.q_proj(inputs=hidden_states) mixed_key_layer = self.k_proj(inputs=hidden_states) mixed_value_layer = self.v_proj(inputs=hidden_states) query_layer = self.transpose_for_scores(mixed_query_layer, batch_size) key_layer = self.transpose_for_scores(mixed_key_layer, batch_size) value_layer = self.transpose_for_scores(mixed_value_layer, batch_size) # Take the dot product between "query" and "key" to get the raw attention scores. # (batch size, num_heads, seq_len_q, seq_len_k) attention_scores = tf.matmul(query_layer, key_layer, transpose_b=True) dk = tf.cast(self.sqrt_att_head_size, dtype=attention_scores.dtype) attention_scores = tf.divide(attention_scores, dk) # apply the causal_attention_mask first if causal_attention_mask is not None: # Apply the causal attention mask (precomputed for all layers in TFCLIPModel call() function) attention_scores = tf.add(attention_scores, causal_attention_mask) if attention_mask is not None: # Apply the attention mask (precomputed for all layers in TFCLIPModel call() function) attention_scores = tf.add(attention_scores, attention_mask) # Normalize the attention scores to probabilities. _attention_probs = stable_softmax(logits=attention_scores, axis=-1) # This is actually dropping out entire tokens to attend to, which might # seem a bit unusual, but is taken from the original Transformer paper. attention_probs = self.dropout(inputs=_attention_probs, training=training) attention_output = tf.matmul(attention_probs, value_layer) attention_output = tf.transpose(attention_output, perm=[0, 2, 1, 3]) # (batch_size, seq_len_q, embed_dim) attention_output = tf.reshape(tensor=attention_output, shape=(batch_size, -1, self.embed_dim)) attention_output = self.out_proj(attention_output, training=training) # In TFBert, attention weights are returned after dropout. # However, in CLIP, they are returned before dropout. outputs = (attention_output, _attention_probs) if output_attentions else (attention_output,) return outputs class TFCLIPMLP(tf.keras.layers.Layer): def __init__(self, config: CLIPConfig, **kwargs): super().__init__(**kwargs) self.activation_fn = get_tf_activation(config.hidden_act) factor = config.initializer_factor in_proj_std = (config.hidden_size**-0.5) * ((2 * config.num_hidden_layers) ** -0.5) * factor fc_std = (2 * config.hidden_size) ** -0.5 * factor self.fc1 = tf.keras.layers.Dense( units=config.intermediate_size, kernel_initializer=get_initializer(fc_std), name="fc1" ) self.fc2 = tf.keras.layers.Dense( units=config.hidden_size, kernel_initializer=get_initializer(in_proj_std), name="fc2" ) def call(self, hidden_states: tf.Tensor) -> tf.Tensor: hidden_states = self.fc1(inputs=hidden_states) hidden_states = self.activation_fn(hidden_states) hidden_states = self.fc2(inputs=hidden_states) return hidden_states class TFCLIPEncoderLayer(tf.keras.layers.Layer): def __init__(self, config: CLIPConfig, **kwargs): super().__init__(**kwargs) self.embed_dim = config.hidden_size self.self_attn = TFCLIPAttention(config, name="self_attn") self.layer_norm1 = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="layer_norm1") self.mlp = TFCLIPMLP(config, name="mlp") self.layer_norm2 = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="layer_norm2") def call( self, hidden_states: tf.Tensor, attention_mask: tf.Tensor, causal_attention_mask: tf.Tensor, output_attentions: bool, training: bool = False, ) -> Tuple[tf.Tensor]: """ Args: hidden_states (`tf.Tensor`): input to the layer of shape `(batch, seq_len, embed_dim)` attention_mask (`tf.Tensor`): attention mask of size `(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values. causal_attention_mask (`tf.Tensor`): causal attention mask of size `(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values. output_attentions (`bool`): Whether or not to return the attentions tensors of all attention layers. See `outputs` under returned tensors for more detail. """ residual = hidden_states hidden_states = self.layer_norm1(inputs=hidden_states) attention_outputs = self.self_attn( hidden_states=hidden_states, attention_mask=attention_mask, causal_attention_mask=causal_attention_mask, output_attentions=output_attentions, training=training, ) hidden_states = attention_outputs[0] hidden_states = residual + hidden_states residual = hidden_states hidden_states = self.layer_norm2(inputs=hidden_states) hidden_states = self.mlp(hidden_states=hidden_states) hidden_states = residual + hidden_states outputs = (hidden_states,) + attention_outputs[1:] # add attentions if we output them return outputs class TFCLIPEncoder(tf.keras.layers.Layer): """ Transformer encoder consisting of `config.num_hidden_layers` self attention layers. Each layer is a [`TFCLIPEncoderLayer`]. Args: config: CLIPConfig """ def __init__(self, config: CLIPConfig, **kwargs): super().__init__(**kwargs) self.layers = [TFCLIPEncoderLayer(config, name=f"layers_._{i}") for i in range(config.num_hidden_layers)] def call( self, hidden_states: tf.Tensor, attention_mask: tf.Tensor, causal_attention_mask: tf.Tensor, output_attentions: bool, output_hidden_states: bool, return_dict: bool, training: bool = False, ) -> Union[TFBaseModelOutput, Tuple[tf.Tensor]]: all_hidden_states = () if output_hidden_states else None all_attentions = () if output_attentions else None for i, layer_module in enumerate(self.layers): if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) layer_outputs = layer_module( hidden_states=hidden_states, attention_mask=attention_mask, causal_attention_mask=causal_attention_mask, output_attentions=output_attentions, training=training, ) hidden_states = layer_outputs[0] if output_attentions: all_attentions = all_attentions + (layer_outputs[1],) # Add last layer if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) if not return_dict: return tuple(v for v in [hidden_states, all_hidden_states, all_attentions] if v is not None) return TFBaseModelOutput( last_hidden_state=hidden_states, hidden_states=all_hidden_states, attentions=all_attentions ) class TFCLIPTextTransformer(tf.keras.layers.Layer): def __init__(self, config: CLIPTextConfig, **kwargs): super().__init__(**kwargs) self.embeddings = TFCLIPTextEmbeddings(config, name="embeddings") self.encoder = TFCLIPEncoder(config, name="encoder") self.final_layer_norm = tf.keras.layers.LayerNormalization( epsilon=config.layer_norm_eps, name="final_layer_norm" ) # For `pooled_output` computation self.eos_token_id = config.eos_token_id def call( self, input_ids: TFModelInputType, attention_mask: tf.Tensor, position_ids: tf.Tensor, output_attentions: bool, output_hidden_states: bool, return_dict: bool, training: bool = False, ) -> Union[TFBaseModelOutputWithPooling, Tuple[tf.Tensor]]: input_shape = shape_list(input_ids) embedding_output = self.embeddings(input_ids=input_ids, position_ids=position_ids) batch_size, seq_length = input_shape # CLIP's text model uses causal mask, prepare it here. # https://github.com/openai/CLIP/blob/cfcffb90e69f37bf2ff1e988237a0fbe41f33c04/clip/model.py#L324 causal_attention_mask = self._build_causal_attention_mask(batch_size, seq_length, dtype=embedding_output.dtype) # check attention mask and invert # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len] attention_mask = _expand_mask(attention_mask) encoder_outputs = self.encoder( hidden_states=embedding_output, attention_mask=attention_mask, causal_attention_mask=causal_attention_mask, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, training=training, ) sequence_output = encoder_outputs[0] sequence_output = self.final_layer_norm(inputs=sequence_output) if self.eos_token_id == 2: # The `eos_token_id` was incorrect before PR #24773: Let's keep what have been done here. # A CLIP model with such `eos_token_id` in the config can't work correctly with extra new tokens added # ------------------------------------------------------------ # text_embeds.shape = [batch_size, n_ctx, transformer.width] # take features from the eot embedding (eot_token is the highest number in each sequence) pooled_output = tf.gather_nd( params=sequence_output, indices=tf.stack( values=(tf.range(input_shape[0], dtype=tf.int64), tf.math.argmax(input_ids, axis=-1)), axis=1 ), ) else: # The config gets updated `eos_token_id` from PR #24773 (so the use of exta new tokens is possible) pooled_output = tf.gather_nd( params=sequence_output, indices=tf.stack( values=( tf.range(input_shape[0], dtype=tf.int64), tf.math.argmax(tf.cast(input_ids == self.eos_token_id, dtype=tf.int8), axis=-1), ), axis=1, ), ) if not return_dict: return (sequence_output, pooled_output) + encoder_outputs[1:] return TFBaseModelOutputWithPooling( last_hidden_state=sequence_output, pooler_output=pooled_output, hidden_states=encoder_outputs.hidden_states, attentions=encoder_outputs.attentions, ) def _build_causal_attention_mask(self, batch_size, seq_length, dtype=tf.float32): # It is possible with an unspecified sequence length for seq_length to be # a runtime value, which is unsupported by tf.constant. Per the TensorFlow # docs, tf.fill can handle runtime dynamic shapes: # https://www.tensorflow.org/api_docs/python/tf/fill diag = tf.cast(tf.fill((seq_length,), 0.0), dtype) # set an additive 2D attention mask with all places being masked to_mask = tf.cast(tf.fill((seq_length, seq_length), -10000.0), dtype) # set diagonal & lower triangular parts to 0 (i.e. the places not to be masked) # TIP: think the 2D matrix as the space of (query_seq, key_seq) to_mask = tf.linalg.band_part(to_mask, 0, -1) # to_mask = tf.linalg.band_part(to_mask, -1, 0) to_mask = tf.linalg.set_diag(to_mask, diagonal=diag) return tf.broadcast_to(input=to_mask, shape=(batch_size, 1, seq_length, seq_length)) @keras_serializable class TFCLIPTextMainLayer(tf.keras.layers.Layer): config_class = CLIPTextConfig def __init__(self, config: CLIPTextConfig, **kwargs): super().__init__(**kwargs) self.config = config self.text_model = TFCLIPTextTransformer(config, name="text_model") def get_input_embeddings(self) -> tf.keras.layers.Layer: return self.text_model.embeddings def set_input_embeddings(self, value: tf.Variable): self.text_model.embeddings.weight = value self.text_model.embeddings.vocab_size = shape_list(value)[0] @unpack_inputs def call( self, input_ids: TFModelInputType | None = None, attention_mask: np.ndarray | tf.Tensor | None = None, position_ids: np.ndarray | tf.Tensor | None = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, training: bool = False, ) -> Union[TFBaseModelOutputWithPooling, Tuple[tf.Tensor]]: if input_ids is None: raise ValueError("You have to specify input_ids") input_shape = shape_list(input_ids) if attention_mask is None: attention_mask = tf.fill(dims=input_shape, value=1) text_model_outputs = self.text_model( input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, training=training, ) return text_model_outputs class TFCLIPVisionTransformer(tf.keras.layers.Layer): def __init__(self, config: CLIPVisionConfig, **kwargs): super().__init__(**kwargs) self.embeddings = TFCLIPVisionEmbeddings(config, name="embeddings") self.pre_layernorm = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="pre_layrnorm") self.encoder = TFCLIPEncoder(config, name="encoder") self.post_layernorm = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="post_layernorm") def call( self, pixel_values: TFModelInputType, output_attentions: bool, output_hidden_states: bool, return_dict: bool, training: bool = False, ) -> Union[TFBaseModelOutputWithPooling, Tuple[tf.Tensor]]: embedding_output = self.embeddings(pixel_values=pixel_values) embedding_output = self.pre_layernorm(inputs=embedding_output) encoder_outputs = self.encoder( hidden_states=embedding_output, attention_mask=None, causal_attention_mask=None, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, training=training, ) sequence_output = encoder_outputs[0] pooled_output = sequence_output[:, 0, :] pooled_output = self.post_layernorm(inputs=pooled_output) if not return_dict: return (sequence_output, pooled_output) + encoder_outputs[1:] return TFBaseModelOutputWithPooling( last_hidden_state=sequence_output, pooler_output=pooled_output, hidden_states=encoder_outputs.hidden_states, attentions=encoder_outputs.attentions, ) @keras_serializable class TFCLIPVisionMainLayer(tf.keras.layers.Layer): config_class = CLIPVisionConfig def __init__(self, config: CLIPVisionConfig, **kwargs): super().__init__(**kwargs) self.config = config self.vision_model = TFCLIPVisionTransformer(config, name="vision_model") def get_input_embeddings(self) -> tf.keras.layers.Layer: return self.vision_model.embeddings @unpack_inputs def call( self, pixel_values: TFModelInputType | None = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, training: bool = False, ) -> Union[TFBaseModelOutputWithPooling, Tuple[tf.Tensor]]: if pixel_values is None: raise ValueError("You have to specify pixel_values") vision_model_outputs = self.vision_model( pixel_values=pixel_values, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, training=training, ) return vision_model_outputs @keras_serializable class TFCLIPMainLayer(tf.keras.layers.Layer): config_class = CLIPConfig def __init__(self, config: CLIPConfig, **kwargs): super().__init__(**kwargs) if not isinstance(config.text_config, CLIPTextConfig): raise ValueError( "config.text_config is expected to be of type CLIPTextConfig but is of type" f" {type(config.text_config)}." ) if not isinstance(config.vision_config, CLIPVisionConfig): raise ValueError( "config.vision_config is expected to be of type CLIPVisionConfig but is of type" f" {type(config.vision_config)}." ) self.config = config text_config = config.text_config vision_config = config.vision_config self.projection_dim = config.projection_dim self.text_model = TFCLIPTextTransformer(text_config, name="text_model") self.vision_model = TFCLIPVisionTransformer(vision_config, name="vision_model") self.visual_projection = tf.keras.layers.Dense( units=self.projection_dim, kernel_initializer=get_initializer(vision_config.hidden_size**-0.5 * self.config.initializer_factor), use_bias=False, name="visual_projection", ) self.text_projection = tf.keras.layers.Dense( units=self.projection_dim, kernel_initializer=get_initializer(text_config.hidden_size**-0.5 * self.config.initializer_factor), use_bias=False, name="text_projection", ) def build(self, input_shape: tf.TensorShape = None): self.logit_scale = self.add_weight( shape=(1,), initializer=tf.keras.initializers.Constant(self.config.logit_scale_init_value), trainable=True, name="logit_scale", ) super().build(input_shape) @unpack_inputs def get_text_features( self, input_ids: TFModelInputType | None = None, attention_mask: np.ndarray | tf.Tensor | None = None, position_ids: np.ndarray | tf.Tensor | None = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, training: bool = False, ) -> tf.Tensor: if input_ids is None: raise ValueError("You have to specify either input_ids") input_shape = shape_list(input_ids) if attention_mask is None: attention_mask = tf.fill(dims=input_shape, value=1) text_outputs = self.text_model( input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, training=training, ) pooled_output = text_outputs[1] text_features = self.text_projection(inputs=pooled_output) return text_features @unpack_inputs def get_image_features( self, pixel_values: TFModelInputType | None = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, training: bool = False, ) -> tf.Tensor: if pixel_values is None: raise ValueError("You have to specify pixel_values") vision_outputs = self.vision_model( pixel_values=pixel_values, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, training=training, ) pooled_output = vision_outputs[1] # pooled_output image_features = self.visual_projection(inputs=pooled_output) return image_features @unpack_inputs def call( self, input_ids: TFModelInputType | None = None, pixel_values: TFModelInputType | None = None, attention_mask: np.ndarray | tf.Tensor | None = None, position_ids: np.ndarray | tf.Tensor | None = None, return_loss: Optional[bool] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, training: bool = False, ) -> Union[TFCLIPOutput, Tuple[tf.Tensor]]: if input_ids is None: raise ValueError("You have to specify either input_ids") if pixel_values is None: raise ValueError("You have to specify pixel_values") input_shape = shape_list(input_ids) if attention_mask is None: attention_mask = tf.fill(dims=input_shape, value=1) vision_outputs = self.vision_model( pixel_values=pixel_values, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, training=training, ) text_outputs = self.text_model( input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, training=training, ) image_embeds = vision_outputs[1] image_embeds = self.visual_projection(inputs=image_embeds) text_embeds = text_outputs[1] text_embeds = self.text_projection(inputs=text_embeds) # normalized features image_embeds = image_embeds / tf.norm(tensor=image_embeds, ord="euclidean", axis=-1, keepdims=True) text_embeds = text_embeds / tf.norm(tensor=text_embeds, ord="euclidean", axis=-1, keepdims=True) # cosine similarity as logits logit_scale = tf.math.exp(self.logit_scale) logits_per_text = tf.matmul(text_embeds, image_embeds, transpose_b=True) * logit_scale logits_per_image = tf.transpose(logits_per_text) loss = None if return_loss: loss = clip_loss(logits_per_text) loss = tf.reshape(loss, (1,)) if not return_dict: output = (logits_per_image, logits_per_text, text_embeds, image_embeds, text_outputs, vision_outputs) return (loss,) + output if loss is not None else output return TFCLIPOutput( loss=loss, logits_per_image=logits_per_image, logits_per_text=logits_per_text, text_embeds=text_embeds, image_embeds=image_embeds, text_model_output=text_outputs, vision_model_output=vision_outputs, ) class TFCLIPPreTrainedModel(TFPreTrainedModel): """ An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained models. """ config_class = CLIPConfig base_model_prefix = "clip" _keys_to_ignore_on_load_missing = [r"position_ids"] _keys_to_ignore_on_load_unexpected = [r"position_ids"] CLIP_START_DOCSTRING = r""" This model inherits from [`TFPreTrainedModel`]. Check the superclass documentation for the generic methods the library implements for all its model (such as downloading or saving, resizing the input embeddings, pruning heads etc.) This model is also a [tf.keras.Model](https://www.tensorflow.org/api_docs/python/tf/keras/Model) subclass. Use it as a regular TF 2.0 Keras Model and refer to the TF 2.0 documentation for all matter related to general usage and behavior. TensorFlow models and layers in `transformers` accept two formats as input: - having all inputs as keyword arguments (like PyTorch models), or - having all inputs as a list, tuple or dict in the first positional argument. The reason the second format is supported is that Keras methods prefer this format when passing inputs to models and layers. Because of this support, when using methods like `model.fit()` things should "just work" for you - just pass your inputs and labels in any format that `model.fit()` supports! If, however, you want to use the second format outside of Keras methods like `fit()` and `predict()`, such as when creating your own layers or models with the Keras `Functional` API, there are three possibilities you can use to gather all the input Tensors in the first positional argument: - a single Tensor with `input_ids` only and nothing else: `model(input_ids)` - a list of varying length with one or several input Tensors IN THE ORDER given in the docstring: `model([input_ids, attention_mask])` or `model([input_ids, attention_mask, token_type_ids])` - a dictionary with one or several input Tensors associated to the input names given in the docstring: `model({"input_ids": input_ids, "token_type_ids": token_type_ids})` Note that when creating models and layers with [subclassing](https://keras.io/guides/making_new_layers_and_models_via_subclassing/) then you don't need to worry about any of this, as you can just pass inputs like you would to any other Python function! Args: config ([`CLIPConfig`]): Model configuration class with all the parameters of the model. Initializing with a config file does not load the weights associated with the model, only the configuration. Check out the [`~TFPreTrainedModel.from_pretrained`] method to load the model weights. """ CLIP_TEXT_INPUTS_DOCSTRING = r""" Args: input_ids (`np.ndarray`, `tf.Tensor`, `List[tf.Tensor]` ``Dict[str, tf.Tensor]` or `Dict[str, np.ndarray]` and each example must have the shape `({0})`): Indices of input sequence tokens in the vocabulary. Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.__call__`] and [`PreTrainedTokenizer.encode`] for details. [What are input IDs?](../glossary#input-ids) attention_mask (`np.ndarray` or `tf.Tensor` of shape `({0})`, *optional*): Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`: - 1 for tokens that are **not masked**, - 0 for tokens that are **masked**. [What are attention masks?](../glossary#attention-mask) position_ids (`np.ndarray` or `tf.Tensor` of shape `({0})`, *optional*): Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0, config.max_position_embeddings - 1]`. [What are position IDs?](../glossary#position-ids) output_attentions (`bool`, *optional*): Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned tensors for more detail. This argument can be used only in eager mode, in graph mode the value in the config will be used instead. output_hidden_states (`bool`, *optional*): Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for more detail. This argument can be used only in eager mode, in graph mode the value in the config will be used instead. return_dict (`bool`, *optional*): Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple. This argument can be used in eager mode, in graph mode the value will always be set to True. training (`bool`, *optional*, defaults to `False``): Whether or not to use the model in training mode (some modules like dropout modules have different behaviors between training and evaluation). """ CLIP_VISION_INPUTS_DOCSTRING = r""" Args: pixel_values (`np.ndarray`, `tf.Tensor`, `List[tf.Tensor]` ``Dict[str, tf.Tensor]` or `Dict[str, np.ndarray]` and each example must have the shape `(batch_size, num_channels, height, width)`): Pixel values. Pixel values can be obtained using [`AutoImageProcessor`]. See [`CLIPImageProcessor.__call__`] for details. output_attentions (`bool`, *optional*): Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned tensors for more detail. This argument can be used only in eager mode, in graph mode the value in the config will be used instead. output_hidden_states (`bool`, *optional*): Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for more detail. This argument can be used only in eager mode, in graph mode the value in the config will be used instead. return_dict (`bool`, *optional*): Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple. This argument can be used in eager mode, in graph mode the value will always be set to True. training (`bool`, *optional*, defaults to `False``): Whether or not to use the model in training mode (some modules like dropout modules have different behaviors between training and evaluation). """ CLIP_INPUTS_DOCSTRING = r""" Args: input_ids (`np.ndarray`, `tf.Tensor`, `List[tf.Tensor]` ``Dict[str, tf.Tensor]` or `Dict[str, np.ndarray]` and each example must have the shape `({0})`): Indices of input sequence tokens in the vocabulary. Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.__call__`] and [`PreTrainedTokenizer.encode`] for details. [What are input IDs?](../glossary#input-ids) pixel_values (`np.ndarray`, `tf.Tensor`, `List[tf.Tensor]` `Dict[str, tf.Tensor]` or `Dict[str, np.ndarray]` and each example must have the shape `(batch_size, num_channels, height, width)`): Pixel values. Pixel values can be obtained using [`AutoImageProcessor`]. See [`CLIPImageProcessor.__call__`] for details. attention_mask (`np.ndarray` or `tf.Tensor` of shape `({0})`, *optional*): Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`: - 1 for tokens that are **not masked**, - 0 for tokens that are **masked**. [What are attention masks?](../glossary#attention-mask) position_ids (`np.ndarray` or `tf.Tensor` of shape `({0})`, *optional*): Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0, config.max_position_embeddings - 1]`. [What are position IDs?](../glossary#position-ids) return_loss (`bool`, *optional*): Whether or not to return the contrastive loss. output_attentions (`bool`, *optional*): Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned tensors for more detail. This argument can be used only in eager mode, in graph mode the value in the config will be used instead. output_hidden_states (`bool`, *optional*): Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for more detail. This argument can be used only in eager mode, in graph mode the value in the config will be used instead. return_dict (`bool`, *optional*): Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple. This argument can be used in eager mode, in graph mode the value will always be set to True. training (`bool`, *optional*, defaults to `False``): Whether or not to use the model in training mode (some modules like dropout modules have different behaviors between training and evaluation). """ class TFCLIPTextModel(TFCLIPPreTrainedModel): config_class = CLIPTextConfig def __init__(self, config: CLIPTextConfig, *inputs, **kwargs): super().__init__(config, *inputs, **kwargs) self.clip = TFCLIPTextMainLayer(config, name="clip") @unpack_inputs @add_start_docstrings_to_model_forward(CLIP_TEXT_INPUTS_DOCSTRING.format("batch_size, sequence_length")) @replace_return_docstrings(output_type=TFBaseModelOutputWithPooling, config_class=CLIPTextConfig) def call( self, input_ids: TFModelInputType | None = None, attention_mask: np.ndarray | tf.Tensor | None = None, position_ids: np.ndarray | tf.Tensor | None = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, training: Optional[bool] = False, ) -> Union[TFBaseModelOutputWithPooling, Tuple[tf.Tensor]]: r""" Returns: Examples: ```python >>> from transformers import AutoTokenizer, TFCLIPTextModel >>> model = TFCLIPTextModel.from_pretrained("openai/clip-vit-base-patch32") >>> tokenizer = AutoTokenizer.from_pretrained("openai/clip-vit-base-patch32") >>> inputs = tokenizer(["a photo of a cat", "a photo of a dog"], padding=True, return_tensors="tf") >>> outputs = model(**inputs) >>> last_hidden_state = outputs.last_hidden_state >>> pooled_output = outputs.pooler_output # pooled (EOS token) states ```""" outputs = self.clip( input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, training=training, ) return outputs class TFCLIPVisionModel(TFCLIPPreTrainedModel): config_class = CLIPVisionConfig main_input_name = "pixel_values" def __init__(self, config: CLIPVisionConfig, *inputs, **kwargs): super().__init__(config, *inputs, **kwargs) self.clip = TFCLIPVisionMainLayer(config, name="clip") @unpack_inputs @add_start_docstrings_to_model_forward(CLIP_VISION_INPUTS_DOCSTRING) @replace_return_docstrings(output_type=TFBaseModelOutputWithPooling, config_class=CLIPVisionConfig) def call( self, pixel_values: TFModelInputType | None = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, training: Optional[bool] = False, ) -> Union[TFBaseModelOutputWithPooling, Tuple[tf.Tensor]]: r""" Returns: Examples: ```python >>> from PIL import Image >>> import requests >>> from transformers import AutoProcessor, TFCLIPVisionModel >>> model = TFCLIPVisionModel.from_pretrained("openai/clip-vit-base-patch32") >>> processor = AutoProcessor.from_pretrained("openai/clip-vit-base-patch32") >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg" >>> image = Image.open(requests.get(url, stream=True).raw) >>> inputs = processor(images=image, return_tensors="tf") >>> outputs = model(**inputs) >>> last_hidden_state = outputs.last_hidden_state >>> pooled_output = outputs.pooler_output # pooled CLS states ```""" outputs = self.clip( pixel_values=pixel_values, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, training=training, ) return outputs @add_start_docstrings(CLIP_START_DOCSTRING) class TFCLIPModel(TFCLIPPreTrainedModel): config_class = CLIPConfig def __init__(self, config: CLIPConfig, *inputs, **kwargs): super().__init__(config, *inputs, **kwargs) self.clip = TFCLIPMainLayer(config, name="clip") @unpack_inputs @add_start_docstrings_to_model_forward(CLIP_TEXT_INPUTS_DOCSTRING.format("batch_size, sequence_length")) def get_text_features( self, input_ids: TFModelInputType | None = None, attention_mask: np.ndarray | tf.Tensor | None = None, position_ids: np.ndarray | tf.Tensor | None = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, training: bool = False, ) -> tf.Tensor: r""" Returns: text_features (`tf.Tensor` of shape `(batch_size, output_dim`): The text embeddings obtained by applying the projection layer to the pooled output of [`TFCLIPTextModel`]. Examples: ```python >>> from transformers import AutoTokenizer, TFCLIPModel >>> model = TFCLIPModel.from_pretrained("openai/clip-vit-base-patch32") >>> tokenizer = AutoTokenizer.from_pretrained("openai/clip-vit-base-patch32") >>> inputs = tokenizer(["a photo of a cat", "a photo of a dog"], padding=True, return_tensors="tf") >>> text_features = model.get_text_features(**inputs) ```""" text_features = self.clip.get_text_features( input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) return text_features @unpack_inputs @add_start_docstrings_to_model_forward(CLIP_VISION_INPUTS_DOCSTRING) def get_image_features( self, pixel_values: TFModelInputType | None = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, training: bool = False, ) -> tf.Tensor: r""" Returns: image_features (`tf.Tensor` of shape `(batch_size, output_dim`): The image embeddings obtained by applying the projection layer to the pooled output of [`TFCLIPVisionModel`]. Examples: ```python >>> from PIL import Image >>> import requests >>> from transformers import AutoProcessor, TFCLIPModel >>> model = TFCLIPModel.from_pretrained("openai/clip-vit-base-patch32") >>> processor = AutoProcessor.from_pretrained("openai/clip-vit-base-patch32") >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg" >>> image = Image.open(requests.get(url, stream=True).raw) >>> inputs = processor(images=image, return_tensors="tf") >>> image_features = model.get_image_features(**inputs) ```""" image_features = self.clip.get_image_features( pixel_values=pixel_values, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) return image_features @unpack_inputs @add_start_docstrings_to_model_forward(CLIP_INPUTS_DOCSTRING.format("batch_size, sequence_length")) @replace_return_docstrings(output_type=TFCLIPOutput, config_class=CLIPConfig) def call( self, input_ids: TFModelInputType | None = None, pixel_values: TFModelInputType | None = None, attention_mask: np.ndarray | tf.Tensor | None = None, position_ids: np.ndarray | tf.Tensor | None = None, return_loss: Optional[bool] = None, output_attentions: Optional[bool] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, training: bool = False, ) -> Union[TFCLIPOutput, Tuple[tf.Tensor]]: r""" Returns: Examples: ```python >>> import tensorflow as tf >>> from PIL import Image >>> import requests >>> from transformers import AutoProcessor, TFCLIPModel >>> model = TFCLIPModel.from_pretrained("openai/clip-vit-base-patch32") >>> processor = AutoProcessor.from_pretrained("openai/clip-vit-base-patch32") >>> url = "http://images.cocodataset.org/val2017/000000039769.jpg" >>> image = Image.open(requests.get(url, stream=True).raw) >>> inputs = processor( ... text=["a photo of a cat", "a photo of a dog"], images=image, return_tensors="tf", padding=True ... ) >>> outputs = model(**inputs) >>> logits_per_image = outputs.logits_per_image # this is the image-text similarity score >>> probs = tf.nn.softmax(logits_per_image, axis=1) # we can take the softmax to get the label probabilities ```""" outputs = self.clip( input_ids=input_ids, pixel_values=pixel_values, attention_mask=attention_mask, position_ids=position_ids, return_loss=return_loss, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) return outputs def serving_output(self, output: TFCLIPOutput) -> TFCLIPOutput: # TODO: As is this currently fails with saved_model=True, because # TensorFlow cannot trace through nested dataclasses. Reference: # https://github.com/huggingface/transformers/pull/16886 return output