redis分布式锁原理面试(数据库索引用的什么数据结构)

2022-07-30 10:40:29 浏览数 (1)

大家好,又见面了,我是你们的朋友全栈君。

业务背景: 后台定时任务刷新Redis的数据到数据库中,有多台机器开启了此定时同步的任务,但是需要其中一台工作,其他的作为备用,提高可用性。使用Redis分布式锁进行限制,拿到锁的机器去执行具体业务,拿不到锁的继续轮询。

分布式锁原理 分布式锁:当多个进程不在同一个系统中,多个进程共同竞争同一个资源,用分布式锁控制多个进程对资源的互斥访问。采用Redis服务器存储锁信息(即SET一个Key表示已加锁),可以实现多进程的并发读锁的状态,如果没有锁,则只允许一个进程加锁。

Redis分布式锁实现的关键点:

问题

问题描述

解决方案

互斥性

保证只有一个client可以获取资源

加锁

原子性

如果锁不存在则执行加锁操作,必须是原子性操作

原子性命令或者执行Lua脚本

避免死锁

当拿到锁的Client因宕机或网络原因断线后,如果锁不能释放就会产生死锁

为锁加超时时间

锁超时时间设定

锁超时时间到了,业务没执行完问题

心跳线程,不断更新锁超时时间

锁的所属权

解铃还需系铃人,加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了

Client 与锁进行一一对应,使用UUID作为锁的值

自动重连

网络故障导致Client连接Redis 失败的情况,网络恢复后可以自动重连

轮询

实现方案 方案一:采用Redis的原子性命令“SET key value EX expire-time NX”可以实现分布式锁的基本功能,其中的NX(Not Exist)即判断是否已存在锁,如果不存在key则可进行操作,SET key value 等同于加锁,EX expire-time即设置超时时间,可以避免死锁,但是超时时间的设置需要根据具体业务设置一个合理的经验值,避免锁超时时间到了,业务没执行完的问题。

方案二:采用Lua脚本实现,Redis会将整个脚本作为一个整体执行,因此Lua脚本可以实现原子性操作。相较于方案一,此处增加了心跳线程,不断更新锁超时时间,解决锁超时时间设置不合理的问题;生成UUID(或者是随机数字符串)作为锁的值,用于保证锁与Client的一一对应;采用轮询来实现断线自动重连。

实现方案1:SET EX NX 加锁流程图:

定义锁的变量名为lock,那么对应Redis命令: 判断是否加锁的命令:GET lock 加锁的命令:SET lock 设置超时时间的命令:EXPIRE expire-time 三条命令分开执行是不具有原子性的,比如可能会出现一个进程执行GET lock得到的结果为nil即尚未加锁,在其执行SET lock前另一个进程也执行了SET lock,导致两个进程都认为是可以加锁的,失去互斥性。 因此判断尚未加锁、加锁、设置超时时间必须原子操作,使用Redis的命令“SET key value EX expire-time NX”可以实现该原子操作。

代码语言:javascript复制
package main

import (
 "fmt"
 "time"
 "github.com/gomodule/redigo/redis"
)

func main() { 
   
    rds, err := redis.Dial("tcp", "127.0.0.1:6379")
    if err != nil { 
   
        fmt.Println("Connect to redis error", err)
        return
    }   
    defer rds.Close()
    
    for true { 
   
    	//检查是否有所与加锁必须是原子性操作
        result, err := rds.Do("SET", "lock", 1, "EX", 5, "NX")	
        if err != nil { 
   
            fmt.Println("redis set error.", err)
            return
        }
        result, err = redis.String(result, err)
        // 加锁失败,继续轮询
		if result != "OK" { 
   
            fmt.Println("SET lock failed.")
            time.Sleep(5 * time.Second)
            continue
        }
		// 加锁成功
        fmt.Println("work begining...")
        // 此处处理业务
        fmt.Println("work end");
        // 业务处理结束后释放锁
        result, err := rds.Do("del", "lock")
        break;
    }
}

此方法弊端是对超时时间的设置有要求,需要根据具体业务设置一个合理的经验值,避免锁超时时间到了,业务没执行完的问题。

实现方案2:Lua脚本

代码语言:javascript复制
package main

import (
 "fmt"
 "time"
 "github.com/satori/go.uuid"
 "github.com/gomodule/redigo/redis"
)

var uuidClient uuid.UUID

const (
    SCRIPT_LOCK = ` local res=redis.call('GET', KEYS[1]) if res then return 0 else redis.call('SET',KEYS[1],ARGV[1]); redis.call('EXPIRE',KEYS[1],ARGV[2]) return 1 end `   
    SCRIPT_EXPIRE = ` local res=redis.call('GET', KEYS[1]) if not res then return -1 end if res==ARGV[1] then redis.call('EXPIRE', KEYS[1], ARGV[2]) return 1 else return 0 end `   
    SCRIPT_DEL = ` local res=redis.call('GET', KEYS[1]) if not res then return -1 end if res==ARGV[1] then redis.call('DEL', KEYS[1]) else return 0 end `   
)

func ResetExpire() { 
   
    fmt.Println("Reset expire begin...")
    rds, err := redis.Dial("tcp", "127.0.0.1:6379")
    if err != nil { 
   
        fmt.Println("Connect to redis server error.", err)
        return
    }

    for true { 
   
        luaExpire := redis.NewScript(1, SCRIPT_EXPIRE)
        result, err := redis.Int(luaExpire.Do(rds, "lock", uuidClient.String(), 5))
        if err != nil { 
   
            fmt.Println("luaExpire exec error", err)
            break
        }
        if result != 1 { 
   
            fmt.Println("Reset expire failed.")
            break
        } else { 
   
            fmt.Println("Reset expire succeed.")
        }
        time.Sleep(3 * time.Second)
    }
    fmt.Println("Reset expire end.")
}

func main() { 
   
    for true { 
   
        rds, err := redis.Dial("tcp", "127.0.0.1:6379")
        if err != nil { 
   
            fmt.Println("Connect to redis server error.", err)
            time.Sleep(5 * time.Second)
            continue
        }
        defer rds.Close()
	
		// 生成UUID标识锁与Client的对应关系
        uuidClient,err = uuid.NewV4()	//也可以生成随机数字符串来代替
        if err != nil { 
   
            fmt.Println("New uuid error.", err)
            return
        }

        luaLock := redis.NewScript(1, SCRIPT_LOCK)
        luaDel:= redis.NewScript(1, SCRIPT_DEL)
        for true { 
   
            result, err := redis.Int(luaLock.Do(rds, "lock", uuidClient.String(), 5))
            if err != nil { 
   
                fmt.Println("luaLock exec error.", err)
                break
            }

            if result == 0 { 
   
                fmt.Println("Set lock failed.")
                time.Sleep(5 * time.Second)
                continue
            }
            fmt.Println("Set lock succeed.")

            go ResetExpire()
            // 加锁成功
        	fmt.Println("work begining...")
	        // 此处处理业务
	        fmt.Println("work end");
	        
	        // 业务处理结束后释放锁
	        result, err = redis.Int(luaDel.Do(rds, "lock", uuidClient.String()))
	        return
        }
    }
}

Redis采用Lua脚本可以执行更多的个性化的原子操作,在我项目中就采用这种容错性更高的方式。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/128965.html原文链接:https://javaforall.cn

0 人点赞