实现幂等性处理,忽略已处理的任务
识别已处理的任务
忽略已处理的任务,听起来只需要一个简单的检查:任务处理过了吗?处理过就跳过,没处理过就执行。
但这个"简单检查"藏着复杂性。
问题不在于检查本身,而在于如何确定一个任务真的处理过了。
你可以用一个布尔字段 processed 来标记。但这个字段什么时候设为 true?执行操作之前还是之后?如果在执行前设为 true,操作失败了怎么办?如果在执行后设为 true,并发请求怎么办?
💡 Click the maximize icon to view in fullscreen
两个请求同时检查,都发现任务未处理,于是都去执行。检查没有意义。
这不是实现的问题,而是时序的问题。检查和执行之间存在时间窗口,这个窗口里可能发生任何事情。
原子性是关键
要真正忽略已处理的任务,检查和标记必须是原子的。
要么你成功标记了这个任务为"处理中",然后去执行;要么标记失败,说明别人已经在处理了,你直接返回。
不能先检查再标记。检查和标记必须在同一个不可分割的操作里完成。
数据库的原子操作可以做到这一点:
-- 尝试将未处理的任务标记为处理中
UPDATE tasks
SET status = 'processing'
WHERE task_id = ?
AND status = 'pending'
这个 UPDATE 语句是原子的。如果 status 已经不是 pending 了,UPDATE 不会成功。你可以检查 affected_rows,如果是 0,说明任务已经被处理或正在被处理。如果是 1,说明你成功获得了处理权。
这是一种乐观锁的思路。不要先问能不能做,直接去做,看做成了没有。
不要写成这样:
if (task.status === 'pending') {
task.status = 'processing';
await processTask(task);
}
而要写成这样:
const updated = await db.update({
where: { id: task.id, status: 'pending' },
data: { status: 'processing' }
});
if (updated.count === 0) {
// 任务已被处理,直接返回
return;
}
await processTask(task);
检查和标记在数据库层面原子完成。
状态标记的选择
用什么来标记任务已处理?
最简单的是布尔值 processed。但布尔值只有两个状态:处理过和没处理过。它无法区分"正在处理"和"处理完成"。
如果一个任务正在被处理,但处理到一半系统崩溃了,这个任务应该怎么办?如果标记是 processed = true,它会被永久忽略。如果标记是 processed = false,它会被重复执行。
你需要更细的粒度。
一个状态字段可以有多个值:pending、processing、completed、failed。
pending:等待处理processing:正在处理completed:处理成功failed:处理失败
有了这些状态,你可以做更智能的判断。
completed 的任务应该被忽略。processing 的任务要看情况——如果它处于这个状态太久了,可能是之前的处理者崩溃了,应该重新处理。failed 的任务可能需要重试。
状态不只是标记,它携带了信息。它告诉你这个任务经历了什么,现在处于什么情况。
💡 Click the maximize icon to view in fullscreen
时间戳的作用
状态字段告诉你任务处于什么阶段,但它不告诉你任务在这个阶段待了多久。
这就是时间戳的价值。
updated_at 记录了状态最后一次变更的时间。如果一个任务的状态是 processing,但 updated_at 是 10 分钟前,那很可能之前的处理者已经挂了。
你可以写一个定时任务,把这些"卡住"的任务重置为 pending,让它们重新被处理。
UPDATE tasks
SET status = 'pending'
WHERE status = 'processing'
AND updated_at < NOW() - INTERVAL 5 MINUTES
这是一种自愈机制。系统不需要人工介入,就能从异常状态中恢复。
但要注意,这个超时时间不能太短。如果一个任务真的需要处理 5 分钟,你把超时设成 3 分钟,它会被反复重置,永远处理不完。
超时时间应该基于正常处理时间的最大值,而不是平均值。
即使用了原子操作和状态机,仍然可能出现边界情况:
场景 1:处理者慢,超时重置
进程 A 正在处理任务,但很慢。超时任务把状态重置为 pending。进程 B 获取到这个任务,开始处理。现在两个进程都在处理同一个任务。
解决方案:
- 在处理过程中定期更新
updated_at(心跳) - 或者用分布式锁而不是数据库状态
场景 2:状态更新失败
任务处理成功了,但更新状态为 completed 时数据库连接断了。任务会被重新处理。
解决方案:
- 确保操作本身是幂等的(参考 idempotency-check)
- 或者使用事务,确保操作和状态更新一起成功或失败
唯一标识与去重
有时状态字段不够,你需要更明确的标识来判断重复。
比如消息队列。同一个消息可能因为网络问题被投递多次。你不能依赖消息队列的去重机制,因为它可能不存在或不可靠。
你需要给每个消息一个唯一的 ID,然后在处理时检查这个 ID 是否已经处理过。
async function processMessage(message) {
const messageId = message.id;
// 尝试插入消息 ID
try {
await db.insert({
table: 'processed_messages',
data: { message_id: messageId, processed_at: new Date() }
});
} catch (error) {
if (error.code === 'UNIQUE_VIOLATION') {
// 消息已处理,直接返回
return;
}
throw error;
}
// 执行实际处理
await handleMessage(message);
}
这里的关键是先插入 ID,再处理消息。插入失败说明已处理过,直接返回。插入成功才去处理。
这个模式依赖数据库的唯一约束。它把"是否处理过"的判断交给了数据库,而不是应用层的逻辑。
数据库的约束是可靠的,应用层的逻辑是脆弱的。把关键决策放在可靠的层面。
清理的必要性
无论是状态标记还是消息 ID,它们都会积累。
一个高流量的系统,每天可能处理百万级的任务或消息。如果你永久保存所有的处理记录,存储会成为问题,查询也会变慢。
你需要定期清理已完成的记录。
但清理多久之前的记录?这取决于你的重复窗口有多大。
如果消息队列的重试策略是"失败后每隔 1 分钟重试一次,最多重试 10 次",那么一个消息最多会在 10 分钟内重复。你可以安全地删除 1 小时前的处理记录。
如果系统有人工重试,或者有延迟很久的异步通知,你可能需要保存更久——比如 7 天或 30 天。
清理策略反映了你对系统行为的理解。 你知道重复会在什么时间范围内发生,你就知道记录需要保存多久。
如果处理记录非常多,可以考虑用数据库分区表按时间分区:
-- 按日期分区
CREATE TABLE processed_tasks (
task_id VARCHAR(255),
processed_at TIMESTAMP,
...
) PARTITION BY RANGE (processed_at);
-- 定期删除旧分区
DROP TABLE processed_tasks_2024_10;
删除整个分区比逐行删除快得多,对系统影响也小。
不只是防止重复
忽略已处理的任务,表面上是为了防止重复执行。
但更深层的意义是:让系统对混乱有韧性。
消息会重复,网络会超时,进程会崩溃。如果系统假设一切正常,它会在异常情况下崩溃。如果系统预期混乱,它会在混乱中保持稳定。
实现幂等性处理,就是在告诉系统:重复是正常的,失败是常态,不要惊慌,知道如何应对。
这不只是一个技术细节,而是一种设计哲学。
你不是在构建一个只能在完美环境下运行的系统,你是在构建一个能在不完美环境下可靠运行的系统。
参考与延伸
这篇文章侧重实现层面的细节。如果你想了解幂等性的概念和哲学思考,可以阅读 idempotency-check。
关于分布式系统中的并发控制,rpc-atomic-operations 讨论了原子操作的重要性。
Related Posts
Articles you might also find interesting
文档标准是成本计算的前提
API文档不只是写给开发者看的。它定义了系统的边界、成本结构和可维护性。统一的文档标准让隐性成本变得可见。
CRUD 操作
四个字母背后,是数据的生命周期,是权限的边界,也是系统设计的基础逻辑
错误隔离
失败是必然的。真正的问题不是失败本身,而是失败如何蔓延。错误隔离不是为了消除失败,而是为了控制失败的范围。
资源不会消失,只会泄露
在积分系统中,用户的钱不会凭空消失,但会因为两个时间窗口而泄露:并发请求之间的竞争,和回调永不到达的沉默。
幂等性检查
在不确定的系统中,幂等性检查是对重复的容忍,是对稳定性的追求,也是对失败的预期与接纳
管理后台需要两次设计
第一次设计回答"发生了什么",第二次设计回答"我能做什么"。在第一次就试图解决所有问题,结果是功能很多但都不够深入。
告警分级与响应时间
不是所有问题都需要立即响应。RPC失败会在凌晨3点叫醒人。安全事件每15分钟检查一次。支付成功只记录,不告警。系统的响应时间应该匹配问题的紧急程度。
API 测试各种边界情况
边界情况是系统最脆弱的地方,也是最容易被忽略的地方。测试边界情况不是为了追求完美,而是为了理解系统的真实边界。
BullMQ 队列
队列不是技术选型,而是对时间的承认,对顺序的尊重,对不确定性的应对
BullMQ Worker
Worker 的本质是对时间的重新分配,是对主线的解放,也是对专注的追求
配置不会自动同步
视频生成任务永远pending,代码完美部署,队列正确配置。问题不在代码,在于配置的独立性被低估。静默失败比错误更危险。
数据库参数国际化:从 13 个迁移学到的设计原则
数据不该懂语言。当数据库参数嵌入中文标签时,系统的边界就被语言限制了。这篇文章从 13 个参数对齐迁移中提炼出设计原则——国际化不是功能,是系统设计的底层约束。
Stripe Webhook中的防御性编程
三个Bug揭示的真相:假设是代码中最危险的东西。API返回类型、环境配置、变量作用域——每个看似合理的假设都可能导致客户损失。
让文档跟着代码走
文档过时是熵增的必然。对抗衰败的方法不是更频繁的手工维护,而是让文档"活"起来——跟随代码自动更新。三种文档形态,三种生命周期。
双重验证:Stripe生产模式的防御性切换
从测试到生产不是更换API keys,而是建立一套双重验证系统。每一步都在两个环境中验证,确保真实支付不会因假设而失败。
端到端 Postback 模拟测试
真实的测试不是模拟完美的流程,而是重现真实世界的混乱。Postback 测试的价值在于发现系统在不确定性中的表现。
在运行的系统上生长新功能
扩展不是推倒重来,而是理解边界,找到生长点。管理层作为观察者和调节器,附着在核心系统上,监测它,影响它,但不改变它的运行逻辑。
单例模式管理 Redis 连接
连接不是技术细节,而是系统与外部世界的第一次握手,是可靠性的起点
缺失值的级联效应
一个NULL值如何在调用链中传播,最终导致错误的错误消息。理解防御层的设计,在失败传播前拦截。
监控观察期法
部署不是结束,而是验证的开始。修复代码只是假设,监控数据才是证明。48小时观察期:让错误主动暴露,让数据证明修复。
Props Drilling
数据在组件树中层层传递,每一层都不需要它,却必须经手。这不是技术债,是架构对真实需求的误解。
队列生产者实例的工厂函数
工厂函数不是设计模式的炫技,而是对重复的拒绝,对集中管理的追求,对变化的准备
监听 Redis 连接事件 - 让不可见的脆弱变得可见
连接看起来应该是透明的。但当它断开时,你才意识到透明不等于可靠。监听不是多余,而是对脆弱性的承认。
RPC函数的原子化处理
当一个远程函数做太多事情,失败就变得难以理解
RPC函数
关于远程过程调用的本质思考:当你试图让远方看起来像眼前