from typing import Any, Dict, Generator, Iterable, List, TypeVar, Union

import numpy as np

from inference.enterprise.workflows.complier.steps_executors.types import OutputsLookup
from inference.enterprise.workflows.complier.utils import (
    get_step_selector_from_its_output,
    is_input_selector,
    is_step_output_selector,
)
from inference.enterprise.workflows.entities.steps import (
    AbsoluteStaticCrop,
    ActiveLearningDataCollector,
    ClipComparison,
    Crop,
    OCRModel,
    RelativeStaticCrop,
    RoboflowModel,
    YoloWorld,
)
from inference.enterprise.workflows.entities.validators import (
    get_last_selector_chunk,
    is_selector,
)
from inference.enterprise.workflows.errors import ExecutionGraphError

T = TypeVar("T")


def get_image(
    step: Union[
        RoboflowModel,
        OCRModel,
        Crop,
        AbsoluteStaticCrop,
        RelativeStaticCrop,
        ClipComparison,
        ActiveLearningDataCollector,
        YoloWorld,
    ],
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
) -> List[Dict[str, Union[str, np.ndarray]]]:
    """Fetch the image(s) referenced by ``step.image``.

    Args:
        step: Step definition whose ``image`` attribute holds a selector.
        runtime_parameters: Values supplied at execution time, indexed by the
            last chunk of an input selector.
        outputs_lookup: Outputs of already executed steps, indexed by step
            selector.

    Returns:
        The entry of ``runtime_parameters`` when ``step.image`` is an input
        selector, or the matching field of the referenced step's output when
        it is a step output selector.

    Raises:
        ExecutionGraphError: If ``step.image`` is neither an input selector
            nor a step output selector.
    """
    if is_input_selector(selector_or_value=step.image):
        return runtime_parameters[get_last_selector_chunk(selector=step.image)]
    if is_step_output_selector(selector_or_value=step.image):
        step_selector = get_step_selector_from_its_output(
            step_output_selector=step.image
        )
        step_output = outputs_lookup[step_selector]
        return step_output[get_last_selector_chunk(selector=step.image)]
    raise ExecutionGraphError("Cannot find image")


def resolve_parameter(
    selector_or_value: Any,
    runtime_parameters: Dict[str, Any],
    outputs_lookup: OutputsLookup,
) -> Any:
    """Turn a selector into the value it points at; pass plain values through.

    Args:
        selector_or_value: Either a literal value or a selector.
        runtime_parameters: Values supplied at execution time, indexed by the
            last chunk of the selector.
        outputs_lookup: Outputs of already executed steps, indexed by step
            selector.

    Returns:
        * ``selector_or_value`` itself when it is not a selector;
        * for a step output selector, the selected field of that step's
          output - collected from every element when the output is a list;
        * otherwise the matching entry of ``runtime_parameters``.
    """
    if not is_selector(selector_or_value=selector_or_value):
        return selector_or_value
    # Every remaining branch indexes by the same key, so resolve it once
    # instead of once per element of a list output.
    key = get_last_selector_chunk(selector=selector_or_value)
    if is_step_output_selector(selector_or_value=selector_or_value):
        step_selector = get_step_selector_from_its_output(
            step_output_selector=selector_or_value
        )
        step_output = outputs_lookup[step_selector]
        if isinstance(step_output, list):
            return [element[key] for element in step_output]
        return step_output[key]
    return runtime_parameters[key]


def make_batches(
    iterable: Iterable[T], batch_size: int
) -> Generator[List[T], None, None]:
    """Yield consecutive lists of up to ``batch_size`` elements of ``iterable``.

    Args:
        iterable: Source of elements; consumed lazily, in order.
        batch_size: Requested batch length. Values below 1 are treated as 1.

    Yields:
        Lists holding ``batch_size`` elements each; the final list may be
        shorter. Nothing is yielded for an empty ``iterable``.
    """
    batch_size = max(batch_size, 1)
    batch: List[T] = []
    for element in iterable:
        batch.append(element)
        if len(batch) >= batch_size:
            yield batch
            # Start a fresh list so already yielded batches are not mutated.
            batch = []
    if batch:
        yield batch