# Facial Expression Embedding

In [1]:
import numpy as np
import os
import random
from typing import *
import tensorflow as tf
from pathlib import Path
from tensorflow.keras import applications
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras import Model
from tensorflow.keras.applications import resnet
import pandas as pd
import mediapipe as mp
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import requests
from tqdm import tqdm
import base64
from concurrent.futures import ThreadPoolExecutor, as_completed
from retrying import retry
import swifter


2023-04-19 12:14:54.399884: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Dataset Loading

In [9]:
IMAGE_DIR = "images"
TRAINING_DATASET = "training_dataset"

In [3]:
@retry(stop_max_attempt_number=3)
def image_downloader(url: str):
    get_name = lambda url: base64.urlsafe_b64encode(url.encode()).decode()
    Path(IMAGE_DIR).mkdir(exist_ok=True)
    filename = get_name(url)
    if os.path.exists(os.path.join(IMAGE_DIR, filename)):
        return filename
    res = requests.get(url, timeout=10)
    if not res.ok:
        return None
    with open(os.path.join(IMAGE_DIR, filename), "wb") as f:
        f.write(res.content)
    return filename


In [4]:
def get_column_names():
    names = []
    for i in range(1, 4):
        names += [
            f"img{i}_url",
            f"img{i}_tl_col",
            f"img{i}_br_col",
            f"img{i}_tl_row",
            f"img{i}_br_row",
        ]
    names += ["triplet_type"]
    for i in range(6):
        names += [f"annotator{i+1}_id", f"annotation{i+1}"]
    return names


def get_local_storage_column_names():
    names = []
    for i in range(1, 4):
        names += [
            f"img{i}_id",
            f"img{i}_tl_col",
            f"img{i}_br_col",
            f"img{i}_tl_row",
            f"img{i}_br_row",
        ]
    names += ["triplet_type"]
    names += ["annotator1_id"]
    names += ["annotation"]
    return names


def get_label(annotations: pd.Series):
    def mode(x):
        s = pd.Series(x)
        if s.value_counts(normalize=True).max() < 0.5:
            return np.nan
        return s.mode().at[0]

    return annotations.swifter.apply(mode)


def fecnet_dataset_loader(dataset_csv: str):
    if isinstance(dataset_csv, bytes):
        dataset_csv = dataset_csv.decode()
    df = pd.read_csv(
        dataset_csv, header=None, names=get_column_names(), nrows=10000
    )  # TODO: remove nrows

    # download images
    df["img1_url"] = df["img1_url"].swifter.apply(image_downloader)
    df["img2_url"] = df["img2_url"].swifter.apply(image_downloader)
    df["img3_url"] = df["img3_url"].swifter.apply(image_downloader)
    df.dropna(subset=["img1_url", "img2_url", "img3_url"], inplace=True)

    # determine label
    df["label"] = get_label(
        pd.Series(df[[f"annotation{i}" for i in range(1, 7)]].values.tolist())
    )
    df.dropna(subset=["label"], inplace=True)

    samples = {
        "img1": [],
        "img1_box": [],
        "img2": [],
        "img2_box": [],
        "img3": [],
        "img3_box": [],
    }

    for _, row in df.iterrows():
        img1_idx, img2_idx, img3_idx = 1, 2, 3
        if row.label == 1:
            img1_idx, img3_idx = img3_idx, img1_idx
        elif row.label == 2:
            img2_idx, img3_idx = img3_idx, img2_idx
        bounding_boxes = (
            (
                (row[f"img{img1_idx}_tl_col"], row[f"img{img1_idx}_tl_row"]),
                (row[f"img{img1_idx}_br_col"], row[f"img{img1_idx}_br_row"]),
            ),
            (
                (row[f"img{img2_idx}_tl_col"], row[f"img{img2_idx}_tl_row"]),
                (row[f"img{img2_idx}_br_col"], row[f"img{img2_idx}_br_row"]),
            ),
            (
                (row[f"img{img3_idx}_tl_col"], row[f"img{img3_idx}_tl_row"]),
                (row[f"img{img3_idx}_br_col"], row[f"img{img3_idx}_br_row"]),
            ),
        )
        samples["img1"].append(row[f"img{img1_idx}_url"])
        samples["img1_box"].append(bounding_boxes[0])
        samples["img2"].append(row[f"img{img2_idx}_url"])
        samples["img2_box"].append(bounding_boxes[1])
        samples["img3"].append(row[f"img{img3_idx}_url"])
        samples["img3_box"].append(bounding_boxes[2])
    return samples


In [5]:
def extract_landmarks(image):
    with mp.solutions.face_mesh.FaceMesh(
        static_image_mode=True,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
    ) as face_mesh:
        results = face_mesh.process(image.numpy())
        if results.multi_face_landmarks:
            landmarks = results.multi_face_landmarks[0]
            landmarks = np.array(
                [[lm.x, lm.y, lm.z] for lm in landmarks.landmark], dtype=np.float32
            )
            landmarks = landmarks.flatten()
        else:
            landmarks = np.zeros(478 * 3, dtype=np.float32)
    return landmarks


