Skip to content

Latest commit

 

History

History
437 lines (328 loc) · 27.2 KB

linux-sync-4.md

File metadata and controls

437 lines (328 loc) · 27.2 KB

内核同步原语. 第四部分.

引言

这是本章的第四部分 chapter ,本章描述了内核中的同步原语,且在之前的部分我们介绍完了 自旋锁信号量 两种不同的同步原语。我们将在这个章节持续学习 同步原语,并考虑另一简称 互斥锁 (mutex) 的同步原语,全名 MUTual EXclusion

本书 前面所有章节一样,我们将先尝试从理论面来探究此同步原语,在此基础上再探究Linux内核所提供用来操作 互斥锁API

让我们开始吧。

互斥锁 的概念

从前一 部分,我们已经很熟悉同步原语 信号量。其表示为:

struct semaphore {
	raw_spinlock_t		lock;
	unsigned int		count;
	struct list_head	wait_list;
};

此结构体包含 的状态和一由等待者所构成的列表。根据 count 字段的值,semaphore 可以向希望访问该资源的多个进程提供对该资源的访问。互斥锁 的概念与 信号量 相似,却有着一些差异。信号量互斥锁 同步原语的主要差异在于 互斥锁 具有更严格的语义。不像 信号量,单一时间一 互斥锁 只能由一个 进程 持有,并且只有该 互斥锁持有者 能够对其进行释放或解锁的动作。另外的差异是 API 实现,信号量 同步原语强置重新调度 (重新调度) 在等待列表中的进程。互斥锁 API 的实现则允许避免这种状况,进而减少这种昂贵的 上下文切换 操作。

互斥锁 同步原语由如下结构呈现在Linux内核中:

struct mutex {
        atomic_t                count;
        spinlock_t              wait_lock;
        struct list_head        wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
        struct task_struct      *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        struct optimistic_spin_queue osq;
#endif
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map      dep_map;
#endif
};

此结构被定义在 include/linux/mutex.h 头文件并且包含着与 信号量 结构体类似的字段。互斥锁 结构的第一个字段是 - count。这个字段的值代表着该 互斥锁 的状态。在 count 字段为 1 的情况下,说明该 互斥锁 处于 无锁 状态。当 count 字段值处于 ,表示该 互斥锁 处于 上锁 状态。此外,count 字段的值可能是 的,在这种情况下表该 互斥锁 处于 上锁 状态且可能有其他的等待者。

互斥锁 结构的下面两个字段 - wait_lockwait_list 分别是用来保护 等待队列自旋锁,以及由某个锁的等待者们所构成的 等待队列 列表。你可能注意到了,互斥锁信号量 结构体的相似处到此结束。剩余的 互斥锁 结构体字段,则如同我们所见,取决于Linux内核的不同配置选项。

第一个字段 - owner 代表持有该锁的 进程。正如我们看到的,此字段是否存在于 mutex 结构体中取决于 CONFIG_DEBUG_MUTEXESCONFIG_MUTEX_SPIN_ON_OWNER 的内核配置选项。这个字段和下个 osq 字段的主要用途在于支援我们之后会介绍的乐观自旋 (optimistic spinning) 功能。最后两个字段 - magicdep_map 仅被用于 调试 模式。magic 字段用来存储跟 互斥锁 用来调试的相关资讯,而第二个字段 - lockdep_map 则在Linux内核的 锁验证器 (lock validator) 中被使用。

我们已探究完 互斥锁 的结构,现在,我们可以开始思考此同步原语是如何在Linux内核中运作的。你可能会猜,一个想获取锁的进程,在允许的情况下,就直接对 mutex->count 字段做递减的操作来试图获取锁。而如果进程希望释放一个锁,则递增该字段的值即可。大致上没错,但也正如你可能猜到的那样,在Linux内核中这可能不会那么简单。

事实上,当某进程试图获取 互斥锁 时,三条可能路径的选择策略要根据当前 mutex 的状态而定。

  • fastpath;
  • midpath;
  • slowpath.

第一条路径或 fastpath 是最快的,正如你从名称中可以领会到的那样。这种情况下的所有事情都很简单。因 互斥锁 还没被任何人获取,其 mutex 结构中的count字段可以直接被进行递减操作。在释放该 互斥锁 的情况下,算法是类似的,进程对该 互斥锁 结构中的 count 字段进行递增的操作即可。当然,所有的这些操作都必须是 原子 的。

