| import { FlowProducer, type Job, type Queue } from "bullmq"; |
| import { LRUCache } from "lru-cache"; |
| import type { |
| ActivityBucket, |
| ActivityStatsResponse, |
| CreateFlowChildRequest, |
| CreateFlowRequest, |
| DelayedJobInfo, |
| FlowNode, |
| FlowSummary, |
| HourlyBucket, |
| JobInfo, |
| JobStatus, |
| JobTags, |
| MetricsResponse, |
| OverviewStats, |
| PaginatedResponse, |
| QueueInfo, |
| RunInfo, |
| RunInfoList, |
| SchedulerInfo, |
| SearchResult, |
| SortOptions, |
| TestJobRequest, |
| } from "./types"; |
|
|
| |
| |
| |
| export class QueueManager { |
/** Queues under management, indexed by queue name. */
private queues = new Map<string, Queue>();
/** Tag field names supplied to the constructor and returned by `getTagFields()`. */
private tagFields: string[] = [];
/** FlowProducer created in the constructor when the first queue exposes a connection; otherwise `null`. */
private flowProducer: FlowProducer | null = null;
|
|
| |
| |
/**
 * General-purpose result cache used by `cached()` (at most 100 entries).
 *
 * Entries default to a one-minute TTL unless `set` is given a per-entry TTL,
 * and stale entries are never served.
 *
 * `updateAgeOnGet` is deliberately off: with it on, every read resets the
 * entry's age, so anything polled more often than its TTL (e.g. dashboard
 * metrics) would never expire and would be served stale indefinitely.
 */
private cache = new LRUCache<string, any>({
  max: 100,
  ttl: 60 * 1000,
  allowStale: false,
  updateAgeOnGet: false,
});
|
|
/** Per-entry TTLs, in milliseconds, passed to `cached()` for each kind of result. */
private readonly CACHE_TTL = {
  metrics: 300 * 1000,
  overview: 120 * 1000,
  queues: 120 * 1000,
  flows: 120 * 1000,
  activity: 300 * 1000,
};
|
|
/**
 * Registers each queue under its name and, when the first queue carries a
 * connection in its options, creates a FlowProducer on that connection.
 *
 * @param queues - Queues to manage; a later queue with the same name replaces an earlier one.
 * @param tagFields - Tag field names to remember (defaults to none).
 */
constructor(queues: Queue[], tagFields: string[] = []) {
  this.tagFields = tagFields;
  queues.forEach((queue) => {
    this.queues.set(queue.name, queue);
  });

  // The flow producer reuses the first queue's connection; without one it stays null.
  const connection = queues[0]?.opts?.connection;
  if (connection) {
    this.flowProducer = new FlowProducer({ connection });
  }
}
|
|
| |
| |
| |
/**
 * Returns the value cached under `key`, or runs `compute`, stores its result
 * under `key` with the given TTL, and returns it.
 *
 * A cached `undefined` cannot be told apart from a miss, so such results are
 * recomputed on every call. If `compute` rejects, the rejection propagates and
 * nothing is stored.
 *
 * @param key - Cache key.
 * @param ttl - Time-to-live for the stored entry, in milliseconds.
 * @param compute - Producer invoked on a cache miss.
 */
private async cached<T>(
  key: string,
  ttl: number,
  compute: () => Promise<T>,
): Promise<T> {
  const hit = this.cache.get(key);
  if (hit === undefined) {
    const fresh = await compute();
    this.cache.set(key, fresh, { ttl });
    return fresh;
  }
  return hit as T;
}
|
|
| |
| |
| |
/**
 * Resolves or rejects with `promise`, unless it has not settled within
 * `timeoutMs`, in which case the returned promise rejects with
 * `new Error(errorMessage)`.
 *
 * The underlying `promise` is not cancelled on timeout; only the wait is
 * abandoned.
 *
 * @param promise - Work to wait for.
 * @param timeoutMs - Maximum time to wait, in milliseconds.
 * @param errorMessage - Message of the error raised on timeout.
 */
private async withTimeout<T>(
  promise: Promise<T>,
  timeoutMs: number,
  errorMessage: string,
): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(errorMessage)), timeoutMs);
  });

  try {
    return await Promise.race([promise, timeout]);
  } finally {
    // Always release the timer so a finished call does not leave a pending
    // timeout behind (which would also keep the event loop alive).
    clearTimeout(timer);
  }
}
|
|
| |
| |
| |
| |
/**
 * Returns up to `limit` jobs of the given finished status whose finish time
 * lies in `[startTime, endTime]`, newest first.
 *
 * Fast path: a score-range query on the status sorted set followed by a lookup
 * of each returned id. If the Redis client is unusable or the query fails, it
 * falls back to scanning the most recent `limit * 2` jobs of that status and
 * filtering them by `finishedOn`.
 *
 * @param queue - Queue to read from.
 * @param status - Finished state to query.
 * @param startTime - Inclusive lower bound (epoch ms).
 * @param endTime - Inclusive upper bound (epoch ms).
 * @param limit - Maximum number of ids requested from Redis.
 */
private async getJobsByTimeRange(
  queue: Queue,
  status: "completed" | "failed",
  startTime: number,
  endTime: number,
  limit: number,
): Promise<Job[]> {
  // Fallback: scan the most recent jobs of this status and filter client-side.
  const scanRecentJobs = async (): Promise<Job[]> => {
    const recent = await queue.getJobs([status], 0, limit * 2);
    return recent.filter((job) => {
      const finishedOn = job?.finishedOn;
      return !!finishedOn && finishedOn >= startTime && finishedOn <= endTime;
    });
  };

  try {
    // `queue.client` is a Promise in BullMQ; it must be awaited before use.
    const client = await queue.client;
    if (typeof client?.zrevrangebyscore === "function") {
      // `toKey` honours a custom key prefix instead of assuming "bull:".
      // Reverse range keeps the newest-first order of the fallback scan.
      const jobIds: string[] = await client.zrevrangebyscore(
        queue.toKey(status),
        endTime,
        startTime,
        "LIMIT",
        0,
        limit,
      );
      if (!jobIds || jobIds.length === 0) {
        return [];
      }

      const jobs = await Promise.all(jobIds.map((jobId) => queue.getJob(jobId)));
      // Ids can outlive their job hash (e.g. removed concurrently); drop those.
      return jobs.filter((job): job is Job => job !== null && job !== undefined);
    }
  } catch {
    // Best effort: any failure on the fast path falls through to the scan.
  }

  return scanRecentJobs();
}
|
|
| |
| |
| |
/**
 * Short-lived cache of job states (30-second TTL, at most 1000 entries).
 * `invalidateJobCache` removes entries using the key `${queueName}:${jobId}`.
 */
private jobStateCache = new LRUCache<string, JobStatus>({
  ttl: 30 * 1000,
  max: 1000,
});
|
|
| |
| |
| |
| |
/**
 * Cache of `queue.getJobCounts()` results keyed by queue name
 * (5-second TTL, at most 100 entries).
 */
private countCache = new LRUCache<
  string,
  Awaited<ReturnType<Queue["getJobCounts"]>>
>({
  ttl: 5 * 1000,
  max: 100,
});
|
|
| |
| |
| |
/**
 * Returns the job counts for `queue`, served from `countCache` when an entry
 * for the queue's name exists and otherwise fetched with `getJobCounts()` and
 * stored.
 *
 * @param queue - Queue whose counts are requested.
 */
private async getCachedJobCounts(
  queue: Queue,
): Promise<Awaited<ReturnType<Queue["getJobCounts"]>>> {
  const { name } = queue;
  const known = this.countCache.get(name);
  if (known === undefined) {
    const fresh = await queue.getJobCounts();
    this.countCache.set(name, fresh);
    return fresh;
  }
  return known;
}
|
|
| |
| |
| |
/**
 * Drops cached data that a job mutation may have made stale.
 *
 * Removes the queue's cached counts, the cached state of `jobId` when given,
 * and every aggregate snapshot derived from job counts. `queues` and
 * `quick-counts` are included because `getOverview()` is rebuilt from
 * `getQueues()`; clearing only `overview` would recompute it from a stale
 * `queues` entry.
 *
 * @param queueName - Queue the mutation happened on.
 * @param jobId - Affected job, if a single job was changed.
 */
private invalidateJobCache(queueName: string, jobId?: string): void {
  this.countCache.delete(queueName);

  if (jobId) {
    this.jobStateCache.delete(`${queueName}:${jobId}`);
  }

  const derivedKeys = ["queues", "quick-counts", "overview", "metrics", "activity"];
  for (const key of derivedKeys) {
    this.cache.delete(key);
  }
}
|
|
| |
| |
| |
/**
 * Clears the general result cache.
 *
 * @param prefix - When given (and non-empty), only keys starting with this
 *   prefix are removed; otherwise the whole cache is cleared.
 */
