ReadReview / deepdoc /utils /log_utils.py
root
first commit
7a919c0
#
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import typing
import traceback
import logging
import inspect
from logging.handlers import TimedRotatingFileHandler
from threading import RLock
from deepdoc.utils import file_utils
class LoggerFactory(object):
TYPE = "FILE"
LOG_FORMAT = "[%(levelname)s] [%(asctime)s] [%(module)s.%(funcName)s] [line:%(lineno)d]: %(message)s"
logging.basicConfig(format=LOG_FORMAT)
LEVEL = logging.DEBUG
logger_dict = {}
global_handler_dict = {}
LOG_DIR = None
PARENT_LOG_DIR = None
log_share = True
append_to_parent_log = None
lock = RLock()
# CRITICAL = 50
# FATAL = CRITICAL
# ERROR = 40
# WARNING = 30
# WARN = WARNING
# INFO = 20
# DEBUG = 10
# NOTSET = 0
levels = (10, 20, 30, 40)
schedule_logger_dict = {}
@staticmethod
def set_directory(directory=None, parent_log_dir=None,
append_to_parent_log=None, force=False):
if parent_log_dir:
LoggerFactory.PARENT_LOG_DIR = parent_log_dir
if append_to_parent_log:
LoggerFactory.append_to_parent_log = append_to_parent_log
with LoggerFactory.lock:
if not directory:
directory = file_utils.get_project_base_directory("logs")
if not LoggerFactory.LOG_DIR or force:
LoggerFactory.LOG_DIR = directory
if LoggerFactory.log_share:
oldmask = os.umask(000)
os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True)
os.umask(oldmask)
else:
os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True)
for loggerName, ghandler in LoggerFactory.global_handler_dict.items():
for className, (logger,
handler) in LoggerFactory.logger_dict.items():
logger.removeHandler(ghandler)
ghandler.close()
LoggerFactory.global_handler_dict = {}
for className, (logger,
handler) in LoggerFactory.logger_dict.items():
logger.removeHandler(handler)
_handler = None
if handler:
handler.close()
if className != "default":
_handler = LoggerFactory.get_handler(className)
logger.addHandler(_handler)
LoggerFactory.assemble_global_handler(logger)
LoggerFactory.logger_dict[className] = logger, _handler
@staticmethod
def new_logger(name):
logger = logging.getLogger(name)
logger.propagate = False
logger.setLevel(LoggerFactory.LEVEL)
return logger
@staticmethod
def get_logger(class_name=None):
with LoggerFactory.lock:
if class_name in LoggerFactory.logger_dict.keys():
logger, handler = LoggerFactory.logger_dict[class_name]
if not logger:
logger, handler = LoggerFactory.init_logger(class_name)
else:
logger, handler = LoggerFactory.init_logger(class_name)
return logger
@staticmethod
def get_global_handler(logger_name, level=None, log_dir=None):
if not LoggerFactory.LOG_DIR:
return logging.StreamHandler()
if log_dir:
logger_name_key = logger_name + "_" + log_dir
else:
logger_name_key = logger_name + "_" + LoggerFactory.LOG_DIR
# if loggerName not in LoggerFactory.globalHandlerDict:
if logger_name_key not in LoggerFactory.global_handler_dict:
with LoggerFactory.lock:
if logger_name_key not in LoggerFactory.global_handler_dict:
handler = LoggerFactory.get_handler(
logger_name, level, log_dir)
LoggerFactory.global_handler_dict[logger_name_key] = handler
return LoggerFactory.global_handler_dict[logger_name_key]
@staticmethod
def get_handler(class_name, level=None, log_dir=None,
log_type=None, job_id=None):
if not log_type:
if not LoggerFactory.LOG_DIR or not class_name:
return logging.StreamHandler()
# return Diy_StreamHandler()
if not log_dir:
log_file = os.path.join(
LoggerFactory.LOG_DIR,
"{}.log".format(class_name))
else:
log_file = os.path.join(log_dir, "{}.log".format(class_name))
else:
log_file = os.path.join(log_dir, "rag_flow_{}.log".format(
log_type) if level == LoggerFactory.LEVEL else 'rag_flow_{}_error.log'.format(log_type))
os.makedirs(os.path.dirname(log_file), exist_ok=True)
if LoggerFactory.log_share:
handler = ROpenHandler(log_file,
when='D',
interval=1,
backupCount=14,
delay=True)
else:
handler = TimedRotatingFileHandler(log_file,
when='D',
interval=1,
backupCount=14,
delay=True)
if level:
handler.level = level
return handler
@staticmethod
def init_logger(class_name):
with LoggerFactory.lock:
logger = LoggerFactory.new_logger(class_name)
handler = None
if class_name:
handler = LoggerFactory.get_handler(class_name)
logger.addHandler(handler)
LoggerFactory.logger_dict[class_name] = logger, handler
else:
LoggerFactory.logger_dict["default"] = logger, handler
LoggerFactory.assemble_global_handler(logger)
return logger, handler
@staticmethod
def assemble_global_handler(logger):
if LoggerFactory.LOG_DIR:
for level in LoggerFactory.levels:
if level >= LoggerFactory.LEVEL:
level_logger_name = logging._levelToName[level]
logger.addHandler(
LoggerFactory.get_global_handler(
level_logger_name, level))
if LoggerFactory.append_to_parent_log and LoggerFactory.PARENT_LOG_DIR:
for level in LoggerFactory.levels:
if level >= LoggerFactory.LEVEL:
level_logger_name = logging._levelToName[level]
logger.addHandler(
LoggerFactory.get_global_handler(level_logger_name, level, LoggerFactory.PARENT_LOG_DIR))
def setDirectory(directory=None):
LoggerFactory.set_directory(directory)
def setLevel(level):
LoggerFactory.LEVEL = level
def getLogger(className=None, useLevelFile=False):
if className is None:
frame = inspect.stack()[1]
module = inspect.getmodule(frame[0])
className = 'stat'
return LoggerFactory.get_logger(className)
def exception_to_trace_string(ex):
return "".join(traceback.TracebackException.from_exception(ex).format())
class ROpenHandler(TimedRotatingFileHandler):
def _open(self):
prevumask = os.umask(000)
rtv = TimedRotatingFileHandler._open(self)
os.umask(prevumask)
return rtv
def sql_logger(job_id='', log_type='sql'):
key = job_id + log_type
if key in LoggerFactory.schedule_logger_dict.keys():
return LoggerFactory.schedule_logger_dict[key]
return get_job_logger(job_id=job_id, log_type=log_type)
def ready_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
prefix, suffix = base_msg(job, task, role, party_id, detail)
return f"{prefix}{msg} ready{suffix}"
def start_log(msg, job=None, task=None, role=None, party_id=None, detail=None):
prefix, suffix = base_msg(job, task, role, party_id, detail)
return f"{prefix}start to {msg}{suffix}"
def successful_log(msg, job=None, task=None, role=None,
party_id=None, detail=None):
prefix, suffix = base_msg(job, task, role, party_id, detail)
return f"{prefix}{msg} successfully{suffix}"
def warning_log(msg, job=None, task=None, role=None,
party_id=None, detail=None):
prefix, suffix = base_msg(job, task, role, party_id, detail)
return f"{prefix}{msg} is not effective{suffix}"
def failed_log(msg, job=None, task=None, role=None,
party_id=None, detail=None):
prefix, suffix = base_msg(job, task, role, party_id, detail)
return f"{prefix}failed to {msg}{suffix}"
def base_msg(job=None, task=None, role: str = None,
party_id: typing.Union[str, int] = None, detail=None):
if detail:
detail_msg = f" detail: \n{detail}"
else:
detail_msg = ""
if task is not None:
return f"task {task.f_task_id} {task.f_task_version} ", f" on {task.f_role} {task.f_party_id}{detail_msg}"
elif job is not None:
return "", f" on {job.f_role} {job.f_party_id}{detail_msg}"
elif role and party_id:
return "", f" on {role} {party_id}{detail_msg}"
else:
return "", f"{detail_msg}"
def exception_to_trace_string(ex):
return "".join(traceback.TracebackException.from_exception(ex).format())
def get_logger_base_dir():
job_log_dir = file_utils.get_rag_flow_directory('logs')
return job_log_dir
def get_job_logger(job_id, log_type):
rag_flow_log_dir = file_utils.get_rag_flow_directory('logs', 'rag_flow')
job_log_dir = file_utils.get_rag_flow_directory('logs', job_id)
if not job_id:
log_dirs = [rag_flow_log_dir]
else:
if log_type == 'audit':
log_dirs = [job_log_dir, rag_flow_log_dir]
else:
log_dirs = [job_log_dir]
if LoggerFactory.log_share:
oldmask = os.umask(000)
os.makedirs(job_log_dir, exist_ok=True)
os.makedirs(rag_flow_log_dir, exist_ok=True)
os.umask(oldmask)
else:
os.makedirs(job_log_dir, exist_ok=True)
os.makedirs(rag_flow_log_dir, exist_ok=True)
logger = LoggerFactory.new_logger(f"{job_id}_{log_type}")
for job_log_dir in log_dirs:
handler = LoggerFactory.get_handler(class_name=None, level=LoggerFactory.LEVEL,
log_dir=job_log_dir, log_type=log_type, job_id=job_id)
error_handler = LoggerFactory.get_handler(
class_name=None,
level=logging.ERROR,
log_dir=job_log_dir,
log_type=log_type,
job_id=job_id)
logger.addHandler(handler)
logger.addHandler(error_handler)
with LoggerFactory.lock:
LoggerFactory.schedule_logger_dict[job_id + log_type] = logger
return logger