Elron committed on
Commit
5fe31b7
1 Parent(s): 719f1b0

Upload operator.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operator.py +130 -84
operator.py CHANGED
@@ -1,6 +1,7 @@
 
1
  from abc import abstractmethod
2
  from dataclasses import field
3
- from typing import Any, Dict, Generator, List, Optional, Union
4
 
5
  from .artifact import Artifact
6
  from .dataclass import NonPositionalField
@@ -32,8 +33,7 @@ class OperatorError(Exception):
32
 
33
 
34
  class StreamingOperator(Artifact):
35
- """
36
- Base class for all stream operators in the streaming model.
37
 
38
  Stream operators are a key component of the streaming model and are responsible for processing continuous data streams.
39
  They perform operations such as transformations, aggregations, joins, windowing and more on these streams.
@@ -48,8 +48,7 @@ class StreamingOperator(Artifact):
48
 
49
  @abstractmethod
50
  def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
51
- """
52
- Abstract method that performs operations on the stream.
53
 
54
  Args:
55
  streams (Optional[MultiStream]): The input MultiStream, which can be None.
@@ -60,8 +59,7 @@ class StreamingOperator(Artifact):
60
 
61
 
62
  class StreamSource(StreamingOperator):
63
- """
64
- A class representing a stream source operator in the streaming system.
65
 
66
  A stream source operator is a special type of `StreamingOperator` that generates a data stream without taking any input streams. It serves as the starting point in a stream processing pipeline, providing the initial data that other operators in the pipeline can process.
67
 
@@ -75,8 +73,7 @@ class StreamSource(StreamingOperator):
75
 
76
 
77
  class SourceOperator(StreamSource):
78
- """
79
- A class representing a source operator in the streaming system.
80
 
81
  A source operator is responsible for generating the data stream from some source, such as a database or a file. This is the starting point of a stream processing pipeline. The `SourceOperator` class is a type of `StreamSource`, which is a special type of `StreamingOperator` that generates an output stream but does not take any input streams.
82
 
@@ -99,8 +96,7 @@ class SourceOperator(StreamSource):
99
 
100
 
101
  class StreamInitializerOperator(StreamSource):
102
- """
103
- A class representing a stream initializer operator in the streaming system.
104
 
105
  A stream initializer operator is a special type of `StreamSource` that is capable of taking parameters during the stream generation process. This can be useful in situations where the stream generation process needs to be customized or configured based on certain parameters.
106
 
@@ -123,8 +119,7 @@ class StreamInitializerOperator(StreamSource):
123
 
124
 
125
  class MultiStreamOperator(StreamingOperator):
126
- """
127
- A class representing a multi-stream operator in the streaming system.
128
 
129
  A multi-stream operator is a type of `StreamingOperator` that operates on an entire MultiStream object at once. It takes a `MultiStream` as input and produces a `MultiStream` as output. The `process` method should be implemented by subclasses to define the specific operations to be performed on the input `MultiStream`.
130
  """
@@ -138,9 +133,13 @@ class MultiStreamOperator(StreamingOperator):
138
  result.set_caching(self.caching)
139
  return result
140
 
141
- def _process_multi_stream(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
 
 
142
  result = self.process(multi_stream)
143
- assert isinstance(result, MultiStream), "MultiStreamOperator must return a MultiStream"
 
 
144
  return result
145
 
146
  @abstractmethod
@@ -149,30 +148,42 @@ class MultiStreamOperator(StreamingOperator):
149
 
150
 
151
  class SingleStreamOperator(MultiStreamOperator):
152
- """
153
- A class representing a single-stream operator in the streaming system.
154
 
