File size: 3,470 Bytes
287a0bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# Tests the CustomResourceMemberlist provider
import threading
from chromadb.test.conftest import skip_if_not_cluster
from kubernetes import client, config
from chromadb.config import System, Settings
from chromadb.segment.distributed import Memberlist
from chromadb.segment.impl.distributed.segment_directory import (
    CustomResourceMemberlistProvider,
    KUBERNETES_GROUP,
    KUBERNETES_NAMESPACE,
)
import time


# Used for testing to update the memberlist CRD
def update_memberlist(n: int, memberlist_name: str = "test-memberlist") -> Memberlist:
    """Patch the memberlist custom object so that it lists ``n`` members.

    Test helper. The members are given the URLs ``10.0.0.1`` through
    ``10.0.0.<n>``.

    Args:
        n: Number of members to write into the object's ``spec.members``.
        memberlist_name: Name of the memberlist custom object to patch.

    Returns:
        The member URLs that were sent in the patch, in generation order.
    """
    config.load_config()
    custom_objects = client.CustomObjectsApi()

    # Build the plain URL list first; it is both the return value and the
    # source for the ``{"url": ...}`` entries sent to the API.
    urls = [f"10.0.0.{host}" for host in range(1, n + 1)]
    patch = {
        "kind": "MemberList",
        "metadata": {"name": memberlist_name},
        "spec": {"members": [{"url": url} for url in urls]},
    }

    custom_objects.patch_namespaced_custom_object(
        group=KUBERNETES_GROUP,
        version="v1",
        namespace=KUBERNETES_NAMESPACE,
        plural="memberlists",
        name=memberlist_name,
        body=patch,
    )

    return urls


def compare_memberlists(m1: Memberlist, m2: Memberlist) -> bool:
    """Return True when both memberlists hold the same members, ignoring order.

    Duplicates are significant: each member must occur the same number of
    times in both lists.
    """
    left = list(m1)
    right = list(m2)
    # Sort copies so the callers' lists are left untouched.
    left.sort()
    right.sort()
    return left == right


@skip_if_not_cluster()
def test_can_get_memberlist() -> None:
    """Check that the provider reports the members written to the memberlist.

    This test assumes that the memberlist CRD is already created with the
    name "test-memberlist".
    """
    system = System(Settings(allow_reset=True))
    provider = system.instance(CustomResourceMemberlistProvider)
    provider.set_memberlist_name("test-memberlist")
    system.reset_state()
    system.start()

    # Always stop the system, even when an assertion fails; otherwise the
    # started system would be left running and could disturb later tests.
    try:
        # Update the memberlist
        members = update_memberlist(3)

        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)
    finally:
        system.stop()


@skip_if_not_cluster()
def test_can_update_memberlist_multiple_times() -> None:
    """Check that the provider follows two successive memberlist updates.

    This test assumes that the memberlist CRD is already created with the
    name "test-memberlist".
    """
    system = System(Settings(allow_reset=True))
    provider = system.instance(CustomResourceMemberlistProvider)
    provider.set_memberlist_name("test-memberlist")
    system.reset_state()
    system.start()

    # Always stop the system, even when an assertion fails; otherwise the
    # started system would be left running and could disturb later tests.
    try:
        # Update the memberlist
        members = update_memberlist(3)

        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)

        # Update the memberlist again
        members = update_memberlist(5)

        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)
    finally:
        system.stop()


@skip_if_not_cluster()
def test_stop_memberlist_kills_thread() -> None:
    """Check that stopping the system ends the extra thread seen after start.

    This test assumes that the memberlist CRD is already created with the
    name "test-memberlist".
    """
    # Measure thread counts relative to the state at test start rather than
    # against absolute numbers, so unrelated threads that are already alive
    # (test-runner plugins, other fixtures) do not cause false failures.
    baseline_threads = len(threading.enumerate())

    system = System(Settings(allow_reset=True))
    provider = system.instance(CustomResourceMemberlistProvider)
    provider.set_memberlist_name("test-memberlist")
    system.reset_state()
    system.start()

    # Always stop the system, even when an assertion fails, so a failure here
    # does not leave the started system running for subsequent tests.
    try:
        # Make sure exactly one additional background thread is running
        assert len(threading.enumerate()) == baseline_threads + 1

        # Update the memberlist
        members = update_memberlist(3)

        # Check that the memberlist is updated after a short delay
        time.sleep(2)
        assert compare_memberlists(provider.get_memberlist(), members)
    finally:
        # Stop the system
        system.stop()

    # Check that the thread count is back to where it was before the start
    assert len(threading.enumerate()) == baseline_threads