from typing import List, Union
from copy import deepcopy
from collections import namedtuple
from pathlib import Path
import argparse
from argparse import RawDescriptionHelpFormatter
import yaml
from pydantic import BaseModel as _Base
class BaseConf(_Base):
    """Project-wide base class for pydantic config models.

    Subclasses inherit the nested ``Config`` below. The per-option notes
    follow pydantic v1 ``Config`` semantics (``.dict()`` is used elsewhere in
    this file, which is the v1 API) -- confirm against the installed version.
    """

    class Config:
        # Validate every field, including those left at their defaults.
        validate_all = True
        # Allow attributes to be reassigned after the model is built.
        allow_mutation = True
        # Unknown keys passed to the constructor are dropped, not rejected.
        extra = "ignore"
def SingleOrList(inner_type):
    """Build a type annotation accepting one ``inner_type`` or a list of them.

    Args:
        inner_type: the element type.

    Returns:
        ``Union[inner_type, List[inner_type]]``.
    """
    list_variant = List[inner_type]
    return Union[inner_type, list_variant]
def optional_load_config(fname="config.yml"):
    """Load a YAML config from the current working directory, if present.

    Args:
        fname: file name, resolved relative to ``Path.cwd()``.

    Returns:
        The parsed YAML content, or an empty dict when the file does not
        exist or is empty. The raw file text is echoed to stdout when a file
        is found.
    """
    cfg = {}
    conf_fname = Path.cwd() / fname
    if conf_fname.is_file():
        with conf_fname.open("r") as f:
            raw = f.read()
        print("loaded config\n ")
        print(raw)  # yaml raw itself is well formatted
        # safe_load returns None for an empty document; keep the "always a
        # dict" contract so callers can splat the result with **cfg.
        cfg = yaml.safe_load(raw) or {}
    return cfg
def write_full_config(cfg_obj, fname="full_config.yml"):
    """Dump the fully resolved config to stdout and to a YAML file.

    Args:
        cfg_obj: object exposing ``.dict()`` (e.g. a pydantic model).
        fname: output file name, resolved relative to ``Path.cwd()``.
    """
    cfg = cfg_obj.dict()
    cfg = _dict_to_yaml(cfg)
    print(f"\n--- full config ---\n\n{cfg}\n")
    with (Path.cwd() / fname).open("w") as f:
        # Persist exactly the YAML text that was printed above.
        f.write(cfg)
def argparse_cfg_template(curr_cfgs):
    """Apply ``--key value`` command-line overrides to a config dict.

    Every unrecognised command-line argument pair ``--<cmd> <value>`` becomes
    a one-entry clause ``{<cmd>: <value>}`` that is executed by
    ``ConfigMaker`` against ``curr_cfgs``.

    Args:
        curr_cfgs: the current config as a dict; shown in ``--help`` output.

    Returns:
        A shallow copy of the config after all clauses have been applied.

    Raises:
        AssertionError: if an argument in key position does not start
            with ``--``.
        IndexError: if the last key has no value following it.
    """
    parser = argparse.ArgumentParser(
        description='Manual spec of configs',
        epilog=f'curr cfgs:\n\n{_dict_to_yaml(curr_cfgs)}',
        # The epilog is multi-line YAML; the raw formatter keeps its line
        # breaks instead of re-wrapping it into one paragraph.
        formatter_class=RawDescriptionHelpFormatter,
    )
    # No options are declared on the parser, so everything the user passed
    # lands in the "unknown" list and is interpreted here.
    _, args = parser.parse_known_args()
    clauses = []
    for i in range(0, len(args), 2):
        assert args[i][:2] == "--", "please start args with --"
        clauses.append({args[i][2:]: args[i+1]})
    print(f"cmdline clauses: {clauses}")

    maker = ConfigMaker(curr_cfgs)
    for clu in clauses:
        maker.execute_clause(clu)

    final = maker.state.copy()
    return final
def _dict_to_yaml(arg):
    """Serialize ``arg`` to YAML text without re-sorting keys.

    Unicode characters are emitted as-is rather than escaped.
    """
    dump_options = {"sort_keys": False, "allow_unicode": True}
    return yaml.safe_dump(arg, **dump_options)
