"""YOLO-based object-detection pipeline.

Detects objects (``person`` / ``vehical`` / ``animal``) in an image, merges
overlapping expanded boxes into regions, crops and enhances those regions,
classifies the activity of each detected person, estimates per-object
distances, and assembles the JSON-like structure consumed by the frontend.
"""

import json
import math
import os
from pathlib import Path

import cv2
import numpy as np
from dotenv import load_dotenv
from ultralytics import YOLO

env_path = Path('.') / '.env'
load_dotenv(dotenv_path=env_path)

# NOTE(review): str(os.getenv(...)) turns a missing variable into the literal
# string "None", which only fails later inside YOLO(); consider failing fast.
path = {
    'DET_MODEL_PATH': str(os.getenv('DET_MODEL_PATH')),
    'IMG_DIR_PATH': str(os.getenv('IMG_DIR_PATH')),
    'ACTIVITY_DET_MODEL_PATH': str(os.getenv('ACTIVITY_DET_MODEL_PATH')),
}

# Assumed real-world object heights (metres) for pinhole distance estimation.
PERSON_HEIGHT = 1.5
VEHICAL_HEIGHT = 1.35
ANIMAL_HEIGHT = 0.6
FOCAL_LENGTH = 6400  # effective focal length in pixels — TODO confirm calibration
# CONF = 0.0

# Load models
det_model = YOLO(path['DET_MODEL_PATH'])
activity_det_model = YOLO(path['ACTIVITY_DET_MODEL_PATH'])
activity_classes = ['Standing', 'Running', 'Sitting']


def object_detection(image):
    """Detect objects and merge overlapping expanded boxes into regions.

    Args:
        image (numpy array): 3-channel image array.

    Returns:
        new_boxes: list of region dicts of the form::

            [
              {
                "actual_boundries": [
                  {"top_left": [48, 215], "bottom_right": [62, 245],
                   "class": "person"}
                ],
                "updated_boundries": {
                  "top_left": [41, 199], "bottom_right": [73, 269],
                  "person_count": 1, "vehical_count": 0, "animal_count": 0
                }
              }
            ]
    """
    results = det_model(image)
    boxes = results[0].boxes.xyxy.tolist()
    classes = results[0].boxes.cls.tolist()
    names = results[0].names

    img_h, img_w = image.shape[:2]
    my_boxes = []
    for box, cls in zip(boxes, classes):
        x1, y1, x2, y2 = box
        name = names[int(cls)]
        my_obj = {
            "actual_boundries": [{
                "top_left": (int(x1), int(y1)),
                "bottom_right": (int(x2), int(y2)),
                "class": name,
            }]
        }
        # Expand the box by half its width/height on every side, clamped to
        # the image bounds.  BUGFIX: width/height are computed once up front —
        # previously x1/y1 were overwritten before the x2/y2 expansion, which
        # silently grew the right/bottom sides by 0.75*size instead of 0.5.
        w = x2 - x1
        h = y2 - y1
        ex1 = math.floor(max(0, x1 - w / 2))
        ey1 = math.floor(max(0, y1 - h / 2))
        ex2 = math.ceil(min(img_w - 1, x2 + w / 2))
        ey2 = math.ceil(min(img_h - 1, y2 + h / 2))
        my_obj["updated_boundries"] = {
            "top_left": (ex1, ey1),
            "bottom_right": (ex2, ey2),
            "person_count": 1 if name == 'person' else 0,
            "vehical_count": 1 if name == 'vehical' else 0,
            "animal_count": 1 if name == 'animal' else 0,
        }
        my_boxes.append(my_obj)

    # Sort by expanded corners, then greedily merge each box into the last
    # accepted region when their expanded boundaries overlap.
    my_boxes.sort(key=lambda b: (b['updated_boundries']['top_left'],
                                 b['updated_boundries']['bottom_right']))
    new_boxes = []
    for box in my_boxes:
        if new_boxes:
            last = new_boxes[-1]['updated_boundries']
            curr = box['updated_boundries']
            if (last['bottom_right'][0] >= curr['top_left'][0]
                    and last['bottom_right'][1] >= curr['top_left'][1]):
                new_boxes[-1]['actual_boundries'] += box['actual_boundries']
                new_boxes[-1]['updated_boundries'] = {
                    "top_left": (min(last['top_left'][0], curr['top_left'][0]),
                                 min(last['top_left'][1], curr['top_left'][1])),
                    "bottom_right": (max(last['bottom_right'][0], curr['bottom_right'][0]),
                                     max(last['bottom_right'][1], curr['bottom_right'][1])),
                    "person_count": last['person_count'] + curr['person_count'],
                    "vehical_count": last['vehical_count'] + curr['vehical_count'],
                    "animal_count": last['animal_count'] + curr['animal_count'],
                }
                continue
        new_boxes.append(box)
    return new_boxes


