|
from copy import deepcopy |
|
from typing import Any, Generator, List, Optional |
|
|
|
from .operators import FieldOperator, SingleStreamOperator |
|
from .stream import Stream |
|
|
|
|
|
class Dictify(FieldOperator):
    """Field operator that converts a sequence of values into a dictionary.

    Values are paired positionally with ``with_keys``. Pairing follows
    ``zip`` semantics, so it stops at the shorter of the two sequences.
    """

    # Keys assigned, in order, to the incoming values.
    with_keys: List[str]

    def process_value(self, tup: Any) -> Any:
        """Return a dict mapping each key of ``with_keys`` to the value at the same position in ``tup``."""
        key_value_pairs = zip(self.with_keys, tup)
        return dict(key_value_pairs)
|
|
|
|
|
class Wrap(FieldOperator):
    """Field operator that places a value inside a single-element container.

    ``inside`` names the container type: ``"list"``, ``"tuple"`` or ``"set"``.
    """

    # Name of the container type used for wrapping.
    inside: str

    def verify(self):
        """Validate the operator configuration.

        Raises:
            ValueError: if ``inside`` is not one of "list", "tuple" or "set".
        """
        super().verify()
        if self.inside in ["list", "tuple", "set"]:
            return
        raise ValueError(
            f"Wrap.inside support only types: [list, tuple, set], got {self.inside}"
        )

    def process_value(self, value: Any) -> Any:
        """Return ``value`` wrapped in the container selected by ``inside``."""
        if self.inside == "list":
            wrapped = [value]
        elif self.inside == "tuple":
            wrapped = (value,)
        else:
            # Every setting other than "list"/"tuple" produces a set; the set
            # is only built here, so hashability matters for this branch only.
            wrapped = {value}
        return wrapped
|
|
|
|
|
class Chunk(FieldOperator):
    """Field operator that splits a collection into consecutive slices.

    Each slice holds up to ``size`` items; the last one may be shorter.
    The collection must support ``len()`` and slicing.
    """

    # Number of items taken for each chunk.
    size: int

    def process_value(self, collection: Any) -> Any:
        """Return a list of consecutive slices of ``collection``, each ``size`` long at most."""
        chunks = []
        for offset in range(0, len(collection), self.size):
            chunks.append(collection[offset : offset + self.size])
        return chunks
|
|
|
|
|
class Slice(FieldOperator):
    """Field operator that returns ``collection[start:stop:step]``.

    Each bound defaults to ``None``, which leaves that part of the slice open.
    """

    # Slice bounds, passed through unchanged to the subscript.
    start: Optional[int] = None
    stop: Optional[int] = None
    step: Optional[int] = None

    def process_value(self, collection: Any) -> Any:
        """Return the slice of ``collection`` described by ``start``, ``stop`` and ``step``."""
        return collection[self.start : self.stop : self.step]
|
|
|
|
|
class Get(FieldOperator):
    """Field operator that looks up a single entry of a collection.

    ``item`` is used as the subscript, so it may be an index, a key, or
    anything else the collection accepts in ``collection[item]``.
    """

    # Subscript applied to the incoming collection.
    item: Any

    def process_value(self, collection: Any) -> Any:
        """Return ``collection[item]``; lookup errors from the collection propagate."""
        key = self.item
        return collection[key]
|
|
|
|
|
class DuplicateByList(SingleStreamOperator):
    """Stream operator that emits one copy of an instance per element of a field.

    For every instance, ``instance[field]`` is iterated and a copy of the
    instance is yielded for each element, with that element stored under
    ``to_field`` (or under ``field`` itself when ``to_field`` is ``None``).
    """

    # Name of the field holding the elements to iterate over.
    field: str
    # Destination field for the single element; None means overwrite `field`.
    to_field: Optional[str] = None
    # When True each emitted instance is a deep copy, otherwise a shallow dict copy.
    use_deep_copy: bool = False

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        """Yield the per-element copies for every instance in ``stream``.

        ``stream_name`` is accepted but not used here.
        """
        target_field = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            elements = instance[self.field]
            for element in elements:
                if not self.use_deep_copy:
                    # Shallow copy: top-level keys are new, values are shared.
                    yield {
                        **instance,
                        self.field: elements,
                        target_field: element,
                    }
                    continue
                duplicate = deepcopy(instance)
                # The assigned element is the object from the original list,
                # not its counterpart inside the deep copy.
                duplicate[target_field] = element
                yield duplicate
|
|
|
|
|
class DuplicateBySubLists(SingleStreamOperator):
    """Stream operator that emits one copy of an instance per prefix of a field.

    For an instance whose ``field`` holds ``n`` elements, ``n`` copies are
    yielded; the k-th copy stores the first ``k`` elements under ``to_field``
    (or under ``field`` itself when ``to_field`` is ``None``). An empty
    field yields nothing for that instance.
    """

    # Name of the field holding the sequence whose prefixes are emitted.
    field: str
    # Destination field for each prefix; None means overwrite `field`.
    to_field: Optional[str] = None
    # When True each emitted instance is a deep copy, otherwise a shallow dict copy.
    use_deep_copy: bool = False

    def process(self, stream: Stream, stream_name: Optional[str] = None) -> Generator:
        """Yield the per-prefix copies for every instance in ``stream``.

        ``stream_name`` is accepted but not used here.
        """
        target_field = self.to_field if self.to_field is not None else self.field
        for instance in stream:
            elements = instance[self.field]
            for prefix_len in range(1, len(elements) + 1):
                if not self.use_deep_copy:
                    # Shallow copy: top-level keys are new, values are shared.
                    yield {
                        **instance,
                        self.field: elements,
                        target_field: elements[:prefix_len],
                    }
                    continue
                duplicate = deepcopy(instance)
                # The prefix is sliced from the original sequence, so its items
                # are the original objects rather than the deep-copied ones.
                duplicate[target_field] = elements[:prefix_len]
                yield duplicate
|
|