#!/usr/bin/python
# -*- coding: utf-8 -*-
# Hive Appier Framework
# Copyright (c) 2008-2024 Hive Solutions Lda.
#
# This file is part of Hive Appier Framework.
#
# Hive Appier Framework is free software: you can redistribute it and/or modify
# it under the terms of the Apache License as published by the Apache
# Foundation, either version 2.0 of the License, or (at your option) any
# later version.
#
# Hive Appier Framework is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# Apache License for more details.
#
# You should have received a copy of the Apache License along with
# Hive Appier Framework. If not, see <http://www.apache.org/licenses/>.
__author__ = "João Magalhães "
""" The author(s) of the module """
__copyright__ = "Copyright (c) 2008-2024 Hive Solutions Lda."
""" The copyright for the module """
__license__ = "Apache License, Version 2.0"
""" The license for the module """
import json
import socket
import inspect
import logging
import itertools
import threading
import collections
from . import common
from . import config
from . import legacy
# NOTE: the "_T" templates below escape the logging placeholders
# with "%%" so that the first "%" application (done at the end of
# this section and again in reload_format()) only fills the single
# "%s" slot, leaving regular "%(name)s" placeholders behind
LOGGING_FORMAT_T = "%%(asctime)s [%%(levelname)s] %s%%(message)s"
""" The format to be used for the logging operation in
the app, these operations are going to be handled by
multiple stream handlers """
LOGGING_FORMAT_TID_T = "%%(asctime)s [%%(levelname)s] %s[%%(thread)d] %%(message)s"
""" The format to be used for the logging operation in
the app, these operations are going to be handled by
multiple stream handlers, this version of the string
includes the thread identification number and should be
used for messages called from outside the main thread """
# evaluated once at import time, the "[%(name)s] " prefix is only
# used when the LOGGING_EXTRA configuration value is set
LOGGING_EXTRA = "[%(name)s] " if config.conf("LOGGING_EXTRA", cast=bool) else ""
""" The extra logging attributes that are going to be applied
to the format strings to obtain the final on the logging """
# uses the same "%%" escaping as the templates above, the "%s" slot
# is not filled anywhere in this section, the "hostname" and "json"
# placeholders match the attributes set by BaseFormatter._wrap_record()
LOGGIGN_SYSLOG = '1 %%(asctime)s %%(hostname)s %s %%(process)d %%(thread)d \
[appierSDID@0 tid="%%(thread)d"] %%(json)s'
""" The format to be used for the message sent using the syslog
logger, should contain extra structured data """
# used as the default bound for every queue kept by MemoryHandler
MAX_LENGTH = 10000
""" The maximum amount of messages that are kept in
memory until they are discarded, avoid a very large
number for this value or else a large amount of memory
may be used for logging purposes """
# one unit above the most severe level named in LEVELS
SILENT = logging.CRITICAL + 1
""" The "artificial" silent level used to silent a logger
or an handler, this is used as an utility for debugging
purposes more that a real feature for production systems """
# the order matters, MemoryHandler.get_messages_l() slices this
# sequence by index to find the levels related with a given one
LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
""" The sequence of levels from the least sever to the
most sever this sequence may be used to find all the
levels that are considered more sever that a level """
# keys are upper cased, MemoryHandler.get_latest() upper cases
# the requested level before running the lookup in this map
LEVEL_ALIAS = {
"DEBU": "DEBUG",
"WARN": "WARNING",
"INF": "INFO",
"ERR": "ERROR",
"CRIT": "CRITICAL",
}
""" Map defining a series of alias that may be used latter
for proper debug level resolution """
SYSLOG_PORTS = dict(tcp=601, udp=514)
""" Dictionary that maps the multiple transport protocol
used by syslog with the appropriate default ports """
# final format strings computed at import time, these two globals
# are rebound by reload_format() when the extra prefix changes
LOGGING_FORMAT = LOGGING_FORMAT_T % LOGGING_EXTRA
LOGGING_FORMAT_TID = LOGGING_FORMAT_TID_T % LOGGING_EXTRA
class MemoryHandler(logging.Handler):
    """
    Logging handler that is used to store information in
    memory so that anyone else may consult it later as
    long as the execution session is the same.

    Formatted messages are stored newest first, both in a
    general queue (`messages`) and in per level queues
    (`messages_l`), each of them bounded by `max_length`.
    """

    def __init__(self, level=logging.NOTSET, max_length=MAX_LENGTH):
        """
        Creates the handler with empty queues and a thread aware
        formatter.

        :param level: The minimum level handled by this handler.
        :param max_length: The maximum number of messages kept in
        each of the queues before the oldest one is discarded.
        """

        logging.Handler.__init__(self, level=level)
        self.max_length = max_length
        self.messages = collections.deque()
        self.messages_l = dict()

        # an explicitly configured format takes precedence over both
        # the base and the thread identifier default formats
        custom_format = config.conf("LOGGING_FORMAT", None)
        formatter = ThreadFormatter(custom_format or LOGGING_FORMAT)
        formatter.set_tid(custom_format or LOGGING_FORMAT_TID)
        self.setFormatter(formatter)

    def get_messages_l(self, level):
        """
        Retrieves the per level queues that should receive a message
        emitted with the provided level, creating the missing ones.

        :param level: The name of the level (eg: "ERROR").
        :return: A deque with the queues of every level that is equal
        or less severe than the provided one, empty if the level is
        not one of the values in `LEVELS`.
        """

        # in case the level is not found in the sequence of levels
        # it's not considered valid and so an empty queue is returned
        try:
            index = LEVELS.index(level)
        except ValueError:
            return collections.deque()

        # retrieves the complete set of levels that are considered
        # equal or less severe than the requested one, this way a
        # message is also visible when a lower level is queried
        levels = LEVELS[: index + 1]

        # creates the queue that will hold the various message
        # queues associated with the selected levels
        messages_l = collections.deque()

        # iterates over the selected levels to gather the respective
        # message queue, creating (and registering) it when missing
        for name in levels:
            level_messages = self.messages_l.get(name, None)
            if level_messages is None:
                level_messages = collections.deque()
                self.messages_l[name] = level_messages
            messages_l.append(level_messages)

        return messages_l

    def emit(self, record):
        """
        Formats the record and stores the resulting message in the
        general queue and in the queues of the related levels.

        :param record: The logging record to be stored.
        """

        # formats the current record according to the defined
        # logging rules so that the resulting message can be stored
        message = self.format(record)

        # retrieves the level (as a string) associated with the
        # record and uses it to retrieve the related level queues
        level = record.levelname
        messages_l = self.get_messages_l(level)

        # inserts the message at the head of the general queue and
        # discards the oldest one in case the maximum is exceeded
        self.messages.appendleft(message)
        if len(self.messages) > self.max_length:
            self.messages.pop()

        # runs the same (bounded) insert operation for each of the
        # level specific queues that are related with the record
        for level_messages in messages_l:
            level_messages.appendleft(message)
            if len(level_messages) > self.max_length:
                level_messages.pop()

    def clear(self):
        """
        Discards every stored message (general and per level).
        """

        self.messages = collections.deque()
        self.messages_l = dict()

    def get_latest(self, count=None, level=None):
        """
        Retrieves the most recent messages, newest first.

        :param count: The maximum number of messages to return, a
        falsy value falls back to 100.
        :param level: Optional level (name, alias or numeric value)
        used to select the level specific queue, when not provided
        the general queue is used.
        :return: A list with (at most) `count` formatted messages.
        """

        count = count or 100

        # numeric levels are converted into their name so that the
        # same (string based) resolution is used for every value
        is_level = level and not legacy.is_string(level)
        if is_level:
            level = logging.getLevelName(level)
        level = level.upper() if level else level
        level = LEVEL_ALIAS.get(level, level)

        messages = self.messages_l.get(level, []) if level else self.messages
        return list(itertools.islice(messages, 0, count))

    def flush_to_file(self, path, count=None, level=None, reverse=True, clear=True):
        """
        Writes the latest messages (UTF-8 encoded, one per line) to
        a file, optionally clearing the stored messages afterwards.

        :param path: Either a file system path (opened in "wb" mode
        and closed at the end) or an already opened binary file like
        object (left open).
        :param count: The maximum number of messages to be written,
        a falsy value falls back to 65536.
        :param level: Optional level used to filter the messages.
        :param reverse: If the messages should be written from the
        oldest to the newest (they are stored newest first).
        :param clear: If the handler should be cleared once the
        write operation is finished.
        """

        messages = self.get_latest(level=level, count=count or 65536)
        if not messages:
            return
        if reverse:
            messages.reverse()

        # only files opened here are owned (and closed) by this method
        is_path = isinstance(path, legacy.STRINGS)
        target = open(path, "wb") if is_path else path
        try:
            for message in messages:
                message = legacy.bytes(message, "utf-8", force=True)
                target.write(message + b"\n")
        finally:
            if is_path:
                target.close()

        if clear:
            self.clear()