是的,这看起来很简单。但如果一个进程想要获取一个已经被其他进程持有的 互斥锁 时,会发生什么事?在这种情况下,控制流程将被转由第二条路径决定 - midpathmidpath乐观自旋 会在锁的持有者仍在运行时,对我们已经熟悉的 MCS lock 进行 循环 操作。这条路径只有在没有其他更高优先级的进程准备运行时执行。这条路径被称作 乐观 是因为等待的进程不会被睡眠或是调度。这可以避免掉昂贵的 上下文切换.

在最后一种情况,当 fastpathmidpath 都不被执行时,就会执行最后一条路径 - slowpath。这条路径的行为跟 信号量 的锁操作相似。如果该锁无法被进程获取,那么此进程将被以如下的结构表示加入 等待队列

struct mutex_waiter {
        struct list_head        list;
        struct task_struct      *task;
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
};

此结构定义在include/linux/mutex.h 头文件,且使用上可能会被睡眠。在研究Linux内核提供用来操作 互斥锁API 前,让我们先研究 mutex_waiter 结构。如果你有阅读此章节的 前一部分,你可以能会注意到 mutex_waiter 结构与 kernel/locking/semaphore.c 源代码中的 semaphore_waiter 结构相似:

struct semaphore_waiter {
        struct list_head list;
        struct task_struct *task;
        bool up;
};

它也包含 listtask 字段,用来表示该互斥锁的等待队列。这之间有一个差异是 mutex_waiter 没有 up 字段,但却有一个可以根据内核设定CONFIG_DEBUG_MUTEXES 选项而存在的 magic 字段,它能用来存储一些在调试互斥锁 问题时有用的资讯。

现在我们知道了什么是 互斥锁 以及他是如何在Linux内核中呈现的。在这种情况下,我们可以开始一窥Linux内核所提供用来操作 互斥锁API

互斥锁 API

至此,在前面的章节我们已经了解什么是 互斥锁 原语,而且看过了Linux内核中用来呈现 互斥锁互斥锁 结构。现在是时后开始研究用来操弄互斥锁的 API 了。详细的 互斥锁 API 被记录在 include/linux/mutex.h 头文件。一如既往,在考虑如何获取和释放一 互斥锁 之前,我们需要知道如何初始化它。

初始化 互斥锁 有两种方法,第一种是静态初始化。为此,Linux内核提供以下宏:

#define DEFINE_MUTEX(mutexname) \
        struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)

让我们来研究此宏的实现。正如我们所见,DEFINE_MUTEX 宏接受新定义的 互斥锁 名称,并将其扩展成一个新的 互斥锁 解构。此外新 mutex 结构由 __MUTEX_INITIALIZER 宏初始化。让我们看看 __MUTEX_INITIALIZER的实现:

#define __MUTEX_INITIALIZER(lockname)         \
{                                                             \
       .count = ATOMIC_INIT(1),                               \
       .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock), \
       .wait_list = LIST_HEAD_INIT(lockname.wait_list)        \
}

这个宏被定义在 相同的 头文件,并且我们可以了解到它初始化了 mutex 解构体中的字段。count 被初始化为 1 ,这代表该互斥锁状态为 无锁wait_lock 自旋锁 被初始化为无锁状态,而最后的栏位 wait_list 被初始化为空的 双向列表

第二种做法允许我们动态初始化一个 互斥锁。为此我们需要调用在 kernel/locking/mutex.c 源码文件的 __mutex_init 函数。事实上,大家很少直接调用 __mutex_init 函数。取而代之,我们使用下面的 mutex_init :

# define mutex_init(mutex)                \
do {                                                    \
        static struct lock_class_key __key;             \
                                                        \
        __mutex_init((mutex), #mutex, &__key);          \
} while (0)

我们可以看到 mutex_init 宏定义了 lock_class_key 并且调用 __mutex_init 函数。让我们看看这个函数的实现:

void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
        atomic_set(&lock->count, 1);
        spin_lock_init(&lock->wait_lock);
        INIT_LIST_HEAD(&lock->wait_list);
        mutex_clear_owner(lock);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        osq_lock_init(&lock->osq);
#endif
        debug_mutex_init(lock, name, key);
}

