|
"""A local gradio app that detect matching images using FHE.""" |
|
|
|
from PIL import Image |
|
import os |
|
import shutil |
|
import subprocess |
|
import time |
|
import gradio as gr |
|
import numpy |
|
import requests |
|
from itertools import chain |
|
|
|
from common import ( |
|
AVAILABLE_MATCHERS, |
|
CLIENT_TMP_PATH, |
|
ENCRYPTED_OUTPUT_NAME, |
|
ENCRYPTED_QUERY_NAME, |
|
ENCRYPTED_REFERENCE_NAME, |
|
SERVER_TMP_PATH, |
|
EXAMPLES, |
|
MATCHERS_PATH, |
|
INPUT_SHAPE, |
|
KEYS_PATH, |
|
REPO_DIR, |
|
SERVER_URL, |
|
) |
|
from client_server_interface import FHEClient |
|
|
|
|
|
subprocess.Popen(["uvicorn", "server:app"], cwd=REPO_DIR) |
|
time.sleep(3) |
|
|
|
|
|
def decrypt_output_with_wrong_key(encrypted_image, matcher_name): |
|
"""Decrypt the encrypted output using a different private key.""" |
|
|
|
matcher_path = MATCHERS_PATH / f"{matcher_name}/deployment" |
|
|
|
|
|
wrong_client = FHEClient(matcher_path, matcher_name) |
|
wrong_client.generate_private_and_evaluation_keys(force=True) |
|
|
|
|
|
output_result = wrong_client.deserialize_decrypt_post_process(encrypted_image) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return output_result |
|
|
|
|
|
def shorten_bytes_object(bytes_object, limit=500): |
|
"""Shorten the input bytes object to a given length. |
|
|
|
Encrypted data is too large for displaying it in the browser using Gradio. This function |
|
provides a shorten representation of it. |
|
|
|
Args: |
|
bytes_object (bytes): The input to shorten |
|
limit (int): The length to consider. Default to 500. |
|
|
|
Returns: |
|
str: Hexadecimal string shorten representation of the input byte object. |
|
|
|
""" |
|
|
|
shift = 100 |
|
return bytes_object[shift : limit + shift].hex() |
|
|
|
|
|
def get_client(user_id, matcher_name): |
|
"""Get the client API. |
|
|
|
Args: |
|
user_id (int): The current user's ID. |
|
matcher_name (str): The matcher chosen by the user |
|
|
|
Returns: |
|
FHEClient: The client API. |
|
""" |
|
return FHEClient( |
|
MATCHERS_PATH / f"{matcher_name}/deployment", |
|
matcher_name, |
|
key_dir=KEYS_PATH / f"{matcher_name}_{user_id}", |
|
) |
|
|
|
|
|
def get_client_file_path(name, user_id, matcher_name): |
|
"""Get the correct temporary file path for the client. |
|
|
|
Args: |
|
name (str): The desired file name. |
|
user_id (int): The current user's ID. |
|
matcher_name (str): The matcher chosen by the user |
|
|
|
Returns: |
|
pathlib.Path: The file path. |
|
""" |
|
return CLIENT_TMP_PATH / f"{name}_{matcher_name}_{user_id}" |
|
|
|
|
|
def clean_temporary_files(n_keys=20): |
|
"""Clean keys and encrypted images. |
|
|
|
A maximum of n_keys keys and associated temporary files are allowed to be stored. Once this |
|
limit is reached, the oldest files are deleted. |
|
|
|
Args: |
|
n_keys (int): The maximum number of keys and associated files to be stored. Default to 20. |
|
|
|
""" |
|
|
|
key_dirs = sorted(KEYS_PATH.iterdir(), key=os.path.getmtime) |
|
|
|
|
|
user_ids = [] |
|
if len(key_dirs) > n_keys: |
|
n_keys_to_delete = len(key_dirs) - n_keys |
|
for key_dir in key_dirs[:n_keys_to_delete]: |
|
user_ids.append(key_dir.name) |
|
shutil.rmtree(key_dir) |
|
|
|
|
|
client_files = CLIENT_TMP_PATH.iterdir() |
|
server_files = SERVER_TMP_PATH.iterdir() |
|
|
|
|
|
for file in chain(client_files, server_files): |
|
for user_id in user_ids: |
|
if user_id in file.name: |
|
file.unlink() |
|
|
|
|
|
def keygen(matcher_name): |
|
"""Generate the private key associated to a matcher. |
|
|
|
Args: |
|
matcher_name (str): The current matcher to consider. |
|
|
|
Returns: |
|
(user_id, True) (Tuple[int, bool]): The current user's ID and a boolean used for visual display. |
|
|
|
""" |
|
|
|
clean_temporary_files() |
|
|
|
|
|
user_id = numpy.random.randint(0, 2**32) |
|
|
|
|
|
client = get_client(user_id, matcher_name) |
|
|
|
|
|
client.generate_private_and_evaluation_keys(force=True) |
|
|
|
|
|
|
|
|
|
evaluation_key = client.get_serialized_evaluation_keys() |
|
|
|
|
|
|
|
evaluation_key_path = get_client_file_path("evaluation_key", user_id, matcher_name) |
|
|
|
with evaluation_key_path.open("wb") as evaluation_key_file: |
|
evaluation_key_file.write(evaluation_key) |
|
|
|
return (user_id, True) |
|
|
|
|
|
def encrypt( |
|
user_id, input_image, matcher_name, encrypted_image_name: str = "encrypted_image" |
|
): |
|
"""Encrypt the given image for a specific user and matcher. |
|
|
|
Args: |
|
user_id (int): The current user's ID. |
|
input_image (numpy.ndarray): The image to encrypt. |
|
matcher_name (str): The current matcher to consider. |
|
encrypted_image_name (str): how to name the encrypted image |
|
to distinguish between the query and the reference images. |
|
Defaults to "encrypted_image" |
|
|
|
Returns: |
|
(input_image, encrypted_image_short) (Tuple[bytes]): The encrypted image and one of its |
|
representation. |
|
|
|
""" |
|
if user_id == "": |
|
raise gr.Error("Please generate the private key first.") |
|
|
|
if input_image is None: |
|
raise gr.Error("Please choose an image first.") |
|
|
|
if input_image.shape[-1] not in {3, 4}: |
|
raise ValueError( |
|
f"Input image must have 3 channels (RGB) or 4 channels. Current shape: {input_image.shape}" |
|
) |
|
|
|
if input_image.shape[-1] == 4: |
|
|
|
|
|
input_image = input_image[:, :, :3] |
|
|
|
|
|
if input_image.shape != (INPUT_SHAPE[0], INPUT_SHAPE[1], 3): |
|
input_image_pil = Image.fromarray(input_image) |
|
input_image_pil = input_image_pil.resize((INPUT_SHAPE[0], INPUT_SHAPE[1])) |
|
input_image = numpy.array(input_image_pil) |
|
|
|
|
|
client = get_client(user_id, matcher_name) |
|
|
|
|
|
encrypted_image = client.encrypt_serialize(input_image) |
|
|
|
|
|
|
|
encrypted_image_path = get_client_file_path( |
|
encrypted_image_name, user_id, matcher_name |
|
) |
|
|
|
with encrypted_image_path.open("wb") as encrypted_image_file: |
|
encrypted_image_file.write(encrypted_image) |
|
|
|
|
|
encrypted_image_short = shorten_bytes_object(encrypted_image) |
|
|
|
return (resize_img(input_image), encrypted_image_short) |
|
|
|
|
|
def send_input(user_id, matcher_name): |
|
"""Send the encrypted input images as well as the evaluation key to the server. |
|
|
|
Args: |
|
user_id (int): The current user's ID. |
|
matcher_name (str): The current matcher to consider. |
|
""" |
|
|
|
evaluation_key_path = get_client_file_path("evaluation_key", user_id, matcher_name) |
|
|
|
if user_id == "" or not evaluation_key_path.is_file(): |
|
raise gr.Error("Please generate the private key first.") |
|
|
|
encrypted_query_image_path = get_client_file_path( |
|
ENCRYPTED_QUERY_NAME, user_id, matcher_name |
|
) |
|
|
|
encrypted_reference_image_path = get_client_file_path( |
|
ENCRYPTED_REFERENCE_NAME, user_id, matcher_name |
|
) |
|
|
|
for encrypted_input_path in { |
|
encrypted_query_image_path, |
|
encrypted_reference_image_path, |
|
}: |
|
if not encrypted_input_path.is_file(): |
|
raise gr.Error( |
|
f"Please generate the private key and then encrypt an image first: {encrypted_input_path}" |
|
) |
|
|
|
|
|
data = { |
|
"user_id": user_id, |
|
"matcher": matcher_name, |
|
} |
|
|
|
files = [ |
|
("files", open(encrypted_query_image_path, "rb")), |
|
("files", open(encrypted_reference_image_path, "rb")), |
|
("files", open(evaluation_key_path, "rb")), |
|
] |
|
|
|
|
|
url = SERVER_URL + "send_input" |
|
with requests.post( |
|
url=url, |
|
data=data, |
|
files=files, |
|
) as response: |
|
return response.ok |
|
|
|
|
|
def run_fhe(user_id, matcher_name): |
|
"""Apply the matcher on the encrypted image previously sent using FHE. |
|
|
|
Args: |
|
user_id (int): The current user's ID. |
|
matcher_name (str): The current matcher to consider. |
|
""" |
|
data = { |
|
"user_id": user_id, |
|
"matcher": matcher_name, |
|
} |
|
|
|
|
|
url = SERVER_URL + "run_fhe" |
|
with requests.post( |
|
url=url, |
|
data=data, |
|
) as response: |
|
if response.ok: |
|
return response.json() |
|
else: |
|
print(f"ERROR run_fhe: {response}") |
|
|
|
raise gr.Error("Please wait for the input images to be sent to the server.") |
|
|
|
|
|
def get_output(user_id, matcher_name): |
|
"""Retrieve the encrypted output. |
|
|
|
Args: |
|
user_id (int): The current user's ID. |
|
matcher_name (str): The current filter to consider. |
|
|
|
Returns: |
|
encrypted_output_short (bytes): A representation of the encrypted result. |
|
|
|
""" |
|
data = { |
|
"user_id": user_id, |
|
"matcher": matcher_name, |
|
} |
|
|
|
|
|
url = SERVER_URL + "get_output" |
|
with requests.post( |
|
url=url, |
|
data=data, |
|
) as response: |
|
if response.ok: |
|
encrypted_output = response.content |
|
|
|
|
|
|
|
encrypted_output_path = get_client_file_path( |
|
ENCRYPTED_OUTPUT_NAME, user_id, matcher_name |
|
) |
|
|
|
with encrypted_output_path.open("wb") as encrypted_output_file: |
|
encrypted_output_file.write(encrypted_output) |
|
|
|
|
|
output_representation = decrypt_output_with_wrong_key( |
|
encrypted_output, matcher_name |
|
) |
|
|
|
return { |
|
encrypted_output_representation: gr.update( |
|
value=output_representation |
|
|
|
) |
|
} |
|
|
|
else: |
|
raise gr.Error("Please wait for the FHE execution to be completed.") |
|
|
|
|
|
def decrypt_output(user_id, matcher_name): |
|
"""Decrypt the result. |
|
|
|
Args: |
|
user_id (int): The current user's ID. |
|
matcher_name (str): The current matcher to consider. |
|
|
|
Returns: |
|
(output_image, False, False) ((Tuple[numpy.ndarray, bool, bool]): The decrypted output, as |
|
well as two booleans used for resetting Gradio checkboxes |
|
|
|
""" |
|
if user_id == "": |
|
raise gr.Error("Please generate the private key first.") |
|
|
|
|
|
encrypted_output_path = get_client_file_path( |
|
ENCRYPTED_OUTPUT_NAME, user_id, matcher_name |
|
) |
|
|
|
if not encrypted_output_path.is_file(): |
|
raise gr.Error("Please run the FHE execution first.") |
|
|
|
|
|
with encrypted_output_path.open("rb") as encrypted_output_file: |
|
encrypted_output = encrypted_output_file.read() |
|
|
|
|
|
client = get_client(user_id, matcher_name) |
|
|
|
|
|
decrypted_ouput = client.deserialize_decrypt_post_process(encrypted_output) |
|
|
|
print(f"Decrypted output: {decrypted_ouput.shape=}") |
|
|
|
return {output_result: gr.update(value=decrypted_ouput)} |
|
|
|
|
|
def resize_img(img, width=256, height=256): |
|
"""Resize the image.""" |
|
if img.dtype != numpy.uint8: |
|
img = img.astype(numpy.uint8) |
|
img_pil = Image.fromarray(img) |
|
|
|
resized_img_pil = img_pil.resize((width, height)) |
|
|
|
return numpy.array(resized_img_pil) |
|
|
|
|
|
demo = gr.Blocks() |
|
|
|
|
|
print("Starting the demo...") |
|
with demo: |
|
gr.Markdown( |
|
""" |
|
<!--p align="center"> |
|
<img width=200 src="https://user-images.githubusercontent.com/5758427/197816413-d9cddad3-ba38-4793-847d-120975e1da11.png"> |
|
</p--> |
|
<h1 align="center"> #ppaihackteam14 </h1> |
|
<h1 align="center">Biometric image matching Using Fully Homomorphic Encryption</h1> |
|
<p align="center"> |
|
<a href="https://github.com/zama-ai/concrete-ml"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="https://user-images.githubusercontent.com/5758427/197972109-faaaff3e-10e2-4ab6-80f5-7531f7cfb08f.png">Concrete-ML</a> |
|
— |
|
<a href="https://docs.zama.ai/concrete-ml"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="https://user-images.githubusercontent.com/5758427/197976802-fddd34c5-f59a-48d0-9bff-7ad1b00cb1fb.png">Documentation</a> |
|
— |
|
<a href="https://zama.ai/community"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="https://user-images.githubusercontent.com/5758427/197977153-8c9c01a7-451a-4993-8e10-5a6ed5343d02.png">Community</a> |
|
— |
|
<a href="https://twitter.com/zama_fhe"> <img style="vertical-align: middle; display:inline-block; margin-right: 3px;" width=15 src="https://user-images.githubusercontent.com/5758427/197975044-bab9d199-e120-433b-b3be-abd73b211a54.png">@zama_fhe</a> |
|
</p> |
|
<!--p align="center"> |
|
<img src="https://user-images.githubusercontent.com/56846628/219605302-5baafac4-cf6f-4f06-9a96-91cef2b84a63.png" width="70%" height="70%"> |
|
</p--> |
|
""" |
|
) |
|
|
|
gr.Markdown("## Client side") |
|
gr.Markdown("### Step 1: Upload input images. ") |
|
gr.Markdown( |
|
f"The image will automatically be resized to shape ({INPUT_SHAPE[0]}x{INPUT_SHAPE[1]}). " |
|
"The image here, however, is displayed in its original resolution. The true image used " |
|
"in this demo can be seen in Step 8." |
|
) |
|
gr.Markdown("The query image to certify.") |
|
with gr.Row(): |
|
input_query_img = gr.Image( |
|
value=None, |
|
label="Upload an image here.", |
|
height=256, |
|
width=256, |
|
source="upload", |
|
interactive=True, |
|
) |
|
|
|
examples = gr.Examples( |
|
examples=EXAMPLES, |
|
inputs=[input_query_img], |
|
examples_per_page=5, |
|
label="Examples to use.", |
|
) |
|
gr.Markdown("The reference image.") |
|
with gr.Row(): |
|
input_reference_img = gr.Image( |
|
value=None, |
|
label="Upload an image here.", |
|
height=256, |
|
width=256, |
|
source="upload", |
|
interactive=True, |
|
) |
|
|
|
examples = gr.Examples( |
|
examples=EXAMPLES, |
|
inputs=[input_reference_img], |
|
examples_per_page=5, |
|
label="Examples to use.", |
|
) |
|
|
|
gr.Markdown("### Step 2: Choose your matcher.") |
|
matcher_name = gr.Dropdown( |
|
choices=AVAILABLE_MATCHERS, |
|
value="random guessing", |
|
label="Choose your matcher", |
|
interactive=True, |
|
) |
|
|
|
gr.Markdown("#### Notes") |
|
gr.Markdown( |
|
""" |
|
- The private key is used to encrypt and decrypt the data and will never be shared. |
|
- No public key is required for these matcher operators. |
|
""" |
|
) |
|
|
|
gr.Markdown("### Step 3: Generate the private key.") |
|
keygen_button = gr.Button("Generate the private key.") |
|
|
|
with gr.Row(): |
|
keygen_checkbox = gr.Checkbox(label="Private key generated:", interactive=False) |
|
|
|
user_id = gr.Textbox(label="", max_lines=2, interactive=False, visible=False) |
|
encrypted_query_image = gr.Textbox( |
|
value=ENCRYPTED_QUERY_NAME, |
|
label="", |
|
max_lines=2, |
|
interactive=False, |
|
visible=False, |
|
) |
|
encrypted_reference_image = gr.Textbox( |
|
value=ENCRYPTED_REFERENCE_NAME, |
|
label="", |
|
max_lines=2, |
|
interactive=False, |
|
visible=False, |
|
) |
|
|
|
gr.Markdown("### Step 4: Encrypt the images using FHE.") |
|
encrypt_query_button = gr.Button("Encrypt the query image using FHE.") |
|
|
|
with gr.Row(): |
|
encrypted_input_query = gr.Textbox( |
|
label="Encrypted input query representation:", |
|
max_lines=2, |
|
interactive=False, |
|
) |
|
|
|
encrypt_reference_button = gr.Button("Encrypt the reference image using FHE.") |
|
with gr.Row(): |
|
encrypted_input_reference = gr.Textbox( |
|
label="Encrypted input reference representation:", |
|
max_lines=2, |
|
interactive=False, |
|
) |
|
|
|
gr.Markdown("## Server side") |
|
gr.Markdown( |
|
"The encrypted value is received by the server. The server can then compute the matcher " |
|
"directly over encrypted values. Once the computation is finished, the server returns " |
|
"the encrypted results to the client." |
|
) |
|
|
|
gr.Markdown("### Step 5: Send the encrypted images to the server.") |
|
send_input_button = gr.Button("Send the encrypted images to the server.") |
|
send_input_checkbox = gr.Checkbox(label="Encrypted images sent.", interactive=False) |
|
|
|
gr.Markdown("### Step 6: Run FHE execution.") |
|
execute_fhe_button = gr.Button("Run FHE execution.") |
|
fhe_execution_time = gr.Textbox( |
|
label="Total FHE execution time (in seconds):", max_lines=1, interactive=False |
|
) |
|
|
|
gr.Markdown("### Step 7: Receive the encrypted output from the server.") |
|
gr.Markdown( |
|
"The result displayed here is the encrypted result sent by the server, which has been " |
|
"decrypted using a different private key. This is only used to visually represent an " |
|
"encrypted result." |
|
) |
|
get_output_button = gr.Button( |
|
"Receive the encrypted output result from the server." |
|
) |
|
|
|
with gr.Row(): |
|
encrypted_output_representation = gr.Label() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gr.Markdown("## Client side") |
|
gr.Markdown( |
|
"The encrypted output is sent back to the client, who can finally decrypt it with the " |
|
"private key. Only the client is aware of the original input images and the result of the matching." |
|
) |
|
|
|
gr.Markdown("### Step 8: Decrypt the output.") |
|
gr.Markdown( |
|
"The images displayed on the left are the input images used during the demo. The output result " |
|
"can be seen on the right." |
|
) |
|
decrypt_button = gr.Button("Decrypt the output") |
|
|
|
|
|
with gr.Row(): |
|
original_query_image = gr.Image( |
|
input_query_img.value, |
|
label=f"Input query image ({INPUT_SHAPE[0]}x{INPUT_SHAPE[1]}):", |
|
interactive=False, |
|
height=256, |
|
width=256, |
|
) |
|
original_reference_image = gr.Image( |
|
input_reference_img.value, |
|
label=f"Input reference image ({INPUT_SHAPE[0]}x{INPUT_SHAPE[1]}):", |
|
interactive=False, |
|
height=256, |
|
width=256, |
|
) |
|
output_result = gr.Label() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
keygen_button.click( |
|
keygen, |
|
inputs=[matcher_name], |
|
outputs=[user_id, keygen_checkbox], |
|
) |
|
|
|
|
|
encrypt_query_button.click( |
|
encrypt, |
|
inputs=[user_id, input_query_img, matcher_name, encrypted_query_image], |
|
outputs=[original_query_image, encrypted_input_query], |
|
) |
|
|
|
|
|
encrypt_reference_button.click( |
|
encrypt, |
|
inputs=[user_id, input_reference_img, matcher_name, encrypted_reference_image], |
|
outputs=[original_reference_image, encrypted_input_reference], |
|
) |
|
|
|
|
|
send_input_button.click( |
|
send_input, inputs=[user_id, matcher_name], outputs=[send_input_checkbox] |
|
) |
|
|
|
|
|
execute_fhe_button.click( |
|
run_fhe, inputs=[user_id, matcher_name], outputs=[fhe_execution_time] |
|
) |
|
|
|
|
|
get_output_button.click( |
|
get_output, |
|
inputs=[user_id, matcher_name], |
|
outputs=[encrypted_output_representation], |
|
) |
|
|
|
|
|
decrypt_button.click( |
|
decrypt_output, |
|
inputs=[user_id, matcher_name], |
|
outputs=[output_result, keygen_checkbox, send_input_checkbox], |
|
) |
|
|
|
gr.Markdown( |
|
"The app was built with [Concrete-ML](https://github.com/zama-ai/concrete-ml), a " |
|
"Privacy-Preserving Machine Learning (PPML) open-source set of tools by [Zama](https://zama.ai/). " |
|
"Try it yourself and don't forget to star on Github ⭐." |
|
) |
|
|
|
demo.launch(share=False) |
|
|