# Source: chroma/chromadb/test/segment/distributed/test_memberlist_provider.py
# Commit 287a0bc by badalsahani: "feat: chroma initial deploy"
# Tests the CustomResourceMemberlist provider
import threading
from chromadb.test.conftest import skip_if_not_cluster
from kubernetes import client, config
from chromadb.config import System, Settings
from chromadb.segment.distributed import Memberlist
from chromadb.segment.impl.distributed.segment_directory import (
CustomResourceMemberlistProvider,
KUBERNETES_GROUP,
KUBERNETES_NAMESPACE,
)
import time
# Used for testing to update the memberlist CRD
def update_memberlist(n: int, memberlist_name: str = "test-memberlist") -> Memberlist:
    """Patch the memberlist custom resource so that it lists ``n`` members.

    The members are given the URLs ``10.0.0.1`` through ``10.0.0.<n>``.

    Args:
        n: Number of members to write into the resource.
        memberlist_name: Name of the memberlist custom object to patch.

    Returns:
        The member URLs that were written, in ascending order of their
        last octet.
    """
    # The Kubernetes client configuration must be loaded before the API
    # client is constructed.
    config.load_config()

    urls = [f"10.0.0.{index}" for index in range(1, n + 1)]
    patch = {
        "kind": "MemberList",
        "metadata": {"name": memberlist_name},
        "spec": {"members": [{"url": url} for url in urls]},
    }

    custom_objects = client.CustomObjectsApi()
    custom_objects.patch_namespaced_custom_object(
        group=KUBERNETES_GROUP,
        version="v1",
        namespace=KUBERNETES_NAMESPACE,
        plural="memberlists",
        name=memberlist_name,
        body=patch,
    )
    return urls
def compare_memberlists(m1: Memberlist, m2: Memberlist) -> bool:
    """Return True when both memberlists hold the same members.

    The comparison ignores ordering: each list is sorted before the two are
    checked for equality, so duplicates still count.
    """
    ordered_first = sorted(m1)
    ordered_second = sorted(m2)
    return ordered_first == ordered_second
@skip_if_not_cluster()
def test_can_get_memberlist() -> None:
    """Check that the provider reports a memberlist written to the cluster.

    This test assumes that the memberlist CRD is already created with the
    name "test-memberlist".
    """
    system = System(Settings(allow_reset=True))
    provider = system.instance(CustomResourceMemberlistProvider)
    provider.set_memberlist_name("test-memberlist")
    system.reset_state()
    system.start()
    try:
        # Update the memberlist
        members = update_memberlist(3)
        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)
    finally:
        # Stop the system even when the update or the assertion fails, so
        # nothing started by system.start() outlives this test.
        system.stop()
@skip_if_not_cluster()
def test_can_update_memberlist_multiple_times() -> None:
    """Check that the provider follows successive memberlist updates.

    This test assumes that the memberlist CRD is already created with the
    name "test-memberlist".
    """
    system = System(Settings(allow_reset=True))
    provider = system.instance(CustomResourceMemberlistProvider)
    provider.set_memberlist_name("test-memberlist")
    system.reset_state()
    system.start()
    try:
        # Update the memberlist
        members = update_memberlist(3)
        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)

        # Update the memberlist again
        members = update_memberlist(5)
        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)
    finally:
        # Stop the system even when an update or an assertion fails, so
        # nothing started by system.start() outlives this test.
        system.stop()
@skip_if_not_cluster()
def test_stop_memberlist_kills_thread() -> None:
    """Check that stopping the system ends the thread that starting it created.

    This test assumes that the memberlist CRD is already created with the
    name "test-memberlist".
    """
    system = System(Settings(allow_reset=True))
    provider = system.instance(CustomResourceMemberlistProvider)
    provider.set_memberlist_name("test-memberlist")
    system.reset_state()

    # Measure against the number of threads alive before starting rather
    # than a hard-coded total, so unrelated threads in the test process do
    # not cause a false failure.
    threads_before_start = threading.active_count()
    system.start()
    try:
        # Make sure a background thread is running
        assert threading.active_count() == threads_before_start + 1
        # Update the memberlist
        members = update_memberlist(3)
        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)
    finally:
        # Stop the system even when a check above fails, so the background
        # thread is not left running for later tests.
        system.stop()

    # Check that the thread count is back to what it was before the start
    assert threading.active_count() == threads_before_start