|
"""Implementation of the DOM Level 3 'LS-Load' feature.""" |
|
|
|
import copy |
|
import xml.dom |
|
|
|
from xml.dom.NodeFilter import NodeFilter |
|
|
|
|
|
__all__ = ["DOMBuilder", "DOMEntityResolver", "DOMInputSource"] |
|
|
|
|
|
class Options:
    """Bag of per-feature settings shared between builder classes.

    A DOMBuilder holds an instance of this class and hands a copy of it
    to the ExpatBuilder machinery when a parse is started.  Every
    attribute below is a class-level default; instances override
    individual values as features are switched on or off.
    """

    # Namespace processing (kept as the integer 1, not True).
    namespaces = 1

    # Features that are enabled by default.
    namespace_declarations = True
    external_parameter_entities = True
    external_general_entities = True
    external_dtd_subset = True
    create_entity_ref_nodes = True
    entities = True
    whitespace_in_element_content = True
    cdata_sections = True
    comments = True
    charset_overrides_xml_encoding = True

    # Features that are disabled by default.
    validation = False
    validate_if_schema = False
    validate = False
    datatype_normalization = False
    infoset = False
    supported_mediatypes_only = False

    # Collaborator objects; filled in per parse by the builder.
    errorHandler = None
    filter = None
|
|
|
|
|
class DOMBuilder:
    """Synchronous document builder for the DOM Level 3 'LS-Load' feature.

    Feature flags are stored on a private ``Options`` instance and are
    changed through setFeature(); the entity resolver, error handler and
    filter are plain attributes.  Parsing itself is delegated to
    xml.dom.expatbuilder.
    """

    entityResolver = None
    errorHandler = None
    filter = None

    ACTION_REPLACE = 1
    ACTION_APPEND_AS_CHILDREN = 2
    ACTION_INSERT_AFTER = 3
    ACTION_INSERT_BEFORE = 4

    _legal_actions = (ACTION_REPLACE, ACTION_APPEND_AS_CHILDREN,
                      ACTION_INSERT_AFTER, ACTION_INSERT_BEFORE)

    def __init__(self):
        self._options = Options()

    def _get_entityResolver(self):
        return self.entityResolver
    def _set_entityResolver(self, entityResolver):
        self.entityResolver = entityResolver

    def _get_errorHandler(self):
        return self.errorHandler
    def _set_errorHandler(self, errorHandler):
        self.errorHandler = errorHandler

    def _get_filter(self):
        return self.filter
    def _set_filter(self, filter):
        self.filter = filter

    def setFeature(self, name, state):
        """Switch the feature *name* on or off.

        Raises xml.dom.NotFoundErr when the feature is unknown, and
        xml.dom.NotSupportedErr when the feature is known but cannot be
        put into the requested state.
        """
        if not self.supportsFeature(name):
            raise xml.dom.NotFoundErr("unknown feature: " + repr(name))
        key = (_name_xform(name), 1 if state else 0)
        try:
            settings = self._settings[key]
        except KeyError:
            raise xml.dom.NotSupportedErr(
                "unsupported feature: %r" % (name,)) from None
        # One feature may imply several option changes (see "infoset").
        for option, value in settings:
            setattr(self._options, option, value)

    def supportsFeature(self, name):
        """Return true if *name* corresponds to a known option."""
        return hasattr(self._options, _name_xform(name))

    def canSetFeature(self, name, state):
        """Return true if setFeature(name, state) would be accepted."""
        key = (_name_xform(name), 1 if state else 0)
        return key in self._settings

    # Maps (feature, state) to the list of (option, value) pairs that
    # must be applied to the Options object to put the feature into
    # that state.  A (feature, state) pair that is absent from the
    # table cannot be set.
    _settings = {
        ("namespace_declarations", 0): [
            ("namespace_declarations", 0)],
        ("namespace_declarations", 1): [
            ("namespace_declarations", 1)],
        ("validation", 0): [
            ("validation", 0)],
        ("external_general_entities", 0): [
            ("external_general_entities", 0)],
        ("external_general_entities", 1): [
            ("external_general_entities", 1)],
        ("external_parameter_entities", 0): [
            ("external_parameter_entities", 0)],
        ("external_parameter_entities", 1): [
            ("external_parameter_entities", 1)],
        ("validate_if_schema", 0): [
            ("validate_if_schema", 0)],
        ("create_entity_ref_nodes", 0): [
            ("create_entity_ref_nodes", 0)],
        ("create_entity_ref_nodes", 1): [
            ("create_entity_ref_nodes", 1)],
        ("entities", 0): [
            ("create_entity_ref_nodes", 0),
            ("entities", 0)],
        ("entities", 1): [
            ("entities", 1)],
        ("whitespace_in_element_content", 0): [
            ("whitespace_in_element_content", 0)],
        ("whitespace_in_element_content", 1): [
            ("whitespace_in_element_content", 1)],
        ("cdata_sections", 0): [
            ("cdata_sections", 0)],
        ("cdata_sections", 1): [
            ("cdata_sections", 1)],
        ("comments", 0): [
            ("comments", 0)],
        ("comments", 1): [
            ("comments", 1)],
        ("charset_overrides_xml_encoding", 0): [
            ("charset_overrides_xml_encoding", 0)],
        ("charset_overrides_xml_encoding", 1): [
            ("charset_overrides_xml_encoding", 1)],
        ("infoset", 0): [],
        ("infoset", 1): [
            ("namespace_declarations", 0),
            ("validate_if_schema", 0),
            ("create_entity_ref_nodes", 0),
            ("entities", 0),
            ("cdata_sections", 0),
            ("datatype_normalization", 1),
            ("whitespace_in_element_content", 1),
            ("comments", 1),
            ("charset_overrides_xml_encoding", 1)],
        ("supported_mediatypes_only", 0): [
            ("supported_mediatypes_only", 0)],
        ("namespaces", 0): [
            ("namespaces", 0)],
        ("namespaces", 1): [
            ("namespaces", 1)],
    }

    def getFeature(self, name):
        """Return the current state of the feature *name*.

        Raises xml.dom.NotFoundErr when the feature is unknown.
        """
        xname = _name_xform(name)
        if xname == "infoset":
            # "infoset" is derived from the options it implies.  The
            # Options.infoset attribute is never updated by setFeature(),
            # so a plain attribute lookup would always report the default.
            options = self._options
            return bool(options.datatype_normalization
                        and options.whitespace_in_element_content
                        and options.comments
                        and options.charset_overrides_xml_encoding
                        and not (options.namespace_declarations
                                 or options.validate_if_schema
                                 or options.create_entity_ref_nodes
                                 or options.entities
                                 or options.cdata_sections))
        try:
            return getattr(self._options, xname)
        except AttributeError:
            raise xml.dom.NotFoundErr(
                "feature %s not known" % repr(name)) from None

    def parseURI(self, uri):
        """Resolve *uri* to an input source and parse it.

        The configured entityResolver is used when one is set; otherwise
        a fresh DOMEntityResolver does the resolution.
        """
        resolver = self.entityResolver or DOMEntityResolver()
        return self.parse(resolver.resolveEntity(None, uri))

    def parse(self, input):
        """Parse the document described by *input* and return it.

        *input* must provide ``byteStream`` and ``systemId`` attributes.
        When no byte stream is supplied but a system identifier is, the
        identifier is opened with urllib and that stream is closed again
        once parsing has finished.  A stream supplied by the caller is
        left open.
        """
        options = copy.copy(self._options)
        options.filter = self.filter
        options.errorHandler = self.errorHandler
        fp = input.byteStream
        # The system identifier lives on the input source, not on the
        # options object.
        if fp is None and input.systemId:
            import urllib.request
            with urllib.request.urlopen(input.systemId) as fp:
                return self._parse_bytestream(fp, options)
        return self._parse_bytestream(fp, options)

    def parseWithContext(self, input, cnode, action):
        """Validate *action*; contextual parsing itself is unimplemented.

        Raises ValueError for an action that is not one of the ACTION_*
        constants, NotImplementedError otherwise.
        """
        if action not in self._legal_actions:
            raise ValueError("not a legal action")
        raise NotImplementedError("Haven't written this yet...")

    def _parse_bytestream(self, stream, options):
        """Build a document from *stream* using an expatbuilder builder."""
        import xml.dom.expatbuilder
        builder = xml.dom.expatbuilder.makeBuilder(options)
        return builder.parseFile(stream)
