手握源码,深入分析Linux互斥体

2023-10-25 17:08:06 浏览数 (2)

【深入理解Linux内核锁】七、互斥体

尽管信号量已经可以实现互斥的功能,但是“正宗”的mutexLinux内核中还是真实地存在着。尤其是在Linux内核代码中,更多能看到mutex的身影。

1、互斥体API

代码语言:javascript复制
struct mutex my_mutex;  // 定义互斥体
mutex_init(&my_mutex);  // 初始化互斥体

/* 获取互斥体 */
void mutex_lock(struct mutex *lock);
int mutex_lock_interruptible(struct mutex *lock);
int mutex_trylock(struct mutex *lock);

void mutex_unlock(struct mutex *lock); // 释放互斥体

2、API实现

2.1 mutex

代码语言:javascript复制
/*
 * Simple, straightforward mutexes with strict semantics:
 *
 * - only one task can hold the mutex at a time
 * - only the owner can unlock the mutex
 * - multiple unlocks are not permitted
 * - recursive locking is not permitted
 * - a mutex object must be initialized via the API
 * - a mutex object must not be initialized via memset or copying
 * - task may not exit with mutex held
 * - memory areas where held locks reside must not be freed
 * - held mutexes must not be reinitialized
 * - mutexes may not be used in hardware or software interrupt
 *   contexts such as tasklets and timers
 *
 * These semantics are fully enforced when DEBUG_MUTEXES is
 * enabled. Furthermore, besides enforcing the above rules, the mutex
 * debugging code also implements a number of additional features
 * that make lock debugging easier and faster:
 *
 * - uses symbolic names of mutexes, whenever they are printed in debug output
 * - point-of-acquire tracking, symbolic lookup of function names
 * - list of all locks held in the system, printout of them
 * - owner tracking
 * - detects self-recursing locks and prints out all relevant info
 * - detects multi-task circular deadlocks and prints out all affected
 *   locks and tasks (and only those tasks)
 */
struct mutex {
 atomic_long_t  owner;
 spinlock_t  wait_lock;
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
 struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
 struct list_head wait_list;
#ifdef CONFIG_DEBUG_MUTEXES
 void   *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
 struct lockdep_map dep_map;
#endif
};

结构体名称mutex

文件位置include/linux/mutex.h

主要作用:互斥锁结构体,用于定义一个互斥锁

  • atomic_long_t owner:原子变量,表示互斥锁当前的持有者,可以安全地被多个线程同时访问,而不会导致数据破坏。
  • spinlock_t wait_lock:一个自旋锁,用于保护涉及 wait_list 的临界区
  • struct list_head wait_list:一个链表头,用于维护等待互斥锁释放的线程列表。

2.2 mutex_init

代码语言:javascript复制
/**
 * mutex_init - initialize the mutex
 * @mutex: the mutex to be initialized
 *
 * Initialize the mutex to unlocked state.
 *
 * It is not allowed to initialize an already locked mutex.
 */
#define mutex_init(mutex)      
do {         
 static struct lock_class_key __key;    
         
 __mutex_init((mutex), #mutex, &__key);    
} while (0)

void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
 atomic_long_set(&lock->owner, 0);
 spin_lock_init(&lock->wait_lock);
 INIT_LIST_HEAD(&lock->wait_list);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
 osq_lock_init(&lock->osq);
#endif

 debug_mutex_init(lock, name, key);
}
EXPORT_SYMBOL(__mutex_init);

结构体名称mutex_init

文件位置include/linux/mutex.h

主要作用:初始化互斥体

相关实现

  1. mutex_init调用__mutex_init接口,实现互斥锁的初始化
  2. atomic_long_set:初始化原子变量,设置值为0
  3. spin_lock_init:初始化自旋锁,设置其为UNLOCK
  4. INIT_LIST_HEAD:初始化链表
  5. debug_mutex_init:初始化调试相关信息

2.3 mutex_lock

