# Object detection, image enhancement, activity recognition and monocular
# distance estimation pipeline built on YOLO models.
import cv2
from ultralytics import YOLO
import os
from dotenv import load_dotenv
from pathlib import Path
import math
import json
import numpy as np

# Load environment configuration from a local .env file.
env_path = Path('.') / '.env'
load_dotenv(dotenv_path=env_path)


def _require_env(name):
    """Return the value of environment variable *name*, failing fast when unset."""
    value = os.getenv(name)
    if value is None:
        # str(None) would silently produce the bogus path string "None" and
        # only fail later inside YOLO(); raise a clear error up front instead.
        raise EnvironmentError(f"Missing required environment variable: {name}")
    return str(value)


path = {
    'DET_MODEL_PATH': _require_env('DET_MODEL_PATH'),
    'IMG_DIR_PATH': _require_env('IMG_DIR_PATH'),
    'ACTIVITY_DET_MODEL_PATH': _require_env('ACTIVITY_DET_MODEL_PATH'),
}

# Constants used for monocular distance estimation.
PERSON_HEIGHT = 1.5    # assumed real-world height of a person, in metres
VEHICAL_HEIGHT = 1.35  # assumed real-world height of a vehicle, in metres
ANIMAL_HEIGHT = 0.6    # assumed real-world height of an animal, in metres
FOCAL_LENGTH = 6400    # camera focal length, in pixels
# CONF = 0.0

# Load models.
det_model = YOLO(path['DET_MODEL_PATH'])
activity_det_model = YOLO(path['ACTIVITY_DET_MODEL_PATH'])
activity_classes = ['Standing', 'Running', 'Sitting']
def object_detection(image):
    """Detect objects in *image* and merge chains of overlapping expanded boxes.

    Args:
        image (numpy array): H x W x 3 image array.

    Returns:
        list[dict]: one entry per merged region, e.g.::

            [
                {
                    "actual_boundries": [
                        {
                            "top_left": [48, 215],
                            "bottom_right": [62, 245],
                            "class": "person"
                        }
                    ],
                    "updated_boundries": {
                        "top_left": [41, 199],
                        "bottom_right": [73, 269],
                        "person_count": 1,
                        "vehical_count": 0,
                        "animal_count": 0
                    }
                }
            ]
    """
    # Detect objects using the YOLO model.
    results = det_model(image)
    boxes = results[0].boxes.xyxy.tolist()
    classes = results[0].boxes.cls.tolist()
    names = results[0].names
    confidences = results[0].boxes.conf.tolist()

    my_boxes = []
    for box, cls, conf in zip(boxes, classes, confidences):
        x1, y1, x2, y2 = box
        name = names[int(cls)]
        my_obj = {"actual_boundries": [{"top_left": (int(x1), int(y1)),
                                        "bottom_right": (int(x2), int(y2)),
                                        "class": name}]}
        # Expand the box by half its width/height on every side, clamped to
        # the image bounds.  The half-sizes are computed once up front: the
        # original code mutated x1/y1 first, so the right/bottom expansion
        # used the already-enlarged width/height (asymmetric growth).
        half_w = (x2 - x1) / 2
        half_h = (y2 - y1) / 2
        x1 = max(0, x1 - half_w)
        y1 = max(0, y1 - half_h)
        x2 = min(len(image[0]) - 1, x2 + half_w)
        y2 = min(len(image) - 1, y2 + half_h)
        x1, y1, x2, y2 = math.floor(x1), math.floor(y1), math.ceil(x2), math.ceil(y2)
        my_obj["updated_boundries"] = {"top_left": (x1, y1),
                                       "bottom_right": (x2, y2),
                                       "person_count": 1 if name == 'person' else 0,
                                       "vehical_count": 1 if name == 'vehical' else 0,
                                       "animal_count": 1 if name == 'animal' else 0}
        my_boxes.append(my_obj)

    # Greedily merge each box into the previously kept one when they overlap,
    # after sorting by corner coordinates.
    my_boxes.sort(key=lambda b: (b['updated_boundries']['top_left'],
                                 b['updated_boundries']['bottom_right']))
    new_boxes = []
    for box in my_boxes:
        if new_boxes:
            last = new_boxes[-1]['updated_boundries']
            curr = box['updated_boundries']
            # NOTE(review): this only checks that the current top-left lies
            # before the previous bottom-right; the other direction relies on
            # the lexicographic sort order — confirm this matches intent.
            if (last['bottom_right'][0] >= curr['top_left'][0]
                    and last['bottom_right'][1] >= curr['top_left'][1]):
                merged = {
                    "top_left": (min(last['top_left'][0], curr['top_left'][0]),
                                 min(last['top_left'][1], curr['top_left'][1])),
                    "bottom_right": (max(last['bottom_right'][0], curr['bottom_right'][0]),
                                     max(last['bottom_right'][1], curr['bottom_right'][1])),
                    "person_count": last['person_count'] + curr['person_count'],
                    "vehical_count": last['vehical_count'] + curr['vehical_count'],
                    "animal_count": last['animal_count'] + curr['animal_count'],
                }
                new_boxes[-1]['actual_boundries'] += box['actual_boundries']
                new_boxes[-1]['updated_boundries'] = merged
                continue
        new_boxes.append(box)
    return new_boxes
