1. 线程
  2. 线程包的接口
  3. 多线程echo server
  4. 多线程程序中的共享变量
  5. 信号量 – 线程同步的理论
  6. 信号量 – 接口
  7. 信号量 – 二元信号量
  8. 信号量 – 生产者/消费者模型
  9. 信号量 – 读者/写者模型
  10. 线程实际使用 – 预线程服务器

线程

线程有点像上边两个方法的混合, 既有多个逻辑流, 又可以共享整个地址空间.

线程运行在进程上下文中的逻辑流, 现代操作系统允许一个进程里同时运行多个线程. 线程也由内核自动调度, 每个线程都有自己的线程上下文, 包括栈, 栈指针, 程序计数器和寄存器. 注意是栈, 而内存空间是共享的, 也就意味着代码区, 数据区, 堆, 共享区, 文件描述符都是共享的.

内核通过一个整数ID来识别线程. 线程没有父子之分, 每次创建线程, 都是一个对等的线程. 每个进程开始的时候只有一个线程, 叫做主线程, 之后创建的线程对于主线程来说都是对等的线程.

一旦创建线程, 两个线程就并发的运行. 直到全部结束为止. 但是如果这之前进程被结束, 则所有的线程也都被销毁. 所谓对等, 就是指每个线程都可以干掉其他线程, 或者等待任意其他线程终止, 这点与进程符合父子关系是完全不同的.

C语言的多线程支持在1995年出现, 在所有的Linux系统上都可用, 库名是pthread, gcc编译的时候调用 -lpthread 来使用这个库.

来看一个最简单的多线程例子, 然后来看这个接口的详情:

void *thread(void *vargp);

int main(){
    pthread_t tid;
    pthread_create(&tid, NULL, thread, NULL);
    pthread_join(tid,NULL);
    exit(0);
}

void *thread(void *vargp){
    printf("No.%d thread prints Hello world!\n", pthread_self());
    return NULL;
}

这个程序主线程启动了一个新线程, 新线程打印自己的线程号和Hello world. 这个新线程执行的内容就是thread函数中的内容, 这个函数也叫做线程例程.

可以看到, 这个线程例程的参数一定是一个通用指针, 返回值也是通用指针, 因此如果要给线程传参数, 需要传入一个指针, 传多个参数就需要传递一个结构.

线程返回的时候如果需要返回多个结果, 可以返回一个指向一个结构的指针.

线程包的接口

线程包的几个接口就是上边的程序那样, 创建线程, 终止线程, 以及回收线程的资源, 此外还有改变线程的结合性和初始化线程的接口.

创建线程的函数:

#include <pthread.h>
//定义一个参数是void *, 返回值也是void * 的函数叫 func
typedef void *(func)(void *);
//成功返回0, 不成功返回非零数
int pthread_create(pthread_t *tid, pthread_attr_t *attr, func *f, void *arg);

其中第一个参数是tid类型的指针, 函数会在其中填入线程号. 第二个参数是用来改变新创建线程的默认属性.

第三个参数就是线程实际要做的事情也就是例程, 传入那个函数指针. 第四个参数是输入变量, 这个 arg 参数就会被当成例程的参数.

pthread_create 函数返回的时候, 在线程的上下文就会开始运行例程f. 新线程可以调用pthread_self来获取自己的线程ID:

#include <pthread.h>

pthread_t pthread_self(void);

线程的终止有如下几种情况:

  1. 线程例程返回的时候, 线程会隐式的终止
  2. 调用pthread_exit函数, 线程会显式的终止. 如果主线程调用这个函数, 会等待其他所有对等线程终止, 再终止主线程和整个进程, 这个函数返回值为 thread_return .
            #include <pthread.h>
            //成功返回0, 不成功返回非零
            void pthread_exit(void *thread_return);
    
  3. 对等线程调用Linux系统的exit函数, 进程会立刻终止, 所有线程也一起挂了
  4. 一个对等线程通过pthread_cancel(tid)来终止ID为tid的线程.

回收已终止线程的资源采用pthread_join函数:

#include <pthread.h>
//成功返回0, 不成功返回非零
int pthread_join(pthread_t tid, void **thread_return);

一个线程执行这个函数的时候, 这个线程会阻塞, 直到tid线程终止, 之后会将线程例程返回的通用指针赋值为thread_return指向的地方, 然后回收线程占用的所有内存资源.

