File size: 2,254 Bytes
899c526
 
 
 
 
a8c8616
899c526
 
a8c8616
899c526
 
 
 
 
 
 
 
a8c8616
 
 
 
 
 
 
 
 
 
 
899c526
 
 
 
 
 
 
 
 
a8c8616
899c526
a8c8616
 
899c526
 
 
 
 
 
 
 
 
 
a8c8616
899c526
 
a8c8616
 
 
899c526
a8c8616
899c526
 
 
 
a8c8616
899c526
 
 
 
a8c8616
 
899c526
 
a8c8616
899c526
 
a8c8616
 
899c526
 
 
 
 
a8c8616
 
 
 
899c526
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import cv2
import numpy as np
from pathlib import Path
from itertools import chain
from multiprocessing import Queue
import mmcv


def load_calib(calib: str) -> np.ndarray:
    calib = np.loadtxt(calib, delimiter=" ")
    fx, fy, cx, cy = calib[:4]

    K = np.eye(3)
    K[0, 0] = fx
    K[0, 2] = cx
    K[1, 1] = fy
    K[1, 2] = cy
    return K, calib


def image_stream(
    queue: Queue, imagedir: str, calib: str | None, stride: int, skip: int = 0
) -> None:
    """image generator"""

    if calib is not None:
        K, calib = load_calib(calib)
        fx, fy, cx, cy = K[0, 0], K[1, 1], K[0, 2], K[1, 2]

    img_exts = ["*.png", "*.jpeg", "*.jpg"]
    image_list = sorted(chain.from_iterable(Path(imagedir).glob(e) for e in img_exts))[
        skip::stride
    ]

    for t, imfile in enumerate(image_list):
        image = cv2.imread(str(imfile))

        if calib is not None:
            intrinsics = np.array([fx, fy, cx, cy])
        else:
            intrinsics = None

        h, w, _ = image.shape
        image = image[: h - h % 16, : w - w % 16]

        queue.put((t, image, intrinsics))

    queue.put((-1, image, intrinsics))


def video_stream(
    queue: Queue, imagedir: str, calib: str | None, stride: int, skip: int = 0
) -> None:
    """video generator"""
    if calib is not None:
        K, calib = load_calib(calib)
        fx, fy, cx, cy = K[0, 0], K[1, 1], K[0, 2], K[1, 2]

    video_reader = mmcv.VideoReader(imagedir)

    t = 0

    for _ in range(skip):
        image = video_reader.read()

    while True:
        # Capture frame-by-frame
        for _ in range(stride):
            image = video_reader.read()
            if image is None:
                break

        if image is None:
            break

        # if len(calib) > 4:
        #     image = cv2.undistort(image, K, calib[4:])

        image = cv2.resize(image, None, fx=0.5, fy=0.5, interpolation=cv2.INTER_AREA)
        h, w, _ = image.shape
        image = image[: h - h % 16, : w - w % 16]

        if calib is not None:
            intrinsics = np.array([fx * 0.5, fy * 0.5, cx * 0.5, cy * 0.5])
        else:
            intrinsics = None
        queue.put((t, image, intrinsics))

        t += 1

    queue.put((-1, image, intrinsics))