def croped_images(image, new_boxes):
    """Crop merged regions and individual person detections out of *image*.

    Args:
        image (numpy array): H x W x 3 image array.
        new_boxes (list[dict]): detection data as produced by object_detection().

    Returns:
        croped_images_list (list of numpy array): one crop per merged region.
        single_object_images (list of numpy array): one crop per detected person.
    """
    croped_images_list = []
    single_object_images = []
    for data in new_boxes:
        # Crop the merged (expanded) region.  The leftover debug print of the
        # boundaries was removed.
        region = data['updated_boundries']
        crop_image = image[region['top_left'][1]:region['bottom_right'][1],
                           region['top_left'][0]:region['bottom_right'][0]]
        croped_images_list.append(crop_image)
        # Additionally crop each individual person detection for activity
        # classification.  Loop variable renamed: `object` shadowed a builtin.
        for detection in data['actual_boundries']:
            if detection['class'] == 'person':
                crop_object = image[detection['top_left'][1]:detection['bottom_right'][1],
                                    detection['top_left'][0]:detection['bottom_right'][0]]
                single_object_images.append(crop_object)
    return croped_images_list, single_object_images
def _enhance_image(image):
    """Resize *image* to a 500px height, adjust brightness/contrast and sharpen."""
    # Resize to a fixed 500px height, preserving the aspect ratio.
    res = cv2.resize(image, (500 * image.shape[1] // image.shape[0], 500),
                     interpolation=cv2.INTER_CUBIC)
    # Brightness/contrast adjustment: out = contrast*res + brightness.
    brightness = 16
    contrast = 0.95
    res2 = cv2.addWeighted(res, contrast, np.zeros(res.shape, res.dtype), 0, brightness)
    # Sharpen with a standard 3x3 sharpening kernel.
    kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])
    return cv2.filter2D(res2, -1, kernel)


def image_enhancements(croped_images_list, single_object_images):
    """Enhance the cropped region images and the single-object images.

    The identical resize/brightness/sharpen pipeline was duplicated for both
    lists in the original; it now lives once in _enhance_image().

    Args:
        croped_images_list (list of numpy array): cropped region images.
        single_object_images (list of numpy array): single object crops.

    Returns:
        enhanced_images (list of numpy array): enhanced region crops.
        enhanced_single_object_images (list of numpy array): enhanced object crops.
    """
    enhanced_images = [_enhance_image(img) for img in croped_images_list]
    enhanced_single_object_images = [_enhance_image(img) for img in single_object_images]
    return enhanced_images, enhanced_single_object_images
def detect_activity(single_object_images):
    """Classify the activity shown in each single-person crop.

    Args:
        single_object_images (list of numpy array): person crop images.

    Returns:
        list of str: one activity label (from activity_classes) per
        prediction result returned by the activity model.
    """
    activities = []
    for crop in single_object_images:
        for prediction in activity_det_model.predict(crop):
            # probs.top1 is the index of the highest-probability class.
            top_class = prediction.probs.top1
            activities.append(activity_classes[top_class])
    return activities
