实时Linux内核的实现

2021-10-28 10:31:41 浏览数 (1)

实时系统要求对事件的响应时间不能超过规定的期限,响应时间是指从某个事件发生到负责处理这个事件的进程处理完成的时间间隔,最大响应时间应该是确定的、可以预测的。

实时分为硬实时和软实时,硬实时要求绝对保证响应时间不超过期限,如果超过期限,会造成灾难性的后果,例如汽车在发生碰撞事故时必须快速展开安全气囊;软实时只需尽力使响应时间不超过期限,如果偶尔超过期限,不会造成灾难性的后果,例如数字电视机顶盒,需要实时地对视频流解码,偶尔丢失几个视频帧,影响不大。

RTLinux、QNX和VxWorks这些操作系统提供了硬实时能力,Linux这种通用操作系统只能提供软实时能力。

目前Linux内核主线不支持软实时,而是使用下面2个仓库存放和Linux内核主线的版本对应的实时内核的源代码。

(1)仓库“http://git.kernel.org/cgit/linux/kernel/git/rt/linux-rt-devel.git”。

(2)仓库“http://git.kernel.org/cgit/linux/kernel/git/rt/linux-stable-rt.git”。

第1个仓库存放正在开发的实时内核的源代码。在发布稳定的版本以后,把开发版本转移到第2个仓库。

内核社区原计划在5.3版本把软实时补丁合并到主线,但是测试的时候发现了问题,所以放弃了。直到5.11版本还没有把软实时补丁合并到主线。

本文分析软实时Linux内核5.10.8版本。下载源代码的方法是:从“https://mirrors.edge.kernel.org/pub/linux/kernel/v5.x/”下载5.10.8版本的内核压缩包,然后从“https://mirrors.edge.kernel.org/pub/linux/kernel/projects/rt/5.10/”下载5.10.8版本的实时补丁压缩包,把实时补丁应用到内核源代码树。

1.影响实时性的因素

从某个事件发生到负责处理这个事件的进程开始执行,过程如下。

(1)硬件检测到某个事件,发送中断请求信号给中断控制器。

(2)中断控制器把中断请求分发给某个处理器。

(3)处理器响应中断请求,执行中断处理程序。

(4)中断处理程序唤醒负责处理这个事件的实时进程。

(5)进程调度器调度进程,选中负责处理这个事件的实时进程。

(6)实时进程开始执行,处理事件。

导致处理器不能及时响应中断的因素有:正在执行中断处理程序,或者正在执行禁止中断的临界区。对应的解决方法如下。

(1)中断线程化,也就是使用内核线程执行中断处理函数。

(2)为了减小时钟中断处理程序的执行时间,把高精度定时器的到期模式分为软中断到期模式和硬中断到期模式,大多数高精度定时器使用软中断到期模式,在软中断里面执行。

(3)如果使用内核线程执行中断处理函数,那么原来禁止硬中断的临界区不需要禁止硬中断,为了兼顾非实时内核和实时内核,引入本地锁,非实时内核把本地锁映射到禁止内核抢占和禁止硬中断,实时内核把本地锁映射到基于实时互斥锁实现的自旋锁。

导致实时进程不能被及时调度的因素如下。

(1)正在执行软中断。

(2)正在执行禁止软中断(调用函数local_bh_disable())的临界区。

(3)正在执行禁止内核抢占(调用函数preempt_disable())的临界区。

(4)正在执行RCU保护(调用函数rcu_read_lock()或rcu_read_lock_bh())的读端临界区,禁止内核抢占。

(5)正在执行自旋锁或读写锁保护的临界区。

(6)需要申请的锁(包括互斥锁、伤害/等待互斥锁和读写信号量)被优先级低的进程持有,导致优先级高的进程等待优先级低的进程,发生优先级反转。

对应的解决方法如下。

(1)软中断全部由软中断线程执行。

(2)如果软中断全部由软中断线程执行,那么原来禁止软中断的临界区可以变成可抢占的,和软中断线程使用本地锁互斥。

(3)在实时内核中大多数禁止内核抢占的临界区可以变成可抢占的,为了兼顾非实时内核和实时内核,引入本地锁,非实时内核把本地锁映射到禁止内核抢占和禁止硬中断,实时内核把本地锁映射到使用实时互斥锁实现的自旋锁。

(4)实现可抢占RCU,把RCU保护的读端临界区变成可以抢占的。

(5)把自旋锁和读写锁替换为可以抢占的、支持优先级继承的锁。

(6)互斥锁、伤害/等待互斥锁和读写信号量支持优先级继承。

实时进程执行的时候,影响实时性的因素如下。

(1)Linux内核使用虚拟内存,对用户空间的内存(包括栈、代码段、数据段以及使用函数malloc()或mmap()动态分配的内存)使用惰性分配的策略,如果实时进程访问的虚拟页没有映射到物理页,那么会触发页错误异常,影响实时性。

(2)Linux内核在内存不足的时候会回收物理页,导致实时进程访问的虚拟页没有映射到物理页,影响实时性。

对应的解决方法是:实时进程在main()函数里面使用函数malloc()或mmap()预先分配内存,并且调用函数mlockall(),把所有虚拟页映射到物理页,锁定在内存中,阻止内核回收这些物理页。

2.内核抢占模型

抢占分为用户抢占和内核抢占,用户抢占是指允许进程在用户模式执行的时候被抢占,内核抢占是指允许进程在内核模式执行的时候被抢占。用户抢占总是无条件支持的,并且不可以关闭。内核抢占取决于内核是不可抢占内核还是可抢占内核,在可抢占内核中,可以在一个临界区里面禁止内核抢占。

为了能够合并到内核主线(Linux是通用操作系统,需要满足不同场合的需求),软实时Linux内核采用非常灵活的策略,划分了5种内核抢占模型,如下。

(1)“No Forced Preemption (Server)”,不可抢占内核,配置宏是CONFIG_PREEMPT_NONE。这是传统的抢占模型,目标是使吞吐量最大化。大多数时候提供良好的延迟,但是没有保证,可能偶尔出现长的延迟。这种模型主要用于服务器和科学计算系统。如果希望使内核的处理能力最大化,不考虑调度延迟,那么应该选择这种模型。

