|
import codecs |
|
import re |
|
from typing import (IO, Iterator, Match, NamedTuple, Optional, |
|
Pattern, Sequence, Tuple) |
|
|
|
|
|
def make_regex(string: str, extra_flags: int = 0) -> Pattern[str]:
    """Compile ``string`` as a regular expression with ``re.UNICODE`` enabled.

    Args:
        string: Regular-expression source text.
        extra_flags: Additional ``re`` flags, OR-ed together with
            ``re.UNICODE``.

    Returns:
        The compiled pattern object.
    """
    flags = re.UNICODE | extra_flags
    return re.compile(string, flags)
|
|
|
|
|
# Token patterns used by the parser. "Same-line whitespace" below means
# ``[^\S\r\n]``: any whitespace character except CR and LF.

# One line terminator. CRLF is listed first so it is counted as ONE newline.
_newline = make_regex(r"(\r\n|\n|\r)")
# Any run of whitespace, line terminators included.
_multiline_whitespace = make_regex(r"\s*", extra_flags=re.MULTILINE)
# Any run of same-line whitespace.
_whitespace = make_regex(r"[^\S\r\n]*")
# Optional "export" keyword followed by at least one same-line whitespace.
_export = make_regex(r"(?:export[^\S\r\n]+)?")
# Non-empty key wrapped in single quotes; the group excludes the quotes.
_single_quoted_key = make_regex(r"'([^']+)'")
# Bare key: one or more characters that are not "=", "#" or whitespace.
_unquoted_key = make_regex(r"([^=\#\s]+)")
# "=" together with the same-line whitespace that follows it.
_equal_sign = make_regex(r"(=[^\S\r\n]*)")
# Single-quoted value; a backslash-escaped quote may appear inside.
_single_quoted_value = make_regex(r"'((?:\\'|[^'])*)'")
# Double-quoted value; a backslash-escaped quote may appear inside.
_double_quoted_value = make_regex(r'"((?:\\"|[^"])*)"')
# Everything up to (not including) the next line terminator.
_unquoted_value = make_regex(r"([^\r\n]*)")
# Optional trailing comment: same-line whitespace, "#", then the rest of line.
_comment = make_regex(r"(?:[^\S\r\n]*#[^\r\n]*)?")
# Same-line whitespace followed by a line terminator or the end of the input.
_end_of_line = make_regex(r"[^\S\r\n]*(?:\r\n|\n|\r|$)")
# Remainder of the current line plus its terminator, if any.
# The CRLF alternative MUST come before the lone "\r": regex alternation is
# tried left to right, so with "\r" first a CRLF terminator was only half
# consumed, and the leftover "\n" was later counted as a second newline.
_rest_of_line = make_regex(r"[^\r\n]*(?:\r\n|\n|\r)?")
# Backslash escapes that are decoded inside double-quoted values.
_double_quote_escapes = make_regex(r"\\[\\'\"abfnrtv]")
# Backslash escapes that are decoded inside single-quoted values.
_single_quote_escapes = make_regex(r"\\[\\']")
|
|
|
|
|
class Original(NamedTuple):
    """A piece of source text paired with a line number."""

    # The source text itself.
    string: str
    # Line number associated with the text.
    line: int
|
|
|
|
|
class Binding(NamedTuple):
    """Result of parsing one binding: key, value, source text and error flag."""

    # Parsed key, or None when no key was read.
    key: Optional[str]
    # Parsed value, or None when there was no "=" part (or no key was read).
    value: Optional[str]
    # The source text this binding was parsed from, with its line number.
    original: Original
    # True when the text could not be parsed.
    error: bool
|
|
|
|
|
class Position:
    """Mutable cursor made of a character offset and a line number."""

    def __init__(self, chars: int, line: int) -> None:
        self.chars, self.line = chars, line

    @classmethod
    def start(cls) -> "Position":
        """Return a fresh position at offset 0 on line 1."""
        return cls(chars=0, line=1)

    def set(self, other: "Position") -> None:
        """Copy the offset and line number of ``other`` into this position."""
        self.chars, self.line = other.chars, other.line

    def advance(self, string: str) -> None:
        """Move past ``string``.

        The offset grows by ``len(string)`` and the line number grows by the
        number of line terminators found in ``string``.
        """
        newline_count = len(_newline.findall(string))
        self.chars += len(string)
        self.line += newline_count
|
|
|
|
|
class Error(Exception):
    """Raised when the input cannot be read or matched as requested."""
