File size: 3,954 Bytes
add8f0b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import inspect
import logging
from types import TracebackType
from typing import Any, Dict, Optional, Type

from ._models import Request


class Trace:
    """Report the start and outcome of a named operation.

    Used as a (sync or async) context manager, a ``Trace`` emits a
    ``"<name>.started"`` event on entry and either ``"<name>.complete"`` or
    ``"<name>.failed"`` on exit. Each event is delivered to up to two sinks:

    * the callable stored under the ``"trace"`` key of ``request.extensions``
      (if a request was supplied and the key is present), called as
      ``callback(f"{prefix}.{event}", info)`` where ``prefix`` is the last
      dotted component of the logger's name;
    * the logger itself, at DEBUG level, when DEBUG is enabled for it.

    Callers may assign ``return_value`` inside the ``with`` body; it is
    included in the ``complete`` event.
    """

    def __init__(
        self,
        name: str,
        logger: logging.Logger,
        request: Optional["Request"] = None,
        kwargs: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Set up a trace for the operation called *name*.

        Args:
            name: Base event name; ``.started`` / ``.complete`` / ``.failed``
                are appended to it.
            logger: Logger used for DEBUG output; its name also supplies the
                prefix passed to the trace callback.
            request: Optional request whose ``extensions`` mapping may hold a
                ``"trace"`` callback.
            kwargs: Optional info reported with the ``started`` event.
        """
        self.name = name
        self.logger = logger
        self.trace_extension = (
            None if request is None else request.extensions.get("trace")
        )
        # Evaluated once at construction; later level changes on the logger
        # are not picked up by this instance.
        self.debug = self.logger.isEnabledFor(logging.DEBUG)
        self.kwargs = kwargs or {}
        self.return_value: Any = None
        # Skip all work on enter/exit when nobody is listening.
        self.should_trace = self.debug or self.trace_extension is not None
        self.prefix = self.logger.name.split(".")[-1]

    def _log_debug(self, name: str, info: Dict[str, Any]) -> None:
        """Write one event to the logger at DEBUG level, if DEBUG is enabled.

        The message is just *name* when *info* is empty or carries a
        ``return_value`` of ``None``; otherwise the ``key=repr(value)`` pairs
        of *info* are appended, separated by spaces.
        """
        if not self.debug:
            return

        nothing_to_show = not info or (
            "return_value" in info and info["return_value"] is None
        )
        if nothing_to_show:
            message = name
        else:
            args = " ".join(f"{key}={value!r}" for key, value in info.items())
            message = f"{name} {args}"
        self.logger.debug(message)

    def trace(self, name: str, info: Dict[str, Any]) -> None:
        """Deliver one event synchronously to the trace callback and logger.

        Raises:
            TypeError: If the trace callback returned a coroutine, i.e. an
                async callback was supplied to the synchronous interface.
        """
        if self.trace_extension is not None:
            prefix_and_name = f"{self.prefix}.{name}"
            ret = self.trace_extension(prefix_and_name, info)
            if inspect.iscoroutine(ret):  # pragma: no cover
                # We will never await this coroutine; close it so Python does
                # not additionally emit a "coroutine ... was never awaited"
                # RuntimeWarning on top of the TypeError below.
                ret.close()
                raise TypeError(
                    "If you are using a synchronous interface, "
                    "the callback of the `trace` extension should "
                    "be a normal function instead of an asynchronous function."
                )

        self._log_debug(name, info)

    def __enter__(self) -> "Trace":
        """Emit the ``started`` event (with ``kwargs`` as info) and return self."""
        if self.should_trace:
            info = self.kwargs
            self.trace(f"{self.name}.started", info)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Emit ``complete`` or ``failed``; never suppresses the exception."""
        if self.should_trace:
            if exc_value is None:
                info = {"return_value": self.return_value}
                self.trace(f"{self.name}.complete", info)
            else:
                info = {"exception": exc_value}
                self.trace(f"{self.name}.failed", info)

    async def atrace(self, name: str, info: Dict[str, Any]) -> None:
        """Deliver one event asynchronously to the trace callback and logger.

        Raises:
            TypeError: If the trace callback did not return a coroutine, i.e.
                a plain function was supplied to the asynchronous interface.
        """
        if self.trace_extension is not None:
            prefix_and_name = f"{self.prefix}.{name}"
            coro = self.trace_extension(prefix_and_name, info)
            if not inspect.iscoroutine(coro):  # pragma: no cover
                raise TypeError(
                    "If you're using an asynchronous interface, "
                    "the callback of the `trace` extension should "
                    "be an asynchronous function rather than a normal function."
                )
            await coro

        self._log_debug(name, info)

    async def __aenter__(self) -> "Trace":
        """Async counterpart of ``__enter__``."""
        if self.should_trace:
            info = self.kwargs
            await self.atrace(f"{self.name}.started", info)
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        traceback: Optional[TracebackType] = None,
    ) -> None:
        """Async counterpart of ``__exit__``; never suppresses the exception."""
        if self.should_trace:
            if exc_value is None:
                info = {"return_value": self.return_value}
                await self.atrace(f"{self.name}.complete", info)
            else:
                info = {"exception": exc_value}
                await self.atrace(f"{self.name}.failed", info)