Spaces:
Sleeping
Sleeping
| import tensorflow as tf | |
| import coremltools as ct | |
| import numpy as np | |
| import PIL | |
| from huggingface_hub import hf_hub_download | |
| from huggingface_hub import snapshot_download | |
| import os | |
| import math | |
| # Helper class to extract features from one model, and then feed those features into a classification head | |
| # Because coremltools will only perform inference on OSX, an alternative tensorflow inference pipeline uses | |
| # a tensorflow feature extractor and feeds the features into a dynamically created keras model based on the coreml classification head. | |
class CoreMLPipeline:
    """Run a feature extractor followed by a Core ML classification head.

    Two execution paths are supported, selected by ``use_tf``:

    * ``use_tf=False``: both the extractor and the classifier are Core ML
      models and are run through ``coremltools``.
    * ``use_tf=True``: the extractor is a TensorFlow SavedModel and the
      classifier weights are copied out of the Core ML spec into a single
      Keras ``Dense`` layer (see ``make_keras_model``).
    """

    # Width of the feature vector fed to the classification head.
    FEATURE_DIM = 1280
    # Name given to the Keras input layer; ``classify`` feeds the model a dict
    # keyed by this name, so it must not be left to Keras' auto-naming.
    KERAS_INPUT_NAME = "input_1"

    def __init__(self, config, auth_key, use_tf):
        """Download and load the extractor and classifier models.

        Args:
            config: Mapping read for the keys ``tf_extractor_repoid`` and
                ``tf_extractor_path`` (TensorFlow path) or
                ``coreml_extractor_repoid`` and ``coreml_extractor_path``
                (Core ML path), plus ``coreml_classifier_repoid`` and
                ``coreml_classifier_path``. An optional ``activation`` key is
                read by ``make_keras_model`` and ``classify``.
            auth_key: Token forwarded to the Hugging Face Hub download calls.
            use_tf: When true, use the TensorFlow extractor and a Keras copy
                of the classification head.
        """
        self.config = config
        self.use_tf = use_tf
        if use_tf:
            extractor_path = snapshot_download(
                repo_id=config["tf_extractor_repoid"], use_auth_token=auth_key
            )
        else:
            extractor_path = hf_hub_download(
                repo_id=config["coreml_extractor_repoid"],
                filename=config["coreml_extractor_path"],
                use_auth_token=auth_key,
            )
        classifier_path = hf_hub_download(
            repo_id=config["coreml_classifier_repoid"],
            filename=config["coreml_classifier_path"],
            use_auth_token=auth_key,
        )

        print(f"Loading extractor...{extractor_path}")
        if use_tf:
            self.extractor = tf.saved_model.load(
                os.path.join(extractor_path, config["tf_extractor_path"])
            )
        else:
            self.extractor = ct.models.MLModel(extractor_path)

        print(f"Loading classifier...{classifier_path}")
        self.classifier = ct.models.MLModel(classifier_path)

        if use_tf:
            self.make_keras_model()

    def realize_weights(self, nnWeights, width):
        """Return the weights of a Core ML weight message as a NumPy array.

        Args:
            nnWeights: Object exposing ``quantization.numberOfBits``,
                ``float16Value``, ``floatValue``, ``rawValue`` and
                ``quantization.linearQuantization.scale`` / ``.bias``.
            width: Number of values that share one scale/bias pair when the
                weights are 8-bit quantized with several scales.

        Returns:
            A flat array of weights, or ``None`` (after printing a message)
            when the quantization bit width is neither 0 nor 8.

        Raises:
            ValueError: If 8-bit data with several scales holds fewer than
                ``len(scale) * width`` bytes.
        """
        bits = nnWeights.quantization.numberOfBits

        if bits == 0:
            # Not quantized: stored either as packed float16 bytes or floats.
            if len(nnWeights.float16Value) > 0:
                weights = np.frombuffer(nnWeights.float16Value, dtype=np.float16)
                print(f"found 16 bit {len(nnWeights.float16Value)/2} values")
                return weights
            return np.array(nnWeights.floatValue)

        if bits == 8:
            linear = nnWeights.quantization.linearQuantization
            scales = np.array(linear.scale)
            biases = np.array(linear.bias)
            # One unsigned byte per quantized value.
            raw = np.frombuffer(nnWeights.rawValue, dtype=np.uint8)
            if len(scales) == 1:
                # A single scale/bias pair applies to every value.
                return raw * scales[0] + biases[0]
            # One scale/bias pair per row of `width` consecutive values.
            rows = raw[: len(scales) * width].reshape(len(scales), width)
            return (rows * scales[:, None] + biases[:, None]).ravel()

        print(f"Unsupported quantization: {nnWeights.quantization.numberOfBits}")
        return None

    def make_keras_model(self):
        """Build a Keras copy of the Core ML classification head.

        Reads the first layer of the classifier spec as an inner-product
        layer, copies its weights and bias into a single ``Dense`` layer and
        stores the result in ``self.tf_model``. The class labels are stored in
        ``self.labels``.

        The activation is ``config["activation"]`` when present; otherwise
        ``"sigmoid"`` for a single label and ``"softmax"`` for several.

        Raises:
            ValueError: If the weights or bias use an unsupported
                quantization.
        """
        spec = self.classifier.get_spec()
        nn_classifier = spec.neuralNetworkClassifier
        labels = nn_classifier.stringClassLabels.vector
        n_labels = len(labels)

        if "activation" in self.config:
            activation = self.config["activation"]
        else:
            activation = "sigmoid" if n_labels == 1 else "softmax"

        # `shape` must be a tuple; `(1280)` without the comma is a plain int.
        inputs = tf.keras.Input(
            shape=(self.FEATURE_DIM,), name=self.KERAS_INPUT_NAME
        )
        outputs = tf.keras.layers.Dense(n_labels, activation=activation)(inputs)
        model = tf.keras.Model(inputs, outputs, trainable=False)

        inner = nn_classifier.layers[0].innerProduct
        weights = self.realize_weights(inner.weights, self.FEATURE_DIM)
        bias = self.realize_weights(inner.bias, n_labels)
        if weights is None or bias is None:
            raise ValueError(
                "Cannot build Keras model: unsupported weight quantization"
            )

        # The flat weights are read as (labels, features); the Dense kernel
        # is (features, labels), hence the transpose.
        kernel = weights.reshape((n_labels, self.FEATURE_DIM)).T
        # The Dense bias is one value per label.
        bias = bias.reshape((n_labels,))
        model.set_weights([kernel, bias])

        self.tf_model = model
        self.labels = labels

    def softmax_dict(self, input_dict):
        """Compute the softmax of a dictionary of values.

        Args:
            input_dict (dict): A dictionary with numerical values.

        Returns:
            dict: A dictionary with the same keys where the values are the
            softmax of the input values. An empty input gives an empty dict.
        """
        if not input_dict:
            return {}
        # Subtracting the maximum leaves the result unchanged mathematically
        # but keeps math.exp from overflowing on large inputs.
        peak = max(input_dict.values())
        exp_values = {k: math.exp(v - peak) for k, v in input_dict.items()}
        sum_exp_values = sum(exp_values.values())
        return {k: v / sum_exp_values for k, v in exp_values.items()}

    def classify(self, resized):
        """Classify one image and return a label-to-score mapping.

        Args:
            resized: The image to classify. It is passed to
                ``tf.image.convert_image_dtype`` on the TensorFlow path and to
                the Core ML extractor as ``"image"`` otherwise.
                NOTE(review): presumably already resized to the extractor's
                input size; confirm against the caller.

        Returns:
            On the TensorFlow path, a dict mapping each label to the matching
            model output. On the Core ML path, the classifier's ``"Identity"``
            output, passed through ``softmax_dict`` when
            ``config["activation"]`` is ``"softmax"``.
        """
        if self.use_tf:
            image = tf.image.convert_image_dtype(resized, tf.float32)
            image = tf.expand_dims(image, 0)  # add the batch dimension
            features = self.extractor.signatures['serving_default'](image)
            model_input = {self.KERAS_INPUT_NAME: features["output_1"]}
            output = self.tf_model.predict(model_input)
            results = {}
            for i, label in enumerate(self.labels):
                results[label] = output[0][i]
        else:
            features = self.extractor.predict({"image": resized})
            features = features["Identity"]
            output = self.classifier.predict({"features": features[0]})
            results = output["Identity"]
            # NOTE(review): softmax is applied on the Core ML path only, as
            # the Keras head already applies the configured activation.
            # Confirm this matches the intended branch placement.
            if "activation" in self.config and self.config["activation"] == "softmax":
                results = self.softmax_dict(results)
        return results