Elron committed on
Commit
649f9a8
1 Parent(s): 916e075

Upload operators.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. operators.py +82 -34
operators.py CHANGED
@@ -61,6 +61,7 @@ from .operator import (
61
  MultiStream,
62
  MultiStreamOperator,
63
  PagedStreamOperator,
 
64
  SingleStreamOperator,
65
  SingleStreamReducer,
66
  StreamingOperator,
@@ -880,6 +881,56 @@ class TakeByField(StreamInstanceOperator):
880
  return instance
881
 
882
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
883
  class CopyFields(FieldOperator):
884
  """Copies values from specified fields to specified fields.
885
 
@@ -1041,24 +1092,23 @@ class ApplyOperatorsField(StreamInstanceOperator, ArtifactFetcherMixin):
1041
  """Applies value operators to each instance in a stream based on specified fields.
1042
 
1043
  Args:
1044
- inputs_fields (List[str]): list of field names, the values in which are to be processed
1045
- fields_to_treat_as_list (List[str]): sublist of input_fields, each member of this sublist is supposed to contain
1046
- a list of values, each of which is to be processed.
1047
- operators_field (str): name of the field that contains the list of names of the operators to be applied,
1048
- one after the other, for the processing.
1049
  default_operators (List[str]): A list of default operators to be used if no operators are found in the instance.
1050
 
1051
  Example:
1052
- when instance {"a": 111, "b": 2, "c": ["processors.to_string", "processors.first_character"]} is processed by operator:
1053
- operator = ApplyOperatorsField(inputs_fields=["a"], operators_field="c", default_operators=["add"]),
1054
- the resulting instance is: {"a": "1", "b": 2, "c": ["processors.to_string", "processors.first_character"]}
 
 
1055
 
1056
  """
1057
 
1058
- inputs_fields: List[str]
1059
  operators_field: str
1060
  default_operators: List[str] = None
1061
- fields_to_treat_as_list: List[str] = NonPositionalField(default_factory=list)
1062
 
1063
  def process(
1064
  self, instance: Dict[str, Any], stream_name: Optional[str] = None
@@ -1072,17 +1122,11 @@ class ApplyOperatorsField(StreamInstanceOperator, ArtifactFetcherMixin):
1072
 
1073
  if isinstance(operator_names, str):
1074
  operator_names = [operator_names]
 
1075
 
1076
- for name in operator_names:
1077
- operator = self.get_artifact(name)
1078
- for field_name in self.inputs_fields:
1079
- value = instance[field_name]
1080
- if field_name in self.fields_to_treat_as_list:
1081
- instance[field_name] = [operator.process(v) for v in value]
1082
- else:
1083
- instance[field_name] = operator.process(value)
1084
-
1085
- return instance
1086
 
1087
 
1088
  class FilterByCondition(SingleStreamOperator):
@@ -1283,7 +1327,7 @@ class ExtractMostCommonFieldValues(MultiStreamOperator):
1283
 
1284
  def process(self, multi_stream: MultiStream) -> MultiStream:
1285
  stream = multi_stream[self.stream_name]
1286
- all_values = []
1287
  for instance in stream:
1288
  if (not isinstance(instance[self.field], list)) and (
1289
  self.process_every_value is True
@@ -1295,21 +1339,21 @@ class ExtractMostCommonFieldValues(MultiStreamOperator):
1295
  self.process_every_value is False
1296
  ):
1297
  # either not a list, or is a list but process_every_value == False : view contents of 'field' as one entity whose occurrences are counted.
1298
- all_values.append(
1299
- (*instance[self.field],)
1300
  if isinstance(instance[self.field], list)
1301
- else instance[self.field]
1302
  ) # convert to a tuple if list, to enable the use of Counter which would not accept
1303
- # a list as an entity to count its occurrences
1304
  else:
1305
  # content of 'field' is a list and process_every_value == True: add one occurrence on behalf of each individual value
1306
- all_values.extend(instance[self.field])
1307
- counter = Counter(
1308
- all_values
1309
- ) # here all_values is a list of individual values, or tuples. Hence, Counter is feasible
1310
  values_and_counts = counter.most_common()
1311
  if self.overall_top_frequency_percent < 100:
1312
- top_frequency = len(all_values) * self.overall_top_frequency_percent / 100.0
 
 
1313
  sum_counts = 0
1314
  for _i, p in enumerate(values_and_counts):
1315
  sum_counts += p[1]
@@ -1317,7 +1361,7 @@ class ExtractMostCommonFieldValues(MultiStreamOperator):
1317
  break
1318
  values_and_counts = counter.most_common(_i + 1)
1319
  if self.min_frequency_percent > 0:
1320
- min_frequency = self.min_frequency_percent * len(all_values) / 100.0
1321
  while values_and_counts[-1][1] < min_frequency:
1322
  values_and_counts.pop()
1323
  values_to_keep = [
@@ -1712,14 +1756,18 @@ class LengthBalancer(DeterministicBalancer):
1712
 
1713
  Args:
1714
  segments_boundaries (List[int]): distinct integers sorted in increasing order, that maps a given total length
1715
- into the index of the least of them that exceeds the total length. (If none exceeds -- into one index
1716
- beyond, namely, the length of segments_boundaries)
1717
 
1718
  fields (Optional, List[str])
1719
 
1720
  Example:
1721
  when input [{"a": [1, 3], "b": 0, "id": 0}, {"a": [1, 3], "b": 0, "id": 1}, {"a": [], "b": "a", "id": 2}] is fed into
1722
- LengthBalancer(fields=["a"], segments_boundaries=[1])
 
 
 
 
1723
  input instances will be counted and balanced against two categories: empty total length (less than 1), and non-empty.
1724
  """
1725
 
 
61
  MultiStream,
62
  MultiStreamOperator,
63
  PagedStreamOperator,
64
+ SequentialOperator,
65
  SingleStreamOperator,
66
  SingleStreamReducer,
67
  StreamingOperator,
 
881
  return instance
882
 
883
 
884
+ class Perturbate(FieldOperator):
885
+ """Slightly perturbs the contents of 'field'. Could be handy for imitating a prediction from a given target.
886
+
887
+ When the task is classification, argument 'select_from' can be used to list the other potential classes, as a
888
+ relevant perturbation
889
+ """
890
+
891
+ select_from: List[Any] = []
892
+ percentage_to_perturbate: int = 1 # 1 percent
893
+
894
+ def verify(self):
895
+ assert (
896
+ 0 <= self.percentage_to_perturbate and self.percentage_to_perturbate <= 100
897
+ ), f"'percentage_to_perturbate' should be in the range 0..100. Received {self.percentage_to_perturbate}"
898
+
899
+ def prepare(self):
900
+ super().prepare()
901
+ self.random_generator = new_random_generator(sub_seed="CopyWithPerturbation")
902
+
903
+ def process_value(self, value: Any) -> Any:
904
+ perturbate = (
905
+ self.random_generator.randint(1, 100) <= self.percentage_to_perturbate
906
+ )
907
+ if not perturbate:
908
+ return value
909
+
910
+ if value in self.select_from:
911
+ # 80% of cases, return a decent class, otherwise, perturbate the value itself as follows
912
+ if self.random_generator.random() < 0.8:
913
+ return self.random_generator.choice(self.select_from)
914
+
915
+ if isinstance(value, float):
916
+ return value * (0.5 + self.random_generator.random())
917
+
918
+ if isinstance(value, int):
919
+ perturb = 1 if self.random_generator.random() < 0.5 else -1
920
+ return value + perturb
921
+
922
+ if isinstance(value, str):
923
+ if len(value) < 2:
924
+ # give up perturbation
925
+ return value
926
+ # throw one char out
927
+ prefix_len = self.random_generator.randint(1, len(value) - 1)
928
+ return value[:prefix_len] + value[prefix_len + 1 :]
929
+
930
+ # and in any other case:
931
+ return value
932
+
933
+
934
  class CopyFields(FieldOperator):
935
  """Copies values from specified fields to specified fields.
936
 
 
1092
  """Applies value operators to each instance in a stream based on specified fields.
1093
 
1094
  Args:
1095
+ operators_field (str): name of the field that contains a single name, or a list of names, of the operators to be applied,
1096
+ one after the other, for the processing of the instance. Each operator is equipped with 'process_instance()'
1097
+ method.
1098
+
 
1099
  default_operators (List[str]): A list of default operators to be used if no operators are found in the instance.
1100
 
1101
  Example:
1102
+ when instance {"prediction": 111, "references": [222, 333] , "c": ["processors.to_string", "processors.first_character"]}
1103
+ is processed by operator (please look up these operators in the catalog; they are tuned to process fields "prediction" and
1104
+ "references"):
1105
+ operator = ApplyOperatorsField(operators_field="c"),
1106
+ the resulting instance is: {"prediction": "1", "references": ["2", "3"], "c": ["processors.to_string", "processors.first_character"]}
1107
 
1108
  """
1109
 
 
1110
  operators_field: str
1111
  default_operators: List[str] = None
 
1112
 
1113
  def process(
1114
  self, instance: Dict[str, Any], stream_name: Optional[str] = None
 
1122
 
1123
  if isinstance(operator_names, str):
1124
  operator_names = [operator_names]
1125
+ # otherwise, operator_names is already a list
1126
 
1127
+ # we now have a list of names of operators, each of which is equipped with a process_instance method.
1128
+ operator = SequentialOperator(steps=operator_names)
1129
+ return operator.process_instance(instance)
 
 
 
 
 
 
 
1130
 
1131
 
1132
  class FilterByCondition(SingleStreamOperator):
 
1327
 
1328
  def process(self, multi_stream: MultiStream) -> MultiStream:
1329
  stream = multi_stream[self.stream_name]
1330
+ counter = Counter()
1331
  for instance in stream:
1332
  if (not isinstance(instance[self.field], list)) and (
1333
  self.process_every_value is True
 
1339
  self.process_every_value is False
1340
  ):
1341
  # either not a list, or is a list but process_every_value == False : view contents of 'field' as one entity whose occurrences are counted.
1342
+ counter.update(
1343
+ [(*instance[self.field],)]
1344
  if isinstance(instance[self.field], list)
1345
+ else [instance[self.field]]
1346
  ) # convert to a tuple if list, to enable the use of Counter which would not accept
1347
+ # a list as a hashable entity to count its occurrences
1348
  else:
1349
  # content of 'field' is a list and process_every_value == True: add one occurrence on behalf of each individual value
1350
+ counter.update(instance[self.field])
1351
+ # here counter counts occurrences of individual values, or tuples.
 
 
1352
  values_and_counts = counter.most_common()
1353
  if self.overall_top_frequency_percent < 100:
1354
+ top_frequency = (
1355
+ sum(counter.values()) * self.overall_top_frequency_percent / 100.0
1356
+ )
1357
  sum_counts = 0
1358
  for _i, p in enumerate(values_and_counts):
1359
  sum_counts += p[1]
 
1361
  break
1362
  values_and_counts = counter.most_common(_i + 1)
1363
  if self.min_frequency_percent > 0:
1364
+ min_frequency = self.min_frequency_percent * sum(counter.values()) / 100.0
1365
  while values_and_counts[-1][1] < min_frequency:
1366
  values_and_counts.pop()
1367
  values_to_keep = [
 
1756
 
1757
  Args:
1758
  segments_boundaries (List[int]): distinct integers sorted in increasing order, that maps a given total length
1759
+ into the index of the least of them that exceeds the total length. (If none exceeds -- into one index
1760
+ beyond, namely, the length of segments_boundaries)
1761
 
1762
  fields (Optional, List[str])
1763
 
1764
  Example:
1765
  when input [{"a": [1, 3], "b": 0, "id": 0}, {"a": [1, 3], "b": 0, "id": 1}, {"a": [], "b": "a", "id": 2}] is fed into
1766
+
1767
+ .. code-block::
1768
+
1769
+ LengthBalancer(fields=["a"], segments_boundaries=[1])
1770
+
1771
  input instances will be counted and balanced against two categories: empty total length (less than 1), and non-empty.
1772
  """
1773