| | """ |
| | Annotation Parser Script |
| | |
| | This script is used for annotating video files with anomaly information, such as the number of frames containing anomalies |
| | and the start-end frame couples where anomalies occur. It provides a graphical interface for replaying videos, pausing, |
| | marking frames with anomalies, and undoing markings. |
| | |
| | Usage: |
| | python video_annotator.py --root_dir <root_directory> --video_subdir <video_subdirectory> --annotation_file <output_annotation_file> --anomaly_type <anomaly_class_name> |
| | |
| | Parameters: |
| | --root_dir: Path to the root directory containing the video files. |
| | --video_subdir: Subdirectory within the root directory where video files are located. |
| | --annotation_file: Path to the output annotation file where annotated information will be saved. |
| | --anomaly_type: The class name or type of anomalies to be annotated. |
| | |
| | Note: |
| | Please run this in your local environment and then transfer the annotation file to the server for |
| | further tasks. The server environment may have plugin issues due to the opencv-python. |
| | """ |
| | import cv2 |
| | import sys |
| | import os |
| | import argparse |
| |
|
| |
|
| | def get_args(): |
| | parser = argparse.ArgumentParser(description="Annotation Parser") |
| | |
| | parser.add_argument('--root_dir', type=str, default="H:\\Projects\\VAD\\Test", |
| | help="path to root directory") |
| | parser.add_argument('--video_subdir', type=str, default="Anomaly_videos", |
| | help="path to video subfolder") |
| | parser.add_argument('--annotation_file', type=str, default="H:\\Projects\\VAD\\Test\\Anomaly_videos.txt", |
| | help="the output annotation file") |
| | parser.add_argument('--anomaly_type', type=str, default='Abnormal', help="The class name of videos.") |
| |
|
| | return parser.parse_args() |
| |
|
| |
|
| | def get_files(folder_path): |
| | |
| | video_files = [] |
| |
|
| | |
| | for root, dirs, files in os.walk(folder_path): |
| | for file in files: |
| | if file.lower().endswith(('.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv')): |
| | video_files.append(os.path.join(root, file)) |
| |
|
| | return video_files |
| |
|
| |
|
| | if __name__ == "__main__": |
| | args = get_args() |
| | |
| | video_dir = os.path.join(args.root_dir, args.video_subdir) |
| | files = get_files(video_dir) |
| |
|
| | for filename in files: |
| | if not filename.endswith(".mp4"): |
| | continue |
| |
|
| | cap = cv2.VideoCapture(filename) |
| | num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
| | print(f"{filename}: {num_frames}") |
| |
|
| | if not cap.isOpened(): |
| | print("Error opening video") |
| | sys.exit(1) |
| |
|
| | mem = [] |
| | actions = [] |
| |
|
| | counter = 0 |
| | max_frame = 0 |
| | marked_frame = 0 |
| |
|
| | playing = True |
| | marking = False |
| |
|
| | while cap.isOpened(): |
| | if playing: |
| | if counter == max_frame: |
| | ret, frame = cap.read() |
| | if ret: |
| | if len(mem) == 200: |
| | mem.pop(0) |
| | mem.append(frame) |
| | else: |
| | break |
| | counter += 1 |
| | max_frame += 1 |
| |
|
| | else: |
| | frame = mem[counter - max_frame - 1] |
| | counter += 1 |
| | else: |
| | frame = mem[counter - max_frame - 1] |
| | cv2.putText(frame, f"{len(mem) + counter - max_frame}", (50, 50), fontFace=cv2.FONT_HERSHEY_SIMPLEX, |
| | fontScale=1, color=(255, 255, 255)) |
| |
|
| | if marking: |
| | cv2.circle(frame, (int(frame.shape[1]/2), 50), 10, (0, 0, 255), -1) |
| |
|
| | cv2.imshow("Replay", frame) |
| | key = cv2.waitKey(60) |
| | if key & 0xFF == ord(' '): |
| | playing = not playing |
| | elif key & 0xFF == ord('a') and not playing: |
| | if (max_frame - counter) < len(mem) - 11: |
| | counter -= 10 |
| | elif key & 0xFF == ord('d') and not playing: |
| | if (max_frame - counter) > 10: |
| | counter += 10 |
| | elif key & 0xFF == ord(',') and not playing: |
| | if (max_frame - counter) < len(mem) - 1: |
| | counter -= 1 |
| | elif key & 0xFF == ord('.') and not playing: |
| | if (max_frame - counter) > 0: |
| | counter += 1 |
| | elif key & 0xFF == ord('m'): |
| | if marking: |
| | actions.append((marked_frame, counter)) |
| | else: |
| | marked_frame = counter |
| | marking = not marking |
| | print(f'marking {counter}, waiting for the end frame: {marking}') |
| | elif key & 0xFF == ord('z'): |
| | print(f'cancel last marking') |
| | if not marking: |
| | actions.pop(-1) |
| | marking = not marking |
| | elif key & 0xFF == ord('s'): |
| | break |
| | elif key & 0xFF == ord('q'): |
| | sys.exit(1) |
| |
|
| | |
| | |
| | num_action = len(actions) |
| | dir_and_file = os.path.split(filename) |
| | relative_path = os.path.relpath(filename, args.root_dir) |
| | line = f"{relative_path} {args.anomaly_type} {num_frames}" |
| | for i in range(num_action): |
| | line += f" {actions[i][0]} {actions[i][1]}" |
| | line += '\n' |
| |
|
| | with open(args.annotation_file, "a") as f: |
| | f.write(line) |
| |
|
| | cap.release() |
| | cv2.destroyWindow("Replay") |
| |
|
| | cv2.destroyAllWindows() |
| |
|