如我们所见,__mutex_init 函数接收三个参数:

  • lock - 互斥锁本身;
  • name - 调试用的互斥锁名称;
  • key - 锁验证器用的key.

__mutex_init 函数的一开始,我们可以看到 互斥锁 状态被初始化。我们通过 atomic_set 函数原子地将给定的变量赋予给定的值来将 互斥锁 的状态设为 无锁。之后我们可以看到 自旋锁 被初始化为解锁状态,这将保护 互斥锁等待队列,之后始化 互斥锁等待队列。 之后我们清除该 的持有者并且通过调用 include/linux/osq_lock.h 头文件中的 osq_lock_init 函数来初始化乐观队列(optimistic queue)。 而此函数也只是将乐观队列的的tail设定为无锁状态:

static inline bool osq_is_locked(struct optimistic_spin_queue *lock)
{
        return atomic_read(&lock->tail) != OSQ_UNLOCKED_VAL;
}

__mutex_init 函数的结尾阶段,我们可以看到它调用了 debug_mutex_init 函数,不过就如同我在此 章节 前面提到的那样,我们并不会在此章节讨论调试相关的内容。

互斥锁 结构被初始化后,我们可以继续研究 互斥锁 同步原语的 上锁解锁 API。mutex_lockmutex_unlock 函数的实现位于 kernel/locking/mutex.c 源码文件。首先,让我们从 mutex_lock 的实现开始吧。代码如下:

void __sched mutex_lock(struct mutex *lock)
{
        might_sleep();
        __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
        mutex_set_owner(lock);
}

我们可以从 include/linux/kernel.h 头文件的 mutex_lock 函数开头看到 might_sleep 宏被调用。该宏的实现取决于 CONFIG_DEBUG_ATOMIC_SLEEP 内核配置选项,如果这个选项被启用,该宏会在 原子 上下文中执行时打印栈追踪。此宏为调试用途上的帮手,除此之外,这个宏什么都没做。

might_sleep 宏之后,我们可以看到 __mutex_fastpath_lock 函数被调用。此函数是体系结构相关的,并且因为我们此书探讨的是 x86_64 体系结构, \ __mutex_fastpath_lock 的实现部分位于 arch/x86/include/asm/mutex_64.h 头文件。正如我们从__mutex_fastpath_lock 函数名称可以理解的那样,此函数将尝试通过 fast path 试图获取一个锁,或者换句话说此函数试图递减一个给定互斥锁的 count 字段。

__mutex_fastpath_lock 函数的实现由两部分组成。第一部分是 内联汇编 。让我们看看:

asm_volatile_goto(LOCK_PREFIX "   decl %0\n"
                              "   jns %l[exit]\n"
                              : : "m" (v->counter)
                              : "memory", "cc"
                              : exit);

首先,让我们将注意力放在 asm_volatile_goto。这个宏被定义在 include/linux/compiler-gcc.h 头文件,并且它只是扩展成两个内联汇编:

#define asm_volatile_goto(x...) do { asm goto(x); asm (""); } while (0)

第一个汇编包含 goto 特性,而第二个空的内联汇编是 屏障 (barrier)。现在回去看看我们的内联汇编。正如我们所看到的,它从被 LOCK前缀 的宏定义开始,该宏仅是扩展 lock 指令:

#define LOCK_PREFIX LOCK_PREFIX_HERE "\n\tlock; "

正如我们从前面部分已经知道的那样,该指令允许 原子地 执行被它前缀修饰的指令 。所以,在我们的汇编语句的第一步中,我们尝试将给定的 mutex->counter 的字段递减。再下一步,如 mutex->counter 在被递减完后的值为非负数,那么 jns 指令将会执行跳转到 exit 标签所在处。exit 标签是 __mutex_fastpath_lock 函数的第二部分,而它仅仅是指到该函数的出口:

exit:
        return;

就目前为止, __mutex_fastpath_lock 函数的实现看起来还挺简单。但 mutex->counter 的字段有可能在递减后是负的,在这种情况下:

fail_fn(v);

将在内联汇编后被调用。fail_fn__mutex_fastpath_lock 函数的第二个参数,其为一指针指到用来获取给定锁的 midpath/slowpath 路径函数。我们的这个例子中fail_fn__mutex_lock_slowpath 函数。在我们一窥 __mutex_lock_slowpath 函数的实现前,我们先把 mutex_lock 函数的实现看完。在最简单的情况下,锁会被某进程通过执行 __mutex_fastpath_lock 路径后成功被获取。这种情况下,我们仅在 mutex_lock 结尾调用即可。

