队列生产者实例的工厂函数

4 min read
Zekari
系统设计设计模式后端架构消息队列

重复是技术债的起点

当你第一次创建队列时,代码很简单:

const emailQueue = new Queue('email', {
  connection: {
    host: 'localhost',
    port: 6379,
  }
});

然后你需要创建第二个队列,用于处理图片:

const imageQueue = new Queue('image', {
  connection: {
    host: 'localhost',
    port: 6379,
  }
});

第三个,用于生成报表:

const reportQueue = new Queue('report', {
  connection: {
    host: 'localhost',
    port: 6379,
  }
});

连接配置重复了三次。

这时你还不觉得是问题。直到有一天,Redis 的地址变了。你需要在代码里找到所有创建队列的地方,一个一个改。如果漏掉一个,系统部分功能失效。

重复不仅是代码的浪费,更是维护的陷阱。

💡 Click the maximize icon to view in fullscreen

工厂函数是集中管理的入口

工厂函数的第一个价值是:把重复的配置收拢到一个地方。

// queue-factory.ts
import { Queue } from 'bullmq';

const defaultConnection = {
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379'),
};

export function createQueue(name: string) {
  return new Queue(name, {
    connection: defaultConnection,
  });
}

现在,所有队列的创建都通过工厂函数:

const emailQueue = createQueue('email');
const imageQueue = createQueue('image');
const reportQueue = createQueue('report');

Redis 地址变了?改环境变量。需要加密连接?在工厂函数里统一配置。需要添加监控?在工厂函数里注入。

一处修改,全局生效。

这不是优雅的追求,而是对未来变化的准备。系统总会变,集中管理让变化的成本降低。

单例不是答案

你可能会想:既然队列连接相同,为什么不共享一个实例?

// ❌ 这样做看似节省资源,但有隐患
let emailQueue: Queue | null = null;

export function getEmailQueue() {
  if (!emailQueue) {
    emailQueue = new Queue('email', { connection: defaultConnection });
  }
  return emailQueue;
}

这是单例模式。看起来合理:每个队列只创建一次,节省资源,避免重复连接。

但问题在于:单例引入了全局状态。

全局状态难以测试。当你在测试环境中运行测试时,上一个测试用例可能已经初始化了队列实例。你无法为每个测试创建独立的队列,测试之间会相互影响。

全局状态难以清理。当你想关闭队列连接时,你需要找到所有可能持有队列引用的地方。如果某个模块还在使用,关闭会导致错误。

单例是一种强耦合。

工厂函数不应该决定"是否共享实例",它只负责创建实例。至于是每次创建新实例,还是在调用方缓存实例,应该由调用方决定。

单例看似简单,实则隐患重重

在服务端应用中,单例意味着:

  • 测试困难:无法为每个测试创建隔离环境
  • 状态污染:一个模块的操作可能影响其他模块
  • 内存泄漏风险:实例永不释放,即使不再需要
  • 并发冲突:多个地方同时访问同一实例,可能产生竞态条件

更好的做法是:工厂函数返回新实例,由调用方决定是否缓存。

// ✅ 更好的方式
class QueueManager {
  private queues = new Map<string, Queue>();

  getQueue(name: string): Queue {
    if (!this.queues.has(name)) {
      this.queues.set(name, createQueue(name));
    }
    return this.queues.get(name)!;
  }

  async closeAll() {
    for (const queue of this.queues.values()) {
      await queue.close();
    }
    this.queues.clear();
  }
}

这样既可以复用实例,又可以在需要时清理,还便于测试。

类型安全是边界的保护

工厂函数不仅是代码复用,更是类型约束的入口。

在 TypeScript 中,你可以为工厂函数添加类型定义:

import { Queue, QueueOptions } from 'bullmq';

export type QueueName = 'email' | 'image' | 'report';

export function createQueue<T = any>(
  name: QueueName,
  options?: Partial<QueueOptions>
): Queue<T> {
  return new Queue<T>(name, {
    connection: defaultConnection,
    ...options,
  });
}

这样做有两个好处:

1. 队列名称被限制在有效范围内

const validQueue = createQueue('email');     // ✅ 通过
const invalidQueue = createQueue('unknown'); // ❌ 类型错误

