File size: 11,647 Bytes
ea53ae2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
"""
依赖注入容器
提供统一的依赖管理和对象生命周期控制
"""

import inspect
from abc import ABC, abstractmethod
from typing import Any, Dict, Type, TypeVar, Callable, Optional, List, Union
from dataclasses import dataclass
from enum import Enum
import threading


T = TypeVar('T')


class ServiceLifetime(Enum):
    """服务生命周期枚举"""
    SINGLETON = "singleton"    # 单例模式
    TRANSIENT = "transient"   # 每次创建新实例
    SCOPED = "scoped"         # 作用域内单例


@dataclass
class ServiceDescriptor:
    """服务描述符"""
    service_type: Type
    implementation_type: Optional[Type] = None
    factory: Optional[Callable] = None
    instance: Optional[Any] = None
    lifetime: ServiceLifetime = ServiceLifetime.TRANSIENT
    dependencies: Optional[List[Type]] = None


class IDependencyContainer(ABC):
    """依赖注入容器抽象接口"""

    @abstractmethod
    def register_singleton(self, service_type: Type[T], implementation_type: Type[T] = None, factory: Callable[[], T] = None) -> None:
        """注册单例服务"""
        pass

    @abstractmethod
    def register_transient(self, service_type: Type[T], implementation_type: Type[T] = None, factory: Callable[[], T] = None) -> None:
        """注册瞬态服务"""
        pass

    @abstractmethod
    def register_scoped(self, service_type: Type[T], implementation_type: Type[T] = None, factory: Callable[[], T] = None) -> None:
        """注册作用域服务"""
        pass

    @abstractmethod
    def register_instance(self, service_type: Type[T], instance: T) -> None:
        """注册实例"""
        pass

    @abstractmethod
    def resolve(self, service_type: Type[T]) -> T:
        """解析服务"""
        pass

    @abstractmethod
    def try_resolve(self, service_type: Type[T]) -> Optional[T]:
        """尝试解析服务,失败返回None"""
        pass


class DependencyContainer(IDependencyContainer):
    """依赖注入容器实现类"""

    def __init__(self):
        """初始化依赖注入容器"""
        self._services: Dict[Type, ServiceDescriptor] = {}
        self._singletons: Dict[Type, Any] = {}
        self._scoped_instances: Dict[Type, Any] = {}
        self._lock = threading.RLock()
        self._resolving_stack: List[Type] = []  # 用于检测循环依赖

    def register_singleton(self, service_type: Type[T], implementation_type: Type[T] = None, factory: Callable[[], T] = None) -> None:
        """注册单例服务"""
        with self._lock:
            descriptor = ServiceDescriptor(
                service_type=service_type,
                implementation_type=implementation_type or service_type,
                factory=factory,
                lifetime=ServiceLifetime.SINGLETON
            )
            self._services[service_type] = descriptor

    def register_transient(self, service_type: Type[T], implementation_type: Type[T] = None, factory: Callable[[], T] = None) -> None:
        """注册瞬态服务"""
        with self._lock:
            descriptor = ServiceDescriptor(
                service_type=service_type,
                implementation_type=implementation_type or service_type,
                factory=factory,
                lifetime=ServiceLifetime.TRANSIENT
            )
            self._services[service_type] = descriptor

    def register_scoped(self, service_type: Type[T], implementation_type: Type[T] = None, factory: Callable[[], T] = None) -> None:
        """注册作用域服务"""
        with self._lock:
            descriptor = ServiceDescriptor(
                service_type=service_type,
                implementation_type=implementation_type or service_type,
                factory=factory,
                lifetime=ServiceLifetime.SCOPED
            )
            self._services[service_type] = descriptor

    def register_instance(self, service_type: Type[T], instance: T) -> None:
        """注册实例"""
        with self._lock:
            descriptor = ServiceDescriptor(
                service_type=service_type,
                instance=instance,
                lifetime=ServiceLifetime.SINGLETON
            )
            self._services[service_type] = descriptor
            self._singletons[service_type] = instance

    def resolve(self, service_type: Type[T]) -> T:
        """解析服务"""
        instance = self.try_resolve(service_type)
        if instance is None:
            raise ValueError(f"服务类型 {service_type.__name__} 未注册")
        return instance

    def try_resolve(self, service_type: Type[T]) -> Optional[T]:
        """尝试解析服务,失败返回None"""
        with self._lock:
            # 检查循环依赖
            if service_type in self._resolving_stack:
                cycle_path = " -> ".join([t.__name__ for t in self._resolving_stack])
                raise ValueError(f"检测到循环依赖: {cycle_path} -> {service_type.__name__}")

            if service_type not in self._services:
                return None

            descriptor = self._services[service_type]

            # 检查是否已有实例
            if descriptor.lifetime == ServiceLifetime.SINGLETON:
                if service_type in self._singletons:
                    return self._singletons[service_type]
            elif descriptor.lifetime == ServiceLifetime.SCOPED:
                if service_type in self._scoped_instances:
                    return self._scoped_instances[service_type]

            # 创建新实例
            self._resolving_stack.append(service_type)
            try:
                instance = self._create_instance(descriptor)

                # 缓存实例
                if descriptor.lifetime == ServiceLifetime.SINGLETON:
                    self._singletons[service_type] = instance
                elif descriptor.lifetime == ServiceLifetime.SCOPED:
                    self._scoped_instances[service_type] = instance

                return instance
            finally:
                self._resolving_stack.remove(service_type)

    def _create_instance(self, descriptor: ServiceDescriptor) -> Any:
        """创建实例"""
        # 如果已有实例,直接返回
        if descriptor.instance is not None:
            return descriptor.instance

        # 如果有工厂方法,使用工厂方法创建
        if descriptor.factory is not None:
            return descriptor.factory()

        # 使用构造函数创建实例
        implementation_type = descriptor.implementation_type
        if implementation_type is None:
            raise ValueError(f"服务 {descriptor.service_type.__name__} 没有实现类型或工厂方法")

        # 获取构造函数参数
        constructor = implementation_type.__init__
        signature = inspect.signature(constructor)
        parameters = list(signature.parameters.values())[1:]  # 跳过self参数

        # 解析依赖
        args = []
        for param in parameters:
            if param.annotation == inspect.Parameter.empty:
                raise ValueError(f"构造函数参数 {param.name} 缺少类型注解")

            dependency = self.try_resolve(param.annotation)
            if dependency is None:
                if param.default != inspect.Parameter.empty:
                    # 使用默认值
                    continue
                else:
                    raise ValueError(f"无法解析依赖 {param.annotation.__name__}")

            args.append(dependency)

        # 创建实例
        return implementation_type(*args)

    def clear_scoped(self) -> None:
        """清除作用域实例"""
        with self._lock:
            self._scoped_instances.clear()

    def get_service_info(self) -> Dict[str, Dict[str, Any]]:
        """获取服务注册信息"""
        with self._lock:
            service_info = {}
            for service_type, descriptor in self._services.items():
                service_info[service_type.__name__] = {
                    "implementation": descriptor.implementation_type.__name__ if descriptor.implementation_type else "Factory",
                    "lifetime": descriptor.lifetime.value,
                    "has_instance": descriptor.instance is not None,
                    "is_singleton_created": service_type in self._singletons,
                    "is_scoped_active": service_type in self._scoped_instances,
                }
            return service_info


