import numpy as np from sklearn.utils.linear_assignment_ import linear_assignment import copy from sklearn.metrics.pairwise import cosine_similarity as cosine class Tracker(object): def __init__(self, opt): self.opt = opt self.reset() self.nID = 10000 self.alpha = 0.1 def init_track(self, results): for item in results: if item['score'] > self.opt.new_thresh: self.id_count += 1 # active and age are never used in the paper item['active'] = 1 item['age'] = 1 item['tracking_id'] = self.id_count if not ('ct' in item): bbox = item['bbox'] item['ct'] = [(bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2] self.tracks.append(item) self.nID = 10000 self.embedding_bank = np.zeros((self.nID, 128)) self.cat_bank = np.zeros((self.nID), dtype=np.int) def reset(self): self.id_count = 0 self.nID = 10000 self.tracks = [] self.embedding_bank = np.zeros((self.nID, 128)) self.cat_bank = np.zeros((self.nID), dtype=np.int) self.tracklet_ages = np.zeros((self.nID), dtype=np.int) self.alive = [] def step(self, results_with_low, public_det=None): results = [item for item in results_with_low if item['score'] >= self.opt.track_thresh] # first association N = len(results) M = len(self.tracks) self.alive = [] track_boxes = np.array([[track['bbox'][0], track['bbox'][1], track['bbox'][2], track['bbox'][3]] for track in self.tracks], np.float32) # M x 4 det_boxes = np.array([[item['bbox'][0], item['bbox'][1], item['bbox'][2], item['bbox'][3]] for item in results], np.float32) # N x 4 box_ious = self.bbox_overlaps_py(det_boxes, track_boxes) dets = np.array( [det['ct'] + det['tracking'] for det in results], np.float32) # N x 2 track_size = np.array([((track['bbox'][2] - track['bbox'][0]) * \ (track['bbox'][3] - track['bbox'][1])) \ for track in self.tracks], np.float32) # M track_cat = np.array([track['class'] for track in self.tracks], np.int32) # M item_size = np.array([((item['bbox'][2] - item['bbox'][0]) * \ (item['bbox'][3] - item['bbox'][1])) \ for item in results], np.float32) # N item_cat = np.array([item['class'] for item in results], np.int32) # N tracks = np.array( [pre_det['ct'] for pre_det in self.tracks], np.float32) # M x 2 dist = (((tracks.reshape(1, -1, 2) - \ dets.reshape(-1, 1, 2)) ** 2).sum(axis=2)) # N x M if self.opt.dataset == 'youtube_vis': invalid = ((dist > track_size.reshape(1, M)) + \ (dist > item_size.reshape(N, 1)) + (box_ious < self.opt.overlap_thresh)) > 0 else: invalid = ((dist > track_size.reshape(1, M)) + \ (dist > item_size.reshape(N, 1)) + \ (item_cat.reshape(N, 1) != track_cat.reshape(1, M)) + (box_ious < self.opt.overlap_thresh)) > 0 dist = dist + invalid * 1e18 if self.opt.hungarian: item_score = np.array([item['score'] for item in results], np.float32) # N dist[dist > 1e18] = 1e18 matched_indices = linear_assignment(dist) else: matched_indices = greedy_assignment(copy.deepcopy(dist)) unmatched_dets = [d for d in range(dets.shape[0]) \ if not (d in matched_indices[:, 0])] unmatched_tracks = [d for d in range(tracks.shape[0]) \ if not (d in matched_indices[:, 1])] if self.opt.hungarian: matches = [] for m in matched_indices: if dist[m[0], m[1]] > 1e16: unmatched_dets.append(m[0]) unmatched_tracks.append(m[1]) else: matches.append(m) matches = np.array(matches).reshape(-1, 2) else: matches = matched_indices ret = [] for m in matches: track = results[m[0]] track['tracking_id'] = self.tracks[m[1]]['tracking_id'] track['age'] = 1 track['active'] = self.tracks[m[1]]['active'] + 1 if 'embedding' in track: self.alive.append(track['tracking_id']) self.embedding_bank[self.tracks[m[1]]['tracking_id'] - 1, :] = self.alpha * track['embedding'] \ + (1 - self.alpha) * self.embedding_bank[ self.tracks[m[1]][ 'tracking_id'] - 1, :] self.cat_bank[self.tracks[m[1]]['tracking_id'] - 1] = track['class'] ret.append(track) if self.opt.public_det and len(unmatched_dets) > 0: # Public detection: only create tracks from provided detections pub_dets = np.array([d['ct'] for d in public_det], np.float32) dist3 = ((dets.reshape(-1, 1, 2) - pub_dets.reshape(1, -1, 2)) ** 2).sum( axis=2) matched_dets = [d for d in range(dets.shape[0]) \ if not (d in unmatched_dets)] dist3[matched_dets] = 1e18 for j in range(len(pub_dets)): i = dist3[:, j].argmin() if dist3[i, j] < item_size[i]: dist3[i, :] = 1e18 track = results[i] if track['score'] > self.opt.new_thresh: self.id_count += 1 track['tracking_id'] = self.id_count track['age'] = 1 track['active'] = 1 ret.append(track) else: # Private detection: create tracks for all un-matched detections for i in unmatched_dets: track = results[i] if track['score'] > self.opt.new_thresh: if 'embedding' in track: max_id, max_cos = self.get_similarity(track['embedding'], False, track['class']) if max_cos >= 0.3 and self.tracklet_ages[max_id - 1] < self.opt.window_size: track['tracking_id'] = max_id track['age'] = 1 track['active'] = 1 self.embedding_bank[track['tracking_id'] - 1, :] = self.alpha * track['embedding'] \ + (1 - self.alpha) * self.embedding_bank[track['tracking_id'] - 1,:] else: self.id_count += 1 track['tracking_id'] = self.id_count track['age'] = 1 track['active'] = 1 self.embedding_bank[self.id_count - 1, :] = track['embedding'] self.cat_bank[self.id_count - 1] = track['class'] self.alive.append(track['tracking_id']) ret.append(track) else: self.id_count += 1 track['tracking_id'] = self.id_count track['age'] = 1 track['active'] = 1 ret.append(track) self.tracklet_ages[:self.id_count] = self.tracklet_ages[:self.id_count] + 1 for track in ret: self.tracklet_ages[track['tracking_id'] - 1] = 1 # second association results_second = [item for item in results_with_low if item['score'] < self.opt.track_thresh] self_tracks_second = [self.tracks[i] for i in unmatched_tracks if self.tracks[i]['active'] > 0] second2original = [i for i in unmatched_tracks if self.tracks[i]['active'] > 0] N = len(results_second) M = len(self_tracks_second) if N > 0 and M > 0: track_boxes_second = np.array([[track['bbox'][0], track['bbox'][1], track['bbox'][2], track['bbox'][3]] for track in self_tracks_second], np.float32) # M x 4 det_boxes_second = np.array([[item['bbox'][0], item['bbox'][1], item['bbox'][2], item['bbox'][3]] for item in results_second], np.float32) # N x 4 box_ious_second = self.bbox_overlaps_py(det_boxes_second, track_boxes_second) dets = np.array( [det['ct'] + det['tracking'] for det in results_second], np.float32) # N x 2 track_size = np.array([((track['bbox'][2] - track['bbox'][0]) * \ (track['bbox'][3] - track['bbox'][1])) \ for track in self_tracks_second], np.float32) # M track_cat = np.array([track['class'] for track in self_tracks_second], np.int32) # M item_size = np.array([((item['bbox'][2] - item['bbox'][0]) * \ (item['bbox'][3] - item['bbox'][1])) \ for item in results_second], np.float32) # N item_cat = np.array([item['class'] for item in results_second], np.int32) # N tracks_second = np.array( [pre_det['ct'] for pre_det in self_tracks_second], np.float32) # M x 2 dist = (((tracks_second.reshape(1, -1, 2) - \ dets.reshape(-1, 1, 2)) ** 2).sum(axis=2)) # N x M invalid = ((dist > track_size.reshape(1, M)) + \ (dist > item_size.reshape(N, 1)) + \ (item_cat.reshape(N, 1) != track_cat.reshape(1, M)) + (box_ious_second < 0.3)) > 0 dist = dist + invalid * 1e18 matched_indices_second = greedy_assignment(copy.deepcopy(dist), 1e8) unmatched_tracks_second = [d for d in range(tracks_second.shape[0]) \ if not (d in matched_indices_second[:, 1])] matches_second = matched_indices_second for m in matches_second: track = results_second[m[0]] track['tracking_id'] = self_tracks_second[m[1]]['tracking_id'] track['age'] = 1 track['active'] = self_tracks_second[m[1]]['active'] + 1 if 'embedding' in track: self.alive.append(track['tracking_id']) self.embedding_bank[self_tracks_second[m[1]]['tracking_id'] - 1, :] = self.alpha * track['embedding'] \ + (1 - self.alpha) * self.embedding_bank[self_tracks_second[m[1]]['tracking_id'] - 1,:] self.cat_bank[self_tracks_second[m[1]]['tracking_id'] - 1] = track['class'] ret.append(track) unmatched_tracks = [second2original[i] for i in unmatched_tracks_second] + \ [i for i in unmatched_tracks if self.tracks[i]['active'] == 0] # Never used for i in unmatched_tracks: track = self.tracks[i] if track['age'] < self.opt.max_age: track['age'] += 1 track['active'] = 1 # 0 bbox = track['bbox'] ct = track['ct'] v = [0, 0] track['bbox'] = [ bbox[0] + v[0], bbox[1] + v[1], bbox[2] + v[0], bbox[3] + v[1]] track['ct'] = [ct[0] + v[0], ct[1] + v[1]] ret.append(track) for r_ in ret: del r_['embedding'] self.tracks = ret return ret def get_similarity(self, feat, stat, cls): max_id = -1 max_cos = -1 if stat: nID = self.id_count else: nID = self.id_count a = feat[None, :] b = self.embedding_bank[:nID, :] if len(b) > 0: alive = np.array(self.alive, dtype=np.int) - 1 cosim = cosine(a, b) cosim = np.reshape(cosim, newshape=(-1)) cosim[alive] = -2 cosim[nID - 1] = -2 cosim[np.where(self.cat_bank[:nID] != cls)[0]] = -2 max_id = int(np.argmax(cosim) + 1) max_cos = np.max(cosim) return max_id, max_cos def bbox_overlaps_py(self, boxes, query_boxes): """ determine overlaps between boxes and query_boxes :param boxes: n * 4 bounding boxes :param query_boxes: k * 4 bounding boxes :return: overlaps: n * k overlaps """ n_ = boxes.shape[0] k_ = query_boxes.shape[0] overlaps = np.zeros((n_, k_), dtype=np.float) for k in range(k_): query_box_area = (query_boxes[k, 2] - query_boxes[k, 0] + 1) * (query_boxes[k, 3] - query_boxes[k, 1] + 1) for n in range(n_): iw = min(boxes[n, 2], query_boxes[k, 2]) - max(boxes[n, 0], query_boxes[k, 0]) + 1 if iw > 0: ih = min(boxes[n, 3], query_boxes[k, 3]) - max(boxes[n, 1], query_boxes[k, 1]) + 1 if ih > 0: box_area = (boxes[n, 2] - boxes[n, 0] + 1) * (boxes[n, 3] - boxes[n, 1] + 1) all_area = float(box_area + query_box_area - iw * ih) overlaps[n, k] = iw * ih / all_area return overlaps def greedy_assignment(dist, thresh=1e16): matched_indices = [] if dist.shape[1] == 0: return np.array(matched_indices, np.int32).reshape(-1, 2) for i in range(dist.shape[0]): j = dist[i].argmin() if dist[i][j] < thresh: dist[:, j] = 1e18 matched_indices.append([i, j]) return np.array(matched_indices, np.int32).reshape(-1, 2)