Linux Kernel: Synchronization

Table of Contents

1. Linux Kernel: Synchronization

1.1. 内核抢占

内核抢占 (kernel preemption) 在 ret_from_intr 时已经描述过, 简单的说, 内核抢占是指:

一个进程正在 kernel mode 执行时能否被其它进程抢占.

举例来说, 若进程 A 正在 kernel mode 执行一个 syscall, 此时发生时钟中断 (schedule_tick), schedule_tick 更新 A 的 time slice 时发现它的 time slice 已经用尽,如果当前打开了内核抢占, 则 ret_from_intr -> resume_kernel 时会调用 schedule. 如果没有打开内核抢占, 则 A 虽然 time slice 耗尽, resume_kernel 还是会返回到 A 正在执行在 syscall, 直到 syscall 返回(或 A 主动放弃 CPU, 例如阻塞)

内核抢占相关的代码主要有:

  1. resume_kernel

    #ifdef CONFIG_PREEMPT
    ENTRY(resume_kernel)
        cli
        cmpl $0,TI_preempt_count(%ebp)      # non-zero preempt_count ?
        jnz restore_all
    need_resched:
        movl TI_flags(%ebp), %ecx   # need_resched set ?
        testb $_TIF_NEED_RESCHED, %cl
        jz restore_all
        testl $IF_MASK,EFLAGS(%esp)     # interrupts off (exception path) ?
        jz restore_all
        call preempt_schedule_irq
        jmp need_resched
    #else
    #define resume_kernel           restore_all
    #endif
    

    当 CONFIG_PREEMPT 为假时, 即内核抢占功能没有打开时, resume_kernel 只是简单的一个 restore_all.

    当 CONFIG_PREEMPT 为真时, resume_kernel 会先检查 thread_info->preempt_cout, 若 preempt_count 为 0, 表示可以进行内核抢占, 则根据 TIF_NEED_RESCHED 标记决定是否调用 reschedule 来调度另一个进程执行

  2. preempt_count

    preempt_count 是 thread_info 的一个成员, 大小为 4 字节, 但它的值并不仅仅用来表示 `preempt_count`:

    preempt_count 的值被下面三个 MASK 分为三部分:

    1. PREEMPT_MASK: 0x000000ff

      preempt_count 的 bit 0~7 用来表示 `preempt_count`, preempt_disable/enable 负责加减这些 bit

      preempt_disable 将相应的部分加 1, preempt_enable 减 1

    2. SOFTIRQ_MASK: 0x0000ff00

      bit 8~15 表示 softirq 的计数, do_softirq 时的 local_bh_disable/enable 负责加减这些 bit.

      do_softirq 开始时会通过 local_bh_disable 将相应的部分加 1, do_softirq 结束时通过 local_bh_enable 减 1

    3. HARDIRQ_MASK: 0x0fff0000

      其余 bit 表示 hardirq 的计数, do_IRQ 时的 irq_enter 和 irq_exit 负责加减这些 bit.

      do_IRQ 开始时通过 irq_enter 将相应的部分加 1, 结束时通过 irq_exit 减 1

    resume_kernel 时要求 preempt_count 为 0 才能进行内核抢占, 也就要求:

    1. preempt_enable
    2. 当前没有执行 softirq
    3. 当前没有执行 hardirq

    以上三点实际上意味着 kernel 只有在执行 syscall 时被打断 (而不是在执行 softirq 或 hardirq 时被打断) 且 preempt_enable 时才能发生内核抢占

1.2. 原子操作

x86 上原子操作 (aotmic operations) 的分类:

1.2.1. 单核 (UP)

单核系统上决定一个操作是不是原子操作的主要因素是 interrupt. 一般情况下, CPU 每执行一条指令都会检查是否发生了 interrupt, 所以单条指令一般都是原子操作, 但有一个例外: 带有 rep 前缀 (f2, f3) 的指令会重复执行, 且每次重复时都会检查是否有 interrupt, 所以这种指令不是原子操作

1.2.2. 多核 (MP)