(2)“Voluntary Kernel Preemption (Desktop)”,自愿内核抢占,配置宏是CONFIG_PREEMPT_VOLUNTARY。这种模型通过增加抢占点的方式减小延迟。在内核里面选择性地增加了一些抢占点,目的是减小最大调度延迟和对交互事件提供更快的响应,代价是稍微降低吞吐量。当低优先级进程在内核模式执行的时候,在预定的抢占点自愿被抢占。这种模型主要用于桌面系统。它减少了长延迟(几百毫秒到几秒)的发生,但是没有消除。

(3)“Preemptible Kernel (Low-Latency Desktop)”,低延迟可抢占内核,配置宏是CONFIG_PREEMPT__LL。这种模型使除了临界区以外的所有内核代码是可以抢占的。当低优先级进程在内核模式执行的时候,可以非自愿地被抢占。这种模型提供了很低的响应延迟,最坏情况的延迟时间是几毫秒,代价是稍微降低吞吐量和稍微增加运行时开销。这种模型主要用于有毫秒级别延迟需求的桌面系统和嵌入式系统。

(4)“Preemptible Kernel (Basic RT)”,基本实时内核,配置宏是CONFIG_PREEMPT_RTB。这种模型基本上和低延迟可抢占内核相同,但是开启了完全抢占内核的初步修改。

(5)“Fully Preemptible Kernel (RT)”,完全抢占内核,也称为实时内核,配置宏是CONFIG_PREEMPT_RT_FULL。这种模型把自旋锁和读写锁替换为可以抢占的、支持优先级继承的锁,强制中断线程化,并且引入各种机制来打破长的、不可抢占的临界区。这种模型主要用于延迟要求为100微秒或稍低(几十微秒)的实时系统。

“基本实时内核”很少用,存在的意义不大。软实时Linux内核5.4版本删除了“基本实时内核”,划分了4种内核抢占模型,如下。

(1)“No Forced Preemption (Server)”,不可抢占内核,配置宏是CONFIG_PREEMPT_NONE。

(2)“Voluntary Kernel Preemption (Desktop)”,自愿内核抢占,配置宏是CONFIG_PREEMPT_VOLUNTARY。

(3)“Preemptible Kernel (Low-Latency Desktop)”,低延迟可抢占内核,配置宏是CONFIG_PREEMPT。注意:把配置宏从CONFIG_PREEMPT__LL修改为CONFIG_PREEMPT。

(4)“Fully Preemptible Kernel (Real-Time)”,完全抢占内核,也称为实时内核,配置宏是CONFIG_PREEMPT_RT。注意:把配置宏从CONFIG_PREEMPT_RT_FULL修改为CONFIG_PREEMPT_RT。

Linux内核主线在5.3版本增加了配置宏CONFIG_PREEMPT_RT,但是还没有把软实时补丁合并进来。

自愿内核抢占模型使用宏might_sleep()在内核里面增加抢占点,例如在函数mutex_lock()里面增加抢占点,如下。

kernel/locking/mutex.c

void __sched mutex_lock(struct mutex *lock)

{

might_sleep();

if (!__mutex_trylock_fast(lock))

__mutex_lock_slowpath(lock);

}

宏might_sleep()的定义如下。

include/linux/kernel.h

# define might_sleep() do { might_resched(); } while (0)

#ifdef CONFIG_PREEMPT_VOLUNTARY

extern int _cond_resched(void);

# define might_resched() _cond_resched()

#else

# define might_resched() do { } while (0)

#endif

kernel/sched/core.c

#ifndef CONFIG_PREEMPTION

int __sched _cond_resched(void)

{

if (should_resched(0)) {

preempt_schedule_common();

return 1;

}

...

return 0;

}

#endif

3.调度策略

Linux内核为实时进程提供了2种调度器:限期调度器和POSIX实时调度器(简称实时调度器)。限期调度器根据截止期限调度实时进程,实时调度器根据优先级调度实时进程。

限期调度器实现限期调度策略(SCHED_DEADLINE),实时调度器提供2种调度策略:先进先出调度(SCHED_FIFO)和轮流调度(SCHED_RR)。

限期调度策略有3个参数:运行时间runtime、截止期限deadline和周期period。如图3.1所示,每个周期运行一次,在截止期限之前执行完,一次运行的时间长度是runtime。

图3.1 限期调度策略

先进先出调度没有时间片,非常霸道,如果没有更高优先级的实时进程,并且它不睡眠,那么它将一直霸占处理器。

轮流调度有时间片,进程用完时间片以后加入优先级对应运行队列的尾部,把处理器让给优先级相同的其他实时进程。

先进先出调度和轮流调度的主要区别是对优先级相同的实时进程的处理策略不同:前者不会把处理器让给优先级相同的实时进程,后者会把处理器让给优先级相同的实时进程。

如果选择限期调度策略,那么需要提供3个参数,看起来复杂,但参数只依赖应用程序自身,不需要考虑系统中的所有其他实时进程。如果选择实时调度策略,那么需要考虑系统中的所有实时进程,根据它们的紧急程度为每个实时进程选择合适的优先级。

4.中断线程化

中断线程化是使用内核线程执行中断处理函数,内核线程的名称是“irq/<irq>-<devname>”(<irq>是Linux中断号,<devname>是设备名称),调度策略是SCHED_FIFO,实时优先级是50。

函数request_threaded_irq()用来注册中断处理函数,原型如下。

include/linux/interrupt.h

extern int __must_check

request_threaded_irq(unsigned int irq, irq_handler_t handler,

irq_handler_t thread_fn,

unsigned long flags, const char *name, void *dev);

参数handler指定主函数,主函数在硬中断上下文里面被调用,需要检查中断是不是对应的外围设备发送的。如果中断是对应的外围设备发送的,那么handler函数返回IRQ_HANDLED或IRQ_WAKE_THREAD。如果不需要进一步的处理,那么返回IRQ_HANDLED。如果需要进一步的处理,那么返回IRQ_WAKE_THREAD,中断处理程序将会唤醒中断处理线程,执行参数thread_fn指定的函数。如果多个外围设备共享同一个硬件中断号(即多个外围设备的中断请求线连接到中断控制器的同一个引脚,现在这种用法很少),那么参数handler必须指定一个函数。其他情况通常把参数handler设置为空指针。