与wait函数不同的是, 一个这个函数只能等待指定为tid的线程终止, 不能等待一批线程终止. 这使得多线程的代码更加复杂了. 因为要等待指定的线程, 很显然不能只调用一次这个函数.

线程还是可结合的或者可分离的. 一个可结合的线程是默认情况, 可以被其他线程收回或者杀死. 在回收之前, 栈是不释放的. 而分离的线程无法被其他线程回收或者杀死, 内存资源在它终止的时候由系统自动释放.

为了避免内存泄露, 每个线程要么可结合,然后被显式的回收, 要么通过调用pthread_detach被分离:

#include <pthread.h>
//成功返回0, 不成功返回非零
int pthread_detach(pthread_t tid);

现实程序中采用分离线程比如Web服务器, 每次收到新连接创建一个分离线程, 然后无需等待该线程终止, 就立刻做其他事情. 分离线程在返回响应之后就被系统回收了.

可以采用pthread_once来初始化线程例程相关的状态:

#include <pthread.h>

pthread_once_t once_control = PTHREAD_ONCE_INIT;

//固定返回0
int pthread_once(pthread_once_t *once_control, void (*init_routine)(void));

这个函数用于动态初始化多个线程共享的全局变量. once_control是一个全局或者静态变量, 必须固定赋值为PTHREAD_ONCE_INT这个宏, 第一次调用pthread_once的时候, 会调用第二个参数中的函数, 之后如果再调用, 就什么都不做, 就可以起到初始化的作用.

多线程echo server

多线程的程序与多进程有着明显的不同, 通过将echo server 改造成多线程来看一看其中的奥妙:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include "csapp.h"

#define MAXLINE 8192

void echo(int connfd);

//线程例程声明
void *thread(void *vargp);

int main(int argc, char **argv) {
    //一个整数的指针connfdp, 用于指向已连接描述符的地址
    int listenfd, *connfdp;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;

    //声明线程id
    pthread_t tid;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    listenfd = Open_listenfd(argv[1]);

    while (1) {
        clientlen = sizeof(struct sockaddr_storage);
        //每次申请新内存, 然后在新的内存里装入新的已连接描述符
        //申请一块新的内存
        connfdp = Malloc(sizeof(int));

        //accept函数返回的connfdp存放在新的区域里, 这样可以确保每个线程分配到的connfdp指针指向不同的内存区域.
        *connfdp = Accept(listenfd, (SA *) &clientaddr, &clientlen);

        //启动新线程
        Pthread_create(&tid, NULL, thread, connfdp);
    }
}

//线程例程的实际执行代码
void *thread(void *vargp){
    //强制转换vargp的类型, 然后取出传入的connfdp指针的值, 就是新的已连接描述符
    int connfd = *((int *) vargp);
    //将自己变成分离线程, 为了结束的时候可以回收内存
    Pthread_detach(pthread_self());
    //释放vargp指向的内存, 也就是本来存放当前线程对应的已连接描述符的内存
    Free(vargp);
    echo(connfd);
    Close(connfd);
    return NULL;
}


void echo(int connfd){
    size_t n;
    char buf[MAXLINE];
    rio_t rio;

    Rio_readinitb(&rio, connfd);

    while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
        printf("server received %d bytes \n", (int) n);
        Rio_writen(connfd, buf, n);
    }

}

这里特别要注意标红色的部分, 以前我们是直接将描述符的值传递给子进程, 在I/O多路复用中也是直接使用的已连接描述符的值. 但是在这里如果用同一个局部变量传递值, 就会出现问题, 由于并发的顺序不可预测, 很有可能新的accept函数还没执行之前, 就将旧的已连接描述符的值传递给了线程例程.这样会导致两个线程读同一个描述符, 出现不可预料的情况.

所以必须要传递一个指针. 但是传递指针的时候还需要解决另外一个问题, 就是不能传递每次都指向同一个位置的指针, 否则依然没有解决竞争问题.

这里采用了每次都去新申请一块空间, 然后在例程中传入这个指向新空间的指针. 在将其中的值取出之后, 释放这块空间的做法. 这样就保证了每个例程在直到不释放这个空间之前, 每个例程的指针都不同, 而释放之后, 例程也不再使用这个空间, 这样就避免了竞争问题.

