|
import gradio as gr |
|
|
|
""" |
|
===================================================== |
|
Optical Flow: Predicting movement with the RAFT model |
|
===================================================== |
|
|
|
Optical flow is the task of predicting movement between two images, usually two |
|
consecutive frames of a video. Optical flow models take two images as input, and |
|
predict a flow: the flow indicates the displacement of every single pixel in the |
|
first image, and maps it to its corresponding pixel in the second image. Flows |
|
are (2, H, W)-dimensional tensors, where the first axis corresponds to the |
|
predicted horizontal and vertical displacements. |
|
|
|
The following example illustrates how torchvision can be used to predict flows |
|
using our implementation of the RAFT model. We will also see how to convert the |
|
predicted flows to RGB images for visualization. |
|
""" |
|
|
|
import cv2 |
|
import numpy as np |
|
import os |
|
import sys |
|
import torch |
|
from PIL import Image |
|
import matplotlib.pyplot as plt |
|
import torchvision.transforms.functional as F |
|
from torchvision.io import read_video, read_image |
|
from torchvision.models.optical_flow import Raft_Large_Weights |
|
from torchvision.models.optical_flow import raft_large |
|
from torchvision.io import write_jpeg |
|
import torchvision.transforms as T |
|
|
|
import tempfile |
|
from pathlib import Path |
|
from urllib.request import urlretrieve |
|
import tensorflow as tf |
|
from scipy.interpolate import LinearNDInterpolator |
|
from imageio import imread, imwrite |
|
from flowio import readFlowFile |
|
|
|
def write_flo(flow, filename):
    """
    Write optical flow in Middlebury .flo format.

    The file consists of a float32 magic number (202021.25), the width and
    the height as int32 values, and then the float32 flow vectors written
    row by row with the two components of each pixel next to each other.
    All values are written little-endian.

    :param flow: optical flow map as a torch tensor or NumPy array. Both the
        (H, W, 2) layout and the channel-first (2, H, W) layout are accepted;
        a channel-first map is transposed to (H, W, 2) before writing. An
        input whose last axis already has length 2 is treated as (H, W, 2).
    :param filename: optical flow file path to be saved
    :return: None
    :raises ValueError: if ``flow`` is not 3-D or has no axis of length 2 in
        the first or last position

    Adapted from https://github.com/liruoteng/OpticalFlowToolkit/
    """
    if isinstance(flow, torch.Tensor):
        # detach() drops the autograd graph so tensors that require grad can
        # still be converted to NumPy.
        flow = flow.detach().cpu().numpy()
    flow = np.asarray(flow, dtype=np.float32)

    if flow.ndim != 3:
        raise ValueError(
            f"flow must be 3-D, (H, W, 2) or (2, H, W); got shape {flow.shape}"
        )
    if flow.shape[2] != 2:
        if flow.shape[0] != 2:
            raise ValueError(
                f"flow must have 2 channels first or last; got shape {flow.shape}"
            )
        # Channel-first input: move the two flow components to the last axis
        # so they end up interleaved per pixel in the file.
        flow = flow.transpose(1, 2, 0)

    height, width = flow.shape[:2]

    # The context manager closes the file even if one of the writes fails.
    with open(filename, 'wb') as f:
        np.array([202021.25], dtype='<f4').tofile(f)
        np.array([width, height], dtype='<i4').tofile(f)
        # Make the (possibly transposed) view contiguous so the bytes on disk
        # follow row-major (H, W, 2) order.
        np.ascontiguousarray(flow, dtype='<f4').tofile(f)
