MIT_6.S081_xv6.Information 7:Lock
于2022年3月27日2022年3月27日由Sukuna发布
许多操作系统内核,包括xv6都保持着多线程多进程执行,首先是因为这个xv6有许多个微处理器,这些处理器(CPU)是独立地执行一段代码,共享物理内存,这个时候就会有问题,就是在一个CPU读取数据的时候,另外一个CPU会写数据,或者说多个CPU同时写数据.这些都会出现问题.所以说多进程多线程的同步问题是非常重要的,我们需要一系列同步的手段来保证同步.所以这个词“并发性”代表多个指令同时执行的情况,由于中断操作,线程切换以及多核并行执行,我们不得不考虑并发性的问题.
这一章我们会举出一些由于并发性导致的程序执行错误的例子,并着重介绍一个使用广泛的并发操作:锁.
race
首先我们举一个简单的例子来解释什么叫做race现象.假设有两个进程同时调用wait.wait需要free掉子进程的内存,所以说在每个CPU,这个内核会调用kfree来执行操作,kalloc()会从空闲页链表中取出链表首部,kfree()就是从空闲页链表中放一个页进入首部.这个时候两个进程都会进入到wait调用中,这个时候程序的结果是不确定的!
因为你不知道程序执行的顺序,因为CPU的调度我们具体也无法得知.假设这两个进程分别是A和B,这个时候有一种可能就是,A先取了链表的头,B随后取链表的头,A改完之后放进去,B改完之后放进去.这个时候B的修改覆盖了A的修改,导致了执行的错误.
这个叫做race现象,具体的来说就是,对于内存区域的同一块区域有多个进程同时访问,往往,出现race现象就代表着离出现bug也不远了.对于争用现象,最重要的问题就是我们对与并行程序的控制不是很够,我们不知道并行程序的执行顺序,所以说我们要加上一点控制手段来控制执行程序.
spinlocks
xv6自旋锁(spinlock
)用于内核临界区访问的同步和互斥。自旋锁最大的特征是当进程拿不到锁时会进入无限循环,直到拿到锁退出循环。Xv6使用100ms一次的时钟中断和Round-Robin调度算法来避免陷入自旋锁的进程一直无限循环下去。Xv6允许同时运行多个CPU核,多核CPU上的等待队列实现相当复杂,因此使用自旋锁是相对比较简单且能正确执行的实现方案。
// Mutual exclusion lock.
struct spinlock {
uint locked; // Is the lock held?
// For debugging:
char *name; // Name of lock.
struct cpu *cpu; // The cpu holding the lock.
};
最重要的就是一个int类型的变量,这个变量帮助我们记录一下锁有没有被使用.
首先调用锁之前我们要对锁进行初始化,初始化的操作就是:
代码语言:javascript复制void
initlock(struct spinlock *lk, char *name)
{
lk->name = name;
lk->locked = 0;
lk->cpu = 0;
}
给定一个锁结构,还有一个名字,把名字存进去,其他的元素赋值0即可.
RISC-V会给定一个指令,这个指令叫做amoswap
,这是一个原子指令,使用方法是这个amoswap r, a
.本质上就是lw指令,把地址a的内存元素放进r对应的寄存器,但是有一点比较特殊的是:这个操作可以保证在执行操作的时候r和a都不会被其他CPU访问到.保证了同步控制.(不给其他CPU访问相当于规定了对于这个内存的访问顺序).
xv6使用acquire来获取锁的控制:
代码语言:javascript复制void
acquire(struct spinlock *lk)
{
push_off(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");
// On RISC-V, sync_lock_test_and_set turns into an atomic swap:
// a5 = 1
// s1 = &lk->locked
// amoswap.w.aq a5, a5, (s1)
while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen strictly after the lock is acquired.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();
// Record info about lock acquisition for holding() and debugging.
lk->cpu = mycpu();
}
这个函数会一直调用一个C的库函数,这个库函数会按照注释的方法调用来获取数据,如果这个锁一直都没有释放,就让它一直这么循环下去,直到锁被释放了为止.一释放这个进程就获得锁,在获得的第一瞬间把lock值赋值1.然后获取之后为了调试需要顺便记录一下CPU的值.
下面是锁的释放:
代码语言:javascript复制void
release(struct spinlock *lk)
{
if(!holding(lk))
panic("release");
lk->cpu = 0;
// Tell the C compiler and the CPU to not move loads or stores
// past this point, to ensure that all the stores in the critical
// section are visible to other CPUs before the lock is released,
// and that loads in the critical section occur strictly before
// the lock is released.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();
// Release the lock, equivalent to lk->locked = 0.
// This code doesn't use a C assignment, since the C standard
// implies that an assignment might be implemented with
// multiple store instructions.
// On RISC-V, sync_lock_release turns into an atomic swap:
// s1 = &lk->locked
// amoswap.w zero, zero, (s1)
__sync_lock_release(&lk->locked);
pop_off();
}
首先把cpu的值设置为0,然后通过同步的赋值指令把值赋值,这个就代表这个进程结束了对临界区的访问,现在可以让其他进程访问了.
自旋锁的使用.
xv6使用锁来防止race现象的发生.对于使用锁,下面有几个基本的准则:
第一个就是如果有一个变量,一个CPU写的时候另外一个CPU可以去读,这个时候我们需要保护这个变量.第二,记住锁要保护不变量(不变量指的是在某些时候不能被其他程序改变的变量).总的来说就是,如果有可能有多个进程会对一个数据结构进行更改的话,那么我们必须得采取一定的措施来让进程有序地访问数据结构.
当然过分地使用锁也是不应该的,如果这个程序只可能有一个进程访问,那么就没必要使用锁.
使用锁的一个特征就是,我们把访问和修改数据结构的代码称为临界区,当进入临界区的时候获得锁,当离开临界区的时候释放锁.
代码语言:javascript复制获得锁
临界区
释放锁
防止死锁
我们首先得知道,对于不同锁,我们要保证锁的获取顺序是一定的,如果有一段代码要求先获得A锁,再获得B锁.另外一段代码要求获得B锁,在获得A锁,这个时候就会造成卡死.因为一段代码等待B锁,另外一段代码等待A锁.就互相等待,卡死了.
在xv6里面也有很多这样的锁链,比如说在console.c中,首先获取了cons.lock
,接着调用wakeup函数,这个又获得p.lock
.在文件系统中也是首先获取vdisk.lock
再获取p->lock
.xv6里面的锁都有着巧妙的安排,这样可以避免死锁的发生.
死锁有可能是程序的逻辑结构出现了问题,有时锁的特征事先不知道,可能是因为必须持有一个锁才能发现下一个要获取的锁的特征。最后,死锁发生与否取决于你怎么规划和使用锁,使用的锁越多就越容易死锁.
一个比较朴素的方法就是使用“重入锁”.重入锁的设计就是如果一一个锁是被一个进程拥有的,我们还是允许这个进程再获得这个锁.而不是执行panic指令.然而,事实证明,重入锁使并发性更难推理:重入锁打破了锁导致关键部分相对于其他关键部分是原子的直觉。
代码语言:javascript复制struct spinlock lock;
int data = 0; // protected by lock
f() {
acquire(&lock);
if(data == 0){
call_once();
h();
data = 1;
}
release(&lock);
}
g() {
aquire(&lock);
if(data == 0){
call_once();
data = 1;
}
release(&lock);
}
在这里面,假设h函数就是执行g函数的调用,如果饮用了重入锁的话,这个call_once就会执行两遍.但是不引入重入锁的话就会导致死锁.出于这个问题,我们还是放弃了可重入锁,所以说对于程序的编写者来说,我们尽量确定好锁的申请顺序,防止A进程拥有a锁等待b锁,B进程拥有b锁等待a锁的存在.
获取锁之后进入临界区的时候,由于防止被中断打断进入中断处理程序从而导致死锁,所以说获得锁的死后就会打断中断.举个例子,比如说处理时钟中断的时候,clockintr会获得tickslock.
代码语言:javascript复制void
clockintr()
{
acquire(&tickslock);
ticks ;
wakeup(&ticks);
release(&tickslock);
}
与此同时,在执行中断操作的sys_sleep
函数也会访问tickslock:
uint64
sys_sleep(void)
{
int n;
uint ticks0;
if(argint(0, &n) < 0)
return -1;
acquire(&tickslock);
ticks0 = ticks;
while(ticks - ticks0 < n){
if(myproc()->killed){
release(&tickslock);
return -1;
}
sleep(&ticks, &tickslock);
}
release(&tickslock);
return 0;
}
所以说为了避免中断打断之后的死锁现象,一个CPU运行的代码获得了锁之后,这个CPU的中断就会被禁止.所以说这个时候就要禁用中断,但是禁用中断的时候要改变中断使能寄存器:我们就用一个栈来存储之,这个存储中断使能寄存器的函数就是push_pff
和pop_off
.当然一个CPU对应上的进程能开启多个锁,所以说我们用一个intena来代表这个进程拥有多少锁.所以说我们在acquire中申请一遍,在release中再申请一遍.
为了避免死锁,xv6中规定若一个进程持有
spinlock
,则必须要禁用中断。如果此时中断开启,那么可能会出现以下死锁情况:
- 进程A在内核态运行并拿下了p锁时,触发中断进入中断处理程序。
- 中断处理程序也在内核态中请求p锁,由于锁在A进程手里,且只有A进程执行时才能释放p锁,因此中断处理程序必须返回,p锁才能被释放。
- 那么此时中断处理程序会永远拿不到锁,陷入无限循环,进入死锁。
因此调用push_off
和pop_off
来禁用和开启中断。
获取锁的过程可能嵌套:一个进程获取了锁A,然后再获取锁B,释放锁B时,由于A还未释放,因此不能开启中断。注:struct cpu
中的noff
记录目前的深度,intena
记录在获取第一把锁之前的中断使能状态,当深度为0且intena
为1(所有锁都被释放且最初的使能状态为1)时,才开启中断。
// push_off/pop_off are like intr_off()/intr_on() except that they are matched:
// it takes two pop_off()s to undo two push_off()s. Also, if interrupts
// are initially off, then push_off, pop_off leaves them off.
void
push_off(void)
{
// 获取之前的中断使能状态
int old = intr_get();
// 关闭中断使能
intr_off();
if(mycpu()->noff == 0)
mycpu()->intena = old;
mycpu()->noff = 1;
}
void
pop_off(void)
{
struct cpu *c = mycpu();
if(intr_get())
panic("pop_off - interruptible");
if(c->noff < 1)
panic("pop_off");
c->noff -= 1;
if(c->noff == 0 && c->intena)
intr_on();
}
指令和内存顺序
我们一般会认为指令的执行顺序和我们代码的编辑顺序是一样的,但是其实这个并不绝对.CPU为了执行的速度,往往会做一些修改.假如说A和B一组指令,CPU可能会先让B执行,可能因为B的输入比A的输入先准备好,也有可能是因为执行B的时间比A长,我们不如先执行B,,这样B和A可以同步执行,增加CPU的效率,当然对于编译器,编译器也可能会把后面的指令移动到前面.当然CPU和编译器这么做的前提就是我不会改变输出的结果.但是,这样的特性对于锁来说是不好的,这对多处理器的情况下,有可能会出现与时间有关的问题.总而言之,CPU和编译器可能会提高效率改变指令执行的顺序.
比如说下面的这个例子:假如说第4行的语句移动到第6行之后就会引起严重的后果.
代码语言:javascript复制1 l = malloc(sizeof *l);
2 l->data = data;
3 acquire(&listlock);
4 l->next = list;
5 list = l;
6 release(&listlock);
这个时候对于其他的进程来说看到了被更新的list,但是看到了没有被初始化的list->next.这样是不好的.
去告诉CPU和编译器不要执行这样的重新编排,我们在acquire和release中执行了 __sync_synchronize()
函数,这个函数类似于一个memery barrier
.之前的指令不可以跨过这个memeory_barrier执行.
睡眠锁
自旋锁是这个进程一直等待别的进程把锁释放,这个进程在一直执行一个循环等待释放,这个锁适用于获得锁到释放锁用不了多久的时间的那种,等待的一方执行执行不了不久循环.但是万一这个锁要使用很久呢,一直等待别人,等一会儿还行,等很久我不如先睡一会儿,等别人用完了再叫醒我就好了.
所以说xv6提供一个睡眠锁.
代码语言:javascript复制void
acquiresleep(struct sleeplock *lk)
{
acquire(&lk->lk);
while (lk->locked) {
sleep(lk, &lk->lk);
}
lk->locked = 1;
lk->pid = myproc()->pid;
release(&lk->lk);
}
获得锁的时候,自旋锁就是一直查询,查询到可以使用锁了就接着往下走,但是这个睡眠锁就不一样了,当锁被占用的时候,它就直接引入睡眠模式,当有进程release这个锁的时候这个进程就会被唤醒,然后再查询一遍锁是否是可用的即可.如果是可用的,就往下执行和spinlock一样的操作.
总结:自旋锁就是一直查,睡眠锁就是被叫醒一次查一次.
对于release操作也是一样:首先清空标记,然后把所有为了lk而等待的进程全部唤醒.
代码语言:javascript复制void
releasesleep(struct sleeplock *lk)
{
acquire(&lk->lk);
lk->locked = 0;
lk->pid = 0;
wakeup(lk);
release(&lk->lk);
}