File size: 6,787 Bytes
8de8135
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# Ultralytics YOLO 🚀, AGPL-3.0 license

from collections import defaultdict

import cv2

from ultralytics.utils.checks import check_imshow, check_requirements
from ultralytics.utils.plotting import Annotator, colors

check_requirements("shapely>=2.0.0")

from shapely.geometry import Point, Polygon


class QueueManager:
    """
    A class to manage the queue in a real-time video stream based on object tracks.

    For every frame passed to `process_queue`, the manager labels each tracked box, keeps a short
    centroid history per track ID, counts the tracks whose latest centroid lies inside the
    counting polygon, and draws the count and the region outline through an `Annotator`.
    After a frame has been processed, `self.counts` holds the queue size of that frame.
    """

    # Fallback rectangle used when no region is supplied or the supplied one has fewer than 3 points.
    _DEFAULT_REG_PTS = ((20, 60), (20, 680), (1120, 680), (1120, 60))

    # Maximum number of centroids remembered per track.
    _MAX_TRACK_LEN = 30

    def __init__(
        self,
        classes_names,
        reg_pts=None,
        line_thickness=2,
        track_thickness=2,
        view_img=False,
        region_color=(255, 0, 255),
        view_queue_counts=True,
        draw_tracks=False,
        count_txt_color=(255, 255, 255),
        track_color=None,
        region_thickness=5,
        fontsize=0.7,
    ):
        """
        Initializes the QueueManager with specified parameters for tracking and counting objects.

        Args:
            classes_names (dict): A dictionary mapping class IDs to class names.
            reg_pts (list of tuples, optional): Points defining the counting region polygon. Defaults to a predefined
                rectangle.
            line_thickness (int, optional): Thickness of the annotation lines. Defaults to 2.
            track_thickness (int, optional): Thickness of the track lines. Defaults to 2.
            view_img (bool, optional): Whether to display the image frames. Defaults to False.
            region_color (tuple, optional): Color of the counting region lines (BGR). Defaults to (255, 0, 255).
            view_queue_counts (bool, optional): Whether to display the queue counts. Defaults to True.
            draw_tracks (bool, optional): Whether to draw tracks of the objects. Defaults to False.
            count_txt_color (tuple, optional): Color of the count text (BGR). Defaults to (255, 255, 255).
            track_color (tuple, optional): Color of the tracks. If None, different colors will be used for different
                tracks. Defaults to None.
            region_thickness (int, optional): Thickness of the counting region lines. Defaults to 5.
            fontsize (float, optional): Font size for the text annotations. Defaults to 0.7.
        """

        # Mouse events state (stored for callers; not read by any method of this class)
        self.is_drawing = False
        self.selected_point = None

        # Region & Line Information
        self.reg_pts = reg_pts if reg_pts is not None else list(self._DEFAULT_REG_PTS)
        # A polygon needs at least 3 vertices; otherwise fall back to the default rectangle.
        # Note that counting itself is skipped while `reg_pts` has fewer than 3 points.
        self.counting_region = (
            Polygon(self.reg_pts) if len(self.reg_pts) >= 3 else Polygon(list(self._DEFAULT_REG_PTS))
        )
        self.region_color = region_color
        self.region_thickness = region_thickness

        # Image and annotation Information
        self.im0 = None
        self.tf = line_thickness
        self.view_img = view_img
        self.view_queue_counts = view_queue_counts
        self.fontsize = fontsize

        self.names = classes_names  # Class names
        self.annotator = None  # Annotator, recreated for every frame
        self.window_name = "Ultralytics YOLOv8 Queue Manager"

        # Object counting Information
        self.counts = 0  # Queue size of the most recently processed frame
        self.count_txt_color = count_txt_color

        # Tracks info
        self.track_history = defaultdict(list)  # track_id -> list of recent (x, y) centroids
        self.track_thickness = track_thickness
        self.draw_tracks = draw_tracks
        self.track_color = track_color

        # Check if environment supports imshow
        self.env_check = check_imshow(warn=True)

    def extract_and_process_tracks(self, tracks):
        """
        Extracts and processes tracks for queue management in a video stream.

        Annotates `self.im0` through a fresh `Annotator` and stores the number of tracked objects
        inside the counting region in `self.counts`. This method never opens a window; displaying
        is left to `process_queue` / `display_frames`.

        Args:
            tracks (list): Tracking results; only `tracks[0].boxes` (`id`, `xyxy`, `cls`) is read.
        """

        self.annotator = Annotator(self.im0, self.tf, self.names)

        # Start every frame from zero so that `self.counts` reflects this frame once we return.
        self.counts = 0

        if tracks[0].boxes.id is not None:
            boxes = tracks[0].boxes.xyxy.cpu()
            clss = tracks[0].boxes.cls.cpu().tolist()
            track_ids = tracks[0].boxes.id.int().cpu().tolist()

            # Counting is only performed for a real polygon (>= 3 vertices); loop-invariant.
            can_count = len(self.reg_pts) >= 3

            # Extract tracks
            for box, track_id, cls in zip(boxes, track_ids, clss):
                # Draw bounding box; `cls` arrives as a float from `tolist()`, so cast before the lookup
                self.annotator.box_label(
                    box, label=f"{self.names[int(cls)]}#{track_id}", color=colors(int(track_id), True)
                )

                # Update track history with the box centre, keeping only the most recent points
                track_line = self.track_history[track_id]
                track_line.append((float((box[0] + box[2]) / 2), float((box[1] + box[3]) / 2)))
                if len(track_line) > self._MAX_TRACK_LEN:
                    track_line.pop(0)

                # Draw track trails if enabled
                if self.draw_tracks:
                    self.annotator.draw_centroid_and_tracks(
                        track_line,
                        color=self.track_color or colors(int(track_id), True),
                        track_thickness=self.track_thickness,
                    )

                # A track is counted only once it has a previous position, i.e. from its second
                # observation onwards, and only while its latest centroid is inside the region.
                has_previous = len(track_line) > 1
                if can_count and has_previous and self.counting_region.contains(Point(track_line[-1])):
                    self.counts += 1

        # Display queue counts (honours the `view_queue_counts` switch)
        if self.view_queue_counts:
            self.annotator.queue_counts_display(
                f"Queue Counts : {str(self.counts)}",
                points=self.reg_pts,
                region_color=self.region_color,
                txt_color=self.count_txt_color,
            )

        # Draw the region outline here so the annotations do not depend on whether a window is shown.
        self.annotator.draw_region(reg_pts=self.reg_pts, thickness=self.region_thickness, color=self.region_color)

    def display_frames(self):
        """
        Displays the current frame with annotations.

        Does nothing when the environment does not support `imshow` or when no frame has been
        stored yet.
        """
        if not self.env_check or self.im0 is None:
            return
        cv2.namedWindow(self.window_name)
        cv2.imshow(self.window_name, self.im0)
        cv2.waitKey(1)  # Give the GUI a chance to process events and refresh the window

    def process_queue(self, im0, tracks):
        """
        Main function to start the queue management process.

        Args:
            im0 (ndarray): Current frame from the video stream.
            tracks (list): List of tracks obtained from the object tracking process.

        Returns:
            The frame stored in `self.im0` after processing (presumably annotated in place by the
            `Annotator`; confirm against the plotting utilities).
        """
        self.im0 = im0  # Store the current frame
        self.extract_and_process_tracks(tracks)  # Extract and process tracks

        if self.view_img:
            self.display_frames()  # Display the frame exactly once, and only if enabled
        return self.im0


if __name__ == "__main__":
    # Minimal smoke run: build a manager for two sample classes keyed by their class IDs.
    classes_names = dict(enumerate(("person", "car")))  # example class names
    queue_manager = QueueManager(classes_names)