多核系统决定一个操作是否是原子操作的主要因素并不是 interrupt, 因为单条指令虽然不会被 interrupt 打断, 但却可能因为多核同时执行的原因变成非原子操作.

以 inc [mem] 为例: inc [mem] 实际上包含三步操作:

  1. 从 mem 取值 (Read)
  2. 将取到的值加 1 (Modify)
  3. 将新值写回 mem (Write)

如果两个核 ca 和 cb 同时执行这条指令, 且顺序为 ca-1, cb-1, ca-2, cb-2, ca-3, cb-3, 则最后 mem 的值是 [mem]+1 而不是 [mem]+2

一般来说, 除了中断的因素, 多核系统上决定操作是否为原子操作还需要考虑 " 内存读写":

只进行零次或一次内存访问的操作是原子操作

例如:

  1. ✘ inc [mem] 这种指令包含两次内存操作
  2. ✔ inc [reg] 不操作内存
  3. ✘ mov eax [mem], 但 mem 并不是 4-byte 对齐的
  4. ✔ mov eax [mem], 且 mem 是 4-byte 对齐的

针对 inc, add, dec, sub, xchg 等 Read-Modify-Write 类的指令, x86 提供了一个 LOCK 前缀1, 可以锁住 memory bus, 其它 CPU 无法再操作内存, 所以有 LOCK 前缀的 Read-Modify-Write 指令是原子的.

1.2.3. 高级语言中的原子操作

一般情况下, 高级语言中的"疑似"原子操作的语句很多时候不是原子操作, 例如:

  1. x = xxxx

    int x = 10;
    int64_t y = 10000000000000000;
    

    对应的汇编为:

    movl    $10, -12(%ebp)
    movl    $1874919424, -24(%ebp)
    movl    $2328306, -20(%ebp)
    

    可见 c 代码的第一行 x=10 在 UP 和 MP 都是原子操作 (-12(%ebp) 是 4-byte 对齐的), 但第二行对 int64_t 的赋值并不是原子操作

  2. x++

    x++ 可能被直接被编译为 inc 或 add 指令, 对于 UP, 是原子的, 但对于 MP, 并不是原子的.

    x++ 也可能被编译为:

    mov [mem] %eax
    inc %eax
    mov %eax [mem]
    

    显然在 UP 和 MP 都不是原子的.

所以高级语言中的基本操作无法保证是否是原子操作, 因此 kernel 使用汇编实现了一系统的 atomic_xxx 函数, 以支持原子操作, 例如:

static __inline__ void atomic_add(int i, atomic_t *v) {
    __asm__ __volatile__(
        LOCK "addl %1,%0"
        :"=m" (v->counter)
        :"ir" (i), "m" (v->counter));
}

1.3. 屏障

屏障 (barrier, fence) 是和乱序执行有关的概念. 编译器和 CPU 为了优化代码通常会采用乱序执行的方式, 乱序执行能充分的利用 CPU 的流水线功能 (pipeline stall 问题), 例如原始代码为:

mov %0 [mem1]
add %1 [mem1]
mov %2 [mem2]
mov %3 [mem3]

通过编译器或 CPU 的乱序执行优化后可能变成:

mov %0 [mem1]
mov %2 [mem2]
mov %3 [mem3]
add %1 [mem1]

因为原始代码中第二行 add 指令会造成 pipeline stall 问题, 如果程序由于逻辑上的原因要求第二行必须不能被乱序优化怎么办? 解决的方法就是 memory barrier

1.3.1. 内存屏障

内存屏障 (memory barrier), 是一类 CPU 指令, 这些指令会禁止 CPU 乱序执行, CPU 会保证 memory barrier 之前的指令先执行完才会执行 memory barrier 之后的指令.

x86 下以下指令是 memory barrier:

  1. 操作 IO 端口的指令 (in, out)
  2. LOCK
  3. 操作控制寄存器的指令 (cli, sti 等)
  4. lfence, mfence, sfence, iret …

Kernel 提供了一个 mb 宏做为 memory barrier, 其实现为:

#define mb() alternative("lock; addl $0,0(%%esp)", "mfence", X86_FEATURE_XMM2)

1.3.2. 优化屏障

除了 memory barrier, 还存在一种叫做优化屏障 (optimization barrier) 的屏障, 这种 barrier 并不是一种 CPU 指令, 而是给编译器的一个提示.

kernel 提供了一个 barrier 宏做为 optimization barrier:

#define barrier() __asm__ __volatile__("": : :"memory")

这个 barrier 宏主要有两个作用:

  1. __volatile__ 阻止编译器 barrier 这条指令与它之前或之后的指令交换位置, 导致编译器无法把 barrier 之后的指令移到到 barrier 之前, 反之亦然
  2. "memory" 的意义为告诉编译器: barrier 之后的代码应该认为所有内存都可能已经被修改了, 所以不能再用之前寄存器的值来代替内存访问, 看起来 optimization barrier 和 volatile 用来修饰变量时的意义类似, 只不过它不仅仅是针对某一个变量, 而是针对整块的代码.

optimization barrier 阻止了编译器生成乱序的代码, memory barrier 阻止 CPU 乱序执行, 通过两者的配合, 可以确保程序确实是以代码上显示的逻辑和顺序在运行.

1.4. 自旋锁

在介绍自旋锁 (spin lock) 之前, 先考虑一下在单核下不使用锁如何来保护临界区 (critical region)

  1. 如果不支持内核抢占, 且临界区不会被 interrupt 使用, 则不需要任何措施保护临界区
  2. 如果支持内核抢占, 且临界区不会被 interrupt 使用, 则通过 preempt_disable 就可以保护临界区了
  3. 如果临界区会被 interrupt 使用, 则在前面上基础上 (preempt_disable) 通过 local_irq_disable 关中断就可以了. 因为:
    1. 若当前是 exception 正在使用临界区, 则关中断可以防止 interrupt 使用它
    2. 若当前是 interrupt 正在使用临界区,
      • 如果临界区只会被当前这一种 irq handler 使用, 则不需要其它措施, 因为同一种 irq handler 不需要考虑重入 (PIC 上对应的 irq line 已经被 CPU mask off 了).
      • 若其它种 irq handler 也会使用临界区, 则通过关中断可以防止其它 interrupt 使用它. 而且 interrupt 返回前, exception 是不会执行的, 也就无法使用临界区

综上, UP 最多通过关闭内核抢占和关中断就可以保护临界区了

但是到了多核下仅仅用上面两种方法是无法保护临界区的: 禁用内核抢占和关中断后其它 CPU 还是可以执行临界区的代码, 这时就需要 spin lock 来做额外的保护了

1.4.1. spin_lock

spin_lock 是由 BUILD_LOCK_OPS 这个宏定义的

spin_lock:
  preempt_disable();
  for (;;):
    if (likely(_raw_spin_trylock(lock))):
      break;
    // 若 _raw_spin_trylock 返回 0, 表示 spin lock 是 locked 状态, 则在
    // while 中 spin, 直到 spin_lock->slock 为 1, 然后再次尝试.
    //
    // 在 spin 的过程中, 内核抢占是开启的, 因为此时代码并没有在临界区中.
    // 一旦 _raw_spin_trylock 成功, 内核抢占一定是关闭的
    preempt_enable();
    while (!spin_can_lock(lock)):
      cpu_relax();
    preempt_disable();
1.4.1.1. UP 上的 spin_lock
// 若 CONFIG_SMP 为 false, 即 UP, 则 _raw_spin_trylock 直接返回 1
#define _raw_spin_trylock(lock) (((void)(lock), 1))

可见 UP 上的 spin_lock 等同于 preempt_disable

