import threading
from contextlib import contextmanager
from datetime import datetime
from typing import Generator, List, Optional, OrderedDict, Union

import redis.lock

from inference.core import logger
from inference.core.active_learning.entities import StrategyLimit, StrategyLimitType
from inference.core.active_learning.utils import TIMESTAMP_FORMAT
from inference.core.cache.base import BaseCache

MAX_LOCK_TIME = 5
SECONDS_IN_HOUR = 60 * 60
USAGE_KEY = "usage"

LIMIT_TYPE2KEY_INFIX_GENERATOR = {
    StrategyLimitType.MINUTELY: lambda: f"minute_{datetime.utcnow().minute}",
    StrategyLimitType.HOURLY: lambda: f"hour_{datetime.utcnow().hour}",
    StrategyLimitType.DAILY: lambda: f"day_{datetime.utcnow().strftime(TIMESTAMP_FORMAT)}",
}
LIMIT_TYPE2KEY_EXPIRATION = {
    StrategyLimitType.MINUTELY: 120,
    StrategyLimitType.HOURLY: 2 * SECONDS_IN_HOUR,
    StrategyLimitType.DAILY: 25 * SECONDS_IN_HOUR,
}
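
# Note: each expiration above outlives the period it accounts for (120s for a
# 1-minute window, 2h for a 1-hour window, 25h for a daily window), presumably so
# that a counter stays readable slightly past its period boundary instead of
# disappearing exactly at rollover.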


def use_credit_of_matching_strategy(
    cache: BaseCache,
    workspace: str,
    project: str,
    matching_strategies_limits: OrderedDict[str, List[StrategyLimit]],
) -> Optional[str]:
    # Within this function, cache updates of usage limits for the specific
    # :workspace and :project are locked, so that the increment happens atomically.
    # Limits are accounted at the moment of registration - which may introduce
    # inaccuracy, given that registration is postponed relative to prediction.
    # Returns: the name of a strategy with spare credit if one is found - otherwise None.
    with lock_limits(cache=cache, workspace=workspace, project=project):
        strategy_with_spare_credit = find_strategy_with_spare_usage_credit(
            cache=cache,
            workspace=workspace,
            project=project,
            matching_strategies_limits=matching_strategies_limits,
        )
        if strategy_with_spare_credit is None:
            return None
        consume_strategy_limits_usage_credit(
            cache=cache,
            workspace=workspace,
            project=project,
            strategy_name=strategy_with_spare_credit,
        )
        return strategy_with_spare_credit
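
# Illustrative call of the function above (hypothetical names and values; any
# BaseCache implementation configured elsewhere can be passed as `cache`):
#
#   chosen_strategy = use_credit_of_matching_strategy(
#       cache=cache,
#       workspace="my-workspace",
#       project="my-project",
#       matching_strategies_limits=OrderedDict(
#           {
#               "close-to-threshold": [
#                   StrategyLimit(limit_type=StrategyLimitType.MINUTELY, value=10)
#               ]
#           }
#       ),
#   )
#   # `chosen_strategy` is the name of the strategy whose credit was consumed,
#   # or None when every matching strategy has exhausted its limits.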


def return_strategy_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
) -> None:
    # Within this function, cache updates of usage limits for the specific
    # :workspace and :project are locked, so that the decrement happens atomically.
    # Returning credit is slightly naive: we may add credit back into the pool of the
    # next period even though it was originally taken from the previous one (this only
    # matters when some credits are already used in the new pool) - accepted in favour
    # of a simpler implementation.
    with lock_limits(cache=cache, workspace=workspace, project=project):
        return_strategy_limits_usage_credit(
            cache=cache,
            workspace=workspace,
            project=project,
            strategy_name=strategy_name,
        )


@contextmanager
def lock_limits(
    cache: BaseCache,
    workspace: str,
    project: str,
) -> Generator[Union[threading.Lock, redis.lock.Lock], None, None]:
    limits_lock_key = generate_cache_key_for_active_learning_usage_lock(
        workspace=workspace,
        project=project,
    )
    with cache.lock(key=limits_lock_key, expire=MAX_LOCK_TIME) as lock:
        yield lock
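
# Note: `expire=MAX_LOCK_TIME` bounds how long the lock is considered held (for a
# Redis-backed cache this is assumed to map onto the lock's auto-release timeout),
# so work performed inside `with lock_limits(...)` should comfortably finish within
# MAX_LOCK_TIME seconds.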


def find_strategy_with_spare_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    matching_strategies_limits: OrderedDict[str, List[StrategyLimit]],
) -> Optional[str]:
    for strategy_name, strategy_limits in matching_strategies_limits.items():
        rejected_by_strategy = (
            datapoint_should_be_rejected_based_on_strategy_usage_limits(
                cache=cache,
                workspace=workspace,
                project=project,
                strategy_name=strategy_name,
                strategy_limits=strategy_limits,
            )
        )
        if not rejected_by_strategy:
            return strategy_name
    return None


def datapoint_should_be_rejected_based_on_strategy_usage_limits(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    strategy_limits: List[StrategyLimit],
) -> bool:
    for strategy_limit in strategy_limits:
        limit_reached = datapoint_should_be_rejected_based_on_limit_usage(
            cache=cache,
            workspace=workspace,
            project=project,
            strategy_name=strategy_name,
            strategy_limit=strategy_limit,
        )
        if limit_reached:
            logger.debug(
                f"Violated Active Learning strategy limit: {strategy_limit.limit_type.name} "
                f"with value {strategy_limit.value} for sampling strategy: {strategy_name}."
            )
            return True
    return False


def datapoint_should_be_rejected_based_on_limit_usage(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    strategy_limit: StrategyLimit,
) -> bool:
    current_usage = get_current_strategy_limit_usage(
        cache=cache,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
        limit_type=strategy_limit.limit_type,
    )
    if current_usage is None:
        current_usage = 0
    return current_usage >= strategy_limit.value
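
# Worked example of the check above: for a MINUTELY limit with value 10, a datapoint
# is rejected once 10 registrations have already been counted in the current minute
# (usage >= limit); a missing counter is treated as zero usage.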


def consume_strategy_limits_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
) -> None:
    for limit_type in StrategyLimitType:
        consume_strategy_limit_usage_credit(
            cache=cache,
            workspace=workspace,
            project=project,
            strategy_name=strategy_name,
            limit_type=limit_type,
        )


def consume_strategy_limit_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> None:
    current_value = get_current_strategy_limit_usage(
        cache=cache,
        limit_type=limit_type,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
    )
    if current_value is None:
        current_value = 0
    current_value += 1
    set_current_strategy_limit_usage(
        current_value=current_value,
        cache=cache,
        limit_type=limit_type,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
    )


def return_strategy_limits_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
) -> None:
    for limit_type in StrategyLimitType:
        return_strategy_limit_usage_credit(
            cache=cache,
            workspace=workspace,
            project=project,
            strategy_name=strategy_name,
            limit_type=limit_type,
        )


def return_strategy_limit_usage_credit(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> None:
    current_value = get_current_strategy_limit_usage(
        cache=cache,
        limit_type=limit_type,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
    )
    if current_value is None:
        return None
    current_value = max(current_value - 1, 0)
    set_current_strategy_limit_usage(
        current_value=current_value,
        cache=cache,
        limit_type=limit_type,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
    )


def get_current_strategy_limit_usage(
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> Optional[int]:
    usage_key = generate_cache_key_for_active_learning_usage(
        limit_type=limit_type,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
    )
    value = cache.get(usage_key)
    if value is None:
        return value
    return value[USAGE_KEY]


def set_current_strategy_limit_usage(
    current_value: int,
    cache: BaseCache,
    workspace: str,
    project: str,
    strategy_name: str,
    limit_type: StrategyLimitType,
) -> None:
    usage_key = generate_cache_key_for_active_learning_usage(
        limit_type=limit_type,
        workspace=workspace,
        project=project,
        strategy_name=strategy_name,
    )
    expire = LIMIT_TYPE2KEY_EXPIRATION[limit_type]
    cache.set(key=usage_key, value={USAGE_KEY: current_value}, expire=expire)  # type: ignore
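
# Note: every update rewrites the key together with a fresh expiration, so the TTL
# is assumed to be counted from the most recent usage update rather than from the
# start of the accounting period.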


def generate_cache_key_for_active_learning_usage_lock(
    workspace: str,
    project: str,
) -> str:
    return f"active_learning:usage:{workspace}:{project}:usage:lock"


def generate_cache_key_for_active_learning_usage(
    limit_type: StrategyLimitType,
    workspace: str,
    project: str,
    strategy_name: str,
) -> str:
    time_infix = LIMIT_TYPE2KEY_INFIX_GENERATOR[limit_type]()
    return f"active_learning:usage:{workspace}:{project}:{strategy_name}:{time_infix}"