# Mikel Broström 🔥 Yolo Tracking 🧾 AGPL-3.0 license import argparse import subprocess from pathlib import Path import numpy as np from tqdm import tqdm import configparser import shutil import json import queue import select import re import os import torch from functools import partial import threading import sys import copy import concurrent.futures from boxmot import TRACKERS from boxmot.tracker_zoo import create_tracker from boxmot.utils import ROOT, WEIGHTS, TRACKER_CONFIGS, logger as LOGGER, EXAMPLES, DATA from boxmot.utils.checks import RequirementsChecker from boxmot.utils.torch_utils import select_device from boxmot.utils.misc import increment_path from boxmot.postprocessing.gsi import gsi from ultralytics import YOLO from ultralytics.data.loaders import LoadImagesAndVideos from tracking.detectors import (get_yolo_inferer, default_imgsz, is_ultralytics_model, is_yolox_model) from tracking.utils import convert_to_mot_format, write_mot_results, download_mot_eval_tools, download_mot_dataset, unzip_mot_dataset, eval_setup, split_dataset from boxmot.appearance.reid.auto_backend import ReidAutoBackend checker = RequirementsChecker() checker.check_packages(('ultralytics @ git+https://github.com/mikel-brostrom/ultralytics.git', )) # install def cleanup_mot17(data_dir, keep_detection='FRCNN'): """ Cleans up the MOT17 dataset to resemble the MOT16 format by keeping only one detection folder per sequence. Skips sequences that have already been cleaned. Args: - data_dir (str): Path to the MOT17 train directory. - keep_detection (str): Detection type to keep (options: 'DPM', 'FRCNN', 'SDP'). Default is 'DPM'. """ # Get all folders in the train directory all_dirs = [d for d in os.listdir(data_dir) if os.path.isdir(os.path.join(data_dir, d))] # Identify unique sequences by removing detection suffixes unique_sequences = set(seq.split('-')[0] + '-' + seq.split('-')[1] for seq in all_dirs) for seq in unique_sequences: # Directory path to the cleaned sequence cleaned_seq_dir = os.path.join(data_dir, seq) # Skip if the sequence is already cleaned if os.path.exists(cleaned_seq_dir): print(f"Sequence {seq} is already cleaned. Skipping.") continue # Directories for each detection method seq_dirs = [os.path.join(data_dir, d) for d in all_dirs if d.startswith(seq)] # Directory path for the detection folder to keep keep_dir = os.path.join(data_dir, f"{seq}-{keep_detection}") if os.path.exists(keep_dir): # Move the directory to a new name (removing the detection suffix) shutil.move(keep_dir, cleaned_seq_dir) print(f"Moved {keep_dir} to {cleaned_seq_dir}") # Remove other detection directories for seq_dir in seq_dirs: if os.path.exists(seq_dir) and seq_dir != keep_dir: shutil.rmtree(seq_dir) print(f"Removed {seq_dir}") else: print(f"Directory for {seq} with {keep_detection} detection does not exist. Skipping.") print("MOT17 Cleanup completed!") def prompt_overwrite(path_type: str, path: str, ci: bool = True) -> bool: """ Prompts the user to confirm overwriting an existing file. Args: path_type (str): Type of the path (e.g., 'Detections and Embeddings', 'MOT Result'). path (str): The path to check. ci (bool): If True, automatically reuse existing file without prompting (for CI environments). Returns: bool: True if user confirms to overwrite, False otherwise. """ if ci: LOGGER.debug(f"{path_type} {path} already exists. Use existing due to no UI mode.") return False def input_with_timeout(prompt, timeout=3.0): print(prompt, end='', flush=True) result = [] input_received = threading.Event() def get_input(): user_input = sys.stdin.readline().strip().lower() result.append(user_input) input_received.set() input_thread = threading.Thread(target=get_input) input_thread.daemon = True # Ensure thread does not prevent program exit input_thread.start() input_thread.join(timeout) if input_received.is_set(): return result[0] in ['y', 'yes'] else: print("\nNo response, not proceeding with overwrite...") return False return input_with_timeout(f"{path_type} {path} already exists. Overwrite? [y/N]: ") def generate_dets_embs(args: argparse.Namespace, y: Path, source: Path) -> None: """ Generates detections and embeddings for the specified arguments, YOLO model and source. Args: args (Namespace): Parsed command line arguments. y (Path): Path to the YOLO model file. source (Path): Path to the source directory. """ WEIGHTS.mkdir(parents=True, exist_ok=True) if args.imgsz is None: args.imgsz = default_imgsz(y) yolo = YOLO( y if is_ultralytics_model(y) else 'yolov8n.pt', ) results = yolo( source=source, conf=args.conf, iou=args.iou, agnostic_nms=args.agnostic_nms, stream=True, device=args.device, verbose=False, exist_ok=args.exist_ok, project=args.project, name=args.name, classes=args.classes, imgsz=args.imgsz, vid_stride=args.vid_stride, ) if not is_ultralytics_model(y): m = get_yolo_inferer(y) yolo_model = m(model=y, device=yolo.predictor.device, args=yolo.predictor.args) yolo.predictor.model = yolo_model # If current model is YOLOX, change the preprocess and postprocess if is_yolox_model(y): # add callback to save image paths for further processing yolo.add_callback("on_predict_batch_start", lambda p: yolo_model.update_im_paths(p)) yolo.predictor.preprocess = ( lambda im: yolo_model.preprocess(im=im)) yolo.predictor.postprocess = ( lambda preds, im, im0s: yolo_model.postprocess(preds=preds, im=im, im0s=im0s)) reids = [] for r in args.reid_model: reid_model = ReidAutoBackend(weights=args.reid_model, device=yolo.predictor.device, half=args.half).model reids.append(reid_model) embs_path = args.project / 'dets_n_embs' / y.stem / 'embs' / r.stem / (source.parent.name + '.txt') embs_path.parent.mkdir(parents=True, exist_ok=True) embs_path.touch(exist_ok=True) if os.path.getsize(embs_path) > 0: open(embs_path, 'w').close() yolo.predictor.custom_args = args dets_path = args.project / 'dets_n_embs' / y.stem / 'dets' / (source.parent.name + '.txt') dets_path.parent.mkdir(parents=True, exist_ok=True) dets_path.touch(exist_ok=True) if os.path.getsize(dets_path) > 0: open(dets_path, 'w').close() with open(str(dets_path), 'ab+') as f: np.savetxt(f, [], fmt='%f', header=str(source)) for frame_idx, r in enumerate(tqdm(results, desc="Frames")): nr_dets = len(r.boxes) frame_idx = torch.full((1, 1), frame_idx + 1).repeat(nr_dets, 1) img = r.orig_img dets = np.concatenate( [ frame_idx, r.boxes.xyxy.to('cpu'), r.boxes.conf.unsqueeze(1).to('cpu'), r.boxes.cls.unsqueeze(1).to('cpu'), ], axis=1 ) # Filter dets with incorrect boxes: (x2 < x1 or y2 < y1) boxes = r.boxes.xyxy.to('cpu').numpy().round().astype(int) boxes_filter = ((np.maximum(0, boxes[:, 0]) < np.minimum(boxes[:, 2], img.shape[1])) & (np.maximum(0, boxes[:, 1]) < np.minimum(boxes[:, 3], img.shape[0]))) dets = dets[boxes_filter] with open(str(dets_path), 'ab+') as f: np.savetxt(f, dets, fmt='%f') for reid, reid_model_name in zip(reids, args.reid_model): embs = reid.get_features(dets[:, 1:5], img) embs_path = args.project / "dets_n_embs" / y.stem / 'embs' / reid_model_name.stem / (source.parent.name + '.txt') with open(str(embs_path), 'ab+') as f: np.savetxt(f, embs, fmt='%f') def generate_mot_results(args: argparse.Namespace, config_dict: dict = None) -> dict[str, np.ndarray]: """ Generates MOT results for the specified arguments and configuration. Args: args (Namespace): Parsed command line arguments. config_dict (dict, optional): Additional configuration dictionary. Returns: dict[str, np.ndarray]: {seq_name: array} with frame ids used for MOT """ args.device = select_device(args.device) tracker = create_tracker( args.tracking_method, TRACKER_CONFIGS / (args.tracking_method + '.yaml'), args.reid_model[0].with_suffix('.pt'), args.device, False, False, config_dict ) with open(args.dets_file_path, 'r') as file: source = Path(file.readline().strip().replace("# ", "")) dets = np.loadtxt(args.dets_file_path, skiprows=1) embs = np.loadtxt(args.embs_file_path) dets_n_embs = np.concatenate([dets, embs], axis=1) dataset = LoadImagesAndVideos(source) txt_path = args.exp_folder_path / (source.parent.name + '.txt') all_mot_results = [] # Change FPS if args.fps: # Extract original FPS conf_path = source.parent / 'seqinfo.ini' conf = configparser.ConfigParser() conf.read(conf_path) orig_fps = int(conf.get("Sequence", "frameRate")) if orig_fps < args.fps: LOGGER.warning(f"Original FPS ({orig_fps}) is lower than " f"requested FPS ({args.fps}) for sequence " f"{source.parent.name}. Using original FPS.") target_fps = orig_fps else: target_fps = args.fps step = orig_fps/target_fps else: step = 1 # Create list with frame numbers according to needed step frame_nums = np.arange(1, len(dataset) + 1, step).astype(int).tolist() seq_frame_nums = {source.parent.name: frame_nums.copy()} for frame_num, d in enumerate(tqdm(dataset, desc=source.parent.name), 1): # Filter using list with needed numbers if len(frame_nums) > 0: if frame_num < frame_nums[0]: continue else: frame_nums.pop(0) im = d[1][0] frame_dets_n_embs = dets_n_embs[dets_n_embs[:, 0] == frame_num] dets = frame_dets_n_embs[:, 1:7] embs = frame_dets_n_embs[:, 7:] tracks = tracker.update(dets, im, embs) if tracks.size > 0: mot_results = convert_to_mot_format(tracks, frame_num) all_mot_results.append(mot_results) if all_mot_results: all_mot_results = np.vstack(all_mot_results) else: all_mot_results = np.empty((0, 0)) write_mot_results(txt_path, all_mot_results) return seq_frame_nums def parse_mot_results(results: str) -> dict: """ Extracts the COMBINED HOTA, MOTA, IDF1 from the results generated by the run_mot_challenge.py script. Args: results (str): MOT results as a string. Returns: dict: A dictionary containing HOTA, MOTA, and IDF1 scores. """ combined_results = results.split('COMBINED')[2:-1] combined_results = [float(re.findall(r"[-+]?(?:\d*\.*\d+)", f)[0]) for f in combined_results] results_dict = {} for key, value in zip(["HOTA", "MOTA", "IDF1"], combined_results): results_dict[key] = value return results_dict def trackeval(args: argparse.Namespace, seq_paths: list, save_dir: Path, MOT_results_folder: Path, gt_folder: Path, metrics: list = ["HOTA", "CLEAR", "Identity"]) -> str: """ Executes a Python script to evaluate MOT challenge tracking results using specified metrics. Args: seq_paths (list): List of sequence paths. save_dir (Path): Directory to save evaluation results. MOT_results_folder (Path): Folder containing MOT results. gt_folder (Path): Folder containing ground truth data. metrics (list, optional): List of metrics to use for evaluation. Defaults to ["HOTA", "CLEAR", "Identity"]. Returns: str: Standard output from the evaluation script. """ d = [seq_path.parent.name for seq_path in seq_paths] args = [ sys.executable, EXAMPLES / 'val_utils' / 'scripts' / 'run_mot_challenge.py', "--GT_FOLDER", str(gt_folder), "--BENCHMARK", "", "--TRACKERS_FOLDER", args.exp_folder_path, "--TRACKERS_TO_EVAL", "", "--SPLIT_TO_EVAL", "train", "--METRICS", *metrics, "--USE_PARALLEL", "True", "--TRACKER_SUB_FOLDER", "", "--NUM_PARALLEL_CORES", str(4), "--SKIP_SPLIT_FOL", "True", "--GT_LOC_FORMAT", "{gt_folder}/{seq}/gt/gt_temp.txt", "--SEQ_INFO", *d ] p = subprocess.Popen( args=args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) stdout, stderr = p.communicate() if stderr: print("Standard Error:\n", stderr) return stdout def run_generate_dets_embs(opt: argparse.Namespace) -> None: """ Runs the generate_dets_embs function for all YOLO models and source directories. Args: opt (Namespace): Parsed command line arguments. """ mot_folder_paths = sorted([item for item in Path(opt.source).iterdir()]) for y in opt.yolo_model: for i, mot_folder_path in enumerate(mot_folder_paths): dets_path = Path(opt.project) / 'dets_n_embs' / y.stem / 'dets' / (mot_folder_path.name + '.txt') embs_path = Path(opt.project) / 'dets_n_embs' / y.stem / 'embs' / (opt.reid_model[0].stem) / (mot_folder_path.name + '.txt') if dets_path.exists() and embs_path.exists(): if prompt_overwrite('Detections and Embeddings', dets_path, opt.ci): LOGGER.debug(f'Overwriting detections and embeddings for {mot_folder_path}...') else: LOGGER.debug(f'Skipping generation for {mot_folder_path} as they already exist.') continue LOGGER.debug(f'Generating detections and embeddings for data under {mot_folder_path} [{i + 1}/{len(mot_folder_paths)} seqs]') generate_dets_embs(opt, y, source=mot_folder_path / 'img1') def process_single_mot(opt: argparse.Namespace, d: Path, e: Path, evolve_config: dict): # Create a deep copy of opt so each task works independently new_opt = copy.deepcopy(opt) new_opt.dets_file_path = d new_opt.embs_file_path = e frames_dict = generate_mot_results(new_opt, evolve_config) return frames_dict def run_generate_mot_results(opt: argparse.Namespace, evolve_config: dict = None) -> None: """ Runs the generate_mot_results function for all YOLO models and detection/embedding files in parallel. """ for y in opt.yolo_model: exp_folder_path = opt.project / 'mot' / (f"{y.stem}_{opt.reid_model[0].stem}_{opt.tracking_method}") exp_folder_path = increment_path(path=exp_folder_path, sep="_", exist_ok=False) opt.exp_folder_path = exp_folder_path mot_folder_names = [item.stem for item in Path(opt.source).iterdir()] dets_folder = opt.project / "dets_n_embs" / y.stem / 'dets' embs_folder = opt.project / "dets_n_embs" / y.stem / 'embs' / opt.reid_model[0].stem dets_file_paths = sorted([ item for item in dets_folder.glob('*.txt') if not item.name.startswith('.') and item.stem in mot_folder_names ]) embs_file_paths = sorted([ item for item in embs_folder.glob('*.txt') if not item.name.startswith('.') and item.stem in mot_folder_names ]) LOGGER.info(f"\nStarting tracking on:\n\t{opt.source}\nwith preloaded dets\n\t({dets_folder.relative_to(ROOT)})\nand embs\n\t({embs_folder.relative_to(ROOT)})\nusing\n\t{opt.tracking_method}") tasks = [] # Create a thread pool to run each file pair in parallel with concurrent.futures.ThreadPoolExecutor() as executor: for d, e in zip(dets_file_paths, embs_file_paths): mot_result_path = exp_folder_path / (d.stem + '.txt') if mot_result_path.exists(): if prompt_overwrite('MOT Result', mot_result_path, opt.ci): LOGGER.info(f'Overwriting MOT result for {d.stem}...') else: LOGGER.info(f'Skipping MOT result generation for {d.stem} as it already exists.') continue # Submit the task to process this file pair in parallel tasks.append(executor.submit(process_single_mot, opt, d, e, evolve_config)) # Dict with {seq_name: [frame_nums]} seqs_frame_nums = {} # Wait for all tasks to complete and log any exceptions for future in concurrent.futures.as_completed(tasks): try: seqs_frame_nums.update(future.result()) except Exception as exc: LOGGER.error(f'Error processing file pair: {exc}') # Postprocess data with gsi if requested if opt.gsi: gsi(mot_results_folder=opt.exp_folder_path) with open(opt.exp_folder_path / 'seqs_frame_nums.json', 'w') as f: json.dump(seqs_frame_nums, f) def run_trackeval(opt: argparse.Namespace) -> dict: """ Runs the trackeval function to evaluate tracking results. Args: opt (Namespace): Parsed command line arguments. """ seq_paths, save_dir, MOT_results_folder, gt_folder = eval_setup(opt, opt.val_tools_path) trackeval_results = trackeval(opt, seq_paths, save_dir, MOT_results_folder, gt_folder) hota_mota_idf1 = parse_mot_results(trackeval_results) if opt.verbose: LOGGER.info(trackeval_results) with open(opt.tracking_method + "_output.json", "w") as outfile: outfile.write(json.dumps(hota_mota_idf1)) LOGGER.info(json.dumps(hota_mota_idf1)) return hota_mota_idf1 def run_all(opt: argparse.Namespace) -> None: """ Runs all stages of the pipeline: generate_dets_embs, generate_mot_results, and trackeval. Args: opt (Namespace): Parsed command line arguments. """ run_generate_dets_embs(opt) run_generate_mot_results(opt) run_trackeval(opt) def parse_opt() -> argparse.Namespace: parser = argparse.ArgumentParser() # Global arguments parser.add_argument('--yolo-model', nargs='+', type=Path, default=[WEIGHTS / 'yolov8n.pt'], help='yolo model path') parser.add_argument('--reid-model', nargs='+', type=Path, default=[WEIGHTS / 'osnet_x0_25_msmt17.pt'], help='reid model path') parser.add_argument('--source', type=str, help='file/dir/URL/glob, 0 for webcam') parser.add_argument('--imgsz', '--img', '--img-size', nargs='+', type=int, default=None, help='inference size h,w') parser.add_argument('--fps', type=int, default=None, help='video frame-rate') parser.add_argument('--conf', type=float, default=0.01, help='min confidence threshold') parser.add_argument('--iou', type=float, default=0.7, help='intersection over union (IoU) threshold for NMS') parser.add_argument('--device', default='', help='cuda device, i.e. 0 or 0,1,2,3 or cpu') parser.add_argument('--classes', nargs='+', type=int, default=0, help='filter by class: --classes 0, or --classes 0 2 3') parser.add_argument('--project', default=ROOT / 'runs', type=Path, help='save results to project/name') parser.add_argument('--name', default='', help='save results to project/name') parser.add_argument('--exist-ok', action='store_true', default=True, help='existing project/name ok, do not increment') parser.add_argument('--half', action='store_true', help='use FP16 half-precision inference') parser.add_argument('--vid-stride', type=int, default=1, help='video frame-rate stride') parser.add_argument('--ci', action='store_true', help='Automatically reuse existing due to no UI in CI') parser.add_argument('--tracking-method', type=str, default='deepocsort', help='deepocsort, botsort, strongsort, ocsort, bytetrack, imprassoc, boosttrack') parser.add_argument('--dets-file-path', type=Path, help='path to detections file') parser.add_argument('--embs-file-path', type=Path, help='path to embeddings file') parser.add_argument('--exp-folder-path', type=Path, help='path to experiment folder') parser.add_argument('--verbose', action='store_true', help='print results') parser.add_argument('--agnostic-nms', default=False, action='store_true', help='class-agnostic NMS') parser.add_argument('--gsi', action='store_true', help='apply Gaussian smooth interpolation postprocessing') parser.add_argument('--n-trials', type=int, default=4, help='nr of trials for evolution') parser.add_argument('--objectives', type=str, nargs='+', default=["HOTA", "MOTA", "IDF1"], help='set of objective metrics: HOTA,MOTA,IDF1') parser.add_argument('--val-tools-path', type=Path, default=EXAMPLES / 'val_utils', help='path to store trackeval repo in') parser.add_argument('--split-dataset', action='store_true', help='Use the second half of the dataset') subparsers = parser.add_subparsers(dest='command') # Subparser for generate_dets_embs generate_dets_embs_parser = subparsers.add_parser('generate_dets_embs', help='Generate detections and embeddings') generate_dets_embs_parser.add_argument('--source', type=str, required=True, help='file/dir/URL/glob, 0 for webcam') generate_dets_embs_parser.add_argument('--yolo-model', nargs='+', type=Path, default=WEIGHTS / 'yolov8n.pt', help='yolo model path') generate_dets_embs_parser.add_argument('--reid-model', nargs='+', type=Path, default=WEIGHTS / 'osnet_x0_25_msmt17.pt', help='reid model path') generate_dets_embs_parser.add_argument('--imgsz', '--img', '--img-size', nargs='+', type=int, default=[640], help='inference size h,w') generate_dets_embs_parser.add_argument('--classes', nargs='+', type=int, default=0, help='filter by class: --classes 0, or --classes 0 2 3') # Subparser for generate_mot_results generate_mot_results_parser = subparsers.add_parser('generate_mot_results', help='Generate MOT results') generate_mot_results_parser.add_argument('--yolo-model', nargs='+', type=Path, default=WEIGHTS / 'yolov8n.pt', help='yolo model path') generate_mot_results_parser.add_argument('--reid-model', nargs='+', type=Path, default=WEIGHTS / 'osnet_x0_25_msmt17.pt', help='reid model path') generate_mot_results_parser.add_argument('--tracking-method', type=str, default='deepocsort', help='deepocsort, botsort, strongsort, ocsort, bytetrack, imprassoc, boosttrack') generate_mot_results_parser.add_argument('--imgsz', '--img', '--img-size', nargs='+', type=int, default=[640], help='inference size h,w') # Subparser for trackeval trackeval_parser = subparsers.add_parser('trackeval', help='Evaluate tracking results') trackeval_parser.add_argument('--source', type=str, required=True, help='file/dir/URL/glob, 0 for webcam') trackeval_parser.add_argument('--exp-folder-path', type=Path, required=True, help='path to experiment folder') opt = parser.parse_args() source_path = Path(opt.source) opt.benchmark, opt.split = source_path.parent.name, source_path.name return opt if __name__ == "__main__": opt = parse_opt() # download MOT benchmark download_mot_eval_tools(opt.val_tools_path) if not Path(opt.source).exists(): zip_path = download_mot_dataset(opt.val_tools_path, opt.benchmark) unzip_mot_dataset(zip_path, opt.val_tools_path, opt.benchmark) if opt.benchmark == 'MOT17': cleanup_mot17(opt.source) if opt.split_dataset: opt.source, opt.benchmark = split_dataset(opt.source) if opt.command == 'generate_dets_embs': run_generate_dets_embs(opt) elif opt.command == 'generate_mot_results': run_generate_mot_results(opt) elif opt.command == 'trackeval': run_trackeval(opt) else: run_all(opt)