# mask-detection-manfred / pipelines.py
# manfredmichael's picture
# Add pipeline
# 93c1293
from PIL import Image
from ultralytics import YOLO
import numpy as np
import cv2
import torch
from utils import readb64, img2base64
# Detector shared by the command-line helpers `inference_on_image` and
# `inference_on_video`; it is loaded once, when this module is imported.
model_int8 = YOLO('weights/best.torchscript', task='detect')
# Maps the class index reported for a detection to the text drawn next to
# its bounding box (see `draw_bbox_prediction`).
labels = {
    0: 'mask_weared_incorrect',
    1: 'with_mask',
    2: 'without_mask',
}
def inference_on_image(path):
    """Run mask detection on one image file and display the annotated result.

    The function blocks until a key is pressed in the preview window.

    Args:
        path: Filesystem path of the image to analyse.

    Returns:
        The results object produced by the module-level ``model_int8`` for
        *path*.

    Raises:
        ValueError: If OpenCV cannot load an image from *path*.
    """
    results = model_int8(path)
    # The second argument of ``cv2.imread`` is an IMREAD_* flag, not a colour
    # conversion code. The default flag yields a 3-channel BGR image, which
    # is also the channel order ``cv2.imshow`` expects.
    img = cv2.imread(path)
    if img is None:
        raise ValueError(f"cv2.imread could not load an image from {path!r}")
    for box in results[0].boxes:
        img = draw_bbox_prediction(img, box)
    cv2.imshow('Detected Image', img)
    cv2.waitKey(0)
    return results
def inference_on_video(path, vid_stride=10):
    """Play a video file and draw mask detections on the sampled frames.

    Playback stops at the end of the video or when ESC is pressed.

    Args:
        path: Filesystem path of the video to analyse.
        vid_stride: Number of frames between two detector runs. The value is
            forwarded to the detector and also decides which displayed frames
            receive bounding boxes.

    Returns:
        The streaming results generator created by ``model_int8``. It has
        been partially or fully consumed by the time this function returns.

    Raises:
        ValueError: If *vid_stride* is smaller than 1.
    """
    if vid_stride < 1:
        raise ValueError("vid_stride must be a positive integer")

    results = model_int8(path, vid_stride=vid_stride, stream=True)
    cap = cv2.VideoCapture(path)
    try:
        # NOTE(review): the first frame is read and dropped, exactly as the
        # original code did. Presumably this offsets the display against the
        # detector's frame sampling - confirm whether it is intentional.
        cap.read()
        frame_counter = 0
        while True:
            ret, img = cap.read()
            if not ret:
                break
            if frame_counter % vid_stride == 0:
                # The detector stream may run out before the capture does;
                # in that case the frame is simply shown without boxes.
                result = next(results, None)
                if result is not None:
                    for box in result.boxes:
                        img = draw_bbox_prediction(img, box)
            cv2.imshow('Detected Image', img)
            frame_counter += 1
            if (cv2.waitKey(5) & 0xFF) == 27:  # 27 is the ESC key code
                break
    finally:
        # Release the capture and close the preview on every exit path.
        cap.release()
        cv2.destroyAllWindows()
    return results
def draw_bbox_prediction(img, box):
    """Draw one detection (outline, label banner and caption) onto *img*.

    Line thickness, banner height and font size grow with the width of the
    bounding box.

    Args:
        img: Image array OpenCV can draw on; the drawing happens directly on
            this array.
        box: A single detection exposing ``cls``, ``conf`` and ``xyxy``
            (assumed to hold exactly one box - TODO confirm for other callers).

    Returns:
        The annotated image.
    """
    class_id = int(box.cls.item())  # ``item()`` yields a float; the keys are ints
    confidence = box.conf.item()
    label = labels[class_id]
    # ``tolist()`` does not depend on the device the values live on, unlike
    # ``Tensor.numpy()``, which only works for CPU tensors.
    x1, y1, x2, y2 = (int(v) for v in box.xyxy[0].tolist())

    scaler = (x2 - x1) / (640 / 8)
    # Never request a zero thickness for boxes narrower than the reference width.
    box_thickness = max(1, int(2 * scaler))
    text_thickness = max(1, int(1 * scaler))
    colour = (0, 102, 255)

    cv2.rectangle(img, (x1, y1), (x2, y2), colour, box_thickness)
    # Filled banner above the box that serves as background for the caption.
    img = cv2.rectangle(
        img,
        (x1, y1 - int(20 * scaler)),
        (x1 + (x2 - x1) * 3, y1),
        colour,
        -1,
    )
    img = cv2.putText(
        img,
        "{}: {:.3f}".format(label, confidence),
        (x1, y1 - 5),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.6 * scaler,
        (255, 255, 255),
        text_thickness,
    )
    return img