mutex_set_owner(lock);

mutex_set_owner 函数被定义在 kernel/locking/mutex.h 头文件,这将一个锁的持有者设为当前进程:

static inline void mutex_set_owner(struct mutex *lock)
{
        lock->owner = current;
}

另外一种情况,让我们研究一进程因为锁已被其它进程持有,而无法顺利获得的情况,我们已经知道在这种情境下 __mutex_lock_slowpath 函数会被调用,让我们开始研究这个函数。此函数被定义在 kernel/locking/mutex.c 源码文件,并且这个函数由 container_of 宏开头,意图通过 __mutex_fastpath_lock 给的互斥锁状态变量来获得互斥锁本身:

__visible void __sched
__mutex_lock_slowpath(atomic_t *lock_count)
{
        struct mutex *lock = container_of(lock_count, struct mutex, count);

        __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
                            NULL, _RET_IP_, NULL, 0);
}

之后将获得的 互斥锁 带入函数 __mutex_lock_common 进行调用。 __mutex_lock_common 函数一开始会关闭 抢占 直到下次的重新调度:

preempt_disable();

之后我们进到乐观自旋阶段。如同我们知道的,这个阶段取决于CONFIG_MUTEX_SPIN_ON_OWNER 内核配置选项。如该选项被禁用,我们就跳过这个阶段并迈入最后一条路径 - 获取 互斥锁slowpath

if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
        preempt_enable();
        return 0;
}

首先 mutex_optimistic_spin 函数检查我们需不需要被重新调度,或者换句话说,没有其他优先级更高的任务可以运行。 如果这个检查成立我们就将当前 循环者(spinner) 更新至 MCS 锁的等待队列。这种情况下互斥锁的获取在某个时间只会由一个循环者获得:

osq_lock(&lock->osq)

之后下个步奏,我们在下面的迭代中循环:

while (true) {
    owner = READ_ONCE(lock->owner);

    if (owner && !mutex_spin_on_owner(lock, owner))
        break;

    if (mutex_try_to_acquire(lock)) {
        lock_acquired(&lock->dep_map, ip);

        mutex_set_owner(lock);
        osq_unlock(&lock->osq);
        return true;
    }
}

并试图获取该锁。首先我们尝试获取该锁的持有者资讯,如果持有者存在 (在进程已释放互斥锁的情况下可能不存在),我们就在持有者释放锁前于 mutex_spin_on_owner 函数中等待。如果在等待锁持有者的过程中遭遇了高优先级任務,我们就离开循环进入睡眠。在另外一种情况下,该锁被进程释放,那我们就试图通过 mutex_try_to_acquired 来获取该锁。如果这个操作顺利完成,我们就为该互斥锁设定新的持有者,将我们自身从MCS 等待队列中移除,并从 mutex_optimistic_spin 函数中离开。至此锁就被一进程获取完成,我们接着开启 抢占 并从 __mutex_lock_common 函数中离开:

if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
    preempt_enable();
    return 0;
}

以上就是这种情况的全部了。

在其他情况下,一切可能都不那么顺利。例如,在我们于 mutex_optimistic_spin 迭代中循环的同时可能有新任务出现,更甚我们可能在进入 mutex_optimistic_spin循环前,系统就存在着更高优先级别的任务了。又或者 CONFIG_MUTEX_SPIN_ON_OWNER 内核选项根本是禁用的,在这种情况下 mutex_optimistic_spin 什么都不会做:

#ifndef CONFIG_MUTEX_SPIN_ON_OWNER
static bool mutex_optimistic_spin(struct mutex *lock,
                                  struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
    return false;
}
#endif

在所有这种情况下,__mutex_lock_common 函数的行为就像 信号量。我们就再次尝试获取锁,因为锁的持有者在此之前可能已经将它释放:

if (!mutex_is_locked(lock) &&
   (atomic_xchg_acquire(&lock->count, 0) == 1))
      goto skip_wait;

在失败的情况下,希望获取锁的进程将被添加到等待者列表中

list_add_tail(&waiter.list, &lock->wait_list);
waiter.task = task;

