# yolox/motdt_tracker/motdt_tracker.py
import numpy as np
#from numba import jit
from collections import OrderedDict, deque
import itertools
import os
import cv2
import torch
import torchvision
from yolox.motdt_tracker import matching
from .kalman_filter import KalmanFilter
from .reid_model import load_reid_model, extract_reid_features
from yolox.data.dataloading import get_yolox_datadir
from .basetrack import BaseTrack, TrackState
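

# MOTDT-style online tracker: Kalman-filter motion prediction combined with
# ReID appearance matching, an IoU fallback stage, and optional propagation of
# existing tracks by a single-object tracker.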
class STrack(BaseTrack):
def __init__(self, tlwh, score, max_n_features=100, from_det=True):
        # waiting to be activated
        self._tlwh = np.asarray(tlwh, dtype=np.float64)
self.kalman_filter = None
self.mean, self.covariance = None, None
self.is_activated = False
self.score = score
self.max_n_features = max_n_features
self.curr_feature = None
self.last_feature = None
self.features = deque([], maxlen=self.max_n_features)
# classification
self.from_det = from_det
self.tracklet_len = 0
self.time_by_tracking = 0
# self-tracking
self.tracker = None
def set_feature(self, feature):
if feature is None:
return False
self.features.append(feature)
self.curr_feature = feature
self.last_feature = feature
# self._p_feature = 0
return True
def predict(self):
if self.time_since_update > 0:
self.tracklet_len = 0
self.time_since_update += 1
mean_state = self.mean.copy()
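        # Freeze the height velocity (state index 7) for tracks that are not
        # currently Tracked, so a lost track's box does not keep growing or
        # shrinking while it is only being predicted.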
if self.state != TrackState.Tracked:
mean_state[7] = 0
self.mean, self.covariance = self.kalman_filter.predict(mean_state, self.covariance)
if self.tracker:
self.tracker.update_roi(self.tlwh)
def self_tracking(self, image):
tlwh = self.tracker.predict(image) if self.tracker else self.tlwh
return tlwh
def activate(self, kalman_filter, frame_id, image):
"""Start a new tracklet"""
self.kalman_filter = kalman_filter # type: KalmanFilter
self.track_id = self.next_id()
# cx, cy, aspect_ratio, height, dx, dy, da, dh
self.mean, self.covariance = self.kalman_filter.initiate(self.tlwh_to_xyah(self._tlwh))
# self.tracker = sot.SingleObjectTracker()
# self.tracker.init(image, self.tlwh)
del self._tlwh
self.time_since_update = 0
self.time_by_tracking = 0
self.tracklet_len = 0
self.state = TrackState.Tracked
# self.is_activated = True
self.frame_id = frame_id
self.start_frame = frame_id
def re_activate(self, new_track, frame_id, image, new_id=False):
# self.mean, self.covariance = self.kalman_filter.initiate(self.tlwh_to_xyah(new_track.tlwh))
self.mean, self.covariance = self.kalman_filter.update(
self.mean, self.covariance, self.tlwh_to_xyah(new_track.tlwh)
)
self.time_since_update = 0
self.time_by_tracking = 0
self.tracklet_len = 0
self.state = TrackState.Tracked
self.is_activated = True
self.frame_id = frame_id
if new_id:
self.track_id = self.next_id()
self.set_feature(new_track.curr_feature)
def update(self, new_track, frame_id, image, update_feature=True):
"""
Update a matched track
:type new_track: STrack
:type frame_id: int
:type update_feature: bool
:return:
"""
self.frame_id = frame_id
self.time_since_update = 0
if new_track.from_det:
self.time_by_tracking = 0
else:
self.time_by_tracking += 1
self.tracklet_len += 1
new_tlwh = new_track.tlwh
self.mean, self.covariance = self.kalman_filter.update(
self.mean, self.covariance, self.tlwh_to_xyah(new_tlwh))
self.state = TrackState.Tracked
self.is_activated = True
self.score = new_track.score
if update_feature:
self.set_feature(new_track.curr_feature)
if self.tracker:
self.tracker.update(image, self.tlwh)
@property
#@jit
def tlwh(self):
"""Get current position in bounding box format `(top left x, top left y,
width, height)`.
"""
if self.mean is None:
return self._tlwh.copy()
ret = self.mean[:4].copy()
ret[2] *= ret[3]
ret[:2] -= ret[2:] / 2
return ret
@property
#@jit
def tlbr(self):
"""Convert bounding box to format `(min x, min y, max x, max y)`, i.e.,
`(top left, bottom right)`.
"""
ret = self.tlwh.copy()
ret[2:] += ret[:2]
return ret
@staticmethod
#@jit
def tlwh_to_xyah(tlwh):
"""Convert bounding box to format `(center x, center y, aspect ratio,
height)`, where the aspect ratio is `width / height`.
"""
ret = np.asarray(tlwh).copy()
ret[:2] += ret[2:] / 2
ret[2] /= ret[3]
return ret
def to_xyah(self):
return self.tlwh_to_xyah(self.tlwh)
def tracklet_score(self):
# score = (1 - np.exp(-0.6 * self.hit_streak)) * np.exp(-0.03 * self.time_by_tracking)
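        # Confidence decays with the number of consecutive tracking-only
        # updates (time_by_tracking) and is zeroed for short tracklets
        # (tracklet_len - time_by_tracking must exceed 2).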
score = max(0, 1 - np.log(1 + 0.05 * self.time_by_tracking)) * (self.tracklet_len - self.time_by_tracking > 2)
# score = max(0, 1 - np.log(1 + 0.05 * self.n_tracking)) * (1 - np.exp(-0.6 * self.hit_streak))
return score
def __repr__(self):
return 'OT_{}_({}-{})'.format(self.track_id, self.start_frame, self.end_frame)
class OnlineTracker(object):
def __init__(self, model_folder, min_cls_score=0.4, min_ap_dist=0.8, max_time_lost=30, use_tracking=True, use_refind=True):
self.min_cls_score = min_cls_score
self.min_ap_dist = min_ap_dist
self.max_time_lost = max_time_lost
self.kalman_filter = KalmanFilter()
self.tracked_stracks = [] # type: list[STrack]
self.lost_stracks = [] # type: list[STrack]
self.removed_stracks = [] # type: list[STrack]
self.use_refind = use_refind
self.use_tracking = use_tracking
self.classifier = None
self.reid_model = load_reid_model(model_folder)
self.frame_id = 0
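
    # One full tracking step per frame: predict existing tracks, score and
    # NMS-filter detections, associate by appearance (Kalman-gated ReID
    # distance), fall back to IoU matching, then spawn and retire tracks.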
def update(self, output_results, img_info, img_size, img_file_name):
img_file_name = os.path.join(get_yolox_datadir(), 'mot', 'train', img_file_name)
image = cv2.imread(img_file_name)
# post process detections
output_results = output_results.cpu().numpy()
        confidences = output_results[:, 4] * output_results[:, 5]  # obj_conf * cls_conf
bboxes = output_results[:, :4] # x1y1x2y2
img_h, img_w = img_info[0], img_info[1]
scale = min(img_size[0] / float(img_h), img_size[1] / float(img_w))
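        # undo the letterbox resize so boxes are in original-image coordinates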
bboxes /= scale
bbox_xyxy = bboxes
tlwhs = self._xyxy_to_tlwh_array(bbox_xyxy)
remain_inds = confidences > self.min_cls_score
tlwhs = tlwhs[remain_inds]
det_scores = confidences[remain_inds]
self.frame_id += 1
        activated_stracks = []
refind_stracks = []
lost_stracks = []
removed_stracks = []
"""step 1: prediction"""
for strack in itertools.chain(self.tracked_stracks, self.lost_stracks):
strack.predict()
"""step 2: scoring and selection"""
if det_scores is None:
det_scores = np.ones(len(tlwhs), dtype=float)
detections = [STrack(tlwh, score, from_det=True) for tlwh, score in zip(tlwhs, det_scores)]
if self.use_tracking:
tracks = [STrack(t.self_tracking(image), t.score * t.tracklet_score(), from_det=False)
for t in itertools.chain(self.tracked_stracks, self.lost_stracks) if t.is_activated]
detections.extend(tracks)
rois = np.asarray([d.tlbr for d in detections], dtype=np.float32)
scores = np.asarray([d.score for d in detections], dtype=np.float32)
        # class-agnostic NMS (all class indices are zero) over genuine
        # detections and tracker-propagated boxes together
        if len(detections) > 0:
            rois_t = torch.from_numpy(rois)
            scores_t = torch.from_numpy(scores.reshape(-1)).to(rois_t.dtype)
            nms_out_index = torchvision.ops.batched_nms(
                rois_t, scores_t, torch.zeros_like(scores_t), 0.7
            )
keep = nms_out_index.numpy()
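            # keep only boxes that both survive NMS and clear the score threshold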
            mask = np.zeros(len(rois), dtype=bool)
mask[keep] = True
keep = np.where(mask & (scores >= self.min_cls_score))[0]
detections = [detections[i] for i in keep]
scores = scores[keep]
for d, score in zip(detections, scores):
d.score = score
pred_dets = [d for d in detections if not d.from_det]
detections = [d for d in detections if d.from_det]
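        # tracker-propagated boxes keep their previously stored ReID features;
        # only genuine detections receive fresh embeddings below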
# set features
tlbrs = [det.tlbr for det in detections]
features = extract_reid_features(self.reid_model, image, tlbrs)
features = features.cpu().numpy()
for i, det in enumerate(detections):
det.set_feature(features[i])
"""step 3: association for tracked"""
# matching for tracked targets
unconfirmed = []
tracked_stracks = [] # type: list[STrack]
for track in self.tracked_stracks:
if not track.is_activated:
unconfirmed.append(track)
else:
tracked_stracks.append(track)
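        # stage 1: appearance association - ReID embedding distance between
        # tracked targets and detections, gated by Kalman (Mahalanobis) distance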
dists = matching.nearest_reid_distance(tracked_stracks, detections, metric='euclidean')
dists = matching.gate_cost_matrix(self.kalman_filter, dists, tracked_stracks, detections)
matches, u_track, u_detection = matching.linear_assignment(dists, thresh=self.min_ap_dist)
for itracked, idet in matches:
tracked_stracks[itracked].update(detections[idet], self.frame_id, image)
        # try to re-identify lost targets among the remaining detections
detections = [detections[i] for i in u_detection]
dists = matching.nearest_reid_distance(self.lost_stracks, detections, metric='euclidean')
dists = matching.gate_cost_matrix(self.kalman_filter, dists, self.lost_stracks, detections)
matches, u_lost, u_detection = matching.linear_assignment(dists, thresh=self.min_ap_dist)
for ilost, idet in matches:
track = self.lost_stracks[ilost] # type: STrack
det = detections[idet]
track.re_activate(det, self.frame_id, image, new_id=not self.use_refind)
refind_stracks.append(track)
        # stage 2: IoU matching for the remaining tracked targets, against
        # leftover detections plus tracker-propagated boxes
len_det = len(u_detection)
detections = [detections[i] for i in u_detection] + pred_dets
r_tracked_stracks = [tracked_stracks[i] for i in u_track]
dists = matching.iou_distance(r_tracked_stracks, detections)
matches, u_track, u_detection = matching.linear_assignment(dists, thresh=0.5)
for itracked, idet in matches:
r_tracked_stracks[itracked].update(detections[idet], self.frame_id, image, update_feature=True)
for it in u_track:
track = r_tracked_stracks[it]
track.mark_lost()
lost_stracks.append(track)
        # unconfirmed tracks are matched only against leftover genuine
        # detections (indices below len_det, i.e. excluding tracker-propagated boxes)
detections = [detections[i] for i in u_detection if i < len_det]
dists = matching.iou_distance(unconfirmed, detections)
matches, u_unconfirmed, u_detection = matching.linear_assignment(dists, thresh=0.7)
for itracked, idet in matches:
unconfirmed[itracked].update(detections[idet], self.frame_id, image, update_feature=True)
for it in u_unconfirmed:
track = unconfirmed[it]
track.mark_removed()
removed_stracks.append(track)
"""step 4: init new stracks"""
for inew in u_detection:
track = detections[inew]
if not track.from_det or track.score < 0.6:
continue
track.activate(self.kalman_filter, self.frame_id, image)
            activated_stracks.append(track)
"""step 6: update state"""
for track in self.lost_stracks:
if self.frame_id - track.end_frame > self.max_time_lost:
track.mark_removed()
removed_stracks.append(track)
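        # rebuild the canonical track lists: drop tracks whose state changed
        # this frame, then fold in newly activated, refound, lost and removed ones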
self.tracked_stracks = [t for t in self.tracked_stracks if t.state == TrackState.Tracked]
self.lost_stracks = [t for t in self.lost_stracks if t.state == TrackState.Lost] # type: list[STrack]
        self.tracked_stracks.extend(activated_stracks)
self.tracked_stracks.extend(refind_stracks)
self.lost_stracks.extend(lost_stracks)
self.removed_stracks.extend(removed_stracks)
        # report only activated tracks; lost tracks stay internal so they can
        # still be re-identified later
        output_stracks = [track for track in self.tracked_stracks if track.is_activated]
return output_stracks
@staticmethod
def _xyxy_to_tlwh_array(bbox_xyxy):
        if isinstance(bbox_xyxy, np.ndarray):
            bbox_tlwh = bbox_xyxy.copy()
        elif isinstance(bbox_xyxy, torch.Tensor):
            bbox_tlwh = bbox_xyxy.clone()
        else:
            raise TypeError('bbox_xyxy must be an np.ndarray or torch.Tensor')
bbox_tlwh[:, 2] = bbox_xyxy[:, 2] - bbox_xyxy[:, 0]
bbox_tlwh[:, 3] = bbox_xyxy[:, 3] - bbox_xyxy[:, 1]
return bbox_tlwh
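

# ---------------------------------------------------------------------------
# Minimal usage sketch (an assumption for illustration, not part of the
# original module): one call to OnlineTracker.update per frame, with
# YOLOX-style detector rows (x1, y1, x2, y2, obj_conf, cls_conf, ...).
# `detector`, `frame_files`, `img_info`, `img_size` and the model folder are
# hypothetical placeholders.
#
#     tracker = OnlineTracker(model_folder='path/to/reid_model')
#     for img_file_name in frame_files:
#         output_results = detector(img_file_name)   # torch.Tensor, shape (N, >=6)
#         online_targets = tracker.update(
#             output_results, img_info, img_size, img_file_name)
#         for t in online_targets:
#             print(t.track_id, t.tlwh, t.score)
# ---------------------------------------------------------------------------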