参数thread_fn指定中断处理线程调用的函数。如果参数thread_fn是空指针,那么不创建中断处理线程。

如果参数handler是空指针,但是参数thread_fn不是空指针,那么使用默认的主函数irq_default_primary_handler()。函数irq_default_primary_handler()的代码如下,这个函数直接返回IRQ_WAKE_THREAD。

kernel/irq/manage.c

static irqreturn_t irq_default_primary_handler(int irq, void *dev_id)

{

return IRQ_WAKE_THREAD;

}

少数中断不能线程化,典型的例子是时钟中断。对于不能线程化的中断,注册处理函数的时候必须设置标志IRQF_NO_THREAD。

有些旧的设备驱动程序调用旧的函数request_irq()注册中断处理函数。函数request_irq()的代码如下。

include/linux/interrupt.h

static inline int __must_check

request_irq(unsigned int irq, irq_handler_t handler, unsigned long flags,

const char *name, void *dev)

{

return request_threaded_irq(irq, handler, NULL, flags, name, dev);

}

这个函数调用函数request_threaded_irq(),把参数thread_fn设置为空指针。实时内核开启强制中断线程化的配置宏CONFIG_IRQ_FORCED_THREADING。在开启强制中断线程化的情况下,如果参数thread_fn是空指针,并且没有设置标志IRQF_NO_THREAD,那么函数request_threaded_irq()强制线程化,把主函数设置为默认的函数irq_default_primary_handler(),把线程函数设置为参数handler指定的函数。

5.高精度定时器

在实时内核中,把高精度定时器的到期模式分为软中断到期模式(HRTIMER_MODE_SOFT)和硬中断到期模式(HRTIMER_MODE_HARD),如下。

include/linux/hrtimer.h

1 enum hrtimer_mode {

2 HRTIMER_MODE_ABS = 0x00,

3 HRTIMER_MODE_REL = 0x01,

4 HRTIMER_MODE_PINNED = 0x02,

5 HRTIMER_MODE_SOFT = 0x04,

6 HRTIMER_MODE_HARD = 0x08,

7

8 HRTIMER_MODE_ABS_PINNED = HRTIMER_MODE_ABS | HRTIMER_MODE_PINNED,

9 HRTIMER_MODE_REL_PINNED = HRTIMER_MODE_REL | HRTIMER_MODE_PINNED,

10

11 HRTIMER_MODE_ABS_SOFT = HRTIMER_MODE_ABS | HRTIMER_MODE_SOFT,

12 HRTIMER_MODE_REL_SOFT = HRTIMER_MODE_REL | HRTIMER_MODE_SOFT,

13

14 HRTIMER_MODE_ABS_PINNED_SOFT = HRTIMER_MODE_ABS_PINNED | HRTIMER_MODE_SOFT,

15 HRTIMER_MODE_REL_PINNED_SOFT = HRTIMER_MODE_REL_PINNED | HRTIMER_MODE_SOFT,

16

17 HRTIMER_MODE_ABS_HARD = HRTIMER_MODE_ABS | HRTIMER_MODE_HARD,

18 HRTIMER_MODE_REL_HARD = HRTIMER_MODE_REL | HRTIMER_MODE_HARD,

19

20 HRTIMER_MODE_ABS_PINNED_HARD = HRTIMER_MODE_ABS_PINNED | HRTIMER_MODE_HARD,

21 HRTIMER_MODE_REL_PINNED_HARD = HRTIMER_MODE_REL_PINNED | HRTIMER_MODE_HARD,

22 };

软中断到期模式的高精度定时器,到期的时候在类型为HRTIMER_SOFTIRQ的软中断里面执行定时器回调函数。在实时内核中,软中断由软中断线程执行,或者在进程开启软中断的时候执行。

硬中断到期模式的高精度定时器,到期的时候在时钟中断处理程序里面执行定时器回调函数。

如果没有指定到期模式,那么在实时内核中默认使用软中断到期模式。

为了减小时钟中断处理程序的执行时间,大多数高精度定时器应该使用软中断到期模式。少数高精度定时器必须使用硬中断到期模式,如下。

(1)必须在硬中断里面执行,例如进程调度器周期性地调度进程。

(2)对延迟很敏感,例如函数nanosleep()把睡眠时间精确到纳秒。

在内核主线中,4.16版本以前高精度定时器总是在硬中断里面执行定时器回调函数,4.16版本增加HRTIMER_MODE_SOFT:如果把到期模式指定为HRTIMER_MODE_SOFT,那么在类型为HRTIMER_SOFTIRQ的软中断里面执行定时器回调函数,否则在硬中断里面执行定时器回调函数。

5.4版本为内核主线将来支持软实时做准备,增加HRTIMER_MODE_HARD,如下。

(1)如果把到期模式指定为HRTIMER_MODE_HARD,那么在硬中断里面执行定时器回调函数,即使在实时内核中也是这样做。

(2)如果没有指定到期模式,那么在非实时内核中默认使用硬中断到期模式,在实时内核中默认使用软中断到期模式。

6.软中断线程化

在非实时内核中,一部分软中断在中断处理程序的后半部分执行,有时间限制:最多执行10轮,并且总时间不超过2毫秒。剩下的软中断由软中断线程执行,或者在进程开启软中断(调用函数local_bh_enable)的时候执行。每个处理器有一个软中断线程,名称是“ksoftirqd/<cpu>”(<cpu>是处理器编号),调度策略是SCHED_NORMAL,优先级是120。

在实时内核中,软中断由软中断线程执行,或者在进程开启软中断的时候执行。中断处理程序的后半部分唤醒当前处理器上的软中断线程,代码如下。

kernel/softirq.c

1 void irq_exit(void)

2 {

3 __irq_exit_rcu();

4 ...

5 }

6

7 static inline void __irq_exit_rcu(void)

8 {

9 ...

10 if (!in_interrupt() && local_softirq_pending())

11 invoke_softirq();

12 ...

13 }

14

15 #ifdef CONFIG_PREEMPT_RT

16 static inline void invoke_softirq(void)

17 {

18 if (should_wake_ksoftirqd())

19 wakeup_softirqd();

20 }

