import argparse
import csv
import math
import time
from collections import deque
from glob import glob
from pathlib import Path

import cv2
import gradio as gr
import hydra
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.backends.cudnn as cudnn
from numpy import random

from ultralytics.yolo.engine.predictor import BasePredictor
from ultralytics.yolo.utils import DEFAULT_CONFIG, ROOT, ops
from ultralytics.yolo.utils.checks import check_imgsz
from ultralytics.yolo.utils.plotting import Annotator, colors, save_one_box
from ultralytics.yolo.v8.detect.deep_sort_pytorch.deep_sort import DeepSort
from ultralytics.yolo.v8.detect.deep_sort_pytorch.utils.parser import get_config

# Multipliers used to derive a pseudo-random colour from a class id.
palette = (2 ** 11 - 1, 2 ** 15 - 1, 2 ** 20 - 1)

# Track id -> deque of recent anchor points for that track.
deq = {}
# Track id -> sequential display number assigned to the track.
indices = [0] * 100
# Running count of tracked vehicles seen so far.
c = 0
# Counter used by the (currently disabled) figure export.
num = 1

# CSV logging of the pulse time series is currently disabled:
# f = open('/pulse.csv', "w+")
# f.close()
# with open('/pulse.csv', 'a') as f:
#     # create the csv writer
#     writer = csv.writer(f)
#     header = ['time', 'pulse']
#     writer.writerow(header)

# DeepSort tracker instance; created by init_tracker().
deepsort = None

object_counter = {}

# Track id -> list of per-frame speed estimates for that track.
speed_line_queue = {}


