|
|
import cv2 |
|
|
import numpy as np |
|
|
import os |
|
|
|
|
|
from CPR_Module.Common.logging_config import cpr_logger |
|
|
|
|
|
class WarningsOverlayer: |
|
|
def __init__(self): |
|
|
|
|
|
self.DRAWER_CONFIG = { |
|
|
"base_position": (0.05, 0.15), |
|
|
"vertical_spacing": 0.06 |
|
|
} |
|
|
|
|
|
|
|
|
self.WARNING_CONFIG = { |
|
|
|
|
|
"Right arm bent!": {"color": (52, 110, 235)}, |
|
|
"Left arm bent!": {"color": (52, 110, 235)}, |
|
|
"Left hand not on chest!": {"color": (161, 127, 18)}, |
|
|
"Right hand not on chest!": {"color": (161, 127, 18)}, |
|
|
"Both hands not on chest!": {"color": (161, 127, 18)}, |
|
|
|
|
|
|
|
|
"Depth too low!": {"color": (125, 52, 235)}, |
|
|
"Depth too high!": {"color": (125, 52, 235)}, |
|
|
"Rate too slow!": {"color": (235, 52, 214)}, |
|
|
"Rate too fast!": {"color": (235, 52, 214)} |
|
|
} |
|
|
|
|
|
def add_warnings_to_processed_video(self, video_output_path, sampling_interval_frames, rate_and_depth_warnings, posture_warnings): |
|
|
"""Process both warning types with identical handling""" |
|
|
cpr_logger.info("\n[POST-PROCESS] Starting warning overlay") |
|
|
|
|
|
|
|
|
cap = cv2.VideoCapture(video_output_path) |
|
|
if not cap.isOpened(): |
|
|
cpr_logger.info("[ERROR] Failed to open processed video") |
|
|
return |
|
|
|
|
|
|
|
|
original_fourcc = int(cap.get(cv2.CAP_PROP_FOURCC)) |
|
|
processed_fps = cap.get(cv2.CAP_PROP_FPS) |
|
|
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
|
|
|
|
|
|
base = os.path.splitext(video_output_path)[0] |
|
|
final_path = os.path.abspath(f"{base}_final.mp4") |
|
|
writer = cv2.VideoWriter(final_path, original_fourcc, processed_fps, (width, height)) |
|
|
|
|
|
|
|
|
all_warnings = [] |
|
|
|
|
|
for entry in posture_warnings: |
|
|
if warnings := entry.get('posture_warnings'): |
|
|
start = entry['start_frame'] // sampling_interval_frames |
|
|
end = entry['end_frame'] // sampling_interval_frames |
|
|
all_warnings.append((int(start), int(end), warnings)) |
|
|
|
|
|
for entry in rate_and_depth_warnings: |
|
|
if warnings := entry.get('rate_and_depth_warnings'): |
|
|
start = entry['start_frame'] // sampling_interval_frames |
|
|
end = entry['end_frame'] // sampling_interval_frames |
|
|
all_warnings.append((int(start), int(end), warnings)) |
|
|
|
|
|
|
|
|
os.makedirs("screenshots", exist_ok=True) |
|
|
screenshot_counts = {} |
|
|
|
|
|
|
|
|
skip_warnings = { |
|
|
"Depth too low!", |
|
|
"Depth too high!", |
|
|
"Rate too slow!", |
|
|
"Rate too fast!" |
|
|
} |
|
|
|
|
|
|
|
|
frame_idx = 0 |
|
|
while True: |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
active_warnings = [] |
|
|
for start, end, warnings in all_warnings: |
|
|
if start <= frame_idx <= end: |
|
|
active_warnings.extend(warnings) |
|
|
|
|
|
|
|
|
self._draw_warnings(frame, active_warnings) |
|
|
|
|
|
for warning_text in set(active_warnings): |
|
|
if warning_text in skip_warnings: |
|
|
continue |
|
|
count = screenshot_counts.get(warning_text, 0) |
|
|
if count < 2: |
|
|
screenshot_path = os.path.join("screenshots", f"{warning_text.replace(' ', '_')}_{frame_idx}.jpg") |
|
|
cv2.imwrite(screenshot_path, frame) |
|
|
screenshot_counts[warning_text] = count + 1 |
|
|
|
|
|
writer.write(frame) |
|
|
frame_idx += 1 |
|
|
|
|
|
cap.release() |
|
|
writer.release() |
|
|
cpr_logger.info(f"\n[POST-PROCESS] Final output saved to: {final_path}") |
|
|
|
|
|
def _draw_warnings(self, frame, active_warnings): |
|
|
"""Draw all warnings in a single vertical drawer""" |
|
|
frame_height = frame.shape[0] |
|
|
frame_width = frame.shape[1] |
|
|
|
|
|
|
|
|
base_x = int(self.DRAWER_CONFIG["base_position"][0] * frame_width) |
|
|
current_y = int(self.DRAWER_CONFIG["base_position"][1] * frame_height) |
|
|
|
|
|
|
|
|
y_spacing = int(self.DRAWER_CONFIG["vertical_spacing"] * frame_height) |
|
|
|
|
|
|
|
|
for warning_text in active_warnings: |
|
|
if color := self.WARNING_CONFIG.get(warning_text, {}).get("color"): |
|
|
|
|
|
self._draw_warning_banner( |
|
|
frame=frame, |
|
|
text=warning_text, |
|
|
color=color, |
|
|
position=(base_x, current_y)) |
|
|
|
|
|
|
|
|
current_y += y_spacing |
|
|
|
|
|
def _draw_warning_banner(self, frame, text, color, position): |
|
|
"""Base drawing function for warning banners""" |
|
|
(text_width, text_height), _ = cv2.getTextSize( |
|
|
text, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2) |
|
|
|
|
|
x, y = position |
|
|
|
|
|
cv2.rectangle(frame, |
|
|
(x - 10, y - text_height - 10), |
|
|
(x + text_width + 10, y + 10), |
|
|
color, -1) |
|
|
|
|
|
cv2.putText(frame, text, (x, y), |
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2) |
|
|
|