Elron commited on
Commit
a19f8c1
1 Parent(s): fbd19c3

Upload operator.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operator.py +21 -1
operator.py CHANGED
@@ -155,6 +155,9 @@ class SingleStreamOperator(MultiStreamOperator):
155
  A single-stream operator is a type of `MultiStreamOperator` that operates on individual `Stream` objects within a `MultiStream`. It iterates through each `Stream` in the `MultiStream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each `Stream`.
156
  """
157
 
 
 
 
158
  def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
159
  result = {}
160
  for stream_name, stream in multi_stream.items():
@@ -167,8 +170,25 @@ class SingleStreamOperator(MultiStreamOperator):
167
  def _process_single_stream(self, stream: Stream, stream_name: str = None) -> Stream:
168
  return Stream(self._process_stream, gen_kwargs={"stream": stream, "stream_name": stream_name})
169
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
170
  def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
171
- yield from self.process(stream, stream_name)
 
 
 
172
 
173
  @abstractmethod
174
  def process(self, stream: Stream, stream_name: str = None) -> Generator:
 
155
  A single-stream operator is a type of `MultiStreamOperator` that operates on individual `Stream` objects within a `MultiStream`. It iterates through each `Stream` in the `MultiStream` and applies the `process` method. The `process` method should be implemented by subclasses to define the specific operations to be performed on each `Stream`.
156
  """
157
 
158
+ apply_to_streams: List[str] = NonPositionalField(default=None) # None apply to all streams
159
+ dont_apply_to_streams: List[str] = NonPositionalField(default_factory=list)
160
+
161
  def _process_multi_stream(self, multi_stream: MultiStream) -> MultiStream:
162
  result = {}
163
  for stream_name, stream in multi_stream.items():
 
170
  def _process_single_stream(self, stream: Stream, stream_name: str = None) -> Stream:
171
  return Stream(self._process_stream, gen_kwargs={"stream": stream, "stream_name": stream_name})
172
 
173
+ def is_should_be_processed(self, stream_name):
174
+ if (
175
+ self.apply_to_streams is not None
176
+ and stream_name in self.apply_to_streams
177
+ and stream_name in self.dont_apply_to_streams
178
+ ):
179
+ raise ValueError(
180
+ f"Stream '{stream_name}' can be in either apply_to_streams or dont_apply_to_streams not both."
181
+ )
182
+
183
+ return (
184
+ self.apply_to_streams is None or stream_name in self.apply_to_streams
185
+ ) and stream_name not in self.dont_apply_to_streams
186
+
187
  def _process_stream(self, stream: Stream, stream_name: str = None) -> Generator:
188
+ if self.is_should_be_processed(stream_name):
189
+ yield from self.process(stream, stream_name)
190
+ else:
191
+ yield from stream
192
 
193
  @abstractmethod
194
  def process(self, stream: Stream, stream_name: str = None) -> Generator: