import time

import requests

from inference.core.devices.utils import GLOBAL_DEVICE_ID
from inference.core.env import API_KEY, METRICS_INTERVAL, METRICS_URL, TAGS
from inference.core.logger import logger
from inference.core.managers.metrics import get_model_metrics, get_system_info
from inference.core.utils.requests import api_key_safe_raise_for_status
from inference.core.version import __version__
from inference.enterprise.device_manager.command_handler import handle_command
from inference.enterprise.device_manager.container_service import (
    get_container_by_id,
    get_container_ids,
)
from inference.enterprise.device_manager.helpers import get_cache_model_items
def aggregate_model_stats(container_id):
    """
    Aggregate statistics for models within a specified container.

    This function retrieves and aggregates performance metrics for all models
    associated with the given container within a specified time interval.

    Args:
        container_id (str): The unique identifier of the container for which
            model statistics are to be aggregated.

    Returns:
        list: A list of dictionaries, where each dictionary represents a model's
            statistics with the following keys:
            - "dataset_id" (str): The ID of the dataset associated with the model.
            - "version" (str): The version of the model.
            - "api_key" (str): The API key that was used to make an inference against this model
            - "metrics" (dict): A dictionary containing performance metrics for the model:
                - "num_inferences" (int): Number of inferences made
                - "num_errors" (int): Number of errors
                - "avg_inference_time" (float): Average inference time in seconds

    Notes:
        - The function calculates statistics over a time interval defined by
          the global constant METRICS_INTERVAL, passed in when starting up the container.
    """
    now = time.time()
    start = now - METRICS_INTERVAL
    models = []
    # Snapshot the cache once: the original re-read get_cache_model_items()
    # on every api_key iteration, repeating the lookup work needlessly.
    cached_items = get_cache_model_items().get(container_id, {})
    for api_key, model_ids in cached_items.items():
        for model_id in model_ids:
            # model_id is "<dataset_id>/<version>"; split once and reuse.
            parts = model_id.split("/")
            models.append(
                {
                    "dataset_id": parts[0],
                    "version": parts[1],
                    "api_key": api_key,
                    "metrics": get_model_metrics(
                        container_id, model_id, min=start, max=now
                    ),
                }
            )
    return models
def build_container_stats():
    """
    Build statistics for containers and their associated models.

    Returns:
        list: A list of dictionaries, where each dictionary represents statistics
            for a container and its associated models with the following keys:
            - "uuid" (str): The unique identifier (UUID) of the container.
            - "version" (str): The inference-server version reported by the container.
            - "startup_time" (float): The timestamp representing the container's startup time.
            - "models" (list): A list of dictionaries representing statistics for each
              model associated with the container (see `aggregate_model_stats` for format).
            - "status" (str): Normalized container status ("running", "stopped",
              "idle", or "processing").

    Notes:
        - This method relies on a singleton `container_service` for container information.
    """
    # Map raw container states onto the normalized statuses Roboflow expects;
    # anything unrecognized is reported as "processing" (the original else-branch).
    status_map = {
        "running": "running",
        "restarting": "running",
        "exited": "stopped",
        "paused": "idle",
    }
    containers = []
    # `container_id` instead of `id`, which shadows the builtin.
    for container_id in get_container_ids():
        container = get_container_by_id(container_id)
        if not container:
            continue
        containers.append(
            {
                "uuid": container.id,
                "version": container.version,
                "startup_time": container.startup_time,
                "models": aggregate_model_stats(container_id),
                "status": status_map.get(container.status, "processing"),
            }
        )
    return containers
def aggregate_device_stats():
    """
    Assemble the full metrics payload for this device.

    Returns:
        dict: A payload with the reporting "api_key", a stringified Unix
            "timestamp", and a "device" sub-dictionary carrying the device id,
            name, manager version tag, tags, system info, and per-container
            statistics (see `build_container_stats`).
    """
    # Timestamp is sent as a whole-second string, matching the API's format.
    reported_at = str(int(time.time()))
    device_payload = {
        "id": GLOBAL_DEVICE_ID,
        "name": GLOBAL_DEVICE_ID,
        "type": f"roboflow-device-manager=={__version__}",
        "tags": TAGS,
        "system_info": get_system_info(),
        "containers": build_container_stats(),
    }
    return {
        "api_key": API_KEY,
        "timestamp": reported_at,
        "device": device_payload,
    }
def report_metrics_and_handle_commands():
    """
    Report metrics to Roboflow.

    This function aggregates statistics for the device and its containers and
    sends them to Roboflow. If Roboflow sends back any commands, they are
    handled by the `handle_command` function.

    Raises:
        requests.HTTPError: Via `api_key_safe_raise_for_status` when the
            metrics endpoint responds with an error status.
    """
    all_data = aggregate_device_stats()
    logger.info(f"Sending metrics to Roboflow {str(all_data)}.")
    # A timeout prevents a hung connection from stalling the reporting loop
    # forever; the original call could block indefinitely.
    res = requests.post(METRICS_URL, json=all_data, timeout=60)
    api_key_safe_raise_for_status(response=res)
    response = res.json()
    for cmd in response.get("data", []):
        if cmd:
            handle_command(cmd)