File size: 5,316 Bytes
938e515
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# Copyright (c) Facebook, Inc. and its affiliates.
from typing import Any, Dict, Optional, Tuple


class EntrySelector:
    """
    Base class for entry selectors
    """

    @staticmethod
    def from_string(spec: str) -> "EntrySelector":
        if spec == "*":
            return AllEntrySelector()
        return FieldEntrySelector(spec)


class AllEntrySelector(EntrySelector):
    """
    Selector that accepts all entries
    """

    SPECIFIER = "*"

    def __call__(self, entry):
        return True


class FieldEntrySelector(EntrySelector):
    """
    Selector that accepts only entries that match provided field
    specifier(s). Only a limited set of specifiers is supported for now:
      <specifiers>::=<specifier>[<comma><specifiers>]
      <specifier>::=<field_name>[<type_delim><type>]<equal><value_or_range>
      <field_name> is a valid identifier
      <type> ::= "int" | "str"
      <equal> ::= "="
      <comma> ::= ","
      <type_delim> ::= ":"
      <value_or_range> ::= <value> | <range>
      <range> ::= <value><range_delim><value>
      <range_delim> ::= "-"
      <value> is a string without spaces and special symbols
        (e.g. <comma>, <equal>, <type_delim>, <range_delim>)
    """

    _SPEC_DELIM = ","
    _TYPE_DELIM = ":"
    _RANGE_DELIM = "-"
    _EQUAL = "="
    _ERROR_PREFIX = "Invalid field selector specifier"

    class _FieldEntryValuePredicate:
        """
        Predicate that checks strict equality for the specified entry field
        """

        def __init__(self, name: str, typespec: Optional[str], value: str):
            import builtins

            self.name = name
            self.type = getattr(builtins, typespec) if typespec is not None else str
            self.value = value

        def __call__(self, entry):
            return entry[self.name] == self.type(self.value)

    class _FieldEntryRangePredicate:
        """
        Predicate that checks whether an entry field falls into the specified range
        """

        def __init__(self, name: str, typespec: Optional[str], vmin: str, vmax: str):
            import builtins

            self.name = name
            self.type = getattr(builtins, typespec) if typespec is not None else str
            self.vmin = vmin
            self.vmax = vmax

        def __call__(self, entry):
            return (entry[self.name] >= self.type(self.vmin)) and (
                entry[self.name] <= self.type(self.vmax)
            )

    def __init__(self, spec: str):
        self._predicates = self._parse_specifier_into_predicates(spec)

    def __call__(self, entry: Dict[str, Any]):
        for predicate in self._predicates:
            if not predicate(entry):
                return False
        return True

    def _parse_specifier_into_predicates(self, spec: str):
        predicates = []
        specs = spec.split(self._SPEC_DELIM)
        for subspec in specs:
            eq_idx = subspec.find(self._EQUAL)
            if eq_idx > 0:
                field_name_with_type = subspec[:eq_idx]
                field_name, field_type = self._parse_field_name_type(field_name_with_type)
                field_value_or_range = subspec[eq_idx + 1 :]
                if self._is_range_spec(field_value_or_range):
                    vmin, vmax = self._get_range_spec(field_value_or_range)
                    predicate = FieldEntrySelector._FieldEntryRangePredicate(
                        field_name, field_type, vmin, vmax
                    )
                else:
                    predicate = FieldEntrySelector._FieldEntryValuePredicate(
                        field_name, field_type, field_value_or_range
                    )
                predicates.append(predicate)
            elif eq_idx == 0:
                self._parse_error(f'"{subspec}", field name is empty!')
            else:
                self._parse_error(f'"{subspec}", should have format ' "<field>=<value_or_range>!")
        return predicates

    def _parse_field_name_type(self, field_name_with_type: str) -> Tuple[str, Optional[str]]:
        type_delim_idx = field_name_with_type.find(self._TYPE_DELIM)
        if type_delim_idx > 0:
            field_name = field_name_with_type[:type_delim_idx]
            field_type = field_name_with_type[type_delim_idx + 1 :]
        elif type_delim_idx == 0:
            self._parse_error(f'"{field_name_with_type}", field name is empty!')
        else:
            field_name = field_name_with_type
            field_type = None
        # pyre-fixme[61]: `field_name` may not be initialized here.
        # pyre-fixme[61]: `field_type` may not be initialized here.
        return field_name, field_type

    def _is_range_spec(self, field_value_or_range):
        delim_idx = field_value_or_range.find(self._RANGE_DELIM)
        return delim_idx > 0

    def _get_range_spec(self, field_value_or_range):
        if self._is_range_spec(field_value_or_range):
            delim_idx = field_value_or_range.find(self._RANGE_DELIM)
            vmin = field_value_or_range[:delim_idx]
            vmax = field_value_or_range[delim_idx + 1 :]
            return vmin, vmax
        else:
            self._parse_error('"field_value_or_range", range of values expected!')

    def _parse_error(self, msg):
        raise ValueError(f"{self._ERROR_PREFIX}: {msg}")