def dispatch(module):
    """Instantiate ``module`` from the config file plus command-line overrides.

    Steps visible here:
      1. read the optional YAML config from the working directory;
      2. pass it through ``module(**cfg).dict()`` to obtain a full config
         dict;
      3. apply command-line clauses on top of that dict;
      4. build the final ``module`` instance from the merged config.

    Args:
        module: a callable accepting the config as keyword arguments and
            returning an object with ``.dict()`` (presumably a ``BaseConf``
            subclass -- verify against callers).
    """
    cfg = optional_load_config()
    cfg = module(**cfg).dict()
    cfg = argparse_cfg_template(cfg) # cmdline takes priority
    mod = module(**cfg)
    # NOTE(review): `mod` is neither used nor returned in the lines visible
    # here; statements that consumed it appear to be missing -- confirm
    # against the upstream file before relying on this function.
# below are some support tools
class ConfigMaker():
    """Apply small edit clauses (``add`` / ``replace`` / ``del``) to a config tree.

    A clause is either a command string or a one-entry dict mapping a command
    string to its argument. The command grammar is documented on
    ``parse_clause_cmd``.
    """

    CMD = namedtuple('cmd', field_names=['sub', 'verb', 'objs'])
    VERBS = ('add', 'replace', 'del')

    def __init__(self, base_node):
        # The node clauses operate on; rebound after every executed clause.
        self.state = base_node
        self.clauses = []

    def clone(self):
        """Return a deep copy of this maker, including its current state."""
        return deepcopy(self)

    def execute_clause(self, raw_clause):
        """Parse one clause and apply it to ``self.state``.

        Args:
            raw_clause: either a command string (no argument), or a dict with
                exactly one ``{command: argument}`` entry.

        Raises:
            AssertionError: on a wrongly typed clause, a dict with more than
                one entry, or a ``del`` clause that is not a plain string.
            ValueError: propagated from command parsing or path tracing.
        """
        cls = self.__class__
        assert isinstance(raw_clause, (str, dict))
        if isinstance(raw_clause, dict):
            assert len(raw_clause) == 1, \
                "a clause can only have 1 statement: {} clauses in {}".format(
                    len(raw_clause), raw_clause
                )
            cmd, arg = next(iter(raw_clause.items()))
        else:
            cmd = raw_clause
            arg = None
        cmd = self.parse_clause_cmd(cmd)

        tracer = NodeTracer(self.state)
        # Move the tracer to the subject first; otherwise every verb would
        # act on the root node and the parsed subject would be ignored.
        tracer.advance_pointer(cmd.sub)
        if cmd.verb == cls.VERBS[0]:
            tracer.add(cmd.objs, arg)
        elif cmd.verb == cls.VERBS[1]:
            tracer.replace(cmd.objs, arg)
        elif cmd.verb == cls.VERBS[2]:
            assert isinstance(raw_clause, str)
            # NOTE(review): no deletion call is visible for this branch, so a
            # `del` clause currently leaves the tree unchanged -- confirm
            # which NodeTracer method is meant to be invoked here.
        self.state = tracer.state

    @classmethod
    def parse_clause_cmd(cls, input):
        """Split a command string into ``CMD(sub, verb, objs)``.

        input: a string to be parsed
        1. First test whether a verb is present
        2. If not present, then str is a single subject, and verb is replace
           This is a syntactical sugar that makes writing config easy
        3. If a verb is found, whatever comes before is a subject, and
           whatever comes after it is the list of objects
        4. Handle the edge cases properly. Below are expected parse outputs

        input               sub     verb        obj
        --- No verb
        ''                  ''      replace     []
        'a.b'               'a.b'   replace     []
        'add'               ''      add         []
        'P Q'               err: 2 subjects
        --- Verb present
        'T add'             'T'     add         []
        'T del a b'         'T'     del         [a, b]
        'P Q add a'         err: 2 subjects
        'P add del b'       err: 2 verbs

        Raises:
            AssertionError: non-string input, or more than one subject.
            ValueError: more than one verb, or a repeated verb.
        """
        assert isinstance(input, str)
        input = input.split()
        objs = []
        sub = ''
        verb, verb_inx = cls.scan_for_verb(input)
        if verb is None:
            assert len(input) <= 1, "no verb present; more than 1 subject: {}"\
                .format(input)
            sub = input[0] if len(input) == 1 else ''
            verb = cls.VERBS[1]
        else:
            assert not verb_inx > 1, 'verb {} at inx {}; more than 1 subject in: {}'\
                .format(verb, verb_inx, input)
            sub = input[0] if verb_inx == 1 else ''
            objs = input[verb_inx + 1:]
        cmd = cls.CMD(sub=sub, verb=verb, objs=objs)
        return cmd

    @classmethod
    def scan_for_verb(cls, input_list):
        """Locate the single verb inside a tokenised command.

        Args:
            input_list: list of whitespace-separated tokens.

        Returns:
            ``(verb, index)`` when exactly one verb occurs exactly once, or
            ``(None, -1)`` when no verb is present.

        Raises:
            ValueError: if different verbs co-occur or a verb is repeated.
        """
        assert isinstance(input_list, list)
        counts = [input_list.count(v) for v in cls.VERBS]
        presence = [cnt > 0 for cnt in counts]
        if sum(presence) == 0:
            return None, -1
        elif sum(presence) > 1:
            raise ValueError("multiple verbs discovered in {}".format(input_list))

        if max(counts) > 1:
            raise ValueError("verbs repeated in cmd: {}".format(input_list))
        # by now, there is 1 verb that has occurred exactly 1 time
        verb = cls.VERBS[presence.index(True)]
        inx = input_list.index(verb)
        return verb, inx
