MIT_6.S081_xv6.Information 5:MultiPlexing
于2022年3月25日2022年3月25日由Sukuna发布
多路复用
在xv6中,我们拥有的进程数往往要比CPU核要多.那么我们通过多路复用来进行调度.我们这个时候多路复用来进行调度.所有的进程共用几个多路复用器,使用的方法一般是时分复用.
首先xv6当等待设备完成I/O或者等待子进程退出的时候,就会使用sleep
和wakeup
系统调用来切换进程的状态.同时xv6操作系统并不会允许一个进程占用CPU太多时间,当一个进程连续占用CPU一段时间这个进程也会强制改变状态.这个对于进程来说,它被唤醒和打断是无法侦查的.所以说对于一个进程来说相当于占用了属于自己的CPU.
完成多路复用有一定的挑战,第一点就是如何进行进程的切换,切换CPU的运行状态以及其他部件的状态.第二点,对于用户进程,怎么处理可以让用户进程无法觉察到自己失去了CPU的控制权.这里xv6使用时钟中断来进行切换,(每段时间暂停一遍,进入内核态执行进程切换函数,这样子对于用户进程是透明的).第三点,所有CPU核都会执行一组进程,我们需要设计一个锁结构来防止race的出现.第四点,一个进程的所有内存和其他的资源在进程退出的时候都必须得释放,但是在释放的时候我们很难释放内核态栈.第五点,每一个核都需要知道自己运行进程的序号,否则我们进入内核态的时候不知道使用哪个栈.最后一点,就是sleep和wakeup的系统调用让进程放弃CPU.但是我们需要的注意是在唤醒进程的时候的race现象.
上下文切换
在切换进程的时候首先用户态先进入内核态,然后会把上下文信息放入到内核栈,切换到新的进程,然后新的进程的上下文信息会从内核栈中取出,再切换到用户态.每个进程会拥有一个内核态栈,因为多个进程共用一个内核栈是非常危险的.
保存的信息就是CPU的寄存器的值,同时,恢复也是恢复保存在内核态栈的寄存器值.swtch函数会执行寄存器的保存和提取.
代码语言:javascript复制.globl swtch
swtch:
sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
sd s1, 24(a0)
sd s2, 32(a0)
sd s3, 40(a0)
sd s4, 48(a0)
sd s5, 56(a0)
sd s6, 64(a0)
sd s7, 72(a0)
sd s8, 80(a0)
sd s9, 88(a0)
sd s10, 96(a0)
sd s11, 104(a0)
ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
ld s1, 24(a1)
ld s2, 32(a1)
ld s3, 40(a1)
ld s4, 48(a1)
ld s5, 56(a1)
ld s6, 64(a1)
ld s7, 72(a1)
ld s8, 80(a1)
ld s9, 88(a1)
ld s10, 96(a1)
ld s11, 104(a1)
ret
swtch(&c->context, &p->context);
在这里swtch首先把当前的寄存器信息存放到a0对应的内核栈中,再从a1对应的内核栈中取出数据放到寄存器中.对于这个函数,它并不知道这是什么进程在执行stwch调用.
现在我们知道context(上下文)的内容了,对于每一个进程和CPU的数据结构都有一部分保存上下文.
代码语言:javascript复制// Saved registers for kernel context switches.
struct context {
uint64 ra;
uint64 sp;
// callee-saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;
};
在C语言中,a0分别是老进程的context字段的地址,a1是新的进程的context字段的地址.我们发现这里只保存了callee-saved寄存器.但是你会发现,ra寄存器的值被改变了,所以说我知道当函数返回的时候,返回地址改变了,所以说这下pc就变成之前调用swtch的进程调用swtch的PC.这听上去很绕,简单的说就是反悔的PC不是这个进程调用之前的那个PC,而是上个进程调用之前的PC.
Swtch takes two arguments: struct context *old and struct context *new. It saves the current registers in old, loads registers from new, and returns.
我们回到之前的代码,在trap中最后调用了yield函数.
代码语言:javascript复制void
yield(void)
{
struct proc *p = myproc();
acquire(&p->lock);
p->state = RUNNABLE;
sched();
release(&p->lock);
}
首先yield函数讲当前进程的状态改成“可执行”,接着又调用sched()函数.
代码语言:javascript复制void
sched(void)
{
int intena;
struct proc *p = myproc();
if(!holding(&p->lock))
panic("sched p->lock");
if(mycpu()->noff != 1)
panic("sched locks");
if(p->state == RUNNING)
panic("sched running");
if(intr_get())
panic("sched interruptible");
intena = mycpu()->intena;
swtch(&p->context, &mycpu()->context);
mycpu()->intena = intena;
}
先判断各种情况,这个不是特别重要,重要的是我执行了swtch函数,这个函数会把当前进程的上下文保存,然后把scheduler()的上下文拿出来开始执行中间的调度过程.这个调度过程是每个CPU都特有的调度过程,其上下文存放在cpu的context里面,这个context区间每个CPU核都有一个.这下返回的地址不是sched()函数而是scheduler()函数了.也就是说这个地方很巧妙地改变ra寄存器让程序的返回地址改变,返回的是调度函数.这种思路只改变了部分上下文就可以改变运行的程序,非常妙.(最重要的是每个CPU一个防止race现象)
调度方法
现在完成了第一步,从原进程到调度程序,对于第一步,都是先获得进程的锁,然后更改进程的状态然后调用sched,这个对于sleep还是yiled还是exit都是一样.sched函数会进行一定的检查,然后最后sched会调用swtch转移到scheduler函数.
代码语言:javascript复制void
scheduler(void)
{
struct proc *p;
struct cpu *c = mycpu();
c->proc = 0;
for(;;){
// Avoid deadlock by ensuring that devices can interrupt.
intr_on();
for(p = proc; p < &proc[NPROC]; p ) {
acquire(&p->lock);
if(p->state == RUNNABLE) {
// Switch to chosen process. It is the process's job
// to release its lock and then reacquire it
// before jumping back to us.
p->state = RUNNING;
c->proc = p;
swtch(&c->context, &p->context);
// Process is done running for now.
// It should have changed its p->state before coming back.
c->proc = 0;
}
release(&p->lock);
}
}
}
接着根据scheduler的返回地址开始执行,首先这个函数是一个永远循环下去的循环,接着把上一次调用swtch进程的锁给释放了.你会发现这个十分巧妙,因为进程之间的解锁和上锁是通过swtch实现的,swtch的调用者已经有了锁,接着这个锁传递给scheduler.
由于scheduler是swtch(&c->context, &p->context);
开始执行的,所以说第一步就是标记CPU正在运行的进程,把这个进程改成0(NULL),然后再来释放这个锁.(因为当前上锁的进程一定是上一次进入的进程.)这个时候在sched()上的锁,在scheduler()释放.在scheduler()获得的锁,在sched()释放因为这是一个回环.这个时候你可以认为scheduler和sched是一个回环.当然也不绝对,因为当新的进程第一次运行的时候,返回是从forkret函数返回的,这个是在frok函数一开始就已经设定好的.第一次运行执行forkret函数,然后通过usertrapret返回到用户态.
void
forkret(void)
{
static int first = 1;
// Still holding p->lock from scheduler.
release(&myproc()->lock);
if (first) {
// File system initialization must be run in the context of a
// regular process (e.g., because it calls sleep), and thus cannot
// be run from main().
first = 0;
fsinit(ROOTDEV);
}
usertrapret();
}
scheduler运行一个永远运行的循环,首先找到一个可以运行的进程,一直运行直到调用yield()函数,这个可以运行的定义就是p->state==RUNNABLE.每一次找到可以运行的进程,都会把这个进程的进程信息放到当前cpu的结构体里面,标记为RUNNING.
首先我们知道,当一个进程的状态是RUNNING,我们可以安全地保存这个进程的上下文,因为现在这个CPU的寄存器是属于某个进成的寄存器,还有一个就是挑选属于RUNNABLE的进程它也是为了防止出现问题.
由于要保存进程的状态,所以说这就是为什么xv6需要给处理进程的代码上锁了,就是因为进程状态这个变量就是临界变量,这块代码是临界区
当前的CPU和进程信息.
我们需要记录当前的进程指针来获取信息,一般来说,如果你的机器是一个核的,我们可以设置一个全局变量,但是我们的机器是多核的,每个核执行不同的进程,这个方案就有一定的问题.
所以说xv6维护一个结构体叫做CPU,这个CPU存储着当前CPU核正在运行什么核.
代码语言:javascript复制// Per-CPU state.
struct cpu {
struct proc *proc; // The process running on this cpu, or null.
struct context context; // swtch() here to enter scheduler().
int noff; // Depth of push_off() nesting.
int intena; // Were interrupts enabled before push_off()?
};
这个结构体存放着scheduler的上下文,以及当前运行的进程.
代码语言:javascript复制struct cpu*
mycpu(void) {
int id = cpuid();
struct cpu *c = &cpus[id];
return c;
}
int
cpuid()
{
int id = r_tp();
return id;
}
这个时候可以获得cpuid以及对应的结构体.因为CPU的核号是存放在tp寄存器的.在mstart中,就是在启动的时候已经把CPU的核号已经送入到tp寄存器中了,在M-mode的时候,usertrapret把tp寄存器保存在trapoline寄存器中.并且编译器永远不会把tp寄存器改变,所以说我们可以很方便地获得cpu核的id值.
当然,为了保证CPU返回不会被时钟中断打扰,调用这个函数要求使用cpu结构体的时候关闭中断,当使用完毕之后再来开启中断.
当然我们还可以使用myproc()
函数来获得当前cpu运行的进程结构体:
// Return the current struct proc *, or zero if none.
struct proc*
myproc(void) {
push_off();
struct cpu *c = mycpu();
struct proc *p = c->proc;
pop_off();
return p;
}
睡眠与唤醒
sleep和wakeup调用给底层提供了一个同步接口,我们可以根据这个同步接口构建一个叫做信号量的顶层接口,信号量的定义和P-V操作我们都在操作系统课上学过了,我们就不需要解释究竟什么是P-V操作.贴个代码吧:
代码语言:javascript复制100 struct semaphore {
101 struct spinlock lock;
102 int count;
103 };
104
400 void
401 V(struct semaphore *s)
402 {
403 acquire(&s->lock);
404 s->count = 1;
405 wakeup(s);
406 release(&s->lock);
407 }
408
409 void
410 P(struct semaphore *s)
411 {
412 acquire(&s->lock);
413 while(s->count == 0)
414 sleep(s, &s->lock);
415 s->count -= 1;
416 release(&s->lock);
417 }
P-V操作需要获得锁,因为对于信号量,同时可以有多个进程对信号量进行操作.
总而言之,P-V操作要求我们sleep(s,s->lock),要求这个进程为了s信号灯而等待,放弃当前CPU的占用,wakeup(s)要求通知所有为s信号灯而等待的进程,有位占了.
代码语言:javascript复制// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk)
{
struct proc *p = myproc();
// Must acquire p->lock in order to
// change p->state and then call sched.
// Once we hold p->lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup locks p->lock),
// so it's okay to release lk.
acquire(&p->lock); //DOC: sleeplock1
release(lk);
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
release(&p->lock);
acquire(lk);
}
首先先标记一下,这个目前是睡眠状态.还标记一下睡眠的理由,就是proc的chan元素.然后进行进程调度,因为这个程序在返回的时候还是需要对信号灯进行更改,所以说在返回的时候还是需要信号灯的锁.但是当进程进入睡眠状态就可以不需要信号灯的锁了.记录进程是为了谁而睡眠的十分重要.
对应的,在wakeup()函数:
代码语言:javascript复制// Wake up all processes sleeping on chan.
// Must be called without any p->lock.
void
wakeup(void *chan)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p ) {
if(p != myproc()){
acquire(&p->lock);
if(p->state == SLEEPING && p->chan == chan) {
p->state = RUNNABLE;
}
release(&p->lock);
}
}
}
这个时候就寻找一个正在睡眠的并且因为chan这个原因睡眠的进程,表示你要的资源已经到达,这个时候就可以提醒这些进程现在可以执行了.(为什么要一次性全部通知,这样会错吗?其实不会,因为是执行while循环的,如果发现s->count还是小于0,那我还是接着睡吧zzz)
由于我们对很多临界变量,比如说信号灯,进程控制块进行了修改,所以说锁结构是必要的.
管道
xv6的管道使用sleep和wakeup的操作来进行复杂的同步通讯.我们在之前已经了解过Linux的管道系统.
代码语言:javascript复制struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
int readopen; // read fd is still open
int writeopen; // write fd is still open
};
在xv6,我们使用pipe这个数据结构,首先我们看到一个锁结构,还有一个数据buffer.还有nread和nwrite来表示目前管道的两端已经读出了几个元素,已经写入几个元素.这个数据buffer还是一个循环队列,也就是第PIPELINE-1项之后就是第0项.循环队列的判空和判满非常容易.由于在这里我们使用了类似于TCP id的模式处理,所以说我们读区元素使用%运算来进行读取.
代码语言:javascript复制int
pipewrite(struct pipe *pi, uint64 addr, int n)
{
int i = 0;
struct proc *pr = myproc();
acquire(&pi->lock);
while(i < n){
if(pi->readopen == 0 || pr->killed){
release(&pi->lock);
return -1;
}
if(pi->nwrite == pi->nread PIPESIZE){ //DOC: pipewrite-full
wakeup(&pi->nread);
sleep(&pi->nwrite, &pi->lock);
} else {
char ch;
if(copyin(pr->pagetable, &ch, addr i, 1) == -1)
break;
pi->data[pi->nwrite % PIPESIZE] = ch;
i ;
}
}
wakeup(&pi->nread);
release(&pi->lock);
return i;
}
首先就是pipewrite,在对管道这个临界资源修改之前我们先上锁,所以说读的时候不能写,写的时候不能读,也不能同时读也不能同时写,这个就是锁的魅力.
首先就是各种异常情况,这个不说了,接着就是满的情况,那么只能让暂时让写的进程休眠,顺便让那群没字节可读的进程醒过来.如果没有满,那就一个一个字节地往buffer里面写,写的操作就是普通的循环队列,只不过这里巧妙地使用了wakeup和sleep调节满和空之间的平衡.
代码语言:javascript复制int
piperead(struct pipe *pi, uint64 addr, int n)
{
int i;
struct proc *pr = myproc();
char ch;
acquire(&pi->lock);
while(pi->nread == pi->nwrite && pi->writeopen){ //DOC: pipe-empty
if(pr->killed){
release(&pi->lock);
return -1;
}
sleep(&pi->nread, &pi->lock); //DOC: piperead-sleep
}
for(i = 0; i < n; i ){ //DOC: piperead-copy
if(pi->nread == pi->nwrite)
break;
ch = pi->data[pi->nread % PIPESIZE];
if(copyout(pr->pagetable, addr i, &ch, 1) == -1)
break;
}
wakeup(&pi->nwrite); //DOC: piperead-wakeup
release(&pi->lock);
return i;
}
对于读也是一样,如果管道是空的,那么没字节可读,那就让自己休眠.顺便通知写的进程抓紧来写,如果不是空的,那就慢慢来读.一个字一个字地读.
由于读写区间都上了锁,所以说这样可以保证只有一个进程能对一个管道进行处理,防止乱套.
等待,退出和杀死的系统调用
首先就是wait的系统调用:wait调用就是等待子进程退出,退出了之后父进程就可以继续执行.
代码语言:javascript复制int
wait(uint64 addr)
{
struct proc *np;
int havekids, pid;
struct proc *p = myproc();
acquire(&wait_lock);
for(;;){
// Scan through table looking for exited children.
havekids = 0;
for(np = proc; np < &proc[NPROC]; np ){
if(np->parent == p){
// make sure the child isn't still in exit() or swtch().
acquire(&np->lock);
havekids = 1;
if(np->state == ZOMBIE){
// Found one.
pid = np->pid;
if(addr != 0 && copyout(p->pagetable, addr, (char *)&np->xstate,
sizeof(np->xstate)) < 0) {
release(&np->lock);
release(&wait_lock);
return -1;
}
freeproc(np);
release(&np->lock);
release(&wait_lock);
return pid;
}
release(&np->lock);
}
}
// No point waiting if we don't have any children.
if(!havekids || p->killed){
release(&wait_lock);
return -1;
}
// Wait for a child to exit.
sleep(p, &wait_lock); //DOC: wait-sleep
}
}
这个程序的代码非常好懂,由于这个wait比较菜,只需要任何一个子进程退出就可以了,所以说实现起来不难.首先判断这个进程有没有一个儿子,并且这个儿子就是僵尸状态的,如果有的话就太好了,我就不用wait了,我就把这个儿子给释放了.并且把信息传递给用户那边.如果没有找到,那还是乖乖等着吧.我们获得这些锁,第一是为了保护进程,第二个是为了防止多线程访问..导致这个pid和havekids出现问题.
接着就是exit了,由于wait调用要等待有没有儿子退出,所以说sleep和wakeup是要配套的,wakeup就是在exit里面实现的.
代码语言:javascript复制void
exit(int status)
{
struct proc *p = myproc();
if(p == initproc)
panic("init exiting");
// Close all open files.
for(int fd = 0; fd < NOFILE; fd ){
if(p->ofile[fd]){
struct file *f = p->ofile[fd];
fileclose(f);
p->ofile[fd] = 0;
}
}
begin_op();
iput(p->cwd);
end_op();
p->cwd = 0;
acquire(&wait_lock);
// Give any children to init.
reparent(p);
// Parent might be sleeping in wait().
wakeup(p->parent);
acquire(&p->lock);
p->xstate = status;
p->state = ZOMBIE;
release(&wait_lock);
// Jump into the scheduler, never to return.
sched();
panic("zombie exit");
}
首先看看是不是init的守护进程要退出,不是就还好.第一步就是解决打开的文件问题,一一关闭文件即可.接着就是reparent,就是如果它的父进程要被exit掉,但是子进程还在存活,就需要使用reparent来处理父进程.如果这个进程有父进程,顺便叫醒正在沉睡的,等待它的儿子调用exit()的爸爸.接着由于这个已经退出了,所以说转进程调度吧.
exit是自己退场,那么kill是勒令让别人退场.
代码语言:javascript复制int
kill(int pid)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p ){
acquire(&p->lock);
if(p->pid == pid){
p->killed = 1;
if(p->state == SLEEPING){
// Wake process from sleep().
p->state = RUNNABLE;
}
release(&p->lock);
return 0;
}
release(&p->lock);
}
return -1;
}
//usertrapret()
if(p->killed)
exit(-1);
这个就像注射了慢性毒药,我在kill函数什么都不做,我只是设定一个killed为1,然后这个进程在执行usertrapret的时候由于killed值为1,这个时候就它会自己调用exit()然后退场.