|
|
|
|
|
class Reader:
    """Cursor over the full text of a stream, with a mark for capturing text.

    ``position`` tracks the current offset and line; ``mark`` remembers an
    earlier position so that the text consumed since then can be retrieved.
    """

    def __init__(self, stream: IO[str]) -> None:
        # The stream is read in full up front; every method indexes into it.
        self.string = stream.read()
        self.position = Position.start()
        self.mark = Position.start()

    def has_next(self) -> bool:
        """Return True while the cursor is before the end of the text."""
        return len(self.string) > self.position.chars

    def set_mark(self) -> None:
        """Move the mark to the current position."""
        self.mark.set(self.position)

    def get_marked(self) -> Original:
        """Return the text between the mark and the cursor, with the mark's line."""
        begin, end = self.mark.chars, self.position.chars
        return Original(string=self.string[begin:end], line=self.mark.line)

    def peek(self, count: int) -> str:
        """Return up to ``count`` characters at the cursor without consuming them."""
        offset = self.position.chars
        return self.string[offset:offset + count]

    def read(self, count: int) -> str:
        """Consume and return exactly ``count`` characters.

        Raises:
            Error: If fewer than ``count`` characters remain; the cursor is
                left unchanged in that case.
        """
        offset = self.position.chars
        chunk = self.string[offset:offset + count]
        if len(chunk) < count:
            raise Error("read: End of string")
        self.position.advance(chunk)
        return chunk

    def read_regex(self, regex: Pattern[str]) -> Sequence[str]:
        """Match ``regex`` at the cursor, consume the match, return its groups.

        Raises:
            Error: If ``regex`` does not match at the current position; the
                cursor is left unchanged in that case.
        """
        found = regex.match(self.string, self.position.chars)
        if found is None:
            raise Error("read_regex: Pattern not found")
        self.position.advance(found.group(0))
        return found.groups()
|
|
|
|
|
def decode_escapes(regex: Pattern[str], string: str) -> str:
    """Decode the escape sequences in ``string`` that ``regex`` matches.

    Each match of ``regex`` is replaced by its ``unicode-escape`` decoding;
    text that ``regex`` does not match is left untouched.

    Args:
        regex: Pattern selecting the escape sequences to decode.
        string: Text that may contain escape sequences.

    Returns:
        ``string`` with every matched escape sequence decoded.
    """
    return regex.sub(
        lambda found: codecs.decode(found.group(0), 'unicode-escape'),
        string,
    )
|
|
|
|
|
def parse_key(reader: Reader) -> Optional[str]:
    """Read a key at the reader's current position.

    Returns:
        The key text (without quotes for a single-quoted key), or None when
        the next character is "#"; nothing is consumed in that case.

    Raises:
        Error: If no key pattern matches at the current position.
    """
    first = reader.peek(1)
    if first == "#":
        return None
    pattern = _single_quoted_key if first == "'" else _unquoted_key
    (key,) = reader.read_regex(pattern)
    return key
|
|
|
|
|
def parse_unquoted_value(reader: Reader) -> str:
    """Read an unquoted value running to the end of the current line.

    A trailing comment (whitespace followed by "#" and anything after it) is
    removed, then trailing whitespace is stripped.
    """
    (raw,) = reader.read_regex(_unquoted_value)
    without_comment = re.sub(r"\s+#.*", "", raw)
    return without_comment.rstrip()
|
|
|
|
|
def parse_value(reader: Reader) -> str:
    """Read a value at the reader's current position.

    The first character selects the form: a single or double quote starts a
    quoted value whose escapes are decoded; end of input or a line terminator
    yields an empty string without consuming anything; anything else is read
    as an unquoted value.

    Raises:
        Error: If a quoted value does not match its pattern.
    """
    # Opening quote -> (value pattern, escapes to decode inside that value).
    quoted = {
        "'": (_single_quoted_value, _single_quote_escapes),
        '"': (_double_quoted_value, _double_quote_escapes),
    }
    first = reader.peek(1)
    if first in quoted:
        pattern, escapes = quoted[first]
        (raw,) = reader.read_regex(pattern)
        return decode_escapes(escapes, raw)
    if first in ("", "\n", "\r"):
        return ""
    return parse_unquoted_value(reader)
|
|
|
|
|
def parse_binding(reader: Reader) -> Binding:
    """Parse one binding starting at the reader's current position.

    Leading whitespace (including blank lines) is consumed first. If the input
    ends there, a binding with no key and no value is returned. Otherwise an
    optional "export", a key, an optional "=value", an optional comment and
    the end of the line are read in that order.

    Returns:
        A ``Binding`` whose ``original`` covers everything consumed by this
        call. When any step fails, the rest of the line is consumed and the
        binding has ``error=True`` with ``key`` and ``value`` set to None.
    """
    reader.set_mark()

    def finish(key: Optional[str], value: Optional[str], error: bool) -> Binding:
        # ``original`` is taken at call time, so it spans all text consumed
        # since the mark was set above.
        return Binding(
            key=key,
            value=value,
            original=reader.get_marked(),
            error=error,
        )

    try:
        reader.read_regex(_multiline_whitespace)
        if not reader.has_next():
            # Only whitespace was left in the input.
            return finish(None, None, error=False)
        reader.read_regex(_export)
        key = parse_key(reader)
        reader.read_regex(_whitespace)
        value: Optional[str] = None
        if reader.peek(1) == "=":
            reader.read_regex(_equal_sign)
            value = parse_value(reader)
        reader.read_regex(_comment)
        reader.read_regex(_end_of_line)
        return finish(key, value, error=False)
    except Error:
        # Skip the remainder of the offending line so parsing can continue.
        reader.read_regex(_rest_of_line)
        return finish(None, None, error=True)
|
|
|
|
|
def parse_stream(stream: IO[str]) -> Iterator[Binding]:
    """Yield the bindings parsed from ``stream``, one at a time.

    The stream is read in full when iteration starts; bindings are then
    produced lazily until the input is exhausted. An empty stream yields
    nothing.
    """
    reader = Reader(stream)
    while reader.has_next():
        binding = parse_binding(reader)
        yield binding
|
|