|
import argparse |
|
import datetime |
|
import random |
|
import time |
|
from pathlib import Path |
|
from tqdm import tqdm |
|
|
|
import torch |
|
import torchvision.transforms as standard_transforms |
|
import numpy as np |
|
|
|
from PIL import Image |
|
import cv2 |
|
from crowd_datasets import build_dataset |
|
from engine import * |
|
from models import build_model |
|
import os |
|
import warnings |
|
import requests |
|
from os.path import exists |
|
from models.vgg_ import model_paths, model_urls |
|
warnings.filterwarnings('ignore') |
|
|
|
def get_args_parser(): |
|
parser = argparse.ArgumentParser('Set parameters for P2PNet evaluation', add_help=False) |
|
|
|
|
|
parser.add_argument('--backbone', default='vgg16_bn', type=str, |
|
help="name of the convolutional backbone to use") |
|
|
|
parser.add_argument('--input_video', default='../Video-tests/test1.mp4', type=str, |
|
help="address of input video file") |
|
|
|
parser.add_argument('--row', default=2, type=int, |
|
help="row number of anchor points") |
|
parser.add_argument('--line', default=2, type=int, |
|
help="line number of anchor points") |
|
|
|
parser.add_argument('--output_dir', default='./logs/', |
|
help='path where to save') |
|
parser.add_argument('--weight_path', default='./SHTechA.pth', |
|
help='path where the trained weights saved') |
|
|
|
parser.add_argument('--gpu_id', default=0, type=int, help='the gpu used for evaluation') |
|
|
|
return parser |
|
|
|
def load_model(): |
|
|
|
if (not exists(model_paths['vgg16_bn'])): |
|
print('Downloading VGG!') |
|
url = model_urls['vgg16_bn'] |
|
r = requests.get(url, allow_redirects=True) |
|
with open(model_paths['vgg16_bn'], 'wb') as f: |
|
f.write(r.content) |
|
print('Finished Downloading') |
|
parser = argparse.ArgumentParser('P2PNet evaluation script', parents=[get_args_parser()]) |
|
args = parser.parse_args() |
|
|
|
|
|
|
|
device = torch.device('cpu') |
|
|
|
model = build_model(args) |
|
|
|
model.to(device) |
|
|
|
if args.weight_path is not None: |
|
checkpoint = torch.load(args.weight_path, map_location='cpu') |
|
model.load_state_dict(checkpoint['model']) |
|
|
|
model.eval() |
|
|
|
transform = standard_transforms.Compose([ |
|
standard_transforms.ToTensor(), |
|
standard_transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), |
|
]) |
|
return model, transform, device |
|
|
|
def image_inference(model, transform, device, img_file, raw_img_input=False, return_img=False): |
|
if raw_img_input: |
|
img_raw = img_file |
|
else: |
|
|
|
img_path = img_file |
|
|
|
img_raw = Image.open(img_path).convert('RGB') |
|
|
|
width, height = img_raw.size |
|
new_width = width // 128 * 128 |
|
new_height = height // 128 * 128 |
|
img_raw = img_raw.resize((new_width, new_height), Image.ANTIALIAS) |
|
|
|
img = transform(img_raw) |
|
|
|
samples = torch.Tensor(img).unsqueeze(0) |
|
samples = samples.to(device) |
|
|
|
outputs = model(samples) |
|
outputs_scores = torch.nn.functional.softmax(outputs['pred_logits'], -1)[:, :, 1][0] |
|
|
|
outputs_points = outputs['pred_points'][0] |
|
|
|
threshold = 0.5 |
|
|
|
points = outputs_points[outputs_scores > threshold].detach().cpu().numpy().tolist() |
|
predict_cnt = int((outputs_scores > threshold).sum()) |
|
|
|
outputs_scores = torch.nn.functional.softmax(outputs['pred_logits'], -1)[:, :, 1][0] |
|
|
|
outputs_points = outputs['pred_points'][0] |
|
|
|
size = 2 |
|
img_to_draw = cv2.cvtColor(np.array(img_raw), cv2.COLOR_RGB2BGR) |
|
for p in points: |
|
img_to_draw = cv2.circle(img_to_draw, (int(p[0]), int(p[1])), size, (0, 0, 255), -1) |
|
|
|
if return_img: |
|
return img_to_draw, predict_cnt |
|
else: |
|
cv2.imwrite(img_file, img_to_draw) |
|
return predict_cnt |
|
|
|
|
|
def video_reader(videoFile): |
|
cap = cv2.VideoCapture(videoFile) |
|
while(cap.isOpened()): |
|
ret,cv2_im = cap.read() |
|
if ret: |
|
converted = cv2.cvtColor(cv2_im,cv2.COLOR_BGR2RGB) |
|
pil_im = Image.fromarray(converted) |
|
yield pil_im |
|
|
|
elif not ret: |
|
break |
|
cap.release() |
|
|
|
|
|
def video_inference(model, transform, device, video_file): |
|
result = [] |
|
for frame in tqdm(video_reader(video_file)): |
|
img_raw = frame |
|
|
|
width, height = img_raw.size |
|
new_width = width // 128 * 128 |
|
new_height = height // 128 * 128 |
|
img_raw = img_raw.resize((new_width, new_height), Image.ANTIALIAS) |
|
frames_size = (new_width, new_height) |
|
|
|
img = transform(img_raw) |
|
|
|
samples = torch.Tensor(img).unsqueeze(0) |
|
samples = samples.to(device) |
|
|
|
outputs = model(samples) |
|
outputs_scores = torch.nn.functional.softmax(outputs['pred_logits'], -1)[:, :, 1][0] |
|
|
|
outputs_points = outputs['pred_points'][0] |
|
|
|
threshold = 0.5 |
|
|
|
points = outputs_points[outputs_scores > threshold].detach().cpu().numpy().tolist() |
|
predict_cnt = int((outputs_scores > threshold).sum()) |
|
|
|
outputs_scores = torch.nn.functional.softmax(outputs['pred_logits'], -1)[:, :, 1][0] |
|
|
|
outputs_points = outputs['pred_points'][0] |
|
|
|
size = 10 |
|
img_to_draw = cv2.cvtColor(np.array(img_raw), cv2.COLOR_RGB2BGR) |
|
for p in points: |
|
img_to_draw = cv2.circle(img_to_draw, (int(p[0]), int(p[1])), size, (0, 0, 255), -1) |
|
|
|
|
|
|
|
if result: |
|
result.write(img_to_draw) |
|
break |
|
else: |
|
result = cv2.VideoWriter(f'{video_file}.avi', |
|
cv2.VideoWriter_fourcc(*'MJPG'), |
|
10, frames_size) |
|
result.write(img_to_draw) |
|
result.release() |
|
return True |
|
|