# Scanner produces tokens of the following types:
# STREAM-START
# STREAM-END
# DIRECTIVE(name, value)
# DOCUMENT-START
# DOCUMENT-END
# BLOCK-SEQUENCE-START
# BLOCK-MAPPING-START
# BLOCK-END
# FLOW-SEQUENCE-START
# FLOW-MAPPING-START
# FLOW-SEQUENCE-END
# FLOW-MAPPING-END
# BLOCK-ENTRY
# FLOW-ENTRY
# KEY
# VALUE
# ALIAS(value)
# ANCHOR(value)
# TAG(value)
# SCALAR(value, plain, style)
#
# Read the comments in the Scanner code below for more details.

__all__ = ['Scanner', 'ScannerError'] |
|
|
|
from .error import MarkedYAMLError |
|
from .tokens import * |
|
|
|
class ScannerError(MarkedYAMLError): |
|
pass |
|
|
|
class SimpleKey: |
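    # A position where a simple key (a key not denoted by '?') may start.
    # `token_number` is the number of the token that would become the KEY,
    # so that it can be emitted retroactively once the matching ':' is
    # found; `required` means that a ':' must in fact follow on this line.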
|
|
|
|
|
def __init__(self, token_number, required, index, line, column, mark): |
|
self.token_number = token_number |
|
self.required = required |
|
self.index = index |
|
self.line = line |
|
self.column = column |
|
self.mark = mark |
|
|
|
class Scanner: |
|
|
|
def __init__(self): |
|
"""Initialize the scanner.""" |
|
self.done = False |
|
|
|
|
|
|
|
self.flow_level = 0 |
|
|
|
|
|
self.tokens = [] |
|
|
|
|
|
self.fetch_stream_start() |
|
|
|
|
|
self.tokens_taken = 0 |
|
|
|
|
|
self.indent = -1 |
|
|
|
|
|
self.indents = [] |
|
        # Simple keys treatment.
        #
        # A simple key is a key that is not denoted by the '?' indicator,
        # for example "block simple key: value". The KEY token for a simple
        # key is emitted retroactively, once the matching ':' is found, so
        # potential simple keys are recorded and resolved in fetch_value().
        #
        # allow_simple_key signals whether a simple key may start at the
        # current position: at the start of a line in the block context, or
        # right after '[', '{', ',', '?', ':' or '-' indicators.
|
self.allow_simple_key = True |
|
        # Keep track of possible simple keys: a dictionary mapping
        # flow_level -> SimpleKey. There is at most one possible simple key
        # per flow level.
self.possible_simple_keys = {} |
|
|
|
|
|
|
|
def check_token(self, *choices): |
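        # Check if the next token is one of the given types.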
|
|
|
while self.need_more_tokens(): |
|
self.fetch_more_tokens() |
|
if self.tokens: |
|
if not choices: |
|
return True |
|
for choice in choices: |
|
if isinstance(self.tokens[0], choice): |
|
return True |
|
return False |
|
|
|
def peek_token(self): |
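        # Return the next token, but do not delete it from the queue.
        # Return None if there are no more tokens.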
|
|
|
|
|
while self.need_more_tokens(): |
|
self.fetch_more_tokens() |
|
if self.tokens: |
|
return self.tokens[0] |
|
else: |
|
return None |
|
|
|
def get_token(self): |
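        # Return the next token and remove it from the queue.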
|
|
|
while self.need_more_tokens(): |
|
self.fetch_more_tokens() |
|
if self.tokens: |
|
self.tokens_taken += 1 |
|
return self.tokens.pop(0) |
|
|
|
|
|
|
|
def need_more_tokens(self): |
|
if self.done: |
|
return False |
|
if not self.tokens: |
|
return True |
|
|
|
|
|
self.stale_possible_simple_keys() |
|
        if self.next_possible_simple_key() == self.tokens_taken:
            return True
        return False
|
|
|
def fetch_more_tokens(self): |
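        # Eat whitespace and comments, retire stale simple keys, adjust the
        # indentation, then dispatch on the next character to the matching
        # fetch_* method.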
|
|
|
|
|
self.scan_to_next_token() |
|
|
|
|
|
self.stale_possible_simple_keys() |
|
|
|
|
|
|
|
self.unwind_indent(self.column) |
|
|
|
|
|
ch = self.peek() |
|
|
|
|
|
if ch == '\0': |
|
return self.fetch_stream_end() |
|
|
|
|
|
if ch == '%' and self.check_directive(): |
|
return self.fetch_directive() |
|
|
|
|
|
if ch == '-' and self.check_document_start(): |
|
return self.fetch_document_start() |
|
|
|
|
|
if ch == '.' and self.check_document_end(): |
|
return self.fetch_document_end() |

        # Note: the order of the following indicator checks is not
        # significant.

if ch == '[': |
|
return self.fetch_flow_sequence_start() |
|
|
|
|
|
if ch == '{': |
|
return self.fetch_flow_mapping_start() |
|
|
|
|
|
if ch == ']': |
|
return self.fetch_flow_sequence_end() |
|
|
|
|
|
if ch == '}': |
|
return self.fetch_flow_mapping_end() |
|
|
|
|
|
if ch == ',': |
|
return self.fetch_flow_entry() |
|
|
|
|
|
if ch == '-' and self.check_block_entry(): |
|
return self.fetch_block_entry() |
|
|
|
|
|
if ch == '?' and self.check_key(): |
|
return self.fetch_key() |
|
|
|
|
|
if ch == ':' and self.check_value(): |
|
return self.fetch_value() |
|
|
|
|
|
if ch == '*': |
|
return self.fetch_alias() |
|
|
|
|
|
if ch == '&': |
|
return self.fetch_anchor() |
|
|
|
|
|
if ch == '!': |
|
return self.fetch_tag() |
|
|
|
|
|
if ch == '|' and not self.flow_level: |
|
return self.fetch_literal() |
|
|
|
|
|
if ch == '>' and not self.flow_level: |
|
return self.fetch_folded() |
|
|
|
|
|
if ch == '\'': |
|
return self.fetch_single() |
|
|
|
|
|
if ch == '\"': |
|
return self.fetch_double() |
|
|
|
|
|
if self.check_plain(): |
|
return self.fetch_plain() |
|
|
|
|
|
raise ScannerError("while scanning for the next token", None, |
|
"found character %r that cannot start any token" % ch, |
|
self.get_mark()) |
|
|
|
|
|
|
|
def next_possible_simple_key(self): |
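        # Return the token number of the nearest pending possible simple key,
        # or None if there is none.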
|
min_token_number = None |
|
for level in self.possible_simple_keys: |
|
key = self.possible_simple_keys[level] |
|
if min_token_number is None or key.token_number < min_token_number: |
|
min_token_number = key.token_number |
|
return min_token_number |
|
|
|
def stale_possible_simple_keys(self): |
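        # Remove entries that are no longer possible simple keys: according
        # to the YAML specification, a simple key is limited to a single line
        # and to at most 1024 characters. A required key that goes stale is
        # an error.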
|
for level in list(self.possible_simple_keys): |
|
key = self.possible_simple_keys[level] |
|
            if key.line != self.line \
                    or self.index-key.index > 1024:
|
if key.required: |
|
raise ScannerError("while scanning a simple key", key.mark, |
|
"could not find expected ':'", self.get_mark()) |
|
del self.possible_simple_keys[level] |
|
|
|
def save_possible_simple_key(self): |
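        # The next token may start a simple key; check whether that is
        # possible and record its position. Called from the fetchers for
        # ALIAS, ANCHOR, TAG, flow and plain SCALAR, '[' and '{'. A key is
        # required only when it starts at the current indentation level in
        # the block context.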
|
|
|
|
|
|
|
|
|
|
|
required = not self.flow_level and self.indent == self.column |
|
|
|
|
|
|
|
if self.allow_simple_key: |
|
self.remove_possible_simple_key() |
|
token_number = self.tokens_taken+len(self.tokens) |
|
key = SimpleKey(token_number, required, |
|
self.index, self.line, self.column, self.get_mark()) |
|
self.possible_simple_keys[self.flow_level] = key |
|
|
|
def remove_possible_simple_key(self): |
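        # Remove the saved possible simple key at the current flow level.
        # If the key was required, this is an error.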
|
|
|
if self.flow_level in self.possible_simple_keys: |
|
key = self.possible_simple_keys[self.flow_level] |
|
|
|
if key.required: |
|
raise ScannerError("while scanning a simple key", key.mark, |
|
"could not find expected ':'", self.get_mark()) |
|
|
|
del self.possible_simple_keys[self.flow_level] |
|
|
|
|
|
|
|
def unwind_indent(self, column): |
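        # In the flow context, indentation is ignored: the scanner is less
        # restrictive than the specification requires. In the block context,
        # pop every indentation level deeper than `column` and emit a
        # BLOCK-END token for it.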
|
if self.flow_level: |
|
return |
|
|
|
|
|
while self.indent > column: |
|
mark = self.get_mark() |
|
self.indent = self.indents.pop() |
|
self.tokens.append(BlockEndToken(mark, mark)) |
|
|
|
def add_indent(self, column): |
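        # If `column` is deeper than the current indentation level, push the
        # current level on the stack, make `column` the new level and return
        # True; otherwise return False.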
|
|
|
if self.indent < column: |
|
self.indents.append(self.indent) |
|
self.indent = column |
|
return True |
|
return False |
|
|
|
|
|
|
|
def fetch_stream_start(self): |
|
|
|
|
|
|
|
|
|
mark = self.get_mark() |
|
|
|
|
|
self.tokens.append(StreamStartToken(mark, mark, |
|
encoding=self.encoding)) |
|
|
|
|
|
def fetch_stream_end(self): |
|
|
|
|
|
self.unwind_indent(-1) |
|
|
|
|
|
self.remove_possible_simple_key() |
|
self.allow_simple_key = False |
|
self.possible_simple_keys = {} |
|
|
|
|
|
mark = self.get_mark() |
|
|
|
|
|
self.tokens.append(StreamEndToken(mark, mark)) |
|
|
|
|
|
self.done = True |
|
|
|
def fetch_directive(self): |
|
|
|
|
|
self.unwind_indent(-1) |
|
|
|
|
|
self.remove_possible_simple_key() |
|
self.allow_simple_key = False |
|
|
|
|
|
self.tokens.append(self.scan_directive()) |
|
|
|
def fetch_document_start(self): |
|
self.fetch_document_indicator(DocumentStartToken) |
|
|
|
def fetch_document_end(self): |
|
self.fetch_document_indicator(DocumentEndToken) |
|
|
|
def fetch_document_indicator(self, TokenClass): |
|
|
|
|
|
self.unwind_indent(-1) |
|
|
|
|
|
|
|
self.remove_possible_simple_key() |
|
self.allow_simple_key = False |
|
|
|
|
|
start_mark = self.get_mark() |
|
self.forward(3) |
|
end_mark = self.get_mark() |
|
self.tokens.append(TokenClass(start_mark, end_mark)) |
|
|
|
def fetch_flow_sequence_start(self): |
|
self.fetch_flow_collection_start(FlowSequenceStartToken) |
|
|
|
def fetch_flow_mapping_start(self): |
|
self.fetch_flow_collection_start(FlowMappingStartToken) |
|
|
|
def fetch_flow_collection_start(self, TokenClass): |
|
|
|
|
|
self.save_possible_simple_key() |
|
|
|
|
|
self.flow_level += 1 |
|
|
|
|
|
self.allow_simple_key = True |
|
|
|
|
|
start_mark = self.get_mark() |
|
self.forward() |
|
end_mark = self.get_mark() |
|
self.tokens.append(TokenClass(start_mark, end_mark)) |
|
|
|
def fetch_flow_sequence_end(self): |
|
self.fetch_flow_collection_end(FlowSequenceEndToken) |
|
|
|
def fetch_flow_mapping_end(self): |
|
self.fetch_flow_collection_end(FlowMappingEndToken) |
|
|
|
def fetch_flow_collection_end(self, TokenClass): |
|
|
|
|
|
self.remove_possible_simple_key() |
|
|
|
|
|
self.flow_level -= 1 |
|
|
|
|
|
self.allow_simple_key = False |
|
|
|
|
|
start_mark = self.get_mark() |
|
self.forward() |
|
end_mark = self.get_mark() |
|
self.tokens.append(TokenClass(start_mark, end_mark)) |
|
|
|
def fetch_flow_entry(self): |
|
|
|
|
|
self.allow_simple_key = True |
|
|
|
|
|
self.remove_possible_simple_key() |
|
|
|
|
|
start_mark = self.get_mark() |
|
self.forward() |
|
end_mark = self.get_mark() |
|
self.tokens.append(FlowEntryToken(start_mark, end_mark)) |
|
|
|
def fetch_block_entry(self): |
|
|
|
|
|
if not self.flow_level: |
|
|
|
|
|
if not self.allow_simple_key: |
|
raise ScannerError(None, None, |
|
"sequence entries are not allowed here", |
|
self.get_mark()) |
|
|
|
|
|
if self.add_indent(self.column): |
|
mark = self.get_mark() |
|
self.tokens.append(BlockSequenceStartToken(mark, mark)) |
|
|
|
|
|
|
|
else: |
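            # It is an error for the block entry to occur in the flow
            # context, but we let the parser detect it.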
|
pass |
|
|
|
|
|
self.allow_simple_key = True |
|
|
|
|
|
self.remove_possible_simple_key() |
|
|
|
|
|
start_mark = self.get_mark() |
|
self.forward() |
|
end_mark = self.get_mark() |
|
self.tokens.append(BlockEntryToken(start_mark, end_mark)) |
|
|
|
def fetch_key(self): |
|
|
|
|
|
if not self.flow_level: |
|
|
|
|
|
if not self.allow_simple_key: |
|
raise ScannerError(None, None, |
|
"mapping keys are not allowed here", |
|
self.get_mark()) |
|
|
|
|
|
if self.add_indent(self.column): |
|
mark = self.get_mark() |
|
self.tokens.append(BlockMappingStartToken(mark, mark)) |
|
|
|
|
|
self.allow_simple_key = not self.flow_level |
|
|
|
|
|
self.remove_possible_simple_key() |
|
|
|
|
|
start_mark = self.get_mark() |
|
self.forward() |
|
end_mark = self.get_mark() |
|
self.tokens.append(KeyToken(start_mark, end_mark)) |
|
|
|
def fetch_value(self): |
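        # A ':' indicator was found. If a simple key is pending at the
        # current flow level, emit the KEY token (and, when it opens a new
        # block mapping, a BLOCK-MAPPING-START) retroactively at the saved
        # position; otherwise the value belongs to a complex key or starts
        # a new block mapping here.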
|
|
|
|
|
if self.flow_level in self.possible_simple_keys: |
|
|
|
|
|
key = self.possible_simple_keys[self.flow_level] |
|
del self.possible_simple_keys[self.flow_level] |
|
self.tokens.insert(key.token_number-self.tokens_taken, |
|
KeyToken(key.mark, key.mark)) |
|
|
|
|
|
|
|
if not self.flow_level: |
|
if self.add_indent(key.column): |
|
self.tokens.insert(key.token_number-self.tokens_taken, |
|
BlockMappingStartToken(key.mark, key.mark)) |
|
|
|
|
|
self.allow_simple_key = False |
|
|
|
|
|
else: |
|
|
|
|
|
|
|
|
|
if not self.flow_level: |
|
|
|
|
|
|
|
if not self.allow_simple_key: |
|
raise ScannerError(None, None, |
|
"mapping values are not allowed here", |
|
self.get_mark()) |
|
|
|
|
|
|
|
|
|
if not self.flow_level: |
|
if self.add_indent(self.column): |
|
mark = self.get_mark() |
|
self.tokens.append(BlockMappingStartToken(mark, mark)) |
|
|
|
|
|
self.allow_simple_key = not self.flow_level |
|
|
|
|
|
self.remove_possible_simple_key() |
|
|
|
|
|
start_mark = self.get_mark() |
|
self.forward() |
|
end_mark = self.get_mark() |
|
self.tokens.append(ValueToken(start_mark, end_mark)) |
|
|
|
def fetch_alias(self): |
|
|
|
|
|
self.save_possible_simple_key() |
|
|
|
|
|
self.allow_simple_key = False |
|
|
|
|
|
self.tokens.append(self.scan_anchor(AliasToken)) |
|
|
|
def fetch_anchor(self): |
|
|
|
|
|
self.save_possible_simple_key() |
|
|
|
|
|
self.allow_simple_key = False |
|
|
|
|
|
self.tokens.append(self.scan_anchor(AnchorToken)) |
|
|
|
def fetch_tag(self): |
|
|
|
|
|
self.save_possible_simple_key() |
|
|
|
|
|
self.allow_simple_key = False |
|
|
|
|
|
self.tokens.append(self.scan_tag()) |
|
|
|
def fetch_literal(self): |
|
self.fetch_block_scalar(style='|') |
|
|
|
def fetch_folded(self): |
|
self.fetch_block_scalar(style='>') |
|
|
|
def fetch_block_scalar(self, style): |
|
|
|
|
|
self.allow_simple_key = True |
|
|
|
|
|
self.remove_possible_simple_key() |
|
|
|
|
|
self.tokens.append(self.scan_block_scalar(style)) |
|
|
|
def fetch_single(self): |
|
self.fetch_flow_scalar(style='\'') |
|
|
|
def fetch_double(self): |
|
self.fetch_flow_scalar(style='"') |
|
|
|
def fetch_flow_scalar(self, style): |
|
|
|
|
|
self.save_possible_simple_key() |
|
|
|
|
|
self.allow_simple_key = False |
|
|
|
|
|
self.tokens.append(self.scan_flow_scalar(style)) |
|
|
|
def fetch_plain(self): |
|
|
|
|
|
self.save_possible_simple_key() |
|
|
|
|
|
|
|
|
|
self.allow_simple_key = False |
|
|
|
|
|
self.tokens.append(self.scan_plain()) |
|
|
|
|
|
|
|
def check_directive(self): |
|
|
|
|
|
|
|
if self.column == 0: |
|
return True |
|
|
|
def check_document_start(self): |
|
|
|
|
|
if self.column == 0: |
|
            if self.prefix(3) == '---' \
                    and self.peek(3) in '\0 \t\r\n\x85\u2028\u2029':
|
return True |
|
|
|
def check_document_end(self): |
|
|
|
|
|
if self.column == 0: |
|
            if self.prefix(3) == '...' \
                    and self.peek(3) in '\0 \t\r\n\x85\u2028\u2029':
|
return True |
|
|
|
def check_block_entry(self): |
|
|
|
|
|
return self.peek(1) in '\0 \t\r\n\x85\u2028\u2029' |
|
|
|
def check_key(self): |
|
|
|
|
|
if self.flow_level: |
|
return True |
|
|
|
|
|
else: |
|
return self.peek(1) in '\0 \t\r\n\x85\u2028\u2029' |
|
|
|
def check_value(self): |
|
|
|
|
|
if self.flow_level: |
|
return True |
|
|
|
|
|
else: |
|
return self.peek(1) in '\0 \t\r\n\x85\u2028\u2029' |
|
|
|
def check_plain(self): |
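        # A plain scalar may start with any non-space character except:
        #   '-', '?', ':', ',', '[', ']', '{', '}',
        #   '#', '&', '*', '!', '|', '>', '\'', '\"', '%', '@', '`'.
        # It may also start with '-', '?' or ':' if the next character is not
        # a space; '?' and ':' qualify only in the block context, so that the
        # flow context stays space-independent.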
|
ch = self.peek() |
|
        return ch not in '\0 \t\r\n\x85\u2028\u2029-?:,[]{}#&*!|>\'\"%@`' \
                or (self.peek(1) not in '\0 \t\r\n\x85\u2028\u2029'
                        and (ch == '-' or (not self.flow_level and ch in '?:')))
|
|
|
|
|
|
|
def scan_to_next_token(self): |
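        # Skip spaces, comments and line breaks until the next token. A byte
        # order mark is stripped only at the very start of the stream. A line
        # break in the block context allows a simple key to start on the new
        # line.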
|
if self.index == 0 and self.peek() == '\uFEFF': |
|
self.forward() |
|
found = False |
|
while not found: |
|
while self.peek() == ' ': |
|
self.forward() |
|
if self.peek() == '#': |
|
while self.peek() not in '\0\r\n\x85\u2028\u2029': |
|
self.forward() |
|
if self.scan_line_break(): |
|
if not self.flow_level: |
|
self.allow_simple_key = True |
|
else: |
|
found = True |
|
|
|
def scan_directive(self): |
|
|
|
start_mark = self.get_mark() |
|
self.forward() |
|
name = self.scan_directive_name(start_mark) |
|
value = None |
|
if name == 'YAML': |
|
value = self.scan_yaml_directive_value(start_mark) |
|
end_mark = self.get_mark() |
|
elif name == 'TAG': |
|
value = self.scan_tag_directive_value(start_mark) |
|
end_mark = self.get_mark() |
|
else: |
|
end_mark = self.get_mark() |
|
while self.peek() not in '\0\r\n\x85\u2028\u2029': |
|
self.forward() |
|
self.scan_directive_ignored_line(start_mark) |
|
return DirectiveToken(name, value, start_mark, end_mark) |
|
|
|
def scan_directive_name(self, start_mark): |
|
|
|
length = 0 |
|
ch = self.peek(length) |
|
        while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' \
                or ch in '-_':
|
length += 1 |
|
ch = self.peek(length) |
|
if not length: |
|
raise ScannerError("while scanning a directive", start_mark, |
|
"expected alphabetic or numeric character, but found %r" |
|
% ch, self.get_mark()) |
|
value = self.prefix(length) |
|
self.forward(length) |
|
ch = self.peek() |
|
if ch not in '\0 \r\n\x85\u2028\u2029': |
|
raise ScannerError("while scanning a directive", start_mark, |
|
"expected alphabetic or numeric character, but found %r" |
|
% ch, self.get_mark()) |
|
return value |
|
|
|
def scan_yaml_directive_value(self, start_mark): |
|
|
|
while self.peek() == ' ': |
|
self.forward() |
|
major = self.scan_yaml_directive_number(start_mark) |
|
if self.peek() != '.': |
|
raise ScannerError("while scanning a directive", start_mark, |
|
"expected a digit or '.', but found %r" % self.peek(), |
|
self.get_mark()) |
|
self.forward() |
|
minor = self.scan_yaml_directive_number(start_mark) |
|
if self.peek() not in '\0 \r\n\x85\u2028\u2029': |
|
raise ScannerError("while scanning a directive", start_mark, |
|
"expected a digit or ' ', but found %r" % self.peek(), |
|
self.get_mark()) |
|
return (major, minor) |
|
|
|
def scan_yaml_directive_number(self, start_mark): |
|
|
|
ch = self.peek() |
|
if not ('0' <= ch <= '9'): |
|
raise ScannerError("while scanning a directive", start_mark, |
|
"expected a digit, but found %r" % ch, self.get_mark()) |
|
length = 0 |
|
while '0' <= self.peek(length) <= '9': |
|
length += 1 |
|
value = int(self.prefix(length)) |
|
self.forward(length) |
|
return value |
|
|
|
def scan_tag_directive_value(self, start_mark): |
|
|
|
while self.peek() == ' ': |
|
self.forward() |
|
handle = self.scan_tag_directive_handle(start_mark) |
|
while self.peek() == ' ': |
|
self.forward() |
|
prefix = self.scan_tag_directive_prefix(start_mark) |
|
return (handle, prefix) |
|
|
|
def scan_tag_directive_handle(self, start_mark): |
|
|
|
value = self.scan_tag_handle('directive', start_mark) |
|
ch = self.peek() |
|
if ch != ' ': |
|
raise ScannerError("while scanning a directive", start_mark, |
|
"expected ' ', but found %r" % ch, self.get_mark()) |
|
return value |
|
|
|
def scan_tag_directive_prefix(self, start_mark): |
|
|
|
value = self.scan_tag_uri('directive', start_mark) |
|
ch = self.peek() |
|
if ch not in '\0 \r\n\x85\u2028\u2029': |
|
raise ScannerError("while scanning a directive", start_mark, |
|
"expected ' ', but found %r" % ch, self.get_mark()) |
|
return value |
|
|
|
def scan_directive_ignored_line(self, start_mark): |
|
|
|
while self.peek() == ' ': |
|
self.forward() |
|
if self.peek() == '#': |
|
while self.peek() not in '\0\r\n\x85\u2028\u2029': |
|
self.forward() |
|
ch = self.peek() |
|
if ch not in '\0\r\n\x85\u2028\u2029': |
|
raise ScannerError("while scanning a directive", start_mark, |
|
"expected a comment or a line break, but found %r" |
|
% ch, self.get_mark()) |
|
self.scan_line_break() |
|
|
|
def scan_anchor(self, TokenClass): |
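        # The specification does not restrict the characters of anchors and
        # aliases, which can make documents such as
        #   [ *alias, value ]
        # ambiguous, so anchor and alias names are restricted here to ASCII
        # letters, digits, '-' and '_'.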
|
start_mark = self.get_mark() |
|
indicator = self.peek() |
|
if indicator == '*': |
|
name = 'alias' |
|
else: |
|
name = 'anchor' |
|
self.forward() |
|
length = 0 |
|
ch = self.peek(length) |
|
        while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' \
                or ch in '-_':
|
length += 1 |
|
ch = self.peek(length) |
|
if not length: |
|
raise ScannerError("while scanning an %s" % name, start_mark, |
|
"expected alphabetic or numeric character, but found %r" |
|
% ch, self.get_mark()) |
|
value = self.prefix(length) |
|
self.forward(length) |
|
ch = self.peek() |
|
if ch not in '\0 \t\r\n\x85\u2028\u2029?:,]}%@`': |
|
raise ScannerError("while scanning an %s" % name, start_mark, |
|
"expected alphabetic or numeric character, but found %r" |
|
% ch, self.get_mark()) |
|
end_mark = self.get_mark() |
|
return TokenClass(value, start_mark, end_mark) |
|
|
|
def scan_tag(self): |
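        # A tag is either verbatim ('!<...>'), the non-specific tag '!', or a
        # handle ('!' or '!word!') followed by a suffix.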
|
|
|
start_mark = self.get_mark() |
|
ch = self.peek(1) |
|
if ch == '<': |
|
handle = None |
|
self.forward(2) |
|
suffix = self.scan_tag_uri('tag', start_mark) |
|
if self.peek() != '>': |
|
raise ScannerError("while parsing a tag", start_mark, |
|
"expected '>', but found %r" % self.peek(), |
|
self.get_mark()) |
|
self.forward() |
|
elif ch in '\0 \t\r\n\x85\u2028\u2029': |
|
handle = None |
|
suffix = '!' |
|
self.forward() |
|
else: |
|
length = 1 |
|
use_handle = False |
|
while ch not in '\0 \r\n\x85\u2028\u2029': |
|
if ch == '!': |
|
use_handle = True |
|
break |
|
length += 1 |
|
ch = self.peek(length) |
|
handle = '!' |
|
if use_handle: |
|
handle = self.scan_tag_handle('tag', start_mark) |
|
else: |
|
handle = '!' |
|
self.forward() |
|
suffix = self.scan_tag_uri('tag', start_mark) |
|
ch = self.peek() |
|
if ch not in '\0 \r\n\x85\u2028\u2029': |
|
raise ScannerError("while scanning a tag", start_mark, |
|
"expected ' ', but found %r" % ch, self.get_mark()) |
|
value = (handle, suffix) |
|
end_mark = self.get_mark() |
|
return TagToken(value, start_mark, end_mark) |
|
|
|
def scan_block_scalar(self, style): |
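        # Scan a literal ('|') or folded ('>') block scalar, honouring the
        # optional chomping ('+'/'-') and indentation indicators in the
        # header line.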
|
|
|
|
|
if style == '>': |
|
folded = True |
|
else: |
|
folded = False |
|
|
|
chunks = [] |
|
start_mark = self.get_mark() |
|
|
|
|
|
self.forward() |
|
chomping, increment = self.scan_block_scalar_indicators(start_mark) |
|
self.scan_block_scalar_ignored_line(start_mark) |
|
|
|
|
|
min_indent = self.indent+1 |
|
if min_indent < 1: |
|
min_indent = 1 |
|
if increment is None: |
|
breaks, max_indent, end_mark = self.scan_block_scalar_indentation() |
|
indent = max(min_indent, max_indent) |
|
else: |
|
indent = min_indent+increment-1 |
|
breaks, end_mark = self.scan_block_scalar_breaks(indent) |
|
line_break = '' |
|
|
|
|
|
while self.column == indent and self.peek() != '\0': |
|
chunks.extend(breaks) |
|
leading_non_space = self.peek() not in ' \t' |
|
length = 0 |
|
while self.peek(length) not in '\0\r\n\x85\u2028\u2029': |
|
length += 1 |
|
chunks.append(self.prefix(length)) |
|
self.forward(length) |
|
line_break = self.scan_line_break() |
|
breaks, end_mark = self.scan_block_scalar_breaks(indent) |
|
if self.column == indent and self.peek() != '\0': |
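                # Folding rules are ambiguous; here a single line break
                # between two non-empty, non-indented lines of a folded
                # scalar is replaced by a space.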
                if folded and line_break == '\n' \
                        and leading_non_space and self.peek() not in ' \t':
|
if not breaks: |
|
chunks.append(' ') |
|
else: |
|
chunks.append(line_break) |
|
else: |
|
break |
|
|
|
|
|
if chomping is not False: |
|
chunks.append(line_break) |
|
if chomping is True: |
|
chunks.extend(breaks) |
|
|
|
|
|
return ScalarToken(''.join(chunks), False, start_mark, end_mark, |
|
style) |
|
|
|
def scan_block_scalar_indicators(self, start_mark): |
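        # The chomping ('+' or '-') and indentation (1-9) indicators may be
        # given in either order.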
|
|
|
chomping = None |
|
increment = None |
|
ch = self.peek() |
|
if ch in '+-': |
|
if ch == '+': |
|
chomping = True |
|
else: |
|
chomping = False |
|
self.forward() |
|
ch = self.peek() |
|
if ch in '0123456789': |
|
increment = int(ch) |
|
if increment == 0: |
|
raise ScannerError("while scanning a block scalar", start_mark, |
|
"expected indentation indicator in the range 1-9, but found 0", |
|
self.get_mark()) |
|
self.forward() |
|
elif ch in '0123456789': |
|
increment = int(ch) |
|
if increment == 0: |
|
raise ScannerError("while scanning a block scalar", start_mark, |
|
"expected indentation indicator in the range 1-9, but found 0", |
|
self.get_mark()) |
|
self.forward() |
|
ch = self.peek() |
|
if ch in '+-': |
|
if ch == '+': |
|
chomping = True |
|
else: |
|
chomping = False |
|
self.forward() |
|
ch = self.peek() |
|
if ch not in '\0 \r\n\x85\u2028\u2029': |
|
raise ScannerError("while scanning a block scalar", start_mark, |
|
"expected chomping or indentation indicators, but found %r" |
|
% ch, self.get_mark()) |
|
return chomping, increment |
|
|
|
def scan_block_scalar_ignored_line(self, start_mark): |
|
|
|
while self.peek() == ' ': |
|
self.forward() |
|
if self.peek() == '#': |
|
while self.peek() not in '\0\r\n\x85\u2028\u2029': |
|
self.forward() |
|
ch = self.peek() |
|
if ch not in '\0\r\n\x85\u2028\u2029': |
|
raise ScannerError("while scanning a block scalar", start_mark, |
|
"expected a comment or a line break, but found %r" % ch, |
|
self.get_mark()) |
|
self.scan_line_break() |
|
|
|
def scan_block_scalar_indentation(self): |
|
|
|
chunks = [] |
|
max_indent = 0 |
|
end_mark = self.get_mark() |
|
while self.peek() in ' \r\n\x85\u2028\u2029': |
|
if self.peek() != ' ': |
|
chunks.append(self.scan_line_break()) |
|
end_mark = self.get_mark() |
|
else: |
|
self.forward() |
|
if self.column > max_indent: |
|
max_indent = self.column |
|
return chunks, max_indent, end_mark |
|
|
|
def scan_block_scalar_breaks(self, indent): |
|
|
|
chunks = [] |
|
end_mark = self.get_mark() |
|
while self.column < indent and self.peek() == ' ': |
|
self.forward() |
|
while self.peek() in '\r\n\x85\u2028\u2029': |
|
chunks.append(self.scan_line_break()) |
|
end_mark = self.get_mark() |
|
while self.column < indent and self.peek() == ' ': |
|
self.forward() |
|
return chunks, end_mark |
|
|
|
def scan_flow_scalar(self, style): |
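        # A flow scalar is either single- or double-quoted. Quoted scalars do
        # not need to follow the indentation rules, since the quotes mark
        # their start and end unambiguously; we only check that document
        # separators do not appear inside them.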
|
if style == '"': |
|
double = True |
|
else: |
|
double = False |
|
chunks = [] |
|
start_mark = self.get_mark() |
|
quote = self.peek() |
|
self.forward() |
|
chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark)) |
|
while self.peek() != quote: |
|
chunks.extend(self.scan_flow_scalar_spaces(double, start_mark)) |
|
chunks.extend(self.scan_flow_scalar_non_spaces(double, start_mark)) |
|
self.forward() |
|
end_mark = self.get_mark() |
|
return ScalarToken(''.join(chunks), False, start_mark, end_mark, |
|
style) |
|
|
|
ESCAPE_REPLACEMENTS = { |
|
'0': '\0', |
|
'a': '\x07', |
|
'b': '\x08', |
|
't': '\x09', |
|
'\t': '\x09', |
|
'n': '\x0A', |
|
'v': '\x0B', |
|
'f': '\x0C', |
|
'r': '\x0D', |
|
'e': '\x1B', |
|
' ': '\x20', |
|
'\"': '\"', |
|
'\\': '\\', |
|
'/': '/', |
|
'N': '\x85', |
|
'_': '\xA0', |
|
'L': '\u2028', |
|
'P': '\u2029', |
|
} |
|
|
|
ESCAPE_CODES = { |
|
'x': 2, |
|
'u': 4, |
|
'U': 8, |
|
} |
|
|
|
def scan_flow_scalar_non_spaces(self, double, start_mark): |
|
|
|
chunks = [] |
|
while True: |
|
length = 0 |
|
while self.peek(length) not in '\'\"\\\0 \t\r\n\x85\u2028\u2029': |
|
length += 1 |
|
if length: |
|
chunks.append(self.prefix(length)) |
|
self.forward(length) |
|
ch = self.peek() |
|
if not double and ch == '\'' and self.peek(1) == '\'': |
|
chunks.append('\'') |
|
self.forward(2) |
|
elif (double and ch == '\'') or (not double and ch in '\"\\'): |
|
chunks.append(ch) |
|
self.forward() |
|
elif double and ch == '\\': |
|
self.forward() |
|
ch = self.peek() |
|
if ch in self.ESCAPE_REPLACEMENTS: |
|
chunks.append(self.ESCAPE_REPLACEMENTS[ch]) |
|
self.forward() |
|
elif ch in self.ESCAPE_CODES: |
|
length = self.ESCAPE_CODES[ch] |
|
self.forward() |
|
for k in range(length): |
|
if self.peek(k) not in '0123456789ABCDEFabcdef': |
|
raise ScannerError("while scanning a double-quoted scalar", start_mark, |
|
"expected escape sequence of %d hexadecimal numbers, but found %r" % |
|
(length, self.peek(k)), self.get_mark()) |
|
code = int(self.prefix(length), 16) |
|
chunks.append(chr(code)) |
|
self.forward(length) |
|
elif ch in '\r\n\x85\u2028\u2029': |
|
self.scan_line_break() |
|
chunks.extend(self.scan_flow_scalar_breaks(double, start_mark)) |
|
else: |
|
raise ScannerError("while scanning a double-quoted scalar", start_mark, |
|
"found unknown escape character %r" % ch, self.get_mark()) |
|
else: |
|
return chunks |
|
|
|
def scan_flow_scalar_spaces(self, double, start_mark): |
|
|
|
chunks = [] |
|
length = 0 |
|
while self.peek(length) in ' \t': |
|
length += 1 |
|
whitespaces = self.prefix(length) |
|
self.forward(length) |
|
ch = self.peek() |
|
if ch == '\0': |
|
raise ScannerError("while scanning a quoted scalar", start_mark, |
|
"found unexpected end of stream", self.get_mark()) |
|
elif ch in '\r\n\x85\u2028\u2029': |
|
line_break = self.scan_line_break() |
|
breaks = self.scan_flow_scalar_breaks(double, start_mark) |
|
if line_break != '\n': |
|
chunks.append(line_break) |
|
elif not breaks: |
|
chunks.append(' ') |
|
chunks.extend(breaks) |
|
else: |
|
chunks.append(whitespaces) |
|
return chunks |
|
|
|
def scan_flow_scalar_breaks(self, double, start_mark): |
|
|
|
chunks = [] |
|
while True: |
|
|
|
|
|
prefix = self.prefix(3) |
|
            if (prefix == '---' or prefix == '...') \
                    and self.peek(3) in '\0 \t\r\n\x85\u2028\u2029':
|
raise ScannerError("while scanning a quoted scalar", start_mark, |
|
"found unexpected document separator", self.get_mark()) |
|
while self.peek() in ' \t': |
|
self.forward() |
|
if self.peek() in '\r\n\x85\u2028\u2029': |
|
chunks.append(self.scan_line_break()) |
|
else: |
|
return chunks |
|
|
|
def scan_plain(self): |
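        # Scan a plain (unquoted) scalar. In the flow context it is
        # additionally terminated by ',', '?', '[', ']', '{' and '}', and
        # indentation rules are loosened.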
|
|
|
|
|
|
|
|
|
|
|
chunks = [] |
|
start_mark = self.get_mark() |
|
end_mark = start_mark |
|
indent = self.indent+1 |
|
|
|
|
|
|
|
|
|
spaces = [] |
|
while True: |
|
length = 0 |
|
if self.peek() == '#': |
|
break |
|
while True: |
|
ch = self.peek(length) |
|
                if ch in '\0 \t\r\n\x85\u2028\u2029' \
                        or (ch == ':' and
                                self.peek(length+1) in '\0 \t\r\n\x85\u2028\u2029'
                                    + (',[]{}' if self.flow_level else '')) \
                        or (self.flow_level and ch in ',?[]{}'):
|
break |
|
length += 1 |
|
if length == 0: |
|
break |
|
self.allow_simple_key = False |
|
chunks.extend(spaces) |
|
chunks.append(self.prefix(length)) |
|
self.forward(length) |
|
end_mark = self.get_mark() |
|
spaces = self.scan_plain_spaces(indent, start_mark) |
|
            if not spaces or self.peek() == '#' \
                    or (not self.flow_level and self.column < indent):
|
break |
|
return ScalarToken(''.join(chunks), True, start_mark, end_mark) |
|
|
|
def scan_plain_spaces(self, indent, start_mark): |
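        # Consume spaces and line breaks between chunks of a plain scalar.
        # Returns None (ending the scalar) when a document separator is
        # reached; tabs are deliberately not consumed here.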
|
|
|
|
|
|
|
chunks = [] |
|
length = 0 |
|
while self.peek(length) in ' ': |
|
length += 1 |
|
whitespaces = self.prefix(length) |
|
self.forward(length) |
|
ch = self.peek() |
|
if ch in '\r\n\x85\u2028\u2029': |
|
line_break = self.scan_line_break() |
|
self.allow_simple_key = True |
|
prefix = self.prefix(3) |
|
            if (prefix == '---' or prefix == '...') \
                    and self.peek(3) in '\0 \t\r\n\x85\u2028\u2029':
|
return |
|
breaks = [] |
|
while self.peek() in ' \r\n\x85\u2028\u2029': |
|
if self.peek() == ' ': |
|
self.forward() |
|
else: |
|
breaks.append(self.scan_line_break()) |
|
prefix = self.prefix(3) |
|
                if (prefix == '---' or prefix == '...') \
                        and self.peek(3) in '\0 \t\r\n\x85\u2028\u2029':
|
return |
|
if line_break != '\n': |
|
chunks.append(line_break) |
|
elif not breaks: |
|
chunks.append(' ') |
|
chunks.extend(breaks) |
|
elif whitespaces: |
|
chunks.append(whitespaces) |
|
return chunks |
|
|
|
def scan_tag_handle(self, name, start_mark): |
|
|
|
|
|
|
|
ch = self.peek() |
|
if ch != '!': |
|
raise ScannerError("while scanning a %s" % name, start_mark, |
|
"expected '!', but found %r" % ch, self.get_mark()) |
|
length = 1 |
|
ch = self.peek(length) |
|
if ch != ' ': |
|
            while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' \
                    or ch in '-_':
|
length += 1 |
|
ch = self.peek(length) |
|
if ch != '!': |
|
self.forward(length) |
|
raise ScannerError("while scanning a %s" % name, start_mark, |
|
"expected '!', but found %r" % ch, self.get_mark()) |
|
length += 1 |
|
value = self.prefix(length) |
|
self.forward(length) |
|
return value |
|
|
|
def scan_tag_uri(self, name, start_mark): |
|
|
|
|
|
chunks = [] |
|
length = 0 |
|
ch = self.peek(length) |
|
        while '0' <= ch <= '9' or 'A' <= ch <= 'Z' or 'a' <= ch <= 'z' \
                or ch in '-;/?:@&=+$,_.!~*\'()[]%':
|
if ch == '%': |
|
chunks.append(self.prefix(length)) |
|
self.forward(length) |
|
length = 0 |
|
chunks.append(self.scan_uri_escapes(name, start_mark)) |
|
else: |
|
length += 1 |
|
ch = self.peek(length) |
|
if length: |
|
chunks.append(self.prefix(length)) |
|
self.forward(length) |
|
length = 0 |
|
if not chunks: |
|
raise ScannerError("while parsing a %s" % name, start_mark, |
|
"expected URI, but found %r" % ch, self.get_mark()) |
|
return ''.join(chunks) |
|
|
|
def scan_uri_escapes(self, name, start_mark): |
|
|
|
codes = [] |
|
mark = self.get_mark() |
|
while self.peek() == '%': |
|
self.forward() |
|
for k in range(2): |
|
if self.peek(k) not in '0123456789ABCDEFabcdef': |
|
raise ScannerError("while scanning a %s" % name, start_mark, |
|
"expected URI escape sequence of 2 hexadecimal numbers, but found %r" |
|
% self.peek(k), self.get_mark()) |
|
codes.append(int(self.prefix(2), 16)) |
|
self.forward(2) |
|
try: |
|
value = bytes(codes).decode('utf-8') |
|
except UnicodeDecodeError as exc: |
|
raise ScannerError("while scanning a %s" % name, start_mark, str(exc), mark) |
|
return value |
|
|
|
def scan_line_break(self): |
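        # Normalize line breaks:
        #   '\r\n', '\r', '\n', '\x85'   ->  '\n'
        #   '\u2028', '\u2029'           ->  themselves
        #   anything else                ->  '' (nothing is consumed)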
|
ch = self.peek() |
|
if ch in '\r\n\x85': |
|
if self.prefix(2) == '\r\n': |
|
self.forward(2) |
|
else: |
|
self.forward() |
|
return '\n' |
|
elif ch in '\u2028\u2029': |
|
self.forward() |
|
return ch |
|
return '' |
|
|
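
# Minimal usage sketch (an assumption about the surrounding package, not part
# of this module): in a PyYAML-style loader this Scanner is mixed together
# with a Reader that supplies peek()/forward()/prefix()/get_mark(), and tokens
# are then pulled through check_token()/peek_token()/get_token(), e.g.
#
#     from yaml.loader import Loader   # hypothetical composition
#     loader = Loader("a: [1, 2]\n")
#     while loader.check_token():
#         print(loader.get_token())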