def estimatespeed(Location1, Location2, h, w):
    """Estimate a speed value from the displacement between two points.

    Args:
        Location1: (x, y) of the earlier position, in pixels.
        Location2: (x, y) of the later position, in pixels.
        h: first reference dimension in pixels.
        w: second reference dimension in pixels.

    Returns:
        The estimated speed truncated to an ``int``.
    """
    # Euclidean distance between the two points.
    d_pixel = math.sqrt(
        math.pow(Location2[0] - Location1[0], 2)
        + math.pow(Location2[1] - Location1[1], 2)
    )
    # Pixels per metre: one tenth of the larger dimension.  Clamp to 1 so a
    # reference dimension below 10 px cannot cause a division by zero.
    ppm = max(max(h, w) // 10, 1)
    d_meters = d_pixel / ppm
    # NOTE(review): presumably 15 frames per second times 3.6 (m/s -> km/h);
    # confirm against the actual video frame rate.
    time_constant = 15 * 3.6
    speed = d_meters * time_constant
    return int(speed)


def init_tracker():
    """Create the global DeepSort tracker from the bundled YAML configuration.

    The configuration path is relative to the current working directory.
    """
    global deepsort
    cfg_deep = get_config()
    cfg_deep.merge_from_file("deep_sort_pytorch/configs/deep_sort.yaml")

    deepsort = DeepSort(
        cfg_deep.DEEPSORT.REID_CKPT,
        max_dist=cfg_deep.DEEPSORT.MAX_DIST,
        min_confidence=cfg_deep.DEEPSORT.MIN_CONFIDENCE,
        nms_max_overlap=cfg_deep.DEEPSORT.NMS_MAX_OVERLAP,
        max_iou_distance=cfg_deep.DEEPSORT.MAX_IOU_DISTANCE,
        max_age=cfg_deep.DEEPSORT.MAX_AGE,
        n_init=cfg_deep.DEEPSORT.N_INIT,
        nn_budget=cfg_deep.DEEPSORT.NN_BUDGET,
        # Only request CUDA when a device is actually present.
        use_cuda=torch.cuda.is_available(),
    )
##########################################################################################

# Optional csv.writer objects for the pulse time series (``writer``) and the
# per-vehicle rows (``writer2``).  CSV logging is disabled while they are None;
# assign csv.writer instances here to enable it.
writer = None
writer2 = None


def xyxy_to_xywh(*xyxy):
    """Convert corner coordinates into centre/size form.

    Args:
        *xyxy: four scalar-like values (each exposing ``.item()``) giving two
            opposite corners of a box as x, y, x, y.

    Returns:
        Tuple ``(x_c, y_c, w, h)``: centre coordinates, width and height.
    """
    xa, ya, xb, yb = (v.item() for v in xyxy[:4])
    bbox_left = min(xa, xb)
    bbox_top = min(ya, yb)
    w = abs(xa - xb)
    h = abs(ya - yb)
    x_c = bbox_left + w / 2
    y_c = bbox_top + h / 2
    return x_c, y_c, w, h


def compute_color_for_labels(label):
    """Return a fixed colour tuple for a class id."""
    if label == 7:  # truck
        color = (85, 45, 255)
    elif label == 2:  # car
        color = (222, 82, 175)
    elif label == 3:  # motorcycle
        color = (0, 204, 255)
    elif label == 5:  # bus
        color = (0, 149, 255)
    else:
        # Any other class: derive a deterministic colour from the palette.
        color = [int((p * (label ** 2 - label + 1)) % 255) for p in palette]
    return tuple(color)


def draw_border(img, pt1, pt2, color, thickness, r, d):
    """Draw a filled, round-cornered box between ``pt1`` and ``pt2`` on ``img``.

    Args:
        img: image to draw on (modified in place).
        pt1: (x, y) of the top-left corner.
        pt2: (x, y) of the bottom-right corner.
        color: colour passed to the OpenCV drawing calls.
        thickness: line thickness of the corner strokes.
        r: corner radius in pixels.
        d: length of the straight stroke next to each corner.

    Returns:
        The same ``img`` object.
    """
    x1, y1 = pt1
    x2, y2 = pt2
    # Top left
    cv2.line(img, (x1 + r, y1), (x1 + r + d, y1), color, thickness)
    cv2.line(img, (x1, y1 + r), (x1, y1 + r + d), color, thickness)
    cv2.ellipse(img, (x1 + r, y1 + r), (r, r), 180, 0, 90, color, thickness)
    # Top right
    cv2.line(img, (x2 - r, y1), (x2 - r - d, y1), color, thickness)
    cv2.line(img, (x2, y1 + r), (x2, y1 + r + d), color, thickness)
    cv2.ellipse(img, (x2 - r, y1 + r), (r, r), 270, 0, 90, color, thickness)
    # Bottom left
    cv2.line(img, (x1 + r, y2), (x1 + r + d, y2), color, thickness)
    cv2.line(img, (x1, y2 - r), (x1, y2 - r - d), color, thickness)
    cv2.ellipse(img, (x1 + r, y2 - r), (r, r), 90, 0, 90, color, thickness)
    # Bottom right
    cv2.line(img, (x2 - r, y2), (x2 - r - d, y2), color, thickness)
    cv2.line(img, (x2, y2 - r), (x2, y2 - r - d), color, thickness)
    cv2.ellipse(img, (x2 - r, y2 - r), (r, r), 0, 0, 90, color, thickness)

    # Fill the interior with two overlapping rectangles ...
    cv2.rectangle(img, (x1 + r, y1), (x2 - r, y2), color, -1, cv2.LINE_AA)
    cv2.rectangle(img, (x1, y1 + r), (x2, y2 - r - d), color, -1, cv2.LINE_AA)

    # ... and a thick dot at each corner centre.
    cv2.circle(img, (x1 + r, y1 + r), 2, color, 12)
    cv2.circle(img, (x2 - r, y1 + r), 2, color, 12)
    cv2.circle(img, (x1 + r, y2 - r), 2, color, 12)
    cv2.circle(img, (x2 - r, y2 - r), 2, color, 12)

    return img


def UI_box(x, img, color=None, label=None, line_thickness=None):
    """Plot one bounding box, with an optional label tag, on ``img``.

    Args:
        x: box as (x1, y1, x2, y2).
        img: image to draw on (modified in place).
        color: box colour; a random colour is picked when falsy.
        label: optional text drawn in a rounded tag above the box.
        line_thickness: line thickness; derived from the image size when falsy.
    """
    tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1  # line/font thickness
    color = color or [random.randint(0, 255) for _ in range(3)]
    c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3]))
    cv2.rectangle(img, c1, c2, color, thickness=tl, lineType=cv2.LINE_AA)
    if label:
        tf = max(tl - 1, 1)  # font thickness
        t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0]

        img = draw_border(img, (c1[0], c1[1] - t_size[1] - 3), (c1[0] + t_size[0], c1[1] + 3), color, 1, 8, 2)

        cv2.putText(img, label, (c1[0], c1[1] - 2), 0, tl / 3, [225, 255, 255], thickness=tf, lineType=cv2.LINE_AA)


