Elron commited on
Commit
111e0b1
1 Parent(s): c8826c9

Upload operator.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operator.py +12 -2
operator.py CHANGED
@@ -166,6 +166,14 @@ class StreamInitializerOperator(SourceOperator):
166
  pass
167
 
168
 
 
 
 
 
 
 
 
 
169
  class MultiStreamOperator(StreamingOperator):
170
  """A class representing a multi-stream operator in the streaming system.
171
 
@@ -198,7 +206,7 @@ class MultiStreamOperator(StreamingOperator):
198
  pass
199
 
200
  def process_instance(self, instance, stream_name="tmp"):
201
- multi_stream = MultiStream({stream_name: [instance]})
202
  processed_multi_stream = self(multi_stream)
203
  return next(iter(processed_multi_stream[stream_name]))
204
 
@@ -269,7 +277,9 @@ class SingleStreamOperator(MultiStreamOperator):
269
  pass
270
 
271
  def process_instance(self, instance, stream_name="tmp"):
272
- processed_stream = self._process_single_stream([instance], stream_name)
 
 
273
  return next(iter(processed_stream))
274
 
275
 
 
166
  pass
167
 
168
 
169
+ def instance_generator(instance):
170
+ yield instance
171
+
172
+
173
+ def stream_single(instance: Dict[str, Any]) -> Stream:
174
+ return Stream(generator=instance_generator, gen_kwargs={"instance": instance})
175
+
176
+
177
  class MultiStreamOperator(StreamingOperator):
178
  """A class representing a multi-stream operator in the streaming system.
179
 
 
206
  pass
207
 
208
  def process_instance(self, instance, stream_name="tmp"):
209
+ multi_stream = MultiStream({stream_name: stream_single(instance)})
210
  processed_multi_stream = self(multi_stream)
211
  return next(iter(processed_multi_stream[stream_name]))
212
 
 
277
  pass
278
 
279
  def process_instance(self, instance, stream_name="tmp"):
280
+ processed_stream = self._process_single_stream(
281
+ stream_single(instance), stream_name
282
+ )
283
  return next(iter(processed_stream))
284
 
285