import os
import tempfile
import time
from collections import Counter

import cv2
import numpy as np
import streamlit as st
from ultralytics import YOLO

# COCO class IDs mapped to display labels
# (ID 65 is COCO's 'remote', relabeled here as a suspicious handheld device)
alerting_classes = {
    0: 'People',
    2: 'Car',
    7: 'Truck',
    24: 'Backpack',
    65: 'Suspicious handheld device',
    26: 'Handbag',
    28: 'Suitcase',
}

# Single red (BGR) pixel; resized to the frame size when an alert overlay is needed
red_tint = np.array([[[0, 0, 255]]], dtype=np.uint8)

model1 = YOLO('yolov8n.pt')

st.title("Object Detection and Recognition")

video_file = st.file_uploader("Choose a video file", type=["mp4"])

if video_file is not None:
    # Write the uploaded video to a temporary file so OpenCV can open it by path;
    # flush so the full file is on disk before VideoCapture reads it
    tfile = tempfile.NamedTemporaryFile(delete=False)
    tfile.write(video_file.read())
    tfile.flush()

    cap = cv2.VideoCapture(tfile.name)

    # Every alerting class except 'People' triggers an alert on sight;
    # people only trigger an alert above a crowd threshold (see below)
    alert_set = set(alerting_classes.keys())
    alert_set.remove(0)

    red_tinted_overlay = red_tint.copy()

    # st.button is True only on the rerun triggered by its click, so clicking
    # mid-processing stops the current run and this branch exits early on the rerun
    stop_button = st.button("Stop Inference")
    processing_interrupted = False

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    progress_bar_processing_slot = st.empty()

    # Collect processed frames for playback after inference finishes
    frames = []

    while cap.isOpened() and not processing_interrupted:
        alert_flag = False
        alert_reason = []

        success, frame = cap.read()
        if not success:
            # `success` is False when the stream ends or a frame can't be decoded
            break

        # Check if the stop button was clicked
        if stop_button:
            processing_interrupted = True
            break

        # Run YOLO detection restricted to the classes we care about
        results = model1(frame, conf=0.35, verbose=False,
                         classes=list(alerting_classes.keys()))
        # boxes.cls holds float class IDs; cast to int for clean dict lookups
        class_ids = [int(c) for c in results[0].boxes.cls.tolist()]
        class_counts = Counter(class_ids)

        for cls in alert_set:
            if class_counts.get(cls, 0) > 0:
                alert_flag = True
                alert_reason.append((cls, class_counts[cls]))
        # More than 5 people counts as a crowd and also raises an alert
        if class_counts.get(0, 0) > 5:
            alert_flag = True
            alert_reason.append((0, class_counts[0]))

        text = 'ALERT!'
        font = cv2.FONT_HERSHEY_DUPLEX
        font_scale = 0.75
        thickness = 2
        (_, text_height), _ = cv2.getTextSize(text, font, font_scale, thickness)
        x = 0
        y = 2 + text_height

        img = results[0].plot()
        if alert_flag:
            # Resize the red-tinted overlay to match the frame and blend it in
            red_tinted_overlay = cv2.resize(red_tinted_overlay, (img.shape[1], img.shape[0]))
            img = cv2.addWeighted(img, 0.7, red_tinted_overlay, 0.3, 0)
            cv2.putText(img, text, (x, y), font, font_scale, (0, 0, 0), thickness)
            y += text_height + 10  # move to the next line
            for cls, count in alert_reason:
                alert_text = f'{count} {alerting_classes[cls]}'
                cv2.putText(img, alert_text, (x, y), font, font_scale, (0, 0, 0), thickness)
                y += text_height + 10  # move to the next line

        frames.append(img)
        del results  # free detection results before the next frame

        # Update the processing progress bar (the `text` kwarg of st.progress
        # requires a recent Streamlit; on older versions use a second st.empty slot)
        current_frame_processing = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        progress = min(current_frame_processing / max(total_frames, 1), 1.0)
        progress_bar_processing_slot.progress(progress, text=f"Processing... {int(progress * 100)}%")

    # Release resources and remove the temporary file (delete=False leaves it on disk)
    cap.release()
    tfile.close()
    os.unlink(tfile.name)

    # Replay the processed frames as a video at 24 FPS
    if processing_interrupted:
        st.text("User interrupted processing.")
    else:
        progress_bar_processing_slot.text("Done!")
        # st.empty() gives a placeholder whose .image() can be updated per frame
        video_placeholder = st.empty()
        progress_bar_display = st.progress(0)
        fps_delay = 1 / 24  # delay to achieve 24 FPS
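        # NOTE: the original snippet ends here, before the frames are actually shown.
        # Below is a minimal sketch of the missing playback loop, assuming the frames
        # collected in `frames` are replayed through the placeholders created above.
        for i, shown_frame in enumerate(frames):
            video_placeholder.image(shown_frame, channels="BGR", caption="YOLOv8 Inference")
            progress_bar_display.progress((i + 1) / len(frames))
            time.sleep(fps_delay)  # pace playback at roughly 24 FPS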