|
import os |
|
from typing import Sequence |
|
|
|
import dpath |
|
from dpath import MutableSequence |
|
from dpath.segments import extend |
|
|
|
|
|
def is_subpath(subpath, fullpath): |
|
|
|
subpath = os.path.normpath(subpath) |
|
fullpath = os.path.normpath(fullpath) |
|
|
|
|
|
subpath_components = subpath.split(os.path.sep) |
|
fullpath_components = fullpath.split(os.path.sep) |
|
|
|
|
|
return fullpath_components[: len(subpath_components)] == subpath_components |
|
|
|
|
|
def dpath_get(dic, query_path): |
|
return [v for _, v in dpath.search(dic, query_path, yielded=True)] |
|
|
|
|
|
def dpath_set_one(dic, query_path, value): |
|
n = dpath.set(dic, query_path, value) |
|
if n != 1: |
|
raise ValueError(f'query "{query_path}" matched multiple items in dict: {dic}') |
|
|
|
|
|
def dict_delete(dic, query_path): |
|
n = dpath.delete(dic, query_path) |
|
|
|
|
|
def dict_creator(current, segments, i, hints=()): |
|
""" |
|
Create missing path components. If the segment is an int, then it will |
|
create a list. Otherwise a dictionary is created. |
|
|
|
set(obj, segments, value) -> obj |
|
""" |
|
segment = segments[i] |
|
length = len(segments) |
|
|
|
if isinstance(current, Sequence): |
|
segment = int(segment) |
|
|
|
if isinstance(current, MutableSequence): |
|
extend(current, segment) |
|
|
|
|
|
if i < len(hints): |
|
current[segment] = hints[i][1]() |
|
else: |
|
|
|
|
|
if i + 1 < length: |
|
segment_next = segments[i + 1] |
|
else: |
|
segment_next = None |
|
|
|
if isinstance(segment_next, int) or (isinstance(segment_next, str) and segment_next.isdecimal()): |
|
current[segment] = [] |
|
else: |
|
current[segment] = {} |
|
|
|
|
|
def dpath_set(dic, query_path, value, not_exist_ok=True): |
|
paths = [p for p, _ in dpath.search(dic, query_path, yielded=True)] |
|
if len(paths) == 0 and not_exist_ok: |
|
dpath.new(dic, query_path, value, creator=dict_creator) |
|
else: |
|
if len(paths) != 1: |
|
raise ValueError(f'query "{query_path}" matched {len(paths)} items in dict: {dic}. should match only one.') |
|
for path in paths: |
|
dpath_set_one(dic, path, value) |
|
|
|
|
|
def dpath_set_multiple(dic, query_path, values, not_exist_ok=True): |
|
paths = [p for p, _ in dpath.search(dic, query_path, yielded=True)] |
|
if len(paths) == 0: |
|
if not_exist_ok: |
|
raise ValueError(f"Cannot set multiple values to non-existing path: {query_path}") |
|
raise ValueError(f'query "{query_path}" did not match any item in dict: {dic}') |
|
else: |
|
if len(paths) != len(values): |
|
raise ValueError( |
|
f'query "{query_path}" matched {len(paths)} items in dict: {dic} but {len(values)} values are provided. should match only one.' |
|
) |
|
for path, value in zip(paths, values): |
|
dpath_set_one(dic, path, value) |
|
|
|
|
|
def dict_get(dic, query, use_dpath=True, not_exist_ok=False, default=None): |
|
if use_dpath: |
|
values = dpath_get(dic, query) |
|
if len(values) == 0 and not_exist_ok: |
|
return default |
|
elif len(values) == 0: |
|
raise ValueError(f'query "{query}" did not match any item in dict: {dic}') |
|
|
|
if len(values) == 1 and "*" not in query and "," not in query: |
|
return values[0] |
|
|
|
return values |
|
else: |
|
if not_exist_ok: |
|
return dic.get(query, default) |
|
else: |
|
return dic[query] |
|
|
|
|
|
def dict_set(dic, query, value, use_dpath=True, not_exist_ok=True, set_multiple=False): |
|
if use_dpath: |
|
if set_multiple: |
|
dpath_set_multiple(dic, query, value, not_exist_ok=not_exist_ok) |
|
else: |
|
dpath_set(dic, query, value, not_exist_ok=not_exist_ok) |
|
else: |
|
dic[query] = value |
|
|