代码语言:javascript复制
/**
 * mutex_lock - acquire the mutex
 * @lock: the mutex to be acquired
 *
 * Lock the mutex exclusively for this task. If the mutex is not
 * available right now, it will sleep until it can get it.
 *
 * The mutex must later on be released by the same task that
 * acquired it. Recursive locking is not allowed. The task
 * may not exit without first unlocking the mutex. Also, kernel
 * memory where the mutex resides must not be freed with
 * the mutex still locked. The mutex must first be initialized
 * (or statically defined) before it can be locked. memset()-ing
 * the mutex to 0 is not allowed.
 *
 * (The CONFIG_DEBUG_MUTEXES .config option turns on debugging
 * checks that will enforce the restrictions and will also do
 * deadlock debugging)
 *
 * This function is similar to (but not equivalent to) down().
 */
void __sched mutex_lock(struct mutex *lock)
{
 might_sleep();

 if (!__mutex_trylock_fast(lock))
  __mutex_lock_slowpath(lock);
}
EXPORT_SYMBOL(mutex_lock);
#endif

# define might_sleep() do { might_resched(); } while (0)

/*
 * Optimistic trylock that only works in the uncontended case. Make sure to
 * follow with a __mutex_trylock() before failing.
 */
static __always_inline bool __mutex_trylock_fast(struct mutex *lock)
{
 unsigned long curr = (unsigned long)current;
 unsigned long zero = 0UL;

 if (atomic_long_try_cmpxchg_acquire(&lock->owner, &zero, curr))
  return true;

 return false;
}


static noinline void __sched
__mutex_lock_slowpath(struct mutex *lock)
{
 __mutex_lock(lock, TASK_UNINTERRUPTIBLE, 0, NULL, _RET_IP_);
}


static int __sched
__mutex_lock(struct mutex *lock, long state, unsigned int subclass,
      struct lockdep_map *nest_lock, unsigned long ip)
{
 return __mutex_lock_common(lock, state, subclass, nest_lock, ip, NULL, false);
}


/*
 * Lock a mutex (possibly interruptible), slowpath:
 */
