kisa-misa's picture
Create predict.py
ddff15f
import hydra
import torch
import argparse
import time
from pathlib import Path
import math
import cv2
import torch
import torch.backends.cudnn as cudnn
from numpy import random
from ultralytics.yolo.engine.predictor import BasePredictor
from ultralytics.yolo.utils import DEFAULT_CONFIG, ROOT, ops
from ultralytics.yolo.utils.checks import check_imgsz
from ultralytics.yolo.utils.plotting import Annotator, colors, save_one_box
import cv2
from deep_sort_pytorch.utils.parser import get_config
from deep_sort_pytorch.deep_sort import DeepSort
from collections import deque
import numpy as np
palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)
cars_deque = {}
deepsort = None
object_counter = {}
speed_line_queue = {}
def estimatespeed(Location1, Location2):
#Euclidean Distance Formula
d_pixel = math.sqrt(math.pow(Location2[0] - Location1[0], 2) + math.pow(Location2[1] - Location1[1], 2))
# defining thr pixels per meter
ppm = 8
d_meters = d_pixel/ppm
time_constant = 15*3.6
#distance = speed/time
speed = d_meters * time_constant
return int(speed)
def init_tracker():
global deepsort
cfg_deep = get_config()
cfg_deep.merge_from_file("deep_sort_pytorch/configs/deep_sort.yaml")
deepsort= DeepSort(cfg_deep.DEEPSORT.REID_CKPT,
max_dist=cfg_deep.DEEPSORT.MAX_DIST, min_confidence=cfg_deep.DEEPSORT.MIN_CONFIDENCE,
nms_max_overlap=cfg_deep.DEEPSORT.NMS_MAX_OVERLAP, max_iou_distance=cfg_deep.DEEPSORT.MAX_IOU_DISTANCE,
max_age=cfg_deep.DEEPSORT.MAX_AGE, n_init=cfg_deep.DEEPSORT.N_INIT, nn_budget=cfg_deep.DEEPSORT.NN_BUDGET,
use_cuda=True)
##########################################################################################
def xyxy_to_xywh(*xyxy):
"""" Calculates the relative bounding box from absolute pixel values. """
bbox_left = min([xyxy[0].item(), xyxy[2].item()])
bbox_top = min([xyxy[1].item(), xyxy[3].item()])
bbox_w = abs(xyxy[0].item() - xyxy[2].item())
bbox_h = abs(xyxy[1].item() - xyxy[3].item())
x_c = (bbox_left + bbox_w / 2)
y_c = (bbox_top + bbox_h / 2)
w = bbox_w
h = bbox_h
return x_c, y_c, w, h
def compute_color_for_labels(label):
"""
Simple function that adds fixed color depending on the class
"""
if label == 0: #person
color = (85,45,255)
elif label == 2: # Car
color = (222,82,175)
elif label == 3: # Motobike
color = (0, 204, 255)
elif label == 5: # Bus
color = (0, 149, 255)
else:
color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette]
return tuple(color)
def draw_border(img, pt1, pt2, color, thickness, r, d):
x1,y1 = pt1
x2,y2 = pt2
# Top left
cv2.line(img, (x1 + r, y1), (x1 + r + d, y1), color, thickness)
cv2.line(img, (x1, y1 + r), (x1, y1 + r + d), color, thickness)
cv2.ellipse(img, (x1 + r, y1 + r), (r, r), 180, 0, 90, color, thickness)
# Top right
cv2.line(img, (x2 - r, y1), (x2 - r - d, y1), color, thickness)
cv2.line(img, (x2, y1 + r), (x2, y1 + r + d), color, thickness)
cv2.ellipse(img, (x2 - r, y1 + r), (r, r), 270, 0, 90, color, thickness)
# Bottom left
cv2.line(img, (x1 + r, y2), (x1 + r + d, y2), color, thickness)
cv2.line(img, (x1, y2 - r), (x1, y2 - r - d), color, thickness)
cv2.ellipse(img, (x1 + r, y2 - r), (r, r), 90, 0, 90, color, thickness)
# Bottom right
cv2.line(img, (x2 - r, y2), (x2 - r - d, y2), color, thickness)
cv2.line(img, (x2, y2 - r), (x2, y2 - r - d), color, thickness)
cv2.ellipse(img, (x2 - r, y2 - r), (r, r), 0, 0, 90, color, thickness)
cv2.rectangle(img, (x1 + r, y1), (x2 - r, y2), color, -1, cv2.LINE_AA)
cv2.rectangle(img, (x1, y1 + r), (x2, y2 - r - d), color, -1, cv2.LINE_AA)
cv2.circle(img, (x1 +r, y1+r), 2, color, 12)
cv2.circle(img, (x2 -r, y1+r), 2, color, 12)
cv2.circle(img, (x1 +r, y2-r), 2, color, 12)
cv2.circle(img, (x2 -r, y2-r), 2, color, 12)
return img
def UI_box(x, img, color=None, label=None, line_thickness=None):
# Plots one bounding box on image img
tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # line/font thickness
color = color or [random.randint(0, 255) for _ in range(3)]
c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
if label:
tf = max(tl - 1, 1) # font thickness
t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]
img = draw_border(img, (c1[0], c1[1] - t_size[1] -3), (c1[0] + t_size[0], c1[1]+3), color, 1, 8, 2)
cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)
def ccw(A,B,C):
return (C[1]-A[1]) * (B[0]-A[0]) > (B[1]-A[1]) * (C[0]-A[0])
def draw_boxes(img, bbox, names,object_id, identities=None, offset=(0, 0)):
#cv2.line(img, line[0], line[1], (46,162,112), 3)
cv2.putText(img, f'Number of cars: {len(cars_deque)}', (11, 35), 0, 1, [0, 255, 0], thickness=2, lineType=cv2.LINE_AA)
height, width, _ = img.shape
# remove tracked point from buffer if object is lost
for key in list(cars_deque):
if key not in identities:
cars_deque.pop(key)
for i, box in enumerate(bbox):
obj_name = names[object_id[i]]
if obj_name == 'car':
x1, y1, x2, y2 = [int(i) for i in box]
x1 += offset[0]
x2 += offset[0]
y1 += offset[1]
y2 += offset[1]
# code to find center of bottom edge
center = (int((x2+x1)/ 2), int((y2+y2)/2))
# get ID of object
id = int(identities[i]) if identities is not None else 0
# create new buffer for new object
if id not in cars_deque:
cars_deque[id] = deque(maxlen= 64)
speed_line_queue[id] = []
color = compute_color_for_labels(object_id[i])
label = '{}{:d}'.format("", id) + ":"+ '%s' % (obj_name)
# add center to buffer
cars_deque[id].appendleft(center)
if len(cars_deque[id]) >= 2:
object_speed = estimatespeed(cars_deque[id][1], cars_deque[id][0])
speed_line_queue[id].append(object_speed)
if obj_name not in object_counter:
object_counter[obj_name] = 1
try:
label = label + " " + str(sum(speed_line_queue[id])//len(speed_line_queue[id])) + "km/h"
except:
pass
UI_box(box, img, label=label, color=color, line_thickness=2)
return img
class DetectionPredictor(BasePredictor):
def get_annotator(self, img):
return Annotator(img, line_width=self.args.line_thickness, example=str(self.model.names))
def preprocess(self, img):
img = torch.from_numpy(img).to(self.model.device)
img = img.half() if self.model.fp16 else img.float() # uint8 to fp16/32
img /= 255 # 0 - 255 to 0.0 - 1.0
return img
def postprocess(self, preds, img, orig_img):
preds = ops.non_max_suppression(preds,
self.args.conf,
self.args.iou,
agnostic=self.args.agnostic_nms,
max_det=self.args.max_det)
for i, pred in enumerate(preds):
shape = orig_img[i].shape if self.webcam else orig_img.shape
pred[:, :4] = ops.scale_boxes(img.shape[2:], pred[:, :4], shape).round()
return preds
def write_results(self, idx, preds, batch):
p, im, im0 = batch
all_outputs = []
log_string = ""
if len(im.shape) == 3:
im = im[None] # expand for batch dim
self.seen += 1
im0 = im0.copy()
if self.webcam: # batch_size >= 1
log_string += f'{idx}: '
frame = self.dataset.count
else:
frame = getattr(self.dataset, 'frame', 0)
self.data_path = p
save_path = str(self.save_dir / p.name) # im.jpg
self.txt_path = str(self.save_dir / 'labels' / p.stem) + ('' if self.dataset.mode == 'image' else f'_{frame}')
log_string += '%gx%g ' % im.shape[2:] # print string
self.annotator = self.get_annotator(im0)
det = preds[idx]
all_outputs.append(det)
if len(det) == 0:
return log_string
for c in det[:, 5].unique():
n = (det[:, 5] == c).sum() # detections per class
log_string += f"{n} {self.model.names[int(c)]}{'s' * (n > 1)}, "
# write
gn = torch.tensor(im0.shape)[[1, 0, 1, 0]] # normalization gain whwh
xywh_bboxs = []
confs = []
oids = []
outputs = []
for *xyxy, conf, cls in reversed(det):
x_c, y_c, bbox_w, bbox_h = xyxy_to_xywh(*xyxy)
xywh_obj = [x_c, y_c, bbox_w, bbox_h]
xywh_bboxs.append(xywh_obj)
confs.append([conf.item()])
oids.append(int(cls))
xywhs = torch.Tensor(xywh_bboxs)
confss = torch.Tensor(confs)
outputs = deepsort.update(xywhs, confss, oids, im0)
if len(outputs) > 0:
bbox_xyxy = outputs[:, :4]
identities = outputs[:, -2]
object_id = outputs[:, -1]
draw_boxes(im0, bbox_xyxy, self.model.names, object_id,identities)
return log_string
@hydra.main(version_base=None, config_path=str(DEFAULT_CONFIG.parent), config_name=DEFAULT_CONFIG.name)
def predict(cfg):
init_tracker()
cfg.model = cfg.model or "yolov8n.pt"
cfg.imgsz = check_imgsz(cfg.imgsz, min_dim=2) # check image size
cfg.source = cfg.source if cfg.source is not None else ROOT / "assets"
predictor = DetectionPredictor(cfg)
predictor()
if __name__ == "__main__":
predict()