"""Gradio app that counts people crossing a horizontal line in a video.

Each frame is passed through a YOLOv8 tracker; a person is counted once when
the bottom-centre of their bounding box moves from above the counting line to
on/below it.
"""

import os
import shutil
import tempfile
from collections import defaultdict

import cv2
import gradio as gr
import numpy as np
from ultralytics import YOLO

# Frame rate used for the output file when the input does not report one.
DEFAULT_FPS = 30.0


class PersonCounter:
    """Track people across frames and count top-to-bottom line crossings.

    Attributes:
        model: YOLO model used for detection and tracking.
        tracker: Maps track id -> one-element list holding the last feet y.
        crossed_ids: Track ids that have already been counted.
        line_position: Counting-line height as a fraction of frame height.
        count: Number of crossings counted so far.
    """

    def __init__(self, line_position=0.5):
        self.model = YOLO("yolov8n.pt")
        self.tracker = defaultdict(list)
        self.crossed_ids = set()
        self.line_position = line_position
        self.count = 0

    def process_frame(self, frame):
        """Annotate *frame* in place, update the count, and return the frame.

        Args:
            frame: Image array as returned by ``cv2.VideoCapture.read``.

        Returns:
            The same frame object with the counting line, bounding boxes,
            tracking points and the running count drawn on it.
        """
        height, width = frame.shape[:2]
        line_y = int(height * self.line_position)

        # Draw counting line
        cv2.line(frame, (0, line_y), (width, line_y), (0, 255, 0), 2)

        # persist=True keeps the tracker state between consecutive calls;
        # classes=[0] restricts detection to class 0 (person in the COCO
        # label set used by the stock YOLOv8 weights).
        results = self.model.track(frame, persist=True, classes=[0])
        boxes_result = results[0].boxes

        # boxes.id is None when the tracker has assigned no ids this frame.
        if boxes_result.id is not None:
            boxes = boxes_result.xyxy.cpu().numpy()
            track_ids = boxes_result.id.cpu().numpy().astype(int)

            for box, track_id in zip(boxes, track_ids):
                # Draw bounding box
                cv2.rectangle(
                    frame,
                    (int(box[0]), int(box[1])),
                    (int(box[2]), int(box[3])),
                    (255, 0, 0),
                    2,
                )

                # Feet position: bottom-centre of the bounding box.
                center_x = (box[0] + box[2]) / 2
                feet_y = box[3]

                # Draw tracking point
                cv2.circle(
                    frame, (int(center_x), int(feet_y)), 5, (0, 255, 255), -1
                )

                if track_id in self.tracker:
                    prev_y = self.tracker[track_id][-1]

                    # Count only a downward crossing, and only once per id.
                    if (
                        prev_y < line_y
                        and feet_y >= line_y
                        and track_id not in self.crossed_ids
                    ):
                        self.crossed_ids.add(track_id)
                        self.count += 1

                        # Draw crossing indicator
                        cv2.circle(
                            frame,
                            (int(center_x), int(line_y)),
                            8,
                            (0, 0, 255),
                            -1,
                        )

                # Only the most recent position is needed for the next frame.
                self.tracker[track_id] = [feet_y]

        self._draw_count(frame)
        return frame

    def _draw_count(self, frame):
        """Draw the running count on a black background in the top-left."""
        count_text = f"Count: {self.count}"
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 1.5
        thickness = 3
        (text_width, text_height), _ = cv2.getTextSize(
            count_text, font, font_scale, thickness
        )

        cv2.rectangle(
            frame,
            (10, 10),
            (20 + text_width, 20 + text_height),
            (0, 0, 0),
            -1,
        )
        cv2.putText(
            frame,
            count_text,
            (15, 15 + text_height),
            font,
            font_scale,
            (0, 255, 0),
            thickness,
        )


def _read_frames(cap):
    """Yield frames from *cap* until a read fails (end of stream)."""
    while True:
        ok, frame = cap.read()
        if not ok:
            return
        yield frame


def process_video(video_path, progress=gr.Progress()):
    """Run the people counter over a video and write an annotated copy.

    Args:
        video_path: Path of the uploaded video file.
        progress: Gradio progress tracker used to report per-frame progress.

    Returns:
        A ``(output_path, summary)`` tuple: the path of the annotated video
        and a human-readable final count.

    Raises:
        ValueError: If no video was supplied, the video cannot be opened, or
            the output file cannot be created.
    """
    if not video_path:
        raise ValueError("No video file provided")

    cap = cv2.VideoCapture(video_path)
    writer = None
    temp_dir = None
    completed = False
    try:
        if not cap.isOpened():
            raise ValueError("Could not open video file")

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Keep fractional rates (e.g. 29.97) and fall back when the container
        # reports no usable rate; `not fps > 0` also rejects NaN.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = DEFAULT_FPS

        # The reported frame count is only a hint for the progress bar; the
        # loop below reads until the stream is actually exhausted.
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        total = total_frames if total_frames > 0 else None

        # Created only after the input is validated so a bad upload does not
        # leave an empty temp directory behind.
        temp_dir = tempfile.mkdtemp()
        output_path = os.path.join(temp_dir, "result.mp4")

        writer = cv2.VideoWriter(
            output_path,
            cv2.VideoWriter_fourcc(*'mp4v'),
            fps,
            (width, height),
        )
        if not writer.isOpened():
            raise ValueError("Could not create output video file")

        counter = PersonCounter(line_position=0.5)

        for frame in progress.tqdm(_read_frames(cap), total=total):
            writer.write(counter.process_frame(frame))

        completed = True
    finally:
        cap.release()
        if writer is not None:
            writer.release()
        # On failure nothing will consume the partial output, so remove it.
        if temp_dir is not None and not completed:
            shutil.rmtree(temp_dir, ignore_errors=True)

    return output_path, f"Final count: {counter.count} people entered"


# Create Gradio interface
demo = gr.Interface(
    fn=process_video,
    inputs=gr.Video(label="Upload a video file"),
    outputs=[
        gr.Video(label="Processed Video"),
        gr.Textbox(label="Results"),
    ],
    title="Store Entry People Counter",
    description="Upload a video to count the number of people entering through a line. The green line represents the counting threshold, blue boxes show detected people, and the counter increases when someone crosses the line from top to bottom.",
    examples=[],
    cache_examples=False,
)

if __name__ == "__main__":
    demo.launch()