File size: 8,441 Bytes
e7f788e
 
c4a2537
 
 
 
e7f788e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
from abc import abstractmethod
from dataclasses import field
from typing import Any, Dict, Generator, List, Optional, Union

from .artifact import Artifact
from .stream import MultiStream, Stream


class Operator(Artifact):
    """Root marker class for operators.

    It defines no behavior of its own beyond what ``Artifact`` provides; it is
    used by ``OperatorError`` to type the chain of operators involved in a
    failure.
    """


class OperatorError(Exception):
    """Exception that records the operators an error propagated through.

    Attributes:
        exception: The original exception that was wrapped.
        operators: The operators recorded so far, in the order they were added
            (``from_operator_error`` appends the new operator at the end).
    """

    def __init__(self, exception: Exception, operators: List["Operator"]):
        super().__init__(
            "This error was raised by the following operators: "
            + ",\n".join([str(operator) for operator in operators])
            + "."
        )
        self.exception = exception
        self.operators = operators

    def __reduce__(self):
        # BaseException pickles itself as ``cls(*self.args)``, but ``self.args``
        # only holds the formatted message while ``__init__`` needs two
        # arguments, so unpickling (e.g. across a multiprocessing boundary)
        # would fail with TypeError. Rebuild from the original arguments.
        return (type(self), (self.exception, self.operators))

    @classmethod
    def from_operator_error(cls, exception: "OperatorError", operator: "Operator"):
        """Extend an existing ``OperatorError`` with one more operator.

        Args:
            exception: An ``OperatorError`` (its ``exception`` and ``operators``
                attributes are read).
            operator: The operator to append to the recorded chain.

        Returns:
            A new error wrapping the same original exception, with ``operator``
            appended after the previously recorded operators.
        """
        return cls(exception.exception, [*exception.operators, operator])

    @classmethod
    def from_exception(cls, exception: Exception, operator: "Operator"):
        """Wrap a plain ``exception`` raised by a single ``operator``."""
        return cls(exception, [operator])


class StreamingOperator(Artifact):
    """Base class for operators invoked on an optional ``MultiStream``."""

    @abstractmethod
    def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
        """Apply the operator and return the resulting ``MultiStream``; subclasses override."""


class StreamSource(StreamingOperator):
    """Streaming operator that produces a ``MultiStream`` without an input stream."""

    @abstractmethod
    def __call__(self) -> MultiStream:
        """Produce the streams; subclasses override."""


class SourceOperator(StreamSource):
    """Stream source whose call simply delegates to ``process``."""

    def __call__(self) -> MultiStream:
        produced = self.process()
        return produced

    @abstractmethod
    def process(self) -> MultiStream:
        """Build and return the source ``MultiStream``."""


class StreamInitializerOperator(StreamSource):
    """Stream source that builds its streams from arbitrary call arguments.

    All positional and keyword arguments given to the call are forwarded
    unchanged to ``process``.
    """

    def __call__(self, *args, **kwargs) -> MultiStream:
        produced = self.process(*args, **kwargs)
        return produced

    @abstractmethod
    def process(self, *args, **kwargs) -> MultiStream:
        """Create the ``MultiStream`` from the forwarded arguments."""


