File size: 4,791 Bytes
01664b3
 
5ceacf4
 
01664b3
 
 
20c01c5
01664b3
 
5ceacf4
01664b3
 
 
 
 
5ceacf4
01664b3
 
 
 
5ceacf4
01664b3
 
 
 
 
5ceacf4
 
01664b3
 
5ceacf4
 
01664b3
 
 
 
 
 
 
 
 
5ceacf4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01664b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5ceacf4
01664b3
 
5ceacf4
 
 
 
 
 
 
 
01664b3
5ceacf4
01664b3
 
5ceacf4
01664b3
 
 
 
 
 
 
 
 
 
 
 
 
 
5ceacf4
01664b3
 
 
 
 
 
 
5ceacf4
01664b3
 
 
 
5ceacf4
01664b3
 
5ceacf4
 
01664b3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5ceacf4
 
20c01c5
01664b3
 
5ceacf4
01664b3
20c01c5
5ceacf4
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import os
import re
import shutil
import time
from types import SimpleNamespace
from typing import Any

import gradio as gr
import numpy as np
from detectron2 import engine
from PIL import Image

from inference import main, setup_cfg

# internal settings
NUM_PROCESSES = 1
CROP = True
SCORE_THRESHOLD = 0.8
MAX_PARTS = 5
ARGS = SimpleNamespace(
    config_file="configs/coco/instance-segmentation/swin/opd_v1_real.yaml",
    model="../data/models/motion_state_pred_opdformerp_rgb.pth",
    input_format="RGB",
    output=".output",
    cpu=True,
)

outputs = []


def predict(rgb_image: str, depth_image: str, intrinsics: np.ndarray, num_samples: int) -> list[Any]:
    global outputs

    def find_gifs(path: str) -> list[str]:
        """Scrape folders for all generated gif files."""
        for file in os.listdir(path):
            sub_path = os.path.join(path, file)
            if os.path.isdir(sub_path):
                for image_file in os.listdir(sub_path):
                    if re.match(r".*\.gif$", image_file):
                        yield os.path.join(sub_path, image_file)

    def find_images(path: str) -> list[str]:
        """Scrape folders for all generated gif files."""
        images = {}
        for file in os.listdir(path):
            sub_path = os.path.join(path, file)
            if os.path.isdir(sub_path):
                images[file] = []
                for image_file in sorted(os.listdir(sub_path)):
                    if re.match(r".*\.png$", image_file):
                        images[file].append(os.path.join(sub_path, image_file))
        return images

    def get_generator(images):
        def gen():
            while True:
                for im in images:
                    time.sleep(0.025)
                    yield im
                time.sleep(3)

        return gen

    # clear old predictions
    for path in os.listdir(ARGS.output):
        full_path = os.path.join(ARGS.output, path)
        if os.path.isdir(full_path):
            shutil.rmtree(full_path)
        else:
            os.remove(full_path)

    cfg = setup_cfg(ARGS)

    engine.launch(
        main,
        NUM_PROCESSES,
        args=(
            cfg,
            rgb_image,
            depth_image,
            intrinsics,
            num_samples,
            CROP,
            SCORE_THRESHOLD,
        ),
    )

    # process output
    # TODO: may want to select these in decreasing order of score
    image_files = find_images(ARGS.output)
    output = []
    for count, part in enumerate(image_files):
        if count < MAX_PARTS:
            # output.append(gr.update(value=get_generator([Image.open(im) for im in image_files[part]]), visible=True))
            output.append(get_generator([Image.open(im) for im in image_files[part]]))
    # while len(output) < MAX_PARTS:
    #     output.append(gr.update(visible=False))

    yield from output[0]()


with gr.Blocks() as demo:
    gr.Markdown(
        """
    # OPDMulti Demo
    Upload an image to see its range of motion.
    """
    )

    # TODO: add gr.Examples

    with gr.Row():
        rgb_image = gr.Image(
            image_mode="RGB", source="upload", type="filepath", label="RGB Image", show_label=True, interactive=True
        )
        depth_image = gr.Image(
            image_mode="I;16", source="upload", type="filepath", label="Depth Image", show_label=True, interactive=True
        )

    intrinsics = gr.Dataframe(
        value=[
            [
                214.85935872395834,
                0.0,
                125.90160319010417,
            ],
            [
                0.0,
                214.85935872395834,
                95.13726399739583,
            ],
            [
                0.0,
                0.0,
                1.0,
            ],
        ],
        row_count=(3, "fixed"),
        col_count=(3, "fixed"),
        datatype="number",
        type="numpy",
        label="Intrinsics matrix",
        show_label=True,
        interactive=True,
    )
    num_samples = gr.Number(
        value=10,
        label="Number of samples",
        show_label=True,
        interactive=True,
        precision=0,
        minimum=3,
        maximum=20,
    )

    submit_btn = gr.Button("Run model")

    # TODO: do we want to set a maximum limit on how many parts we render? We could also show the number of components
    # identified.
    # images = [gr.Image(type="pil", label=f"Part {idx + 1}", visible=False) for idx in range(MAX_PARTS)]
    image = gr.Image(type="pil", visible=True)

    # TODO: maybe need to use a queue here so we don't overload the instance
    submit_btn.click(
        fn=predict, inputs=[rgb_image, depth_image, intrinsics, num_samples], outputs=image, api_name="run_model"
    )

demo.queue(api_open=False)
demo.launch()