这里还要理解, 例程的局部变量是放在栈里的, 每个线程都是独立的, 不会互相影响.

练习12.5 为何只在线程例程中关闭了已连接描述符?

由于所有的线程共享进程的内容, 而描述符是跟着进程走的, 所以一个线程里的描述符也是进程的描述符, 其引用数量也是按照进程来的, 所以就是1, 只要一个线程关闭了, 文件表项就彻底关闭了.

多线程程序中的共享变量

在上一个简单的多线程服务器中, 看上去对于每个例程分配了独立的存放已连接描述符的内存空间, 但实际上所有例程依然有共享的部分, 那就是文件描述符. 所以如果同时有很多个连接进来但没有终止连接, 文件描述符会由于超过用量而导致连接失败.

除此之外, 更常见的工作是使用多个线程对一个数据结构进行并行的计算, 很显然这些线程要共享这个数据, 但是这可能会导致一系列潜在的错误, 这就必须先了解线程相关的一些理论.

再来仔细的看看线程的内存使用:

  1. 每个线程独立的使用线程ID, 栈, 栈指针, 程序计数器, 条件码 和通用目的寄存器
  2. 所有线程共享进程上下文的剩余部分, 也就是整个虚拟地址空间, 包括只读代码区, 数据区, 堆, 共享库代码
  3. 所有的线程对外都是一个进程, 因此也共享操作系统为进程提供的上下文, 其中最核心的就是文件描述符

这里比较特别的就是栈, 虽然每个线程的栈在运行的时候独立使用, 但由于栈也是在虚拟内存空间中, 因此在运行的时候, 只要知道指针, 一样可以读写. 寄存器和其他进程上下文由于是操作系统给设置的, 所以相对独立.

通过一个程序来看看共享的部分:

#include "csapp.h"
#define N 2

void *thread(void *vargp);

char **ptr;

int main(){
    int i;
    pthread_t tid;
    char *msgs[N] = {
            "Hello from foo",
            "Hello from bar",
    };

    ptr = msgs;

    for (i = 0; i < N; i++) {
        Pthread_create(&tid, NULL, thread, (void *) i);
    }
    sleep(1);
    printf("tid is %u\n", tid);
}

void *thread(void *vargp){
    int myid = (int) vargp;
    static int cnt = 0;
    printf("[%d]: %s (cnt = %d)\n", myid, ptr[myid], ++cnt);
    return NULL;
}

这段代码强制转换指针和int的时候, gcc 会报warning, 可以用 -w 选项来压制警告.

通过代码的运行结果, 可以看到如下共享变量的情况:

  1. char **ptr;是全局变量, 显然被共享, 两个线程都从其中获得了自己要输入的字符串
  2. main函数中声明的变量虽然这些是局部变量, 把线程看做一个过程的话, 这些过程都被main函数调用, 这些局部变量, 也只有一个副本, 被所有线程共享.
  3. 线程的局部变量, 这个要首先从程序结构而不是并发上去理解, myid是局部非静态变量, 所以只被当前过程使用, 过程变成线程例程的时候, 各个例程使用各自的myid局部变量, 这在输出结果中也能看到, myid是不同的.
  4. static int cnt, 依然要先从程序结构去判断, 在之前章节已经知道, 局部的静态变量会存储在一个单独的位置, 过程结束依然存在. 这里也是这样, 两个线程共享线程代码中定义的静态变量, 从输出结果中就可以看到.

练习 12.6 填写上边程序中各个变量的共享情况

变量实例 是否可被主线程引用 是否可被线程0引用 是否可被线程1引用
ptr
cnt
i.m
msgs.m
myid.p0
myid.p1

i.m这看如何理解了, 这个示例程序实际上向新线程传递了i的值, 我认为也算引用. CSAPP的答案是不引用. 实际上可以传入指针, 我就觉得是引用. 只是这个程序没传而已, main函数的局部变量对于线程都是可见的, 因为全部可以在创建新线程的时候将指针传给线程.

根据分析, ptr, i, msgs 这些由所有线程共享. cnt 由 主线程以外的线程共享, myid由各个线程单独使用.

总结下来就是, 全局变量和主线程的局部变量可以由所有线程共享, 线程内部的静态变量由主线程以外的线程共享, 线程内部的局部变量由各个线程自己玩耍. 但获取了具体指针之后, 由于都是同一个进程, 可以突破上述限制.

