File size: 999 Bytes
e4d5cd2
2fb2317
22010ee
2fb2317
 
 
 
 
 
 
 
e4d5cd2
 
 
2fb2317
 
e4d5cd2
 
 
2fb2317
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from typing import Any, Dict, List, Optional

from .operator import StreamInstanceOperator


class NormalizeListFields(StreamInstanceOperator):
    fields: List[str]
    key_prefix: str = ""
    empty_value: str = ""
    separator: str = ", "

    def process(
        self, instance: Dict[str, Any], stream_name: Optional[str] = None
    ) -> Dict[str, Any]:
        for field in self.fields:
            assert field in instance, f"Field {field} not found in instance {instance}"
            assert isinstance(
                instance[field], list
            ), f"Field {field} should be a list, got {type(instance[field])}"

            target_key = self.key_prefix + field

            if len(instance[field]) == 0:
                instance[target_key] = self.empty_value
            elif len(instance[field]) == 1:
                instance[target_key] = instance[field][0]
            else:
                instance[target_key] = self.separator.join(instance[field])

        return instance