import cv2, os, time, math |
import numpy as np |
from skimage.metrics import structural_similarity as ssim |
import matplotlib.pyplot as plt |
def compute_optical_flow(prev_gray, curr_gray): |
flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0) |
magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1]) |
return np.max(magnitude) |
def compute_orb_distance(prev_frame, curr_frame, match_threshold = 40): |
orb = cv2.ORB_create() |
kp1, des1 = orb.detectAndCompute(prev_frame, None) |
kp2, des2 = orb.detectAndCompute(curr_frame, None) |
bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True) |
orig_matches = bf.match(des1, des2) |
matches = [match for match in orig_matches if match.distance < match_threshold] |
matches = sorted(matches, key=lambda x: x.distance) |
num_matches = len(matches) |
if num_matches == 0: |
return 0 |
max_descriptor_distance = max(match.distance for match in matches[:num_matches]) |
euclidean_distances = [] |
for match in matches[:num_matches]: |
pt1 = np.array(kp1[match.queryIdx].pt) |
pt2 = np.array(kp2[match.trainIdx].pt) |
euclidean_distance = np.sqrt((pt1[0] - pt2[0])**2 + (pt1[1] - pt2[1])**2) |
euclidean_distances.append(euclidean_distance) |
max_movement_distance = np.max(euclidean_distances) |
normalized_descriptor_distance = max_descriptor_distance / 256 |
return max_movement_distance |
def compute_ssim(prev_frame, curr_frame): |
return ssim(prev_frame, curr_frame, data_range=255) |
def compute_pixel_diff(prev_frame, curr_frame): |
diff = cv2.absdiff(prev_frame, curr_frame) |
return np.mean(diff) |
def preprocess_frame(frame, width=640, height=360): |
target_size = (width, height) |
resized_frame = cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA) |
return resized_frame |
def smooth_curve(data, window_size=5): |
return np.convolve(data, np.ones(window_size)/window_size, mode='valid') |
def find_timestamp_clusters(fast_motion_timestamps, min_time_gap=5): |
clusters = [] |
current_cluster = [] |
for i, timestamp in enumerate(fast_motion_timestamps): |
if i == 0: |
current_cluster.append(timestamp) |
else: |
if timestamp - fast_motion_timestamps[i-1] <= min_time_gap: |
current_cluster.append(timestamp) |
else: |
clusters.append(current_cluster) |
current_cluster = [timestamp] |
if current_cluster: |
clusters.append(current_cluster) |
return clusters |
def detect_fast_motion(video_path, output_dir, end_time, start_time, window_size=3, motion_threshold=0.6, step = 2): |
cap = cv2.VideoCapture(video_path) |
fps = cap.get(cv2.CAP_PROP_FPS) |
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT) |
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH) |
orb_scores = [] |
ssim_scores = [] |
timestamps = [] |
frame_list = [] |
prev_frame = None |
frame_count = 0 |
while cap.isOpened(): |
ret, orig_frame = cap.read() |
if not ret: |
break |
if height == 360 and width == 640: |
frame = orig_frame |
else: |
frame = preprocess_frame(orig_frame, width = 640, height = 360) |
if frame_count > end_time * fps: |
break |
if frame_count < start_time * fps or frame_count % step != 0: |
frame_count += 1 |
continue |
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) |
if prev_frame is not None: |
orb_scores.append(compute_orb_distance(prev_frame, gray)) |
ssim_scores.append(compute_ssim(prev_frame, gray)) |
timestamps.append(frame_count/fps) |
else: |
orb_scores.append(0) |
ssim_scores.append(1) |
timestamps.append(start_time) |
frame_list.append(frame) |
prev_frame = gray |
frame_count += 1 |
cap.release() |
new_fps = len(timestamps)/ (max(timestamps) - min(timestamps)) |
print(f"fps : {fps} frame_height : {height} frame_width : {width} New fps is {new_fps}") |
ssim_scores = (1 - np.array(ssim_scores)) * new_fps |
orb_scores = (np.array(orb_scores) * new_fps)/(np.sqrt(640**2 + 360**2)) |
smoothed_ssim_scores = smooth_curve(ssim_scores, window_size=window_size) |
smoothed_orb_scores = smooth_curve(orb_scores, window_size=window_size) |
combined_scores = (0.3 * orb_scores) + (0.7 * ssim_scores) |
smoothed_combined_scores = (0.3 * smoothed_orb_scores) + (0.7 * smoothed_ssim_scores) |
adjusted_timestamps = timestamps[window_size // 2 : -(window_size // 2)] |
fast_motion_timestamps = [] |
fast_motion_frames = [] |
fast_motion_mags = [] |
ids = [] |
for i in range(len(combined_scores)): |
if combined_scores[i] > motion_threshold: |
fast_motion_mags.append(combined_scores[i]) |
fast_motion_timestamps.append(timestamps[i]) |
fast_motion_frames.append(frame_list[i]) |
ids.append(i) |
padded_fast_motion_frames = [] |
padded_fast_motion_timestamps = [] |
if len(ids) < 5 and len(ids) > 0: |
padded_fast_motion_frames.extend(frame_list[min(ids) - 2:min(ids)]) |
padded_fast_motion_timestamps.extend(timestamps[min(ids) - 2:min(ids)]) |
padded_fast_motion_frames.extend(fast_motion_frames) |
padded_fast_motion_timestamps.extend(fast_motion_timestamps) |
padded_fast_motion_frames.extend(frame_list[max(ids) + 1:max(ids) + 3]) |
padded_fast_motion_timestamps.extend(timestamps[max(ids) + 1:max(ids) + 3]) |
print(f"padded_fast_motion_timestamps are {padded_fast_motion_timestamps}. Length of padded_fast_motion_timestamps is {len(padded_fast_motion_frames)}") |
else: |
padded_fast_motion_frames = fast_motion_frames |
padded_fast_motion_timestamps = fast_motion_timestamps |
plt.figure(figsize=(12, 6)) |
plt.plot(adjusted_timestamps, smoothed_orb_scores, label='ORB Distance') |
plt.plot(adjusted_timestamps, smoothed_ssim_scores, label='Inverted SSIM') |
plt.plot(adjusted_timestamps, smoothed_combined_scores, label='Combined Score') |
plt.axhline(y=motion_threshold, color='r', linestyle='--', label='Threshold') |
plt.xlabel('Frame') |
plt.ylabel('Normalized Score') |
plt.title('Motion Detection Metrics') |
plt.legend() |
plt.savefig(f"{output_dir}/motion_detection_plot_smoothened_{video_path.split('/')[-1].split('.')[0]}.png") |
plt.figure(figsize=(12, 6)) |
plt.plot(timestamps, ssim_scores, label='Inverted SSIM') |
plt.plot(timestamps, combined_scores, label='Combined Score') |
plt.axhline(y=motion_threshold, color='r', linestyle='--', label='Threshold') |
plt.xlabel('Frame') |
plt.ylabel('Normalized Score') |
plt.title('Motion Detection Metrics') |
plt.legend() |
plt.savefig(f"{output_dir}/motion_detection_plot_raw_{video_path.split('/')[-1].split('.')[0]}.png") |
print(f"Max motion score is {np.max(combined_scores)} and mean motion score is {np.mean(combined_scores)} from {np.min(timestamps)} to {np.max(timestamps)}") |
print(f"Detected {len(fast_motion_timestamps)} frames when step = {step}.") |
try: |
print(f"fast motion between {np.min(fast_motion_timestamps)} and {np.max(fast_motion_timestamps)}") |
except: |
pass |
if len(fast_motion_timestamps) == 0: |
return [], [] |
elif len(fast_motion_timestamps) > 0.5 * len(combined_scores): |
print("More than half of the video has fast motion") |
return fast_motion_timestamps, padded_fast_motion_frames |
else: |
timestamp_clusters = find_timestamp_clusters(fast_motion_timestamps, min_time_gap = 5) |
for timestamp_cluster in timestamp_clusters: |
print(f"min time : {np.min(timestamp_cluster)} max time : {np.max(timestamp_cluster)} length : {len(timestamp_cluster)}") |
return timestamp_clusters, padded_fast_motion_frames |
''' |
# Open the video file |
video_path = "../test_videos/" |
mp4_files = [f for f in os.listdir(video_path) if f.endswith('.mp4')] |
output_dir = "motion_detection_results" |
os.system(f"rm -rf {output_dir}") |
os.system(f"mkdir {output_dir}") |
end_time = 15 |
start_time = 0 |
for mp4_file in mp4_files: |
print(f"\nAnalyzing video {mp4_file}") |
if mp4_file == "8.mp4": |
end_time = 60 |
start_time = 0 |
elif mp4_file == "6.mp4": |
end_time = 32 |
start_time = 0 |
elif mp4_file == "3.mp4": |
end_time = 6.5 #To remove last few frames that are blurry |
start_time = 0 |
elif mp4_file == "2.mp4": |
end_time = 182 |
start_time = 140 |
else: |
end_time = 15 |
start_time = 0 |
#if mp4_file != "3.mp4" and mp4_file != "5.mp4" and mp4_file != "6.mp4": |
# continue |
start = time.time() |
fast_motion_timestamps = detect_fast_motion(video_path + mp4_file, output_dir, end_time, start_time, motion_threshold = 1.5) |
end = time.time() |
print(f"Execution time for {mp4_file} : {end - start} seconds. Duration of the video was {end_time - start_time} seconds") |
''' |