155
  A single-stream operator is a type of `MultiStreamOperator` that operates on individual `Stream` objects within a `MultiStream`. It iterates through each `Stream` in the `MultiStream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each `Stream`.
156
  """
157
 
158
- apply_to_streams: List[str] = NonPositionalField(default=None) # None apply to all streams
159
- dont_apply_to_streams: List[str] = NonPositionalField(default_factory=list)
 
 
160
 
161
  def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
162
  result = {}
163
  for stream_name, stream in multi_stream.items():
164
- stream = self._process_single_stream(stream, stream_name)
165
- assert isinstance(stream, Stream), "SingleStreamOperator must return a Stream"
 
 
 
 
 
166
  result[stream_name] = stream
167
 
168
  return MultiStream(result)
169
 
170
- def _process_single_stream(self, stream: Stream, stream_name: str = None) -> Stream:
171
- return Stream(self._process_stream, gen_kwargs={"stream": stream, "stream_name": stream_name})
 
 
 
 
 
172
 
173
- def is_should_be_processed(self, stream_name):
174
  if (
175
  self.apply_to_streams is not None
 
176
  and stream_name in self.apply_to_streams
177
  and stream_name in self.dont_apply_to_streams
178
  ):
@@ -182,29 +193,32 @@ class SingleStreamOperator(MultiStreamOperator):
182
 
183
  return (
184
  self.apply_to_streams is None or stream_name in self.apply_to_streams
185
- ) and stream_name not in self.dont_apply_to_streams
 
 
 
186
 
187
- def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
188
- if self.is_should_be_processed(stream_name):
189
- yield from self.process(stream, stream_name)
190
- else:
191
- yield from stream
192
 
193
  @abstractmethod
194
- def process(self, stream: Stream, stream_name: str = None) -> Generator:
195
  pass
196
 
197
 
198
  class PagedStreamOperator(SingleStreamOperator):
199
- """
200
- A class representing a paged-stream operator in the streaming system.
201
 
202
  A paged-stream operator is a type of `SingleStreamOperator` that operates on a page of instances in a `Stream` at a time, where a page is a subset of instances. The `process` method should be implemented by subclasses to define the specific operations to be performed on each page.
203
  """
204
 
205
  page_size: int = 1000
206
 
207
- def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
 
 
208
  page = []
209
  for instance in stream:
210
  page.append(instance)
@@ -214,13 +228,12 @@ class PagedStreamOperator(SingleStreamOperator):
214
  yield from self.process(page, stream_name)
215
 
216
  @abstractmethod
217
- def process(self, page: List[Dict], stream_name: str = None) -> Generator:
218
  pass
219
 
220
 
221
  class SingleStreamReducer(StreamingOperator):
222
- """
223
- A class representing a single-stream reducer in the streaming system.
224
 
225
  A single-stream reducer is a type of `StreamingOperator` that operates on individual `Stream` objects within a `MultiStream` and reduces each `Stream` to a single output value. The `process` method should be implemented by subclasses to define the specific reduction operation to be performed on each `Stream`.
226
  """
@@ -239,27 +252,40 @@ class SingleStreamReducer(StreamingOperator):
239
 
240
 
241
  class StreamInstanceOperator(SingleStreamOperator):
242
- """
243
- A class representing a stream instance operator in the streaming system.
244
 
245
  A stream instance operator is a type of `SingleStreamOperator` that operates on individual instances within a `Stream`. It iterates through each instance in the `Stream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each instance.
246
  """
247
 
248
- def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
249
- for instance in stream:
250
- yield self._process_instance(instance, stream_name)
251
-
252
- def _process_instance(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
 
 
 
 
 
 
 
 
 
 
 
 
 
253
  return self.process(instance, stream_name)
254
 
255
  @abstractmethod
256
- def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
 
 
257
  pass
258
 
259
 
260
  class StreamInstanceOperatorValidator(StreamInstanceOperator):
261
- """
262
- A class representing a stream instance operator validator in the streaming system.
263
 
264
  A stream instance operator validator is a type of `StreamInstanceOperator` that includes a validation step. It operates on individual instances within a `Stream` and validates the result of processing each instance.
265
  """
@@ -268,18 +294,24 @@ class StreamInstanceOperatorValidator(StreamInstanceOperator):
268
  def validate(self, instance):
269
  pass
270
 
271
- def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
 
 
272
  iterator = iter(stream)
273
- first_instance = next(iterator)
 
 
 
274
  result = self._process_instance(first_instance, stream_name)
275
  self.validate(result)
276
  yield result
277
- yield from (self._process_instance(instance, stream_name) for instance in iterator)
 
 
278
 
279
 
280
  class InstanceOperator(Artifact):
281
- """
282
- A class representing an instance operator in the streaming system.
283
 
284
  An instance operator is a type of `Artifact` that operates on a single instance (represented as a dict) at a time. It takes an instance as input and produces a transformed instance as output.
285
  """
@@ -293,8 +325,7 @@ class InstanceOperator(Artifact):
293
 
294
 
295
  class BaseFieldOperator(Artifact):
296
- """
297
- A class representing a field operator in the streaming system.
298
 
