codex-ai-platform / api /lib /queue.ts
3v324v23's picture
chore: 彻底清理项目,符合 Hugging Face 部署规范
ae4ceef
/**
* 基于 BullMQ 的分布式任务队列实现 (持久化与可靠处理)
* 支持:自动重试、延迟执行、并发控制、任务持久化
*/
import { Queue, Worker, Job } from 'bullmq';
import { redis, isRedisAvailable } from './redis.js';
const QUEUE_NAME = 'codex_tasks';
// 内存队列模拟 (当 Redis 不可用时)
const memoryQueue: any[] = [];
// 1. 定义任务队列
export const taskQueue = isRedisAvailable ? new Queue(QUEUE_NAME, {
connection: redis as any,
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: true,
removeOnFail: false,
},
}) : null;
let worker: Worker | null = null;
// 2. 设置处理器与并发控制
export const setupWorkers = (
aiHandler: (data: any) => Promise<any>,
docHandler: (data: any) => Promise<any>
) => {
if (worker || !isRedisAvailable) {
if (!isRedisAvailable) {
console.warn('[Queue] Redis 不可用,启用内存模拟模式 (任务不持久化)');
// 启动一个简单的定时器处理内存任务
setInterval(async () => {
if (memoryQueue.length > 0) {
const task = memoryQueue.shift();
console.log(`[Queue:Memory] 正在处理任务: ${task.type}`);
try {
if (task.type === 'ai_workflow') await aiHandler(task.data);
else if (task.type === 'document_process') await docHandler(task.data);
} catch (e) {}
}
}, 3000);
}
return;
}
worker = new Worker(
QUEUE_NAME,
async (job: Job) => {
// ... 逻辑保持不变
console.log(`[Queue] 正在执行任务: ${job.name} (ID: ${job.id})`);
const { type, data } = job.data;
try {
if (type === 'ai_workflow') {
return await aiHandler(data);
} else if (type === 'document_process') {
return await docHandler(data);
} else {
throw new Error(`未知任务类型: ${type}`);
}
} catch (err) {
console.error(`[Queue] 任务失败: ${job.id}`, err);
throw err;
}
},
{
connection: redis as any,
concurrency: 5, // 最大并发处理数
}
);
worker.on('completed', (job) => {
console.log(`[Queue] 任务已完成: ${job.id}`);
});
worker.on('failed', (job, err) => {
console.error(`[Queue] 任务彻底失败: ${job?.id}`, err.message);
});
console.log('[Queue] BullMQ 任务队列 Workers 已就绪,并发数: 5');
};
// 3. 添加任务接口
export const addJob = async (type: string, data: any) => {
const jobName = `${type}_${Date.now()}`;
if (taskQueue) {
const job = await taskQueue.add(jobName, { type, data });
return { id: job.id, name: jobName };
} else {
// 内存降级逻辑
memoryQueue.push({ type, data });
return { id: `mem_${Date.now()}`, name: jobName };
}
};
// 获取队列简要状态 (用于监控)
export const getQueueStatus = async () => {
if (taskQueue) {
const [active, waiting, completed, failed] = await Promise.all([
taskQueue.getActiveCount(),
taskQueue.getWaitingCount(),
taskQueue.getCompletedCount(),
taskQueue.getFailedCount(),
]);
return { active, waiting, completed, failed };
} else {
return { active: 0, waiting: memoryQueue.length, completed: 0, failed: 0 };
}
};