ssa-perin / mtool /ucca /normalization.py
larkkin's picture
Add code
991f07c
from ucca import layer0, layer1
from ucca.layer0 import NodeTags as L0Tags
from ucca.layer1 import EdgeTags as ETags, NodeTags as L1Tags
# Edge categories that split_coordinated_main_rel does not duplicate as extra
# (remote) edges into the second and later scenes it creates.
NO_MULTIPLE_INCOMING_CATEGORIES = {
    ETags.Function,
    ETags.ParallelScene,
    ETags.Linker,
    ETags.LinkRelation,
    ETags.Connector,
    ETags.Punctuation,
    ETags.Terminal,
}

# Edge categories that split_coordinated_main_rel attaches to the top unit
# rather than to the first of the newly created scenes.
TOP_CATEGORIES = {
    ETags.ParallelScene,
    ETags.Linker,
    ETags.Function,
    ETags.Ground,
    ETags.Punctuation,
    ETags.LinkRelation,
    ETags.LinkArgument,
    ETags.Connector,
}

# Edge attribute key marking an edge whose child is a coordinated main relation.
COORDINATED_MAIN_REL = "Coordinated_Main_Rel."
def fparent(node_or_edge):
    """
    Return the parent of a node or an edge.

    The attributes ``fparent`` and ``parent`` are tried in that order; if the
    object has neither, the first element of ``parents`` is returned, or None
    when ``parents`` is empty.
    """
    for attr_name in ("fparent", "parent"):
        try:
            return getattr(node_or_edge, attr_name)
        except AttributeError:
            continue
    all_parents = node_or_edge.parents
    return all_parents[0] if all_parents else None
def remove_unmarked_implicits(node):
    """
    Destroy childless units that are not marked as implicit, walking upwards.

    Starting at ``node``, each unit that has no children and no truthy
    "implicit" attribute is destroyed and the walk continues with its parent.
    The walk stops at the first unit that has children, is marked implicit,
    or has no parent (such a unit is left in place).
    """
    current = node
    while current is not None:
        if current.children or current.attrib.get("implicit"):
            return
        ancestor = fparent(current)
        if ancestor is None:
            return
        current.destroy()
        current = ancestor
def remove(parent, child):
    """
    Remove ``child`` from ``parent`` and clean up units left childless.

    Does nothing when ``parent`` is None.
    """
    if parent is None:
        return
    parent.remove(child)
    remove_unmarked_implicits(parent)
def destroy(node_or_edge):
    """
    Destroy a node or an edge and clean up units left childless.

    The object's own ``destroy()`` is used; if that raises AttributeError the
    object is removed through its parent's ``remove()`` instead.

    :return: the parent of the destroyed object (as given by fparent), or None
    """
    owner = fparent(node_or_edge)
    try:
        node_or_edge.destroy()
    except AttributeError:
        owner.remove(node_or_edge)
    if owner is None:
        return None
    remove_unmarked_implicits(owner)
    return owner
def copy_edge(edge, parent=None, child=None, tag=None, attrib=None):
    """
    Add a copy of ``edge``, optionally with a different endpoint, tag or attributes.

    :param edge: the edge to copy
    :param parent: parent of the new edge (default: the parent of ``edge``)
    :param child: child of the new edge (default: the child of ``edge``)
    :param tag: if truthy, the single category tag of the new edge; otherwise
                all categories of ``edge`` are copied (tag, slot, layer, parent)
    :param attrib: attributes of the new edge (default: the attributes of ``edge``)
    :return: False if ``parent`` is reachable from ``child`` (the new edge would
             close a cycle), in which case nothing is added; True otherwise
    """
    source = edge.parent if parent is None else parent
    target = edge.child if child is None else child
    if tag:
        categories = [(tag,)]
    else:
        categories = [(cat.tag, cat.slot, cat.layer, cat.parent) for cat in edge.categories]
    edge_attrib = edge.attrib if attrib is None else attrib
    if source in target.iter():
        # Adding source->target would create a cycle; skip silently instead of raising.
        return False
    source.add_multiple(categories, target, edge_attrib=edge_attrib)
    return True