|
|
|
|
|
def _name_xform(name): |
|
return name.lower().replace('-', '_') |
|
|
|
|
|
class DOMEntityResolver(object):
    """Default resolver turning a system identifier into a DOMInputSource.

    The identifier is opened through a lazily created urllib opener.
    """

    __slots__ = '_opener',

    def resolveEntity(self, publicId, systemId):
        """Open *systemId* and return an input source describing it.

        The returned source carries the open byte stream, the encoding
        announced by the transport (or None), and a base URI derived
        from *systemId*.
        """
        assert systemId is not None
        source = DOMInputSource()
        source.publicId = publicId
        source.systemId = systemId
        source.byteStream = self._get_opener().open(systemId)

        # Use the encoding announced by the transport, if any.
        source.encoding = self._guess_media_encoding(source)

        # Derive the base URI: strip the last path segment unless the
        # path is empty or already names a directory.
        import posixpath
        import urllib.parse
        parts = urllib.parse.urlparse(systemId)
        scheme, netloc, path, params, query, fragment = parts
        if path and not path.endswith("/"):
            path = posixpath.dirname(path) + "/"
            parts = scheme, netloc, path, params, query, fragment
        source.baseURI = urllib.parse.urlunparse(parts)

        return source

    def _get_opener(self):
        """Return the cached opener, creating it on first use."""
        try:
            return self._opener
        except AttributeError:
            self._opener = self._create_opener()
            return self._opener

    def _create_opener(self):
        """Build the urllib opener used to fetch entities."""
        import urllib.request
        return urllib.request.build_opener()

    def _guess_media_encoding(self, source):
        """Return the lower-cased charset from the response headers.

        Returns None when the response has no Content-Type header or the
        header carries no charset parameter.
        """
        info = source.byteStream.info()
        if "Content-Type" in info:
            # The headers are an email.message.Message, which has no
            # getplist(); get_content_charset() extracts, unquotes and
            # lower-cases the charset parameter.
            return info.get_content_charset()
        return None
