Spaces:
Sleeping
Sleeping
| /** | |
| * 基于 BullMQ 的分布式任务队列实现 (持久化与可靠处理) | |
| * 支持:自动重试、延迟执行、并发控制、任务持久化 | |
| */ | |
| import { Queue, Worker, Job } from 'bullmq'; | |
| import { redis, isRedisAvailable } from './redis.js'; | |
| const QUEUE_NAME = 'codex_tasks'; | |
| // 内存队列模拟 (当 Redis 不可用时) | |
| const memoryQueue: any[] = []; | |
| // 1. 定义任务队列 | |
| export const taskQueue = isRedisAvailable ? new Queue(QUEUE_NAME, { | |
| connection: redis as any, | |
| defaultJobOptions: { | |
| attempts: 3, | |
| backoff: { type: 'exponential', delay: 1000 }, | |
| removeOnComplete: true, | |
| removeOnFail: false, | |
| }, | |
| }) : null; | |
| let worker: Worker | null = null; | |
| // 2. 设置处理器与并发控制 | |
| export const setupWorkers = ( | |
| aiHandler: (data: any) => Promise<any>, | |
| docHandler: (data: any) => Promise<any> | |
| ) => { | |
| if (worker || !isRedisAvailable) { | |
| if (!isRedisAvailable) { | |
| console.warn('[Queue] Redis 不可用,启用内存模拟模式 (任务不持久化)'); | |
| // 启动一个简单的定时器处理内存任务 | |
| setInterval(async () => { | |
| if (memoryQueue.length > 0) { | |
| const task = memoryQueue.shift(); | |
| console.log(`[Queue:Memory] 正在处理任务: ${task.type}`); | |
| try { | |
| if (task.type === 'ai_workflow') await aiHandler(task.data); | |
| else if (task.type === 'document_process') await docHandler(task.data); | |
| } catch (e) {} | |
| } | |
| }, 3000); | |
| } | |
| return; | |
| } | |
| worker = new Worker( | |
| QUEUE_NAME, | |
| async (job: Job) => { | |
| // ... 逻辑保持不变 | |
| console.log(`[Queue] 正在执行任务: ${job.name} (ID: ${job.id})`); | |
| const { type, data } = job.data; | |
| try { | |
| if (type === 'ai_workflow') { | |
| return await aiHandler(data); | |
| } else if (type === 'document_process') { | |
| return await docHandler(data); | |
| } else { | |
| throw new Error(`未知任务类型: ${type}`); | |
| } | |
| } catch (err) { | |
| console.error(`[Queue] 任务失败: ${job.id}`, err); | |
| throw err; | |
| } | |
| }, | |
| { | |
| connection: redis as any, | |
| concurrency: 5, // 最大并发处理数 | |
| } | |
| ); | |
| worker.on('completed', (job) => { | |
| console.log(`[Queue] 任务已完成: ${job.id}`); | |
| }); | |
| worker.on('failed', (job, err) => { | |
| console.error(`[Queue] 任务彻底失败: ${job?.id}`, err.message); | |
| }); | |
| console.log('[Queue] BullMQ 任务队列 Workers 已就绪,并发数: 5'); | |
| }; | |
| // 3. 添加任务接口 | |
| export const addJob = async (type: string, data: any) => { | |
| const jobName = `${type}_${Date.now()}`; | |
| if (taskQueue) { | |
| const job = await taskQueue.add(jobName, { type, data }); | |
| return { id: job.id, name: jobName }; | |
| } else { | |
| // 内存降级逻辑 | |
| memoryQueue.push({ type, data }); | |
| return { id: `mem_${Date.now()}`, name: jobName }; | |
| } | |
| }; | |
| // 获取队列简要状态 (用于监控) | |
| export const getQueueStatus = async () => { | |
| if (taskQueue) { | |
| const [active, waiting, completed, failed] = await Promise.all([ | |
| taskQueue.getActiveCount(), | |
| taskQueue.getWaitingCount(), | |
| taskQueue.getCompletedCount(), | |
| taskQueue.getFailedCount(), | |
| ]); | |
| return { active, waiting, completed, failed }; | |
| } else { | |
| return { active: 0, waiting: memoryQueue.length, completed: 0, failed: 0 }; | |
| } | |
| }; | |