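"""Core Active Learning flow: run sampling strategies against predictions,
prepare images for registration and send accepted datapoints (image plus,
optionally, the prediction) to Roboflow while respecting per-strategy credits.
"""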
from collections import OrderedDict
from typing import List, Optional, Tuple
from uuid import uuid4

import numpy as np

from inference.core import logger
from inference.core.active_learning.cache_operations import (
    return_strategy_credit,
    use_credit_of_matching_strategy,
)
from inference.core.active_learning.entities import (
    ActiveLearningConfiguration,
    ImageDimensions,
    Prediction,
    PredictionType,
    SamplingMethod,
)
from inference.core.active_learning.post_processing import (
    adjust_prediction_to_client_scaling_factor,
    encode_prediction,
)
from inference.core.cache.base import BaseCache
from inference.core.env import ACTIVE_LEARNING_TAGS
from inference.core.roboflow_api import (
    annotate_image_at_roboflow,
    register_image_at_roboflow,
)
from inference.core.utils.image_utils import encode_image_to_jpeg_bytes
from inference.core.utils.preprocess import downscale_image_keeping_aspect_ratio


def execute_sampling(
    image: np.ndarray,
    prediction: Prediction,
    prediction_type: PredictionType,
    sampling_methods: List[SamplingMethod],
) -> List[str]:
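    """Return the names of the sampling methods that accepted the given datapoint."""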
    matching_strategies = []
    for method in sampling_methods:
        sampling_result = method.sample(image, prediction, prediction_type)
        if sampling_result:
            matching_strategies.append(method.name)
    return matching_strategies


def execute_datapoint_registration(
    cache: BaseCache,
    matching_strategies: List[str],
    image: np.ndarray,
    prediction: Prediction,
    prediction_type: PredictionType,
    configuration: ActiveLearningConfiguration,
    api_key: str,
    batch_name: str,
) -> None:
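    """Scale and JPEG-encode the image, spend a credit of one matching strategy
    and register the datapoint at Roboflow; does nothing when every matching
    strategy has already reached its limit.
    """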
    local_image_id = str(uuid4())
    encoded_image, scaling_factor = prepare_image_to_registration(
        image=image,
        desired_size=configuration.max_image_size,
        jpeg_compression_level=configuration.jpeg_compression_level,
    )
    prediction = adjust_prediction_to_client_scaling_factor(
        prediction=prediction,
        scaling_factor=scaling_factor,
        prediction_type=prediction_type,
    )
    matching_strategies_limits = OrderedDict(
        (strategy_name, configuration.strategies_limits[strategy_name])
        for strategy_name in matching_strategies
    )
    strategy_with_spare_credit = use_credit_of_matching_strategy(
        cache=cache,
        workspace=configuration.workspace_id,
        project=configuration.dataset_id,
        matching_strategies_limits=matching_strategies_limits,
    )
    if strategy_with_spare_credit is None:
        logger.debug("Limit on Active Learning strategy reached.")
        return None
    register_datapoint_at_roboflow(
        cache=cache,
        strategy_with_spare_credit=strategy_with_spare_credit,
        encoded_image=encoded_image,
        local_image_id=local_image_id,
        prediction=prediction,
        prediction_type=prediction_type,
        configuration=configuration,
        api_key=api_key,
        batch_name=batch_name,
    )


def prepare_image_to_registration(
    image: np.ndarray,
    desired_size: Optional[ImageDimensions],
    jpeg_compression_level: int,
) -> Tuple[bytes, float]:
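    """Downscale the image to `desired_size` (keeping aspect ratio) if requested,
    JPEG-encode it and return the bytes together with the applied scaling factor
    (1.0 when no downscaling took place).
    """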
    scaling_factor = 1.0
    if desired_size is not None:
        height_before_scale = image.shape[0]
        image = downscale_image_keeping_aspect_ratio(
            image=image,
            desired_size=desired_size.to_wh(),
        )
        scaling_factor = image.shape[0] / height_before_scale
    return (
        encode_image_to_jpeg_bytes(image=image, jpeg_quality=jpeg_compression_level),
        scaling_factor,
    )


def register_datapoint_at_roboflow(
    cache: BaseCache,
    strategy_with_spare_credit: str,
    encoded_image: bytes,
    local_image_id: str,
    prediction: Prediction,
    prediction_type: PredictionType,
    configuration: ActiveLearningConfiguration,
    api_key: str,
    batch_name: str,
) -> None:
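    """Register the image at Roboflow and, unless prediction registration is
    forbidden, attach the encoded prediction as an annotation.
    """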
    tags = collect_tags(
        configuration=configuration,
        sampling_strategy=strategy_with_spare_credit,
    )
    roboflow_image_id = safe_register_image_at_roboflow(
        cache=cache,
        strategy_with_spare_credit=strategy_with_spare_credit,
        encoded_image=encoded_image,
        local_image_id=local_image_id,
        configuration=configuration,
        api_key=api_key,
        batch_name=batch_name,
        tags=tags,
    )
    if is_prediction_registration_forbidden(
        prediction=prediction,
        persist_predictions=configuration.persist_predictions,
        roboflow_image_id=roboflow_image_id,
    ):
        return None
    encoded_prediction, prediction_file_type = encode_prediction(
        prediction=prediction, prediction_type=prediction_type
    )
    _ = annotate_image_at_roboflow(
        api_key=api_key,
        dataset_id=configuration.dataset_id,
        local_image_id=local_image_id,
        roboflow_image_id=roboflow_image_id,
        annotation_content=encoded_prediction,
        annotation_file_type=prediction_file_type,
        is_prediction=True,
    )


def collect_tags(
    configuration: ActiveLearningConfiguration, sampling_strategy: str
) -> List[str]:
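    """Combine globally configured, configuration-level and strategy-level tags;
    when predictions are persisted, the model id is appended as an extra tag.
    """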
    # copy, so the module-level ACTIVE_LEARNING_TAGS list is not mutated across calls
    tags = list(ACTIVE_LEARNING_TAGS) if ACTIVE_LEARNING_TAGS is not None else []
    tags.extend(configuration.tags)
    tags.extend(configuration.strategies_tags[sampling_strategy])
    if configuration.persist_predictions:
        # this replacement is needed due to backend input validation
        tags.append(configuration.model_id.replace("/", "-"))
    return tags


def safe_register_image_at_roboflow(
    cache: BaseCache,
    strategy_with_spare_credit: str,
    encoded_image: bytes,
    local_image_id: str,
    configuration: ActiveLearningConfiguration,
    api_key: str,
    batch_name: str,
    tags: List[str],
) -> Optional[str]:
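    """Register the image and return its Roboflow id, or None for a duplicate;
    on duplication or error the previously spent strategy credit is returned.
    """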
    credit_to_be_returned = False
    try:
        registration_response = register_image_at_roboflow(
            api_key=api_key,
            dataset_id=configuration.dataset_id,
            local_image_id=local_image_id,
            image_bytes=encoded_image,
            batch_name=batch_name,
            tags=tags,
        )
        image_duplicated = registration_response.get("duplicate", False)
        if image_duplicated:
            credit_to_be_returned = True
            logger.warning(f"Image duplication detected: {registration_response}.")
            return None
        return registration_response["id"]
    except Exception as error:
        credit_to_be_returned = True
        raise error
    finally:
        if credit_to_be_returned:
            return_strategy_credit(
                cache=cache,
                workspace=configuration.workspace_id,
                project=configuration.dataset_id,
                strategy_name=strategy_with_spare_credit,
            )


def is_prediction_registration_forbidden(
    prediction: Prediction,
    persist_predictions: bool,
    roboflow_image_id: Optional[str],
) -> bool:
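    """Skip prediction upload for failed registrations, disabled persistence,
    stub predictions and empty non-classification predictions.
    """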
    return (
        roboflow_image_id is None
        or persist_predictions is False
        or prediction.get("is_stub", False) is True
        or (len(prediction.get("predictions", [])) == 0 and "top" not in prediction)
    )