class MultiStreamOperator(StreamingOperator):
    """Operator that transforms a whole ``MultiStream`` into another one.

    Calling the operator routes through ``_process_multi_stream``, which runs
    ``process`` and asserts that a ``MultiStream`` came back.
    """

    def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        return self._process_multi_stream(multi_stream)

    def _process_multi_stream(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        # Run the subclass transformation, then enforce the return-type contract.
        output = self.process(multi_stream)
        assert isinstance(output, MultiStream), "MultiStreamOperator must return a MultiStream"
        return output

    @abstractmethod
    def process(self, multi_stream: MultiStream) -> MultiStream:
        """Transform ``multi_stream``; must return a ``MultiStream``."""


class SingleStreamOperator(MultiStreamOperator):
    """Operator applied independently to each stream of a ``MultiStream``.

    Every input stream is wrapped in a new ``Stream`` driven by
    ``_process_stream`` (which defers to ``process``), and the wrapped streams
    are collected under their original names.
    """

    def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
        processed = {}
        for name, original in multi_stream.items():
            wrapped = self._process_single_stream(original, name)
            assert isinstance(wrapped, Stream), "SingleStreamOperator must return a Stream"
            processed[name] = wrapped
        return MultiStream(processed)

    def _process_single_stream(self, stream: Stream, stream_name: str = None) -> Stream:
        generator_kwargs = {"stream": stream, "stream_name": stream_name}
        return Stream(self._process_stream, gen_kwargs=generator_kwargs)

    def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
        # Re-yield everything the subclass hook produces for this stream.
        yield from self.process(stream, stream_name)

    @abstractmethod
    def process(self, stream: Stream, stream_name: str = None) -> Generator:
        """Yield the output items for ``stream`` (named ``stream_name``)."""


# class StreamGeneratorOperator(SingleStreamOperator):

#     def stream(self, stream):
#         return Stream(self.process, gen_kwargs={'stream': stream})

#     @abstractmethod
#     def process(self, stream: Stream) -> Generator:
#         yield None


class SingleStreamReducer(StreamingOperator):
    """Reduce every stream of a ``MultiStream`` to a single value.

    The call returns a plain ``dict`` mapping each stream name to
    ``process(stream)`` (not a ``MultiStream``).
    """

    def __call__(self, multi_stream: Optional[MultiStream] = None) -> Dict[str, Any]:
        return {name: self.process(stream) for name, stream in multi_stream.items()}

    @abstractmethod
    def process(self, stream: Stream) -> Any:
        """Return the reduced value for a single stream."""


class StreamInstanceOperator(SingleStreamOperator):
    """Single-stream operator that applies ``process`` to each instance in turn."""

    def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
        yield from (self._process_instance(item, stream_name) for item in stream)

    def _process_instance(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
        return self.process(instance, stream_name)

    @abstractmethod
    def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
        """Return the transformed ``instance`` taken from the stream ``stream_name``."""


class StreamInstanceOperatorValidator(StreamInstanceOperator):
    """Instance operator that validates the first processed instance of each stream.

    Only the first output instance of a stream is passed to ``validate``; the
    remaining instances are processed without validation. An empty stream
    yields nothing and triggers no validation.
    """

    @abstractmethod
    def validate(self, instance):
        """Check one processed instance.

        The return value is ignored; presumably implementations signal a
        problem by raising (NOTE(review): confirm against subclasses).
        """
        pass

    def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
        iterator = iter(stream)
        try:
            first_instance = next(iterator)
        except StopIteration:
            # Empty stream: nothing to process or validate. Letting StopIteration
            # escape a generator body would be turned into RuntimeError (PEP 479).
            return
        result = self._process_instance(first_instance, stream_name)
        self.validate(result)
        yield result
        for instance in iterator:
            yield self._process_instance(instance, stream_name)


class InstanceOperator(Artifact):
    """Callable that transforms a single instance ``dict`` through ``process``."""

    def __call__(self, data: dict) -> dict:
        transformed = self.process(data)
        return transformed

    @abstractmethod
    def process(self, data: dict) -> dict:
        """Return the transformed version of ``data``."""


class FieldOperator(Artifact):
    """Apply ``process`` to one field of an instance ``dict``, in place.

    The given dict is mutated (its ``field`` entry is overwritten with the
    processed value) and the same dict object is returned. A missing key
    raises ``KeyError``.
    """

    def __call__(self, data: Dict[str, Any], field: str) -> dict:
        data[field] = self.process(data[field])
        return data

    @abstractmethod
    def process(self, value: Any) -> Any:
        """Return the processed form of a single field value."""


# class NamedStreamInstanceOperator(StreamingOperator):

#     def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
#         result = {}
#         for stream_name, stream in multi_stream.items():
#             stream = Stream(self.generator, gen_kwargs={'stream': stream, 'stream_name': stream_name})
#             result[stream_name] = stream
#         return MultiStream(result)

#     def verify_first_instance(self, instance):
#         pass

#     def generator(self, stream, stream_name):
#         iterator = iter(stream)
#         first_instance = next(iterator)
#         result = self.process(first_instance, stream_name)
#         self.verify_first_instance(result)
#         yield result
#         yield from (self.process(instance) for instance in iterator)

#     @abstractmethod
#     def process(self, instance: dict, stream_name: str) -> dict:
#         pass


class InstanceOperatorWithGlobalAccess(StreamingOperator):
    """Instance operator whose ``process`` can also read a set of other streams.

    Attributes:
        accessible_streams: Either a ``MultiStream`` passed to ``process`` as-is,
            or a list of stream names selected from the ``multi_stream`` given
            to each call. ``None`` passes ``None`` through to ``process``.
        cache_accessible_streams: When true, ``set_caching(True)`` is called on
            every accessible stream before the output streams are built.
    """

    accessible_streams: Union[MultiStream, List[str]] = None
    cache_accessible_streams: bool = True

    def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        accessible = self.accessible_streams
        if isinstance(accessible, list):
            # Select the named streams from *this* call's input. The selection is
            # kept local rather than written back to ``self.accessible_streams``,
            # so a later call with different input does not reuse stale streams.
            accessible = MultiStream({stream_name: multi_stream[stream_name] for stream_name in accessible})

        # With no accessible streams configured there is nothing to cache; this
        # matches the behavior when ``cache_accessible_streams`` is off.
        if self.cache_accessible_streams and accessible is not None:
            for accessible_stream in accessible.values():
                accessible_stream.set_caching(True)

        result = {}
        for stream_name, stream in multi_stream.items():
            result[stream_name] = Stream(self.generator, gen_kwargs={"stream": stream, "multi_stream": accessible})

        return MultiStream(result)

    def generator(self, stream, multi_stream):
        """Yield ``process(instance, multi_stream)`` for each instance of ``stream``."""
        yield from (self.process(instance, multi_stream) for instance in stream)

    @abstractmethod
    def process(self, instance: dict, multi_stream: MultiStream) -> dict:
        """Transform ``instance``; ``multi_stream`` holds the accessible streams (may be ``None``)."""
        pass


class SequntialOperator(MultiStreamOperator):
    """Chain streaming operators, feeding each step the previous step's output."""

    steps: List[StreamingOperator] = field(default_factory=list)

    def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        current = multi_stream
        for step in self.steps:
            current = step(current)
        return current


class SourceSequntialOperator(SequntialOperator):
    """Sequential operator whose first step is called with no arguments.

    The first step creates the streams; every following step receives the
    previous step's output.
    """

    def __call__(self) -> MultiStream:
        return super().__call__()

    def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        # The incoming ``multi_stream`` is ignored: the first step produces it.
        head = self.steps[0]
        current = head()
        for step in self.steps[1:]:
            current = step(current)
        return current


class SequntialOperatorInitilizer(SequntialOperator):
    """Sequential operator whose first step is a ``StreamInitializerOperator``.

    Call arguments are forwarded to that first step; each later step receives
    the previous step's output. ``__call__`` invokes ``process`` directly, so
    the ``MultiStream`` return check in ``_process_multi_stream`` is not applied.
    """

    def __call__(self, *args, **kwargs) -> MultiStream:
        return self.process(*args, **kwargs)

    def process(self, *args, **kwargs) -> MultiStream:
        initializer = self.steps[0]
        assert isinstance(
            initializer, StreamInitializerOperator
        ), "The first step in a SequntialOperatorInitilizer must be a StreamInitializerOperator"
        current = initializer(*args, **kwargs)
        for step in self.steps[1:]:
            current = step(current)
        return current