import streamlit as st
import cv2
import tempfile
from ultralytics import YOLO
import numpy as np
import time
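
# Class IDs below are COCO indices as used by yolov8n: 0 is 'person', 2 'car',
# 7 'truck', 24 'backpack', 26 'handbag', 28 'suitcase'; COCO class 65 is
# 'remote', relabeled here as a suspicious handheld device.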
alerting_classes = {
    0: 'People',
    2: 'Car',
    7: 'Truck',
    24: 'Backpack',
    65: 'Suspicious handheld device',
    26: 'Handbag',
    28: 'Suitcase',
}
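
# 1x1 pure-red pixel in BGR order; resized to the frame size when an alert fires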
red_tint = np.array([[[0, 0, 255]]], dtype=np.uint8)
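
# YOLOv8 nano weights; ultralytics downloads the file automatically if missing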
model1 = YOLO('yolov8n.pt')

st.title("Object Detection and Recognition")

video_file = st.file_uploader("Choose a video file", type=["mp4"])

if video_file is not None:
    # Write the upload to a temporary file, since cv2.VideoCapture needs a file path
    tfile = tempfile.NamedTemporaryFile(delete=False)
    tfile.write(video_file.read())

    # Open video capture using the temporary file path
    cap = cv2.VideoCapture(tfile.name)

    # People (class 0) are excluded here; they alert only above a count threshold
    alert_set = set(alerting_classes.keys())
    alert_set.remove(0)

    # Create red-tinted overlay (resized to each frame's size when an alert fires)
    red_tinted_overlay = np.tile(red_tint, (1, 1, 1))

    stop_button = st.button("Stop Inference")
    processing_interrupted = False

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    progress_bar_processing_slot = st.empty()

    # Collect processed frames in a list for playback after inference
    frames = []
    while cap.isOpened() and not processing_interrupted:
        alert_flag = False
        alert_reason = []

        success, frame = cap.read()
        # If the frame is not read correctly, the stream has ended
        if not success:
            # st.warning("Can't receive frame (stream end?). Exiting ...")
            break

        # Check if the stop button is clicked
        if stop_button:
            processing_interrupted = True
            break

        # Perform YOLO object detection, restricted to the alerting classes
        results = model1(frame, conf=0.35, verbose=False, classes=list(alerting_classes.keys()))

        # Count detections per class in this frame
        class_ids = results[0].boxes.cls.tolist()
        class_counts = {cls: class_ids.count(cls) for cls in set(class_ids)}

        # Any detection of a non-person alerting class raises an alert
        for cls in alert_set:
            if class_counts.get(cls, 0) > 0:
                alert_flag = True
                alert_reason.append((cls, class_counts[cls]))
        # People raise an alert only when more than five are detected
        if class_counts.get(0, 0) > 5:
            alert_flag = True
            alert_reason.append((0, class_counts[0]))

        text = 'ALERT!'
        font = cv2.FONT_HERSHEY_DUPLEX
        font_scale = 0.75
        thickness = 2
        size = cv2.getTextSize(text, font, font_scale, thickness)  # ((width, height), baseline)
        x = 0
        y = 2 + size[0][1]

        # Draw the detection boxes onto the frame
        img = results[0].plot()

        if alert_flag:
            # Resize the red-tinted overlay to match the image size and blend it in
            red_tinted_overlay = cv2.resize(red_tinted_overlay, (img.shape[1], img.shape[0]))
            img = cv2.addWeighted(img, 0.7, red_tinted_overlay, 0.3, 0)

            cv2.putText(img, text, (x, y), font, font_scale, (0, 0, 0), thickness)
            y += size[0][1] + 10  # Move to the next line
            for cls, count in alert_reason:
                alert_text = f'{count} {alerting_classes[cls]}'
                cv2.putText(img, alert_text, (x, y), font, font_scale, (0, 0, 0), thickness)
                y += size[0][1] + 10  # Move to the next line

        # Append the processed frame to the list
        frames.append(img)
        del results

        # Update processing progress (clamped in case the reported frame count is inexact)
        current_frame_processing = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
        progress = min(current_frame_processing / total_frames, 1.0)
        progress_bar_processing_slot.progress(progress, text=f"Processing... {int(progress * 100)}%")
    # Release resources
    cap.release()
    tfile.close()

    # Display the processed frames one by one as a video at 24 FPS
    if processing_interrupted:
        st.text("User interrupted processing.")
    else:
        progress_bar_processing_slot.text("Done!")

    video_placeholder = st.image([], channels="BGR", caption="YOLOv8 Inference")
    progress_bar_display = st.progress(0)
    fps_delay = 1 / 24  # Delay between frames to achieve 24 FPS
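
    # NOTE: assumed continuation of the truncated script: play back the collected
    # frames using the placeholders created above (frames, video_placeholder,
    # progress_bar_display, fps_delay).
    if frames:  # guard against an empty upload or immediate interruption
        for i, processed_frame in enumerate(frames):
            video_placeholder.image(processed_frame, channels="BGR", caption="YOLOv8 Inference")
            progress_bar_display.progress((i + 1) / len(frames))
            time.sleep(fps_delay)  # pace the display at roughly 24 FPS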