File size: 8,948 Bytes
df6c67d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
import threading
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Generator, List, Optional, OrderedDict, Union

import redis.lock

from inference.core import logger
from inference.core.active_learning.entities import StrategyLimit, StrategyLimitType
from inference.core.active_learning.utils import TIMESTAMP_FORMAT
from inference.core.cache.base import BaseCache

# Value passed as `expire` when acquiring the usage lock (presumably seconds).
MAX_LOCK_TIME = 5
SECONDS_IN_HOUR = 60 * 60
# Field under which the usage counter is stored inside the cached value.
USAGE_KEY = "usage"


def _utc_now() -> datetime:
    """Return the current UTC time as a naive datetime.

    Equivalent to `datetime.utcnow()` (deprecated since Python 3.12): the
    timezone info is stripped so that formatting results stay unchanged.
    """
    from datetime import timezone

    return datetime.now(timezone.utc).replace(tzinfo=None)


# Maps a limit type to a zero-argument callable producing the time-bucket part
# of the usage cache key for the current moment (UTC).
# The minutely / hourly infixes only encode minute-of-hour / hour-of-day, so the
# same key name recurs periodically - the expirations defined below are shorter
# than that recurrence period.
LIMIT_TYPE2KEY_INFIX_GENERATOR = {
    StrategyLimitType.MINUTELY: lambda: f"minute_{_utc_now().minute}",
    StrategyLimitType.HOURLY: lambda: f"hour_{_utc_now().hour}",
    StrategyLimitType.DAILY: lambda: f"day_{_utc_now().strftime(TIMESTAMP_FORMAT)}",
}
# Expiration passed to `cache.set(...)` for usage counters of each limit type.
LIMIT_TYPE2KEY_EXPIRATION = {
    StrategyLimitType.MINUTELY: 120,
    StrategyLimitType.HOURLY: 2 * SECONDS_IN_HOUR,
    StrategyLimitType.DAILY: 25 * SECONDS_IN_HOUR,
}


def use_credit_of_matching_strategy(
    cache: BaseCache,
    workspace: str,
    project: str,
    matching_strategies_limits: OrderedDict[str, List[StrategyLimit]],
) -> Optional[str]:
    """Pick the first matching strategy that still has usage credit and consume it.

    The lookup and the counter update both run under `lock_limits(...)` for the
    given :workspace and :project, so the increment is done atomically.
    Limits are accounted at the moment of registration - which may introduce
    inaccuracy, given that registration is postponed from prediction.

    Returns:
        Name of the strategy whose credit was consumed, or None when no
        strategy has spare credit.
    """
    with lock_limits(cache=cache, workspace=workspace, project=project):
        selected_strategy = find_strategy_with_spare_usage_credit(
            cache=cache,
            workspace=workspace,
            project=project,
            matching_strategies_limits=matching_strategies_limits,
        )
        if selected_strategy is not None:
            consume_strategy_limits_usage_credit(
                cache=cache,
                workspace=workspace,
                project=project,
                strategy_name=selected_strategy,
            )
        return selected_strategy


def return_strategy_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
) -> None:
    """Give back one usage credit of :strategy_name for all limit types.

    The update runs under `lock_limits(...)` for the given :workspace and
    :project, so the decrement is done atomically.
    Returning a credit is a bit naive: we may add to the pool of credits of the
    next period (but only if the credit was taken from the previous one and
    some credits are already used in the new pool) - accepted in favour of
    easier implementation.
    """
    scope = {"cache": cache, "workspace": workspace, "project": project}
    with lock_limits(**scope):
        return_strategy_limits_usage_credit(strategy_name=strategy_name, **scope)


@contextmanager
def lock_limits(
    cache: BaseCache,
    workspace: str,
    project: str,
) -> Generator[Union[threading.Lock, redis.lock.Lock], None, None]:
    """Context manager holding the usage-limits lock of a workspace/project pair.

    The lock is obtained from `cache.lock(...)` under the key produced by
    `generate_cache_key_for_active_learning_usage_lock(...)`, with
    `expire=MAX_LOCK_TIME`. The object returned by the cache is yielded.
    """
    lock_key = generate_cache_key_for_active_learning_usage_lock(
        workspace=workspace,
        project=project,
    )
    usage_lock_context = cache.lock(key=lock_key, expire=MAX_LOCK_TIME)
    with usage_lock_context as acquired_lock:
        yield acquired_lock


def find_strategy_with_spare_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    matching_strategies_limits: OrderedDict[str, List[StrategyLimit]],
) -> Optional[str]:
    """Return the first strategy (in mapping order) not rejected by its limits.

    Strategies are checked lazily, one by one - checking stops at the first
    strategy that is not rejected. None is returned when every strategy is
    rejected (or the mapping is empty).
    """
    accepted_strategies = (
        name
        for name, limits in matching_strategies_limits.items()
        if not datapoint_should_be_rejected_based_on_strategy_usage_limits(
            cache=cache,
            workspace=workspace,
            project=project,
            strategy_name=name,
            strategy_limits=limits,
        )
    )
    return next(accepted_strategies, None)


def datapoint_should_be_rejected_based_on_strategy_usage_limits(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    strategy_limits: List[StrategyLimit],
) -> bool:
    """Tell whether any of the strategy limits is already reached.

    Limits are checked in order; the first reached limit is logged at debug
    level and True is returned immediately. False means no limit is reached.
    """
    for limit in strategy_limits:
        if not datapoint_should_be_rejected_based_on_limit_usage(
            cache=cache,
            workspace=workspace,
            project=project,
            strategy_name=strategy_name,
            strategy_limit=limit,
        ):
            continue
        logger.debug(
            f"Violated Active Learning strategy limit: {limit.limit_type.name} "
            f"with value {limit.value} for sampling strategy: {strategy_name}."
        )
        return True
    return False


