BullMQ 队列

3 min read
Zekari
系统设计异步处理后端架构消息队列

队列是对时间的承认

队列的存在,首先承认了一个事实:事情不能同时发生。

当十个请求同时到达,服务器不能同时处理十个请求。当一千封邮件需要发送,系统不能瞬间完成。资源是有限的,时间是线性的,处理能力有上限。

队列不是为了解决这个限制,而是为了接受这个限制。

它说:我知道你现在无法处理这么多,那就先排队。先来的先处理,后来的等一会儿。这不是妥协,而是一种秩序。

💡 Click the maximize icon to view in fullscreen

没有队列时,系统要么拒绝请求,要么崩溃。有了队列,系统可以说:我收到了,会处理,请等待。

这是对时间的诚实。

持久化是对可靠性的保证

队列在内存中运行很快,但不安全。

服务器重启,内存清空,任务消失。这对于某些场景可以接受,但对于关键业务不行。

BullMQ 基于 Redis,任务被持久化到磁盘。即使进程崩溃、服务器重启,任务依然存在。当系统恢复时,队列继续从中断的地方执行。

const queue = new Queue('email-queue', {
  connection: {
    host: 'localhost',
    port: 6379,
  }
});

await queue.add('send-welcome', {
  email: 'user@example.com',
  template: 'welcome'
});

这段代码执行后,任务已经安全存储。即使下一秒服务器断电,任务也不会丢失。

持久化不是性能优化,而是可靠性的基础。

当你把任务放入队列,你期待的不是"可能会被处理",而是"一定会被处理"。持久化是这种承诺的技术保障。

顺序不是绝对的

队列默认是先进先出(FIFO)。第一个进入的任务,第一个被处理。

但顺序不是绝对的。

有些任务更紧急。用户重置密码的邮件,应该比营销邮件先发送。支付成功的通知,应该比推荐内容的推送先处理。

BullMQ 支持优先级。

// 高优先级:密码重置
await queue.add('password-reset', data, { priority: 1 });

// 普通优先级:订单确认
await queue.add('order-confirmation', data, { priority: 5 });

// 低优先级:营销邮件
await queue.add('marketing-email', data, { priority: 10 });

优先级破坏了绝对的顺序,但维护了重要性的顺序。

这是对现实的妥协。并非所有事情都同等重要。在资源有限时,先做重要的事情是理性的选择。

但优先级也有代价。低优先级的任务可能被延迟很久。如果高优先级任务源源不断,低优先级任务可能永远得不到执行。

这需要平衡。是严格按优先级,还是给低优先级任务保底的机会?没有标准答案,取决于你的容忍度。

队列不仅处理现在,也可以处理未来。

你可以指定任务在某个时间点后才执行:

await queue.add('reminder', {
  userId: 123,
  message: '你的会员即将到期'
}, {
  delay: 7 * 24 * 60 * 60 * 1000  // 7天后执行
});

这是对未来的约定。任务现在加入队列,但不立即执行,而是等待指定的时间。

应用场景:

  • 定时提醒
  • 延迟重试
  • 定期任务
  • 试用期到期通知

延迟任务让队列不再局限于"现在",而是扩展到"未来"。

失败与重试

任务可能失败。

网络超时、外部服务不可用、数据格式错误。失败的原因有很多,大部分是暂时性的。

队列需要应对失败。

BullMQ 提供自动重试机制。任务失败后,不会立即丢弃,而是重新加入队列,等待下次尝试。

await queue.add('fetch-data', { url: 'https://api.example.com' }, {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 1000
  }
});

这段配置的含义是:

  • 最多尝试 3 次
  • 第一次失败后,等待 1 秒重试
  • 第二次失败后,等待 2 秒重试
  • 第三次失败后,等待 4 秒重试

指数退避(exponential backoff)是对暂时性失败的应对。如果服务短暂不可用,等待一段时间后可能恢复。如果立即重试,可能继续失败,还会加重对方的负担。

