redis实战之业务通用分布式锁

2022-08-23 14:19:58 浏览数 (1)

一.前言

​ hello,everyone。在之前的博客:老生常谈之缓存踩坑中介绍了缓存的常见坑点。而redis是目前java后端缓存最重要的中间件,熟悉redis的常见的使用场景是非常重要的。本文将是redis实战第一篇:分布式锁。希望能帮助到大家,文中如有不对之处,欢迎指出,共同进步。

二.分布式锁介绍

​ 一文吃透AQS&ReentrantLock的前世与今生文中介绍了单体应用下编程式加锁,解锁的实现原理与使用。那么在微服务体系下,对于共享资源的访问不在一台jvm中,我们如何控制共享资源的访问安全呢?答案就是分布式锁。分布式锁常见的实现方式有很多种:mysql,ZK,redis。最高频使用的还是基于redis的,性能好,但是相对来说维护复杂度比较高。

​ 本文核心内容为实战,提供基于单redis服务如何轻便接入与使用的方式。关于分布式锁的概念,本文默认大家已经了解了相关概念。如果对分布式锁常见的实现方式还是有不熟悉的,建议阅读:再有人问你分布式锁,这篇文章扔给他

三.通用分布式锁

​ 讲通用分布式锁之前,我们先来回忆一下,在单体应用下我们对共享资源的加解锁操作范式代码

代码语言:javascript复制
ReentrantLock lock = new ReentrantLock();
try {
    lock.lock();
    //正常业务逻辑
}catch (Exception e){
   //异常处理 
}finally {
    lock.unlock();
    //回滚业务逻辑
}

​ 同样的,基于redis实现的加锁范式也是如上,只不过是锁的申明方式做了改变了。借助redis的set命令来实现set resourceName value ex 5 nx。到这里我们在看一下上面ReentrantLock的范式,是不是感觉对临界资源的加锁解锁完全可以抽离出来变成一个切面。在处理正常逻辑之前加锁,处理结束解锁。

​ 在求求你,别写祖传代码了与看完这篇,你就是架构师两文中对日常编写代码时需要注意的规范做了建议。根据上面两文的内容并基于写切面的思路,我们是不是可以定义一个通用的注解,只要方法上标注了这个注解,在执行方法之前,我们统一设置分布式锁,业务逻辑执行结束,就释放锁。如果加锁与解锁的执行逻辑在一个方法中间,完全可以把加锁到解锁这段业务逻辑剥离出来写一个函数,实现统一的加解锁。

3.1.业务分布式锁注解

ok,到了这里我们先来定义一个注解,可以加在业务方法之上

代码语言:javascript复制
package com.baiyan.lock.annotation;

import com.baiyan.lock.config.RedisConstant;

import java.lang.annotation.*;

/**
 * 通用分布式锁
 * @author baiyan
 * @time 2021/5/29 14:41
 */