clearCache(prefix?: string): void {
  if (!prefix) {
    this.cache.clear();
    return;
  }

  for (const key of this.cache.keys()) {
    if (!key.startsWith(prefix)) continue;
    this.cache.delete(key);
  }
}
|
|
| |
| |
| |
| |
/**
 * Sums the waiting/active/completed/failed/delayed counts over all managed
 * queues. The result is cached under `quick-counts` for two seconds.
 *
 * @returns Per-state totals, their sum as `total`, and the time (epoch ms) at
 *   which the computation started as `timestamp`.
 */
async getQuickCounts(): Promise<{
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
  total: number;
  timestamp: number;
}> {
  return this.cached("quick-counts", 2000, async () => {
    // Taken before any await so it marks the start of the computation.
    const timestamp = Date.now();

    const perQueue = await Promise.all(
      [...this.queues.values()].map((queue) => this.getCachedJobCounts(queue)),
    );

    let waiting = 0;
    let active = 0;
    let completed = 0;
    let failed = 0;
    let delayed = 0;
    for (const counts of perQueue) {
      waiting += counts.waiting || 0;
      active += counts.active || 0;
      completed += counts.completed || 0;
      failed += counts.failed || 0;
      delayed += counts.delayed || 0;
    }

    return {
      waiting,
      active,
      completed,
      failed,
      delayed,
      total: waiting + active + completed + failed + delayed,
      timestamp,
    };
  });
}
|
|
| |
| |
| |
/** Returns the tag field names given to the constructor (the stored array itself, not a copy). */
getTagFields(): string[] {
  const { tagFields } = this;
  return tagFields;
}
|
|
| |
| |
| |
| |
/** Returns the names of all managed queues, in registration order. */
getQueueNames(): string[] {
  return [...this.queues.keys()];
}
|
|
| |
| |
| |
/**
 * Looks up a managed queue by name.
 *
 * @param name - Queue name.
 * @returns The queue, or `undefined` when no queue with that name is registered.
 */
getQueue(name: string): Queue | undefined {
  const queue = this.queues.get(name);
  return queue;
}
|
|
| |
| |
| |
/**
 * Returns name, per-state job counts and pause flag for every managed queue.
 * Queues are queried in parallel and the result is cached under `queues`.
 * Missing counts are reported as 0.
 */
async getQueues(): Promise<QueueInfo[]> {
  return this.cached("queues", this.CACHE_TTL.queues, () =>
    Promise.all(
      [...this.queues.entries()].map(async ([name, queue]) => {
        const [jobCounts, isPaused] = await Promise.all([
          this.getCachedJobCounts(queue),
          queue.isPaused(),
        ]);
        const { waiting, active, completed, failed, delayed, paused } = jobCounts;

        return {
          name,
          counts: {
            waiting: waiting || 0,
            active: active || 0,
            completed: completed || 0,
            failed: failed || 0,
            delayed: delayed || 0,
            paused: paused || 0,
          },
          isPaused,
        };
      }),
    ),
  );
}
|
|
| |
| |
| |
/**
 * Builds the dashboard overview from `getQueues()`; cached under `overview`.
 *
 * - `totalJobs` counts waiting + active + delayed jobs (finished jobs excluded).
 * - `completedToday` is the sum of each queue's completed count.
 *   NOTE(review): no date filtering is applied here despite the name — confirm intent.
 * - `avgDuration` is always 0 in this implementation.
 */
async getOverview(): Promise<OverviewStats> {
  return this.cached("overview", this.CACHE_TTL.overview, async () => {
    const queues = await this.getQueues();

    const totals = queues.reduce(
      (acc, { counts }) => {
        acc.totalJobs += counts.waiting + counts.active + counts.delayed;
        acc.activeJobs += counts.active;
        acc.failedJobs += counts.failed;
        acc.completedToday += counts.completed;
        return acc;
      },
      { totalJobs: 0, activeJobs: 0, failedJobs: 0, completedToday: 0 },
    );

    return { ...totals, avgDuration: 0, queues };
  });
}
|
|
| |
| |
| |
|
|
| |
| |
| |
/**
 * Pauses the named queue.
 *
 * After pausing, cached snapshots that embed the queue's pause flag or counts
 * are dropped so `getQueues()` / `getOverview()` do not keep reporting the old
 * state until their TTL runs out.
 *
 * @param queueName - Name of the queue to pause.
 * @throws Error if no queue with that name is registered.
 */
async pauseQueue(queueName: string): Promise<void> {
  const queue = this.queues.get(queueName);
  if (!queue) {
    throw new Error(`Queue "${queueName}" not found`);
  }

  await queue.pause();

  this.countCache.delete(queueName);
  for (const key of ["queues", "overview", "quick-counts"]) {
    this.cache.delete(key);
  }
}
|
|
| |
| |
| |
/**
 * Resumes the named queue.
 *
 * After resuming, cached snapshots that embed the queue's pause flag or counts
 * are dropped so `getQueues()` / `getOverview()` do not keep reporting the old
 * state until their TTL runs out.
 *
 * @param queueName - Name of the queue to resume.
 * @throws Error if no queue with that name is registered.
 */
async resumeQueue(queueName: string): Promise<void> {
  const queue = this.queues.get(queueName);
  if (!queue) {
    throw new Error(`Queue "${queueName}" not found`);
  }

  await queue.resume();

  this.countCache.delete(queueName);
  for (const key of ["queues", "overview", "quick-counts"]) {
    this.cache.delete(key);
  }
}
|
|
| |
| |
| |
/**
 * Reports whether the named queue is paused, asking the queue directly
 * (no caching).
 *
 * @param queueName - Name of the queue to check.
 * @throws Error if no queue with that name is registered.
 */
async isQueuePaused(queueName: string): Promise<boolean> {
  const queue = this.queues.get(queueName);
  if (queue) {
    return queue.isPaused();
  }
  throw new Error(`Queue "${queueName}" not found`);
}
|
|
| |
| |
| |
|
|
| |
| |
| |
/**
 * Computes aggregate job metrics over 24 hourly buckets across all queues.
 *
 * The buckets are aligned to hour boundaries and END with the current
 * (partial) hour, so the most recently finished jobs are always included.
 * (Anchoring the first bucket at `floor(now - 24h)` instead would place jobs
 * from the current hour at index 24 and silently drop them.)
 *
 * For every queue that has completed or failed jobs, up to 100 jobs per status
 * are sampled from the window. From those samples the method derives:
 * - per-bucket completed/failed counts and average duration / wait time,
 * - a summary (totals, error rate, averages, throughput per hour),
 * - the 10 slowest completed jobs,
 * - the 10 job types with the most failures.
 *
 * `queues` in the response is always empty: only the aggregate is reported.
 * The result is cached under `metrics`; the computation is abandoned with an
 * error after 45 seconds.
 */