1.4.1.2. MP 上的 spin_lock
// 在 MP 上, _raw_spin_trylock 将 spin_lock->slock 置为 0, 并返回 slock
// 的旧值
//
// 若旧值为 1, 说明 spin lock 是 unlocked 状态, _raw_spin_trylock 返回 1,
// 上层的 spin_lock 返回, 获得 spin_lock.
//
// 若旧值为 0, 说明 spin lock 是 locked 状态, _raw_spin_trylock 返回 0,
// 上层 spin_lock 继续循环
//
int _raw_spin_trylock(spinlock_t *lock):
  char oldval;
  __asm__ __volatile__(
      "xchgb %b0,%1"
      :"=q" (oldval), "=m" (lock->slock)
      :"0" (0) : "memory");
  return oldval > 0;

1.4.2. spin_unlock

UP 上的 spin_unlock 等同于 preempt_enable

MP 上的 spin_unlock 为:

spin_unlock:
  movb $1, slp->slock
  preempt_enable();

1.5. 信号量

信号量 (semaphore) 是同步原语 (synchronization primitives) 2 的一种, 实际上, 许多其它的同步原语 (mutex, lock, …) 都可以用信号量来实现, 例如 mutex 可以用二元信号量 (binary semaphore) 来实现.

semaphore 简单的定义为:

它维护了一个整数: count, 定义了两个操作 P/V3, 或者叫 down/up.

  • P/down 操作

    将 count 减 1, 若 count >= 0 则返回, 否则等待被 V 操作唤醒

  • V/up 操作

    将 count 加 1, 若 count > 0, 则返回, 否则唤醒之前因为 P 操作等待的进程

可见 count 可以看作是 "可用资源的数量", 而负值的 count 可以看作是 "需求的资源的数量"

1.5.1. kernel 中的 semaphore

1.5.1.1. down
down:
        movl $sem->count,%ecx
        # 将 count 减 1
        lock; decl (%ecx);
        # 若 count >= 0, 返回
        jns 1f
        lea %ecx, %eax
        pushl %edx
        pushl %ecx
        # 资源不可用, 调用  __down 等待
        call __down
        popl %ecx
        popl %edx
1:
__down(struct semaphore * sem):
  struct task_struct *tsk = current;
  DECLARE_WAITQUEUE(wait, tsk);
  unsigned long flags;

  tsk->state = TASK_UNINTERRUPTIBLE;
  add_wait_queue_exclusive_locked(&sem->wait, &wait);

  sem->sleepers++;
  for (;;):
    int sleepers = sem->sleepers;
    // 实际上下面的代码等价于:
    //
    // tmp = sem->sleeper + &sem->count
    // if (tmp >= 0) {
    //   sem->sleeper --;
    //   break;
    // }
    //
    // 但为了原子操作, 使用了 sem->count 代替上面的 tmp, 导致后面复杂的逻
    // 辑...
    if (!atomic_add_negative(sleepers - 1, &sem->count)):
      sem->sleepers = 0;
      break;
    sem->sleepers = 1;
    schedule();
    tsk->state = TASK_UNINTERRUPTIBLE;
  remove_wait_queue_locked(&sem->wait, &wait);
  tsk->state = TASK_RUNNING;
1.5.1.2. up
        movl $sem->count,%ecx
        lock; incl (%ecx)
        // count + 1, 若 count 大于 0, 返回. 否则唤醒之前等待的进程
        jg 1f
        lea %ecx,%eax
        pushl %edx
        pushl %ecx
        call __up
        popl %ecx
        popl %edx
1:
__up:
  wake_up(&sem->wait);

1.5.2. 信号量与自旋锁

spin lock 与 semaphore 都可以在 MP 上保证同步, 它们的区别是:

  1. spin lock 是通过 spin 来保证同步的, 不会发生进程切换, 所以在大多数的场景都可以使用, 但一般在不允许进程切换的场景下使用 spin lock
  2. 使用 semaphore 时会发生进程切换, 代价比 spin lock 高, 在允许进程切换的场景一般优先使用 semaphore

Footnotes:

3

P/V 操作来源于荷兰语, 因为 Dijkstra 是荷兰人… 其中 P 表示 passeren (通过), V 表示 vrijgeven (释放)

Author: [email protected]
Date: 2016-08-25 Thu 00:00
Last updated: 2024-01-18 Thu 16:43

知识共享许可协议