299
  A field operator is a type of `Artifact` that operates on a single field within an instance. It takes an instance and a field name as input, processes the field, and updates the field in the instance with the processed value.
300
  """
@@ -309,33 +340,22 @@ class BaseFieldOperator(Artifact):
309
  pass
310
 
311
 
312
- class InstanceOperatorWithGlobalAccess(StreamingOperator):
313
- """
314
- A class representing an instance operator with global access in the streaming system.
315
 
316
  An instance operator with global access is a type of `StreamingOperator` that operates on individual instances within a `Stream` and can also access other streams.
317
  It uses the `accessible_streams` attribute to determine which other streams it has access to.
318
  In order to make this efficient and to avoid quadratic complexity, it caches the accessible streams by default.
319
  """
320
 
321
- accessible_streams: Union[MultiStream, List[str]] = None
322
- cache_accessible_streams: bool = True
323
-
324
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
325
  result = {}
326
 
327
- if isinstance(self.accessible_streams, list):
328
- # cache the accessible streams:
329
- self.accessible_streams = MultiStream(
330
- {stream_name: multi_stream[stream_name] for stream_name in self.accessible_streams}
331
- )
332
-
333
- if self.cache_accessible_streams:
334
- for stream in self.accessible_streams.values():
335
- stream.caching = True
336
-
337
  for stream_name, stream in multi_stream.items():
338
- stream = Stream(self.generator, gen_kwargs={"stream": stream, "multi_stream": self.accessible_streams})
 
 
 
339
  result[stream_name] = stream
340
 
341
  return MultiStream(result)
@@ -348,24 +368,44 @@ class InstanceOperatorWithGlobalAccess(StreamingOperator):
348
  pass
349
 
350
 
351
- class SequntialOperator(MultiStreamOperator):
352
- """
353
- A class representing a sequential operator in the streaming system.
354
 
355
  A sequential operator is a type of `MultiStreamOperator` that applies a sequence of other operators to a `MultiStream`. It maintains a list of `StreamingOperator`s and applies them in order to the `MultiStream`.
356
  """
357
 
 
 
358
  steps: List[StreamingOperator] = field(default_factory=list)
359
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
360
  def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
361
- for operator in self.steps:
362
  multi_stream = operator(multi_stream)
363
  return multi_stream
364
 
365
 
366
- class SourceSequntialOperator(SequntialOperator):
367
- """
368
- A class representing a source sequential operator in the streaming system.
369
 
370
  A source sequential operator is a type of `SequntialOperator` that starts with a source operator. The first operator in its list of steps is a `StreamSource`, which generates the initial `MultiStream` that the other operators then process.
371
  """
@@ -374,15 +414,17 @@ class SourceSequntialOperator(SequntialOperator):
374
  return super().__call__()
375
 
376
  def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
 
 
 
377
  multi_stream = self.steps[0]()
378
- for operator in self.steps[1:]:
379
  multi_stream = operator(multi_stream)
380
  return multi_stream
381
 
382
 
383
- class SequntialOperatorInitilizer(SequntialOperator):
384
- """
385
- A class representing a sequential operator initializer in the streaming system.
386
 
387
  A sequential operator initializer is a type of `SequntialOperator` that starts with a stream initializer operator. The first operator in its list of steps is a `StreamInitializerOperator`, which generates the initial `MultiStream` based on the provided arguments and keyword arguments.
388
  """
@@ -392,10 +434,14 @@ class SequntialOperatorInitilizer(SequntialOperator):
392
  return self.process(*args, **kwargs)
393
 
394
  def process(self, *args, **kwargs) -> MultiStream:
 
 
 
 
395
  assert isinstance(
396
  self.steps[0], StreamInitializerOperator
397
- ), "The first step in a SequntialOperatorInitilizer must be a StreamInitializerOperator"
398
  multi_stream = self.steps[0](*args, **kwargs)
399
- for operator in self.steps[1:]:
400
  multi_stream = operator(multi_stream)
401
  return multi_stream
 
1
+ import re
2
  from abc import abstractmethod
3
  from dataclasses import field
4
+ from typing import Any, Dict, Generator, List, Optional
5
 
6
  from .artifact import Artifact
7
  from .dataclass import NonPositionalField
 
33
 
34
 
35
  class StreamingOperator(Artifact):
36
+ """Base class for all stream operators in the streaming model.
 