def croped_images(image, new_boxes):
    """Crop merged regions and individual person boxes out of the image.

    Args:
        image (numpy array): 3-channel image array.
        new_boxes (list): region dicts produced by :func:`object_detection`.

    Returns:
        croped_images_list (list of numpy array): one crop per merged region.
        single_object_images (list of numpy array): one crop per detected
            person (used later for activity classification).
    """
    croped_images_list = []
    single_object_images = []
    for data in new_boxes:
        tl = data['updated_boundries']['top_left']
        br = data['updated_boundries']['bottom_right']
        croped_images_list.append(image[tl[1]:br[1], tl[0]:br[0]])
        for obj in data['actual_boundries']:
            if obj['class'] == 'person':
                otl, obr = obj['top_left'], obj['bottom_right']
                single_object_images.append(image[otl[1]:obr[1], otl[0]:obr[0]])
    return croped_images_list, single_object_images


def _enhance(image):
    """Resize to 500px height, tweak brightness/contrast, and sharpen."""
    # Preserve aspect ratio while normalising height to 500 px.
    res = cv2.resize(image, (500 * image.shape[1] // image.shape[0], 500),
                     interpolation=cv2.INTER_CUBIC)
    # Brightness/contrast adjustment: out = res*contrast + brightness.
    brightness = 16
    contrast = 0.95
    res2 = cv2.addWeighted(res, contrast, np.zeros(res.shape, res.dtype), 0,
                           brightness)
    # Standard 3x3 sharpening kernel.
    kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])
    return cv2.filter2D(res2, -1, kernel)


def image_enhancements(croped_images_list, single_object_images):
    """Enhance both region crops and single-person crops.

    Args:
        croped_images_list (list of numpy array): merged-region crops.
        single_object_images (list of numpy array): per-person crops.

    Returns:
        enhanced croped images: list of enhanced region crops.
        enhanced single_object_images: list of enhanced person crops.
    """
    # Both lists get the identical enhancement; the duplicated inline code
    # was factored into _enhance().
    enhanced_images = [_enhance(img) for img in croped_images_list]
    enhanced_single_object_images = [_enhance(img) for img in single_object_images]
    return enhanced_images, enhanced_single_object_images


def detect_activity(single_object_images):
    """Classify the activity of each cropped person image.

    Args:
        single_object_images (list of numpy array): per-person crops.

    Returns:
        activities (list of strings): one activity label per person, drawn
        from ``activity_classes``.
    """
    activities = []
    for img in single_object_images:
        predictions = activity_det_model.predict(img)
        for result in predictions:
            # top1 is the index of the highest-probability class.
            activities.append(activity_classes[result.probs.top1])
    return activities


