import itertools
import statistics
from collections import Counter, defaultdict
from copy import deepcopy
from functools import partial
from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union
from uuid import uuid4

import numpy as np
from fastapi import BackgroundTasks

from inference.core.managers.base import ModelManager
from inference.core.utils.image_utils import ImageType, load_image
from inference.enterprise.workflows.complier.entities import StepExecutionMode
from inference.enterprise.workflows.complier.steps_executors.active_learning_middlewares import (
    WorkflowsActiveLearningMiddleware,
)
from inference.enterprise.workflows.complier.steps_executors.constants import (
    CENTER_X_KEY,
    CENTER_Y_KEY,
    DETECTION_ID_KEY,
    HEIGHT_KEY,
    IMAGE_TYPE_KEY,
    IMAGE_VALUE_KEY,
    ORIGIN_COORDINATES_KEY,
    ORIGIN_SIZE_KEY,
    PARENT_ID_KEY,
    WIDTH_KEY,
)
from inference.enterprise.workflows.complier.steps_executors.types import (
    NextStepReference,
    OutputsLookup,
)
from inference.enterprise.workflows.complier.steps_executors.utils import (
    get_image,
    resolve_parameter,
)
from inference.enterprise.workflows.complier.utils import (
    construct_selector_pointing_step_output,
    construct_step_selector,
)
from inference.enterprise.workflows.entities.steps import (
    AbsoluteStaticCrop,
    ActiveLearningDataCollector,
    AggregationMode,
    BinaryOperator,
    CompoundDetectionFilterDefinition,
    Condition,
    Crop,
    DetectionFilter,
    DetectionFilterDefinition,
    DetectionOffset,
    DetectionsConsensus,
    Operator,
    RelativeStaticCrop,
)
from inference.enterprise.workflows.entities.validators import get_last_selector_chunk
from inference.enterprise.workflows.errors import ExecutionGraphError

OPERATORS = {
    Operator.EQUAL: lambda a, b: a == b,
    Operator.NOT_EQUAL: lambda a, b: a != b,
    Operator.LOWER_THAN: lambda a, b: a < b,
    Operator.GREATER_THAN: lambda a, b: a > b,
    Operator.LOWER_OR_EQUAL_THAN: lambda a, b: a <= b,
    Operator.GREATER_OR_EQUAL_THAN: lambda a, b: a >= b,
    Operator.IN: lambda a, b: a in b,
}

BINARY_OPERATORS = {
    BinaryOperator.AND: lambda a, b: a and b,
    BinaryOperator.OR: lambda a, b: a or b,
}

AGGREGATION_MODE2FIELD_AGGREGATOR = {
    AggregationMode.MAX: max,
    AggregationMode.MIN: min,
    AggregationMode.AVERAGE: statistics.mean,
}


async def run_crop_step(
    step: Crop,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
    image = get_image(
        step=step,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    detections = resolve_parameter(
        selector_or_value=step.detections,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    decoded_images = [load_image(e) for e in image]
    decoded_images = [
        i[0] if i[1] is True else i[0][:, :, ::-1] for i in decoded_images
    ]
    origin_image_shape = extract_origin_size_from_images(
        input_images=image,
        decoded_images=decoded_images,
    )
    crops = list(
        itertools.chain.from_iterable(
            crop_image(image=i, detections=d, origin_size=o)
            for i, d, o in zip(decoded_images, detections, origin_image_shape)
        )
    )
    parent_ids = [c[PARENT_ID_KEY] for c in crops]
    outputs_lookup[construct_step_selector(step_name=step.name)] = {
        "crops": crops,
        PARENT_ID_KEY: parent_ids,
    }
    return None, outputs_lookup
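

# Each crop produced by crop_image() below records the parent detection id and the
# origin image centre/size under ORIGIN_COORDINATES_KEY, which downstream steps can
# use to map crop-space predictions back onto the source image.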
def crop_image(
    image: np.ndarray,
    detections: List[dict],
    origin_size: dict,
) -> List[Dict[str, Union[str, np.ndarray]]]:
    crops = []
    for detection in detections:
        x_min, y_min, x_max, y_max = detection_to_xyxy(detection=detection)
        cropped_image = image[y_min:y_max, x_min:x_max]
        crops.append(
            {
                IMAGE_TYPE_KEY: ImageType.NUMPY_OBJECT.value,
                IMAGE_VALUE_KEY: cropped_image,
                PARENT_ID_KEY: detection[DETECTION_ID_KEY],
                ORIGIN_COORDINATES_KEY: {
                    CENTER_X_KEY: detection["x"],
                    CENTER_Y_KEY: detection["y"],
                    ORIGIN_SIZE_KEY: origin_size,
                },
            }
        )
    return crops


async def run_condition_step(
    step: Condition,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
    left_value = resolve_parameter(
        selector_or_value=step.left,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    right_value = resolve_parameter(
        selector_or_value=step.right,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    evaluation_result = OPERATORS[step.operator](left_value, right_value)
    next_step = step.step_if_true if evaluation_result else step.step_if_false
    return next_step, outputs_lookup


async def run_detection_filter(
    step: DetectionFilter,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
    predictions = resolve_parameter(
        selector_or_value=step.predictions,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    images_meta_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="image",
    )
    images_meta = resolve_parameter(
        selector_or_value=images_meta_selector,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    prediction_type_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="prediction_type",
    )
    predictions_type = resolve_parameter(
        selector_or_value=prediction_type_selector,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    filter_callable = build_filter_callable(definition=step.filter_definition)
    result_detections, result_parent_id = [], []
    for prediction in predictions:
        filtered_predictions = [deepcopy(p) for p in prediction if filter_callable(p)]
        result_detections.append(filtered_predictions)
        result_parent_id.append([p[PARENT_ID_KEY] for p in filtered_predictions])
    step_selector = construct_step_selector(step_name=step.name)
    outputs_lookup[step_selector] = [
        {"predictions": d, PARENT_ID_KEY: p, "image": i, "prediction_type": pt}
        for d, p, i, pt in zip(
            result_detections, result_parent_id, images_meta, predictions_type
        )
    ]
    return None, outputs_lookup


def build_filter_callable(
    definition: Union[DetectionFilterDefinition, CompoundDetectionFilterDefinition],
) -> Callable[[dict], bool]:
    if definition.type == "CompoundDetectionFilterDefinition":
        left_callable = build_filter_callable(definition=definition.left)
        right_callable = build_filter_callable(definition=definition.right)
        binary_operator = BINARY_OPERATORS[definition.operator]
        return lambda e: binary_operator(left_callable(e), right_callable(e))
    if definition.type == "DetectionFilterDefinition":
        operator = OPERATORS[definition.operator]
        return lambda e: operator(e[definition.field_name], definition.reference_value)
    raise ExecutionGraphError(
        f"Detected filter definition of type {definition.type} which is unknown"
    )
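

# Illustrative example of build_filter_callable() above (hypothetical values): a
# CompoundDetectionFilterDefinition with operator BinaryOperator.AND whose children are
#   DetectionFilterDefinition(field_name="class", operator=Operator.EQUAL, reference_value="car")
#   DetectionFilterDefinition(field_name="confidence", operator=Operator.GREATER_OR_EQUAL_THAN, reference_value=0.5)
# compiles into a predicate equivalent to:
#   lambda e: e["class"] == "car" and e["confidence"] >= 0.5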
async def run_detection_offset_step(
    step: DetectionOffset,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
    detections = resolve_parameter(
        selector_or_value=step.predictions,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    images_meta_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="image",
    )
    images_meta = resolve_parameter(
        selector_or_value=images_meta_selector,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    prediction_type_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="prediction_type",
    )
    predictions_type = resolve_parameter(
        selector_or_value=prediction_type_selector,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    offset_x = resolve_parameter(
        selector_or_value=step.offset_x,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    offset_y = resolve_parameter(
        selector_or_value=step.offset_y,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    result_detections, result_parent_id = [], []
    for detection in detections:
        offset_detections = [
            offset_detection(detection=d, offset_x=offset_x, offset_y=offset_y)
            for d in detection
        ]
        result_detections.append(offset_detections)
        result_parent_id.append([d[PARENT_ID_KEY] for d in offset_detections])
    step_selector = construct_step_selector(step_name=step.name)
    outputs_lookup[step_selector] = [
        {"predictions": d, PARENT_ID_KEY: p, "image": i, "prediction_type": pt}
        for d, p, i, pt in zip(
            result_detections, result_parent_id, images_meta, predictions_type
        )
    ]
    return None, outputs_lookup


def offset_detection(
    detection: Dict[str, Any], offset_x: int, offset_y: int
) -> Dict[str, Any]:
    detection_copy = deepcopy(detection)
    detection_copy[WIDTH_KEY] += round(offset_x)
    detection_copy[HEIGHT_KEY] += round(offset_y)
    detection_copy[PARENT_ID_KEY] = detection_copy[DETECTION_ID_KEY]
    detection_copy[DETECTION_ID_KEY] = str(uuid4())
    return detection_copy


async def run_static_crop_step(
    step: Union[AbsoluteStaticCrop, RelativeStaticCrop],
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
    image = get_image(
        step=step,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    decoded_images = [load_image(e) for e in image]
    decoded_images = [
        i[0] if i[1] is True else i[0][:, :, ::-1] for i in decoded_images
    ]
    origin_image_shape = extract_origin_size_from_images(
        input_images=image,
        decoded_images=decoded_images,
    )
    crops = [
        take_static_crop(
            image=i,
            crop=step,
            runtime_parameters=runtime_parameters,
            outputs_lookup=outputs_lookup,
            origin_size=size,
        )
        for i, size in zip(decoded_images, origin_image_shape)
    ]
    parent_ids = [c[PARENT_ID_KEY] for c in crops]
    outputs_lookup[construct_step_selector(step_name=step.name)] = {
        "crops": crops,
        PARENT_ID_KEY: parent_ids,
    }
    return None, outputs_lookup


def extract_origin_size_from_images(
    input_images: List[Union[dict, np.ndarray]],
    decoded_images: List[np.ndarray],
) -> List[Dict[str, int]]:
    result = []
    for input_image, decoded_image in zip(input_images, decoded_images):
        if (
            issubclass(type(input_image), dict)
            and ORIGIN_COORDINATES_KEY in input_image
        ):
            result.append(input_image[ORIGIN_COORDINATES_KEY][ORIGIN_SIZE_KEY])
        else:
            result.append(
                {HEIGHT_KEY: decoded_image.shape[0], WIDTH_KEY: decoded_image.shape[1]}
            )
    return result
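

# In take_static_crop() below, RelativeStaticCrop expresses the centre point and size
# as fractions of the image dimensions, so they are scaled to pixels first;
# AbsoluteStaticCrop values are treated as pixel coordinates as-is.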
def take_static_crop(
    image: np.ndarray,
    crop: Union[AbsoluteStaticCrop, RelativeStaticCrop],
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    origin_size: dict,
) -> Dict[str, Union[str, np.ndarray]]:
    resolve_parameter_closure = partial(
        resolve_parameter,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    x_center = resolve_parameter_closure(crop.x_center)
    y_center = resolve_parameter_closure(crop.y_center)
    width = resolve_parameter_closure(crop.width)
    height = resolve_parameter_closure(crop.height)
    if crop.type == "RelativeStaticCrop":
        x_center = round(image.shape[1] * x_center)
        y_center = round(image.shape[0] * y_center)
        width = round(image.shape[1] * width)
        height = round(image.shape[0] * height)
    x_min = round(x_center - width / 2)
    y_min = round(y_center - height / 2)
    x_max = round(x_min + width)
    y_max = round(y_min + height)
    cropped_image = image[y_min:y_max, x_min:x_max]
    return {
        IMAGE_TYPE_KEY: ImageType.NUMPY_OBJECT.value,
        IMAGE_VALUE_KEY: cropped_image,
        PARENT_ID_KEY: f"$steps.{crop.name}",
        ORIGIN_COORDINATES_KEY: {
            CENTER_X_KEY: x_center,
            CENTER_Y_KEY: y_center,
            ORIGIN_SIZE_KEY: origin_size,
        },
    }


async def run_detections_consensus_step(
    step: DetectionsConsensus,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
    resolve_parameter_closure = partial(
        resolve_parameter,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    all_predictions = [resolve_parameter_closure(p) for p in step.predictions]
    # all_predictions has shape (n_consensus_input, bs, img_predictions)
    if len(all_predictions) < 1:
        raise ExecutionGraphError(
            f"Consensus step requires at least one source of predictions."
        )
    batch_sizes = get_and_validate_batch_sizes(
        all_predictions=all_predictions,
        step_name=step.name,
    )
    images_meta_selector = construct_selector_pointing_step_output(
        selector=step.predictions[0],
        new_output="image",
    )
    images_meta = resolve_parameter_closure(images_meta_selector)
    batch_size = batch_sizes[0]
    results = []
    for batch_index in range(batch_size):
        batch_element_predictions = [e[batch_index] for e in all_predictions]
        (
            parent_id,
            object_present,
            presence_confidence,
            consensus_detections,
        ) = resolve_batch_consensus(
            predictions=batch_element_predictions,
            required_votes=resolve_parameter_closure(step.required_votes),
            class_aware=resolve_parameter_closure(step.class_aware),
            iou_threshold=resolve_parameter_closure(step.iou_threshold),
            confidence=resolve_parameter_closure(step.confidence),
            classes_to_consider=resolve_parameter_closure(step.classes_to_consider),
            required_objects=resolve_parameter_closure(step.required_objects),
            presence_confidence_aggregation=step.presence_confidence_aggregation,
            detections_merge_confidence_aggregation=step.detections_merge_confidence_aggregation,
            detections_merge_coordinates_aggregation=step.detections_merge_coordinates_aggregation,
        )
        results.append(
            {
                "predictions": consensus_detections,
                "parent_id": parent_id,
                "object_present": object_present,
                "presence_confidence": presence_confidence,
                "image": images_meta[batch_index],
                "prediction_type": "object-detection",
            }
        )
    outputs_lookup[construct_step_selector(step_name=step.name)] = results
    return None, outputs_lookup
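

# The helpers below first validate that every prediction source delivered the same
# batch size, then implement the per-batch-element consensus vote: detections from
# different sources are matched by IoU, merged once enough sources agree, and the
# merged set is finally checked against the step's `required_objects` criteria.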
def get_and_validate_batch_sizes(
    all_predictions: List[List[List[dict]]],
    step_name: str,
) -> List[int]:
    batch_sizes = get_predictions_batch_sizes(all_predictions=all_predictions)
    if not all_batch_sizes_equal(batch_sizes=batch_sizes):
        raise ExecutionGraphError(
            f"Detected mismatch of input dimensions in step: {step_name}"
        )
    return batch_sizes


def get_predictions_batch_sizes(all_predictions: List[List[List[dict]]]) -> List[int]:
    return [len(predictions) for predictions in all_predictions]


def all_batch_sizes_equal(batch_sizes: List[int]) -> bool:
    if len(batch_sizes) == 0:
        return True
    reference = batch_sizes[0]
    return all(e == reference for e in batch_sizes)


def resolve_batch_consensus(
    predictions: List[List[dict]],
    required_votes: int,
    class_aware: bool,
    iou_threshold: float,
    confidence: float,
    classes_to_consider: Optional[List[str]],
    required_objects: Optional[Union[int, Dict[str, int]]],
    presence_confidence_aggregation: AggregationMode,
    detections_merge_confidence_aggregation: AggregationMode,
    detections_merge_coordinates_aggregation: AggregationMode,
) -> Tuple[str, bool, Dict[str, float], List[dict]]:
    if does_not_detected_objects_in_any_source(predictions=predictions):
        return "undefined", False, {}, []
    parent_id = get_parent_id_of_predictions_from_different_sources(
        predictions=predictions,
    )
    predictions = filter_predictions(
        predictions=predictions,
        classes_to_consider=classes_to_consider,
    )
    detections_already_considered = set()
    consensus_detections = []
    for source_id, detection in enumerate_detections(predictions=predictions):
        (
            consensus_detections_update,
            detections_already_considered,
        ) = get_consensus_for_single_detection(
            detection=detection,
            source_id=source_id,
            predictions=predictions,
            iou_threshold=iou_threshold,
            class_aware=class_aware,
            required_votes=required_votes,
            confidence=confidence,
            detections_merge_confidence_aggregation=detections_merge_confidence_aggregation,
            detections_merge_coordinates_aggregation=detections_merge_coordinates_aggregation,
            detections_already_considered=detections_already_considered,
        )
        consensus_detections += consensus_detections_update
    (
        object_present,
        presence_confidence,
    ) = check_objects_presence_in_consensus_predictions(
        consensus_detections=consensus_detections,
        aggregation_mode=presence_confidence_aggregation,
        class_aware=class_aware,
        required_objects=required_objects,
    )
    return (
        parent_id,
        object_present,
        presence_confidence,
        consensus_detections,
    )
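

# In get_consensus_for_single_detection() below, a detection gains consensus when at
# least `required_votes - 1` other sources contribute an overlapping detection (IoU
# above the threshold and, in class-aware mode, of the same class); e.g. with
# required_votes=2 a single match from any other source is enough. Matched detections
# are merged and excluded from further consideration.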
def get_consensus_for_single_detection(
    detection: dict,
    source_id: int,
    predictions: List[List[dict]],
    iou_threshold: float,
    class_aware: bool,
    required_votes: int,
    confidence: float,
    detections_merge_confidence_aggregation: AggregationMode,
    detections_merge_coordinates_aggregation: AggregationMode,
    detections_already_considered: Set[str],
) -> Tuple[List[dict], Set[str]]:
    if detection["detection_id"] in detections_already_considered:
        return ([], detections_already_considered)
    consensus_detections = []
    detections_with_max_overlap = (
        get_detections_from_different_sources_with_max_overlap(
            detection=detection,
            source=source_id,
            predictions=predictions,
            iou_threshold=iou_threshold,
            class_aware=class_aware,
            detections_already_considered=detections_already_considered,
        )
    )
    if len(detections_with_max_overlap) < (required_votes - 1):
        return consensus_detections, detections_already_considered
    detections_to_merge = [detection] + [
        matched_value[0] for matched_value in detections_with_max_overlap.values()
    ]
    merged_detection = merge_detections(
        detections=detections_to_merge,
        confidence_aggregation_mode=detections_merge_confidence_aggregation,
        boxes_aggregation_mode=detections_merge_coordinates_aggregation,
    )
    if merged_detection["confidence"] < confidence:
        return consensus_detections, detections_already_considered
    consensus_detections.append(merged_detection)
    detections_already_considered.add(detection[DETECTION_ID_KEY])
    for matched_value in detections_with_max_overlap.values():
        detections_already_considered.add(matched_value[0][DETECTION_ID_KEY])
    return consensus_detections, detections_already_considered
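

# In check_objects_presence_in_consensus_predictions() below, `required_objects` may be
# a single integer (minimum total number of consensus detections) or a mapping from
# class name to a per-class minimum; the per-class form is only honoured in class-aware
# mode, otherwise its values are summed into a single total.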
def check_objects_presence_in_consensus_predictions(
    consensus_detections: List[dict],
    class_aware: bool,
    aggregation_mode: AggregationMode,
    required_objects: Optional[Union[int, Dict[str, int]]],
) -> Tuple[bool, Dict[str, float]]:
    if len(consensus_detections) == 0:
        return False, {}
    if required_objects is None:
        required_objects = 0
    if issubclass(type(required_objects), dict) and not class_aware:
        required_objects = sum(required_objects.values())
    if (
        issubclass(type(required_objects), int)
        and len(consensus_detections) < required_objects
    ):
        return False, {}
    if not class_aware:
        aggregated_confidence = aggregate_field_values(
            detections=consensus_detections,
            field="confidence",
            aggregation_mode=aggregation_mode,
        )
        return True, {"any_object": aggregated_confidence}
    class2detections = defaultdict(list)
    for detection in consensus_detections:
        class2detections[detection["class"]].append(detection)
    if issubclass(type(required_objects), dict):
        for requested_class, required_objects_count in required_objects.items():
            if len(class2detections[requested_class]) < required_objects_count:
                return False, {}
    class2confidence = {
        class_name: aggregate_field_values(
            detections=class_detections,
            field="confidence",
            aggregation_mode=aggregation_mode,
        )
        for class_name, class_detections in class2detections.items()
    }
    return True, class2confidence


def does_not_detected_objects_in_any_source(predictions: List[List[dict]]) -> bool:
    return all(len(p) == 0 for p in predictions)


def get_parent_id_of_predictions_from_different_sources(
    predictions: List[List[dict]],
) -> str:
    encountered_parent_ids = {
        p[PARENT_ID_KEY]
        for prediction_source in predictions
        for p in prediction_source
    }
    if len(encountered_parent_ids) > 1:
        raise ExecutionGraphError(
            f"Mismatch in predictions - while executing consensus step, detections in "
            f"corresponding batch elements were assigned different parent identifiers, "
            f"whereas consensus can only be applied to predictions made against the "
            f"same input."
        )
    return list(encountered_parent_ids)[0]


def filter_predictions(
    predictions: List[List[dict]],
    classes_to_consider: Optional[List[str]],
) -> List[List[dict]]:
    if classes_to_consider is None:
        return predictions
    classes_to_consider = set(classes_to_consider)
    return [
        [
            detection
            for detection in detections
            if detection["class"] in classes_to_consider
        ]
        for detections in predictions
    ]


def get_detections_from_different_sources_with_max_overlap(
    detection: dict,
    source: int,
    predictions: List[List[dict]],
    iou_threshold: float,
    class_aware: bool,
    detections_already_considered: Set[str],
) -> Dict[int, Tuple[dict, float]]:
    current_max_overlap = {}
    for other_source, other_detection in enumerate_detections(
        predictions=predictions,
        excluded_source=source,
    ):
        if other_detection[DETECTION_ID_KEY] in detections_already_considered:
            continue
        if class_aware and detection["class"] != other_detection["class"]:
            continue
        iou_value = calculate_iou(
            detection_a=detection,
            detection_b=other_detection,
        )
        if iou_value <= iou_threshold:
            continue
        if current_max_overlap.get(other_source) is None:
            current_max_overlap[other_source] = (other_detection, iou_value)
        if current_max_overlap[other_source][1] < iou_value:
            current_max_overlap[other_source] = (other_detection, iou_value)
    return current_max_overlap


def enumerate_detections(
    predictions: List[List[dict]],
    excluded_source: Optional[int] = None,
) -> Generator[Tuple[int, dict], None, None]:
    for source_id, detections in enumerate(predictions):
        if excluded_source is not None and excluded_source == source_id:
            continue
        for detection in detections:
            yield source_id, detection


def calculate_iou(detection_a: dict, detection_b: dict) -> float:
    box_a = detection_to_xyxy(detection=detection_a)
    box_b = detection_to_xyxy(detection=detection_b)
    x_a = max(box_a[0], box_b[0])
    y_a = max(box_a[1], box_b[1])
    x_b = min(box_a[2], box_b[2])
    y_b = min(box_a[3], box_b[3])
    intersection = max(0, x_b - x_a) * max(0, y_b - y_a)
    bbox_a_area, bbox_b_area = get_detection_sizes(
        detections=[detection_a, detection_b]
    )
    union = float(bbox_a_area + bbox_b_area - intersection)
    if union == 0.0:
        return 0.0
    return intersection / union


def detection_to_xyxy(detection: dict) -> Tuple[int, int, int, int]:
    x_min = round(detection["x"] - detection[WIDTH_KEY] / 2)
    y_min = round(detection["y"] - detection[HEIGHT_KEY] / 2)
    x_max = round(x_min + detection[WIDTH_KEY])
    y_max = round(y_min + detection[HEIGHT_KEY])
    return x_min, y_min, x_max, y_max
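

# Worked example (illustrative only): a detection stored in centre format as
#   {"x": 50, "y": 50, "width": 20, "height": 10}
# maps through detection_to_xyxy() above to the corner box (40, 45, 60, 55), and
# calculate_iou() of such a detection with itself evaluates to 1.0.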
def merge_detections(
    detections: List[dict],
    confidence_aggregation_mode: AggregationMode,
    boxes_aggregation_mode: AggregationMode,
) -> dict:
    class_name, class_id = AGGREGATION_MODE2CLASS_SELECTOR[confidence_aggregation_mode](
        detections
    )
    x, y, width, height = AGGREGATION_MODE2BOXES_AGGREGATOR[boxes_aggregation_mode](
        detections
    )
    return {
        PARENT_ID_KEY: detections[0][PARENT_ID_KEY],
        DETECTION_ID_KEY: f"{uuid4()}",
        "class": class_name,
        "class_id": class_id,
        "confidence": aggregate_field_values(
            detections=detections,
            field="confidence",
            aggregation_mode=confidence_aggregation_mode,
        ),
        "x": x,
        "y": y,
        "width": width,
        "height": height,
    }


def get_majority_class(detections: List[dict]) -> Tuple[str, int]:
    class_counts = Counter(d["class"] for d in detections)
    most_common_class_name = class_counts.most_common(1)[0][0]
    class_id = [
        d["class_id"] for d in detections if d["class"] == most_common_class_name
    ][0]
    return most_common_class_name, class_id


def get_class_of_most_confident_detection(detections: List[dict]) -> Tuple[str, int]:
    max_confidence = aggregate_field_values(
        detections=detections,
        field="confidence",
        aggregation_mode=AggregationMode.MAX,
    )
    most_confident_prediction = [
        d for d in detections if d["confidence"] == max_confidence
    ][0]
    return most_confident_prediction["class"], most_confident_prediction["class_id"]


def get_class_of_least_confident_detection(detections: List[dict]) -> Tuple[str, int]:
    min_confidence = aggregate_field_values(
        detections=detections,
        field="confidence",
        aggregation_mode=AggregationMode.MIN,
    )
    least_confident_prediction = [
        d for d in detections if d["confidence"] == min_confidence
    ][0]
    return least_confident_prediction["class"], least_confident_prediction["class_id"]


AGGREGATION_MODE2CLASS_SELECTOR = {
    AggregationMode.MAX: get_class_of_most_confident_detection,
    AggregationMode.MIN: get_class_of_least_confident_detection,
    AggregationMode.AVERAGE: get_majority_class,
}


def get_average_bounding_box(detections: List[dict]) -> Tuple[int, int, int, int]:
    x = round(aggregate_field_values(detections=detections, field="x"))
    y = round(aggregate_field_values(detections=detections, field="y"))
    width = round(aggregate_field_values(detections=detections, field="width"))
    height = round(aggregate_field_values(detections=detections, field="height"))
    return x, y, width, height


def get_smallest_bounding_box(detections: List[dict]) -> Tuple[int, int, int, int]:
    detection_sizes = get_detection_sizes(detections=detections)
    smallest_size = min(detection_sizes)
    matching_detection_id = [
        idx for idx, v in enumerate(detection_sizes) if v == smallest_size
    ][0]
    matching_detection = detections[matching_detection_id]
    return (
        matching_detection["x"],
        matching_detection["y"],
        matching_detection["width"],
        matching_detection["height"],
    )


def get_largest_bounding_box(detections: List[dict]) -> Tuple[int, int, int, int]:
    detection_sizes = get_detection_sizes(detections=detections)
    largest_size = max(detection_sizes)
    matching_detection_id = [
        idx for idx, v in enumerate(detection_sizes) if v == largest_size
    ][0]
    matching_detection = detections[matching_detection_id]
    return (
        matching_detection["x"],
        matching_detection["y"],
        matching_detection[WIDTH_KEY],
        matching_detection[HEIGHT_KEY],
    )


AGGREGATION_MODE2BOXES_AGGREGATOR = {
    AggregationMode.MAX: get_largest_bounding_box,
    AggregationMode.MIN: get_smallest_bounding_box,
    AggregationMode.AVERAGE: get_average_bounding_box,
}


def get_detection_sizes(detections: List[dict]) -> List[float]:
    return [d[HEIGHT_KEY] * d[WIDTH_KEY] for d in detections]


def aggregate_field_values(
    detections: List[dict],
    field: str,
    aggregation_mode: AggregationMode = AggregationMode.AVERAGE,
) -> float:
    values = [d[field] for d in detections]
    return AGGREGATION_MODE2FIELD_AGGREGATOR[aggregation_mode](values)
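

# The Active Learning collector below requires all of its prediction inputs to share a
# single prediction type and falls back to the request's api_key when no dedicated key
# is configured for the target dataset (`target_dataset_api_key or api_key`).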
async def run_active_learning_data_collector(
    step: ActiveLearningDataCollector,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
    model_manager: ModelManager,
    api_key: Optional[str],
    step_execution_mode: StepExecutionMode,
    active_learning_middleware: WorkflowsActiveLearningMiddleware,
    background_tasks: Optional[BackgroundTasks],
) -> Tuple[NextStepReference, OutputsLookup]:
    resolve_parameter_closure = partial(
        resolve_parameter,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    image = get_image(
        step=step,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    images_meta_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="image",
    )
    images_meta = resolve_parameter_closure(images_meta_selector)
    prediction_type_selector = construct_selector_pointing_step_output(
        selector=step.predictions,
        new_output="prediction_type",
    )
    predictions_type = resolve_parameter(
        selector_or_value=prediction_type_selector,
        runtime_parameters=runtime_parameters,
        outputs_lookup=outputs_lookup,
    )
    prediction_type = set(predictions_type)
    if len(prediction_type) > 1:
        raise ExecutionGraphError(
            f"Active Learning data collection step requires a single prediction type "
            f"among its inputs. Detected: {prediction_type}."
        )
    prediction_type = next(iter(prediction_type))
    predictions = resolve_parameter_closure(step.predictions)
    predictions_output_name = get_last_selector_chunk(step.predictions)
    target_dataset = resolve_parameter_closure(step.target_dataset)
    target_dataset_api_key = resolve_parameter_closure(step.target_dataset_api_key)
    disable_active_learning = resolve_parameter_closure(step.disable_active_learning)
    active_learning_compatible_predictions = [
        {"image": image_meta, predictions_output_name: prediction}
        for image_meta, prediction in zip(images_meta, predictions)
    ]
    active_learning_middleware.register(
        # this should actually be async, but that would require a substantial redesign of backend components
        dataset_name=target_dataset,
        images=image,
        predictions=active_learning_compatible_predictions,
        api_key=target_dataset_api_key or api_key,
        active_learning_disabled_for_request=disable_active_learning,
        prediction_type=prediction_type,
        background_tasks=background_tasks,
        active_learning_configuration=step.active_learning_configuration,
    )
    return None, outputs_lookup