Spaces:
Paused
Paused
# | |
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
import os | |
import typing | |
import traceback | |
import logging | |
import inspect | |
from logging.handlers import TimedRotatingFileHandler | |
from threading import RLock | |
from api.utils import file_utils | |
class LoggerFactory(object): | |
TYPE = "FILE" | |
LOG_FORMAT = "[%(levelname)s] [%(asctime)s] [%(module)s.%(funcName)s] [line:%(lineno)d]: %(message)s" | |
logging.basicConfig(format=LOG_FORMAT) | |
LEVEL = logging.DEBUG | |
logger_dict = {} | |
global_handler_dict = {} | |
LOG_DIR = None | |
PARENT_LOG_DIR = None | |
log_share = True | |
append_to_parent_log = None | |
lock = RLock() | |
# CRITICAL = 50 | |
# FATAL = CRITICAL | |
# ERROR = 40 | |
# WARNING = 30 | |
# WARN = WARNING | |
# INFO = 20 | |
# DEBUG = 10 | |
# NOTSET = 0 | |
levels = (10, 20, 30, 40) | |
schedule_logger_dict = {} | |
def set_directory(directory=None, parent_log_dir=None, | |
append_to_parent_log=None, force=False): | |
if parent_log_dir: | |
LoggerFactory.PARENT_LOG_DIR = parent_log_dir | |
if append_to_parent_log: | |
LoggerFactory.append_to_parent_log = append_to_parent_log | |
with LoggerFactory.lock: | |
if not directory: | |
directory = file_utils.get_project_base_directory("logs") | |
if not LoggerFactory.LOG_DIR or force: | |
LoggerFactory.LOG_DIR = directory | |
if LoggerFactory.log_share: | |
oldmask = os.umask(000) | |
os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True) | |
os.umask(oldmask) | |
else: | |
os.makedirs(LoggerFactory.LOG_DIR, exist_ok=True) | |
for loggerName, ghandler in LoggerFactory.global_handler_dict.items(): | |
for className, (logger, | |
handler) in LoggerFactory.logger_dict.items(): | |
logger.removeHandler(ghandler) | |
ghandler.close() | |
LoggerFactory.global_handler_dict = {} | |
for className, (logger, | |
handler) in LoggerFactory.logger_dict.items(): | |
logger.removeHandler(handler) | |
_handler = None | |
if handler: | |
handler.close() | |
if className != "default": | |
_handler = LoggerFactory.get_handler(className) | |
logger.addHandler(_handler) | |
LoggerFactory.assemble_global_handler(logger) | |
LoggerFactory.logger_dict[className] = logger, _handler | |
def new_logger(name): | |
logger = logging.getLogger(name) | |
logger.propagate = False | |
logger.setLevel(LoggerFactory.LEVEL) | |
return logger | |
def get_logger(class_name=None): | |
with LoggerFactory.lock: | |
if class_name in LoggerFactory.logger_dict.keys(): | |
logger, handler = LoggerFactory.logger_dict[class_name] | |
if not logger: | |
logger, handler = LoggerFactory.init_logger(class_name) | |
else: | |
logger, handler = LoggerFactory.init_logger(class_name) | |
return logger | |
def get_global_handler(logger_name, level=None, log_dir=None): | |
if not LoggerFactory.LOG_DIR: | |
return logging.StreamHandler() | |
if log_dir: | |
logger_name_key = logger_name + "_" + log_dir | |
else: | |
logger_name_key = logger_name + "_" + LoggerFactory.LOG_DIR | |
# if loggerName not in LoggerFactory.globalHandlerDict: | |
if logger_name_key not in LoggerFactory.global_handler_dict: | |
with LoggerFactory.lock: | |
if logger_name_key not in LoggerFactory.global_handler_dict: | |
handler = LoggerFactory.get_handler( | |
logger_name, level, log_dir) | |
LoggerFactory.global_handler_dict[logger_name_key] = handler | |
return LoggerFactory.global_handler_dict[logger_name_key] | |
def get_handler(class_name, level=None, log_dir=None, | |
log_type=None, job_id=None): | |
if not log_type: | |
if not LoggerFactory.LOG_DIR or not class_name: | |
return logging.StreamHandler() | |
# return Diy_StreamHandler() | |
if not log_dir: | |
log_file = os.path.join( | |
LoggerFactory.LOG_DIR, | |
"{}.log".format(class_name)) | |
else: | |
log_file = os.path.join(log_dir, "{}.log".format(class_name)) | |
else: | |
log_file = os.path.join(log_dir, "rag_flow_{}.log".format( | |
log_type) if level == LoggerFactory.LEVEL else 'rag_flow_{}_error.log'.format(log_type)) | |
os.makedirs(os.path.dirname(log_file), exist_ok=True) | |
if LoggerFactory.log_share: | |
handler = ROpenHandler(log_file, | |
when='D', | |
interval=1, | |
backupCount=14, | |
delay=True) | |
else: | |
handler = TimedRotatingFileHandler(log_file, | |
when='D', | |
interval=1, | |
backupCount=14, | |
delay=True) | |
if level: | |
handler.level = level | |
return handler | |
def init_logger(class_name): | |
with LoggerFactory.lock: | |
logger = LoggerFactory.new_logger(class_name) | |
handler = None | |
if class_name: | |
handler = LoggerFactory.get_handler(class_name) | |
logger.addHandler(handler) | |
LoggerFactory.logger_dict[class_name] = logger, handler | |
else: | |
LoggerFactory.logger_dict["default"] = logger, handler | |
LoggerFactory.assemble_global_handler(logger) | |
return logger, handler | |
def assemble_global_handler(logger): | |
if LoggerFactory.LOG_DIR: | |
for level in LoggerFactory.levels: | |
if level >= LoggerFactory.LEVEL: | |
level_logger_name = logging._levelToName[level] | |
logger.addHandler( | |
LoggerFactory.get_global_handler( | |
level_logger_name, level)) | |
if LoggerFactory.append_to_parent_log and LoggerFactory.PARENT_LOG_DIR: | |
for level in LoggerFactory.levels: | |
if level >= LoggerFactory.LEVEL: | |
level_logger_name = logging._levelToName[level] | |
logger.addHandler( | |
LoggerFactory.get_global_handler(level_logger_name, level, LoggerFactory.PARENT_LOG_DIR)) | |
def setDirectory(directory=None): | |
LoggerFactory.set_directory(directory) | |
def setLevel(level): | |
LoggerFactory.LEVEL = level | |
def getLogger(className=None, useLevelFile=False): | |
if className is None: | |
frame = inspect.stack()[1] | |
module = inspect.getmodule(frame[0]) | |
className = 'stat' | |
return LoggerFactory.get_logger(className) | |
def exception_to_trace_string(ex): | |
return "".join(traceback.TracebackException.from_exception(ex).format()) | |
class ROpenHandler(TimedRotatingFileHandler): | |
def _open(self): | |
prevumask = os.umask(000) | |
rtv = TimedRotatingFileHandler._open(self) | |
os.umask(prevumask) | |
return rtv | |
def sql_logger(job_id='', log_type='sql'): | |
key = job_id + log_type | |
if key in LoggerFactory.schedule_logger_dict.keys(): | |
return LoggerFactory.schedule_logger_dict[key] | |
return get_job_logger(job_id=job_id, log_type=log_type) | |
def ready_log(msg, job=None, task=None, role=None, party_id=None, detail=None): | |
prefix, suffix = base_msg(job, task, role, party_id, detail) | |
return f"{prefix}{msg} ready{suffix}" | |
def start_log(msg, job=None, task=None, role=None, party_id=None, detail=None): | |
prefix, suffix = base_msg(job, task, role, party_id, detail) | |
return f"{prefix}start to {msg}{suffix}" | |
def successful_log(msg, job=None, task=None, role=None, | |
party_id=None, detail=None): | |
prefix, suffix = base_msg(job, task, role, party_id, detail) | |
return f"{prefix}{msg} successfully{suffix}" | |
def warning_log(msg, job=None, task=None, role=None, | |
party_id=None, detail=None): | |
prefix, suffix = base_msg(job, task, role, party_id, detail) | |
return f"{prefix}{msg} is not effective{suffix}" | |
def failed_log(msg, job=None, task=None, role=None, | |
party_id=None, detail=None): | |
prefix, suffix = base_msg(job, task, role, party_id, detail) | |
return f"{prefix}failed to {msg}{suffix}" | |
def base_msg(job=None, task=None, role: str = None, | |
party_id: typing.Union[str, int] = None, detail=None): | |
if detail: | |
detail_msg = f" detail: \n{detail}" | |
else: | |
detail_msg = "" | |
if task is not None: | |
return f"task {task.f_task_id} {task.f_task_version} ", f" on {task.f_role} {task.f_party_id}{detail_msg}" | |
elif job is not None: | |
return "", f" on {job.f_role} {job.f_party_id}{detail_msg}" | |
elif role and party_id: | |
return "", f" on {role} {party_id}{detail_msg}" | |
else: | |
return "", f"{detail_msg}" | |
def exception_to_trace_string(ex): | |
return "".join(traceback.TracebackException.from_exception(ex).format()) | |
def get_logger_base_dir(): | |
job_log_dir = file_utils.get_rag_flow_directory('logs') | |
return job_log_dir | |
def get_job_logger(job_id, log_type): | |
rag_flow_log_dir = file_utils.get_rag_flow_directory('logs', 'rag_flow') | |
job_log_dir = file_utils.get_rag_flow_directory('logs', job_id) | |
if not job_id: | |
log_dirs = [rag_flow_log_dir] | |
else: | |
if log_type == 'audit': | |
log_dirs = [job_log_dir, rag_flow_log_dir] | |
else: | |
log_dirs = [job_log_dir] | |
if LoggerFactory.log_share: | |
oldmask = os.umask(000) | |
os.makedirs(job_log_dir, exist_ok=True) | |
os.makedirs(rag_flow_log_dir, exist_ok=True) | |
os.umask(oldmask) | |
else: | |
os.makedirs(job_log_dir, exist_ok=True) | |
os.makedirs(rag_flow_log_dir, exist_ok=True) | |
logger = LoggerFactory.new_logger(f"{job_id}_{log_type}") | |
for job_log_dir in log_dirs: | |
handler = LoggerFactory.get_handler(class_name=None, level=LoggerFactory.LEVEL, | |
log_dir=job_log_dir, log_type=log_type, job_id=job_id) | |
error_handler = LoggerFactory.get_handler( | |
class_name=None, | |
level=logging.ERROR, | |
log_dir=job_log_dir, | |
log_type=log_type, | |
job_id=job_id) | |
logger.addHandler(handler) | |
logger.addHandler(error_handler) | |
with LoggerFactory.lock: | |
LoggerFactory.schedule_logger_dict[job_id + log_type] = logger | |
return logger | |