Elron commited on
Commit
d10b65b
1 Parent(s): 4b1b61a

Upload collections_operators.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. collections_operators.py +93 -0
collections_operators.py ADDED
@@ -0,0 +1,93 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from copy import deepcopy
2
+ from typing import Any, Generator, List, Optional
3
+
4
+ from .operators import FieldOperator, SingleStreamOperator
5
+ from .stream import Stream
6
+
7
+
8
+ class Dictify(FieldOperator):
9
+ with_keys: List[str]
10
+
11
+ def process_value(self, tup: Any) -> Any:
12
+ return dict(zip(self.with_keys, tup))
13
+
14
+
15
+ class Wrap(FieldOperator):
16
+ inside: str
17
+
18
+ def verify(self):
19
+ super().verify()
20
+ if self.inside not in ["list", "tuple", "set"]:
21
+ raise ValueError(
22
+ f"Wrap.inside support only types: [list, tuple, set], got {self.inside}"
23
+ )
24
+
25
+ def process_value(self, value: Any) -> Any:
26
+ if self.inside == "list":
27
+ return [value]
28
+ if self.inside == "tuple":
29
+ return (value,)
30
+ return {
31
+ value,
32
+ }
33
+
34
+
35
+ class Slice(FieldOperator):
36
+ start: Optional[int] = None
37
+ stop: Optional[int] = None
38
+ step: Optional[int] = None
39
+
40
+ def process_value(self, collection: Any) -> Any:
41
+ slicer = slice(self.start, self.stop, self.step)
42
+ return collection[slicer]
43
+
44
+
45
+ class Get(FieldOperator):
46
+ item: Any
47
+
48
+ def process_value(self, collection: Any) -> Any:
49
+ return collection[self.item]
50
+
51
+
52
+ class DuplicateByList(SingleStreamOperator):
53
+ field: str
54
+ to_field: Optional[str] = None
55
+ use_deep_copy: bool = False
56
+
57
+ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
58
+ to_field = self.field if self.to_field is None else self.to_field
59
+ for instance in stream:
60
+ elements = instance[self.field]
61
+ for element in elements:
62
+ if self.use_deep_copy:
63
+ instance_copy = deepcopy(instance)
64
+ instance_copy[to_field] = element
65
+ else:
66
+ instance_copy = {
67
+ **instance,
68
+ self.field: elements,
69
+ to_field: element,
70
+ }
71
+ yield instance_copy
72
+
73
+
74
+ class DuplicateBySubLists(SingleStreamOperator):
75
+ field: str
76
+ to_field: Optional[str] = None
77
+ use_deep_copy: bool = False
78
+
79
+ def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
80
+ to_field = self.field if self.to_field is None else self.to_field
81
+ for instance in stream:
82
+ elements = instance[self.field]
83
+ for i in range(1, len(elements) + 1):
84
+ if self.use_deep_copy:
85
+ instance_copy = deepcopy(instance)
86
+ instance_copy[to_field] = elements[:i]
87
+ else:
88
+ instance_copy = {
89
+ **instance,
90
+ self.field: elements,
91
+ to_field: elements[:i],
92
+ }
93
+ yield instance_copy