File size: 2,233 Bytes
9705b6c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
const clearPendingReq = require('../../cache/clearPendingReq');
const { logViolation, getLogStores } = require('../../cache');
const denyRequest = require('./denyRequest');

const {
  USE_REDIS,
  CONCURRENT_MESSAGE_MAX = 1,
  CONCURRENT_VIOLATION_SCORE: score,
} = process.env ?? {};
const ttl = 1000 * 60 * 1;

/**
 * Middleware to limit concurrent requests for a user.
 *
 * This middleware checks if a user has exceeded a specified concurrent request limit.
 * If the user exceeds the limit, an error is returned. If the user is within the limit,
 * their request count is incremented. After the request is processed, the count is decremented.
 * If the `cache` store is not available, the middleware will skip its logic.
 *
 * @function
 * @param {Object} req - Express request object containing user information.
 * @param {Object} res - Express response object.
 * @param {function} next - Express next middleware function.
 * @throws {Error} Throws an error if the user exceeds the concurrent request limit.
 */
const concurrentLimiter = async (req, res, next) => {
  const namespace = 'pending_req';
  const cache = getLogStores(namespace);
  if (!cache) {
    return next();
  }

  if (Object.keys(req?.body ?? {}).length === 1 && req?.body?.abortKey) {
    return next();
  }

  const userId = req.user?.id ?? req.user?._id ?? '';
  const limit = Math.max(CONCURRENT_MESSAGE_MAX, 1);
  const type = 'concurrent';

  const key = `${USE_REDIS ? namespace : ''}:${userId}`;
  const pendingRequests = +((await cache.get(key)) ?? 0);

  if (pendingRequests >= limit) {
    const errorMessage = {
      type,
      limit,
      pendingRequests,
    };

    await logViolation(req, res, type, errorMessage, score);
    return await denyRequest(req, res, errorMessage);
  } else {
    await cache.set(key, pendingRequests + 1, ttl);
  }

  // Ensure the requests are removed from the store once the request is done
  let cleared = false;
  const cleanUp = async () => {
    if (cleared) {
      return;
    }
    cleared = true;
    await clearPendingReq({ userId, cache });
  };

  if (pendingRequests < limit) {
    res.on('finish', cleanUp);
    res.on('close', cleanUp);
  }

  next();
};

module.exports = concurrentLimiter;