def replace_center(edge):
    """
    Choose the tag that should replace a Center edge's tag.

    :return: ParallelScene if the edge is its parent's only outgoing edge and
             that parent has no parents; Process if the parent has
             participants but is not a scene; otherwise the edge's current tag
    """
    owner = edge.parent
    if len(owner) == 1 and not owner.parents:
        return ETags.ParallelScene
    if owner.participants and not owner.is_scene():
        return ETags.Process  # TODO should be state if the word is a copula
    return edge.tag
def replace_edge_tags(node):
    """
    Fix the tags of the outgoing edges of ``node`` according to their context.

    For each edge, the first applicable rule is used:
      * a non-remote Center gets the tag chosen by replace_center;
      * in a unit with parallel scenes, Connector becomes Linker;
      * otherwise, Linker becomes Connector;
      * in a scene, Elaborator becomes Adverbial;
      * otherwise, Adverbial becomes Elaborator.
    """
    for edge in node:
        if not edge.attrib.get("remote") and edge.tag == ETags.Center:
            edge.tag = replace_center(edge)
            continue
        if node.parallel_scenes:
            if edge.tag == ETags.Connector:
                edge.tag = ETags.Linker
            continue
        if edge.tag == ETags.Linker:
            edge.tag = ETags.Connector
            continue
        if node.is_scene():
            if edge.tag == ETags.Elaborator:
                edge.tag = ETags.Adverbial
            continue
        if edge.tag == ETags.Adverbial:
            edge.tag = ETags.Elaborator
def move_elements(node, tags, parent_tags, forward=True):
    """
    Move foundational children of ``node`` under their nearest foundational sibling.

    :param node: the unit whose outgoing edges are examined
    :param tags: tag (str) or collection of tags of the edges to move
    :param parent_tags: tag (str) or collection of tags the nearest sibling's
                        edge must have for the move to happen
    :param forward: selects the distance used to find the nearest sibling:
                    if true, from the end of the moved child to the start of
                    the sibling; otherwise from the end of the sibling to the
                    start of the moved child (absolute values are compared)
    """
    movable_tags = (tags,) if isinstance(tags, str) else tags
    target_tags = (parent_tags,) if isinstance(parent_tags, str) else parent_tags
    for edge in node:
        if edge.child.tag != L1Tags.Foundational or edge.tag not in movable_tags:
            continue

        def distance(other, moved=edge):
            before = moved.child.start_position - other.child.end_position
            after = other.child.start_position - moved.child.end_position
            return abs((before, after)[forward])

        siblings = (e for e in node if e != edge and e.child.tag == L1Tags.Foundational)
        try:
            nearest = min(siblings, key=distance)
        except ValueError:  # no other foundational sibling
            continue
        if nearest.tag in target_tags:
            if copy_edge(edge, parent=nearest.child):
                remove(node, edge)
def move_scene_elements(node):
    """
    In a unit that has parallel scenes, move Relator, Elaborator and Center
    children into their nearest sibling if it is a ParallelScene.
    """
    if not node.parallel_scenes:
        return
    movable = (ETags.Relator, ETags.Elaborator, ETags.Center)
    move_elements(node, tags=movable, parent_tags=ETags.ParallelScene)
def move_sub_scene_elements(node):
    """
    In a scene, move Elaborator and Center children into their nearest sibling
    if it is a Participant (using the non-forward distance of move_elements).
    """
    if not node.is_scene():
        return
    movable = (ETags.Elaborator, ETags.Center)
    move_elements(node, tags=movable, parent_tags=ETags.Participant, forward=False)
def separate_scenes(node, l1, top_level=False):
    """
    Wrap the scene elements of ``node`` in a new ParallelScene child.

    Applies only if ``node`` is a scene or has participants, and additionally
    is either top-level or already has parallel scenes. All outgoing edges
    except ParallelScene, Punctuation, Linker and Ground are moved from
    ``node`` to the newly created unit.

    :param node: the unit to restructure
    :param l1: the layer used to create the new unit
    :param top_level: whether ``node`` is a head of the layer
    """
    if not (node.is_scene() or node.participants):
        return
    if not (top_level or node.parallel_scenes):
        return
    stay_in_place = (ETags.ParallelScene, ETags.Punctuation, ETags.Linker, ETags.Ground)
    original_edges = list(node)  # snapshot taken before the new scene edge is added
    scene = l1.add_fnode(node, ETags.ParallelScene)
    for edge in original_edges:
        if edge.tag in stay_in_place:
            continue
        if copy_edge(edge, parent=scene):
            remove(node, edge)
