# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
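"""Unit tests for the pytriton Model class.

Covers Triton model config generation, model store generation, proxy backend
setup/cleanup, liveness checks, and TritonContext injection.
"""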
import pathlib
import tempfile

import numpy as np

from pytriton.decorators import TritonContext, batch
from pytriton.model_config.tensor import Tensor
from pytriton.model_config.triton_model_config import TensorSpec
from pytriton.models.manager import ModelManager
from pytriton.models.model import Model, ModelConfig
from pytriton.proxy.communication import TensorStore
from pytriton.proxy.types import Request
from pytriton.utils.workspace import Workspace


def test_get_model_config_return_model_config_when_minimal_required_data(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(dtype=np.float32, shape=(-1,)),
            Tensor(dtype=np.float32, shape=(-1,)),
        ],
        outputs=[
            Tensor(dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )

    model_config = model._get_triton_model_config()

    assert model_config.model_name == "simple"
    assert model_config.model_version == 2
    assert model_config.batching is True
    assert model_config.max_batch_size == 128
    assert model_config.inputs == [
        TensorSpec(name="INPUT_1", dtype=np.float32, shape=(-1,)),
        TensorSpec(name="INPUT_2", dtype=np.float32, shape=(-1,)),
    ]
    assert model_config.outputs == [
        TensorSpec(name="OUTPUT_1", dtype=np.int32, shape=(-1,)),
    ]

    ipc_socket_path = workspace.path / "ipc_proxy_backend_simple"
    assert model_config.backend_parameters == {
        "shared-memory-socket": f"ipc://{ipc_socket_path.as_posix()}",
    }


def test_get_model_config_return_model_config_when_custom_names():
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    with tempfile.TemporaryDirectory() as tempdir:
        tempdir = pathlib.Path(tempdir)
        workspace = Workspace(tempdir / "workspace")
        model = Model(
            model_name="simple",
            model_version=2,
            inference_fn=infer_func,
            inputs=[
                Tensor(name="variable1", dtype=object, shape=(2, 1)),
                Tensor(name="variable2", dtype=np.float32().dtype, shape=(2, 1)),
            ],
            outputs=[
                Tensor(name="factorials", dtype=np.int32().dtype, shape=(-1,)),
            ],
            config=ModelConfig(max_batch_size=128, batching=True),
            workspace=workspace,
            triton_context=triton_context,
            strict=False,
        )

        model_config = model._get_triton_model_config()

        assert model_config.model_name == "simple"
        assert model_config.model_version == 2
        assert model_config.batching is True
        assert model_config.max_batch_size == 128
        assert model_config.inputs == [
            TensorSpec(name="variable1", dtype=object, shape=(2, 1)),
            TensorSpec(name="variable2", dtype=np.float32, shape=(2, 1)),
        ]
        assert model_config.outputs == [
            TensorSpec(name="factorials", dtype=np.int32, shape=(-1,)),
        ]


def test_generate_model_create_model_store():
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    with tempfile.TemporaryDirectory() as tempdir:
        tempdir = pathlib.Path(tempdir)
        workspace = Workspace(tempdir / "workspace")
        model = Model(
            model_name="simple",
            model_version=2,
            inference_fn=infer_func,
            inputs=[
                Tensor(name="variable1", dtype=object, shape=(2, 1)),
                Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
            ],
            outputs=[
                Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
            ],
            config=ModelConfig(max_batch_size=128, batching=True),
            workspace=workspace,
            triton_context=triton_context,
            strict=False,
        )

        with tempfile.TemporaryDirectory() as tempdir:
            model_repository = pathlib.Path(tempdir) / "model_repository"
            model_repository.mkdir()

            model.generate_model(model_repository)

            assert (model_repository / "simple").is_dir()
            assert (model_repository / "simple" / "config.pbtxt").is_file()
            assert (model_repository / "simple" / "2").is_dir()
            assert (model_repository / "simple" / "2" / "model.py").is_file()


def test_generate_models_with_same_names_and_different_versions_create_model_store():
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    with tempfile.TemporaryDirectory() as tempdir:
        tempdir = pathlib.Path(tempdir)
        workspace = Workspace(tempdir / "workspace")
        model1 = Model(
            model_name="simple",
            model_version=1,
            inference_fn=infer_func,
            inputs=[
                Tensor(name="variable1", dtype=object, shape=(2, 1)),
                Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
            ],
            outputs=[
                Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
            ],
            config=ModelConfig(max_batch_size=128, batching=True),
            workspace=workspace,
            triton_context=triton_context,
            strict=False,
        )
        model2 = Model(
            model_name="simple",
            model_version=2,
            inference_fn=infer_func,
            inputs=[
                Tensor(name="variable1", dtype=object, shape=(2, 1)),
                Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
            ],
            outputs=[
                Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
            ],
            config=ModelConfig(max_batch_size=128, batching=True),
            workspace=workspace,
            triton_context=triton_context,
            strict=False,
        )

        with tempfile.TemporaryDirectory() as tempdir:
            model_repository = pathlib.Path(tempdir) / "model_repository"
            model_repository.mkdir()

            model1.generate_model(model_repository)
            model2.generate_model(model_repository)

            assert (model_repository / "simple").is_dir()
            assert (model_repository / "simple" / "config.pbtxt").is_file()
            assert (model_repository / "simple" / "1").is_dir()
            assert (model_repository / "simple" / "1" / "model.py").is_file()
            assert (model_repository / "simple" / "2").is_dir()
            assert (model_repository / "simple" / "2" / "model.py").is_file()


def test_setup_create_proxy_backend_connection(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable1", dtype=object, shape=(2, 1)),
            Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    try:
        tensor_store.start()
        model.setup()
        assert len(model._inference_handlers) == 1
    finally:
        model.clean()
        tensor_store.close()


def test_setup_can_be_called_multiple_times(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable1", dtype=object, shape=(2, 1)),
            Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    try:
        tensor_store.start()
        model.setup()
        assert len(model._inference_handlers) == 1
        python_backend1 = model._inference_handlers[0]
        assert python_backend1 is not None

        model.setup()
        assert len(model._inference_handlers) == 1
        python_backend2 = model._inference_handlers[0]
        assert python_backend2 is not None

        assert python_backend1 == python_backend2
    finally:
        model.clean()
        tensor_store.close()


def test_clean_remove_proxy_backend_connection(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable1", dtype=object, shape=(2, 1)),
            Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    try:
        tensor_store.start()
        model.setup()
    finally:
        model.clean()
        tensor_store.close()

    assert len(model._inference_handlers) == 0


def test_clean_can_be_called_multiple_times(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable1", dtype=object, shape=(2, 1)),
            Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    try:
        tensor_store.start()
        model.setup()

        model.clean()
        model.clean()

        assert len(model._inference_handlers) == 0
    finally:
        tensor_store.close()


def test_is_alive_return_false_when_model_not_setup(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    with tempfile.TemporaryDirectory() as tempdir:
        tempdir = pathlib.Path(tempdir)
        workspace = Workspace(tempdir / "workspace")
        model = Model(
            model_name="simple",
            model_version=2,
            inference_fn=infer_func,
            inputs=[
                Tensor(name="variable1", dtype=object, shape=(2, 1)),
                Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
            ],
            outputs=[
                Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
            ],
            config=ModelConfig(max_batch_size=128, batching=True),
            workspace=workspace,
            triton_context=triton_context,
            strict=False,
        )

        assert not model.is_alive()


def test_is_alive_return_true_when_model_is_setup(tmp_path):
    def infer_func(inputs):
        return inputs

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    model = Model(
        model_name="simple",
        model_version=2,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable1", dtype=object, shape=(2, 1)),
            Tensor(name="variable2", dtype=np.float32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="factorials", dtype=np.int32, shape=(-1,)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    try:
        tensor_store.start()
        model.setup()
        assert model.is_alive()
        assert len(model._inference_handlers) == 1
    finally:
        model.clean()
        tensor_store.close()


def test_triton_context_injection(tmp_path):
    # The inference callables are wrapped with the `batch` decorator (imported above).
    # Once the models are set up and the shared TritonContext is populated, the decorator
    # maps values returned positionally to the configured output names ("out1", "out2", "out3").
    class Multimodel:
        @batch
        def infer1(self, variable1):
            return [variable1]

        @batch
        def infer2(self, variable2):
            return [variable2]

    m = Multimodel()

    @batch
    def infer_func(variable3):
        return [variable3]

    triton_context = TritonContext()
    workspace = Workspace(tmp_path / "workspace")
    tensor_store = TensorStore(workspace.path / "data_store.sock")
    tensor_store.start()

    model1 = Model(
        model_name="simple1",
        model_version=1,
        inference_fn=m.infer1,
        inputs=[
            Tensor(name="variable1", dtype=np.int32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="out1", dtype=np.int32, shape=(2, 1)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    model2 = Model(
        model_name="simple2",
        model_version=1,
        inference_fn=m.infer2,
        inputs=[
            Tensor(name="variable2", dtype=np.int32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="out2", dtype=np.int32, shape=(2, 1)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )
    model3 = Model(
        model_name="simple3",
        model_version=1,
        inference_fn=infer_func,
        inputs=[
            Tensor(name="variable3", dtype=np.int32, shape=(2, 1)),
        ],
        outputs=[
            Tensor(name="out3", dtype=np.int32, shape=(2, 1)),
        ],
        config=ModelConfig(max_batch_size=128, batching=True),
        workspace=workspace,
        triton_context=triton_context,
        strict=False,
    )

    manager = ModelManager("")
    try:
        manager.add_model(model1)
        model1.setup()
        manager.add_model(model2)
        model2.setup()
        manager.add_model(model3)
        model3.setup()

        input_requests1 = [Request({"variable1": np.array([[7, 5], [8, 6]])}, {})]
        input_requests2 = [Request({"variable2": np.array([[1, 2], [1, 2], [11, 12]])}, {})]
        input_requests3 = [Request({"variable3": np.array([[1, 2]])}, {})]

        def assert_inputs_properly_mapped_to_outputs(expected_out_name, outputs, input_request_arr):
            assert len(outputs) == 1
            assert expected_out_name in outputs[0]
            assert outputs[0][expected_out_name].shape == input_request_arr.shape
            assert np.array_equal(outputs[0][expected_out_name], input_request_arr)

        outputs1 = m.infer1(input_requests1)
        assert_inputs_properly_mapped_to_outputs("out1", outputs1, input_requests1[0]["variable1"])
        outputs2 = m.infer2(input_requests2)
        assert_inputs_properly_mapped_to_outputs("out2", outputs2, input_requests2[0]["variable2"])
        outputs3 = infer_func(input_requests3)
        assert_inputs_properly_mapped_to_outputs("out3", outputs3, input_requests3[0]["variable3"])

        # Repeated calls verify the injected context remains valid across invocations.
        outputs1 = m.infer1(input_requests1)
        assert_inputs_properly_mapped_to_outputs("out1", outputs1, input_requests1[0]["variable1"])
        outputs3 = infer_func(input_requests3)
        assert_inputs_properly_mapped_to_outputs("out3", outputs3, input_requests3[0]["variable3"])
    finally:
        manager.clean()
        tensor_store.close()