workshop / LaSA /libs /postprocess.py
qiushuocheng's picture
Upload 173 files
41e3185
import numpy as np
import torch
from .metric import argrelmax
from .transformer import GaussianSmoothing
__all__ = ["PostProcessor"]
def decide_boundary_prob_with_similarity(x: torch.Tensor) -> torch.Tensor:
"""
Decide action boundary probabilities based on adjacent frame similarities.
Args:
x: frame-wise video features (N, C, T)
Return:
boundary: action boundary probability (N, 1, T)
"""
device = x.device
# gaussian kernel.
diff = x[0, :, 1:] - x[0, :, :-1]
similarity = torch.exp(-torch.norm(diff, dim=0) / (2 * 1.0))
# define action starting point as action boundary.
start = torch.ones(1).float().to(device)
boundary = torch.cat([start, similarity])
boundary = boundary.view(1, 1, -1)
return boundary
class PostProcessor(object):
def __init__(
self,
name: str,
boundary_th: int = 0.7,
theta_t: int = 15,
kernel_size: int = 15,
) -> None:
self.func = {
"refinement_with_boundary": self._refinement_with_boundary,
"relabeling": self._relabeling,
"smoothing": self._smoothing,
}
assert name in self.func
self.name = name #'refinement_with_boundary'
self.boundary_th = boundary_th #0.5
self.theta_t = theta_t #15
self.kernel_size = kernel_size #15
if name == "smoothing":
self.filter = GaussianSmoothing(self.kernel_size)
def _is_probability(self, x: np.ndarray) -> bool:
assert x.ndim == 3
if x.shape[1] == 1:
# sigmoid
if x.min() >= 0 and x.max() <= 1:
return True
else:
return False
else:
# softmax
_sum = np.sum(x, axis=1).astype(np.float32)
_ones = np.ones_like(_sum, dtype=np.float32)
return np.allclose(_sum, _ones)
def _convert2probability(self, x: np.ndarray) -> np.ndarray:
"""
Args: x (N, C, T)
"""
assert x.ndim == 3
if self._is_probability(x):
return x
else:
if x.shape[1] == 1:
# sigmoid
prob = 1 / (1 + np.exp(-x))
else:
# softmax
prob = np.exp(x) / np.sum(np.exp(x), axis=1)
return prob.astype(np.float32)
def _convert2label(self, x: np.ndarray) -> np.ndarray:
assert x.ndim == 2 or x.ndim == 3
if x.ndim == 2:
return x.astype(np.int64)
else:
if not self._is_probability(x):
x = self._convert2probability(x)
label = np.argmax(x, axis=1) #从onehot to index
return label.astype(np.int64)
def _refinement_with_boundary(
self,
outputs: np.array,
boundaries: np.ndarray,
masks: np.ndarray,
) -> np.ndarray:
"""
Get segments which is defined as the span b/w two boundaries,
and decide their classes by majority vote.
Args:
outputs: numpy array. shape (N, C, T)
the model output for frame-level class prediction.
boundaries: numpy array. shape (N, 1, T)
boundary prediction.
masks: np.array. np.bool. shape (N, 1, T)
valid length for each video
Return:
preds: np.array. shape (N, T)
final class prediction considering boundaries.
"""
preds = self._convert2label(outputs) #(1,6000)
boundaries = self._convert2probability(boundaries) #(1,1,6000)
for i, (output, pred, boundary, mask) in enumerate(
zip(outputs, preds, boundaries, masks)
):
boundary = boundary[mask]
idx = argrelmax(boundary, threshold=self.boundary_th)
# add the index of the last action ending
T = pred.shape[0]
idx.append(T)
# majority vote
for j in range(len(idx) - 1):
count = np.bincount(pred[idx[j] : idx[j + 1]])
modes = np.where(count == count.max())[0]
if len(modes) == 1:
mode = modes
else:
if outputs.ndim == 3:
# if more than one majority class exist
prob_sum_max = 0
for m in modes:
prob_sum = output[m, idx[j] : idx[j + 1]].sum()
if prob_sum_max < prob_sum:
mode = m
prob_sum_max = prob_sum
else:
# decide first mode when more than one majority class
# have the same number during oracle experiment
mode = modes[0]
preds[i, idx[j] : idx[j + 1]] = mode
return preds
def _relabeling(self, outputs: np.ndarray, **kwargs: np.ndarray) -> np.ndarray:
"""
Relabeling small action segments with their previous action segment
Args:
output: the results of action segmentation. (N, T) or (N, C, T)
theta_t: the threshold of the size of action segments.
Return:
relabeled output. (N, T)
"""
preds = self._convert2label(outputs)
for i in range(preds.shape[0]):
# shape (T,)
last = preds[i][0]
cnt = 1
for j in range(1, preds.shape[1]):
if last == preds[i][j]:
cnt += 1
else:
if cnt > self.theta_t:
cnt = 1
last = preds[i][j]
else:
preds[i][j - cnt : j] = preds[i][j - cnt - 1]
cnt = 1
last = preds[i][j]
if cnt <= self.theta_t:
preds[i][j - cnt : j] = preds[i][j - cnt - 1]
return preds
def _smoothing(self, outputs: np.ndarray, **kwargs: np.ndarray) -> np.ndarray:
"""
Smoothing action probabilities with gaussian filter.
Args:
outputs: frame-wise action probabilities. (N, C, T)
Return:
predictions: final prediction. (N, T)
"""
outputs = self._convert2probability(outputs)
outputs = self.filter(torch.Tensor(outputs)).numpy()
preds = self._convert2label(outputs)
return preds
def __call__(self, outputs, **kwargs: np.ndarray) -> np.ndarray:
preds = self.func[self.name](outputs, **kwargs)
return preds