import itertools
import statistics
from collections import Counter, defaultdict
from copy import deepcopy
from functools import partial
from typing import Any, Callable, Dict, Generator, List, Optional, Set, Tuple, Union
from uuid import uuid4
import numpy as np
from fastapi import BackgroundTasks
from inference.core.managers.base import ModelManager
from inference.core.utils.image_utils import ImageType, load_image
from inference.enterprise.workflows.complier.entities import StepExecutionMode
from inference.enterprise.workflows.complier.steps_executors.active_learning_middlewares import (
WorkflowsActiveLearningMiddleware,
)
from inference.enterprise.workflows.complier.steps_executors.constants import (
CENTER_X_KEY,
CENTER_Y_KEY,
DETECTION_ID_KEY,
HEIGHT_KEY,
IMAGE_TYPE_KEY,
IMAGE_VALUE_KEY,
ORIGIN_COORDINATES_KEY,
ORIGIN_SIZE_KEY,
PARENT_ID_KEY,
WIDTH_KEY,
)
from inference.enterprise.workflows.complier.steps_executors.types import (
NextStepReference,
OutputsLookup,
)
from inference.enterprise.workflows.complier.steps_executors.utils import (
get_image,
resolve_parameter,
)
from inference.enterprise.workflows.complier.utils import (
construct_selector_pointing_step_output,
construct_step_selector,
)
from inference.enterprise.workflows.entities.steps import (
AbsoluteStaticCrop,
ActiveLearningDataCollector,
AggregationMode,
BinaryOperator,
CompoundDetectionFilterDefinition,
Condition,
Crop,
DetectionFilter,
DetectionFilterDefinition,
DetectionOffset,
DetectionsConsensus,
Operator,
RelativeStaticCrop,
)
from inference.enterprise.workflows.entities.validators import get_last_selector_chunk
from inference.enterprise.workflows.errors import ExecutionGraphError
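
# Dispatch tables mapping workflow enum members to plain Python callables:
# OPERATORS backs condition and detection-filter evaluation,
# BINARY_OPERATORS combines compound filter branches, and
# AGGREGATION_MODE2FIELD_AGGREGATOR powers numeric field aggregation in the
# consensus step.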
OPERATORS = {
Operator.EQUAL: lambda a, b: a == b,
Operator.NOT_EQUAL: lambda a, b: a != b,
Operator.LOWER_THAN: lambda a, b: a < b,
Operator.GREATER_THAN: lambda a, b: a > b,
Operator.LOWER_OR_EQUAL_THAN: lambda a, b: a <= b,
Operator.GREATER_OR_EQUAL_THAN: lambda a, b: a >= b,
Operator.IN: lambda a, b: a in b,
}
BINARY_OPERATORS = {
BinaryOperator.AND: lambda a, b: a and b,
BinaryOperator.OR: lambda a, b: a or b,
}
AGGREGATION_MODE2FIELD_AGGREGATOR = {
AggregationMode.MAX: max,
AggregationMode.MIN: min,
AggregationMode.AVERAGE: statistics.mean,
}
async def run_crop_step(
step: Crop,
runtime_parameters: Dict[str, Any],
outputs_lookup: OutputsLookup,
model_manager: ModelManager,
api_key: Optional[str],
step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
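    """Crop each input image to the bounding boxes of its detections.

    Results land in `outputs_lookup` under this step's selector as a dict
    with "crops" and PARENT_ID_KEY entries; the step never redirects control
    flow, hence the `None` next-step reference.
    """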
image = get_image(
step=step,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
detections = resolve_parameter(
selector_or_value=step.detections,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
    # load_image() yields (image, is_bgr) pairs - flip channels when needed
    # so that crops are always taken from BGR arrays
    decoded_images = [load_image(e) for e in image]
    decoded_images = [
        i[0] if i[1] is True else i[0][:, :, ::-1] for i in decoded_images
    ]
origin_image_shape = extract_origin_size_from_images(
input_images=image,
decoded_images=decoded_images,
)
crops = list(
itertools.chain.from_iterable(
crop_image(image=i, detections=d, origin_size=o)
for i, d, o in zip(decoded_images, detections, origin_image_shape)
)
)
parent_ids = [c[PARENT_ID_KEY] for c in crops]
outputs_lookup[construct_step_selector(step_name=step.name)] = {
"crops": crops,
PARENT_ID_KEY: parent_ids,
}
return None, outputs_lookup
def crop_image(
image: np.ndarray,
detections: List[dict],
origin_size: dict,
) -> List[Dict[str, Union[str, np.ndarray]]]:
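    """Cut out one crop per detection, tagging each crop with the detection
    id as its parent and with the detection's center in origin-image
    coordinates."""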
crops = []
for detection in detections:
x_min, y_min, x_max, y_max = detection_to_xyxy(detection=detection)
cropped_image = image[y_min:y_max, x_min:x_max]
crops.append(
{
IMAGE_TYPE_KEY: ImageType.NUMPY_OBJECT.value,
IMAGE_VALUE_KEY: cropped_image,
PARENT_ID_KEY: detection[DETECTION_ID_KEY],
ORIGIN_COORDINATES_KEY: {
CENTER_X_KEY: detection["x"],
CENTER_Y_KEY: detection["y"],
ORIGIN_SIZE_KEY: origin_size,
},
}
)
return crops
async def run_condition_step(
step: Condition,
runtime_parameters: Dict[str, Any],
outputs_lookup: OutputsLookup,
model_manager: ModelManager,
api_key: Optional[str],
step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
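    """Evaluate `step.operator` on the resolved left/right values and pick
    the next step accordingly (`step_if_true` vs `step_if_false`)."""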
left_value = resolve_parameter(
selector_or_value=step.left,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
right_value = resolve_parameter(
selector_or_value=step.right,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
evaluation_result = OPERATORS[step.operator](left_value, right_value)
next_step = step.step_if_true if evaluation_result else step.step_if_false
return next_step, outputs_lookup
async def run_detection_filter(
step: DetectionFilter,
runtime_parameters: Dict[str, Any],
outputs_lookup: OutputsLookup,
model_manager: ModelManager,
api_key: Optional[str],
step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
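    """Apply the step's (possibly compound) filter definition to every
    prediction in the batch, preserving per-image grouping and propagating
    image metadata and prediction type alongside the surviving detections.
    """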
predictions = resolve_parameter(
selector_or_value=step.predictions,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
images_meta_selector = construct_selector_pointing_step_output(
selector=step.predictions,
new_output="image",
)
images_meta = resolve_parameter(
selector_or_value=images_meta_selector,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
prediction_type_selector = construct_selector_pointing_step_output(
selector=step.predictions,
new_output="prediction_type",
)
predictions_type = resolve_parameter(
selector_or_value=prediction_type_selector,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
filter_callable = build_filter_callable(definition=step.filter_definition)
result_detections, result_parent_id = [], []
for prediction in predictions:
filtered_predictions = [deepcopy(p) for p in prediction if filter_callable(p)]
result_detections.append(filtered_predictions)
result_parent_id.append([p[PARENT_ID_KEY] for p in filtered_predictions])
step_selector = construct_step_selector(step_name=step.name)
outputs_lookup[step_selector] = [
{"predictions": d, PARENT_ID_KEY: p, "image": i, "prediction_type": pt}
for d, p, i, pt in zip(
result_detections, result_parent_id, images_meta, predictions_type
)
]
return None, outputs_lookup
def build_filter_callable(
definition: Union[DetectionFilterDefinition, CompoundDetectionFilterDefinition],
) -> Callable[[dict], bool]:
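    """Recursively compile a filter definition into a predicate over a single
    detection dict; compound definitions combine their compiled branches with
    the configured binary operator."""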
if definition.type == "CompoundDetectionFilterDefinition":
left_callable = build_filter_callable(definition=definition.left)
right_callable = build_filter_callable(definition=definition.right)
binary_operator = BINARY_OPERATORS[definition.operator]
return lambda e: binary_operator(left_callable(e), right_callable(e))
if definition.type == "DetectionFilterDefinition":
operator = OPERATORS[definition.operator]
return lambda e: operator(e[definition.field_name], definition.reference_value)
    raise ExecutionGraphError(
        f"Detected filter definition of unknown type: {definition.type}"
    )
async def run_detection_offset_step(
step: DetectionOffset,
runtime_parameters: Dict[str, Any],
outputs_lookup: OutputsLookup,
model_manager: ModelManager,
api_key: Optional[str],
step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
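    """Grow every detection's box by the configured x/y offsets, re-emitting
    the batch with image metadata and prediction type preserved."""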
detections = resolve_parameter(
selector_or_value=step.predictions,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
images_meta_selector = construct_selector_pointing_step_output(
selector=step.predictions,
new_output="image",
)
images_meta = resolve_parameter(
selector_or_value=images_meta_selector,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
prediction_type_selector = construct_selector_pointing_step_output(
selector=step.predictions,
new_output="prediction_type",
)
predictions_type = resolve_parameter(
selector_or_value=prediction_type_selector,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
offset_x = resolve_parameter(
selector_or_value=step.offset_x,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
offset_y = resolve_parameter(
selector_or_value=step.offset_y,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
result_detections, result_parent_id = [], []
for detection in detections:
offset_detections = [
offset_detection(detection=d, offset_x=offset_x, offset_y=offset_y)
for d in detection
]
result_detections.append(offset_detections)
result_parent_id.append([d[PARENT_ID_KEY] for d in offset_detections])
step_selector = construct_step_selector(step_name=step.name)
outputs_lookup[step_selector] = [
{"predictions": d, PARENT_ID_KEY: p, "image": i, "prediction_type": pt}
for d, p, i, pt in zip(
result_detections, result_parent_id, images_meta, predictions_type
)
]
return None, outputs_lookup
def offset_detection(
detection: Dict[str, Any], offset_x: int, offset_y: int
) -> Dict[str, Any]:
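    """Return a copy of `detection` with width/height increased by the
    offsets, the original detection id demoted to PARENT_ID_KEY, and a fresh
    DETECTION_ID_KEY assigned."""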
detection_copy = deepcopy(detection)
detection_copy[WIDTH_KEY] += round(offset_x)
detection_copy[HEIGHT_KEY] += round(offset_y)
detection_copy[PARENT_ID_KEY] = detection_copy[DETECTION_ID_KEY]
detection_copy[DETECTION_ID_KEY] = str(uuid4())
return detection_copy
async def run_static_crop_step(
step: Union[AbsoluteStaticCrop, RelativeStaticCrop],
runtime_parameters: Dict[str, Any],
outputs_lookup: OutputsLookup,
model_manager: ModelManager,
api_key: Optional[str],
step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
image = get_image(
step=step,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
    # load_image() yields (image, is_bgr) pairs - flip channels when needed
    # so that crops are always taken from BGR arrays
    decoded_images = [load_image(e) for e in image]
    decoded_images = [
        i[0] if i[1] is True else i[0][:, :, ::-1] for i in decoded_images
    ]
origin_image_shape = extract_origin_size_from_images(
input_images=image,
decoded_images=decoded_images,
)
crops = [
take_static_crop(
image=i,
crop=step,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
origin_size=size,
)
for i, size in zip(decoded_images, origin_image_shape)
]
parent_ids = [c[PARENT_ID_KEY] for c in crops]
outputs_lookup[construct_step_selector(step_name=step.name)] = {
"crops": crops,
PARENT_ID_KEY: parent_ids,
}
return None, outputs_lookup
def extract_origin_size_from_images(
input_images: List[Union[dict, np.ndarray]],
decoded_images: List[np.ndarray],
) -> List[Dict[str, int]]:
result = []
for input_image, decoded_image in zip(input_images, decoded_images):
        if (
            isinstance(input_image, dict)
            and ORIGIN_COORDINATES_KEY in input_image
        ):
result.append(input_image[ORIGIN_COORDINATES_KEY][ORIGIN_SIZE_KEY])
else:
result.append(
{HEIGHT_KEY: decoded_image.shape[0], WIDTH_KEY: decoded_image.shape[1]}
)
return result
def take_static_crop(
image: np.ndarray,
crop: Union[AbsoluteStaticCrop, RelativeStaticCrop],
runtime_parameters: Dict[str, Any],
outputs_lookup: OutputsLookup,
origin_size: dict,
) -> Dict[str, Union[str, np.ndarray]]:
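    """Crop a single image around a static (center, width, height) region.

    For "RelativeStaticCrop" steps the resolved parameters are interpreted as
    fractions of the image size; absolute crops use them as pixel values.
    """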
resolve_parameter_closure = partial(
resolve_parameter,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
x_center = resolve_parameter_closure(crop.x_center)
y_center = resolve_parameter_closure(crop.y_center)
width = resolve_parameter_closure(crop.width)
height = resolve_parameter_closure(crop.height)
if crop.type == "RelativeStaticCrop":
x_center = round(image.shape[1] * x_center)
y_center = round(image.shape[0] * y_center)
width = round(image.shape[1] * width)
height = round(image.shape[0] * height)
x_min = round(x_center - width / 2)
y_min = round(y_center - height / 2)
x_max = round(x_min + width)
y_max = round(y_min + height)
cropped_image = image[y_min:y_max, x_min:x_max]
return {
IMAGE_TYPE_KEY: ImageType.NUMPY_OBJECT.value,
IMAGE_VALUE_KEY: cropped_image,
PARENT_ID_KEY: f"$steps.{crop.name}",
ORIGIN_COORDINATES_KEY: {
CENTER_X_KEY: x_center,
CENTER_Y_KEY: y_center,
ORIGIN_SIZE_KEY: origin_size,
},
}
async def run_detections_consensus_step(
step: DetectionsConsensus,
runtime_parameters: Dict[str, Any],
outputs_lookup: OutputsLookup,
model_manager: ModelManager,
api_key: Optional[str],
step_execution_mode: StepExecutionMode,
) -> Tuple[NextStepReference, OutputsLookup]:
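    """Merge detections coming from several prediction sources into consensus
    detections, batch element by batch element.

    Each source must yield the same batch size; per-element consensus is
    delegated to `resolve_batch_consensus()`, and results are emitted as
    object-detection predictions.
    """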
resolve_parameter_closure = partial(
resolve_parameter,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
all_predictions = [resolve_parameter_closure(p) for p in step.predictions]
# all_predictions has shape (n_consensus_input, bs, img_predictions)
    if len(all_predictions) < 1:
        raise ExecutionGraphError(
            "Consensus step requires at least one source of predictions."
        )
batch_sizes = get_and_validate_batch_sizes(
all_predictions=all_predictions,
step_name=step.name,
)
images_meta_selector = construct_selector_pointing_step_output(
selector=step.predictions[0],
new_output="image",
)
images_meta = resolve_parameter_closure(images_meta_selector)
batch_size = batch_sizes[0]
results = []
for batch_index in range(batch_size):
batch_element_predictions = [e[batch_index] for e in all_predictions]
(
parent_id,
object_present,
presence_confidence,
consensus_detections,
) = resolve_batch_consensus(
predictions=batch_element_predictions,
required_votes=resolve_parameter_closure(step.required_votes),
class_aware=resolve_parameter_closure(step.class_aware),
iou_threshold=resolve_parameter_closure(step.iou_threshold),
confidence=resolve_parameter_closure(step.confidence),
classes_to_consider=resolve_parameter_closure(step.classes_to_consider),
required_objects=resolve_parameter_closure(step.required_objects),
presence_confidence_aggregation=step.presence_confidence_aggregation,
detections_merge_confidence_aggregation=step.detections_merge_confidence_aggregation,
detections_merge_coordinates_aggregation=step.detections_merge_coordinates_aggregation,
)
results.append(
{
"predictions": consensus_detections,
"parent_id": parent_id,
"object_present": object_present,
"presence_confidence": presence_confidence,
"image": images_meta[batch_index],
"prediction_type": "object-detection",
}
)
outputs_lookup[construct_step_selector(step_name=step.name)] = results
return None, outputs_lookup
def get_and_validate_batch_sizes(
all_predictions: List[List[List[dict]]],
step_name: str,
) -> List[int]:
batch_sizes = get_predictions_batch_sizes(all_predictions=all_predictions)
    if not all_batch_sizes_equal(batch_sizes=batch_sizes):
        raise ExecutionGraphError(
            f"Detected mismatch of input dimensions in step: {step_name}"
        )
return batch_sizes
def get_predictions_batch_sizes(all_predictions: List[List[List[dict]]]) -> List[int]:
return [len(predictions) for predictions in all_predictions]
def all_batch_sizes_equal(batch_sizes: List[int]) -> bool:
if len(batch_sizes) == 0:
return True
reference = batch_sizes[0]
return all(e == reference for e in batch_sizes)
def resolve_batch_consensus(
predictions: List[List[dict]],
required_votes: int,
class_aware: bool,
iou_threshold: float,
confidence: float,
classes_to_consider: Optional[List[str]],
required_objects: Optional[Union[int, Dict[str, int]]],
presence_confidence_aggregation: AggregationMode,
detections_merge_confidence_aggregation: AggregationMode,
detections_merge_coordinates_aggregation: AggregationMode,
) -> Tuple[str, bool, Dict[str, float], List[dict]]:
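    """Compute consensus for a single batch element.

    Returns `(parent_id, object_present, presence_confidence,
    consensus_detections)`; when no source detected anything the parent id is
    reported as "undefined" with empty results.
    """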
    if no_objects_detected_in_any_source(predictions=predictions):
return "undefined", False, {}, []
parent_id = get_parent_id_of_predictions_from_different_sources(
predictions=predictions,
)
predictions = filter_predictions(
predictions=predictions,
classes_to_consider=classes_to_consider,
)
detections_already_considered = set()
consensus_detections = []
for source_id, detection in enumerate_detections(predictions=predictions):
(
consensus_detections_update,
detections_already_considered,
) = get_consensus_for_single_detection(
detection=detection,
source_id=source_id,
predictions=predictions,
iou_threshold=iou_threshold,
class_aware=class_aware,
required_votes=required_votes,
confidence=confidence,
detections_merge_confidence_aggregation=detections_merge_confidence_aggregation,
detections_merge_coordinates_aggregation=detections_merge_coordinates_aggregation,
detections_already_considered=detections_already_considered,
)
consensus_detections += consensus_detections_update
(
object_present,
presence_confidence,
) = check_objects_presence_in_consensus_predictions(
consensus_detections=consensus_detections,
aggregation_mode=presence_confidence_aggregation,
class_aware=class_aware,
required_objects=required_objects,
)
return (
parent_id,
object_present,
presence_confidence,
consensus_detections,
)
def get_consensus_for_single_detection(
detection: dict,
source_id: int,
predictions: List[List[dict]],
iou_threshold: float,
class_aware: bool,
required_votes: int,
confidence: float,
detections_merge_confidence_aggregation: AggregationMode,
detections_merge_coordinates_aggregation: AggregationMode,
detections_already_considered: Set[str],
) -> Tuple[List[dict], Set[str]]:
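    """Try to turn `detection` plus its best cross-source matches into one
    merged detection.

    A merge happens only when at least `required_votes` sources agree (the
    anchor detection counts as one vote) and the merged confidence clears the
    threshold; all participating detections are then marked as considered.
    """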
if detection["detection_id"] in detections_already_considered:
return ([], detections_already_considered)
consensus_detections = []
detections_with_max_overlap = (
get_detections_from_different_sources_with_max_overlap(
detection=detection,
source=source_id,
predictions=predictions,
iou_threshold=iou_threshold,
class_aware=class_aware,
detections_already_considered=detections_already_considered,
)
)
if len(detections_with_max_overlap) < (required_votes - 1):
return consensus_detections, detections_already_considered
detections_to_merge = [detection] + [
matched_value[0] for matched_value in detections_with_max_overlap.values()
]
merged_detection = merge_detections(
detections=detections_to_merge,
confidence_aggregation_mode=detections_merge_confidence_aggregation,
boxes_aggregation_mode=detections_merge_coordinates_aggregation,
)
if merged_detection["confidence"] < confidence:
return consensus_detections, detections_already_considered
consensus_detections.append(merged_detection)
detections_already_considered.add(detection[DETECTION_ID_KEY])
for matched_value in detections_with_max_overlap.values():
detections_already_considered.add(matched_value[0][DETECTION_ID_KEY])
return consensus_detections, detections_already_considered
def check_objects_presence_in_consensus_predictions(
consensus_detections: List[dict],
class_aware: bool,
aggregation_mode: AggregationMode,
required_objects: Optional[Union[int, Dict[str, int]]],
) -> Tuple[bool, Dict[str, float]]:
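    """Decide whether the consensus detections satisfy `required_objects` and
    report aggregated presence confidence - globally under "any_object" when
    class-agnostic, otherwise per class."""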
if len(consensus_detections) == 0:
return False, {}
    if required_objects is None:
        required_objects = 0
    if isinstance(required_objects, dict) and not class_aware:
        required_objects = sum(required_objects.values())
    if (
        isinstance(required_objects, int)
        and len(consensus_detections) < required_objects
    ):
return False, {}
if not class_aware:
aggregated_confidence = aggregate_field_values(
detections=consensus_detections,
field="confidence",
aggregation_mode=aggregation_mode,
)
return True, {"any_object": aggregated_confidence}
class2detections = defaultdict(list)
for detection in consensus_detections:
class2detections[detection["class"]].append(detection)
    if isinstance(required_objects, dict):
for requested_class, required_objects_count in required_objects.items():
if len(class2detections[requested_class]) < required_objects_count:
return False, {}
class2confidence = {
class_name: aggregate_field_values(
detections=class_detections,
field="confidence",
aggregation_mode=aggregation_mode,
)
for class_name, class_detections in class2detections.items()
}
return True, class2confidence
def no_objects_detected_in_any_source(predictions: List[List[dict]]) -> bool:
return all(len(p) == 0 for p in predictions)
def get_parent_id_of_predictions_from_different_sources(
predictions: List[List[dict]],
) -> str:
encountered_parent_ids = {
p[PARENT_ID_KEY] for prediction_source in predictions for p in prediction_source
}
    if len(encountered_parent_ids) > 1:
        raise ExecutionGraphError(
            "Mismatch in predictions - while executing consensus step, "
            "in equivalent batches, detections are assigned different parent "
            "identifiers, whereas consensus can only be applied for predictions "
            "made against the same input."
        )
    return next(iter(encountered_parent_ids))
def filter_predictions(
predictions: List[List[dict]],
classes_to_consider: Optional[List[str]],
) -> List[List[dict]]:
if classes_to_consider is None:
return predictions
classes_to_consider = set(classes_to_consider)
return [
[
detection
for detection in detections
if detection["class"] in classes_to_consider
]
for detections in predictions
]
def get_detections_from_different_sources_with_max_overlap(
detection: dict,
source: int,
predictions: List[List[dict]],
iou_threshold: float,
class_aware: bool,
detections_already_considered: Set[str],
) -> Dict[int, Tuple[dict, float]]:
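    """For every other source, find the not-yet-considered detection with the
    highest IoU against `detection`, keeping only matches above
    `iou_threshold` (and matching class when `class_aware` is set)."""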
current_max_overlap = {}
for other_source, other_detection in enumerate_detections(
predictions=predictions,
excluded_source=source,
):
if other_detection[DETECTION_ID_KEY] in detections_already_considered:
continue
if class_aware and detection["class"] != other_detection["class"]:
continue
iou_value = calculate_iou(
detection_a=detection,
detection_b=other_detection,
)
if iou_value <= iou_threshold:
continue
        if (
            other_source not in current_max_overlap
            or current_max_overlap[other_source][1] < iou_value
        ):
            current_max_overlap[other_source] = (other_detection, iou_value)
return current_max_overlap
def enumerate_detections(
predictions: List[List[dict]],
excluded_source: Optional[int] = None,
) -> Generator[Tuple[int, dict], None, None]:
for source_id, detections in enumerate(predictions):
if excluded_source is not None and excluded_source == source_id:
continue
for detection in detections:
yield source_id, detection
def calculate_iou(detection_a: dict, detection_b: dict) -> float:
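    """Standard IoU of two center-format detections; returns 0.0 for a
    degenerate zero-area union."""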
box_a = detection_to_xyxy(detection=detection_a)
box_b = detection_to_xyxy(detection=detection_b)
x_a = max(box_a[0], box_b[0])
y_a = max(box_a[1], box_b[1])
x_b = min(box_a[2], box_b[2])
y_b = min(box_a[3], box_b[3])
intersection = max(0, x_b - x_a) * max(0, y_b - y_a)
bbox_a_area, bbox_b_area = get_detection_sizes(
detections=[detection_a, detection_b]
)
    union = float(bbox_a_area + bbox_b_area - intersection)
    if union == 0.0:
        return 0.0
    return intersection / union
def detection_to_xyxy(detection: dict) -> Tuple[int, int, int, int]:
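    """Convert a center-based detection into rounded (x_min, y_min, x_max,
    y_max) corners, e.g. a 100x100 box centred at (50, 50) maps to
    (0, 0, 100, 100)."""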
x_min = round(detection["x"] - detection[WIDTH_KEY] / 2)
y_min = round(detection["y"] - detection[HEIGHT_KEY] / 2)
x_max = round(x_min + detection[WIDTH_KEY])
y_max = round(y_min + detection[HEIGHT_KEY])
return x_min, y_min, x_max, y_max
def merge_detections(
detections: List[dict],
confidence_aggregation_mode: AggregationMode,
boxes_aggregation_mode: AggregationMode,
) -> dict:
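    """Fuse several overlapping detections into one: class and box come from
    the configured aggregation modes, confidence is aggregated over all
    inputs, parent id is inherited from the first detection, and a new
    detection id is generated."""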
class_name, class_id = AGGREGATION_MODE2CLASS_SELECTOR[confidence_aggregation_mode](
detections
)
x, y, width, height = AGGREGATION_MODE2BOXES_AGGREGATOR[boxes_aggregation_mode](
detections
)
return {
PARENT_ID_KEY: detections[0][PARENT_ID_KEY],
DETECTION_ID_KEY: f"{uuid4()}",
"class": class_name,
"class_id": class_id,
"confidence": aggregate_field_values(
detections=detections,
field="confidence",
aggregation_mode=confidence_aggregation_mode,
),
"x": x,
"y": y,
"width": width,
"height": height,
}
def get_majority_class(detections: List[dict]) -> Tuple[str, int]:
class_counts = Counter(d["class"] for d in detections)
most_common_class_name = class_counts.most_common(1)[0][0]
class_id = [
d["class_id"] for d in detections if d["class"] == most_common_class_name
][0]
return most_common_class_name, class_id
def get_class_of_most_confident_detection(detections: List[dict]) -> Tuple[str, int]:
max_confidence = aggregate_field_values(
detections=detections,
field="confidence",
aggregation_mode=AggregationMode.MAX,
)
most_confident_prediction = [
d for d in detections if d["confidence"] == max_confidence
][0]
return most_confident_prediction["class"], most_confident_prediction["class_id"]
def get_class_of_least_confident_detection(detections: List[dict]) -> Tuple[str, int]:
    min_confidence = aggregate_field_values(
        detections=detections,
        field="confidence",
        aggregation_mode=AggregationMode.MIN,
    )
    least_confident_prediction = [
        d for d in detections if d["confidence"] == min_confidence
    ][0]
    return least_confident_prediction["class"], least_confident_prediction["class_id"]
AGGREGATION_MODE2CLASS_SELECTOR = {
AggregationMode.MAX: get_class_of_most_confident_detection,
AggregationMode.MIN: get_class_of_least_confident_detection,
AggregationMode.AVERAGE: get_majority_class,
}
def get_average_bounding_box(detections: List[dict]) -> Tuple[int, int, int, int]:
x = round(aggregate_field_values(detections=detections, field="x"))
y = round(aggregate_field_values(detections=detections, field="y"))
width = round(aggregate_field_values(detections=detections, field="width"))
height = round(aggregate_field_values(detections=detections, field="height"))
return x, y, width, height
def get_smallest_bounding_box(detections: List[dict]) -> Tuple[int, int, int, int]:
    detection_sizes = get_detection_sizes(detections=detections)
    smallest_size = min(detection_sizes)
    matching_detection = detections[detection_sizes.index(smallest_size)]
    return (
        matching_detection["x"],
        matching_detection["y"],
        matching_detection[WIDTH_KEY],
        matching_detection[HEIGHT_KEY],
    )
def get_largest_bounding_box(detections: List[dict]) -> Tuple[int, int, int, int]:
    detection_sizes = get_detection_sizes(detections=detections)
    largest_size = max(detection_sizes)
    matching_detection = detections[detection_sizes.index(largest_size)]
    return (
        matching_detection["x"],
        matching_detection["y"],
        matching_detection[WIDTH_KEY],
        matching_detection[HEIGHT_KEY],
    )
AGGREGATION_MODE2BOXES_AGGREGATOR = {
AggregationMode.MAX: get_largest_bounding_box,
AggregationMode.MIN: get_smallest_bounding_box,
AggregationMode.AVERAGE: get_average_bounding_box,
}
def get_detection_sizes(detections: List[dict]) -> List[float]:
return [d[HEIGHT_KEY] * d[WIDTH_KEY] for d in detections]
def aggregate_field_values(
detections: List[dict],
field: str,
aggregation_mode: AggregationMode = AggregationMode.AVERAGE,
) -> float:
values = [d[field] for d in detections]
return AGGREGATION_MODE2FIELD_AGGREGATOR[aggregation_mode](values)
async def run_active_learning_data_collector(
step: ActiveLearningDataCollector,
runtime_parameters: Dict[str, Any],
outputs_lookup: OutputsLookup,
model_manager: ModelManager,
api_key: Optional[str],
step_execution_mode: StepExecutionMode,
active_learning_middleware: WorkflowsActiveLearningMiddleware,
background_tasks: Optional[BackgroundTasks],
) -> Tuple[NextStepReference, OutputsLookup]:
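    """Register the resolved images and predictions with the Active Learning
    middleware (optionally deferred via FastAPI background tasks) without
    altering the workflow outputs.

    All inputs must share a single prediction type; the target dataset API
    key falls back to the request's `api_key` when not set explicitly.
    """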
resolve_parameter_closure = partial(
resolve_parameter,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
image = get_image(
step=step,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
images_meta_selector = construct_selector_pointing_step_output(
selector=step.predictions,
new_output="image",
)
images_meta = resolve_parameter_closure(images_meta_selector)
prediction_type_selector = construct_selector_pointing_step_output(
selector=step.predictions,
new_output="prediction_type",
)
predictions_type = resolve_parameter(
selector_or_value=prediction_type_selector,
runtime_parameters=runtime_parameters,
outputs_lookup=outputs_lookup,
)
    prediction_types = set(predictions_type)
    if len(prediction_types) > 1:
        raise ExecutionGraphError(
            "Active Learning data collection step requires a single prediction "
            f"type to be part of ingest. Detected: {prediction_types}."
        )
    prediction_type = next(iter(prediction_types))
predictions = resolve_parameter_closure(step.predictions)
predictions_output_name = get_last_selector_chunk(step.predictions)
target_dataset = resolve_parameter_closure(step.target_dataset)
target_dataset_api_key = resolve_parameter_closure(step.target_dataset_api_key)
disable_active_learning = resolve_parameter_closure(step.disable_active_learning)
active_learning_compatible_predictions = [
{"image": image_meta, predictions_output_name: prediction}
for image_meta, prediction in zip(images_meta, predictions)
]
    # NOTE: this should really be asyncio-based, but that requires a major
    # redesign of backend components
    active_learning_middleware.register(
        dataset_name=target_dataset,
images=image,
predictions=active_learning_compatible_predictions,
api_key=target_dataset_api_key or api_key,
active_learning_disabled_for_request=disable_active_learning,
prediction_type=prediction_type,
background_tasks=background_tasks,
active_learning_configuration=step.active_learning_configuration,
)
return None, outputs_lookup