但重试不是万能的。

如果任务连续失败多次,可能不是暂时性问题,而是代码错误或数据异常。这时继续重试没有意义,应该停止,记录错误,转入人工处理。

重试是为了应对不确定性,而不是掩盖问题。

队列的边界

队列可以无限增长吗?

理论上可以,实际上不行。

Redis 的内存是有限的。如果任务添加的速度远超处理速度,队列会不断膨胀,最终耗尽内存。

这时有几种选择:

  1. 拒绝新任务:队列满时,不再接受新任务
  2. 移除旧任务:保留最新的任务,丢弃最旧的任务
  3. 降低优先级:将低优先级任务降级或延迟
  4. 增加处理能力:启动更多 Worker 加快处理

没有完美的策略。每种选择都有代价。

拒绝新任务意味着用户的请求被忽略。移除旧任务意味着有些任务永远不会执行。降低优先级意味着不公平。增加处理能力意味着成本上升。

队列的边界是对资源有限性的再次承认。

你不能无限延迟,不能无限积压。当队列持续增长时,不是队列的问题,而是系统设计的问题。

当任务多次失败后,它会去哪里?

BullMQ 支持死信队列(Dead Letter Queue)。当任务耗尽所有重试机会后,不会被丢弃,而是被移入死信队列。

这是失败的终点站。

死信队列中的任务不会再被自动处理,但可以被人工检查、修复、重新入队。

worker.on('failed', async (job, err) => {
  if (job.attemptsMade >= job.opts.attempts) {
    // 任务已耗尽重试次数
    await deadLetterQueue.add('failed-task', {
      originalJob: job.data,
      error: err.message,
      failedAt: new Date()
    });
  }
});

死信队列的价值在于:它保留了失败的证据。

你可以分析哪些任务经常失败,为什么失败,如何改进。这是系统改进的起点。

没有死信队列,失败的任务消失了,你永远不知道哪里出了问题。

队列不是缓存

队列和缓存经常被混淆,但它们本质不同。

缓存是对读取的优化。数据被读取后,暂存在快速存储中,下次读取时直接返回,避免重复计算。

队列是对写入的延迟。任务被添加后,不立即执行,而是等待稍后处理。

缓存解决的是"如何更快地获取已有的结果"。

队列解决的是"如何在资源有限时,有序地完成所有任务"。

缓存可以丢弃。如果内存不足,可以清空缓存,下次重新计算。

队列不能随意丢弃。队列中的任务代表了用户的期待或系统的责任。丢弃任务意味着违背承诺。

队列是对责任的承接,缓存是对性能的优化。

它们的相似之处只是都使用了 Redis。但用途和设计哲学完全不同。

解耦的意义

队列最重要的作用,不是性能优化,而是解耦。

没有队列时,生产者和消费者必须同时在线。生产者创建任务,消费者立即处理。如果消费者不可用,生产者必须等待或失败。

有了队列,生产者和消费者可以独立运行。

生产者只需要把任务放入队列,不关心谁来处理,何时处理。消费者只需要从队列中取出任务,不关心谁创建了任务,何时创建的。

这种解耦带来的灵活性是巨大的。

你可以随时停止消费者,升级代码,重新启动,任务不会丢失。你可以增加消费者数量,提高处理速度,不需要改动生产者。你可以让不同的消费者处理不同类型的任务,不影响生产者逻辑。

队列是生产者和消费者之间的契约。

契约的内容是:我把任务放在这里,你从这里取走处理。中间发生什么,双方都不关心。

这种松耦合是系统可扩展性的基础。

可见性是控制的前提

队列在后台运行,看不见。

任务有多少?处理速度如何?失败率多高?队列积压了吗?

没有可见性的队列是失控的。

BullMQ 提供了丰富的监控接口。你可以查询队列状态:

const waiting = await queue.getWaitingCount();
const active = await queue.getActiveCount();
const completed = await queue.getCompletedCount();
const failed = await queue.getFailedCount();

