File size: 1,406 Bytes
4051191
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import asyncio
from typing import List, Tuple, Any, Set


class AQueue:
    def __init__(self, maxsize=None):
        self._queue = []  # type: List[Tuple[Any, int]]
        self._mask = set()  # type: Set[int]
        self._put_lock = asyncio.Lock()
        self._get_lock = asyncio.Lock()
        self._not_empty = asyncio.Event()

    async def put(self, item: Any, lock: int):
        async with self._put_lock:
            self._queue.append((item, lock))
            if lock not in self._mask:
                self._not_empty.set()

    async def get(self, worker_id):
        async with self._get_lock:
            await self._not_empty.wait()
            available = [i for i, (_, i_lock) in enumerate(self._queue) if i_lock not in self._mask]
            item, lock = self._queue.pop(available[0])
            self.acquire(lock)
            available = [i for i, (_, i_lock) in enumerate(self._queue) if i_lock not in self._mask]
            if not available:
                self._not_empty.clear()
            return item, lock

    def acquire(self, lock: int):
        self._mask.add(lock)

    def release(self, lock: int):
        self._mask.remove(lock)
        for _, i_lock in self._queue:
            if i_lock == lock:
                self._not_empty.set()
                break

    def qsize(self):
        return len(self._queue)

    def empty(self):
        return not self._queue