37
 
38
  Stream operators are a key component of the streaming model and are responsible for processing continuous data streams.
39
  They perform operations such as transformations, aggregations, joins, windowing and more on these streams.
 
48
 
49
  @abstractmethod
50
  def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
51
+ """Abstract method that performs operations on the stream.
 
52
 
53
  Args:
54
  streams (Optional[MultiStream]): The input MultiStream, which can be None.
 
59
 
60
 
61
  class StreamSource(StreamingOperator):
62
+ """A class representing a stream source operator in the streaming system.
 
63
 
64
  A stream source operator is a special type of `StreamingOperator` that generates a data stream without taking any input streams. It serves as the starting point in a stream processing pipeline, providing the initial data that other operators in the pipeline can process.
65
 
 
73
 
74
 
75
  class SourceOperator(StreamSource):
76
+ """A class representing a source operator in the streaming system.
 
77
 
78
  A source operator is responsible for generating the data stream from some source, such as a database or a file. This is the starting point of a stream processing pipeline. The `SourceOperator` class is a type of `StreamSource`, which is a special type of `StreamingOperator` that generates an output stream but does not take any input streams.
79
 
 
96
 
97
 
98
  class StreamInitializerOperator(StreamSource):
99
+ """A class representing a stream initializer operator in the streaming system.
 
100
 
101
  A stream initializer operator is a special type of `StreamSource` that is capable of taking parameters during the stream generation process. This can be useful in situations where the stream generation process needs to be customized or configured based on certain parameters.
102
 
 
119
 
120
 
121
  class MultiStreamOperator(StreamingOperator):
122
+ """A class representing a multi-stream operator in the streaming system.
 
123
 
124
  A multi-stream operator is a type of `StreamingOperator` that operates on an entire MultiStream object at once. It takes a `MultiStream` as input and produces a `MultiStream` as output. The `process` method should be implemented by subclasses to define the specific operations to be performed on the input `MultiStream`.
125
  """
 
133
  result.set_caching(self.caching)
134
  return result
135
 
136
+ def _process_multi_stream(
137
+ self, multi_stream: Optional[MultiStream] = None
138
+ ) -> MultiStream:
139
  result = self.process(multi_stream)
140
+ assert isinstance(
141
+ result, MultiStream
142
+ ), "MultiStreamOperator must return a MultiStream"
143
  return result
144
 
145
  @abstractmethod
 
148
 
149
 
150
  class SingleStreamOperator(MultiStreamOperator):
151
+ """A class representing a single-stream operator in the streaming system.
 
152
 
153
  A single-stream operator is a type of `MultiStreamOperator` that operates on individual `Stream` objects within a `MultiStream`. It iterates through each `Stream` in the `MultiStream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each `Stream`.
154
  """
155
 
156
+ apply_to_streams: List[str] = NonPositionalField(
157
+ default=None
158
+ ) # None apply to all streams
159
+ dont_apply_to_streams: List[str] = NonPositionalField(default_factory=None)
160
 
161
  def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
162
  result = {}
163
  for stream_name, stream in multi_stream.items():
164
+ if self._is_should_be_processed(stream_name):
165
+ stream = self._process_single_stream(stream, stream_name)
166
+ else:
167
+ stream = stream
168
+ assert isinstance(
169
+ stream, Stream
170
+ ), "SingleStreamOperator must return a Stream"
171
  result[stream_name] = stream
172
 
173
  return MultiStream(result)
174
 
175
+ def _process_single_stream(
176
+ self, stream: Stream, stream_name: Optional[str] = None
177
+ ) -> Stream:
178
+ return Stream(
179
+ self._process_stream,
180
+ gen_kwargs={"stream": stream, "stream_name": stream_name},
181
+ )
182
 
183
+ def _is_should_be_processed(self, stream_name):
184
  if (
185
  self.apply_to_streams is not None
186
+ and self.dont_apply_to_streams is not None
187
  and stream_name in self.apply_to_streams
188
  and stream_name in self.dont_apply_to_streams
189
  ):
 
193
 
194
  return (
195
  self.apply_to_streams is None or stream_name in self.apply_to_streams
196
+ ) and (
197
+ self.dont_apply_to_streams is None
198
+ or stream_name not in self.dont_apply_to_streams
199
+ )
200
 