@Documented
@Target( { ElementType.ANNOTATION_TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface DistributedLock {

    /**
     * 争抢分布式锁失败后是否自旋
     * true:自旋
     * false:抛出异常
     *
     * @return
     */
    boolean blocked() default true;

    /**
     * 锁过期时间,默认为15秒
     *
     * @return
     */
    int lockTime() default RedisConstant.DEFAULT_EXPIRE_TIME;

    /**
     * 是否开启锁续期
     *
     * @return
     */
    boolean enableExtendTime() default false;

    /**
     * 默认锁续期时间
     *
     * @return
     */
    int extendTime() default RedisConstant.DEFAULT_EXTEND_EXPIRE_TIME;

    /**
     * 默认锁续期次数
     *
     * @return
     */
    int extendCounts() default RedisConstant.DEFAULT_EXTEND_EXPIRE_COUNTS;


}

说明:

该注解定义了分布式锁的一个通用性概念,为了demo演示方便,没有使用redission那种可重入锁的方式,加的锁都是不可重入的。其实知道redission实现的同学都知道,redission实现可重入锁其实也是基于AQS的方式实现了可重入锁,本文不展开解释。

参数解析:

blocked() :boolean类型,默认为true,当方法访问共享资源时,如果线程没有抢到锁是自选等待,还是直接返回报错。

**lockTime()**:锁过期时间,默认为15S

**enableExtendTime()**:是否开启锁续期。分布式锁在调用的过程中,有可能业务逻辑还没有实现完成,但是锁却到期了,导致其他线程获取到了锁,修改的数据,导致了线程不安全。redission中实现锁续期有专门的watchdog机制,锁到期时,服务端将会去校验客户端是否还在继续持有锁,如果持有会对锁进行延期。

**extendTime()**:锁续期时间

**extendCounts()**:锁续期次数

3.2.锁接口

代码语言:javascript复制
package com.baiyan.lock.model;

/**
 * @author baiyan
 * @time 2021/5/29 15:14
 */
public interface BaseLock {

    /**
     * 业务类型,务必保证业务类型是唯一
     *
     * @return
     */
    String getLockBusinessType();

    /**
     * 分布式锁唯一标识
     *
     * @return
     */
    String getLockUniqueFlag();

}

说明:

由于是基于切面形式的,加锁的方式接是针对解析方法的入参参数解析。这里跟开头处说的一样,如果加锁的位子不是在方法的开头,可以把持有锁的逻辑独立成一个方法,入参实现该接口。

参数解析:

**getLockBusinessType()**:业务类型,分布式锁在加锁的时候,需要申明业务,保证一类锁的的前缀一致

**getLockUniqueFlag()**:分布式锁唯一标识,保证同一业务类型下不同的数据的锁key值唯一,建议使用数据的主键id。

3.3.切面

代码语言:javascript复制
package com.baiyan.lock.aspect;

import com.baiyan.lock.annotation.DistributedLock;
import com.baiyan.lock.config.RedisConstant;
import com.baiyan.lock.model.BaseLock;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

/**
 * 分布式锁实现
 *
 * @author baiyan
 * @time 2021/5/29 15:07
 */
@Component
@Aspect
public class DistributedLockAspect {

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Around("@annotation(com.baiyan.lock.annotation.DistributedLock)")
    public Object logExecutionTime(ProceedingJoinPoint pjp) throws Throwable {
        MethodSignature signature = (MethodSignature) pjp.getSignature();
        DistributedLock annotation = signature.getMethod().getAnnotation(DistributedLock.class);
        BaseLock lock = (BaseLock) Optional.ofNullable(pjp.getArgs())
                .flatMap(args -> Stream.of(args).filter(e -> e instanceof BaseLock).findFirst()).orElse(null);
        if(Objects.isNull(lock)){
            throw new RuntimeException("方法参数为空,分布式锁添加失败");
        }
        String redisLockKey = RedisConstant.LOCK_PREFIX   lock.getLockBusinessType()  lock.getLockUniqueFlag();
        Object proceed;
        try{
            lock(annotation,redisLockKey);
            proceed = pjp.proceed();
            return proceed;
        }finally {
            unlock(redisLockKey);
        }
    }

    /**
     * 加锁
     *
     * @param annotation 注解信息
     * @param redisLockKey 加锁key
     */
    private void lock(DistributedLock annotation, String redisLockKey){
        int expireTime = annotation.enableExtendTime() ?
                    annotation.lockTime()   annotation.extendTime() * annotation.extendCounts()
                    :
                    annotation.lockTime();
        boolean getLock = false;
        //自旋的过程可以设置一定的间隔时间与最大的自旋时间,放置锁长期未被释放,抢占锁的线程越来越多,服务
        //雪崩,这里演示方便,仅提供思路
        while (!getLock){
            getLock = stringRedisTemplate.opsForValue()
                        .setIfAbsent(redisLockKey,String.valueOf(Thread.currentThread().getId()), Duration.ofSeconds(expireTime));
            if(!annotation.blocked() && !getLock){
                throw new RuntimeException("争抢锁失败");
            }
        }
    }

    /**
     * 解锁
     *
     * @param redisLockKey 加锁key
     */
    private void unlock( String redisLockKey){
        String value = stringRedisTemplate.opsForValue().get(redisLockKey);
        //避免参数解析异常时抛出的异常,走入解锁逻辑,删除了非本线程锁持有的锁
        if(Objects.equals(value,String.valueOf(Thread.currentThread().getId()))){
            stringRedisTemplate.delete(redisLockKey);
        }
    }

}

开头说到过,分布式锁是一个通用类型的组件,我们可以把这个切面做成一个组件的starter,利用spring的spi机制,业务应用引入,开箱即用

关于spring对于start的spi机制不太熟悉的可以看看这篇我写的一文吃透@SpringbootApplication的前世与今生

3.4.常量配置类

代码语言:javascript复制
package com.baiyan.lock.config;

/**
 * @author baiyan
 * @time 2021/5/29 14:38
 */
public class RedisConstant {
    /**
     * 分布式锁前缀
     */
    public final static String LOCK_PREFIX = "distributed_lock_prefix::";

    /**
     * 默认分布式锁过期时间
     */
    public final static int DEFAULT_EXPIRE_TIME = 15;

    /**
     * 默认分布式锁单次续期时间
     */
    public final static int DEFAULT_EXTEND_EXPIRE_TIME = 5;

    /**
     * 默认分布式锁续期次数
     */
    public final static int DEFAULT_EXTEND_EXPIRE_COUNTS = 3;
}

四.使用示例

4.1.加锁实体类

代码语言:javascript复制
package com.baiyan.lock.model;

import lombok.Data;

/**
 * @author baiyan
 * @time 2021/5/29 15:20
 */
@Data
public class Order implements BaseLock {

    /**
     * 订单编号
     */
    private Long orderId;

    /**
     * 手机号
     */
    private String mobile;

    @Override
    public String getLockBusinessType(){
        return "order";
    }

    @Override
    public String getLockUniqueFlag(){
        return this.mobile;
    }

}

4.2.请求类

代码语言:javascript复制
package com.baiyan.lock.controller;

import com.baiyan.lock.annotation.DistributedLock;
import com.baiyan.lock.model.Order;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author baiyan
 * @time 2021/5/29 14:59
 */
@RestController
public class TestController {

    @PostMapping
    @DistributedLock(blocked = false)
    public void add(@RequestBody Order order) throws Exception{
        Thread.sleep(10000);
        System.out.println(order);
    }
}

注解@DistributedLock(blocked = false)表示请求到达时,如果当前手机号的分布式锁已经被持有,那么将会直接报错返回给前端,当然这里可以在切面定义特定的业务异常,然后利用Spring中优雅的处理全局异常返回特定的业务提示给前端。直接标注**@DistributedLock**表示请求到达时,将会自旋等待锁释放,然后再进入方法。

0 人点赞