最前边的这段话其实是我看完整个章节回来写的, 锁的本质就是一个变量, 操作系统通过硬件提供的各种原子指令, 构建出一些原子的操作. 然后将这些原子操作组合就来, 就达成了构建锁的功能.

一旦有了原子操作, 就不用担心了, 只需要最先执行原子操作的线程更改一个状态, 其他线程就都被堵在这个改变的状态上, 直到那个线程再使用原子操作更改那个状态. 其核心思想就是, 通过原子操作控制一个变量的变化, 以达到同时只能让一个线程进入临界区的目的.

话虽如此, 一个一个看起来也不容易, 这里的锁还都是比较简单的互斥锁. 即一堆线程被堵在同一个变量上. 同时也从单一的锁, 逐步过渡到操作系统实际使用的锁管理代码上, 即用一个数据结构存放堵在同一个变量上的所有线程, 然后让那个可以执行临界区的线程同时担负起唤醒其他线程的任务.

  1. 锁与硬件支持
  2. 测试并设置 = 原子交换
  3. 比较并交换
  4. 链接加载和条件存储指令对
  5. 获取并增加
  6. 改进自旋 – Solaris系统中的加锁和唤醒机制
  7. 改进自旋 – Linux的futex机制

锁与硬件支持

我自己想了一下, 锁只是一个形象的称呼, 实际上从前边的代码可以看到, 锁就是一个变量, 用于控制让代码是不是可以继续往下运行(当然, 运行的部分是临界区的代码).

所以其实内部就是一个判断和更新, 用于是否让代码往下执行. 这就是最基本的锁的逻辑.

由于已经知道, 取数, 判断, 更新在我们编写的程序中是无法保证为原子操作的, 因此在系统调用或者说CPU指令中, 就需要提供一些特别的原子操作, 以让对锁的操作可以原子化, 这样之后的代码只需要进行判断即可, 这样就可以使用锁了.

从代码的角度, 会先设置一个全局变量当成锁, 然后的关键, 其实就是pthread_mutex_lock(&lock)pthread_mutex_unlock(&lock)对这个变量做了什么操作.

或者可以简化一点, 就是lock(&lock)unlock(&lock)内部是如何实现的, 继续追究的话, 也就是其中的CPU原子化指令是什么样的.

从这里也可以看出, 就像很多纯粹通过软件想不明白的问题一样, 只要加上硬件的帮助, 立刻就变得方便多了, 这也是哥德尔不完备定理要求我们跳出系统看系统的思想.

测试并设置 = 原子交换

硬件支持之一是一个指令, 叫做测试并设置(test-and-set instruction), 也叫做原子交换(atomic exchange), 对于我们属性的x86系统, 这个指令叫做xchg, 我查了一下, 这确实是一条汇编指令, 所以是原子的.

我还找到下边这段话, 来自于原子操作与 x86 上的 lock 指令前缀:

在所有的 X86 CPU 上都具有锁定一个特定内存地址的能力,当这个特定内存地址被锁定后,它就可以阻止其他的系统总线读取或修改这个内存地址。这种能力是通过 LOCK 指令前缀再加上下面的汇编指令来实现的。当使用 LOCK 指令前缀时,它会使 CPU 宣告一个 LOCK# 信号,这样就能确保在多处理器系统或多线程竞争的环境下互斥地使用这个内存地址。当指令执行完毕,这个锁定动作也就会消失。

能够和 LOCK 指令前缀一起使用的指令如下所示:

BT, BTS, BTR, BTC   (mem, reg/imm)
XCHG, XADD  (reg, mem / mem, reg)
ADD, OR, ADC, SBB   (mem, reg/imm)
AND, SUB, XOR   (mem, reg/imm)
NOT, NEG, INC, DEC  (mem)
注意:XCHG 和 XADD (以及所有以 ‘X’ 开头的指令)都能够保证在多处理器系统下的原子操作,它们总会宣告一个 “LOCK#” 信号,而不管有没有 LOCK 前缀。

可见在汇编中只要使用了XCHG, 就张会自动带上LOCK, 阻止其他系统总线修改这个内存地址.

那么这个指令的具体内容是什么呢, 很简单, 也就是交换两个位置, 由于交换之后, 原本出现在传入值的位置的地方出现的是交换后的结果, 所以实际执行类似如下代码:

int test_and_set(int *old_ptr, int new){
    int old = *old_ptr;
    *old_ptr = new;
    return old;
}

这里一定要注意, 这个代码要看成伪代码, 如果我们自己写这个代码, 不会是原子的, 实际上的pthread中的lock函数直接以汇编写成, C语言中无法写出原子性的代码.

为了说明原理, 现在我们需要假设test_and_set这个函数是原子的, 基于这个操作, 就可以写出来lock()函数:

//需要将锁初始化为0

void lock(lock_t *lock){
    //去测试和更新是不是1
    while(test_and_set(lock, 1) == 1){
        //自旋, 什么也不做
    }
    //成功往下执行, 即lock()函数正常结束, 进入线程自己的临界区代码
}

解锁的代码也很简单, 将锁重置为0.

void unlock(lock_t *lock){
    *lock = 0
}

现在看一下我们的代码, 当有若干个线程同时启动的时候, lock函数中的第一个执行的代码其实就是test_and_set函数, 这是一个原子操作.

那么无论哪个线程, 总有一个线程第一次执行完了test_and_set, 此时锁的值是1, 然后返回0, 就算此时该线程立刻被打断, 其他所有线程也都需要执行test_and_set函数, 结果就是除了这一个线程之外, 其他的所有线程都返回1.

哪怕有再多的打断, 也只会有返回0的那个线程, 成功从while()循环中逃脱, lock()函数执行完毕, 线程进入到临界区, 哪怕再被打断, 其他的线程还是会卡在while()上无法执行结束.

这里还需要知道的是, 只有在指令之间才会插入中断, 一条指令一旦执行, 就会执行, 不会在汇编指令的中间被打断.
最终返回0的线程进入unlock函数, 无所谓打断, 反正最后只能由这个线程将锁变量置为0. 哪怕在置为0之后的一瞬间被打断, 其他线程也会执行test_and_set函数, 因为原子操作,结果又是一个线程拿到了锁, 即test_and_set成功返回0, 然后其他线程又会卡在while()上.

这就实现了互斥锁.

当然, 这里依然要注意, 所以的代码只是描述了如何工作, 实际的锁函数直接以汇编写成, 无法以C语言写成.

这里通过原子交换, 以及让线程卡在自旋等待的锁, 成为自旋锁.

注意自旋锁的特点是不会去让出CPU, 所以不能简单的使用, 必须通过带中断的调度器, 定期打断自旋占用CPU, 这样才能够保证其他线程有运行机会.

比较并交换

有了上边的自旋锁的概念, 再看其他的锁实现就容易多了, 其本质也是利用各种原子操作, 将保证只有一个线程能够进入临界区域, 只不过采取的具体手段不同而已.

这次的指令是比较并交换(compare-and-exchange), 简称为CAS, 其汇编指令是CMPXCHG, 其C语言伪代码如下:

//这个函数全部原子化操作
int cas(long *addr, long old, long new)
{
    int result = *addr;
    if(*addr == old)
        *addr = new;
    return result;
}

这个函数的意思是:*addr是内存中某个区域, new是想把这个内存区域更新为的新值, old是这个内存区域原来的值. 在进行更新的时候, 先比较一下这个内存区域是不是被更新了, 如果被更新了, 就不写入新值. 如果没有被更新, 就写入新值. 返回值则总是交换之前原来的内存区域的值.

也就是说, 如果知道一个内存区域的旧值的话, 就可以原子性的根据内存区域当前的值, 来决定要不要将其更新为新值.

根据这个函数, 只需要将其替换一下原来的锁函数中的判断, 注意参数即可创建lock函数:

//需要将锁初始化为0

void lock(lock_t *lock){
    //比较原来的值是不是0, 如果是0就更新成1, 如果原来的值是0就返回0. 如果原来的值是1就返回1.
    while(cas(lock, 0, 1) == 1){
        //自旋, 什么也不做
    }
    //成功往下执行, 即lock()函数正常结束, 进入线程自己的临界区代码
}

解锁的代码也很简单, 将锁重置为0.

void unlock(lock_t *lock){
    *lock = 0
}

