File size: 4,871 Bytes
079c32c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
import os
import multiprocessing
import threading
import platform
from enum import Enum, unique
from pathlib import Path
if platform.system().lower() != 'windows':
import fcntl
else:
fcntl = None
@unique
class LockContextType(Enum):
    """
    Overview:
        Enumeration of the lock kinds a ``LockContext`` can be built on. Member values are unique.
    """
    # Resolved to ``threading.Lock`` by ``_LOCK_TYPE_MAPPING``.
    THREAD_LOCK = 1
    # Resolved to ``multiprocessing.Lock`` by ``_LOCK_TYPE_MAPPING``.
    PROCESS_LOCK = 2
# Lock factory for each ``LockContextType`` member. The values are the lock
# constructors themselves (not instances); ``LockContext.__init__`` calls the
# selected one to create a fresh lock per context.
_LOCK_TYPE_MAPPING = {
    LockContextType.THREAD_LOCK: threading.Lock,
    LockContextType.PROCESS_LOCK: multiprocessing.Lock,
}
class LockContext(object):
    """
    Overview:
        Wrap a lock (thread-level or process-level, selected by ``LockContextType``) so that it can be
        used either through explicit ``acquire`` / ``release`` calls or as a ``with`` context manager.
    Interfaces:
        ``__init__``, ``acquire``, ``release``, ``__enter__``, ``__exit__``.
    Example:
        >>> with LockContext() as lock:
        >>>     print("Do something here.")
    """

    def __init__(self, type_: LockContextType = LockContextType.THREAD_LOCK):
        """
        Overview:
            Init the lock according to the given type.
        Arguments:
            - type_ (:obj:`LockContextType`): The type of lock to be used. \
                Defaults to ``LockContextType.THREAD_LOCK``.
        Raises:
            - KeyError: ``type_`` is not a key of ``_LOCK_TYPE_MAPPING``.
        """
        # The mapping stores lock constructors; calling one yields a new, unlocked lock.
        self.lock = _LOCK_TYPE_MAPPING[type_]()

    def acquire(self):
        """
        Overview:
            Acquires the lock, blocking until it is available.
        """
        self.lock.acquire()

    def release(self):
        """
        Overview:
            Releases the lock.
        """
        self.lock.release()

    def __enter__(self) -> 'LockContext':
        """
        Overview:
            Enters the context and acquires the lock.
        Returns:
            - (:obj:`LockContext`): This object, so that ``with LockContext() as lock`` binds ``lock`` \
                to the context itself instead of ``None``.
        """
        self.lock.acquire()
        return self

    def __exit__(self, *args, **kwargs):
        """
        Overview:
            Exits the context and releases the lock. Nothing is returned, so an exception raised inside
            the ``with`` body keeps propagating after the lock has been released.
        Arguments:
            - args (:obj:`Tuple`): The arguments passed to the ``__exit__`` function.
            - kwargs (:obj:`Dict`): The keyword arguments passed to the ``__exit__`` function.
        """
        self.lock.release()
# Registry of reader-writer locks keyed by name. Every caller asking for the
# same name must end up with the same underlying lock object.
rw_lock_mapping = {}


def get_rw_file_lock(name: str, op: str):
    """
    Overview:
        Get the reader side or the writer side of the reader-writer lock registered under ``name``.
        The underlying ``RWLockFairD`` is created on the first request for a name and reused afterwards.
    Arguments:
        - name (:obj:`str`): Lock's name.
        - op (:obj:`str`): Assigned operator, i.e. ``read`` or ``write``.
    Returns:
        - lock: The result of ``gen_rlock()`` (for ``read``) or ``gen_wlock()`` (for ``write``) called on \
            the ``RWLockFairD`` registered under ``name``.
    Raises:
        - AssertionError: ``op`` is neither ``read`` nor ``write``.
        - SystemExit: ``readerwriterlock`` is not installed (a warning is logged first).
    """
    assert op in ['read', 'write']
    try:
        from readerwriterlock import rwlock
    except ImportError:
        import sys
        from ditk import logging
        logging.warning("Please install readerwriterlock first, such as `pip3 install readerwriterlock`.")
        sys.exit(1)

    lock = rw_lock_mapping.get(name)
    if lock is None:
        # A separate "not in" test followed by an assignment lets two threads each
        # install their own lock for the same name, so they would stop excluding
        # each other. ``setdefault`` inserts-or-returns in one dict operation, so
        # the first stored lock wins and every caller gets it back.
        # NOTE(review): relies on dict.setdefault being atomic for str keys on CPython.
        lock = rw_lock_mapping.setdefault(name, rwlock.RWLockFairD())

    if op == 'read':
        return lock.gen_rlock()
    elif op == 'write':
        return lock.gen_wlock()
