|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Tests for when the training and inference graphs are the same.""" |
|
|
|
import os |
|
import tempfile |
|
|
|
import tensorflow as tf |
|
|
|
from tensorflow_model_optimization.python.core.common.keras.compression.algorithms import periodical_update_and_scheduling as svd |
|
from tensorflow_model_optimization.python.core.keras.compat import keras |
|
from tensorflow_model_optimization.python.core.keras.testing import test_utils_mnist |
|
|
|
|
|
def _build_model():
  """Builds the small convolutional Keras model used by the tests.

  The network takes a 28x28 input, reshapes it to 28x28x1, applies two
  convolution + max-pooling stages ('conv1' with 20 filters, 'conv2' with 50
  filters, both 5x5, ReLU, valid padding), flattens, and finishes with a
  500-unit ReLU dense layer ('fc1') and a 10-unit dense layer without
  activation ('fc2').

  Returns:
    An uncompiled `keras.Model` with a single input and a single output.
  """
  layers = keras.layers
  inputs = layers.Input(shape=(28, 28), name='input')

  # Layers are listed in the order in which they are applied to the input.
  stack = [
      layers.Reshape((28, 28, 1)),
      layers.Conv2D(20, 5, activation='relu', padding='valid', name='conv1'),
      layers.MaxPool2D(2, 2),
      layers.Conv2D(50, 5, activation='relu', padding='valid', name='conv2'),
      layers.MaxPool2D(2, 2),
      layers.Flatten(),
      layers.Dense(500, activation='relu', name='fc1'),
      layers.Dense(10, name='fc2'),
  ]

  features = inputs
  for layer in stack:
    features = layer(features)

  return keras.Model(inputs=[inputs], outputs=[features])
|
|
|
|
|
def _get_dataset():
  """Loads MNIST, rescales the images, and truncates the training split.

  Both image arrays are divided by 255.0. Only the first 1000 training
  examples (images and labels) are kept; the test split is returned whole.

  Returns:
    A pair `((x_train, y_train), (x_test, y_test))`.
  """
  train_split, test_split = keras.datasets.mnist.load_data()
  x_train, y_train = train_split
  x_test, y_test = test_split

  x_train = x_train / 255.0
  x_test = x_test / 255.0

  train_subset = (x_train[:1000], y_train[:1000])
  return train_subset, (x_test, y_test)
|
|
|
|
|
def _train_model(model):
  """Compiles `model` and fits it for a single epoch.

  The model is compiled with the 'adam' optimizer, a from-logits sparse
  categorical cross-entropy loss, and an 'accuracy' metric, then fitted on the
  training split returned by `_get_dataset()`.

  Args:
    model: The model to compile and fit; it is modified in place.
  """
  (train_images, train_labels), _ = _get_dataset()

  model.compile(
      optimizer='adam',
      loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=['accuracy'],
  )
  model.fit(train_images, train_labels, epochs=1)
|
|
|
|
|
def _save_as_saved_model(model):
  """Saves `model` into a freshly created temporary directory.

  Args:
    model: Object exposing a `save(path)` method.

  Returns:
    Path of the temporary directory that was handed to `model.save`. The
    directory is not removed by this helper.
  """
  export_dir = tempfile.mkdtemp()
  model.save(export_dir)
  return export_dir
|
|
|
|
|
|
|
def _convert_to_tflite(saved_model_dir):
  """Converts the model in `saved_model_dir` to TFLite and writes it to disk.

  Args:
    saved_model_dir: Directory passed to
      `tf.lite.TFLiteConverter.from_saved_model`.

  Returns:
    Path of a temporary file containing the converted model bytes. The file is
    not removed by this helper.
  """
  converter = tf.lite.TFLiteConverter.from_saved_model(saved_model_dir)
  tflite_model = converter.convert()

  # `mkstemp` returns an already-open OS-level file descriptor. Wrap that
  # descriptor instead of discarding it and reopening the path, so it is
  # closed when the `with` block exits rather than leaking. The temp file is
  # created only after conversion succeeds, so a failed conversion leaves no
  # orphan file behind.
  fd, tflite_file = tempfile.mkstemp()
  with os.fdopen(fd, 'wb') as f:
    f.write(tflite_model)

  return tflite_file
|
|
|
|
|
def _get_directory_size_in_bytes(directory):
  """Returns the combined size in bytes of all files under `directory`.

  Args:
    directory: Path to measure. A path to a regular file is accepted too.

  Returns:
    The sum of `st_size` over every file found recursively. If `directory` is
    not a directory, its own size as reported by `os.path.getsize`. 0 if a
    `PermissionError` is raised while scanning `directory`.
  """
  total = 0
  try:
    # The context manager closes the scandir iterator deterministically, even
    # when an exception interrupts the loop before it is exhausted.
    with os.scandir(directory) as entries:
      for entry in entries:
        if entry.is_file():
          total += entry.stat().st_size
        elif entry.is_dir():
          # Subdirectories contribute the size of everything beneath them.
          total += _get_directory_size_in_bytes(entry.path)
  except NotADirectoryError:
    # `directory` is actually a file: report that file's size.
    return os.path.getsize(directory)
  except PermissionError:
    # Best effort: an unreadable directory counts as empty.
    return 0
  return total
|
|
|
|
|
class FunctionalTest(tf.test.TestCase):
  """End-to-end checks of the `svd.SVD` algorithm on small Keras models."""

  def testSVD_ReducesSavedModelSize(self):
    """The compressed model saves to less than a third of the original size."""
    model = _build_model()

    original_saved_model_dir = _save_as_saved_model(model)

    algorithm = svd.SVD(rank=16, update_freq=1, warmup_step=10)
    training_model = algorithm.optimize_model(model)
    compressed_model = algorithm.compress_model(training_model)

    saved_model_dir = _save_as_saved_model(compressed_model)

    original_size = _get_directory_size_in_bytes(original_saved_model_dir)
    compressed_size = _get_directory_size_in_bytes(saved_model_dir)

    self.assertLess(compressed_size, original_size / 3)

  def testSVD_HasReasonableAccuracy_TF(self):
    """After one training epoch the compressed model exceeds 60% accuracy."""
    model = _build_model()

    algorithm = svd.SVD(rank=16, update_freq=1, warmup_step=10)
    training_model = algorithm.optimize_model(model)

    _train_model(training_model)

    compressed_model = algorithm.compress_model(training_model)

    _, (x_test, y_test) = _get_dataset()

    loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)

    compressed_model.compile(
        optimizer='adam', loss=loss_fn, metrics=['accuracy'])

    # `evaluate` returns [loss, accuracy] for the metrics compiled above.
    results = compressed_model.evaluate(x_test, y_test)

    self.assertGreater(results[1], 0.60)

  def testSVD_ReducesTFLiteModelSize(self):
    """The compressed TFLite file is less than a sixth of the original size."""
    model = _build_model()

    original_saved_model_dir = _save_as_saved_model(model)
    original_tflite_file = _convert_to_tflite(original_saved_model_dir)

    algorithm = svd.SVD(rank=16, update_freq=1, warmup_step=10)
    training_model = algorithm.optimize_model(model)
    compressed_model = algorithm.compress_model(training_model)

    saved_model_dir = _save_as_saved_model(compressed_model)
    compressed_tflite_file = _convert_to_tflite(saved_model_dir)

    original_size = os.path.getsize(original_tflite_file)
    compressed_size = os.path.getsize(compressed_tflite_file)

    self.assertLess(compressed_size, original_size / 6)

  def testSVD_HasReasonableAccuracy_TFLite(self):
    """The TFLite export of the trained, compressed model exceeds 60%."""
    model = _build_model()

    algorithm = svd.SVD(rank=16, update_freq=1, warmup_step=10)
    training_model = algorithm.optimize_model(model)

    _train_model(training_model)

    compressed_model = algorithm.compress_model(training_model)

    saved_model_dir = _save_as_saved_model(compressed_model)
    compressed_tflite_file = _convert_to_tflite(saved_model_dir)

    accuracy = test_utils_mnist.eval_tflite(compressed_tflite_file)

    self.assertGreater(accuracy, 0.60)

  def testSVD_BreaksDownLayerWeights(self):
    """Compression changes the first conv layer from 2 weights to 3."""
    model = _build_model()

    # Index 2 is 'conv1': the model's layers start with the input layer and
    # the reshape layer created in `_build_model`.
    first_conv_layer = model.layers[2]
    self.assertLen(first_conv_layer.weights, 2)

    algorithm = svd.SVD(rank=16, update_freq=1, warmup_step=10)
    training_model = algorithm.optimize_model(model)
    compressed_model = algorithm.compress_model(training_model)

    first_conv_layer = compressed_model.layers[2]

    self.assertLen(first_conv_layer.weights, 3)

  def testSVD_PreservesPretrainedWeights(self):
    """Training weights match what the algorithm derives from the originals."""
    # `(2,)` is a one-element tuple; a bare `(2)` is just the integer 2.
    i = keras.layers.Input(shape=(2,), name='input')
    output = keras.layers.Dense(3, name='fc1')(i)
    model = keras.Model(inputs=[i], outputs=[output])

    dense_layer_weights = model.layers[1].get_weights()

    algorithm = svd.SVD(rank=1, update_freq=1, warmup_step=10)
    training_model = algorithm.optimize_model(model)

    dense_layer_training_weights = training_model.layers[1].get_weights()

    # First Dense weight (the kernel, per Keras' `get_weights` ordering):
    # re-run the algorithm's weight initialization on the original value and
    # check that the two resulting initializers reproduce the first two
    # weights of the training layer.
    algorithm.weight_reprs = []
    algorithm.init_training_weights(dense_layer_weights[0])
    w1_repr, w2_repr = algorithm.weight_reprs
    # Use TestCase assertions rather than bare `assert`, which is stripped
    # when Python runs with -O and gives no diagnostics on failure.
    self.assertAllEqual(
        w1_repr.kwargs['initializer'](None), dense_layer_training_weights[0])
    self.assertAllEqual(
        w2_repr.kwargs['initializer'](None), dense_layer_training_weights[1])

    # Second Dense weight (the bias) must be carried over unchanged as the
    # third weight of the training layer.
    self.assertAllEqual(
        dense_layer_weights[1], dense_layer_training_weights[2])
|
|
|
|
|
# Script entry point: hand control to the TensorFlow test runner, which
# discovers and runs the test cases defined in this module.
if __name__ == '__main__':
  tf.test.main()
|
|