File size: 4,331 Bytes
3d3d712
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import logging
import os
from dataclasses import dataclass
from typing import Any, Dict

from injector import Module, provider

from taskweaver.config.module_config import ModuleConfig

# from .log_file import dump_log_file


class LoggingModuleConfig(ModuleConfig):
    def _configure(self) -> None:
        self._set_name("logging")

        import os

        app_dir = self.src.app_base_path

        self.remote = self._get_bool("remote", False)
        self.app_insights_connection_string = self._get_str(
            "appinsights_connection_string",
            None if self.remote else "",
        )
        self.injector = self._get_bool("injector", False)
        self.log_folder = self._get_str("log_folder", "logs")
        self.log_file = self._get_str("log_file", "task_weaver.log")
        self.log_full_path = os.path.join(app_dir, self.log_folder, self.log_file)


@dataclass
class TelemetryLogger:
    is_remote: bool
    logger: logging.Logger

    def telemetry_logging(
        self,
        telemetry_log_message: str,
        telemetry_log_content: Dict[str, Any],
    ):
        try:
            properties = {"custom_dimensions": telemetry_log_content}
            self.logger.warning(telemetry_log_message, extra=properties)
        except Exception as e:
            self.logger.error(f"Error in telemetry: {str(e)}")

    def dump_log_file(self, obj: Any, file_path: str):
        if isinstance(obj, (list, dict)):
            dumped_obj: Any = obj
        elif hasattr(obj, "to_dict"):
            dumped_obj = obj.to_dict()
        else:
            raise Exception(
                f"Object {obj} does not have to_dict method and also not a list or dict",
            )

        if not self.is_remote:
            import json

            with open(file_path, "w") as log_file:
                json.dump(dumped_obj, log_file)
        else:
            self.telemetry_logging(
                telemetry_log_message=file_path,
                telemetry_log_content=dumped_obj,
            )

    def info(self, msg: str, *args: Any, **kwargs: Any):
        self.logger.info(msg, *args, **kwargs)

    def warning(self, msg: str, *args: Any, **kwargs: Any):
        self.logger.warning(msg, *args, **kwargs)

    def error(self, msg: str, *args: Any, **kwargs: Any):
        self.logger.error(msg, *args, **kwargs)

    def debug(self, msg: str, *args: Any, **kwargs: Any):
        self.logger.debug(msg, *args, **kwargs)


class LoggingModule(Module):
    @provider
    def provide_logger(self, config: LoggingModuleConfig) -> logging.Logger:
        logger = logging.getLogger(__name__)

        logger.setLevel(logging.INFO)

        if not any(isinstance(handler, logging.FileHandler) for handler in logger.handlers):
            if not os.path.exists(config.log_full_path):
                os.makedirs(os.path.dirname(config.log_full_path), exist_ok=True)
                open(config.log_full_path, "w").close()
            file_handler = logging.FileHandler(config.log_full_path)
            file_handler.setLevel(logging.INFO)
            log_format = "%(asctime)s - %(levelname)s - %(message)s"
            formatter = logging.Formatter(log_format)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

        if config.injector:
            logging.getLogger("injector").setLevel(logging.INFO)

        return logger

    @provider
    def configure_remote_logging(
        self,
        config: LoggingModuleConfig,
        app_logger: logging.Logger,
    ) -> TelemetryLogger:
        if config.remote is not True:
            return TelemetryLogger(logger=app_logger, is_remote=False)
        telemetry_logger = logging.getLogger(__name__ + "_telemetry")

        from opencensus.ext.azure.log_exporter import AzureLogHandler  # type: ignore

        az_appinsights_connection_string = config.app_insights_connection_string
        assert (
            az_appinsights_connection_string is not None
        ), "az appinsights connection string must be set for remote logging mode"
        telemetry_logger = logging.getLogger(__name__ + "_telemetry")
        telemetry_logger.addHandler(
            AzureLogHandler(connection_string=az_appinsights_connection_string),
        )
        return TelemetryLogger(logger=telemetry_logger, is_remote=True)