C|并发编程|互斥锁实现

2021-11-22 11:12:02 浏览数 (1)

OS:Three Easy Pieces

导语

锁Lock,正如现实中的锁一样,决定了对于资源的访问权。在并发编程中,由于资源共享的缘故,一个线程中的write操作有可能影响到另一个线程的read操作。

部分严格的程序员为了杜绝这种side effect,选择了Functional Programming,以确保完全的Thread Safety。而在正常的结构化编程中,程序员倾向于使用锁,防止意料之外的Side Effect。

锁控制了一个资源只能被一个线程同时访问,因此有效避免了多线程情况下的读写导致的异常输出。


自旋Spinning锁

代码语言:javascript复制
typedef struct __lock_t { int flag; } lock_t;

void init(lock_t *mutex) {
// 0 -> lock is available, 1 -> held
 mutex->flag = 0;
 }

 void lock(lock_t *mutex) {
 while (mutex->flag == 1) // TEST the flag   Line a
 ; // spin-wait (do nothing)
 mutex->flag = 1; // now SET it!             Line b
 }

 void unlock(lock_t *mutex) {
 mutex->flag = 0;

这是一个最基本的版本,当flag置1时,锁被获得,而flag置0时,锁被释放。而当锁没有被释放时,程序将会不断检测flag,而不做任何实际事情,因此被称作自旋。

原子性Atomicity

这个版本存在着一个致命的bug,因为CPU调度并不保证Line a与Line b之间不会插入其他线程的代码。如果在line a之后其他线程已经获得了锁,那么line ba仍然会被执行,也就是说flag的检测和设置被分开了,导致同时有两个线程持有这把锁。

我们一般使用atomic exchange来保证获取锁会是原子性操作,要么同时完成flag的检测和设置,要么什么都不做。这里C代码形式如下(TestAndSet):

代码语言:javascript复制
int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // fetch old value at old_ptr
*old_ptr = new; // store ’new’ into old_ptr
return old;
}

typedef struct __lock_t {
int flag;
} lock_t;

 void init(lock_t *lock) {
 // 0: lock is available, 1: lock is held
 lock->flag = 0;
 }

 void lock(lock_t *lock) {
 while (TestAndSet(&lock->flag, 1) == 1)
; // spin-wait (do nothing)
}

 void unlock(lock_t *lock) {
 lock->flag = 0;
}

而另一种形式为(CompareAndSwap),作用类似,但比TestAndSet更为泛用

代码语言:javascript复制
int CompareAndSwap(int *ptr, int expected, int new) {
 int original = *ptr;
 if (original == expected)
 *ptr = new;
 return original;
 }
void lock(lock_t *lock) {
 while (CompareAndSwap(&lock->flag, 0, 1) == 1)
 ; // spin
}

饥饿Starvation

由于CPU调度并不保证先试图获取锁的必定能先获得,可能出现某个线程很久无法获得锁的情况。一个简单的想法就是使用队列,保证FIFO。

我们先以FetchAndAdd作为原子性的后置 操作。

代码语言:javascript复制
int FetchAndAdd(int *ptr) {
 int old = *ptr;
 *ptr = old   1;
 return old;
}

typedef struct __lock_t {
 int ticket;
 int turn;
 } lock_t;

void lock_init(lock_t *lock) {
 lock->ticket = 0;
 lock->turn = 0;
 }

 void lock(lock_t *lock) {
 int myturn = FetchAndAdd(&lock->ticket);
 while (lock->turn != myturn)
; // spin
 }

 void unlock(lock_t *lock) {
 lock->turn = lock->turn   1;
 }

正如同餐厅叫号,turn表示当前的号码,ticket表示手中的号码,每一个顾客(线程)用完之后就呼叫下一个号码。

Sleeping锁

由于自旋锁导致每个线程都在执行while操作,空转造成了极大浪费,因此一种改进思路是:在没有获得锁之前,令线程直接沉睡。而当释放锁时,再唤醒下一个线程。同理,我们使用queue作为数据结构,但是维护一个显式的链表。

代码语言:javascript复制
typedef struct __lock_t {
 int flag;
 int guard;
 queue_t *q;
 } lock_t;

 void lock_init(lock_t *m) {
 m->flag = 0;
 m->guard = 0;
 queue_init(m->q);
 }

lock_t:

这里的flag表示锁有没有被线程需求,锁可以同时被多个线程所等候,仅当没有线程等候时才会置0。

而guard是lock和unlock过程的一个自旋锁。在过程结束后自动释放。(basically as a spin-lock around the flag and queue manipulations the lock is using)

代码语言:javascript复制
void lock(lock_t *m) {
 while (TestAndSet(&m->guard, 1) == 1)
 ; //acquire guard lock by spinning
 if (m->flag == 0) {
 m->flag = 1; // lock is acquired
 m->guard = 0;
 } else {
 queue_add(m->q, gettid());
 m->guard = 0;
 park();
 }
 }

