Spaces:
Build error
Build error
"""Parse TAP, return two-item tuple: (list of shape objects, list of warnings).""" | |
import re | |
from collections import defaultdict | |
from csv import DictReader | |
from io import StringIO as StringBuffer | |
from dataclasses import asdict | |
from dctap.exceptions import DctapError, NoDataError | |
from dctap.tapclasses import TAPShape, TAPStatementTemplate | |
from dctap.utils import coerce_concise | |
def csvreader(
    csvfile_str=None,
    config_dict=None,
    open_csvfile_obj=None,
    shape_class=TAPShape,
    state_class=TAPStatementTemplate,
):
    """Read DCTAP CSV data and return it as a single dict.

    Args:
        csvfile_str: CSV contents as a string; used in preference to
            open_csvfile_obj when truthy.
        config_dict: Settings dict, handed on unchanged to the helpers.
        open_csvfile_obj: Open file-like object to read the CSV from when
            no string is given.
        shape_class: Class instantiated for each shape.
        state_class: Class instantiated for each statement template.

    Returns:
        The shapes dict built by _get_tapshapes, with the keys
        'namespaces' and 'warnings' added.

    Raises:
        DctapError: If neither csvfile_str nor open_csvfile_obj is truthy.
    """
    if not csvfile_str and not open_csvfile_obj:
        raise DctapError("No data provided.")

    # Exactly one source is forwarded; the string wins when both are given.
    if csvfile_str:
        source = {"csvfile_str": csvfile_str}
    else:
        source = {"open_csvfile_obj": open_csvfile_obj}
    csvrows, csvwarns = _get_rows(config_dict=config_dict, **source)

    tapshapes, shape_warns = _get_tapshapes(
        rows=csvrows,
        config_dict=config_dict,
        shape_class=shape_class,
        state_class=state_class,
    )
    # On a key clash the per-shape warnings take precedence over CSV ones.
    all_warns = {**csvwarns, **shape_warns}

    tapshapes = _add_namespaces(
        tapshapes, config_dict, _get_prefixes_actually_used(csvrows)
    )
    return _add_tapwarns(tapshapes, all_warns)