|
|
|
|
|
class DOMInputSource(object):
    """Plain record describing where the data for a parse comes from.

    All fields start out as None.  Each field also has a
    ``_get_<name>`` / ``_set_<name>`` accessor pair that reads or
    writes the attribute directly.
    """

    __slots__ = ('byteStream', 'characterStream', 'stringData',
                 'encoding', 'publicId', 'systemId', 'baseURI')

    def __init__(self):
        # Possible data carriers.
        self.byteStream = self.characterStream = self.stringData = None
        # Descriptive information about the data.
        self.encoding = None
        self.publicId = self.systemId = self.baseURI = None

    # --- data carriers ---------------------------------------------------

    def _get_byteStream(self):
        return self.byteStream

    def _set_byteStream(self, byteStream):
        self.byteStream = byteStream

    def _get_characterStream(self):
        return self.characterStream

    def _set_characterStream(self, characterStream):
        self.characterStream = characterStream

    def _get_stringData(self):
        return self.stringData

    def _set_stringData(self, data):
        self.stringData = data

    # --- descriptive information -------------------------------------------

    def _get_encoding(self):
        return self.encoding

    def _set_encoding(self, encoding):
        self.encoding = encoding

    def _get_publicId(self):
        return self.publicId

    def _set_publicId(self, publicId):
        self.publicId = publicId

    def _get_systemId(self):
        return self.systemId

    def _set_systemId(self, systemId):
        self.systemId = systemId

    def _get_baseURI(self):
        return self.baseURI

    def _set_baseURI(self, uri):
        self.baseURI = uri
|
|
|
|
|
class DOMBuilderFilter:
    """Base class for filters that steer construction of a DOM tree.

    The default implementation accepts everything; subclasses override
    acceptNode() and/or startContainer() and answer with one of the
    FILTER_* codes defined here.
    """

    # Answer codes a filter method may return.
    FILTER_ACCEPT, FILTER_REJECT, FILTER_SKIP, FILTER_INTERRUPT = range(1, 5)

    # Mask of node types presented to the filter; defaults to all of them.
    whatToShow = NodeFilter.SHOW_ALL

    def _get_whatToShow(self):
        return self.whatToShow

    def acceptNode(self, element):
        """Default answer for a node: accept it."""
        return self.FILTER_ACCEPT

    def startContainer(self, element):
        """Default answer for a container: accept it."""
        return self.FILTER_ACCEPT


# NodeFilter was only needed for the class attribute above.
del NodeFilter
|
|
|
|
|
class DocumentLS:
    """Mixin to create documents that conform to the load/save spec."""

    async_ = False

    def _get_async(self):
        return False

    def _set_async(self, flag):
        """Reject any attempt to enable asynchronous loading.

        Raises xml.dom.NotSupportedErr when *flag* is true.
        """
        if flag:
            raise xml.dom.NotSupportedErr(
                "asynchronous document loading is not supported")

    def abort(self):
        raise NotImplementedError(
            "haven't figured out what this means yet")

    def load(self, uri):
        raise NotImplementedError("haven't written this yet")

    def loadXML(self, source):
        raise NotImplementedError("haven't written this yet")

    def saveXML(self, snode):
        """Serialize *snode*, or this document when *snode* is None.

        Passing the document itself is equivalent to passing None.
        Raises xml.dom.WrongDocumentErr when *snode* is a node whose
        ownerDocument is not this document.
        """
        if snode is None or snode is self:
            # The document may not be its own ownerDocument, so it must
            # be recognised before the ownership check below.
            snode = self
        elif snode.ownerDocument is not self:
            raise xml.dom.WrongDocumentErr()
        return snode.toxml()
|
|
|
|
|
class DOMImplementationLS:
    """Factory mixin for the load/save objects defined in this module."""

    MODE_SYNCHRONOUS = 1
    MODE_ASYNCHRONOUS = 2

    def createDOMBuilder(self, mode, schemaType):
        """Return a new DOMBuilder for synchronous operation.

        Raises xml.dom.NotSupportedErr when a schema type is given or
        when asynchronous mode is requested, and ValueError when *mode*
        is neither of the MODE_* constants.
        """
        # Schema types are checked before the mode is looked at.
        if schemaType is not None:
            raise xml.dom.NotSupportedErr(
                "schemaType not yet supported")

        if mode == self.MODE_SYNCHRONOUS:
            return DOMBuilder()

        if not (mode == self.MODE_ASYNCHRONOUS):
            raise ValueError("unknown value for mode")

        raise xml.dom.NotSupportedErr(
            "asynchronous builders are not supported")

    def createDOMWriter(self):
        """Always raises NotImplementedError; there is no writer."""
        raise NotImplementedError(
            "the writer interface hasn't been written yet!")

    def createDOMInputSource(self):
        """Return a new, empty DOMInputSource."""
        return DOMInputSource()
|
|