大家好,又见面了,我是你们的朋友全栈君。
业务背景: 后台定时任务刷新Redis的数据到数据库中,有多台机器开启了此定时同步的任务,但是需要其中一台工作,其他的作为备用,提高可用性。使用Redis分布式锁进行限制,拿到锁的机器去执行具体业务,拿不到锁的继续轮询。
分布式锁原理 分布式锁:当多个进程不在同一个系统中,多个进程共同竞争同一个资源,用分布式锁控制多个进程对资源的互斥访问。采用Redis服务器存储锁信息(即SET一个Key表示已加锁),可以实现多进程的并发读锁的状态,如果没有锁,则只允许一个进程加锁。
Redis分布式锁实现的关键点:
问题 | 问题描述 | 解决方案 |
---|---|---|
互斥性 | 保证只有一个client可以获取资源 | 加锁 |
原子性 | 如果锁不存在则执行加锁操作,必须是原子性操作 | 原子性命令或者执行Lua脚本 |
避免死锁 | 当拿到锁的Client因宕机或网络原因断线后,如果锁不能释放就会产生死锁 | 为锁加超时时间 |
锁超时时间设定 | 锁超时时间到了,业务没执行完问题 | 心跳线程,不断更新锁超时时间 |
锁的所属权 | 解铃还需系铃人,加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了 | Client 与锁进行一一对应,使用UUID作为锁的值 |
自动重连 | 网络故障导致Client连接Redis 失败的情况,网络恢复后可以自动重连 | 轮询 |
实现方案 方案一:采用Redis的原子性命令“SET key value EX expire-time NX”可以实现分布式锁的基本功能,其中的NX(Not Exist)即判断是否已存在锁,如果不存在key则可进行操作,SET key value 等同于加锁,EX expire-time即设置超时时间,可以避免死锁,但是超时时间的设置需要根据具体业务设置一个合理的经验值,避免锁超时时间到了,业务没执行完的问题。
方案二:采用Lua脚本实现,Redis会将整个脚本作为一个整体执行,因此Lua脚本可以实现原子性操作。相较于方案一,此处增加了心跳线程,不断更新锁超时时间,解决锁超时时间设置不合理的问题;生成UUID(或者是随机数字符串)作为锁的值,用于保证锁与Client的一一对应;采用轮询来实现断线自动重连。
实现方案1:SET EX NX 加锁流程图:
定义锁的变量名为lock,那么对应Redis命令: 判断是否加锁的命令:GET lock 加锁的命令:SET lock 设置超时时间的命令:EXPIRE expire-time 三条命令分开执行是不具有原子性的,比如可能会出现一个进程执行GET lock得到的结果为nil即尚未加锁,在其执行SET lock前另一个进程也执行了SET lock,导致两个进程都认为是可以加锁的,失去互斥性。 因此判断尚未加锁、加锁、设置超时时间必须原子操作,使用Redis的命令“SET key value EX expire-time NX”可以实现该原子操作。
代码语言:javascript复制package main
import (
"fmt"
"time"
"github.com/gomodule/redigo/redis"
)
func main() {
rds, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
fmt.Println("Connect to redis error", err)
return
}
defer rds.Close()
for true {
//检查是否有所与加锁必须是原子性操作
result, err := rds.Do("SET", "lock", 1, "EX", 5, "NX")
if err != nil {
fmt.Println("redis set error.", err)
return
}
result, err = redis.String(result, err)
// 加锁失败,继续轮询
if result != "OK" {
fmt.Println("SET lock failed.")
time.Sleep(5 * time.Second)
continue
}
// 加锁成功
fmt.Println("work begining...")
// 此处处理业务
fmt.Println("work end");
// 业务处理结束后释放锁
result, err := rds.Do("del", "lock")
break;
}
}
此方法弊端是对超时时间的设置有要求,需要根据具体业务设置一个合理的经验值,避免锁超时时间到了,业务没执行完的问题。
实现方案2:Lua脚本
代码语言:javascript复制package main
import (
"fmt"
"time"
"github.com/satori/go.uuid"
"github.com/gomodule/redigo/redis"
)
var uuidClient uuid.UUID
const (
SCRIPT_LOCK = ` local res=redis.call('GET', KEYS[1]) if res then return 0 else redis.call('SET',KEYS[1],ARGV[1]); redis.call('EXPIRE',KEYS[1],ARGV[2]) return 1 end `
SCRIPT_EXPIRE = ` local res=redis.call('GET', KEYS[1]) if not res then return -1 end if res==ARGV[1] then redis.call('EXPIRE', KEYS[1], ARGV[2]) return 1 else return 0 end `
SCRIPT_DEL = ` local res=redis.call('GET', KEYS[1]) if not res then return -1 end if res==ARGV[1] then redis.call('DEL', KEYS[1]) else return 0 end `
)
func ResetExpire() {
fmt.Println("Reset expire begin...")
rds, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
fmt.Println("Connect to redis server error.", err)
return
}
for true {
luaExpire := redis.NewScript(1, SCRIPT_EXPIRE)
result, err := redis.Int(luaExpire.Do(rds, "lock", uuidClient.String(), 5))
if err != nil {
fmt.Println("luaExpire exec error", err)
break
}
if result != 1 {
fmt.Println("Reset expire failed.")
break
} else {
fmt.Println("Reset expire succeed.")
}
time.Sleep(3 * time.Second)
}
fmt.Println("Reset expire end.")
}
func main() {
for true {
rds, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
fmt.Println("Connect to redis server error.", err)
time.Sleep(5 * time.Second)
continue
}
defer rds.Close()
// 生成UUID标识锁与Client的对应关系
uuidClient,err = uuid.NewV4() //也可以生成随机数字符串来代替
if err != nil {
fmt.Println("New uuid error.", err)
return
}
luaLock := redis.NewScript(1, SCRIPT_LOCK)
luaDel:= redis.NewScript(1, SCRIPT_DEL)
for true {
result, err := redis.Int(luaLock.Do(rds, "lock", uuidClient.String(), 5))
if err != nil {
fmt.Println("luaLock exec error.", err)
break
}
if result == 0 {
fmt.Println("Set lock failed.")
time.Sleep(5 * time.Second)
continue
}
fmt.Println("Set lock succeed.")
go ResetExpire()
// 加锁成功
fmt.Println("work begining...")
// 此处处理业务
fmt.Println("work end");
// 业务处理结束后释放锁
result, err = redis.Int(luaDel.Do(rds, "lock", uuidClient.String()))
return
}
}
}
Redis采用Lua脚本可以执行更多的个性化的原子操作,在我项目中就采用这种容错性更高的方式。
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/128965.html原文链接:https://javaforall.cn