21 #else /* CONFIG_PREEMPT_RT */

22 ...

23 #endif /* !CONFIG_PREEMPT_RT */

24

25 static void wakeup_softirqd(void)

26 {

27 /* Interrupts are disabled: no need to stop preemption */

28 struct task_struct *tsk = __this_cpu_read(ksoftirqd);

29

30 if (tsk && tsk->state != TASK_RUNNING)

31 wake_up_process(tsk);

32 }

7.解决RCU读端临界区不可抢占的问题

Linux内核支持3种RCU,如下。

(1)不可抢占RCU(RCU-sched),不允许进程在读端临界区被其他进程抢占。

(2)加速版不可抢占RCU(RCU-bh)是针对不可抢占RCU的改进,在软中断很多的情况可以缩短宽限期。

(3)可抢占RCU(RCU-preempt),也称为实时RCU。可抢占RCU允许进程在读端临界区被其他进程抢占。编译内核时需要开启配置宏CONFIG_PREEMPT_RCU。

4.20版本做了修改:在非抢占式内核中把RCU-bh、RCU-preempt和RCU-sched合并为RCU-sched,在抢占式内核中把RCU-bh、RCU-preempt和RCU-sched合并为RCU-preempt。

实时内核强制开启可抢占RCU的配置宏CONFIG_PREEMPT_RCU,rcu_read_lock()、rcu_read_unlock()和call_rcu()这些函数使用可抢占RCU实现,所以使用rcu_read_lock()和rcu_read_unlock()保护的读端临界区是可以抢占的。

如果读端临界区绝对不能被抢占,那么应该使用不可抢占RCU提供的函数rcu_read_lock_sched()和rcu_read_unlock_sched()保护临界区,函数rcu_read_lock_sched()禁止内核抢占,函数rcu_read_unlock_sched()开启内核抢占。

RCU-bh使用函数rcu_read_lock_bh()和rcu_read_unlock_bh()保护读端临界区,rcu_read_lock_bh()等价于“rcu_read_lock() 禁止软中断”,rcu_read_unlock_bh()等价于“rcu_read_unlock() 开启软中断”。在实时内核中,软中断全部由软中断线程执行,RCU-bh保护的读端临界区是可以抢占的,只需和当前处理器上的软中断线程互斥。函数rcu_read_lock_bh()的代码如下。

include/linux/rcupdate.h

1 static inline void rcu_read_lock_bh(void)

2 {

3 local_bh_disable();

4 ...

5 }

6

7 include/linux/bottom_half.h

8 static inline void local_bh_disable(void)

9 {

10 __local_bh_disable_ip(_THIS_IP_, SOFTIRQ_DISABLE_OFFSET);

11 }

12

13 kernel/softirq.c

14 #ifdef CONFIG_PREEMPT_RT

15 void __local_bh_disable_ip(unsigned long ip, unsigned int cnt)

16 {

17 unsigned long flags;

18 int newcnt;

19

20 ...

21 /* First entry of a task into a BH disabled section? */

22 if (!current->softirq_disable_cnt) {

23 if (preemptible()) {

24 local_lock(&softirq_ctrl.lock);

25 /* Required to meet the RCU bottomhalf requirements. */

26 rcu_read_lock();

27 } else {

28 DEBUG_LOCKS_WARN_ON(this_cpu_read(softirq_ctrl.cnt));

29 }

30 }

31 ...

32 }

33 #else /* CONFIG_PREEMPT_RT */

34 ...

35 #endif /* !CONFIG_PREEMPT_RT */

第22行,如果当前进程第1次禁止软中断,那么处理如下。

(1)第23行,如果开启了内核抢占,那么处理如下。

q第24行,使用本地锁“softirq_ctrl.lock”和当前处理器上的软中断线程互斥。

q第26行调用函数rcu_read_lock()标记进入RCU读端临界区。

(2)第27~29行,如果禁止了内核抢占,并且当前处理器的softirq_ctrl.cnt不是0(说明其他进程禁止软中断以后被抢占或者软中断线程被抢占),那么这种用法是错误的,打印警告信息,应该在开启内核抢占的情况下禁止软中断。

函数rcu_read_unlock_bh()用来标记退出RCU-bh读端临界区,代码如下。

include/linux/rcupdate.h

1 static inline void rcu_read_unlock_bh(void)

2 {

3 ...

4 local_bh_enable();

5 }

6

7 include/linux/bottom_half.h

8 static inline void local_bh_enable(void)

9 {

10 __local_bh_enable_ip(_THIS_IP_, SOFTIRQ_DISABLE_OFFSET);

11 }

12

13 kernel/softirq.c

14 #ifdef CONFIG_PREEMPT_RT

15 void __local_bh_enable_ip(unsigned long ip, unsigned int cnt)

16 {

17 ...

18 out:

19 __local_bh_enable(cnt, preempt_on);

20 ...

21 }

22

23 static void __local_bh_enable(unsigned int cnt, bool unlock)

24 {

25 ...

26 if (!newcnt && unlock) {

27 rcu_read_unlock();

28 local_unlock(&softirq_ctrl.lock);

29 }

30 }

31

32 #else /* CONFIG_PREEMPT_RT */

33 ...

34 #endif /* !CONFIG_PREEMPT_RT */

35

第26行,如果当前处理器上禁止软中断的计数值变成0,并且要释放本地锁,那么处理如下。

(1)第27行调用函数rcu_read_unlock()标记退出RCU读端临界区。

(2)第28行释放本地锁“softirq_ctrl.lock”。

8.解决优先级反转问题

什么是优先级反转(priority inversion)问题?

假设进程1的优先级低,进程2的优先级高。进程1持有互斥锁,进程2申请互斥锁,因为进程1已经占有互斥锁,所以进程2必须睡眠等待,导致优先级高的进程2等待优先级低的进程1。

如果存在进程3,优先级在进程1和进程2之间,那么情况更糟糕。假设进程1仍然持有互斥锁,进程2正在等待。进程3开始运行,因为它的优先级比进程1高,所以它可以抢占进程1,导致进程1持有互斥锁的时间延长,进程2等待的时间延长。

