import json
import os
import time

import numpy as np
import redis
import settings
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.applications.resnet50 import decode_predictions, preprocess_input
from tensorflow.keras.preprocessing import image

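# Connect to the Redis instance that holds the job queue and the results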
db = redis.Redis(
    host=settings.REDIS_IP, port=settings.REDIS_PORT, db=settings.REDIS_DB_ID
)

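# Load ResNet50 pre-trained on ImageNet once at startup so every job reuses it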
model = ResNet50(include_top=True, weights="imagenet")


def predict(image_name):
    """
    Load image from the corresponding folder based on the image name
    received, then run our ML model to get predictions.

    Parameters
    ----------
    image_name : str
        Image filename.

    Returns
    -------
    class_name, pred_probability : tuple(str, float)
        Model predicted class as a string and the corresponding confidence
        score as a number.
    """
    class_name = None
    pred_probability = None

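    # Build the full path to the uploaded image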
    image_path = os.path.join(settings.UPLOAD_FOLDER, image_name)

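    # Load the image resized to the 224x224 input ResNet50 expects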
    img = image.load_img(image_path, target_size=(224, 224))

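    # Convert the image to a numpy array and add a batch dimension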
    x = image.img_to_array(img)
    x_batch = np.expand_dims(x, axis=0)

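    # Apply the same input preprocessing ResNet50 was trained with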
    x_batch = preprocess_input(x_batch)

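    # Run inference on the single-image batch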
    predictions = model.predict(x_batch)

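    # Keep only the top predicted class and its probability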
    top_pred = decode_predictions(predictions, top=1)[0][0]
    _, class_name, pred_probability = top_pred

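    # Round the confidence score for readability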
    pred_probability = round(float(pred_probability), 4)

    return class_name, pred_probability


def classify_process():
    """
    Loop indefinitely asking Redis for new jobs.
    When a new job arrives, takes it from the Redis queue, uses the loaded ML
    model to get predictions and stores the results back in Redis using
    the original job ID so other services can see it was processed and access
    the results.
    """
    while True:
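        # Block until a new job is available on the Redis queue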
        q = db.brpop(settings.REDIS_QUEUE)[1]

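        # Decode the JSON payload and get the job ID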
        q = json.loads(q.decode("utf-8"))
        job_id = q["id"]

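        # Run the model on the image referenced by the job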
        prediction, score = predict(q["image_name"])

        output = {"prediction": prediction, "score": score}

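        # Store the result under the original job ID so other services can read it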
        db.set(job_id, json.dumps(output))

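        # Sleep briefly before taking the next job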
        time.sleep(settings.SERVER_SLEEP)


if __name__ == "__main__":
    print("Launching ML service...")
    classify_process()