static __always_inline int __sched
__mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
      struct lockdep_map *nest_lock, unsigned long ip,
      struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
 struct mutex_waiter waiter;
 bool first = false;
 struct ww_mutex *ww;
 int ret;

 might_sleep();

 ww = container_of(lock, struct ww_mutex, base);
 if (use_ww_ctx && ww_ctx) {
  if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
   return -EALREADY;

  /*
   * Reset the wounded flag after a kill. No other process can
   * race and wound us here since they can't have a valid owner
   * pointer if we don't have any locks held.
   */
  if (ww_ctx->acquired == 0)
   ww_ctx->wounded = 0;
 }

 preempt_disable();
 mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

 if (__mutex_trylock(lock) ||
     mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
  /* got the lock, yay! */
  lock_acquired(&lock->dep_map, ip);
  if (use_ww_ctx && ww_ctx)
   ww_mutex_set_context_fastpath(ww, ww_ctx);
  preempt_enable();
  return 0;
 }

 spin_lock(&lock->wait_lock);
 /*
  * After waiting to acquire the wait_lock, try again.
  */
 if (__mutex_trylock(lock)) {
  if (use_ww_ctx && ww_ctx)
   __ww_mutex_check_waiters(lock, ww_ctx);

  goto skip_wait;
 }

 debug_mutex_lock_common(lock, &waiter);

 lock_contended(&lock->dep_map, ip);

 if (!use_ww_ctx) {
  /* add waiting tasks to the end of the waitqueue (FIFO): */
  __mutex_add_waiter(lock, &waiter, &lock->wait_list);


#ifdef CONFIG_DEBUG_MUTEXES
  waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
 } else {
  /*
   * Add in stamp order, waking up waiters that must kill
   * themselves.
   */
  ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
  if (ret)
   goto err_early_kill;

  waiter.ww_ctx = ww_ctx;
 }

 waiter.task = current;

 set_current_state(state);
 for (;;) {
  /*
   * Once we hold wait_lock, we're serialized against
   * mutex_unlock() handing the lock off to us, do a trylock
   * before testing the error conditions to make sure we pick up
   * the handoff.
   */
  if (__mutex_trylock(lock))
   goto acquired;

  /*
   * Check for signals and kill conditions while holding
   * wait_lock. This ensures the lock cancellation is ordered
   * against mutex_unlock() and wake-ups do not go missing.
   */
  if (unlikely(signal_pending_state(state, current))) {
   ret = -EINTR;
   goto err;
  }

  if (use_ww_ctx && ww_ctx) {
   ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
   if (ret)
    goto err;
  }

  spin_unlock(&lock->wait_lock);
  schedule_preempt_disabled();

  /*
   * ww_mutex needs to always recheck its position since its waiter
   * list is not FIFO ordered.
   */
  if ((use_ww_ctx && ww_ctx) || !first) {
   first = __mutex_waiter_is_first(lock, &waiter);
   if (first)
    __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
  }

  set_current_state(state);
  /*
   * Here we order against unlock; we must either see it change
   * state back to RUNNING and fall through the next schedule(),
   * or we must see its unlock and acquire.
   */
  if (__mutex_trylock(lock) ||
      (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
   break;

  spin_lock(&lock->wait_lock);
 }
 spin_lock(&lock->wait_lock);
acquired:
 __set_current_state(TASK_RUNNING);

 if (use_ww_ctx && ww_ctx) {
  /*
   * Wound-Wait; we stole the lock (!first_waiter), check the
   * waiters as anyone might want to wound us.
   */
  if (!ww_ctx->is_wait_die &&
      !__mutex_waiter_is_first(lock, &waiter))
   __ww_mutex_check_waiters(lock, ww_ctx);
 }

 mutex_remove_waiter(lock, &waiter, current);
 if (likely(list_empty(&lock->wait_list)))
  __mutex_clear_flag(lock, MUTEX_FLAGS);

 debug_mutex_free_waiter(&waiter);

skip_wait:
 /* got the lock - cleanup and rejoice! */
 lock_acquired(&lock->dep_map, ip);

 if (use_ww_ctx && ww_ctx)
  ww_mutex_lock_acquired(ww, ww_ctx);

 spin_unlock(&lock->wait_lock);
 preempt_enable();
 return 0;

err:
 __set_current_state(TASK_RUNNING);
 mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
 spin_unlock(&lock->wait_lock);
 debug_mutex_free_waiter(&waiter);
 mutex_release(&lock->dep_map, 1, ip);
 preempt_enable();
 return ret;
}

结构体名称mutex_lock

文件位置kernel/locking/mutex.c

主要作用:用于获取一个互斥锁

调用流程

代码语言:javascript复制
mutex_lock(kernel/locking/mutex.c)
    |--> might_sleep
    |--> __mutex_trylock_fast
        |--> atomic_long_try_cmpxchg_acquire        //  尝试获取互斥锁
    |--> __mutex_lock_slowpath
        |--> __mutex_lock
            |--> __mutex_lock_common                //  获取锁并等待

相关实现

might_sleep:用于标记可能在非原子上下文(允许睡眠)中执行的操作,以确保不会在不允许睡眠的上下文中调用这段代码。

__mutex_trylock_fast:尝试获取互斥锁(mutex),如果获取成功则范围true,获取失败,返回false

获取失败后,调用__mutex_lock_slowpath接口,最终调用__mutex_lock_common接口,该接口才是重头戏

__mutex_lock_common接口主要有几个作用:死锁避免策略、

下面我们看第一部分:死锁避免策略

代码语言:javascript复制
 struct ww_mutex *ww;
 int ret;

 might_sleep();

 ww = container_of(lock, struct ww_mutex, base);
 if (use_ww_ctx && ww_ctx) {
  if (unlikely(ww_ctx == READ_ONCE(ww->ctx)))
   return -EALREADY;

  /*
   * Reset the wounded flag after a kill. No other process can
   * race and wound us here since they can't have a valid owner
   * pointer if we don't have any locks held.
   */
  if (ww_ctx->acquired == 0)
   ww_ctx->wounded = 0;
 }
  • 这段代码片段是针对带有死锁避免机制(Wound-Wait,WW)的互斥锁的操作,是一种多线程环境中避免死锁的策略。让我们逐行分析这段代码的作用
  • struct ww_mutex *ww:声明了一个指向 struct ww_mutex 类型的指针 ww,这是一种带有死锁避免机制的互斥锁的数据结构。
  • might_sleep:用于标记可能在非原子上下文(允许睡眠)中执行的操作,以确保不会在不允许睡眠的上下文中调用这段代码。
  • ww = container_of(lock, struct ww_mutex, base):使用 container_of 宏将传递进来的 lock 指针转换为 struct ww_mutex 结构体的指针。这意味着 lock 指向的是 struct ww_mutex 结构体中的 base 成员。这样可以将更高级别的数据结构与较低级别的数据结构关联起来。
  • if (use_ww_ctx && ww_ctx):如果 use_ww_ctx 为真且 ww_ctx 不为空(即使用死锁避免上下文并且上下文存在)。
  • if (unlikely(ww_ctx == READ_ONCE(ww->ctx))):检查是否已经在同一个上下文中尝试获取锁。如果是,则返回错误码 -EALREADY,表示已经在当前上下文中获取了锁。(避免死锁)
  • if (ww_ctx->acquired == 0):检查是否当前上下文已经没有持有任何锁。如果是,则将 ww_ctxwounded 标志重置为0。

下面我们看第二部分:获取锁阶段

代码语言:javascript复制
 preempt_disable();
 mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

 if (__mutex_trylock(lock) ||
     mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, NULL)) {
  /* got the lock, yay! */
  lock_acquired(&lock->dep_map, ip);
  if (use_ww_ctx && ww_ctx)
   ww_mutex_set_context_fastpath(ww, ww_ctx);
  preempt_enable();
  return 0;
 }
  • preempt_disable:禁用抢占,确保在获取锁期间不会被抢占。
  • 通过调用__mutex_trylockmutex_optimistic_spin多次尝试获取锁,如果获取锁成功,说明没有其他进程占用,直接返回,否则接着往下执行
  • ww_mutex_set_context_fastpath(ww, ww_ctx):如果使用了"wait-wound"上下文,这个函数会将上下文设置为快速路径,以便在解锁时进行优化。
  • preempt_enable():重新启用抢占,允许系统在后续操作中进行抢占。

