# dms3_demo / utils /common.py
# qilongyu
# Add application file
# 446f9ef
# -*-coding:utf-8-*-
import colorsys
import datetime
import functools
import signal
import sys
from contextlib import contextmanager
import numpy as np
import time
# Generate colors
def gen_color(n):
    """Return ``n`` RGB colors evenly spaced in hue, in shuffled order.

    Each color is a tuple of three ints in 0..255, obtained from an HSV
    color with hue ``x / n`` (x = 0..n-1), saturation 1 and value 1.

    The list is shuffled in place with NumPy's global RNG, so the order is
    only reproducible if the caller seeds ``np.random`` beforehand.
    """
    palette = []
    for idx in range(n):
        red, green, blue = colorsys.hsv_to_rgb(idx / n, 1., 1.)
        palette.append((int(red * 255), int(green * 255), int(blue * 255)))
    # Shuffle so that neighbouring indices do not get neighbouring hues.
    np.random.shuffle(palette)
    return palette
# pylint: disable=W0232
class Color:
    """Integer color codes accepted by ``colorize`` / ``colorprint``."""

    # Consecutive codes 30..36.
    GRAY, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN = range(30, 37)
    # 67 is the value ``colorize`` special-cases when ``highlight`` is set.
    WHITE = 67
    CRIMSON = 38
# Build the escape-coded form of a string: foreground color, bold, highlight.
def colorize(num, string, bold=False, highlight=False):
    """Wrap ``string`` in an ANSI SGR escape sequence.

    Args:
        num: Integer color code (see ``Color``).
        string: Text to wrap; formatted with ``%s``.
        bold: When true, the bold attribute ``1`` is emitted before the color.
        highlight: When true, the code is shifted to its bright variant:
            ``+60`` for ordinary codes (31 -> 91), ``+30`` for the module's
            white code 67 (67 -> 97).

    Returns:
        ``'\\x1b[<attrs>m<string>\\x1b[0m'`` where ``<attrs>`` is the
        ``;``-joined attribute list.
    """
    assert isinstance(num, int)
    attr = []
    if bold:
        attr.append('1')
    if highlight:
        # The two shifts are mutually exclusive. Applying both (as two
        # independent ``if`` tests would) turns 67 into 157, which is not a
        # valid SGR code.
        num += 30 if num == 67 else 60
    attr.append(str(num))
    # Layout: \x1b[<display mode>;<color>m + text + \x1b[0m (reset).
    # The order of the ';'-separated attributes does not matter.
    return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), string)
def colorprint(colorcode, text, o=None, bold=False, highlight=False, end='\n'):
    """Write ``text`` to a stream, wrapped in color escape codes.

    Args:
        colorcode: Integer color code passed to ``colorize``.
        text: Text to print.
        o: Object with a ``write`` method. ``None`` (the default) means
            ``sys.stdout`` looked up at call time, so output follows any
            redirection of ``sys.stdout`` made after this module was
            imported.
        bold: Forwarded to ``colorize``.
        highlight: Forwarded to ``colorize``.
        end: String appended after the colored text.
    """
    if o is None:
        # A ``sys.stdout`` default in the signature would be captured once,
        # when the function is defined.
        o = sys.stdout
    o.write(colorize(colorcode, text, bold=bold, highlight=highlight) + end)
def cprint(text, colorcode=67, bold=False, highlight=False, end='\n', prefix=None,
           pre_color=34, pre_bold=True, pre_high=True, pre_end=': '):
    """Print ``text`` in color to stdout, optionally behind a colored prefix.

    Args:
        text: Message body.
        colorcode: Color code for the body.
        bold: Bold attribute for the body.
        highlight: Highlight attribute for the body.
        end: String written after the body.
        prefix: Optional label. It is converted with ``str``, stripped of
            trailing whitespace and, when ``pre_end`` contains ``':'``, of
            trailing colons, then ``pre_end`` is appended.
        pre_color: Color code for the prefix.
        pre_bold: Bold attribute for the prefix.
        pre_high: Highlight attribute for the prefix.
        pre_end: Separator appended to the prefix.
    """
    # Resolve the stream once so the prefix and the body always land on the
    # same stream, even if ``sys.stdout`` was replaced after import.
    out = sys.stdout
    if prefix is not None:
        label = str(prefix).rstrip()
        if ':' in pre_end:
            # Avoid a doubled colon when the caller already supplied one.
            label = label.rstrip(':')
        colorprint(pre_color, label + pre_end, out, pre_bold, pre_high, end='')
    colorprint(colorcode, text, out, bold=bold, highlight=highlight, end=end)
def log_warn(msg):
    """Print ``msg`` through ``cprint`` behind a 'Warning' prefix (code 33, highlighted)."""
    code = 33
    cprint(msg, prefix='Warning', colorcode=code, pre_color=code, highlight=True)