async getMetrics(): Promise<MetricsResponse> {
  const HOUR_MS = 60 * 60 * 1000;
  const BUCKET_COUNT = 24;
  const JOBS_PER_STATUS = 100;

  // Rounded arithmetic mean; 0 for an empty sample.
  const average = (values: number[]): number =>
    values.length > 0
      ? Math.round(values.reduce((sum, value) => sum + value, 0) / values.length)
      : 0;

  const compute = async (): Promise<MetricsResponse> => {
    const now = Date.now();
    // Start of the first bucket: 23 whole hours before the current hour began.
    const windowStart =
      (Math.floor(now / HOUR_MS) - (BUCKET_COUNT - 1)) * HOUR_MS;

    const buckets: HourlyBucket[] = Array.from(
      { length: BUCKET_COUNT },
      (_, i) => ({
        hour: windowStart + i * HOUR_MS,
        completed: 0,
        failed: 0,
        avgDuration: 0,
        avgWaitTime: 0,
      }),
    );
    const durations: number[][] = Array.from({ length: BUCKET_COUNT }, () => []);
    const waitTimes: number[][] = Array.from({ length: BUCKET_COUNT }, () => []);

    // Bucket index for a finish time, or -1 when it lies outside the window.
    const bucketIndexFor = (finishedOn: number): number => {
      const index = Math.floor((finishedOn - windowStart) / HOUR_MS);
      return index >= 0 && index < BUCKET_COUNT ? index : -1;
    };

    const timedJobs: Array<{
      name: string;
      queueName: string;
      duration: number;
      jobId: string;
    }> = [];
    const jobTypeStats = new Map<
      string,
      { name: string; queueName: string; completed: number; failed: number }
    >();
    const statsFor = (queueName: string, jobName: string) => {
      const key = `${queueName}:${jobName}`;
      let stats = jobTypeStats.get(key);
      if (!stats) {
        stats = { name: jobName, queueName, completed: 0, failed: 0 };
        jobTypeStats.set(key, stats);
      }
      return stats;
    };

    // Skip queues that have no finished jobs at all.
    const queueChecks = await Promise.all(
      Array.from(this.queues.entries()).map(async ([queueName, queue]) => {
        const counts = await this.getCachedJobCounts(queue);
        return {
          queueName,
          queue,
          hasFinishedJobs:
            (counts.completed || 0) > 0 || (counts.failed || 0) > 0,
        };
      }),
    );

    const queueResults = await Promise.all(
      queueChecks
        .filter((check) => check.hasFinishedJobs)
        .map(async ({ queueName, queue }) => {
          const [completedJobs, failedJobs] = await Promise.all([
            this.getJobsByTimeRange(
              queue,
              "completed",
              windowStart,
              now,
              JOBS_PER_STATUS,
            ),
            this.getJobsByTimeRange(
              queue,
              "failed",
              windowStart,
              now,
              JOBS_PER_STATUS,
            ),
          ]);
          return { queueName, completedJobs, failedJobs };
        }),
    );

    for (const { queueName, completedJobs, failedJobs } of queueResults) {
      for (const job of completedJobs) {
        const finishedOn = job?.finishedOn;
        if (!finishedOn) continue;
        const index = bucketIndexFor(finishedOn);
        if (index < 0) continue;

        buckets[index].completed++;

        // Without processedOn neither duration nor wait time can be derived.
        const duration = job.processedOn ? finishedOn - job.processedOn : 0;
        const waitTime = job.processedOn ? job.processedOn - job.timestamp : 0;
        if (duration > 0) {
          durations[index].push(duration);
          timedJobs.push({
            name: job.name,
            queueName,
            duration,
            jobId: job.id || "",
          });
        }
        if (waitTime > 0) {
          waitTimes[index].push(waitTime);
        }

        statsFor(queueName, job.name).completed++;
      }

      for (const job of failedJobs) {
        const finishedOn = job?.finishedOn;
        if (!finishedOn) continue;
        const index = bucketIndexFor(finishedOn);
        if (index < 0) continue;

        buckets[index].failed++;
        statsFor(queueName, job.name).failed++;
      }
    }

    for (let i = 0; i < BUCKET_COUNT; i++) {
      buckets[i].avgDuration = average(durations[i]);
      buckets[i].avgWaitTime = average(waitTimes[i]);
    }

    const totalCompleted = buckets.reduce((sum, b) => sum + b.completed, 0);
    const totalFailed = buckets.reduce((sum, b) => sum + b.failed, 0);
    const totalFinished = totalCompleted + totalFailed;

    const slowestJobs = timedJobs
      .sort((a, b) => b.duration - a.duration)
      .slice(0, 10);

    const mostFailingTypes = Array.from(jobTypeStats.values())
      .filter((stats) => stats.failed > 0)
      .map((stats) => ({
        name: stats.name,
        queueName: stats.queueName,
        failCount: stats.failed,
        totalCount: stats.completed + stats.failed,
        errorRate: stats.failed / (stats.completed + stats.failed),
      }))
      .sort((a, b) => b.failCount - a.failCount)
      .slice(0, 10);

    return {
      queues: [],
      aggregate: {
        queueName: "all",
        buckets,
        summary: {
          totalCompleted,
          totalFailed,
          errorRate: totalFinished > 0 ? totalFailed / totalFinished : 0,
          avgDuration: average(durations.flat()),
          avgWaitTime: average(waitTimes.flat()),
          throughputPerHour: Math.round(totalFinished / BUCKET_COUNT),
        },
      },
      slowestJobs,
      mostFailingTypes,
      computedAt: now,
    };
  };

  return this.cached("metrics", this.CACHE_TTL.metrics, () =>
    this.withTimeout(
      compute(),
      45000,
      "Metrics computation timed out after 45 seconds",
    ),
  );
}
|
|
| |
| |
| |
| |
/**
 * Builds a 7-day activity histogram: 42 buckets of 4 hours each, starting at
 * local midnight six days before today.
 *
 * For every queue that has completed or failed jobs, up to 200 jobs per status
 * are sampled from the window and counted into the bucket matching their
 * `finishedOn`. The result is cached under `activity`.
 */
async getActivityStats(): Promise<ActivityStatsResponse> {
  return this.cached("activity", this.CACHE_TTL.activity, async () => {
    const bucketSize = 4 * 60 * 60 * 1000;
    const bucketCount = 42;
    const now = Date.now();

    // Window start: local midnight, six days back.
    const windowStart = new Date(now);
    windowStart.setHours(0, 0, 0, 0);
    windowStart.setDate(windowStart.getDate() - 6);
    const startTime = windowStart.getTime();

    const buckets: ActivityBucket[] = Array.from(
      { length: bucketCount },
      (_, i) => ({
        time: startTime + i * bucketSize,
        completed: 0,
        failed: 0,
      }),
    );

    // Counts each job into the bucket that contains its finish time.
    const tally = (jobs: Job[], outcome: "completed" | "failed"): void => {
      for (const job of jobs) {
        if (!job || !job.finishedOn || job.finishedOn < startTime) continue;
        const slot = Math.floor((job.finishedOn - startTime) / bucketSize);
        if (slot >= 0 && slot < bucketCount) {
          buckets[slot][outcome]++;
        }
      }
    };

    // Skip queues that have no finished jobs at all.
    const checks = await Promise.all(
      Array.from(this.queues.values()).map(async (queue) => {
        const counts = await this.getCachedJobCounts(queue);
        const hasRelevantJobs =
          (counts.completed || 0) > 0 || (counts.failed || 0) > 0;
        return { queue, hasRelevantJobs };
      }),
    );

    const samples = await Promise.all(
      checks
        .filter((check) => check.hasRelevantJobs)
        .map(async ({ queue }) => {
          const [completedJobs, failedJobs] = await Promise.all([
            this.getJobsByTimeRange(queue, "completed", startTime, now, 200),
            this.getJobsByTimeRange(queue, "failed", startTime, now, 200),
          ]);
          return { completedJobs, failedJobs };
        }),
    );

    for (const { completedJobs, failedJobs } of samples) {
      tally(completedJobs, "completed");
      tally(failedJobs, "failed");
    }

    let totalCompleted = 0;
    let totalFailed = 0;
    for (const bucket of buckets) {
      totalCompleted += bucket.completed;
      totalFailed += bucket.failed;
    }

    return {
      buckets,
      startTime,
      endTime: now,
      bucketSize,
      totalCompleted,
      totalFailed,
      computedAt: now,
    };
  });
}
|
|
| |
| |
| |
/**
 * Lists jobs of one queue, optionally restricted to a single status.
 *
 * For each requested status the index range `[start, start + limit - 1]` is
 * fetched (the end index of `queue.getJobs` is inclusive), the results are
 * merged, sorted, and cut to `limit` entries. `total` is the sum of the cached
 * counts for the requested statuses.
 *
 * @param queueName - Queue to list; an unknown name yields an empty page.
 * @param status - Single status to list; all five main statuses when omitted.
 * @param limit - Page size (a non-positive value yields an empty page).
 * @param start - Zero-based offset applied to each status list.
 * @param sort - Sort field and direction; defaults to `timestamp` descending.
 */
async getJobs(
  queueName: string,
  status?: JobStatus,
  limit = 50,
  start = 0,
  sort?: SortOptions,
): Promise<PaginatedResponse<JobInfo>> {
  const queue = this.queues.get(queueName);
  if (!queue) {
    return { data: [], total: 0, hasMore: false };
  }

  const types: JobStatus[] = status
    ? [status]
    : ["waiting", "active", "completed", "failed", "delayed"];

  const counts = await this.getCachedJobCounts(queue);
  const total = types.reduce((sum, type) => sum + (counts[type] || 0), 0);

  // Guard: with limit <= 0 the inclusive end index would be start - 1, which
  // for start = 0 is -1 and means "everything" to the underlying range query.
  const jobsWithState =
    limit > 0
      ? (
          await Promise.all(
            types.map(async (type) => {
              const typeJobs = await queue.getJobs(
                type as any,
                start,
                start + limit - 1,
              );
              return typeJobs.map((job) => ({ job, state: type }));
            }),
          )
        ).flat()
      : [];

  const jobInfos = (await Promise.all(
    jobsWithState.map(({ job, state }) => this.jobToInfo(job, "full", state)),
  )) as JobInfo[];

  const sortField = sort?.field ?? "timestamp";
  const sortDir = sort?.direction === "asc" ? 1 : -1;
  jobInfos.sort((a, b) => {
    const aVal = this.getSortValue(a, sortField);
    const bVal = this.getSortValue(b, sortField);
    if (aVal < bVal) return -1 * sortDir;
    if (aVal > bVal) return 1 * sortDir;
    return 0;
  });

  const hasMore = start + limit < total;
  return {
    data: jobInfos.slice(0, Math.max(limit, 0)),
    total,
    hasMore,
    cursor: hasMore ? String(start + limit) : undefined,
  };
}
|
|
| |
| |
| |
/**
 * Fetches a single job with full detail.
 *
 * @param queueName - Queue that owns the job.
 * @param jobId - Job identifier.
 * @returns The job info, or `null` when the queue or the job does not exist.
 */
