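"""Signboard detection and tracking demo.

Runs a Mask R-CNN signboard detector (mrcnn.mrcnn_color.MRCNN) on each video
frame and tracks the detections across frames with DeepSORT. Per-track crops
are saved under ./ids/, per-frame track logs under ./outputs/, and an
annotated output video is written when --output is set. The OCR stage
(handle_sts) is currently stubbed out.
"""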
import argparse
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'  # silence TF C++ logging; must be set before the tensorflow import
import time
import tensorflow as tf
# allocate GPU memory on demand instead of grabbing it all at start-up
physical_devices = tf.config.experimental.list_physical_devices('GPU')
if len(physical_devices) > 0:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)
from PIL import Image
import cv2
import numpy as np
import matplotlib.pyplot as plt
# deep sort imports
from deep_sort import nn_matching
from application_util import preprocessing
from deep_sort.detection import Detection
from deep_sort.tracker import Tracker
from _tools_ import generate_detections as gdet
# detector
from mrcnn.mrcnn_color import MRCNN
# ocr
# from sts.demo.sts import handle_sts
def _parse_args():
    parser = argparse.ArgumentParser(description="Signboard detection and tracking demo")
    parser.add_argument("--model",
                        help="path to detection model checkpoint",
                        type=str,
                        default="./checkpoint/maskrcnn_signboard_ss.ckpt")
    parser.add_argument("--input_size",
                        help="detector input size",
                        type=int,
                        default=1024)
    parser.add_argument("--score",
                        help="detection score threshold",
                        type=float,
                        default=0.50)
    parser.add_argument("--size",
                        help="resize images to this size",
                        type=int,
                        default=1024)
    parser.add_argument("--video",
                        help="path to input video, or 0 for webcam",
                        type=str,
                        default="./samples/demo.mp4")
    parser.add_argument("--output",
                        help="path to output video",
                        type=str,
                        default="./outputs/demo.mp4")
    parser.add_argument("--output_format",
                        help="codec used in VideoWriter when saving video to file",
                        type=str,
                        default='mp4v')
    # NOTE: argparse's type=bool treats any non-empty string (including
    # "False") as True, so these are proper on/off flags instead
    # (requires Python 3.9+ for BooleanOptionalAction).
    parser.add_argument("--dont_show",
                        help="don't show video output",
                        action=argparse.BooleanOptionalAction,
                        default=True)
    parser.add_argument("--info",
                        help="show detailed info of tracked objects",
                        action=argparse.BooleanOptionalAction,
                        default=True)
    parser.add_argument("--count",
                        help="count objects being tracked on screen",
                        action=argparse.BooleanOptionalAction,
                        default=True)
    return parser.parse_args()
def handle(args):
    # DeepSORT parameters
    max_cosine_distance = 0.4
    nn_budget = None
    nms_max_overlap = 1.0

    # initialize the DeepSORT appearance-feature encoder
    model_filename = 'checkpoint/signboard_2793.pb'
    encoder = gdet.create_box_encoder(model_filename, batch_size=1)
    # cosine-distance metric for appearance matching
    metric = nn_matching.NearestNeighborDistanceMetric("cosine", max_cosine_distance, nn_budget)
    # initialize tracker
    tracker = Tracker(metric)

    # initialize the Mask R-CNN detector
    mrcnn = MRCNN(args.model, args.input_size, args.score)

    # begin video capture; an integer path selects a webcam device
    video_path = args.video
    try:
        vid = cv2.VideoCapture(int(video_path))
    except ValueError:
        vid = cv2.VideoCapture(video_path)

    out = None
    # get video ready to save locally if the output flag is set
    if args.output:
        # VideoCapture returns float properties, so cast to int
        width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(vid.get(cv2.CAP_PROP_FPS))
        codec = cv2.VideoWriter_fourcc(*args.output_format)
        out = cv2.VideoWriter(args.output, codec, fps, (width, height))
    # color map for drawing tracks; computed once, reused every frame
    cmap = plt.get_cmap('tab20b')
    colors = [cmap(i)[:3] for i in np.linspace(0, 1, 20)]

    frame_num = 0
    # main per-frame loop
    while True:
        return_value, frame = vid.read()
        if return_value:
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image = Image.fromarray(image)
        else:
            print('Video has ended or failed, try a different video format!')
            break
        frame_num += 1
        print('Frame #: ', frame_num)
        start_time = time.time()

        # run the Mask R-CNN detector on the RGB frame, using the CLI threshold
        boxes, scores, class_names, class_ids, class_color = mrcnn.detect_result_(image, min_score=args.score)
        count = len(class_names)
        if args.count:
            cv2.putText(frame, "Objects being tracked: {0}".format(count), (5, 35),
                        cv2.FONT_HERSHEY_COMPLEX_SMALL, 2, (0, 255, 0), 2)
            print("Objects being tracked: {0}".format(count))

        # encode Mask R-CNN detections and feed them to the tracker
        features = encoder(frame, boxes)
        detections = [Detection(box, score, class_name, feature)
                      for box, score, class_name, feature in zip(boxes, scores, class_names, features)]
        # run non-maxima suppression
        boxs = np.array([d.tlwh for d in detections])
        scores = np.array([d.confidence for d in detections])
        classes = np.array([d.class_name for d in detections])
        indices = preprocessing.non_max_suppression(boxs, classes, nms_max_overlap, scores)
        detections = [detections[i] for i in indices]

        # advance the tracker with the new detections
        tracker.predict()
        tracker.update(detections)
# update tracks
with open("./outputs/{}.txt".format(frame_num), "a+", encoding="utf-8") as ff:
for track in tracker.tracks:
if not track.is_confirmed() or track.time_since_update > 1:
continue
bbox = track.to_tlbr()
# crop to ids folder
ids_path = "./ids/"+str(track.track_id)
if not os.path.isdir(ids_path):
os.mkdir(ids_path)
crop_ids = frame[int(bbox[1]):int(bbox[3]), int(bbox[0]):int(bbox[2])]
num_ids = 0
while os.path.isfile(os.path.join(ids_path, str(track.track_id) + "_" + str(frame_num) + "_" + str(num_ids)+".png")):
num_ids += 1
final_ids_path = os.path.join(ids_path, str(track.track_id) + "_" + str(frame_num) + "_" + str(num_ids)+".png")
cv2.imwrite(final_ids_path, crop_ids)
            # draw and log each confirmed track
            for track in tracker.tracks:
                if not track.is_confirmed() or track.time_since_update > 1:
                    continue
                bbox = track.to_tlbr()
                class_name = track.get_class()

                # predict OCR (currently stubbed out; see the handle_sts import above)
                crop_ids = frame[max(0, int(bbox[1])):int(bbox[3]), max(0, int(bbox[0])):int(bbox[2])]
                dict_box_sign_out, dict_rec_sign_out = [], []  # handle_sts(crop_ids)

                # draw bbox and label on screen
                color = colors[int(track.track_id) % len(colors)]
                color = [i * 255 for i in color]
                cv2.rectangle(frame, (int(bbox[0]), int(bbox[1])), (int(bbox[2]), int(bbox[3])), color, 2)
                cv2.rectangle(frame, (int(bbox[0]), int(bbox[1] - 30)),
                              (int(bbox[0]) + (len(class_name) + len(str(track.track_id))) * 17, int(bbox[1])),
                              color, -1)
                cv2.putText(frame, class_name + "-" + str(track.track_id),
                            (int(bbox[0]), int(bbox[1] - 10)), 0, 0.75, (255, 255, 255), 2)
                dict_rec_sign_out_join = "_".join(dict_rec_sign_out)
                cv2.putText(frame, dict_rec_sign_out_join, (int(bbox[0]), int(bbox[1] + 20)), 0, 0.75, (255, 255, 255), 2)

                # if the info flag is set, print details about each track
                if args.info:
                    print("Tracker ID: {}, Class: {}, BBox Coords (xmin, ymin, xmax, ymax): {}".format(
                        str(track.track_id), class_name,
                        (int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3]))))
                ff.write("{}, {}, {}, {}, {}, {}\n".format(
                    str(track.track_id), int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3]),
                    dict_rec_sign_out_join))
        # frames per second of the full detect+track step
        fps = 1.0 / (time.time() - start_time)
        print("FPS: %.2f" % fps)

        result = frame
        if not args.dont_show:
            cv2.imshow("Output Video", result)

        # if the output flag is set, save the annotated frame and video
        if args.output:
            cv2.imwrite("./outputs/{0}.jpg".format(frame_num), result)
            out.write(result)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    # release resources
    vid.release()
    if out is not None:
        out.release()
    cv2.destroyAllWindows()
def main():
    args = _parse_args()
    handle(args)


if __name__ == '__main__':
    main()
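# Example invocation (a sketch using the script's own defaults; the script
# name track_signboard.py is assumed, and the checkpoint, video, and output
# paths should match your local layout):
#
#   python track_signboard.py \
#       --model ./checkpoint/maskrcnn_signboard_ss.ckpt \
#       --video ./samples/demo.mp4 \
#       --output ./outputs/demo.mp4 \
#       --no-dont_show   # display the annotated video while tracking
#
# The flag pairs --info/--no-info, --count/--no-count, and
# --dont_show/--no-dont_show are generated by argparse.BooleanOptionalAction
# in _parse_args above.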