Elron commited on
Commit
cb31369
1 Parent(s): d4645b0

Upload operator.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operator.py +34 -15
operator.py CHANGED
@@ -3,6 +3,7 @@ from dataclasses import field
3
  from typing import Any, Dict, Generator, List, Optional, Union
4
 
5
  from .artifact import Artifact
 
6
  from .stream import MultiStream, Stream
7
 
8
 
@@ -31,11 +32,11 @@ class OperatorError(Exception):
31
 
32
  class StreamingOperator(Artifact):
33
  """
34
- Base class for all stream operators in the streaming model.
35
 
36
  Stream operators are a key component of the streaming model and are responsible for processing continuous data streams.
37
- They perform operations such as transformations, aggregations, joins, windowing and more on these streams.
38
- There are several types of stream operators, including source operators, processing operators, etc.
39
 
40
  As a `StreamingOperator`, this class is responsible for performing operations on a stream, and must be implemented by all other specific types of stream operators in the system.
41
  When called, a `StreamingOperator` must return a MultiStream.
@@ -59,13 +60,14 @@ class StreamingOperator(Artifact):
59
 
60
  class StreamSource(StreamingOperator):
61
  """
62
- A class representing a stream source operator in the streaming system.
63
 
64
  A stream source operator is a special type of `StreamingOperator` that generates a data stream without taking any input streams. It serves as the starting point in a stream processing pipeline, providing the initial data that other operators in the pipeline can process.
65
 
66
  When called, a `StreamSource` should generate a `MultiStream`. This behavior must be implemented by any classes that inherit from `StreamSource`.
67
 
68
  """
 
69
  @abstractmethod
70
  def __call__(self) -> MultiStream:
71
  pass
@@ -73,15 +75,17 @@ class StreamSource(StreamingOperator):
73
 
74
  class SourceOperator(StreamSource):
75
  """
76
- A class representing a source operator in the streaming system.
77
 
78
  A source operator is responsible for generating the data stream from some source, such as a database or a file. This is the starting point of a stream processing pipeline. The `SourceOperator` class is a type of `StreamSource`, which is a special type of `StreamingOperator` that generates an output stream but does not take any input streams.
79
 
80
  When called, a `SourceOperator` invokes its `process` method, which should be implemented by all subclasses to generate the required `MultiStream`.
81
 
82
  """
 
83
  def __call__(self) -> MultiStream:
84
- return self.process()
 
85
 
86
  @abstractmethod
87
  def process(self) -> MultiStream:
@@ -90,16 +94,17 @@ class SourceOperator(StreamSource):
90
 
91
  class StreamInitializerOperator(StreamSource):
92
  """
93
- A class representing a stream initializer operator in the streaming system.
94
 
95
- A stream initializer operator is a special type of `StreamSource` that is capable of taking parameters during the stream generation process. This can be useful in situations where the stream generation process needs to be customized or configured based on certain parameters.
96
 
97
  When called, a `StreamInitializerOperator` invokes its `process` method, passing any supplied arguments and keyword arguments. The `process` method should be implemented by all subclasses to generate the required `MultiStream` based on the given arguments and keyword arguments.
98
 
99
  """
100
-
101
  def __call__(self, *args, **kwargs) -> MultiStream:
102
- return self.process(*args, **kwargs)
 
103
 
104
  @abstractmethod
105
  def process(self, *args, **kwargs) -> MultiStream:
@@ -112,8 +117,10 @@ class MultiStreamOperator(StreamingOperator):
112
 
113
  A multi-stream operator is a type of `StreamingOperator` that operates on an entire MultiStream object at once. It takes a `MultiStream` as input and produces a `MultiStream` as output. The `process` method should be implemented by subclasses to define the specific operations to be performed on the input `MultiStream`.
114
  """
 
115
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
116
- return self._process_multi_stream(multi_stream)
 
117
 