async getJob(queueName: string, jobId: string): Promise<JobInfo | null> {
  const job = await this.queues.get(queueName)?.getJob(jobId);
  if (!job) {
    return null;
  }
  return this.jobToInfo(job, "full") as Promise<JobInfo>;
}
|
|
| |
| |
| |
/**
 * Retries a job and invalidates the caches affected by the change.
 *
 * @param queueName - Queue that owns the job.
 * @param jobId - Job identifier.
 * @returns `false` when the queue or the job does not exist, `true` otherwise.
 */
async retryJob(queueName: string, jobId: string): Promise<boolean> {
  const job = await this.queues.get(queueName)?.getJob(jobId);
  if (!job) {
    return false;
  }

  await job.retry();
  this.invalidateJobCache(queueName, jobId);
  return true;
}
|
|
| |
| |
| |
/**
 * Removes a job and invalidates the caches affected by the change.
 *
 * @param queueName - Queue that owns the job.
 * @param jobId - Job identifier.
 * @returns `false` when the queue or the job does not exist, `true` otherwise.
 */
async removeJob(queueName: string, jobId: string): Promise<boolean> {
  const job = await this.queues.get(queueName)?.getJob(jobId);
  if (!job) {
    return false;
  }

  await job.remove();
  this.invalidateJobCache(queueName, jobId);
  return true;
}
|
|
| |
| |
| |
/**
 * Promotes a job and invalidates the caches affected by the change.
 *
 * @param queueName - Queue that owns the job.
 * @param jobId - Job identifier.
 * @returns `false` when the queue or the job does not exist, `true` otherwise.
 */
async promoteJob(queueName: string, jobId: string): Promise<boolean> {
  const job = await this.queues.get(queueName)?.getJob(jobId);
  if (!job) {
    return false;
  }

  await job.promote();
  this.invalidateJobCache(queueName, jobId);
  return true;
}
|
|
| |
| |
| |
| |
/**
 * Splits a search string into `field:value` filters and remaining free text.
 *
 * A filter is a word followed by `:` and either a double-quoted value or a
 * run of non-whitespace characters. When a field occurs more than once the
 * last value wins. Everything between filters is trimmed and joined with
 * single spaces to form `text`.
 *
 * @param query - Raw search string.
 */
private parseSearchQuery(query: string): {
  filters: Record<string, string>;
  text: string;
} {
  const filters: Record<string, string> = {};
  const freeText: string[] = [];
  let cursor = 0;

  for (const match of query.matchAll(/(\w+):(?:"([^"]+)"|(\S+))/g)) {
    const at = match.index ?? 0;
    // Keep whatever sits between the previous filter and this one.
    if (at > cursor) {
      freeText.push(query.slice(cursor, at).trim());
    }
    const [whole, field, quoted, bare] = match;
    filters[field] = quoted || bare;
    cursor = at + whole.length;
  }

  // Trailing text after the last filter.
  if (cursor < query.length) {
    freeText.push(query.slice(cursor).trim());
  }

  return {
    filters,
    text: freeText.filter(Boolean).join(" "),
  };
}
|
|
| |
| |
| |
| |
/**
 * Checks a job against the time-range, tag and free-text filters.
 *
 * - `timeRange` is compared with the first available of `processedOn`,
 *   `finishedOn`, `timestamp` (0 when none is set).
 * - `tags` are matched by `jobMatchesFilters` (case-insensitive substring
 *   match on top-level data fields).
 * - `text` matches case-insensitively against the job id, name, or the JSON
 *   form of its data.
 *
 * `filters.status` is NOT evaluated here; `getAllRuns` applies it by fetching
 * only jobs of that status.
 *
 * @param job - Job to test.
 * @param filters - Active filters; absent or empty ones are ignored.
 */
private jobMatchesAllFilters(
  job: Job,
  filters: {
    status?: JobStatus;
    tags?: Record<string, string>;
    text?: string;
    timeRange?: { start: number; end: number };
  },
): boolean {
  const { timeRange, tags, text } = filters;

  if (timeRange) {
    const jobTime = job.processedOn || job.finishedOn || job.timestamp || 0;
    if (jobTime < timeRange.start || jobTime > timeRange.end) {
      return false;
    }
  }

  if (tags && Object.keys(tags).length > 0 && !this.jobMatchesFilters(job, tags)) {
    return false;
  }

  if (text) {
    const needle = text.toLowerCase();
    const inId = job.id?.toLowerCase().includes(needle);
    const inName = job.name?.toLowerCase().includes(needle);
    if (!inId && !inName) {
      // JSON.stringify yields undefined for data it cannot represent
      // (e.g. undefined); treat that as "no searchable data" instead of throwing.
      const haystack = (JSON.stringify(job.data) ?? "").toLowerCase();
      if (!haystack.includes(needle)) {
        return false;
      }
    }
  }

  return true;
}
|
|
| |
| |
| |
/**
 * Tests whether every `field: value` filter matches the job's data.
 *
 * A filter matches when the top-level data field is neither `null` nor
 * `undefined` and its string form contains the filter value
 * (case-insensitive). A job whose data is missing or not an object matches
 * only when there are no filters.
 *
 * @param job - Job to test.
 * @param filters - Field/value pairs that must all match.
 */
private jobMatchesFilters(
  job: Job,
  filters: Record<string, string>,
): boolean {
  const wanted = Object.entries(filters);
  const data = job.data;
  if (!data || typeof data !== "object") {
    return wanted.length === 0;
  }

  const fields = data as Record<string, unknown>;
  return wanted.every(([field, value]) => {
    const actual = fields[field];
    if (actual === undefined || actual === null) {
      return false;
    }
    return String(actual).toLowerCase().includes(value.toLowerCase());
  });
}
|
|
| |
| |
| |
| |
| |
/**
 * Searches recent jobs of all non-empty queues.
 *
 * The query is split by `parseSearchQuery` into `field:value` filters (matched
 * against job data via `jobMatchesFilters`) and free text (matched
 * case-insensitively against job id, name, or the JSON form of the data).
 * Per queue and status only the most recent `min(limit * 2, 50) + 1` jobs are
 * inspected, so older jobs are not found.
 *
 * Failures are isolated: an error while reading one status of one queue yields
 * no results for that status only, and a rejected queue is skipped.
 *
 * @param query - Raw search string; an empty query returns no results.
 * @param limit - Maximum number of results returned.
 */
async search(query: string, limit = 20): Promise<SearchResult[]> {
  const { filters, text } = this.parseSearchQuery(query);
  const needle = text.toLowerCase();
  const hasFilters = Object.keys(filters).length > 0;
  const hasText = needle.length > 0;
  if (!hasFilters && !hasText) {
    return [];
  }

  // Only queues that hold at least one job are worth scanning.
  const queueChecks = await Promise.all(
    Array.from(this.queues.entries()).map(async ([queueName, queue]) => {
      const counts = await this.getCachedJobCounts(queue);
      const hasJobs =
        (counts.waiting || 0) > 0 ||
        (counts.active || 0) > 0 ||
        (counts.completed || 0) > 0 ||
        (counts.failed || 0) > 0 ||
        (counts.delayed || 0) > 0;
      return { queueName, queue, hasJobs };
    }),
  );
  const relevantQueues = queueChecks.filter((check) => check.hasJobs);
  if (relevantQueues.length === 0) {
    return [];
  }

  const types: JobStatus[] = [
    "waiting",
    "active",
    "completed",
    "failed",
    "delayed",
  ];
  const fetchLimit = Math.min(limit * 2, 50);

  const matchesText = (job: Job): boolean => {
    if (job.id?.toLowerCase().includes(needle)) return true;
    if (job.name?.toLowerCase().includes(needle)) return true;
    // JSON.stringify yields undefined for unrepresentable data; treating it
    // as empty keeps one odd job from discarding the whole batch via the
    // catch below.
    return (JSON.stringify(job.data) ?? "").toLowerCase().includes(needle);
  };

  const queueResults = await Promise.allSettled(
    relevantQueues.map(async ({ queueName, queue }) => {
      const typeResults = await Promise.all(
        types.map(async (type): Promise<SearchResult[]> => {
          try {
            const jobs = await queue.getJobs(type as any, 0, fetchLimit);
            const hits = jobs.filter(
              (job) =>
                Boolean(job) &&
                (!hasFilters || this.jobMatchesFilters(job, filters)) &&
                (!hasText || matchesText(job)),
            );
            return await Promise.all(
              hits.map(async (job) => ({
                queue: queueName,
                job: (await this.jobToInfo(job, "full", type)) as JobInfo,
              })),
            );
          } catch {
            // Best effort: a failing status list contributes no results.
            return [];
          }
        }),
      );
      return typeResults.flat();
    }),
  );

  const allMatches: SearchResult[] = [];
  for (const result of queueResults) {
    if (result.status === "fulfilled") {
      allMatches.push(...result.value);
    }
  }

  return allMatches.slice(0, limit);
}
|
|
| |
| |
| |
/**
 * Removes up to 1000 jobs of the given finished status from a queue.
 *
 * When anything was removed, the cached state of each removed job and the
 * count/aggregate caches are invalidated, matching what `retryJob`,
 * `removeJob` and `promoteJob` do after a mutation.
 *
 * @param queueName - Queue to clean; an unknown name removes nothing.
 * @param status - Which finished jobs to remove.
 * @param grace - Grace period in milliseconds passed to `queue.clean`.
 * @returns Number of jobs removed.
 */
