Elron commited on
Commit
b868ef2
1 Parent(s): f61017e

Upload operators.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operators.py +17 -4
operators.py CHANGED
@@ -144,6 +144,8 @@ class FieldOperator(StreamInstanceOperator):
144
  field_to_field: Optional[Union[List[Tuple[str, str]], Dict[str, str]]] = None
145
  process_every_value: bool = False
146
  use_query: bool = False
 
 
147
 
148
  def verify(self):
149
  super().verify()
@@ -175,7 +177,13 @@ class FieldOperator(StreamInstanceOperator):
175
  def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
176
  for from_field, to_field in self._field_to_field:
177
  try:
178
- old_value = dict_get(instance, from_field, use_dpath=self.use_query)
 
 
 
 
 
 
179
  except TypeError as e:
180
  raise TypeError(f"Failed to get {from_field} from {instance}")
181
  if self.process_every_value:
@@ -480,7 +488,7 @@ class ArtifactFetcherMixin:
480
  return cls.cache[artifact_identifier]
481
 
482
 
483
- class ApplyValueOperatorsField(StreamInstanceOperator, ArtifactFetcherMixin):
484
  """
485
  Applies value operators to each instance in a stream based on specified fields.
486
 
@@ -490,7 +498,7 @@ class ApplyValueOperatorsField(StreamInstanceOperator, ArtifactFetcherMixin):
490
  default_operators (List[str]): A list of default operators to be used if no operators are found in the instance.
491
  """
492
 
493
- value_field: str
494
  operators_field: str
495
  default_operators: List[str] = None
496
 
@@ -507,7 +515,12 @@ class ApplyValueOperatorsField(StreamInstanceOperator, ArtifactFetcherMixin):
507
 
508
  for name in operator_names:
509
  operator = self.get_artifact(name)
510
- instance = operator(instance, self.value_field)
 
 
 
 
 
511
 
512
  return instance
513
 
 
144
  field_to_field: Optional[Union[List[Tuple[str, str]], Dict[str, str]]] = None
145
  process_every_value: bool = False
146
  use_query: bool = False
147
+ get_default: Any = None
148
+ not_exist_ok: bool = False
149
 
150
  def verify(self):
151
  super().verify()
 
177
  def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
178
  for from_field, to_field in self._field_to_field:
179
  try:
180
+ old_value = dict_get(
181
+ instance,
182
+ from_field,
183
+ use_dpath=self.use_query,
184
+ default=self.get_default,
185
+ not_exist_ok=self.not_exist_ok,
186
+ )
187
  except TypeError as e:
188
  raise TypeError(f"Failed to get {from_field} from {instance}")
189
  if self.process_every_value:
 
488
  return cls.cache[artifact_identifier]
489
 
490
 
491
+ class ApplyOperatorsField(StreamInstanceOperator, ArtifactFetcherMixin):
492
  """
493
  Applies value operators to each instance in a stream based on specified fields.
494
 
 
498
  default_operators (List[str]): A list of default operators to be used if no operators are found in the instance.
499
  """
500
 
501
+ inputs_fields: str
502
  operators_field: str
503
  default_operators: List[str] = None
504
 
 
515
 
516
  for name in operator_names:
517
  operator = self.get_artifact(name)
518
+ for field in self.inputs_fields:
519
+ value = instance[field]
520
+ if isinstance(value, list):
521
+ instance[field] = [operator.process(v) for v in value]
522
+ else:
523
+ instance[field] = operator.process(instance[field])
524
 
525
  return instance
526