def preprocess_image(filename: str, tl: Tuple[float, float], br: Tuple[float, float]):
    image_string = tf.io.read_file(tf.strings.join([IMAGE_DIR, "/", filename]))
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.convert_image_dtype(image, tf.uint8)

    # crop image
    tl = tf.cast(tf.multiply(tl, tf.cast(tf.shape(image)[:2][::-1], tf.float32)), tf.int32)
    br = tf.cast(tf.multiply(br, tf.cast(tf.shape(image)[:2][::-1], tf.float32)), tf.int32)
    image = tf.image.crop_to_bounding_box(
        image, tl[1], tl[0], br[1] - tl[1], br[0] - tl[0]
    )

    # extract landmarks using facemesh
    return tf.py_function(extract_landmarks, [image], tf.float32)
    # return image


ImgType = Tuple[str, Tuple[float, float], Tuple[float, float]]


def preprocess_triplets(triplet: dict):
    anchor: ImgType = (triplet["img1"], triplet["img1_box"][0], triplet["img1_box"][1])
    positive: ImgType = (
        triplet["img2"],
        triplet["img2_box"][0],
        triplet["img2_box"][1],
    )
    negative: ImgType = (
        triplet["img3"],
        triplet["img3_box"][0],
        triplet["img3_box"][1],
    )
    return (
        preprocess_image(*anchor),
        preprocess_image(*positive),
        preprocess_image(*negative),
    )


In [6]:
if not TRAINING_DATASET:
    df = fecnet_dataset_loader("data/faceexp-comparison-data-train-public.csv")
    image_count = len(df)

    dataset = tf.data.Dataset.from_tensor_slices(
        df
    )

    dataset = dataset.shuffle(buffer_size=1024)
    dataset = dataset.map(preprocess_triplets)
else:
    dataset = tf.data.Dataset.load(TRAINING_DATASET)

train_dataset = dataset.take(round(image_count * 0.8))
val_dataset = dataset.skip(round(image_count * 0.8))

train_dataset = train_dataset.batch(32, drop_remainder=False)
train_dataset = train_dataset.prefetch(8)

val_dataset = val_dataset.batch(32, drop_remainder=False)
val_dataset = val_dataset.prefetch(8)

Pandas Apply:   0%|          | 0/10000 [00:00<?, ?it/s]

Pandas Apply:   0%|          | 0/10000 [00:00<?, ?it/s]

Pandas Apply:   0%|          | 0/10000 [00:00<?, ?it/s]

Pandas Apply:   0%|          | 0/8143 [00:00<?, ?it/s]

2023-04-19 13:04:21.919175: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F AVX512_VNNI FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


## Data Visualisation

In [10]:
def visualise_face_mesh(landmarks):
    landmarks = landmarks.reshape(-1, 3)
    fig = go.Figure(
        go.Mesh3d(
            x=landmarks[:, 0],
            y=landmarks[:, 1],
            z=landmarks[:, 2],
            color="lightpink",
            opacity=0.50,
            contour=dict(  
                color="grey",
                width=1,
            ),
        )
    )
    fig.show()

def visualise_face_mesh_triplets(anchor, positive, negative):
    fig = make_subplots(
        rows=anchor.shape[0],
        cols=3,
        specs=[[{"type": "surface"}, {"type": "surface"}, {"type": "surface"}]],
    )
    for i in range(anchor.shape[0]):
        fig.add_trace(
            go.Mesh3d(
                x=anchor[i, :, 0],
                y=anchor[i, :, 1],
                z=anchor[i, :, 2],
                color="lightpink",
                opacity=0.50,
            ),
            row=i + 1,
            col=1,
        )
        fig.add_trace(
            go.Mesh3d(
                x=positive[i, :, 0],
                y=positive[i, :, 1],
                z=positive[i, :, 2],
                color="lightpink",
                opacity=0.50,
            ),
            row=i + 1,
            col=2,
        )
        fig.add_trace(
            go.Mesh3d(
                x=negative[i, :, 0],
                y=negative[i, :, 1],
                z=negative[i, :, 2],
                color="lightpink",
                opacity=0.50,
            ),
            row=i + 1,
            col=3,
        )
    fig.show()

def visualise_image(image):
    fig = px.imshow(image)
    fig.show()


def visualise_triplet(anchor, positive, negative):
    pass


In [160]:
# ds_samples = list(dataset.as_numpy_iterator())

In [161]:
# visualise_face_mesh(ds_samples[30][0])

## Model Definition

In [11]:
def create_embedding_model():
    input_layer = layers.Input(shape=(478, 3))
    flatten = layers.Flatten()(input_layer)
    dense1 = layers.Dense(512, activation="relu")(flatten)
    dense1 = layers.BatchNormalization()(dense1)
    dense2 = layers.Dense(256, activation="relu")(dense1)
    dense2 = layers.BatchNormalization()(dense2)
    output = layers.Dense(16)(dense2)

    embedding = Model(input_layer, output, name="Embedding")
    return embedding