console.log(`等待: ${waiting}, 处理中: ${active}, 已完成: ${completed}, 失败: ${failed}`);

这些数字告诉你系统的健康状况。

如果等待的任务持续增长,说明处理速度跟不上。如果失败率突然升高,说明某个环节出问题了。如果活跃任务为零,说明消费者可能挂了。

基于这些指标,你可以设置告警:

  • 队列长度超过 10000,触发告警
  • 失败率超过 5%,触发告警
  • 处理速度低于每秒 10 个,触发告警

可见性让问题在恶化之前被发现。

不是所有问题都需要队列

队列很强大,但不是所有场景都适用。

如果任务很快(几十毫秒),直接同步执行更简单。引入队列会增加复杂度:需要管理队列进程,需要监控队列状态,需要处理异步反馈。

如果任务必须立即返回结果,队列也不合适。用户提交表单,需要立即知道是否成功。这种场景下,延迟处理没有意义。

如果任务之间有严格的顺序依赖,队列也不是最好的选择。任务 A 必须在任务 B 之前完成,否则 B 会失败。这种复杂的依赖关系,用工作流引擎更合适。

不要为了使用队列而使用队列。

队列的价值在于:当同步处理不可行时,它提供了一种优雅的异步方案。

队列即承诺

队列的本质,是一种承诺。

当任务进入队列,系统对外承诺:这个任务会被处理。

这个承诺不是立即兑现的,但一定会兑现。可能是一秒后,可能是一分钟后,可能是一小时后。但只要任务在队列中,就不会被遗忘。

队列是系统的记忆。

它记得每一个未完成的责任,记得每一个待处理的请求。即使服务器重启,即使进程崩溃,队列依然记得。

这种记忆是可靠性的来源。

在一个充满不确定性的世界里,队列提供了一种确定性:你交给我的事情,我会做完。


队列的顺序是否总是有意义的?在什么场景下,无序队列比有序队列更合适?

如果队列中的任务相互依赖,如何设计?是拆分成多个队列,还是在队列内部管理依赖?

队列的持久化有多重要?如果任务可以重新生成,是否还需要持久化?

队列积压时,如何判断是处理能力不足,还是任务添加过快?这两种情况的应对策略有何不同?

队列可以跨服务使用吗?如果多个服务共享一个队列,如何避免冲突和混乱?

如何测试队列的可靠性?模拟崩溃、网络故障、并发冲突,需要哪些测试场景?


参考资源

相关阅读

Related Posts

Articles you might also find interesting

BullMQ Worker

2 min read

Worker 的本质是对时间的重新分配,是对主线的解放,也是对专注的追求

系统设计异步处理
Read More

队列生产者实例的工厂函数

4 min read

工厂函数不是设计模式的炫技,而是对重复的拒绝,对集中管理的追求,对变化的准备

系统设计设计模式
Read More

单例模式管理 Redis 连接

5 min read

连接不是技术细节,而是系统与外部世界的第一次握手,是可靠性的起点

系统设计后端架构
Read More

管理后台需要两次设计

3 min read

第一次设计回答"发生了什么",第二次设计回答"我能做什么"。在第一次就试图解决所有问题,结果是功能很多但都不够深入。

系统设计API 设计
Read More

告警分级与响应时间

2 min read

不是所有问题都需要立即响应。RPC失败会在凌晨3点叫醒人。安全事件每15分钟检查一次。支付成功只记录,不告警。系统的响应时间应该匹配问题的紧急程度。

系统设计监控
Read More

文档标准是成本计算的前提

3 min read

API文档不只是写给开发者看的。它定义了系统的边界、成本结构和可维护性。统一的文档标准让隐性成本变得可见。

API文档
Read More

配置不会自动同步

2 min read

视频生成任务永远pending,代码完美部署,队列正确配置。问题不在代码,在于配置的独立性被低估。静默失败比错误更危险。

部署配置管理
Read More