async cleanJobs(
  queueName: string,
  status: "completed" | "failed",
  grace = 0,
): Promise<number> {
  const queue = this.queues.get(queueName);
  if (!queue) return 0;

  const removed = await queue.clean(grace, 1000, status);
  if (removed.length > 0) {
    for (const jobId of removed) {
      this.jobStateCache.delete(`${queueName}:${jobId}`);
    }
    this.invalidateJobCache(queueName);
  }
  return removed.length;
}
|
|
| |
| |
| |
| |
| |
| |
| |
/**
 * Fast path for the unfiltered "latest runs" view: newest jobs across all
 * queues, ordered by creation timestamp (descending), ties broken by queue
 * name.
 *
 * Each queue contributes only a bounded number of recent jobs per status. The
 * bound is a heuristic that assumes runs are spread roughly evenly over the
 * queues, and it grows with `start` so that pages after the first still have
 * data to slice from. Because the total is not computed, `total` is always -1.
 *
 * The job state is inferred from the job's own fields (`finishedOn`,
 * `failedReason`, `processedOn`, `delay`) rather than queried.
 *
 * @param limit - Page size.
 * @param start - Zero-based offset into the merged, sorted list.
 */
private async getLatestRuns(
  limit: number,
  start: number,
): Promise<PaginatedResponse<RunInfoList>> {
  const queueEntries = Array.from(this.queues.entries());
  if (queueEntries.length === 0) {
    return { data: [], total: -1, hasMore: false, cursor: undefined };
  }

  // Size the per-queue fetch for everything up to the END of the requested
  // page; sizing it from `limit` alone leaves later pages truncated or empty.
  const perQueueFetch = Math.max(
    5,
    Math.ceil((start + limit + 10) / queueEntries.length) + 2,
  );

  const allTypes: JobStatus[] = [
    "waiting",
    "active",
    "completed",
    "failed",
    "delayed",
  ];

  const inferState = (job: Job): JobStatus => {
    if (job.finishedOn) return job.failedReason ? "failed" : "completed";
    if (job.processedOn) return "active";
    if (job.delay && job.delay > 0) return "delayed";
    return "waiting";
  };

  const perQueue = await Promise.all(
    queueEntries.map(async ([queueName, queue]) => {
      const jobs = await queue.getJobs(allTypes as any, 0, perQueueFetch);
      return jobs.map((job) => ({ job, queueName }));
    }),
  );

  const merged = perQueue.flat();
  merged.sort((a, b) => {
    const timeDiff = (b.job.timestamp || 0) - (a.job.timestamp || 0);
    return timeDiff !== 0 ? timeDiff : a.queueName.localeCompare(b.queueName);
  });

  const runInfos = await Promise.all(
    merged.slice(start, start + limit).map(async ({ job, queueName }) => {
      const info = await this.jobToInfo(job, "list", inferState(job));
      return { ...info, queueName } as RunInfoList;
    }),
  );

  const hasMore = merged.length > start + limit;
  return {
    data: runInfos,
    total: -1,
    hasMore,
    cursor: hasMore ? String(start + runInfos.length) : undefined,
  };
}
|
|
| |
| |
| |
| |
/**
 * Lists runs (jobs) across all queues with optional filtering and sorting.
 *
 * Unfiltered requests sorted by timestamp descending are delegated to
 * `getLatestRuns`. Otherwise each queue contributes a bounded number of jobs
 * per status (the bound grows with `start` so later pages have data), the
 * candidates are filtered with `jobMatchesAllFilters`, sorted, and paginated.
 * With a time range, completed/failed jobs are fetched through
 * `getJobsByTimeRange`; other statuses are fetched normally and filtered
 * afterwards.
 *
 * Sorting always happens BEFORE the page is cut, for custom sort fields as
 * well as for timestamp. (Sorting only the already-sliced page would order
 * items within a page but not across pages.)
 *
 * `total` is 0 when no queues are registered and -1 (unknown) otherwise.
 *
 * @param limit - Page size.
 * @param start - Zero-based offset into the filtered, sorted list.
 * @param sort - Sort field and direction; defaults to `timestamp` descending.
 * @param filters - Optional status, tag, text and time-range filters.
 */
async getAllRuns(
  limit = 50,
  start = 0,
  sort?: SortOptions,
  filters?: {
    status?: JobStatus;
    tags?: Record<string, string>;
    text?: string;
    timeRange?: { start: number; end: number };
  },
): Promise<PaginatedResponse<RunInfoList>> {
  const sortField = sort?.field ?? "timestamp";
  const sortDir = sort?.direction === "asc" ? 1 : -1;
  const isTimestampSort = sortField === "timestamp";
  const hasFilters = !!(
    filters?.status ||
    filters?.tags ||
    filters?.text ||
    filters?.timeRange
  );

  if (!hasFilters && isTimestampSort && sortDir === -1) {
    return this.getLatestRuns(limit, start);
  }

  const queueEntries = Array.from(this.queues.entries());
  if (queueEntries.length === 0) {
    return { data: [], total: 0, hasMore: false, cursor: undefined };
  }

  const requestedTypes: JobStatus[] = filters?.status
    ? [filters.status]
    : ["waiting", "active", "completed", "failed", "delayed"];
  const timeRange = filters?.timeRange;

  const isFinished = (state: JobStatus): boolean =>
    state === "completed" || state === "failed";
  // With a time range, finished statuses are fetched first, then the rest.
  const types = timeRange
    ? [
        ...requestedTypes.filter(isFinished),
        ...requestedTypes.filter((state) => !isFinished(state)),
      ]
    : requestedTypes;

  // Over-fetch (x2) to leave room for filtering, sized up to the end of the
  // requested page so that pages after the first are not starved.
  const fetchCount = Math.max(
    Math.ceil(((start + limit) * 2) / queueEntries.length) + 3,
    5,
  );

  type Candidate = { job: Job; queueName: string; state: JobStatus };

  const fetchFromQueue = async (
    queueName: string,
    queue: Queue,
  ): Promise<Candidate[]> => {
    const perType = await Promise.all(
      types.map(async (state) => {
        const jobs =
          timeRange && (state === "completed" || state === "failed")
            ? await this.getJobsByTimeRange(
                queue,
                state,
                timeRange.start,
                timeRange.end,
                fetchCount,
              )
            : await queue.getJobs(state as any, 0, fetchCount);
        return jobs.map((job) => ({ job, queueName, state }));
      }),
    );
    return perType.flat();
  };

  let candidates = (
    await Promise.all(
      queueEntries.map(([queueName, queue]) => fetchFromQueue(queueName, queue)),
    )
  ).flat();

  if (filters) {
    candidates = candidates.filter(({ job }) =>
      this.jobMatchesAllFilters(job, filters),
    );
  }

  const toRunInfo = async ({
    job,
    queueName,
    state,
  }: Candidate): Promise<RunInfoList> => {
    const info = await this.jobToInfo(job, "list", state);
    return { ...info, queueName } as RunInfoList;
  };

  let page: RunInfoList[];
  if (isTimestampSort) {
    // Timestamp lives on the raw job, so sort first and convert only the page.
    candidates.sort((a, b) => {
      const aTime = a.job.timestamp || 0;
      const bTime = b.job.timestamp || 0;
      const timeDiff = sortDir === -1 ? bTime - aTime : aTime - bTime;
      if (timeDiff !== 0) return timeDiff;
      const queueDiff = a.queueName.localeCompare(b.queueName);
      if (queueDiff !== 0) return queueDiff;
      return (a.job.id || "").localeCompare(b.job.id || "");
    });
    page = await Promise.all(
      candidates.slice(start, start + limit).map(toRunInfo),
    );
  } else {
    // Other sort fields are defined on the converted info, so convert all
    // candidates, sort globally, then cut the page.
    const converted = await Promise.all(candidates.map(toRunInfo));
    converted.sort((a, b) => {
      const aVal = this.getSortValueForList(a, sortField);
      const bVal = this.getSortValueForList(b, sortField);
      if (aVal < bVal) return -1 * sortDir;
      if (aVal > bVal) return 1 * sortDir;
      return 0;
    });
    page = converted.slice(start, start + limit);
  }

  const hasMore = candidates.length > start + limit;
  return {
    data: page,
    total: -1,
    hasMore,
    cursor: hasMore ? String(start + page.length) : undefined,
  };
}
|
|
| |
| |
| |
/**
 * Collects repeatable job definitions and delayed jobs from every registered queue.
 *
 * Each queue is queried concurrently with `getRepeatableJobs()` and
 * `getJobs("delayed", 0, 50)`; the per-queue results are flattened into two
 * lists which are then sorted independently.
 *
 * @param repeatableSort - Sort for the repeatable list. Defaults to `name`, ascending.
 * @param delayedSort - Sort for the delayed list. Defaults to `processAt`, ascending.
 * @returns The sorted repeatable and delayed lists.
 */
