Spaces:
Running
Running
File size: 3,821 Bytes
3a0ee14 f04482d 3a0ee14 f04482d 3a0ee14 9e4233f 3a0ee14 be473e6 3a0ee14 be473e6 3a0ee14 9e4233f 3a0ee14 be473e6 3a0ee14 be473e6 3a0ee14 9e4233f f04482d 9e4233f 3a0ee14 9e4233f f04482d e631fcc f04482d e631fcc f04482d e631fcc f04482d 64f50dd f04482d 64f50dd f04482d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
import yaml
import subprocess
import os
YAML_PATH = "./config.yaml"
PIPE_PATH = "./tmp/pipe"
class Dumper(yaml.Dumper):
def increase_indent(self, flow=False, *args, **kwargs):
return super().increase_indent(flow=flow, indentless=False)
# read scanners from yaml file
# return a list of scanners
def read_scanners(path):
scanners = []
with open(path, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
scanners = config.get("detectors", [])
return scanners
# convert a list of scanners to yaml file
def write_scanners(scanners):
print(scanners)
with open(YAML_PATH, "r+") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
if config:
config["detectors"] = scanners
# save scanners to detectors in yaml
yaml.dump(config, f, Dumper=Dumper)
# read model_type from yaml file
def read_inference_type(path):
inference_type = ""
with open(path, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
inference_type = config.get("inference_type", "")
return inference_type
# write model_type to yaml file
def write_inference_type(use_inference):
with open(YAML_PATH, "r+") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
if use_inference:
config["inference_type"] = 'hf_inference_api'
else:
config["inference_type"] = 'hf_pipeline'
# save inference_type to inference_type in yaml
yaml.dump(config, f, Dumper=Dumper)
# read column mapping from yaml file
def read_column_mapping(path):
column_mapping = {}
with open(path, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
column_mapping = config.get("column_mapping", dict())
return column_mapping
# write column mapping to yaml file
def write_column_mapping(mapping):
with open(YAML_PATH, "r") as f:
config = yaml.load(f, Loader=yaml.FullLoader)
if config is None:
return
if mapping is None and "column_mapping" in config.keys():
del config["column_mapping"]
else:
config["column_mapping"] = mapping
with open(YAML_PATH, "w") as f:
# save column_mapping to column_mapping in yaml
yaml.dump(config, f, Dumper=Dumper)
# convert column mapping dataframe to json
def convert_column_mapping_to_json(df, label=""):
column_mapping = {}
column_mapping[label] = []
for _, row in df.iterrows():
column_mapping[label].append(row.tolist())
return column_mapping
def write_log_to_user_file(id, log):
with open(f"./tmp/{id}_log", "a") as f:
f.write(log)
def save_job_to_pipe(id, job, lock):
if not os.path.exists('./tmp'):
os.makedirs('./tmp')
job = [str(i) for i in job]
job = ",".join(job)
print(job)
with lock:
with open(PIPE_PATH, "a") as f:
# write each element in job
f.write(f'{id}@{job}\n')
def pop_job_from_pipe():
if not os.path.exists(PIPE_PATH):
return
with open(PIPE_PATH, "r") as f:
job = f.readline().strip()
remaining = f.readlines()
f.close()
print(job, remaining, ">>>>")
with open(PIPE_PATH, "w") as f:
f.write("\n".join(remaining))
f.close()
if len(job) == 0:
return
job_info = job.split('\n')[0].split("@")
if len(job_info) != 2:
raise ValueError("Invalid job info: ", job_info)
write_log_to_user_file(job_info[0], f"Running job {job_info}")
command = job_info[1].split(",")
write_log_to_user_file(job_info[0], f"Running command {command}")
log_file = open(f"./tmp/{job_info[0]}_log", "a")
subprocess.Popen(
command,
cwd=os.path.join(os.path.dirname(os.path.realpath(__file__)), "cicd"),
stdout=log_file,
stderr=log_file,
)
|