def ccw(A, B, C):
    """Return the orientation test ``(C-A) x (B-A)`` as a boolean for 2-D points."""
    return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])


def draw_boxes(img, bbox, names, object_id, writer, writer2, identities=None, offset=(0, 0)):
    """Annotate tracked boxes with id, class, mean speed and weight.

    Also draws the frame's "pulse" (sum over classes of accumulated mean
    speeds times the class weight) and, when writers are supplied, logs it.

    Args:
        img: frame to draw on (modified in place).
        bbox: iterable of boxes as (x1, y1, x2, y2).
        names: mapping from class id to class name.
        object_id: class id of each box, parallel to ``bbox``.
        writer: csv writer receiving one ``[time, pulse]`` row per call, or
            ``None`` to skip logging.
        writer2: csv writer receiving one ``[id, class, speed, weight]`` row
            per box that has a speed, or ``None`` to skip logging.
        identities: track id of each box, parallel to ``bbox``; when ``None``
            every box is treated as track 0.
        offset: (dx, dy) added to the box coordinates.

    Returns:
        The same ``img`` object.
    """
    global c

    # Remove tracked points from the buffer if the object is lost.  The speed
    # samples of a lost track are dropped too; they are re-created anyway if
    # the id shows up again.
    active_ids = {0} if identities is None else {int(v) for v in identities}
    for key in list(deq):
        if key not in active_ids:
            deq.pop(key)
            speed_line_queue.pop(key, None)

    # Weights indexed by class id (2=car, 3=motorcycle, 5=bus, 7=truck); the
    # fractional source values (6.72, 1.638, 18.75) are truncated to ints.
    weights = [0, 0, int(6.72), int(1.638), 0, 30, 0, int(18.75)]
    speeds = [0] * 8

    for i, box in enumerate(bbox):
        cls_id = int(object_id[i])
        obj_name = names[cls_id]

        x1, y1, x2, y2 = [int(v) for v in box]
        x1 += offset[0]
        x2 += offset[0]
        y1 += offset[1]
        y2 += offset[1]

        # Centre of the bottom edge.
        center = (int((x2 + x1) / 2), int((y2 + y2) / 2))

        # Track id of this box.
        track_id = int(identities[i]) if identities is not None else 0

        # ``indices`` is a fixed-size list at module level; grow it on demand
        # so that large track ids cannot raise IndexError.
        if track_id >= len(indices):
            indices.extend([0] * (track_id + 1 - len(indices)))

        # Create a new buffer for a new object.
        if track_id not in deq:
            deq[track_id] = deque(maxlen=64)
            if cls_id in (2, 3, 5, 7):
                c += 1
                indices[track_id] = c
            speed_line_queue[track_id] = []

        color = compute_color_for_labels(cls_id)
        label = '{}{:d}'.format("", indices[track_id]) + ":" + '%s' % (obj_name)

        # Add the centre to the buffer and estimate speed from the last step.
        deq[track_id].appendleft(center)
        if len(deq[track_id]) >= 2:
            object_speed = estimatespeed(deq[track_id][1], deq[track_id][0], x2 - x1, y2 - y1)
            speed_line_queue[track_id].append(object_speed)
            if obj_name not in object_counter:
                object_counter[obj_name] = 1

        # Mean speed over all samples of this track.  Skipped (label left
        # without speed) while there are no samples yet or when the class has
        # no weight entry, instead of swallowing every exception.
        samples = speed_line_queue[track_id]
        if samples and 0 <= cls_id < len(weights):
            spd = sum(samples) // len(samples)
            speeds[cls_id] += spd
            label = label + " v=" + str(spd) + " m=" + str(weights[cls_id])
            if writer2 is not None:
                writer2.writerow([str(indices[track_id]), obj_name, str(spd), str(weights[cls_id])])

        UI_box(box, img, label=label, color=color, line_thickness=2)

    current_time = time.strftime("%H:%M:%S %d.%m.%Y", time.localtime())
    pulse = sum(np.multiply(speeds, weights))
    # Write a row to the csv file when logging is enabled.
    if writer is not None:
        writer.writerow([f"{current_time}", f"{pulse}"])
    cv2.putText(img, f"pulse: {pulse}", (500, 50), 0, 1, [0, 255, 0], thickness=2, lineType=cv2.LINE_AA)
    return img


