Spaces:
Sleeping
Sleeping
File size: 3,470 Bytes
287a0bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# Tests for the CustomResourceMemberlistProvider
import threading
from chromadb.test.conftest import skip_if_not_cluster
from kubernetes import client, config
from chromadb.config import System, Settings
from chromadb.segment.distributed import Memberlist
from chromadb.segment.impl.distributed.segment_directory import (
CustomResourceMemberlistProvider,
KUBERNETES_GROUP,
KUBERNETES_NAMESPACE,
)
import time
# Used for testing to update the memberlist CRD
def update_memberlist(n: int, memberlist_name: str = "test-memberlist") -> Memberlist:
    """Patch the named memberlist custom object so that it lists ``n`` members.

    The members are given the URLs ``10.0.0.1`` through ``10.0.0.n``. The
    object is patched in ``KUBERNETES_NAMESPACE`` under ``KUBERNETES_GROUP``.

    Args:
        n: Number of members to write into the memberlist.
        memberlist_name: Name of the custom object to patch.

    Returns:
        The member URLs that were written, in the order they were generated.
    """
    config.load_config()

    # Generate the URLs once; the patch body and the return value share them.
    urls = [f"10.0.0.{octet}" for octet in range(1, n + 1)]
    patch = {
        "kind": "MemberList",
        "metadata": {"name": memberlist_name},
        "spec": {"members": [{"url": url} for url in urls]},
    }

    custom_objects = client.CustomObjectsApi()
    custom_objects.patch_namespaced_custom_object(
        group=KUBERNETES_GROUP,
        version="v1",
        namespace=KUBERNETES_NAMESPACE,
        plural="memberlists",
        name=memberlist_name,
        body=patch,
    )
    return urls
def compare_memberlists(m1: Memberlist, m2: Memberlist) -> bool:
    """Return True when both memberlists contain the same members.

    Order is ignored: each list is sorted before the comparison, so
    duplicates still have to match in count.
    """
    first_sorted = sorted(m1)
    second_sorted = sorted(m2)
    return first_sorted == second_sorted
@skip_if_not_cluster()
def test_can_get_memberlist() -> None:
    """The provider reports the members written to the memberlist object.

    Assumes a memberlist named "test-memberlist" has already been created
    in the cluster; this test only patches it.
    """
    system = System(Settings(allow_reset=True))
    provider = system.instance(CustomResourceMemberlistProvider)
    provider.set_memberlist_name("test-memberlist")
    system.reset_state()
    system.start()
    try:
        # Update the memberlist
        members = update_memberlist(3)

        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)
    finally:
        # Always stop the system, even when the patch or the assertion fails,
        # so a failing run does not leave a started system behind for the
        # tests that follow.
        system.stop()
@skip_if_not_cluster()
def test_can_update_memberlist_multiple_times() -> None:
    """The provider follows successive updates to the memberlist object.

    Assumes a memberlist named "test-memberlist" has already been created
    in the cluster; this test only patches it.
    """
    system = System(Settings(allow_reset=True))
    provider = system.instance(CustomResourceMemberlistProvider)
    provider.set_memberlist_name("test-memberlist")
    system.reset_state()
    system.start()
    try:
        # Update the memberlist
        members = update_memberlist(3)

        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)

        # Update the memberlist again
        members = update_memberlist(5)

        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)
    finally:
        # Always stop the system, even when a patch or an assertion fails,
        # so a failing run does not leave a started system behind for the
        # tests that follow.
        system.stop()
@skip_if_not_cluster()
def test_stop_memberlist_kills_thread() -> None:
    """Starting the system adds one thread and stopping it removes that thread.

    Assumes a memberlist named "test-memberlist" has already been created
    in the cluster; this test only patches it.
    """
    system = System(Settings(allow_reset=True))
    provider = system.instance(CustomResourceMemberlistProvider)
    provider.set_memberlist_name("test-memberlist")
    system.reset_state()

    # Compare against the thread count just before start rather than against
    # absolute numbers, so unrelated threads already alive in the process
    # (test-runner plugins, leftovers from earlier tests) cannot break the check.
    baseline_threads = threading.active_count()
    system.start()
    try:
        # Make sure a background thread is running
        assert threading.active_count() == baseline_threads + 1

        # Update the memberlist
        members = update_memberlist(3)

        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)
    finally:
        # Stop the system; runs on failure too so the thread is not leaked.
        system.stop()

    # Check that the thread count is back to what it was before start
    assert threading.active_count() == baseline_threads
|