File size: 3,474 Bytes
5e9cd1d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from pathlib import Path
from pprint import pprint
import os
import shutil
import sys
# Directory two levels above this file; also used below to locate test files.
root_path = Path(__file__).parent.parent
# Make that directory importable before the `server.*` imports that follow
# (presumably it is the repository root - confirm against the project layout).
sys.path.append(str(root_path))

from server.knowledge_base.kb_service.base import KBServiceFactory
from server.knowledge_base.utils import get_kb_path, get_doc_path, KnowledgeFile
from server.knowledge_base.migrate import folder2db, prune_db_docs, prune_folder_files


# Fixture data: a dedicated knowledge base name and the files staged into it.
kb_name = "test_kb_for_migrate"
test_files = {
    "readme.md": str(root_path / "readme.md"),
}

# Paths for the knowledge base and its document folder, as reported by the
# project helpers.
kb_path = get_kb_path(kb_name)
doc_path = get_doc_path(kb_name)

# Make sure the document folder is present, then copy every source file in.
os.makedirs(doc_path, exist_ok=True)

for file_name, source_path in test_files.items():
    target_path = os.path.join(doc_path, file_name)
    shutil.copy(source_path, target_path)


def test_recreate_vs():
    """Run ``folder2db`` in "recreate_vs" mode and check the resulting contents.

    After the call the knowledge base must exist, list every file named in
    ``test_files``, and return at least one document per file, both when
    queried by file name and when queried by ``source`` metadata.
    """
    folder2db([kb_name], "recreate_vs")

    kb = KBServiceFactory.get_service_by_name(kb_name)
    assert kb and kb.exists()

    files = kb.list_files()
    print(files)
    for name in test_files:
        assert name in files

        # list docs based on file name
        docs = kb.list_docs(file_name=name)
        assert len(docs) > 0
        pprint(docs[0])
        for doc in docs:
            assert doc.metadata["source"] == name

        # list docs based on metadata
        docs = kb.list_docs(metadata={"source": name})
        assert len(docs) > 0
        for doc in docs:
            assert doc.metadata["source"] == name


def test_increment():
    """Clear the vector store, run ``folder2db`` in "increment" mode, and verify.

    The store must report no files and no docs right after ``clear_vs``; after
    the incremental run every file in ``test_files`` must be listed and have
    documents whose ``source`` metadata equals the file name.
    """
    service = KBServiceFactory.get_service_by_name(kb_name)
    service.clear_vs()
    assert service.list_files() == []
    assert service.list_docs() == []

    folder2db([kb_name], "increment")

    listed = service.list_files()
    print(listed)
    for file_name in test_files:
        assert file_name in listed

        file_docs = service.list_docs(file_name=file_name)
        assert len(file_docs) > 0
        pprint(file_docs[0])

        # every returned doc must point back at the file it was queried by
        assert all(doc.metadata["source"] == file_name for doc in file_docs)


def test_prune_db():
    """Check ``prune_db_docs`` after a file is removed from the doc folder.

    The first entry of ``test_files`` is deleted from the document folder
    before pruning and must no longer be listed or have docs afterwards.
    Every remaining entry (possibly none) must still be listed with docs.
    The deleted file is copied back at the end so later tests see the
    original folder contents.
    """
    # test_files may contain a single entry, so unpacking a fixed pair of
    # names would raise ValueError; take the first name and keep the rest.
    del_file, *keep_files = list(test_files)
    os.remove(os.path.join(doc_path, del_file))

    try:
        prune_db_docs([kb_name])

        kb = KBServiceFactory.get_service_by_name(kb_name)
        files = kb.list_files()
        print(files)
        assert del_file not in files

        docs = kb.list_docs(file_name=del_file)
        assert len(docs) == 0

        for keep_file in keep_files:
            assert keep_file in files
            docs = kb.list_docs(file_name=keep_file)
            assert len(docs) > 0
            pprint(docs[0])
    finally:
        # Put the file back even when an assertion above fails.
        shutil.copy(test_files[del_file], os.path.join(doc_path, del_file))


def test_prune_folder():
    """Check ``prune_folder_files`` after a file's docs are deleted from the KB.

    The docs of the first entry of ``test_files`` are deleted through the
    knowledge base service while the file itself stays in the document
    folder. After pruning, that file must be gone from the folder and every
    remaining entry (possibly none) must still be present.
    """
    # test_files may contain a single entry, so unpacking a fixed pair of
    # names would raise ValueError; take the first name and keep the rest.
    del_file, *keep_files = list(test_files)
    kb = KBServiceFactory.get_service_by_name(kb_name)

    # delete docs for file
    kb.delete_doc(KnowledgeFile(del_file, kb_name))
    files = kb.list_files()
    print(files)
    assert del_file not in files

    docs = kb.list_docs(file_name=del_file)
    assert len(docs) == 0

    for keep_file in keep_files:
        assert keep_file in files
        docs = kb.list_docs(file_name=keep_file)
        assert len(docs) > 0

    # the file itself must still be on disk before pruning
    assert os.path.isfile(os.path.join(doc_path, del_file))

    # prune folder
    prune_folder_files([kb_name])

    # check result
    assert not os.path.isfile(os.path.join(doc_path, del_file))
    for keep_file in keep_files:
        assert os.path.isfile(os.path.join(doc_path, keep_file))


def test_drop_kb():
    """Drop the test knowledge base and verify nothing is left behind.

    After ``drop_kb`` the service must report that it no longer exists, the
    knowledge base directory must be gone, and a fresh factory lookup by
    name must return ``None``.
    """
    service = KBServiceFactory.get_service_by_name(kb_name)
    service.drop_kb()
    assert not service.exists()
    assert not os.path.isdir(kb_path)

    # A new lookup after the drop must not find the knowledge base.
    service = KBServiceFactory.get_service_by_name(kb_name)
    assert service is None