Spaces:
Runtime error
Runtime error
import numpy as np | |
from sklearn.utils.linear_assignment_ import linear_assignment | |
import copy | |
from sklearn.metrics.pairwise import cosine_similarity as cosine | |
class Tracker(object):
    """Online multi-object tracker with a per-identity embedding memory.

    Each call to :meth:`step` links the detections of the current frame to
    the tracks kept from the previous call.  Linking uses the squared
    distance between (motion-compensated) detection centers and track
    centers, gated by box size, class agreement and box IoU.  Detections
    that stay unmatched may be re-identified against ``embedding_bank`` by
    cosine similarity before a brand-new identity is issued.

    Attributes read from ``opt``: ``new_thresh``, ``track_thresh``,
    ``dataset``, ``overlap_thresh``, ``hungarian``, ``public_det``,
    ``window_size`` and ``max_age``.

    Track / detection records are plain dicts.  Keys accessed here are
    ``'score'``, ``'bbox'`` (x1, y1, x2, y2), ``'class'``, ``'ct'``,
    ``'tracking'`` and the optional ``'embedding'``; keys written are
    ``'tracking_id'``, ``'age'`` and ``'active'``.
    """

    def __init__(self, opt):
        """Store the options object and start from an empty tracker state."""
        self.opt = opt
        self.reset()
        self.nID = 10000   # capacity of the identity banks
        self.alpha = 0.1   # weight of the newest embedding in the running average

    def init_track(self, results):
        """Create the initial tracks from the detections of the first frame.

        Every detection whose score exceeds ``opt.new_thresh`` receives a
        fresh ``tracking_id``, ``age = 1`` and ``active = 1`` and is added
        to ``self.tracks``.  A missing ``'ct'`` is filled with the center of
        ``'bbox'``.  The detection dicts are modified in place.
        """
        created = False
        for item in results:
            if item['score'] > self.opt.new_thresh:
                self.id_count += 1
                # active and age are never used in the paper
                item['active'] = 1
                item['age'] = 1
                item['tracking_id'] = self.id_count
                if 'ct' not in item:
                    bbox = item['bbox']
                    item['ct'] = [(bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2]
                self.tracks.append(item)
                created = True
        # NOTE(review): the source lost its indentation; the bank reset is
        # assumed to belong to the "detection accepted" branch (it directly
        # follows the append).  It is executed once instead of once per
        # accepted detection, which yields the same final state - confirm.
        if created:
            self.nID = 10000
            self.embedding_bank = np.zeros((self.nID, 128))
            # Builtin int replaces the np.int alias removed in NumPy 1.24.
            self.cat_bank = np.zeros((self.nID), dtype=int)

    def reset(self):
        """Forget all tracks and identities and re-allocate the banks."""
        self.id_count = 0
        self.nID = 10000
        self.tracks = []
        self.embedding_bank = np.zeros((self.nID, 128))   # one row per identity
        self.cat_bank = np.zeros((self.nID), dtype=int)    # class of each identity
        self.tracklet_ages = np.zeros((self.nID), dtype=int)  # steps since last seen
        self.alive = []  # ids with an embedding matched in the current step

    @staticmethod
    def _bbox_array(items):
        """Return the ``'bbox'`` corners of ``items`` as a float32 array (K x 4)."""
        return np.array(
            [[it['bbox'][0], it['bbox'][1], it['bbox'][2], it['bbox'][3]]
             for it in items], np.float32)

    @staticmethod
    def _bbox_areas(items):
        """Return width * height of every ``'bbox'`` in ``items`` (float32, K)."""
        return np.array(
            [(it['bbox'][2] - it['bbox'][0]) * (it['bbox'][3] - it['bbox'][1])
             for it in items], np.float32)

    def _blend_embedding(self, tracking_id, embedding):
        """Fold ``embedding`` into the bank row of ``tracking_id`` (running average)."""
        row = tracking_id - 1  # ids are 1-based, bank rows 0-based
        self.embedding_bank[row, :] = (
            self.alpha * embedding
            + (1 - self.alpha) * self.embedding_bank[row, :])

    def step(self, results_with_low, public_det=None):
        """Associate the detections of one frame with the existing tracks.

        :param results_with_low: detection dicts of the current frame,
            including low-score ones.  Detections with
            ``score >= opt.track_thresh`` take part in the first association,
            the remaining ones only in the second association.
        :param public_det: externally provided detections (dicts with
            ``'ct'``); only used when ``opt.public_det`` is set, in which case
            new tracks are created only near these detections.
        :return: list of track dicts for this frame; it is also stored as
            ``self.tracks``.  The ``'embedding'`` entry is stripped from
            every returned dict.  Input dicts are modified in place.
        """
        opt = self.opt
        results = [item for item in results_with_low
                   if item['score'] >= opt.track_thresh]

        # ------------------------------------------------------------------
        # first association: confident detections against all tracks
        # ------------------------------------------------------------------
        N = len(results)
        M = len(self.tracks)
        self.alive = []

        box_ious = self.bbox_overlaps_py(
            self._bbox_array(results), self._bbox_array(self.tracks))  # N x M
        # assumes 'ct' and 'tracking' add element-wise (e.g. numpy arrays),
        # giving the center shifted by the tracking offset - TODO confirm
        dets = np.array(
            [det['ct'] + det['tracking'] for det in results], np.float32)  # N x 2
        track_size = self._bbox_areas(self.tracks)  # M
        track_cat = np.array([track['class'] for track in self.tracks], np.int32)  # M
        item_size = self._bbox_areas(results)  # N
        item_cat = np.array([item['class'] for item in results], np.int32)  # N
        tracks = np.array(
            [pre_det['ct'] for pre_det in self.tracks], np.float32)  # M x 2
        dist = ((tracks.reshape(1, -1, 2)
                 - dets.reshape(-1, 1, 2)) ** 2).sum(axis=2)  # N x M

        # A pair is invalid when the squared center distance exceeds either
        # box area or when the boxes overlap too little.
        invalid = ((dist > track_size.reshape(1, M))
                   | (dist > item_size.reshape(N, 1))
                   | (box_ious < opt.overlap_thresh))
        if opt.dataset != 'youtube_vis':
            # Every dataset except 'youtube_vis' also requires equal classes.
            invalid = invalid | (item_cat.reshape(N, 1) != track_cat.reshape(1, M))
        dist = dist + invalid * 1e18

        if opt.hungarian:
            dist[dist > 1e18] = 1e18
            matched_indices = linear_assignment(dist)
        else:
            # The copy protects ``dist`` from in-place edits by the matcher.
            matched_indices = greedy_assignment(copy.deepcopy(dist))

        matched_det_ids = set(matched_indices[:, 0].tolist())
        matched_track_ids = set(matched_indices[:, 1].tolist())
        unmatched_dets = [d for d in range(dets.shape[0])
                          if d not in matched_det_ids]
        unmatched_tracks = [t for t in range(tracks.shape[0])
                            if t not in matched_track_ids]

        if opt.hungarian:
            # The Hungarian solver may return invalid pairs; filter them out.
            matches = []
            for m in matched_indices:
                if dist[m[0], m[1]] > 1e16:
                    unmatched_dets.append(m[0])
                    unmatched_tracks.append(m[1])
                else:
                    matches.append(m)
            matches = np.array(matches).reshape(-1, 2)
        else:
            matches = matched_indices

        ret = []
        for m in matches:
            prev = self.tracks[m[1]]
            track = results[m[0]]
            track['tracking_id'] = prev['tracking_id']
            track['age'] = 1
            track['active'] = prev['active'] + 1
            if 'embedding' in track:
                self.alive.append(track['tracking_id'])
                self._blend_embedding(prev['tracking_id'], track['embedding'])
                self.cat_bank[prev['tracking_id'] - 1] = track['class']
            ret.append(track)

        if opt.public_det and len(unmatched_dets) > 0:
            # Public detection: only create tracks from provided detections
            pub_dets = np.array([d['ct'] for d in public_det], np.float32)
            dist3 = ((dets.reshape(-1, 1, 2)
                      - pub_dets.reshape(1, -1, 2)) ** 2).sum(axis=2)
            matched_dets = [d for d in range(dets.shape[0])
                            if d not in unmatched_dets]
            dist3[matched_dets] = 1e18
            for j in range(len(pub_dets)):
                i = dist3[:, j].argmin()
                if dist3[i, j] < item_size[i]:
                    dist3[i, :] = 1e18  # a detection serves one public box only
                    track = results[i]
                    if track['score'] > opt.new_thresh:
                        self.id_count += 1
                        track['tracking_id'] = self.id_count
                        track['age'] = 1
                        track['active'] = 1
                        ret.append(track)
        else:
            # Private detection: create tracks for all un-matched detections
            for i in unmatched_dets:
                track = results[i]
                if not track['score'] > opt.new_thresh:
                    continue
                if 'embedding' in track:
                    max_id, max_cos = self.get_similarity(
                        track['embedding'], False, track['class'])
                    if (max_cos >= 0.3
                            and self.tracklet_ages[max_id - 1] < opt.window_size):
                        # Re-identified: revive a recently seen identity.
                        track['tracking_id'] = max_id
                        track['age'] = 1
                        track['active'] = 1
                        self._blend_embedding(max_id, track['embedding'])
                    else:
                        self.id_count += 1
                        track['tracking_id'] = self.id_count
                        track['age'] = 1
                        track['active'] = 1
                        self.embedding_bank[self.id_count - 1, :] = track['embedding']
                        self.cat_bank[self.id_count - 1] = track['class']
                    self.alive.append(track['tracking_id'])
                    ret.append(track)
                else:
                    self.id_count += 1
                    track['tracking_id'] = self.id_count
                    track['age'] = 1
                    track['active'] = 1
                    ret.append(track)

        # Age every known identity, then reset those seen in this step.
        self.tracklet_ages[:self.id_count] = self.tracklet_ages[:self.id_count] + 1
        for track in ret:
            self.tracklet_ages[track['tracking_id'] - 1] = 1

        # ------------------------------------------------------------------
        # second association: low-score detections against leftover tracks
        # ------------------------------------------------------------------
        results_second = [item for item in results_with_low
                          if item['score'] < opt.track_thresh]
        second2original = [i for i in unmatched_tracks
                           if self.tracks[i]['active'] > 0]
        self_tracks_second = [self.tracks[i] for i in second2original]
        N = len(results_second)
        M = len(self_tracks_second)
        if N > 0 and M > 0:
            box_ious_second = self.bbox_overlaps_py(
                self._bbox_array(results_second),
                self._bbox_array(self_tracks_second))  # N x M
            dets_second = np.array(
                [det['ct'] + det['tracking'] for det in results_second],
                np.float32)  # N x 2
            track_size = self._bbox_areas(self_tracks_second)  # M
            track_cat = np.array(
                [track['class'] for track in self_tracks_second], np.int32)  # M
            item_size = self._bbox_areas(results_second)  # N
            item_cat = np.array(
                [item['class'] for item in results_second], np.int32)  # N
            tracks_second = np.array(
                [pre_det['ct'] for pre_det in self_tracks_second],
                np.float32)  # M x 2
            dist = ((tracks_second.reshape(1, -1, 2)
                     - dets_second.reshape(-1, 1, 2)) ** 2).sum(axis=2)  # N x M
            # Same gating as above, always with the class check and a fixed
            # IoU threshold of 0.3.
            invalid = ((dist > track_size.reshape(1, M))
                       | (dist > item_size.reshape(N, 1))
                       | (item_cat.reshape(N, 1) != track_cat.reshape(1, M))
                       | (box_ious_second < 0.3))
            dist = dist + invalid * 1e18

            matches_second = greedy_assignment(copy.deepcopy(dist), 1e8)
            matched_second = set(matches_second[:, 1].tolist())
            unmatched_tracks_second = [t for t in range(tracks_second.shape[0])
                                       if t not in matched_second]

            for m in matches_second:
                prev = self_tracks_second[m[1]]
                track = results_second[m[0]]
                track['tracking_id'] = prev['tracking_id']
                track['age'] = 1
                track['active'] = prev['active'] + 1
                if 'embedding' in track:
                    self.alive.append(track['tracking_id'])
                    self._blend_embedding(prev['tracking_id'], track['embedding'])
                    self.cat_bank[prev['tracking_id'] - 1] = track['class']
                ret.append(track)

            # Map the still-unmatched tracks back to indices into self.tracks.
            unmatched_tracks = (
                [second2original[i] for i in unmatched_tracks_second]
                + [i for i in unmatched_tracks if self.tracks[i]['active'] == 0])

        # Never used
        for i in unmatched_tracks:
            track = self.tracks[i]
            if track['age'] < opt.max_age:
                track['age'] += 1
                track['active'] = 1  # 0
                bbox = track['bbox']
                ct = track['ct']
                v = [0, 0]  # zero velocity: the track is carried over unchanged
                track['bbox'] = [
                    bbox[0] + v[0], bbox[1] + v[1],
                    bbox[2] + v[0], bbox[3] + v[1]]
                track['ct'] = [ct[0] + v[0], ct[1] + v[1]]
                ret.append(track)

        for r_ in ret:
            # 'embedding' is optional (and already removed from carried-over
            # tracks), so drop it without failing when it is absent.
            r_.pop('embedding', None)
        self.tracks = ret
        return ret

    def get_similarity(self, feat, stat, cls):
        """Find the stored identity whose embedding is most similar to ``feat``.

        Identities listed in ``self.alive``, the most recently issued
        identity and identities whose stored class differs from ``cls`` are
        excluded (their similarity is forced to -2).

        :param feat: 1-D embedding vector of the query detection.
        :param stat: kept for interface compatibility; it does not change
            the result.
        :param cls: class id the candidate identity must have.
        :return: ``(max_id, max_cos)`` - the 1-based id of the best candidate
            and its cosine similarity, or ``(-1, -1)`` when no identity has
            been issued yet.  ``max_cos`` is -2 when every candidate was
            excluded.
        """
        max_id = -1
        max_cos = -1
        nID = self.id_count
        a = feat[None, :]
        b = self.embedding_bank[:nID, :]
        if len(b) > 0:
            alive = np.array(self.alive, dtype=int) - 1
            # ndarray.reshape avoids the deprecated ``newshape`` keyword.
            cosim = cosine(a, b).reshape(-1)
            cosim[alive] = -2
            cosim[nID - 1] = -2
            cosim[np.where(self.cat_bank[:nID] != cls)[0]] = -2
            max_id = int(np.argmax(cosim) + 1)
            max_cos = np.max(cosim)
        return max_id, max_cos

    def bbox_overlaps_py(self, boxes, query_boxes):
        """
        determine overlaps between boxes and query_boxes
        :param boxes: n * 4 bounding boxes
        :param query_boxes: k * 4 bounding boxes
        :return: overlaps: n * k overlaps

        Widths and heights are computed with the inclusive "+ 1" pixel
        convention; pairs without intersection keep an overlap of 0.
        """
        n_ = boxes.shape[0]
        k_ = query_boxes.shape[0]
        # Builtin float replaces the np.float alias removed in NumPy 1.24.
        overlaps = np.zeros((n_, k_), dtype=float)
        for k in range(k_):
            query_box_area = ((query_boxes[k, 2] - query_boxes[k, 0] + 1)
                              * (query_boxes[k, 3] - query_boxes[k, 1] + 1))
            for n in range(n_):
                iw = (min(boxes[n, 2], query_boxes[k, 2])
                      - max(boxes[n, 0], query_boxes[k, 0]) + 1)
                if iw > 0:
                    ih = (min(boxes[n, 3], query_boxes[k, 3])
                          - max(boxes[n, 1], query_boxes[k, 1]) + 1)
                    if ih > 0:
                        box_area = ((boxes[n, 2] - boxes[n, 0] + 1)
                                    * (boxes[n, 3] - boxes[n, 1] + 1))
                        all_area = float(box_area + query_box_area - iw * ih)
                        overlaps[n, k] = iw * ih / all_area
        return overlaps
def greedy_assignment(dist, thresh=1e16):
    """Greedily pair each row of a cost matrix with its cheapest free column.

    Rows are visited in order.  A row is matched to the column with the
    smallest cost if that cost is below ``thresh``; a matched column is then
    unavailable to all later rows.

    :param dist: 2-D cost array (rows x columns).  It is not modified; the
        function works on a private float64 copy.
    :param thresh: a pair is accepted only when its cost is strictly below
        this value.
    :return: int32 array of shape (K, 2) holding ``[row, column]`` pairs;
        K is 0 when nothing matches or when ``dist`` has no columns.
    """
    if dist.shape[1] == 0:
        return np.zeros((0, 2), np.int32)

    # Work on a copy so callers do not have to defend against in-place edits.
    cost = np.array(dist, dtype=np.float64)
    pairs = []
    for row, row_cost in enumerate(cost):
        col = row_cost.argmin()
        if row_cost[col] < thresh:
            # inf (rather than a large finite marker) keeps a taken column
            # out of reach for every finite threshold.
            cost[:, col] = np.inf
            pairs.append([row, col])
    return np.array(pairs, np.int32).reshape(-1, 2)