import ultralytics from ultralytics import YOLO import torch import numpy as np import pandas as pd from tqdm import tqdm from matplotlib import pyplot as plt import cv2 import warnings import ffmpeg warnings.filterwarnings("ignore") device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') def segment(video_path: str, model_path: str, start: int, fstep: int, crop: list) -> tuple: """ runs YOLO segmentation model and calculates the LV Area """ model = YOLO(model_path)#.to(device) cap = cv2.VideoCapture(video_path) stop = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) lv_area = [] frames = [] message = 'Video processing succeeded' for fr in tqdm(range(start, stop, fstep), desc=f'processing ECHO'): cap.set(cv2.CAP_PROP_POS_FRAMES, fr) _, frame = cap.read() new_w = int(frame.shape[1] * crop[1]) new_h = int(frame.shape[0] * crop[0]) new_left = int(frame.shape[1] / 2 + crop[3] * new_w - new_w / 2) new_top = int(frame.shape[0] / 2 + crop[2] * new_h - new_h / 2) frame = frame[new_top:new_top + new_h, new_left:new_left + new_w] frame_m = frame inputs = frame #torch.Tensor(frame).to(device) with torch.no_grad(): result = model(inputs, verbose=False) result = result#.to('cpu') classes = result[0].names if len(classes) == 0: pass overlay = frame.copy() color_list = [(255, 0, 0), (255, 255, 0), (255, 0, 255), (0, 255, 0), (0, 0, 255), (128, 128, 128)] for i, res in enumerate(result[0]): bx = res.boxes m = res.masks.xy label = int(bx.cls.squeeze().cpu()) if label == 1: lv_area.append(cv2.contourArea(m[0])) box = list(map(int, bx.xyxy.squeeze().cpu().tolist())) cv2.rectangle(overlay, (box[0], box[1]), (box[2], box[3]), (36, 255, 12), 2) cv2.putText(overlay, classes[label], (box[0], box[1] - 5), cv2.FONT_HERSHEY_TRIPLEX, 1, (0, 0, 255), 2) cv2.fillPoly(overlay, pts=np.int32([m]), color=color_list[i % 6]) alpha = 0.4 frame_m = cv2.addWeighted(overlay, alpha, frame, 1 - alpha, 0) frames.append(frame_m) if len(lv_area) == 0: message = 'Video processing failed' return lv_area, frames, message def plotter(lv_data: list, window: int) -> tuple: """ plots the rolling mean graph for LV area. calculates the average ejection fracture """ lv_rolling = pd.Series(lv_data).rolling(window=window).mean().dropna() ef = (max(lv_rolling) - min(lv_rolling)) / max(lv_rolling) dataframe = pd.DataFrame({ 'Frame': np.array(range(len(lv_rolling))), 'Left ventricle visible area, px*px': lv_rolling.values }).astype('int32') txt = f'Ejection fraction - {ef:.1%}' return dataframe, txt def writer(fn, images, framerate=25, vcodec='libx264'): if not isinstance(images, np.ndarray): images = np.asarray(images) n, height, width, channels = images.shape process = ( ffmpeg .input('pipe:', format='rawvideo', pix_fmt='rgb24', s='{}x{}'.format(width, height)) .output(fn, pix_fmt='yuv420p', vcodec=vcodec, r=framerate) .overwrite_output() .run_async(pipe_stdin=True) ) for frame in images: process.stdin.write( frame.astype(np.uint8).tobytes() ) process.stdin.close() process.wait()