Spaces:
Sleeping
Sleeping
# vim: expandtab:ts=4:sw=4 | |
import cv2 | |
import numpy as np | |
from strong_sort.sort.kalman_filter import KalmanFilter | |
class TrackState: | |
""" | |
Enumeration type for the single target track state. Newly created tracks are | |
classified as `tentative` until enough evidence has been collected. Then, | |
the track state is changed to `confirmed`. Tracks that are no longer alive | |
are classified as `deleted` to mark them for removal from the set of active | |
tracks. | |
""" | |
Tentative = 1 | |
Confirmed = 2 | |
Deleted = 3 | |
class Track: | |
""" | |
A single target track with state space `(x, y, a, h)` and associated | |
velocities, where `(x, y)` is the center of the bounding box, `a` is the | |
aspect ratio and `h` is the height. | |
Parameters | |
---------- | |
mean : ndarray | |
Mean vector of the initial state distribution. | |
covariance : ndarray | |
Covariance matrix of the initial state distribution. | |
track_id : int | |
A unique track identifier. | |
n_init : int | |
Number of consecutive detections before the track is confirmed. The | |
track state is set to `Deleted` if a miss occurs within the first | |
`n_init` frames. | |
max_age : int | |
The maximum number of consecutive misses before the track state is | |
set to `Deleted`. | |
feature : Optional[ndarray] | |
Feature vector of the detection this track originates from. If not None, | |
this feature is added to the `features` cache. | |
Attributes | |
---------- | |
mean : ndarray | |
Mean vector of the initial state distribution. | |
covariance : ndarray | |
Covariance matrix of the initial state distribution. | |
track_id : int | |
A unique track identifier. | |
hits : int | |
Total number of measurement updates. | |
age : int | |
Total number of frames since first occurance. | |
time_since_update : int | |
Total number of frames since last measurement update. | |
state : TrackState | |
The current track state. | |
features : List[ndarray] | |
A cache of features. On each measurement update, the associated feature | |
vector is added to this list. | |
""" | |
def __init__(self, detection, track_id, class_id, conf, n_init, max_age, ema_alpha, | |
feature=None): | |
self.track_id = track_id | |
self.class_id = int(class_id) | |
self.hits = 1 | |
self.age = 1 | |
self.time_since_update = 0 | |
self.ema_alpha = ema_alpha | |
self.state = TrackState.Tentative | |
self.features = [] | |
if feature is not None: | |
feature /= np.linalg.norm(feature) | |
self.features.append(feature) | |
self.conf = conf | |
self._n_init = n_init | |
self._max_age = max_age | |
self.kf = KalmanFilter() | |
self.mean, self.covariance = self.kf.initiate(detection) | |
def to_tlwh(self): | |
"""Get current position in bounding box format `(top left x, top left y, | |
width, height)`. | |
Returns | |
------- | |
ndarray | |
The bounding box. | |
""" | |
ret = self.mean[:4].copy() | |
ret[2] *= ret[3] | |
ret[:2] -= ret[2:] / 2 | |
return ret | |
def to_tlbr(self): | |
"""Get kf estimated current position in bounding box format `(min x, miny, max x, | |
max y)`. | |
Returns | |
------- | |
ndarray | |
The predicted kf bounding box. | |
""" | |
ret = self.to_tlwh() | |
ret[2:] = ret[:2] + ret[2:] | |
return ret | |
def ECC(self, src, dst, warp_mode = cv2.MOTION_EUCLIDEAN, eps = 1e-5, | |
max_iter = 100, scale = 0.1, align = False): | |
"""Compute the warp matrix from src to dst. | |
Parameters | |
---------- | |
src : ndarray | |
An NxM matrix of source img(BGR or Gray), it must be the same format as dst. | |
dst : ndarray | |
An NxM matrix of target img(BGR or Gray). | |
warp_mode: flags of opencv | |
translation: cv2.MOTION_TRANSLATION | |
rotated and shifted: cv2.MOTION_EUCLIDEAN | |
affine(shift,rotated,shear): cv2.MOTION_AFFINE | |
homography(3d): cv2.MOTION_HOMOGRAPHY | |
eps: float | |
the threshold of the increment in the correlation coefficient between two iterations | |
max_iter: int | |
the number of iterations. | |
scale: float or [int, int] | |
scale_ratio: float | |
scale_size: [W, H] | |
align: bool | |
whether to warp affine or perspective transforms to the source image | |
Returns | |
------- | |
warp matrix : ndarray | |
Returns the warp matrix from src to dst. | |
if motion models is homography, the warp matrix will be 3x3, otherwise 2x3 | |
src_aligned: ndarray | |
aligned source image of gray | |
""" | |
# skip if current and previous frame are not initialized (1st inference) | |
if (src.any() or dst.any() is None): | |
return None, None | |
# skip if current and previous fames are not the same size | |
elif (src.shape != dst.shape): | |
return None, None | |
# BGR2GRAY | |
if src.ndim == 3: | |
# Convert images to grayscale | |
src = cv2.cvtColor(src, cv2.COLOR_BGR2GRAY) | |
dst = cv2.cvtColor(dst, cv2.COLOR_BGR2GRAY) | |
# make the imgs smaller to speed up | |
if scale is not None: | |
if isinstance(scale, float) or isinstance(scale, int): | |
if scale != 1: | |
src_r = cv2.resize(src, (0, 0), fx = scale, fy = scale,interpolation = cv2.INTER_LINEAR) | |
dst_r = cv2.resize(dst, (0, 0), fx = scale, fy = scale,interpolation = cv2.INTER_LINEAR) | |
scale = [scale, scale] | |
else: | |
src_r, dst_r = src, dst | |
scale = None | |
else: | |
if scale[0] != src.shape[1] and scale[1] != src.shape[0]: | |
src_r = cv2.resize(src, (scale[0], scale[1]), interpolation = cv2.INTER_LINEAR) | |
dst_r = cv2.resize(dst, (scale[0], scale[1]), interpolation=cv2.INTER_LINEAR) | |
scale = [scale[0] / src.shape[1], scale[1] / src.shape[0]] | |
else: | |
src_r, dst_r = src, dst | |
scale = None | |
else: | |
src_r, dst_r = src, dst | |
# Define 2x3 or 3x3 matrices and initialize the matrix to identity | |
if warp_mode == cv2.MOTION_HOMOGRAPHY : | |
warp_matrix = np.eye(3, 3, dtype=np.float32) | |
else : | |
warp_matrix = np.eye(2, 3, dtype=np.float32) | |
# Define termination criteria | |
criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, max_iter, eps) | |
# Run the ECC algorithm. The results are stored in warp_matrix. | |
try: | |
(cc, warp_matrix) = cv2.findTransformECC (src_r, dst_r, warp_matrix, warp_mode, criteria, None, 1) | |
except cv2.error as e: | |
return None, None | |
if scale is not None: | |
warp_matrix[0, 2] = warp_matrix[0, 2] / scale[0] | |
warp_matrix[1, 2] = warp_matrix[1, 2] / scale[1] | |
if align: | |
sz = src.shape | |
if warp_mode == cv2.MOTION_HOMOGRAPHY: | |
# Use warpPerspective for Homography | |
src_aligned = cv2.warpPerspective(src, warp_matrix, (sz[1],sz[0]), flags=cv2.INTER_LINEAR) | |
else : | |
# Use warpAffine for Translation, Euclidean and Affine | |
src_aligned = cv2.warpAffine(src, warp_matrix, (sz[1],sz[0]), flags=cv2.INTER_LINEAR) | |
return warp_matrix, src_aligned | |
else: | |
return warp_matrix, None | |
def get_matrix(self, matrix): | |
eye = np.eye(3) | |
dist = np.linalg.norm(eye - matrix) | |
if dist < 100: | |
return matrix | |
else: | |
return eye | |
def camera_update(self, previous_frame, next_frame): | |
warp_matrix, src_aligned = self.ECC(previous_frame, next_frame) | |
if warp_matrix is None and src_aligned is None: | |
return | |
[a,b] = warp_matrix | |
warp_matrix=np.array([a,b,[0,0,1]]) | |
warp_matrix = warp_matrix.tolist() | |
matrix = self.get_matrix(warp_matrix) | |
x1, y1, x2, y2 = self.to_tlbr() | |
x1_, y1_, _ = matrix @ np.array([x1, y1, 1]).T | |
x2_, y2_, _ = matrix @ np.array([x2, y2, 1]).T | |
w, h = x2_ - x1_, y2_ - y1_ | |
cx, cy = x1_ + w / 2, y1_ + h / 2 | |
self.mean[:4] = [cx, cy, w / h, h] | |
def increment_age(self): | |
self.age += 1 | |
self.time_since_update += 1 | |
def predict(self, kf): | |
"""Propagate the state distribution to the current time step using a | |
Kalman filter prediction step. | |
Parameters | |
---------- | |
kf : kalman_filter.KalmanFilter | |
The Kalman filter. | |
""" | |
self.mean, self.covariance = self.kf.predict(self.mean, self.covariance) | |
self.age += 1 | |
self.time_since_update += 1 | |
def update(self, detection, class_id, conf): | |
"""Perform Kalman filter measurement update step and update the feature | |
cache. | |
Parameters | |
---------- | |
detection : Detection | |
The associated detection. | |
""" | |
self.conf = conf | |
self.class_id = class_id.int() | |
self.mean, self.covariance = self.kf.update(self.mean, self.covariance, detection.to_xyah(), detection.confidence) | |
feature = detection.feature / np.linalg.norm(detection.feature) | |
smooth_feat = self.ema_alpha * self.features[-1] + (1 - self.ema_alpha) * feature | |
smooth_feat /= np.linalg.norm(smooth_feat) | |
self.features = [smooth_feat] | |
self.hits += 1 | |
self.time_since_update = 0 | |
if self.state == TrackState.Tentative and self.hits >= self._n_init: | |
self.state = TrackState.Confirmed | |
def mark_missed(self): | |
"""Mark this track as missed (no association at the current time step). | |
""" | |
if self.state == TrackState.Tentative: | |
self.state = TrackState.Deleted | |
elif self.time_since_update > self._max_age: | |
self.state = TrackState.Deleted | |
def is_tentative(self): | |
"""Returns True if this track is tentative (unconfirmed). | |
""" | |
return self.state == TrackState.Tentative | |
def is_confirmed(self): | |
"""Returns True if this track is confirmed.""" | |
return self.state == TrackState.Confirmed | |
def is_deleted(self): | |
"""Returns True if this track is dead and should be deleted.""" | |
return self.state == TrackState.Deleted | |