"""Helpers for per-user YAML scanner configs plus a tiny job-queue runner.

Each user (``uid``) gets a private copy of ``config.yaml`` under
``./cicd/configs``; the read/write helpers below round-trip the
``detectors``, ``inference_type`` and ``column_mapping`` keys.  The pipe
helpers append queued jobs to ``pipe.jobs`` and run them one at a time,
mirroring each job's log into a well-known symlink.
"""

import os
import shutil
import subprocess
from pathlib import Path

import yaml

import pipe

# Directory holding one "<uid>_config.yaml" per user.
YAML_PATH = "./cicd/configs"
# Symlink name that always points at the currently running job's log file.
LOG_FILE = "temp_log"


class Dumper(yaml.Dumper):
    """``yaml.Dumper`` that indents block-sequence items under their key."""

    def increase_indent(self, flow=False, *args, **kwargs):
        # Force indentless=False so list items are indented relative to
        # their parent key (the default dumper left-aligns them).
        return super().increase_indent(flow=flow, indentless=False)


def get_yaml_path(uid):
    """Return the path to *uid*'s config file, creating it from the
    ``config.yaml`` template on first access.

    Args:
        uid: User/session identifier used to name the per-user file.

    Returns:
        str: Path of the (now existing) per-user YAML config.
    """
    os.makedirs(YAML_PATH, exist_ok=True)
    path = f"{YAML_PATH}/{uid}_config.yaml"
    if not os.path.exists(path):
        # shutil.copy instead of os.system("cp ..."): portable, and not
        # vulnerable to shell injection through *uid*.
        shutil.copy("config.yaml", path)
    return path


def read_scanners(uid):
    """Read the list of scanners from the ``detectors`` key of *uid*'s config.

    Returns an empty list when the key is missing or the file is empty.
    """
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # yaml.load returns None for an empty file; guard like the other readers.
    return config.get("detectors", []) if config else []


def write_scanners(scanners, uid):
    """Store *scanners* under the ``detectors`` key of *uid*'s config."""
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if config:
        config["detectors"] = scanners
        # Only rewrite when the file parsed to a mapping; dumping None
        # would clobber the file with "null".
        with open(get_yaml_path(uid), "w") as f:
            yaml.dump(config, f, Dumper=Dumper)


def read_inference_type(uid):
    """Read the ``inference_type`` value from *uid*'s config ("" if unset)."""
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    return config.get("inference_type", "") if config else ""


def write_inference_type(use_inference, inference_token, uid):
    """Persist the inference backend choice into *uid*'s config.

    Args:
        use_inference: Truthy selects the HF Inference API backend;
            falsy selects the local pipeline backend.
        inference_token: API token stored alongside the backend choice.
        uid: User identifier naming the config file.
    """
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if config is None:
        # Empty/corrupt config file: nothing sane to update.
        return
    if use_inference:
        config["inference_type"] = "hf_inference_api"
        config["inference_token"] = inference_token
    else:
        config["inference_type"] = "hf_pipeline"
        # FIXME: A quick and temp fix for missing token
        config["inference_token"] = ""
    with open(get_yaml_path(uid), "w") as f:
        yaml.dump(config, f, Dumper=Dumper)


def read_column_mapping(uid):
    """Read the ``column_mapping`` dict from *uid*'s config ({} if unset)."""
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    return config.get("column_mapping", {}) if config else {}


def write_column_mapping(mapping, uid):
    """Write (or, when *mapping* is None, remove) the ``column_mapping`` key.

    Args:
        mapping: Dict to store, or None to delete the key entirely.
        uid: User identifier naming the config file.
    """
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if config is None:
        return
    if mapping is None:
        # pop with default: no-op when the key is already absent (the old
        # code wrote "column_mapping: null" in that case).
        config.pop("column_mapping", None)
    else:
        config["column_mapping"] = mapping
    with open(get_yaml_path(uid), "w") as f:
        yaml.dump(config, f, Dumper=Dumper)


def convert_column_mapping_to_json(df, label=""):
    """Convert a column-mapping dataframe into ``{label: [row, ...]}``.

    Each dataframe row becomes a plain list via ``row.tolist()``.
    """
    return {label: [row.tolist() for _, row in df.iterrows()]}


def get_logs_file():
    """Return the contents of the shared log symlink, or a placeholder
    message when it cannot be read."""
    try:
        with open(LOG_FILE, "r") as file:
            return file.read()
    except OSError:
        # Narrowed from bare Exception: open/read only raise OSError here.
        return "Log file does not exist"


def write_log_to_user_file(task_id, log):
    """Append *log* to the per-task log file under ./tmp."""
    with open(f"./tmp/{task_id}.log", "a") as f:
        f.write(log)


def save_job_to_pipe(task_id, job, description, lock):
    """Append a (task_id, command, description) tuple to the shared job
    queue under *lock*."""
    with lock:
        pipe.jobs.append((task_id, job, description))


def pop_job_from_pipe():
    """Pop one job from ``pipe.jobs`` and run it to completion.

    Points the LOG_FILE symlink at the task's log, runs the job's command
    with stdout/stderr appended to that log, and clears ``pipe.current``
    when done.  No-op when the queue is empty.
    """
    if not pipe.jobs:
        return
    task_id, command, description = pipe.jobs.pop()
    pipe.current = description
    write_log_to_user_file(task_id, f"Running job id {task_id}\n")
    # Re-point the shared symlink at this task's log so readers of
    # LOG_FILE always see the active job's output.
    log_file_path = Path(LOG_FILE)
    if log_file_path.exists():
        log_file_path.unlink()
    # NOTE(review): symlink target is relative to the symlink's directory —
    # assumes LOG_FILE lives next to ./tmp; confirm against deployment layout.
    os.symlink(f"./tmp/{task_id}.log", LOG_FILE)
    with open(f"./tmp/{task_id}.log", "a") as log_file:
        p = subprocess.Popen(command, stdout=log_file, stderr=subprocess.STDOUT)
        p.wait()
    pipe.current = None