| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | from __future__ import absolute_import |
| | from __future__ import print_function |
| | from __future__ import unicode_literals |
| | from contextlib import contextmanager |
| | from functools import partial |
| | import codecs |
| | import logging |
| | import os |
| | import shlex |
| | import shutil |
| | import subprocess |
| | import sys |
| | import tempfile |
| |
|
| | from .importhelper import pkg_resources_filename |
| |
|
| |
|
| | PY3 = sys.version_info.major == 3 |
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | class NIL: |
| | pass |
| |
|
| |
|
| | class cached_property(object): |
| |
|
| | def __init__(self, func): |
| | self.func = func |
| | self.__name__ = func.__name__ |
| | self.__doc__ = func.__doc__ |
| |
|
| | def __get__(self, obj, type=None): |
| | if obj is None: |
| | return self |
| | value = obj.__dict__.get(self.__name__, NIL) |
| | if value is NIL: |
| | value = self.func(obj) |
| | obj.__dict__[self.__name__] = value |
| | return value |
| |
|
| | def __set__(self, obj, value): |
| | obj.__dict__[self.__name__] = value |
| |
|
| |
|
| | def generate_json_array(tokens): |
| | ''' generate json array with given tokens ''' |
| | first = True |
| | for token in tokens: |
| | if first: |
| | yield '[\n' |
| | first = False |
| | else: |
| | yield ',\n' |
| | yield token |
| | yield '\n]' |
| |
|
| |
|
| | class JsonObjects(object): |
| |
|
| | def __init__(self, objects, object_to_json): |
| | self.objects = objects |
| | self.object_to_json = object_to_json |
| |
|
| | def generate(self, **kwargs): |
| | kwargs.setdefault('sort_keys', True) |
| | kwargs.setdefault('indent', 2) |
| |
|
| | tokens = (self.object_to_json(obj, **kwargs) |
| | for obj in self.objects) |
| | return generate_json_array(tokens) |
| |
|
| | def open(self, **kwargs): |
| | chunks = self.generate(**kwargs) |
| | chunks = (chunk.encode('utf-8') for chunk in chunks) |
| | return GeneratorReader(chunks) |
| |
|
| | def dump(self, outfile, **kwargs): |
| | for s in self.generate(**kwargs): |
| | outfile.write(s) |
| |
|
| |
|
| | def unicode_escape(s): |
| | ''' |
| | Escape a string. |
| | |
| | :param s: |
| | a string to escape |
| | :type s: |
| | unicode |
| | :returns: |
| | escaped string |
| | :rtype: |
| | unicode |
| | ''' |
| | return s.encode('unicode_escape').decode('utf-8') |
| |
|
| |
|
| | def unicode_unescape(s): |
| | ''' |
| | Unescape a string. |
| | |
| | :param s: |
| | a string to unescape |
| | :type s: |
| | unicode |
| | :returns: |
| | unescaped string |
| | :rtype: |
| | unicode |
| | ''' |
| | return s.encode('utf-8').decode('unicode_escape') |
| |
|
| |
|
| | def transcode(backend_stream, backend_encoding, frontend_encoding, |
| | errors='strict'): |
| | enc = codecs.getencoder(frontend_encoding) |
| | dec = codecs.getdecoder(frontend_encoding) |
| | rd = codecs.getreader(backend_encoding) |
| | wr = codecs.getwriter(backend_encoding) |
| | return codecs.StreamRecoder(backend_stream, enc, dec, rd, wr, errors) |
| |
|
| |
|
| | def transcoder(backend_encoding, frontend_encoding, errors='strict'): |
| | return partial(transcode, |
| | backend_encoding=backend_encoding, |
| | frontend_encoding=frontend_encoding, |
| | errors=errors) |
| |
|
| |
|
| | class GeneratorReader(object): |
| | ''' convert a string generator into file-like reader |
| | |
| | def gen(): |
| | yield b'hello' |
| | yield b'world' |
| | |
| | f = GeneratorReader(gen()) |
| | assert 'hell' == f.read(4) |
| | assert 'oworld' == f.read() |
| | ''' |
| |
|
| | def __init__(self, gen): |
| | self.gen = gen |
| | self.buffer = b'' |
| |
|
| | def read(self, size=None): |
| | if size is None: |
| | d, self.buffer = self.buffer, b'' |
| | return d + b''.join(self.gen) |
| |
|
| | for data in self.gen: |
| | self.buffer += data |
| | bufsize = len(self.buffer) |
| | if bufsize >= size: |
| | size = min(bufsize, size) |
| | d, self.buffer = self.buffer[:size], self.buffer[size:] |
| | return d |
| |
|
| | d, self.buffer = self.buffer, b'' |
| | return d |
| |
|
| | def close(self): |
| | self.gen = self.buffer = None |
| |
|
| |
|
| | class GeneratorTextReader(object): |
| | ''' convert a string generator into file-like reader |
| | |
| | def gen(): |
| | yield 'hello' |
| | yield 'world' |
| | |
| | f = GeneratorTextReader(gen()) |
| | assert 'hell' == f.read(4) |
| | assert 'oworld' == f.read() |
| | ''' |
| |
|
| | def __init__(self, gen): |
| | self.gen = gen |
| | self.buffer = '' |
| |
|
| | def read(self, size=None): |
| | if size is None: |
| | d = self.buffer |
| | self.buffer = '' |
| | return d + ''.join(self.gen) |
| |
|
| | for data in self.gen: |
| | self.buffer += data |
| | bufsize = len(self.buffer) |
| | if bufsize >= size: |
| | size = min(bufsize, size) |
| | d, self.buffer = self.buffer[:size], self.buffer[size:] |
| | return d |
| |
|
| | d = self.buffer |
| | self.buffer = '' |
| | return d |
| |
|
| | def close(self): |
| | self.gen = self.buffer = None |
| |
|
| |
|
| | @contextmanager |
| | def hwp5_resources_path(res_path): |
| | try: |
| | path = pkg_resources_filename('hwp5', res_path) |
| | except Exception: |
| | logger.info('%s: pkg_resources_filename failed; using resource_stream', |
| | res_path) |
| | with mkstemp_open() as (path, g): |
| | import pkg_resources |
| | f = pkg_resources.resource_stream('hwp5', res_path) |
| | try: |
| | shutil.copyfileobj(f, g) |
| | g.close() |
| | yield path |
| | finally: |
| | f.close() |
| | else: |
| | yield path |
| |
|
| |
|
| | def make_open_dest_file(path): |
| | if path: |
| | @contextmanager |
| | def open_dest_path(): |
| | with open(path, 'wb') as f: |
| | yield f |
| | return open_dest_path |
| | else: |
| | if PY3: |
| | @contextmanager |
| | def open_stdout(): |
| | yield sys.stdout.buffer |
| | return open_stdout |
| | else: |
| | @contextmanager |
| | def open_stdout(): |
| | yield sys.stdout |
| | return open_stdout |
| |
|
| |
|
| | def wrap_open_dest_for_tty(open_dest, wrappers): |
| | @contextmanager |
| | def open_dest_wrapped(): |
| | with open_dest() as output: |
| | if output.isatty(): |
| | with cascade_contextmanager_filters(output, |
| | wrappers) as output: |
| | yield output |
| | else: |
| | yield output |
| | return open_dest_wrapped |
| |
|
| |
|
| | def wrap_open_dest(open_dest, wrappers): |
| | @contextmanager |
| | def open_dest_wrapped(): |
| | with open_dest() as output: |
| | with cascade_contextmanager_filters(output, wrappers) as output: |
| | yield output |
| | return open_dest_wrapped |
| |
|
| |
|
| | @contextmanager |
| | def cascade_contextmanager_filters(arg, filters): |
| | if len(filters) == 0: |
| | yield arg |
| | else: |
| | flt, filters = filters[0], filters[1:] |
| | with flt(arg) as ret: |
| | with cascade_contextmanager_filters(ret, filters) as ret: |
| | yield ret |
| |
|
| |
|
| | @contextmanager |
| | def null_contextmanager_filter(output): |
| | yield output |
| |
|
| |
|
| | def output_thru_subprocess(cmd): |
| | @contextmanager |
| | def filter(output): |
| | logger.debug('%r', cmd) |
| | try: |
| | p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=output) |
| | except Exception as e: |
| | logger.error('%r: %s', ' '.join(cmd), e) |
| | yield output |
| | else: |
| | try: |
| | yield p.stdin |
| | except IOError as e: |
| | import errno |
| | if e.errno != errno.EPIPE: |
| | raise |
| | finally: |
| | p.stdin.close() |
| | p.wait() |
| | retcode = p.returncode |
| | logger.debug('%r exit %d', cmd, retcode) |
| | return filter |
| |
|
| |
|
| | def xmllint(c14n=False, encode=None, format=False, nonet=True): |
| | cmd = ['xmllint'] |
| | if c14n: |
| | cmd.append('--c14n') |
| | if encode: |
| | cmd += ['--encode', encode] |
| | if format: |
| | cmd.append('--format') |
| | if nonet: |
| | cmd.append('--nonet') |
| | cmd.append('-') |
| | return output_thru_subprocess(cmd) |
| |
|
| |
|
| | def syntaxhighlight(mimetype): |
| | try: |
| | return syntaxhighlight_pygments(mimetype) |
| | except Exception as e: |
| | logger.info(e) |
| | return null_contextmanager_filter |
| |
|
| |
|
| | def syntaxhighlight_pygments(mimetype): |
| | from pygments import highlight |
| | from pygments.lexers import get_lexer_for_mimetype |
| | from pygments.formatters import TerminalFormatter |
| |
|
| | lexer = get_lexer_for_mimetype(mimetype, encoding='utf-8') |
| | formatter = TerminalFormatter(encoding='utf-8') |
| |
|
| | @contextmanager |
| | def filter(output): |
| | with make_temp_file() as f: |
| | yield f |
| | f.seek(0) |
| | code = f.read() |
| | highlight(code, lexer, formatter, output) |
| | return filter |
| |
|
| |
|
| | @contextmanager |
| | def make_temp_file(): |
| | fd, name = tempfile.mkstemp() |
| | with unlink_path(name): |
| | with os.fdopen(fd, 'w+') as f: |
| | yield f |
| |
|
| |
|
| | @contextmanager |
| | def unlink_path(path): |
| | import os |
| | try: |
| | yield |
| | finally: |
| | os.unlink(path) |
| |
|
| |
|
| | def pager(): |
| | pager_cmd = os.environ.get('PAGER') |
| | if pager_cmd: |
| | pager_cmd = shlex.split(pager_cmd) |
| | return output_thru_subprocess(pager_cmd) |
| | return pager_less |
| |
|
| |
|
| | pager_less = output_thru_subprocess(['less', '-R']) |
| |
|
| |
|
| | @contextmanager |
| | def mkstemp_open(*args, **kwargs): |
| |
|
| | if (kwargs.get('text', False) or (len(args) >= 4 and args[3])): |
| | text = True |
| | else: |
| | text = False |
| |
|
| | mode = 'w+' if text else 'wb+' |
| | fd, path = tempfile.mkstemp(*args, **kwargs) |
| | try: |
| | f = os.fdopen(fd, mode) |
| | try: |
| | yield path, f |
| | finally: |
| | try: |
| | f.close() |
| | except Exception: |
| | pass |
| | finally: |
| | unlink_or_warning(path) |
| |
|
| |
|
| | def unlink_or_warning(path): |
| | try: |
| | os.unlink(path) |
| | except Exception as e: |
| | logger.exception(e) |
| | logger.warning('%s cannot be deleted', path) |
| |
|