class BaseFormatter(logging.Formatter):
    """
    Base formatter of the Appier logging infra-structure, built
    on top of Python's own formatter.

    When created with `wrap=True` every formatted record is first
    enriched with the `hostname` and `json` attributes so that
    format strings may refer to them.
    """

    def __init__(self, *args, **kwargs):
        # the "wrap" flag is consumed here as the parent formatter
        # does not know about it, remaining arguments go untouched
        wrap = kwargs.pop("wrap", False)
        self._wrap = wrap
        logging.Formatter.__init__(self, *args, **kwargs)

    @classmethod
    def _wrap_record(cls, record):
        # a record is enriched only once, the marker attribute is
        # used to skip the ones that have already been processed
        already_wrapped = hasattr(record, "_wrapped")
        if not already_wrapped:
            record.hostname = socket.gethostname()

            # builds the structured payload from the raw (non
            # interpolated) message and the record's meta information
            payload = {
                "message": str(record.msg),
                "hostname": record.hostname,
                "lineno": record.lineno,
                "module": record.module,
                "callable": record.funcName,
                "level": record.levelname,
                "thread": record.thread,
                "process": record.process,
                "logger": record.name,
            }
            record.json = json.dumps(payload)
            record._wrapped = True

    def format(self, record):
        # enriches the record with the extra attributes, but only
        # when the formatter has been created with wrapping enabled
        should_wrap = self._wrap
        if should_wrap:
            owner = self.__class__
            owner._wrap_record(record)

        # delegates the string building to the standard formatter
        formatted = logging.Formatter.format(self, record)
        return formatted