下面我们看第三部分:将等待进程置入睡眠状态

代码语言:javascript复制
 spin_lock(&lock->wait_lock);
 /*
  * After waiting to acquire the wait_lock, try again.
  */
 if (__mutex_trylock(lock)) {
  if (use_ww_ctx && ww_ctx)
   __ww_mutex_check_waiters(lock, ww_ctx);

  goto skip_wait;
 }

 debug_mutex_lock_common(lock, &waiter);

 lock_contended(&lock->dep_map, ip);

 if (!use_ww_ctx) {
  /* add waiting tasks to the end of the waitqueue (FIFO): */
  __mutex_add_waiter(lock, &waiter, &lock->wait_list);


#ifdef CONFIG_DEBUG_MUTEXES
  waiter.ww_ctx = MUTEX_POISON_WW_CTX;
#endif
 } else {
  /*
   * Add in stamp order, waking up waiters that must kill
   * themselves.
   */
  ret = __ww_mutex_add_waiter(&waiter, lock, ww_ctx);
  if (ret)
   goto err_early_kill;

  waiter.ww_ctx = ww_ctx;
 }

 waiter.task = current;

 set_current_state(state);
  • 如果不使用"wait-wound"上下文,会将等待任务添加到等待队列的末尾(FIFO顺序)。
  • 如果使用"wait-wound"上下文,会按照时间戳的顺序将等待任务添加到等待队列,并可能唤醒需要终止的等待者。
  • spin_lock(&lock->wait_lock):获取wait_lock自旋锁,这是为了在等待期间操作等待队列时防止竞争。
  • if (__mutex_trylock(lock)):再次尝试获取锁,如果成功获取,说明此时没有竞争,会跳到acquired标签。
  • 根据是否使用"wait-wound"上下文进行不同的等待队列操作:
  • set_current_state(state):设置等待任务的上下文和当前状态

