import time

import requests

from inference.core.devices.utils import GLOBAL_DEVICE_ID
from inference.core.env import API_KEY, METRICS_INTERVAL, METRICS_URL, TAGS
from inference.core.logger import logger
from inference.core.managers.metrics import get_model_metrics, get_system_info
from inference.core.utils.requests import api_key_safe_raise_for_status
from inference.core.version import __version__
from inference.enterprise.device_manager.command_handler import handle_command
from inference.enterprise.device_manager.container_service import (
    get_container_by_id,
    get_container_ids,
)
from inference.enterprise.device_manager.helpers import get_cache_model_items


def aggregate_model_stats(container_id):
    """
    Aggregate statistics for models within a specified container.

    This function retrieves and aggregates performance metrics for all models
    associated with the given container within a specified time interval.

    Args:
        container_id (str): The unique identifier of the container for which
            model statistics are to be aggregated.

    Returns:
        list: A list of dictionaries, where each dictionary represents a model's
            statistics with the following keys:
            - "dataset_id" (str): The ID of the dataset associated with the model.
            - "version" (str): The version of the model.
            - "api_key" (str): The API key that was used to make an inference
              against this model.
            - "metrics" (dict): A dictionary containing performance metrics for
              the model:
                - "num_inferences" (int): Number of inferences made.
                - "num_errors" (int): Number of errors.
                - "avg_inference_time" (float): Average inference time in seconds.

    Notes:
        - The function calculates statistics over a time interval defined by the
          global constant METRICS_INTERVAL, passed in when starting up the
          container.
    """
    now = time.time()
    start = now - METRICS_INTERVAL
    models = []
    # Model IDs are cached per container and per API key.
    api_keys = get_cache_model_items().get(container_id, dict()).keys()
    for api_key in api_keys:
        model_ids = get_cache_model_items().get(container_id, dict()).get(api_key, [])
        for model_id in model_ids:
            model = {
                "dataset_id": model_id.split("/")[0],
                "version": model_id.split("/")[1],
                "api_key": api_key,
                "metrics": get_model_metrics(
                    container_id, model_id, min=start, max=now
                ),
            }
            models.append(model)
    return models


def build_container_stats():
    """
    Build statistics for containers and their associated models.

    Returns:
        list: A list of dictionaries, where each dictionary represents statistics
            for a container and its associated models with the following keys:
            - "uuid" (str): The unique identifier (UUID) of the container.
            - "version" (str): The version of the container.
            - "startup_time" (float): The timestamp representing the container's
              startup time.
            - "status" (str): The container's status, normalized to one of
              "running", "stopped", "idle", or "processing".
            - "models" (list): A list of dictionaries representing statistics for
              each model associated with the container (see
              `aggregate_model_stats` for format).

    Notes:
        - This function relies on the `container_service` module for container
          information.
    """
    containers = []
    for container_id in get_container_ids():
        container = get_container_by_id(container_id)
        if container:
            container_stats = {}
            models = aggregate_model_stats(container_id)
            container_stats["uuid"] = container.id
            container_stats["version"] = container.version
            container_stats["startup_time"] = container.startup_time
            container_stats["models"] = models
            # Normalize the raw container status into the values reported upstream.
            if container.status in ("running", "restarting"):
                container_stats["status"] = "running"
            elif container.status == "exited":
                container_stats["status"] = "stopped"
            elif container.status == "paused":
                container_stats["status"] = "idle"
            else:
                container_stats["status"] = "processing"
            containers.append(container_stats)
    return containers


def aggregate_device_stats():
    """
    Aggregate statistics for the device, its containers, and their models into a
    single payload.
    """
    window_start_timestamp = str(int(time.time()))
    all_data = {
        "api_key": API_KEY,
        "timestamp": window_start_timestamp,
        "device": {
            "id": GLOBAL_DEVICE_ID,
            "name": GLOBAL_DEVICE_ID,
            "type": f"roboflow-device-manager=={__version__}",
            "tags": TAGS,
            "system_info": get_system_info(),
            "containers": build_container_stats(),
        },
    }
    return all_data


def report_metrics_and_handle_commands():
    """
    Report metrics to Roboflow.

    This function aggregates statistics for the device and its containers and
    sends them to Roboflow. If Roboflow sends back any commands, they are
    handled by the `handle_command` function.
    """
    all_data = aggregate_device_stats()
    logger.info(f"Sending metrics to Roboflow {all_data}.")
    res = requests.post(METRICS_URL, json=all_data)
    api_key_safe_raise_for_status(response=res)
    response = res.json()
    for cmd in response.get("data", []):
        if cmd:
            handle_command(cmd)