这是无量测试之道的第197篇原创
今天主要通过多读单写的例子来说下读写锁的原理
概念
多读单写,简单说,就是对资源的访问分为两种状态,一种是读操作,另一种是写操作。由应用程序提示锁应该做哪种操作。当为读模式时,所有的写动作被悬挂,而读请求被允许通过,而写动作时,所有操作被悬挂。并且,读写切换时,有足够的状态等待,直到真正安全时,才会切换动作。
如下图所示:
业务场景举例
比如现在有 A、B、C、D、E、F、G 6个线程,其中A、B、C、G 4个线程之行读请求,E、F 2个线程之行写请求,如何保证读写安全?
分析:
- 读写请求是可以在多个线程进行的
- 写请求时,所有的请求都会被停止即悬挂
解决:使用读写锁
代码:
demo里面的代码就是业务场景的表达,即有多个线程同时执行读写请求的业务场景
代码语言:javascript复制- (void)demo {
self.queue = dispatch_queue_create("rw_queue", DISPATCH_QUEUE_CONCURRENT);
for (int i = 0; i < 2; i ) {
dispatch_async(self.queue, ^{
[self read:1];
});
dispatch_async(self.queue, ^{
[self read:2];
});
dispatch_barrier_async(self.queue, ^{
[self write];
});
dispatch_async(self.queue, ^{
[self read:3];
});
}
}
下面的 read 和 write 方法里,就是读写锁的使用
代码语言:javascript复制
- (void)read {
pthread_rwlock_rdlock(&_lock);
sleep(1);
NSLog(@"%s", __func__);
pthread_rwlock_unlock(&_lock);
}
- (void)write
{
pthread_rwlock_wrlock(&_lock);
sleep(1);
NSLog(@"%s", __func__);
pthread_rwlock_unlock(&_lock);
}
读写锁的原理
- 在 AQS 中,通过 int 类型的全局变量 state 来表示同步状态,即用 state 来表示锁。 ReentrantReadWriteLock 也是通过 AQS 来实现锁的,但是 ReentrantReadWriteLock有两把锁:读锁和写锁,它们保护的都是同一个资源,那么如何用一个共享变量来区分锁是写锁还是读锁呢?答案就是按位拆分。
- 由于 state 是 int 类型的变量,在内存中占用4个字节,也就是32位。将其拆分为两部分:高16位和低16位,其中高16位用来表示读锁状态,低16位用来表示写锁状态。 当设置读锁成功时,就将高16位加1,释放读锁时,将高16位减1; 当设置写锁成功时,就将低16位加1,释放写锁时,将第16位减1; 如下图所示:
写锁加锁的原理
获取写锁的流程
c == 0表示锁还没有被任何线程占用
w 写锁的数量
如果 c==0,标记锁成功后,表述获取写锁成功
如果 c!=0 && w==0,表示读锁在占用锁,所以获取锁失败
如果 c!=0 && w!=0,表示写锁在占用锁,此时就需要判断访问该锁的线程是否和占用该锁的线程为同一线程,如果不为同一线程就返回失败;如果为同一线程,则判断重入的数量,数量为超过就返回成功,否则抛出异常
读锁加锁的原理
获取读锁的流程
c == 0 表示锁还没有被任何线程占用
r 读锁的数量
w = exclusiveCount(c) 写锁的数量
如果c!=0 && w!=0,表示写锁在占用锁,改线程就未获取到读锁所以立即执行fullTryAcquireShared(current);
如果c!=0 && r!=0,表示锁被写线程占用
- 如果 r==0, firstReader = current
- 如果第一个获取到读锁的线程不是当前线程就记录当前线程的获取锁的数量,并让请求线程获得锁
读锁获取锁失败后会循环的去执行下面这个方法,直到满足相应的条件才会 return 退出,否则一直循环
代码语言:javascript复制 final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
// for死循环,直到满足相应的条件才会return退出,否则一直循环
for (;;) {
int c = getState();
// 锁的状态为写锁时,持有锁的线程不等于当期那线程,就说明当前线程获取锁失败,返回-1
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 尝试设置同步变量的值,只要设置成功了,就表示当前线程获取到了锁,然后就设置锁的获取次数等相关信息
if (compareAndSetState(c, c SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount ;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count ;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
问题
当有100个线程来并发的进行读写请求,其中有99个线程是进行读请求,只有一个线程是进行写请求(假设写请求的编号为20)
- 先有1-19线程进行了读请求
- 然后第20线程进行了写请求
- 又来21-100线程80个线程进行读请求
- 结果是第20线程等到所有读线程执行完了才能执行写请求
- 从而导致写锁饥饿问题
总结
多读单写在实际开发过程中是非常常见的,不同的开发语言有不同的解决方式,但是大体的实现思路是差不多的。我们会使用读写锁,但是其读写锁的原理也需要明白和理解。
end