File size: 8,441 Bytes
e7f788e c4a2537 e7f788e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
from abc import abstractmethod
from dataclasses import field
from typing import Any, Dict, Generator, List, Optional, Union
from .artifact import Artifact
from .stream import MultiStream, Stream
class Operator(Artifact):
    """Base artifact type for operators; declares no fields or methods of its own."""
class OperatorError(Exception):
    """Exception that records the operators an underlying error passed through.

    Attributes:
        exception: The underlying exception that was wrapped.
        operators: The operators recorded for this error, in the order they were added.
    """

    def __init__(self, exception: Exception, operators: "List[Operator]"):
        """Store the wrapped exception and build a message listing ``operators``.

        Args:
            exception: The underlying exception.
            operators: Operators to list in the message; each is rendered with ``str``.
        """
        listed_operators = ",\n".join(str(operator) for operator in operators)
        super().__init__("This error was raised by the following operators: " + listed_operators + ".")
        self.exception = exception
        self.operators = operators

    @classmethod
    def from_operator_error(cls, exception: Exception, operator: "Operator"):
        """Return a new error that extends an existing one with one more operator.

        Args:
            exception: Normally an ``OperatorError``. Any other exception is wrapped
                as if ``from_exception`` had been called, instead of failing on the
                missing ``exception`` / ``operators`` attributes.
            operator: The operator to append to the recorded operators.

        Returns:
            A new instance; the operators list of ``exception`` is not mutated.
        """
        if not isinstance(exception, OperatorError):
            # The signature accepts any Exception, so degrade gracefully rather
            # than raising AttributeError while an error is already being handled.
            return cls.from_exception(exception, operator)
        return cls(exception.exception, [*exception.operators, operator])

    @classmethod
    def from_exception(cls, exception: Exception, operator: "Operator"):
        """Wrap ``exception`` with ``operator`` as the only recorded operator."""
        return cls(exception, [operator])
class StreamingOperator(Artifact):
    """Interface for artifacts that are called with an optional ``MultiStream``."""

    @abstractmethod
    def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
        """Abstract entry point; concrete operators provide the implementation."""
class StreamSource(StreamingOperator):
    """Streaming operator whose call signature takes no input streams."""

    @abstractmethod
    def __call__(self) -> MultiStream:
        """Abstract entry point; concrete sources provide the implementation."""
class SourceOperator(StreamSource):
    """Stream source whose call delegates to the argument-less ``process`` hook."""

    @abstractmethod
    def process(self) -> MultiStream:
        """Abstract hook invoked by ``__call__``; takes no arguments."""

    def __call__(self) -> MultiStream:
        """Return whatever ``process`` returns."""
        produced = self.process()
        return produced
class StreamInitializerOperator(StreamSource):
    """Stream source that forwards arbitrary call arguments to ``process``."""

    @abstractmethod
    def process(self, *args, **kwargs) -> MultiStream:
        """Abstract hook invoked by ``__call__`` with the caller's arguments."""

    def __call__(self, *args, **kwargs) -> MultiStream:
        """Pass all positional and keyword arguments to ``process`` and return its result."""
        produced = self.process(*args, **kwargs)
        return produced