CRUD 操作

2 min read

四个字母背后,是数据的生命周期,是权限的边界,也是系统设计的基础逻辑

系统设计软件工程
Read More

数据库参数国际化:从 13 个迁移学到的设计原则

3 min read

数据不该懂语言。当数据库参数嵌入中文标签时,系统的边界就被语言限制了。这篇文章从 13 个参数对齐迁移中提炼出设计原则——国际化不是功能,是系统设计的底层约束。

数据库国际化
Read More

Stripe Webhook中的防御性编程

2 min read

三个Bug揭示的真相:假设是代码中最危险的东西。API返回类型、环境配置、变量作用域——每个看似合理的假设都可能导致客户损失。

Web开发系统设计
Read More

双重验证:Stripe生产模式的防御性切换

7 min read

从测试到生产不是更换API keys,而是建立一套双重验证系统。每一步都在两个环境中验证,确保真实支付不会因假设而失败。

系统设计Stripe
Read More

错误隔离

3 min read

失败是必然的。真正的问题不是失败本身,而是失败如何蔓延。错误隔离不是为了消除失败,而是为了控制失败的范围。

系统设计可靠性工程
Read More

在运行的系统上生长新功能

3 min read

扩展不是推倒重来,而是理解边界,找到生长点。管理层作为观察者和调节器,附着在核心系统上,监测它,影响它,但不改变它的运行逻辑。

系统设计架构
Read More

实现幂等性处理,忽略已处理的任务

3 min read

在代码层面识别和忽略已处理的任务,不是简单的布尔检查,而是对时序、并发和状态的深刻理解

系统设计并发控制
Read More

缺失值的级联效应

3 min read

一个NULL值如何在调用链中传播,最终导致错误的错误消息。理解防御层的设计,在失败传播前拦截。

系统设计防御性编程
Read More

监控观察期法

3 min read

部署不是结束,而是验证的开始。修复代码只是假设,监控数据才是证明。48小时观察期:让错误主动暴露,让数据证明修复。

系统设计监控
Read More

Props Drilling

3 min read

数据在组件树中层层传递,每一层都不需要它,却必须经手。这不是技术债,是架构对真实需求的误解。

React组件设计
Read More

监听 Redis 连接事件 - 让不可见的脆弱变得可见

4 min read

连接看起来应该是透明的。但当它断开时,你才意识到透明不等于可靠。监听不是多余,而是对脆弱性的承认。

系统设计Redis
Read More

资源不会消失,只会泄露

2 min read

在积分系统中,用户的钱不会凭空消失,但会因为两个时间窗口而泄露:并发请求之间的竞争,和回调永不到达的沉默。

系统设计并发控制
Read More

RPC函数的原子化处理

1 min read

当一个远程函数做太多事情,失败就变得难以理解

分布式系统RPC
Read More

RPC函数

2 min read

关于远程过程调用的本质思考:当你试图让远方看起来像眼前

分布式系统RPC
Read More

使用Secret Token验证回调请求的合法性

2 min read

在开放的网络中,信任不能被假设。Secret Token 是对身份的确认,对伪装的识别,也是对安全边界的坚守

Web 安全系统设计
Read More

第三方回调的状态映射完整性

5 min read

KIE.AI 视频生成的三个修复揭示了一个本质问题:回调不只是接收结果,是建立状态映射。没有 vendor_task_id,系统就失去了追溯能力。

Purikura 项目系统设计
Read More

指数退避超时 - 防止无限重试循环

3 min read

失败后立即重试是本能。但有些失败,需要时间来消化。指数退避不是逃避失败,而是尊重失败。

系统设计可靠性工程
Read More

MD5 和 API token 外表相似但功能相反

2 min read

API token 看起来像 MD5 哈希值。两者都是字符串。但相似掩盖了本质的分野——一个计算,一个认证。理解这个区别揭示了为什么我们会混淆工具与它们的外表。

安全系统设计
Read More