async getSchedulers(
  repeatableSort?: SortOptions,
  delayedSort?: SortOptions,
): Promise<{
  repeatable: SchedulerInfo[];
  delayed: DelayedJobInfo[];
}> {
  // Query all queues in parallel; both lookups for a queue also run in parallel.
  const perQueue = await Promise.all(
    Array.from(this.queues.entries(), async ([queueName, queue]) => {
      const [repeatableJobs, delayedJobs] = await Promise.all([
        queue.getRepeatableJobs(),
        queue.getJobs("delayed", 0, 50),
      ]);
      return { queueName, repeatableJobs, delayedJobs };
    }),
  );

  const repeatable: SchedulerInfo[] = [];
  const delayed: DelayedJobInfo[] = [];

  for (const { queueName, repeatableJobs, delayedJobs } of perQueue) {
    for (const job of repeatableJobs) {
      repeatable.push({
        key: job.key,
        name: job.name || "unnamed",
        queueName,
        pattern: job.pattern ?? undefined,
        every: job.every ? Number(job.every) : undefined,
        next: job.next ?? undefined,
        endDate: job.endDate ?? undefined,
        tz: job.tz ?? undefined,
      });
    }

    for (const job of delayedJobs) {
      // Same guard as getFlows: skip empty slots instead of crashing on `job.opts`.
      // NOTE(review): presumably a job removed between listing and loading
      // shows up as an empty entry — confirm against the BullMQ version in use.
      if (!job) continue;

      const delay = job.opts.delay ?? 0;
      delayed.push({
        id: job.id || "",
        name: job.name,
        queueName,
        delay,
        processAt: job.timestamp + delay,
        data: job.data,
      });
    }
  }

  // Three-way comparison shared by both sorts; `direction` is 1 (asc) or -1 (desc).
  const compare = (
    a: string | number,
    b: string | number,
    direction: number,
  ): number => {
    if (a < b) return -1 * direction;
    if (a > b) return 1 * direction;
    return 0;
  };

  const repeatableField = repeatableSort?.field ?? "name";
  const repeatableDir = repeatableSort?.direction === "desc" ? -1 : 1;
  repeatable.sort((a, b) =>
    compare(
      this.getSchedulerSortValue(a, repeatableField),
      this.getSchedulerSortValue(b, repeatableField),
      repeatableDir,
    ),
  );

  const delayedField = delayedSort?.field ?? "processAt";
  const delayedDir = delayedSort?.direction === "desc" ? -1 : 1;
  delayed.sort((a, b) =>
    compare(
      this.getDelayedSortValue(a, delayedField),
      this.getDelayedSortValue(b, delayedField),
      delayedDir,
    ),
  );

  return { repeatable, delayed };
}
|
|
| |
| |
| |
/**
 * Adds one job to a registered queue.
 *
 * Only `delay`, `priority` and `attempts` are forwarded from `request.opts`.
 *
 * @param request - Target queue name, job name, payload and optional job options.
 * @returns The id of the created job, or an empty string when the job has none.
 * @throws Error when no queue is registered under `request.queueName`.
 */
async enqueueJob(request: TestJobRequest): Promise<{ id: string }> {
  const { queueName, jobName, data, opts } = request;

  const target = this.queues.get(queueName);
  if (target === undefined) {
    throw new Error(`Queue "${queueName}" not found`);
  }

  const jobOptions = {
    delay: opts?.delay,
    priority: opts?.priority,
    attempts: opts?.attempts,
  };
  const created = await target.add(jobName, data, jobOptions);

  return { id: created.id || "" };
}
|
|
| |
| |
| |
/**
 * Picks the configured tag fields out of a job payload.
 *
 * A field is kept only when its value is a string, number, boolean or `null`;
 * missing fields and nested objects/arrays are ignored.
 *
 * @param data - The job payload; anything that is not a non-null object yields no tags.
 * @returns The collected tags, or `undefined` when no tag fields are configured
 *   or none of them produced a value.
 */
private extractTags(data: unknown): JobTags | undefined {
  if (this.tagFields.length === 0) {
    return undefined;
  }
  if (!data || typeof data !== "object") {
    return undefined;
  }

  const payload = data as Record<string, unknown>;
  const collected: JobTags = {};

  for (const field of this.tagFields) {
    const candidate = payload[field];

    let isTagValue: boolean;
    switch (typeof candidate) {
      case "string":
      case "number":
      case "boolean":
        isTagValue = true;
        break;
      default:
        // `null` is the only non-primitive-typed value accepted as a tag.
        isTagValue = candidate === null;
    }

    if (isTagValue) {
      collected[field] = candidate as string | number | boolean | null;
    }
  }

  if (Object.keys(collected).length === 0) {
    return undefined;
  }
  return collected;
}
|
|
| |
| |
| |
/**
 * Counts how often each value of a payload field occurs across recent jobs.
 *
 * For every registered queue, the range 0–100 of the waiting, active,
 * completed, failed and delayed jobs is sampled. Only string, number and
 * boolean values are counted: these are the non-null value kinds that
 * `extractTags` accepts, and stringifying an object would only produce a
 * meaningless `"[object Object]"` bucket.
 *
 * @param field - Name of the payload property to aggregate.
 * @param limit - Maximum number of distinct values returned (default 50).
 * @returns Distinct values (as strings) with their counts, most frequent first.
 */
async getTagValues(
  field: string,
  limit = 50,
): Promise<{ value: string; count: number }[]> {
  const types = ["waiting", "active", "completed", "failed", "delayed"] as const;

  // Sample every queue in parallel, and every state of a queue in parallel.
  const jobsPerQueue = await Promise.all(
    Array.from(this.queues.values(), async (queue) => {
      const perType = await Promise.all(
        types.map((type) => queue.getJobs(type, 0, 100)),
      );
      return perType.flat();
    }),
  );

  const counts = new Map<string, number>();
  for (const job of jobsPerQueue.flat()) {
    // Same guard as getFlows: tolerate empty slots in the job list.
    // NOTE(review): presumably a job removed mid-fetch appears as an empty
    // entry — confirm against the BullMQ version in use.
    const data: unknown = job?.data;
    if (!data || typeof data !== "object") continue;

    const value = (data as Record<string, unknown>)[field];
    if (
      typeof value !== "string" &&
      typeof value !== "number" &&
      typeof value !== "boolean"
    ) {
      continue;
    }

    const key = String(value);
    counts.set(key, (counts.get(key) ?? 0) + 1);
  }

  return Array.from(counts.entries())
    .sort((a, b) => b[1] - a[1])
    .slice(0, limit)
    .map(([value, count]) => ({ value, count }));
}
|
|
| |
| |
| |
/**
 * Resolves the comparable value of a job/run for a given sort field.
 *
 * Missing numeric fields compare as `0`; `name` and `queueName` are
 * lower-cased. Items without a `queueName` property compare as `""`.
 * Unrecognised fields fall back to `timestamp`.
 *
 * @param item - The job or run being sorted.
 * @param field - Sort field name.
 * @returns A string or number suitable for `<` / `>` comparison.
 */
private getSortValue(
  item: JobInfo | RunInfo,
  field: string,
): string | number {
  if (field === "name") {
    return item.name.toLowerCase();
  }
  if (field === "status") {
    return item.status;
  }
  if (field === "duration") {
    return item.duration ?? 0;
  }
  if (field === "queueName") {
    return "queueName" in item ? item.queueName.toLowerCase() : "";
  }
  if (field === "processedOn") {
    return item.processedOn ?? 0;
  }
  // "timestamp" and every unrecognised field.
  return item.timestamp ?? 0;
}
|
|
| |
| |
| |
/**
 * Resolves the comparable value of a list-view run for a given sort field.
 *
 * Missing numeric fields compare as `0`; `name` and `queueName` are
 * lower-cased. Unrecognised fields fall back to `timestamp`.
 *
 * @param item - The run being sorted.
 * @param field - Sort field name.
 * @returns A string or number suitable for `<` / `>` comparison.
 */
