from src.tracker.mrcnn.mrcnn_color import MRCNN | |
from src.tracker._tools_ import generate_detections as gdet | |
from src.tracker.deep_sort.tracker import Tracker | |
from src.tracker.deep_sort.detection import Detection | |
from src.tracker.application_util import preprocessing | |
from src.tracker.deep_sort import nn_matching | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import cv2 | |
from PIL import Image | |
import tensorflow as tf | |
import time | |
import ffmpeg | |
import os | |
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' | |
physical_devices = tf.config.experimental.list_physical_devices('GPU') | |
if len(physical_devices) > 0: | |
tf.config.experimental.set_memory_growth(physical_devices[0], True) | |
# deep sort imports | |
# deepsort | |
def check_rotation(path_video_file): | |
print(path_video_file) | |
meta_dict = ffmpeg.probe(path_video_file) | |
try: | |
if int(meta_dict['streams'][0]['tags']['rotate']) == 90: | |
return cv2.ROTATE_90_CLOCKWISE | |
elif int(meta_dict['streams'][0]['tags']['rotate']) == 180: | |
return cv2.ROTATE_180 | |
elif int(meta_dict['streams'][0]['tags']['rotate']) == 270: | |
except: | |
return None | |
def correct_rotation(frame, rotateCode): | |
return cv2.rotate(frame, rotateCode) | |
class SignboardTracker(): | |
def __init__(self, | |
detector_checkpoint: str = "./checkpoints/ss/ss.ckpt", | |
input_size: int = 1024, | |
score: float = 0.7, | |
size: int = 1024, | |
video: str = "", | |
output: str = "", | |
output_format: str = "", | |
dont_show: bool = True, | |
info: bool = True, | |
count: bool = True, | |
max_cosine_distance: float = 0.4, | |
nn_budget: None = None, | |
nms_max_overlap: float = 1.0, | |
tracker_checkpoint: str = "./checkpoints/tracker/signboard_2793.pb" | |
) -> None: | |
self.detector_checkpoint = detector_checkpoint | |
self.input_size = input_size | |
self.score = score | |
self.size = size | | = video | |
self.output = output | |
self.output_format = output_format | |
self.dont_show = dont_show | | = info | |
self.count = count | |
self.max_cosine_distance = max_cosine_distance | |
self.nn_budget = nn_budget | |
self.nms_max_overlap = nms_max_overlap | |
self.tracker_checkpoint = tracker_checkpoint | |
self.load_tracker() | |
self.load_detector() | |
def load_tracker(self): | |
self.encoder = gdet.create_box_encoder(self.tracker_checkpoint, batch_size=1) | |
metric = nn_matching.NearestNeighborDistanceMetric("cosine", self.max_cosine_distance, self.nn_budget) | |
self.tracker = Tracker(metric) | |
def load_detector(self): | |
self.mrcnn = MRCNN(self.detector_checkpoint, self.input_size, self.score) | |
def inference_signboard(self, fps_target, video_path, output, output_format, output_frames): | |
results = {} | |
results_ = {} | |
rotateCode = check_rotation(video_path) | |
try: | |
vid = cv2.VideoCapture(int(video_path)) | |
except: | |
vid = cv2.VideoCapture(video_path) | |
out = None | |
# get video ready to save locally if flag is set | |
if output: | |
# by default VideoCapture returns float instead of int | |
width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
fps = int(vid.get(cv2.CAP_PROP_FPS)) | |
codec = cv2.VideoWriter_fourcc(*output_format) | |
out = cv2.VideoWriter(output, codec, fps_target, (width, height)) | |
tg = (fps - (fps%fps_target)) / fps_target | |
frame_num = 0 | |
# while video is running | |
while True: | |
return_value, frame = | |
# original = frame | |
if return_value: | |
frame_num += 1 | |
if rotateCode is not None: | |
frame = correct_rotation(frame, rotateCode) | |
image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
image = Image.fromarray(image) | |
else: | |
break | |
if (frame_num%tg)==0: | |
if str(frame_num) not in results: | |
results[str(frame_num)] = [] | |
start_time = time.time() | |
boxes, scores, class_names, class_ids, class_color = self.mrcnn.detect_result_(image, min_score=0.4) | |
count = len(class_names) | |
# encode yolo detections and feed to tracker | |
features = self.encoder(frame, boxes) | |
detections = [Detection(box, score, class_name, feature) for box, score, class_name, feature in zip(boxes, scores, class_names, features)] | |
# initialize color map | |
cmap = plt.get_cmap('tab20b') | |
colors = [cmap(i)[:3] for i in np.linspace(0, 1, 20)] | |
# run non-maxima supression | |
boxs = np.array([d.tlwh for d in detections]) | |
scores = np.array([d.confidence for d in detections]) | |
classes = np.array([d.class_name for d in detections]) | |
indices = preprocessing.non_max_suppression(boxs, classes, self.nms_max_overlap, scores) | |
detections = [detections[i] for i in indices] | |
# Call the tracker | |
self.tracker.predict() | |
self.tracker.update(detections) | |
# update tracks | |
# with open(f"{output_frames}/{frame_num}.txt", "a+", encoding="utf-8") as ff: | |
for track in self.tracker.tracks: | |
if not track.is_confirmed() or track.time_since_update > 1: | |
continue | |
bbox = track.to_tlbr() | |
class_name = track.get_class() | |
# crop to ids folder | |
ids_path = f"{output_frames}" | |
# print(ids_path) | |
if not os.path.isdir(ids_path): | |
os.makedirs(ids_path) | |
crop_ids = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])] | |
# num_ids = 0 | |
# if os.path.isfile(os.path.join(ids_path, str(track.track_id) + "_" + str(frame_num)+".png")): | |
# num_ids += 1 | |
final_ids_path = os.path.join(ids_path, str(track.track_id) + "_" + str(frame_num)+".png") | |
try: | |
cv2.imwrite(final_ids_path, crop_ids) | |
except Exception as e: | |
print(e) | |
# draw bbox on screen | |
color = colors[int(track.track_id) % len(colors)] | |
color = [i * 255 for i in color] | |
cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), color, 2) | |
cv2.rectangle(frame, (int(bbox[0]), int(bbox[1]-30)), (int(bbox[0])+(len(class_name)+len(str(track.track_id)))*17, int(bbox[1])), color, -1) | |
cv2.putText(frame, class_name + "-" + str(track.track_id), (int(bbox[0]), int(bbox[1]-10)), 0, 0.75, (255, 255, 255), 2) | |
# if enable info flag then print details about each track | |
results[str(frame_num)].append({ | |
"id": track.track_id, | |
"class": class_name, | |
"box": [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])] | |
}) | |
if str(track.track_id) not in results_: | |
results_[str(track.track_id)] = [] | |
results_[str(track.track_id)].append({ | |
"frame": frame_num, | |
"class": class_name, | |
"box": [int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])] | |
}) | |
# ff.close() | |
# calculate frames per second of running detections | |
# fps = 1.0 / (time.time() - start_time) | |
result = frame | |
# if output flag is set, save video file | |
if output: | |
cv2.imwrite(f"{output}/{frame_num}.jpg", result) | |
out.write(result) | |
if cv2.waitKey(1) & 0xFF == ord('q'): | |
break | |
cv2.destroyAllWindows() | |
return [results, results_] |