"""Background job runner: queues scan jobs and executes them one at a time.

Jobs are stored in ``pipe.jobs`` and consumed by a daemon thread running
``run_job``. Each job is executed as a subprocess whose output is appended to
``./tmp/<task_id>.log``.
"""
import json
import logging
import os
import subprocess
import threading
import time
from pathlib import Path

import pipe
from app_env import (
    HF_GSK_HUB_HF_TOKEN,
    HF_GSK_HUB_KEY,
    HF_GSK_HUB_PROJECT_KEY,
    HF_GSK_HUB_UNLOCK_TOKEN,
    HF_GSK_HUB_URL,
    HF_REPO_ID,
    HF_SPACE_ID,
    HF_WRITE_TOKEN,
)
from io_utils import LOG_FILE, get_submitted_yaml_path, write_log_to_user_file
from isolated_env import prepare_venv
from leaderboard import LEADERBOARD

# Flag polled by ``run_job``; cleared by ``stop_thread`` to end the worker loop.
is_running = False

logger = logging.getLogger(__file__)


def start_process_run_job():
    """Start the daemon thread that executes queued jobs via ``run_job``.

    Any failure while creating or starting the thread is printed to stdout
    and not re-raised.
    """
    global thread, is_running
    try:
        logging.debug("Running jobs in thread")
        thread = threading.Thread(target=run_job)
        thread.daemon = True
        is_running = True
        thread.start()

    except Exception as e:
        print("Failed to start thread: ", e)


def stop_thread():
    """Ask the worker loop in ``run_job`` to exit after its current iteration."""
    logging.debug("Stop thread")
    global is_running
    is_running = False


def prepare_env_and_get_command(
    m_id,
    d_id,
    config,
    split,
    inference_token,
    uid,
    label_mapping,
    feature_mapping,
):
    """Prepare an execution environment and build the scanner command line.

    Args:
        m_id: Value passed to ``--model``.
        d_id: Value passed to ``--dataset``.
        config: Value passed to ``--dataset_config``.
        split: Value passed to ``--dataset_split``.
        inference_token: Value passed to ``--inference_api_token``.
        uid: Job identifier; used for the virtual environment, the submitted
            scan config path and the per-user log file.
        label_mapping: JSON-serialisable object passed to ``--label_mapping``.
        feature_mapping: JSON-serialisable object passed to
            ``--feature_mapping``.

    Returns:
        The command as a list of strings, suitable for ``subprocess.Popen``.
    """
    leaderboard_dataset = None
    if os.environ.get("SPACE_ID") == "giskardai/giskard-evaluator":
        leaderboard_dataset = LEADERBOARD

    executable = "giskard_scanner"
    try:
        # Copy the current requirements (might be changed)
        # NOTE(review): readlines() keeps line endings, so the joined text has
        # a blank line between entries.
        with open("requirements.txt", "r") as f:
            executable = prepare_venv(
                uid,
                "\n".join(f.readlines()),
            )
        logger.info(f"Using {executable} as executable")
    except Exception as e:
        # Best effort: fall back to the scanner available in the current env.
        logger.warning(f"Create env failed due to {e}, using the current env as fallback.")
        executable = "giskard_scanner"

    command = [
        executable,
        "--loader",
        "huggingface",
        "--model",
        m_id,
        "--dataset",
        d_id,
        "--dataset_config",
        config,
        "--dataset_split",
        split,
        "--output_format",
        "markdown",
        "--output_portal",
        "huggingface",
        "--feature_mapping",
        json.dumps(feature_mapping),
        "--label_mapping",
        json.dumps(label_mapping),
        "--scan_config",
        get_submitted_yaml_path(uid),
        "--inference_type",
        "hf_inference_api",
        "--inference_api_token",
        inference_token,
        "--persist_scan",
    ]

    # The token to publish post
    write_token = os.environ.get(HF_WRITE_TOKEN)
    if write_token:
        command += ["--hf_token", write_token]

    # The repo to publish for ranking
    if leaderboard_dataset:
        command += ["--leaderboard_dataset", leaderboard_dataset]

    # The info to upload to Giskard hub; each flag is added only when the
    # corresponding environment variable is set to a non-empty value.
    hub_options = (
        ("--giskard_hub_api_key", HF_GSK_HUB_KEY),
        ("--giskard_hub_url", HF_GSK_HUB_URL),
        ("--giskard_hub_project_key", HF_GSK_HUB_PROJECT_KEY),
        ("--giskard_hub_hf_token", HF_GSK_HUB_HF_TOKEN),
        ("--giskard_hub_unlock_token", HF_GSK_HUB_UNLOCK_TOKEN),
    )
    for flag, env_name in hub_options:
        value = os.environ.get(env_name)
        if value:
            command += [flag, value]

    eval_str = f"[{m_id}]<{d_id}({config}, {split} set)>"

    write_log_to_user_file(
        uid,
        f"Start local evaluation on {eval_str}. Please wait for your job to start...\n",
    )

    return command