信号量 – 线程同步理论

由于在具体操作共享变量的时候, 一个线程取数, 计算, 写回并不是一条指令完成的, 所以就可能出现错误:

#include "csapp.h"

void *thread(void *vargp);

volatile long cnt = 0;

int main(int argc, char **argv){
    long niters;
    pthread_t tid1, tid2;

    if (argc != 2) {
        printf("usage: %s <niters>\n", argv[0]);
        exit(0);
    }

    niters = atoi(argv[1]);

    Pthread_create(&tid1, NULL, thread, &niters);
    Pthread_create(&tid2, NULL, thread, &niters);

    Pthread_join(tid1, NULL);
    Pthread_join(tid2, NULL);

    if (cnt != 2 * niters) {
        printf("BOOM! cnt = %ld\n", cnt);
    } else {
        printf("OK cnt = %ld\n", cnt);
    }
}

void *thread(void *vargp){
    long i, niters = *(long *) vargp;

    for (i = 0; i < niters; i++) {
        cnt++;
    }
}

这个程序使用两个线程将cnt按照参数的大小自增, 如果运行正常的话, 很显然最后cnt的值应该是参数*2, 但当参数比较大的时候, 程序得不到正确的结果.

这是因为第i个线程操作cnt++的时候, 实际上机器指令分成了三步:

  1. 取数步骤Li, movq cnt(%rip), %rdx, 从全局变量cnt的地址中将值装入寄存器%rdx
  2. 自增步骤Ui, addq $1, %rdx, 将%rdx自增1
  3. 写回步骤Si, movq %rdx, cnt(%rip), 将%rdx中自增1后的值写回cnt

问题就在于, 这个过程不是一次性完成的, 实际的步骤取决于操作系统的调度, 很有可能L0,U0,S0在执行的过程中, 被另外一个线程的L1,U1,S1穿插进来, 穿插的结果可能正确也可能错误.

比如考虑如下一个序列, 在cnt=10的时候: L0,U0,L1,S0,U1,S1, 在L0和U0执行完毕的时候, cnt还是10, 这时候L1来读取了10, 之后S0写入了11, U1,S1结束之后, 依然写入了11. 这样本该从10变成12的cnt, 就变成了11. 这就是为什么出错的执行结果, 一定会比原来的cnt*2要小的原因.

练习题 12.7 根据指令顺序完成下表

步骤 线程 指令 %rdx1 %rdx2 cnt
1 1 H1 0
2 1 L1 0 0
3 2 H2 0 0
4 2 L2 0 0 0
5 2 U2 0 1 0
6 2 S2 0 1 1
7 1 U1 1 1 1
8 1 S1 1 1 1
9 1 T1 1 1 1
10 2 T2 1 1 1

这个程序没有产生一个正确的cnt值, 因为两个线程在更新cnt前都读取了更新前的cnt值, 导致本该被增加2的cnt只增加了1.

通过这个分析, 可以发现, 如果想要让程序获得正确的值, 就要像单线程的运算一样, 必须在完成一次 Li-Ui-Si 的整体操作之后, 才能允许开始另外一个 Li-Ui-Si 操作, 我们称一个(Li-Ui-Si)指令集合构成了一个关于要操作的cnt变量的临界区, 临界区不能有重叠, 在临界区内, 必须保证线程对变量的互斥访问. 如果将两个线程的循环指令画成笛卡尔图, 那么指令产生交错的部分就是不安全的区域.

练习 12.8 使用进度图下列轨迹线是否安全?

  1. H1,L1,U1,S1,H2,L2,U2,S2,T2,T1
  2. H2,L2,H1,L1,U1,S1,T1,U2,S2,T2
  3. H1,H2,L2,U2,S2,L1,U1,S1,T1,T2

可以画出三个图来看:

笛卡尔进度图

通过图可以发现, 第一种和第三种情况安全, 第二种不安全.

为了解决这个问题, 需要有一种机制, 即保证一个完整的(Li-Ui-Si)可以一次性操作结束. 但由于操作系统随时会打断指令的执行, 因此需要额外的东西来实现互斥, 这就是信号量.

信号量 – 实际操作

