import base64
import time
from dataclasses import dataclass
from datetime import datetime

import requests
import docker

from inference.core.cache import cache
from inference.core.env import METRICS_INTERVAL
from inference.core.logger import logger
from inference.core.utils.image_utils import load_image_rgb
from inference.enterprise.device_manager.helpers import get_cache_model_items


class InferServerContainer:
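    """
    Represents a Roboflow inference server container discovered on the host.

    Wraps the underlying Docker SDK container object and exposes lifecycle
    operations (kill, restart, stop, start) along with inspection helpers.
    """
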
    status: str
    id: str
    port: int
    host: str
    startup_time: float
    version: str

    def __init__(self, docker_container, details):
        self.container = docker_container
        self.status = details.get("status")
        self.id = details.get("uuid")
        self.port = details.get("port")
        self.host = details.get("host")
        self.version = details.get("version")
        # Guard against a missing startup timestamp before parsing it
        t = details.get("startup_time_ts")
        self.startup_time = (
            datetime.strptime(t.split(".")[0], "%Y-%m-%dT%H:%M:%S").timestamp()
            if t is not None
            else datetime.now().timestamp()
        )

    def kill(self):
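        """
        Kill this container via the Docker SDK.

        Returns:
            tuple: (True, None) on success, (False, None) if the kill failed.
        """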
        try:
            self.container.kill()
            return True, None
        except Exception as e:
            logger.error(e)
            return False, None

    def restart(self):
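        """
        Restart this container via the Docker SDK.

        Returns:
            tuple: (True, None) on success, (False, None) if the restart failed.
        """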
        try:
            self.container.restart()
            return True, None
        except Exception as e:
            logger.error(e)
            return False, None

    def stop(self):
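        """
        Stop this container via the Docker SDK.

        Returns:
            tuple: (True, None) on success, (False, None) if the stop failed.
        """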
        try:
            self.container.stop()
            return True, None
        except Exception as e:
            logger.error(e)
            return False, None

    def start(self):
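        """
        Start this container via the Docker SDK.

        Returns:
            tuple: (True, None) on success, (False, None) if the start failed.
        """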
        try:
            self.container.start()
            return True, None
        except Exception as e:
            logger.error(e)
            return False, None

    def inspect(self):
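        """
        Query the inference server's /info endpoint for runtime details.

        Returns:
            tuple: (True, info) with the parsed JSON response on success,
            (False, None) if the request failed.
        """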
        try:
            # A timeout keeps this call from hanging if the server is unresponsive
            info = requests.get(
                f"http://{self.host}:{self.port}/info", timeout=3
            ).json()
            return True, info
        except Exception as e:
            logger.error(e)
            return False, None

    def snapshot(self):
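        """
        Collect the latest inferred images for this container.

        Returns:
            tuple: (True, snapshot) on success, where snapshot contains a
            "container_id" key plus the per-model entries returned by
            get_latest_inferred_images(); (False, None) on failure.
        """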
        try:
            snapshot = self.get_latest_inferred_images()
            snapshot.update({"container_id": self.id})
            return True, snapshot
        except Exception as e:
            logger.error(e)
            return False, None

    def get_latest_inferred_images(self, max=4):
        """
        Retrieve the latest inferred images and associated information for this container.

        This method fetches the most recent inferred images within the time interval
        defined by METRICS_INTERVAL.

        Args:
            max (int, optional): The maximum number of inferred images to retrieve.
                Defaults to 4.

        Returns:
            dict: A dictionary where each key represents a model ID associated with
                this container, and the corresponding value is a list of dictionaries
                containing information about the latest inferred images. Each
                dictionary has the following keys:
                - "image" (str): The base64-encoded image data.
                - "dimensions" (dict): Image dimensions (width and height).
                - "predictions" (list): A list of predictions or results associated
                  with the image.

        Notes:
            - This method uses the global constant METRICS_INTERVAL to specify the
              time interval.
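
        Example:
            A hypothetical return value (model ID and values are illustrative):

                {
                    "some-project/1": [
                        {
                            "image": "<base64-encoded image>",
                            "dimensions": {"width": 640, "height": 480},
                            "predictions": [{"class": "car", "confidence": 0.92}],
                        }
                    ]
                }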
""" | |
now = time.time() | |
start = now - METRICS_INTERVAL | |
api_keys = get_cache_model_items().get(self.id, dict()).keys() | |
model_ids = [] | |
for api_key in api_keys: | |
mids = get_cache_model_items().get(self.id, dict()).get(api_key, []) | |
model_ids.extend(mids) | |
num_images = 0 | |
latest_inferred_images = dict() | |
for model_id in model_ids: | |
if num_images >= max: | |
break | |
latest_reqs = cache.zrangebyscore( | |
f"inference:{self.id}:{model_id}", min=start, max=now | |
) | |
for req in latest_reqs: | |
images = req["request"]["image"] | |
image_dims = req.get("response", {}).get("image", dict()) | |
predictions = req.get("response", {}).get("predictions", []) | |
if images is None or len(images) == 0: | |
continue | |
if type(images) is not list: | |
images = [images] | |
                for image in images:
                    # Stop once the requested maximum number of images is reached
                    if num_images >= max:
                        break
                    value = None
                    if image["type"] == "base64":
                        value = image["value"]
                    else:
                        loaded_image = load_image_rgb(image)
                        image_bytes = loaded_image.tobytes()
                        image_base64 = base64.b64encode(image_bytes).decode("utf-8")
                        value = image_base64
                    if latest_inferred_images.get(model_id) is None:
                        latest_inferred_images[model_id] = []
                    inference = dict(
                        image=value, dimensions=image_dims, predictions=predictions
                    )
                    latest_inferred_images[model_id].append(inference)
                    num_images += 1
        return latest_inferred_images

    def get_startup_config(self):
        """
        Get the startup configuration for this container.

        Returns:
            dict: A dictionary containing the startup configuration for this container.
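
        Example:
            A hypothetical return value (values are illustrative):

                {
                    "env": ["PORT=9001", "HOST=0.0.0.0"],
                    "port_bindings": {"9001/tcp": [{"HostIp": "", "HostPort": "9001"}]},
                    "detach": False,
                    "image": "roboflow/roboflow-inference-server-cpu:latest",
                    "privileged": False,
                    "labels": {},
                }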
""" | |
env_vars = self.container.attrs.get("Config", {}).get("Env", {}) | |
port_bindings = self.container.attrs.get("HostConfig", {}).get( | |
"PortBindings", {} | |
) | |
detached = self.container.attrs.get("HostConfig", {}).get("Detached", False) | |
image = self.container.attrs.get("Config", {}).get("Image", "") | |
privileged = self.container.attrs.get("HostConfig", {}).get("Privileged", False) | |
labels = self.container.attrs.get("Config", {}).get("Labels", {}) | |
        env = []
        for var in env_vars:
            # Split on the first "=" only so values that contain "=" are preserved
            name, value = var.split("=", 1)
            env.append(f"{name}={value}")
        return {
            "env": env,
            "port_bindings": port_bindings,
            "detach": detached,
            "image": image,
            "privileged": privileged,
            "labels": labels,
            # TODO: add device requests
        }


def is_inference_server_container(container):
    """
    Checks if a container is an inference server container.

    Args:
        container (any): A container object from the Docker SDK

    Returns:
        boolean: True if the container is an inference server container, False otherwise
    """
    image_tags = container.image.tags
    for t in image_tags:
        if t.startswith("roboflow/roboflow-inference-server"):
            return True
    return False


def get_inference_containers():
    """
    Discovers inference server containers running on the host
    and parses their information into a list of InferServerContainer objects.
    """
    client = docker.from_env()
    containers = client.containers.list()
    inference_containers = []
    for c in containers:
        if is_inference_server_container(c):
            details = parse_container_info(c)
            info = {}
            try:
                info = requests.get(
                    f"http://{details['host']}:{details['port']}/info", timeout=3
                ).json()
            except Exception as e:
                logger.error(f"Failed to get info from container {c.id} {details} {e}")
            details.update(info)
            infer_container = InferServerContainer(c, details)
            # Only add the container if it has not already been recorded
            if not any(ic.id == infer_container.id for ic in inference_containers):
                inference_containers.append(infer_container)
    return inference_containers


def parse_container_info(c):
    """
    Parses the container information into a dictionary.

    Args:
        c (any): Docker SDK Container object

    Returns:
        dict: A dictionary containing the container information
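
    Example:
        A hypothetical return value (values are illustrative):

            {
                "container_id": "f2a9c1d0e5b4",
                "port": 9001,
                "host": "0.0.0.0",
                "status": "running",
                "container_name_on_host": "/inference-server",
                "startup_time_ts": "2024-01-01T00:00:00.000000000Z",
            }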
""" | |
env = c.attrs.get("Config", {}).get("Env", {}) | |
info = {"container_id": c.id, "port": 9001, "host": "0.0.0.0"} | |
for var in env: | |
if var.startswith("PORT="): | |
info["port"] = var.split("=")[1] | |
elif var.startswith("HOST="): | |
info["host"] = var.split("=")[1] | |
status = c.attrs.get("State", {}).get("Status") | |
if status: | |
info["status"] = status | |
container_name = c.attrs.get("Name") | |
if container_name: | |
info["container_name_on_host"] = container_name | |
startup_time = c.attrs.get("State", {}).get("StartedAt") | |
if startup_time: | |
info["startup_time_ts"] = startup_time | |
return info | |


def get_container_by_id(id):
    """
    Gets an inference server container by its id.

    Args:
        id (string): The id of the container

    Returns:
        container: The container object if found, None otherwise
    """
    containers = get_inference_containers()
    for c in containers:
        if c.id == id:
            return c
    return None


def get_container_ids():
    """
    Gets the ids of the inference server containers.

    Returns:
        list: A list of container ids
    """
    containers = get_inference_containers()
    return [c.id for c in containers]
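

# Usage sketch (not part of the module API): assumes a local Docker daemon with at
# least one roboflow/roboflow-inference-server container running; the logging below
# is illustrative only.
#
#   containers = get_inference_containers()
#   for container in containers:
#       ok, info = container.inspect()
#       if ok:
#           logger.info(f"Inference server {container.id} reports: {info}")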