|
import time |
|
from threading import Lock |
|
from fastapi import HTTPException, Request |
|
|
|
# Shared in-memory counters used by the rate limiter in this module.
# Keys are strings of the form "<url path>:<minute bucket>" or
# "<client host>:<day bucket>"; values are (count, first_seen_timestamp)
# tuples, where the timestamp is whole seconds since the epoch.
rate_limit_data = {}

# Guards every read-modify-write of rate_limit_data so concurrent requests
# handled on different threads cannot lose increments.
rate_limit_lock = Lock()
|
|
|
# Minute bucket in which stale counters were last swept; -1 means "never".
_last_purge_minute = -1


def _purge_stale_counters(minute: int, day: int) -> None:
    """Drop counters that belong to past minute/day buckets.

    The caller must hold ``rate_limit_lock``. The sweep runs at most once per
    minute bucket so the per-request cost stays constant.

    NOTE(review): this assumes ``protect_from_abuse`` is the only writer of
    string keys in ``rate_limit_data``; any other string key that does not end
    in the current bucket suffix would be removed too -- confirm.
    """
    global _last_purge_minute
    if minute == _last_purge_minute:
        return
    _last_purge_minute = minute

    # Every key written by the limiter ends in ":<bucket>", so anything that
    # does not end in the current minute or day bucket can never be read again.
    live_suffixes = (f":{minute}", f":{day}")
    stale_keys = [
        key
        for key in rate_limit_data
        if isinstance(key, str) and not key.endswith(live_suffixes)
    ]
    for key in stale_keys:
        del rate_limit_data[key]


def protect_from_abuse(
    request: Request,
    max_requests_per_minute: int = 30,
    max_requests_per_day_per_ip: int = 600,
) -> None:
    """Reject the request with HTTP 429 when a rate limit is exceeded.

    Two fixed-window counters are kept in ``rate_limit_data``:

    * a per-minute counter keyed by the request's URL path (shared by all
      clients hitting that path), and
    * a per-day counter keyed by the client's host.

    Both counters are incremented on every call, including calls that end up
    being rejected.

    Args:
        request: The incoming request; its URL path and client host are used
            as the counter keys.
        max_requests_per_minute: Maximum number of requests allowed for one
            path within a single minute bucket.
        max_requests_per_day_per_ip: Maximum number of requests allowed for
            one client host within a single day bucket.

    Raises:
        HTTPException: With status 429 when either limit is exceeded. The
            per-minute limit is checked first.
    """
    now = int(time.time())
    minute = now // 60
    day = now // (60 * 60 * 24)

    # request.client can be None (for example under some test clients or
    # proxy setups); fall back to a shared bucket instead of failing with
    # AttributeError.
    client = request.client
    client_host = client.host if client is not None else "unknown"

    minute_key = f"{request.url.path}:{minute}"
    day_key = f"{client_host}:{day}"

    with rate_limit_lock:
        # Without this sweep the dict gains new keys for every bucket and
        # never shrinks.
        _purge_stale_counters(minute, day)

        # The bucket number is part of the key, so a new window always starts
        # from a missing entry; no explicit age-based reset is needed.
        minute_count, minute_timestamp = rate_limit_data.get(minute_key, (0, now))
        minute_count += 1
        rate_limit_data[minute_key] = (minute_count, minute_timestamp)

        day_count, day_timestamp = rate_limit_data.get(day_key, (0, now))
        day_count += 1
        rate_limit_data[day_key] = (day_count, day_timestamp)

    if minute_count > max_requests_per_minute:
        raise HTTPException(
            status_code=429,
            detail={
                "message": "Too many requests per minute",
                "limit": max_requests_per_minute,
            },
        )
    if day_count > max_requests_per_day_per_ip:
        raise HTTPException(
            status_code=429,
            detail={
                "message": "Too many requests per day from this IP",
                "limit": max_requests_per_day_per_ip,
            },
        )