import os
import time

import cv2
import numpy as np
import onnx
import onnxruntime

# Requires onnxruntime (pip install onnxruntime)

# Ref: https://github.com/liruoteng/OpticalFlowToolkit/blob/5cf87b947a0032f58c922bbc22c0afb30b90c418/lib/flowlib.py#L249

UNKNOWN_FLOW_THRESH = 1e7


def make_color_wheel():
    """
    Generate a color wheel according to the Middlebury color code.
    :return: color wheel
    """
    RY = 15
    YG = 6
    GC = 4
    CB = 11
    BM = 13
    MR = 6

    ncols = RY + YG + GC + CB + BM + MR
    colorwheel = np.zeros([ncols, 3])
    col = 0

    # RY
    colorwheel[0:RY, 0] = 255
    colorwheel[0:RY, 1] = np.transpose(np.floor(255 * np.arange(0, RY) / RY))
    col += RY

    # YG
    colorwheel[col:col + YG, 0] = 255 - np.transpose(np.floor(255 * np.arange(0, YG) / YG))
    colorwheel[col:col + YG, 1] = 255
    col += YG

    # GC
    colorwheel[col:col + GC, 1] = 255
    colorwheel[col:col + GC, 2] = np.transpose(np.floor(255 * np.arange(0, GC) / GC))
    col += GC

    # CB
    colorwheel[col:col + CB, 1] = 255 - np.transpose(np.floor(255 * np.arange(0, CB) / CB))
    colorwheel[col:col + CB, 2] = 255
    col += CB

    # BM
    colorwheel[col:col + BM, 2] = 255
    colorwheel[col:col + BM, 0] = np.transpose(np.floor(255 * np.arange(0, BM) / BM))
    col += BM

    # MR
    colorwheel[col:col + MR, 2] = 255 - np.transpose(np.floor(255 * np.arange(0, MR) / MR))
    colorwheel[col:col + MR, 0] = 255

    return colorwheel


colorwheel = make_color_wheel()


def compute_color(u, v):
    """
    Compute the optical flow color map.
    :param u: optical flow horizontal map
    :param v: optical flow vertical map
    :return: optical flow in color code
    """
    [h, w] = u.shape
    img = np.zeros([h, w, 3])

    nanIdx = np.isnan(u) | np.isnan(v)
    u[nanIdx] = 0
    v[nanIdx] = 0

    ncols = np.size(colorwheel, 0)

    rad = np.sqrt(u ** 2 + v ** 2)
    a = np.arctan2(-v, -u) / np.pi
    fk = (a + 1) / 2 * (ncols - 1) + 1
    k0 = np.floor(fk).astype(int)
    k1 = k0 + 1
    k1[k1 == ncols + 1] = 1
    f = fk - k0

    for i in range(0, np.size(colorwheel, 1)):
        tmp = colorwheel[:, i]
        col0 = tmp[k0 - 1] / 255
        col1 = tmp[k1 - 1] / 255
        col = (1 - f) * col0 + f * col1

        idx = rad <= 1
        col[idx] = 1 - rad[idx] * (1 - col[idx])
        notidx = np.logical_not(idx)
        col[notidx] *= 0.75
        img[:, :, i] = np.uint8(np.floor(255 * col * (1 - nanIdx)))

    return img


def flow_to_image(flow):
    """
    Convert flow into a Middlebury color code image.
    :param flow: optical flow map
    :return: optical flow image in Middlebury color
    """
    u = flow[:, :, 0]
    v = flow[:, :, 1]

    maxu = -999.
    maxv = -999.
    minu = 999.
    minv = 999.
    idxUnknow = (abs(u) > UNKNOWN_FLOW_THRESH) | (abs(v) > UNKNOWN_FLOW_THRESH)
    u[idxUnknow] = 0
    v[idxUnknow] = 0

    maxu = max(maxu, np.max(u))
    minu = min(minu, np.min(u))
    maxv = max(maxv, np.max(v))
    minv = min(minv, np.min(v))

    rad = np.sqrt(u ** 2 + v ** 2)
    maxrad = max(-1, np.max(rad))

    u = u / (maxrad + np.finfo(float).eps)
    v = v / (maxrad + np.finfo(float).eps)

    img = compute_color(u, v)

    idx = np.repeat(idxUnknow[:, :, np.newaxis], 3, axis=2)
    img[idx] = 0

    return np.uint8(img)


class Raft:
    def __init__(self, model_path):
        # Initialize model
        self.initialize_model(model_path)

    def __call__(self, img1, img2):
        return self.estimate_flow(img1, img2)

    def initialize_model(self, model_path):
        self.session = onnxruntime.InferenceSession(model_path,
                                                    providers=['CUDAExecutionProvider',
                                                               'CPUExecutionProvider'])
        # Get model info
        self.get_input_details()
        self.get_output_details()

    def estimate_flow(self, img1, img2):
        input_tensor1 = self.prepare_input(img1)
        input_tensor2 = self.prepare_input(img2)

        outputs = self.inference(input_tensor1, input_tensor2)

        self.flow_map = self.process_output(outputs)

        return self.flow_map

    def prepare_input(self, img):
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        self.img_height, self.img_width = img.shape[:2]

        img_input = cv2.resize(img, (self.input_width, self.input_height))

        # img_input = img_input/255
        img_input = img_input.transpose(2, 0, 1)
        img_input = img_input[np.newaxis, :, :, :]

        return img_input.astype(np.float32)

    def inference(self, input_tensor1, input_tensor2):
        # start = time.time()
        outputs = self.session.run(self.output_names,
                                   {self.input_names[0]: input_tensor1,
                                    self.input_names[1]: input_tensor2})
        # print(time.time() - start)
        return outputs

    def process_output(self, output):
        flow_map = output[1][0].transpose(1, 2, 0)
        return flow_map

    def draw_flow(self):
        # Convert flow to image
        flow_img = flow_to_image(self.flow_map)

        # Convert to BGR
        flow_img = cv2.cvtColor(flow_img, cv2.COLOR_RGB2BGR)

        # Resize the flow map to match the input image shape
        return cv2.resize(flow_img, (self.img_width, self.img_height))

    def get_input_details(self):
        model_inputs = self.session.get_inputs()
        self.input_names = [model_inputs[i].name for i in range(len(model_inputs))]

        self.input_shape = model_inputs[0].shape
        self.input_height = self.input_shape[2]
        self.input_width = self.input_shape[3]

    def get_output_details(self):
        model_outputs = self.session.get_outputs()
        self.output_names = [model_outputs[i].name for i in range(len(model_outputs))]

        self.output_shape = model_outputs[0].shape
        self.output_height = self.output_shape[2]
        self.output_width = self.output_shape[3]


if __name__ == '__main__':
    from imread_from_url import imread_from_url

    # Initialize model
    model_path = 'raft_small_iter10_240x320.onnx'
    flow_estimator = Raft(model_path)

    # Read inference images
    img1 = imread_from_url("https://github.com/princeton-vl/RAFT/blob/master/demo-frames/frame_0016.png?raw=true")
    img2 = imread_from_url("https://github.com/princeton-vl/RAFT/blob/master/demo-frames/frame_0025.png?raw=true")

    # Estimate flow and colorize it
    flow_map = flow_estimator(img1, img2)
    flow_img = flow_estimator.draw_flow()

    combined_img = np.hstack((img1, img2, flow_img))

    # cv2.namedWindow("Estimated flow", cv2.WINDOW_NORMAL)
    # cv2.imshow("Estimated flow", combined_img)
    # cv2.waitKey(0)

import gradio as gr
import yt_dlp


def download_youtube_video(youtube_url, output_filename):
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': output_filename,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([youtube_url])


def process_video(youtube_url, start_time, flow_frame_offset):
    model_path = 'models/raft_small_iter10_240x320.onnx'
    flow_estimator = Raft(model_path)

    output_filename = 'downloaded_video.mp4'
    processed_output = 'processed_video.mp4'

    # Download video
    if os.path.exists(output_filename):
        os.remove(output_filename)
    download_youtube_video(youtube_url, output_filename)

    cap = cv2.VideoCapture(output_filename)
    if not cap.isOpened():
        return "Error: Could not open video."

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)

    # 'mp4v' matches the .mp4 container ('XVID' is an AVI codec)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(processed_output, fourcc, fps, (frame_width, frame_height))

    cap.set(cv2.CAP_PROP_POS_FRAMES, int(start_time * fps))

    frame_list = []
    frame_num = 0
    while cap.isOpened():
        ret, prev_frame = cap.read()
        if not ret:
            break

        frame_list.append(prev_frame)
        frame_num += 1
        if frame_num <= flow_frame_offset:
            continue

        # Estimate flow between the oldest and newest frames in the buffer
        flow_map = flow_estimator(frame_list[0], frame_list[-1])
        flow_img = flow_estimator.draw_flow()

        # Blend the original frame with the colorized flow
        alpha = 0.5
        combined_img = cv2.addWeighted(frame_list[0], alpha, flow_img, 1 - alpha, 0)
        if combined_img is None:
            break

        out.write(combined_img)
        frame_list.pop(0)

    cap.release()
    out.release()

    return processed_output


examples = [
    ["https://www.youtube.com/watch?v=is38pqgbj6A", 5, 50],
    ["https://www.youtube.com/watch?v=AdbrfoxiAtk", 0, 60],
    ["https://www.youtube.com/watch?v=vWGg0iPmI8k", 13, 70],
]

with gr.Blocks() as app:
    gr.Markdown("### Optical Flow Video Processing\n"
                "Enter a YouTube URL, set the start time and flow frame offset, "
                "then click 'Process Video' to see the optical flow processing.")

    with gr.Row():
        with gr.Column():
            youtube_url = gr.Textbox(label="YouTube URL", placeholder="Enter YouTube Video URL Here")
            start_time = gr.Slider(minimum=0, maximum=60, label="Start Time (seconds)", step=1)
            flow_frame_offset = gr.Slider(minimum=1, maximum=100, label="Flow Frame Offset", step=1)
            submit_button = gr.Button("Process Video")
        with gr.Column():
            output_video = gr.Video(label="Processed Video")

    submit_button.click(
        fn=process_video,
        inputs=[youtube_url, start_time, flow_frame_offset],
        outputs=output_video
    )

    gr.Examples(examples=examples,
                inputs=[youtube_url, start_time, flow_frame_offset],
                fn=process_video,
                outputs=output_video,
                cache_examples=False)

app.launch()
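
# A minimal usage sketch of the Raft wrapper outside the Gradio app, kept as
# comments so nothing runs after app.launch(); the frame paths below are
# hypothetical placeholders:
#
#   estimator = Raft('models/raft_small_iter10_240x320.onnx')
#   frame_a = cv2.imread('frame_a.png')  # BGR frames, any resolution
#   frame_b = cv2.imread('frame_b.png')
#   flow = estimator(frame_a, frame_b)   # (H, W, 2) u/v field at the model's 240x320 resolution
#   cv2.imwrite('flow_color.png', estimator.draw_flow())  # colorized, resized back to frame size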