File size: 3,084 Bytes
8a5e8bc
 
 
 
 
 
 
 
 
 
 
47289f8
8a5e8bc
 
 
47289f8
 
 
8a5e8bc
 
47289f8
8a5e8bc
 
 
 
47289f8
 
8a5e8bc
 
 
47289f8
 
8a5e8bc
 
47289f8
8a5e8bc
 
47289f8
8a5e8bc
 
 
 
47289f8
 
8a5e8bc
 
 
 
47289f8
8a5e8bc
 
47289f8
 
8a5e8bc
 
47289f8
 
8a5e8bc
 
 
 
 
 
 
 
47289f8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8a5e8bc
8dd4d48
8a5e8bc
 
 
 
 
 
 
 
47289f8
8a5e8bc
47289f8
8dd4d48
 
 
 
8a5e8bc
971ac20
8a5e8bc
 
 
47289f8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
from toolbox import get_conf
from toolbox import set_conf
from toolbox import set_multi_conf
from toolbox import get_plugin_handle
from toolbox import get_plugin_default_kwargs
from toolbox import get_chat_handle
from toolbox import get_chat_default_kwargs
from functools import wraps
import sys
import os


def chat_to_markdown_str(chat):
    result = ""
    for i, cc in enumerate(chat):
        result += f"\n\n{cc[0]}\n\n{cc[1]}"
        if i != len(chat) - 1:
            result += "\n\n---"
    return result


def silence_stdout(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        _original_stdout = sys.stdout
        sys.stdout = open(os.devnull, "w")
        sys.stdout.reconfigure(encoding="utf-8")
        for q in func(*args, **kwargs):
            sys.stdout = _original_stdout
            yield q
            sys.stdout = open(os.devnull, "w")
            sys.stdout.reconfigure(encoding="utf-8")
        sys.stdout.close()
        sys.stdout = _original_stdout

    return wrapper


def silence_stdout_fn(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        _original_stdout = sys.stdout
        sys.stdout = open(os.devnull, "w")
        sys.stdout.reconfigure(encoding="utf-8")
        result = func(*args, **kwargs)
        sys.stdout.close()
        sys.stdout = _original_stdout
        return result

    return wrapper


class VoidTerminal:
    def __init__(self) -> None:
        pass


vt = VoidTerminal()
vt.get_conf = silence_stdout_fn(get_conf)
vt.set_conf = silence_stdout_fn(set_conf)
vt.set_multi_conf = silence_stdout_fn(set_multi_conf)
vt.get_plugin_handle = silence_stdout_fn(get_plugin_handle)
vt.get_plugin_default_kwargs = silence_stdout_fn(get_plugin_default_kwargs)
vt.get_chat_handle = silence_stdout_fn(get_chat_handle)
vt.get_chat_default_kwargs = silence_stdout_fn(get_chat_default_kwargs)
vt.chat_to_markdown_str = chat_to_markdown_str
(
    proxies,
    WEB_PORT,
    LLM_MODEL,
    CONCURRENT_COUNT,
    AUTHENTICATION,
    CHATBOT_HEIGHT,
    LAYOUT,
    API_KEY,
) = vt.get_conf(
    "proxies",
    "WEB_PORT",
    "LLM_MODEL",
    "CONCURRENT_COUNT",
    "AUTHENTICATION",
    "CHATBOT_HEIGHT",
    "LAYOUT",
    "API_KEY",
)


def plugin_test(main_input, plugin, advanced_arg=None, debug=True):
    from rich.live import Live
    from rich.markdown import Markdown

    vt.set_conf(key="API_KEY", value=API_KEY)
    vt.set_conf(key="LLM_MODEL", value=LLM_MODEL)

    plugin = vt.get_plugin_handle(plugin)
    plugin_kwargs = vt.get_plugin_default_kwargs()
    plugin_kwargs["main_input"] = main_input
    if advanced_arg is not None:
        plugin_kwargs["plugin_kwargs"] = advanced_arg
    if debug:
        my_working_plugin = (plugin)(**plugin_kwargs)
    else:
        my_working_plugin = silence_stdout(plugin)(**plugin_kwargs)

    with Live(Markdown(""), auto_refresh=False, vertical_overflow="visible") as live:
        for cookies, chat, hist, msg in my_working_plugin:
            md_str = vt.chat_to_markdown_str(chat)
            md = Markdown(md_str)
            live.update(md, refresh=True)