Spaces:
Running
Running
import os | |
import subprocess | |
import yaml | |
import pipe | |
# Directory holding the template ``config.yaml`` and the per-user
# ``{uid}_config.yaml`` copies created by ``get_yaml_path``.
YAML_PATH = "./configs"
class Dumper(yaml.Dumper):
    """YAML dumper that never emits "indentless" block sequences."""

    def increase_indent(self, flow=False, *args, **kwargs):
        """Delegate to the base dumper with ``indentless`` forced to False.

        Extra positional and keyword arguments are accepted for signature
        compatibility but are deliberately not forwarded.
        """
        base = super()
        return base.increase_indent(flow=flow, indentless=False)
def get_yaml_path(uid):
    """Return the path of the YAML config file belonging to ``uid``.

    The config directory is created if needed. On first use for a given
    ``uid`` the template ``config.yaml`` is copied to ``{uid}_config.yaml``.
    If the template is missing, no file is created and the path is still
    returned, so callers see ``FileNotFoundError`` when they open it.

    Args:
        uid: Identifier used to build the per-user config file name.

    Returns:
        The path string ``{YAML_PATH}/{uid}_config.yaml``.
    """
    # Function-scope import keeps this block self-contained.
    import shutil

    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(YAML_PATH, exist_ok=True)

    user_config = f"{YAML_PATH}/{uid}_config.yaml"
    if not os.path.exists(user_config):
        template = f"{YAML_PATH}/config.yaml"
        # Copy in-process instead of shelling out to ``cp``: ``uid`` is never
        # interpreted by a shell, and the copy also works where ``cp`` does
        # not exist.
        if os.path.exists(template):
            shutil.copy(template, user_config)
    return user_config
# read scanners from yaml file | |
# return a list of scanners | |
def read_scanners(uid):
    """Return the list stored under ``detectors`` in the user's YAML config.

    Args:
        uid: Identifier of the user whose config file is read.

    Returns:
        The value of the ``detectors`` key, or an empty list when the key is
        absent or the config file is empty.
    """
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty YAML file loads as None; treat it like a config with no
    # detectors instead of failing on ``None.get``.
    if not config:
        return []
    return config.get("detectors", [])
