|
|
|
__all__ = ['Serializer', 'SerializerError'] |
|
|
|
from .error import YAMLError |
|
from .events import * |
|
from .nodes import * |
|
|
|
class SerializerError(YAMLError): |
|
pass |
|
|
|
class Serializer: |
|
|
|
ANCHOR_TEMPLATE = 'id%03d' |
|
|
|
def __init__(self, encoding=None, |
|
explicit_start=None, explicit_end=None, version=None, tags=None): |
|
self.use_encoding = encoding |
|
self.use_explicit_start = explicit_start |
|
self.use_explicit_end = explicit_end |
|
self.use_version = version |
|
self.use_tags = tags |
|
self.serialized_nodes = {} |
|
self.anchors = {} |
|
self.last_anchor_id = 0 |
|
self.closed = None |
|
|
|
def open(self): |
|
if self.closed is None: |
|
self.emit(StreamStartEvent(encoding=self.use_encoding)) |
|
self.closed = False |
|
elif self.closed: |
|
raise SerializerError("serializer is closed") |
|
else: |
|
raise SerializerError("serializer is already opened") |
|
|
|
def close(self): |
|
if self.closed is None: |
|
raise SerializerError("serializer is not opened") |
|
elif not self.closed: |
|
self.emit(StreamEndEvent()) |
|
self.closed = True |
|
|
|
|
|
|
|
|
|
def serialize(self, node): |
|
if self.closed is None: |
|
raise SerializerError("serializer is not opened") |
|
elif self.closed: |
|
raise SerializerError("serializer is closed") |
|
self.emit(DocumentStartEvent(explicit=self.use_explicit_start, |
|
version=self.use_version, tags=self.use_tags)) |
|
self.anchor_node(node) |
|
self.serialize_node(node, None, None) |
|
self.emit(DocumentEndEvent(explicit=self.use_explicit_end)) |
|
self.serialized_nodes = {} |
|
self.anchors = {} |
|
self.last_anchor_id = 0 |
|
|
|
def anchor_node(self, node): |
|
if node in self.anchors: |
|
if self.anchors[node] is None: |
|
self.anchors[node] = self.generate_anchor(node) |
|
else: |
|
self.anchors[node] = None |
|
if isinstance(node, SequenceNode): |
|
for item in node.value: |
|
self.anchor_node(item) |
|
elif isinstance(node, MappingNode): |
|
for key, value in node.value: |
|
self.anchor_node(key) |
|
self.anchor_node(value) |
|
|
|
def generate_anchor(self, node): |
|
self.last_anchor_id += 1 |
|
return self.ANCHOR_TEMPLATE % self.last_anchor_id |
|
|
|
def serialize_node(self, node, parent, index): |
|
alias = self.anchors[node] |
|
if node in self.serialized_nodes: |
|
self.emit(AliasEvent(alias)) |
|
else: |
|
self.serialized_nodes[node] = True |
|
self.descend_resolver(parent, index) |
|
if isinstance(node, ScalarNode): |
|
detected_tag = self.resolve(ScalarNode, node.value, (True, False)) |
|
default_tag = self.resolve(ScalarNode, node.value, (False, True)) |
|
implicit = (node.tag == detected_tag), (node.tag == default_tag) |
|
self.emit(ScalarEvent(alias, node.tag, implicit, node.value, |
|
style=node.style)) |
|
elif isinstance(node, SequenceNode): |
|
implicit = (node.tag |
|
== self.resolve(SequenceNode, node.value, True)) |
|
self.emit(SequenceStartEvent(alias, node.tag, implicit, |
|
flow_style=node.flow_style)) |
|
index = 0 |
|
for item in node.value: |
|
self.serialize_node(item, node, index) |
|
index += 1 |
|
self.emit(SequenceEndEvent()) |
|
elif isinstance(node, MappingNode): |
|
implicit = (node.tag |
|
== self.resolve(MappingNode, node.value, True)) |
|
self.emit(MappingStartEvent(alias, node.tag, implicit, |
|
flow_style=node.flow_style)) |
|
for key, value in node.value: |
|
self.serialize_node(key, node, None) |
|
self.serialize_node(value, node, key) |
|
self.emit(MappingEndEvent()) |
|
self.ascend_resolver() |
|
|
|
|