class NodeTracer():
    """Movable cursor into a nested list/dict tree.

    The source node is wrapped in a one-entry dict so that even the root can
    be addressed as ``parent[child_token]`` and therefore replaced in place.
    """

    def __init__(self, src_node):
        """
        A src node can be either a list or dict
        """
        assert isinstance(src_node, (list, dict))
        # these are movable pointers
        self.child_token = "_"  # init token can be anything
        self.parent = {self.child_token: src_node}
        # these are permanent pointers at the root
        self.root_child_token = self.child_token
        self.root = self.parent

    @property
    def state(self):
        """The (possibly replaced) root node of the traced tree."""
        return self.root[self.root_child_token]

    @property
    def pointed(self):
        """The node the cursor currently points at."""
        return self.parent[self.child_token]

    def advance_pointer(self, path):
        """Walk the cursor down a dotted path such as ``'a.0.b'``.

        All-digit tokens are converted to ``int`` so they can index lists;
        every other token is used as a dict key. An empty path is a no-op.

        Raises:
            ValueError: if a token is missing from the node reached so far.
        """
        if len(path) == 0:
            return
        path_list = [
            int(tok) if tok.isdigit() else tok for tok in path.split('.')
        ]

        for i, token in enumerate(path_list):
            self.parent = self.pointed
            self.child_token = token
            try:
                # Probe the new position so a bad token is reported at the
                # step where it occurs, not on a later access.
                self.pointed
            except (IndexError, KeyError) as err:
                raise ValueError(
                    "During the tracing of {}, {}-th token '{}'"
                    " is not present in node {}".format(
                        path, i, self.child_token, self.state
                    )
                ) from err

    def replace(self, objs, arg):
        """Overwrite the pointed value with ``arg``, keeping its type.

        Args:
            objs: must be empty; ``replace`` takes no objects.
            arg: the new value. It is stringified and, unless the current
                value is a ``str``, evaluated back into a Python object.

        Raises:
            AssertionError: if ``objs`` is non-empty or the evaluated value's
                type differs from the current value's type.
        """
        assert len(objs) == 0
        val_type = type(self.parent[self.child_token])
        # this is such an unfortunate hack
        # turn everything to string, so that eval could work
        # some of the clauses come from cmdline, some from yaml files for sow.
        arg = str(arg)
        if val_type != str:
            # SECURITY: eval() runs arbitrary expressions taken from the
            # command line / config files; only use with trusted input.
            arg = eval(arg)
            assert type(arg) == val_type, \
                f"require {val_type.__name__}, given {type(arg).__name__}"

        self.parent[self.child_token] = arg