# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
| """Defines NeuMF model for NCF framework. | |
| Some abbreviations used in the code base: | |
| NeuMF: Neural Matrix Factorization | |
| NCF: Neural Collaborative Filtering | |
| GMF: Generalized Matrix Factorization | |
| MLP: Multi-Layer Perceptron | |
| GMF applies a linear kernel to model the latent feature interactions, and MLP | |
| uses a nonlinear kernel to learn the interaction function from data. NeuMF model | |
| is a fused model of GMF and MLP to better model the complex user-item | |
| interactions, and unifies the strengths of linearity of MF and non-linearity of | |
| MLP for modeling the user-item latent structures. | |
| In NeuMF model, it allows GMF and MLP to learn separate embeddings, and combine | |
| the two models by concatenating their last hidden layer. | |
| """ | |
from __future__ import absolute_import
from __future__ import division
# from __future__ import google_type_annotations
from __future__ import print_function

import sys
from typing import Any, Dict, Text

from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf

from official.recommendation import constants as rconst
from official.recommendation import movielens
from official.recommendation import ncf_common
from official.recommendation import stat_utils


def sparse_to_dense_grads(grads_and_vars):
  """Convert sparse gradients to dense gradients.

  All sparse gradients, which are represented as instances of tf.IndexedSlices,
  are converted to dense Tensors. Dense gradients, which are represented as
  Tensors, are unchanged.

  The purpose of this conversion is that for small embeddings, which are used
  by this model, applying dense gradients with the AdamOptimizer is faster than
  applying sparse gradients.

  Args:
    grads_and_vars: A list of (gradient, variable) tuples. Each gradient can
      be a Tensor or an IndexedSlices. Tensors are unchanged, and
      IndexedSlices are converted to dense Tensors.

  Returns:
    The same list of (gradient, variable) as `grads_and_vars`, except each
    IndexedSlices gradient is converted to a Tensor.
  """
  # Calling convert_to_tensor changes IndexedSlices into Tensors, and leaves
  # Tensors unchanged.
  return [(tf.convert_to_tensor(g), v) for g, v in grads_and_vars]
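

# Illustrative usage sketch (added for exposition, not part of the original
# module): in TF1-style graph mode, the gradient of an embedding lookup
# arrives as a tf.IndexedSlices, which this helper densifies before
# apply_gradients:
#
#   emb = tf.compat.v1.get_variable("emb", shape=[100, 8])
#   loss = tf.reduce_sum(tf.nn.embedding_lookup(emb, [0, 1, 2]))
#   opt = tf.compat.v1.train.AdamOptimizer()
#   grads_and_vars = opt.compute_gradients(loss, [emb])  # IndexedSlices grads
#   train_op = opt.apply_gradients(sparse_to_dense_grads(grads_and_vars))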


def neumf_model_fn(features, labels, mode, params):
  """Model Function for NeuMF estimator."""
  if params.get("use_seed"):
    tf.compat.v1.set_random_seed(stat_utils.random_int32())

  users = features[movielens.USER_COLUMN]
  items = features[movielens.ITEM_COLUMN]

  user_input = tf.keras.layers.Input(tensor=users)
  item_input = tf.keras.layers.Input(tensor=items)
  logits = construct_model(user_input, item_input, params).output

  # Softmax with the first column of zeros is equivalent to sigmoid.
  softmax_logits = ncf_common.convert_to_softmax_logits(logits)
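  # Why the equivalence holds: for a single logit x,
  #   softmax([0, x])[1] = exp(x) / (exp(0) + exp(x)) = 1 / (1 + exp(-x)),
  # which is exactly sigmoid(x). Prepending a zero column therefore lets the
  # binary problem reuse the sparse softmax cross-entropy ops used below.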

  if mode == tf.estimator.ModeKeys.EVAL:
    duplicate_mask = tf.cast(features[rconst.DUPLICATE_MASK], tf.float32)
    return _get_estimator_spec_with_metrics(
        logits,
        softmax_logits,
        duplicate_mask,
        params["num_neg"],
        params["match_mlperf"],
        use_tpu_spec=params["use_tpu"])

  elif mode == tf.estimator.ModeKeys.TRAIN:
    labels = tf.cast(labels, tf.int32)
    valid_pt_mask = features[rconst.VALID_POINT_MASK]

    optimizer = tf.compat.v1.train.AdamOptimizer(
        learning_rate=params["learning_rate"],
        beta1=params["beta1"],
        beta2=params["beta2"],
        epsilon=params["epsilon"])
    if params["use_tpu"]:
      optimizer = tf.compat.v1.tpu.CrossShardOptimizer(optimizer)

    loss = tf.compat.v1.losses.sparse_softmax_cross_entropy(
        labels=labels,
        logits=softmax_logits,
        weights=tf.cast(valid_pt_mask, tf.float32))

    # Expose the loss under a stable name for logging hooks.
    tf.identity(loss, name="cross_entropy")

    global_step = tf.compat.v1.train.get_global_step()
    tvars = tf.compat.v1.trainable_variables()
    gradients = optimizer.compute_gradients(
        loss, tvars, colocate_gradients_with_ops=True)
    gradients = sparse_to_dense_grads(gradients)
    minimize_op = optimizer.apply_gradients(
        gradients, global_step=global_step, name="train")
    update_ops = tf.compat.v1.get_collection(tf.compat.v1.GraphKeys.UPDATE_OPS)
    train_op = tf.group(minimize_op, update_ops)

    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)

  else:
    raise NotImplementedError


def _strip_first_and_last_dimension(x, batch_size):
  """Strips the leading dimension of `x` and flattens to shape (batch_size,)."""
  return tf.reshape(x[0, :], (batch_size,))


def construct_model(user_input: tf.Tensor, item_input: tf.Tensor,
                    params: Dict[Text, Any]) -> tf.keras.Model:
  """Initializes the NeuMF model.

  Args:
    user_input: Keras input layer for users.
    item_input: Keras input layer for items.
    params: Dict of hyperparameters.

  Raises:
    ValueError: if the size of the first model layer is not even.

  Returns:
    model: A Keras Model for computing the logits.
  """
  num_users = params["num_users"]
  num_items = params["num_items"]

  model_layers = params["model_layers"]

  mf_regularization = params["mf_regularization"]
  mlp_reg_layers = params["mlp_reg_layers"]

  mf_dim = params["mf_dim"]

  if model_layers[0] % 2 != 0:
    raise ValueError("The size of the first layer should be a multiple of 2!")

  # Initializer for embedding layers
  embedding_initializer = "glorot_uniform"

  def mf_slice_fn(x):
    x = tf.squeeze(x, [1])
    return x[:, :mf_dim]

  def mlp_slice_fn(x):
    x = tf.squeeze(x, [1])
    return x[:, mf_dim:]

  # It turns out to be significantly more efficient to store the MF and MLP
  # embedding portions in the same table, and then slice as needed.
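  # Layout of each row of the shared table, whose width is
  # mf_dim + model_layers[0] // 2 (e.g. 8 + 16 = 24 columns for the
  # illustrative values mf_dim=8, model_layers[0]=32):
  #
  #   columns [0, mf_dim)   -> GMF latent vector, taken by mf_slice_fn
  #   columns [mf_dim, end) -> MLP half-embedding, taken by mlp_slice_fn
  #
  # The user and item MLP halves are concatenated below, which is why
  # model_layers[0] must be even.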
  embedding_user = tf.keras.layers.Embedding(
      num_users,
      mf_dim + model_layers[0] // 2,
      embeddings_initializer=embedding_initializer,
      embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
      input_length=1,
      name="embedding_user")(user_input)

  embedding_item = tf.keras.layers.Embedding(
      num_items,
      mf_dim + model_layers[0] // 2,
      embeddings_initializer=embedding_initializer,
      embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
      input_length=1,
      name="embedding_item")(item_input)

  # GMF part
  mf_user_latent = tf.keras.layers.Lambda(
      mf_slice_fn, name="embedding_user_mf")(embedding_user)
  mf_item_latent = tf.keras.layers.Lambda(
      mf_slice_fn, name="embedding_item_mf")(embedding_item)

  # MLP part
  mlp_user_latent = tf.keras.layers.Lambda(
      mlp_slice_fn, name="embedding_user_mlp")(embedding_user)
  mlp_item_latent = tf.keras.layers.Lambda(
      mlp_slice_fn, name="embedding_item_mlp")(embedding_item)

  # Element-wise multiply
  mf_vector = tf.keras.layers.multiply([mf_user_latent, mf_item_latent])

  # Concatenation of two latent features
  mlp_vector = tf.keras.layers.concatenate([mlp_user_latent, mlp_item_latent])

  num_layer = len(model_layers)  # Number of layers in the MLP
  for layer in xrange(1, num_layer):
    model_layer = tf.keras.layers.Dense(
        model_layers[layer],
        kernel_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[layer]),
        activation="relu")
    mlp_vector = model_layer(mlp_vector)

  # Concatenate GMF and MLP parts
  predict_vector = tf.keras.layers.concatenate([mf_vector, mlp_vector])

  # Final prediction layer
  logits = tf.keras.layers.Dense(
      1, activation=None, kernel_initializer="lecun_uniform",
      name=movielens.RATING_COLUMN)(predict_vector)

  # Print model topology.
  model = tf.keras.models.Model([user_input, item_input], logits)
  model.summary()
  sys.stdout.flush()

  return model
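

# Illustrative construction sketch (hypothetical hyperparameter values, added
# for exposition):
#
#   params = {
#       "num_users": 1000, "num_items": 2000, "mf_dim": 8,
#       "model_layers": [32, 16, 8], "mf_regularization": 0.0,
#       "mlp_reg_layers": [0.0, 0.0, 0.0],
#   }
#   user_input = tf.keras.layers.Input(shape=(1,), dtype=tf.int32)
#   item_input = tf.keras.layers.Input(shape=(1,), dtype=tf.int32)
#   model = construct_model(user_input, item_input, params)
#   # model([users, items]) -> logits of shape (batch_size, 1)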


def _get_estimator_spec_with_metrics(logits: tf.Tensor,
                                     softmax_logits: tf.Tensor,
                                     duplicate_mask: tf.Tensor,
                                     num_training_neg: int,
                                     match_mlperf: bool = False,
                                     use_tpu_spec: bool = False):
  """Returns an EstimatorSpec that includes the metrics."""
  (cross_entropy, metric_fn, in_top_k, ndcg,
   metric_weights) = compute_eval_loss_and_metrics_helper(
       logits,
       softmax_logits,
       duplicate_mask,
       num_training_neg,
       match_mlperf)

  if use_tpu_spec:
    return tf.estimator.tpu.TPUEstimatorSpec(
        mode=tf.estimator.ModeKeys.EVAL,
        loss=cross_entropy,
        eval_metrics=(metric_fn, [in_top_k, ndcg, metric_weights]))

  return tf.estimator.EstimatorSpec(
      mode=tf.estimator.ModeKeys.EVAL,
      loss=cross_entropy,
      eval_metric_ops=metric_fn(in_top_k, ndcg, metric_weights))


def compute_eval_loss_and_metrics_helper(logits: tf.Tensor,
                                         softmax_logits: tf.Tensor,
                                         duplicate_mask: tf.Tensor,
                                         num_training_neg: int,
                                         match_mlperf: bool = False):
  """Model evaluation with HR and NDCG metrics.

  The evaluation protocol is to rank the test interacted item (truth items)
  among the randomly chosen 999 items that are not interacted by the user.
  The performance of the ranked list is judged by Hit Ratio (HR) and Normalized
  Discounted Cumulative Gain (NDCG).

  For evaluation, the ranked list is truncated at 10 for both metrics. As such,
  the HR intuitively measures whether the test item is present on the top-10
  list, and the NDCG accounts for the position of the hit by assigning higher
  scores to hits at top ranks. Both metrics are calculated for each test user,
  and the average scores are reported.

  If `match_mlperf` is True, then the HR and NDCG computations are done in a
  slightly unusual way to match the MLPerf reference implementation.
  Specifically, if the evaluation negatives contain duplicate items, it will be
  treated as if the item only appeared once. Effectively, for duplicate items
  in a row, the predicted score for all but one of the items will be set to
  -infinity.

  For example, suppose we have the following inputs:
  logits_by_user: [[ 2,  3,  3],
                   [ 5,  4,  4]]
  items_by_user:  [[10, 20, 20],
                   [30, 40, 40]]
  # Note: items_by_user is not explicitly present. Instead the relevant
  # information is contained within `duplicate_mask`.
  top_k: 2

  Then with match_mlperf=True, the HR would be 2/2 = 1.0. With
  match_mlperf=False, the HR would be 1/2 = 0.5. This is because each user has
  predicted scores for only 2 unique items: 10 and 20 for the first user, and
  30 and 40 for the second. Therefore, with match_mlperf=True, it's guaranteed
  the first item's score is in the top 2. With match_mlperf=False, this
  function would compute the first user's first item is not in the top 2,
  because item 20 has a higher score, and item 20 occurs twice.

  Args:
    logits: A tensor containing the predicted logits for each user. The shape
      of logits is (num_users_per_batch * (1 + NUM_EVAL_NEGATIVES),). Logits
      for a user are grouped, and the last element of the group is the true
      element.
    softmax_logits: The same tensor with a column of zeros prepended, so that
      each logit x becomes the pair [0, x].
    duplicate_mask: A vector with the same shape as logits, with a value of 1
      if the item corresponding to the logit at that position has already
      appeared for that user.
    num_training_neg: The number of negatives per positive during training.
    match_mlperf: Use the MLPerf reference convention for computing rank.

  Returns:
    cross_entropy: the loss
    metric_fn: the metrics function
    in_top_k: hit rate metric
    ndcg: ndcg metric
    metric_weights: metric weights
  """
  in_top_k, ndcg, metric_weights, logits_by_user = compute_top_k_and_ndcg(
      logits, duplicate_mask, match_mlperf)

  # Examples are provided by the eval Dataset in a structured format, so eval
  # labels can be reconstructed on the fly.
  eval_labels = tf.reshape(shape=(-1,), tensor=tf.one_hot(
      tf.zeros(shape=(logits_by_user.shape[0],), dtype=tf.int32) +
      rconst.NUM_EVAL_NEGATIVES, logits_by_user.shape[1], dtype=tf.int32))
  eval_labels_float = tf.cast(eval_labels, tf.float32)

  # During evaluation, the ratio of negatives to positives is much higher
  # than during training. (Typically 999 to 1 vs. 4 to 1.) By adjusting the
  # weights for the negative examples we compute a loss which is consistent
  # with the training data. (And provides an apples-to-apples comparison.)
  negative_scale_factor = num_training_neg / rconst.NUM_EVAL_NEGATIVES
  example_weights = (
      (eval_labels_float + (1 - eval_labels_float) * negative_scale_factor) *
      (1 + rconst.NUM_EVAL_NEGATIVES) / (1 + num_training_neg))
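  # Worked example of the scaling (assuming num_training_neg=4 and
  # rconst.NUM_EVAL_NEGATIVES=999): negative_scale_factor is 4/999, so each
  # positive gets weight 1000/5 = 200 and each negative 4/999 * 200 ~= 0.8.
  # Per user the weights sum to 200 + 999 * (800/999) = 1000, and the total
  # positive-to-negative weight ratio is 200:800 = 1:4, matching training.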
  # Tile metric weights back to logit dimensions
  expanded_metric_weights = tf.reshape(tf.tile(
      metric_weights[:, tf.newaxis], (1, rconst.NUM_EVAL_NEGATIVES + 1)), (-1,))

  # ignore padded examples
  example_weights *= tf.cast(expanded_metric_weights, tf.float32)

  cross_entropy = tf.compat.v1.losses.sparse_softmax_cross_entropy(
      logits=softmax_logits, labels=eval_labels, weights=example_weights)

  def metric_fn(top_k_tensor, ndcg_tensor, weight_tensor):
    return {
        rconst.HR_KEY: tf.compat.v1.metrics.mean(top_k_tensor,
                                                 weights=weight_tensor,
                                                 name=rconst.HR_METRIC_NAME),
        rconst.NDCG_KEY: tf.compat.v1.metrics.mean(ndcg_tensor,
                                                   weights=weight_tensor,
                                                   name=rconst.NDCG_METRIC_NAME)
    }

  return cross_entropy, metric_fn, in_top_k, ndcg, metric_weights


def compute_top_k_and_ndcg(logits: tf.Tensor,
                           duplicate_mask: tf.Tensor,
                           match_mlperf: bool = False):
  """Computes the inputs of the metric calculation.

  Args:
    logits: A tensor containing the predicted logits for each user. The shape
      of logits is (num_users_per_batch * (1 + NUM_EVAL_NEGATIVES),). Logits
      for a user are grouped, and the last element of the group is the true
      element.
    duplicate_mask: A vector with the same shape as logits, with a value of 1
      if the item corresponding to the logit at that position has already
      appeared for that user.
    match_mlperf: Use the MLPerf reference convention for computing rank.

  Returns:
    is_top_k, ndcg and weights, all of which have size (num_users_in_batch,),
    and logits_by_user which has size
    (num_users_in_batch, (rconst.NUM_EVAL_NEGATIVES + 1)).
  """
  logits_by_user = tf.reshape(logits, (-1, rconst.NUM_EVAL_NEGATIVES + 1))
  duplicate_mask_by_user = tf.cast(
      tf.reshape(duplicate_mask, (-1, rconst.NUM_EVAL_NEGATIVES + 1)),
      logits_by_user.dtype)

  if match_mlperf:
    # Set duplicate logits to the min value for that dtype. The MLPerf
    # reference dedupes during evaluation.
    logits_by_user *= (1 - duplicate_mask_by_user)
    logits_by_user += duplicate_mask_by_user * logits_by_user.dtype.min
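    # E.g. a logits row [2., 3., 3.] with duplicate mask [0., 0., 1.] becomes
    # [2., 3., dtype.min], so the second copy of a duplicated item can never
    # outrank the copy that is kept.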

  # Determine the rank of the true item in each row after the elements are
  # sorted.
  sort_indices = tf.argsort(logits_by_user, axis=1, direction="DESCENDING")

  # Use matrix multiplication to extract the position of the true item from
  # the tensor of sorted indices. This approach is chosen because both GPUs
  # and TPUs perform matrix multiplications very quickly. This is similar to
  # np.argwhere. However this is a special case because the target will only
  # appear in sort_indices once.
  one_hot_position = tf.cast(tf.equal(sort_indices, rconst.NUM_EVAL_NEGATIVES),
                             tf.int32)
  sparse_positions = tf.multiply(
      one_hot_position, tf.range(logits_by_user.shape[1])[tf.newaxis, :])
  position_vector = tf.reduce_sum(sparse_positions, axis=1)

  in_top_k = tf.cast(tf.less(position_vector, rconst.TOP_K), tf.float32)
  ndcg = tf.math.log(2.) / tf.math.log(
      tf.cast(position_vector, tf.float32) + 2)
  ndcg *= in_top_k
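
  # Worked example (illustrative, with rows of length 3, i.e.
  # NUM_EVAL_NEGATIVES=2): for a user row with logits [0.1, 0.9, 0.5], the
  # true item sits at index 2, sort_indices is [1, 2, 0], one_hot_position is
  # [0, 1, 0], and position_vector is 1 (zero-based rank). With TOP_K >= 2
  # this is a hit, and ndcg = log(2) / log(1 + 2) ~= 0.63.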

  # If a row is a padded row, all but the first element will be a duplicate.
  metric_weights = tf.not_equal(tf.reduce_sum(duplicate_mask_by_user, axis=1),
                                rconst.NUM_EVAL_NEGATIVES)

  return in_top_k, ndcg, metric_weights, logits_by_user