# write a list of scanners to the "detectors" entry of the user's yaml file
def write_scanners(scanners, uid):
    """Store ``scanners`` under ``detectors`` in the user's YAML config.

    Nothing is written when the existing config is empty.

    Args:
        scanners: Value to store under the ``detectors`` key.
        uid: Identifier of the user whose config file is updated.
    """
    path = get_yaml_path(uid)
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if not config:
        return

    config["detectors"] = scanners
    # Reopen in "w" mode so the file is replaced. Dumping through the handle
    # that was just read would append a second document after the old one.
    with open(path, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
# read inference_type from yaml file
def read_inference_type(uid):
    """Return the ``inference_type`` value from the user's YAML config.

    Args:
        uid: Identifier of the user whose config file is read.

    Returns:
        The value of the ``inference_type`` key, or an empty string when the
        key is absent or the config file is empty.
    """
    with open(get_yaml_path(uid), "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty YAML file loads as None; fall back to the same default that
    # a missing key produces.
    if not config:
        return ""
    return config.get("inference_type", "")
# write inference_type to yaml file
def write_inference_type(use_inference, uid):
    """Persist the inference backend choice in the user's YAML config.

    Nothing is written when the existing config file is empty, matching
    ``write_column_mapping``.

    Args:
        use_inference: Truthy to store ``"hf_inference_api"``, falsy to store
            ``"hf_pipeline"``.
        uid: Identifier of the user whose config file is updated.
    """
    path = get_yaml_path(uid)
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    # An empty YAML file loads as None, which cannot take item assignment.
    if config is None:
        return

    config["inference_type"] = "hf_inference_api" if use_inference else "hf_pipeline"
    # Reopen in "w" mode so the file is replaced. Dumping through the handle
    # that was just read would append a second document after the old one.
    with open(path, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)
# read column mapping from yaml file | |
def read_column_mapping(uid):
    """Return the ``column_mapping`` entry of the user's YAML config.

    An empty dict is returned when the config is empty or has no such key.
    """
    with open(get_yaml_path(uid), "r") as stream:
        loaded = yaml.load(stream, Loader=yaml.FullLoader)
    if not loaded:
        return {}
    return loaded.get("column_mapping", dict())
# write column mapping to yaml file | |
def write_column_mapping(mapping, uid):
    """Store or clear the ``column_mapping`` entry of the user's YAML config.

    Nothing is written when the existing config file is empty.

    Args:
        mapping: Value to store under ``column_mapping``. ``None`` removes
            the key from the config.
        uid: Identifier of the user whose config file is updated.
    """
    path = get_yaml_path(uid)
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if config is None:
        return

    if mapping is None:
        # Remove the key whether or not it exists, so ``None`` is never
        # written as ``column_mapping: null`` and read back later in place of
        # an empty mapping.
        config.pop("column_mapping", None)
    else:
        config["column_mapping"] = mapping

    with open(path, "w") as f:
        # save column_mapping to column_mapping in yaml
        yaml.dump(config, f, Dumper=Dumper)
# convert column mapping dataframe to json | |
def convert_column_mapping_to_json(df, label=""):
    """Return ``{label: rows}`` where ``rows`` lists each row of ``df``.

    Every row is converted with ``tolist()``, in the order ``iterrows()``
    yields them.
    """
    rows = [record.tolist() for _, record in df.iterrows()]
    return {label: rows}
def get_logs_file(uid):
    """Return the contents of the log file ``./tmp/{uid}_log``.

    Args:
        uid: Identifier used to build the log file name.

    Returns:
        The full text of the log file, or the string
        ``"Log file does not exist"`` when the file cannot be opened or read.
    """
    try:
        # The context manager closes the handle; the original left it open.
        with open(f"./tmp/{uid}_log", "r") as log_file:
            return log_file.read()
    except (OSError, UnicodeError):
        # Best-effort: any open or read failure is reported as a missing log.
        return "Log file does not exist"
def write_log_to_user_file(id, log):
    """Append ``log`` to the log file ``./tmp/{id}_log``.

    Args:
        id: Identifier used to build the log file name.
        log: Text to append.
    """
    # Create the log directory on first use so the append cannot fail with
    # FileNotFoundError on a fresh checkout.
    os.makedirs("./tmp", exist_ok=True)
    with open(f"./tmp/{id}_log", "a") as f:
        f.write(log)
def save_job_to_pipe(id, job, lock):
    """Append the ``(id, job)`` pair to ``pipe.jobs`` while holding ``lock``."""
    entry = (id, job)
    with lock:
        pipe.jobs.append(entry)
def pop_job_from_pipe():
    """Take one queued job from ``pipe.jobs`` and start it as a subprocess.

    Does nothing when no job is queued. Otherwise a "Running job id" line is
    appended to the job's log file and the job's command is launched from the
    ``cicd`` directory next to this file, with stdout and stderr appended to
    ``./tmp/{job_id}_log``. The function does not wait for the process.
    """
    try:
        # NOTE(review): pop() with no index removes the most recently added
        # entry; confirm last-in-first-out ordering is intended here.
        job_info = pipe.jobs.pop()
    except IndexError:
        # Nothing queued. EAFP also covers the queue being emptied between a
        # length check and the pop.
        return

    job_id = job_info[0]
    command = job_info[1]
    write_log_to_user_file(job_id, f"Running job id {job_id}\n")

    work_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), "cicd")
    # The child receives its own copy of the descriptor at spawn time, so the
    # parent can close its handle as soon as Popen returns. The original left
    # one open handle behind for every job.
    with open(f"./tmp/{job_id}_log", "a") as log_file:
        subprocess.Popen(
            command,
            cwd=work_dir,
            stdout=log_file,
            stderr=log_file,
        )