# OMG / inference / enterprise / device_manager / container_service.py
# Page-header residue captured with the file (not part of the module):
#   Fucius's picture | Upload 422 files | df6c67d verified | raw | history blame | No virus | 9.48 kB
import base64
import time
from dataclasses import dataclass
from datetime import datetime
import requests
import docker
from inference.core.cache import cache
from inference.core.env import METRICS_INTERVAL
from inference.core.logger import logger
from inference.core.utils.image_utils import load_image_rgb
from inference.enterprise.device_manager.helpers import get_cache_model_items
@dataclass
class InferServerContainer:
    """Handle on a Docker container that runs an inference server.

    Wraps the Docker SDK container object together with the metadata taken
    from a ``details`` dictionary. Lifecycle and query methods never raise:
    they log the error and return an ``(ok, payload)`` tuple instead.

    ``__init__`` is defined explicitly, so ``@dataclass`` does not generate
    one; the decorator only contributes ``__repr__``/``__eq__`` over the
    annotated fields below.
    """

    status: str
    id: str
    port: int
    host: str
    startup_time: float
    version: str

    def __init__(self, docker_container, details):
        """Build the wrapper from a Docker container and its details dict.

        Args:
            docker_container (any): Container object from the Docker SDK.
            details (dict): Metadata with the optional keys ``status``,
                ``uuid``, ``port``, ``host``, ``version`` and
                ``startup_time_ts``. Missing keys yield ``None`` attributes,
                except the startup time which falls back to "now".
        """
        self.container = docker_container
        self.status = details.get("status")
        self.id = details.get("uuid")
        self.port = details.get("port")
        self.host = details.get("host")
        self.version = details.get("version")
        self.startup_time = self._parse_startup_time(details.get("startup_time_ts"))

    @staticmethod
    def _parse_startup_time(raw):
        """Convert a startup timestamp string into a POSIX timestamp.

        Args:
            raw (str | None): Timestamp such as
                ``"2023-10-05T12:34:56.123456789Z"``; fractional seconds and
                a trailing ``Z`` are ignored.

        Returns:
            float: The parsed timestamp, or the current time when ``raw`` is
                missing or cannot be parsed.
        """
        if not raw:
            # The original code called .split() before checking for None,
            # so a missing value crashed instead of using this fallback.
            return datetime.now().timestamp()
        # Keep only the whole-second part; a value without fractional
        # seconds would otherwise keep its trailing "Z" and fail to parse.
        trimmed = str(raw).split(".")[0].rstrip("Z")
        try:
            # NOTE(review): the naive datetime is interpreted as local time
            # by .timestamp(); if the source reports UTC, confirm whether an
            # explicit UTC conversion is wanted here.
            return datetime.strptime(trimmed, "%Y-%m-%dT%H:%M:%S").timestamp()
        except (ValueError, OverflowError, OSError) as e:
            logger.error(e)
            return datetime.now().timestamp()

    def _run_container_action(self, action):
        """Call ``self.container.<action>()`` and report ``(ok, None)``."""
        try:
            getattr(self.container, action)()
            return True, None
        except Exception as e:
            logger.error(e)
            return False, None

    def kill(self):
        """Kill the container. Returns ``(True, None)`` on success, else ``(False, None)``."""
        return self._run_container_action("kill")

    def restart(self):
        """Restart the container. Returns ``(True, None)`` on success, else ``(False, None)``."""
        return self._run_container_action("restart")

    def stop(self):
        """Stop the container. Returns ``(True, None)`` on success, else ``(False, None)``."""
        return self._run_container_action("stop")

    def start(self):
        """Start the container. Returns ``(True, None)`` on success, else ``(False, None)``."""
        return self._run_container_action("start")

    def inspect(self):
        """Fetch the JSON served at ``http://<host>:<port>/info``.

        Returns:
            tuple: ``(True, info)`` on success, ``(False, None)`` on any error.
        """
        try:
            # A timeout keeps an unresponsive server from blocking the caller
            # forever; 3 seconds matches the discovery request in this module.
            info = requests.get(
                f"http://{self.host}:{self.port}/info", timeout=3
            ).json()
            return True, info
        except Exception as e:
            logger.error(e)
            return False, None

    def snapshot(self):
        """Return the latest inferred images plus this container's id.

        Returns:
            tuple: ``(True, snapshot)`` where ``snapshot`` is the mapping from
                ``get_latest_inferred_images`` extended with ``container_id``,
                or ``(False, None)`` on any error.
        """
        try:
            snapshot = self.get_latest_inferred_images()
            snapshot.update({"container_id": self.id})
            return True, snapshot
        except Exception as e:
            logger.error(e)
            return False, None

    def get_latest_inferred_images(self, max=4):
        """
        Retrieve the latest inferred images and associated information for this container.

        This method fetches the most recent inferred images within the time interval
        defined by METRICS_INTERVAL.

        Args:
            max (int, optional): The maximum number of inferred images to retrieve,
                counted across all models. Defaults to 4.

        Returns:
            dict: A dictionary where each key represents a model ID associated with this
                container, and the corresponding value is a list of dictionaries containing
                information about the latest inferred images. Each dictionary has the
                following keys:
                - "image" (str): The base64-encoded image data.
                - "dimensions" (dict): Image dimensions taken from the cached response.
                - "predictions" (list): Predictions taken from the cached response.

        Notes:
            - This method uses the global constant METRICS_INTERVAL to specify the time interval.
            - Cached requests without a usable "image" entry are skipped.
        """
        now = time.time()
        start = now - METRICS_INTERVAL
        # Read the cache mapping once instead of once per API key.
        models_by_api_key = get_cache_model_items().get(self.id, dict())
        model_ids = []
        for mids in models_by_api_key.values():
            model_ids.extend(mids or [])
        num_images = 0
        latest_inferred_images = dict()
        for model_id in model_ids:
            if num_images >= max:
                break
            latest_reqs = cache.zrangebyscore(
                f"inference:{self.id}:{model_id}", min=start, max=now
            )
            for req in latest_reqs:
                images = req.get("request", {}).get("image")
                image_dims = req.get("response", {}).get("image", dict())
                predictions = req.get("response", {}).get("predictions", [])
                if images is None or len(images) == 0:
                    continue
                if not isinstance(images, list):
                    images = [images]
                for image in images:
                    # Enforce the limit per image; checking only between
                    # models let a single model exceed ``max``.
                    if num_images >= max:
                        return latest_inferred_images
                    if image["type"] == "base64":
                        value = image["value"]
                    else:
                        loaded_image = load_image_rgb(image)
                        image_bytes = loaded_image.tobytes()
                        value = base64.b64encode(image_bytes).decode("utf-8")
                    inference = dict(
                        image=value, dimensions=image_dims, predictions=predictions
                    )
                    latest_inferred_images.setdefault(model_id, []).append(inference)
                    num_images += 1
        return latest_inferred_images

    def get_startup_config(self):
        """
        Get the startup configuration for this container.

        Returns:
            dict: A dictionary containing the startup configuration for this container,
                with the keys "env", "port_bindings", "detach", "image",
                "privileged" and "labels", all read from ``self.container.attrs``.
        """
        config = self.container.attrs.get("Config", {})
        host_config = self.container.attrs.get("HostConfig", {})
        # "Env" may be present but null; treat that as no variables.
        env_vars = config.get("Env") or []
        # Entries are already "NAME=value" strings. Copy them verbatim:
        # splitting on every "=" broke values that themselves contain "=".
        env = list(env_vars)
        return {
            "env": env,
            "port_bindings": host_config.get("PortBindings", {}),
            "detach": host_config.get("Detached", False),
            "image": config.get("Image", ""),
            "privileged": host_config.get("Privileged", False),
            "labels": config.get("Labels", {}),
            # TODO: add device requests
        }