这个指令的功能要比测试并设置强大, 不过目前我还没看出来哪里强大了. 应该是在参数的设置上可以更加灵活. 分析的逻辑和原来基本上一样, 一开始锁为0的时候, 只有最先执行cas的线程可以返回0, 从而越过while()循环, 其他的cas返回的结果都是1, 从而卡在while()循环上.

对于自旋锁来说, 使用这个指令和上一个测试并设置指令的效果是完全一样的. 就看后边如何能把这个指令用出其他的效果.

链接加载和条件存储指令对

这个一般称为load-linked/store-conditional instruction pair, 简称为LL和SC指令对. X86架构无此指令, 根据wiki的说法, 存在于如下硬件体系中:

Alpha: ldl_l/stl_c , ldq_l/stq_c
PowerPC: lwarx/stwcx , ldarx/stdcx
MIPS架构: ll/sc
ARM架构: ldrex/strex (ARMv6 , v7), ldxr/stxr (ARM version 8)
RISC-V: lr/sc

这两个指令必须成对使用, 第一个LL指令从存储器中加载一个值, 下一个SC指令(两个指令之间可能间隔很多指令)去判断该存储器在这些指令之间是否有改变, 如果没有改变, 会往其中写入新值, 如果已经改变, 则不会写入新值.

很显然这两个操作和硬件密切相关. 这个指令如果从高级语言的角度去理解, 比较困难, 只能认为硬件层面在执行了LL指令之后, 硬件会去跟踪那个地址的状态, 从而在执行SC指令的时候可以得到正确的判断结果. 所以wiki里也写了LL和SC有时候不能嵌套, 嵌套的有特殊用处.

这里我自己想了一下, 从纯软件的角度, 跨了很多函数调用之后, 确实很难跟踪某一个内存区域的变动, 但是一对LL和SC指令即可, 只能理解为硬件可能在高级语言无法控制的寄存器中保存了地址和标志位. 确实有意思啊, 果然软硬一体才是真谛.

LL指令的伪代码如下:

int ll(int *ptr){
    return *ptr;
}

SC指令的伪代码如下:

int sc(int *ptr, int new){
    //如果ptr未改变, 更新其值为new, 然后返回1, 表示成功
    if(*ptr has not changed){
        *ptr = new;
        return 1;
    //返回0, 表示更新失败, 什么也没做
    } else {
        return 0;
    }
}

用这个来实现lock就有点意思了, 我反正想了半天, 最后发现就是按照简单逻辑写下来就可以:

//需要将锁初始化为0

void lock(lock_t *lock){
    while(1){

        //LEVEL1 打断

        while(ll(lock) == 1){

        //LEVEL2 打断
            //自旋
        }

        //LEVEL3 打断

        if(sc(*lock,1) == 1){
            return;
        }

    }
}

解锁的代码不变, 将锁重置为0.

void unlock(lock_t *lock){
    *lock = 0
}

一开始LL必须和SC配对的思想迷惑了我, 后来发现书中的例子是执行LL之后实际上是更新那个隐藏的寄存器以供下一次SC判断.

线程的行为分析就有意思了, 在伪代码中我标记了三个可以打断的点. 一批线程刚开始运行的时候, 会有若干线程同时执行ll, 然后都得到了0, 因此LEVEL3这一层, 其实会有若干个线程都闯入.

不过遗憾的是只有一个线程能最先执行SC指令, 第一个执行SC指令的线程因为LL指令到现在锁变量0没有被修改, 所以成功将其更新为1,并且返回1 , 所以就成功到达了return语句, lock执行完毕, 这个而线程就进入了临界区.

此时其他LEVEL3的线程, 再去执行SC, 结果发现从自己的LL到SC之间内存被修改, 就失败返回0, 然后又返回到循环的起点.

一开始的线程进入不了LEVEL2, 只可能在LEVEL1和3, 因为在第一个执行完SC的线程之后, 才可能进入到LEVEL2区域. 随着继续执行, 所有LEVEL3的线程都会逐步重新回到LEVEL1, 然后一部分线程在ll的时候就会进入LEVEL2开始自旋.

之后进入到LEVEL3的情形与原来相同.

说实话这个LL和SC究竟需要配对还是可以跨线程, 书上没有明说. 导致还是有点糊涂的.

获取并增加

