BullMQ Worker
分离执行,各自专注
Worker 是一个执行者。它不接受用户请求,不返回即时响应,只是默默地处理任务。
在 BullMQ 的架构中,任务的添加和执行是分离的。你向队列中添加任务,然后就可以离开。Worker 会在稍后的某个时刻取出任务,执行它,记录结果。
这种分离带来的最直接好处是:你不必等待。
发送邮件、处理图片、生成报表,这些操作可能需要几秒甚至几分钟。如果用户发起请求后必须等待这些操作完成,体验会很糟糕。但如果你把这些操作交给 Worker,用户可以立即得到反馈,然后继续做其他事情。
💡 Click the maximize icon to view in fullscreen
时间的重新分配
同步执行意味着:你必须现在做,我必须等你做完。
异步执行意味着:我不需要你现在做,你也不需要现在就做完。
这是对时间的重新分配。
用户的时间是宝贵的。他们不应该为系统的耗时操作买单。当用户点击"生成报表",他们期待的不是漫长的等待,而是一个确认:"我已经收到你的请求,会尽快处理。"
服务器的时间也需要合理分配。接受请求的服务器应该快速响应,而不是被耗时操作堵塞。如果一个请求需要处理 30 秒,而服务器同时只能处理 10 个连接,那么第 11 个用户会等待很久。
Worker 的存在,让这两种时间得以解耦。
用户不必等待任务完成,服务器不必被任务堵塞。任务在队列中等待,Worker 在后台慢慢处理。每个角色都专注于自己擅长的事情。
独立性带来的稳定
Worker 是独立的进程。它不依赖于 API 服务器,也不共享内存或状态。
这种独立性很重要。
如果 API 服务器崩溃,Worker 可以继续运行。如果 Worker 出错,API 服务器不受影响。任务在队列中是持久化的,即使整个系统重启,任务也不会丢失。
更重要的是,Worker 可以扩展。
你可以运行一个 Worker,也可以运行十个。当任务积压时,增加 Worker 数量;当任务减少时,缩减 Worker。这种弹性扩展不需要改动代码,只需要启动更多进程。
Worker 的并发不等于线程的并发
一个 Worker 可以配置并发数,决定同时处理多少个任务。但这不是多线程,而是基于 Node.js 的异步模型。
const worker = new Worker('queueName', async (job) => {
// 处理任务
}, {
concurrency: 5 // 同时处理 5 个任务
});
这意味着 Worker 可以在等待 I/O 时处理其他任务,而不是傻傻地等待。对于 I/O 密集型任务(如网络请求、数据库操作),这种并发模型非常高效。
但对于 CPU 密集型任务(如图像处理、视频转码),单个 Worker 的并发提升有限。这时你需要运行多个 Worker 进程,分散到不同的 CPU 核心。
失败是预期的一部分
Worker 处理的任务可能失败。
网络超时、外部 API 不可用、数据格式错误,失败的原因有很多。在同步执行中,失败意味着用户看到错误。在异步执行中,失败不应该直接暴露给用户。
BullMQ 提供了重试机制。任务失败后,可以自动重试。你可以配置重试次数、重试延迟、重试策略。
await queue.add('send-email', { to: 'user@example.com' }, {
attempts: 3, // 最多重试 3 次
backoff: {
type: 'exponential', // 指数退避
delay: 1000 // 初始延迟 1 秒
}
});
这不是为了掩盖问题,而是为了应对暂时性的失败。网络抖动、服务重启、资源暂时不可用,这些都是可以恢复的。通过重试,系统可以自愈。
但重试也有边界。如果任务连续失败多次,可能是代码有 bug,或者数据本身有问题。这时应该停止重试,记录错误,转入人工处理。
Worker 的设计假设失败是常态,而不是异常。
优先级与顺序
不是所有任务都同等重要。
发送密码重置邮件应该比发送营销邮件优先。生成用户报表应该比统计分析优先。关键业务操作应该比日志归档优先。
BullMQ 支持优先级队列。你可以为任务设置优先级,高优先级的任务会先被处理。
await queue.add('critical-task', data, { priority: 1 });
await queue.add('normal-task', data, { priority: 5 });
await queue.add('low-priority', data, { priority: 10 });
数字越小,优先级越高。
这让你可以在资源有限时,先处理最重要的事情。
但优先级也有代价。低优先级的任务可能被长期延迟,甚至"饿死"。你需要权衡:是严格按优先级处理,还是给低优先级任务保底的执行机会?
这没有标准答案。取决于你的业务场景和容忍度。
监控与可见性
Worker 在后台运行,看不见也摸不着。
这带来一个问题:你怎么知道它在正常工作?
任务堆积了吗?Worker 挂了吗?处理速度够快吗?错误率多高?
没有可见性的系统是不可控的。
BullMQ 提供了丰富的事件机制。你可以监听任务的生命周期:入队、开始、进度、完成、失败。基于这些事件,你可以构建监控系统。
worker.on('completed', (job) => {
console.log(`任务 ${job.id} 完成`);
});
worker.on('failed', (job, err) => {
console.error(`任务 ${job.id} 失败:`, err);
});
worker.on('progress', (job, progress) => {
console.log(`任务 ${job.id} 进度: ${progress}%`);
});
更进一步,你可以集成 Prometheus、Grafana 等监控工具,实时追踪队列长度、处理速度、错误率、平均延迟。
当队列积压时,你会收到告警。当 Worker 崩溃时,你会知道。当处理速度下降时,你可以调查原因。
可见性是控制的前提。
Worker 是长期运行的进程,如果代码中存在内存泄漏,影响会被放大。
常见的泄漏来源:
- 未清理的定时器
- 闭包中保留的大对象引用
- 全局变量的累积
- 事件监听器未移除
定期监控 Worker 的内存使用。如果发现内存持续增长,需要排查泄漏点。
可以配置 Worker 在处理一定数量任务后自动重启,作为临时缓解措施:
const worker = new Worker('queueName', processor, {
maxJobsPerWorker: 1000 // 处理 1000 个任务后重启
});
但这不是根本解决方案。找到并修复内存泄漏才是正道。
不是银弹
Worker 解决了很多问题,但也带来了复杂性。
你需要管理额外的进程。你需要处理任务的序列化和反序列化。你需要设计重试策略。你需要监控队列状态。
更重要的是,异步处理意味着延迟。用户提交任务后,不能立即看到结果。他们需要等待,或者通过其他方式(如通知、轮询)获知任务状态。
这种延迟在某些场景下是可接受的,甚至是必要的。但在另一些场景下,用户需要即时反馈。
不要为了使用 Worker 而使用 Worker。
如果一个操作足够快(比如几十毫秒),直接同步执行可能更简单。如果用户必须等待结果才能继续,异步处理反而增加了复杂度。
Worker 的价值在于:当同步执行不可行时,它提供了一种优雅的替代方案。
专注带来的力量
Worker 的设计哲学可以总结为:分离、专注、独立。
分离意味着不同的职责由不同的组件承担。API 服务器负责快速响应,Worker 负责耗时处理。
专注意味着每个组件只做自己擅长的事。API 服务器不被任务堵塞,Worker 不关心用户交互。
独立意味着组件之间松耦合。一个崩溃不影响另一个,一个扩展不需要改动另一个。
这种架构不是为了炫技,而是为了应对现实的复杂性。
系统会变得越来越复杂。功能会越来越多,流量会越来越大,需求会越来越多样化。如果所有逻辑都挤在一起,最终会不堪重负。
分离是为了可持续。专注是为了可靠。独立是为了灵活。
在一个充满变化的世界里,能够让每个部分各司其职,本身就是一种稳定。
Worker 的并发模型适合所有类型的任务吗?在什么情况下,多进程模型比单进程并发更合适?
如何设计 Worker 的生命周期管理?优雅关闭、任务迁移、滚动更新,这些场景如何处理?
队列积压时,是增加 Worker 数量,还是优化任务处理逻辑?这两种策略的适用场景是什么?
如何测试 Worker 的可靠性?模拟失败、重试、并发冲突,需要哪些测试策略?
异步处理引入了最终一致性。如何向用户解释"任务已提交,正在处理"这种状态?
Worker 模式是否可以应用到前端?Service Worker、Web Worker 与后端 Worker 有什么异同?
参考资源
相关阅读
- 队列、可靠性与系统边界 - 理解队列系统的权衡与设计选择
- BullMQ 队列 - 理解任务的来源与顺序管理
- 错误隔离 - 如何防止失败蔓延
Related Posts
Articles you might also find interesting
BullMQ 队列
队列不是技术选型,而是对时间的承认,对顺序的尊重,对不确定性的应对
队列生产者实例的工厂函数
工厂函数不是设计模式的炫技,而是对重复的拒绝,对集中管理的追求,对变化的准备
单例模式管理 Redis 连接
连接不是技术细节,而是系统与外部世界的第一次握手,是可靠性的起点
管理后台需要两次设计
第一次设计回答"发生了什么",第二次设计回答"我能做什么"。在第一次就试图解决所有问题,结果是功能很多但都不够深入。
告警分级与响应时间
不是所有问题都需要立即响应。RPC失败会在凌晨3点叫醒人。安全事件每15分钟检查一次。支付成功只记录,不告警。系统的响应时间应该匹配问题的紧急程度。
文档标准是成本计算的前提
API文档不只是写给开发者看的。它定义了系统的边界、成本结构和可维护性。统一的文档标准让隐性成本变得可见。
配置不会自动同步
视频生成任务永远pending,代码完美部署,队列正确配置。问题不在代码,在于配置的独立性被低估。静默失败比错误更危险。
CRUD 操作
四个字母背后,是数据的生命周期,是权限的边界,也是系统设计的基础逻辑
数据库参数国际化:从 13 个迁移学到的设计原则
数据不该懂语言。当数据库参数嵌入中文标签时,系统的边界就被语言限制了。这篇文章从 13 个参数对齐迁移中提炼出设计原则——国际化不是功能,是系统设计的底层约束。
Stripe Webhook中的防御性编程
三个Bug揭示的真相:假设是代码中最危险的东西。API返回类型、环境配置、变量作用域——每个看似合理的假设都可能导致客户损失。
双重验证:Stripe生产模式的防御性切换
从测试到生产不是更换API keys,而是建立一套双重验证系统。每一步都在两个环境中验证,确保真实支付不会因假设而失败。
错误隔离
失败是必然的。真正的问题不是失败本身,而是失败如何蔓延。错误隔离不是为了消除失败,而是为了控制失败的范围。
在运行的系统上生长新功能
扩展不是推倒重来,而是理解边界,找到生长点。管理层作为观察者和调节器,附着在核心系统上,监测它,影响它,但不改变它的运行逻辑。
实现幂等性处理,忽略已处理的任务
在代码层面识别和忽略已处理的任务,不是简单的布尔检查,而是对时序、并发和状态的深刻理解
缺失值的级联效应
一个NULL值如何在调用链中传播,最终导致错误的错误消息。理解防御层的设计,在失败传播前拦截。
监控观察期法
部署不是结束,而是验证的开始。修复代码只是假设,监控数据才是证明。48小时观察期:让错误主动暴露,让数据证明修复。
Props Drilling
数据在组件树中层层传递,每一层都不需要它,却必须经手。这不是技术债,是架构对真实需求的误解。
监听 Redis 连接事件 - 让不可见的脆弱变得可见
连接看起来应该是透明的。但当它断开时,你才意识到透明不等于可靠。监听不是多余,而是对脆弱性的承认。
资源不会消失,只会泄露
在积分系统中,用户的钱不会凭空消失,但会因为两个时间窗口而泄露:并发请求之间的竞争,和回调永不到达的沉默。
RPC函数的原子化处理
当一个远程函数做太多事情,失败就变得难以理解
RPC函数
关于远程过程调用的本质思考:当你试图让远方看起来像眼前
使用Secret Token验证回调请求的合法性
在开放的网络中,信任不能被假设。Secret Token 是对身份的确认,对伪装的识别,也是对安全边界的坚守
第三方回调的状态映射完整性
KIE.AI 视频生成的三个修复揭示了一个本质问题:回调不只是接收结果,是建立状态映射。没有 vendor_task_id,系统就失去了追溯能力。
指数退避超时 - 防止无限重试循环
失败后立即重试是本能。但有些失败,需要时间来消化。指数退避不是逃避失败,而是尊重失败。
MD5 和 API token 外表相似但功能相反
API token 看起来像 MD5 哈希值。两者都是字符串。但相似掩盖了本质的分野——一个计算,一个认证。理解这个区别揭示了为什么我们会混淆工具与它们的外表。