|
import gradio as gr |
|
import cv2 |
|
import numpy as np |
|
from tensorflow.keras.models import load_model |
|
from sklearn.preprocessing import StandardScaler |
|
from ultralytics import YOLO |
|
|
|
|
|
# Weight files loaded once at import time and shared by the functions below.
_LSTM_WEIGHTS_PATH = 'suspicious_activity_model.h5'
_POSE_WEIGHTS_PATH = 'yolov8n-pose.pt'

# Keras model that classifies a keypoint vector.
lstm_model = load_model(_LSTM_WEIGHTS_PATH)

# YOLO pose model used for both box detection and keypoint extraction.
yolo_model = YOLO(_POSE_WEIGHTS_PATH)

# Feature scaler; it is created unfitted here.
scaler = StandardScaler()
|
|
|
def extract_keypoints(frame):
    """
    Run the YOLO pose model on ``frame`` and flatten the first pose found.

    Returns a flat list ``[x0, y0, x1, y1, ...]`` built from the normalized
    (``xyn``) keypoints of the first detection in the first result that has
    any keypoints, or ``None`` when no result contains keypoints.
    """
    for result in yolo_model(frame, verbose=False):
        # Skip results that carry no pose at all.
        if result.keypoints is None or len(result.keypoints) == 0:
            continue

        # Only the first detected pose of this result is used.
        first_pose = result.keypoints.xyn.tolist()[0]

        coordinates = []
        for point in first_pose:
            # Keep just the first two components (x, y) of every keypoint.
            coordinates.extend(point[:2])
        return coordinates

    return None
|
|
|
def process_frame(frame):
    """
    Annotate confidently detected objects of class 0 as Suspicious or Normal.

    The YOLO model is run on the whole frame. For every box of class 0
    (presumably "person" -- confirm against the model's class map) with
    confidence above 0.5, the box region is cropped, keypoints are extracted
    from the crop with ``extract_keypoints`` and classified by ``lstm_model``.
    A red box labelled 'Suspicious' or a green box labelled 'Normal' is then
    drawn onto ``frame``.

    Args:
        frame: BGR image array as produced by ``cv2.VideoCapture.read``.

    Returns:
        The same ``frame`` object, annotated in place.
    """
    detections = yolo_model(frame, verbose=False)

    # Classify first, draw afterwards. Every ROI is a view into ``frame``, so
    # drawing inside this loop would paint earlier boxes and labels into the
    # crops of later, overlapping detections and disturb their keypoints.
    annotations = []
    for box in detections[0].boxes:
        cls = int(box.cls[0])
        confidence = float(box.conf[0])
        if not (cls == 0 and confidence > 0.5):
            continue

        x1, y1, x2, y2 = map(int, box.xyxy[0])
        roi = frame[y1:y2, x1:x2]
        if roi.size == 0:
            print("ROI size is zero. Skipping frame.")
            continue

        keypoints = extract_keypoints(roi)
        if keypoints is None or len(keypoints) == 0:
            print("No valid keypoints detected for ROI. Skipping frame.")
            continue

        features = np.asarray(keypoints, dtype=float).reshape(1, -1)
        # Never fit the scaler on the sample being classified: standardizing
        # a single row against its own mean always yields an all-zero vector,
        # so the classifier would see the same input for every pose.
        # NOTE(review): only an already fitted scaler is applied; otherwise
        # the image-normalized keypoints are passed through unchanged. Load
        # the scaler fitted at training time if the model expects one.
        if hasattr(scaler, "mean_"):
            features = scaler.transform(features)
        sequence = features.reshape((1, 1, -1))

        # verbose=0 keeps Keras from printing a progress bar per detection.
        prediction = (
            lstm_model.predict(sequence, verbose=0) > 0.5
        ).astype(int)[0][0]
        annotations.append(((x1, y1, x2, y2), prediction))

    for (x1, y1, x2, y2), prediction in annotations:
        color = (0, 0, 255) if prediction == 1 else (0, 255, 0)
        label = 'Suspicious' if prediction == 1 else 'Normal'
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
        # Keep the label inside the image when the box touches the top edge.
        label_y = max(y1 - 10, 15)
        cv2.putText(frame, label, (x1, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return frame
|
|
|
def detect_suspicious_activity(input_video, output_path='output_video.mp4'):
    """
    Process a video file frame by frame and write an annotated copy.

    Every frame read from ``input_video`` is passed through ``process_frame``
    and written to ``output_path`` with the 'mp4v' codec, keeping the source
    frame size and frame rate.

    Args:
        input_video: Path of the video file to analyse.
        output_path: Path of the annotated video to write. Defaults to
            'output_video.mp4' in the current working directory.

    Returns:
        ``output_path``, the path of the written video.

    Raises:
        gr.Error: If no video was supplied, the input cannot be opened, or
            the output file cannot be created.
    """
    if not input_video:
        raise gr.Error("No video was provided.")

    cap = cv2.VideoCapture(input_video)
    out = None
    try:
        if not cap.isOpened():
            raise gr.Error("Could not open the uploaded video.")

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Some containers report 0 (or NaN) FPS, which the writer cannot use;
        # fall back to a common default in that case.
        if not fps > 0:
            fps = 30.0

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise gr.Error("Could not create the output video file.")

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            out.write(process_frame(frame))
    finally:
        # Release both handles even if a frame fails to process, so the
        # files are not left locked or half-written without being closed.
        cap.release()
        if out is not None:
            out.release()

    return output_path
|
|
|
|
|
# Gradio components: one uploaded video in, one processed video out.
_video_input = gr.Video(label="Upload Video")
_video_output = gr.Video(label="Processed Video")

# Web UI that feeds the uploaded video to detect_suspicious_activity.
iface = gr.Interface(
    fn=detect_suspicious_activity,
    inputs=_video_input,
    outputs=_video_output,
    title="Suspicious Activity Detection",
    description="Upload a video to detect suspicious activities using YOLO and LSTM models",
)

# Start the Gradio app as soon as this module is executed.
iface.launch()