import argparse
import json
import os, os.path as osp
import sys
from pathlib import Path
import numpy as np
import torch
from tqdm import tqdm
FILE = Path(__file__).absolute()
sys.path.append(FILE.parents[0].as_posix()) # add kapao/ to path
from models.experimental import attempt_load
from utils.datasets import create_dataloader
from utils.augmentations import letterbox
from utils.general import check_dataset, check_file, check_img_size, \
non_max_suppression_kp, scale_coords, set_logging, colorstr
from utils.torch_utils import select_device, time_sync
import tempfile
import cv2
PAD_COLOR = (114 / 255, 114 / 255, 114 / 255)
def run_nms(data, model_out):
if data['iou_thres'] == data['iou_thres_kp'] and data['conf_thres_kp'] >= data['conf_thres']:
# Combined NMS saves ~0.2 ms / image
dets = non_max_suppression_kp(model_out, data['conf_thres'], data['iou_thres'], num_coords=data['num_coords'])
person_dets = [d[d[:, 5] == 0] for d in dets]
kp_dets = [d[d[:, 4] >= data['conf_thres_kp']] for d in dets]
kp_dets = [d[d[:, 5] > 0] for d in kp_dets]
person_dets = non_max_suppression_kp(model_out, data['conf_thres'], data['iou_thres'],
kp_dets = non_max_suppression_kp(model_out, data['conf_thres_kp'], data['iou_thres_kp'],
classes=list(range(1, 1 + len(data['kp_flip']))),
return person_dets, kp_dets
def post_process_batch(data, imgs, paths, shapes, person_dets, kp_dets,
two_stage=False, pad=0, device='cpu', model=None, origins=None):
batch_bboxes, batch_poses, batch_scores, batch_ids = [], [], [], []
n_fused = np.zeros(data['num_coords'] // 2)
if origins is None: # used only for two-stage inference so set to 0 if None
origins = [np.array([0, 0, 0]) for _ in range(len(person_dets))]
# process each image in batch
for si, (pd, kpd, origin) in enumerate(zip(person_dets, kp_dets, origins)):
nd = pd.shape[0]
nkp = kpd.shape[0]
if nd:
path, shape = Path(paths[si]) if len(paths) else '', shapes[si][0]
img_id = int(osp.splitext(osp.split(path)[-1])[0]) if path else si
if two_stage:
gs = max(int(model.stride.max()), 32) # grid size (max stride)
crops, origins, crop_shapes = [], [], []
for bbox in pd[:, :4].cpu().numpy():
x1, y1, x2, y2 = map(int, map(round, bbox))
x1, x2 = max(x1, 0), min(x2, data['imgsz'])
y1, y2 = max(y1, 0), min(y2, data['imgsz'])
h0, w0 = y2 - y1, x2 - x1
crop_shapes.append([(h0, w0)])
crop = np.transpose(imgs[si][:, y1:y2, x1:x2].cpu().numpy(), (1, 2, 0))
crop = cv2.copyMakeBorder(crop, pad, pad, pad, pad, cv2.BORDER_CONSTANT, value=PAD_COLOR) # add padding
h0 += 2 * pad
w0 += 2 * pad
origins = [np.array([x1 - pad, y1 - pad, 0])]
crop_pre = letterbox(crop, data['imgsz'], color=PAD_COLOR, stride=gs, auto=False)[0]
crop_input = torch.Tensor(np.transpose(np.expand_dims(crop_pre, axis=0), (0, 3, 1, 2))).to(device)
out = model(crop_input, augment=True, kp_flip=data['kp_flip'], scales=data['scales'], flips=data['flips'])[0]
person_dets, kp_dets = run_nms(data, out)
_, poses, scores, img_ids, _ = post_process_batch(
data, crop_input, paths, [[(h0, w0)]], person_dets, kp_dets, device=device, origins=origins)
# map back to original image
if len(poses):
poses = np.stack(poses, axis=0)
poses = poses[:, :, :2].reshape(poses.shape[0], -1)
poses = scale_coords(imgs[si].shape[1:], poses, shape)
poses = poses.reshape(poses.shape[0], data['num_coords'] // 2, 2)
poses = np.concatenate((poses, np.zeros((poses.shape[0], data['num_coords'] // 2, 1))), axis=-1)
poses = [p for p in poses] # convert back to list
scores = pd[:, 4].cpu().numpy() # person detection score
bboxes = scale_coords(imgs[si].shape[1:], pd[:, :4], shape).round().cpu().numpy()
poses = scale_coords(imgs[si].shape[1:], pd[:, -data['num_coords']:], shape).cpu().numpy()
poses = poses.reshape((nd, -data['num_coords'], 2))
poses = np.concatenate((poses, np.zeros((nd, poses.shape[1], 1))), axis=-1)
if data['use_kp_dets'] and nkp:
mask = scores > data['conf_thres_kp_person']
poses_mask = poses[mask]
if len(poses_mask):
kpd[:, :4] = scale_coords(imgs[si].shape[1:], kpd[:, :4], shape)
kpd = kpd[:, :6].cpu()
for x1, y1, x2, y2, conf, cls in kpd:
x, y = np.mean((x1, x2)), np.mean((y1, y2))
pose_kps = poses_mask[:, int(cls - 1)]
dist = np.linalg.norm(pose_kps[:, :2] - np.array([[x, y]]), axis=-1)
kp_match = np.argmin(dist)
if conf > pose_kps[kp_match, 2] and dist[kp_match] < data['overwrite_tol']:
pose_kps[kp_match] = [x, y, conf]
n_fused[int(cls - 1)] += 1
poses[mask] = poses_mask
poses = [p + origin for p in poses]
batch_ids.extend([img_id] * len(scores))
return batch_bboxes, batch_poses, batch_scores, batch_ids, n_fused
def run(data,
weights=None, # model.pt path(s)
batch_size=16, # batch size
imgsz=1280, # inference size (pixels)
task='val', # train, val, test, speed or study
device='', # cuda device, i.e. 0 or 0,1,2,3 or cpu
conf_thres=0.001, # confidence threshold
iou_thres=0.65, # NMS IoU threshold
overwrite_tol=50, # pixels for kp det overwrite
half=True, # use FP16 half-precision inference
if two_stage:
assert batch_size == 1, 'Batch size must be set to 1 for two-stage processing'
assert conf_thres >= 0.01, 'Confidence threshold must be >= 0.01 for two-stage processing'
assert not rect, 'Cannot use rectangular inference with two-stage processing'
assert not half, 'Two-stage processing must use full precision'
use_kp_dets = not no_kp_dets
# Initialize/load model and set device
training = model is not None
if training: # called by train.py
device = next(model.parameters()).device # get model device
else: # called directly
device = select_device(device, batch_size=batch_size)
# Load model
model = attempt_load(weights, map_location=device) # load FP32 model
gs = max(int(model.stride.max()), 32) # grid size (max stride)
imgsz = check_img_size(imgsz, s=gs) # check image size
# Data
data = check_dataset(data) # check
# add inference settings to data dict
data['imgsz'] = imgsz
data['conf_thres'] = conf_thres
data['iou_thres'] = iou_thres
data['use_kp_dets'] = use_kp_dets
data['conf_thres_kp'] = conf_thres_kp
data['iou_thres_kp'] = iou_thres_kp
data['conf_thres_kp_person'] = conf_thres_kp_person
data['overwrite_tol'] = overwrite_tol
data['scales'] = scales
data['flips'] = flips
is_coco = 'coco' in data['path']
if is_coco:
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from crowdposetools.coco import COCO
from crowdposetools.cocoeval import COCOeval
# Half
half &= device.type != 'cpu' # half precision only supported on CUDA
if half:
nc = int(data['nc']) # number of classes
# Dataloader
if not training:
if device.type != 'cpu':
model(torch.zeros(1, 3, imgsz, imgsz).to(device).type_as(next(model.parameters()))) # run once
task = task if task in ('train', 'val', 'test') else 'val' # path to train/val/test images
dataloader = create_dataloader(data[task], data['labels'], imgsz, batch_size, gs, rect=rect,
prefix=colorstr(f'{task}: '), kp_flip=data['kp_flip'])[0]
seen = 0
mp, mr, map50, mAP, t0, t1, t2 = 0., 0., 0., 0., 0., 0., 0.
loss = torch.zeros(4, device=device)
json_dump = []
n_fused = np.zeros(data['num_coords'] // 2)
for batch_i, (imgs, targets, paths, shapes) in enumerate(tqdm(dataloader, desc='Processing {} images'.format(task))):
t_ = time_sync()
imgs = imgs.to(device, non_blocking=True)
imgs = imgs.half() if half else imgs.float() # uint8 to fp16/32
imgs /= 255.0 # 0 - 255 to 0.0 - 1.0
targets = targets.to(device)
nb, _, height, width = imgs.shape # batch size, channels, height, width
t = time_sync()
t0 += t - t_
# Run model
out, train_out = model(imgs, augment=True, kp_flip=data['kp_flip'], scales=data['scales'], flips=data['flips'])
t1 += time_sync() - t
# Compute loss
if train_out: # only computed if no scale / flipping
if compute_loss:
loss += compute_loss([x.float() for x in train_out], targets)[1] # box, obj, cls, kp
t = time_sync()
person_dets, kp_dets = run_nms(data, out)
# Fuse keypoint and pose detections
_, poses, scores, img_ids, n_fused_batch = post_process_batch(
data, imgs, paths, shapes, person_dets, kp_dets, two_stage, pad, device, model)
t2 += time_sync() - t
seen += len(imgs)
n_fused += n_fused_batch
for i, (pose, score, img_id) in enumerate(zip(poses, scores, img_ids)):
'image_id': img_id,
'category_id': 1,
'keypoints': pose.reshape(-1).tolist(),
'score': float(score) # person score
if not training: # save json
save_dir, weights_name = osp.split(weights)
json_name = '{}_{}_c{}_i{}_ck{}_ik{}_ckp{}_t{}.json'.format(
task, osp.splitext(weights_name)[0],
conf_thres, iou_thres, conf_thres_kp, iou_thres_kp,
conf_thres_kp_person, overwrite_tol
json_path = osp.join(save_dir, json_name)
tmp = tempfile.NamedTemporaryFile(mode='w+b')
json_path = tmp.name
with open(json_path, 'w') as f:
json.dump(json_dump, f)
if task in ('train', 'val'):
annot = osp.join(data['path'], data['{}_annotations'.format(task)])
coco = COCO(annot)
result = coco.loadRes(json_path)
eval = COCOeval(coco, result, iouType='keypoints')
mAP, map50 = eval.stats[:2]
if training:
# Print speeds
t = tuple(x / seen * 1E3 for x in (t0, t1, t2)) # speeds per image
if not training and task != 'test':
os.rename(json_path, osp.splitext(json_path)[0] + '_ap{:.4f}.json'.format(mAP))
shape = (batch_size, 3, imgsz, imgsz)
print(f'Speed: %.3fms pre-process, %.3fms inference, %.3fms NMS per image at shape {shape}' % t)
print('Keypoint Objects Fused:', n_fused)
model.float() # for training
return (mp, mr, map50, mAP, *(loss.cpu() / len(dataloader)).tolist()), np.zeros(nc), t # for compatibility with train
def parse_opt():
parser = argparse.ArgumentParser(prog='val.py')
parser.add_argument('--data', type=str, default='data/coco-kp.yaml', help='dataset.yaml path')
parser.add_argument('--weights', default='kapao_s_coco.pt')
parser.add_argument('--batch-size', type=int, default=1, help='batch size')
parser.add_argument('--imgsz', type=int, default=1280, help='inference size (pixels)')
parser.add_argument('--task', default='val', help='train, val, test')
parser.add_argument('--device', default='', help='cuda device, i.e. 0 or 0,1,2,3 or cpu')
parser.add_argument('--conf-thres', type=float, default=0.001, help='confidence threshold')
parser.add_argument('--iou-thres', type=float, default=0.65, help='NMS IoU threshold')
parser.add_argument('--no-kp-dets', action='store_true', help='do not use keypoint objects')
parser.add_argument('--conf-thres-kp', type=float, default=0.2)
parser.add_argument('--conf-thres-kp-person', type=float, default=0.3)
parser.add_argument('--iou-thres-kp', type=float, default=0.25)
parser.add_argument('--overwrite-tol', type=int, default=50)
parser.add_argument('--scales', type=float, nargs='+', default=[1])
parser.add_argument('--flips', type=int, nargs='+', default=[-1])
parser.add_argument('--rect', action='store_true', help='rectangular input image')
parser.add_argument('--half', action='store_true', help='use FP16 half-precision inference')
parser.add_argument('--two-stage', action='store_true', help='use two-stage inference (experimental)')
parser.add_argument('--pad', type=int, default=0, help='padding for two-stage inference')
opt = parser.parse_args()
opt.flips = [None if f == -1 else f for f in opt.flips]
opt.data = check_file(opt.data) # check file
return opt
def main(opt):
print(colorstr('val: ') + ', '.join(f'{k}={v}' for k, v in vars(opt).items()))
if opt.task in ('train', 'val', 'test'): # run normally
if __name__ == "__main__":
opt = parse_opt()