def datapoint_should_be_rejected_based_on_limit_usage(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    strategy_limit: StrategyLimit,
) -> bool:
    """Tell whether usage recorded for a single limit has reached its value.

    A missing usage record is treated as zero usage.
    """
    recorded_usage = get_current_strategy_limit_usage(
        cache=cache,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
        limit_type=strategy_limit.limit_type,
    )
    effective_usage = 0 if recorded_usage is None else recorded_usage
    return effective_usage >= strategy_limit.value


def consume_strategy_limits_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
) -> None:
    """Consume one usage credit of :strategy_name for every `StrategyLimitType`."""
    shared_kwargs = {
        "cache": cache,
        "workspace": workspace,
        "project": project,
        "strategy_name": strategy_name,
    }
    for limit_type in StrategyLimitType:
        consume_strategy_limit_usage_credit(limit_type=limit_type, **shared_kwargs)


def consume_strategy_limit_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> None:
    """Increment by one the usage counter of a strategy for the given limit type.

    The cache key embeds the current time bucket (minute / hour / day), so it is
    generated exactly once and used for both the read and the write. Generating
    it separately for each operation would - when the bucket rolls over in
    between - store the previous period's count (plus one) under the new
    period's key.

    A missing counter is treated as zero. The counter is stored with the
    expiration defined for :limit_type in `LIMIT_TYPE2KEY_EXPIRATION`.
    """
    usage_key = generate_cache_key_for_active_learning_usage(
        limit_type=limit_type,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
    )
    cached_entry = cache.get(usage_key)
    current_value = cached_entry[USAGE_KEY] if cached_entry is not None else None
    if current_value is None:
        current_value = 0
    current_value += 1
    expire = LIMIT_TYPE2KEY_EXPIRATION[limit_type]
    cache.set(key=usage_key, value={USAGE_KEY: current_value}, expire=expire)  # type: ignore


def return_strategy_limits_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
) -> None:
    """Return one usage credit of :strategy_name for every `StrategyLimitType`."""
    shared_kwargs = {
        "cache": cache,
        "workspace": workspace,
        "project": project,
        "strategy_name": strategy_name,
    }
    for limit_type in StrategyLimitType:
        return_strategy_limit_usage_credit(limit_type=limit_type, **shared_kwargs)


def return_strategy_limit_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> None:
    """Decrement by one (never below zero) the usage counter for the given limit type.

    Nothing is written when there is no counter for the current period.

    The cache key embeds the current time bucket (minute / hour / day), so it is
    generated exactly once and used for both the read and the write. Generating
    it separately for each operation would - when the bucket rolls over in
    between - store the previous period's count under the new period's key.
    """
    usage_key = generate_cache_key_for_active_learning_usage(
        limit_type=limit_type,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
    )
    cached_entry = cache.get(usage_key)
    current_value = cached_entry[USAGE_KEY] if cached_entry is not None else None
    if current_value is None:
        return None
    current_value = max(current_value - 1, 0)
    expire = LIMIT_TYPE2KEY_EXPIRATION[limit_type]
    cache.set(key=usage_key, value={USAGE_KEY: current_value}, expire=expire)  # type: ignore


def get_current_strategy_limit_usage(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> Optional[int]:
    """Read the usage counter stored for the strategy in the current period.

    Returns:
        The value kept under `USAGE_KEY` in the cached entry, or None when the
        cache holds no entry for the generated key.
    """
    cached_entry = cache.get(
        generate_cache_key_for_active_learning_usage(
            limit_type=limit_type,
            workspace=workspace,
            project=project,
            strategy_name=strategy_name,
        )
    )
    return None if cached_entry is None else cached_entry[USAGE_KEY]


def set_current_strategy_limit_usage(
    current_value: int,
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> None:
    """Store :current_value as the usage counter of the strategy for the current period.

    The counter is wrapped into a dict under `USAGE_KEY` and saved with the
    expiration defined for :limit_type in `LIMIT_TYPE2KEY_EXPIRATION`.
    """
    key = generate_cache_key_for_active_learning_usage(
        limit_type=limit_type,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
    )
    payload = {USAGE_KEY: current_value}
    cache.set(key=key, value=payload, expire=LIMIT_TYPE2KEY_EXPIRATION[limit_type])  # type: ignore


def generate_cache_key_for_active_learning_usage_lock(
    workspace: str,
    project: str,
) -> str:
    """Build the cache key of the usage-limits lock for a workspace/project pair."""
    key_template = "active_learning:usage:{}:{}:usage:lock"
    return key_template.format(workspace, project)


def generate_cache_key_for_active_learning_usage(
    limit_type: StrategyLimitType,
    workspace: str,
    project: str,
    strategy_name: str,
) -> str:
    """Build the usage-counter cache key for a strategy in the current period.

    The last key segment comes from the generator registered for :limit_type
    in `LIMIT_TYPE2KEY_INFIX_GENERATOR`, evaluated at call time - so the key
    changes as time passes.
    """
    current_period = LIMIT_TYPE2KEY_INFIX_GENERATOR[limit_type]()
    key_template = "active_learning:usage:{}:{}:{}:{}"
    return key_template.format(workspace, project, strategy_name, current_period)