class ThreadFormatter(BaseFormatter):
    """
    Formatter that holds two inner formatters and picks one of
    them according to the thread that is formatting the record:
    the base one for the main thread and the thread identifier
    one for any other thread.
    """

    def __init__(self, *args, **kwargs):
        BaseFormatter.__init__(self, *args, **kwargs)

        # both inner formatters start with the same configuration,
        # they may be replaced latter using the set methods
        self._basefmt, self._tidfmt = (
            BaseFormatter(*args, **kwargs),
            BaseFormatter(*args, **kwargs),
        )

    def format(self, record):
        # enriches the record with the extra attributes, but only
        # when the formatter has been created with wrapping enabled
        if self._wrap:
            self.__class__._wrap_record(record)

        # selects the inner formatter according to the name of the
        # thread that is currently running the format operation
        thread = threading.current_thread()
        in_main = thread.name == "MainThread"
        formatter = self._basefmt if in_main else self._tidfmt
        return formatter.format(record)

    def set_base(self, value, *args, **kwargs):
        # replaces the formatter used for the main thread
        base_formatter = BaseFormatter(value, *args, **kwargs)
        self._basefmt = base_formatter

    def set_tid(self, value, *args, **kwargs):
        # replaces the formatter used for the non main threads
        tid_formatter = BaseFormatter(value, *args, **kwargs)
        self._tidfmt = tid_formatter
class DummyLogger(object):
    """
    Logger stand-in that silently discards every message, to be
    used where a logger like object is required but no output
    is wanted.

    The methods accept (and ignore) the extra positional and
    keyword arguments of the standard logger methods, so that
    calls like `logger.info("value %s", value)` or the ones using
    `exc_info=True` do not fail on the stand-in.
    """

    def debug(self, object, *args, **kwargs):
        pass

    def info(self, object, *args, **kwargs):
        pass

    def warning(self, object, *args, **kwargs):
        pass

    def error(self, object, *args, **kwargs):
        pass

    def critical(self, object, *args, **kwargs):
        pass