这个指令在前边列出来过, 就是XADD, 先交换两个操作数的值,再进行算术加法操作。多处理器安全,在80486及以上CPU中支持。使用的过程中自动相当于加了LOCK.

伪代码如下:

int fetchAndAdd(int *ptr){
    int result = *ptr;
    *ptr = result + 1;
    return result;
}

这个方法给旧值+1并且返回旧值. 由于是自增, 可以实现很有意思的锁, 书上有一个例子:

//定义一个锁结构变量
typedef struct lock_t {
    int ticket;
    int turn;
} lock_t;

//初始化锁结构变量中的ticket和turn都是0

lock(lock_t *lock){
    //对ticket原子加1, 返回原来的值
    int myturn = fetchAndAdd(&lock->ticket);

    //判断锁里的turn是不是等于我的顺序, 是的话就进入临界区, 不是的话继续等待.
    while(lock->turn != myturn){
        //自旋
    }
}

void unlock(lock_t *lock){
    fetchAndAdd(&lock->turn);
}

跟踪这个就很有意思了. 一开始第一个线程会去fetchAndAdd ticket变量, 只要执行了以后, 不管之后还有多少线程, 每个线程获得的值都不同. 然后只有一个线程, 也就是第一个线程的号码=0, 因此可以通过临界区.

执行完毕之后, 会把turn加1, 此时就只有持有myturn = 1的变量进入临界区, 依次类推, 所有排队的线程, 都有机会进入临界区. 锁的ticket和turn的原子操作保证了不会出问题. 所有排队的线程, 也最终都会有机会执行临界区的代码.

改进自旋 – Solaris系统中的加锁和唤醒机制

自旋的话由于运行速度太快, 每次都会耗一些时间片, 因此不太合理. 改进自选需要操作系统的支持, 即操作系统提供一种机制, 让线程主动出让CPU.

一般的现代操作系统都会提供一些陷入内核的方法, 让线程通知调度程序结束自己的运行. 不过如果是随机运行, 则某些线程可能饿死, 因此操作系统内部一般不是简单的任由线程调用函数, 而是会进行额外的处理.

最常见的处理方式就是操作系统会用一个队列来保存所有等待锁的线程, 在收到某个线程的信号时, 调度程序会按照自己的考虑来运行其他线程.

对于锁来说, 则需要将锁和唤醒机制结合起来, 即一个线程如果拿不到锁, 就要让其睡眠, 而锁空出来的时候, 则必须去唤醒线程让其继续执行.

书这里举了一个例子, 使用测试并设置, 这个例子让我发现, 临界区代码不一定就是lock()与unlock()之间的代码, 应该说原子指令测试通过之后的语句, 即使在lock()函数内, 线程也可以不慌不忙的执行, 因为这也是临界区代码, 不会有其他线程来竞争.

//创建一个锁数据结构, 其中有两个int flag 和 guard, guard相当于原来的互斥锁变量, queue_t是存放线程号的队列. flag下边来看
typedef struct lock_t {
    int flag;
    int guard;
    queue_t *q;
} lock_t;

//初始化, 所有变量为0, 然后创建一个队列
void lock_init(lock_t *m){
    m->flag = 0;
    m->guard = 0;
    queue_init(m->q);
}

//lock函数, 如果成功进入到临界区的时候, 还需要做点事情
void lock(lock_t *lock){
    while(test_and_set(&lock->guard, 1) == 1){
        //自旋, 什么也不做
    }
    //只要通过这里, 就可以认为是临界区了

    //如果flag=0, 将其设置为1, 然后恢复guard为0 , 可以看到这其实相当于用guard给flag上了一个锁, 而flag此时又标记着有没有线程在临界区.
    if(lock->flag == 0){
        lock->flag =1;
        lock->guard = 0;
    } else {
        queue_add(m->q, gettid());
        lock->guard = 0;
        //这个调用让线程休眠
        //危险打断点
        park();
    }
}

这个lock函数先是使用了一个互斥锁guard, 用于保护flag变量和添加进队列的操作. 然后用flag变量来标记是否有线程进入了临界区域. 如果还没有, 则线程会将flag设置为1, 并且释放锁, 继续运行临界区域. 其他线程拿到guard锁, 依然会停在flag判断的地方.