def lowest_common_ancestor(*nodes):
    """
    Find the closest foundational ancestor of the first node that contains all others.

    The search goes upwards from ``nodes[0]``, one level of parents at a time.
    A candidate is accepted if it is a foundational unit, all remaining nodes
    are reachable from it, and, when only one node is given, it has no
    terminals.

    Each unit is examined at most once (tracked by object identity), so the
    search terminates even if the ancestors contain a cycle, and shared
    ancestors in a DAG are not expanded repeatedly. Since a rejected unit
    would be rejected again, this does not change which unit is returned.

    :param nodes: the first node is the starting point; the rest must be
                  reachable from the returned unit
    :return: the ancestor found (possibly ``nodes[0]`` itself), or None if
             no nodes are given or no suitable ancestor exists
    """
    if not nodes:
        return None
    start, others = nodes[0], nodes[1:]
    seen = {id(start)}
    level = [start]
    while level:
        for candidate in level:
            if candidate.tag == L1Tags.Foundational and (not candidate.terminals or others) \
                    and all(n in candidate.iter() for n in others):
                return candidate
        next_level = []
        for unit in level:
            for parent in unit.parents:
                if id(parent) not in seen:  # skip units already examined
                    seen.add(id(parent))
                    next_level.append(parent)
        level = next_level
    return None
def nearest_word(l0, position, step):
    """
    Find the closest Word terminal in a given direction.

    :param l0: the layer queried with ``by_position``
    :param position: the starting position (itself not examined)
    :param step: the increment added to the position on each attempt
    :return: the first terminal tagged as Word that is found, or None once
             ``by_position`` raises IndexError
    """
    cursor = position + step
    while True:
        try:
            candidate = l0.by_position(cursor)
        except IndexError:
            return None
        if candidate.tag == L0Tags.Word:
            return candidate
        cursor += step
def nearest_parent(l0, *terminals):
    """
    Find a unit to attach the given terminals to.

    The nearest word before the first terminal and the nearest word after the
    last terminal are located, and their lowest common ancestor is returned.
    Neighbours that are missing (or falsy) are left out.

    :return: the unit found by lowest_common_ancestor, possibly None
    """
    # NOTE(review): neighbours are filtered by truthiness, exactly as
    # filter(None, ...) would - confirm terminals are never falsy.
    word_before = nearest_word(l0, terminals[0].position, -1)
    word_after = nearest_word(l0, terminals[-1].position, 1)
    neighbours = [word for word in (word_before, word_after) if word]
    return lowest_common_ancestor(*neighbours)
def reattach_punct(l0, l1):
    """Detach all punctuation units from ``l1``, then attach punctuation terminals anew."""
    detach_punct(l1)
    attach_punct(l0, l1)
def attach_punct(l0, l1):
    """
    Attach every punctuation terminal that has no incoming edge.

    Each such terminal is added through ``l1.add_punct`` under the unit
    returned by nearest_parent.
    """
    for terminal in l0.all:
        if not layer0.is_punct(terminal) or terminal.incoming:
            continue
        l1.add_punct(nearest_parent(l0, terminal), terminal)
def detach_punct(l1):
    """Destroy every unit in ``l1`` that is tagged as Punctuation."""
    for unit in l1.all:
        if unit.tag != L1Tags.Punctuation:
            continue
        destroy(unit)
def reattach_terminals(l0, l1):
    """
    Attach orphan terminals and give terminals in mixed units a unit of their own.

    After attach_terminals has run, every terminal edge whose parent also has
    an outgoing edge that is not a Terminal edge is moved under a new Center
    unit created inside that parent.
    """
    attach_terminals(l0, l1)
    for terminal in l0.all:
        for edge in terminal.incoming:
            owner = edge.parent
            has_non_terminal_edge = any(e.tag != ETags.Terminal for e in owner)
            if not has_non_terminal_edge:
                continue
            wrapper = l1.add_fnode(owner, ETags.Center)
            if copy_edge(edge, parent=wrapper):
                remove(owner, edge)