|
|
|
|
|
|
|
def infer():
    """
    Predict optical flow between two frames of a sample video with RAFT.

    Downloads the basketball clip, takes frames 100 and 101, runs
    ``raft_large`` on them and writes these files to the current working
    directory:

    * ``predicted_flow.jpg`` - RGB rendering of the predicted flow
    * ``flofile.flo`` - the predicted flow in Middlebury .flo format
    * ``raw_frame2.jpg`` - frame 101 resized to 960x520
    * ``wraped.jpg`` - frame 101 resampled at the positions the flow points to
    * ``blended2.jpg`` - 50/50 blend of ``wraped.jpg`` and ``frame_input.jpg``

    ``frame_input.jpg`` is read from the working directory and is not created
    by this function.
    NOTE(review): presumably it is a 960x520 RGB copy of the first frame;
    ``Image.blend`` needs both images to share size and mode - confirm.

    :return: tuple of a status string, the flow image path, a list holding
        the .flo path, the resized frame path, the warped frame path and the
        blended image path (one entry per Gradio output component)
    """
    # Function-scope import, as in the original code.
    from torchvision.utils import flow_to_image

    video_url = "https://download.pytorch.org/tutorial/pexelscom_pavel_danilyuk_basketball_hd.mp4"

    # The downloaded clip is only needed until it has been decoded, so it
    # lives in a temporary directory that is removed right afterwards.
    with tempfile.TemporaryDirectory() as tmp_dir:
        video_path = Path(tmp_dir) / "basketball.mp4"
        urlretrieve(video_url, video_path)
        frames, _, _ = read_video(str(video_path), output_format="TCHW")

    print(f"FRAME BEFORE stack: {frames[100]}")

    # The model works on batches, so wrap each frame in a batch of size one.
    img1_batch = torch.stack([frames[100]])
    img2_batch = torch.stack([frames[101]])

    print(f"FRAME AFTER stack: {img1_batch}")

    weights = Raft_Large_Weights.DEFAULT
    transforms = weights.transforms()

    def preprocess(first_batch, second_batch):
        # Resize both batches to 520x960 and apply the transforms bundled
        # with the pretrained weights.
        first_batch = F.resize(first_batch, size=[520, 960])
        second_batch = F.resize(second_batch, size=[520, 960])
        return transforms(first_batch, second_batch)

    img1_batch, img2_batch = preprocess(img1_batch, img2_batch)

    print(f"shape = {img1_batch.shape}, dtype = {img1_batch.dtype}")

    device = "cuda" if torch.cuda.is_available() else "cpu"

    model = raft_large(weights=weights, progress=False).to(device)
    model = model.eval()

    # Pure inference: skip building the autograd graph.
    with torch.no_grad():
        list_of_flows = model(img1_batch.to(device), img2_batch.to(device))
    print(f"list_of_flows type = {type(list_of_flows)}")
    print(f"list_of_flows length = {len(list_of_flows)} = number of iterations of the model")

    # The last entry of the list is used as the final prediction.
    predicted_flows = list_of_flows[-1]
    print(f"predicted_flows dtype = {predicted_flows.dtype}")
    print(f"predicted_flows shape = {predicted_flows.shape} = (N, 2, H, W)")
    print(f"predicted_flows min = {predicted_flows.min()}, predicted_flows max = {predicted_flows.max()}")

    # Single sample of the batch: shape (2, H, W).
    predicted_flow = predicted_flows[0]
    print(f"predicted flow dtype = {predicted_flow.dtype}")
    print(f"predicted flow shape = {predicted_flow.shape}")

    flow_img = flow_to_image(predicted_flow).to("cpu")
    write_jpeg(flow_img, "predicted_flow.jpg")

    # write_flo reads height and width from the first two axes, so hand it
    # the flow as (H, W, 2) rather than channel-first.
    write_flo(predicted_flow.permute(1, 2, 0), "flofile.flo")

    # Second frame as a PIL image at the same 960x520 size as the flow field.
    to_pil = T.ToPILImage()
    img = to_pil(frames[101]).resize((960, 520))

    frame2pil = np.array(img.convert('RGB'))
    print(f"frame1pil: {frame2pil}")
    print(f"frame1pil shape: {frame2pil.shape}")
    print(f"frame1pil dtype: {frame2pil.dtype}")
    img.save('raw_frame2.jpg')

    numpy_array_flow = predicted_flow.permute(1, 2, 0).detach().cpu().numpy()
    print(f"numpy_array_flow: {numpy_array_flow}")
    print(f"numpy_array_flow shape: {numpy_array_flow.shape}")
    print(f"numpy_array_flow dtype: {numpy_array_flow.dtype}")

    h, w = numpy_array_flow.shape[:2]
    # Copy so the in-place additions below work on a contiguous array and do
    # not write into memory shared with the tensor.
    sample_map = numpy_array_flow.copy()
    # cv2.remap expects absolute sampling coordinates, so add each pixel's
    # own (x, y) position to its displacement. These must be in-place
    # additions; a bare `+` expression would discard the result.
    sample_map[:, :, 0] += np.arange(w, dtype=np.float32)
    sample_map[:, :, 1] += np.arange(h, dtype=np.float32)[:, np.newaxis]

    res = cv2.remap(frame2pil, sample_map, None, cv2.INTER_LANCZOS4)
    print(res)

    res = Image.fromarray(res)
    res.save('wraped.jpg')

    blend2 = Image.open('frame_input.jpg')
    blend2 = Image.blend(res, blend2, 0.5)
    blend2.save("blended2.jpg")

    return "done", "predicted_flow.jpg", ["flofile.flo"], 'raw_frame2.jpg', 'wraped.jpg', "blended2.jpg"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Build the Gradio UI: infer() takes no inputs, and there is one output
# component per value it returns, in the same order.
output_components = [
    gr.Textbox(),
    gr.Image(),
    gr.Files(),
    gr.Image(),
    gr.Image(),
    gr.Image(),
]

demo = gr.Interface(fn=infer, inputs=[], outputs=output_components)
demo.launch()