def _add_namespaces(tapshapes=None, config_dict=None, prefixes_used=None): | |
"""Adds key 'namespaces' to tapshapes dict.""" | |
tapshapes["namespaces"] = {} | |
if config_dict.get("prefixes"): | |
for prefix in prefixes_used: | |
if config_dict["prefixes"].get(prefix): | |
tapshapes["namespaces"][prefix] = config_dict["prefixes"].get(prefix) | |
return tapshapes | |
def _add_tapwarns(tapshapes=None, tapwarns=None): | |
"""Adds key 'warnings' to tapshapes dict.""" | |
tapshapes["warnings"] = tapwarns | |
return tapshapes | |
def _get_prefixes_actually_used(csvrows): | |
"""List strings before colons in values of elements that could take URI prefixes.""" | |
prefixes = set() | |
for row in csvrows: | |
for element in [ | |
"shapeID", | |
"propertyID", | |
"valueDataType", | |
"valueShape", | |
]: | |
if row.get(element): | |
prefix_plus_uri_pair = re.match(r"([^:]*):", row.get(element)) | |
if prefix_plus_uri_pair: # if there is at least one | |
prefix_as_provided = prefix_plus_uri_pair.group(0) | |
prefixes.add(prefix_as_provided) | |
if row.get("valueConstraint"): | |
pattern = r"\b\w+:" | |
used_in_valueconstraint = re.findall(pattern, row.get("valueConstraint")) | |
prefixes = set(list(prefixes) + list(used_in_valueconstraint)) | |
return list(prefixes) | |
def _get_rows(
    csvfile_str=None,
    config_dict=None,
    open_csvfile_obj=None,
):
    """Turn CSV text into a list of row dicts with normalized header names.

    Args:
        csvfile_str: CSV contents as a string; used in preference to
            open_csvfile_obj when truthy.
        config_dict: Settings dict. Reads 'csv_elements',
            'extra_shape_elements', 'extra_statement_template_elements' and
            'element_aliases'. Side effects: configured extra elements are
            added to config_dict["csv_elements"] (once each) and registered
            in config_dict["element_aliases"] under their lowercased name.
        open_csvfile_obj: Open file-like object whose read() supplies the
            CSV text when no string is given.

    Returns:
        Tuple (rows, warnings): rows is a list of dicts from DictReader with
        string values stripped; warnings is a dict that, when any header is
        not a recognized element, holds one message per such header under
        warnings["csv"]["column"].

    Raises:
        NoDataError: If no input is given, fewer than two lines remain once
            comment lines are removed, or the header line is empty.
        DctapError: If the normalized header line does not contain
            'propertyID'.
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    if csvfile_str:
        csvfile_contents_str = csvfile_str
    elif open_csvfile_obj:
        csvfile_contents_str = open_csvfile_obj.read()
    else:
        raise NoDataError("No data to process.")

    # Strip every line, then drop comment lines (first character '#').
    csvlines_stripped = [line.strip() for line in StringBuffer(csvfile_contents_str)]
    csvlines_stripped = [line for line in csvlines_stripped if not line.startswith("#")]
    if len(csvlines_stripped) < 2:
        raise NoDataError("No data to process.")

    recognized_elements = config_dict.get("csv_elements")
    for extras in (
        config_dict.get("extra_shape_elements"),
        config_dict.get("extra_statement_template_elements"),
    ):
        if not extras:
            continue
        for element in extras:
            # Add each extra only once so that repeated calls with the same
            # config do not keep growing config_dict["csv_elements"].
            if element not in recognized_elements:
                recognized_elements.append(element)
            config_dict["element_aliases"][element.lower()] = element
    recognized_lowercased = {elem.lower() for elem in recognized_elements}

    aliases = config_dict.get("element_aliases")
    new_header_line_list = [
        _normalize_element_name(coerce_concise(column), aliases)
        for column in csvlines_stripped[0].split(",")
    ]

    csv_warns = defaultdict(dict)
    for column in new_header_line_list:
        if column.lower() not in recognized_lowercased:
            warn = f"Non-DCTAP element '{column}' not configured as extra element."
            # Accumulate: every unrecognized column gets its own warning.
            csv_warns["csv"].setdefault("column", []).append(warn)

    csvlines_stripped[0] = ",".join(new_header_line_list)
    if not csvlines_stripped[0]:
        raise NoDataError("No data to process.")
    if "propertyID" not in csvlines_stripped[0]:
        raise DctapError("Valid DCTAP CSV must have a 'propertyID' column.")

    csv_rows = list(DictReader(StringBuffer("\n".join(csvlines_stripped) + "\n")))
    for row in csv_rows:
        for key, value in row.items():
            if isinstance(value, str):  # ignore if instance of NoneType or list
                row[key] = value.strip()
    return (csv_rows, dict(csv_warns))
def _get_tapshapes(rows=None, config_dict=None, shape_class=None, state_class=None):
    """Group CSV rows into shapes and return (shapes dict, warnings dict).

    Args:
        rows: Row dicts keyed by element name.
        config_dict: Settings dict. Must contain 'default_shape_identifier';
            'statement_template_elements' and
            'extra_statement_template_elements' are treated as empty when
            missing or None.
        shape_class: Class instantiated (via _make_shape) for each new shape.
        state_class: Class instantiated for each row that has a propertyID.

    Returns:
        Tuple (shapes_dict, warns_dict). shapes_dict has one key, 'shapes',
        holding a list of shape dicts (as produced by dataclasses.asdict,
        with 'state_list' renamed to 'statement_templates' and then passed
        through _simplify). warns_dict maps each shape ID to a dict of
        element name -> list of warning messages.
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    # pylint: disable=too-many-statements
    default_shape_id = config_dict["default_shape_identifier"]
    main_stems = config_dict.get("statement_template_elements") or ()
    xtra_stems = config_dict.get("extra_statement_template_elements") or ()
    shapes = {}  # shape ID -> shape object, in order of first appearance
    warns = defaultdict(dict)  # shape ID -> {element: [warnings]}

    for row in rows:
        has_property = bool(row.get("propertyID"))
        shape_id = row.get("shapeID") or ""
        if has_property and not shape_id:
            # A statement row without its own shapeID belongs to the most
            # recently created shape, or to the default shape if none exists.
            if shapes:
                shape_id = list(shapes)[-1]
            else:
                shape_id = row["shapeID"] = default_shape_id

        if shape_id:
            if shape_id not in shapes:
                new_shape = _make_shape(
                    row_dict=row,
                    config_dict=config_dict,
                    shape_class=shape_class,
                )
                new_shape.normalize(config_dict)
                shapes[shape_id] = new_shape
                warns[shape_id] = {}

            # Look the shape up by ID: a variable carried over from an earlier
            # iteration would point at the last shape *created*, which is the
            # wrong one when rows return to a previously seen shapeID.
            for elem, warn in shapes[shape_id].get_warnings().items():
                warns[shape_id].setdefault(elem, []).append(warn)

        if not has_property:
            continue

        state_class_obj = state_class()
        for col, value in row.items():
            if col in main_stems:
                setattr(state_class_obj, col, value)
            elif col in xtra_stems:
                state_class_obj.state_extras[col] = value
        state_class_obj.normalize(config_dict)
        shapes[shape_id].state_list.append(state_class_obj)

    shapes_dict = {"shapes": []}
    for shape_obj in shapes.values():
        sh_dict = asdict(shape_obj)
        sh_dict["statement_templates"] = sh_dict.pop("state_list")
        shapes_dict["shapes"].append(sh_dict)

    return (_simplify(shapes_dict), dict(warns))