def attach_terminals(l0, l1):
    """
    Attach every terminal that has no incoming edge.

    A new Function unit is created under the unit returned by nearest_parent,
    and the terminal is added to it with a Terminal edge.
    """
    for terminal in l0.all:
        if terminal.incoming:
            continue
        holder = l1.add_fnode(nearest_parent(l0, terminal), ETags.Function)
        holder.add(ETags.Terminal, terminal)
def flatten_centers(node):
    """
    Whenever there are Cs inside Cs, remove the external C.
    Whenever there is a C as an only child, remove it.

    Only foundational units with exactly one center are considered.

    :return: ``node`` if it was kept; otherwise the result of destroying it
             (its parent, or None)
    """
    if node.tag != L1Tags.Foundational or len(node.centers) != 1:
        return node
    if node.ftag == ETags.Center and len(fparent(node).centers) == 1:
        # Center inside center: redirect remote incoming edges to the inner
        # center and hand all children over to the parent.
        for incoming_edge in node.incoming:
            if incoming_edge.attrib.get("remote"):
                copy_edge(incoming_edge, child=node.centers[0])
        for outgoing_edge in node:
            copy_edge(outgoing_edge, parent=fparent(node))
        return destroy(node)
    if len(node.children) == 1:
        # Center as only child: point every incoming edge at the center itself,
        # keeping the remote flag if the edge to the center was remote.
        for incoming_edge in node.incoming:
            attrib = incoming_edge.attrib
            if node.outgoing[0].attrib.get("remote"):
                attrib["remote"] = True
            copy_edge(incoming_edge, child=node.centers[0], attrib=attrib)
        return destroy(node)
    return node
def flatten_functions(node):
    """
    Whenever there is an F as an only child, remove it. If an F has non-terminal children, move them up.

    Only foundational units with at least one incoming edge are considered.

    :return: ``node`` if it was kept; otherwise the result of destroying it
             (its parent, or None)
    """
    if node.tag != L1Tags.Foundational or not node.incoming:
        # Units without incoming edges are skipped to avoid creating a root->terminal edge
        return node
    for function_child in node.functions:
        if len(function_child.children) <= len(function_child.terminals):
            continue
        # Move the F's children up; a Center among them becomes a Function.
        for edge in function_child:
            new_tag = ETags.Function if edge.tag == ETags.Center else edge.tag
            copy_edge(edge, parent=node, tag=new_tag)
        destroy(function_child)
    if len(node.functions) == len(node.children) == 1:
        for edge in node.incoming:
            copy_edge(edge, child=node.functions[0])
        return destroy(node)
    return node
def flatten_participants(node):
    """
    Whenever there is an A as an only child, remove it.
    If there is an implicit A in a unit that is not a scene, remove it.

    Only foundational units are considered.

    :return: ``node`` if it was kept; otherwise the result of destroying it
             (its parent, or None)
    """
    if node.tag != L1Tags.Foundational:
        return node
    participants = node.participants
    is_only_child = len(participants) == len(node.children) == 1
    if is_only_child and len(participants[0].ftags) == 1:
        for edge in node.incoming:
            copy_edge(edge, child=participants[0])
        return destroy(node)
    if participants and not node.is_scene():
        for participant in participants:
            if participant.attrib.get("implicit"):
                destroy(participant)
    return node
