Elron commited on
Commit
916e075
·
verified ·
1 Parent(s): a0fa683

Upload operator.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operator.py +22 -1
operator.py CHANGED
@@ -150,6 +150,11 @@ class MultiStreamOperator(StreamingOperator):
150
  def process(self, multi_stream: MultiStream) -> MultiStream:
151
  pass
152
 
 
 
 
 
 
153
 
154
  class SingleStreamOperator(MultiStreamOperator):
155
  """A class representing a single-stream operator in the streaming system.
@@ -216,6 +221,10 @@ class SingleStreamOperator(MultiStreamOperator):
216
  def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
217
  pass
218
 
 
 
 
 
219
 
220
  class PagedStreamOperator(SingleStreamOperator):
221
  """A class representing a paged-stream operator in the streaming system.
@@ -240,12 +249,21 @@ class PagedStreamOperator(SingleStreamOperator):
240
  if len(page) >= self.page_size:
241
  yield from self.process(page, stream_name)
242
  page = []
 
 
 
 
 
243
  yield from self.process(page, stream_name)
244
 
245
  @abstractmethod
246
  def process(self, page: List[Dict], stream_name: Optional[str] = None) -> Generator:
247
  pass
248
 
 
 
 
 
249
 
250
  class SingleStreamReducer(StreamingOperator):
251
  """A class representing a single-stream reducer in the streaming system.
@@ -298,6 +316,9 @@ class StreamInstanceOperator(SingleStreamOperator):
298
  ) -> Dict[str, Any]:
299
  pass
300
 
 
 
 
301
 
302
  class StreamInstanceOperatorValidator(StreamInstanceOperator):
303
  """A class representing a stream instance operator validator in the streaming system.
@@ -316,7 +337,7 @@ class StreamInstanceOperatorValidator(StreamInstanceOperator):
316
  try:
317
  first_instance = next(iterator)
318
  except StopIteration as e:
319
- raise StopIteration(f"Strem '{stream_name}' is empty") from e
320
  result = self._process_instance(first_instance, stream_name)
321
  self.validate(result)
322
  yield result
 
150
  def process(self, multi_stream: MultiStream) -> MultiStream:
151
  pass
152
 
153
+ def process_instance(self, instance, stream_name="tmp"):
154
+ multi_stream = {stream_name: [instance]}
155
+ processed_multi_stream = self(multi_stream)
156
+ return next(iter(processed_multi_stream[stream_name]))
157
+
158
 
159
  class SingleStreamOperator(MultiStreamOperator):
160
  """A class representing a single-stream operator in the streaming system.
 
221
  def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
222
  pass
223
 
224
+ def process_instance(self, instance, stream_name="tmp"):
225
+ processed_stream = self._process_single_stream([instance], stream_name)
226
+ return next(iter(processed_stream))
227
+
228
 
229
  class PagedStreamOperator(SingleStreamOperator):
230
  """A class representing a paged-stream operator in the streaming system.
 
249
  if len(page) >= self.page_size:
250
  yield from self.process(page, stream_name)
251
  page = []
252
+ yield from self._process_page(page, stream_name)
253
+
254
+ def _process_page(
255
+ self, page: List[Dict], stream_name: Optional[str] = None
256
+ ) -> Generator:
257
  yield from self.process(page, stream_name)
258
 
259
  @abstractmethod
260
  def process(self, page: List[Dict], stream_name: Optional[str] = None) -> Generator:
261
  pass
262
 
263
+ def process_instance(self, instance, stream_name="tmp"):
264
+ processed_stream = self._process_page([instance], stream_name)
265
+ return next(iter(processed_stream))
266
+
267
 
268
  class SingleStreamReducer(StreamingOperator):
269
  """A class representing a single-stream reducer in the streaming system.
 
316
  ) -> Dict[str, Any]:
317
  pass
318
 
319
+ def process_instance(self, instance, stream_name="tmp"):
320
+ return self._process_instance(instance, stream_name)
321
+
322
 
323
  class StreamInstanceOperatorValidator(StreamInstanceOperator):
324
  """A class representing a stream instance operator validator in the streaming system.
 
337
  try:
338
  first_instance = next(iterator)
339
  except StopIteration as e:
340
+ raise StopIteration(f"Stream '{stream_name}' is empty") from e
341
  result = self._process_instance(first_instance, stream_name)
342
  self.validate(result)
343
  yield result