下面我们看第四部分:等待状态处理

代码语言:javascript复制
 for (;;) {
  /*
   * Once we hold wait_lock, we're serialized against
   * mutex_unlock() handing the lock off to us, do a trylock
   * before testing the error conditions to make sure we pick up
   * the handoff.
   */
  if (__mutex_trylock(lock))
   goto acquired;

  /*
   * Check for signals and kill conditions while holding
   * wait_lock. This ensures the lock cancellation is ordered
   * against mutex_unlock() and wake-ups do not go missing.
   */
  if (unlikely(signal_pending_state(state, current))) {
   ret = -EINTR;
   goto err;
  }

  if (use_ww_ctx && ww_ctx) {
   ret = __ww_mutex_check_kill(lock, &waiter, ww_ctx);
   if (ret)
    goto err;
  }

  spin_unlock(&lock->wait_lock);
  schedule_preempt_disabled();

  /*
   * ww_mutex needs to always recheck its position since its waiter
   * list is not FIFO ordered.
   */
  if ((use_ww_ctx && ww_ctx) || !first) {
   first = __mutex_waiter_is_first(lock, &waiter);
   if (first)
    __mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
  }

  set_current_state(state);
  /*
   * Here we order against unlock; we must either see it change
   * state back to RUNNING and fall through the next schedule(),
   * or we must see its unlock and acquire.
   */
  if (__mutex_trylock(lock) ||
      (first && mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, &waiter)))
   break;

  spin_lock(&lock->wait_lock);
 }
 spin_lock(&lock->wait_lock);
acquired:
 __set_current_state(TASK_RUNNING);

 if (use_ww_ctx && ww_ctx) {
  /*
   * Wound-Wait; we stole the lock (!first_waiter), check the
   * waiters as anyone might want to wound us.
   */
  if (!ww_ctx->is_wait_die &&
      !__mutex_waiter_is_first(lock, &waiter))
   __ww_mutex_check_waiters(lock, ww_ctx);
 }

 mutex_remove_waiter(lock, &waiter, current);
 if (likely(list_empty(&lock->wait_list)))
  __mutex_clear_flag(lock, MUTEX_FLAGS);

 debug_mutex_free_waiter(&waiter);

skip_wait:
 /* got the lock - cleanup and rejoice! */
 lock_acquired(&lock->dep_map, ip);

 if (use_ww_ctx && ww_ctx)
  ww_mutex_lock_acquired(ww, ww_ctx);

 spin_unlock(&lock->wait_lock);
 preempt_enable();
 return 0;

err:
 __set_current_state(TASK_RUNNING);
 mutex_remove_waiter(lock, &waiter, current);
err_early_kill:
 spin_unlock(&lock->wait_lock);
 debug_mutex_free_waiter(&waiter);
 mutex_release(&lock->dep_map, 1, ip);
 preempt_enable();
 return ret;

这部分代码是在等待期间的处理:

如果获取到锁,跳转到acquired标签。

  • __set_current_state(TASK_RUNNING):设置当前线程的状态为TASK_RUNNING
  • 如果使用"wait-wound"上下文,检查是否需要继续检查等待者。
  • mutex_remove_waiter:从等待队列中移除等待任务。
  • list_empty:判断是否等待队列为空,如果是,则清除锁的标志。
  • 实时调用__mutex_trylock判断是否获取到锁,如果成功获取,跳转到acquired标签。
  • signal_pending_state:检查是否有信号到达,如果有,返回错误码-EINTR
  • 如果使用"wait-wound"上下文,检查是否需要终止等待者。
  • spin_unlock(&lock->wait_lock):解锁wait_lock,禁用抢占并进入调度等待状态。
  • schedule_preempt_disabled:进入调度等待状态
  • set_current_state:设置当前线程的状态为等待状态。
  • __mutex_trylockmutex_optimistic_spin:尝试获取锁,或者如果是第一个等待任务且可以进行自旋优化,则尝试自旋等待。
  • spin_lock(&lock->wait_lock):获取锁,继续自旋等待