信号量是一个特殊的全局变量, 是一个非负整数. 要操作这个全局变量, 只能由两种特殊的行为来操作, 分为P操作和V操作. 对于信号量s:

  1. P操作: 检查信号量, 如果非0, 将s减1, 然后立刻返回(即不阻塞, 执行后续代码); 如果是0, 就挂起当前线程, 直到s不再是0.
  2. V操作: 将s加1, 然后检测是不是有任何线程阻塞在P操作, 如果有, 重启这些线程中的一个.

P操作的 检查信号量-减1 和 检查信号量-挂起 以及V操作的 s+1 都是连续不可中断的. 如果有多个线程在等待, 重启哪一个线程是不可知的.

有了P操作和V操作, 就确保了一个正确初始化的信号量绝对不可能是一个负值. 将P操作放在临界区之前, V操作放在临界区之后, 如果程序启动的时候信号量是1, 由于P操作不可打断, 无论哪一个线程最先执行P操作, 那个线程会立刻执行P之后的代码, 而其他所有P操作都会阻塞. 线程执行完临界区之后, 启动了V操作, V也不可中断, 之后S变成1, 然后启动线程, 这个线程又会立刻执行P操作, 将信号量变为0, 导致其他P又在阻塞.

这样反复循环, 就保证了临界区的代码要么压根不执行, 要执行就执行完毕, 这样就解决了多线程的数据同步问题.(其实有点像I/O多路复用, 每次只能允许一个流来改变状态, 而且本质是把异步变同步, 因为这是由操作单一数据的特性决定的. 如果操作不同的数据, 也无需共享变量, 就是彻底的异步.)

semaphore的英文原来语义是信号灯, 这其实就像是一个二元信号灯, 要么放行, 要么等待. 就像很多条铁轨要并轨一样, 同时只能允许一列火车通过, 其他火车必须等待这个火车一节一节全部通过才行, 这就像等待组成临界区的一行一行代码执行完毕一样的.

信号量也是由操作系统实现的, Unix的信号量接口函数如下:

#include <semaphore.h>

//声明信号量
sem_t sem;

//初始化信号量
int sem_init(sem_t *sem, 0, unsinged int value);

//P操作
int sem_wait(sem_t *s);

//V操作
int sem_post(sem_t *s);

//上边这些函数成功都返回0, 出错返回-1

信号量初始化函数将信号量初始化为第三个参数value的值, 在一个程序中使用信号量的话, 必须要先初始化. 其第二个参数总是0.

P操作和V操作的函数参数都是指向信号量的指针. 一般喜欢自定义P和V函数来对其进行包装, 以使得信号量部分的操作与程序其他部分可以明显区分开来:

void P(sem_t *sem)
{
    if (sem_wait(sem) < 0)
        unix_error("P error");
}

void V(sem_t *sem)
{
    if (sem_post(sem) < 0)
        unix_error("V error");
}

信号量 – 二元信号量

刚才自己简单分析了一下信号量是1的情况, 这种情况就称作二元信号量, 即信号量始终在1和0之间变换, 也称为互斥信号量.

用互斥信号量来修改刚才的程序:

#include "csapp.h"

void *thread(void *vargp);

volatile long cnt = 0;

//声明信号量
sem_t mutex;

int main(int argc, char **argv){

    long niters;
    pthread_t tid1, tid2;

    if (argc != 2) {
        printf("usage: %s <niters>\n", argv[0]);
        exit(0);
    }

    niters = atoi(argv[1]);

    //初始化信号量
    Sem_init(&mutex, 0, 1);

    Pthread_create(&tid1, NULL, thread, &niters);
    Pthread_create(&tid2, NULL, thread, &niters);

    Pthread_join(tid1, NULL);
    Pthread_join(tid2, NULL);

    if (cnt != 2 * niters) {
        printf("BOOM! cnt = %ld\n", cnt);
    } else {
        printf("OK cnt = %ld\n", cnt);
    }
}

void *thread(void *vargp){
    long i, niters = *(long *) vargp;

    for (i = 0; i < niters; i++) {
        P(&mutex);
        cnt++;
        V(&mutex);
    }
}

红色的部分就是信号量相关的代码. 由于在前边我们已经分析到, for循环的前后准备工作不影响结果, 临界区域是cnt++所拆分成的三个阶段, 所以用P操作和V操作来环绕cnt++, 就可以得到正确的结果了.