class DetectionPredictor(BasePredictor):
    """Predictor that tracks vehicles with DeepSort and overlays speed/pulse."""

    def get_annotator(self, img):
        """Return an Annotator for ``img`` using the configured line thickness."""
        return Annotator(img, line_width=self.args.line_thickness, example=str(self.model.names))

    def preprocess(self, img):
        """Move a uint8 numpy image to the model device and scale it to 0.0-1.0."""
        img = torch.from_numpy(img).to(self.model.device)
        img = img.half() if self.model.fp16 else img.float()  # uint8 to fp16/32
        img /= 255  # 0 - 255 to 0.0 - 1.0
        return img

    def postprocess(self, preds, img, orig_img):
        """Run NMS restricted to classes 2, 3, 5, 7 and rescale boxes to the original image."""
        preds = ops.non_max_suppression(preds,
                                        self.args.conf,
                                        self.args.iou,
                                        classes=[2, 3, 5, 7],
                                        agnostic=self.args.agnostic_nms,
                                        max_det=self.args.max_det)

        for i, pred in enumerate(preds):
            shape = orig_img[i].shape if self.webcam else orig_img.shape
            pred[:, :4] = ops.scale_boxes(img.shape[2:], pred[:, :4], shape).round()

        return preds

    def write_results(self, idx, preds, batch):
        """Track the detections of one image and draw the annotated frame.

        Args:
            idx: index of the image inside the batch.
            preds: per-image detections as returned by ``postprocess``.
            batch: tuple ``(path, preprocessed image, original image)``.

        Returns:
            ``log_string`` alone when there are no detections, otherwise the
            list ``[log_string, img]``.
            NOTE(review): the two return shapes differ; confirm which one the
            caller in BasePredictor expects.
        """
        p, im, im0 = batch
        log_string = ""
        if len(im.shape) == 3:
            im = im[None]  # expand for batch dim
        self.seen += 1
        im0 = im0.copy()
        if self.webcam:  # batch_size >= 1
            log_string += f'{idx}: '
            frame = self.dataset.count
        else:
            frame = getattr(self.dataset, 'frame', 0)

        self.data_path = p
        self.txt_path = str(self.save_dir / 'labels' / p.stem) + ('' if self.dataset.mode == 'image' else f'_{frame}')
        log_string += '%gx%g ' % im.shape[2:]  # print string
        self.annotator = self.get_annotator(im0)

        det = preds[idx]
        if len(det) == 0:
            return log_string

        # Per-class detection counts, drawn in the top-left corner.
        count = 0
        for c in det[:, 5].unique():
            count += 1
            n = (det[:, 5] == c).sum()  # detections per class
            cv2.putText(im0, f"{n} {self.model.names[int(c)]}", (11, count * 50), 0, 1, [0, 255, 0], thickness=2, lineType=cv2.LINE_AA)
            log_string += f"{n} {self.model.names[int(c)]}{'s' * (n > 1)}, "

        # Convert detections to the centre/size format the tracker consumes.
        xywh_bboxs = []
        confs = []
        oids = []
        for *xyxy, conf, cls in reversed(det):
            x_c, y_c, bbox_w, bbox_h = xyxy_to_xywh(*xyxy)
            xywh_bboxs.append([x_c, y_c, bbox_w, bbox_h])
            confs.append([conf.item()])
            oids.append(int(cls))
        xywhs = torch.Tensor(xywh_bboxs)
        confss = torch.Tensor(confs)

        outputs = deepsort.update(xywhs, confss, oids, im0)

        # Fall back to the plain frame so ``img`` is always bound, even when
        # the tracker reports no tracks for this frame.
        img = im0
        if len(outputs) > 0:
            bbox_xyxy = outputs[:, :4]
            identities = outputs[:, -2]
            object_id = outputs[:, -1]
            # Disabled per-vehicle CSV export:
            # f2 = open('/vehicles_data.csv', "w+")
            # f2.close()
            # with open('/vehicles_data.csv', 'a') as f:
            #     writer2 = csv.writer(f)
            #     header = ['id', 'class', 'speed', 'weight']
            #     writer2.writerow(header)
            img = draw_boxes(im0, bbox_xyxy, self.model.names, object_id, writer, writer2, identities)

        # NOTE(review): no cv2.waitKey call is made here; confirm the window
        # is refreshed elsewhere.
        cv2.imshow("window", img)

        # Disabled time-series figure export:
        # df = pd.read_csv("/pulse.csv")
        # df['time'] = pd.to_datetime(df['time'], format = '%H:%M:%S %d.%m.%Y')
        # df.index = df['time']
        # del df['time']
        # try:
        #     fig, ax = plt.subplots()
        #     plt.clf()
        #     sns.lineplot(df)
        #     ax.set_xticklabels([t.get_text().split(".")[0] for t in ax.get_xticklabels()])
        #     ax.set_xticklabels([pd.to_datetime(t.get_text()).strftime('%H:%M:%S') for t in ax.get_xticklabels()])
        #     plt.ylabel('Pulse')
        #     plt.xlabel('time')
        #     plt.savefig(f'/time_series/figure_{num:010d}.png')
        #     num += 1
        # except:
        #     log_string += f'An error occured while saving figure_{num:010d}.png, '

        return [log_string, img]