def log_error(msg):
    """Print ``msg`` through ``cprint`` behind an 'Error' prefix (code 31, highlighted)."""
    code = 31
    cprint(msg, prefix='Error', colorcode=code, pre_color=code, highlight=True)
def now_time():
    """Return the current local time formatted as ``YYYY-mm-dd-HH-MM-SS``."""
    current = datetime.datetime.now()
    return format(current, "%Y-%m-%d-%H-%M-%S")
# http://stackoverflow.com/questions/366682/how-to-limit-execution-time-of-a-function-call-in-python
class TimeoutException(Exception):
    """Exception carrying a timeout message.

    Attributes:
        msg: The message given to the constructor.
    """

    def __init__(self, msg):
        # Hand the message to Exception so str(exc), exc.args and tracebacks
        # show it; without this call they are empty.
        super().__init__(msg)
        self.msg = msg
# Context manager built with @contextmanager. Usage:
#     with time_limit(2):
#         do()
# The code before ``yield`` runs on entry, then the ``with`` body runs, and
# the ``finally`` clause runs on exit.
@contextmanager
def time_limit(seconds):
    """Limit the run time of the ``with`` body using SIGALRM.

    Args:
        seconds: Value passed to ``signal.alarm``; a non-zero value schedules
            a SIGALRM for this process after that many seconds.

    Raises:
        TimeoutException: Raised from the signal handler if the alarm fires
            before the body finishes.

    On exit the pending alarm is cancelled and the SIGALRM handler that was
    installed before entry is put back, so the temporary handler does not
    outlive the ``with`` block.
    """
    def signal_handler(signum, frame):  # a signal handler must accept these two arguments
        raise TimeoutException(colorize(Color.RED, "Timed out! Retry again ...", highlight=True))

    # signal.signal returns the handler that was active before this call.
    previous_handler = signal.signal(signal.SIGALRM, signal_handler)
    try:
        # Inside the try so that a failing alarm() still restores the handler.
        signal.alarm(seconds)
        yield
    finally:
        signal.alarm(0)  # cancel any pending alarm
        # None means the earlier handler was not installed from Python and
        # therefore cannot be reinstalled through signal.signal.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
def clock(func):
    """Decorator that prints the run time, arguments and result of each call.

    After the wrapped function returns, one line of the form
    ``[<seconds>s] name(arg, ..., key=value, ...) -> result`` is printed in
    green through ``colorprint``. Keyword arguments are listed in sorted key
    order. The wrapped function's return value is passed through unchanged.
    """
    @functools.wraps(func)  # keep func's name/docstring on the wrapper
    def clocked(*args, **kwargs):
        started = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - started
        # Positional arguments first, then keyword arguments sorted by key.
        rendered = [repr(value) for value in args]
        rendered.extend('%s=%r' % (key, kwargs[key]) for key in sorted(kwargs))
        call_args = ', '.join(rendered)
        colorprint(Color.GREEN,
                   '[%0.8fs] %s(%s) -> %r' % (elapsed, func.__name__, call_args, result))
        return result
    return clocked  # replaces the decorated function
# Default report template used by clock_custom: elapsed seconds, function
# name, rendered call arguments and the repr of the result.
DEFAULT_FMT = '[{elapsed:0.8f}s] {name}({args}) -> {result}'


def clock_custom(fmt=DEFAULT_FMT, color=Color.GREEN):
    """Decorator factory: like ``clock`` but with a configurable format and color.

    Args:
        fmt: ``str.format`` template. It is expanded with the local variables
            of the inner ``clocked`` function, so the fields used by the
            default template -- ``elapsed``, ``name``, ``args``, ``result``
            -- are available.
        color: Color code passed to ``colorprint``.

    Returns:
        A decorator whose wrapper prints the formatted report after each
        call and returns the wrapped function's result.
    """
    def decorate(func):  # the actual decorator
        @functools.wraps(func)  # copy func's attributes onto clocked so its name does not become 'clocked'
        def clocked(*_args, **_kwargs):  # accepts any positional and keyword arguments
            # NOTE: the local names below are part of the format contract,
            # because the whole locals() dict is handed to fmt.format().
            t0 = time.perf_counter()
            _result = func(*_args, **_kwargs)  # func is a free variable of the clocked closure
            elapsed = time.perf_counter() - t0
            name = func.__name__
            _arg_lst = []
            if _args:
                _arg_lst.append(', '.join(repr(arg) for arg in _args))
            if _kwargs:
                _pairs = ['%s=%r' % (k, w) for k, w in sorted(_kwargs.items())]
                _arg_lst.append(', '.join(_pairs))
            args = ', '.join(_arg_lst)
            result = repr(_result)  # string form of the result
            colorprint(color, fmt.format(**locals()))  # expand fmt with clocked's local variables
            return _result  # return the wrapped function's own result
        return clocked  # clocked replaces the decorated function
    return decorate  # a decorator factory must return a decorator