201
+ def _process_stream(
202
+ self, stream: Stream, stream_name: Optional[str] = None
203
+ ) -> Generator:
204
+ yield from self.process(stream, stream_name)
 
205
 
206
  @abstractmethod
207
+ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
208
  pass
209
 
210
 
211
  class PagedStreamOperator(SingleStreamOperator):
212
+ """A class representing a paged-stream operator in the streaming system.
 
213
 
214
  A paged-stream operator is a type of `SingleStreamOperator` that operates on a page of instances in a `Stream` at a time, where a page is a subset of instances. The `process` method should be implemented by subclasses to define the specific operations to be performed on each page.
215
  """
216
 
217
  page_size: int = 1000
218
 
219
+ def _process_stream(
220
+ self, stream: Stream, stream_name: Optional[str] = None
221
+ ) -> Generator:
222
  page = []
223
  for instance in stream:
224
  page.append(instance)
 
228
  yield from self.process(page, stream_name)
229
 
230
  @abstractmethod
231
+ def process(self, page: List[Dict], stream_name: Optional[str] = None) -> Generator:
232
  pass
233
 
234
 
235
  class SingleStreamReducer(StreamingOperator):
236
+ """A class representing a single-stream reducer in the streaming system.
 
237
 
238
  A single-stream reducer is a type of `StreamingOperator` that operates on individual `Stream` objects within a `MultiStream` and reduces each `Stream` to a single output value. The `process` method should be implemented by subclasses to define the specific reduction operation to be performed on each `Stream`.
239
  """
 
252
 
253
 
254
  class StreamInstanceOperator(SingleStreamOperator):
255
+ """A class representing a stream instance operator in the streaming system.
 
256
 
257
  A stream instance operator is a type of `SingleStreamOperator` that operates on individual instances within a `Stream`. It iterates through each instance in the `Stream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each instance.
258
  """
259
 
260
+ def _process_stream(
261
+ self, stream: Stream, stream_name: Optional[str] = None
262
+ ) -> Generator:
263
+ try:
264
+ _index = None
265
+ for _index, instance in enumerate(stream):
266
+ yield self._process_instance(instance, stream_name)
267
+ except Exception as e:
268
+ if _index is None:
269
+ raise e
270
+ else:
271
+ raise ValueError(
272
+ f"Error processing instance '{_index}' from stream '{stream_name}' in {self.__class__.__name__} due to: {e}"
273
+ ) from e
274
+
275
+ def _process_instance(
276
+ self, instance: Dict[str, Any], stream_name: Optional[str] = None
277
+ ) -> Dict[str, Any]:
278
  return self.process(instance, stream_name)
279
 
280
  @abstractmethod
281
+ def process(
282
+ self, instance: Dict[str, Any], stream_name: Optional[str] = None
283
+ ) -> Dict[str, Any]:
284
  pass
285
 
286
 
287
  class StreamInstanceOperatorValidator(StreamInstanceOperator):
288
+ """A class representing a stream instance operator validator in the streaming system.
 
289
 
290
  A stream instance operator validator is a type of `StreamInstanceOperator` that includes a validation step. It operates on individual instances within a `Stream` and validates the result of processing each instance.
291
  """
 
294
  def validate(self, instance):
295
  pass
296
 
297
+ def _process_stream(
298
+ self, stream: Stream, stream_name: Optional[str] = None
299
+ ) -> Generator:
300
  iterator = iter(stream)
301
+ try:
302
+ first_instance = next(iterator)
303
+ except StopIteration as e:
304
+ raise StopIteration(f"Strem '{stream_name}' is empty") from e
305
  result = self._process_instance(first_instance, stream_name)
306
  self.validate(result)
307
  yield result
308
+ yield from (
309
+ self._process_instance(instance, stream_name) for instance in iterator
310
+ )
311
 
312
 
313
  class InstanceOperator(Artifact):
314
+ """A class representing an instance operator in the streaming system.
 
315
 
316
  An instance operator is a type of `Artifact` that operates on a single instance (represented as a dict) at a time. It takes an instance as input and produces a transformed instance as output.
317
  """
 
325
 
326
 
327
  class BaseFieldOperator(Artifact):
328
+ """A class representing a field operator in the streaming system.
 
329
 
330
  A field operator is a type of `Artifact` that operates on a single field within an instance. It takes an instance and a field name as input, processes the field, and updates the field in the instance with the processed value.
331
  """
 
