File size: 2,405 Bytes
c71b819
 
 
 
 
2000f4c
c71b819
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b988f68
c71b819
b988f68
c71b819
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import os
import time
import threading
import subprocess
import gradio as gr
from utils import EN_US

# Chinese UI strings mapped to their English equivalents; `_L` looks texts up
# here when `EN_US` is truthy.
ZH2EN = {
    "状态栏": "Status",
    "执行结果": "Command output",
    "命令执行测试工具": "Command executor",
    "输入一个命令, 点击查看其执行结果": "Enter a command, and click to see its result",
    "执行超时时间": "Execution timeout",
}


def _L(zh_txt: str) -> str:
    """Return the UI text to display for the active language.

    Args:
        zh_txt: The Chinese source string, which is also the key in ``ZH2EN``.

    Returns:
        ``zh_txt`` unchanged when ``EN_US`` is falsy; otherwise its English
        translation from ``ZH2EN``. A string with no translation entry is
        returned as-is instead of raising ``KeyError``, so a missing entry
        shows untranslated text rather than breaking UI construction.
    """
    if not EN_US:
        return zh_txt

    return ZH2EN.get(zh_txt, zh_txt)


class Inject:
    """Run a shell command with a time limit and report its status and output.

    SECURITY NOTE: the command string is handed to the system shell verbatim
    (``shell=True``), so whoever supplies it can execute arbitrary commands.
    Only expose this to trusted users or inside a sandbox.
    """

    def __init__(self):
        self.output = ""  # output text of the most recent command
        self.status = "Success"  # "Success", "Killed", or an error message
        # No worker thread is started any more (subprocess enforces the time
        # limit itself); the attribute is kept so existing readers still work.
        self.thread: "threading.Thread | None" = None
        self.timeout = 10  # time limit applied by `infer`, in seconds
        self.delay = 0.1  # former polling interval; kept for compatibility

    def _execute(self, cmd: str, timeout=None) -> tuple[str, str]:
        """Run ``cmd`` in the shell and return ``(status, output)``.

        Args:
            cmd: Command line passed to the system shell.
            timeout: Time limit in seconds, or ``None`` for no limit.

        Returns:
            ``("Success", combined stdout/stderr)`` on a zero exit status,
            ``("Killed", timeout message)`` when the time limit is exceeded,
            or ``(" <error> ", "")`` for any other failure.
        """
        try:
            output = subprocess.check_output(
                cmd,
                shell=True,
                stderr=subprocess.STDOUT,
                text=True,
                # On expiry subprocess kills the child it started and waits
                # for it, so a timed-out command is not left running.
                # NOTE(review): processes spawned by the shell itself may
                # outlive it - confirm if full process-tree cleanup is needed.
                timeout=timeout,
            )

        except subprocess.TimeoutExpired:
            return "Killed", _L("执行超时时间") + f": {timeout}s"

        except Exception as e:
            return f" {e} ", ""

        return "Success", output

    def run(self, cmd: str):
        """Run ``cmd`` without a time limit and record the result on ``self``.

        On success only ``self.output`` is updated; on failure ``self.status``
        receives the error message and ``self.output`` is cleared.
        """
        status, output = self._execute(cmd)
        self.output = output
        if status != "Success":
            self.status = status

    def infer(self, cmd: str):
        """Run ``cmd`` limited to ``self.timeout`` seconds.

        Args:
            cmd: Command line passed to the system shell.

        Returns:
            A ``(status, output)`` tuple; the same values are also stored in
            ``self.status`` and ``self.output``.
        """
        status, output = self._execute(cmd, timeout=self.timeout)
        self.status, self.output = status, output
        # Return this call's own values rather than re-reading the instance
        # attributes, which another call may overwrite in the meantime.
        return status, output


def cmd_inject():
    """Build the Gradio interface for the command execution tool.

    Returns:
        A ``gr.Interface`` whose handler is ``Inject.infer``: one textbox for
        the command, and two outputs for the status and the command output.
    """
    runner = Inject()

    # Pre-fill a harmless directory listing suited to the host OS.
    default_cmd = "ls /"
    if os.name == "nt":
        default_cmd = "dir"

    command_box = gr.Textbox(
        label=_L("命令执行测试工具"),
        value=default_cmd,
        placeholder=_L("输入一个命令, 点击查看其执行结果"),
    )
    status_box = gr.Textbox(label=_L("状态栏"), show_copy_button=True)
    result_area = gr.TextArea(label=_L("执行结果"), show_copy_button=True)

    return gr.Interface(
        fn=runner.infer,
        inputs=command_box,
        outputs=[status_box, result_area],
        flagging_mode="never",
    )