def get_distances(new_boxes):
    """Estimate the distance to every detected object.

    Uses the pinhole-camera model:
    distance = FOCAL_LENGTH * real_world_height / pixel_height.

    Args:
        new_boxes (list[dict]): detection data as produced by object_detection().

    Returns:
        distance_list (list of str): per-object distances such as "12m",
        in detection order.
    """
    distance_list = []
    for box in new_boxes:
        for actual_box in box['actual_boundries']:
            height = actual_box['bottom_right'][1] - actual_box['top_left'][1]
            # Pick the assumed real-world height for the detected class.
            # Bug fix: the original used PERSON_HEIGHT in every branch,
            # leaving VEHICAL_HEIGHT and ANIMAL_HEIGHT defined but unused.
            if actual_box['class'] == "person":
                real_height = PERSON_HEIGHT
            elif actual_box['class'] == "vehical":
                real_height = VEHICAL_HEIGHT
            else:
                real_height = ANIMAL_HEIGHT
            distance = FOCAL_LENGTH * real_height / height
            distance_list.append(str(round(distance)) + "m")
    return distance_list
def get_json_data(json_data, enhanced_images, detected_activity, distances_list):
    """Assemble the per-region payload consumed by the frontend.

    Args:
        json_data (json Array): detection data for the image.
        enhanced_images (list of numpy array): enhanced region crops, in the
            same order as json_data.
        detected_activity (list of strings): activity per detected person.
        distances_list (list of strings): distance string per detected object.

    Returns:
        results (json Array): one entry per region of the form
            {'zoomed_img': np.array([]),
             'actual_boxes': [],
             'updated_boxes': {}}
    """
    results = []
    object_idx = 0    # cursor into distances_list (one per object)
    activity_idx = 0  # cursor into detected_activity (one per person)
    for region_idx, box in enumerate(json_data):
        updated = box['updated_boundries']
        entry = {
            'zoomed_img': enhanced_images[region_idx],
            'actual_boxes': [],
            'updated_boxes': {"top_left": updated['top_left'],
                              "bottom_right": updated['bottom_right']},
        }
        for actual_box in box['actual_boundries']:
            detail = {
                "top_left": actual_box['top_left'],
                "bottom_right": actual_box['bottom_right'],
                "class": actual_box['class'],
                "distance": distances_list[object_idx],
                "activity": 'none',
            }
            object_idx += 1
            # Only persons carry an activity label.
            if detail['class'] == 'person':
                detail['activity'] = detected_activity[activity_idx]
                activity_idx += 1
            entry['actual_boxes'].append(detail)
        results.append(fix_distance(entry))
    return results
def fix_distance(final_json_output):
    """Smooth per-object distances: objects within DIFF metres share a mean.

    Sorts the object distances, groups consecutive sorted values whose gap is
    at most DIFF, and rewrites every member's distance as its group's integer
    mean (floor division), preserving the "<n>m" string format.

    Args:
        final_json_output (dict): one region entry whose 'actual_boxes' list
            holds objects with 'distance' strings like "120m".

    Returns:
        final_json_output (dict): the same entry with smoothed distances.
    """
    DIFF = 90
    boxes = final_json_output['actual_boxes']
    if not boxes:
        # Nothing to smooth; the original crashed with IndexError here.
        return final_json_output

    # Pair every box index with its numeric distance (strip the "m" suffix).
    distances = [{'idx': i, 'distance': int(b['distance'][:-1])}
                 for i, b in enumerate(boxes)]
    sorted_dist = sorted(distances, key=lambda d: d['distance'])

    # Group consecutive sorted distances that lie within DIFF of each other.
    # (Leftover debug print of adjacent distances was removed.)
    groups = [{'sum': sorted_dist[0]['distance'],
               'idxes': [sorted_dist[0]['idx']]}]
    for prev, curr in zip(sorted_dist, sorted_dist[1:]):
        if abs(curr['distance'] - prev['distance']) <= DIFF:
            groups[-1]['sum'] += curr['distance']
            groups[-1]['idxes'].append(curr['idx'])
        else:
            groups.append({'sum': curr['distance'], 'idxes': [curr['idx']]})

    # Replace each member's distance with its group's floor mean.
    for group in groups:
        mean = group['sum'] // len(group['idxes'])
        for i in group['idxes']:
            boxes[i]['distance'] = str(mean) + 'm'
    return final_json_output