拼写错误、无效队列名,在编译时就被捕获,而不是等到运行时才报错。

2. 任务数据可以有类型约束

interface EmailJob {
  to: string;
  subject: string;
  body: string;
}

const emailQueue = createQueue<EmailJob>('email');

await emailQueue.add('send', {
  to: 'user@example.com',
  subject: 'Welcome',
  body: 'Hello!',
});

// ❌ 类型错误:缺少必需字段
await emailQueue.add('send', {
  to: 'user@example.com',
});

类型系统在编译时就告诉你:这个任务数据不完整。

**类型不是束缚,而是边界的保护。**它让错误在最早的时刻被发现,而不是等到生产环境才暴露。

依赖注入让测试成为可能

工厂函数的另一个价值是:它可以被替换。

在生产环境中,你使用真实的 Redis 连接。但在测试环境中,你不想依赖外部服务。你希望队列操作是可预测的、快速的、隔离的。

如果队列的创建散落在各处,你无法控制它们的行为。但如果所有队列都通过工厂函数创建,你就可以在测试时注入不同的实现。

// test-helpers.ts
import { Queue } from 'bullmq';

export function createTestQueue(name: string) {
  return new Queue(name, {
    connection: {
      host: 'localhost',
      port: 6380,  // 测试用的独立 Redis 实例
    },
  });
}

或者,使用内存队列:

import { Queue } from 'bullmq';
import IORedis from 'ioredis-mock';

export function createMockQueue(name: string) {
  const connection = new IORedis();
  return new Queue(name, { connection });
}

工厂函数是依赖的抽象层。

它让你可以在不改变业务代码的前提下,替换底层实现。这是可测试性的基础。

工厂模式本身就是依赖注入的一种形式

依赖注入的核心思想是:不要在代码内部直接创建依赖,而是从外部传入。

工厂函数实现了这一点:

// ❌ 硬编码依赖,难以测试
class EmailService {
  private queue = new Queue('email', { /* ... */ });

  async send(email: EmailJob) {
    await this.queue.add('send', email);
  }
}

// ✅ 通过工厂函数注入依赖
class EmailService {
  constructor(private queueFactory: QueueFactory) {}

  async send(email: EmailJob) {
    const queue = this.queueFactory.create('email');
    await queue.add('send', email);
  }
}

测试时,你可以注入 mock 工厂:

const mockFactory = {
  create: jest.fn().mockReturnValue(mockQueue),
};

const service = new EmailService(mockFactory);

这让单元测试不再依赖真实的 Redis,测试更快、更稳定。

延迟初始化是对资源的尊重

并非所有队列都需要立即创建。

如果你在应用启动时创建了所有队列,即使有些队列在当前场景下不会被用到,它们也占用了连接资源。

工厂函数支持延迟初始化:只在需要时才创建队列。

class QueueFactory {
  private queues = new Map<QueueName, Queue>();

  get(name: QueueName): Queue {
    if (!this.queues.has(name)) {
      this.queues.set(name, this.create(name));
    }
    return this.queues.get(name)!;
  }

  private create(name: QueueName): Queue {
    return new Queue(name, {
      connection: defaultConnection,
    });
  }
}

现在,队列只在第一次被使用时创建。如果某个功能模块在运行时从未被触发,对应的队列也不会被创建。

这是对资源的尊重。

不浪费连接,不占用内存,只在需要时才分配。这在资源受限的环境中尤其重要。

清理是生命周期的终点

创建容易,清理难。

当应用关闭时,你需要关闭所有队列连接。如果队列实例散落在各处,你无法追踪哪些需要关闭。

工厂函数可以记录所有创建的实例,并提供统一的清理接口:

class QueueFactory {
  private queues = new Map<QueueName, Queue>();

  get(name: QueueName): Queue {
    // ... 创建逻辑
  }

  async closeAll(): Promise<void> {
    const closingPromises = Array.from(this.queues.values()).map(
      queue => queue.close()
    );
    await Promise.all(closingPromises);
    this.queues.clear();
  }
}

在应用关闭时,调用 closeAll() 即可:

process.on('SIGTERM', async () => {
  await queueFactory.closeAll();
  process.exit(0);
});

工厂不仅管理创建,也管理销毁。

这是完整生命周期的体现。创建和销毁是对称的,工厂函数让这种对称性得以实现。