class ImagePipeline:
    """Mask-detection pipeline for a single encoded image.

    Calling an instance decodes the request payload, runs the detector on the
    one decoded image and returns a JSON-friendly summary, optionally together
    with an annotated copy of the image.
    """

    # Class ids that count as a violation (see the module-level ``labels``:
    # 0 is an incorrectly worn mask, 2 is no mask).
    _VIOLATION_CLASSES = frozenset({0, 2})

    def __init__(self, device='cpu', gpu_id=0, weights='weights/best.torchscript'):
        """Load the detector.

        Args:
            device: Accepted but currently not used.
            gpu_id: Accepted but currently not used.
            weights: Path of the model file handed to ``YOLO``.
        """
        self.model = YOLO(weights, task='detect')

    def preprocess(self, data):
        """Extract and decode the single image contained in *data*.

        Args:
            data: Either a dict carrying the payload under ``"images"`` (the
                key is removed from the dict), or the payload itself. The
                payload is one encoded image or a list with exactly one.

        Returns:
            A one-element list with the image decoded by ``readb64``.

        Raises:
            ValueError: If the payload is an empty list or holds more than
                one image.
        """
        if isinstance(data, dict):
            image_base64 = data.pop("images", data)
        else:
            # Allow callers to pass the encoded image (or list) directly.
            image_base64 = data
        if not isinstance(image_base64, list):
            image_base64 = [image_base64]
        if not image_base64:
            raise ValueError("ImagePipeline requires exactly 1 image/frame")
        if len(image_base64) > 1:
            raise ValueError("ImagePipeline only accepts 1 image/frame")
        return [readb64(image) for image in image_base64]

    def inference(self, images):
        """Run the detector on the first (and only) image in *images*."""
        return self.model(images[0])

    def get_response(self, inference_result):
        """Summarise detector output as a plain dict.

        Args:
            inference_result: Sequence of detector results; the message is
                derived from the first entry, the box list from all entries.

        Returns:
            ``{'results': [...], 'message': str}`` where every entry of
            ``results`` holds the normalised ``xywhn`` box, the ``class`` id
            and the ``confidence`` of one detection.
        """
        first_frame_classes = inference_result[0].boxes.cls.tolist()
        if self._VIOLATION_CLASSES.isdisjoint(first_frame_classes):
            message = "Everyone is wearing mask correctly"
        else:
            message = "Someone is not wearing mask or incorrectly wearing mask"

        response = []
        for result in inference_result:
            response.extend(self._serialize_boxes(result.boxes))
        return {'results': response,
                'message': message}

    @staticmethod
    def _serialize_boxes(boxes):
        """Convert the detections of one result into JSON-friendly dicts."""
        # ``tolist()`` yields plain Python floats and, unlike ``numpy()``,
        # does not require the values to live on the CPU.
        return [
            {
                'xywhn': {'x': x, 'y': y, 'w': w, 'h': h},
                'class': cls,
                'confidence': conf,
            }
            for (x, y, w, h), cls, conf in zip(
                boxes.xywhn.tolist(),
                boxes.cls.tolist(),
                boxes.conf.tolist(),
            )
        ]

    def draw_bbox(self, images, inference_result):
        """Return a copy of the first image with all detections drawn on it.

        Boxes are drawn in reverse order, so the first box of the result is
        drawn last and therefore ends up on top.
        """
        img = np.array(images[0])
        for box in reversed(list(inference_result[0].boxes)):
            img = draw_bbox_prediction(img, box)
        return img

    def __call__(self, data, config_payload=None, draw_bbox=False):
        """Run the whole pipeline on *data*.

        Args:
            data: Request payload, see ``preprocess``.
            config_payload: Accepted but currently not used.
            draw_bbox: When true, also return the annotated image.

        Returns:
            The response dict, or ``(response, annotated_image)`` when
            *draw_bbox* is true.
        """
        images = self.preprocess(data)
        inference_result = self.inference(images)
        response = self.get_response(inference_result)
        if draw_bbox:
            annotated_img = self.draw_bbox(images, inference_result)
            return response, annotated_img
        return response
