|
import argparse |
|
import json |
|
import os, os.path as osp |
|
import sys |
|
from pathlib import Path |
|
|
|
import numpy as np |
|
import torch |
|
from tqdm import tqdm |
|
|
|
# Absolute path of this script.
FILE = Path(__file__).absolute()

# Put the script's own directory on sys.path so the `models` and `utils`
# imports below resolve when the file is run directly.
sys.path.append(FILE.parent.as_posix())
|
|
|
from models.experimental import attempt_load |
|
from utils.datasets import create_dataloader |
|
from utils.augmentations import letterbox |
|
from utils.general import check_dataset, check_file, check_img_size, \ |
|
non_max_suppression_kp, scale_coords, set_logging, colorstr |
|
from utils.torch_utils import select_device, time_sync |
|
import tempfile |
|
import cv2 |
|
|
|
# Border/letterbox fill colour used for two-stage crops: 114/255 in each of three channels.
PAD_COLOR = (114 / 255,) * 3
|
|
|
|
|
def run_nms(data, model_out):
    """Run keypoint-aware NMS and split the result into person and keypoint detections.

    Args:
        data: settings dict. Reads 'conf_thres', 'iou_thres', 'conf_thres_kp',
            'iou_thres_kp' and 'num_coords'; the two-pass path also reads 'kp_flip'.
        model_out: raw model output, passed straight to ``non_max_suppression_kp``.

    Returns:
        Tuple ``(person_dets, kp_dets)``; each is a list with one entry per
        element returned by ``non_max_suppression_kp``.
    """
    conf, iou = data['conf_thres'], data['iou_thres']
    conf_kp, iou_kp = data['conf_thres_kp'], data['iou_thres_kp']
    num_coords = data['num_coords']

    if iou == iou_kp and conf_kp >= conf:
        # Same IoU threshold and a keypoint confidence threshold at least as
        # strict as the person one: a single NMS pass is run and its output is
        # split afterwards.
        merged = non_max_suppression_kp(model_out, conf, iou, num_coords=num_coords)
        person_dets, kp_dets = [], []
        for det in merged:
            # Rows whose column 5 equals 0 are kept as person detections.
            person_dets.append(det[det[:, 5] == 0])
            # Remaining classes (column 5 > 0) must also clear conf_thres_kp on column 4.
            confident = det[det[:, 4] >= conf_kp]
            kp_dets.append(confident[confident[:, 5] > 0])
        return person_dets, kp_dets

    # Thresholds differ: one NMS pass per group, restricted by class index.
    person_dets = non_max_suppression_kp(
        model_out, conf, iou,
        classes=[0],
        num_coords=num_coords)
    kp_dets = non_max_suppression_kp(
        model_out, conf_kp, iou_kp,
        classes=list(range(1, 1 + len(data['kp_flip']))),
        num_coords=num_coords)
    return person_dets, kp_dets
