File size: 5,281 Bytes
ed3fe33
58c39e0
ed3fe33
 
8f809e2
3573a39
ed3fe33
58c39e0
ed3fe33
 
 
 
 
 
 
 
 
 
 
8e32a09
ed3fe33
 
3573a39
92e2a79
 
ed3fe33
 
8f809e2
 
 
1c00552
92e2a79
8f809e2
 
92e2a79
8f809e2
 
 
 
3573a39
 
8f809e2
1c00552
92e2a79
 
8f809e2
3573a39
ed3fe33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fc361e6
ed3fe33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8e32a09
ed3fe33
8f114e2
ed3fe33
 
6811286
ed3fe33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dcc9315
 
 
ed3fe33
77c3f2f
ed3fe33
77c3f2f
acec2fa
77c3f2f
acec2fa
 
 
 
 
 
 
 
77c3f2f
 
 
ed3fe33
 
 
3573a39
92e2a79
 
8f809e2
 
 
 
1c00552
92e2a79
3573a39
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
import json
import logging
import os
import subprocess
import threading
import time
from pathlib import Path

import pipe
from app_env import (
    HF_GSK_HUB_HF_TOKEN,
    HF_GSK_HUB_KEY,
    HF_GSK_HUB_PROJECT_KEY,
    HF_GSK_HUB_UNLOCK_TOKEN,
    HF_GSK_HUB_URL,
    HF_REPO_ID,
    HF_SPACE_ID,
    HF_WRITE_TOKEN,
)
from io_utils import LOG_FILE, get_submitted_yaml_path, write_log_to_user_file
from isolated_env import prepare_venv
from leaderboard import LEADERBOARD

is_running = False

logger = logging.getLogger(__file__)


def start_process_run_job():
    try:
        logging.debug("Running jobs in thread")
        global thread, is_running
        thread = threading.Thread(target=run_job)
        thread.daemon = True
        is_running = True
        thread.start()

    except Exception as e:
        print("Failed to start thread: ", e)


def stop_thread():
    logging.debug("Stop thread")
    global is_running
    is_running = False


def prepare_env_and_get_command(
    m_id,
    d_id,
    config,
    split,
    inference_token,
    uid,
    label_mapping,
    feature_mapping,
):
    leaderboard_dataset = None
    if os.environ.get("SPACE_ID") == "giskardai/giskard-evaluator":
        leaderboard_dataset = LEADERBOARD

    executable = "giskard_scanner"
    try:
        # Copy the current requirements (might be changed)
        with open("requirements.txt", "r") as f:
            executable = prepare_venv(
                uid,
                "\n".join(f.readlines()),
            )
        logger.info(f"Using {executable} as executable")
    except Exception as e:
        logger.warning(f"Create env failed due to {e}, using the current env as fallback.")
        executable = "giskard_scanner"

    command = [
        executable,
        "--loader",
        "huggingface",
        "--model",
        m_id,
        "--dataset",
        d_id,
        "--dataset_config",
        config,
        "--dataset_split",
        split,
        "--output_format",
        "markdown",
        "--output_portal",
        "huggingface",
        "--feature_mapping",
        json.dumps(feature_mapping),
        "--label_mapping",
        json.dumps(label_mapping),
        "--scan_config",
        get_submitted_yaml_path(uid),
        "--inference_type",
        "hf_inference_api",
        "--inference_api_token",
        inference_token,
        "--persist_scan",
    ]
    # The token to publish post
    if os.environ.get(HF_WRITE_TOKEN):
        command.append("--hf_token")
        command.append(os.environ.get(HF_WRITE_TOKEN))

    # The repo to publish for ranking
    if leaderboard_dataset:
        command.append("--leaderboard_dataset")
        command.append(leaderboard_dataset)

    # The info to upload to Giskard hub
    if os.environ.get(HF_GSK_HUB_KEY):
        command.append("--giskard_hub_api_key")
        command.append(os.environ.get(HF_GSK_HUB_KEY))
        if os.environ.get(HF_GSK_HUB_URL):
            command.append("--giskard_hub_url")
            command.append(os.environ.get(HF_GSK_HUB_URL))
        if os.environ.get(HF_GSK_HUB_PROJECT_KEY):
            command.append("--giskard_hub_project_key")
            command.append(os.environ.get(HF_GSK_HUB_PROJECT_KEY))
        if os.environ.get(HF_GSK_HUB_HF_TOKEN):
            command.append("--giskard_hub_hf_token")
            command.append(os.environ.get(HF_GSK_HUB_HF_TOKEN))
        if os.environ.get(HF_GSK_HUB_UNLOCK_TOKEN):
            command.append("--giskard_hub_unlock_token")
            command.append(os.environ.get(HF_GSK_HUB_UNLOCK_TOKEN))

    eval_str = f"[{m_id}]<{d_id}({config}, {split} set)>"

    write_log_to_user_file(
        uid,
        f"Start local evaluation on {eval_str}. Please wait for your job to start...\n",
    )

    return command


def save_job_to_pipe(task_id, job, description, lock):
    with lock:
        pipe.jobs.append((task_id, job, description))


def pop_job_from_pipe():
    if len(pipe.jobs) == 0:
        return
    job_info = pipe.jobs.pop()
    pipe.current = job_info[2]
    task_id = job_info[0]

    # Link to LOG_FILE
    log_file_path = Path(LOG_FILE)
    if log_file_path.exists():
        log_file_path.unlink()
    os.symlink(f"./tmp/{task_id}.log", LOG_FILE)

    write_log_to_user_file(task_id, f"Running job id {task_id}\n")
    command = prepare_env_and_get_command(*job_info[1])

    with open(f"./tmp/{task_id}.log", "a") as log_file:
        return_code = None
        p = subprocess.Popen(command, stdout=log_file, stderr=subprocess.STDOUT)
        while pipe.current and not return_code:
            # Wait for finishing
            return_code = p.wait(timeout=1)

        if not pipe.current:
            # Job interrupted before finishing
            p.terminate()
            p.kill()

            log_file.write(f"\nJob interrupted by admin at {time.asctime()}\n")

        if return_code:
            log_file.write(f"\nJob finished with {return_code} at {time.asctime()}\n")

    pipe.current = None


def run_job():
    global is_running
    while is_running:
        try:
            pop_job_from_pipe()
            time.sleep(10)
        except KeyboardInterrupt:
            logging.debug("KeyboardInterrupt stop background thread")
            is_running = False
            break