优先级继承(priority inheritance)可以解决优先级反转问题。如果低优先级的进程持有互斥锁,高优先级的进程申请互斥锁,那么把持有互斥锁的进程的优先级临时提升到申请互斥锁的进程的优先级。在上面的例子中,把进程1的优先级临时提升到进程2的优先级,防止进程3抢占进程1,使进程1尽快执行完临界区,减少进程2的等待时间。

实时互斥锁(rt_mutex)实现了优先级继承。锁的等待者按优先级从高到低排序,如果优先级相等,那么先申请锁的进程的优先级高。持有锁的进程,如果它的优先级比优先级最高的等待者低,那么把它的优先级临时提升到优先级最高的等待者的优先级,代码如下。如果普通进程1持有锁,实时进程2等待锁,那么把普通进程1的优先级临时提升到实时进程2的优先级,普通进程1变成实时进程。

rt_mutex_lock() -> __rt_mutex_lock() -> rt_mutex_lock_state()

-> __rt_mutex_lock_state() -> rt_mutex_fastlock() -> rt_mutex_slowlock()

-> rt_mutex_slowlock_locked() -> task_blocks_on_rt_mutex()

kernel/locking/rtmutex.c

1 static int task_blocks_on_rt_mutex(struct rt_mutex *lock,

2 struct rt_mutex_waiter *waiter,

3 struct task_struct *task,

4 enum rtmutex_chainwalk chwalk)

5 {

6 ...

7 if (waiter == rt_mutex_top_waiter(lock)) {

8 rt_mutex_dequeue_pi(owner, top_waiter);

9 rt_mutex_enqueue_pi(owner, waiter);

10

11 rt_mutex_adjust_prio(owner);

12 if (rt_mutex_real_waiter(owner->pi_blocked_on))

13 chain_walk = 1;

14 } else if (rt_mutex_cond_detect_deadlock(waiter, chwalk)) {

15 chain_walk = 1;

16 }

17

18 ...

19 }

20

21 static void rt_mutex_adjust_prio(struct task_struct *p)

22 {

23 ...

24 if (task_has_pi_waiters(p))

25 pi_task = task_top_pi_waiter(p)->task;

26

27 rt_mutex_setprio(p, pi_task);

28 }

29

30 kernel/sched/core.c

31 void rt_mutex_setprio(struct task_struct *p, struct task_struct *pi_task)

32 {

33 ...

34 prio = __rt_effective_prio(pi_task, p->normal_prio);

35 ...

36

37 prev_class = p->sched_class;

38 queued = task_on_rq_queued(p);

39 running = task_current(rq, p);

40 if (queued)

41 dequeue_task(rq, p, queue_flag);

42 if (running)

43 put_prev_task(rq, p);

44

45 if (dl_prio(prio)) {

46 if (!dl_prio(p->normal_prio) ||

47 (pi_task && dl_prio(pi_task->prio) &&

48 dl_entity_preempt(&pi_task->dl, &p->dl))) {

49 p->dl.pi_se = pi_task->dl.pi_se;

50 queue_flag |= ENQUEUE_REPLENISH;

51 } else {

52 p->dl.pi_se = &p->dl;

53 }

54 p->sched_class = &dl_sched_class;

55 } else if (rt_prio(prio)) {

56 if (dl_prio(oldprio))

57 p->dl.pi_se = &p->dl;

58 if (oldprio < prio)

59 queue_flag |= ENQUEUE_HEAD;

60 p->sched_class = &rt_sched_class;

61 } else {

62 if (dl_prio(oldprio))

63 p->dl.pi_se = &p->dl;

64 if (rt_prio(oldprio))

65 p->rt.timeout = 0;

66 p->sched_class = &fair_sched_class;

67 }

68

69 p->prio = prio;

70

71 if (queued)

72 enqueue_task(rq, p, queue_flag);

73 if (running)

74 set_next_task(rq, p);

75

76 check_class_changed(rq, p, prev_class, oldprio);

77 ...

78 }

79

80 static inline void check_class_changed(struct rq *rq, struct task_struct *p,

81 const struct sched_class *prev_class,

82 int oldprio)

83 {

84 if (prev_class != p->sched_class) {

85 if (prev_class->switched_from)

86 prev_class->switched_from(rq, p);

87

88 p->sched_class->switched_to(rq, p);

89 } else if (oldprio != p->prio || dl_task(p))

90 p->sched_class->prio_changed(rq, p, oldprio);

91 }

实时内核使用实时互斥锁实现互斥锁(mutex)和伤害/等待互斥锁(ww_mutex),支持优先级继承。互斥锁的定义如下,可以看到在实时内核中互斥锁等同于实时互斥锁。

include/linux/mutex.h

#ifdef CONFIG_PREEMPT_RT

# include

#else

...

#endif /* !PREEMPT_RT */

include/linux/mutex_rt.h

struct mutex {

struct rt_mutex lock;

#ifdef CONFIG_DEBUG_LOCK_ALLOC

struct lockdep_map dep_map;

#endif

};

实时内核使用实时互斥锁实现读写信号量(rw_semaphore),支持优先级继承。读写信号量的定义如下。

include/linux/rwsem.h

#ifdef CONFIG_PREEMPT_RT

#include

#else /* PREEMPT_RT */

...

#endif /* !PREEMPT_RT */

include/linux/rwsem-rt.h

struct rw_semaphore {

atomic_t readers;

struct rt_mutex rtmutex;

#ifdef CONFIG_DEBUG_LOCK_ALLOC

struct lockdep_map dep_map;

#endif

};

在实时内核中,自旋锁(spinlock_t)和读写锁(rwlock_t)是基于实时互斥锁实现的,临界区是可以抢占的,支持优先级继承。

9.对自旋锁的修改

自旋锁(spinlock_t)保护的临界区是不可抢占的,导致实时进程不能被及时调度。实时内核使用实时互斥锁实现自旋锁,临界区是可以抢占的,支持优先级继承,spin_lock_irq()和spin_lock_irqsave()不会禁止硬中断。自旋锁的定义如下。

include/linux/spinlock.h

#include

include/linux/spinlock_types.h

#ifndef CONFIG_PREEMPT_RT

# include

...

#else

...

# include

...

#endif

include/linux/spinlock_types_rt.h