互斥信号量使用简单, 然而相比刚才的程序, 很显然新程序变慢了, 这是因为本质上, 这和使用单线程计算一个循环没有本质的区别, 还额外增加了P和V操作的开销. 因此信号量除了互斥之外, 还有着更灵活的使用, 就是调度共享资源, 有两个非常经典的例子:生产者-消费者模型 和 读者-写者模型 需要仔细揣摩.

信号量 – 生产者消费者模型

生产者消费者是一个非常经典的模型, 两个线程共享一个n个槽位的缓冲区, 生产者不断向其中放入新项目, 消费者不断从中取出新项目. 有很多变种.

消费者的问题在于, 当槽中没有数据的时候, 需要等待直到出现新数据, 生产者的问题是, 如果槽满, 则必须等待有新的槽空出来才可以.

由于插入和取出项目都涉及到访问共享的缓冲区, 很显然需要信号量来保证互斥. 然而上述消费者和生产者的等待问题, 就不能简单的使用互斥信号量, 而需要使用其他类型的信号量.

CSAPP这里手把手的教写了一个生产者消费者模型, 来看一下. 首先是缓冲区的结构体:

#include "csapp.h"

#ifndef C_SBUF_H
#define C_SBUF_H

typedef struct {
    int *buf;       //缓冲区数组, 采用动态分配内存
    int n;          //最大槽位, 也就是数组的最大长度
    int front;      //记录数组第一项的索引
    int rear;       //记录数组最后一项的索引
    sem_t mutex;    //用于控制访问缓冲区的互斥信号量
    sem_t slots;    //统计空的插槽
    sem_t items;    //统计可用的数据(装了数据的插槽)
} sbuf_t;

#endif //C_SBUF_H

这个缓冲区里有三个信号量, 一个用于互斥访问, 一个用于统计空的插槽, 很显然, 这个数量能够控制生产者可以向其中放入数据还是阻塞. 而可用的数据数量控制了消费者从其中拿出数据还是阻塞.

有了缓冲区之后, 整体的程序结构如下:

  1. sbuf_init函数初始化缓冲区, 指定n个槽位, 然后会给buf动态分配空间, 同时初始化信号量, mutex作为互斥信号量为1, slots一开始全部可用, 设置为n. 而可用数据一开始是0.
  2. 生产者生产数据, 调用sbuf_insert 向其中放入数据, 由于要根据slot的数量来放入数据, 因此需要加上slots信号量, 然后实际放数据的时候, 因为要访问缓冲区, 还必须加上mutex信号量. 然后插入一个数据, 此时会因为V操作将slots信号量减去1. 如果slots已经是0, 就会阻塞.
  3. 消费者取用数据, 调用sbuf_remove 取出数据, 由于要根据items的数量来取出数据, 因此需要加上items信号量, 然后实际取数据的时候, 因为要访问缓冲区, 还必须加上mutex信号量. 然后取走一个数据, 将front指针移动到下一个位置(这里使用了取余的技巧, 让front超过长度再移动到索引0的位置), 此时会因为V操作将items信号量减去1. 如果items已经是0, 就会阻塞.
  4. 需要结束生产者和消费者程序的时候, 释放结构中的动态分配的部分

先来看初始化的代码:

void sbuf_init(sbuf_t *sp, int n){

    //动态分配n个int的空间, 这里就是用int当做生产和消费的数据类型
    sp->buf = Calloc(n, sizeof(int));
    //数组长度设置为n
    sp->n = n;
    //一开始没有数据, front和rear都是0
    sp->front = sp->rear = 0;
    //访问缓冲区的互斥信号量要初始化为1
    Sem_init(&sp->mutex, 0, 1);
    //初始没有数据, 可用槽位就是n
    Sem_init(&sp->slots, 0, n);
    //初始可用数据是0
    Sem_init(&sp->items, 0, 0);
}

这个初始化的代码很简单, 主要要注意三个信号量的初始化值. 之后是生产者放入新数据的代码:

void sbuf_insert(sbuf_t *sp, int item){

    //检查是不是有空槽
    P(&sp->slots);
    //通过检查空槽的话, 要读写缓冲区了, 必须要互斥访问
    P(&sp->mutex);
    //都通过了, 放入数据
    //这里用了一个小技巧, rear增加后不断取余, 索引会反复在1-9-0之间变动.
    sp->buf[(++sp->rear) % (sp->n)] = item;
    //解除互斥访问
    V(&sp->mutex);
    //这里很关键, 有了空槽写完之后, 要给谁加1. 由于空槽的数量已经被P减1了, 又放入了一个数据, 显然不能再P(&sp->slots), 否则这个slots失去意义了
    //所以是要给items 加1, 表示现在可用数据多了1个
    V(&sp->items);
}