In [12]:
embedding = create_embedding_model()
embedding.summary()

Model: "Embedding"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 478, 3)]          0         
                                                                 
 flatten (Flatten)           (None, 1434)              0         
                                                                 
 dense (Dense)               (None, 512)               734720    
                                                                 
 batch_normalization (BatchN  (None, 512)              2048      
 ormalization)                                                   
                                                                 
 dense_1 (Dense)             (None, 256)               131328    
                                                                 
 batch_normalization_1 (Batc  (None, 256)              1024      
 hNormalization)                                         

In [13]:
class DistanceLayer(layers.Layer):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def call(self, anchor, positive, negative):
        ap_distance = tf.reduce_sum(tf.square(anchor - positive), -1)
        an_distance = tf.reduce_sum(tf.square(anchor - negative), -1)
        return (ap_distance, an_distance)


In [14]:
anchor_input = layers.Input(name="anchor", shape=(478,) + (3,))
positive_input = layers.Input(name="positive", shape=(478,) + (3,))
negative_input = layers.Input(name="negative", shape=(478,) + (3,))

distances = DistanceLayer()(
    embedding(anchor_input),
    embedding(positive_input),
    embedding(negative_input),
)

siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

In [15]:
class SiameseModel(Model):
    """The Siamese Network model with a custom training and testing loops.

    Computes the triplet loss using the three embeddings produced by the
    Siamese Network.

    The triplet loss is defined as:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """

    def __init__(self, siamese_network, margin=0.2):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        self.loss_tracker = metrics.Mean(name="loss")
        self.acc_tracker = metrics.Mean(name="accuracy")

    def call(self, inputs):
        return self.siamese_network(inputs)

    def train_step(self, data):
        # GradientTape is a context manager that records every operation that
        # you do inside. We are using it here to compute the loss so we can get
        # the gradients and apply them using the optimizer specified in
        # `compile()`.
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        # Storing the gradients of the loss function with respect to the
        # weights/parameters.
        gradients = tape.gradient(loss, self.siamese_network.trainable_weights)

        # Applying the gradients on the model using the specified optimizer
        self.optimizer.apply_gradients(
            zip(gradients, self.siamese_network.trainable_weights)
        )

        # Let's update and return the training loss metric.
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        loss = self._compute_loss(data)

        # Let's update and return the loss metric.
        self.loss_tracker.update_state(loss)
        self.acc_tracker.update_state(
            self._compute_accuracy(data)
        )
        return {"loss": self.loss_tracker.result(), "accuracy": self.acc_tracker.result()}

    def _compute_loss(self, data):
        # The output of the network is a tuple containing the distances
        # between the anchor and the positive example, and the anchor and
        # the negative example.
        ap_distance, an_distance = self.siamese_network(data)

        # Computing the Triplet Loss by subtracting both distances and
        # making sure we don't get a negative value.
        loss = ap_distance - an_distance
        loss = tf.maximum(loss + self.margin, 0.0)
        return loss
    
    def _compute_accuracy(self, data):
        ap_distance, an_distance = self.siamese_network(data)
        return tf.cast(ap_distance < an_distance, tf.float32)

    @property
    def metrics(self):
        # We need to list our metrics here so the `reset_states()` can be
        # called automatically.
        return [self.loss_tracker, self.acc_tracker]

## Model Fitting

In [16]:
siamese_model = SiameseModel(siamese_network)
siamese_model.compile(optimizer=optimizers.Adam(0.0005), weighted_metrics=["accuracy"])
siamese_model.fit(train_dataset, epochs=40, validation_data=val_dataset)

Epoch 1/40
Epoch 2/40

KeyboardInterrupt: 

## Evaluate Model

In [None]:
test_df = fecnet_dataset_loader("data/faceexp-comparison-data-test-public.csv")

test_dataset = tf.data.Dataset.from_tensor_slices(
    test_df
)

test_dataset = test_dataset.map(preprocess_triplets)

In [None]:
siamese_model.evaluate(test_dataset)

In [168]:
sample = next(iter(train_dataset))
# visualise_face_mesh_triplet(*sample)

anchor, positive, negative = sample
anchor_embedding, positive_embedding, negative_embedding = (
    embedding(tf.reshape(anchor, (-1, 478,3))),
    embedding(tf.reshape(positive, (-1, 478,3))),
    embedding(tf.reshape(negative, (-1, 478,3))),
)
cosine_similarity = metrics.CosineSimilarity()

positive_similarity = cosine_similarity(anchor_embedding, positive_embedding)
print("Positive similarity:", positive_similarity.numpy())

negative_similarity = cosine_similarity(anchor_embedding, negative_embedding)
print("Negative similarity", negative_similarity.numpy())

positive_negative_similarity = cosine_similarity(positive_embedding, negative_embedding)
print("Positive-Negative similarity", positive_negative_similarity.numpy())

Positive similarity: 0.96082336
Negative similarity 0.85784876
Positive-Negative similarity 0.8386482


In [8]:
# tf.data.Dataset.save(dataset, "training_dataset")

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
