File size: 935 Bytes
2fb2317
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from .operator import StreamInstanceOperator

from typing import List, Dict, Any


class NormalizeListFields(StreamInstanceOperator):
    """Collapse list-valued instance fields into a single value.

    For every name in ``fields`` the list stored in the instance is reduced
    to one value and written under ``key_prefix + name``:

    * an empty list becomes ``empty_value``;
    * a one-element list becomes that element as-is (it is not converted
      to ``str``);
    * a longer list becomes its items joined with ``separator``.

    With the default empty ``key_prefix`` the target key is the field
    itself, so the original list is overwritten.

    Attributes:
        fields: Names of the instance keys to normalize.
        key_prefix: Prepended to each field name to form the target key.
        empty_value: Value stored when the list is empty.
        separator: String placed between items when joining.
    """

    fields: List[str]
    key_prefix: str = ""
    empty_value: str = ""
    separator: str = ", "

    def process(self, instance: Dict[str, Any], stream_name: str = None) -> Dict[str, Any]:
        """Normalize every configured list field of ``instance`` in place.

        Args:
            instance: Mapping holding the fields to normalize; it is mutated.
            stream_name: Not used by this method.

        Returns:
            The same ``instance`` object, with one target key written per
            configured field.

        Raises:
            AssertionError: If a configured field is missing from
                ``instance`` or its value is not a ``list``.
            TypeError: Propagated from ``str.join`` when a list with two or
                more items contains a non-string item.
        """
        for field in self.fields:
            # Validate with explicit raises rather than ``assert`` statements:
            # asserts are removed under ``python -O``, after which a ``str``
            # value would be joined character by character without any error.
            # ``AssertionError`` is kept so existing callers see the same type.
            if field not in instance:
                raise AssertionError(f"Field {field} not found in instance {instance}")

            values = instance[field]
            if not isinstance(values, list):
                raise AssertionError(f"Field {field} should be a list, got {type(values)}")

            target_key = self.key_prefix + field

            if len(values) == 0:
                instance[target_key] = self.empty_value
            elif len(values) == 1:
                # A lone item is passed through unchanged, whatever its type.
                instance[target_key] = values[0]
            else:
                instance[target_key] = self.separator.join(values)

        return instance