tskwvr / taskweaver /logging /__init__.py
TRaw's picture
Upload 297 files
3d3d712
import logging
import os
from dataclasses import dataclass
from typing import Any, Dict
from injector import Module, provider
from taskweaver.config.module_config import ModuleConfig
# from .log_file import dump_log_file
class LoggingModuleConfig(ModuleConfig):
def _configure(self) -> None:
self._set_name("logging")
import os
app_dir = self.src.app_base_path
self.remote = self._get_bool("remote", False)
self.app_insights_connection_string = self._get_str(
"appinsights_connection_string",
None if self.remote else "",
)
self.injector = self._get_bool("injector", False)
self.log_folder = self._get_str("log_folder", "logs")
self.log_file = self._get_str("log_file", "task_weaver.log")
self.log_full_path = os.path.join(app_dir, self.log_folder, self.log_file)
@dataclass
class TelemetryLogger:
is_remote: bool
logger: logging.Logger
def telemetry_logging(
self,
telemetry_log_message: str,
telemetry_log_content: Dict[str, Any],
):
try:
properties = {"custom_dimensions": telemetry_log_content}
self.logger.warning(telemetry_log_message, extra=properties)
except Exception as e:
self.logger.error(f"Error in telemetry: {str(e)}")
def dump_log_file(self, obj: Any, file_path: str):
if isinstance(obj, (list, dict)):
dumped_obj: Any = obj
elif hasattr(obj, "to_dict"):
dumped_obj = obj.to_dict()
else:
raise Exception(
f"Object {obj} does not have to_dict method and also not a list or dict",
)
if not self.is_remote:
import json
with open(file_path, "w") as log_file:
json.dump(dumped_obj, log_file)
else:
self.telemetry_logging(
telemetry_log_message=file_path,
telemetry_log_content=dumped_obj,
)
def info(self, msg: str, *args: Any, **kwargs: Any):
self.logger.info(msg, *args, **kwargs)
def warning(self, msg: str, *args: Any, **kwargs: Any):
self.logger.warning(msg, *args, **kwargs)
def error(self, msg: str, *args: Any, **kwargs: Any):
self.logger.error(msg, *args, **kwargs)
def debug(self, msg: str, *args: Any, **kwargs: Any):
self.logger.debug(msg, *args, **kwargs)
class LoggingModule(Module):
@provider
def provide_logger(self, config: LoggingModuleConfig) -> logging.Logger:
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not any(isinstance(handler, logging.FileHandler) for handler in logger.handlers):
if not os.path.exists(config.log_full_path):
os.makedirs(os.path.dirname(config.log_full_path), exist_ok=True)
open(config.log_full_path, "w").close()
file_handler = logging.FileHandler(config.log_full_path)
file_handler.setLevel(logging.INFO)
log_format = "%(asctime)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(log_format)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
if config.injector:
logging.getLogger("injector").setLevel(logging.INFO)
return logger
@provider
def configure_remote_logging(
self,
config: LoggingModuleConfig,
app_logger: logging.Logger,
) -> TelemetryLogger:
if config.remote is not True:
return TelemetryLogger(logger=app_logger, is_remote=False)
telemetry_logger = logging.getLogger(__name__ + "_telemetry")
from opencensus.ext.azure.log_exporter import AzureLogHandler # type: ignore
az_appinsights_connection_string = config.app_insights_connection_string
assert (
az_appinsights_connection_string is not None
), "az appinsights connection string must be set for remote logging mode"
telemetry_logger = logging.getLogger(__name__ + "_telemetry")
telemetry_logger.addHandler(
AzureLogHandler(connection_string=az_appinsights_connection_string),
)
return TelemetryLogger(logger=telemetry_logger, is_remote=True)