Elron commited on
Commit
36b5223
·
verified ·
1 Parent(s): 14e53f7

Upload operator.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operator.py +12 -14
operator.py CHANGED
@@ -57,26 +57,24 @@ class StreamingOperator(Artifact):
57
  """
58
 
59
 
60
- class StreamSource(StreamingOperator):
61
- """A class representing a stream source operator in the streaming system.
62
 
63
- A stream source operator is a special type of `StreamingOperator` that generates a data stream without taking any input streams. It serves as the starting point in a stream processing pipeline, providing the initial data that other operators in the pipeline can process.
64
-
65
- When called, a `StreamSource` should generate a `MultiStream`. This behavior must be implemented by any classes that inherit from `StreamSource`.
66
-
67
- """
68
 
69
  @abstractmethod
70
- def __call__(self) -> MultiStream:
71
  pass
72
 
73
 
74
- class SourceOperator(StreamSource):
75
  """A class representing a source operator in the streaming system.
76
 
77
  A source operator is responsible for generating the data stream from some source, such as a database or a file.
78
  This is the starting point of a stream processing pipeline.
79
- The `SourceOperator` class is a type of `StreamSource`, which is a special type of `StreamingOperator`
80
  that generates an output stream but does not take any input streams.
81
 
82
  When called, a `SourceOperator` invokes its `process` method, which should be implemented by all subclasses
@@ -86,7 +84,7 @@ class SourceOperator(StreamSource):
86
 
87
  caching: bool = NonPositionalField(default=None)
88
 
89
- def __call__(self) -> MultiStream:
90
  multi_stream = self.process()
91
  if self.caching is not None:
92
  multi_stream.set_caching(self.caching)
@@ -97,10 +95,10 @@ class SourceOperator(StreamSource):
97
  pass
98
 
99
 
100
- class StreamInitializerOperator(StreamSource):
101
  """A class representing a stream initializer operator in the streaming system.
102
 
103
- A stream initializer operator is a special type of `StreamSource` that is capable of taking parameters during the stream generation process. This can be useful in situations where the stream generation process needs to be customized or configured based on certain parameters.
104
 
105
  When called, a `StreamInitializerOperator` invokes its `process` method, passing any supplied arguments and keyword arguments. The `process` method should be implemented by all subclasses to generate the required `MultiStream` based on the given arguments and keyword arguments.
106
 
@@ -445,7 +443,7 @@ class SourceSequentialOperator(SequentialOperator):
445
  """A class representing a source sequential operator in the streaming system.
446
 
447
  A source sequential operator is a type of `SequentialOperator` that starts with a source operator.
448
- The first operator in its list of steps is a `StreamSource`, which generates the initial `MultiStream`
449
  that the other operators then process.
450
  """
451
 
 
57
  """
58
 
59
 
60
+ class SideEffectOperator(StreamingOperator):
61
+ """Base class for operators that does not affect the stream."""
62
 
63
+ def __call__(self, streams: Optional[MultiStream] = None) -> MultiStream:
64
+ self.process()
65
+ return streams
 
 
66
 
67
  @abstractmethod
68
+ def process() -> None:
69
  pass
70
 
71
 
72
+ class SourceOperator(StreamingOperator):
73
  """A class representing a source operator in the streaming system.
74
 
75
  A source operator is responsible for generating the data stream from some source, such as a database or a file.
76
  This is the starting point of a stream processing pipeline.
77
+ The `SourceOperator` class is a type of `SourceOperator`, which is a special type of `StreamingOperator`
78
  that generates an output stream but does not take any input streams.
79
 
80
  When called, a `SourceOperator` invokes its `process` method, which should be implemented by all subclasses
 
84
 
85
  caching: bool = NonPositionalField(default=None)
86
 
87
+ def __call__(self, multi_stream: Optional[MultiStream] = None) -> MultiStream:
88
  multi_stream = self.process()
89
  if self.caching is not None:
90
  multi_stream.set_caching(self.caching)
 
95
  pass
96
 
97
 
98
+ class StreamInitializerOperator(SourceOperator):
99
  """A class representing a stream initializer operator in the streaming system.
100
 
101
+ A stream initializer operator is a special type of `SourceOperator` that is capable of taking parameters during the stream generation process. This can be useful in situations where the stream generation process needs to be customized or configured based on certain parameters.
102
 
103
  When called, a `StreamInitializerOperator` invokes its `process` method, passing any supplied arguments and keyword arguments. The `process` method should be implemented by all subclasses to generate the required `MultiStream` based on the given arguments and keyword arguments.
104
 
 
443
  """A class representing a source sequential operator in the streaming system.
444
 
445
  A source sequential operator is a type of `SequentialOperator` that starts with a source operator.
446
+ The first operator in its list of steps is a `SourceOperator`, which generates the initial `MultiStream`
447
  that the other operators then process.
448
  """
449