# ICON/lib/pymaf/utils/streamer.py
# (last commit 2d5f249, "done", by Yuliang)
import cv2
import torch
import numpy as np
import imageio
def aug_matrix(w1, h1, w2, h2, pad=True):
    """Build the 3x3 matrix that maps a (w1, h1) image onto a (w2, h2) canvas.

    The source image is first shifted so that its center lands on the center
    of the target canvas, then scaled uniformly about that center.

    Args:
        w1: Width of the source image.
        h1: Height of the source image.
        w2: Width of the target canvas.
        h2: Height of the target canvas.
        pad: If True (default, identical to the former four-argument
            behaviour), scale by the smaller of the two axis ratios so the
            whole source fits inside the canvas and the remainder is left as
            border. If False, scale by the larger ratio so the canvas is
            completely covered and the overflow is cropped.

    Returns:
        A (3, 3) numpy array in homogeneous coordinates; its first two rows
        form the 2x3 matrix expected by cv2.warpAffine.
    """
    # Translation that moves the source center onto the canvas center.
    dx = (w2 - w1) / 2.0
    dy = (h2 - h1) / 2.0
    matrix_trans = np.array([[1.0, 0, dx],
                             [0, 1.0, dy],
                             [0, 0, 1.0]])

    # Uniform scale: "fit" (min ratio) when padding, "fill" (max ratio) when not.
    ratios = [float(w2) / w1, float(h2) / h1]
    scale = np.min(ratios) if pad else np.max(ratios)

    M = get_affine_matrix(
        center=(w2 / 2.0, h2 / 2.0),
        translate=(0, 0),
        scale=scale)
    # Promote the flat 2x3 list to a 3x3 homogeneous matrix.
    M = np.array(M + [0., 0., 1.]).reshape(3, 3)

    # Points are translated first, then scaled about the canvas center.
    return M.dot(matrix_trans)
def get_affine_matrix(center, translate, scale):
    """Return the six entries of a 2x3 affine matrix as a flat list.

    The transform scales uniformly by ``scale`` about ``center`` and then
    shifts by ``translate``; it has no rotation or shear component.

    Args:
        center: ``(cx, cy)`` point that the scaling is performed around.
        translate: ``(tx, ty)`` offset added after scaling.
        scale: Uniform scale factor.

    Returns:
        ``[a, b, c, d, e, f]`` in row-major order, i.e. the matrix
        ``[[a, b, c], [d, e, f]]``.
    """
    cx, cy = center
    tx, ty = translate

    # Start from the scaled identity, laid out row-major.
    a, b, c, d, e, f = [entry * scale for entry in (1, 0, 0, 0, 1, 0)]

    # Shift the center to the origin before scaling: RSS * C^-1
    c += a * (-cx) + b * (-cy)
    f += d * (-cx) + e * (-cy)

    # Shift the center back and apply the translation: T * C * RSS * C^-1
    c += cx + tx
    f += cy + ty

    return [a, b, c, d, e, f]
class BaseStreamer():
    """Base class for frame sources that hand out normalized image tensors.

    With the default arguments, returned images are 512x512. Subclasses
    provide the frames by overriding ``create_loader`` and ``__len__``.
    """

    def __init__(self,
                 width=512, height=512, pad=True,
                 mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5),
                 **kwargs):
        # Output geometry; `pad` is forwarded to aug_matrix in __getitem__.
        self.width, self.height, self.pad = width, height, pad
        # Normalization constants, applied to pixel values scaled to [0, 1].
        self.mean = np.array(mean)
        self.std = np.array(std)
        # Extra keyword arguments are accepted and ignored.
        self.loader = self.create_loader()

    def create_loader(self):
        """Generator of frames; subclasses must override it."""
        raise NotImplementedError
        # Never reached, but the `yield` turns this method into a generator
        # function: calling it is lazy and the error above is only raised on
        # the first next().
        yield np.zeros((600, 400, 3))  # in RGB (0, 255)

    def __getitem__(self, index):
        # `index` is not used: each call simply consumes the next frame.
        frame = next(self.loader)
        src_h, src_w, _channels = frame.shape

        warp = aug_matrix(src_w, src_h, self.width, self.height, self.pad)
        target_size = (self.width, self.height)
        frame = cv2.warpAffine(
            frame, warp[:2], target_size, flags=cv2.INTER_CUBIC)

        pixels = np.float32(frame) / 255.0
        normalized = (pixels - self.mean) / self.std  # TO [-1.0, 1.0]
        channels_first = normalized.transpose(2, 0, 1)  # TO [3 x H x W]
        return torch.from_numpy(channels_first).float()

    def __len__(self):
        raise NotImplementedError
