Spaces:
Runtime error
Runtime error
File size: 1,432 Bytes
bb59984 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
from threading import Lock, RLock
class SingletonMeta(type):
    """Metaclass providing a thread-safe Singleton.

    Every class that uses this metaclass gets at most one instance. The first
    call ``SomeClass(*args, **kwargs)`` constructs and caches the instance;
    every later call returns the cached object and ignores its arguments
    (``__init__`` is not run again).
    """

    # Maps each class using this metaclass to its single instance.
    _instances: dict = {}

    # Synchronizes threads during first access to a Singleton. The lock is
    # shared by all classes using this metaclass and is held while the
    # instance's ``__init__`` runs, so it must be reentrant: with a plain
    # ``Lock``, a singleton whose ``__init__`` instantiates another singleton
    # would block forever on a lock its own thread already holds.
    _lock = RLock()

    def __call__(cls, *args, **kwargs):
        """Return the single instance of ``cls``, creating it on first use.

        Args:
            *args: Positional arguments forwarded to the class constructor,
                used only on the call that creates the instance.
            **kwargs: Keyword arguments forwarded to the class constructor,
                used only on the call that creates the instance.

        Returns:
            The cached instance of ``cls``. Arguments passed on later calls
            do not affect the returned instance.
        """
        # When the program has just been launched there is no instance yet,
        # so several threads may reach this point at almost the same time.
        # The first one acquires the lock and proceeds; the rest wait here.
        with cls._lock:
            # The first thread to hold the lock creates the instance. A
            # thread that was waiting then enters this section, finds the
            # instance already stored, and does not create a new object.
            if cls not in cls._instances:
                instance = super().__call__(*args, **kwargs)
                cls._instances[cls] = instance
        return cls._instances[cls]
|