@hydra.main(version_base=None, config_path=str(DEFAULT_CONFIG.parent), config_name=DEFAULT_CONFIG.name)
def predict(cfg):
    """Hydra entry point: initialise the tracker and run the predictor."""
    init_tracker()
    cfg.model = cfg.model or "yolov8n.pt"
    cfg.imgsz = check_imgsz(cfg.imgsz, min_dim=2)  # check image size
    cfg.source = cfg.source if cfg.source is not None else ROOT / "assets"
    predictor = DetectionPredictor(cfg)
    predictor()


#model = Yolov4(weight_path="yolov4.weights", class_name_path='coco_classes.txt')


def gradio_wrapper(img):
    """Gradio callback: run prediction on one webcam frame and return the result.

    NOTE(review): ``predict`` is wrapped by ``hydra.main`` and does not return
    the predictor output; confirm that it accepts ``model=``/``source=``
    keyword arguments and that an image is actually returned here.
    """
    result = predict(model="YOLOv8-real-time/ultralytics/yolo/v8/detect/yolov8x6.pt", source=img)
    #print(np.shape(img))
    return result


demo = gr.Interface(
    fn=gradio_wrapper,
    #gr.Image(source="webcam", streaming=True, flip=True),
    inputs=gr.Image(source="webcam", streaming=True),
    outputs="image",
    live=True
)

demo.launch()