File size: 6,703 Bytes
4d7f032
c6d1c21
 
 
4d7f032
eac4eaf
 
c6d1c21
eac4eaf
 
 
c6d1c21
 
ba0637a
 
eac4eaf
ba0637a
 
 
 
c6d1c21
ba0637a
 
 
 
 
 
 
 
 
 
66c1161
c6d1c21
 
eac4eaf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c6d1c21
 
 
 
 
 
c3f3d16
 
 
 
c6d1c21
 
eac4eaf
 
 
c6d1c21
 
 
 
 
 
 
 
 
 
 
c3f3d16
 
c6d1c21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6931664
 
 
 
 
 
c3f3d16
c6d1c21
eac4eaf
 
c6d1c21
eac4eaf
 
 
c3f3d16
 
 
 
eac4eaf
 
 
 
 
 
 
 
 
 
 
 
 
c3f3d16
 
eac4eaf
 
 
c3f3d16
eac4eaf
c6d1c21
 
 
 
 
 
 
 
eac4eaf
c6d1c21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ba0637a
c6d1c21
 
 
ba0637a
c6d1c21
eac4eaf
66c1161
eac4eaf
 
 
 
66c1161
 
 
 
 
 
c6d1c21
eac4eaf
c6d1c21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
import inspect
import json
import os
import pkgutil
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Any, Dict, List, Union, final

from .dataclass import Dataclass, Field, asdict, fields
from .text_utils import camel_to_snake_case, is_camel_case
from .type_utils import issubtype


class Artifactories(object):
    def __new__(cls):
        if not hasattr(cls, "instance"):
            cls.instance = super(Artifactories, cls).__new__(cls)
            cls.instance.artifactories = []

        return cls.instance

    def __iter__(self):
        return iter(self.artifactories)

    def __next__(self):
        return next(self.artifactories)

    def register_atrifactory(self, artifactory):
        assert isinstance(artifactory, Artifactory), "Artifactory must be an instance of Artifactory"
        assert hasattr(artifactory, "__contains__"), "Artifactory must have __contains__ method"
        assert hasattr(artifactory, "__getitem__"), "Artifactory must have __getitem__ method"
        self.artifactories = [artifactory] + self.artifactories


def map_values_in_place(object, mapper):
    if isinstance(object, dict):
        for key, value in object.items():
            object[key] = mapper(value)
        return object
    if isinstance(object, list):
        for i in range(len(object)):
            object[i] = mapper(object[i])
        return object
    return mapper(object)


class Artifact(Dataclass):
    type: str = Field(default=None, final=True, init=False)

    _class_register = {}

    @classmethod
    def is_artifact_dict(cls, d):
        return isinstance(d, dict) and "type" in d and d["type"] in cls._class_register

    @classmethod
    def get_artifact_type(cls):
        return camel_to_snake_case(cls.__name__)

    @classmethod
    def register_class(cls, artifact_class):
        assert issubclass(
            artifact_class, Artifact
        ), f"Artifact class must be a subclass of Artifact, got {artifact_class}"
        assert is_camel_case(
            artifact_class.__name__
        ), f"Artifact class name must be legal camel case, got {artifact_class.__name__}"

        snake_case_key = camel_to_snake_case(artifact_class.__name__)

        if snake_case_key in cls._class_register:
            assert (
                cls._class_register[snake_case_key] == artifact_class
            ), f"Artifact class name must be unique, {snake_case_key} already exists for {cls._class_register[snake_case_key]}"

            return snake_case_key

        cls._class_register[snake_case_key] = artifact_class

        return snake_case_key

    @classmethod
    def is_artifact_file(cls, path):
        if not os.path.exists(path) or not os.path.isfile(path):
            return False
        with open(path, "r") as f:
            d = json.load(f)
        return cls.is_artifact_dict(d)

    @classmethod
    def _recursive_load(cls, d):
        if isinstance(d, dict):
            new_d = {}
            for key, value in d.items():
                new_d[key] = cls._recursive_load(value)
            d = new_d
        elif isinstance(d, list):
            d = [cls._recursive_load(value) for value in d]
        else:
            pass
        if cls.is_artifact_dict(d):
            instance = cls._class_register[d.pop("type")](**d)
            return instance
        else:
            return d

    @classmethod
    def from_dict(cls, d):
        assert cls.is_artifact_dict(d), "Input must be a dict with type field"
        return cls._recursive_load(d)

    @classmethod
    def load(cls, path):
        try:
            with open(path, "r") as f:
                d = json.load(f)
            assert "type" in d, "Saved artifact must have a type field"
            return cls._recursive_load(d)
        except Exception as e:
            raise Exception(f"{e}\n\nFailed to load artifact from {path} see above for more details.")

    def prepare(self):
        pass

    def verify(self):
        pass

    @final
    def __pre_init__(self, **kwargs):
        self._init_dict = kwargs

    @final
    def __post_init__(self):
        self.type = self.register_class(self.__class__)

        for field in fields(self):
            if issubtype(field.type, Union[Artifact, List[Artifact], Dict[str, Artifact]]):
                value = getattr(self, field.name)
                value = map_values_in_place(value, maybe_recover_artifact)
                setattr(self, field.name, value)

        self.prepare()
        self.verify()

    def _to_raw_dict(self):
        return {"type": self.type, **self._init_dict}

    def save(self, path):
        with open(path, "w") as f:
            init_dict = self.to_dict()
            json.dump(init_dict, f, indent=4)


class ArtifactList(list, Artifact):
    def prepare(self):
        for artifact in self:
            artifact.prepare()


class Artifactory(Artifact):
    @abstractmethod
    def __contains__(self, name: str) -> bool:
        pass

    @abstractmethod
    def __getitem__(self, name) -> Artifact:
        pass


class UnitxtArtifactNotFoundError(Exception):
    def __init__(self, name, artifactories):
        self.name = name
        self.artifactories = artifactories

    def __str__(self):
        return f"Artifact {self.name} does not exist, in artifactories:{self.artifactories}"


def fetch_artifact(name):
    if Artifact.is_artifact_file(name):
        return Artifact.load(name), None
    else:
        for artifactory in Artifactories():
            if name in artifactory:
                return artifactory[name], artifactory

    raise UnitxtArtifactNotFoundError(name, Artifactories().artifactories)


def verbosed_fetch_artifact(identifer):
    artifact, artifactory = fetch_artifact(identifer)
    print(f"Artifact {identifer} is fetched from {artifactory}")
    return artifact


def maybe_recover_artifact(artifact):
    if isinstance(artifact, str):
        return verbosed_fetch_artifact(artifact)
    else:
        return artifact


def register_all_artifacts(path):
    for loader, module_name, is_pkg in pkgutil.walk_packages(path):
        print(__name__)
        if module_name == __name__:
            continue
        print(f"Loading {module_name}")
        # Import the module
        module = loader.find_module(module_name).load_module(module_name)

        # Iterate over every object in the module
        for name, obj in inspect.getmembers(module):
            # Make sure the object is a class
            if inspect.isclass(obj):
                # Make sure the class is a subclass of Artifact (but not Artifact itself)
                if issubclass(obj, Artifact) and obj is not Artifact:
                    print(obj)