"""
This file handles reshaping raw results data into a list of nodes that match
what we expect the model output to be. That is to say, it handles parsing any raw data.
"""
import json
from collections import defaultdict
from functools import reduce
from typing import Any, Dict, Hashable, Optional, Tuple, Union
from models import DataModel
from shared import (
TOP_LEVEL_IDENTIFIERS,
attempt,
get_json_from_model_output,
keep_errors,
on_fail,
)
def handle_parsing_schema_files(expected_location: str, actual_location: str):
    """
    Reads the expected (reference) and actual (model-generated) schema files
    and returns their node mappings as (generated, reference), raising
    ValueError if either file cannot be read or parsed.
    """
raw_reference, raw_generated = read_in_expected_and_actual_json(
expected_location, actual_location
)
read_errors = keep_errors((raw_reference, raw_generated))
if len(read_errors) > 0:
raise ValueError(f"Could not ingest raw data: {read_errors}")
generated_nodes_to_content = try_parsing_actual_model_output(raw_generated)
reference_nodes_to_content = derive_nodes_from_actual_json_output(
parse_json(raw_reference)
)
errors = keep_errors((reference_nodes_to_content, generated_nodes_to_content))
if len(errors) > 0:
raise ValueError(f"Error parsing files: {errors}")
return generated_nodes_to_content, reference_nodes_to_content
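# Illustrative usage sketch (paths are hypothetical):
#     generated, reference = handle_parsing_schema_files(
#         "data/expected_schema.json", "runs/model_output.json"
#     )
# Both returned values map node names to node content.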
def read_in_expected_and_actual_json(
expected_json_location: str, actual_json_location: str
):
unparsed_expected, err = read_json_as_text(expected_json_location)
if err:
return {"error": f"Could not read in expected file. e: {err}"}, None
unparsed_actual, err = read_json_as_text(actual_json_location)
if err:
        return None, {"error": f"Could not read in actual file. e: {err}"}
return unparsed_expected, unparsed_actual
def read_json_as_text(file_path: str) -> Tuple[Optional[str], Optional[Exception]]:
"""
Reads the contents of a file as text without attempting to parse it as JSON.
"""
try:
with open(file_path, "r", encoding="utf-8") as file:
return file.read(), None
except Exception as e:
return None, e
def try_parsing_actual_model_output(model_output: str):
    """
    Parses raw model output into a node mapping. A direct json.loads pass is
    tried first; on failure, get_json_from_model_output is used to recover the
    payload before deriving nodes.
    """
first_parse_json = parse_json(model_output)
if isinstance(first_parse_json, list):
return {
"error": "Could not parse json file. Model output should not be a list."
}
first_pass_failed = "error" in first_parse_json
recovered_json, errors = (
get_json_from_model_output(model_output) if first_pass_failed else ({}, 0)
)
if errors > 0:
return {"error": "Could not parse json file, no metrics to calculate"}
parsed_json = recovered_json if first_pass_failed else first_parse_json
node_derivation_outcome = on_fail(
attempt(derive_nodes_from_actual_json_output, (parsed_json,)), []
)
if not node_derivation_outcome:
return {"error": f"Could not derive nodes. Parsed json: {parsed_json}"}
return node_derivation_outcome
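# Fallback-path sketch (illustrative): a direct json.loads pass is tried first;
# only when it fails is get_json_from_model_output consulted, which is assumed
# to recover JSON embedded in surrounding prose, e.g.
#     'Here is the schema: {"nodes": [...]}'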
def find_all_nodes(name_and_contents) -> Dict:
    """Recursively collects dicts whose keys overlap TOP_LEVEL_IDENTIFIERS."""
    name, contents = name_and_contents
content_contains_nodes = bool(set(contents.keys()) & TOP_LEVEL_IDENTIFIERS)
if content_contains_nodes:
return dict([name_and_contents])
sub_dicts = list(filter(lambda kvp: isinstance(kvp[1], dict), contents.items()))
all_sub_nodes = {}
for sub_name_and_contents in sub_dicts:
sub_nodes = find_all_nodes(sub_name_and_contents)
all_sub_nodes.update(sub_nodes)
return all_sub_nodes
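# Worked example (illustrative; assumes "properties" is among the
# TOP_LEVEL_IDENTIFIERS defined in `shared`):
#     find_all_nodes(("wrapper", {"User": {"properties": {"id": {}}}}))
#     -> {"User": {"properties": {"id": {}}}}
# The wrapper entry carries no recognized identifier keys, so the search
# recurses into its dict-valued children.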
def assign_to_key(key: Hashable):
    """Builds a reducer that indexes each incoming mapping by its value at `key`."""
def add_at_key(
assignment_mapping: Dict[Hashable, Any], mapping_to_add: Dict[Hashable, Any]
):
assignment_id = mapping_to_add[key]
assignment_mapping[assignment_id] = mapping_to_add
return assignment_mapping
return add_at_key
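# Illustrative example: assign_to_key pairs with reduce to index dicts by a
# shared key, as done with "name" further below:
#     reduce(assign_to_key("name"), [{"name": "a", "x": 1}], {})
#     -> {"a": {"name": "a", "x": 1}}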
def key_exists(key):
    """Returns a predicate that checks whether `key` is present in a mapping."""
    return lambda mapping: key in mapping
def handle_property_correction(all_nodes):
    def has_incorrect_property_shape(node):
        # A node's "properties" should be a mapping; a list signals malformed output.
        return "properties" in node[1] and isinstance(node[1]["properties"], list)
    nodes_needing_correction = dict(
        filter(has_incorrect_property_shape, all_nodes.items())
    )
    nodes_with_corrected_properties = dict(
        map(correct_properties_for_node, nodes_needing_correction.items())
    )
    all_corrected_nodes = {**all_nodes, **nodes_with_corrected_properties}
    return all_corrected_nodes
def correct_properties_for_node(node):
    """Re-keys a node's list-shaped properties by name, dropping entries without one."""
node_name, node_data = node
properties = node_data["properties"]
identified_properties = list(filter(key_exists("name"), properties))
actual_properties = reduce(assign_to_key("name"), identified_properties, {})
node_data["properties"] = actual_properties
return node
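# Worked example with a hypothetical node; entries lacking "name" are dropped:
#     ("User", {"properties": [{"name": "id", "type": "int"}, {"type": "str"}]})
#     -> ("User", {"properties": {"id": {"name": "id", "type": "int"}}})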
def derive_nodes_from_actual_json_output(json_data: Union[dict, list]):
    """
    Finds nodes in non-deterministic AI output. List-shaped output carries no
    node mapping, so it yields an empty dict.
    """
    if isinstance(json_data, list):
        return {}
    all_nodes = flatten_all_nodes(json_data)
    # Property-shape correction only applies when the output used a top-level
    # "nodes" list, where "properties" sometimes arrives as a list of dicts.
    if json_data.get("nodes", None) is None:
        return all_nodes
nodes_with_properties_corrected = handle_property_correction(all_nodes)
return nodes_with_properties_corrected
def flatten_all_nodes(json_data) -> Dict[Hashable, Any]:
"""
    Model output may contain nested nodes; this flattens them into one mapping.
"""
nodes = json_data.get("nodes", None)
if nodes is None:
sub_nodes_list = [
find_all_nodes((name, contents))
for name, contents in json_data.items()
if isinstance(contents, dict)
]
else:
sub_nodes_list = [
find_all_nodes((node["name"], node))
for node in nodes
if isinstance(node, dict) and node.get("name") is not None
]
all_nodes = {k: v for sub_nodes in sub_nodes_list for k, v in sub_nodes.items()}
return all_nodes
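# Both accepted shapes, shown with hypothetical nodes (and assuming
# "properties" is in TOP_LEVEL_IDENTIFIERS so find_all_nodes keeps them):
#     flatten_all_nodes({"nodes": [{"name": "User", "properties": {}}]})
#     -> {"User": {"name": "User", "properties": {}}}
#     flatten_all_nodes({"User": {"properties": {}}})  # no "nodes" key
#     -> {"User": {"properties": {}}}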
def aggregate_desc(acc, node):
    """Accumulates a mapping from description text to the node names sharing it."""
    node_name, node_info = node
    desc = node_info.get("description", None)
    if isinstance(desc, str):
        acc[desc].add(node_name)
    return acc
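# Illustrative inversion: descriptions become keys over sets of node names.
#     reduce(aggregate_desc, [("A", {"description": "a user"})], defaultdict(set))
#     -> {"a user": {"A"}}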
def reform_links(outer_acc, node):
node_name, node_info = node
links = node_info.get("links", [])
collect_links_to_aggregator = lambda inner_acc, link: upsert_set(
inner_acc, (link, node_name)
)
links_to_node_names = reduce(collect_links_to_aggregator, links, outer_acc)
return links_to_node_names
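# Illustrative inversion: each link maps to the set of node names carrying it.
#     reduce(reform_links, [("A", {"links": ["B"]}), ("C", {"links": ["B"]})],
#            defaultdict(set))
#     -> {"B": {"A", "C"}}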
def lens(key, default=None):
    """Returns a getter that reads `key` from a dict, falling back to `default`."""
return lambda d: d.get(key, default)
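# Example: lens("name")({"name": "id"}) -> "id"; lens("name")({}) -> None.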
def aggregate_properties(outer_acc, node):
    """Accumulates a mapping from property name to the node names declaring it."""
    node_name, node_info = node
    properties = node_info.get("properties", {})
    is_list = isinstance(properties, list)
    property_names = (
        list(map(lens("name"), properties)) if is_list else list(properties.keys())
    )
    aggregated = reduce(
        lambda inner_acc, prop_name: upsert_set(inner_acc, (prop_name, node_name)),
        property_names,
        outer_acc,
    )
    return aggregated
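# Illustrative example covering the dict-shaped branch:
#     reduce(aggregate_properties, [("A", {"properties": {"id": {}}})],
#            defaultdict(set))
#     -> {"id": {"A"}}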
def conform_node_to_expected_schema(name_to_data_model):
    """
    Validates a (name, node_data) pair against DataModel. On success, returns
    the dumped model; on failure, returns an empty mapping alongside the error
    payload produced by `attempt`.
    """
    name, dm = name_to_data_model
conform_result = attempt(DataModel.model_validate, (dm,))
model_outcome = (
conform_result.model_dump(exclude_none=True, exclude_unset=True)
if "error" not in conform_result
else {}
)
errors = conform_result if "error" in conform_result else {}
return (name, model_outcome), errors
def aggregate_parsed_file(nodes: dict):
    """
    Conforms each node to DataModel, then builds reverse mappings from links,
    property names, and descriptions back to the node names that carry them.
    """
conformed_nodes_result = [
conform_node_to_expected_schema(node) for node in nodes.items()
]
conformed_nodes = [node for node, errors in conformed_nodes_result if not errors]
aggregated_links = reduce(reform_links, conformed_nodes, defaultdict(set))
aggregated_properties = reduce(
aggregate_properties, conformed_nodes, defaultdict(set)
)
aggregated_descriptions = reduce(aggregate_desc, conformed_nodes, defaultdict(set))
parsed_results = {
"node_names": dict(conformed_nodes),
"links": aggregated_links,
"properties": aggregated_properties,
"description": aggregated_descriptions,
}
return parsed_results
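# Shape of the aggregate, sketched with a hypothetical node:
#     {
#         "node_names": {"User": {...}},        # name -> conformed node
#         "links": {"Order": {"User"}},         # link target -> source node names
#         "properties": {"id": {"User"}},       # property name -> node names
#         "description": {"a user": {"User"}},  # description -> node names
#     }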
def upsert_set(accumulator, kvp):
    """Adds `value` to the set stored under `key`, skipping unhashable keys."""
    key, value = kvp
if isinstance(key, Hashable):
accumulator[key].add(value)
return accumulator
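# Examples (the accumulator is expected to be a defaultdict(set), as above):
#     upsert_set(defaultdict(set), ("k", "v"))   -> {"k": {"v"}}
#     upsert_set(defaultdict(set), (["k"], "v")) -> {}  # unhashable key skipped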
def parse_json(json_string: str) -> Union[dict, list]:
    """
    Safely parses a JSON string into a Python dictionary or list.
    Args:
        json_string (str): The JSON string to parse
    Returns:
        dict/list: Parsed JSON data if successful
        dict: {"error": str} describing the failure otherwise
    """
try:
return json.loads(json_string)
    except (json.JSONDecodeError, TypeError) as e:
        return {"error": str(e)}
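# Examples:
#     parse_json('{"a": 1}')  -> {"a": 1}
#     parse_json("not json")  -> {"error": "Expecting value: line 1 column 1 (char 0)"}
#     parse_json(None)        -> {"error": "..."}  # TypeError path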