# File: dctap2shex/dctap/csvreader.py
# Hosting-page residue kept for provenance: avatar text "rimpo's picture",
# commit message "feat: dctap-python files", revision 28c59d9.
"""Parse DCTAP CSV, return dict with keys 'shapes', 'namespaces', and 'warnings'."""
import re
from collections import defaultdict
from csv import DictReader
from io import StringIO as StringBuffer
from dataclasses import asdict
from dctap.exceptions import DctapError, NoDataError
from dctap.tapclasses import TAPShape, TAPStatementTemplate
from dctap.utils import coerce_concise
def csvreader(
    csvfile_str=None,
    config_dict=None,
    open_csvfile_obj=None,
    shape_class=TAPShape,
    state_class=TAPStatementTemplate,
):
    """Read DCTAP CSV content and return it as a single shapes dictionary.

    Args:
        csvfile_str: CSV content as a string; used whenever it is truthy.
        config_dict: Settings dictionary handed to the row reader, the shape
            builder, and the namespace lookup.
        open_csvfile_obj: Object passed on to _get_rows as the data source
            when csvfile_str is falsy.
        shape_class: Class passed to _get_tapshapes for building shapes.
        state_class: Class passed to _get_tapshapes for building statement
            templates.

    Returns:
        The dict built by _get_tapshapes, with keys "namespaces" and
        "warnings" added.

    Raises:
        DctapError: If neither csvfile_str nor open_csvfile_obj is truthy.
    """
    # Exactly one data source is forwarded: the string wins over the file object.
    if csvfile_str:
        data_source = {"csvfile_str": csvfile_str}
    elif open_csvfile_obj:
        data_source = {"open_csvfile_obj": open_csvfile_obj}
    else:
        raise DctapError("No data provided.")
    csvrows, csvwarns = _get_rows(config_dict=config_dict, **data_source)

    tapshapes, shape_warns = _get_tapshapes(
        rows=csvrows,
        config_dict=config_dict,
        shape_class=shape_class,
        state_class=state_class,
    )
    # On a key clash, warnings from shape building override CSV-level warnings.
    merged_warns = {**csvwarns, **shape_warns}
    prefixes_used = _get_prefixes_actually_used(csvrows)
    with_namespaces = _add_namespaces(tapshapes, config_dict, prefixes_used)
    return _add_tapwarns(with_namespaces, merged_warns)
def _add_namespaces(tapshapes=None, config_dict=None, prefixes_used=None):
    """Add key 'namespaces' to tapshapes dict and return the dict.

    Args:
        tapshapes: Dict to annotate; modified in place. A new dict is created
            when None is given.
        config_dict: Settings dictionary; only its "prefixes" mapping
            (prefix -> namespace) is read. None or a missing/empty "prefixes"
            entry yields an empty 'namespaces' dict.
        prefixes_used: Iterable of prefix strings found in the data; None is
            treated as empty.

    Returns:
        tapshapes, where tapshapes["namespaces"] maps each used prefix that
        has a truthy value in config_dict["prefixes"] to that value.
    """
    if tapshapes is None:
        tapshapes = {}
    configured = (config_dict or {}).get("prefixes") or {}
    # Prefixes that are unknown, or configured with an empty value, are skipped.
    tapshapes["namespaces"] = {
        prefix: configured[prefix]
        for prefix in (prefixes_used or ())
        if configured.get(prefix)
    }
    return tapshapes
def _add_tapwarns(tapshapes=None, tapwarns=None):
    """Store tapwarns under key 'warnings' of tapshapes.

    Args:
        tapshapes: Dict to annotate; modified in place.
        tapwarns: Warnings object stored as-is (not copied).

    Returns:
        The same tapshapes dict, now carrying the 'warnings' key.
    """
    tapshapes["warnings"] = tapwarns
    return tapshapes
def _get_prefixes_actually_used(csvrows):
    """List prefixes (each with its trailing colon) used in the given rows.

    For "shapeID", "propertyID", "valueDataType", and "valueShape", the prefix
    is everything up to and including the first colon of the value. For
    "valueConstraint", every word immediately followed by a colon counts, so
    a constraint holding several prefixed names contributes several prefixes.

    Args:
        csvrows: Iterable of row dicts mapping element names to string values.

    Returns:
        Sorted list of unique prefix strings, for example [":", "dc:", "xsd:"].
        Sorting keeps the result (and anything built from it) reproducible
        between runs.
    """
    prefixable_elements = ("shapeID", "propertyID", "valueDataType", "valueShape")
    leading_prefix = re.compile(r"([^:]*):")
    prefix_in_constraint = re.compile(r"\b\w+:")

    prefixes = set()
    for row in csvrows:
        for element in prefixable_elements:
            value = row.get(element)
            if not value:
                continue
            found = leading_prefix.match(value)
            if found:
                # group(0) is the whole match, so the colon stays attached.
                prefixes.add(found.group(0))
        constraint = row.get("valueConstraint")
        if constraint:
            prefixes.update(prefix_in_constraint.findall(constraint))
    return sorted(prefixes)
