Taskflow-App / src /utils /rate_limiter.py
Tahasaif3's picture
'code
34e27fb
import time
from collections import defaultdict, deque
from typing import Dict
import threading
class RateLimiter:
def __init__(self, max_requests: int = 100, window_size: int = 60):
"""
Initialize rate limiter.
Args:
max_requests: Maximum number of requests allowed per window
window_size: Time window in seconds
"""
self.max_requests = max_requests
self.window_size = window_size
self.requests = defaultdict(lambda: deque())
self._lock = threading.RLock()
def is_allowed(self, key: str) -> bool:
"""
Check if a request is allowed for the given key.
Args:
key: Identifier for the client/user
Returns:
True if request is allowed, False otherwise
"""
with self._lock:
current_time = time.time()
requests = self.requests[key]
# Remove old requests outside the time window
while requests and current_time - requests[0] > self.window_size:
requests.popleft()
# Check if we're under the limit
if len(requests) < self.max_requests:
requests.append(current_time)
return True
else:
return False
def get_reset_time(self, key: str) -> float:
"""
Get the time when the rate limit will reset for the given key.
Args:
key: Identifier for the client/user
Returns:
Unix timestamp when the rate limit will reset
"""
with self._lock:
if key in self.requests and len(self.requests[key]) > 0:
oldest_request = self.requests[key][0]
return oldest_request + self.window_size
return time.time()
# Global rate limiter instance
event_publisher_rate_limiter = RateLimiter(max_requests=1000, window_size=60) # 1000 events per minute