private getSortValueForList(
  item: RunInfoList,
  field: string,
): string | number {
  switch (field) {
    // Text fields
    case "name":
      return item.name.toLowerCase();
    case "queueName":
      return item.queueName.toLowerCase();
    case "status":
      return item.status;

    // Numeric fields
    case "duration":
      return item.duration ?? 0;
    case "processedOn":
      return item.processedOn ?? 0;
    case "timestamp":
    default:
      return item.timestamp ?? 0;
  }
}
|
|
| |
| |
| |
/**
 * Resolves the comparable value of a repeatable scheduler for a sort field.
 *
 * Text fields are lower-cased; a missing `pattern` or `tz` compares as `""`
 * and a missing `next` as `0`. Unrecognised fields fall back to `name`.
 *
 * @param item - The scheduler entry being sorted.
 * @param field - Sort field name.
 * @returns A string or number suitable for `<` / `>` comparison.
 */
private getSchedulerSortValue(
  item: SchedulerInfo,
  field: string,
): string | number {
  if (field === "queueName") {
    return item.queueName.toLowerCase();
  }
  if (field === "pattern") {
    return item.pattern?.toLowerCase() ?? "";
  }
  if (field === "next") {
    return item.next ?? 0;
  }
  if (field === "tz") {
    return item.tz?.toLowerCase() ?? "";
  }
  // "name" and every unrecognised field.
  return item.name.toLowerCase();
}
|
|
| |
| |
| |
/**
 * Resolves the comparable value of a delayed job for a sort field.
 *
 * `name` and `queueName` are lower-cased. Unrecognised fields fall back to
 * `processAt`.
 *
 * @param item - The delayed job entry being sorted.
 * @param field - Sort field name.
 * @returns A string or number suitable for `<` / `>` comparison.
 */
private getDelayedSortValue(
  item: DelayedJobInfo,
  field: string,
): string | number {
  if (field === "name") {
    return item.name.toLowerCase();
  }
  if (field === "queueName") {
    return item.queueName.toLowerCase();
  }
  if (field === "delay") {
    return item.delay;
  }
  // "processAt" and every unrecognised field.
  return item.processAt;
}
|
|
| |
| |
| |
| |
| |
| |
/**
 * Converts a BullMQ job into the plain info object returned by the API.
 *
 * The job state is taken from `knownState` when supplied; otherwise it is
 * looked up in `jobStateCache` under `<queueName>:<jobId>` and, on a miss,
 * fetched with `job.getState()` and stored in that cache.
 *
 * @param job - The job to convert.
 * @param _fields - Not read by this implementation.
 * @param knownState - State already known to the caller; skips the lookup.
 * @returns The job info, including duration, tags and parent reference when available.
 */
private async jobToInfo(
  job: Job,
  _fields: "list" | "full" = "full",
  knownState?: JobStatus,
): Promise<JobInfo | RunInfoList> {
  // Resolve the state: caller-supplied, then cache, then Redis.
  let status = knownState;
  if (!status) {
    const stateKey = `${job.queueName}:${job.id}`;
    status = this.jobStateCache.get(stateKey);
    if (!status) {
      status = (await job.getState()) as JobStatus;
      this.jobStateCache.set(stateKey, status);
    }
  }

  // Duration is only defined once the job has both started and finished.
  let duration: number | undefined;
  if (job.finishedOn && job.processedOn) {
    duration = job.finishedOn - job.processedOn;
  }

  // Only numeric or non-null object progress is passed through; anything else becomes 0.
  const rawProgress = job.progress;
  const progress: number | object =
    typeof rawProgress === "number" ||
    (typeof rawProgress === "object" && rawProgress !== null)
      ? rawProgress
      : 0;

  const tags = this.extractTags(job.data);

  // Parent reference: prefer the structured `parent`, else parse `parentKey`.
  let parent: { id: string; queueName: string } | undefined;
  if (job.parent?.id) {
    const queueKey = job.parent.queueKey;
    parent = {
      id: job.parent.id,
      // Second ":"-separated segment of the queue key, else the whole key.
      queueName: queueKey?.split(":")[1] || queueKey || "",
    };
  } else if (job.parentKey) {
    const segments = job.parentKey.split(":");
    if (segments.length >= 3) {
      // The last two segments are taken as queue name and job id.
      const [queueSegment, idSegment] = segments.slice(-2);
      parent = {
        id: idSegment || "",
        queueName: queueSegment || "",
      };
    }
  }

  const { attempts, delay, priority } = job.opts;

  return {
    id: job.id || "",
    name: job.name,
    data: job.data,
    opts: { attempts, delay, priority },
    progress,
    attemptsMade: job.attemptsMade,
    processedOn: job.processedOn,
    finishedOn: job.finishedOn,
    timestamp: job.timestamp,
    failedReason: job.failedReason,
    stacktrace: job.stacktrace,
    returnvalue: job.returnvalue,
    status: status as JobStatus,
    duration,
    tags,
    parent,
  };
}
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
/**
 * Retries every listed job.
 *
 * @param jobs - Queue name / job id pairs to retry.
 * @returns How many jobs were retried and how many attempts failed
 *   (unknown queue, unknown job, or `job.retry()` rejecting).
 */
async bulkRetry(
  jobs: { queueName: string; jobId: string }[],
): Promise<{ success: number; failed: number }> {
  return this.runBulkJobAction(jobs, (job) => job.retry());
}

/**
 * Removes every listed job.
 *
 * @param jobs - Queue name / job id pairs to delete.
 * @returns How many jobs were removed and how many attempts failed
 *   (unknown queue, unknown job, or `job.remove()` rejecting).
 */
async bulkDelete(
  jobs: { queueName: string; jobId: string }[],
): Promise<{ success: number; failed: number }> {
  return this.runBulkJobAction(jobs, (job) => job.remove());
}

/**
 * Promotes every listed job.
 *
 * @param jobs - Queue name / job id pairs to promote.
 * @returns How many jobs were promoted and how many attempts failed
 *   (unknown queue, unknown job, or `job.promote()` rejecting).
 */
async bulkPromote(
  jobs: { queueName: string; jobId: string }[],
): Promise<{ success: number; failed: number }> {
  return this.runBulkJobAction(jobs, (job) => job.promote());
}

/**
 * Shared driver for the bulk operations: resolves each job, applies `action`
 * to it, invalidates the job's cache entry, and tallies the outcomes.
 *
 * All jobs are processed concurrently with `Promise.allSettled`, so one
 * failure never prevents the remaining jobs from being processed.
 *
 * @param jobs - Queue name / job id pairs to operate on.
 * @param action - The operation to run on each resolved job.
 * @returns Counts of fulfilled and rejected operations.
 */
private async runBulkJobAction(
  jobs: { queueName: string; jobId: string }[],
  action: (job: Job) => Promise<unknown>,
): Promise<{ success: number; failed: number }> {
  const outcomes = await Promise.allSettled(
    jobs.map(async ({ queueName, jobId }) => {
      const queue = this.queues.get(queueName);
      if (!queue) {
        throw new Error("Queue not found");
      }

      const job = await queue.getJob(jobId);
      if (!job) {
        throw new Error("Job not found");
      }

      await action(job);
      // Only invalidate once the action has actually succeeded.
      this.invalidateJobCache(queueName, jobId);
    }),
  );

  const success = outcomes.filter(
    (outcome) => outcome.status === "fulfilled",
  ).length;
  return { success, failed: outcomes.length - success };
}
|
|
| |
| |
| |
|
|
| |
| |
| |
| |
/**
 * Lists flows (root jobs that have children) across the registered queues.
 *
 * Returns `[]` immediately when no FlowProducer is available. Otherwise the
 * result is cached under `flows:<limit>` for `CACHE_TTL.flows`.
 *
 * Steps performed on a cache miss:
 *  1. Keep only queues whose cached counts show waiting, waiting-children or active jobs.
 *  2. Per queue, load waiting-children jobs; unless that already yields at least
 *     `limit` jobs, also load a slice of the other states.
 *  3. Collect up to `limit * 2` distinct jobs without a parent as root candidates.
 *  4. Resolve candidates in batches of 20 via `getFlow`, keeping those with children.
 *
 * @param limit - Maximum number of flows returned (default 50).
 * @returns Flow summaries sorted by root job timestamp, newest first.
 */