在成功的情况下,我们更新该锁的持有者、允许抢占,并从__mutex_lock_common 函数离开:

skip_wait:
        mutex_set_owner(lock);
        preempt_enable();
        return 0;

在这个情况下锁将被获取。如果到这边都无法获取锁,我们将进入下面的迭代中:

for (;;) {

    if (atomic_read(&lock->count) >= 0 && (atomic_xchg_acquire(&lock->count, -1) == 1))
        break;

    if (unlikely(signal_pending_state(state, task))) {
        ret = -EINTR;
        goto err;
    }

    __set_task_state(task, state);

     schedule_preempt_disabled();
}

这边我们再尝试获取一次锁,如果成功的话就离开。是的,我们会在循环前的那次尝试失败后立再一次的尝试获取锁。我们需要这样做,以确保一旦锁在之后被解锁时,我们会被唤醒。 除此之外,它还允许我们在睡眠后获得锁。在另一种情况下,我们检查当前进程是否有pending 信号,如果进程在等待锁获取期间被 信号 中断,则退出。在迭代的结尾因为我们未能成功获取锁,所以我们将任务的状态设为 TASK_UNINTERRUPTIBLE 并通过调用 schedule_preempt_disabled 函数去睡眠。

这就是全部,我们已经考虑了进程获取锁时可能通过的所有三条路径。现在让我们来研究下 mutex_unlock 是如何被实现的。将被一个希望释放锁的进程调用 mutex_unlock__mutex_fastpath_unlock 也将被从 arch/x86/include/asm/mutex_64.h 头文件中调用:

void __sched mutex_unlock(struct mutex *lock)
{
    __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}

__mutex_fastpath_unlock 函数的实现与__mutex_fastpath_lock 函数非常相似:

static inline void __mutex_fastpath_unlock(atomic_t *v,
                                           void (*fail_fn)(atomic_t *))
{
       asm_volatile_goto(LOCK_PREFIX "   incl %0\n"
                         "   jg %l[exit]\n"
                         : : "m" (v->counter)
                         : "memory", "cc"
                         : exit);
       fail_fn(v);
exit:
       return;
}

事实上, 其中只有一个区别,我们递增 mutex->count 字段。这个操作过后锁将呈现 无锁 状态。随着 互斥锁 释放,如等待队列有条目的话我们必须将其更新。在这种情况下,fail_fn 函数会被调用,也就是 __mutex_unlock_slowpath__mutex_unlock_slowpath 函数仅是从给定的mutex->count 来获取 mutex 实例,并调用 __mutex_unlock_common_slowpath 函数:

__mutex_unlock_slowpath(atomic_t *lock_count)
{
      struct mutex *lock = container_of(lock_count, struct mutex, count);

      __mutex_unlock_common_slowpath(lock, 1);
}

__mutex_unlock_common_slowpath 函数中,如果等待队列非空,我们将从中获取第一个条目,并唤醒相关的进程:

if (!list_empty(&lock->wait_list)) {
    struct mutex_waiter *waiter =
           list_entry(lock->wait_list.next, struct mutex_waiter, list);
                wake_up_process(waiter->task);
}

在此之后,前一个进程将释放互斥锁,并由另一个在等待队列中的进程获取。

这就是全部了. 我们已经研究完两个主要用来操作 互斥锁 的 API mutex_lockmutex_unlock。除此之外,Linux内核还提供以下 API:

  • mutex_lock_interruptible;
  • mutex_lock_killable;
  • mutex_trylock.

以及对应相同前缀的 unlock 函数. 我们就不在此解释这些 API 了, 因为他们跟 信号量 所提供的 API 类似。你可以通过阅读 前一部分 来了解更多。

总结

至此我们结束了Linux内核 同步原语 章节的第四部分。在这部分我们见到了新的同步原语 - 互斥锁。就理论上来说,此同步原语与 信号量 非常相似。事实上,互斥锁 代表着二进制信号量。但它的实现与Linux内核中 信号量 实现并不同。在下一部分中,我们将继续深入研究Linux内核中的同步原语。

如果你有问题或者建议,请在twitter 0xAX上联系我,通过 email 联系我,或者创建一个issue

Please note that English is not my first language and I am really sorry for any inconvenience. If you found any mistakes please send me PR to linux-insides.

链接