Spaces:
Sleeping
Sleeping
| import time | |
| from collections import defaultdict, deque | |
| from typing import Dict | |
| import threading | |
| class RateLimiter: | |
| def __init__(self, max_requests: int = 100, window_size: int = 60): | |
| """ | |
| Initialize rate limiter. | |
| Args: | |
| max_requests: Maximum number of requests allowed per window | |
| window_size: Time window in seconds | |
| """ | |
| self.max_requests = max_requests | |
| self.window_size = window_size | |
| self.requests = defaultdict(lambda: deque()) | |
| self._lock = threading.RLock() | |
| def is_allowed(self, key: str) -> bool: | |
| """ | |
| Check if a request is allowed for the given key. | |
| Args: | |
| key: Identifier for the client/user | |
| Returns: | |
| True if request is allowed, False otherwise | |
| """ | |
| with self._lock: | |
| current_time = time.time() | |
| requests = self.requests[key] | |
| # Remove old requests outside the time window | |
| while requests and current_time - requests[0] > self.window_size: | |
| requests.popleft() | |
| # Check if we're under the limit | |
| if len(requests) < self.max_requests: | |
| requests.append(current_time) | |
| return True | |
| else: | |
| return False | |
| def get_reset_time(self, key: str) -> float: | |
| """ | |
| Get the time when the rate limit will reset for the given key. | |
| Args: | |
| key: Identifier for the client/user | |
| Returns: | |
| Unix timestamp when the rate limit will reset | |
| """ | |
| with self._lock: | |
| if key in self.requests and len(self.requests[key]) > 0: | |
| oldest_request = self.requests[key][0] | |
| return oldest_request + self.window_size | |
| return time.time() | |
| # Global rate limiter instance | |
| event_publisher_rate_limiter = RateLimiter(max_requests=1000, window_size=60) # 1000 events per minute |