typedef struct spinlock {

struct rt_mutex lock;

unsigned int break_lock;

#ifdef CONFIG_DEBUG_LOCK_ALLOC

struct lockdep_map dep_map;

#endif

} spinlock_t;

少数使用自旋锁保护的临界区不允许抢占,内核定义了原始自旋锁(raw_spinlock),提供传统的自旋锁。在非实时内核中,spinlock和raw_spinlock完全相同。

选择spinlock和raw_spinlock的时候,最好坚持3个原则。

(1)尽可能使用spinlock。

(2)绝对不允许被抢占和睡眠的地方,使用raw_spinlock,否则使用spinlock。

(3)如果临界区足够小,那么使用raw_spinlock。

10.对读写锁的修改

读写锁(rwlock_t)保护的临界区是不可抢占的,导致实时进程不能被及时调度。实时内核使用实时互斥锁实现读写锁,临界区是可以抢占的,支持优先级继承,read_lock_irq()、read_lock_irqsave()、write_lock_irq()和write_lock_irqsave()不会禁止硬中断。为了降低实现的复杂性,只允许一个进程获取读锁,进程可以递归获取读锁。

读写锁的定义如下。

include/linux/spinlock.h

#include

include/linux/spinlock_types.h

#ifndef CONFIG_PREEMPT_RT

...

# include

#else

...

# include

#endif

include/linux/rwlock_types_rt.h

typedef struct rt_rw_lock rwlock_t;

struct rt_rw_lock {

struct rt_mutex rtmutex;

atomic_t readers;

#ifdef CONFIG_DEBUG_LOCK_ALLOC

struct lockdep_map dep_map;

#endif

};

少数使用读写锁保护的临界区不允许抢占,内核定义了原始读写锁(raw_rwlock),提供传统的读写锁。在非实时内核中,rwlock和raw_rwlock完全相同。

选择rwlock和raw_rwlock的原则,与选择spinlock和raw_spinlock的原则相同。

11.修改使用禁止内核抢占或硬中断保护的临界区

对于使用禁止硬中断保护的临界区,因为在实时内核中使用内核线程执行大多数中断处理函数,所以大多数临界区不需要禁止硬中断。

对于使用禁止内核抢占保护的临界区,在实时内核中大多数临界区可以修改为可以抢占的。

为了在实时内核中把这两种临界区修改为可以抢占的,实时内核从3.0版本开始引入local_irq_lock,在合并到内核主线5.8版本的时候把名称改为local_lock(本地锁)。local_lock为使用禁止内核抢占或硬中断保护的临界区提供了命名的作用域。local_lock的定义如下。

include/linux/local_lock_internal.h

typedef struct {

#ifdef CONFIG_PREEMPT_RT

spinlock_t lock;

struct task_struct *owner;/* 持有本地锁的进程 */

int nestcnt;/* 嵌套层数 */

#elif defined(CONFIG_DEBUG_LOCK_ALLOC)

struct lockdep_map dep_map;

struct task_struct *owner;

#endif

} local_lock_t;

非实时内核把local_lock映射到禁止内核抢占和禁止硬中断,如下。

(1)local_lock(&llock)映射到preempt_disable()。

(2)local_unlock(&llock)映射到preempt_enable()。

(3)local_lock_irq(&llock)映射到local_irq_disable()。

(4)local_unlock_irq(&llock)映射到local_irq_enable()。

(5)local_lock_irqsave(&llock)映射到local_irq_save()。

(6)local_unlock_irqrestore(&llock)映射到local_irq_restore()。

实时内核把local_lock映射到一个每处理器自旋锁。函数local_lock()用来获取一个每处理器本地锁,代码如下。

include/linux/local_lock.h

1 #define local_lock(lock) __local_lock(lock)

2

3 include/linux/local_lock_internal.h

4 #ifdef CONFIG_PREEMPT_RT

5

6 #define __local_lock(lock)

7 do {

8 migrate_disable();

9 local_lock_acquire(this_cpu_ptr(lock));

10 } while (0)

11

12 ...

13 #else

14 ...

15 #endif

16

17 #ifdef CONFIG_PREEMPT_RT

18

19 static inline void local_lock_acquire(local_lock_t *l)

20 {

21 if (l->owner != current) {

22 spin_lock(&l->lock);

23 ...

24 l->owner = current;

25 }

26 l->nestcnt ;

27 }

28

29 ...

30 #elif defined(CONFIG_DEBUG_LOCK_ALLOC)

31 ...

32 #else /* CONFIG_DEBUG_LOCK_ALLOC */

33 ...

34 #endif /* !CONFIG_DEBUG_LOCK_ALLOC */

第8行,禁止迁移,也就是禁止把当前进程迁移到其他处理器。

第22行,申请自旋锁。

第26行,把嵌套层数的计数值加一。

函数local_unlock()用来释放一个每处理器本地锁,代码如下。

include/linux/local_lock.h

1 #define local_unlock(lock) __local_unlock(lock)

2

3 include/linux/local_lock_internal.h

4 #ifdef CONFIG_PREEMPT_RT

5

6 #define __local_unlock(lock)

7 do {

8 local_lock_release(this_cpu_ptr(lock));

9 migrate_enable();

10 } while (0)

11

12 ...

13 #else

14 ...

15 #endif

16

17 #ifdef CONFIG_PREEMPT_RT

18

19 static inline void local_lock_release(local_lock_t *l)

20 {

21 ...

22 if (--l->nestcnt)

23 return;

24

25 l->owner = NULL;

26 spin_unlock(&l->lock);

27 }

28 ...

29 #elif defined(CONFIG_DEBUG_LOCK_ALLOC)

30 ...

31 #else /* CONFIG_DEBUG_LOCK_ALLOC */

32 ...

33 #endif /* !CONFIG_DEBUG_LOCK_ALLOC */

第8行,调用函数local_lock_release()释放锁。函数local_lock_release()的处理如下。

(1)第22行,把嵌套层数的计数值减一,如果不是0,说明不是最外层的释放本地锁,那么直接返回。

(2)第26行,释放自旋锁。

第9行,开启迁移,也就是允许把当前进程迁移到其他处理器。