这段代码是内核中用于互斥锁获取的一个复杂实现,其中包含了自旋、等待队列、信号处理和死锁避免等多个关键概念和操作,它旨在在高并发的多线程环境中提供高性能的互斥锁实现。

2.4 mutex_unlock

代码语言:javascript复制
/**
 * mutex_unlock - release the mutex
 * @lock: the mutex to be released
 *
 * Unlock a mutex that has been locked by this task previously.
 *
 * This function must not be used in interrupt context. Unlocking
 * of a not locked mutex is not allowed.
 *
 * This function is similar to (but not equivalent to) up().
 */
void __sched mutex_unlock(struct mutex *lock)
{
#ifndef CONFIG_DEBUG_LOCK_ALLOC
 if (__mutex_unlock_fast(lock))
  return;
#endif
 __mutex_unlock_slowpath(lock, _RET_IP_);
}
EXPORT_SYMBOL(mutex_unlock);

/*
 * Release the lock, slowpath:
 */
static noinline void __sched __mutex_unlock_slowpath(struct mutex *lock, unsigned long ip)
{
 struct task_struct *next = NULL;
 DEFINE_WAKE_Q(wake_q);
 unsigned long owner;

 mutex_release(&lock->dep_map, 1, ip);

 /*
  * Release the lock before (potentially) taking the spinlock such that
  * other contenders can get on with things ASAP.
  *
  * Except when HANDOFF, in that case we must not clear the owner field,
  * but instead set it to the top waiter.
  */
 owner = atomic_long_read(&lock->owner);
 for (;;) {
  unsigned long old;

#ifdef CONFIG_DEBUG_MUTEXES
  DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
  DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif

  if (owner & MUTEX_FLAG_HANDOFF)
   break;

  old = atomic_long_cmpxchg_release(&lock->owner, owner,
        __owner_flags(owner));
  if (old == owner) {
   if (owner & MUTEX_FLAG_WAITERS)
    break;

   return;
  }

  owner = old;
 }

 spin_lock(&lock->wait_lock);
 debug_mutex_unlock(lock);
 if (!list_empty(&lock->wait_list)) {
  /* get the first entry from the wait-list: */
  struct mutex_waiter *waiter =
   list_first_entry(&lock->wait_list,
      struct mutex_waiter, list);

  next = waiter->task;

  debug_mutex_wake_waiter(lock, waiter);
  wake_q_add(&wake_q, next);
 }

 if (owner & MUTEX_FLAG_HANDOFF)
  __mutex_handoff(lock, next);

 spin_unlock(&lock->wait_lock);

 wake_up_q(&wake_q);
}

结构体名称mutex_unlock

文件位置kernel/locking/mutex.c

主要作用:用于释放一个互斥锁,并唤醒等待这个锁的任务

调用流程

代码语言:javascript复制
mutex_unlock(kernel/locking/mutex.c)
    |--> __mutex_unlock_slowpath
        |--> DEFINE_WAKE_Q          //  定义唤醒队列
        |--> atomic_long_read       //  获取原子变量,即获取当前锁的持有者
        |--> atomic_long_cmpxchg_release    //  判断ock->owner和owner是否相等,如果有,则跳出循环,没有则直接返回
        |--> spin_lock              //  处理等待队列
        |--> list_empty             //  判断等待队列是否为空
        |--> list_first_entry       //  获取等待队列的第一个任务
        |--> wake_q_add             //  加入到唤醒队列    
        |--> spin_unlock

实现流程

