import base64 import time from dataclasses import dataclass from datetime import datetime import requests import docker from inference.core.cache import cache from inference.core.env import METRICS_INTERVAL from inference.core.logger import logger from inference.core.utils.image_utils import load_image_rgb from inference.enterprise.device_manager.helpers import get_cache_model_items @dataclass class InferServerContainer: status: str id: str port: int host: str startup_time: float version: str def __init__(self, docker_container, details): self.container = docker_container self.status = details.get("status") self.id = details.get("uuid") self.port = details.get("port") self.host = details.get("host") self.version = details.get("version") t = details.get("startup_time_ts").split(".")[0] self.startup_time = ( datetime.strptime(t, "%Y-%m-%dT%H:%M:%S").timestamp() if t is not None else datetime.now().timestamp() ) def kill(self): try: self.container.kill() return True, None except Exception as e: logger.error(e) return False, None def restart(self): try: self.container.restart() return True, None except Exception as e: logger.error(e) return False, None def stop(self): try: self.container.stop() return True, None except Exception as e: logger.error(e) return False, None def start(self): try: self.container.start() return True, None except Exception as e: logger.error(e) return False, None def inspect(self): try: info = requests.get(f"http://{self.host}:{self.port}/info").json() return True, info except Exception as e: logger.error(e) return False, None def snapshot(self): try: snapshot = self.get_latest_inferred_images() snapshot.update({"container_id": self.id}) return True, snapshot except Exception as e: logger.error(e) return False, None def get_latest_inferred_images(self, max=4): """ Retrieve the latest inferred images and associated information for this container. This method fetches the most recent inferred images within the time interval defined by METRICS_INTERVAL. Args: max (int, optional): The maximum number of inferred images to retrieve. Defaults to 4. Returns: dict: A dictionary where each key represents a model ID associated with this container, and the corresponding value is a list of dictionaries containing information about the latest inferred images. Each dictionary has the following keys: - "image" (str): The base64-encoded image data. - "dimensions" (dict): Image dimensions (width and height). - "predictions" (list): A list of predictions or results associated with the image. Notes: - This method uses the global constant METRICS_INTERVAL to specify the time interval. """ now = time.time() start = now - METRICS_INTERVAL api_keys = get_cache_model_items().get(self.id, dict()).keys() model_ids = [] for api_key in api_keys: mids = get_cache_model_items().get(self.id, dict()).get(api_key, []) model_ids.extend(mids) num_images = 0 latest_inferred_images = dict() for model_id in model_ids: if num_images >= max: break latest_reqs = cache.zrangebyscore( f"inference:{self.id}:{model_id}", min=start, max=now ) for req in latest_reqs: images = req["request"]["image"] image_dims = req.get("response", {}).get("image", dict()) predictions = req.get("response", {}).get("predictions", []) if images is None or len(images) == 0: continue if type(images) is not list: images = [images] for image in images: value = None if image["type"] == "base64": value = image["value"] else: loaded_image = load_image_rgb(image) image_bytes = loaded_image.tobytes() image_base64 = base64.b64encode(image_bytes).decode("utf-8") value = image_base64 if latest_inferred_images.get(model_id) is None: latest_inferred_images[model_id] = [] inference = dict( image=value, dimensions=image_dims, predictions=predictions ) latest_inferred_images[model_id].append(inference) num_images += 1 return latest_inferred_images def get_startup_config(self): """ Get the startup configuration for this container. Returns: dict: A dictionary containing the startup configuration for this container. """ env_vars = self.container.attrs.get("Config", {}).get("Env", {}) port_bindings = self.container.attrs.get("HostConfig", {}).get( "PortBindings", {} ) detached = self.container.attrs.get("HostConfig", {}).get("Detached", False) image = self.container.attrs.get("Config", {}).get("Image", "") privileged = self.container.attrs.get("HostConfig", {}).get("Privileged", False) labels = self.container.attrs.get("Config", {}).get("Labels", {}) env = [] for var in env_vars: name, value = var.split("=") env.append(f"{name}={value}") return { "env": env, "port_bindings": port_bindings, "detach": detached, "image": image, "privileged": privileged, "labels": labels, # TODO: add device requests } def is_inference_server_container(container): """ Checks if a container is an inference server container Args: container (any): A container object from the Docker SDK Returns: boolean: True if the container is an inference server container, False otherwise """ image_tags = container.image.tags for t in image_tags: if t.startswith("roboflow/roboflow-inference-server"): return True return False def get_inference_containers(): """ Discovers inference server containers running on the host and parses their information into a list of InferServerContainer objects """ client = docker.from_env() containers = client.containers.list() inference_containers = [] for c in containers: if is_inference_server_container(c): details = parse_container_info(c) info = {} try: info = requests.get( f"http://{details['host']}:{details['port']}/info", timeout=3 ).json() except Exception as e: logger.error(f"Failed to get info from container {c.id} {details} {e}") details.update(info) infer_container = InferServerContainer(c, details) if len(inference_containers) == 0: inference_containers.append(infer_container) continue for ic in inference_containers: if ic.id == infer_container.id: continue inference_containers.append(infer_container) return inference_containers def parse_container_info(c): """ Parses the container information into a dictionary Args: c (any): Docker SDK Container object Returns: dict: A dictionary containing the container information """ env = c.attrs.get("Config", {}).get("Env", {}) info = {"container_id": c.id, "port": 9001, "host": "0.0.0.0"} for var in env: if var.startswith("PORT="): info["port"] = var.split("=")[1] elif var.startswith("HOST="): info["host"] = var.split("=")[1] status = c.attrs.get("State", {}).get("Status") if status: info["status"] = status container_name = c.attrs.get("Name") if container_name: info["container_name_on_host"] = container_name startup_time = c.attrs.get("State", {}).get("StartedAt") if startup_time: info["startup_time_ts"] = startup_time return info def get_container_by_id(id): """ Gets an inference server container by its id Args: id (string): The id of the container Returns: container: The container object if found, None otherwise """ containers = get_inference_containers() for c in containers: if c.id == id: return c return None def get_container_ids(): """ Gets the ids of the inference server containers Returns: list: A list of container ids """ containers = get_inference_containers() return [c.id for c in containers]