class FcntlContext:
    """
    Overview:
        A context manager that acquires an exclusive lock on a file using ``fcntl.flock``.
        This is useful for preventing multiple processes from running the same code.
    Interfaces:
        ``__init__``, ``__enter__``, ``__exit__``.
    Example:
        >>> lock_path = "/path/to/lock/file"
        >>> with FcntlContext(lock_path) as lock:
        >>>     # Perform operations while the lock is held
        >>>     pass
    """

    def __init__(self, lock_path: str) -> None:
        """
        Overview:
            Remember the lock file path. No file is opened until the context is entered.
        Arguments:
            - lock_path (:obj:`str`): The path to the lock file.
        """
        self.lock_path = lock_path
        # Open handle of the lock file while the lock is held, ``None`` otherwise.
        self.f = None

    def __enter__(self) -> 'FcntlContext':
        """
        Overview:
            Open the lock file in write mode (creating it if it does not exist) and block until an
            exclusive ``flock`` on it is obtained.
        Returns:
            - (:obj:`FcntlContext`): This object, so that ``with FcntlContext(path) as lock`` binds \
                ``lock`` to the context instead of ``None``.
        Raises:
            - AssertionError: The context is already entered (``self.f`` is not ``None``).
        """
        assert self.f is None, self.lock_path
        self.f = open(self.lock_path, 'w')
        try:
            fcntl.flock(self.f.fileno(), fcntl.LOCK_EX)
        except BaseException:
            # Locking failed or was interrupted while waiting: do not leak the
            # handle, and reset the state so this object can be entered again.
            self.f.close()
            self.f = None
            raise
        return self

    def __exit__(self, *args, **kwargs) -> None:
        """
        Overview:
            Release the lock and close the lock file.
        Arguments:
            - args (:obj:`Tuple`): The arguments passed to the ``__exit__`` function.
            - kwargs (:obj:`Dict`): The keyword arguments passed to the ``__exit__`` function.
        """
        handle, self.f = self.f, None
        try:
            # Unlock explicitly instead of relying on close(): a flock is only
            # dropped on close once every descriptor sharing the open file (for
            # example one inherited by a forked child) has been closed as well.
            fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
        finally:
            handle.close()
def get_file_lock(name: str, op: str) -> FcntlContext:
    """
    Overview:
        Build the lock object guarding the file ``name``. When ``fcntl`` is unavailable (it is set to
        ``None`` at import time), the request is delegated to ``get_rw_file_lock``; otherwise an
        ``FcntlContext`` on the companion file ``name + '.lock'`` is returned.
    Arguments:
        - name (:obj:`str`): The name of the file.
        - op (:obj:`str`): The operation to perform on the file lock. It is only used by the \
            ``get_rw_file_lock`` fallback.
    Returns:
        - lock: An ``FcntlContext`` for the companion lock file, or the result of ``get_rw_file_lock``.
    """
    if fcntl is None:
        return get_rw_file_lock(name, op)

    lock_file = name + '.lock'
    already_there = os.path.isfile(lock_file)
    if not already_there:
        # Best-effort pre-creation: any failure here is deliberately ignored.
        try:
            Path(lock_file).touch()
        except Exception:
            pass
    return FcntlContext(lock_file)
|