natexcvi commited on
Commit
b87deef
1 Parent(s): 700b49d

Impl. + tests

Browse files
.dockerignore ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ .*
2
+ testdata
3
+ test_service.py
4
+ __pycache__
.gitignore ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ .venv
2
+ .env
3
+ __pycache__
4
+ .vscode
Dockerfile ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ FROM python:3.7
2
+
3
+ WORKDIR /code
4
+
5
+ COPY ./requirements.txt /code/requirements.txt
6
+
7
+ RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
8
+
9
+ COPY . .
10
+
11
+ CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "7860"]
app.py ADDED
@@ -0,0 +1,65 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from typing import Union
3
+
4
+ from fastapi import (
5
+ Depends,
6
+ FastAPI,
7
+ File,
8
+ HTTPException,
9
+ Query,
10
+ Response,
11
+ UploadFile,
12
+ status,
13
+ )
14
+
15
+ from model import Model
16
+
17
+ app = FastAPI()
18
+
19
+ model = Model(
20
+ os.getenv("MODEL_REPO_ID", ""),
21
+ os.getenv("MODEL_FILENAME", ""),
22
+ os.getenv("HF_TOKEN", ""),
23
+ )
24
+
25
+
26
+ async def validate_token(
27
+ token: Union[str, None] = Query(default=None),
28
+ ):
29
+ if token is None:
30
+ raise HTTPException(status.HTTP_401_UNAUTHORIZED, "No token provided")
31
+ if token != os.getenv("CLIENT_TOKEN"):
32
+ raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")
33
+ return token
34
+
35
+
36
+ @app.post("/embed", status_code=status.HTTP_200_OK)
37
+ async def calculate_embedding(
38
+ image: UploadFile = File(...), _: str = Depends(validate_token)
39
+ ):
40
+ try:
41
+ image_content = await image.read()
42
+ pred = model.predict(model.preprocess(image_content))
43
+ return {"embedding": pred.tolist()}
44
+ except Exception as e:
45
+ return Response(
46
+ content=str(e), status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
47
+ )
48
+
49
+
50
+ @app.post("/similarity", status_code=status.HTTP_200_OK)
51
+ async def calculate_similarity_score(
52
+ image1: UploadFile = File(...),
53
+ image2: UploadFile = File(...),
54
+ _: str = Depends(validate_token),
55
+ ):
56
+ try:
57
+ image1_content = await image1.read()
58
+ image2_content = await image2.read()
59
+ pred1 = model.predict(model.preprocess(image1_content))
60
+ pred2 = model.predict(model.preprocess(image2_content))
61
+ return {"score": float(model.distance(pred1, pred2))}
62
+ except Exception as e:
63
+ return Response(
64
+ content=str(e), status_code=status.HTTP_500_INTERNAL_SERVER_ERROR
65
+ )
model.py ADDED
@@ -0,0 +1,165 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import threading
2
+
3
+ import cv2
4
+ import mediapipe as mp
5
+ import numpy as np
6
+ import pandas as pd
7
+ from huggingface_hub import hf_hub_download
8
+ from tensorflow import keras
9
+
10
+
11
+ class Model:
12
+ def __init__(self, model_repo_id: str, model_filename: str, hf_token: str):
13
+ self.landmark_extractor = LandmarkExtractor()
14
+ self.trained_model = self.create_prod_model()
15
+ self.trained_model.compile(
16
+ keras.optimizers.Adam(0.0005),
17
+ loss=self.triplet_loss_init(0.2),
18
+ metrics=[self.triplet_accuracy],
19
+ )
20
+ custom_objects = {
21
+ "triplet_loss": self.triplet_loss_init(0.2),
22
+ "triplet_accuracy": self.triplet_accuracy,
23
+ "K": keras.backend,
24
+ "keras": keras,
25
+ }
26
+ with keras.utils.custom_object_scope(custom_objects):
27
+ weights = keras.models.load_model(
28
+ hf_hub_download(model_repo_id, model_filename, token=hf_token)
29
+ ).get_weights()
30
+ self.trained_model.set_weights(weights)
31
+
32
+ @staticmethod
33
+ def fec_net(inputs):
34
+ x = keras.layers.Dense(1024, activation="relu")(inputs)
35
+ x = keras.layers.Dense(512, activation="relu")(x)
36
+ x = keras.layers.Dense(512, activation="relu")(x)
37
+ x = keras.layers.Dropout(0.5)(x)
38
+ x = keras.layers.Dense(16)(x)
39
+ outputs = keras.layers.Lambda(keras.backend.l2_normalize)(x)
40
+ return outputs
41
+
42
+ @classmethod
43
+ def create_prod_model(cls):
44
+ inputs = keras.layers.Input(shape=(478 * 3,))
45
+ outputs = cls.fec_net(inputs)
46
+ return keras.Model(inputs=inputs, outputs=outputs)
47
+
48
+ @staticmethod
49
+ def triplet_loss_init(alpha):
50
+ def triplet_loss(y_true, y_pred):
51
+ dimensions = 16
52
+ batch_size = y_pred.shape[0]
53
+ s1 = y_pred[:, :dimensions]
54
+ s2 = y_pred[:, dimensions : 2 * dimensions]
55
+ d = y_pred[:, 2 * dimensions :]
56
+
57
+ s1_s2 = keras.backend.sum(keras.backend.square(s1 - s2), axis=1)
58
+ s1_d = keras.backend.sum(keras.backend.square(s1 - d), axis=1)
59
+ s2_d = keras.backend.sum(keras.backend.square(s2 - d), axis=1)
60
+
61
+ loss = keras.backend.maximum(
62
+ 0.0, s1_s2 - s1_d + alpha
63
+ ) + keras.backend.maximum(0.0, s1_s2 - s2_d + alpha)
64
+ loss = keras.backend.mean(loss)
65
+
66
+ return loss
67
+
68
+ return triplet_loss
69
+
70
+ @staticmethod
71
+ def triplet_accuracy(y_true, y_pred):
72
+ dimensions = 16
73
+ s1 = y_pred[:, :dimensions]
74
+ s2 = y_pred[:, dimensions : 2 * dimensions]
75
+ d = y_pred[:, 2 * dimensions :]
76
+
77
+ s1_s2 = keras.backend.sqrt(
78
+ keras.backend.sum(keras.backend.square(s1 - s2), axis=1)
79
+ )
80
+ s1_d = keras.backend.sqrt(
81
+ keras.backend.sum(keras.backend.square(s1 - d), axis=1)
82
+ )
83
+ s2_d = keras.backend.sqrt(
84
+ keras.backend.sum(keras.backend.square(s2 - d), axis=1)
85
+ )
86
+
87
+ s1_match = keras.backend.less(s1_s2, s1_d)
88
+ s2_match = keras.backend.less(s1_s2, s2_d)
89
+ match = keras.backend.cast(
90
+ keras.backend.all(keras.backend.stack([s1_match, s2_match]), axis=0),
91
+ "float32",
92
+ )
93
+
94
+ acc = keras.backend.mean(match, axis=0)
95
+
96
+ return acc
97
+
98
+ @classmethod
99
+ def load_model(cls, model_path: str):
100
+ custom_objects = {
101
+ "triplet_loss": cls.triplet_loss_init(0.2),
102
+ "triplet_accuracy": cls.triplet_accuracy,
103
+ }
104
+ with keras.utils.custom_object_scope(custom_objects):
105
+ return keras.models.load_model(model_path)
106
+
107
+ def predict(self, x: np.ndarray):
108
+ return self.trained_model.predict(x)
109
+
110
+ def preprocess(self, image: bytes) -> np.ndarray:
111
+ array_repr = np.asarray(bytearray(image), dtype=np.uint8)
112
+ decoded_img = cv2.imdecode(array_repr, flags=cv2.IMREAD_COLOR)
113
+ landmarks = self.landmark_extractor.extract_landmarks_flat(decoded_img)
114
+ return pd.DataFrame([landmarks]).to_numpy()
115
+
116
+ @staticmethod
117
+ def distance(x1: np.ndarray, x2: np.ndarray):
118
+ return np.linalg.norm(x1 - x2, ord=2)
119
+
120
+
121
+ class LandmarkExtractor:
122
+ def __init__(self):
123
+ self.lock = threading.Lock()
124
+ self.face_mesh = mp.solutions.face_mesh.FaceMesh(
125
+ static_image_mode=True,
126
+ max_num_faces=1,
127
+ refine_landmarks=True,
128
+ min_detection_confidence=0.5,
129
+ )
130
+
131
+ @staticmethod
132
+ def __normalise_landmarks(landmarks):
133
+ left_eye_idx = 33
134
+ right_eye_idx = 263
135
+ left_eye_v = [landmarks[left_eye_idx].x, landmarks[left_eye_idx].y]
136
+ right_eye_v = [landmarks[right_eye_idx].x, landmarks[right_eye_idx].y]
137
+ xy_norm = np.linalg.solve(
138
+ np.array([left_eye_v, right_eye_v]).T,
139
+ np.array([[lmk.x, lmk.y] for lmk in landmarks]).T,
140
+ ).T
141
+ for i, lmk in enumerate(landmarks):
142
+ lmk.x = xy_norm[i][0]
143
+ lmk.y = xy_norm[i][1]
144
+ return landmarks
145
+
146
+ def extract_landmarks(self, img: np.ndarray):
147
+ results = self.face_mesh.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
148
+ if results.multi_face_landmarks is None:
149
+ return None
150
+ return self.__normalise_landmarks(results.multi_face_landmarks[0].landmark)
151
+
152
+ def extract_landmarks_flat(self, img: np.ndarray):
153
+ landmarks = self.extract_landmarks(img)
154
+ if landmarks is None:
155
+ return None
156
+ flat_landmarks = {}
157
+ for i, landmark in enumerate(landmarks):
158
+ flat_landmarks.update(
159
+ {
160
+ f"{i}_x": landmark.x,
161
+ f"{i}_y": landmark.y,
162
+ f"{i}_z": landmark.z,
163
+ }
164
+ )
165
+ return flat_landmarks
requirements.txt ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ huggingface_hub
2
+ tensorflow
3
+ numpy
4
+ fastapi==0.74.*
5
+ requests==2.27.*
6
+ uvicorn[standard]==0.17.*
7
+ python-multipart
8
+ mediapipe
9
+ pandas
10
+ pytest
11
+ python-dotenv
test_service.py ADDED
@@ -0,0 +1,89 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+
3
+ from dotenv import load_dotenv
4
+
5
+ load_dotenv()
6
+
7
+ import pytest
8
+ from fastapi.testclient import TestClient
9
+
10
+
11
+ @pytest.fixture
12
+ def client():
13
+ from app import app
14
+
15
+ clnt = TestClient(app)
16
+ return clnt
17
+
18
+
19
+ @pytest.mark.parametrize(
20
+ "image_path",
21
+ [
22
+ pytest.param("testdata/face_pic.jpeg", id="happy flow"),
23
+ ],
24
+ )
25
+ def test_embed(client: TestClient, image_path):
26
+ with open(image_path, "rb") as image_file:
27
+ response = client.post(
28
+ "/embed",
29
+ files=[("image", (os.path.basename(image_path), image_file))],
30
+ params={"token": os.getenv("CLIENT_TOKEN")},
31
+ )
32
+ assert response.status_code == 200, response.text
33
+ assert "embedding" in response.json()
34
+
35
+
36
+ @pytest.mark.parametrize(
37
+ "image1_path, image2_path, exp_score",
38
+ [
39
+ pytest.param(
40
+ "testdata/face_pic.jpeg",
41
+ "testdata/face_pic.jpeg",
42
+ 0,
43
+ id="same pic",
44
+ ),
45
+ pytest.param(
46
+ "testdata/face_pic.jpeg",
47
+ "testdata/face_pic3.jpeg",
48
+ 0.00026,
49
+ id="similar expression",
50
+ ),
51
+ pytest.param(
52
+ "testdata/face_pic.jpeg",
53
+ "testdata/face_pic2.jpeg",
54
+ 2,
55
+ id="different expression",
56
+ ),
57
+ ],
58
+ )
59
+ def test_similarity(client: TestClient, image1_path, image2_path, exp_score):
60
+ with open(image1_path, "rb") as image1_file, open(image2_path, "rb") as image2_file:
61
+ response = client.post(
62
+ "/similarity",
63
+ files=[
64
+ ("image1", (os.path.basename(image1_path), image1_file)),
65
+ ("image2", (os.path.basename(image2_path), image2_file)),
66
+ ],
67
+ params={"token": os.getenv("CLIENT_TOKEN")},
68
+ )
69
+ assert response.status_code == 200
70
+ assert "score" in response.json()
71
+ if exp_score is not None:
72
+ assert response.json()["score"] == pytest.approx(exp_score, abs=1e-4)
73
+
74
+
75
+ def test_authentication(client: TestClient):
76
+ response = client.post(
77
+ "/embed",
78
+ files=[("image", ("face_pic.jpeg", b""))],
79
+ )
80
+ assert response.status_code == 401
81
+ assert response.json()["detail"] == "No token provided"
82
+
83
+ response = client.post(
84
+ "/embed",
85
+ files=[("image", ("face_pic.jpeg", b""))],
86
+ params={"token": "wrong_token"},
87
+ )
88
+ assert response.status_code == 401
89
+ assert response.json()["detail"] == "Invalid token"
testdata/face_pic.jpeg ADDED
testdata/face_pic2.jpeg ADDED
testdata/face_pic3.jpeg ADDED