def _make_shape(row_dict=None, config_dict=None, shape_class=None): | |
"""Populates shape fields of dataclass shape object from dict for one row. | |
Args: | |
row_dict: Dictionary of all columns headers (keys) and cell values (values) | |
found in a given row, with no distinction between shape elements and | |
statement template elements. | |
config_dict: Dictionary of settings, built-in or as read from config file. | |
Returns: | |
Unpopulated instance of shape class, for example: | |
TAPShape(shapeID='', state_list=[], shape_warns={}, state_extras={}, ...) | |
""" | |
main_shems = config_dict.get("shape_elements") | |
xtra_shems = config_dict.get("extra_shape_elements") | |
tapshape_obj = shape_class() | |
for key in row_dict: | |
if key in main_shems: | |
setattr(tapshape_obj, key, row_dict[key]) | |
elif key in xtra_shems: | |
tapshape_obj.shape_extras[key] = row_dict[key] | |
return tapshape_obj | |
def _normalize_element_name(some_str, element_aliases_dict=None):
    """Given header string, return its alias if one is configured, else itself.

    The string is first passed through coerce_concise, and the result is
    looked up once in element_aliases_dict. A single lookup means an alias
    whose value happens to equal another key is not mapped a second time.

    Args:
        some_str: Header string as found in the CSV.
        element_aliases_dict: Optional dict mapping coerced header strings
            to element names.

    Returns:
        The aliased element name, or the coerced string when there is no
        alias dict or no matching key.
    """
    some_str = coerce_concise(some_str)
    if element_aliases_dict:
        return element_aliases_dict.get(some_str, some_str)
    return some_str
def _simplify(shapes_dict): | |
"""Remove elements from shapes dictionary with falsy values.""" | |
for shape in shapes_dict["shapes"]: | |
for state in shape["statement_templates"]: | |
if state.get("state_extras"): | |
for (k, v) in state["state_extras"].items(): | |
state[k] = v | |
del state["state_extras"] | |
if state.get("state_warns"): | |
del state["state_warns"] | |
for empty_element in [key for key in state if not state[key]]: | |
del state[empty_element] | |
if shape.get("shape_extras"): | |
for (k, v) in shape["shape_extras"].items(): | |
shape[k] = v | |
del shape["shape_extras"] | |
if shape.get("shape_warns"): | |
del shape["shape_warns"] | |
for empty_element in [key for key in shape if not shape[key]]: | |
del shape[empty_element] | |
return shapes_dict | |