|
import logging |
|
import os |
|
from dataclasses import dataclass |
|
from typing import Any, Dict |
|
|
|
from injector import Module, provider |
|
|
|
from taskweaver.config.module_config import ModuleConfig |
|
|
|
|
|
|
|
|
|
class LoggingModuleConfig(ModuleConfig): |
|
def _configure(self) -> None: |
|
self._set_name("logging") |
|
|
|
import os |
|
|
|
app_dir = self.src.app_base_path |
|
|
|
self.remote = self._get_bool("remote", False) |
|
self.app_insights_connection_string = self._get_str( |
|
"appinsights_connection_string", |
|
None if self.remote else "", |
|
) |
|
self.injector = self._get_bool("injector", False) |
|
self.log_folder = self._get_str("log_folder", "logs") |
|
self.log_file = self._get_str("log_file", "task_weaver.log") |
|
self.log_full_path = os.path.join(app_dir, self.log_folder, self.log_file) |
|
|
|
|
|
@dataclass |
|
class TelemetryLogger: |
|
is_remote: bool |
|
logger: logging.Logger |
|
|
|
def telemetry_logging( |
|
self, |
|
telemetry_log_message: str, |
|
telemetry_log_content: Dict[str, Any], |
|
): |
|
try: |
|
properties = {"custom_dimensions": telemetry_log_content} |
|
self.logger.warning(telemetry_log_message, extra=properties) |
|
except Exception as e: |
|
self.logger.error(f"Error in telemetry: {str(e)}") |
|
|
|
def dump_log_file(self, obj: Any, file_path: str): |
|
if isinstance(obj, (list, dict)): |
|
dumped_obj: Any = obj |
|
elif hasattr(obj, "to_dict"): |
|
dumped_obj = obj.to_dict() |
|
else: |
|
raise Exception( |
|
f"Object {obj} does not have to_dict method and also not a list or dict", |
|
) |
|
|
|
if not self.is_remote: |
|
import json |
|
|
|
with open(file_path, "w") as log_file: |
|
json.dump(dumped_obj, log_file) |
|
else: |
|
self.telemetry_logging( |
|
telemetry_log_message=file_path, |
|
telemetry_log_content=dumped_obj, |
|
) |
|
|
|
def info(self, msg: str, *args: Any, **kwargs: Any): |
|
self.logger.info(msg, *args, **kwargs) |
|
|
|
def warning(self, msg: str, *args: Any, **kwargs: Any): |
|
self.logger.warning(msg, *args, **kwargs) |
|
|
|
def error(self, msg: str, *args: Any, **kwargs: Any): |
|
self.logger.error(msg, *args, **kwargs) |
|
|
|
def debug(self, msg: str, *args: Any, **kwargs: Any): |
|
self.logger.debug(msg, *args, **kwargs) |
|
|
|
|
|
class LoggingModule(Module): |
|
@provider |
|
def provide_logger(self, config: LoggingModuleConfig) -> logging.Logger: |
|
logger = logging.getLogger(__name__) |
|
|
|
logger.setLevel(logging.INFO) |
|
|
|
if not any(isinstance(handler, logging.FileHandler) for handler in logger.handlers): |
|
if not os.path.exists(config.log_full_path): |
|
os.makedirs(os.path.dirname(config.log_full_path), exist_ok=True) |
|
open(config.log_full_path, "w").close() |
|
file_handler = logging.FileHandler(config.log_full_path) |
|
file_handler.setLevel(logging.INFO) |
|
log_format = "%(asctime)s - %(levelname)s - %(message)s" |
|
formatter = logging.Formatter(log_format) |
|
file_handler.setFormatter(formatter) |
|
logger.addHandler(file_handler) |
|
|
|
if config.injector: |
|
logging.getLogger("injector").setLevel(logging.INFO) |
|
|
|
return logger |
|
|
|
@provider |
|
def configure_remote_logging( |
|
self, |
|
config: LoggingModuleConfig, |
|
app_logger: logging.Logger, |
|
) -> TelemetryLogger: |
|
if config.remote is not True: |
|
return TelemetryLogger(logger=app_logger, is_remote=False) |
|
telemetry_logger = logging.getLogger(__name__ + "_telemetry") |
|
|
|
from opencensus.ext.azure.log_exporter import AzureLogHandler |
|
|
|
az_appinsights_connection_string = config.app_insights_connection_string |
|
assert ( |
|
az_appinsights_connection_string is not None |
|
), "az appinsights connection string must be set for remote logging mode" |
|
telemetry_logger = logging.getLogger(__name__ + "_telemetry") |
|
telemetry_logger.addHandler( |
|
AzureLogHandler(connection_string=az_appinsights_connection_string), |
|
) |
|
return TelemetryLogger(logger=telemetry_logger, is_remote=True) |
|
|