File size: 1,707 Bytes
a8b3f00
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import subprocess
import time

from core.rag.datasource.vdb.couchbase.couchbase_vector import CouchbaseConfig, CouchbaseVector
from tests.integration_tests.vdb.test_vector_store import (
    AbstractVectorTest,
    get_example_text,
    setup_mock_redis,
)


def wait_for_healthy_container(service_name="couchbase-server", timeout=300):
    start_time = time.time()
    while time.time() - start_time < timeout:
        result = subprocess.run(
            ["docker", "inspect", "--format", "{{.State.Health.Status}}", service_name], capture_output=True, text=True
        )
        if result.stdout.strip() == "healthy":
            print(f"{service_name} is healthy!")
            return True
        else:
            print(f"Waiting for {service_name} to be healthy...")
        time.sleep(10)
    raise TimeoutError(f"{service_name} did not become healthy in time")


class CouchbaseTest(AbstractVectorTest):
    def __init__(self):
        super().__init__()
        self.vector = CouchbaseVector(
            collection_name=self.collection_name,
            config=CouchbaseConfig(
                connection_string="couchbase://127.0.0.1",
                user="Administrator",
                password="password",
                bucket_name="Embeddings",
                scope_name="_default",
            ),
        )

    def search_by_vector(self):
        # brief sleep to ensure document is indexed
        time.sleep(5)
        hits_by_vector = self.vector.search_by_vector(query_vector=self.example_embedding)
        assert len(hits_by_vector) == 1


def test_couchbase(setup_mock_redis):
    wait_for_healthy_container("couchbase-server", timeout=60)
    CouchbaseTest().run_all_tests()