Spaces:
Sleeping
Sleeping
# Tests the CustomResourceMemberlist provider | |
import threading | |
from chromadb.test.conftest import skip_if_not_cluster | |
from kubernetes import client, config | |
from chromadb.config import System, Settings | |
from chromadb.segment.distributed import Memberlist | |
from chromadb.segment.impl.distributed.segment_directory import ( | |
CustomResourceMemberlistProvider, | |
KUBERNETES_GROUP, | |
KUBERNETES_NAMESPACE, | |
) | |
import time | |
# Used for testing to update the memberlist CRD | |
def update_memberlist(n: int, memberlist_name: str = "test-memberlist") -> Memberlist: | |
config.load_config() | |
api_instance = client.CustomObjectsApi() | |
members = [{"url": f"10.0.0.{i}"} for i in range(1, n + 1)] | |
body = { | |
"kind": "MemberList", | |
"metadata": {"name": memberlist_name}, | |
"spec": {"members": members}, | |
} | |
_ = api_instance.patch_namespaced_custom_object( | |
group=KUBERNETES_GROUP, | |
version="v1", | |
namespace=KUBERNETES_NAMESPACE, | |
plural="memberlists", | |
name=memberlist_name, | |
body=body, | |
) | |
return [m["url"] for m in members] | |
def compare_memberlists(m1: Memberlist, m2: Memberlist) -> bool: | |
return sorted(m1) == sorted(m2) | |
def test_can_get_memberlist() -> None: | |
# This test assumes that the memberlist CRD is already created with the name "test-memberlist" | |
system = System(Settings(allow_reset=True)) | |
provider = system.instance(CustomResourceMemberlistProvider) | |
provider.set_memberlist_name("test-memberlist") | |
system.reset_state() | |
system.start() | |
# Update the memberlist | |
members = update_memberlist(3) | |
# Check that the memberlist is updated after a short delay | |
time.sleep(2) | |
assert compare_memberlists(provider.get_memberlist(), members) | |
system.stop() | |
def test_can_update_memberlist_multiple_times() -> None: | |
# This test assumes that the memberlist CRD is already created with the name "test-memberlist" | |
system = System(Settings(allow_reset=True)) | |
provider = system.instance(CustomResourceMemberlistProvider) | |
provider.set_memberlist_name("test-memberlist") | |
system.reset_state() | |
system.start() | |
# Update the memberlist | |
members = update_memberlist(3) | |
# Check that the memberlist is updated after a short delay | |
time.sleep(2) | |
assert compare_memberlists(provider.get_memberlist(), members) | |
# Update the memberlist again | |
members = update_memberlist(5) | |
# Check that the memberlist is updated after a short delay | |
time.sleep(2) | |
assert compare_memberlists(provider.get_memberlist(), members) | |
system.stop() | |
def test_stop_memberlist_kills_thread() -> None: | |
# This test assumes that the memberlist CRD is already created with the name "test-memberlist" | |
system = System(Settings(allow_reset=True)) | |
provider = system.instance(CustomResourceMemberlistProvider) | |
provider.set_memberlist_name("test-memberlist") | |
system.reset_state() | |
system.start() | |
# Make sure a background thread is running | |
assert len(threading.enumerate()) == 2 | |
# Update the memberlist | |
members = update_memberlist(3) | |
# Check that the memberlist is updated after a short delay | |
time.sleep(2) | |
assert compare_memberlists(provider.get_memberlist(), members) | |
# Stop the system | |
system.stop() | |
# Check to make sure only one thread is running | |
assert len(threading.enumerate()) == 1 | |