import time
import requests
from inference.core.devices.utils import GLOBAL_DEVICE_ID
from inference.core.env import API_KEY, METRICS_INTERVAL, METRICS_URL, TAGS
from inference.core.logger import logger
from inference.core.managers.metrics import get_model_metrics, get_system_info
from inference.core.utils.requests import api_key_safe_raise_for_status
from inference.core.version import __version__
from inference.enterprise.device_manager.command_handler import handle_command
from inference.enterprise.device_manager.container_service import (
get_container_by_id,
get_container_ids,
)
from inference.enterprise.device_manager.helpers import get_cache_model_items


def aggregate_model_stats(container_id):
"""
Aggregate statistics for models within a specified container.
This function retrieves and aggregates performance metrics for all models
associated with the given container within a specified time interval.
Args:
container_id (str): The unique identifier of the container for which
model statistics are to be aggregated.
Returns:
list: A list of dictionaries, where each dictionary represents a model's
statistics with the following keys:
- "dataset_id" (str): The ID of the dataset associated with the model.
- "version" (str): The version of the model.
- "api_key" (str): The API key that was used to make an inference against this model
- "metrics" (dict): A dictionary containing performance metrics for the model:
- "num_inferences" (int): Number of inferences made
- "num_errors" (int): Number of errors
- "avg_inference_time" (float): Average inference time in seconds
Notes:
- The function calculates statistics over a time interval defined by
the global constant METRICS_INTERVAL, passed in when starting up the container.
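    Example:
        A single aggregated entry might look like this (all values below are
        illustrative placeholders):
            {
                "dataset_id": "my-dataset",
                "version": "1",
                "api_key": "<api key>",
                "metrics": {
                    "num_inferences": 42,
                    "num_errors": 0,
                    "avg_inference_time": 0.08,
                },
            }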
"""
now = time.time()
start = now - METRICS_INTERVAL
models = []
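    # The model cache maps container ID -> API key -> list of model IDs, where
    # each model ID is formatted as "dataset_id/version".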
    container_models = get_cache_model_items().get(container_id, {})
    for api_key, model_ids in container_models.items():
for model_id in model_ids:
model = {
"dataset_id": model_id.split("/")[0],
"version": model_id.split("/")[1],
"api_key": api_key,
"metrics": get_model_metrics(
container_id, model_id, min=start, max=now
),
}
models.append(model)
    return models


def build_container_stats():
"""
Build statistics for containers and their associated models.
Returns:
list: A list of dictionaries, where each dictionary represents statistics
for a container and its associated models with the following keys:
- "uuid" (str): The unique identifier (UUID) of the container.
- "startup_time" (float): The timestamp representing the container's startup time.
- "models" (list): A list of dictionaries representing statistics for each
model associated with the container (see `aggregate_model_stats` for format).
Notes:
- This method relies on a singleton `container_service` for container information.
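    Example:
        A single container entry might look like this (all values below are
        illustrative placeholders):
            {
                "uuid": "<container id>",
                "version": "<container version>",
                "startup_time": 1690000000.0,
                "status": "running",
                "models": [...],
            }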
"""
containers = []
    for container_id in get_container_ids():
        container = get_container_by_id(container_id)
        if container:
            container_stats = {}
            models = aggregate_model_stats(container_id)
container_stats["uuid"] = container.id
container_stats["version"] = container.version
container_stats["startup_time"] = container.startup_time
container_stats["models"] = models
if container.status == "running" or container.status == "restarting":
container_stats["status"] = "running"
elif container.status == "exited":
container_stats["status"] = "stopped"
elif container.status == "paused":
container_stats["status"] = "idle"
else:
container_stats["status"] = "processing"
containers.append(container_stats)
    return containers


def aggregate_device_stats():
"""
Aggregate statistics for the device.
"""
window_start_timestamp = str(int(time.time()))
all_data = {
"api_key": API_KEY,
"timestamp": window_start_timestamp,
"device": {
"id": GLOBAL_DEVICE_ID,
"name": GLOBAL_DEVICE_ID,
"type": f"roboflow-device-manager=={__version__}",
"tags": TAGS,
"system_info": get_system_info(),
"containers": build_container_stats(),
},
}
    return all_data


def report_metrics_and_handle_commands():
"""
Report metrics to Roboflow.
This function aggregates statistics for the device and its containers and
sends them to Roboflow. If Roboflow sends back any commands, they are
handled by the `handle_command` function.
"""
all_data = aggregate_device_stats()
logger.info(f"Sending metrics to Roboflow {str(all_data)}.")
res = requests.post(METRICS_URL, json=all_data)
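    # Validate the HTTP response, raising on errors without exposing the API key.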
api_key_safe_raise_for_status(response=res)
response = res.json()
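    # Any commands returned in the response's "data" field are dispatched to the
    # device manager's command handler.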
for cmd in response.get("data", []):
if cmd:
handle_command(cmd)