def _get_rows(
    csvfile_str=None,
    config_dict=None,
    open_csvfile_obj=None,
):
    """Read CSV content and return its rows as dicts, plus header warnings.

    Every line is stripped and lines starting with "#" are dropped. Each header
    cell is passed through coerce_concise and then mapped through
    config_dict["element_aliases"]. One warning is recorded for every header
    that is not (case-insensitively) among the recognized elements.

    Args:
        csvfile_str: CSV content as a string; used whenever it is truthy.
        config_dict: Settings dictionary. Reads "csv_elements" (assumed to be
            a list), "extra_shape_elements",
            "extra_statement_template_elements", and "element_aliases".
            Side effect: each configured extra element is added to
            "csv_elements" (at most once) and registered under its lowercased
            name in "element_aliases".
        open_csvfile_obj: Object with a read() method, used when csvfile_str
            is falsy.

    Returns:
        Tuple (csv_rows, csv_warns): a list of row dicts as produced by
        csv.DictReader with string values stripped, and a dict that is either
        empty or of the form {"csv": {"column": [warning, ...]}}.

    Raises:
        NoDataError: If no source is given, if fewer than two lines remain
            after dropping comment lines, or if the header line is empty.
        DctapError: If the normalized header line does not contain
            "propertyID".
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    if csvfile_str:
        csvfile_contents_str = csvfile_str
    elif open_csvfile_obj:
        csvfile_contents_str = open_csvfile_obj.read()
    else:
        raise NoDataError("No data to process.")

    csvlines_stripped = [line.strip() for line in StringBuffer(csvfile_contents_str)]
    csvlines_stripped = [
        line for line in csvlines_stripped if not line.startswith("#")
    ]
    if len(csvlines_stripped) < 2:
        raise NoDataError("No data to process.")

    # Register configured extra elements. Adding each one only if absent keeps
    # repeated calls with the same config_dict from growing "csv_elements".
    recognized_elements = config_dict.get("csv_elements")
    for extra_elements in (
        config_dict.get("extra_shape_elements"),
        config_dict.get("extra_statement_template_elements"),
    ):
        if not extra_elements:
            continue
        element_aliases = config_dict.setdefault("element_aliases", {})
        for element in extra_elements:
            if element not in recognized_elements:
                recognized_elements.append(element)
            element_aliases[element.lower()] = element
    recognized_lowercased = {elem.lower() for elem in recognized_elements}

    # NOTE: the header is split on bare commas, so a quoted header cell that
    # itself contains a comma is not supported.
    aliases = config_dict.get("element_aliases")
    new_header_line_list = [
        _normalize_element_name(coerce_concise(column), aliases)
        for column in csvlines_stripped[0].split(",")
    ]

    # Accumulate one warning per unrecognized column; the list must not be
    # re-created inside the loop or earlier warnings would be lost.
    csv_warns = {}
    for column in new_header_line_list:
        if column.lower() not in recognized_lowercased:
            warn = f"Non-DCTAP element '{column}' not configured as extra element."
            csv_warns.setdefault("csv", {}).setdefault("column", []).append(warn)

    csvlines_stripped[0] = ",".join(new_header_line_list)
    if not csvlines_stripped[0]:
        raise NoDataError("No data to process.")
    # NOTE(review): this is a substring test on the joined header line, so a
    # column whose name merely contains "propertyID" also passes - confirm
    # whether an exact column match is wanted.
    if "propertyID" not in csvlines_stripped[0]:
        raise DctapError("Valid DCTAP CSV must have a 'propertyID' column.")

    normalized_csv = "\n".join(csvlines_stripped) + "\n"
    csv_rows = list(DictReader(StringBuffer(normalized_csv)))
    for row in csv_rows:
        for key, value in row.items():
            if isinstance(value, str):  # ignore if instance of NoneType or list
                row[key] = value.strip()
    return (csv_rows, csv_warns)
def _get_tapshapes(rows=None, config_dict=None, shape_class=None, state_class=None):
    """Return tuple: (shapes dict, warnings dict).

    Rows are grouped into shapes keyed by shape identifier. A row with a
    "propertyID" but no "shapeID" joins the most recently created shape, or
    the configured default shape if no shape exists yet. A row with only a
    "shapeID" creates the shape (if new) without adding a statement template.

    Args:
        rows: Iterable of row dicts (element name -> cell value).
        config_dict: Settings dictionary. "default_shape_identifier" is
            required; "statement_template_elements" and
            "extra_statement_template_elements" are optional and treated as
            empty when missing.
        shape_class: Class used by _make_shape to build shape objects; its
            instances must provide normalize(config_dict), get_warnings(),
            and a state_list attribute, and be accepted by dataclasses.asdict.
        state_class: Class instantiated without arguments for each statement
            template; its instances must provide normalize(config_dict) and a
            state_extras mapping.

    Returns:
        Tuple (shapes_dict, warns_dict): shapes_dict is {"shapes": [...]} after
        _simplify, with each shape's statement templates under key
        "statement_templates"; warns_dict maps every shape identifier to a
        dict of element name -> list of warnings.
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    # pylint: disable=too-many-statements
    default_shape_id = config_dict["default_shape_identifier"]
    main_stems = config_dict.get("statement_template_elements") or ()
    xtra_stems = config_dict.get("extra_statement_template_elements") or ()
    shapes = {}  # shape identifier -> shape object, in order of creation
    warns = {}  # shape identifier -> {element name: [warnings]}
    latest_shape_id = None  # identifier of the most recently created shape

    for row in rows:
        has_property = bool(row.get("propertyID"))
        shape_id = row.get("shapeID") or ""
        if has_property and not shape_id:
            if latest_shape_id is not None:
                shape_id = latest_shape_id
            else:
                # First statement row has no shape: fall back on the default
                # and record it in the row so the new shape picks it up.
                shape_id = row["shapeID"] = default_shape_id

        if shape_id and shape_id not in shapes:
            shape_obj = _make_shape(
                row_dict=row,
                config_dict=config_dict,
                shape_class=shape_class,
            )
            shape_obj.normalize(config_dict)
            shapes[shape_id] = shape_obj
            latest_shape_id = shape_id
            # A shape's warnings are collected once, from the shape itself,
            # so they cannot be repeated per row or credited to another shape.
            warns[shape_id] = {}
            for elem, warn in shape_obj.get_warnings().items():
                warns[shape_id].setdefault(elem, []).append(warn)

        if not has_property:
            continue

        state_class_obj = state_class()
        for col, value in row.items():
            if col in main_stems:
                setattr(state_class_obj, col, value)
            elif col in xtra_stems:
                state_class_obj.state_extras[col] = value
        state_class_obj.normalize(config_dict)
        shapes[shape_id].state_list.append(state_class_obj)

    shapes_dict = {"shapes": []}
    for shape_obj in shapes.values():
        sh_dict = asdict(shape_obj)
        sh_dict["statement_templates"] = sh_dict.pop("state_list")
        shapes_dict["shapes"].append(sh_dict)
    return (_simplify(shapes_dict), warns)
