|
const clearPendingReq = require('../../cache/clearPendingReq'); |
|
const { logViolation, getLogStores } = require('../../cache'); |
|
const denyRequest = require('./denyRequest'); |
|
|
|
const { |
|
USE_REDIS, |
|
CONCURRENT_MESSAGE_MAX = 1, |
|
CONCURRENT_VIOLATION_SCORE: score, |
|
} = process.env ?? {}; |
|
const ttl = 1000 * 60 * 1; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const concurrentLimiter = async (req, res, next) => { |
|
const namespace = 'pending_req'; |
|
const cache = getLogStores(namespace); |
|
if (!cache) { |
|
return next(); |
|
} |
|
|
|
if (Object.keys(req?.body ?? {}).length === 1 && req?.body?.abortKey) { |
|
return next(); |
|
} |
|
|
|
const userId = req.user?.id ?? req.user?._id ?? ''; |
|
const limit = Math.max(CONCURRENT_MESSAGE_MAX, 1); |
|
const type = 'concurrent'; |
|
|
|
const key = `${USE_REDIS ? namespace : ''}:${userId}`; |
|
const pendingRequests = +((await cache.get(key)) ?? 0); |
|
|
|
if (pendingRequests >= limit) { |
|
const errorMessage = { |
|
type, |
|
limit, |
|
pendingRequests, |
|
}; |
|
|
|
await logViolation(req, res, type, errorMessage, score); |
|
return await denyRequest(req, res, errorMessage); |
|
} else { |
|
await cache.set(key, pendingRequests + 1, ttl); |
|
} |
|
|
|
|
|
let cleared = false; |
|
const cleanUp = async () => { |
|
if (cleared) { |
|
return; |
|
} |
|
cleared = true; |
|
await clearPendingReq({ userId, cache }); |
|
}; |
|
|
|
if (pendingRequests < limit) { |
|
res.on('finish', cleanUp); |
|
res.on('close', cleanUp); |
|
} |
|
|
|
next(); |
|
}; |
|
|
|
module.exports = concurrentLimiter; |
|
|