118
  def _process_multi_stream(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
119
  result = self.process(multi_stream)
@@ -131,6 +138,7 @@ class SingleStreamOperator(MultiStreamOperator):
131
 
132
  A single-stream operator is a type of `MultiStreamOperator` that operates on individual `Stream` objects within a `MultiStream`. It iterates through each `Stream` in the `MultiStream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each `Stream`.
133
  """
 
134
  def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
135
  result = {}
136
  for stream_name, stream in multi_stream.items():
@@ -157,8 +165,9 @@ class PagedStreamOperator(SingleStreamOperator):
157
 
158
  A paged-stream operator is a type of `SingleStreamOperator` that operates on a page of instances in a `Stream` at a time, where a page is a subset of instances. The `process` method should be implemented by subclasses to define the specific operations to be performed on each page.
159
  """
 
160
  page_size: int = 1000
161
-
162
  def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
163
  page = []
164
  for instance in stream:
@@ -179,6 +188,7 @@ class SingleStreamReducer(StreamingOperator):
179
 
180
  A single-stream reducer is a type of `StreamingOperator` that operates on individual `Stream` objects within a `MultiStream` and reduces each `Stream` to a single output value. The `process` method should be implemented by subclasses to define the specific reduction operation to be performed on each `Stream`.
181
  """
 
182
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> Dict[str, Any]:
183
  result = {}
184
  for stream_name, stream in multi_stream.items():
@@ -198,6 +208,7 @@ class StreamInstanceOperator(SingleStreamOperator):
198
 
199
  A stream instance operator is a type of `SingleStreamOperator` that operates on individual instances within a `Stream`. It iterates through each instance in the `Stream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each instance.
200
  """
 
201
  def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
202
  for instance in stream:
203
  yield self._process_instance(instance, stream_name)
@@ -216,6 +227,7 @@ class StreamInstanceOperatorValidator(StreamInstanceOperator):
216
 
217
  A stream instance operator validator is a type of `StreamInstanceOperator` that includes a validation step. It operates on individual instances within a `Stream` and validates the result of processing each instance.
218
  """
 
219
  @abstractmethod
220
  def validate(self, instance):
221
  pass
@@ -235,6 +247,7 @@ class InstanceOperator(Artifact):
235
 
236
  An instance operator is a type of `Artifact` that operates on a single instance (represented as a dict) at a time. It takes an instance as input and produces a transformed instance as output.
237
  """
 
238
  def __call__(self, data: dict) -> dict:
239
  return self.process(data)
240
 
@@ -243,12 +256,13 @@ class InstanceOperator(Artifact):
243
  pass
244
 
245
 
246
- class FieldOperator(Artifact):
247
  """
248
  A class representing a field operator in the streaming system.
249
 
250
  A field operator is a type of `Artifact` that operates on a single field within an instance. It takes an instance and a field name as input, processes the field, and updates the field in the instance with the processed value.
251
  """
 
252
  def __call__(self, data: Dict[str, Any], field: str) -> dict:
253
  value = self.process(data[field])
254
  data[field] = value
@@ -263,10 +277,11 @@ class InstanceOperatorWithGlobalAccess(StreamingOperator):
263
  """
264
  A class representing an instance operator with global access in the streaming system.
265
 
266
- An instance operator with global access is a type of `StreamingOperator` that operates on individual instances within a `Stream` and can also access other streams.
267
  It uses the `accessible_streams` attribute to determine which other streams it has access to.
268
  In order to make this efficient and to avoid qudratic complexity, it caches the accessible streams by default.
269
  """
 
270
  accessible_streams: Union[MultiStream, List[str]] = None
271
  cache_accessible_streams: bool = True
272
 
@@ -303,6 +318,7 @@ class SequntialOperator(MultiStreamOperator):
303
 
304
  A sequential operator is a type of `MultiStreamOperator` that applies a sequence of other operators to a `MultiStream`. It maintains a list of `StreamingOperator`s and applies them in order to the `MultiStream`.
305
  """
 
306
  steps: List[StreamingOperator] = field(default_factory=list)
307
 
308
  def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
@@ -317,6 +333,7 @@ class SourceSequntialOperator(SequntialOperator):
317
 
318
  A source sequential operator is a type of `SequntialOperator` that starts with a source operator. The first operator in its list of steps is a `StreamSource`, which generates the initial `MultiStream` that the other operators then process.
319
  """
 
320
  def __call__(self) -> MultiStream:
321
  return super().__call__()
322
 
@@ -333,8 +350,10 @@ class SequntialOperatorInitilizer(SequntialOperator):
333
 
334
  A sequential operator initializer is a type of `SequntialOperator` that starts with a stream initializer operator. The first operator in its list of steps is a `StreamInitializerOperator`, which generates the initial `MultiStream` based on the provided arguments and keyword arguments.
335
  """
 
336
  def __call__(self, *args, **kwargs) -> MultiStream:
337
- return self.process(*args, **kwargs)
 
338
 
339
  def process(self, *args, **kwargs) -> MultiStream:
340
  assert isinstance(
 
3
  from typing import Any, Dict, Generator, List, Optional, Union
4
 
5
  from .artifact import Artifact
6
+ from .random_utils import nested_seed
7
  from .stream import MultiStream, Stream
8
 
9
 
 
32
 
33
  class StreamingOperator(Artifact):
34
  """
35
+ Base class for all stream operators in the streaming model.
36
 
37
  Stream operators are a key component of the streaming model and are responsible for processing continuous data streams.
38
+ They perform operations such as transformations, aggregations, joins, windowing and more on these streams.
39
+ There are several types of stream operators, including source operators, processing operators, etc.
40
 
41
  As a `StreamingOperator`, this class is responsible for performing operations on a stream, and must be implemented by all other specific types of stream operators in the system.
42
  When called, a `StreamingOperator` must return a MultiStream.
 
60
 
61
  class StreamSource(StreamingOperator):
62
  """
63
+ A class representing a stream source operator in the streaming system.
64
 
65
  A stream source operator is a special type of `StreamingOperator` that generates a data stream without taking any input streams. It serves as the starting point in a stream processing pipeline, providing the initial data that other operators in the pipeline can process.
66
 
67
  When called, a `StreamSource` should generate a `MultiStream`. This behavior must be implemented by any classes that inherit from `StreamSource`.
68
 
69
  """
70
+
71
  @abstractmethod
72
  def __call__(self) -> MultiStream:
73
  pass
 
75
 
76
  class SourceOperator(StreamSource):
77
  """
78
+ A class representing a source operator in the streaming system.
79
 
80
  A source operator is responsible for generating the data stream from some source, such as a database or a file. This is the starting point of a stream processing pipeline. The `SourceOperator` class is a type of `StreamSource`, which is a special type of `StreamingOperator` that generates an output stream but does not take any input streams.
81
 
82
  When called, a `SourceOperator` invokes its `process` method, which should be implemented by all subclasses to generate the required `MultiStream`.
83
 
84
  """
85
+
86
  def __call__(self) -> MultiStream:
87
+ with nested_seed():
88
+ return self.process()
89
 
90
  @abstractmethod
91
  def process(self) -> MultiStream:
 
94
 
95
  class StreamInitializerOperator(StreamSource):
96
  """
97
+ A class representing a stream initializer operator in the streaming system.
98
 
99
+ A stream initializer operator is a special type of `StreamSource` that is capable of taking parameters during the stream generation process. This can be useful in situations where the stream generation process needs to be customized or configured based on certain parameters.
100
 
101
  When called, a `StreamInitializerOperator` invokes its `process` method, passing any supplied arguments and keyword arguments. The `process` method should be implemented by all subclasses to generate the required `MultiStream` based on the given arguments and keyword arguments.
102
 
103
  """
104
+
105
  def __call__(self, *args, **kwargs) -> MultiStream:
106
+ with nested_seed():
107
+ return self.process(*args, **kwargs)
108
 
109
  @abstractmethod
110
  def process(self, *args, **kwargs) -> MultiStream:
 
117
 
118
  A multi-stream operator is a type of `StreamingOperator` that operates on an entire MultiStream object at once. It takes a `MultiStream` as input and produces a `MultiStream` as output. The `process` method should be implemented by subclasses to define the specific operations to be performed on the input `MultiStream`.
119
  """
120
+
121
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
122
+ with nested_seed():
123
+ return self._process_multi_stream(multi_stream)
124
 
125
  def _process_multi_stream(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
126
  result = self.process(multi_stream)
 
138
 
139
  A single-stream operator is a type of `MultiStreamOperator` that operates on individual `Stream` objects within a `MultiStream`. It iterates through each `Stream` in the `MultiStream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each `Stream`.
140
  """
141
+
142
  def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
143
  result = {}
144
  for stream_name, stream in multi_stream.items():
 
165
 
166
  A paged-stream operator is a type of `SingleStreamOperator` that operates on a page of instances in a `Stream` at a time, where a page is a subset of instances. The `process` method should be implemented by subclasses to define the specific operations to be performed on each page.
167
  """
168
+
169
  page_size: int = 1000
170
+
171
  def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
172
  page = []
173
  for instance in stream:
 
188
 
189
  A single-stream reducer is a type of `StreamingOperator` that operates on individual `Stream` objects within a `MultiStream` and reduces each `Stream` to a single output value. The `process` method should be implemented by subclasses to define the specific reduction operation to be performed on each `Stream`.
190
  """
191
+
192
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> Dict[str, Any]:
193
  result = {}
194
  for stream_name, stream in multi_stream.items():
 
208
 
209
  A stream instance operator is a type of `SingleStreamOperator` that operates on individual instances within a `Stream`. It iterates through each instance in the `Stream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each instance.
210
  """
211
+
212
  def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
213
  for instance in stream:
214
  yield self._process_instance(instance, stream_name)
 
227
 
228
  A stream instance operator validator is a type of `StreamInstanceOperator` that includes a validation step. It operates on individual instances within a `Stream` and validates the result of processing each instance.
229
  """
230
+
231
  @abstractmethod
232
  def validate(self, instance):
233
  pass
 
247
 
248
  An instance operator is a type of `Artifact` that operates on a single instance (represented as a dict) at a time. It takes an instance as input and produces a transformed instance as output.
249
  """
250
+
251
  def __call__(self, data: dict) -> dict:
252
  return self.process(data)
253
 
 
256
  pass
257
 
258
 
259
+ class BaseFieldOperator(Artifact):
260
  """
261
  A class representing a field operator in the streaming system.
262
 
263
  A field operator is a type of `Artifact` that operates on a single field within an instance. It takes an instance and a field name as input, processes the field, and updates the field in the instance with the processed value.
264
  """
265
+
266
  def __call__(self, data: Dict[str, Any], field: str) -> dict:
267
  value = self.process(data[field])
268
  data[field] = value
 
277
  """
278
  A class representing an instance operator with global access in the streaming system.
279
 
280
+ An instance operator with global access is a type of `StreamingOperator` that operates on individual instances within a `Stream` and can also access other streams.
281
  It uses the `accessible_streams` attribute to determine which other streams it has access to.
282
  In order to make this efficient and to avoid qudratic complexity, it caches the accessible streams by default.
283
  """
284
+
285
  accessible_streams: Union[MultiStream, List[str]] = None
286
  cache_accessible_streams: bool = True
287
 
 
318
 
319
  A sequential operator is a type of `MultiStreamOperator` that applies a sequence of other operators to a `MultiStream`. It maintains a list of `StreamingOperator`s and applies them in order to the `MultiStream`.
320
  """
321
+
322
  steps: List[StreamingOperator] = field(default_factory=list)
323
 
324
  def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
 
333
 
334
  A source sequential operator is a type of `SequntialOperator` that starts with a source operator. The first operator in its list of steps is a `StreamSource`, which generates the initial `MultiStream` that the other operators then process.
335
  """
336
+
337
  def __call__(self) -> MultiStream:
338
  return super().__call__()
339
 
 
350
 
351
  A sequential operator initializer is a type of `SequntialOperator` that starts with a stream initializer operator. The first operator in its list of steps is a `StreamInitializerOperator`, which generates the initial `MultiStream` based on the provided arguments and keyword arguments.
352
  """
353
+
354
  def __call__(self, *args, **kwargs) -> MultiStream:
355
+ with nested_seed():
356
+ return self.process(*args, **kwargs)
357
 
358
  def process(self, *args, **kwargs) -> MultiStream:
359
  assert isinstance(