class MultiStreamOperator(StreamingOperator):
    """Operator whose ``process`` hook receives the whole ``MultiStream`` at once."""

    @abstractmethod
    def process(self, multi_stream: MultiStream) -> MultiStream:
        """Abstract hook: transform ``multi_stream`` and return a ``MultiStream``."""

    def _process_multi_stream(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        """Run ``process`` and check that its result is a ``MultiStream``."""
        processed = self.process(multi_stream)
        # Catch implementations of ``process`` that return the wrong type.
        assert isinstance(processed, MultiStream), "MultiStreamOperator must return a MultiStream"
        return processed

    def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        """Apply the operator by delegating to ``_process_multi_stream``."""
        return self._process_multi_stream(multi_stream)
class SingleStreamOperator(MultiStreamOperator):
    """Operator that applies the same ``process`` hook to every stream separately."""

    @abstractmethod
    def process(self, stream: Stream, stream_name: str = None) -> Generator:
        """Abstract hook: produce the output items for one ``stream``."""

    def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
        """Generator function handed to ``Stream``; yields everything ``process`` yields."""
        yield from self.process(stream, stream_name)

    def _process_single_stream(self, stream: Stream, stream_name: str = None) -> Stream:
        """Build a new ``Stream`` around ``_process_stream`` for one input stream."""
        # The keys must match the parameter names of ``_process_stream``.
        generator_arguments = {"stream": stream, "stream_name": stream_name}
        return Stream(self._process_stream, gen_kwargs=generator_arguments)

    def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
        """Wrap each named stream of ``multi_stream`` and collect them into a new ``MultiStream``."""
        wrapped_streams = {}
        for name, source_stream in multi_stream.items():
            wrapped = self._process_single_stream(source_stream, name)
            assert isinstance(wrapped, Stream), "SingleStreamOperator must return a Stream"
            wrapped_streams[name] = wrapped
        return MultiStream(wrapped_streams)
# class StreamGeneratorOperator(SingleStreamOperator):
# def stream(self, stream):
# return Stream(self.process, gen_kwargs={'stream': stream})
# @abstractmethod
# def process(self, stream: Stream) -> Generator:
# yield None
class SingleStreamReducer(StreamingOperator):
    """Operator that maps every stream to a single value computed by ``process``."""

    @abstractmethod
    def process(self, stream: Stream) -> Any:
        """Abstract hook: compute the value for one ``stream``."""

    def __call__(self, multi_stream: Optional[MultiStream] = None) -> Dict[str, Any]:
        """Return a dict from stream name to the value ``process`` returns for that stream."""
        return {name: self.process(current_stream) for name, current_stream in multi_stream.items()}
class StreamInstanceOperator(SingleStreamOperator):
    """Single-stream operator whose ``process`` hook handles one instance at a time."""

    @abstractmethod
    def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
        """Abstract hook: return the processed form of ``instance``."""

    def _process_instance(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
        """Apply ``process`` to one instance."""
        processed = self.process(instance, stream_name)
        return processed

    def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
        """Yield the processed form of every instance in ``stream``, in order."""
        yield from (self._process_instance(item, stream_name) for item in stream)
class StreamInstanceOperatorValidator(StreamInstanceOperator):
    """Instance operator that validates the first processed instance of each stream."""

    @abstractmethod
    def validate(self, instance):
        """Abstract hook: check the first processed ``instance`` of a stream."""

    def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
        """Yield processed instances, calling ``validate`` on the first one only.

        An empty ``stream`` produces no output and ``validate`` is not called.
        """
        iterator = iter(stream)
        try:
            first_instance = next(iterator)
        except StopIteration:
            # A StopIteration escaping a generator body is converted to
            # RuntimeError (PEP 479), so an empty stream must end the generator
            # explicitly. This matches how the parent class treats empty streams.
            return
        result = self._process_instance(first_instance, stream_name)
        self.validate(result)
        yield result
        # The remaining instances are processed without further validation.
        yield from (self._process_instance(instance, stream_name) for instance in iterator)
class InstanceOperator(Artifact):
    """Callable artifact that transforms a single ``dict`` through ``process``."""

    @abstractmethod
    def process(self, data: dict) -> dict:
        """Abstract hook: return the processed form of ``data``."""

    def __call__(self, data: dict) -> dict:
        """Return whatever ``process`` returns for ``data``."""
        processed = self.process(data)
        return processed
class FieldOperator(Artifact):
    """Callable artifact that rewrites one entry of a dict using ``process``."""

    @abstractmethod
    def process(self, value: Any) -> Any:
        """Abstract hook: return the replacement for ``value``."""

    def __call__(self, data: Dict[str, Any], field: str) -> dict:
        """Replace ``data[field]`` in place with its processed value and return ``data``."""
        data[field] = self.process(data[field])
        return data
# class NamedStreamInstanceOperator(StreamingOperator):
# def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
# result = {}
# for stream_name, stream in multi_stream.items():
# stream = Stream(self.generator, gen_kwargs={'stream': stream, 'stream_name': stream_name})
# result[stream_name] = stream
# return MultiStream(result)
# def verify_first_instance(self, instance):
# pass
# def generator(self, stream, stream_name):
# iterator = iter(stream)
# first_instance = next(iterator)
# result = self.process(first_instance, stream_name)
# self.verify_first_instance(result)
# yield result
# yield from (self.process(instance) for instance in iterator)
# @abstractmethod
# def process(self, instance: dict, stream_name: str) -> dict:
# pass
class InstanceOperatorWithGlobalAccess(StreamingOperator):
    """Per-instance operator whose ``process`` hook also receives a set of accessible streams."""

    # Either a MultiStream, or a list of stream names that is resolved against the
    # input on the first call and then replaced by the resulting MultiStream.
    accessible_streams: Union[MultiStream, List[str]] = None
    # When true, ``set_caching(True)`` is called on every accessible stream.
    cache_accessible_streams: bool = True

    def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        """Wrap every stream of ``multi_stream`` so that its instances go through ``process``.

        Returns:
            A new ``MultiStream`` with the same stream names as the input.
        """
        if isinstance(self.accessible_streams, list):
            # cache the accessible streams:
            self.accessible_streams = MultiStream(
                {stream_name: multi_stream[stream_name] for stream_name in self.accessible_streams}
            )

        # ``accessible_streams`` defaults to None; without this guard the default
        # configuration fails on ``None.values()``.
        if self.cache_accessible_streams and self.accessible_streams is not None:
            for stream in self.accessible_streams.values():
                stream.set_caching(True)

        result = {}
        for stream_name, stream in multi_stream.items():
            # The keys must match the parameter names of ``generator``.
            result[stream_name] = Stream(
                self.generator,
                gen_kwargs={"stream": stream, "multi_stream": self.accessible_streams},
            )
        return MultiStream(result)

    def generator(self, stream, multi_stream):
        """Yield ``process(instance, multi_stream)`` for every instance in ``stream``."""
        yield from (self.process(instance, multi_stream) for instance in stream)

    @abstractmethod
    def process(self, instance: dict, multi_stream: MultiStream) -> dict:
        """Abstract hook: process one ``instance`` given the accessible streams."""
class SequntialOperator(MultiStreamOperator):
    """Operator that applies a list of streaming operators one after another."""

    # Operators applied in list order; each receives the previous one's output.
    steps: List[StreamingOperator] = field(default_factory=list)

    def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        """Feed ``multi_stream`` through every step and return the final result.

        With no steps, the input is returned unchanged.
        """
        current = multi_stream
        for step in self.steps:
            current = step(current)
        return current
class SourceSequntialOperator(SequntialOperator):
    """Sequential operator whose first step is called with no input to create the streams."""

    def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
        """Call the first step without arguments, then pipe its output through the rest.

        The ``multi_stream`` argument is not used.
        """
        streams = self.steps[0]()
        for step in self.steps[1:]:
            streams = step(streams)
        return streams

    def __call__(self) -> MultiStream:
        """Invoke the inherited call path without passing any input stream."""
        return super().__call__()
class SequntialOperatorInitilizer(SequntialOperator):
    """Sequential operator whose first step is initialized from the caller's arguments."""

    def process(self, *args, **kwargs) -> MultiStream:
        """Call the first step with the given arguments, then pipe its output through the rest."""
        initializer = self.steps[0]
        assert isinstance(
            initializer, StreamInitializerOperator
        ), "The first step in a SequntialOperatorInitilizer must be a StreamInitializerOperator"
        streams = initializer(*args, **kwargs)
        for step in self.steps[1:]:
            streams = step(streams)
        return streams

    def __call__(self, *args, **kwargs) -> MultiStream:
        """Forward all arguments straight to ``process``."""
        return self.process(*args, **kwargs)
|