[In-Depth Understanding of Linux Kernel Locks] Part 7: The Mutex
Although a semaphore can already provide mutual exclusion, the "genuine" mutex does exist in the Linux kernel, and in kernel code it is in fact the mutex that appears far more often.
1. The Mutex API
struct mutex my_mutex;   /* define a mutex */
mutex_init(&my_mutex);   /* initialize the mutex */

/* acquire the mutex */
void mutex_lock(struct mutex *lock);
int mutex_lock_interruptible(struct mutex *lock);
int mutex_trylock(struct mutex *lock);

void mutex_unlock(struct mutex *lock);   /* release the mutex */
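To make the API concrete, here is a minimal usage sketch in the style of a device driver. It only assumes the calls listed above; the names count_lock, shared_count and my_drv_add are made up for illustration.

#include <linux/mutex.h>

static DEFINE_MUTEX(count_lock);   /* statically defined and initialized mutex */
static int shared_count;           /* shared data protected by count_lock */

/* Hypothetical helper called from process context (it may sleep). */
static int my_drv_add(int val)
{
	mutex_lock(&count_lock);    /* sleeps if another task holds the lock */
	shared_count += val;        /* critical section: touch the shared data */
	mutex_unlock(&count_lock);  /* must be released by the same task */
	return shared_count;
}

Because mutex_lock() may sleep, this pattern is only valid in process context, never in interrupt context; that restriction is spelled out in the kernel comments quoted below.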
2. API Implementation
2.1 mutex
/*
* Simple, straightforward mutexes with strict semantics:
*
* - only one task can hold the mutex at a time
* - only the owner can unlock the mutex
* - multiple unlocks are not permitted
* - recursive locking is not permitted
* - a mutex object must be initialized via the API
* - a mutex object must not be initialized via memset or copying
* - task may not exit with mutex held
* - memory areas where held locks reside must not be freed
* - held mutexes must not be reinitialized
* - mutexes may not be used in hardware or software interrupt
* contexts such as tasklets and timers
*
* These semantics are fully enforced when DEBUG_MUTEXES is
* enabled. Furthermore, besides enforcing the above rules, the mutex
* debugging code also implements a number of additional features
* that make lock debugging easier and faster:
*
* - uses symbolic names of mutexes, whenever they are printed in debug output
* - point-of-acquire tracking, symbolic lookup of function names
* - list of all locks held in the system, printout of them
* - owner tracking
* - detects self-recursing locks and prints out all relevant info
* - detects multi-task circular deadlocks and prints out all affected
* locks and tasks (and only those tasks)
*/
struct mutex {
atomic_long_t owner;
spinlock_t wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
Structure name: mutex
File location: include/linux/mutex.h
Main purpose: the mutex structure, used to define a mutex lock

- atomic_long_t owner: an atomic variable recording the current holder of the mutex; it can safely be accessed by multiple tasks concurrently without corrupting data. (The sketch after this list models how a single owner word can carry both a task pointer and a few flag bits.)
- spinlock_t wait_lock: a spinlock protecting the critical sections that manipulate wait_list.
- struct list_head wait_list: a list head maintaining the list of tasks waiting for the mutex to be released.
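The lock and unlock paths quoted later in this article use helpers such as __owner_task() and __owner_flags() together with the MUTEX_FLAG_* masks, which suggests that the owner word stores the holder's task_struct pointer in its upper bits and a few status flags in its low bits. The following user-space model illustrates that idea only; it is a sketch, and the MODEL_* names are made up rather than the kernel's own definitions.

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define MODEL_FLAG_WAITERS 0x01UL   /* hypothetical stand-ins for the MUTEX_FLAG_* bits */
#define MODEL_FLAG_HANDOFF 0x02UL
#define MODEL_FLAGS        0x07UL

/* task_struct is well aligned, so the low bits of its address are free. */
struct model_task { long dummy; } __attribute__((aligned(8)));

static uintptr_t pack_owner(struct model_task *t, unsigned long flags)
{
	assert(((uintptr_t)t & MODEL_FLAGS) == 0);   /* alignment keeps the low bits clear */
	return (uintptr_t)t | flags;
}

static struct model_task *owner_task(uintptr_t owner)   /* cf. __owner_task() */
{
	return (struct model_task *)(owner & ~MODEL_FLAGS);
}

static unsigned long owner_flags(uintptr_t owner)       /* cf. __owner_flags() */
{
	return owner & MODEL_FLAGS;
}

int main(void)
{
	struct model_task t;
	uintptr_t owner = pack_owner(&t, MODEL_FLAG_WAITERS);

	printf("task=%p flags=0x%lx\n", (void *)owner_task(owner), owner_flags(owner));
	return 0;
}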
2.2 mutex_init
/**
* mutex_init - initialize the mutex
* @mutex: the mutex to be initialized
*
* Initialize the mutex to unlocked state.
*
* It is not allowed to initialize an already locked mutex.
*/
#define mutex_init(mutex)					\
do {								\
	static struct lock_class_key __key;			\
								\
	__mutex_init((mutex), #mutex, &__key);			\
} while (0)
void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
atomic_long_set(&lock->owner, 0);
spin_lock_init(&lock->wait_lock);
INIT_LIST_HEAD(&lock->wait_list);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
osq_lock_init(&lock->osq);
#endif
debug_mutex_init(lock, name, key);
}
EXPORT_SYMBOL(__mutex_init);
Interface name: mutex_init (a macro, not a structure)
File location: include/linux/mutex.h
Main purpose: initialize a mutex to the unlocked state

Implementation notes:
- mutex_init calls the __mutex_init interface, which performs the actual initialization.
- atomic_long_set: sets the owner atomic variable to 0, meaning "no owner".
- spin_lock_init: initializes the wait_lock spinlock to the unlocked state.
- INIT_LIST_HEAD: initializes the wait_list.
- debug_mutex_init: initializes the debugging-related information (only meaningful when the debug options are enabled).
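Besides mutex_init(), a mutex can also be defined and initialized statically with DEFINE_MUTEX(), and a dynamically initialized one can be torn down with mutex_destroy(), a debugging aid that is a no-op in non-debug builds. The sketch below shows both; struct my_dev and its helpers are hypothetical names used only for illustration.

#include <linux/mutex.h>
#include <linux/slab.h>

static DEFINE_MUTEX(global_lock);   /* static definition: initialized at compile time */

struct my_dev {                     /* hypothetical device structure */
	struct mutex lock;
	int value;
};

static struct my_dev *my_dev_create(void)
{
	struct my_dev *dev = kzalloc(sizeof(*dev), GFP_KERNEL);

	if (!dev)
		return NULL;
	/* must be initialized before first use; never memset() or copy an initialized mutex */
	mutex_init(&dev->lock);
	return dev;
}

static void my_dev_destroy(struct my_dev *dev)
{
	/* with CONFIG_DEBUG_MUTEXES this checks the mutex is not still locked; otherwise a no-op */
	mutex_destroy(&dev->lock);
	kfree(dev);
}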
2.3 mutex_lock
/**
* mutex_lock - acquire the mutex
* @lock: the mutex to be acquired
*
* Lock the mutex exclusively for this task. If the mutex is not
* available right now, it will sleep until it can get it.
*
* The mutex must later on be released by the same task that
* acquired it. Recursive locking is not allowed. The task
* may not exit without first unlocking the mutex. Also, kernel
* memory where the mutex resides must not be freed with
* the mutex still locked. The mutex must first be initialized
* (or statically defined) before it can be locked. memset()-ing
* the mutex to 0 is not allowed.
*
* (The CONFIG_DEBUG_MUTEXES .config option turns on debugging
* checks that will enforce the restrictions and will also do
* deadlock debugging)
*
* This function is similar to (but not equivalent to) down().
*/
void __sched mutex_lock(struct mutex *lock)
{
might_sleep();
if (!__mutex_trylock_fast(lock))
__mutex_lock_slowpath(lock);
}
EXPORT_SYMBOL(mutex_lock);
#endif
# define might_sleep() do { might_resched(); } while (0)
/*
* Optimistic trylock that only works in the uncontended case. Make sure to
* follow with a __mutex_trylock() before failing.
*/
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
unsigned long curr = (unsigned long)current;
unsigned long zero = 0UL;
if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))
return true;
return false;
}
static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{
__mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}
static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip)
{
return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}
/*
* Lock a mutex (possibly interruptible), slowpath:
*/
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct mutex_waiter waiter;
bool first = false;
struct ww_mutex *ww;
int ret;
might_sleep();
ww = container_of(lock, struct ww_mutex, base);
if (use_ww_ctx && ww_ctx) {
if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
return -EALREADY;
/*
* Reset the wounded flag after a kill. No other process can
* race and wound us here since they can't have a valid owner
* pointer if we don't have any locks held.
*/
if (ww_ctx->acquired == 0)
ww_ctx->wounded = 0;
}
preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
if (__mutex_trylock(lock) ||
mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
/* got the lock, yay! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx && ww_ctx)
ww_mutex_set_context_fastpath(ww, ww_ctx);
preempt_enable();
return 0;
}
spin_lock(&lock->wait_lock);
/*
* After waiting to acquire the wait_lock, try again.
*/
if (__mutex_trylock(lock)) {
if (use_ww_ctx && ww_ctx)
__ww_mutex_check_waiters(lock, ww_ctx);
goto skip_wait;
}
debug_mutex_lock_common(lock, &waiter);
lock_contended(&lock->dep_map, ip);
if (!use_ww_ctx) {
/* add waiting tasks to the end of the waitqueue (FIFO): */
__mutex_add_waiter(lock, &waiter, &lock->wait_list);
#ifdef CONFIG_DEBUG_MUTEXES
waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
} else {
/*
* Add in stamp order, waking up waiters that must kill
* themselves.
*/
ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
if (ret)
goto err_early_kill;
waiter.ww_ctx = ww_ctx;
}
waiter.task = current;
set_current_state(state);
for (;;) {
/*
* Once we hold wait_lock, we're serialized against
* mutex_unlock() handing the lock off to us, do a trylock
* before testing the error conditions to make sure we pick up
* the handoff.
*/
if (__mutex_trylock(lock))
goto acquired;
/*
* Check for signals and kill conditions while holding
* wait_lock. This ensures the lock cancellation is ordered
* against mutex_unlock() and wake-ups do not go missing.
*/
if (unlikely(signal_pending_state(state, current))) {
ret = -EINTR;
goto err;
}
if (use_ww_ctx && ww_ctx) {
ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
if (ret)
goto err;
}
spin_unlock(&lock->wait_lock);
schedule_preempt_disabled();
/*
* ww_mutex needs to always recheck its position since its waiter
* list is not FIFO ordered.
*/
if ((use_ww_ctx && ww_ctx) || !first) {
first = __mutex_waiter_is_first(lock, &waiter);
if (first)
__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
}
set_current_state(state);
/*
* Here we order against unlock; we must either see it change
* state back to RUNNING and fall through the next schedule(),
* or we must see its unlock and acquire.
*/
if (__mutex_trylock(lock) ||
(first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
break;
spin_lock(&lock->wait_lock);
}
spin_lock(&lock->wait_lock);
acquired:
__set_current_state(TASK_RUNNING);
if (use_ww_ctx && ww_ctx) {
/*
* Wound-Wait; we stole the lock (!first_waiter), check the
* waiters as anyone might want to wound us.
*/
if (!ww_ctx->is_wait_die &&
!__mutex_waiter_is_first(lock, &waiter))
__ww_mutex_check_waiters(lock, ww_ctx);
}
mutex_remove_waiter(lock, &waiter, current);
if (likely(list_empty(&lock->wait_list)))
__mutex_clear_flag(lock, MUTEX_FLAGS);
debug_mutex_free_waiter(&waiter);
skip_wait:
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx && ww_ctx)
ww_mutex_lock_acquired(ww, ww_ctx);
spin_unlock(&lock->wait_lock);
preempt_enable();
return 0;
err:
__set_current_state(TASK_RUNNING);
mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
spin_unlock(&lock->wait_lock);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, 1, ip);
preempt_enable();
return ret;
}
Function name: mutex_lock
File location: kernel/locking/mutex.c
Main purpose: acquire a mutex, sleeping if necessary
Call flow:
mutex_lock (kernel/locking/mutex.c)
|--> might_sleep
|--> __mutex_trylock_fast
|    |--> atomic_long_try_cmpxchg_acquire   // fast path: try to take the mutex atomically
|--> __mutex_lock_slowpath                  // only if the fast path failed
     |--> __mutex_lock
          |--> __mutex_lock_common          // slow path: acquire the lock, waiting if needed
Implementation notes:
- might_sleep: annotates this as an operation that may sleep (non-atomic context), so that calling it from a context where sleeping is forbidden can be detected.
- __mutex_trylock_fast: tries to take the mutex on the fast path, returning true on success and false on failure; the user-space sketch below models the single compare-and-exchange it relies on.
- On failure, __mutex_lock_slowpath is called, which ends up in __mutex_lock_common; that function is where the real work happens.
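The fast path boils down to one atomic compare-and-exchange on the owner word: if the word is 0 (unlocked), atomically store an identifier for the current task. The following user-space sketch models that idea, and its unlock counterpart, with C11 atomics; it is an illustration of the principle rather than kernel code, and a plain integer stands in for the task_struct pointer.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

static _Atomic uintptr_t owner;   /* 0 == unlocked, otherwise the "task" that holds it */

/* Modelled on __mutex_trylock_fast(): succeeds only in the uncontended case. */
static bool model_trylock_fast(uintptr_t curr_task)
{
	uintptr_t zero = 0;

	return atomic_compare_exchange_strong_explicit(&owner, &zero, curr_task,
						       memory_order_acquire,
						       memory_order_relaxed);
}

/* Modelled on the unlock fast path: clear the owner if no flag bits are set. */
static bool model_unlock_fast(uintptr_t curr_task)
{
	uintptr_t expected = curr_task;

	return atomic_compare_exchange_strong_explicit(&owner, &expected, 0,
						       memory_order_release,
						       memory_order_relaxed);
}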
__mutex_lock_common does the heavy lifting, and it can be split into several stages: the deadlock-avoidance (wound/wait) handling, the lock-acquisition stage, putting the waiter to sleep, and handling the wait state itself.
Let's look at the first part: the deadlock-avoidance strategy.
struct ww_mutex *ww;
int ret;
might_sleep();
ww = container_of(lock, struct ww_mutex, base);
if (use_ww_ctx && ww_ctx) {
if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
return -EALREADY;
/*
* Reset the wounded flag after a kill. No other process can
* race and wound us here since they can't have a valid owner
* pointer if we don't have any locks held.
*/
if (ww_ctx->acquired == 0)
ww_ctx->wounded = 0;
}
- This fragment handles mutexes that use the wound/wait (ww) deadlock-avoidance mechanism, a strategy for avoiding deadlock when several locks may be taken in arbitrary order. Going through it step by step:
- struct ww_mutex *ww: declares a pointer to a struct ww_mutex, the mutex variant that carries the deadlock-avoidance machinery.
- might_sleep: marks this as code that may sleep, so that calling it from a context where sleeping is not allowed is flagged.
- ww = container_of(lock, struct ww_mutex, base): uses the container_of macro to turn the lock pointer into a pointer to the enclosing struct ww_mutex; lock points at the base member, so this recovers the higher-level structure from the embedded one.
- if (use_ww_ctx && ww_ctx): only executed when the caller actually passed a wound/wait acquire context.
- if (unlikely(ww_ctx == READ_ONCE(ww->ctx))): checks whether this context already holds the lock; if so, -EALREADY is returned instead of deadlocking on ourselves.
- if (ww_ctx->acquired == 0): if this context currently holds no locks, its wounded flag is reset to 0.

For readers who have not met ww-mutexes before, the sketch after this list shows what the wound/wait API looks like from the caller's side.
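This is a hedged sketch of the usage pattern described in the kernel's ww-mutex documentation: two locks of the same class are taken in arbitrary order, and a -EDEADLK return means "back off, wait for the contended lock, then retry". The names struct buffer, lock_pair and my_ww_class are made up for the example.

#include <linux/kernel.h>
#include <linux/ww_mutex.h>

static DEFINE_WW_CLASS(my_ww_class);

struct buffer {                       /* hypothetical object protected by a ww-mutex */
	struct ww_mutex lock;
};

/* Take both x->lock and y->lock without risking an ABBA deadlock. */
static void lock_pair(struct buffer *x, struct buffer *y)
{
	struct ww_acquire_ctx ctx;
	int ret;

	ww_acquire_init(&ctx, &my_ww_class);

	ret = ww_mutex_lock(&x->lock, &ctx);   /* first lock in the context: no backoff possible */
	for (;;) {
		ret = ww_mutex_lock(&y->lock, &ctx);
		if (!ret)
			break;                 /* got both locks */
		/* -EDEADLK: we lost the wound/wait arbitration. Drop what we
		 * hold, sleep until the contended lock is free, then retry
		 * with the roles swapped so the held lock is taken first. */
		ww_mutex_unlock(&x->lock);
		ww_mutex_lock_slow(&y->lock, &ctx);
		swap(x, y);
	}

	/* ... both buffers are now locked ... */

	ww_mutex_unlock(&x->lock);
	ww_mutex_unlock(&y->lock);
	ww_acquire_fini(&ctx);
}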
Next, the second part: the lock-acquisition stage.
preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
if (__mutex_trylock(lock) ||
mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
/* got the lock, yay! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx && ww_ctx)
ww_mutex_set_context_fastpath(ww, ww_ctx);
preempt_enable();
return 0;
}
- preempt_disable: disables preemption so the task is not preempted while it is trying to take the lock.
- __mutex_trylock and mutex_optimistic_spin: the lock is attempted again, including optimistic spinning while the current owner is still running on a CPU; if either succeeds there is no contention left to handle and the function returns 0, otherwise execution falls through to the slow path below. The sketch after this list illustrates the idea behind optimistic spinning.
- ww_mutex_set_context_fastpath(ww, ww_ctx): if a wound/wait context is used, this records the acquire context on the ww-mutex after the fast-path acquisition so that later lockers and the unlock path can do the ww bookkeeping.
- preempt_enable: re-enables preemption for the subsequent code.
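Building on the fast-path sketch above, this user-space illustration captures the idea behind optimistic spinning: if the lock is likely to be released soon, spin briefly on the owner word instead of sleeping, and only fall back to the sleeping slow path when spinning does not pay off. In the kernel the heuristic is whether the current owner is still running on a CPU; here a simple bounded retry count stands in for that check, so this is a conceptual model only.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

static _Atomic uintptr_t owner;   /* 0 == unlocked, otherwise the "task" that holds it */

static bool model_trylock(uintptr_t self)
{
	uintptr_t zero = 0;

	return atomic_compare_exchange_strong_explicit(&owner, &zero, self,
						       memory_order_acquire,
						       memory_order_relaxed);
}

/* Returns true if the lock was obtained by spinning; false means the caller
 * should fall back to the sleeping slow path (enqueue itself and schedule). */
static bool model_optimistic_spin(uintptr_t self, int max_spins)
{
	for (int i = 0; i < max_spins; i++) {
		if (model_trylock(self))
			return true;
#if defined(__x86_64__) || defined(__i386__)
		__builtin_ia32_pause();   /* roughly what cpu_relax() does on x86 */
#endif
	}
	return false;
}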
Next, the third part: putting the waiting task to sleep.
spin_lock(&lock->wait_lock);
/*
* After waiting to acquire the wait_lock, try again.
*/
if (__mutex_trylock(lock)) {
if (use_ww_ctx && ww_ctx)
__ww_mutex_check_waiters(lock, ww_ctx);
goto skip_wait;
}
debug_mutex_lock_common(lock, &waiter);
lock_contended(&lock->dep_map, ip);
if (!use_ww_ctx) {
/* add waiting tasks to the end of the waitqueue (FIFO): */
__mutex_add_waiter(lock, &waiter, &lock->wait_list);
#ifdef CONFIG_DEBUG_MUTEXES
waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
} else {
/*
* Add in stamp order, waking up waiters that must kill
* themselves.
*/
ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
if (ret)
goto err_early_kill;
waiter.ww_ctx = ww_ctx;
}
waiter.task = current;
set_current_state(state);
- spin_lock(&lock->wait_lock): takes the wait_lock spinlock so the wait queue can be manipulated without races.
- if (__mutex_trylock(lock)): tries once more; if it succeeds now there is no contention after all, and the code jumps to the skip_wait label.
- The waiter is then queued differently depending on whether a wound/wait context is used:
  - without a ww context, the waiting task is added to the end of the wait queue (FIFO order);
  - with a ww context, it is inserted in stamp order, possibly waking waiters that must kill themselves.
- waiter.task = current and set_current_state(state): record the waiting task and mark it as sleeping; the sketch after this list shows why the state must be set before the final re-check of the lock.
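The reason set_current_state() is called before the lock is re-checked is the classic lost-wakeup problem: if the task first checked the lock and only then marked itself as sleeping, a wake-up arriving in between could be missed. This simplified sketch shows the pattern the slow path follows; cond_try_acquire() is a hypothetical stand-in for __mutex_trylock(), and the real code additionally juggles wait_lock and signal handling around these steps.

#include <linux/sched.h>
#include <linux/types.h>

bool cond_try_acquire(void);   /* hypothetical stand-in for __mutex_trylock() */

static void wait_for_lock_sketch(void)
{
	for (;;) {
		set_current_state(TASK_UNINTERRUPTIBLE);  /* 1. mark ourselves as sleeping */
		if (cond_try_acquire())                   /* 2. re-check after marking     */
			break;
		schedule();                               /* 3. only now actually sleep    */
	}
	__set_current_state(TASK_RUNNING);
	/* If the waker runs between steps 1 and 3, our state is set back to
	 * running and schedule() simply returns instead of sleeping forever. */
}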
Next, the fourth part: handling the wait state.
for (;;) {
/*
* Once we hold wait_lock, we're serialized against
* mutex_unlock() handing the lock off to us, do a trylock
* before testing the error conditions to make sure we pick up
* the handoff.
*/
if (__mutex_trylock(lock))
goto acquired;
/*
* Check for signals and kill conditions while holding
* wait_lock. This ensures the lock cancellation is ordered
* against mutex_unlock() and wake-ups do not go missing.
*/
if (unlikely(signal_pending_state(state, current))) {
ret = -EINTR;
goto err;
}
if (use_ww_ctx && ww_ctx) {
ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
if (ret)
goto err;
}
spin_unlock(&lock->wait_lock);
schedule_preempt_disabled();
/*
* ww_mutex needs to always recheck its position since its waiter
* list is not FIFO ordered.
*/
if ((use_ww_ctx && ww_ctx) || !first) {
first = __mutex_waiter_is_first(lock, &waiter);
if (first)
__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
}
set_current_state(state);
/*
* Here we order against unlock; we must either see it change
* state back to RUNNING and fall through the next schedule(),
* or we must see its unlock and acquire.
*/
if (__mutex_trylock(lock) ||
(first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
break;
spin_lock(&lock->wait_lock);
}
spin_lock(&lock->wait_lock);
acquired:
__set_current_state(TASK_RUNNING);
if (use_ww_ctx && ww_ctx) {
/*
* Wound-Wait; we stole the lock (!first_waiter), check the
* waiters as anyone might want to wound us.
*/
if (!ww_ctx->is_wait_die &&
!__mutex_waiter_is_first(lock, &waiter))
__ww_mutex_check_waiters(lock, ww_ctx);
}
mutex_remove_waiter(lock, &waiter, current);
if (likely(list_empty(&lock->wait_list)))
__mutex_clear_flag(lock, MUTEX_FLAGS);
debug_mutex_free_waiter(&waiter);
skip_wait:
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx && ww_ctx)
ww_mutex_lock_acquired(ww, ww_ctx);
spin_unlock(&lock->wait_lock);
preempt_enable();
return 0;
err:
__set_current_state(TASK_RUNNING);
mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
spin_unlock(&lock->wait_lock);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, 1, ip);
preempt_enable();
return ret;
This part handles the task while it waits, and then cleans up once the lock is finally obtained.

Inside the wait loop:
- At the top of each iteration, __mutex_trylock is called again; wait_lock is held here, so this is also where a handoff from mutex_unlock is picked up. On success the code jumps to the acquired label.
- signal_pending_state: checks whether a signal has arrived that should abort the wait (for the interruptible variants); if so, the error path returns -EINTR.
- With a wound/wait context, __ww_mutex_check_kill decides whether this waiter must back off (be killed) to break a potential deadlock.
- spin_unlock(&lock->wait_lock): drops wait_lock before sleeping so that other tasks can queue themselves or unlock.
- schedule_preempt_disabled: gives up the CPU and sleeps until woken.
- After waking, the waiter rechecks whether it is now the first waiter; if so, it sets MUTEX_FLAG_HANDOFF so the unlocker hands the lock over instead of letting it be stolen (ww waiters always recheck, because their wait list is not FIFO ordered).
- set_current_state: marks the task as sleeping again before the next check.
- __mutex_trylock and mutex_optimistic_spin: try to take the lock once more, or, if this is the first waiter, spin optimistically; on success the loop is left.
- spin_lock(&lock->wait_lock): otherwise wait_lock is re-taken and the loop repeats.

Once the lock is obtained:
- __set_current_state(TASK_RUNNING): the task is marked runnable again.
- With a wound/wait context, the remaining waiters are checked, since this task may have "stolen" the lock and could itself need to wound others.
- mutex_remove_waiter: the task removes itself from the wait queue.
- list_empty: if the wait queue is now empty, the MUTEX_FLAGS bits in the owner word are cleared.

This is a fairly complex piece of kernel code: it combines optimistic spinning, a wait queue, signal handling and deadlock avoidance to provide a high-performance mutex in heavily contended, multi-threaded environments.
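The signal check above is what distinguishes mutex_lock_interruptible() from plain mutex_lock(): the former sleeps interruptibly and gives up with an error when a signal arrives. Together with mutex_trylock(), which never sleeps at all, callers get three levels of patience. The following hedged sketch shows typical call sites; settings_lock and the function names are made up.

#include <linux/errno.h>
#include <linux/mutex.h>

static DEFINE_MUTEX(settings_lock);
static int settings_value;

/* Never sleeps: useful on paths that must not block. */
static int settings_try_set(int val)
{
	if (!mutex_trylock(&settings_lock))   /* returns 1 on success, 0 if contended */
		return -EBUSY;
	settings_value = val;
	mutex_unlock(&settings_lock);
	return 0;
}

/* May sleep, but a signal (e.g. Ctrl+C) can abort the wait. */
static int settings_set(int val)
{
	if (mutex_lock_interruptible(&settings_lock))   /* non-zero means interrupted */
		return -ERESTARTSYS;
	settings_value = val;
	mutex_unlock(&settings_lock);
	return 0;
}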
2.4 mutex_unlock
/**
* mutex_unlock - release the mutex
* @lock: the mutex to be released
*
* Unlock a mutex that has been locked by this task previously.
*
* This function must not be used in interrupt context. Unlocking
* of a not locked mutex is not allowed.
*
* This function is similar to (but not equivalent to) up().
*/
void __sched mutex_unlock(struct mutex *lock)
{
#ifndef CONFIG_DEBUG_LOCK_ALLOC
if (__mutex_unlock_fast(lock))
return;
#endif
__mutex_unlock_slowpath(lock, _RET_IP_);
}
EXPORT_SYMBOL(mutex_unlock);
/*
* Release the lock, slowpath:
*/
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
struct task_struct *next = NULL;
DEFINE_WAKE_Q(wake_q);
unsigned long owner;
mutex_release(&lock->dep_map, 1, ip);
/*
* Release the lock before (potentially) taking the spinlock such that
* other contenders can get on with things ASAP.
*
* Except when HANDOFF, in that case we must not clear the owner field,
* but instead set it to the top waiter.
*/
owner = atomic_long_read(&lock->owner);
for (;;) {
unsigned long old;
#ifdef CONFIG_DEBUG_MUTEXES
DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif
if (owner & MUTEX_FLAG_HANDOFF)
break;
old = atomic_long_cmpxchg_release(&lock->owner, owner,
__owner_flags(owner));
if (old == owner) {
if (owner & MUTEX_FLAG_WAITERS)
break;
return;
}
owner = old;
}
spin_lock(&lock->wait_lock);
debug_mutex_unlock(lock);
if (!list_empty(&lock->wait_list)) {
/* get the first entry from the wait-list: */
struct mutex_waiter *waiter =
list_first_entry(&lock->wait_list,
struct mutex_waiter, list);
next = waiter->task;
debug_mutex_wake_waiter(lock, waiter);
wake_q_add(&wake_q, next);
}
if (owner & MUTEX_FLAG_HANDOFF)
__mutex_handoff(lock, next);
spin_unlock(&lock->wait_lock);
wake_up_q(&wake_q);
}
Function name: mutex_unlock
File location: kernel/locking/mutex.c
Main purpose: release a mutex and wake up a task that is waiting for it
Call flow:
mutex_unlock (kernel/locking/mutex.c)
|--> __mutex_unlock_slowpath
     |--> DEFINE_WAKE_Q                 // set up a wake queue
     |--> atomic_long_read              // read the owner word (current holder plus flags)
     |--> atomic_long_cmpxchg_release   // clear the owner to release the lock; break out of the
     |                                  // loop if there are waiters, return if there are none
     |--> spin_lock                     // take wait_lock to process the wait list
     |    |--> list_empty               // is anyone waiting?
     |    |--> list_first_entry         // take the first waiter from the list
     |    |--> wake_q_add               // add it to the wake queue
     |--> spin_unlock
     |--> wake_up_q                     // wake the queued task
Implementation:
mutex_unlock does two things: it releases the mutex itself, and it wakes up a task from the wait queue. The core of both is in __mutex_unlock_slowpath.
The first part of __mutex_unlock_slowpath, releasing the lock, looks like this:
/*
* Release the lock before (potentially) taking the spinlock such that
* other contenders can get on with things ASAP.
*
* Except when HANDOFF, in that case we must not clear the owner field,
* but instead set it to the top waiter.
*/
owner = atomic_long_read(&lock->owner);
for (;;) {
unsigned long old;
#ifdef CONFIG_DEBUG_MUTEXES
DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif
if (owner & MUTEX_FLAG_HANDOFF)
break;
old = atomic_long_cmpxchg_release(&lock->owner, owner,
__owner_flags(owner));
if (old == owner) {
if (owner & MUTEX_FLAG_WAITERS)
break;
return;
}
owner = old;
}
- atomic_long_read fetches the current owner word (the holder's task pointer plus the flag bits).
- atomic_long_cmpxchg_release then tries to atomically replace that word with just its flag bits (__owner_flags(owner)), which clears the task pointer and thereby releases the lock; comparing against the previously read value shows whether another thread modified the word in the meantime.
- old == owner: if they match, the release succeeded; otherwise owner = old reloads the latest value and the loop tries again.
- If MUTEX_FLAG_HANDOFF is set, the owner field must not be cleared here; the loop breaks so the lock can be handed directly to the top waiter.
- Finally, if (owner & MUTEX_FLAG_WAITERS) checks whether anyone is waiting: if so, the loop is left and the wake-up code below runs; if not, the function simply returns.

The second part of __mutex_unlock_slowpath, waking a waiting task, looks like this:
spin_lock(&lock->wait_lock);
debug_mutex_unlock(lock);
if (!list_empty(&lock->wait_list)) {
/* get the first entry from the wait-list: */
struct mutex_waiter *waiter =
list_first_entry(&lock->wait_list,
struct mutex_waiter, list);
next = waiter->task;
debug_mutex_wake_waiter(lock, waiter);
wake_q_add(&wake_q, next);
}
if (owner & MUTEX_FLAG_HANDOFF)
__mutex_handoff(lock, next);
spin_unlock(&lock->wait_lock);
wake_up_q(&wake_q);
- spin_lock and spin_unlock around this block protect the wait list against concurrent access.
- if (!list_empty(&lock->wait_list)): checks whether anyone is waiting.
- list_first_entry: takes the first waiter from the wait queue.
- wake_q_add: adds that task to the wake queue.
- if (owner & MUTEX_FLAG_HANDOFF): if the lock was marked for handoff, __mutex_handoff passes ownership directly to that waiter.
- wake_up_q: finally wakes the task(s) queued on the wake queue.
Mutex operations are clearly not trivial: under the hood they involve atomic operations, spinlocks, wake queues and more.
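From the caller's point of view, the rules quoted at the top of this article (only the owner may unlock, no double unlock, no use in interrupt context) translate into a simple discipline: every successful lock is paired with exactly one unlock on every return path. A hedged example with hypothetical names:

#include <linux/errno.h>
#include <linux/mutex.h>

static DEFINE_MUTEX(state_lock);
static int state_value;

static int state_update(int val)
{
	int ret = 0;

	mutex_lock(&state_lock);
	if (val < 0) {
		ret = -EINVAL;
		goto out_unlock;            /* error path: still holding the lock here */
	}
	state_value = val;
out_unlock:
	mutex_unlock(&state_lock);          /* released exactly once, by the task that took it */
	return ret;
}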
3. Differences Between Spinlocks and Mutexes
Difference in implementation:
- Spinlock: if the lock cannot be taken, the CPU busy-waits (spins) in place until the lock becomes available; the waiting thread never sleeps.
- Mutex: if the lock cannot be taken, the thread is put to sleep and is only woken once the mutex becomes available.
Time overhead:
- Spinlock: roughly the time spent busy-waiting for the lock + the time spent executing the critical section.
- Mutex: roughly the time spent waiting for the lock + the time of the context switches for sleeping and waking + the time spent executing the critical section.
Summary: if the critical section is small and the lock is held only briefly, a spinlock is usually the more efficient choice. Conversely, if the critical section is large, the lock is held for a long time, or the critical section may sleep (for example because of I/O), a mutex is the better fit.
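As a closing illustration of this rule of thumb, here is a hedged sketch contrasting the two: a spinlock protecting a tiny counter that is also touched from interrupt context, and a mutex protecting a configuration path that may sleep in copy_from_user(). All names are hypothetical.

#include <linux/errno.h>
#include <linux/mutex.h>
#include <linux/spinlock.h>
#include <linux/types.h>
#include <linux/uaccess.h>

static DEFINE_SPINLOCK(stats_lock);   /* tiny critical section, also used from IRQ context */
static DEFINE_MUTEX(cfg_lock);        /* longer critical section that may sleep */

static unsigned long rx_packets;
static char cfg_buf[128];

/* Called from an interrupt handler: must not sleep, so a spinlock is used. */
static void my_irq_account(void)
{
	unsigned long flags;

	spin_lock_irqsave(&stats_lock, flags);
	rx_packets++;                              /* a handful of instructions at most */
	spin_unlock_irqrestore(&stats_lock, flags);
}

/* Called from process context: copy_from_user() may sleep, so a mutex is used. */
static ssize_t my_cfg_write(const char __user *ubuf, size_t len)
{
	if (len >= sizeof(cfg_buf))
		return -EINVAL;

	mutex_lock(&cfg_lock);
	if (copy_from_user(cfg_buf, ubuf, len)) {
		mutex_unlock(&cfg_lock);
		return -EFAULT;
	}
	cfg_buf[len] = '\0';
	mutex_unlock(&cfg_lock);
	return len;
}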
嵌入式艺术