mutex_unlock主要有两个步骤,其一是释放互斥锁,其二是唤醒等待队列的任务,核心实现在__mutex_unlock_slowpath

__mutex_unlock_slowpath中主要负责实现这两个工作,第一个释放锁部分如下:

代码语言:javascript复制
/*
  * Release the lock before (potentially) taking the spinlock such that
  * other contenders can get on with things ASAP.
  *
  * Except when HANDOFF, in that case we must not clear the owner field,
  * but instead set it to the top waiter.
  */
 owner = atomic_long_read(&lock->owner);
 for (;;) {
  unsigned long old;

#ifdef CONFIG_DEBUG_MUTEXES
  DEBUG_LOCKS_WARN_ON(__owner_task(owner) != current);
  DEBUG_LOCKS_WARN_ON(owner & MUTEX_FLAG_PICKUP);
#endif

  if (owner & MUTEX_FLAG_HANDOFF)
   break;

  old = atomic_long_cmpxchg_release(&lock->owner, owner,
        __owner_flags(owner));
  if (old == owner) {
   if (owner & MUTEX_FLAG_WAITERS)
    break;

   return;
  }

  owner = old;
 }
  • 调用atomic_long_read来获取当前锁的拥有者
  • 然后调用atomic_long_cmpxchg_release来比较当前锁的拥有者是否与之前获取的相同,以此来判断是否有其他线程操作了锁
  • old == owner:如果相同,则表示没有其他线程获取锁;如果有,则将owner = old旧的锁的持有者赋值给当前继续判断。
  • 进而判断是否有等待者if (owner & MUTEX_FLAG_WAITERS),如果有,就退出循环,执行下面的唤醒等待者的任务,如果没有就直接return

__mutex_unlock_slowpath中第二个唤醒等待线程部分如下:

代码语言:javascript复制
 spin_lock(&lock->wait_lock);
 debug_mutex_unlock(lock);
 if (!list_empty(&lock->wait_list)) {
  /* get the first entry from the wait-list: */
  struct mutex_waiter *waiter =
   list_first_entry(&lock->wait_list,
      struct mutex_waiter, list);

  next = waiter->task;

  debug_mutex_wake_waiter(lock, waiter);
  wake_q_add(&wake_q, next);
 }

 if (owner & MUTEX_FLAG_HANDOFF)
  __mutex_handoff(lock, next);

 spin_unlock(&lock->wait_lock);

 wake_up_q(&wake_q);
  • 开始调用spin_lockspin_unlock来避免线程的并发,保护临界资源
  • if (!list_empty(&lock->wait_list)):判断等待队列是否为空
  • list_first_entry:获取等待队列的第一个任务
  • wake_q_add:将该任务加入到唤醒队列中
  • if (owner & MUTEX_FLAG_HANDOFF):判断当前锁是否被标记为HANDOFF。如果是,它会调用__mutex_handoff函数,将锁的所有权交给等待者。
  • wake_up_q:唤醒等待队列

互斥锁的操作确实比较复杂,其底层牵涉到了原子操作,自旋锁,唤醒队列等操作

3、自旋锁与互斥体区别

实现上的区别

  • 自旋锁:当锁不能获取到时,一直在原地自旋等待,直到锁变为可用,自旋锁等待期间,不会让线程处于睡眠状态,而是一直忙等。
  • 互斥锁:当锁不能获取到时,将该线程直接置入睡眠状态,直到互斥体可用被唤醒。

时间开销

  • 自旋锁:在等待锁的过程中,自旋锁所浪费的时间为等待获取锁的时间 执行临界区的时间
  • 互斥锁:其时间开销为等待锁的时间 进程上下文切换的时间 执行临界区的时间

总结:如果临界区很小,锁的持有时间非常短,那么使用自旋锁可能更为高效。反之,如果临界区大,锁的持有时间较长,或者涉及到I/O操作等可能导致线程睡眠的操作,那么使用互斥体可能更为合适。

嵌入式艺术

0 人点赞