lock:

当锁中队列为空时: 置flag为1,即flag锁被占用

当锁中队列不为空时: 入队,使用park操作令线程休眠等待唤醒。

代码语言:javascript复制
void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
 ; //acquire guard lock by spinning
 if (queue_empty(m->q))
 m->flag = 0; // let go of lock; no one wants it
 else
 unpark(queue_remove(m->q)); // hold lock
// (for next thread!)
 m->guard = 0;

unlock:

当锁中队列为空时:置flag为0,即此锁闲置

当锁中队列不为空时:出队,使用unpark操作唤醒下一个线程并释放锁

Buggy

代码语言:javascript复制
 queue_add(m->q, gettid());
 m->guard = 0;             Line a   
 park();                   Line b

假如在line a和line b之间正好有一个线程unlock了,那么将会唤醒当前正在加锁的线程,然后再运行line b使得当前线程进入休眠,而队列中当前线程却已经出队。这样一来,陷入休眠的当前线程就不再可以被唤醒了。

为了解决这个问题,如果能直接让ab原子性就好了,然而实际情况却很难做到。

我们可以特异性针对上面的问题处理,例如某种实现中,setpark函数可以令程序进入准备park的状态,如果在park之前进程已经被unpark,那么park将直接返回

代码语言:javascript复制
queue_add(m->q, gettid());
setpark();
m->guard = 0;                
park();                  

Two-phase锁

实际操作系统中,互斥锁的实现综合了以上两种锁的实现。以下是Linux的Mutex实现机制。

膜这段代码!!!

代码语言:javascript复制
 void mutex_lock (int *mutex) {
 int v;
 /* Bit 31 was clear, we got the mutex (the fastpath) */
//自旋锁!
 if (atomic_bit_test_set (mutex, 31) == 0)
 return;
//维护等待队列长度!
 atomic_increment (mutex);
//这里存在一个问题,假如后面setpark部分continue,那么会存在两个醒着的线程抢夺锁。如果刚刚被唤醒的线程抢不到的话
//原本的队首又得重新进队尾了,那就很迷。可能需要调度算法,保证刚刚唤醒的线程能先执行?
 while (1) {
//被unlock唤醒了!!获取锁然后维护等待队列长度
 if (atomic_bit_test_set (mutex, 31) == 0) {
 atomic_decrement (mutex);
 return;
 }
 /* We have to waitFirst make sure the futex value
 we are monitoring is truly negative (locked). */

//类似setpark!防止v = *mutex;前面被插入unlock
v = *mutex;
 if (v >= 0)
 continue;

//类似setpark!防止v = *mutex;后面被插入unlock
 futex_wait (mutex, v);
 }
 }

 void mutex_unlock (int *mutex) {
 /* Adding 0x80000000 to counter results in 0 if and
 only if there are not other interested threads */
//解锁,如果等待队列长度是0就不用唤醒!不把这个逻辑放futex_wake是为了减少sys call的开销。
 if (atomic_add_zero (mutex, 0x80000000))
 return;

/* There are other threads waiting for this mutex,
 wake one of them up. */
//唤醒队首线程!
 futex_wake (mutex);
 }

Two-phase 锁意识到对于那些将会马上被释放的锁,使用自旋锁更有益处。而唤醒等操作需要使用更多的sys-call,因此会增大开销。

futex_wait和futex_wake会在内核态维护一个mutex对应的队列。

在第一阶段,线程将会自旋若干次,试图获取锁。

一旦第一阶段没有完成,则会进入第二阶段,线程沉睡,直到锁被释放后将线程唤醒。

上述linux的实现只自旋了一次,但是也可以使用有固定自旋次数的循环。

注意:

这里setpark的原因和上面不同,因为这里不会出现先入队列再沉睡的情况。

Special case: Queue Empty

假如没有v>=0的判断,

假如B lock中间插入C unlock,由于队列为空,lock位变为0,不wake下一个线程。此时B wait,则无法被唤醒。

但是如果continue,B就能直接拿锁,而不会wait。

联想

我个人其实是把这个和轮询/中断类比的,

以IO为例,轮询需要CPU不断访问IO,而中断则是仅当IO发生改变时CPU才进行访问。

同理,Spin其实就是CPU不断判断锁,而Sleep则是仅当锁被释放时才唤醒下一个线程。

尽管所处的抽象层次不同,但是这种思想确实有共通之处。

再往上,其实观察者模式也类似,查询其他对象是否发生改变很困难,因为在不知情的情况下,每时每刻都有可能发生改变。但改变的当事人想通知查询者就很容易,只需要在改变的同时发出消息即可。

猜想或者归纳一下,或许这种现象是由于信息隔离导致的。正是因为信息交流的双方信息的不对等,才导致传递这种信息所需要的代价截然不同。

0 人点赞