class VideoPipeline:
    """Mask-detection pipeline for a video file.

    Calling an instance runs the detector on sampled frames of the video and
    returns a JSON-friendly summary of every detection.
    """

    # Class ids that count as a violation (see the module-level ``labels``:
    # 0 is an incorrectly worn mask, 2 is no mask).
    _VIOLATION_CLASSES = frozenset({0, 2})

    def __init__(self, device='cpu', gpu_id=0, weights='weights/best.torchscript'):
        """Load the detector.

        Args:
            device: Accepted but currently not used.
            gpu_id: Accepted but currently not used.
            weights: Path of the model file handed to ``YOLO``.
        """
        self.model = YOLO(weights, task='detect')

    def preprocess(self, data):
        """Return *data* unchanged; the video source needs no preparation."""
        return data

    def inference(self, video_path, vid_stride=30):
        """Run the detector on *video_path*.

        Args:
            video_path: Video source handed to the model.
            vid_stride: Frame sampling interval forwarded to the model.

        Returns:
            Whatever the model call returns for the video.
        """
        return self.model(video_path, vid_stride=vid_stride)

    def get_response(self, inference_result):
        """Summarise the detector output of all frames as a plain dict.

        Args:
            inference_result: Iterable of per-frame detector results.

        Returns:
            ``{'results': [...], 'message': str}``. ``results`` lists every
            detection of every frame (normalised ``xywhn`` box, ``class`` id,
            ``confidence``). ``message`` reports a violation as soon as any
            frame contains an incorrectly worn or missing mask.
        """
        response = []
        violation_seen = False
        for result in inference_result:
            boxes = result.boxes
            # ``tolist()`` yields plain Python floats and, unlike ``numpy()``,
            # does not require the values to live on the CPU.
            frame_classes = boxes.cls.tolist()
            # Inspect the current frame (not only the first one) and flag it
            # when it contains either violation class, not only both at once.
            if not self._VIOLATION_CLASSES.isdisjoint(frame_classes):
                violation_seen = True
            for (x, y, w, h), cls, conf in zip(
                boxes.xywhn.tolist(), frame_classes, boxes.conf.tolist()
            ):
                response.append({
                    'xywhn': {'x': x, 'y': y, 'w': w, 'h': h},
                    'class': cls,
                    'confidence': conf,
                })

        if violation_seen:
            message = "Someone is not wearing mask or incorrectly wearing mask"
        else:
            message = "Everyone is wearing mask correctly"
        return {'results': response,
                'message': message}

    def __call__(self, data, config_payload=None):
        """Run the whole pipeline on *data*.

        Args:
            data: Video source, see ``inference``.
            config_payload: Accepted but currently not used.

        Returns:
            The response dict built by ``get_response``.
        """
        data = self.preprocess(data)
        inference_result = self.inference(data)
        return self.get_response(inference_result)
if __name__ == '__main__':
    # Command-line entry point: run detection on one image or one video and
    # print whatever the chosen helper returns.
    import argparse

    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--input_type',
                        default='image',
                        const='image',
                        nargs='?',
                        choices=['image', 'video'],
                        help='type of input (default: %(default)s)')
    # Both helpers need a real path, so fail early with a usage message
    # instead of passing None on to the model and OpenCV.
    parser.add_argument("-p", "--path", required=True, help="filepath")
    args = parser.parse_args()

    # ``choices`` above guarantees that the lookup succeeds.
    runners = {
        'image': inference_on_image,
        'video': inference_on_video,
    }
    results = runners[args.input_type](args.path)
    print(results)
# Examples
# python pipelines.py --input_type image --path sample_files/image-1.jpeg
# python pipelines.py --input_type video --path sample_files/video-1.mp4