def save_job_to_pipe(task_id, job, description, lock):
    """Append a job to ``pipe.jobs`` while holding ``lock``.

    Args:
        task_id: Identifier of the job; also names its log file when run.
        job: Positional arguments later unpacked into
            ``prepare_env_and_get_command``.
        description: Stored in ``pipe.current`` while the job is running.
        lock: Context manager guarding the append.
    """
    with lock:
        pipe.jobs.append((task_id, job, description))


def pop_job_from_pipe():
    """Take one job from ``pipe.jobs`` and run it to completion.

    Returns immediately when no job is queued. Otherwise ``LOG_FILE`` is
    re-linked to the job's log, the scanner is started as a subprocess with
    its output appended to ``./tmp/<task_id>.log``, and the call blocks until
    the process exits or ``pipe.current`` is cleared, which is treated as an
    interruption and kills the process. ``pipe.current`` is always reset to
    ``None`` afterwards, including when an error is raised.
    """
    if not pipe.jobs:
        return

    # NOTE(review): list.pop() takes the most recently appended job (LIFO);
    # confirm this ordering is intended.
    job_info = pipe.jobs.pop()
    task_id = job_info[0]
    pipe.current = job_info[2]

    try:
        # Link to LOG_FILE
        log_file_path = Path(LOG_FILE)
        # exists() follows symlinks, so a dangling link must be detected
        # separately or os.symlink below fails with FileExistsError.
        if log_file_path.is_symlink() or log_file_path.exists():
            log_file_path.unlink()
        os.symlink(f"./tmp/{task_id}.log", LOG_FILE)

        write_log_to_user_file(task_id, f"Running job id {task_id}\n")
        command = prepare_env_and_get_command(*job_info[1])

        with open(f"./tmp/{task_id}.log", "a") as log_file:
            p = subprocess.Popen(command, stdout=log_file, stderr=subprocess.STDOUT)

            # Poll once per second so that clearing pipe.current is noticed.
            # Compare against None: an exit code of 0 is falsy but means the
            # process has finished.
            return_code = None
            while pipe.current and return_code is None:
                try:
                    return_code = p.wait(timeout=1)
                except subprocess.TimeoutExpired:
                    return_code = None

            if return_code is None:
                # Job interrupted before finishing
                p.kill()
                p.wait()  # reap the child so it does not linger as a zombie
                log_file.write(f"\nJob interrupted by admin at {time.asctime()}\n")
            else:
                log_file.write(f"\nJob finished with {return_code} at {time.asctime()}\n")
    finally:
        # Never leave a stale "current job" behind, even if the job failed.
        pipe.current = None


def run_job():
    """Worker loop: run queued jobs until ``is_running`` is cleared.

    One job is attempted per iteration, followed by a 10 second pause. An
    unexpected error in a job is logged and the loop continues, so a single
    failing job does not stop the worker. ``KeyboardInterrupt`` stops the loop.
    """
    global is_running
    while is_running:
        try:
            try:
                pop_job_from_pipe()
            except Exception:
                # Keep the worker alive; the failed job has already been
                # removed from the queue.
                logger.exception("Unexpected error while running a job")
            time.sleep(10)
        except KeyboardInterrupt:
            logging.debug("KeyboardInterrupt stop background thread")
            is_running = False
            break