def get_distances(new_boxes):
    """Estimate the distance to every detected object.

    Uses the pinhole model ``distance = focal_length * real_height /
    pixel_height`` with a per-class real-world height.

    Args:
        new_boxes (list): region dicts produced by :func:`object_detection`.

    Returns:
        distance_list: list of strings like ``"42m"``, one per actual box, in
        the same iteration order as the boxes.
    """
    distance_list = []
    for box in new_boxes:
        for actual_box in box['actual_boundries']:
            # Guard against degenerate zero-height boxes (division by zero).
            height = max(1, actual_box['bottom_right'][1] - actual_box['top_left'][1])
            # BUGFIX: the vehical/animal branches previously used
            # PERSON_HEIGHT, leaving VEHICAL_HEIGHT/ANIMAL_HEIGHT unused.
            if actual_box['class'] == "person":
                real_height = PERSON_HEIGHT
            elif actual_box['class'] == "vehical":
                real_height = VEHICAL_HEIGHT
            else:
                real_height = ANIMAL_HEIGHT
            distance = FOCAL_LENGTH * real_height / height
            distance_list.append(str(round(distance)) + "m")
    return distance_list


def get_json_data(json_data, enhanced_images, detected_activity, distances_list):
    """Assemble the per-region result structure for the frontend.

    Args:
        json_data (list): region dicts produced by :func:`object_detection`.
        enhanced_images (list of numpy array): enhanced region crops, aligned
            with ``json_data``.
        detected_activity (list of strings): activities, one per person.
        distances_list (list of strings): distances, one per actual box.

    Returns:
        results (list): one dict per region::

            {'zoomed_img': np.array([]), 'actual_boxes': [], 'updated_boxes': {}}
    """
    results = []
    object_count = 0    # running index into distances_list
    activity_count = 0  # running index into detected_activity (persons only)
    for idx, box in enumerate(json_data):
        final_json_output = {
            'zoomed_img': enhanced_images[idx],
            'actual_boxes': [],
            'updated_boxes': {
                "top_left": box['updated_boundries']['top_left'],
                "bottom_right": box['updated_boundries']['bottom_right'],
            },
        }
        for actual_box in box['actual_boundries']:
            temp = {
                "top_left": actual_box['top_left'],
                "bottom_right": actual_box['bottom_right'],
                "class": actual_box['class'],
                "distance": distances_list[object_count],
                "activity": 'none',
            }
            object_count += 1
            if temp['class'] == 'person':
                temp['activity'] = detected_activity[activity_count]
                activity_count += 1
            final_json_output['actual_boxes'].append(temp)
        final_json_output = fix_distance(final_json_output)
        results.append(final_json_output)
    return results


def fix_distance(final_json_output):
    """Smooth the distances of boxes within one region.

    Boxes whose (sorted) distances differ by at most ``DIFF`` metres are
    clustered together and each cluster is assigned its integer mean, so
    nearby objects report a consistent distance.

    Args:
        final_json_output (dict): one region dict from :func:`get_json_data`.

    Returns:
        final_json_output (dict): the same dict with smoothed distances.
    """
    DIFF = 90  # max gap (metres) between consecutive distances in one cluster

    # BUGFIX: guard against a region with no boxes (previously IndexError).
    boxes = final_json_output['actual_boxes']
    if not boxes:
        return final_json_output

    # Strip the trailing "m" and remember each box's index.
    distances = [{'idx': i, 'distance': int(b['distance'][:-1])}
                 for i, b in enumerate(boxes)]
    sorted_dist = sorted(distances, key=lambda d: d['distance'])

    # Cluster consecutive sorted distances that are within DIFF of each other.
    sum_dist = [{'sum': sorted_dist[0]['distance'],
                 'idxes': [sorted_dist[0]['idx']]}]
    for i in range(1, len(sorted_dist)):
        if abs(sorted_dist[i]['distance'] - sorted_dist[i - 1]['distance']) <= DIFF:
            sum_dist[-1]['sum'] += sorted_dist[i]['distance']
            sum_dist[-1]['idxes'].append(sorted_dist[i]['idx'])
        else:
            sum_dist.append({'sum': sorted_dist[i]['distance'],
                             'idxes': [sorted_dist[i]['idx']]})

    # Replace every member's distance with its cluster's integer mean.
    for data in sum_dist:
        mean = data['sum'] // len(data['idxes'])
        for i in data['idxes']:
            boxes[i]['distance'] = str(mean) + 'm'
    return final_json_output