点击上方“芋道源码”,选择“设为星标”
管她前浪,还是后浪?
能浪的浪,才是好浪!
每天 10:33 更新文章,每天掉亿点点头发...
源码精品专栏
- 原创 | Java 2021 超神之路,很肝~
- 中文详细注释的开源项目
- RPC 框架 Dubbo 源码解析
- 网络应用框架 Netty 源码解析
- 消息中间件 RocketMQ 源码解析
- 数据库中间件 Sharding-JDBC 和 MyCAT 源码解析
- 作业调度中间件 Elastic-Job 源码解析
- 分布式事务中间件 TCC-Transaction 源码解析
- Eureka 和 Hystrix 源码解析
- Java 并发源码
来源:www.cnblogs.com/Finley/
p/16400287.html
- 原理详解
- pending2ReadyScript
- ready2UnackScript
- unack2RetryScript
- ack
- consume
我们先看看以下业务场景:
- 当订单一直处于未支付状态时,如何及时的关闭订单,并退还库存?
- 新创建店铺,N天内没有上传商品,系统如何知道该信息,并发送激活短信?
上述场景最简单直接的解决方案是定时扫表。我们假设 10 分钟未支付则关闭订单、定时任务设置为 5 分钟一次,那么一个订单最晚会在 15 分钟关闭。高达 5 分钟的误差是业务难以接受的。另一方面频繁的扫表可能消耗过多数据库资源,影响线上交易吞吐量。
此外还有朋友使用 Redis 的过期通知、时间轮、Java 的 DelayQueue 等方式实现延时任务。我们在之前的文章中讨论过他们的缺陷:比如使用 Redis 过期通知不保证准时、发送即忘不保证送达,时间轮缺乏持久化机制容易丢失等。
总结一下,我们对于延时队列的要求有下列几条(从重要到不重要排列):
- 持久化: 服务重启或崩溃不能丢失任务
- 确认重试机制: 任务处理失败或超时应该有重试
- 定时尽量精确
最合适的解决方案是使用 Pulsa、RocketMQ 等专业消息队列的延时投递功能。不过引入新的中间件通常存在各种非技术方面的麻烦。Redis 作为广泛使用的中间件,何不用 Redis 来制作延时队列呢?
使用有序集合结构实现延时队列的方法已经广为人知,无非是将消息作为有序集合的 member 投递时间戳作为 score,使用 zrangebyscore 命令搜索已到投递时间的消息然后将其发给消费者。
除了基本的延时投递之外我们的消息队列具有下列优势:
- 提供 ACK 和重试机制
- 只需要 Redis 和消费者即可运行,无需其它组件
- 提供 At-Least-One 投递语义、并保证消息不会并发消费
本文的完整代码实现在hdt3213/delayqueue,可以直接 go get github.com/hdt3213/delayqueue
完成安装。
具体使用也非常简单,只需要注册处理消息的回调函数并调用 start() 即可:
代码语言:javascript复制package main
import (
"github.com/go-redis/redis/v8"
"github.com/hdt3213/delayqueue"
"strconv"
"time"
)
func main() {
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
// 注册处理消息的回调函数
// 返回 true 表示已成功消费,返回 false 消息队列会重新投递次消息
return true
})
// 发送延时消息
for i := 0; i < 10; i {
err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
if err != nil {
panic(err)
}
}
// 启动消费协程
done := queue.StartConsume()
// 阻塞等待消费协程退出
<-done
}
由于数据存储在 redis 中所以我们最多能保证在 redis 无故障且消息队列相关 key 未被外部篡改的情况下不会丢失消息。
原理详解
消息队列涉及几个关键的 redis 数据结构:
- msgKey: 为了避免两条内容完全相同的消息造成意外的影响,我们将每条消息放到一个字符串类型的键中,并分配一个 UUID 作为它的唯一标识。其它数据结构中只存储 UUID 而不存储完整的消息内容。每个 msg 拥有一个独立的 key 而不是将所有消息放到一个哈希表是为了利用 TTL 机制避免
- pendingKey: 有序集合类型,member 为消息 ID, score 为投递时间的 unix 时间戳。
- readyKey: 列表类型,需要投递的消息 ID。
- unAckKey: 有序集合类型,member 为消息 ID, score 为重试时间的 unix 时间戳。
- retryKey: 列表类型,已到重试时间的消息 ID
- garbageKey: 集合类型,用于暂存已达重试上线的消息 ID
- retryCountKey: 哈希表类型,键为消息 ID, 值为剩余的重试次数
流程如下图所示:
由于我们允许分布式地部署多个消费者,每个消费者都在定时执行 lua 脚本,所以多个消费者可能处于上述流程中不同状态,我们无法预知(或控制)上图中五个操作发生的先后顺序,也无法控制有多少实例正在执行同一个操作。
因此我们需要保证上图中五个操作满足三个条件:
- 都是原子性的
- 不会重复处理同一条消息
- 操作前后消息队列始终处于正确的状态
只要满足这三个条件,我们就可以部署多个实例且不需要使用分布式锁等技术来进行状态同步。
是不是听起来有点吓人?