| | import { keyvRedisClient } from '~/cache/redisClients'; |
| | import { cacheConfig as cache } from '~/cache/cacheConfig'; |
| | import { clusterConfig as cluster } from './config'; |
| | import { logger } from '@librechat/data-schemas'; |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
/**
 * Redis-backed leader election.
 *
 * One server instance at a time stores its UUID under {@link LeaderElection.LEADER_KEY}
 * with an expiring lease and keeps extending that lease from a background interval.
 * When `cache.USE_REDIS` is falsy there is nothing to coordinate through, so
 * {@link LeaderElection.isLeader} simply reports `true`.
 *
 * The class behaves as a singleton: calling `new LeaderElection()` after the first
 * construction returns the already-created instance.
 */
export class LeaderElection {
  /** Redis key that holds the UUID of the instance currently owning the lease. */
  static readonly LEADER_KEY = `${cache.REDIS_KEY_PREFIX}${cache.GLOBAL_PREFIX_SEPARATOR}LeadingServerUUID`;
  /** The single shared instance; populated by the constructor on first construction. */
  private static _instance = new LeaderElection();

  /** Random identifier written to Redis to mark this instance as the lease owner. */
  readonly UUID: string = crypto.randomUUID();
  /** Handle of the lease-renewal interval; `undefined` while no renewal loop is running. */
  private refreshTimer: NodeJS.Timeout | undefined = undefined;

  /**
   * Returns the existing singleton if there is one. Otherwise registers SIGTERM and
   * SIGINT listeners that call {@link LeaderElection.resign} and records this
   * instance as the singleton.
   */
  constructor() {
    const existing = LeaderElection._instance;
    if (existing) return existing;

    for (const signal of ['SIGTERM', 'SIGINT'] as const) {
      process.on(signal, () => this.resign());
    }
    LeaderElection._instance = this;
  }

  /**
   * Reports whether this instance is the leader, attempting to become leader when
   * the key is currently unset.
   *
   * @returns `true` when Redis is disabled, when this instance owns the key and its
   * renewal timer is running, or when it has just won the election; `false` when
   * another instance owns the key, when this instance owns the key but is no longer
   * renewing it, or when any step throws.
   */
  public async isLeader(): Promise<boolean> {
    if (!cache.USE_REDIS) return true;

    try {
      const holder = await LeaderElection.getLeaderUUID();

      if (holder != null) {
        // Owning the key only counts while the renewal loop is still alive.
        return holder === this.UUID && this.refreshTimer != null;
      }

      // Nobody holds the key: wait a random 0-2000 ms before trying to claim it.
      await LeaderElection.sleep(Math.random() * 2000);
      return await this.electSelf();
    } catch (error) {
      logger.error('Failed to check leadership status:', error);
      return false;
    }
  }

  /**
   * Stops the renewal loop and deletes the leader key, but only if it still holds
   * this instance's UUID. Does nothing when Redis is disabled. Failures are logged
   * and not rethrown.
   */
  public async resign(): Promise<void> {
    if (!cache.USE_REDIS) return;

    try {
      this.clearRefreshTimer();

      // Compare-and-delete in a single script so another instance's key is never removed.
      const releaseIfOwner = `
        if redis.call("get", KEYS[1]) == ARGV[1] then
          redis.call("del", KEYS[1])
        end
      `;

      await keyvRedisClient!.eval(releaseIfOwner, {
        keys: [LeaderElection.LEADER_KEY],
        arguments: [this.UUID],
      });
    } catch (error) {
      logger.error('Failed to release leadership lock:', error);
    }
  }

  /**
   * Reads the value currently stored under {@link LeaderElection.LEADER_KEY}.
   *
   * @returns The stored UUID, or `null` when Redis is disabled (a missing key is
   * reported as whatever the client's `get` returns for it).
   */
  public static async getLeaderUUID(): Promise<string | null> {
    return cache.USE_REDIS ? await keyvRedisClient!.get(LeaderElection.LEADER_KEY) : null;
  }

  /** Cancels the lease-renewal interval (if any) and forgets its handle. */
  public clearRefreshTimer(): void {
    clearInterval(this.refreshTimer);
    this.refreshTimer = undefined;
  }

  /** Resolves after `ms` milliseconds. */
  private static sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  /**
   * Tries to claim the leader key with `SET ... NX EX` and, on success, starts the
   * renewal interval.
   *
   * @returns `true` if the key was set by this call; `false` if it already existed
   * or the attempt threw.
   */
  private async electSelf(): Promise<boolean> {
    try {
      const reply = await keyvRedisClient!.set(LeaderElection.LEADER_KEY, this.UUID, {
        NX: true,
        EX: cluster.LEADER_LEASE_DURATION,
      });
      const acquired = reply === 'OK';

      if (acquired) {
        // Drop any stale loop before starting a new one.
        this.clearRefreshTimer();
        const timer = setInterval(async () => {
          await this.renewLeadership();
        }, cluster.LEADER_RENEW_INTERVAL * 1000);
        this.refreshTimer = timer;
        // The renewal loop must not keep the process alive on its own.
        timer.unref();
      }

      return acquired;
    } catch (error) {
      logger.error('Leader election failed:', error);
      return false;
    }
  }

  /**
   * Extends the lease if this instance still owns the key. If the key belongs to
   * someone else (or is gone) the renewal loop is stopped. When the Redis call
   * throws, the renewal is retried after `cluster.LEADER_RENEW_RETRY_DELAY * 1000` ms
   * for as long as the attempt number is `<= cluster.LEADER_RENEW_ATTEMPTS`; after
   * that the renewal loop is stopped.
   *
   * @param attempts - 1-based number of the attempt this call starts with.
   */
  private async renewLeadership(attempts: number = 1): Promise<void> {
    // Compare-and-expire in a single script so only the owner can extend the lease.
    const extendIfOwner = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("expire", KEYS[1], ARGV[2])
      else
        return 0
      end
    `;

    for (let attempt = attempts; ; attempt += 1) {
      try {
        const reply = await keyvRedisClient!.eval(extendIfOwner, {
          keys: [LeaderElection.LEADER_KEY],
          arguments: [this.UUID, cluster.LEADER_LEASE_DURATION.toString()],
        });

        if (reply === 0) {
          logger.warn('Lost leadership, clearing refresh timer');
          this.clearRefreshTimer();
        }
        return;
      } catch (error) {
        logger.error(`Failed to renew leadership (attempts No.${attempt}):`, error);

        if (attempt <= cluster.LEADER_RENEW_ATTEMPTS) {
          await LeaderElection.sleep(cluster.LEADER_RENEW_RETRY_DELAY * 1000);
          continue;
        }

        logger.error('Exceeded maximum attempts to renew leadership.');
        this.clearRefreshTimer();
        return;
      }
    }
  }
}
| |
|
/** Module-level election handle used by the exported {@link isLeader} helper. */
const sharedElection = new LeaderElection();

/**
 * Convenience wrapper around `LeaderElection#isLeader` for the module's election handle.
 *
 * @returns A promise resolving to whether this process currently acts as leader.
 */
export const isLeader = (): Promise<boolean> => {
  return sharedElection.isLeader();
};
| |
|