async getFlows(limit = 50): Promise<FlowSummary[]> {
  const producer = this.flowProducer;
  if (!producer) {
    return [];
  }

  return this.cached(`flows:${limit}`, this.CACHE_TTL.flows, async () => {
    // 1. Cheap pre-filter using cached job counts.
    const inspected = await Promise.all(
      Array.from(this.queues.entries(), async ([queueName, queue]) => {
        const counts = await this.getCachedJobCounts(queue);
        const hasRelevantJobs =
          (counts.waiting || 0) > 0 ||
          (counts["waiting-children"] || 0) > 0 ||
          (counts.active || 0) > 0;
        return { queueName, queue, hasRelevantJobs };
      }),
    );

    const relevantQueues = inspected.filter((entry) => entry.hasRelevantJobs);
    if (relevantQueues.length === 0) {
      return [];
    }

    // 2. Load candidate jobs per queue; any failure degrades to an empty list.
    const secondaryStates = [
      "waiting",
      "active",
      "completed",
      "failed",
      "delayed",
    ];

    const fetched = await Promise.all(
      relevantQueues.map(async ({ queueName, queue }) => {
        try {
          // Jobs waiting on children are flow parents.
          const parentJobs = await queue.getJobs(["waiting-children"], 0, 50);
          if (parentJobs.length >= limit) {
            return { queueName, jobs: parentJobs };
          }

          const secondaryJobs = await Promise.all(
            secondaryStates.map(async (state) => {
              try {
                return await queue.getJobs(state as any, 0, 30);
              } catch {
                return [];
              }
            }),
          );

          return {
            queueName,
            jobs: [...parentJobs, ...secondaryJobs.flat()],
          };
        } catch {
          return { queueName, jobs: [] };
        }
      }),
    );

    // 3. De-duplicate and keep parentless jobs as potential flow roots.
    const maxRoots = limit * 2;
    const visited = new Set<string>();
    const rootCandidates: { queueName: string; job: Job; jobId: string }[] =
      [];

    collect: for (const { queueName, jobs } of fetched) {
      if (rootCandidates.length >= maxRoots) {
        break;
      }

      for (const job of jobs) {
        if (!job || !job.id) continue;
        const jobId = job.id;

        const dedupeKey = `${queueName}:${jobId}`;
        if (visited.has(dedupeKey)) continue;
        visited.add(dedupeKey);

        const isChild = !!job.parent || !!job.parentKey;
        if (isChild) continue;

        rootCandidates.push({ queueName, job, jobId });
        if (rootCandidates.length >= maxRoots) {
          break collect;
        }
      }
    }

    // 4. Resolve flow trees in batches; stop once enough flows are found.
    const summarize = async (candidate: {
      queueName: string;
      job: Job;
      jobId: string;
    }): Promise<FlowSummary | null> => {
      const { queueName, job, jobId } = candidate;
      try {
        const flowTree = await producer.getFlow({ id: jobId, queueName });
        if (!flowTree?.children || flowTree.children.length === 0) {
          return null;
        }

        const stats = this.countFlowStats(flowTree);
        const state = await job.getState();

        return {
          id: jobId,
          name: job.name,
          queueName,
          status: state as JobStatus,
          totalJobs: stats.total,
          completedJobs: stats.completed,
          failedJobs: stats.failed,
          timestamp: job.timestamp,
          duration:
            job.finishedOn && job.processedOn
              ? job.finishedOn - job.processedOn
              : undefined,
        } as FlowSummary;
      } catch {
        // A candidate that cannot be resolved is simply not reported as a flow.
        return null;
      }
    };

    const batchSize = 20;
    const flows: FlowSummary[] = [];

    let offset = 0;
    while (offset < rootCandidates.length && flows.length < limit) {
      const batch = rootCandidates.slice(offset, offset + batchSize);
      const summaries = await Promise.all(batch.map(summarize));

      for (const summary of summaries) {
        if (summary && flows.length < limit) {
          flows.push(summary);
        }
      }
      offset += batchSize;
    }

    return flows.sort((a, b) => b.timestamp - a.timestamp);
  });
}
|
|
| |
| |
| |
/**
 * Loads a single flow tree rooted at the given job.
 *
 * @param queueName - Queue that holds the root job.
 * @param jobId - Id of the root job.
 * @returns The converted flow tree, or `null` when no FlowProducer is
 *   available, the flow does not exist, or loading/converting it fails.
 */
async getFlow(queueName: string, jobId: string): Promise<FlowNode | null> {
  if (!this.flowProducer) {
    return null;
  }

  try {
    const flowTree = await this.flowProducer.getFlow({
      id: jobId,
      queueName,
    });

    if (!flowTree) {
      return null;
    }

    // `await` is required: returning the bare promise would let a rejection
    // from convertFlowTree bypass this try/catch and reach the caller.
    return await this.convertFlowTree(flowTree);
  } catch {
    return null;
  }
}
|
|
| |
| |
| |
/**
 * Creates a new flow (a parent job with nested children) from a request.
 *
 * @param request - Description of the root job and its children.
 * @returns The id of the created root job, or an empty string when it has none.
 * @throws Error when no FlowProducer is available.
 */
async createFlow(request: CreateFlowRequest): Promise<{ id: string }> {
  const producer = this.flowProducer;
  if (!producer) {
    throw new Error("FlowProducer not initialized");
  }

  const { job: rootJob } = await producer.add(this.buildFlowJob(request));
  return { id: rootJob.id || "" };
}
|
|
| |
| |
| |
/**
 * Recursively maps a flow request node to the object passed to `FlowProducer.add`.
 *
 * Copies `name` and `queueName`, substitutes `{}` for a falsy `data`, and
 * adds a `children` array only when the request has at least one child.
 *
 * @param request - The root request or one of its child requests.
 * @returns The flow job description for this node and its descendants.
 */
private buildFlowJob(
  request: CreateFlowRequest | CreateFlowChildRequest,
): any {
  const { name, queueName, data, children } = request;

  const node: any = { name, queueName, data: data || {} };

  if (children?.length) {
    node.children = children.map((child) => this.buildFlowJob(child));
  }

  return node;
}
|
|
| |
| |
| |
/**
 * Recursively converts a flow tree returned by `FlowProducer.getFlow` into
 * the `FlowNode` shape.
 *
 * Each node's state is fetched with `job.getState()`. Sibling subtrees are
 * converted concurrently; their order in the result matches `tree.children`.
 *
 * @param tree - A flow tree node exposing `job` and, optionally, `children`.
 * @returns The converted node; `children` is `undefined` for leaf nodes.
 */
private async convertFlowTree(tree: any): Promise<FlowNode> {
  const job = tree.job;
  const state = await job.getState();
  const duration =
    job.finishedOn && job.processedOn
      ? job.finishedOn - job.processedOn
      : undefined;

  // Accept numeric or non-null object progress only. `typeof null` is
  // "object", so null must be excluded explicitly (as jobToInfo does).
  const rawProgress: unknown = job.progress;
  const progress: number | object =
    typeof rawProgress === "number" ||
    (typeof rawProgress === "object" && rawProgress !== null)
      ? rawProgress
      : 0;

  const jobInfo: JobInfo = {
    id: job.id || "",
    name: job.name,
    data: job.data,
    opts: {
      attempts: job.opts?.attempts,
      delay: job.opts?.delay,
      priority: job.opts?.priority,
    },
    progress,
    attemptsMade: job.attemptsMade || 0,
    processedOn: job.processedOn,
    finishedOn: job.finishedOn,
    timestamp: job.timestamp,
    failedReason: job.failedReason,
    stacktrace: job.stacktrace,
    returnvalue: job.returnvalue,
    status: state as JobStatus,
    duration,
    tags: this.extractTags(job.data),
  };

  // Convert siblings in parallel instead of awaiting one state lookup at a time.
  const childTrees: unknown[] = Array.isArray(tree.children)
    ? tree.children
    : [];
  const children = await Promise.all(
    childTrees.map((child) => this.convertFlowTree(child)),
  );

  return {
    job: jobInfo,
    queueName: job.queueName || tree.queueName || "",
    children: children.length > 0 ? children : undefined,
  };
}
|
|
| |
| |
| |
/**
 * Recursively tallies the jobs in a flow tree.
 *
 * A node counts as failed when its job has a `failedReason`, and as completed
 * when it has `finishedOn` set and no `failedReason`. Every node counts
 * towards `total`.
 *
 * @param tree - A flow tree node exposing `job` and, optionally, `children`.
 * @returns Totals for this node and all of its descendants.
 */
private countFlowStats(tree: any): {
  total: number;
  completed: number;
  failed: number;
} {
  const { finishedOn, failedReason } = tree.job;

  // Start with this node's own contribution.
  const totals = {
    total: 1,
    completed: finishedOn && !failedReason ? 1 : 0,
    failed: failedReason ? 1 : 0,
  };

  // Fold in each subtree.
  for (const child of tree.children || []) {
    const sub = this.countFlowStats(child);
    totals.total += sub.total;
    totals.completed += sub.completed;
    totals.failed += sub.failed;
  }

  return totals;
}
| } |
|
|