BigBros / server.py
Thomas Chardonnens
add background job
3ab9bf6
raw
history blame
4.21 kB
"""Server that will listen for GET and POST requests from the client."""
import time
from typing import List
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.responses import JSONResponse, Response
import asyncio
from common import SERVER_TMP_PATH, SEIZURE_DETECTION_MODEL_PATH
from client_server_interface import FHEServer
# Load the server object for seizure detection
FHE_SERVER = FHEServer(model_path=SEIZURE_DETECTION_MODEL_PATH)
def get_server_file_path(name, user_id):
"""Get the correct temporary file path for the server.
Args:
name (str): The desired file name.
user_id (int): The current user's ID.
Returns:
pathlib.Path: The file path.
"""
return SERVER_TMP_PATH / f"{name}_seizure_detection_{user_id}"
# Initialize an instance of FastAPI
app = FastAPI()
# Define the default route
@app.get("/")
def root():
return {"message": "Welcome to Your Seizure Detection FHE Server!"}
@app.post("/send_input")
def send_input(
user_id: str = Form(),
files: List[UploadFile] = File(),
):
"""Send the inputs to the server."""
# Retrieve the encrypted input image and the evaluation key paths
encrypted_image_path = get_server_file_path("encrypted_image", user_id)
evaluation_key_path = get_server_file_path("evaluation_key", user_id)
# Write the files using the above paths
with encrypted_image_path.open("wb") as encrypted_image, evaluation_key_path.open(
"wb"
) as evaluation_key:
encrypted_image.write(files[0].file.read())
evaluation_key.write(files[1].file.read())
@app.post("/run_fhe")
async def run_fhe(
user_id: str = Form(),
):
"""Execute seizure detection on the encrypted input image using FHE."""
# Retrieve the encrypted input image and the evaluation key paths
encrypted_image_path = get_server_file_path("encrypted_image", user_id)
evaluation_key_path = get_server_file_path("evaluation_key", user_id)
# Read the files using the above paths
with encrypted_image_path.open("rb") as encrypted_image_file, evaluation_key_path.open(
"rb"
) as evaluation_key_file:
encrypted_image = encrypted_image_file.read()
evaluation_key = evaluation_key_file.read()
# Run the FHE execution in a background task
async def run_fhe_task():
start = time.time()
encrypted_output = FHE_SERVER.run(encrypted_image, evaluation_key)
fhe_execution_time = round(time.time() - start, 2)
# Retrieve the encrypted output path
encrypted_output_path = get_server_file_path("encrypted_output", user_id)
# Write the file using the above path
with encrypted_output_path.open("wb") as encrypted_output_file:
encrypted_output_file.write(encrypted_output)
return fhe_execution_time
# Start the background task
task = asyncio.create_task(run_fhe_task())
# Return a response immediately
return JSONResponse(content={"message": "FHE execution started", "task_id": str(id(task))})
@app.get("/fhe_status/{task_id}")
async def fhe_status(task_id: str):
"""Check the status of an FHE execution task and return the execution time if completed."""
for task in asyncio.all_tasks():
if str(id(task)) == task_id:
if task.done():
try:
execution_time = task.result()
return JSONResponse(content={"status": "completed", "execution_time": execution_time})
except Exception as e:
return JSONResponse(content={"status": "error", "message": str(e)})
else:
return JSONResponse(content={"status": "running"})
return JSONResponse(content={"status": "not_found"})
@app.post("/get_output")
def get_output(
user_id: str = Form(),
):
"""Retrieve the encrypted output."""
# Retrieve the encrypted output path
encrypted_output_path = get_server_file_path("encrypted_output", user_id)
# Read the file using the above path
with encrypted_output_path.open("rb") as encrypted_output_file:
encrypted_output = encrypted_output_file.read()
return Response(encrypted_output)