💡 Click the maximize icon to view in fullscreen

配置的分层

不是所有队列都需要相同的配置。

邮件队列可能需要较低的并发,因为外部 SMTP 服务有速率限制。图片处理队列可能需要较高的并发,因为处理是在本地进行的。

工厂函数可以支持配置覆盖:

export function createQueue<T = any>(
  name: QueueName,
  options?: Partial<QueueOptions>
): Queue<T> {
  return new Queue<T>(name, {
    connection: defaultConnection,
    defaultJobOptions: {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 1000,
      },
    },
    ...options,  // 调用方可以覆盖默认配置
  });
}

使用时:

// 使用默认配置
const emailQueue = createQueue('email');

// 覆盖部分配置
const imageQueue = createQueue('image', {
  defaultJobOptions: {
    attempts: 5,  // 图片处理可以多重试几次
  },
});

默认配置是通用的,覆盖配置是特殊的。

这种分层让配置既有一致性,又有灵活性。大部分队列遵循默认规则,特殊场景可以定制。

工厂不是银弹

工厂函数解决了很多问题,但也引入了抽象层。

多一层抽象,就多一层间接。阅读代码时,你需要跳转到工厂函数才能看到队列是如何创建的。调试时,你需要理解工厂的逻辑,而不仅仅是队列本身。

如果你的应用只有一两个队列,工厂函数可能是过度设计。直接创建队列,代码更直观。

但当队列数量增长,当配置开始重复,当测试变得困难,工厂函数的价值就显现了。

不要为了模式而模式。

工厂函数的价值不在于它是一个"设计模式",而在于它解决了实际问题:重复配置、难以测试、资源管理、类型安全。

如果这些问题不存在,工厂函数就是多余的。如果这些问题存在,工厂函数就是必要的。

抽象即边界

工厂函数的本质,是在"使用队列"和"创建队列"之间划了一条线。

使用队列的代码不关心连接配置、不关心实例化逻辑、不关心资源管理。它只需要知道:我需要一个队列,给我即可。

创建队列的逻辑集中在工厂函数中。配置在这里,默认选项在这里,清理逻辑也在这里。

这是职责的分离。

使用者专注于业务逻辑,工厂专注于实例管理。这种分离不是为了炫技,而是为了让系统更清晰、更易维护。

当系统变得复杂时,清晰的边界是唯一的防线。没有边界,所有逻辑混在一起,最终会变得难以理解、难以修改、难以测试。

工厂函数划定了一个边界,让创建和使用各归其位。

在一个充满变化的世界里,边界即秩序。


工厂函数应该是函数还是类?函数式编程和面向对象编程在这里有何取舍?

如果队列需要根据环境动态选择实现(开发环境用内存队列,生产环境用 Redis),工厂函数如何设计?

工厂函数可以嵌套吗?比如有一个"任务工厂"创建不同类型的任务,它内部使用"队列工厂"获取队列实例?

如何为工厂函数添加可观测性?创建了多少实例、哪些队列被使用、资源占用情况,这些信息如何收集和暴露?

工厂模式在微服务架构中如何应用?不同服务是否应该共享工厂逻辑,还是各自实现?


参考资源

Related Posts

Articles you might also find interesting

BullMQ 队列

3 min read

队列不是技术选型,而是对时间的承认,对顺序的尊重,对不确定性的应对

系统设计异步处理
Read More

BullMQ Worker

2 min read

Worker 的本质是对时间的重新分配,是对主线的解放,也是对专注的追求

系统设计异步处理
Read More

单例模式管理 Redis 连接

5 min read

连接不是技术细节,而是系统与外部世界的第一次握手,是可靠性的起点

系统设计后端架构
Read More

适配器模式:对现实的妥协

4 min read

当 PayPro 要求 IP 白名单而 Stripe 不需要,当一个按秒计费另一个按请求计费,架构设计不是消除约束——而是管理约束。适配器模式不是优雅设计,而是对现实混乱的务实投降。

系统架构支付集成
Read More

管理后台需要两次设计

3 min read

第一次设计回答"发生了什么",第二次设计回答"我能做什么"。在第一次就试图解决所有问题,结果是功能很多但都不够深入。

系统设计API 设计
Read More