内核中禁止内核抢占或禁止硬中断的临界区比较多,需要判断是否可以使用local_lock替换,修改的工作量巨大,目前只有少数临界区使用local_lock,例如函数__free_pages_ok()的代码如下。

mm/page_alloc.c

1 struct pa_lock {

2 local_lock_t l;

3 };

4 static DEFINE_PER_CPU(struct pa_lock, pa_lock) = {

5 .l = INIT_LOCAL_LOCK(l),

6 };

7

8 static void __free_pages_ok(struct page *page, unsigned int order,

9 fpi_t fpi_flags)

10 {

11 unsigned long flags;

12 int migratetype;

13 unsigned long pfn = page_to_pfn(page);

14

15 if (!free_pages_prepare(page, order, true))

16 return;

17

18 migratetype = get_pfnblock_migratetype(page, pfn);

19 local_lock_irqsave(&pa_lock.l, flags);

20 __count_vm_events(PGFREE, 1 << order);

21 free_one_page(page_zone(page), page, pfn, order, migratetype,

22 fpi_flags);

23 local_unlock_irqrestore(&pa_lock.l, flags);

24 }

12.修改使用禁止软中断保护的临界区

在实时内核中,软中断由软中断线程执行,或者在进程开启软中断的时候执行,使用禁止软中断保护的临界区和软中断线程使用本地锁“softirq_ctrl.lock”互斥。本地锁“softirq_ctrl.lock”的定义如下。

kernel/softirq.c

1 struct softirq_ctrl {

2 local_lock_t lock;

3 int cnt;/* 禁止软中断的嵌套层数 */

4 };

5

6 static DEFINE_PER_CPU(struct softirq_ctrl, softirq_ctrl) = {

7 .lock = INIT_LOCAL_LOCK(softirq_ctrl.lock),

8 };

12.1.函数local_bh_disable()

函数local_bh_disable()用来禁止软中断,代码如下。

include/linux/bottom_half.h

1 static inline void local_bh_disable(void)

2 {

3 __local_bh_disable_ip(_THIS_IP_, SOFTIRQ_DISABLE_OFFSET);

4 }

5

6 kernel/softirq.c

7 #ifdef CONFIG_PREEMPT_RT

8 void __local_bh_disable_ip(unsigned long ip, unsigned int cnt)

9 {

10 unsigned long flags;

11 int newcnt;

12

13 ...

14 /* First entry of a task into a BH disabled section? */

15 if (!current->softirq_disable_cnt) {

16 if (preemptible()) {

17 local_lock(&softirq_ctrl.lock);

18 /* Required to meet the RCU bottomhalf requirements. */

19 rcu_read_lock();

20 } else {

21 DEBUG_LOCKS_WARN_ON(this_cpu_read(softirq_ctrl.cnt));

22 }

23 }

24

25 /*

26 * Track the per CPU softirq disabled state. On RT this is per CPU

27 * state to allow preemption of bottom half disabled sections.

28 */

29 newcnt = __this_cpu_add_return(softirq_ctrl.cnt, cnt);

30 /*

31 * Reflect the result in the task state to prevent recursion on the

32 * local lock and to make softirq_count() & al work.

33 */

34 current->softirq_disable_cnt = newcnt;

35 ...

36 }

37 #else /* CONFIG_PREEMPT_RT */

38 ...

39 #endif /* !CONFIG_PREEMPT_RT */

第15行,如果当前进程第1次禁止软中断,那么处理如下。

(1)第16行,如果开启了内核抢占,那么处理如下。

q第17行,使用本地锁“softirq_ctrl.lock”和当前处理器上的软中断线程互斥。

q第19行,调用函数rcu_read_lock()标记进入RCU读端临界区。这里调用函数rcu_read_lock(),是为了支持RCU-bh,RCU-bh的函数rcu_read_lock_bh()调用函数local_bh_disable(),rcu_read_lock_bh()等价于“rcu_read_lock() 禁止软中断”。

(2)第20~22行,如果禁止了内核抢占,并且当前处理器的softirq_ctrl.cnt不是0(说明其他进程禁止软中断以后被抢占或者软中断线程被抢占),那么这种用法是错误的,打印警告信息,应该在开启内核抢占的情况下禁止软中断。

第29行,把当前处理器上禁止软中断的计数值加上cnt,cnt的值是SOFTIRQ_DISABLE_OFFSET(它的值是SOFTIRQ_OFFSET的2倍)。

第34行,把当前进程的成员softirq_disable_cnt设置为当前处理器上禁止软中断的计数值。

12.2.函数local_bh_enable()

函数local_bh_enable()用来开启软中断,代码如下。

include/linux/bottom_half.h

1 static inline void local_bh_enable(void)

2 {

3 __local_bh_enable_ip(_THIS_IP_, SOFTIRQ_DISABLE_OFFSET);

4 }

5

6 kernel/softirq.c

7 #ifdef CONFIG_PREEMPT_RT

8 void __local_bh_enable_ip(unsigned long ip, unsigned int cnt)

9 {

10 bool preempt_on = preemptible();

11 unsigned long flags;

12 u32 pending;

13 int curcnt;

14

15 ...

16 local_irq_save(flags);

17 curcnt = this_cpu_read(softirq_ctrl.cnt);

18

19 /*

20 * If this is not reenabling soft interrupts, no point in trying to

21 * run pending ones.

22 */

23 if (curcnt != cnt)

24 goto out;

25

26 pending = local_softirq_pending();

27 if (!pending || ksoftirqd_running(pending))

28 goto out;

29

30 /*

31 * If this was called from non preemptible context, wake up the

32 * softirq daemon.

33 */

34 if (!preempt_on) {

35 wakeup_softirqd();

36 goto out;

37 }

38

39 /*

40 * Adjust softirq count to SOFTIRQ_OFFSET which makes

41 * in_serving_softirq() become true.

42 */

43 cnt = SOFTIRQ_OFFSET;

44 __local_bh_enable(cnt, false);

45 __do_softirq();

46

47 out:

48 __local_bh_enable(cnt, preempt_on);

49 local_irq_restore(flags);

50 }

51 #else /* CONFIG_PREEMPT_RT */

52 ...

53 #endif /* !CONFIG_PREEMPT_RT */