这里的关键是要理解使用信号量的逻辑, 以及完成操作的时候, 到底该对哪些信号量执行V操作.

知道了insert函数, sbuf_remove也很好写了:

int sbuf_remove(sbuf_t *sp){

    int data;
    //检查是不是有可读取的数据
    P(&sp->items);
    //如果有可读取数据, 要读写缓冲区了, 必须要互斥访问
    P(&sp->mutex);
    //取出数据, 这里的技巧和之前相同, 也让front在1-9-0之间变动.
    data = sp->buf[(++sp->front) % (sp->n)];
    //解除互斥访问
    V(&sp->mutex);
    //基于同样的分析, 这里应该给slots加1, 表示空出来一个槽
    V(&sp->slots);
    return data;
}

这个函数取出放入的数据, 然后会将slots增加1. 如果没有可取的数据, 就会阻塞.

最后是辅助函数, 释放空间, 很简单:

void subf_deinit(sbuf_t *sp) {
    Free(sp->buf);
}

通过这个模型, 可以看到, 信号量并不是一定要是全局变量或者刻意的什么东西, 而是要有机的看待信号量和P,V函数操作信号量和阻塞的功能. 这样才能灵活的使用信号量.

练习 12.9 指出下列场景互斥锁是否必要

  1. p = 1, c = 1, n > 1
  2. p = 1, c = 1, n = 1
  3. p > 1, c > 1, n = 1

A场景下, 只有一个生产者和一个消费者, 在一开始消费者阻塞, 生成者放入数据之后, 消费者去读, 此时生产者再放, 如果生产者放的多, 会覆盖消费者, 所以要加锁

B场景下 slots 和 item 就像互斥锁一样, 不用加互斥锁

如果有多个生产者和消费者, 通过slots的变化进来是一样的, 所以不用加互斥锁.

从这个练习可以看出, 缓冲区的长度本身也能起到锁的作用, 只要是一个二元变化的内容, 都可以起到互斥锁的作用.

信号量 – 读者/写者模型

这是一个利用互斥的模型, 一般用在一组并发的线程要访问共享对象的时候. 有些线程只读对象, 一些线程负责修改对象. 需要写的线程需要独占访问, 但读取不需要. 在现实中, 这个模型广泛的应用在订票等方面.

比如很多客户同时查看影院的座位, 在一个座位没有被订之前, 这个座位对所有读者都显示可用, 一旦一个人进入订票的程序, 另外一个人再想订票, 就要等待当前的人操作结束, 看是否订了票才行.

此外这个模型涉及到权限问题, 即究竟是读者优先还是写者优先. 如果读者优先, 即使有写者在等待, 也可以读. 如果写者优先, 一旦一个写者准备好, 就要让其写入, 之后再读.

CSAPP这里举了读者优先的例子. 先来看写者的代码.

int readcnt=0;
sem_t mutex=1, W=1;

void writer(void){
    while (1) {
        P(&W);
        do something;
        V(&W);
    }
}

写者的代码比较简单, 就是等待W信号, 进去再出来, 由于W初始化1, 所以同时只能有一个写者, 互斥的概念就是同时只能有一个. 看读者的代码:

void reader(void){
    while(1) {
        //读者进来的代码
        P(&mutex);
        readcnt++;
        if (readcnt == 1) {
            P(&W);
        }
        V(&mutex);

        do something;

        //读者干完活要离开的代码
        P(&mutex);
        readcnt--;
        if (readcnt == 0) {
            V(&W);
        }
        V(&mutex);
    }
}

读者的代码首先去加上mutex锁, 因为要访问readcnt, 不能被写改变. 之后的代码就微妙了. 在修改了readcnt++之后, 这个readcnt++其实是读者的数量, 如果只有一个读者, 读者就会获取W锁, 即不让写者进来.

之后读者干完活离开的时候, 要更新当前读者数量, 因此又要加mutex锁, 然后减少readcnt, 发现自己是最后一个走人的, 才会交还W锁.