告警分级与响应时间

2 min read

不是所有问题都需要立即响应。RPC失败会在凌晨3点叫醒人。安全事件每15分钟检查一次。支付成功只记录,不告警。系统的响应时间应该匹配问题的紧急程度。

系统设计监控
Read More

文档标准是成本计算的前提

3 min read

API文档不只是写给开发者看的。它定义了系统的边界、成本结构和可维护性。统一的文档标准让隐性成本变得可见。

API文档
Read More

配置不会自动同步

2 min read

视频生成任务永远pending,代码完美部署,队列正确配置。问题不在代码,在于配置的独立性被低估。静默失败比错误更危险。

部署配置管理
Read More

CRUD 操作

2 min read

四个字母背后,是数据的生命周期,是权限的边界,也是系统设计的基础逻辑

系统设计软件工程
Read More

数据库参数国际化:从 13 个迁移学到的设计原则

3 min read

数据不该懂语言。当数据库参数嵌入中文标签时,系统的边界就被语言限制了。这篇文章从 13 个参数对齐迁移中提炼出设计原则——国际化不是功能,是系统设计的底层约束。

数据库国际化
Read More

Stripe Webhook中的防御性编程

2 min read

三个Bug揭示的真相:假设是代码中最危险的东西。API返回类型、环境配置、变量作用域——每个看似合理的假设都可能导致客户损失。

Web开发系统设计
Read More

双重验证:Stripe生产模式的防御性切换

7 min read

从测试到生产不是更换API keys,而是建立一套双重验证系统。每一步都在两个环境中验证,确保真实支付不会因假设而失败。

系统设计Stripe
Read More

错误隔离

3 min read

失败是必然的。真正的问题不是失败本身,而是失败如何蔓延。错误隔离不是为了消除失败,而是为了控制失败的范围。

系统设计可靠性工程
Read More

在运行的系统上生长新功能

3 min read

扩展不是推倒重来,而是理解边界,找到生长点。管理层作为观察者和调节器,附着在核心系统上,监测它,影响它,但不改变它的运行逻辑。

系统设计架构
Read More

实现幂等性处理,忽略已处理的任务

3 min read

在代码层面识别和忽略已处理的任务,不是简单的布尔检查,而是对时序、并发和状态的深刻理解

系统设计并发控制
Read More

引入懒加载模式

1 min read

懒加载不是优化技巧,而是关于时机的选择。何时创建,决定了系统的效率和复杂度。

软件设计性能优化
Read More

缺失值的级联效应

3 min read

一个NULL值如何在调用链中传播,最终导致错误的错误消息。理解防御层的设计,在失败传播前拦截。

系统设计防御性编程
Read More

监控观察期法

3 min read

部署不是结束,而是验证的开始。修复代码只是假设,监控数据才是证明。48小时观察期:让错误主动暴露,让数据证明修复。

系统设计监控
Read More

Props Drilling

3 min read

数据在组件树中层层传递,每一层都不需要它,却必须经手。这不是技术债,是架构对真实需求的误解。

React组件设计
Read More

监听 Redis 连接事件 - 让不可见的脆弱变得可见

4 min read

连接看起来应该是透明的。但当它断开时,你才意识到透明不等于可靠。监听不是多余,而是对脆弱性的承认。

系统设计Redis
Read More

资源不会消失,只会泄露

2 min read

在积分系统中,用户的钱不会凭空消失,但会因为两个时间窗口而泄露:并发请求之间的竞争,和回调永不到达的沉默。

系统设计并发控制
Read More

RPC函数的原子化处理

1 min read

当一个远程函数做太多事情,失败就变得难以理解

分布式系统RPC
Read More

RPC函数

2 min read

关于远程过程调用的本质思考:当你试图让远方看起来像眼前

分布式系统RPC
Read More

使用Secret Token验证回调请求的合法性

2 min read

在开放的网络中,信任不能被假设。Secret Token 是对身份的确认,对伪装的识别,也是对安全边界的坚守

Web 安全系统设计
Read More

第三方回调的状态映射完整性

5 min read

KIE.AI 视频生成的三个修复揭示了一个本质问题:回调不只是接收结果,是建立状态映射。没有 vendor_task_id,系统就失去了追溯能力。

Purikura 项目系统设计
Read More