如果发现flag=1, 即已经有线程在临界区, 这个线程就会把自己的线程号加入队列, 然后调用park()来进行休眠.

再来看unlock():

void unlock(lock_t *lock){

    //也是先尝试去获取guard锁, 注意这和原来直接设置互斥锁为0不同. 因为guard锁保护着flag和队列, 因此对flag和队列的操作需要去拿到guard锁
    while(test_and_set(&lock->guard, 1) == 1){
        //自旋, 什么也不做
    }

    //如果队列为空, 即没有任何等待中的线程, 将flag设置为0
    if(queue_isEmpty(lock->q){
        lock->flag = 0;

    //如果队列不为空, 在一个线程结束之后, 会执行unlock, 去唤醒其他的线程
    } else {
        //从队列中弹出一个线程id并且唤醒这个线程id
        unpark(queue_remove(lock->q));
    }

    //解除guard锁
    lock->guard = 0;

}

可以看到, 使用了额外的flag变量用于表示是否有线程在等待, 由于操作flag和queue也需要保证原子性, 因此在外边又加了一把锁就是guard.

因此在lock和unlock中, 可以看到各自又是一个加锁过程, 仅仅只有先拿到guard锁, 再拿到flag锁然后释放guard锁的情况下, 线程才会进入临界区. 而一旦进入临界区之后, 最早执行到unlock的线程又会去唤醒其他等待的线程, 如果还有等待的线程, 会先执行完, 才有可能让新的线程继续拿到flag锁.

这里首先要解释的就是为什么要额外添加一个flag, 看上去是两个锁, 这是因为实际线程在执行的时候, 到了park()函数, 所谓休眠就是被系统挂起, 下一次被唤醒的时候, 就相当于从park()中返回, 然后继续执行.

但是在进入睡眠之前, 由于必须让其他线程来更新flag和队列, 因此让出了guard锁, 其睡醒之后, 状态和之前相同, 如果没有flag锁, 那这个线程此时就没有任何锁, 变得非常危险.

这并发代码如果自己写, 还真够难想的啊……不过这里有一个问题, 看上边lock函数中的危险打断点注释, 如果队列中已经有一个线程, 当前线程A运行到这里, 说明队列中有一个睡着的B, 还有自己A, 然后不在队列中有一个已经拿过guard并现在持有flag要进入临界区的C.
A运行到这里, 释放了guard锁, 但是A线程还没有睡眠, 这个时候如果C线程运行完毕, 唤醒了B, B运行完毕, 发现队列还有一个A, 就去唤醒A.

但此时A还没有睡眠, 就会出现问题, A的线程号已经从queue中移除, 但是执行了park(), 结果导致再也没有其他线程能唤醒A. 所以又添加了一个系统调用setpark(). 线程调用这个函数之后, 如果其他线程刚刚尝试唤醒自己, 再调用park()想睡觉的时候, 就不会真的睡, 而会立刻返回.

lock函数修改如下:

void lock(lock_t *lock){
    while(test_and_set(&lock->guard, 1) == 1){
        //自旋, 什么也不做
    }
    //只要通过这里, 就可以认为是临界区了

    //如果flag=0, 将其设置为1, 然后恢复guard为0 , 可以看到这其实相当于用guard给flag上了一个锁, 而flag此时又标记着有没有线程在临界区.
    if(lock->flag == 0){
        lock->flag =1;
        lock->guard = 0;
    } else {
        queue_add(m->q, gettid());
        setpark();
        lock->guard = 0;
        //即使这里被打断, 被其他线程在queue中叫醒, 再执行park()也不会睡眠.
        park();
    }
}

这个新增的橙色语句就可以防止睡死, 由于setpark()添加在释放guard锁之前, 所以可以安心的保证每一个线程在调用park()之前都已经调用了setpark(), 就不怕park()之前被打断了.

改进自旋 – Linux的futex机制

Liunx使用了一系列原子化的函数, 组成了一个futex机制, 用于等待和唤醒线程. 先看一下互斥锁的加锁函数:

void mutex_lock(int *mutex){
    int v;

    //原子的比较并设置, 作用是取出mutex的31位也就是最高位并尝试设置为1(这个函数的set意思就是设置成1), 检测是不是0, 如果是0, 说明没有锁, 就会返回, lock函数结束, 进入临界区, 而且此时mutex最高位就是1
    if(atomic_bit_test_set(mutex, 31) == 0){
        return;
    }

    //如果不通过, 说明当前有锁, 需要想办法让线程睡眠
    //刚才的solaris是在临界区执行的再用一个锁来标记睡眠. Linux不是在临界区执行, 而是在没取到锁的时候执行

    //断点1

    //策略是第一次没有拿到锁的话, 去增加mutex1, 表示多了一个线程在等待. 然后让线程进行自旋, 如果拿到锁, 肯定要return, 但是在return之前去减少一次mutex的计数.
    atomic_increment(mutex);

    //断点2
    while(1){
        if(atomic_bit_test_set(mutex, 31) == 0){
            atomic_decrement(mutex);
            return;
        }

        //自旋中又没拿到锁, mutex为负数, 此时要确认一下mutex是不是真的是负数, 如果真的有锁, 就尝试睡眠.
        //这里没有原子操作, 如果出现不相等(很可能, 因为其他线程更新mutex), 那就不睡了, 正好继续去抢, 如果相等, 说明某个线程一直在临界区或者线程很少, 都停止了争抢, 就休眠.
        v = *mutex;
        if(v > 0){
            continue;
        }
        //断点3
        futex_wait(mutex, v);
    }
}

这个锁的机制我想了一会,假如有三个线程.有一个线程A进了临界区, 剩下的2个线程BC都会增加mutex, 然后等于2, 只要A还没运行完, BC都会在自旋中, 然后测试V, 这时候无所谓原子与否, 因为都是取数.

如果这个时候v大于0 ,哪怕v的值不够新, 也说明锁可能被释放了, 因此就continue继续取得锁. 假如A一直没有从临界区出来, 那么剩下的线程全部都在取数然后比较mutex和v, 正常情况下虽然会出现不相等的情况, 但逐渐都会睡眠.

一旦A从临界区中出来, 那要看一下unlock代码, 不过这里可以想象的是, unlock肯定会把锁的31位给改成0:

void mutex_unlock(int *mutex){

    //这个作用是将1加到mutex的最高位上, 然后检查结果是不是0. 很显然, 这里首先必定会释放最高位(作为锁, 因为只有一个线程能进入到执行unlock), 然后会检查是不是0, 只有剩下的31位也都是0, 即线程队列中已经没有线程, 才会直接返回, 否则就会通知内核去唤醒一个线程.
    if(atomic_add_zero(mutex, 0x80000000)){
        return;
    }

    //执行到这里的话, 之前的atomic_add_zero已经释放了作为锁的最高位变量, 可以唤醒等待队列中的一个线程了.
    futex_wait(mutex);
}

仔细分析了Linux的实现, 其实也是差不多的情况, 一开始一个线程抢到锁, 其他的线程要么陷入自旋中等待, 要么就压根还没有开始执行.

随着继续执行, 更多线程都陷入自旋中. 陷入自旋中很好理解, 必定会睡眠或者自旋一下在睡眠. 主要就是一个线程执行完了, 后边一个线程在断点1或者2怎么办, 可以发现即使某个线程抢到锁并且执行完了unlock, 有线程卡在断点1或者断点2继续执行, 都不会影响, 因为默认会自增然后再去拿锁, 实际上是用两次拿锁来将这一部分来合理解决掉.

如果是在进入睡眠之前的断点3, 另外一个线程执行完毕, 也唤醒了一个线程, 即使这个线程没有睡眠, 也无所谓, 因为一旦其返回, mutex和v的值就不相等了, 因为解锁的时候修改了mutex, 所以就不会睡眠, 会立刻再去抢锁.

第二次又没拿到也无所谓, 要么自旋要么休眠, 各个地方断开也不影响, 因为用原子自增和自减操作了mutex的低31位, 其实相当于mutex的最高位和剩下31位都是一种锁.

从这里也可以看出, 从完全自旋到加上唤醒机制, 要额外解决的问题就是线程永远无法被唤醒的问题. Linux和Solaris是两个不同的方向解决这个问题.

并发这个东西不太好分析的原因就是可能断开的地方太多, 必须要考虑各种情况才可以. Linux相比Solaris的三个原语, 其实使用了更多的函数, 这其中的原子增加和减少也是必不可少的函数.