def reload_format(app=None):
    """
    Recomputes the module level logging format strings, adding
    the process identifier prefix when the application is not
    the parent one.

    :param app: The application used to decide on the prefix,
    when not provided the one from the common base is used.
    """

    global LOGGING_FORMAT
    global LOGGING_FORMAT_TID

    # falls back to the application exposed by the common base
    if not app:
        app = common.base().get_app()

    # the process prefix only applies to existing applications
    # that report themselves as not being the parent
    is_child = bool(app) and not app.is_parent()
    prefix = "[%(process)d] " if is_child else ""
    extra = prefix + LOGGING_EXTRA

    # rebinds both globals using the newly computed extra value
    LOGGING_FORMAT, LOGGING_FORMAT_TID = (
        LOGGING_FORMAT_T % extra,
        LOGGING_FORMAT_TID_T % extra,
    )
def rotating_handler(
    path="appier.log", max_bytes=1048576, max_log=5, encoding=None, delay=False
):
    """
    Creates a rotating file handler for the provided path.

    :param path: The path to the (base) log file.
    :param max_bytes: The size (in bytes) at which the file is rotated.
    :param max_log: The number of rotated (backup) files to keep.
    :param encoding: The encoding used to open the file.
    :param delay: If the opening of the file should be deferred
    until the first record is emitted.
    :return: The created `RotatingFileHandler` instance.
    """

    # the handlers sub-module is not loaded by a plain "import logging"
    # so it must be explicitly imported before it can be accessed
    import logging.handlers

    return logging.handlers.RotatingFileHandler(
        path, maxBytes=max_bytes, backupCount=max_log, encoding=encoding, delay=delay
    )
def smtp_handler(
    host="localhost",
    port=25,
    sender="no-reply@appier.com",
    receivers=[],
    subject="Appier logging",
    username=None,
    password=None,
    stls=False,
):
    """
    Creates an SMTP handler that sends the logging records by email.

    :param host: The host name of the SMTP server.
    :param port: The port of the SMTP server.
    :param sender: The email address used as the sender.
    :param receivers: The email addresses of the receivers.
    :param subject: The subject used in the sent emails.
    :param username: The username for authentication, only used
    when the password is also provided.
    :param password: The password for authentication, only used
    when the username is also provided.
    :param stls: If a secure (TLS based) connection should be
    requested, only applied when the handler supports it.
    :return: The created `SMTPHandler` instance.
    """

    # the handlers sub-module is not loaded by a plain "import logging"
    # so it must be explicitly imported before it can be accessed
    import logging.handlers

    # copies list like receivers so that the handler never stores
    # (and shares between handlers) the mutable default value
    if isinstance(receivers, (list, tuple)):
        receivers = list(receivers)

    address = (host, port)
    if username and password:
        credentials = (username, password)
    else:
        credentials = None

    # the "secure" argument is only passed when the handler accepts it,
    # an empty tuple requests TLS without a key or certificate file
    has_secure = in_signature(logging.handlers.SMTPHandler.__init__, "secure")
    if has_secure:
        kwargs = dict(secure=() if stls else None)
    else:
        kwargs = dict()

    return logging.handlers.SMTPHandler(
        address, sender, receivers, subject, credentials=credentials, **kwargs
    )
def in_signature(callable, name):
    """
    Verifies if an argument with the provided name is explicitly
    declared in the signature of the provided callable.

    Both the regular (positional or keyword) arguments and the
    keyword only ones are considered, names that would only be
    accepted through a `**kwargs` like argument are not.

    :param callable: The callable to have its signature inspected.
    :param name: The name of the argument to be searched.
    :return: If the argument is declared in the signature.
    """

    # uses the "full" version of the spec retrieval whenever it's
    # available, falling back to the legacy one for older runtimes
    has_full = hasattr(inspect, "getfullargspec")
    if has_full:
        spec = inspect.getfullargspec(callable)
    else:
        spec = inspect.getargspec(callable)

    # the keyword only arguments exist only in the "full" spec, the
    # third spec item is just the name of the "**kwargs" like argument
    # (not a container of names) and so it's not used in the test
    args = spec[0] or []
    kwonly = getattr(spec, "kwonlyargs", None) or []
    return name in args or name in kwonly