import json | |
import time | |
from uuid import uuid4 | |
import redis | |
from .. import settings | |
# Connect to Redis | |
db = redis.Redis( | |
host=settings.REDIS_IP, port=settings.REDIS_PORT, db=settings.REDIS_DB_ID | |
) | |
async def model_predict(image_name): | |
print(f"Processing image {image_name}...") | |
""" | |
Receives an image name and queues the job into Redis. | |
Will loop until getting the answer from our ML service. | |
Parameters | |
---------- | |
image_name : str | |
Name for the image uploaded by the user. | |
Returns | |
------- | |
prediction, score : tuple(str, float) | |
Model predicted class as a string and the corresponding confidence | |
score as a number. | |
""" | |
prediction = None | |
score = None | |
# Assign an unique ID for this job and add it to the queue. | |
job_id = str(uuid4()) | |
# Create a dict with the job data we will send through Redis | |
job_data = {"id": job_id, "image_name": image_name} | |
# Send the job to the model service using Redis | |
db.lpush(settings.REDIS_QUEUE, json.dumps(job_data)) | |
# Loop until we received the response from our ML model | |
while True: | |
# Attempt to get model predictions using job_id | |
output = db.get(job_id) | |
# Check if the text was correctly processed by the ML model | |
if output is not None: | |
output = json.loads(output.decode("utf-8")) | |
prediction = output["prediction"] | |
score = output["score"] | |
db.delete(job_id) | |
break | |
# Sleep some time waiting for model results | |
time.sleep(settings.API_SLEEP) | |
return prediction, score | |