def colorstr(*input):
    """Color a string with ANSI escape codes.

    Call as ``colorstr('blue', 'bold', 'hello world')``: every argument but
    the last names a color or style, and the last one is the text. With a
    single argument the text is rendered blue and bold.
    See https://en.wikipedia.org/wiki/ANSI_escape_code

    Returns:
        The concatenated escape codes, the text, and the reset code.
    """
    if len(input) > 1:
        *styles, string = input
    else:
        styles, string = ['blue', 'bold'], input[0]
    codes = {
        # basic colors
        'black': '\033[30m',
        'red': '\033[31m',
        'green': '\033[32m',
        'yellow': '\033[33m',
        'blue': '\033[34m',
        'magenta': '\033[35m',
        'cyan': '\033[36m',
        'white': '\033[37m',
        # bright colors
        'bright_black': '\033[90m',
        'bright_red': '\033[91m',
        'bright_green': '\033[92m',
        'bright_yellow': '\033[93m',
        'bright_blue': '\033[94m',
        'bright_magenta': '\033[95m',
        'bright_cyan': '\033[96m',
        'bright_white': '\033[97m',
        # misc
        'end': '\033[0m',
        'bold': '\033[1m',
        'underline': '\033[4m',
    }
    pieces = [codes[style] for style in styles]
    pieces.append(f'{string}')
    pieces.append(codes['end'])
    return ''.join(pieces)
class Logger:
    """Minimal logger that writes messages to a file and echoes them in color.

    Call ``start()`` (or use the instance as a context manager) before
    logging; call ``close()`` when done.

    Attributes:
        log_file_path: Path given to the constructor.
        log_file: Open file object, or ``None`` before ``start()``.
        color: Maps the one-letter keys 'R', 'B', 'G', 'Y' to color codes.
    """

    def __init__(self, file_path):
        self.log_file_path = file_path
        self.log_file = None
        self.color = {
            'R': Color.RED,
            'B': Color.BLUE,
            'G': Color.GREEN,
            'Y': Color.YELLOW
        }

    def __enter__(self):
        # Open the file only if the caller has not already done so; start()
        # opens in 'w' mode and would truncate it again.
        if self.log_file is None or self.log_file.closed:
            self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def start(self):
        """Open the log file for writing (mode 'w', UTF-8)."""
        # Release a handle left over from an earlier start() so it is not leaked.
        self.close()
        self.log_file = open(self.log_file_path, 'w', encoding='utf-8')

    def close(self):
        """Close the log file if it was opened."""
        if self.log_file is not None:
            self.log_file.close()
        return

    def info(self, text, color='W', prefix=None, pre_color='B', pre_end=': ', prints=True):
        """Write ``text`` to the log file and optionally print it in color.

        Args:
            text: Message body.
            color: Key into ``self.color``; unknown keys fall back to white.
            prefix: Optional label placed before the message.
            pre_color: Key into ``self.color`` for the prefix; unknown keys
                fall back to blue.
            pre_end: Separator appended to the prefix.
            prints: When true, also print through ``cprint``.
        """
        assert self.log_file is not None, "Please firstly confirm 'logger.start()' method"
        color_code = self.color.get(color, Color.WHITE)
        pre_color = self.color.get(pre_color, Color.BLUE)
        label = ''
        if prefix is not None:
            label = str(prefix).rstrip()
            if ':' in pre_end:
                label = label.rstrip(':')
            label += pre_end
        self.log_file.write(f"{label}{text}\n")
        if prints:
            # Pass the raw prefix together with pre_end: cprint applies the
            # same normalisation itself. Passing the already-suffixed label
            # without pre_end made cprint add its default ': ' on top of a
            # custom separator.
            cprint(text, color_code, prefix=prefix, pre_color=pre_color, pre_end=pre_end)

    def error(self, text):
        """Print ``text`` as an error. It is not written to the log file."""
        assert self.log_file is not None, "Please firstly confirm 'logger.start()' method"
        log_error(text)

    def warn(self, text):
        """Print ``text`` as a warning. It is not written to the log file."""
        assert self.log_file is not None, "Please firstly confirm 'logger.start()' method"
        log_warn(text)
def r(val):
    """Return ``int(u * val)`` for a sample ``u`` drawn from ``np.random.random()``."""
    sample = np.random.random()
    scaled = sample * val
    return int(scaled)
# if __name__ == '__main__':
# #print(colorize(31, 'I am fine!', bold=True, highlight=True))
# #colorprint(35, 'I am fine!')
# #error('get out')
# import time
#
# # ends after 5 seconds
# with time_limit(2):
# time.sleep(3)