这样的结果就是, 只要有一个读者没离开, 写者就永远没法写, 因为W锁在读者那里. 除此之外, 在读者和写者数量不平衡的情况下, 也会出问题.

练习 12.10 数量不平衡时候出现的问题

假如程序现在处于没有读者, 只有写者的状态, 只有一个读者, 大部分是写者. 当一个写者离开的时候, 会交还W锁, 然后重启一个等待W锁的线程. 从代码可以发现, 第一个读者和一群写者都在等待W锁. 由于数量上写者占据优势, 很可能又启动一个写线程.

这样不断循环下去, 可能很长时间读者都无法读出数据.

线程实际使用 – 预线程服务器

在之前多线程 echo server 中, 为每一个连接创建一个新的线程, 其实开销也不小. 而且一个进程能够同时开设的线程是有限的. 根据简单的测试, 基于每个连接一个线程的 tomcat 服务器在 500用户左右就开始出现问题.

那么要如何使用多线程来提高并行能力呢, 一般就是采取不特定对应的方式, 即采用一批线程进程干活, 主线程负责接受任务和分发.

看一下代码:

#include "csapp.h"
#include "sbuf.h"
#include "echo.h"

#define NTHREADS 4
#define SBUFSIZE 16

void echo_cnt(int connfd);
void *thread(void *vargp);

sbuf_t sbuf;

int main(int argc, char **argv){
    int i, listenfd, connfd;
    socklen_t clientlen;
    struct sockaddr_storage clientaddr;
    pthread_t tid;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    listenfd = Open_listenfd(argv[1]);

    //初始化工作缓冲区, 创建指定数量的线程
    sbuf_init(&sbuf, SBUFSIZE);
    for (i = 0; i < NTHREADS; i++) {
        Pthread_create(&tid, NULL, thread, NULL);
    }

    //主线程之后做的事情, 就是等待连接然后将已连接描述符放入缓冲区
    while(1){
        clientlen = sizeof(struct sockaddr_storage);
        connfd = Accept(listenfd, (SA *) &clientaddr, &clientlen);

        //使用编写好的库, 添加connfd到缓冲区中
        sbuf_insert(&sbuf, connfd);
    }
}

void *thread(void *vargp){
    //将自己变成分离线程
    Pthread_detach(pthread_self());
    //不断循环的等待从缓冲区拿描述符然后干活
    while (1) {
        int connfd = sbuf_remove(&sbuf);
        echo_cnt(connfd);
        Close(connfd);
    }
}

这里利用了之前编写的sbuf包, 这样主线程充当了生产者的角色, 按照指定数量产生的线程充当了消费者的角色.

这里还有一个小问题, 就是原来调用的echo函数有统计字节数量的功能. 现在这个函数会被很多线程可能同时调用, 也就是运行在多个线程中. 如果还需要正确的统计数量, 则可以考虑将其分离出来成为一个模块, 其中用全局变量来统计, 同样也需要用到锁.

#include "csapp.h"
#include "echo.h"

static int byte_cnt;
static sem_t mutex;

static void init_echo_cnt(void){
    Sem_init(&mutex, 0, 1);
    byte_cnt = 0;
}

void echo_cnt(int connfd){
    int n;
    char buf[MAXLINE];
    rio_t rio;
    static pthread_once_t once = PTHREAD_ONCE_INIT;

    Pthread_once(&once, init_echo_cnt);
    Rio_readinitb(&rio, connfd);
    while ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) {
        P(&mutex);
        byte_cnt += n;
        printf("Server received %d (%d total) bytes on fd %d", n, byte_cnt, connfd);
        V(&mutex);
        Rio_writen(connfd, buf, n);
    }
}

这个程序包首先将自己的信号量和全局变量通过static 声明变为自己可见. 然后使用了 Pthread_once 进行初始化, 其他的代码基本上和原来的相同.

这里实际上是多线程程序包的一般技术, 即先隐藏只供自己使用的信号量, 然后采用初始化技术, 这样可以有效的避免污染到主程序的全局信号量.

这个服务器本质上讲, 和I/O多路复用的事件驱动服务器很像, 工作线程一直在等待缓冲区上出现数据, 出现一个数据就相当于一个事件, 之后逻辑流就会在一个线程中执行. 这说明编写事件驱动服务器的另外一个方法就是使用信号量来等待数据.