class CaptureStreamer(BaseStreamer):
    """This streamer takes webcam as input.

    Frames are pulled from a ``cv2.VideoCapture`` opened on device ``id``.
    """

    def __init__(self, id=0, width=512, height=512, pad=True, **kwargs):
        # The base class creates the loader generator here; it is lazy, so
        # `self.capture` only needs to exist by the time a frame is requested.
        super().__init__(width, height, pad, **kwargs)
        self.capture = cv2.VideoCapture(id)

    def create_loader(self):
        """Yield RGB frames until the device stops delivering them.

        The generator ends when ``read()`` reports failure, so fetching the
        next frame raises ``StopIteration`` instead of passing ``None`` to
        ``cv2.cvtColor``.
        """
        while True:
            ok, image = self.capture.read()
            if not ok or image is None:
                # Device unavailable or disconnected: no frame to convert.
                return
            yield cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # RGB

    def __len__(self):
        # A live capture has no real length; report a large placeholder.
        return 100_000_000

    def __del__(self):
        # `capture` is missing if __init__ failed before assigning it.
        capture = getattr(self, "capture", None)
        if capture is not None:
            capture.release()
class VideoListStreamer(BaseStreamer):
    """This streamer takes a list of video files as input.

    The videos are played back to back, in the order given by ``files``.
    """

    def __init__(self, files, width=512, height=512, pad=True, **kwargs):
        super().__init__(width, height, pad, **kwargs)
        self.files = files
        # Open readers one at a time so that, if a later file fails to open,
        # the readers opened so far are still reachable from __del__.
        self.captures = []
        for f in files:
            self.captures.append(imageio.get_reader(f))
        # Frame count estimated from each reader's metadata (fps * duration),
        # read through the public accessor rather than the private `_meta`.
        self.nframes = sum(
            int(meta["fps"] * meta["duration"])
            for meta in (cap.get_meta_data() for cap in self.captures))

    def create_loader(self):
        """Yield every frame of every video, one video after another."""
        for capture in self.captures:
            for image in capture:  # RGB
                yield image

    def __len__(self):
        return self.nframes

    def __del__(self):
        # `captures` is missing if __init__ failed before assigning it.
        for capture in getattr(self, "captures", ()):
            capture.close()
class ImageListStreamer(BaseStreamer):
    """This streamer takes a list of image files as input.

    Images are read with OpenCV, in the order given by ``files``.
    """

    def __init__(self, files, width=512, height=512, pad=True, **kwargs):
        super().__init__(width, height, pad, **kwargs)
        self.files = files

    def create_loader(self):
        """Yield each file as a 3-channel RGB array.

        Raises:
            IOError: If OpenCV cannot read one of the files.
        """
        for f in self.files:
            image = cv2.imread(f, cv2.IMREAD_UNCHANGED)
            if image is None:
                # imread signals failure by returning None, not by raising.
                raise IOError(f"Failed to read image file: {f}")
            if image.ndim == 2:
                # Single-channel image: there is no channel axis to slice.
                image = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
            else:
                # Keep the first three channels (drops alpha if present).
                image = cv2.cvtColor(image[:, :, 0:3], cv2.COLOR_BGR2RGB)
            yield image  # RGB

    def __len__(self):
        return len(self.files)