def split_coordinated_main_rel(node, l1):
    """
    Split a scene whose main relation is coordinated into one parallel scene per center.

    An outgoing edge of ``node`` carrying a truthy COORDINATED_MAIN_REL
    attribute marks its child as a coordinated main relation. For every center
    of that child a new ParallelScene unit is created; the other children of
    ``node`` are copied into each new scene (as remote edges from the second
    scene on), and the original edges are then destroyed.

    :param node: the unit whose outgoing edges are examined
    :param l1: the layer used to create the new scene units
    :return: ``node`` (even if it was destroyed because it has no incoming edges)
    """
    for edge in node:
        # Work on a copy so that the marker is not copied onto the new edges
        attrib = edge.attrib.copy()
        if attrib.pop(COORDINATED_MAIN_REL, None):
            assert {ETags.Process, ETags.State}.issuperset(edge.tags), \
                "%s node without main relation: %s" % (COORDINATED_MAIN_REL, node)
            main_rel = edge.child
            centers = main_rel.centers
            assert centers, "%s node without centers: %s" % (COORDINATED_MAIN_REL, main_rel)
            # New scenes go under the parent of node if node is itself a parallel scene
            # (node is then detached from that parent); otherwise under node
            top = fparent(node)
            if ETags.ParallelScene in node.ftags:
                top.remove(node)
            else:
                top = node
            outgoing = list(node.outgoing)  # snapshot taken before any edge is copied or destroyed
            scenes = []
            for center in centers:
                main_rel.remove(center)
                new_scene = l1.add_fnode(top, ETags.ParallelScene)
                # The center becomes the main relation of the new scene
                copy_edge(edge, parent=new_scene, child=center, attrib=attrib)
                for scene_edge in outgoing:
                    if scene_edge.ID != edge.ID and not (
                            scenes and NO_MULTIPLE_INCOMING_CATEGORIES.intersection(scene_edge.tags)):
                        # Not the CMR edge itself, and not a category that does not allow multiple parents
                        copy_edge(scene_edge, parent=new_scene, attrib={"remote": True} if scenes else None)
                scenes.append(new_scene)
            # Whatever is left under the main relation moves to the top unit if all its tags
            # are in TOP_CATEGORIES, otherwise to the first new scene; Connector becomes Linker
            for main_rel_edge in list(main_rel.outgoing):
                tags = main_rel_edge.tags
                copy_edge(main_rel_edge, parent=top if TOP_CATEGORIES.issuperset(tags) else scenes[0],
                          tag=ETags.Linker if ETags.Connector in main_rel_edge.tags else None)
                destroy(main_rel_edge)
            # The original edges have been copied into the new scenes; drop them
            for scene_edge in outgoing:
                if scene_edge.ID != edge.ID:
                    destroy(scene_edge)
            if main_rel.incoming:
                main_rel.destroy()
            if not node.incoming:
                node.destroy()
    return node
def normalize_node(node, l1, extra):
    """
    Apply all normalization steps to a single unit.

    Units that are not foundational are left untouched. Each flattening step
    may destroy the unit and return its parent instead; processing stops as
    soon as a step returns None.

    :param node: the unit to normalize
    :param l1: the layer the unit belongs to
    :param extra: whether to also apply the tag-replacing and element-moving steps
    """
    if node.tag != L1Tags.Foundational:
        return
    if extra:
        replace_edge_tags(node)
        move_scene_elements(node)
        move_sub_scene_elements(node)
        separate_scenes(node, l1, top_level=node in l1.heads)
    node = split_coordinated_main_rel(node, l1)
    if node is not None:
        node = flatten_centers(node)
    if node is not None:
        node = flatten_functions(node)
    if node is not None:
        flatten_participants(node)
def normalize(passage, extra=False):
    """
    Normalize a passage in place.

    Punctuation is re-attached, every unit reachable from the heads of layer 1
    is normalized in a depth-first traversal (edges leading back to a unit on
    the current path are destroyed), and punctuation is re-attached again.

    :param passage: the passage to normalize
    :param extra: whether to apply the extra normalization steps of
                  normalize_node and to re-attach terminals at the end
    """
    l0 = passage.layer(layer0.LAYER_ID)
    l1 = passage.layer(layer1.LAYER_ID)
    reattach_punct(l0, l1)
    heads = list(l1.heads)
    # Each stack entry is an iterable: the list of heads, or a unit (iterated for its edges)
    stack = [heads]
    visited = set()
    # Units on the current traversal path, as a list (for order) and a set (for lookup)
    path = []
    path_set = set()
    while stack:
        for edge in stack[-1]:
            try:
                node = edge.child
            except AttributeError:
                # Elements of the heads list are units rather than edges
                node = edge
            if node in path_set:
                # The edge leads back to a unit on the current path
                destroy(edge)
            elif node not in visited:
                visited.add(node)
                path.append(node)
                path_set.add(node)
                stack.append(node)
                normalize_node(node, l1, extra)
                break  # descend into the unit just pushed
        else:
            # Nothing left to visit at this level: go back up
            if path:
                path_set.remove(path.pop())
            stack.pop()
    reattach_punct(l0, l1)
    if extra:
        reattach_terminals(l0, l1)