|
import cv2, os, time, math |
|
import numpy as np |
|
from skimage.metrics import structural_similarity as ssim |
|
import matplotlib.pyplot as plt |
|
|
|
def compute_optical_flow(prev_gray, curr_gray):
    """Return the peak dense optical-flow magnitude between two grayscale frames.

    Farneback dense optical flow is computed from prev_gray to curr_gray and
    the largest per-pixel displacement magnitude (in pixels) is returned.
    """
    flow_field = cv2.calcOpticalFlowFarneback(
        prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    mag, _ = cv2.cartToPolar(flow_field[..., 0], flow_field[..., 1])
    return mag.max()
|
|
|
def compute_orb_distance(prev_frame, curr_frame, match_threshold = 40):
    """Return the largest keypoint displacement (in pixels) between two frames.

    ORB keypoints are detected in both frames and matched with a brute-force
    Hamming matcher (crossCheck enabled); matches whose descriptor distance is
    >= match_threshold are discarded.

    Args:
        prev_frame: previous frame (grayscale, as passed by the caller).
        curr_frame: current frame.
        match_threshold: maximum ORB descriptor distance for a match to count.

    Returns:
        Largest Euclidean distance between matched keypoint positions,
        or 0 when no keypoints/matches survive.
    """
    orb = cv2.ORB_create()

    kp1, des1 = orb.detectAndCompute(prev_frame, None)
    kp2, des2 = orb.detectAndCompute(curr_frame, None)

    # detectAndCompute returns None descriptors when no keypoints are found;
    # bf.match raises on None input, so bail out early with "no motion".
    if des1 is None or des2 is None:
        return 0

    bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
    matches = [m for m in bf.match(des1, des2) if m.distance < match_threshold]

    if not matches:
        return 0

    # Pixel displacement of each matched keypoint pair; the maximum is the
    # motion score.  (Descriptor-distance statistics were previously computed
    # here but never used, so they have been removed.)
    movement_distances = [
        math.hypot(kp1[m.queryIdx].pt[0] - kp2[m.trainIdx].pt[0],
                   kp1[m.queryIdx].pt[1] - kp2[m.trainIdx].pt[1])
        for m in matches
    ]
    return max(movement_distances)
|
|
|
|
|
def compute_ssim(prev_frame, curr_frame):
    """Structural similarity between two frames.

    Assumes 8-bit pixel values (data_range=255); identical frames score 1.0.
    """
    similarity = ssim(prev_frame, curr_frame, data_range=255)
    return similarity
|
|
|
def compute_pixel_diff(prev_frame, curr_frame):
    """Mean absolute per-pixel difference between two frames."""
    return np.mean(cv2.absdiff(prev_frame, curr_frame))
|
|
|
def preprocess_frame(frame, width=640, height=360):
    """Resize *frame* to width x height.

    INTER_AREA interpolation is used, which is the recommended choice for
    downscaling.
    """
    return cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
|
|
|
def smooth_curve(data, window_size=5):
    """Moving-average smoothing of *data*.

    Convolves the series with a flat kernel of length ``window_size`` and
    keeps only the fully overlapping part, so the result is shorter than the
    input by ``window_size - 1`` samples.
    """
    kernel = np.ones(window_size) / window_size
    return np.convolve(data, kernel, mode='valid')
|
|
|
def find_timestamp_clusters(fast_motion_timestamps, min_time_gap=5):
    """Group ordered timestamps into clusters of nearby events.

    Consecutive timestamps at most ``min_time_gap`` seconds apart land in the
    same cluster; a larger gap starts a new one.

    Returns a list of clusters (each a list of timestamps); an empty input
    yields an empty list.
    """
    clusters = []
    for ts in fast_motion_timestamps:
        # Extend the last cluster when this timestamp is close enough to its
        # final member; otherwise open a fresh cluster.
        if clusters and ts - clusters[-1][-1] <= min_time_gap:
            clusters[-1].append(ts)
        else:
            clusters.append([ts])
    return clusters
|
|
|
|
|
def detect_fast_motion(video_path, output_dir, end_time, start_time, window_size=3, motion_threshold=0.6, step = 2):
    """Detect fast-motion segments in a video between start_time and end_time.

    Every `step`-th frame inside the time window is resized to 640x360,
    converted to grayscale and compared with the previously sampled frame
    using two metrics: maximum ORB keypoint displacement and inverted SSIM.
    Both are scaled by the effective sampling rate, combined as
    0.3 * orb + 0.7 * ssim, and thresholded with `motion_threshold`.
    Diagnostic plots (raw and smoothed curves) are written into `output_dir`.

    Args:
        video_path: path of the input video file.
        output_dir: directory where diagnostic plots are saved.
        end_time: stop reading after this many seconds of video.
        start_time: ignore frames before this many seconds of video.
        window_size: moving-average window for the smoothed curves.
        motion_threshold: combined-score threshold for "fast motion".
        step: sample every `step`-th frame.

    Returns:
        ([], []) when no fast motion is found (or too few frames sampled).
        When more than half of the sampled frames exceed the threshold:
        (fast_motion_timestamps, padded_frames).  Otherwise:
        (timestamp_clusters, padded_frames) where each cluster is a list of
        timestamps at most 5 s apart.
    """
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
    width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)

    orb_scores = []
    ssim_scores = []
    timestamps = []
    frame_list = []

    prev_frame = None
    frame_count = 0

    try:
        while cap.isOpened():
            ret, orig_frame = cap.read()
            if not ret:
                break

            if frame_count > end_time * fps:
                break

            # Skip frames before start_time and frames off the sampling grid
            # *before* any per-frame processing (the original resized every
            # frame, including ones it immediately discarded).
            if frame_count < start_time * fps or frame_count % step != 0:
                frame_count += 1
                continue

            # Normalize resolution so the score scaling below is consistent.
            if height == 360 and width == 640:
                frame = orig_frame
            else:
                frame = preprocess_frame(orig_frame, width = 640, height = 360)

            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            if prev_frame is not None:
                orb_scores.append(compute_orb_distance(prev_frame, gray))
                ssim_scores.append(compute_ssim(prev_frame, gray))
                timestamps.append(frame_count/fps)
            else:
                # First sampled frame: neutral scores keep arrays aligned.
                orb_scores.append(0)
                ssim_scores.append(1)
                timestamps.append(start_time)

            frame_list.append(frame)
            prev_frame = gray
            frame_count += 1
    finally:
        # Release the capture even if a metric computation raises.
        cap.release()

    # Effective sampling rate of the frames actually kept; guard against a
    # zero-length span (previously a ZeroDivisionError / ValueError).
    if len(timestamps) < 2 or max(timestamps) == min(timestamps):
        print("FAST MOTION NOT DETECTED!")
        return [], []
    new_fps = len(timestamps)/ (max(timestamps) - min(timestamps))
    print(f"fps : {fps} frame_height : {height} frame_width : {width} New fps is {new_fps}")

    # Convert SSIM to a dissimilarity *rate* and normalize ORB displacement by
    # the 640x360 frame diagonal so the two metrics are comparable.
    ssim_scores = (1 - np.array(ssim_scores)) * new_fps
    orb_scores = (np.array(orb_scores) * new_fps)/(np.sqrt(640**2 + 360**2))

    smoothed_ssim_scores = smooth_curve(ssim_scores, window_size=window_size)
    smoothed_orb_scores = smooth_curve(orb_scores, window_size=window_size)

    combined_scores = (0.3 * orb_scores) + (0.7 * ssim_scores)
    smoothed_combined_scores = (0.3 * smoothed_orb_scores) + (0.7 * smoothed_ssim_scores)

    # smooth_curve shortens the series by window_size - 1; trim timestamps
    # symmetrically so the smoothed curves can be plotted against them.
    adjusted_timestamps = timestamps[window_size // 2 : -(window_size // 2)]

    fast_motion_timestamps = []
    fast_motion_frames = []
    fast_motion_mags = []
    ids = []
    for i in range(len(combined_scores)):
        if combined_scores[i] > motion_threshold:
            fast_motion_mags.append(combined_scores[i])
            fast_motion_timestamps.append(timestamps[i])
            fast_motion_frames.append(frame_list[i])
            ids.append(i)

    # For very short detections, pad with up to two frames of context on each
    # side so downstream consumers see the motion's lead-in and tail.
    padded_fast_motion_frames = []
    padded_fast_motion_timestamps = []
    if 0 < len(ids) < 5:
        # Clamp the leading slice start at 0: a negative index here would wrap
        # around and silently pull frames from the *end* of the video.
        lo = max(min(ids) - 2, 0)
        padded_fast_motion_frames.extend(frame_list[lo:min(ids)])
        padded_fast_motion_timestamps.extend(timestamps[lo:min(ids)])

        padded_fast_motion_frames.extend(fast_motion_frames)
        padded_fast_motion_timestamps.extend(fast_motion_timestamps)

        padded_fast_motion_frames.extend(frame_list[max(ids) + 1:max(ids) + 3])
        padded_fast_motion_timestamps.extend(timestamps[max(ids) + 1:max(ids) + 3])
        print(f"padded_fast_motion_timestamps are {padded_fast_motion_timestamps}. Length of padded_fast_motion_timestamps is {len(padded_fast_motion_frames)}")
    else:
        padded_fast_motion_frames = fast_motion_frames
        padded_fast_motion_timestamps = fast_motion_timestamps

    video_stem = video_path.split('/')[-1].split('.')[0]

    # Smoothed-metric diagnostic plot.
    plt.figure(figsize=(12, 6))
    plt.plot(adjusted_timestamps, smoothed_orb_scores, label='ORB Distance')
    plt.plot(adjusted_timestamps, smoothed_ssim_scores, label='Inverted SSIM')
    plt.plot(adjusted_timestamps, smoothed_combined_scores, label='Combined Score')
    plt.axhline(y=motion_threshold, color='r', linestyle='--', label='Threshold')
    plt.xlabel('Frame')
    plt.ylabel('Normalized Score')
    plt.title('Motion Detection Metrics')
    plt.legend()
    plt.savefig(f"{output_dir}/motion_detection_plot_smoothened_{video_stem}.png")
    plt.close()  # free the figure so repeated calls do not leak memory

    # Raw-metric diagnostic plot.
    plt.figure(figsize=(12, 6))
    plt.plot(timestamps, ssim_scores, label='Inverted SSIM')
    plt.plot(timestamps, combined_scores, label='Combined Score')
    plt.axhline(y=motion_threshold, color='r', linestyle='--', label='Threshold')
    plt.xlabel('Frame')
    plt.ylabel('Normalized Score')
    plt.title('Motion Detection Metrics')
    plt.legend()
    plt.savefig(f"{output_dir}/motion_detection_plot_raw_{video_stem}.png")
    plt.close()

    print(f"Max motion score is {np.max(combined_scores)} and mean motion score is {np.mean(combined_scores)} from {np.min(timestamps)} to {np.max(timestamps)}")
    print(f"Detected {len(fast_motion_timestamps)} frames when step = {step}.")
    # Explicit emptiness check replaces the original bare `except: pass`,
    # which would also have hidden unrelated errors.
    if fast_motion_timestamps:
        print(f"fast motion between {np.min(fast_motion_timestamps)} and {np.max(fast_motion_timestamps)}")

    if len(fast_motion_timestamps) == 0:
        print("FAST MOTION NOT DETECTED!")
        return [], []
    elif len(fast_motion_timestamps) > 0.5 * len(combined_scores):
        print("More than half of the video has fast motion")
        return fast_motion_timestamps, padded_fast_motion_frames
    else:
        timestamp_clusters = find_timestamp_clusters(fast_motion_timestamps, min_time_gap = 5)
        for timestamp_cluster in timestamp_clusters:
            print(f"min time : {np.min(timestamp_cluster)} max time : {np.max(timestamp_cluster)} length : {len(timestamp_cluster)}")
        return timestamp_clusters, padded_fast_motion_frames
|
|
|
|
|
''' |
|
# Open the video file |
|
video_path = "../test_videos/" |
|
mp4_files = [f for f in os.listdir(video_path) if f.endswith('.mp4')] |
|
output_dir = "motion_detection_results" |
|
os.system(f"rm -rf {output_dir}") |
|
os.system(f"mkdir {output_dir}") |
|
end_time = 15 |
|
start_time = 0 |
|
|
|
for mp4_file in mp4_files: |
|
print(f"\nAnalyzing video {mp4_file}") |
|
|
|
if mp4_file == "8.mp4": |
|
end_time = 60 |
|
start_time = 0 |
|
elif mp4_file == "6.mp4": |
|
end_time = 32 |
|
start_time = 0 |
|
elif mp4_file == "3.mp4": |
|
end_time = 6.5 #To remove last few frames that are blurry |
|
start_time = 0 |
|
elif mp4_file == "2.mp4": |
|
end_time = 182 |
|
start_time = 140 |
|
else: |
|
end_time = 15 |
|
start_time = 0 |
|
|
|
#if mp4_file != "3.mp4" and mp4_file != "5.mp4" and mp4_file != "6.mp4": |
|
# continue |
|
|
|
start = time.time() |
|
fast_motion_timestamps = detect_fast_motion(video_path + mp4_file, output_dir, end_time, start_time, motion_threshold = 1.5) |
|
end = time.time() |
|
|
|
print(f"Execution time for {mp4_file} : {end - start} seconds. Duration of the video was {end_time - start_time} seconds") |
|
''' |