File size: 5,482 Bytes
01664b3
 
5ceacf4
 
01664b3
 
 
20c01c5
01664b3
 
5ceacf4
01664b3
 
 
 
 
6d737eb
01664b3
 
 
 
3c7feee
01664b3
 
 
 
6d737eb
01664b3
5ceacf4
 
01664b3
 
5ceacf4
 
01664b3
 
 
 
 
 
 
 
 
5ceacf4
 
 
 
 
 
 
 
 
 
 
 
 
3c7feee
5ceacf4
 
 
 
 
 
 
01664b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5ceacf4
01664b3
 
5ceacf4
6d737eb
5ceacf4
 
6d737eb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01664b3
6d737eb
 
 
 
01664b3
 
5ceacf4
01664b3
 
 
 
 
 
 
 
 
 
 
 
 
 
5ceacf4
01664b3
 
 
 
 
 
 
5ceacf4
01664b3
 
 
 
5ceacf4
01664b3
 
5ceacf4
 
01664b3
 
 
 
 
 
 
 
 
 
 
 
6d737eb
01664b3
 
 
 
 
 
 
 
6d737eb
 
 
 
 
 
 
 
 
 
 
 
01664b3
 
 
 
6d737eb
 
 
20c01c5
01664b3
6d737eb
01664b3
20c01c5
5ceacf4
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import os
import re
import shutil
import time
from types import SimpleNamespace
from typing import Any

import gradio as gr
import numpy as np
from detectron2 import engine
from PIL import Image

from inference import main, setup_cfg

# internal settings
NUM_PROCESSES = 1
CROP = False
SCORE_THRESHOLD = 0.8
MAX_PARTS = 5
ARGS = SimpleNamespace(
    config_file="configs/coco/instance-segmentation/swin/opd_v1_real.yaml",
    model=".data/models/motion_state_pred_opdformerp_rgb.pth",
    input_format="RGB",
    output=".output",
    cpu=True,
)
NUM_SAMPLES = 10

outputs = []


def predict(rgb_image: str, depth_image: str, intrinsics: np.ndarray, num_samples: int) -> list[Any]:
    global outputs

    def find_gifs(path: str) -> list[str]:
        """Scrape folders for all generated gif files."""
        for file in os.listdir(path):
            sub_path = os.path.join(path, file)
            if os.path.isdir(sub_path):
                for image_file in os.listdir(sub_path):
                    if re.match(r".*\.gif$", image_file):
                        yield os.path.join(sub_path, image_file)

    def find_images(path: str) -> list[str]:
        """Scrape folders for all generated gif files."""
        images = {}
        for file in os.listdir(path):
            sub_path = os.path.join(path, file)
            if os.path.isdir(sub_path):
                images[file] = []
                for image_file in sorted(os.listdir(sub_path)):
                    if re.match(r".*\.png$", image_file):
                        images[file].append(os.path.join(sub_path, image_file))
        return images

    # clear old predictions
    os.makedirs(ARGS.output, exist_ok=True)
    for path in os.listdir(ARGS.output):
        full_path = os.path.join(ARGS.output, path)
        if os.path.isdir(full_path):
            shutil.rmtree(full_path)
        else:
            os.remove(full_path)

    cfg = setup_cfg(ARGS)

    engine.launch(
        main,
        NUM_PROCESSES,
        args=(
            cfg,
            rgb_image,
            depth_image,
            intrinsics,
            num_samples,
            CROP,
            SCORE_THRESHOLD,
        ),
    )

    # process output
    # TODO: may want to select these in decreasing order of score
    image_files = find_images(ARGS.output)
    outputs = []
    for count, part in enumerate(image_files):
        if count < MAX_PARTS:
            outputs.append([Image.open(im) for im in image_files[part]])

    return [
        *[gr.update(value=out[0], visible=True) for out in outputs],
        *[gr.update(visible=False) for _ in range(MAX_PARTS - len(outputs))],
    ]


def get_trigger(idx: int, fps: int = 40, oscillate: bool = True):
    def iter_images(*args, **kwargs):
        if idx < len(outputs):
            for im in outputs[idx]:
                time.sleep(1.0 / fps)
                yield im
            if oscillate:
                for im in reversed(outputs[idx]):
                    time.sleep(1.0 / fps)
                    yield im

        else:
            raise ValueError("Could not find any images to load into this module.")

    return iter_images


with gr.Blocks() as demo:
    gr.Markdown(
        """
    # OPDMulti Demo
    Upload an image to see its range of motion.
    """
    )

    # TODO: add gr.Examples

    with gr.Row():
        rgb_image = gr.Image(
            image_mode="RGB", source="upload", type="filepath", label="RGB Image", show_label=True, interactive=True
        )
        depth_image = gr.Image(
            image_mode="I;16", source="upload", type="filepath", label="Depth Image", show_label=True, interactive=True
        )

    intrinsics = gr.Dataframe(
        value=[
            [
                214.85935872395834,
                0.0,
                125.90160319010417,
            ],
            [
                0.0,
                214.85935872395834,
                95.13726399739583,
            ],
            [
                0.0,
                0.0,
                1.0,
            ],
        ],
        row_count=(3, "fixed"),
        col_count=(3, "fixed"),
        datatype="number",
        type="numpy",
        label="Intrinsics matrix",
        show_label=True,
        interactive=True,
    )
    num_samples = gr.Number(
        value=NUM_SAMPLES,
        label="Number of samples",
        show_label=True,
        interactive=True,
        precision=0,
        minimum=3,
        maximum=20,
    )

    examples = gr.Examples(
        examples=[
            ["examples/59-4860.png", "examples/59-4860_d.png"],
            ["examples/174-8460.png", "examples/174-8460_d.png"],
            ["examples/187-0.png", "examples/187-0_d.png"],
            ["examples/187-23040.png", "examples/187-23040_d.png"],
        ],
        inputs=[rgb_image, depth_image],
        api_name=False,
        examples_per_page=2,
    )

    submit_btn = gr.Button("Run model")

    # TODO: do we want to set a maximum limit on how many parts we render? We could also show the number of components
    # identified.
    images = [gr.Image(type="pil", label=f"Part {idx + 1}", visible=False) for idx in range(MAX_PARTS)]
    for idx, image_comp in enumerate(images):
        image_comp.select(get_trigger(idx), inputs=[], outputs=image_comp, api_name=False)

    submit_btn.click(
        fn=predict, inputs=[rgb_image, depth_image, intrinsics, num_samples], outputs=images, api_name=False
    )

demo.queue(api_open=False)
demo.launch()