为什么需要分布式锁
1.为了解决Java共享内存模型带来的线程安全问题,我们可以通过加锁来保证资源访问的单一,如JVM内置锁synchronized,类级别的锁ReentrantLock。
2.但是随着业务的发展,单机服务毕竟存在着限制,故会往多台组合形成集群架构,面对集群架构,我们同样存在则资源共享问题,而每台服务器有着自己的JVM,这时候我们对于锁的实现不得不考虑分布式的实现。
分布式锁应该具备哪些条件
1.在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
2.高可用的获取锁与释放锁
3.高性能的获取锁与释放锁
4.具备可重入特性(可理解为重新进入,由多于一个任务并发使用,而不必担心数据错误)
5.具备锁失效机制,即自动解锁,防止死锁
6.具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败
秒杀抢购场景模拟(模拟并发问题:其实就是指每一步如果存在间隔时间,那么当某一线程间隔时间拉长,会对其余线程造成什么影响)
0.如果要在本机测试的话
1)配置Nginx实现负载均衡
代码语言:javascript复制http {
upstream testfuzai {
server 127.0.0.1:8080 weight=1;
server 127.0.0.1:8090 weight=1;
}
server {
listen 80;
server_name localhost;
location / {
//proxy_pass:设置后端代理服务器的地址。这个地址(address)可以是一个域名或ip地址和端口,或者一个 unix-domain socket路径。
proxy_pass http://testfuzai;
proxy_set_header Host $proxy_host;
}
}
}
2)启动redis设置好参数与数量
3)启动项目并分别配置不同端口(要与Nginx里面的一致)
4)进行压测,通过jmeter的Thread Group里面编辑好HTTP Request,设置参数 线程数 Number of Threads 【设置为200】 ,请求的重复次数 Loop count 【设置为5】 ,Ramp-up period(seconds)线程启动开始运行的时间间隔(单位是秒)【设置为1】。则,一秒内会有1000个请求打过去。
1.不加锁进行库存扣减的情况:
代码示例
代码语言:javascript复制@RequestMapping("/deduct_stock")
public String deductStock() {
//从redis取出库存
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int realStock = stock - 1;
//往redis写入库存
stringRedisTemplate.opsForValue().set("stock", realStock "");
System.out.println("扣减成功,剩余库存:" realStock);
} else {
System.out.println("扣减失败,库存不足");
}
return "end";
}
发现说明
1)通过打印输出,我们会发现两台机器上会出现重复的值(即出现了超卖现象)。甚至会出现另一台服务器的数据覆盖本服务器的数据。
2)原因在于读取数据和写入数据存在时间差,如两个服务器Q1和Q1,Q1有请求,获取库存【假设300】,在库存判断大小之后进行扣减库存如果慢了【假设需要3秒】,那么Q2有5次请求,获取到库存,扣减完后设置,依次5次,则库存为【295】。但是此时Q1完成自身请求又会把库存设置为【299】。故不合理。所以应该改为使用stringRedisTemplate.boundValueOps("stock").increment(-1); 改为采用redis内部扣除,减少了超卖的个数。但是就算改了也只是避免了覆盖问题,仍然没有解决超卖问题。如果有6台服务器,库存剩下1个的时候六个请求同时进入到扣减库存这一步,那么就会出现超卖5个的现象(这也是超卖个数最多的现象)。
2.采用SETNX的方式加分布式锁的情况:
代码示例
代码语言:javascript复制public String deductStock() {
String lockKey = "lock:product_101";
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
if (!result) {
return "error_code";
}
try {
int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
if (stock > 0) {
Long realStock = (Long) stringRedisTemplate.opsForValue().decrement("stock");
System.out.println("扣减成功,剩余库存:" realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
stringRedisTemplate.delete(lockKey);
}
return "end";
}
发现说明
1)这种方式明显保证了在分布式情况下只有一个线程能够执行业务代码。但是我们不可能对于用户买商品的时候返回错误提示,如果不断自旋的话又容易让CPU飙升。肯定要考虑休眠与唤醒,但可以在上层方法里面处理。
2)同时很明显存在个问题,如果我在扣减库存时候服务器宕机了,库存扣减还没设置【且没执行finally代码,那么我这个商品的锁就不会被释放,除非手动清除】。
那么肯定需要设置超时时间。如
代码语言:javascript复制Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
会发现补一个超时时间的话依旧无法避免之前的问题,故加锁和设置超时时间需要保持原子性。
3)采用原子操作:Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);基于设置了超时时间,那么我们如何考量超时时间呢,业务执行多久我们根本不可得知。故容易出现时间到期了,业务还没执行完。这就容易出现A持有锁执行任务,还没完成就超时了,B持有锁执行任务,A执行完,释放锁【此时会释放B的锁】的情况。所以释放锁必须要持有锁本人才能执行。
代码语言:javascript复制if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
stringRedisTemplate.delete(lockKey);
}
所以clientId需要是分布式ID,然后释放锁改为判断clientId符合才能去释放。
3.改进之后的情况:
代码示例
代码语言:javascript复制public String deductStock() {
String lockKey = "lock:product_101";
String clientId = UUID.randomUUID().toString();
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
if (!result) {
return "error_code";
}
try {
发现说明
1)即时加了判断,我们会发现依旧会存在问题【因为判断与释放锁操作不是原子性的】,如果在判断里面加上休眠进行试验
代码语言:javascript复制if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
Thread.sleep(20000);
stringRedisTemplate.delete(lockKey);
}
我们会发现根本问题依旧没有解决,只是减少了发生的情况。究其原因,本质上还是锁超时导致的。解决这个问题就要引入一个完美的解决方案叫做锁续命。
2)锁续命(watchDog):假设主线程抢到锁开始执行业务逻辑,开启一个分线程,在分线程里边做一个定时任务,比如说设置的锁超时时间是30s,那么我们的定时任务时间就设置为10s,定时任务设置的时间一定要比锁超时时间小,每10s定时任务先去判断主线程有没有结束,没有结束的话说明主线程就还在,还在进行业务逻辑操作,这个时候我们执行一条expire命令,将主线程锁的超时时间重新设置为30s,这样的话只要主线程还没结束,主线程就会被分线程定时任务去做续命逻辑,维持在30s,判断主线程结束,就不再执行续命逻辑。
Redisson分布式锁框架剖析
1.引入依赖
代码语言:javascript复制<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
2.进行配置
代码语言:javascript复制@Bean
public Redisson redisson() {
// 此为单机模式
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
3.业务代码展示
代码语言:javascript复制public String deductStock2() {
String lockKey = "lock:product_101";
//获取锁对象
RLock redissonLock = redisson.getLock(lockKey);
//加分布式锁
redissonLock.lock();
try {
Long stock = (Long) stringRedisTemplate.opsForValue().decrement("stock");
if (stock > 0) {
Long realStock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", realStock "");
System.out.println("扣减成功,剩余库存:" realStock);
} else {
System.out.println("扣减失败,库存不足");
}
} finally {
//解锁
redissonLock.unlock();
}
return "end";
}
发现说明
1.如果在集群架构下面,分布式锁如果在Master节点上写成功了就会返回给客户端,但是此时还需要同步给从节点。
2.如果在此时间内Master节点结点宕机,那么数据将会消失,而从节点上没有锁的信息(变为Master节点)。【主从架构锁失效问题】
4.为解决主从架构锁失效问题引入的RedLock(不建议用,因为本质上还是没有解决主从架构锁失效问题)
0.原理展示
1.redssion 集群配置(在resource下创建 redssion.yml文件)
代码语言:javascript复制clusterServersConfig:
# 连接空闲超时,单位:毫秒 默认10000
idleConnectionTimeout: 10000
pingTimeout: 1000
# 同任何节点建立连接时的等待超时。时间单位是毫秒 默认10000
connectTimeout: 10000
# 等待节点回复命令的时间。该时间从命令发送成功时开始计时。默认3000
timeout: 3000
# 命令失败重试次数
retryAttempts: 3
# 命令重试发送时间间隔,单位:毫秒
retryInterval: 1500
# 重新连接时间间隔,单位:毫秒
reconnectionTimeout: 3000
# 执行失败最大次数
failedAttempts: 3
# 密码
password: test1234
# 单个连接最大订阅数量
subscriptionsPerConnection: 5
clientName: null
# loadBalancer 负载均衡算法类的选择
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
#从节点发布和订阅连接的最小空闲连接数
slaveSubscriptionConnectionMinimumIdleSize: 1
#从节点发布和订阅连接池大小 默认值50
slaveSubscriptionConnectionPoolSize: 50
# 从节点最小空闲连接数 默认值32
slaveConnectionMinimumIdleSize: 32
# 从节点连接池大小 默认64
slaveConnectionPoolSize: 64
# 主节点最小空闲连接数 默认32
masterConnectionMinimumIdleSize: 32
# 主节点连接池大小 默认64
masterConnectionPoolSize: 64
# 订阅操作的负载均衡模式
subscriptionMode: SLAVE
# 只在从服务器读取
readMode: SLAVE
# 集群地址
nodeAddresses:
- "redis://IP地址:30001" //
- "redis://IP地址:30002"
- "redis://IP地址:30003"
- "redis://IP地址:30004"
- "redis://IP地址:30005"
- "redis://IP地址:30006"
# 对Redis集群节点状态扫描的时间间隔。单位是毫秒。默认1000
scanInterval: 1000
#这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享。默认2
threads: 0
#这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。默认2
nettyThreads: 0
# 编码方式 默认org.redisson.codec.JsonJacksonCodec
codec: !<org.redisson.codec.JsonJacksonCodec> {}
#传输模式
transportMode: NIO
# 分布式锁自动过期时间,防止死锁,默认30000
lockWatchdogTimeout: 30000
# 通过该参数来修改是否按订阅发布消息的接收顺序出来消息,如果选否将对消息实行并行处理,该参数只适用于订阅发布消息的情况, 默认true
keepPubSubOrder: true
# 用来指定高性能引擎的行为。由于该变量值的选用与使用场景息息相关(NORMAL除外)我们建议对每个参数值都进行尝试。
2.代码配置
代码语言:javascript复制@Bean
public RedissonClient redisson() throws IOException {
Config config = Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream());
RedissonClient redisson = Redisson.create(config);
return redisson;
}
//或者
@Bean
public Redisson redisson() {
// 此为集群模式
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://127.0.0.1:6379")
.addNodeAddress("redis://127.0.0.1:6389")
.addNodeAddress("redis://127.0.0.1:6399")
.addNodeAddress("redis://127.0.0.1:6369");
return (Redisson) Redisson.create(config);
}
3.业务代码示例
代码语言:javascript复制@RequestMapping("/redlock")
public String redlock() {
RLock lock1 = redisson.getLock("Key1_product_001");
RLock lock2 = redisson.getLock("Key2_product_001");
RLock lock3 = redisson.getLock("Key3_product_001");
/**
* 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)
*/
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
/**
* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
*/
boolean res = redLock.tryLock(10, 30, TimeUnit.SECONDS);
if (res) {
//成功获得锁,在这里处理业务
}
} catch (Exception e) {
throw new RuntimeException("lock fail");
} finally {
//无论如何, 最后都要解锁
redLock.unlock();
}
return "end";
}
4.分析说明(为什么不推荐用)
1)如果不是集群,为保证高可用,要对三个节点都添加了从节点(因为如果没有从节点,线上只要有两个服务宕机了,那么这个分布式锁将不再可用)
2)针对三主三从的情况,A线程对redis_1_主 和 redis_2_主 加锁成功,对 redis_3_主 加锁失败,则可以获得分布式锁,执行任务。但是还没同步情况下,redis_1_主宕机,redis_1_从 晋升成功数据丢失,此时B线程来加锁,redis_1_从加锁成功和 redis_3_主 加锁成功,对 redis_2_主 加锁失败,也能获得分布式锁。【概率不大但还是会存在问题】
3)针对集群如果不搞主从【一旦出现宕机,数据量大,且访问高的话,这里面就存在着缓存雪崩的危机】,此外如果集群半数节点宕机,集群会被迫停了,此外如果加锁节点越多,加锁效率越低下。
4)既然原理与zookeeper的差不多而且也损失了高性能的特性,那其实还不如使用zookeeper分布式锁。
5.原理分析
6.源码剖析
1)Redisson类#getLock方法
代码语言:javascript复制public RLock getLock(String name) {
return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
2)Redisson类#lock方法
代码语言:javascript复制public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
//RedissonLock类#lockInterruptibly方法
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
//RedissonLock类#lockInterruptibly方法
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
//先在redis中发布订阅消息,等待用完锁的线程通知
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
//再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
break;
}
if (ttl >= 0) {
//利用 Semaphore 信号量的方式获得许可,但是这种休眠是定时的
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
}
//RedissonLock类#tryAcquire方法
//利用future的方式阻塞式等待返回结果
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
//RedissonObject类#get方法
protected <V> V get(RFuture<V> future) {
return commandExecutor.get(future);
}
//RedissonLock类#subscribe方法
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
//PublishSubscribe类#subscribe方法
public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) {
final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName);
final RPromise<E> newPromise = new RedissonPromise<E>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get());
}
};
Runnable listener = new Runnable() {
@Override
public void run() {
// 1:判断RedisLockEntry 是否存在
E entry = entries.get(entryName);
if (entry != null) {
entry.aquire();
semaphore.release();
entry.getPromise().addListener(new TransferListener<E>(newPromise));
return;
}
// 2:创建RedisLockEntry
E value = createEntry(newPromise);
value.aquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.aquire();
semaphore.release();
oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
return;
}
// 3:创建一个监听器,别的线程进行redis-pub命令之后进行调用
RedisPubSubListener<Object> listener = createListener(channelName, value);
// 4:底层交给netty调用redis-sub命令
subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener);
}
};
semaphore.acquire(listener);
listenerHolder.set(listener);
return newPromise;
}
3)RedissonLock类#tryAcquireAsync方法(核心点主体)
代码语言:javascript复制//RedissonLock类#tryAcquireAsync方法
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//尝试加锁逻辑
RFuture<Long> ttlRemainingFuture=tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//添加监听器
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
//Future任务执行完会回调该方法
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// 加锁成功
if (ttlRemaining == null) {
//看门狗续命
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
4)RedissonLock类#tryLockInnerAsync方法(核心点,加锁逻辑)
代码语言:javascript复制//RedissonLock类#tryLockInnerAsync方法
//利用redis的单线程执行任务,redis会将整个脚本作为一个整体执行,且中间不会被其他命令插入
//采用的是hash的类型来存储锁,为了实现重入锁的概念
//Redis pttl命令以毫秒为单位返回 key 的剩余过期时间
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then "
"redis.call('hset', KEYS[1], ARGV[2], 1); "
"redis.call('pexpire', KEYS[1], ARGV[1]); "
"return nil; "
"end; "
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "
"redis.call('hincrby', KEYS[1], ARGV[2], 1); "
"redis.call('pexpire', KEYS[1], ARGV[1]); "
"return nil; "
"end; "
"return redis.call('pttl', KEYS[1]);",
//对应为KEYS[1](对应传入的锁的命名),ARGV[1](设置的超时时间,默认30s) ,ARGV[2] -》(uuid ":" threadId)
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
5)RedissonLock类#scheduleExpirationRenewal方法(核心点,看门狗的逻辑【续命】)
代码语言:javascript复制//RedissonLock类#scheduleExpirationRenewal方法
//采用Future 事件监听的方式,方法嵌套调用来实现定时任务
private void scheduleExpirationRenewal(final long threadId) {
if (expirationRenewalMap.containsKey(getEntryName())) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "
"redis.call('pexpire', KEYS[1], ARGV[1]); "
"return 1; "
"end; "
"return 0;",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
//再次添加监听器,重复检查
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
expirationRenewalMap.remove(getEntryName());
if (!future.isSuccess()) {
log.error("Can't update lock " getName() " expiration", future.cause());
return;
}
if (future.getNow()) {
// reschedule itself //递归调用
scheduleExpirationRenewal(threadId);
}
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
//如果该任务已经存在一个了,就把新建的任务关闭,Map中的key为(uuid ":" threadId)
if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
task.cancel();
}
}
6)Redisson类#unlock方法
代码语言:javascript复制//RedissonLock类#unlock方法
public void unlock() {
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException(...);
}
if (opStatus) {
//移除看门狗的定时任务
cancelExpirationRenewal();
}
}
//RedissonLock类#unlockInnerAsync方法
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果不存在锁
"if (redis.call('exists', KEYS[1]) == 0) then "
"redis.call('publish', KEYS[2], ARGV[1]); "
"return 1; "
"end;"
//当前线程并没有持有锁,则返回nil
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then "
"return nil;"
"end; "
//前线程持有锁,则对value-1,拿到-1之后的vlaue
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); "
//value>0,以毫秒为单位返回剩下的过期时间。(保证可重入)
"if (counter > 0) then "
"redis.call('pexpire', KEYS[1], ARGV[2]); "
"return 0; "
//value<=0,则对key进行删除操作,return 1 (方法返回 true)。然后进行redis-pub指令,用于唤醒其他正在休眠的线程。
"else "
"redis.call('del', KEYS[1]); "
"redis.call('publish', KEYS[2], ARGV[1]); "
"return 1; "
"end; "
"return nil;",
//参数顺序KEYS[1](锁的名称),KEYS[2](发布订阅的Channel名:redisson_lock__channel 锁名),ARGV[1](发布的消息),ARGV[2](锁超时时间),ARGV[3](uuid ":" threadId)
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
7)Redisson类#tryLock方法
代码语言:javascript复制public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
newLeaseTime = unit.toMillis(waitTime)*2;
}
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
long lockWaitTime = calcLockWaitTime(remainTime);
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
lockAcquired = false;
}
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
if (failedLocksLimit == 0) {
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// reset iterator
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
failedLocksLimit--;
}
}
if (remainTime != -1) {
remainTime -= (System.currentTimeMillis() - time);
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
Redis与Zookeeper分布式锁的区别
1.从单机角度上来说,两者差别不大,都是项目引入的外部组件,redis相对于zookeeper来说,项目中使用的更多,常用性角度redis更加。
2.但是一般我们都会做集群(容错率更高):
【1】从分布式的CAP角度分析:
redis满足AP,在Master节点上写成功了会优先返回给客户端,之后在同步给从节点
zookeeper满足CP,在Master节点上写成功了会优先同步给从节点【ZAB协议(半数以上写成功)】,之后在返回给客户端
【2】主从架构锁失效问题:
redis会出现,因为从节点变成主节点时,会出现丢失数据的问题。
zookeeper不会出现,因为从节点变成主节点时,不会会出现丢失数据的问题。
【3】集群下性能角度:
redis性能会高于zookeeper,同步是个耗时的操作(而且这个过程中还是相当于阻塞线程),并发越高的情况,我们想要的是耗时越少的越好。
3.选redis还是zk实现分布式锁:
首先zk的性能肯定不如redis,但是从分布式锁的角度语义上来说,zk可能更适合一些,所以如果对性能要求比较高的话就选redis,对数据的强一致性有特别严格要求的话就选zk,现在的主流的分布式锁方案还是redis,也有一些办法去减少redis主从架构锁失效问题。
如何提升分布式锁性能
问题分析
1.分布式锁为我们解决了并发问题,但是其底层思路是将并行执行的请求给串行化了,因为redis是单线程执行任务的,肯定就不会有并发问题了。
2.但是这种设计本身是与我们高并发的需求是冲突的。但是某些场景下我们又不得不用,所以我们应该基于场景做一些优化。
3.正如阿里巴巴Java开发手册里面写到:
代码语言:javascript复制6. 【强制】高并发时,同步调用应该去考量锁的性能损耗。能用无锁数据结构,就不要用锁;能锁区块,就不要锁整个方法体;能用对象锁,就不要用类锁。
说明:尽可能使加锁的代码块工作量尽可能的小,避免在锁代码块中调用 RPC 方法。
7. 【强制】对多个资源、数据库表、对象同时加锁时,需要保持一致的加锁顺序,否则可能会造成死锁。
说明:线程一需要对表 A、B、C 依次全部加锁后才可以进行更新操作,那么线程二的加锁顺序也必须是 A、B、C,否则可能出现死锁。
8. 【强制】并发修改同一记录时,避免更新丢失,需要加锁。要么在应用层加锁,要么在缓存加锁,要么在数据库层使用乐观锁,使用 version 作为更新依据。
说明:如果每次访问冲突概率小于 20%,推荐使用乐观锁,否则使用悲观锁。乐观锁的重试次数不得小于 3 次。
4.所以我们优先从锁的粒度开始,锁是否合适,加锁的范围是否够小。锁的粒度范围越小越好,加锁的代码越少性能就越高,因为加锁的代码会串行执行,没有必要加锁的代码肯定是让他们并行执行这样效率更高。
案例演示
场景说明:
在秒杀抢购的情况下,大量的秒杀商品其实都是走同一逻辑的,如果使用公用的锁必然是不合适的,这会大大阻塞住整个系统,而且不同商品之前根本不存在竞争关系,故一般我们会采用类似 redis_promotion_product_stock_$productId :1000 这种设置库存值 。那么对于每个商品既然拥有了自己的库存那么对于对应库存加锁就能缩小了锁的颗粒度。
但是这种真的就可行了嘛?对A商品的下单,都必对"redis_promotion_product_lock_A"这个锁key来加锁。这样会导致对同一个商品的下单请求,就必须串行化,一个接一个的处理。假设加锁之后,释放锁之前,查库存 -> 创建订单 -> 扣减库存,这个过程性能很高吧,算他全过程20毫秒,这应该不错了。那么1秒是1000毫秒,只能容纳50个对这个商品的请求依次串行完成处理。这种性能远远不能满足我们想要的。而且对于变量进行原子操作这种:
代码语言:javascript复制13. 【参考】volatile 解决多线程内存不可见问题。对于一写多读,是可以解决变量同步问题,但是如果多写,同样无法解决线程安全问题。
如果是 count 操作,使用如下类实现:AtomicInteger count = new AtomicInteger(); count.addAndGet(1);
如果是 JDK8,推荐使用 LongAdder 对象,比 AtomicLong 性能更好(减少乐观锁的重试次数)。
了解过源码的都应该知道 AtomicLong和ConcurrentHashMap 都是优化过类似操作的。那么为何不参考呢?【分段加锁思想
】
AtomicLong将变量base结合一些数组变量,共同维持总数。面对高并发下,是针对单个数组节点进行加锁,修改节点内数据,而总量依旧是他们加起来,而且数组的最大容量与核心数有关。是不是豁然开朗?这与我们的场景是不是很像。多台服务器对应多核心。假设有4台服务器,我们是不是可以将变量 redis_promotion_product_stock_productId :1000 拆解为 redis_promotion_product_stock_productId_1 :250,..,redis_promotion_product_stock_
这样的4份(性能好的服务器,可以适当偏多)。那么服务器的CPU是不是就充分利用了,而且他们之前的并发问题是不是变小了。
又或者分成10份,每个服务器持有一份,用完再去获取新的份额(这种需要额外添加列表维护,但是并发冲突再次下降)。
一旦对某个数据做了分段处理之后,就会存在一个问题:假设服务器A的份额消耗完了,但是其余服务器还存于份额:
解决方案(处理库存不足的方案是必须要做的):
1.发现这个分段库存里的库存不足了,释放锁,然后立马换下一个分段库存,再次尝试加锁后尝试处理(核心逻辑)。
2.依托于负载均衡,先判断总库存是否还是有的,有的负载到其他服务器,要设置好重试次数。(或者不重试,返回友好提示,让客户自己去重试,毕竟秒杀抢购这东西)
总结
1.分布式锁并发优化,是一个十分复杂的过程,需要考虑数据的拆分,如何选择拆分的数据,如何校验,如何切换等等。这些都是需要我们考量和积累经验的。