class ServiceScope:
    """服务作用域上下文管理器"""

    def __init__(self, container: DependencyContainer):
        """初始化服务作用域

        Args:
            container: 依赖注入容器
        """
        self.container = container

    def __enter__(self):
        """进入作用域"""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """退出作用域,清理作用域实例"""
        self.container.clear_scoped()


def autowired(container: IDependencyContainer):
    """自动装配装饰器"""
    def decorator(cls):
        # 检查构造函数
        constructor = cls.__init__
        signature = inspect.signature(constructor)
        parameters = list(signature.parameters.values())[1:]  # 跳过self参数

        original_init = cls.__init__

        def new_init(self, *args, **kwargs):
            # 如果提供了参数,直接使用
            if args or kwargs:
                original_init(self, *args, **kwargs)
                return

            # 自动解析依赖
            resolved_args = []
            for param in parameters:
                if param.annotation == inspect.Parameter.empty:
                    raise ValueError(f"构造函数参数 {param.name} 缺少类型注解")

                dependency = container.resolve(param.annotation)
                resolved_args.append(dependency)

            original_init(self, *resolved_args)

        cls.__init__ = new_init
        return cls

    return decorator


# 创建全局容器单例
_container: Optional[DependencyContainer] = None


def get_container() -> DependencyContainer:
    """获取全局依赖注入容器"""
    global _container
    if _container is None:
        _container = DependencyContainer()
    return _container


def create_container() -> DependencyContainer:
    """创建新的依赖注入容器(主要用于测试)"""
    return DependencyContainer()


# 快捷方法
def register_singleton(service_type: Type[T], implementation_type: Type[T] = None, factory: Callable[[], T] = None) -> None:
    """在全局容器中注册单例服务"""
    get_container().register_singleton(service_type, implementation_type, factory)


def register_transient(service_type: Type[T], implementation_type: Type[T] = None, factory: Callable[[], T] = None) -> None:
    """在全局容器中注册瞬态服务"""
    get_container().register_transient(service_type, implementation_type, factory)


def register_scoped(service_type: Type[T], implementation_type: Type[T] = None, factory: Callable[[], T] = None) -> None:
    """在全局容器中注册作用域服务"""
    get_container().register_scoped(service_type, implementation_type, factory)


def register_instance(service_type: Type[T], instance: T) -> None:
    """在全局容器中注册实例"""
    get_container().register_instance(service_type, instance)


def resolve(service_type: Type[T]) -> T:
    """从全局容器解析服务"""
    return get_container().resolve(service_type)


def try_resolve(service_type: Type[T]) -> Optional[T]:
    """从全局容器尝试解析服务"""
    return get_container().try_resolve(service_type)