metric / collections_operators.py
Elron's picture
Upload folder using huggingface_hub
0a1b314 verified
raw
history blame contribute delete
No virus
3.03 kB
from copy import deepcopy
from typing import Any, Generator, List, Optional
from .operators import FieldOperator, StreamOperator
from .stream import Stream
class Dictify(FieldOperator):
with_keys: List[str]
def process_value(self, tup: Any) -> Any:
return dict(zip(self.with_keys, tup))
class Wrap(FieldOperator):
inside: str
def verify(self):
super().verify()
if self.inside not in ["list", "tuple", "set"]:
raise ValueError(
f"Wrap.inside support only types: [list, tuple, set], got {self.inside}"
)
def process_value(self, value: Any) -> Any:
if self.inside == "list":
return [value]
if self.inside == "tuple":
return (value,)
return {
value,
}
class Chunk(FieldOperator):
size: int
def process_value(self, collection: Any) -> Any:
return [
collection[i : i + self.size] for i in range(0, len(collection), self.size)
]
class Slice(FieldOperator):
start: Optional[int] = None
stop: Optional[int] = None
step: Optional[int] = None
def process_value(self, collection: Any) -> Any:
slicer = slice(self.start, self.stop, self.step)
return collection[slicer]
class Get(FieldOperator):
item: Any
def process_value(self, collection: Any) -> Any:
return collection[self.item]
class DuplicateByList(StreamOperator):
field: str
to_field: Optional[str] = None
use_deep_copy: bool = False
def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
to_field = self.field if self.to_field is None else self.to_field
for instance in stream:
elements = instance[self.field]
for element in elements:
if self.use_deep_copy:
instance_copy = deepcopy(instance)
instance_copy[to_field] = element
else:
instance_copy = {
**instance,
self.field: elements,
to_field: element,
}
yield instance_copy
class DuplicateBySubLists(StreamOperator):
field: str
to_field: Optional[str] = None
use_deep_copy: bool = False
def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
to_field = self.field if self.to_field is None else self.to_field
for instance in stream:
elements = instance[self.field]
for i in range(1, len(elements) + 1):
if self.use_deep_copy:
instance_copy = deepcopy(instance)
instance_copy[to_field] = elements[:i]
else:
instance_copy = {
**instance,
self.field: elements,
to_field: elements[:i],
}
yield instance_copy