队列生产者实例的工厂函数
重复是技术债的起点
当你第一次创建队列时,代码很简单:
const emailQueue = new Queue('email', {
connection: {
host: 'localhost',
port: 6379,
}
});
然后你需要创建第二个队列,用于处理图片:
const imageQueue = new Queue('image', {
connection: {
host: 'localhost',
port: 6379,
}
});
第三个,用于生成报表:
const reportQueue = new Queue('report', {
connection: {
host: 'localhost',
port: 6379,
}
});
连接配置重复了三次。
这时你还不觉得是问题。直到有一天,Redis 的地址变了。你需要在代码里找到所有创建队列的地方,一个一个改。如果漏掉一个,系统部分功能失效。
重复不仅是代码的浪费,更是维护的陷阱。
💡 Click the maximize icon to view in fullscreen
工厂函数是集中管理的入口
工厂函数的第一个价值是:把重复的配置收拢到一个地方。
// queue-factory.ts
import { Queue } from 'bullmq';
const defaultConnection = {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
};
export function createQueue(name: string) {
return new Queue(name, {
connection: defaultConnection,
});
}
现在,所有队列的创建都通过工厂函数:
const emailQueue = createQueue('email');
const imageQueue = createQueue('image');
const reportQueue = createQueue('report');
Redis 地址变了?改环境变量。需要加密连接?在工厂函数里统一配置。需要添加监控?在工厂函数里注入。
一处修改,全局生效。
这不是优雅的追求,而是对未来变化的准备。系统总会变,集中管理让变化的成本降低。
单例不是答案
你可能会想:既然队列连接相同,为什么不共享一个实例?
// ❌ 这样做看似节省资源,但有隐患
let emailQueue: Queue | null = null;
export function getEmailQueue() {
if (!emailQueue) {
emailQueue = new Queue('email', { connection: defaultConnection });
}
return emailQueue;
}
这是单例模式。看起来合理:每个队列只创建一次,节省资源,避免重复连接。
但问题在于:单例引入了全局状态。
全局状态难以测试。当你在测试环境中运行测试时,上一个测试用例可能已经初始化了队列实例。你无法为每个测试创建独立的队列,测试之间会相互影响。
全局状态难以清理。当你想关闭队列连接时,你需要找到所有可能持有队列引用的地方。如果某个模块还在使用,关闭会导致错误。
单例是一种强耦合。
工厂函数不应该决定"是否共享实例",它只负责创建实例。至于是每次创建新实例,还是在调用方缓存实例,应该由调用方决定。
单例看似简单,实则隐患重重
在服务端应用中,单例意味着:
- 测试困难:无法为每个测试创建隔离环境
- 状态污染:一个模块的操作可能影响其他模块
- 内存泄漏风险:实例永不释放,即使不再需要
- 并发冲突:多个地方同时访问同一实例,可能产生竞态条件
更好的做法是:工厂函数返回新实例,由调用方决定是否缓存。
// ✅ 更好的方式
class QueueManager {
private queues = new Map<string, Queue>();
getQueue(name: string): Queue {
if (!this.queues.has(name)) {
this.queues.set(name, createQueue(name));
}
return this.queues.get(name)!;
}
async closeAll() {
for (const queue of this.queues.values()) {
await queue.close();
}
this.queues.clear();
}
}
这样既可以复用实例,又可以在需要时清理,还便于测试。
类型安全是边界的保护
工厂函数不仅是代码复用,更是类型约束的入口。
在 TypeScript 中,你可以为工厂函数添加类型定义:
import { Queue, QueueOptions } from 'bullmq';
export type QueueName = 'email' | 'image' | 'report';
export function createQueue<T = any>(
name: QueueName,
options?: Partial<QueueOptions>
): Queue<T> {
return new Queue<T>(name, {
connection: defaultConnection,
...options,
});
}
这样做有两个好处:
1. 队列名称被限制在有效范围内
const validQueue = createQueue('email'); // ✅ 通过
const invalidQueue = createQueue('unknown'); // ❌ 类型错误
拼写错误、无效队列名,在编译时就被捕获,而不是等到运行时才报错。
2. 任务数据可以有类型约束
interface EmailJob {
to: string;
subject: string;
body: string;
}
const emailQueue = createQueue<EmailJob>('email');
await emailQueue.add('send', {
to: 'user@example.com',
subject: 'Welcome',
body: 'Hello!',
});
// ❌ 类型错误:缺少必需字段
await emailQueue.add('send', {
to: 'user@example.com',
});
类型系统在编译时就告诉你:这个任务数据不完整。
**类型不是束缚,而是边界的保护。**它让错误在最早的时刻被发现,而不是等到生产环境才暴露。
依赖注入让测试成为可能
工厂函数的另一个价值是:它可以被替换。
在生产环境中,你使用真实的 Redis 连接。但在测试环境中,你不想依赖外部服务。你希望队列操作是可预测的、快速的、隔离的。
如果队列的创建散落在各处,你无法控制它们的行为。但如果所有队列都通过工厂函数创建,你就可以在测试时注入不同的实现。
// test-helpers.ts
import { Queue } from 'bullmq';
export function createTestQueue(name: string) {
return new Queue(name, {
connection: {
host: 'localhost',
port: 6380, // 测试用的独立 Redis 实例
},
});
}
或者,使用内存队列:
import { Queue } from 'bullmq';
import IORedis from 'ioredis-mock';
export function createMockQueue(name: string) {
const connection = new IORedis();
return new Queue(name, { connection });
}
工厂函数是依赖的抽象层。
它让你可以在不改变业务代码的前提下,替换底层实现。这是可测试性的基础。
工厂模式本身就是依赖注入的一种形式
依赖注入的核心思想是:不要在代码内部直接创建依赖,而是从外部传入。
工厂函数实现了这一点:
// ❌ 硬编码依赖,难以测试
class EmailService {
private queue = new Queue('email', { /* ... */ });
async send(email: EmailJob) {
await this.queue.add('send', email);
}
}
// ✅ 通过工厂函数注入依赖
class EmailService {
constructor(private queueFactory: QueueFactory) {}
async send(email: EmailJob) {
const queue = this.queueFactory.create('email');
await queue.add('send', email);
}
}
测试时,你可以注入 mock 工厂:
const mockFactory = {
create: jest.fn().mockReturnValue(mockQueue),
};
const service = new EmailService(mockFactory);
这让单元测试不再依赖真实的 Redis,测试更快、更稳定。
延迟初始化是对资源的尊重
并非所有队列都需要立即创建。
如果你在应用启动时创建了所有队列,即使有些队列在当前场景下不会被用到,它们也占用了连接资源。
工厂函数支持延迟初始化:只在需要时才创建队列。
class QueueFactory {
private queues = new Map<QueueName, Queue>();
get(name: QueueName): Queue {
if (!this.queues.has(name)) {
this.queues.set(name, this.create(name));
}
return this.queues.get(name)!;
}
private create(name: QueueName): Queue {
return new Queue(name, {
connection: defaultConnection,
});
}
}
现在,队列只在第一次被使用时创建。如果某个功能模块在运行时从未被触发,对应的队列也不会被创建。
这是对资源的尊重。
不浪费连接,不占用内存,只在需要时才分配。这在资源受限的环境中尤其重要。
清理是生命周期的终点
创建容易,清理难。
当应用关闭时,你需要关闭所有队列连接。如果队列实例散落在各处,你无法追踪哪些需要关闭。
工厂函数可以记录所有创建的实例,并提供统一的清理接口:
class QueueFactory {
private queues = new Map<QueueName, Queue>();
get(name: QueueName): Queue {
// ... 创建逻辑
}
async closeAll(): Promise<void> {
const closingPromises = Array.from(this.queues.values()).map(
queue => queue.close()
);
await Promise.all(closingPromises);
this.queues.clear();
}
}
在应用关闭时,调用 closeAll() 即可:
process.on('SIGTERM', async () => {
await queueFactory.closeAll();
process.exit(0);
});
工厂不仅管理创建,也管理销毁。
这是完整生命周期的体现。创建和销毁是对称的,工厂函数让这种对称性得以实现。
💡 Click the maximize icon to view in fullscreen
配置的分层
不是所有队列都需要相同的配置。
邮件队列可能需要较低的并发,因为外部 SMTP 服务有速率限制。图片处理队列可能需要较高的并发,因为处理是在本地进行的。
工厂函数可以支持配置覆盖:
export function createQueue<T = any>(
name: QueueName,
options?: Partial<QueueOptions>
): Queue<T> {
return new Queue<T>(name, {
connection: defaultConnection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
},
...options, // 调用方可以覆盖默认配置
});
}
使用时:
// 使用默认配置
const emailQueue = createQueue('email');
// 覆盖部分配置
const imageQueue = createQueue('image', {
defaultJobOptions: {
attempts: 5, // 图片处理可以多重试几次
},
});
默认配置是通用的,覆盖配置是特殊的。
这种分层让配置既有一致性,又有灵活性。大部分队列遵循默认规则,特殊场景可以定制。
工厂不是银弹
工厂函数解决了很多问题,但也引入了抽象层。
多一层抽象,就多一层间接。阅读代码时,你需要跳转到工厂函数才能看到队列是如何创建的。调试时,你需要理解工厂的逻辑,而不仅仅是队列本身。
如果你的应用只有一两个队列,工厂函数可能是过度设计。直接创建队列,代码更直观。
但当队列数量增长,当配置开始重复,当测试变得困难,工厂函数的价值就显现了。
不要为了模式而模式。
工厂函数的价值不在于它是一个"设计模式",而在于它解决了实际问题:重复配置、难以测试、资源管理、类型安全。
如果这些问题不存在,工厂函数就是多余的。如果这些问题存在,工厂函数就是必要的。
抽象即边界
工厂函数的本质,是在"使用队列"和"创建队列"之间划了一条线。
使用队列的代码不关心连接配置、不关心实例化逻辑、不关心资源管理。它只需要知道:我需要一个队列,给我即可。
创建队列的逻辑集中在工厂函数中。配置在这里,默认选项在这里,清理逻辑也在这里。
这是职责的分离。
使用者专注于业务逻辑,工厂专注于实例管理。这种分离不是为了炫技,而是为了让系统更清晰、更易维护。
当系统变得复杂时,清晰的边界是唯一的防线。没有边界,所有逻辑混在一起,最终会变得难以理解、难以修改、难以测试。
工厂函数划定了一个边界,让创建和使用各归其位。
在一个充满变化的世界里,边界即秩序。
工厂函数应该是函数还是类?函数式编程和面向对象编程在这里有何取舍?
如果队列需要根据环境动态选择实现(开发环境用内存队列,生产环境用 Redis),工厂函数如何设计?
工厂函数可以嵌套吗?比如有一个"任务工厂"创建不同类型的任务,它内部使用"队列工厂"获取队列实例?
如何为工厂函数添加可观测性?创建了多少实例、哪些队列被使用、资源占用情况,这些信息如何收集和暴露?
工厂模式在微服务架构中如何应用?不同服务是否应该共享工厂逻辑,还是各自实现?
参考资源
Related Posts
Articles you might also find interesting
BullMQ 队列
队列不是技术选型,而是对时间的承认,对顺序的尊重,对不确定性的应对
BullMQ Worker
Worker 的本质是对时间的重新分配,是对主线的解放,也是对专注的追求
单例模式管理 Redis 连接
连接不是技术细节,而是系统与外部世界的第一次握手,是可靠性的起点
适配器模式:对现实的妥协
当 PayPro 要求 IP 白名单而 Stripe 不需要,当一个按秒计费另一个按请求计费,架构设计不是消除约束——而是管理约束。适配器模式不是优雅设计,而是对现实混乱的务实投降。
管理后台需要两次设计
第一次设计回答"发生了什么",第二次设计回答"我能做什么"。在第一次就试图解决所有问题,结果是功能很多但都不够深入。
告警分级与响应时间
不是所有问题都需要立即响应。RPC失败会在凌晨3点叫醒人。安全事件每15分钟检查一次。支付成功只记录,不告警。系统的响应时间应该匹配问题的紧急程度。
文档标准是成本计算的前提
API文档不只是写给开发者看的。它定义了系统的边界、成本结构和可维护性。统一的文档标准让隐性成本变得可见。
配置不会自动同步
视频生成任务永远pending,代码完美部署,队列正确配置。问题不在代码,在于配置的独立性被低估。静默失败比错误更危险。
CRUD 操作
四个字母背后,是数据的生命周期,是权限的边界,也是系统设计的基础逻辑
数据库参数国际化:从 13 个迁移学到的设计原则
数据不该懂语言。当数据库参数嵌入中文标签时,系统的边界就被语言限制了。这篇文章从 13 个参数对齐迁移中提炼出设计原则——国际化不是功能,是系统设计的底层约束。
Stripe Webhook中的防御性编程
三个Bug揭示的真相:假设是代码中最危险的东西。API返回类型、环境配置、变量作用域——每个看似合理的假设都可能导致客户损失。
双重验证:Stripe生产模式的防御性切换
从测试到生产不是更换API keys,而是建立一套双重验证系统。每一步都在两个环境中验证,确保真实支付不会因假设而失败。
错误隔离
失败是必然的。真正的问题不是失败本身,而是失败如何蔓延。错误隔离不是为了消除失败,而是为了控制失败的范围。
在运行的系统上生长新功能
扩展不是推倒重来,而是理解边界,找到生长点。管理层作为观察者和调节器,附着在核心系统上,监测它,影响它,但不改变它的运行逻辑。
实现幂等性处理,忽略已处理的任务
在代码层面识别和忽略已处理的任务,不是简单的布尔检查,而是对时序、并发和状态的深刻理解
引入懒加载模式
懒加载不是优化技巧,而是关于时机的选择。何时创建,决定了系统的效率和复杂度。
缺失值的级联效应
一个NULL值如何在调用链中传播,最终导致错误的错误消息。理解防御层的设计,在失败传播前拦截。
监控观察期法
部署不是结束,而是验证的开始。修复代码只是假设,监控数据才是证明。48小时观察期:让错误主动暴露,让数据证明修复。
Props Drilling
数据在组件树中层层传递,每一层都不需要它,却必须经手。这不是技术债,是架构对真实需求的误解。
监听 Redis 连接事件 - 让不可见的脆弱变得可见
连接看起来应该是透明的。但当它断开时,你才意识到透明不等于可靠。监听不是多余,而是对脆弱性的承认。
资源不会消失,只会泄露
在积分系统中,用户的钱不会凭空消失,但会因为两个时间窗口而泄露:并发请求之间的竞争,和回调永不到达的沉默。
RPC函数的原子化处理
当一个远程函数做太多事情,失败就变得难以理解
RPC函数
关于远程过程调用的本质思考:当你试图让远方看起来像眼前
使用Secret Token验证回调请求的合法性
在开放的网络中,信任不能被假设。Secret Token 是对身份的确认,对伪装的识别,也是对安全边界的坚守
第三方回调的状态映射完整性
KIE.AI 视频生成的三个修复揭示了一个本质问题:回调不只是接收结果,是建立状态映射。没有 vendor_task_id,系统就失去了追溯能力。