import argparse
import math
import os

import cv2
import numpy as np
import skimage.io
import torch
from scipy.optimize import linear_sum_assignment

import model
from dataloader import RGB_MEAN, RGB_STD

print('CUDA available: {}'.format(torch.cuda.is_available()))

# Fixed palette for boxes and traces; ids map to colors modulo the palette size.
color_list = [(0, 0, 255), (255, 0, 0), (0, 255, 0), (255, 0, 255), (0, 255, 255),
              (255, 255, 0), (128, 0, 255), (0, 128, 255), (128, 255, 0), (0, 255, 128),
              (255, 128, 0), (255, 0, 128), (128, 128, 255), (128, 255, 128),
              (255, 128, 128), (128, 128, 0), (128, 0, 128)]


class detect_rect:
    """A paired detection: the box in the current frame plus the box the
    network predicts for the same target in the next frame."""

    def __init__(self):
        self.curr_frame = 0
        self.curr_rect = np.array([0, 0, 1, 1])
        self.next_rect = np.array([0, 0, 1, 1])
        self.conf = 0
        self.id = 0

    @property
    def position(self):
        x = (self.curr_rect[0] + self.curr_rect[2]) / 2
        y = (self.curr_rect[1] + self.curr_rect[3]) / 2
        return np.array([x, y])

    @property
    def size(self):
        w = self.curr_rect[2] - self.curr_rect[0]
        h = self.curr_rect[3] - self.curr_rect[1]
        return np.array([w, h])


class tracklet:
    """A chain of matched detections sharing one identity."""

    def __init__(self, det_rect):
        self.id = det_rect.id
        self.rect_list = [det_rect]
        self.rect_num = 1
        self.last_rect = det_rect
        self.last_frame = det_rect.curr_frame
        self.no_match_frame = 0

    def add_rect(self, det_rect):
        self.rect_list.append(det_rect)
        self.rect_num += 1
        self.last_rect = det_rect
        self.last_frame = det_rect.curr_frame

    @property
    def velocity(self):
        # Center velocity in pixels per frame. With six or more detections,
        # average three finite differences taken at a three-frame stride to
        # smooth out detector jitter.
        if self.rect_num < 2:
            return (0, 0)
        elif self.rect_num < 6:
            return (self.rect_list[self.rect_num - 1].position - self.rect_list[self.rect_num - 2].position) / (
                self.rect_list[self.rect_num - 1].curr_frame - self.rect_list[self.rect_num - 2].curr_frame)
        else:
            v1 = (self.rect_list[self.rect_num - 1].position - self.rect_list[self.rect_num - 4].position) / (
                self.rect_list[self.rect_num - 1].curr_frame - self.rect_list[self.rect_num - 4].curr_frame)
            v2 = (self.rect_list[self.rect_num - 2].position - self.rect_list[self.rect_num - 5].position) / (
                self.rect_list[self.rect_num - 2].curr_frame - self.rect_list[self.rect_num - 5].curr_frame)
            v3 = (self.rect_list[self.rect_num - 3].position - self.rect_list[self.rect_num - 6].position) / (
                self.rect_list[self.rect_num - 3].curr_frame - self.rect_list[self.rect_num - 6].curr_frame)
            return (v1 + v2 + v3) / 3


def cal_iou(rect1, rect2):
    # Intersection-over-union of two (x1, y1, x2, y2) boxes.
    # Example: cal_iou([0, 0, 10, 10], [5, 5, 15, 15]) = 25 / 175 ≈ 0.143.
    x1, y1, x2, y2 = rect1
    x3, y3, x4, y4 = rect2
    i_w = min(x2, x4) - max(x1, x3)
    i_h = min(y2, y4) - max(y1, y3)
    if i_w <= 0 or i_h <= 0:
        return 0
    i_s = i_w * i_h
    s_1 = (x2 - x1) * (y2 - y1)
    s_2 = (x4 - x3) * (y4 - y3)
    return float(i_s) / (s_1 + s_2 - i_s)


def cal_simi(det_rect1, det_rect2):
    # Similarity of consecutive detections: IoU of the box predicted for the
    # next frame against the box actually detected there.
    return cal_iou(det_rect1.next_rect, det_rect2.curr_rect)


def cal_simi_track_det(track, det_rect):
    if det_rect.curr_frame <= track.last_frame:
        print("cal_simi_track_det error: detection is not newer than the track")
        return 0
    elif det_rect.curr_frame - track.last_frame == 1:
        # Adjacent frames: compare against the track's predicted next box.
        return cal_iou(track.last_rect.next_rect, det_rect.curr_rect)
    else:
        # A gap of several frames: extrapolate the last box by the track's
        # velocity before computing IoU.
        pred_rect = track.last_rect.curr_rect + np.append(track.velocity, track.velocity) * (
            det_rect.curr_frame - track.last_frame)
        return cal_iou(pred_rect, det_rect.curr_rect)
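# Illustrative sketch (not part of the original script): track_det_match below
# negates the IoU similarities so that SciPy's Hungarian solver, which
# minimizes total cost, returns the assignment with the largest total IoU.
# The numbers here are made up.
#
#     import numpy as np
#     from scipy.optimize import linear_sum_assignment
#     cost = -np.array([[0.9, 0.1],    # rows: tracks
#                       [0.2, 0.8]])   # cols: detections
#     rows, cols = linear_sum_assignment(cost)
#     # rows == [0, 1], cols == [0, 1]: track 0 takes detection 0 and
#     # track 1 takes detection 1, for a total IoU of 1.7.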
def track_det_match(tracklet_list, det_rect_list, min_iou=0.5):
    # Build a cost matrix of negated track-detection similarities, solve the
    # assignment with the Hungarian algorithm, then reject pairs whose IoU
    # falls below min_iou and report them as unmatched.
    num1 = len(tracklet_list)
    num2 = len(det_rect_list)
    cost_mat = np.zeros((num1, num2))
    for i in range(num1):
        for j in range(num2):
            cost_mat[i, j] = -cal_simi_track_det(tracklet_list[i], det_rect_list[j])

    match_result = linear_sum_assignment(cost_mat)
    match_result = np.asarray(match_result)
    match_result = np.transpose(match_result)

    matches, unmatched1, unmatched2 = [], [], []
    for i in range(num1):
        if i not in match_result[:, 0]:
            unmatched1.append(i)
    for j in range(num2):
        if j not in match_result[:, 1]:
            unmatched2.append(j)
    for i, j in match_result:
        if cost_mat[i, j] > -min_iou:
            # Assigned by the solver, but the similarity is too low to accept.
            unmatched1.append(i)
            unmatched2.append(j)
        else:
            matches.append((i, j))
    return matches, unmatched1, unmatched2


def draw_caption(image, box, caption, color):
    b = np.array(box).astype(int)
    cv2.putText(image, caption, (b[0], b[1] - 8), cv2.FONT_HERSHEY_PLAIN, 2, color, 2)


def run_each_dataset(model_dir, retinanet, dataset_path, subset, cur_dataset):
    print(cur_dataset)

    img_list = os.listdir(os.path.join(dataset_path, subset, cur_dataset, 'img1'))
    img_list = [os.path.join(dataset_path, subset, cur_dataset, 'img1', _)
                for _ in img_list if ('jpg' in _) or ('png' in _)]
    img_list = sorted(img_list)
    img_len = len(img_list)
    last_feat = None

    confidence_threshold = 0.4
    IOU_threshold = 0.5
    retention_threshold = 10

    det_list_all = []
    tracklet_all = []
    max_id = 0
    max_draw_len = 100
    draw_interval = 5
    img_width = 1920
    img_height = 1080
    fps = 30

    for i in range(img_len):
        det_list_all.append([])

    # Tracking starts from the middle frame of the sequence (the second half
    # is used for evaluation).
    for idx in range(int(img_len / 2), img_len + 1):
        i = idx - 1
        print('tracking: ', i)
        with torch.no_grad():
            # idx is clamped so the final iteration re-reads the last frame.
            data_path1 = img_list[min(idx, img_len - 1)]
            img_origin1 = skimage.io.imread(data_path1)
            img_h, img_w, _ = img_origin1.shape
            img_height, img_width = img_h, img_w
            # Pad to multiples of 32 so the feature-pyramid strides divide evenly.
            resize_h, resize_w = math.ceil(img_h / 32) * 32, math.ceil(img_w / 32) * 32
            img1 = np.zeros((resize_h, resize_w, 3), dtype=img_origin1.dtype)
            img1[:img_h, :img_w, :] = img_origin1
            img1 = (img1.astype(np.float32) / 255.0 - np.array([[RGB_MEAN]])) / np.array([[RGB_STD]])
            img1 = torch.from_numpy(img1).permute(2, 0, 1).view(1, 3, resize_h, resize_w)

            # The network chains the previous frame's features with the current
            # frame and returns paired boxes per detection: (x1, y1, x2, y2) in
            # the current frame and (x3, y3, x4, y4) predicted for the next.
            scores, transformed_anchors, last_feat = retinanet(img1.cuda().float(), last_feat=last_feat)
            if idx > int(img_len / 2):
                scores = scores.cpu()  # no-op if already on CPU; needed for np.where below
                idxs = np.where(scores > 0.1)

                for j in range(idxs[0].shape[0]):
                    bbox = transformed_anchors[idxs[0][j], :]
                    x1 = int(bbox[0])
                    y1 = int(bbox[1])
                    x2 = int(bbox[2])
                    y2 = int(bbox[3])
                    x3 = int(bbox[4])
                    y3 = int(bbox[5])
                    x4 = int(bbox[6])
                    y4 = int(bbox[7])

                    det_conf = float(scores[idxs[0][j]])

                    det_rect = detect_rect()
                    det_rect.curr_frame = idx
                    det_rect.curr_rect = np.array([x1, y1, x2, y2])
                    det_rect.next_rect = np.array([x3, y3, x4, y4])
                    det_rect.conf = det_conf

                    if det_rect.conf > confidence_threshold:
                        det_list_all[det_rect.curr_frame - 1].append(det_rect)

                # First processed frame: every detection starts a new tracklet.
                if i == int(img_len / 2):
                    for j in range(len(det_list_all[i])):
                        det_list_all[i][j].id = j + 1
                        max_id = max(max_id, j + 1)
                        track = tracklet(det_list_all[i][j])
                        tracklet_all.append(track)
                    continue

                matches, unmatched1, unmatched2 = track_det_match(tracklet_all, det_list_all[i], IOU_threshold)

                for j in range(len(matches)):
                    det_list_all[i][matches[j][1]].id = tracklet_all[matches[j][0]].id
                    tracklet_all[matches[j][0]].add_rect(det_list_all[i][matches[j][1]])

                delete_track_list = []
                for j in range(len(unmatched1)):
                    tracklet_all[unmatched1[j]].no_match_frame += 1
                    if tracklet_all[unmatched1[j]].no_match_frame >= retention_threshold:
                        delete_track_list.append(unmatched1[j])
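                # Drop tracklets that have gone unmatched for
                # retention_threshold consecutive frames; the survivors keep
                # their identity and can be re-associated later through the
                # velocity extrapolation in cal_simi_track_det.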
                origin_index = set([k for k in range(len(tracklet_all))])
                delete_index = set(delete_track_list)
                left_index = list(origin_index - delete_index)
                tracklet_all = [tracklet_all[k] for k in left_index]

                # Unmatched detections spawn new tracklets with fresh ids.
                for j in range(len(unmatched2)):
                    det_list_all[i][unmatched2[j]].id = max_id + 1
                    max_id += 1
                    track = tracklet(det_list_all[i][unmatched2[j]])
                    tracklet_all.append(track)

    # ************** visualize tracking results and save the evaluation file **************
    fout_tracking = open(os.path.join(model_dir, 'results', cur_dataset + '.txt'), 'w')

    save_img_dir = os.path.join(model_dir, 'results', cur_dataset)
    if not os.path.exists(save_img_dir):
        os.makedirs(save_img_dir)

    out_video = os.path.join(model_dir, 'results', cur_dataset + '.mp4')
    videoWriter = cv2.VideoWriter(out_video, cv2.VideoWriter_fourcc('m', 'p', '4', 'v'), fps, (img_width, img_height))

    id_dict = {}

    for i in range(int(img_len / 2), img_len):
        print('saving: ', i)
        img = cv2.imread(img_list[i])

        for j in range(len(det_list_all[i])):
            x1, y1, x2, y2 = det_list_all[i][j].curr_rect.astype(int)
            trace_id = det_list_all[i][j].id

            id_dict.setdefault(str(trace_id), []).append((int((x1 + x2) / 2), y2))
            draw_trace_id = str(trace_id)
            draw_caption(img, (x1, y1, x2, y2), draw_trace_id, color=color_list[trace_id % len(color_list)])
            cv2.rectangle(img, (x1, y1), (x2, y2), color=color_list[trace_id % len(color_list)], thickness=2)

            # Draw the recent trajectory as a polyline sampled every
            # draw_interval frames, at most max_draw_len points back.
            trace_len = len(id_dict[str(trace_id)])
            trace_len_draw = min(max_draw_len, trace_len)
            for k in range(trace_len_draw - draw_interval):
                if k % draw_interval == 0:
                    draw_point1 = id_dict[str(trace_id)][trace_len - k - 1]
                    draw_point2 = id_dict[str(trace_id)][trace_len - k - 1 - draw_interval]
                    cv2.line(img, draw_point1, draw_point2, color=color_list[trace_id % len(color_list)], thickness=2)

            # MOT challenge result format: frame, id, x, y, w, h, then four -1
            # placeholders (confidence, class, visibility are unused here).
            fout_tracking.write(str(i + 1) + ',' + str(trace_id) + ',' + str(x1) + ',' + str(y1) + ',' +
                                str(x2 - x1) + ',' + str(y2 - y1) + ',-1,-1,-1,-1\n')

        cv2.imwrite(os.path.join(save_img_dir, str(i + 1).zfill(6) + '.jpg'), img)
        videoWriter.write(img)

    fout_tracking.close()
    videoWriter.release()


def run_from_train(model_dir, root_path):
    if not os.path.exists(os.path.join(model_dir, 'results')):
        os.makedirs(os.path.join(model_dir, 'results'))

    retinanet = torch.load(os.path.join(model_dir, 'model_final.pt'))

    use_gpu = True
    if use_gpu:
        retinanet = retinanet.cuda()

    retinanet.eval()

    for seq_num in [2, 4, 5, 9, 10, 11, 13]:
        run_each_dataset(model_dir, retinanet, root_path, 'train', 'MOT17-{:02d}'.format(seq_num))
    for seq_num in [1, 3, 6, 7, 8, 12, 14]:
        run_each_dataset(model_dir, retinanet, root_path, 'test', 'MOT17-{:02d}'.format(seq_num))


def main(args=None):
    parser = argparse.ArgumentParser(description='Simple script for testing a CTracker network.')

    parser.add_argument('--dataset_path', default='/dockerdata/home/jeromepeng/data/MOT/MOT17/', type=str,
                        help='Dataset root; location of the image sequences.')
    parser.add_argument('--model_dir', default='./trained_model/',
                        help='Directory that holds the trained model and receives the results.')
    parser.add_argument('--model_path', default='./trained_model/model_final.pth',
                        help='Path to the trained model (.pt/.pth) file.')

    args = parser.parse_args(args)

    if not os.path.exists(os.path.join(args.model_dir, 'results')):
        os.makedirs(os.path.join(args.model_dir, 'results'))

    retinanet = model.resnet50(num_classes=1, pretrained=True)
    retinanet_save = torch.load(args.model_path)

    state_dict = retinanet_save.state_dict()
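    # Checkpoints saved from a model wrapped in nn.DataParallel prefix every
    # parameter name with 'module.'; strip the prefix so the keys match the
    # bare model built above.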
    for k in list(state_dict.keys()):
        if k.startswith('module.'):
            # Re-insert the value under the un-prefixed key.
            state_dict[k[len('module.'):]] = state_dict[k]
        # Delete the renamed (or otherwise unused) original key.
        del state_dict[k]

    retinanet.load_state_dict(state_dict)

    use_gpu = True
    if use_gpu:
        retinanet = retinanet.cuda()

    retinanet.eval()

    for seq_num in [2, 4, 5, 9, 10, 11, 13]:
        run_each_dataset(args.model_dir, retinanet, args.dataset_path, 'train', 'MOT17-{:02d}'.format(seq_num))
    # Uncomment to also run the MOT17 test sequences:
    # for seq_num in [1, 3, 6, 7, 8, 12, 14]:
    #     run_each_dataset(args.model_dir, retinanet, args.dataset_path, 'test', 'MOT17-{:02d}'.format(seq_num))


if __name__ == '__main__':
    main()
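# Example invocation, assuming this file is saved as test.py (filename not
# given in the source) and the paths below are adjusted to your own dataset
# and checkpoint locations:
#
#     python test.py --dataset_path /path/to/MOT17/ \
#                    --model_dir ./trained_model/ \
#                    --model_path ./trained_model/model_final.pth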