File size: 935 Bytes
2fb2317 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
from .operator import StreamInstanceOperator
from typing import List, Dict, Any
class NormalizeListFields(StreamInstanceOperator):
fields: List[str]
key_prefix: str = ""
empty_value: str = ""
separator: str = ", "
def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
for field in self.fields:
assert field in instance, f"Field {field} not found in instance {instance}"
assert isinstance(instance[field], list), f"Field {field} should be a list, got {type(instance[field])}"
target_key = self.key_prefix + field
if len(instance[field]) == 0:
instance[target_key] = self.empty_value
elif len(instance[field]) == 1:
instance[target_key] = instance[field][0]
else:
instance[target_key] = self.separator.join(instance[field])
return instance
|