340
  pass
341
 
342
 
343
+ class InstanceOperatorWithMultiStreamAccess(StreamingOperator):
344
+ """A class representing an instance operator with global access in the streaming system.
 
345
 
346
  An instance operator with global access is a type of `StreamingOperator` that operates on individual instances within a `Stream` and can also access other streams.
347
  It uses the `accessible_streams` attribute to determine which other streams it has access to.
348
  In order to make this efficient and to avoid quadratic complexity, it caches the accessible streams by default.
349
  """
350
 
 
 
 
351
  def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
352
  result = {}
353
 
 
 
 
 
 
 
 
 
 
 
354
  for stream_name, stream in multi_stream.items():
355
+ stream = Stream(
356
+ self.generator,
357
+ gen_kwargs={"stream": stream, "multi_stream": multi_stream},
358
+ )
359
  result[stream_name] = stream
360
 
361
  return MultiStream(result)
 
368
  pass
369
 
370
 
371
+ class SequentialOperator(MultiStreamOperator):
372
+ """A class representing a sequential operator in the streaming system.
 
373
 
374
  A sequential operator is a type of `MultiStreamOperator` that applies a sequence of other operators to a `MultiStream`. It maintains a list of `StreamingOperator`s and applies them in order to the `MultiStream`.
375
  """
376
 
377
+ max_steps = None
378
+
379
  steps: List[StreamingOperator] = field(default_factory=list)
380
 
381
+ def num_steps(self) -> int:
382
+ return len(self.steps)
383
+
384
+ def set_max_steps(self, max_steps):
385
+ assert (
386
+ max_steps <= self.num_steps()
387
+ ), f"Max steps requested ({max_steps}) is larger than defined steps {self.num_steps()}"
388
+ assert max_steps >= 1, f"Max steps requested ({max_steps}) is less than 1"
389
+ self.max_steps = max_steps
390
+
391
+ def get_last_step_description(self):
392
+ last_step = (
393
+ self.max_steps - 1 if self.max_steps is not None else len(self.steps) - 1
394
+ )
395
+ description = str(self.steps[last_step])
396
+ return re.sub(r"\w+=None, ", "", description)
397
+
398
+ def _get_max_steps(self):
399
+ return self.max_steps if self.max_steps is not None else len(self.steps)
400
+
401
  def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
402
+ for operator in self.steps[0 : self._get_max_steps()]:
403
  multi_stream = operator(multi_stream)
404
  return multi_stream
405
 
406
 
407
+ class SourceSequentialOperator(SequentialOperator):
408
+ """A class representing a source sequential operator in the streaming system.
 
409
 
410
  A source sequential operator is a type of `SequentialOperator` that starts with a source operator. The first operator in its list of steps is a `StreamSource`, which generates the initial `MultiStream` that the other operators then process.
411
  """
 
414
  return super().__call__()
415
 
416
  def process(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
417
+ assert (
418
+ self.num_steps() > 0
419
+ ), "Calling process on a SourceSequentialOperator without any steps"
420
  multi_stream = self.steps[0]()
421
+ for operator in self.steps[1 : self._get_max_steps()]:
422
  multi_stream = operator(multi_stream)
423
  return multi_stream
424
 
425
 
426
+ class SequentialOperatorInitilizer(SequentialOperator):
427
+ """A class representing a sequential operator initializer in the streaming system.
 
428
 
429
  A sequential operator initializer is a type of `SequentialOperator` that starts with a stream initializer operator. The first operator in its list of steps is a `StreamInitializerOperator`, which generates the initial `MultiStream` based on the provided arguments and keyword arguments.
430
  """
 
434
  return self.process(*args, **kwargs)
435
 
436
  def process(self, *args, **kwargs) -> MultiStream:
437
+ assert (
438
+ self.num_steps() > 0
439
+ ), "Calling process on a SequentialOperatorInitilizer without any steps"
440
+
441
  assert isinstance(
442
  self.steps[0], StreamInitializerOperator
443
+ ), "The first step in a SequentialOperatorInitilizer must be a StreamInitializerOperator"
444
  multi_stream = self.steps[0](*args, **kwargs)
445
+ for operator in self.steps[1 : self._get_max_steps()]:
446
  multi_stream = operator(multi_stream)
447
  return multi_stream