| |
|
|
| import os |
| import sys |
| import tempfile |
| from contextlib import ExitStack, contextmanager |
| from copy import deepcopy |
| from unittest import mock |
| import torch |
| from torch import nn |
|
|
| |
| import detectron2 |
| from detectron2.structures import Boxes, Instances |
| from detectron2.utils.env import _import_file |
|
|
| _counter = 0 |
|
|
|
|
| def _clear_jit_cache(): |
| from torch.jit._recursive import concrete_type_store |
| from torch.jit._state import _jit_caching_layer |
|
|
| concrete_type_store.type_store.clear() |
| _jit_caching_layer.clear() |
|
|
|
|
| def _add_instances_conversion_methods(newInstances): |
| """ |
| Add from_instances methods to the scripted Instances class. |
| """ |
| cls_name = newInstances.__name__ |
|
|
| @torch.jit.unused |
| def from_instances(instances: Instances): |
| """ |
| Create scripted Instances from original Instances |
| """ |
| fields = instances.get_fields() |
| image_size = instances.image_size |
| ret = newInstances(image_size) |
| for name, val in fields.items(): |
| assert hasattr(ret, f"_{name}"), f"No attribute named {name} in {cls_name}" |
| setattr(ret, name, deepcopy(val)) |
| return ret |
|
|
| newInstances.from_instances = from_instances |
|
|
|
|
| @contextmanager |
| def patch_instances(fields): |
| """ |
| A contextmanager, under which the Instances class in detectron2 is replaced |
| by a statically-typed scriptable class, defined by `fields`. |
| See more in `scripting_with_instances`. |
| """ |
|
|
| with tempfile.TemporaryDirectory(prefix="detectron2") as dir, tempfile.NamedTemporaryFile( |
| mode="w", encoding="utf-8", suffix=".py", dir=dir, delete=False |
| ) as f: |
| try: |
| |
| |
| _clear_jit_cache() |
|
|
| cls_name, s = _gen_instance_module(fields) |
| f.write(s) |
| f.flush() |
| f.close() |
|
|
| module = _import(f.name) |
| new_instances = getattr(module, cls_name) |
| _ = torch.jit.script(new_instances) |
| |
| Instances.__torch_script_class__ = True |
| |
| Instances._jit_override_qualname = torch._jit_internal._qualified_name(new_instances) |
|
|
| _add_instances_conversion_methods(new_instances) |
| yield new_instances |
| finally: |
| try: |
| del Instances.__torch_script_class__ |
| del Instances._jit_override_qualname |
| except AttributeError: |
| pass |
| sys.modules.pop(module.__name__) |
|
|
|
|
| def _gen_instance_class(fields): |
| """ |
| Args: |
| fields (dict[name: type]) |
| """ |
|
|
| class _FieldType: |
| def __init__(self, name, type_): |
| assert isinstance(name, str), f"Field name must be str, got {name}" |
| self.name = name |
| self.type_ = type_ |
| self.annotation = f"{type_.__module__}.{type_.__name__}" |
|
|
| fields = [_FieldType(k, v) for k, v in fields.items()] |
|
|
| def indent(level, s): |
| return " " * 4 * level + s |
|
|
| lines = [] |
|
|
| global _counter |
| _counter += 1 |
|
|
| cls_name = "ScriptedInstances{}".format(_counter) |
|
|
| field_names = tuple(x.name for x in fields) |
| extra_args = ", ".join([f"{f.name}: Optional[{f.annotation}] = None" for f in fields]) |
| lines.append( |
| f""" |
| class {cls_name}: |
| def __init__(self, image_size: Tuple[int, int], {extra_args}): |
| self.image_size = image_size |
| self._field_names = {field_names} |
| """ |
| ) |
|
|
| for f in fields: |
| lines.append( |
| indent(2, f"self._{f.name} = torch.jit.annotate(Optional[{f.annotation}], {f.name})") |
| ) |
|
|
| for f in fields: |
| lines.append( |
| f""" |
| @property |
| def {f.name}(self) -> {f.annotation}: |
| # has to use a local for type refinement |
| # https://pytorch.org/docs/stable/jit_language_reference.html#optional-type-refinement |
| t = self._{f.name} |
| assert t is not None, "{f.name} is None and cannot be accessed!" |
| return t |
| |
| @{f.name}.setter |
| def {f.name}(self, value: {f.annotation}) -> None: |
| self._{f.name} = value |
| """ |
| ) |
|
|
| |
| lines.append( |
| """ |
| def __len__(self) -> int: |
| """ |
| ) |
| for f in fields: |
| lines.append( |
| f""" |
| t = self._{f.name} |
| if t is not None: |
| return len(t) |
| """ |
| ) |
| lines.append( |
| """ |
| raise NotImplementedError("Empty Instances does not support __len__!") |
| """ |
| ) |
|
|
| |
| lines.append( |
| """ |
| def has(self, name: str) -> bool: |
| """ |
| ) |
| for f in fields: |
| lines.append( |
| f""" |
| if name == "{f.name}": |
| return self._{f.name} is not None |
| """ |
| ) |
| lines.append( |
| """ |
| return False |
| """ |
| ) |
|
|
| |
| none_args = ", None" * len(fields) |
| lines.append( |
| f""" |
| def to(self, device: torch.device) -> "{cls_name}": |
| ret = {cls_name}(self.image_size{none_args}) |
| """ |
| ) |
| for f in fields: |
| if hasattr(f.type_, "to"): |
| lines.append( |
| f""" |
| t = self._{f.name} |
| if t is not None: |
| ret._{f.name} = t.to(device) |
| """ |
| ) |
| else: |
| |
| |
| pass |
| lines.append( |
| """ |
| return ret |
| """ |
| ) |
|
|
| |
| none_args = ", None" * len(fields) |
| lines.append( |
| f""" |
| def __getitem__(self, item) -> "{cls_name}": |
| ret = {cls_name}(self.image_size{none_args}) |
| """ |
| ) |
| for f in fields: |
| lines.append( |
| f""" |
| t = self._{f.name} |
| if t is not None: |
| ret._{f.name} = t[item] |
| """ |
| ) |
| lines.append( |
| """ |
| return ret |
| """ |
| ) |
|
|
| |
| |
| none_args = ", None" * len(fields) |
| lines.append( |
| f""" |
| def cat(self, instances: List["{cls_name}"]) -> "{cls_name}": |
| ret = {cls_name}(self.image_size{none_args}) |
| """ |
| ) |
| for f in fields: |
| lines.append( |
| f""" |
| t = self._{f.name} |
| if t is not None: |
| values: List[{f.annotation}] = [x.{f.name} for x in instances] |
| if torch.jit.isinstance(t, torch.Tensor): |
| ret._{f.name} = torch.cat(values, dim=0) |
| else: |
| ret._{f.name} = t.cat(values) |
| """ |
| ) |
| lines.append( |
| """ |
| return ret""" |
| ) |
|
|
| |
| lines.append( |
| """ |
| def get_fields(self) -> Dict[str, Tensor]: |
| ret = {} |
| """ |
| ) |
| for f in fields: |
| if f.type_ == Boxes: |
| stmt = "t.tensor" |
| elif f.type_ == torch.Tensor: |
| stmt = "t" |
| else: |
| stmt = f'assert False, "unsupported type {str(f.type_)}"' |
| lines.append( |
| f""" |
| t = self._{f.name} |
| if t is not None: |
| ret["{f.name}"] = {stmt} |
| """ |
| ) |
| lines.append( |
| """ |
| return ret""" |
| ) |
| return cls_name, os.linesep.join(lines) |
|
|
|
|
| def _gen_instance_module(fields): |
| |
| s = """ |
| from copy import deepcopy |
| import torch |
| from torch import Tensor |
| import typing |
| from typing import * |
| |
| import detectron2 |
| from detectron2.structures import Boxes, Instances |
| |
| """ |
|
|
| cls_name, cls_def = _gen_instance_class(fields) |
| s += cls_def |
| return cls_name, s |
|
|
|
|
| def _import(path): |
| return _import_file( |
| "{}{}".format(sys.modules[__name__].__name__, _counter), path, make_importable=True |
| ) |
|
|
|
|
| @contextmanager |
| def patch_builtin_len(modules=()): |
| """ |
| Patch the builtin len() function of a few detectron2 modules |
| to use __len__ instead, because __len__ does not convert values to |
| integers and therefore is friendly to tracing. |
| |
| Args: |
| modules (list[stsr]): names of extra modules to patch len(), in |
| addition to those in detectron2. |
| """ |
|
|
| def _new_len(obj): |
| return obj.__len__() |
|
|
| with ExitStack() as stack: |
| MODULES = [ |
| "detectron2.modeling.roi_heads.fast_rcnn", |
| "detectron2.modeling.roi_heads.mask_head", |
| "detectron2.modeling.roi_heads.keypoint_head", |
| ] + list(modules) |
| ctxs = [stack.enter_context(mock.patch(mod + ".len")) for mod in MODULES] |
| for m in ctxs: |
| m.side_effect = _new_len |
| yield |
|
|
|
|
| def patch_nonscriptable_classes(): |
| """ |
| Apply patches on a few nonscriptable detectron2 classes. |
| Should not have side-effects on eager usage. |
| """ |
| |
| |
|
|
| from detectron2.modeling.backbone import ResNet, FPN |
|
|
| |
| |
| |
|
|
| def prepare_resnet(self): |
| ret = deepcopy(self) |
| ret.stages = nn.ModuleList(ret.stages) |
| for k in self.stage_names: |
| delattr(ret, k) |
| return ret |
|
|
| ResNet.__prepare_scriptable__ = prepare_resnet |
|
|
| def prepare_fpn(self): |
| ret = deepcopy(self) |
| ret.lateral_convs = nn.ModuleList(ret.lateral_convs) |
| ret.output_convs = nn.ModuleList(ret.output_convs) |
| for name, _ in self.named_children(): |
| if name.startswith("fpn_"): |
| delattr(ret, name) |
| return ret |
|
|
| FPN.__prepare_scriptable__ = prepare_fpn |
|
|
| |
| |
| from detectron2.modeling.roi_heads import StandardROIHeads |
|
|
| if hasattr(StandardROIHeads, "__annotations__"): |
| |
| StandardROIHeads.__annotations__ = deepcopy(StandardROIHeads.__annotations__) |
| StandardROIHeads.__annotations__["mask_on"] = torch.jit.Final[bool] |
| StandardROIHeads.__annotations__["keypoint_on"] = torch.jit.Final[bool] |
|
|
|
|
| |
| patch_nonscriptable_classes() |
|
|
|
|
| @contextmanager |
| def freeze_training_mode(model): |
| """ |
| A context manager that annotates the "training" attribute of every submodule |
| to constant, so that the training codepath in these modules can be |
| meta-compiled away. Upon exiting, the annotations are reverted. |
| """ |
| classes = {type(x) for x in model.modules()} |
| |
| |
| classes = {x for x in classes if not hasattr(x, "__constants__")} |
| for cls in classes: |
| cls.__annotations__["training"] = torch.jit.Final[bool] |
| yield |
| for cls in classes: |
| cls.__annotations__["training"] = bool |
|
|