def _make_shape(row_dict=None, config_dict=None, shape_class=None):
    """Populates shape fields of dataclass shape object from dict for one row.

    Args:
        row_dict: Dictionary of all columns headers (keys) and cell values (values)
            found in a given row, with no distinction between shape elements and
            statement template elements.
        config_dict: Dictionary of settings, built-in or as read from config file.
            Reads "shape_elements" and "extra_shape_elements"; either may be
            missing or None, in which case it is treated as empty.
        shape_class: Class instantiated without arguments to create the shape
            object; instances need a shape_extras mapping if extra shape
            elements are configured.

    Returns:
        Instance of shape class in which every row value whose key is a shape
        element has been set as an attribute, and every row value whose key is
        an extra shape element has been stored in shape_extras. Keys that are
        neither (such as statement template elements) are ignored.
    """
    main_shems = config_dict.get("shape_elements") or ()
    xtra_shems = config_dict.get("extra_shape_elements") or ()
    tapshape_obj = shape_class()
    for key, value in row_dict.items():
        if key in main_shems:
            setattr(tapshape_obj, key, value)
        elif key in xtra_shems:
            tapshape_obj.shape_extras[key] = value
    return tapshape_obj
def _normalize_element_name(some_str, element_aliases_dict=None):
    """Given header string, return its alias if one is configured.

    Args:
        some_str: Header string; it is first passed through coerce_concise.
        element_aliases_dict: Optional mapping from coerced header strings to
            element names.

    Returns:
        The value mapped to the coerced string in element_aliases_dict, or the
        coerced string itself when there is no mapping or no such key. The
        lookup is applied once; an alias is not itself looked up again.
    """
    some_str = coerce_concise(some_str)
    if element_aliases_dict:
        return element_aliases_dict.get(some_str, some_str)
    return some_str
def _simplify(shapes_dict):
    """Flatten extras, drop warnings, and remove falsy values, in place.

    For every shape in shapes_dict["shapes"] and every statement template in
    its "statement_templates": entries of the extras dict ("shape_extras" or
    "state_extras") are copied up into the record itself, the extras dict and
    the warnings dict ("shape_warns" or "state_warns") are removed, and then
    every key whose value is falsy is deleted.

    Args:
        shapes_dict: Dict with key "shapes" holding a list of shape dicts.

    Returns:
        The same shapes_dict object, modified in place.
    """

    def _flatten(record, extras_key, warns_key):
        """Apply the flattening steps to one shape or statement dict."""
        extras = record.get(extras_key)
        if extras:
            record.update(extras)
        # Whatever is left under these two keys is either truthy (removed
        # here) or falsy (which the sweep below would remove anyway).
        record.pop(extras_key, None)
        record.pop(warns_key, None)
        falsy_keys = [name for name, value in record.items() if not value]
        for name in falsy_keys:
            del record[name]

    for shape in shapes_dict["shapes"]:
        # Statements go first: a shape left without statement templates then
        # loses its now-empty "statement_templates" list in the shape sweep.
        for state in shape["statement_templates"]:
            _flatten(state, "state_extras", "state_warns")
        _flatten(shape, "shape_extras", "shape_warns")
    return shapes_dict