Spaces:
Runtime error
Runtime error
import numpy as np | |
from sklearn.utils.linear_assignment_ import linear_assignment | |
# from numba import jit | |
import copy | |
class Tracker(object): | |
def __init__(self, opt): | |
self.opt = opt | |
self.reset() | |
def init_track(self, results): | |
for item in results: | |
if item['score'] > self.opt.new_thresh: | |
self.id_count += 1 | |
# active and age are never used in the paper | |
item['active'] = 1 | |
item['age'] = 1 | |
item['tracking_id'] = self.id_count | |
if not ('ct' in item): | |
bbox = item['bbox'] | |
item['ct'] = [(bbox[0] + bbox[2]) / 2, (bbox[1] + bbox[3]) / 2] | |
self.tracks.append(item) | |
def reset(self): | |
self.id_count = 0 | |
self.tracks = [] | |
def step(self, results_with_low, public_det=None): | |
results = [item for item in results_with_low if item['score'] >= self.opt.track_thresh] | |
# first association | |
N = len(results) | |
M = len(self.tracks) | |
dets = np.array( | |
[det['ct'] + det['tracking'] for det in results], np.float32) # N x 2 | |
track_size = np.array([((track['bbox'][2] - track['bbox'][0]) * \ | |
(track['bbox'][3] - track['bbox'][1])) \ | |
for track in self.tracks], np.float32) # M | |
track_cat = np.array([track['class'] for track in self.tracks], np.int32) # M | |
item_size = np.array([((item['bbox'][2] - item['bbox'][0]) * \ | |
(item['bbox'][3] - item['bbox'][1])) \ | |
for item in results], np.float32) # N | |
item_cat = np.array([item['class'] for item in results], np.int32) # N | |
tracks = np.array( | |
[pre_det['ct'] for pre_det in self.tracks], np.float32) # M x 2 | |
dist = (((tracks.reshape(1, -1, 2) - \ | |
dets.reshape(-1, 1, 2)) ** 2).sum(axis=2)) # N x M | |
invalid = ((dist > track_size.reshape(1, M)) + \ | |
(dist > item_size.reshape(N, 1)) + \ | |
(item_cat.reshape(N, 1) != track_cat.reshape(1, M))) > 0 | |
dist = dist + invalid * 1e18 | |
if self.opt.hungarian: | |
assert not self.opt.hungarian, 'we only verify centertrack with greedy_assignment' | |
item_score = np.array([item['score'] for item in results], np.float32) # N | |
dist[dist > 1e18] = 1e18 | |
matched_indices = linear_assignment(dist) | |
else: | |
matched_indices = greedy_assignment(copy.deepcopy(dist)) | |
unmatched_dets = [d for d in range(dets.shape[0]) \ | |
if not (d in matched_indices[:, 0])] | |
unmatched_tracks = [d for d in range(tracks.shape[0]) \ | |
if not (d in matched_indices[:, 1])] | |
if self.opt.hungarian: | |
assert not self.opt.hungarian, 'we only verify centertrack with greedy_assignment' | |
matches = [] | |
for m in matched_indices: | |
if dist[m[0], m[1]] > 1e16: | |
unmatched_dets.append(m[0]) | |
unmatched_tracks.append(m[1]) | |
else: | |
matches.append(m) | |
matches = np.array(matches).reshape(-1, 2) | |
else: | |
matches = matched_indices | |
ret = [] | |
for m in matches: | |
track = results[m[0]] | |
track['tracking_id'] = self.tracks[m[1]]['tracking_id'] | |
track['age'] = 1 | |
track['active'] = self.tracks[m[1]]['active'] + 1 | |
ret.append(track) | |
if self.opt.public_det and len(unmatched_dets) > 0: | |
assert not self.opt.public_det, 'we only verify centertrack with private detection' | |
# Public detection: only create tracks from provided detections | |
pub_dets = np.array([d['ct'] for d in public_det], np.float32) | |
dist3 = ((dets.reshape(-1, 1, 2) - pub_dets.reshape(1, -1, 2)) ** 2).sum( | |
axis=2) | |
matched_dets = [d for d in range(dets.shape[0]) \ | |
if not (d in unmatched_dets)] | |
dist3[matched_dets] = 1e18 | |
for j in range(len(pub_dets)): | |
i = dist3[:, j].argmin() | |
if dist3[i, j] < item_size[i]: | |
dist3[i, :] = 1e18 | |
track = results[i] | |
if track['score'] > self.opt.new_thresh: | |
self.id_count += 1 | |
track['tracking_id'] = self.id_count | |
track['age'] = 1 | |
track['active'] = 1 | |
ret.append(track) | |
else: | |
# Private detection: create tracks for all un-matched detections | |
for i in unmatched_dets: | |
track = results[i] | |
if track['score'] > self.opt.new_thresh: | |
self.id_count += 1 | |
track['tracking_id'] = self.id_count | |
track['age'] = 1 | |
track['active'] = 1 | |
ret.append(track) | |
# second association | |
results_second = [item for item in results_with_low if item['score'] < self.opt.track_thresh] | |
self_tracks_second = [self.tracks[i] for i in unmatched_tracks if self.tracks[i]['active'] > 0] | |
second2original = [i for i in unmatched_tracks if self.tracks[i]['active'] > 0] | |
N = len(results_second) | |
M = len(self_tracks_second) | |
if N > 0 and M > 0: | |
dets = np.array( | |
[det['ct'] + det['tracking'] for det in results_second], np.float32) # N x 2 | |
track_size = np.array([((track['bbox'][2] - track['bbox'][0]) * \ | |
(track['bbox'][3] - track['bbox'][1])) \ | |
for track in self_tracks_second], np.float32) # M | |
track_cat = np.array([track['class'] for track in self_tracks_second], np.int32) # M | |
item_size = np.array([((item['bbox'][2] - item['bbox'][0]) * \ | |
(item['bbox'][3] - item['bbox'][1])) \ | |
for item in results_second], np.float32) # N | |
item_cat = np.array([item['class'] for item in results_second], np.int32) # N | |
tracks_second = np.array( | |
[pre_det['ct'] for pre_det in self_tracks_second], np.float32) # M x 2 | |
dist = (((tracks_second.reshape(1, -1, 2) - \ | |
dets.reshape(-1, 1, 2)) ** 2).sum(axis=2)) # N x M | |
invalid = ((dist > track_size.reshape(1, M)) + \ | |
(dist > item_size.reshape(N, 1)) + \ | |
(item_cat.reshape(N, 1) != track_cat.reshape(1, M))) > 0 | |
dist = dist + invalid * 1e18 | |
matched_indices_second = greedy_assignment(copy.deepcopy(dist), 1e8) | |
unmatched_tracks_second = [d for d in range(tracks_second.shape[0]) \ | |
if not (d in matched_indices_second[:, 1])] | |
matches_second = matched_indices_second | |
for m in matches_second: | |
track = results_second[m[0]] | |
track['tracking_id'] = self_tracks_second[m[1]]['tracking_id'] | |
track['age'] = 1 | |
track['active'] = self_tracks_second[m[1]]['active'] + 1 | |
ret.append(track) | |
unmatched_tracks = [second2original[i] for i in unmatched_tracks_second] + \ | |
[i for i in unmatched_tracks if self.tracks[i]['active'] == 0] | |
#. for debug | |
# unmatched_tracks = [i for i in unmatched_tracks if self.tracks[i]['active'] > 0] + \ | |
# [i for i in unmatched_tracks if self.tracks[i]['active'] == 0] | |
for i in unmatched_tracks: | |
track = self.tracks[i] | |
if track['age'] < self.opt.max_age: | |
track['age'] += 1 | |
track['active'] = 0 | |
bbox = track['bbox'] | |
ct = track['ct'] | |
v = [0, 0] | |
track['bbox'] = [ | |
bbox[0] + v[0], bbox[1] + v[1], | |
bbox[2] + v[0], bbox[3] + v[1]] | |
track['ct'] = [ct[0] + v[0], ct[1] + v[1]] | |
ret.append(track) | |
self.tracks = ret | |
return ret | |
def greedy_assignment(dist, thresh=1e16): | |
matched_indices = [] | |
if dist.shape[1] == 0: | |
return np.array(matched_indices, np.int32).reshape(-1, 2) | |
for i in range(dist.shape[0]): | |
j = dist[i].argmin() | |
if dist[i][j] < thresh: | |
dist[:, j] = 1e18 | |
matched_indices.append([i, j]) | |
return np.array(matched_indices, np.int32).reshape(-1, 2) | |