def is_inference_server_container(container):
    """
    Tell whether a Docker container runs an inference server image.

    Args:
        container (any): A container object from the Docker SDK; its
            ``image.tags`` list is inspected.

    Returns:
        boolean: True if any image tag starts with
            "roboflow/roboflow-inference-server", False otherwise
    """
    prefix = "roboflow/roboflow-inference-server"
    return any(tag.startswith(prefix) for tag in container.image.tags)
def get_inference_containers():
    """
    Discovers inference server containers running on the host
    and parses their information into a list of InferServerContainer objects

    For every matching container the parsed Docker metadata is merged with
    the JSON returned by the server's ``/info`` endpoint (best effort: a
    failed request is logged and the container is still returned).

    Returns:
        list: InferServerContainer objects, at most one per non-null id.
    """
    client = docker.from_env()
    inference_containers = []
    # The original nested loop's ``continue`` only affected the inner loop,
    # so duplicates were never filtered. Track seen ids explicitly.
    seen_ids = set()
    for c in client.containers.list():
        if not is_inference_server_container(c):
            continue
        details = parse_container_info(c)
        info = {}
        try:
            info = requests.get(
                f"http://{details['host']}:{details['port']}/info", timeout=3
            ).json()
        except Exception as e:
            logger.error(f"Failed to get info from container {c.id} {details} {e}")
        details.update(info)
        infer_container = InferServerContainer(c, details)
        # Containers whose /info call failed have no id; keep each of them
        # rather than collapsing them into a single entry.
        if infer_container.id is not None:
            if infer_container.id in seen_ids:
                continue
            seen_ids.add(infer_container.id)
        inference_containers.append(infer_container)
    return inference_containers
def parse_container_info(c):
    """
    Parses the container information into a dictionary

    Args:
        c (any): Docker SDK Container object

    Returns:
        dict: A dictionary containing the container information. Always has
            "container_id", "port" (default 9001) and "host" (default
            "0.0.0.0"); "status", "container_name_on_host" and
            "startup_time_ts" are added only when present in ``c.attrs``.
    """
    # "Env" may be present but null; treat that as no variables.
    env = c.attrs.get("Config", {}).get("Env") or []
    info = {"container_id": c.id, "port": 9001, "host": "0.0.0.0"}
    for var in env:
        if var.startswith("PORT="):
            # Split once so the value is not truncated at a later "=".
            value = var.split("=", 1)[1]
            if value:
                # Keep the same type as the integer default when possible.
                info["port"] = int(value) if value.isdigit() else value
        elif var.startswith("HOST="):
            value = var.split("=", 1)[1]
            if value:
                info["host"] = value
    state = c.attrs.get("State", {})
    status = state.get("Status")
    if status:
        info["status"] = status
    container_name = c.attrs.get("Name")
    if container_name:
        info["container_name_on_host"] = container_name
    startup_time = state.get("StartedAt")
    if startup_time:
        info["startup_time_ts"] = startup_time
    return info
def get_container_by_id(id):
    """
    Look up a discovered inference server container by its id

    Args:
        id (string): The id of the container

    Returns:
        container: The first container whose ``id`` equals the argument,
            or None when no discovered container matches
    """
    matches = (
        candidate for candidate in get_inference_containers() if candidate.id == id
    )
    return next(matches, None)
def get_container_ids():
    """
    Collect the ids of all discovered inference server containers

    Returns:
        list: A list of container ids, in discovery order
    """
    ids = []
    for container in get_inference_containers():
        ids.append(container.id)
    return ids