OS:Three Easy Pieces
导语
锁Lock,正如现实中的锁一样,决定了对于资源的访问权。在并发编程中,由于资源共享的缘故,一个线程中的write操作有可能影响到另一个线程的read操作。
部分严格的程序员为了杜绝这种side effect,选择了Functional Programming,以确保完全的Thread Safety。而在正常的结构化编程中,程序员倾向于使用锁,防止意料之外的Side Effect。
锁控制了一个资源只能被一个线程同时访问,因此有效避免了多线程情况下的读写导致的异常输出。
自旋Spinning锁
代码语言:javascript复制typedef struct __lock_t { int flag; } lock_t;
void init(lock_t *mutex) {
// 0 -> lock is available, 1 -> held
mutex->flag = 0;
}
void lock(lock_t *mutex) {
while (mutex->flag == 1) // TEST the flag Line a
; // spin-wait (do nothing)
mutex->flag = 1; // now SET it! Line b
}
void unlock(lock_t *mutex) {
mutex->flag = 0;
这是一个最基本的版本,当flag置1时,锁被获得,而flag置0时,锁被释放。而当锁没有被释放时,程序将会不断检测flag,而不做任何实际事情,因此被称作自旋。
原子性Atomicity
这个版本存在着一个致命的bug,因为CPU调度并不保证Line a与Line b之间不会插入其他线程的代码。如果在line a之后其他线程已经获得了锁,那么line ba仍然会被执行,也就是说flag的检测和设置被分开了,导致同时有两个线程持有这把锁。
我们一般使用atomic exchange来保证获取锁会是原子性操作,要么同时完成flag的检测和设置,要么什么都不做。这里C代码形式如下(TestAndSet):
代码语言:javascript复制int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // fetch old value at old_ptr
*old_ptr = new; // store ’new’ into old_ptr
return old;
}
typedef struct __lock_t {
int flag;
} lock_t;
void init(lock_t *lock) {
// 0: lock is available, 1: lock is held
lock->flag = 0;
}
void lock(lock_t *lock) {
while (TestAndSet(&lock->flag, 1) == 1)
; // spin-wait (do nothing)
}
void unlock(lock_t *lock) {
lock->flag = 0;
}
而另一种形式为(CompareAndSwap),作用类似,但比TestAndSet更为泛用
代码语言:javascript复制int CompareAndSwap(int *ptr, int expected, int new) {
int original = *ptr;
if (original == expected)
*ptr = new;
return original;
}
void lock(lock_t *lock) {
while (CompareAndSwap(&lock->flag, 0, 1) == 1)
; // spin
}
饥饿Starvation
由于CPU调度并不保证先试图获取锁的必定能先获得,可能出现某个线程很久无法获得锁的情况。一个简单的想法就是使用队列,保证FIFO。
我们先以FetchAndAdd作为原子性的后置 操作。
代码语言:javascript复制int FetchAndAdd(int *ptr) {
int old = *ptr;
*ptr = old 1;
return old;
}
typedef struct __lock_t {
int ticket;
int turn;
} lock_t;
void lock_init(lock_t *lock) {
lock->ticket = 0;
lock->turn = 0;
}
void lock(lock_t *lock) {
int myturn = FetchAndAdd(&lock->ticket);
while (lock->turn != myturn)
; // spin
}
void unlock(lock_t *lock) {
lock->turn = lock->turn 1;
}
正如同餐厅叫号,turn表示当前的号码,ticket表示手中的号码,每一个顾客(线程)用完之后就呼叫下一个号码。
Sleeping锁
由于自旋锁导致每个线程都在执行while操作,空转造成了极大浪费,因此一种改进思路是:在没有获得锁之前,令线程直接沉睡。而当释放锁时,再唤醒下一个线程。同理,我们使用queue作为数据结构,但是维护一个显式的链表。
代码语言:javascript复制typedef struct __lock_t {
int flag;
int guard;
queue_t *q;
} lock_t;
void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
queue_init(m->q);
}
lock_t:
这里的flag表示锁有没有被线程需求,锁可以同时被多个线程所等候,仅当没有线程等候时才会置0。
而guard是lock和unlock过程的一个自旋锁。在过程结束后自动释放。(basically as a spin-lock around the flag and queue manipulations the lock is using)
代码语言:javascript复制void lock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (m->flag == 0) {
m->flag = 1; // lock is acquired
m->guard = 0;
} else {
queue_add(m->q, gettid());
m->guard = 0;
park();
}
}
lock:
当锁中队列为空时: 置flag为1,即flag锁被占用
当锁中队列不为空时: 入队,使用park操作令线程休眠等待唤醒。
代码语言:javascript复制void unlock(lock_t *m) {
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // let go of lock; no one wants it
else
unpark(queue_remove(m->q)); // hold lock
// (for next thread!)
m->guard = 0;
unlock:
当锁中队列为空时:置flag为0,即此锁闲置
当锁中队列不为空时:出队,使用unpark操作唤醒下一个线程并释放锁
Buggy
代码语言:javascript复制 queue_add(m->q, gettid());
m->guard = 0; Line a
park(); Line b
假如在line a和line b之间正好有一个线程unlock了,那么将会唤醒当前正在加锁的线程,然后再运行line b使得当前线程进入休眠,而队列中当前线程却已经出队。这样一来,陷入休眠的当前线程就不再可以被唤醒了。
为了解决这个问题,如果能直接让ab原子性就好了,然而实际情况却很难做到。
我们可以特异性针对上面的问题处理,例如某种实现中,setpark函数可以令程序进入准备park的状态,如果在park之前进程已经被unpark,那么park将直接返回。
代码语言:javascript复制queue_add(m->q, gettid());
setpark();
m->guard = 0;
park();
Two-phase锁
实际操作系统中,互斥锁的实现综合了以上两种锁的实现。以下是Linux的Mutex实现机制。
膜这段代码!!!
代码语言:javascript复制 void mutex_lock (int *mutex) {
int v;
/* Bit 31 was clear, we got the mutex (the fastpath) */
//自旋锁!
if (atomic_bit_test_set (mutex, 31) == 0)
return;
//维护等待队列长度!
atomic_increment (mutex);
//这里存在一个问题,假如后面setpark部分continue,那么会存在两个醒着的线程抢夺锁。如果刚刚被唤醒的线程抢不到的话
//原本的队首又得重新进队尾了,那就很迷。可能需要调度算法,保证刚刚唤醒的线程能先执行?
while (1) {
//被unlock唤醒了!!获取锁然后维护等待队列长度
if (atomic_bit_test_set (mutex, 31) == 0) {
atomic_decrement (mutex);
return;
}
/* We have to waitFirst make sure the futex value
we are monitoring is truly negative (locked). */
//类似setpark!防止v = *mutex;前面被插入unlock
v = *mutex;
if (v >= 0)
continue;
//类似setpark!防止v = *mutex;后面被插入unlock
futex_wait (mutex, v);
}
}
void mutex_unlock (int *mutex) {
/* Adding 0x80000000 to counter results in 0 if and
only if there are not other interested threads */
//解锁,如果等待队列长度是0就不用唤醒!不把这个逻辑放futex_wake是为了减少sys call的开销。
if (atomic_add_zero (mutex, 0x80000000))
return;
/* There are other threads waiting for this mutex,
wake one of them up. */
//唤醒队首线程!
futex_wake (mutex);
}
Two-phase 锁意识到对于那些将会马上被释放的锁,使用自旋锁更有益处。而唤醒等操作需要使用更多的sys-call,因此会增大开销。
futex_wait和futex_wake会在内核态维护一个mutex对应的队列。
在第一阶段,线程将会自旋若干次,试图获取锁。
一旦第一阶段没有完成,则会进入第二阶段,线程沉睡,直到锁被释放后将线程唤醒。
上述linux的实现只自旋了一次,但是也可以使用有固定自旋次数的循环。
注意:
这里setpark的原因和上面不同,因为这里不会出现先入队列再沉睡的情况。
Special case: Queue Empty
假如没有v>=0的判断,
假如B lock中间插入C unlock,由于队列为空,lock位变为0,不wake下一个线程。此时B wait,则无法被唤醒。
但是如果continue,B就能直接拿锁,而不会wait。
联想
我个人其实是把这个和轮询/中断类比的,
以IO为例,轮询需要CPU不断访问IO,而中断则是仅当IO发生改变时CPU才进行访问。
同理,Spin其实就是CPU不断判断锁,而Sleep则是仅当锁被释放时才唤醒下一个线程。
尽管所处的抽象层次不同,但是这种思想确实有共通之处。
再往上,其实观察者模式也类似,查询其他对象是否发生改变很困难,因为在不知情的情况下,每时每刻都有可能发生改变。但改变的当事人想通知查询者就很容易,只需要在改变的同时发出消息即可。
猜想或者归纳一下,或许这种现象是由于信息隔离导致的。正是因为信息交流的双方信息的不对等,才导致传递这种信息所需要的代价截然不同。