|
|
|
|
|
def post_process_batch(data, imgs, paths, shapes, person_dets, kp_dets,
                       two_stage=False, pad=0, device='cpu', model=None, origins=None):
    """Convert per-image NMS output into boxes, poses, scores and image ids.

    Args:
        data: settings dict. Reads 'num_coords', 'use_kp_dets',
            'conf_thres_kp_person' and 'overwrite_tol'; the two-stage branch
            also reads 'imgsz', 'kp_flip', 'scales' and 'flips'.
        imgs: batch of network inputs; ``imgs[si]`` is indexed as
            ``[:, y1:y2, x1:x2]`` and ``imgs[si].shape[1:]`` is passed to
            ``scale_coords``.
        paths: per-image paths. When non-empty, the file stem is parsed as an
            integer image id; otherwise the batch index is used.
        shapes: per-image shape info; ``shapes[si][0]`` is the target shape
            passed to ``scale_coords``.
        person_dets: per-image person detections (one 2-D tensor each).
        kp_dets: per-image keypoint-object detections (one 2-D tensor each).
        two_stage: if True, re-run ``model`` on a crop of every person box.
        pad: border (pixels) added around each crop in two-stage mode.
        device: device the crop tensors are moved to in two-stage mode.
        model: model used for two-stage inference (must expose ``stride``).
        origins: per-image offsets added to every pose; defaults to zeros.

    Returns:
        ``(batch_bboxes, batch_poses, batch_scores, batch_ids, n_fused)`` where
        the first four are flat lists over all detections in the batch and
        ``n_fused`` counts, per keypoint index, how many keypoint objects
        overwrote a pose keypoint.
    """
    batch_bboxes, batch_poses, batch_scores, batch_ids = [], [], [], []
    num_kp = data['num_coords'] // 2
    n_fused = np.zeros(num_kp)

    if origins is None:
        origins = [np.array([0, 0, 0]) for _ in range(len(person_dets))]

    for si, (pd, kpd, origin) in enumerate(zip(person_dets, kp_dets, origins)):
        nd = pd.shape[0]
        nkp = kpd.shape[0]

        if not nd:
            # No person detections for this image: nothing to emit.
            continue

        path, shape = Path(paths[si]) if len(paths) else '', shapes[si][0]
        img_id = int(osp.splitext(osp.split(path)[-1])[0]) if path else si

        if two_stage:
            # NOTE(review): this branch never assigns `bboxes`, and `poses` /
            # `scores` are overwritten on every crop, so only the last crop's
            # results survive. Left as-is because the intended behaviour
            # cannot be confirmed from this file.
            gs = max(int(model.stride.max()), 32)

            for bbox in pd[:, :4].cpu().numpy():
                # Clamp the rounded box to the square network input.
                x1, y1, x2, y2 = map(int, map(round, bbox))
                x1, x2 = max(x1, 0), min(x2, data['imgsz'])
                y1, y2 = max(y1, 0), min(y2, data['imgsz'])
                h0, w0 = y2 - y1, x2 - x1

                # CHW tensor slice -> HWC array, then add a constant border.
                crop = np.transpose(imgs[si][:, y1:y2, x1:x2].cpu().numpy(), (1, 2, 0))
                crop = cv2.copyMakeBorder(crop, pad, pad, pad, pad, cv2.BORDER_CONSTANT, value=PAD_COLOR)
                h0 += 2 * pad
                w0 += 2 * pad

                # Offset of the padded crop inside the network input; local
                # names are used so the function arguments are not rebound.
                crop_origins = [np.array([x1 - pad, y1 - pad, 0])]
                crop_pre = letterbox(crop, data['imgsz'], color=PAD_COLOR, stride=gs, auto=False)[0]
                crop_input = torch.Tensor(np.transpose(np.expand_dims(crop_pre, axis=0), (0, 3, 1, 2))).to(device)

                out = model(crop_input, augment=True, kp_flip=data['kp_flip'], scales=data['scales'], flips=data['flips'])[0]
                crop_person_dets, crop_kp_dets = run_nms(data, out)
                _, poses, scores, _, _ = post_process_batch(
                    data, crop_input, paths, [[(h0, w0)]], crop_person_dets, crop_kp_dets,
                    device=device, origins=crop_origins)

                if len(poses):
                    # Rescale x/y from the network input to `shape`; the third
                    # column is rebuilt as zeros.
                    poses = np.stack(poses, axis=0)
                    poses = poses[:, :, :2].reshape(poses.shape[0], -1)
                    poses = scale_coords(imgs[si].shape[1:], poses, shape)
                    poses = poses.reshape(poses.shape[0], num_kp, 2)
                    poses = np.concatenate((poses, np.zeros((poses.shape[0], num_kp, 1))), axis=-1)
                poses = list(poses)

        else:
            scores = pd[:, 4].cpu().numpy()
            bboxes = scale_coords(imgs[si].shape[1:], pd[:, :4], shape).round().cpu().numpy()
            poses = scale_coords(imgs[si].shape[1:], pd[:, -data['num_coords']:], shape).cpu().numpy()
            # -1 is the documented "infer this axis" value for reshape; the
            # previous -num_coords only worked because NumPy happened to treat
            # any negative size that way.
            poses = poses.reshape((nd, -1, 2))
            # Third column starts at 0 and holds the confidence of any fused keypoint.
            poses = np.concatenate((poses, np.zeros((nd, poses.shape[1], 1))), axis=-1)

            if data['use_kp_dets'] and nkp:
                mask = scores > data['conf_thres_kp_person']
                poses_mask = poses[mask]

                if len(poses_mask):
                    kpd[:, :4] = scale_coords(imgs[si].shape[1:], kpd[:, :4], shape)
                    kpd = kpd[:, :6].cpu()

                    for x1, y1, x2, y2, conf, cls in kpd:
                        # Box centre of the keypoint object.
                        x, y = np.mean((x1, x2)), np.mean((y1, y2))
                        pose_kps = poses_mask[:, int(cls - 1)]
                        dist = np.linalg.norm(pose_kps[:, :2] - np.array([[x, y]]), axis=-1)
                        kp_match = np.argmin(dist)
                        # Overwrite the nearest pose keypoint only if this one is
                        # more confident and within the pixel tolerance.
                        if conf > pose_kps[kp_match, 2] and dist[kp_match] < data['overwrite_tol']:
                            pose_kps[kp_match] = [x, y, conf]
                            n_fused[int(cls - 1)] += 1
                    poses[mask] = poses_mask

            poses = [p + origin for p in poses]

        batch_bboxes.extend(bboxes)
        batch_poses.extend(poses)
        batch_scores.extend(scores)
        batch_ids.extend([img_id] * len(scores))

    return batch_bboxes, batch_poses, batch_scores, batch_ids, n_fused
