|
import gradio as gr |
|
|
|
""" |
|
===================================================== |
|
Optical Flow: Predicting movement with the RAFT model |
|
===================================================== |
|
|
|
Optical flow is the task of predicting movement between two images, usually two |
|
consecutive frames of a video. Optical flow models take two images as input, and |
|
predict a flow: the flow indicates the displacement of every single pixel in the |
|
first image, and maps it to its corresponding pixel in the second image. Flows |
|
are (2, H, W)-dimensional tensors, where the first axis corresponds to the |
|
predicted horizontal and vertical displacements. |
|
|
|
The following example illustrates how torchvision can be used to predict flows |
|
using our implementation of the RAFT model. We will also see how to convert the |
|
predicted flows to RGB images for visualization. |
|
""" |
|
|
|
import cv2 |
|
import numpy as np |
|
import os |
|
import sys |
|
import torch |
|
import matplotlib.pyplot as plt |
|
import torchvision.transforms.functional as F |
|
from torchvision.io import read_video |
|
from torchvision.models.optical_flow import Raft_Large_Weights |
|
from torchvision.models.optical_flow import raft_large |
|
from torchvision.io import write_jpeg |
|
import torchvision.transforms as T |
|
|
|
import tempfile |
|
from pathlib import Path |
|
from urllib.request import urlretrieve |
|
import tensorflow as tf |
|
from scipy.interpolate import interp2d |
|
from imageio import imread, imwrite |
|
from flowio import readFlowFile |
|
|
|
def write_flo(flow, filename): |
|
""" |
|
Write optical flow in Middlebury .flo format |
|
|
|
:param flow: optical flow map |
|
:param filename: optical flow file path to be saved |
|
:return: None |
|
|
|
from https://github.com/liruoteng/OpticalFlowToolkit/ |
|
|
|
""" |
|
|
|
flow = flow.cpu().data.numpy() |
|
flow = flow.astype(np.float32) |
|
f = open(filename, 'wb') |
|
magic = np.array([202021.25], dtype=np.float32) |
|
(height, width) = flow.shape[0:2] |
|
w = np.array([width], dtype=np.int32) |
|
h = np.array([height], dtype=np.int32) |
|
magic.tofile(f) |
|
w.tofile(f) |
|
h.tofile(f) |
|
flow.tofile(f) |
|
f.close() |
|
|
|
def warpImage(im, vx, vy, cast_uint8=True): |
|
''' |
|
function to warp images with different dimensions |
|
''' |
|
|
|
height2, width2, nChannels = im.shape |
|
height1, width1 = vx.shape |
|
|
|
x = np.linspace(1, width2, width2) |
|
y = np.linspace(1, height2, height2) |
|
X = np.linspace(1, width1, width1) |
|
Y = np.linspace(1, height1, height1) |
|
xx, yy = np.meshgrid(x, y) |
|
XX, YY = np.meshgrid(X, Y) |
|
|
|
XX = np.concatenate([XX, vx], axis = 1) |
|
|
|
YY = np.concatenate([YY, vy], axis = 1) |
|
mask = (XX < 1) | (XX > width2) | (YY < 1) | (YY > height2) |
|
XX = np.clip(XX, 1, width2) |
|
YY = np.clip(XX, 1, height2) |
|
|
|
warpI2 = np.zeros((height1, width1, nChannels)) |
|
for i in range(nChannels): |
|
f = interp2d(x, y, im[:, :, i], 'cubic') |
|
foo = f(X, Y) |
|
foo[mask] = 0.6 |
|
warpI2[:, :, i] = foo |
|
|
|
mask = 1 - mask |
|
|
|
if cast_uint8: |
|
warpI2 = warpI2.astype(np.uint8) |
|
|
|
return warpI2, mask |
|
|
|
|
|
def get_warp_res(fname_image, fname_flow, fname_output='warped.png'): |
|
print(f"FNAME IMAGE: {fname_image}") |
|
im2 = imread(fname_image) |
|
print(f"FNAME IMAGE READED: {im2}") |
|
flow = fname_flow.cpu().detach().numpy() |
|
im_warped, _ = warpImage(im2, flow[:, :, 0], flow[:, :, 1]) |
|
imwrite(fname_output, im_warped) |
|
|
|
def infer(): |
|
video_url = "https://download.pytorch.org/tutorial/pexelscom_pavel_danilyuk_basketball_hd.mp4" |
|
video_path = Path(tempfile.mkdtemp()) / "basketball.mp4" |
|
_ = urlretrieve(video_url, video_path) |
|
|
|
frames, _, _ = read_video(str(video_path), output_format="TCHW") |
|
print(f"FRAME BEFORE: {frames[100]}") |
|
img1_batch = torch.stack([frames[100]]) |
|
img2_batch = torch.stack([frames[101]]) |
|
print(f"FRAME AFTER: {img1_batch}") |
|
weights = Raft_Large_Weights.DEFAULT |
|
transforms = weights.transforms() |
|
|
|
|
|
def preprocess(img1_batch, img2_batch): |
|
img1_batch = F.resize(img1_batch, size=[520, 960]) |
|
img2_batch = F.resize(img2_batch, size=[520, 960]) |
|
return transforms(img1_batch, img2_batch) |
|
|
|
|
|
img1_batch, img2_batch = preprocess(img1_batch, img2_batch) |
|
|
|
print(f"shape = {img1_batch.shape}, dtype = {img1_batch.dtype}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
|
model = raft_large(weights=Raft_Large_Weights.DEFAULT, progress=False).to(device) |
|
model = model.eval() |
|
|
|
list_of_flows = model(img1_batch.to(device), img2_batch.to(device)) |
|
print(f"type = {type(list_of_flows)}") |
|
print(f"length = {len(list_of_flows)} = number of iterations of the model") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
predicted_flows = list_of_flows[-1] |
|
print(f"dtype = {predicted_flows.dtype}") |
|
print(f"shape = {predicted_flows.shape} = (N, 2, H, W)") |
|
print(f"min = {predicted_flows.min()}, max = {predicted_flows.max()}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from torchvision.utils import flow_to_image |
|
|
|
|
|
|
|
|
|
|
|
predicted_flow = list_of_flows[-1][0] |
|
flow_img = flow_to_image(predicted_flow).to("cpu") |
|
|
|
write_jpeg(flow_img, f"predicted_flow.jpg") |
|
input_image = flow_to_image(frames[100]).to("cpu") |
|
write_jpeg(input_image, f"frame_input.jpg") |
|
flo_file = write_flo(predicted_flow, "flofile.flo") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return "done", "predicted_flow.jpg", ["flofile.flo"], 'frame_input.jpg' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
gr.Interface(fn=infer, inputs=[], outputs=[gr.Textbox(), gr.Image(), gr.Files(), gr.Image()]).launch() |