第23行和24行,如果不是最外层的开启软中断,那么跳转到out。

第26~45行,如果是最外层的开启软中断,那么处理如下。

(1)第27行和28行,如果没有需要处理的软中断或软中断线程正在运行,那么跳转到out。

(2)第34~37行,如果从不可抢占的上下文调用,那么唤醒软中断线程来处理软中断,然后跳转到out。

(3)第44行,调用函数__local_bh_enable(),把计数值减去SOFTIRQ_OFFSET(它的值是SOFTIRQ_DISABLE_OFFSET的一半),并且不释放本地锁。

(4)第45行,调用函数__do_softirq()执行软中断。

第48行,调用函数__local_bh_enable(),把计数值减去cnt,并且在开启内核抢占的情况下要释放本地锁。

函数__local_bh_enable()的代码如下。

kernel/softirq.c

1 #ifdef CONFIG_PREEMPT_RT

2 static void __local_bh_enable(unsigned int cnt, bool unlock)

3 {

4 unsigned long flags;

5 int newcnt;

6

7 ...

8 newcnt = __this_cpu_sub_return(softirq_ctrl.cnt, cnt);

9 current->softirq_disable_cnt = newcnt;

10

11 if (!newcnt && unlock) {

12 rcu_read_unlock();

13 local_unlock(&softirq_ctrl.lock);

14 }

15 }

16 #else /* CONFIG_PREEMPT_RT */

17 ...

18 #endif /* !CONFIG_PREEMPT_RT */

第8行,把当前处理器上禁止软中断的计数值减去cnt。

第9行,把当前进程的成员softirq_disable_cnt设置为当前处理器上禁止软中断的计数值。

第11行,如果当前处理器上禁止软中断的计数值变成0,并且要释放本地锁,那么处理如下。

(1)第12行,调用rcu_read_unlock()标记退出RCU读端临界区。这里调用函数rcu_read_unlock(),是为了支持RCU-bh,RCU-bh的函数rcu_read_unlock_bh()调用函数local_bh_enable(),rcu_read_unlock_bh()等价于“rcu_read_unlock() 开启软中断”。

(2)第13行,释放本地锁softirq_ctrl.lock。

12.3.软中断线程

软中断线程的线程函数是run_ksoftirqd(),代码如下。

kernel/softirq.c

1 static void run_ksoftirqd(unsigned int cpu)

2 {

3 ksoftirqd_run_begin();

4 if (local_softirq_pending()) {

5 __do_softirq();

6 ksoftirqd_run_end();

7 cond_resched();

8 return;

9 }

10 ksoftirqd_run_end();

11 }

12

13 #ifdef CONFIG_PREEMPT_RT

14 static inline void ksoftirqd_run_begin(void)

15 {

16 __local_bh_disable_ip(_RET_IP_, SOFTIRQ_OFFSET);

17 local_irq_disable();

18 }

19

20 static inline void ksoftirqd_run_end(void)

21 {

22 __local_bh_enable(SOFTIRQ_OFFSET, true);

23 WARN_ON_ONCE(in_interrupt());

24 local_irq_enable();

25 }

26 #else /* CONFIG_PREEMPT_RT */

27 ...

28 #endif /* !CONFIG_PREEMPT_RT */

可以看到,软中断线程调用函数__local_bh_disable_ip(),使用本地锁“softirq_ctrl.lock”和禁止软中断的临界区互斥。

13.对实时应用程序的要求

Linux内核对用户空间的内存(包括栈、代码段、数据段以及使用函数malloc()或mmap()动态分配的内存)使用惰性分配的策略,在内存不足的时候回收物理页,导致实时进程在访问页的时候触发页错误异常,影响实时性。

为了避免页错误异常造成的延迟,对实时应用程序的要求如下。

(1)在启动的时候创建所有线程,不要在运行的过程中动态创建线程。

(2)在启动的时候预留需要的内存,不要在运行的过程中使用函数malloc()或mmap()动态分配内存。

(3)主动把线程的用户栈扩大到最大的需求。

(4)在做完上面的事情以后,调用函数mlockall(MCL_CURRENT)锁定映射到进程的虚拟地址空间的所有页,确保所有已经分配的虚拟内存区域映射到物理页,并且不允许内核回收这些物理页。

(5)在运行的过程中不要调用可能引入或生成页错误异常的函数。例如调用fork()创建子进程,fork()使用写时复制技术,进程第1次写的时候触发页错误异常。例如调用fopen()打开文件,这个函数会动态分配内存,可能生成页错误异常。

14.参考文档

(1)A realtime preemption overview,https://lwn.net/Articles/146861/,(说明:Linux内核没有完全按照这篇文档实现)

(2)Real-Time Linux Wiki,https://rt.wiki.kernel.org/index.php/Main_Page

(3)Frequently Asked Questions,https://rt.wiki.kernel.org/index.php/Frequently_Asked_Questions

(4)实时Linux的相关文档,https://rt.wiki.kernel.org/index.php/Publications

(5)Attempted summary of "RT patch acceptance" thread, take 2,https://lwn.net/Articles/143323/

(6)HOWTO: Build an RT-application,https://rt.wiki.kernel.org/index.php/HOWTO:_Build_an_RT-application

(7)Real Time Linux,https://wiki.linuxfoundation.org/realtime/start

(8)PREEMPT_RT patch versions,https://wiki.linuxfoundation.org/realtime/preempt_rt_versions

(9)宋宝华:在实时操作系统里面随便怎么写代码都能硬实时吗?https://mp.weixin.qq.com/s/U9okzKGuhKlSZyeKmI4Q4g

(10)Linux硬实时和Preempt-RT补丁(中断、软中断、调度、内存与调试),https://mp.weixin.qq.com/s/iGeUIdOrq1PN30Q-MGXM3w

(11)Ingo Molnar 的实时补丁,https://www.ibm.com/developerworks/cn/linux/l-lrt/part2/

(12)Linux实时补丁即将合并进Linux 5.3,https://mp.weixin.qq.com/s/rYBIeYIiQVVgNjdpe6Pp3w

(13)每个Linux内核版本对应的实时补丁,https://mirrors.edge.kernel.org/pub/linux/kernel/projects/rt/

0 人点赞