|
|
|
|
|
@torch.no_grad()
def run(data,
        weights=None,
        batch_size=16,
        imgsz=1280,
        task='val',
        device='',
        conf_thres=0.001,
        iou_thres=0.65,
        no_kp_dets=False,
        conf_thres_kp=0.2,
        iou_thres_kp=0.25,
        conf_thres_kp_person=0.3,
        overwrite_tol=50,
        scales=[1],
        flips=[None],
        rect=False,
        half=True,
        model=None,
        dataloader=None,
        compute_loss=None,
        two_stage=False,
        pad=0
        ):
    """Run pose inference over a dataset split and score it with COCO keypoint AP.

    When ``model`` is None the weights are loaded from ``weights``, ``data`` is
    passed through ``check_dataset`` and a dataloader is built for ``task``.
    When a model is supplied, it and the given ``dataloader`` are used as-is.

    Args:
        data: dataset description (the input to ``check_dataset`` when no model
            is supplied). Inference settings are written into this dict.
        weights: checkpoint path; also determines where the results JSON goes.
        batch_size: dataloader batch size (must be 1 for two-stage).
        imgsz: inference image size.
        task: one of 'train', 'val', 'test'. Anything else falls back to 'val'
            when the dataloader is built here.
        device: device string for ``select_device``.
        conf_thres, iou_thres: person NMS thresholds.
        no_kp_dets: if True, keypoint objects are not fused into poses.
        conf_thres_kp, iou_thres_kp: keypoint-object NMS thresholds.
        conf_thres_kp_person: minimum person score for keypoint fusion.
        overwrite_tol: maximum pixel distance for a keypoint object to
            overwrite a pose keypoint.
        scales, flips: test-time augmentation settings forwarded to the model.
            The list defaults are never mutated here.
        rect: rectangular inference flag forwarded to ``create_dataloader``.
        half: use FP16 (disabled automatically on CPU).
        model, dataloader, compute_loss: supplied by a caller that already owns
            a model.
        two_stage: enable the experimental two-stage path.
        pad: crop padding for two-stage inference.

    Returns:
        ``((mp, mr, map50, mAP, *losses), np.zeros(nc), t)``. ``mp`` and ``mr``
        are always 0, ``losses`` is the accumulated loss divided by the number
        of batches, and ``t`` holds per-image pre-process / inference / NMS
        times in milliseconds.
    """
    if two_stage:
        assert batch_size == 1, 'Batch size must be set to 1 for two-stage processing'
        assert conf_thres >= 0.01, 'Confidence threshold must be >= 0.01 for two-stage processing'
        assert not rect, 'Cannot use rectangular inference with two-stage processing'
        assert not half, 'Two-stage processing must use full precision'

    use_kp_dets = not no_kp_dets

    training = model is not None
    if training:
        # Reuse the device the supplied model already lives on.
        device = next(model.parameters()).device
    else:
        device = select_device(device, batch_size=batch_size)

        model = attempt_load(weights, map_location=device)
        gs = max(int(model.stride.max()), 32)
        imgsz = check_img_size(imgsz, s=gs)

        data = check_dataset(data)

    # Stash the inference settings in the data dict so helpers only need `data`.
    data['imgsz'] = imgsz
    data['conf_thres'] = conf_thres
    data['iou_thres'] = iou_thres
    data['use_kp_dets'] = use_kp_dets
    data['conf_thres_kp'] = conf_thres_kp
    data['iou_thres_kp'] = iou_thres_kp
    data['conf_thres_kp_person'] = conf_thres_kp_person
    data['overwrite_tol'] = overwrite_tol
    data['scales'] = scales
    data['flips'] = flips

    # Choose the evaluation toolkit from the dataset path.
    is_coco = 'coco' in data['path']
    if is_coco:
        from pycocotools.coco import COCO
        from pycocotools.cocoeval import COCOeval
    else:
        from crowdposetools.coco import COCO
        from crowdposetools.cocoeval import COCOeval

    half &= device.type != 'cpu'
    if half:
        model.half()

    model.eval()
    nc = int(data['nc'])

    if not training:
        if device.type != 'cpu':
            # One dummy forward pass before timing starts.
            model(torch.zeros(1, 3, imgsz, imgsz).to(device).type_as(next(model.parameters())))
        task = task if task in ('train', 'val', 'test') else 'val'
        dataloader = create_dataloader(data[task], data['labels'], imgsz, batch_size, gs, rect=rect,
                                       prefix=colorstr(f'{task}: '), kp_flip=data['kp_flip'])[0]

    seen = 0
    mp, mr, map50, mAP, t0, t1, t2 = 0., 0., 0., 0., 0., 0., 0.
    loss = torch.zeros(4, device=device)
    json_dump = []
    n_fused = np.zeros(data['num_coords'] // 2)

    for imgs, targets, paths, shapes in tqdm(dataloader, desc='Processing {} images'.format(task)):
        t_ = time_sync()
        imgs = imgs.to(device, non_blocking=True)
        imgs = imgs.half() if half else imgs.float()
        imgs /= 255.0
        targets = targets.to(device)
        t = time_sync()
        t0 += t - t_

        out, train_out = model(imgs, augment=True, kp_flip=data['kp_flip'], scales=data['scales'], flips=data['flips'])
        t1 += time_sync() - t

        if train_out:
            if compute_loss:
                loss += compute_loss([x.float() for x in train_out], targets)[1]

        t = time_sync()

        person_dets, kp_dets = run_nms(data, out)

        _, poses, scores, img_ids, n_fused_batch = post_process_batch(
            data, imgs, paths, shapes, person_dets, kp_dets, two_stage, pad, device, model)

        t2 += time_sync() - t
        seen += len(imgs)
        n_fused += n_fused_batch

        for pose, score, img_id in zip(poses, scores, img_ids):
            json_dump.append({
                'image_id': img_id,
                'category_id': 1,
                'keypoints': pose.reshape(-1).tolist(),
                'score': float(score)
            })

    tmp = None
    if not training:
        # Results go next to the weights, with the thresholds in the file name.
        save_dir, weights_name = osp.split(weights)
        json_name = '{}_{}_c{}_i{}_ck{}_ik{}_ckp{}_t{}.json'.format(
            task, osp.splitext(weights_name)[0],
            conf_thres, iou_thres, conf_thres_kp, iou_thres_kp,
            conf_thres_kp_person, overwrite_tol
        )
        json_path = osp.join(save_dir, json_name)
    else:
        tmp = tempfile.NamedTemporaryFile(mode='w+b')
        json_path = tmp.name

    try:
        with open(json_path, 'w') as f:
            json.dump(json_dump, f)

        if task in ('train', 'val'):
            if json_dump:
                annot = osp.join(data['path'], data['{}_annotations'.format(task)])
                coco = COCO(annot)
                result = coco.loadRes(json_path)
                coco_eval = COCOeval(coco, result, iouType='keypoints')
                coco_eval.evaluate()
                coco_eval.accumulate()
                coco_eval.summarize()
                mAP, map50 = coco_eval.stats[:2]
            else:
                # loadRes indexes the first result, so an empty list would raise.
                print('No predictions to evaluate; skipping COCO evaluation')
    finally:
        # Always release the temporary results file, even if evaluation fails.
        if tmp is not None:
            tmp.close()

    # max(seen, 1) avoids a ZeroDivisionError when the dataloader was empty.
    t = tuple(x / max(seen, 1) * 1E3 for x in (t0, t1, t2))
    if not training and task != 'test':
        os.rename(json_path, osp.splitext(json_path)[0] + '_ap{:.4f}.json'.format(mAP))
        shape = (batch_size, 3, imgsz, imgsz)
        print(f'Speed: %.3fms pre-process, %.3fms inference, %.3fms NMS per image at shape {shape}' % t)
        print('Keypoint Objects Fused:', n_fused)
    # Return the model to full precision for whoever uses it next.
    model.float()
    return (mp, mr, map50, mAP, *(loss.cpu() / len(dataloader)).tolist()), np.zeros(nc), t
|
|
|
|
|
def parse_opt():
    """Parse the ``val.py`` command line and return the options namespace.

    After parsing, every ``-1`` in ``--flips`` is replaced by ``None`` and
    ``--data`` is passed through ``check_file``.
    """
    cli = argparse.ArgumentParser(prog='val.py')
    add = cli.add_argument

    add('--data', type=str, default='data/coco-kp.yaml', help='dataset.yaml path')
    add('--weights', default='kapao_s_coco.pt')
    add('--batch-size', type=int, default=1, help='batch size')
    add('--imgsz', type=int, default=1280, help='inference size (pixels)')
    add('--task', default='val', help='train, val, test')
    add('--device', default='', help='cuda device, i.e. 0 or 0,1,2,3 or cpu')
    add('--conf-thres', type=float, default=0.001, help='confidence threshold')
    add('--iou-thres', type=float, default=0.65, help='NMS IoU threshold')
    add('--no-kp-dets', action='store_true', help='do not use keypoint objects')
    add('--conf-thres-kp', type=float, default=0.2)
    add('--conf-thres-kp-person', type=float, default=0.3)
    add('--iou-thres-kp', type=float, default=0.25)
    add('--overwrite-tol', type=int, default=50)
    add('--scales', type=float, nargs='+', default=[1])
    add('--flips', type=int, nargs='+', default=[-1])
    add('--rect', action='store_true', help='rectangular input image')
    add('--half', action='store_true', help='use FP16 half-precision inference')
    add('--two-stage', action='store_true', help='use two-stage inference (experimental)')
    add('--pad', type=int, default=0, help='padding for two-stage inference')

    args = cli.parse_args()
    # -1 on the command line stands for "no flip" and is passed on as None.
    args.flips = [f if f != -1 else None for f in args.flips]
    args.data = check_file(args.data)
    return args
|
|
|
|
|
def main(opt):
    """Set up logging, echo the options, and call ``run`` for a recognised task.

    Nothing is run when ``opt.task`` is not 'train', 'val' or 'test'.
    """
    set_logging()
    settings = vars(opt)
    print(colorstr('val: ') + ', '.join(f'{k}={v}' for k, v in settings.items()))
    if opt.task not in ('train', 'val', 'test'):
        return
    run(**settings)
|
|
|
|
|
# Script entry point: parse the command-line options and hand them to main().
if __name__ == "__main__":
    opt = parse_opt()
    main(opt)
|
|