操作系统导论-并发

并发:介绍

经典观点是一个程序只有一个执行点(一个程序计数器,用来存放要执行的指令),但多线程(multi-threaded)程序会有多个执行点(多个程序计数器,每个都用于取指令和执行)。每个线程类似于独立的进程,只有一点区别:它们共享地址空间,从而能够访问相同的数据。

单个线程的状态与进程状态非常类似。线程有一个程序计数器(PC),记录程序从哪里获取指令。每个线程有自己的一组用于计算的寄存器。所以,如果有两个线程运行在一个处理器上,从运行一个线程(T1)切换到另一个线程(T2)时,必定发生上下文切换(context switch)。线程之间的上下文切换类似于进程间的上下文切换。对于进程,我们将状态保存到进程控制块(Process Control Block,PCB)。现在,我们需要一个或多个线程控制块(Thread Control Block,TCB),保存每个线程的状态。但是,与进程相比,线程之间的上下文切换有一点主要区别:地址空间保持不变(即不需要切换当前使用的页表)。

线程和进程之间的另一个主要区别在于栈。在简单的传统进程地址空间模型 [我们现在可以称之为单线程(single-threaded)进程] 中,只有一个栈,通常位于地址空间的底部。
?
然而,在多线程的进程中,每个线程独立运行,当然可以调用各种例程来完成正在执行的任何工作。不是地址空间中只有一个栈,而是每个线程都有一个栈。

实例:线程创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <stdio.h>
#include <assert.h>
#include <pthread.h>

void *mythread(void *arg) {
printf("%s\n", (char *) arg);
return NULL;
}

int
main(int argc, char *argv[]) {
pthread_t p1, p2;
int rc;
printf("main: begin\n");
rc = pthread_create(&p1, NULL, mythread, "A"); assert(rc == 0);
rc = pthread_create(&p2, NULL, mythread, "B"); assert(rc == 0);
// join waits for the threads to finish
rc = pthread_join(p1, NULL); assert(rc == 0);
rc = pthread_join(p2, NULL); assert(rc == 0);
printf("main: end\n");
return 0;
}

?
这种排序不是唯一可能的顺序。实际上,给定一系列指令,有很多可能的
顺序,这取决于调度程序决定在给定时刻运行哪个线程。

?

即使先前创建了线程 1,如果调度程序决定先运行线程 2,没有理由认为先创建的线程先运行。
?

核心问题:不可控的调度

1
2
3
mov 0x8049a1c, %eax
add $0x1, %eax
mov %eax, 0x8049a1c

变量 counter 位于地址 0x8049a1c。在这 3 条指令中,先用 x86 的 mov指令,从内存地址处取出值,放入 eax。然后,给 eax 寄存器的值加 1(0x1)。最后,eax的值被存回内存中相同的地址。

设想我们的两个线程之一(线程 1)进入这个代码区域,并且因此将要增加一个计数器。它将 counter 的值(假设它这时是 50)加载到它的寄存器 eax 中。因此,线程 1 的 eax = 50。然后它向寄存器加 1,因此 eax = 51。现在,一件不幸的事情发生了:时钟中断发生。因此,操作系统将当前正在运行的线程(它的程序计数器、寄存器,包括 eax 等)的状态保存到线程的 TCB。

现在更糟的事发生了:线程 2 被选中运行,并进入同一段代码。它也执行了第一条指令,获取计数器的值并将其放入其 eax 中 [请记住:运行时每个线程都有自己的专用寄存器。上下文切换代码将寄存器虚拟化(virtualized),保存并恢复它们的值]。此时 counter 的值仍为 50,因此线程 2 的 eax = 50。假设线程 2 执行接下来的两条指令,将 eax 递增 1(因此 eax= 51),然后将 eax 的内容保存到 counter(地址 0x8049a1c)中。因此,全局变量 counter 现在的值是 51。

最后,又发生一次上下文切换,线程 1 恢复运行。还记得它已经执行过 mov 和 add 指令,现在准备执行最后一条 mov 指令。回忆一下,eax=51。因此,最后的 mov 指令执行,将值保存到内存,counter 再次被设置为 51。

简单来说,发生的情况是:增加 counter 的代码被执行两次,初始值为 50,但是结果为51。这个程序的“正确”版本应该导致变量 counter 等于 52。

由于执行这段代码的多个线程可能导致竞争状态,因此我们将此段代码称为临界区(critical section)。临界区是访问共享变量(或更一般地说,共享资源)的代码片段,一定不能由多个线程同时执行。
?

我们真正想要的代码就是所谓的互斥(mutual exclusion)。这个属性保证了如果一个线程在临界区内执行,其他线程将被阻止进入临界区。

原子性愿望

原子方式的意思是“作为一个单元”,当指令执行时,它会像期望那样执行更新。它不能在指令中间中断,因为这正是我们从硬件获得的保证:发生中断时,指令根本没有运行,或者运行完成,没有中间状态。

如果有一条指令来做到这一点,我们可以发出这条指令然后完事。但在一般情况下,不会有这样的指令。

因此,我们要做的是要求硬件提供一些有用的指令,可以在这些指令上构建一个通用的集合,即所谓的同步原语(synchronization primitive)。通过使用这些硬件同步原语,加上操作系统的一些帮助,我们将能够构建多线程代码,以同步和受控的方式访问临界区,从而可靠地产生正确的结果

补充:关键并发术语

临界区、竞态条件、不确定性、互斥执行

临界区(critical section)是访问共享资源的一段代码,资源通常是一个变量或数据结构。

竞态条件(race condition)出现在多个执行线程大致同时进入临界区时,它们都试图更新共享的数据结构,导致了令人惊讶的(也许是不希望的)结果。

不确定性(indeterminate)程序由一个或多个竞态条件组成,程序的输出因运行而异,具体取决于哪些线程在何时运行。这导致结果不是确定的(deterministic),而我们通常期望计算机系统给出确定的结果。

为了避免这些问题,线程应该使用某种互斥(mutual exclusion)原语。这样做可以保证只有一个线程进入临界区,从而避免出现竞态,并产生确定的程序输出。

还有一个问题:等待另一个线程

线程之间有一种交互,即访问共享变量,因此需要为临界区支持原子性。事实证明,还有另一种常见的交互,即一个线程在继续之前必须等待另一个线程完成某些操作。例如,当进程执行磁盘 I/O 并进入睡眠状态时,会产生这种交互。当 I/O 完成时,该进程需要从睡眠中唤醒,以便继续进行。


插叙:线程API

线程创建

1
2
3
4
5
6
7
8
#include <pthread.h>
int
pthread_create(
pthread_t * thread,
const pthread_attr_t * attr,
void * (*start_routine)(void*),
void * arg
);

第一个参数 thread 是指向 pthread_t 结构类型的指针,我们将利用这个结构与该线程交互,因此需要将它传入pthread_create(),以便将它初始化。

第二个参数 attr 用于指定该线程可能具有的任何属性。一些例子包括设置栈大小,或关于该线程调度优先级的信息。一个属性通过单独调用 pthread_attr_init()来初始化。在大多数情况下,默认值就行。在这个例子中,我们只需传入 NULL。

第三个参数最复杂,但它实际上只是问:这个线程应该在哪个函数中运行?在 C 中,我们把它称为一个函数指针(function pointer),这个指针告诉我们需要以下内容:一个函数名称(start_routine),它被传入一个类型为 void *的参数(start_routine 后面的括号表明了这一点),并且它返回一个 void *类型的值(即一个 void 指针)。

最后,第四个参数 arg 就是要传递给线程开始执行的函数的参数。你可能会问:为什么我们需要这些 void 指针?好吧,答案很简单:将 void 指针作为函数的参数 start_routine,允许我们传入任何类型的参数,将它作为返回值,允许线程返回任何类型的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include <pthread.h>

typedef struct myarg_t {
int a;
int b;
} myarg_t;

void *mythread(void *arg) {
myarg_t *m = (myarg_t *) arg;
printf("%d %d\n", m->a, m->b);
return NULL;
}

int
main(int argc, char *argv[]) {
pthread_t p;
int rc;
myarg_t args;
args.a = 10;
args.b = 20;
rc = pthread_create(&p, NULL, mythread, &args);
...
}

线程完成

如果你想等待线程完成,你必须调用函数 pthread_join()。

1
int pthread_join(pthread_t thread, void ** retval);

该函数有两个参数。第一个是 pthread_t 类型,用于指定要等待的线程。这个变量是由线程创建函数初始化的(当你将一个指针作为参数传递给 pthread_create()时)。如果你保留了它,就可以用它来等待该线程终止。

第二个参数是一个指针,指向你希望得到的返回值。因为函数可以返回任何东西,所以它被定义为返回一个指向 void 的指针。因为 pthread_join()函数改变了传入参数的值,所以你需要传入一个指向该值的指针,而不只是该值本身。

gcc -o test1 test1.c -pthread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <stdio.h>
#include <pthread.h>
#include <assert.h>
#include <stdlib.h>

typedef struct myarg_t {
int a;
int b;
} myarg_t;

typedef struct myret_t {
int x;
int y;
} myret_t;

void *mythread(void *arg) {
myarg_t *m = (myarg_t *) arg;
printf("%d %d\n", m->a, m->b);
myret_t *r = Malloc(sizeof(myret_t));
r->x = 1;
r->y = 2;
return (void *) r;
}

int
main(int argc, char *argv[]) {
int rc;
pthread_t p;
myret_t *m;

myarg_t args;
args.a = 10;
args.b = 20;
Pthread_create(&p, NULL, mythread, &args);
Pthread_join(p, (void **) &m);
printf("returned %d %d\n", m->x, m->y);
return 0;
}

我们应该注注,并非所有多线程代码都使用 join 函数。例如,多线程 Web 服务器可能会创建大量工作线程,然后使用主线程接受请求,并将其无限期地传递给工作线程。因此这样的长期程序可能不需要 join。然而,创建线程来(并行)执行特定任务的并行程序,很可能会使用 join 来确保在退出或进入下一阶段计算之前完成所有这些工作。

除了线程创建和 join 之外,POSIX 线程库提供的最有用的函数集,可能是通过锁(lock)来提供互斥进入临界区的那些函数。这方面最基本的一对函数是:

1
2
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

有一段代码是一个临界区,就需要通过锁来保护,以便像需要的那样运行。

1
2
3
4
pthread_mutex_t lock;
pthread_mutex_lock(&lock);
x = x + 1; // or whatever your critical section is
pthread_mutex_unlock(&lock);
  1. 第一个问题是缺乏正确的初始化(lack of proper initialization)。所有锁必须正确初始化,以确保它们具有正确的值,并在锁和解锁被调用时按照需要工作。

对于 POSIX 线程,有两种方法来初始化锁。一种方法是使用 PTHREAD_MUTEX_INITIALIZER,如下所示:

1
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

这样做会将锁设置为默认值,从而使锁可用。初始化的动态方法(即在运行时)是调用 pthread_mutex_init(),如下所示:

1
2
int rc = pthread_mutex_init(&lock, NULL);
assert(rc == 0); // always check success!

用完锁时,还应该相应地调用 pthread_mutex_destroy()

  1. 上述代码的第二个问题是在调用获取锁和释放锁时没有检查错误代码。
1
2
3
4
5
6
// Use this to keep your code clean but check for failures
// Only use if exiting program is OK upon failure
void Pthread_mutex_lock(pthread_mutex_t *mutex) {
int rc = pthread_mutex_lock(mutex);
assert(rc == 0);
}

获取锁和释放锁函数不是 pthread 与锁进行交互的仅有的函数。

1
2
3
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_timedlock(pthread_mutex_t *mutex,
struct timespec *abs_timeout);

这两个调用用于获取锁。如果锁已被占用,则 trylock 版本将失败。获取锁的 timedlock定版本会在超时或获取锁后返回,以先发生者为准。因此,具有零超时的 timedlock 退化为trylock 的情况。通常应避免使用这两种版本,但有些情况下,避免卡在(可能无限期的)获取锁的函数中会很有用。

条件变量

当线程之间必须发生某种信号时,如果一个线程在等待另一个线程继续执行某些操作,条件变量就很有用。希望以这种方式进行交互的程序使用两个主要函数:

1
2
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_signal(pthread_cond_t *cond);

要使用条件变量,必须另外有一个与此条件相关的锁。在调用上述任何一个函数时,应该持有这个锁。

第一个函数 pthread_cond_wait()使调用线程进入休眠状态,因此等待其他线程发出信号,通常当程序中的某些内容发生变化时,现在正在休眠的线程可能会关心它。典型的用法如下所示

1
2
3
4
5
6
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
Pthread_mutex_lock(&lock);
while (ready == 0)
Pthread_cond_wait(&cond, &lock);
Pthread_mutex_unlock(&lock);

在这段代码中,在初始化相关的锁和条件之后,一个线程检查变量 ready 是否已经被设置为零以外的值。如果没有,那么线程只是简单地调用等待函数以便休眠,直到其他线程唤醒它。

唤醒线程的代码运行在另外某个线程中,像下面这样:

1
2
3
4
Pthread_mutex_lock(&lock);
ready = 1;
Pthread_cond_signal(&cond);
Pthread_mutex_unlock(&lock);

首先,在发出信号时(以及修改全局变量 ready 时),我们始终确保持有锁。这确保我们不会在代码中注外引入竞态条件。

等待调用将锁作为其第二个参数,而信号调用仅需要一个条件。造成这种差异的原因在于,等待调用除了使调用线程进入睡眠状态外,还会让调用者睡眠时释放锁。想象一下,如果不是这样:其他线程如何获得锁并将其唤醒?但是,在被唤醒之后返回之前,pthread_cond_wait()会重新获取该锁,从而确保等待线程在等待序列开始时获取锁与结束时释放锁之间运行的任何时间,它持有锁。

编译和运行

本章所有代码很容易运行。代码需要包括头文件 pthread.h 才能编译。链接时需要 pthread库,增加-pthread 标记。
例如,要编译一个简单的多线程程序,只需像下面这样做:

1
gcc -o main main.c -Wall -pthread

补充:线程 API 指导
当你使用 POSIX 线程库(或者实际上,任何线程库)来构建多线程程序时,需要记住一些小而重要的事情:

保持简洁。最重要的一点,线程之间的锁和信号的代码应该尽可能简洁。复杂的线程交互容易产生缺陷。

让线程交互减到最少。尽量减少线程之间的交互。每次交互都应该想清楚,并用验证过的、正确的方法来实现(很多方法会在后续章节中学习)。

初始化锁和条件变量。未初始化的代码有时工作正常,有时失败,会产生奇怪的结果。

检查返回值。当然,任何 C 和 UNIX 的程序,都应该检查返回值,这里也是一样。否则会导致古怪而难以理解的行为,让你尖叫,或者痛苦地揪自己的头发。

注意传给线程的参数和返回值。具体来说,如果传递在栈上分配的变量的引用,可能就是在犯错误。

每个线程都有自己的栈。类似于上一条,记住每一个线程都有自己的栈。因此,线程局部变量应该是线程私有的,其他线程不应该访问。线程之间共享数据,值要在堆(heap)或者其他全局可访问的位置。

线程之间总是通过条件变量发送信号。切记不要用标记变量来同步。


我们希望原子式执行一系列指令,但由于单处理器上的中断(或者多个线程在多处理器上并发执行),我们做不到。

程序员在源代码中加锁,放在临界区周围,保证临界区能够像单条原子指令一样执行。

锁的基本思想

多个线程同时共享的资源——临界区:

1
balance = balance + 1;

加锁:

1
2
3
4
5
lock_t mutex; // some globally-allocated lock 'mutex'
...
lock(&mutex);
balance = balance + 1;
unlock(&mutex);

锁就是一个变量,因此我们需要声明一个某种类型的锁变量,才能使用。这个锁变量(简称锁)保存了锁在某一时刻的状态。它要么是可用的(available,或 unlocked,或 free),表示没有线程持有锁,要么是被占用的(acquired,或 locked,或 held),表示有一个线程持有锁,正处于临界区。

ock()和 unlock()函数的语义很简单。调用 lock()尝试获取锁,如果没有其他线程持有锁(即它是可用的),该线程会获得锁,进入临界区。这个线程有时被称为锁的持有者(owner)。

如果另外一个线程对相同的锁变量(本例中的 mutex)调用 lock(),因为锁被另一线程持有,该调用不会返回。这样,当持有锁的线程在临界区时,其他线程就无法进入临界区。

锁的持有者一旦调用 unlock(),锁就变成可用了。如果没有其他等待线程(即没有其他线程调用过 lock()并卡在那里),锁的状态就变成可用了。如果有等待线程(卡在 lock()里),其中一个会(最终)注意到(或收到通知)锁状态的变化,获取该锁,进入临界区。

Pthread 锁

POSIX 库将锁称为互斥量(mutex),因为它被用来提供线程之间的互斥。

下面的POSIX 线程代码,应该理解它和上面的代码段执行相同的任务

1
2
3
4
5
1 pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
2
3 Pthread_mutex_lock(&lock); // wrapper for pthread_mutex_lock()
4 balance = balance + 1;
5 Pthread_mutex_unlock(&lock);

POSIX 的 lock 和 unlock 函数会传入一个变量,因为我们可能用不
同的锁来保护不同的变量。这样可以增加并发。

实现一个锁

我们需要硬件和操作系统的帮助来实现一个可用的锁。近些年来,各种计算机体系结构的指令集都增加了一些不同的硬件原语。

评价锁

最基本的,锁是否有效,能够阻止多个线程进入临界区?
当锁可用时,是否每一个竞争线程有公平的机会抢到锁?
最后是性能(performance),具体来说,是使用锁之后增加的时间开销。

控制中断

最早提供的互斥解决方案之一,就是在临界区关闭中断。这个解决方案是为单处理器系统开发的。

1
2
3
4
5
6
1 void lock() {
2 DisableInterrupts();
3 }
4 void unlock() {
5 EnableInterrupts();
6 }

这种方法要求我们允许所有调用线程执行特权操作(打开关闭中断),即信任这种机制不会被滥用。

缺点:
第一,一个贪婪的程序可能在它开始时就调用 lock(),从而独占处理器。更糟的情况是,恶意程序调用 lock()后,一直死循环。后一种情况,系统无法重新获得控制,只能重启系统。关闭中断对应用要求太多,不太适合作为通用的同步解决方案。

第二,这种方案不支持多处理器。如果多个线程运行在不同的 CPU 上,每个线程都试图进入同一个临界区,关闭中断也没有作用。线程可以运行在其他处理器上,因此能够进入临界区。多处理器已经很普遍了,我们的通用解决方案需要更好一些。

第三,关闭中断导致中断丢失,可能会导致严重的系统问题。假如磁盘设备完成了读取请求,但 CPU 错失了这一事实,那么,操作系统如何知道去唤醒等待读取的进程?

最后一个不太重要的原因就是效率低。与正常指令执行相比,现代 CPU 对于关闭和打开中断的代码执行得较慢。

load 和 store(早期的硬件上,它们是原子的)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int flag[2];
int turn;
void init() {
flag[0] = flag[1] = 0; // 1->thread wants to grab lock
turn = 0; // whose turn? (thread 0 or 1?)
}
void lock() {
flag[self] = 1; // self: thread ID of caller
turn = 1 - self; // make it other thread's turn
while ((flag[1-self] == 1) && (turn == 1 - self))
; // spin-wait
}
void unlock() {
flag[self] = 0; // simply undo your intent
}

情况1:线程0想锁,线程1不想
lock函数内:flag[0]=1,turn=1
进入循环:flag[1]≠1,突破循环,成功锁上。

情况2:线程0,1都想锁
lock函数内,flag[0]=1,flag[1]=1,0、1进程肯定有一个先执行,0先时turn=1,进入循环,第二个条件满足,锁住。此时1执行到turn=0,进入循环条件满足,卡住,但是此时turn=0不满足0线程的循环条件,0线程突破。

情况3:线程0锁住将要解锁,线程1循环卡住
线程0解锁将flag=0,线程1不满足第一个条件,突破循环获得锁。

turn变量在两个线程竞争锁时起到了让给对方的作用。flag是在判断另一个线程是否也想要获取锁

测试并设置指令(原子交换)

因为关闭中断的方法无法工作在多处理器上,所以系统设计者开始让硬件支持锁。

最简单的硬件支持是测试并设置指令(test-and-set instruction),也叫作原子交换(atomic exchange)。

我们首先实现一个不依赖它的锁,用一个变量标记锁是否被持有。
在第一次尝试中,想法很简单:用一个变量来标志锁是否被某些线程占用。
第一个线程进入临界区,调用 lock(),检查标志是否为 1(这里不是 1),然后设置标志为 1,表明线程持有该锁。结束临界区时,线程调用 unlock(),清除标志,表示锁未被持有。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
1 typedef struct lock_t { int flag; } lock_t;
2
3 void init(lock_t *mutex) {
4 // 0 -> lock is available, 1 -> held
5 mutex->flag = 0;
6 }
7
8 void lock(lock_t *mutex) {
9 while (mutex->flag == 1) // TEST the flag
10 ; // spin-wait (do nothing)
11 mutex->flag = 1; // now SET it!
12 }
13
14 void unlock(lock_t *mutex) {
15 mutex->flag = 0;
16 }

当第一个线程正处于临界区时,如果另一个线程调用 lock(),它会在 while 循环中自旋等待(spin-wait),直到第一个线程调用 unlock()清空标志。然后等待的线程会退出 while 循环,设置标志,执行临界区代码。
?
在没有设置flag之前发生中断另一个线程也到达这一步
通过适时的中断,我们很容易构造出两个线程都将标志设置为 1,都能进入临界区的场景。这种行为就是专家所说的“不好”,我们显然没有满足最基本的要求:互斥。

性能问题(稍后会有更多讨论)主要是线程在等待已经被持有的锁时,采用了自旋等待(spin-waiting)的技术,就是不停地检查标志的值。

实现可用的自旋锁

尽管上面例子的想法很好,但没有硬件的支持是无法实现的(没办法保证用于互斥的语句为原子操作不可打断)。
在 SPARC上,这个指令叫 ldstub(load/store unsigned byte,加载/保存无符号字节);
在 x86 上,是 xchg(atomic exchange,原子交换)指令。但它们基本上在不同的平台上做同样的事,通常称为测试并设置指令(test-and-set)。

将测试(test 旧的锁值)和设置(set 新的值)合并为一个原子操作之后,我们保证了只有一个线程能获取锁。这就实现了一个有效的互斥原语!

1
2
3
4
5
1 int TestAndSet(int *old_ptr, int new) {
2 int old = *old_ptr; // fetch old value at old_ptr
3 *old_ptr = new; // store 'new' into old_ptr
4 return old; // return the old value
5 }

它返回 old_ptr 指向的旧值,同时更新为 new 的新值。
当然,关键是这些代码是原子地(atomically)执行。因为既可以测试旧值,又可以设置新值,所以我们把这条指令叫作“测试并设置”。这一条指令完全可以实现一个简单的自旋锁(spin lock)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1 typedef struct lock_t {
2 int flag;
3 } lock_t;
4
5 void init(lock_t *lock) {
6 // 0 indicates that lock is available, 1 that it is held
7 lock->flag = 0;
8 }
9
10 void lock(lock_t *lock) {
11 while (TestAndSet(&lock->flag, 1) == 1)
12 ; // spin-wait (do nothing)
13 }
14
15 void unlock(lock_t *lock) {
16 lock->flag = 0;
17 }

第二种场景是,当某一个线程已经持有锁(即 flag 为 1)。本线程调用 lock(),然后调用TestAndSet(flag, 1),这一次返回 1。只要另一个线程一直持有锁,TestAndSet()会重复返回 1,本线程会一直自旋。当 flag 终于被改为 0,本线程会调用 TestAndSet(),返回 0 并且原子地设置为 1,从而获得锁,进入临界区。

在单处理器上,需要抢占式的调度器(preemptive scheduler,即不断通过时钟中断一个线程,运行其他线程)。否则,自旋锁在单 CPU 上无法使用,因为一个自旋的线程永远不会放弃 CPU。

评价自旋锁

自旋锁一次只允许一个线程进入临界区。因此,这是正确的锁。

自旋的线程在竞争条件下可能会永远自旋。自旋锁没有公平性,可能会导致饿死。

在单 CPU 的情况下,性能开销相当大。假设一个线程持有锁进入临界区时被抢占。调度器可能会运行其他每一个线程(假设有 N−1 个这种线程)。而其他线程都在竞争锁,都会在放弃 CPU 之前,自旋一个时间片,浪费 CPU 周期。
但是,在多 CPU 上,自旋锁性能不错(如果线程数大致等于 CPU 数)。假设线程 A 在CPU 1,线程 B 在 CPU 2 竞争同一个锁。线程 A(CPU 1)占有锁时,线程 B 竞争锁就会自旋(在 CPU 2 上)。然而,临界区一般都很短,因此很快锁就可用,然后线程 B 获得锁。自旋等待其他处理器上的锁,并没有浪费很多 CPU 周期,因此效果不错。

比较并交换

某些系统提供了另一个硬件原语,即比较并交换指令(SPARC 系统中是 compare-and-swap,x86 系统是 compare-and-exchange)。

1
2
3
4
5
6
1 int CompareAndSwap(int *ptr, int expected, int new) {
2 int actual = *ptr;
3 if (actual == expected)
4 *ptr = new;
5 return actual;
6 }

比较并交换的基本思路是检测 ptr 指向的值是否和 expected 相等;如果是,更新 ptr 所指的值为新值。否则,什么也不做。不论哪种情况,都返回该内存地址的实际值,让调用者能够知道执行是否成功。

我们只要用下面的代码替换 lock()函数:

1
2
3
4
1 void lock(lock_t *lock) {
2 while (CompareAndSwap(&lock->flag, 0, 1) == 1)
3 ; // spin
4 }

链接的加载和条件式存储指令

一些平台提供了实现临界区的一对指令。例如 MIPS 架构[H93]中,链接的加载(load-linked)和条件式存储(store-conditional)可以用来配合使用,实现其他并发结构。

1
2
3
4
5
6
7
8
9
10
11
12
1 int LoadLinked(int *ptr) {
2 return *ptr;
3 }
4
5 int StoreConditional(int *ptr, int value) {
6 if (no one has updated *ptr since the LoadLinked to this address) {
7 *ptr = value;
8 return 1; // success!
9 } else {
10 return 0; // failed to update
11 }
12 }

链接的加载指令和典型加载指令类似,都是从内存中取出值存入一个寄存器。关键区别来自条件式存储(store-conditional)指令,只有上一次加载的地址在期间都没有更新时,才会成功,(同时更新刚才链接的加载的地址的值)。成功时,条件存储返回 1,并将 ptr 指的值更新为 value。失败时,返回 0,并且不会更新值。

1
2
3
4
5
6
7
8
9
10
11
12
13
1 void lock(lock_t *lock) {
2 while (1) {
3 while (LoadLinked(&lock->flag) == 1)
4 ; // spin until it's zero
5 if (StoreConditional(&lock->flag, 1) == 1)
6 return; // if set-it-to-1 was a success: all done
7 // otherwise: try it all over again
8 }
9 }
10
11 void unlock(lock_t *lock) {
12 lock->flag = 0;
13 }

获取并增加

它能原子地返回特定地址的旧值,并且让该值自增一。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1 int FetchAndAdd(int *ptr) {
2 int old = *ptr;
3 *ptr = old + 1;
4 return old;
5 }
1 typedef struct lock_t {
2 int ticket;
3 int turn;
4 } lock_t;
5
6 void lock_init(lock_t *lock) {
7 lock->ticket = 0;
8 lock->turn = 0;
9 }
10
11 void lock(lock_t *lock) {
12 int myturn = FetchAndAdd(&lock->ticket);
13 while (lock->turn != myturn)
14 ; // spin
15 }
16
17 void unlock(lock_t *lock) {
18 FetchAndAdd(&lock->turn);
19 }

如何让锁不会不必要地自旋,浪费 CPU 时间?

第一种简单友好的方法就是,在要自旋的时候,放弃 CPU。

1
2
3
4
5
6
7
8
9
10
11
12
1 void init() {
2 flag = 0;
3 }
4
5 void lock() {
6 while (TestAndSet(&flag, 1) == 1)
7 yield(); // give up the CPU
8 }
9
10 void unlock() {
11 flag = 0;
12 }

假定操作系统提供原语 yield(),线程可以调用它主动放弃 CPU,让其他线程运行。线程可以处于 3 种状态之一(运行、就绪和阻塞)。yield()系统调用能够让运行(running)态变为就绪(ready)态,从而允许其他线程运行。因此,让出线程本质上取消调度(deschedules)了它自己。

使用队列:休眠替代自旋

我们必须显式地施加某种控制,决定锁释放时,谁能抢到锁,我们需要操作系统的更多支持,并需要一个队列来保存等待锁的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1 typedef struct lock_t {
2 int flag;
3 int guard;
4 queue_t *q;
5 } lock_t;
6
7 void lock_init(lock_t *m) {
8 m->flag = 0;
9 m->guard = 0;
10 queue_init(m->q);
11 }
12
13 void lock(lock_t *m) {
14 while (TestAndSet(&m->guard, 1) == 1)
15 ; //acquire guard lock by spinning
16 if (m->flag == 0) {
17 m->flag = 1; // lock is acquired
18 m->guard = 0;
19 } else {
20 queue_add(m->q, gettid());
21 m->guard = 0;
22 park();
23 }
24 }
25
26 void unlock(lock_t *m) {
27 while (TestAndSet(&m->guard, 1) == 1)
28 ; //acquire guard lock by spinning
29 if (queue_empty(m->q))
30 m->flag = 0; // let go of lock; no one wants it
31 else
32 unpark(queue_remove(m->q)); // hold lock (for next thread!)
33 m->guard = 0;
34 }

基于锁的并发数据结构

并发计数器

定义了一个非并发的计数器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1 typedef struct counter_t {
2 int value;
3 } counter_t;
4
5 void init(counter_t *c) {
6 c->value = 0;
7 }
8
9 void increment(counter_t *c) {
10 c->value++;
11 }
12
13 void decrement(counter_t *c) {
14 c->value--;
15 }
16
17 int get(counter_t *c) {
18 return c->value;
19 }

如何让这段代码线程安全(thread safe)?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
1 typedef struct counter_t {
2 int value;
3 pthread_mutex_t lock;
4 } counter_t;
5
6 void init(counter_t *c) {
7 c->value = 0;
8 Pthread_mutex_init(&c->lock, NULL);
9 }
10
11 void increment(counter_t *c) {
12 Pthread_mutex_lock(&c->lock);
13 c->value++;
14 Pthread_mutex_unlock(&c->lock);
15 }
16
17 void decrement(counter_t *c) {
18 Pthread_mutex_lock(&c->lock);
19 c->value--;
20 Pthread_mutex_unlock(&c->lock);
21 }
22
23 int get(counter_t *c) {
24 Pthread_mutex_lock(&c->lock);
25 int rc = c->value;
26 Pthread_mutex_unlock(&c->lock);
27 return rc;
28 }

它只是加了一把锁,在调用函数操作该数据结构时获取锁,从调用返回时
释放锁。

并发链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
1 // basic node structure
2 typedef struct node_t {
3 int key;
4 struct node_t *next;
5 } node_t;
6
7 // basic list structure (one used per list)
8 typedef struct list_t {
9 node_t *head;
10 pthread_mutex_t lock;
11 } list_t;
12
13 void List_Init(list_t *L) {
14 L->head = NULL;
15 pthread_mutex_init(&L->lock, NULL);
16 }
17
18 int List_Insert(list_t *L, int key) {
19 pthread_mutex_lock(&L->lock);
20 node_t *new = malloc(sizeof(node_t));
21 if (new == NULL) {
22 perror("malloc");
23 pthread_mutex_unlock(&L->lock);
24 return -1; // fail
25 }
26 new->key = key;
27 new->next = L->head;
28 L->head = new;
29 pthread_mutex_unlock(&L->lock);
30 return 0; // success
31 }
32
33 int List_Lookup(list_t *L, int key) {
34 pthread_mutex_lock(&L->lock);
35 node_t *curr = L->head;
36 while (curr) {
37 if (curr->key == key) {
38 pthread_mutex_unlock(&L->lock);
39 return 0; // success
40 }
41 curr = curr->next;
42 }
43 pthread_mutex_unlock(&L->lock);
44 return -1; // failure
45 }

减少了代码中需要获取锁、释放锁的地方,降低了代码中不小心引入缺陷(诸如在返回前忘记释放锁)的可能性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1 void List_Init(list_t *L) {
2 L->head = NULL;
3 pthread_mutex_init(&L->lock, NULL);
4 }
5
6 void List_Insert(list_t *L, int key) {
7 // synchronization not needed
8 node_t *new = malloc(sizeof(node_t));
9 if (new == NULL) {
10 perror("malloc");
11 return;
12 }
13 new->key = key;
14
15 // just lock critical section
16 pthread_mutex_lock(&L->lock);
17 new->next = L->head;
18 L->head = new;
19 pthread_mutex_unlock(&L->lock);
20 }
21
22 int List_Lookup(list_t *L, int key) {
23 int rv = -1;
24 pthread_mutex_lock(&L->lock);
25 node_t *curr = L->head;
26 while (curr) {
27 if (curr->key == key) {
28 rv = 0;
29 break;
30 }
31 curr = curr->next;
32 }
33 pthread_mutex_unlock(&L->lock);
34 return rv; // now both success and failure
35 }

条件变量

锁并不是并发程序设计所需的唯一原语

在很多情况下,线程需要检查某一条件(condition)满足之后,才会继续运行。例如,父线程需要检查子线程是否执行完毕 [这常被称为 join()]。这种等待如何实现呢?

我们可以尝试用一个共享变量。这种解决方案一般能工作,但是效率低下,因为主线程会自旋检查,浪费 CPU 时间。我们希望有某种方式让父线程休眠,直到等待的条件满足(即子线程完成执行)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1 volatile int done = 0;
2
3 void *child(void *arg) {
4 printf("child\n");
5 done = 1;
6 return NULL;
7 }
8
9 int main(int argc, char *argv[]) {
10 printf("parent: begin\n");
11 pthread_t c;
12 Pthread_create(&c, NULL, child, NULL); // create child
13 while (done == 0)
14 ; // spin
15 printf("parent: end\n");
16 return 0;
17 }

定义和程序

线程可以使用条件变量(condition variable),来等待一个条件变成真。条件变量是一个显式队列,当某些执行状态(即条件,condition)不满足时,线程可以把自己加入队列,等待(waiting)该条件。当它改变了上述状态时,就可以唤醒一个或者多个等待线程(通过在该条件上发信号),让它们继续执行。

要声明这样的条件变量,只要像这样写:pthread_cond_t c;,这里声明 c 是一个条件变量(注意:还需要适当的初始化)。条件变量有两种相关操作:wait()和 signal()。线程要睡眠的时候,调用 wait()。当线程想唤醒等待在某个条件变量上的睡眠线程时,调用 signal()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m);
//&cond:条件变量(pthread_cond_t类型),用于等待和唤醒线程。
//&mutex:互斥锁(pthread_mutex_t类型),在进入睡眠状态之前会被自动释放,唤醒后会被重新获得。
pthread_cond_signal(pthread_cond_t *c);
1 int done = 0;
2 pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
3 pthread_cond_t c = PTHREAD_COND_INITIALIZER;
4
5 void thr_exit() {
6 Pthread_mutex_lock(&m);
7 done = 1;
8 Pthread_cond_signal(&c);
9 Pthread_mutex_unlock(&m);
10 }
11
12 void *child(void *arg) {
13 printf("child\n");
14 thr_exit();
15 return NULL;
16 }
17
18 void thr_join() {
19 Pthread_mutex_lock(&m);
20 while (done == 0)
21 Pthread_cond_wait(&c, &m);
22 Pthread_mutex_unlock(&m);
23 }
24
25 int main(int argc, char *argv[]) {
26 printf("parent: begin\n");
27 pthread_t p;
28 Pthread_create(&p, NULL, child, NULL);
29 thr_join();
30 printf("parent: end\n");
31 return 0;
32 }

我们常简称为 wait()和 signal()。你可能注意到一点,wait()调用有一个参数,它是互斥量。它假定在 wait()调用时,这个互斥量是已上锁状态。wait()的职责是释放锁,并让调用线程休眠(原子地)。当线程被唤醒时(在另外某个线程发信号给它后),它必须重新获取锁,再返回调用者。 这样复杂的步骤也是为了避免在线程陷入休眠时,产生一些竞态条件。

第一种情况是父线程创建出子线程,但自己继续运行(假设只有一个处理器),然后马上调用 thr_join()等待子线程。在这种情况下,它会先获取锁,检查子进程是否完成(还没有完成),然后调用 wait(),让自己休眠。子线程最终得以运行,打印出“child”,并调用 thr_exit()函数唤醒父进程,这段代码会在获得锁后设置状态变量 done,然后向父线程发信号唤醒它。最后,父线程会运行(从 wait()调用返回并持有锁),释放锁,打印出“parent:end”。

第二种情况是,子线程在创建后,立刻运行,设置变量 done 为 1,调用 signal 函数唤醒其他线程(这里没有其他线程),然后结束。父线程运行后,调用 thr_join()时,发现 done已经是 1 了,就直接返回。

1
2
3
4
5
6
7
8
9
10
11
1 void thr_exit() {
2 Pthread_mutex_lock(&m);
3 Pthread_cond_signal(&c);
4 Pthread_mutex_unlock(&m);
5 }
6
7 void thr_join() {
8 Pthread_mutex_lock(&m);
9 Pthread_cond_wait(&c, &m);
10 Pthread_mutex_unlock(&m);
11 }

这段代码是有问题的。假设子线程立刻运行,并且调用 thr_exit()。在这种情况下,子线程发送信号,但此时却没有在条件变量上睡眠等待的线程。父线程运行时,就会调用 wait并卡在那里,没有其他线程会唤醒它。

1
2
3
4
5
6
7
8
9
1 void thr_exit() {
2 done = 1;
3 Pthread_cond_signal(&c);
4 }
5
6 void thr_join() {
7 if (done == 0)
8 Pthread_cond_wait(&c);
9 }

这里的问题是一个微妙的竞态条件。具体来说,如果父进程调用 thr_join(),然后检查完done 的值为 0,然后试图睡眠。但在调用 wait 进入睡眠之前,父进程被中断。子线程修改变量 done 为 1,发出信号,同样没有等待线程。父线程再次运行时,就会长眠不醒,这就惨了。

提示:发信号时总是持有锁
尽管并不是所有情况下都严格需要,但有效且简单的做法,还是在使用条件变量发送信号时持有锁。虽然上面的例子是必须加锁的情况,但也有一些情况可以不加锁,而这可能是你应该避免的。因此,为了简单,请在调用 signal 时持有锁(hold the lock when calling signal)。
这个提示的反面,即调用 wait 时持有锁,不只是建议,而是 wait 的语义强制要求的。因为 wait 调用总是假设你调用它时已经持有锁、调用者睡眠之前会释放锁以及返回前重新持有锁。因此,这个提示的一般化形式是正确的:调用 signal 和 wait 时要持有锁(hold the lock when calling signal or wait),你会保持身心健康的。

条件变量和上述代码中的done

条件变量:用于通知父进程,子进程中的条件满足了
done:用于控制父子进程中代码的运行行数(if卡住某行,while在卡住某行时避免因为父子顺序而导致线程僵住。)

生产者/消费者(有界缓冲区)问题

假设有一个或多个生产者线程和一个或多个消费者线程。生产者把生成的数据项放入缓冲区;消费者从缓冲区取走数据项,以某种方式消费。

  1. 很多实际的系统中都会有这种场景。例如,在多线程的网络服务器中,一个生产者将HTTP 请求放入工作队列(即有界缓冲区),消费线程从队列中取走请求并处理。

  2. 我们在使用管道连接不同程序的输出和输入时,也会使用有界缓冲区,例如 grep foofile.txt | wc -l。这个例子并发执行了两个进程,grep 进程从 file.txt 中查找包括“foo”的行,写到标准输出;UNIX shell 把输出重定向到管道(通过 pipe 系统调用创建)。管道的另一端是 wc 进程的标准输入,wc 统计完行数后打印出结果。因此,grep 进程是生产者,wc 是进程是消费者,它们之间是内核中的有界缓冲区。

因为有界缓冲区是共享资源,所以我们必须通过同步机制来访问它,以免产生竞态条件。

首先需要一个共享缓冲区,让生产者放入数据,消费者取出数据。简单起见,我们就拿一个整数来做缓冲区(你当然可以想到用一个指向数据结构的指针来代替),两个内部函数将值放入缓冲区,从缓冲区取值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1 int buffer;
2 int count = 0; // initially, empty
3
4 void put(int value) {
5 assert(count == 0);
6 count = 1;
7 buffer = value;
8 }
9
10 int get() {
11 assert(count == 1);
12 count = 0;
13 return buffer;
14 }

put()函数会假设缓冲区是空的,把一个值存在缓冲区,然后把 count设置为 1 表示缓冲区满了。get()函数刚好相反,把缓冲区清空后(即将 count 设置为 0),并返回该值。

现在我们需要编写一些函数,知道何时可以访问缓冲区,以便将数据放入缓冲区或从缓冲区取出数据。条件是显而易见的:仅在 count 为 0 时(即缓冲器为空时),才将数据放入缓冲器中。仅在计数为 1 时(即缓冲器已满时),才从缓冲器获得数据。如果我们编写同步代码,让生产者将数据放入已满的缓冲区,或消费者从空的数据获取数据,就做错了(在这段代码中,断言将触发)。

这项工作将由两种类型的线程完成,其中一类我们称之为生产者(producer)线程,另一类我们称之为消费者(consumer)线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1 void *producer(void *arg) {
2 int i;
3 int loops = (int) arg;
4 for (i = 0; i < loops; i++) {
5 put(i);
6 }
7 }
8
9 void *consumer(void *arg) {
10 int i;
11 while (1) {
12 int tmp = get();
13 printf("%d\n", tmp);
14 }
15 }

假设只有一个生产者和一个消费者。显然,put()和 get()函数之中会有临界区,因为 put()更新缓冲区,get()读取缓冲区。但是,给代码加锁没有用,我们还需别的东西。不奇怪,别的东西就是某些条件变量。在这个(有问题的)首次尝试中(见图 30.6),我们用了条件变量 cond 和相关的锁 mutex。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1 cond_t cond;
2 mutex_t mutex;
3
4 void *producer(void *arg) {
5 int i;
6 for (i = 0; i < loops; i++) {
7 Pthread_mutex_lock(&mutex); // p1
8 if (count == 1) // p2
9 Pthread_cond_wait(&cond, &mutex); // p3
10 put(i); // p4
11 Pthread_cond_signal(&cond); // p5
12 Pthread_mutex_unlock(&mutex); // p6
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 Pthread_mutex_lock(&mutex); // c1
20 if (count == 0) // c2
21 Pthread_cond_wait(&cond, &mutex); // c3
22 int tmp = get(); // c4
23 Pthread_cond_signal(&cond); // c5
24 Pthread_mutex_unlock(&mutex); // c6
25 printf("%d\n", tmp);
26 }
27 }

我们来理解第一个问题,它与等待之前的 if 语句有关。假设有两个消费者(Tc1 和 Tc2),一个生产者(Tp)。首先,一个消费者(Tc1)先开始执行,它获得锁(c1),检查缓冲区是否可以消费(c2),然后等待(c3)(这会释放锁)。

接着生产者(Tp)运行。它获取锁(p1),检查缓冲区是否满(p2),发现没满就给缓冲区加入一个数字(p4)。然后生产者发出信号,说缓冲区已满(p5)。关键的是,这让第一个消费者(Tc1)不再睡在条件变量上,进入就绪队列。Tc1 现在可以运行(但还未运行)。生产者继续执行,直到发现缓冲区满后睡眠(p6,p1-p3)。

这时问题发生了:另一个消费者(Tc2)抢先执行,消费了缓冲区中的值(c1,c2,c4,c5,c6,跳过了 c3 的等待,因为缓冲区是满的)。现在假设 Tc1 运行,在从 wait 返回之前,它获取了锁,然后返回。然后它调用了 get() (p4),但缓冲区已无法消费!断言触发,代码不能像预期那样工作。显然,我们应该设法阻止 Tc1 去消费,因为 Tc2 插进来,消费了缓冲区中之前生产的一个值。
?
?

问题产生的原因很简单:在 Tc1 被生产者唤醒后,但在它运行之前,缓冲区的状态改变了(由于 Tc2)。发信号给线程只是唤醒它们,暗示状态发生了变化(在这个例子中,就是值已被放入缓冲区),但并不会保证在它运行之前状态一直是期望的情况。信号的这种释义常称为 Mesa 语义(Mesa semantic),为了纪念以这种方式建立条件变量的首次研究[LR80]。另一种释义是 Hoare 语义(Hoare semantic),虽然实现难度大,但是会保证被唤醒线程立刻执行[H74]。实际上,几乎所有系统都采用了 Mesa 语义。

较好但仍有问题的方案:使用 While 语句替代 If

把 if 语句改为 while。当消费者 Tc1 被唤醒后,立刻再次检查共享变量(c2)。如果缓冲区此时为空,消费者就会回去继续睡眠(c3)。生产者中相应的 if 也改为 while(p2)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
1 cond_t cond;
2 mutex_t mutex;
3
4 void *producer(void *arg) {
5 int i;
6 for (i = 0; i < loops; i++) {
7 Pthread_mutex_lock(&mutex);
// p1
8 while (count == 1)
// p2
9 Pthread_cond_wait(&cond, &mutex);
// p3
10 put(i);
// p4
11 Pthread_cond_signal(&cond);
// p5
12 Pthread_mutex_unlock(&mutex);
// p6
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 Pthread_mutex_lock(&mutex);
// c1
20 while (count == 0)
// c2
21 Pthread_cond_wait(&cond, &mutex);
// c3
22 int tmp = get();
// c4
23 Pthread_cond_signal(&cond);
// c5
24 Pthread_mutex_unlock(&mutex);
// c6
25 printf("%d\n", tmp);
26 }
27 }

由于 Mesa 语义,我们要记住一条关于条件变量的简单规则:总是使用 while 循环(always use while loop)。虽然有时候不需要重新检查条件,但这样做总是安全的。

都睡眠了怎么办?

假设两个消费者(Tc1 和 Tc2)先运行,都睡眠了(c3)。生产者开始运行,在缓冲区放入一个值,唤醒了一个消费者(假定是 Tc1),并开始睡眠。现在是一个消费者马上要运行(Tc1),两个线程(Tc2 和 生产者)都等待在同一个条件变量上。

消费者 Tc1 醒过来并从 wait()调用返回(c3),重新检查条件(c2),发现缓冲区是满的,消费了这个值(c4)。这个消费者然后在该条件上发信号(c5),唤醒一个在睡眠的线程。但是,应该唤醒哪个线程呢?

因为消费者已经清空了缓冲区,很显然,应该唤醒生产者。但是,如果它唤醒了 Tc2(这绝对是可能的,取决于等待队列是如何管理的),问题就出现了。具体来说,消费者 Tc2 会醒过来,发现队列为空(c2),又继续回去睡眠(c3)。生产者 Tp 刚才在缓冲区中放了一个值,现在在睡眠。另一个消费者线程 Tc1 也回去睡眠了。3 个线程都在睡眠,显然是一个缺陷。

?
?

消费者不应该唤醒消费者,而应该只唤醒生产者,反之亦然。

单值缓冲区的生产者/消费者方案

使用两个条件变量,而不是一个,以便正确地发出信号,在系统状态改变时,哪类线程应该唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1 cond_t empty, fill;
2 mutex_t mutex;
3
4 void *producer(void *arg) {
5 int i;
6 for (i = 0; i < loops; i++) {
7 Pthread_mutex_lock(&mutex);
8 while (count == 1)
9 Pthread_cond_wait(&empty, &mutex);
10 put(i);
11 Pthread_cond_signal(&fill);
12 Pthread_mutex_unlock(&mutex);
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 Pthread_mutex_lock(&mutex);
20 while (count == 0)
21 Pthread_cond_wait(&fill, &mutex);
22 int tmp = get();
23 Pthread_cond_signal(&empty);
24 Pthread_mutex_unlock(&mutex);
25 printf("%d\n", tmp);
26 }
27 }

在上述代码中,生产者线程等待条件变量 empty,发信号给变量 fill。相应地,消费者线程等待 fill,发信号给 empty。这样做,从设计上避免了上述第二个问题:消费者再也不会唤醒消费者,生产者也不会唤醒生产者。

最终的生产者/消费者方案

第一处修改是缓冲区结构本身,以及对应的 put()和 get()方法(见图 30.9)。我们还稍稍修改了生产者和消费者的检查条件,以便决定是否要睡眠。图 30.10 展示了最终的等待和信号逻辑。生产者只有在缓冲区满了的时候才会睡眠(p2),消费者也只有在队列为空的时候睡眠(c2)。至此,我们解决了生产者/消费者问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
1 int buffer[MAX];
2 int fill = 0;
3 int use = 0;
4 int count = 0;
5
6 void put(int value) {
7 buffer[fill] = value;
8 fill = (fill + 1) % MAX;
9 count++;
10 }
11
12 int get() {
13 int tmp = buffer[use];
14 use = (use + 1) % MAX;
15 count--;
16 return tmp;
17 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
1 cond_t empty, fill;
2 mutex_t mutex;
3
4 void *producer(void *arg) {
5 int i;
6 for (i = 0; i < loops; i++) {
7 Pthread_mutex_lock(&mutex);
// p1
8 while (count == MAX)
// p2
9 Pthread_cond_wait(&empty, &mutex);
// p3
10 put(i);
// p4
11 Pthread_cond_signal(&fill);
// p5
12 Pthread_mutex_unlock(&mutex);
// p6
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 Pthread_mutex_lock(&mutex);
// c1
20 while (count == 0)
// c2
21 Pthread_cond_wait(&fill, &mutex);
// c3
22 int tmp = get();
// c4
23 Pthread_cond_signal(&empty);
// c5
24 Pthread_mutex_unlock(&mutex);
// c6
25 printf("%d\n", tmp);
26 }
27 }

覆盖条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
1 // how many bytes of the heap are free?
2 int bytesLeft = MAX_HEAP_SIZE;
3
4 // need lock and condition too
5 cond_t c;
6 mutex_t m;
7
8 void *
9 allocate(int size) {
10 Pthread_mutex_lock(&m);
11 while (bytesLeft < size)
12 Pthread_cond_wait(&c, &m);
13 void *ptr = ...; // get mem from heap
14 bytesLeft -= size;
15 Pthread_mutex_unlock(&m);
16 return ptr;
17 }
18
19 void free(void *ptr, int size) {
20 Pthread_mutex_lock(&m);
21 bytesLeft += size;
22 Pthread_cond_signal(&c); // whom to signal??
23 Pthread_mutex_unlock(&m);
24 }

当线程调用进入内存分配代码时,它可能会因为内存不足而等待。
相应的,线程释放内存时,会发信号说有更多内存空闲。但是,代码中有一个问题:应该唤醒哪个等待线程(可能有多个线程)?

解决方案也很直接:用 pthread_cond_broadcast()代替上述代码中的pthread_cond_signal(),唤醒所有的等待线程。

Lampson 和 Redell 把这种条件变量叫作覆盖条件(covering condition),因为它能覆盖所有需要唤醒线程的场景(保守策略)。成本如上所述,就是太多线程被唤醒。

一般来说,如果你发现程序只有改成广播信号时才能工作(但你认为不需要),可能是程序有缺陷。

小结

引入锁之外的另一个重要同步原语:条件变量。当某些程序状态不符合要求时,通过允许线程进入休眠状态,条件变量使我们能够漂亮地解决许多重要的同步问题,包括著名的(仍然重要的)生产者/消费者问题,以及覆盖条件。

信号量

Dijkstra 及其同事发明了信号量,作为与同步有关的所有工作的唯一原语。可以使用信号量作为锁和条件变量。

信号量的定义

信号量是有一个整数值的对象,可以用两个函数来操作它。在 POSIX 标准中,是sem_wait()和 sem_post()。因为信号量的初始值能够决定其行为,所以首先要初始化信号量,才能调用其他函数与之交互。

初始化信号量

1
2
3
1 #include <semaphore.h>
2 sem_t s;
3 sem_init(&s, 0, 1);

历史上,sem_wait()开始被 Dijkstra 称为 P()(代指荷兰语单词“to probe”),而 sem_post()被称为 V()(代指荷兰语单词“to test”)。有时候,人们也会称它们为下(down)和上(up)。

其中申明了一个信号量 s,通过第三个参数,将它的值初始化为 1。sem_init()的第二个参数,在我们看到的所有例子中都设置为 0,表示信号量是在同一进程的多个线程共享的。

信号量初始化之后,我们可以调用 sem_wait()或 sem_post()与之交互。多个线程会调用 sem_wait()和 sem_post(),显然需要管理这些临界区。

1
2
3
4
5
6
7
8
9
1 int sem_wait(sem_t *s) {
2 decrement the value of semaphore s by one
3 wait if value of semaphore s is negative
4 }
5
6 int sem_post(sem_t *s) {
7 increment the value of semaphore s by one
8 if there are one or more threads waiting, wake one
9 }

首先,sem_wait()要么立刻返回(调用 sem_wait()时,信号量的值大于等于 1),要么会让调用线程挂起,直到之后的一个 post 操作。当然,也可能多个调用线程都调用 sem_wait(),因此都在队列中等待被唤醒。

其次,sem_post()并没有等待某些条件满足。它直接增加信号量的值,如果有等待线程,唤醒其中一个。

最后,当信号量的值为负数时,这个值就是等待线程的个数。

二值信号量(锁)

信号量的第一种用法是我们已经熟悉的:用信号量作为锁。
我们直接把临界区用一对 sem_wait()/sem_post()环绕。但是,为了使这段代码正常工作,信号量 m 的初始值(图中初始化为 X)是至关重要的。X 应该是1.

1
2
3
4
5
6
1 sem_t m;
2 sem_init(&m, 0, X); // initialize semaphore to X; what should X be?
3
4 sem_wait(&m);
5 // critical section here
6 sem_post(&m);

假设有两个线程的场景。第一个线程(线程 0)调用了 sem_wait(),它把信号量的值减为 0。然后,它只会在值小于 0 时等待。因为值是 0,调用线程从函数返回并继续,线程 0 现在可以自由进入临界区。线程 0 在临界区中,如果没有其他线程尝试获取锁,当它调用 sem_post()时,会将信号量重置为 1(因为没有等待线程,不会唤醒其他线程)。

?

如果线程 0 持有锁(即调用了 sem_wait()之后,调用 sem_post()之前),另一个线程(线程 1)调用 sem_wait()尝试进入临界区,那么更有趣的情况就发生了。这种情况下,线程 1把信号量减为−1,然后等待(自己睡眠,放弃处理器)。线程 0 再次运行,它最终调用sem_post(),将信号量的值增加到 0,唤醒等待的线程(线程 1),然后线程 1 就可以获取锁。线程 1 执行结束时,再次增加信号量的值,将它恢复为 1。

当线程 1 尝试获取已经被持有的锁时,陷入睡眠。只有线程 0 再次运行之后,线程 1 才可能会唤醒并继续运行。
?

因为锁只有两个状态(持有和没持有),所以这种用法有时也叫作二值信号量(binary semaphore)。

信号量用作条件变量

信号量也可以用在一个线程暂停执行,等待某一条件成立的场景。

例如,一个线程要等待一个链表非空,然后才能删除一个元素。在这种场景下,通常一个线程等待条件成立,另外一个线程修改条件并发信号给等待线程,从而唤醒等待线程。因为等待线程在等待某些条件(condition)发生变化,所以我们将信号量作为条件变量(condition variable)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1 sem_t s;
2
3 void *
4 child(void *arg) {
5 printf("child\n");
6 sem_post(&s); // signal here: child is done
7 return NULL;
8 }
9
10 int
11 main(int argc, char *argv[]) {
12 sem_init(&s, 0, X); // what should X be?
13 printf("parent: begin\n");
14 pthread_t c;
15 Pthread_create(c, NULL, child, NULL);
16 sem_wait(&s); // wait here for child
17 printf("parent: end\n");
18 return 0;
19 }

信号量初始值应该是 0。有两种情况需要考虑。第一种,父线程创建了子线程,但是子线程并没有运行。这种情况下(见表 31.3),父线程调用 sem_wait()会先于子线程调用 sem_post()。我们希望父线程等待子线程运行。为此,唯一的办法是让信号量的值不大于 0。因此,0 为初值。父线程运行,将信号量减为−1,然后睡眠等待;子线程运行的时候,调用 sem_post(),信号量增加为 0,唤醒父线程,父线程然后从 sem_wait()返回,完成该程序。

?

第二种情况是子线程在父线程调用 sem_wait()之前就运行结束(见表 31.4)。在这种情况下,子线程会先调用 sem_post(),将信号量从 0 增加到 1。然后当父线程有机会运行时,会调用sem_wait(),发现信号量的值为 1。于是父线程将信号量从 1 减为 0,没有等待,直接从sem_wait()返回,也达到了预期效果。

?

生产者/消费者(有界缓冲区)问题

第一次尝试

第一次尝试解决该问题时,我们用两个信号量 empty 和 full 分别表示缓冲区空或者满。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1 int buffer[MAX];
2 int fill = 0;
3 int use = 0;
4
5 void put(int value) {
6 buffer[fill] = value; // line f1
7 fill = (fill + 1) % MAX; // line f2
8 }
9
10 int get() {
11 int tmp = buffer[use]; // line g1
12 use = (use + 1) % MAX; // line g2
13 return tmp;
14 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1 sem_t empty;
2 sem_t full;
3
4 void *producer(void *arg) {
5 int i;
6 for (i = 0; i < loops; i++) {
7 sem_wait(&empty);
// line P1
8 put(i);
// line P2
9 sem_post(&full);
// line P3
10 }
11 }
12
13 void *consumer(void *arg) {
14 int i, tmp = 0;
15 while (tmp != -1) {
16 sem_wait(&full);
// line C1
17 tmp = get();
// line C2
18 sem_post(&empty);
// line C3
19 printf("%d\n", tmp);
20 }
21 }
22
23 int main(int argc, char *argv[]) {
24 // ...
25 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
26 sem_init(&full, 0, 0); // ... and 0 are full
27 // ...
28 }

假设 MAX=1
假设有两个线程,一个生产者和一个消费者。我们来看在一个 CPU 上的具体场景。消费者先运行,执行到 C1 行,调用 sem_wait(&full)。因为 full 初始值为 0,wait 调用会将 full减为−1,导致消费者睡眠,等待另一个线程调用 sem_post(&full),符合预期。

假设生产者然后运行。执行到 P1 行,调用 sem_wait(&empty)。不像消费者,生产者将继续执行,因为 empty 被初始化为 MAX(在这里是 1)。因此,empty 被减为 0,生产者向缓冲区中加入数据,然后执行 P3 行,调用 sem_post(&full),把 full 从−1 变成 0,唤醒消费者(即将它从阻塞变成就续)。

在这种情况下,可能会有两种情况。如果生产者继续执行,再次循环到 P1 行,由于 empty值为 0,它会阻塞。如果生产者被中断,而消费者开始执行,调用 sem_wait(&full)(c1 行),发现缓冲区确实满了,消费它。这两种情况都是符合预期的。

假设两个生产者(Pa 和 Pb)几乎同时调用 put()。当 Pa 先运行,在 f1 行先加入第一条数据(fill=0),假设 Pa 在将 fill 计数器更新为 1 之前被中断,Pb开始运行,也在 f1 行给缓冲区的 0 位置加入一条数据,这意味着那里的老数据被覆盖!这可不行,我们不能让生产者的数据丢失。

解决方案:增加互斥

向缓冲区加入元素和增加缓冲区的索引是临界区,需要小心保护起来。所以,我们使用二值信号量来增加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1 sem_t empty;
2 sem_t full;
3 sem_t mutex;
4
5 void *producer(void *arg) {
6 int i;
7 for (i = 0; i < loops; i++) {
8 sem_wait(&mutex);
// line p0 (NEW LINE)
9 sem_wait(&empty);
// line p1
10 put(i);
// line p2
11 sem_post(&full);
// line p3
12 sem_post(&mutex);
// line p4 (NEW LINE)
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 sem_wait(&mutex);
// line c0 (NEW LINE)
20 sem_wait(&full);
// line c1
21 int tmp = get();
// line c2
22 sem_post(&empty);
// line c3
23 sem_post(&mutex);
// line c4 (NEW LINE)
24 printf("%d\n", tmp);
25 }
26 }
27
28 int main(int argc, char *argv[]) {
29 // ...
30 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
31 sem_init(&full, 0, 0); // ... and 0 are full
32 sem_init(&mutex, 0, 1); // mutex=1 because it is a lock (NEW LINE)
33 // ...
34 }

为什么会发生死锁?

避免死锁

假设有两个线程,一个生产者和一个消费者。消费者首先运行,获得锁(c0 行),然后对 full 信号量执行 sem_wait() (c1 行)。因为还没有数据,所以消费者阻塞,让出 CPU。但是,重要的是,此时消费者仍然持有锁。

然后生产者运行。假如生产者能够运行,它就能生产数据并唤醒消费者线程。遗憾的是,它首先对二值互斥信号量调用 sem_wait()(p0 行)。锁已经被持有,因此生产者也被卡住。

这里出现了一个循环等待。消费者持有互斥量,等待在 full 信号量上。生产者可以发送full 信号,却在等待互斥量。因此,生产者和消费者互相等待对方——典型的死锁。

最后,可行的方案

把获取和释放互斥量的操作调整为紧挨着临界区,把 full、empty 的唤醒和等待操作调整到锁外面。结果得到了简单而有效的有界缓冲区,多线程程序的常用模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
1 sem_t empty;
2 sem_t full;
3 sem_t mutex;
4
5 void *producer(void *arg) {
6 int i;
7 for (i = 0; i < loops; i++) {
8 sem_wait(&empty);
// line p1
9 sem_wait(&mutex);
// line p1.5 (MOVED MUTEX HERE...)
10 put(i);
// line p2
11 sem_post(&mutex);
// line p2.5 (... AND HERE)
12 sem_post(&full);
// line p3
13 }
14 }
15
16 void *consumer(void *arg) {
17 int i;
18 for (i = 0; i < loops; i++) {
19 sem_wait(&full);
// line c1
20 sem_wait(&mutex);
// line c1.5 (MOVED MUTEX HERE...)
21 int tmp = get();
// line c2
22 sem_post(&mutex);
// line c2.5 (... AND HERE)
23 sem_post(&empty);
// line c3
24 printf("%d\n", tmp);
25 }
26 }
27
28 int main(int argc, char *argv[]) {
29 // ...
30 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with...
31 sem_init(&full, 0, 0); // ... and 0 are full
32 sem_init(&mutex, 0, 1); // mutex=1 because it is a lock
33 // ...
34 }

读者—写者锁

不同的数据结构访问可能需要不同类型的锁。

例如,一个并发链表有很多插入和查找操作。插入操作会修改链表的状态(因此传统的临界区有用),而查找操作只是读取该结构,只要没有进行插入操作,我们可以并发的执行多个查找操作。

如果某个线程要更新数据结构,需要调用 rwlock_acquire_lock()获得写锁,调用 rwlock_release_writelock()释放锁。内部通过一个 writelock 的信号量保证只有一个写者能获得锁进入临界区,从而更新数据结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
1 typedef struct _rwlock_t {
2 sem_t lock; // binary semaphore (basic lock)
3 sem_t writelock; // used to allow ONE writer or MANY readers
4 int readers; // count of readers reading in critical section
5 } rwlock_t;
6
7 void rwlock_init(rwlock_t *rw) {
8 rw->readers = 0;
9 sem_init(&rw->lock, 0, 1);
10 sem_init(&rw->writelock, 0, 1);
11 }
12
13 void rwlock_acquire_readlock(rwlock_t *rw) {
14 sem_wait(&rw->lock);
15 rw->readers++;
16 if (rw->readers == 1)
17 sem_wait(&rw->writelock); // first reader acquires writelock
18 sem_post(&rw->lock);
19 }
20
21 void rwlock_release_readlock(rwlock_t *rw) {
22 sem_wait(&rw->lock);
23 rw->readers--;
24 if (rw->readers == 0)
25 sem_post(&rw->writelock); // last reader releases writelock
26 sem_post(&rw->lock);
27 }
28
29 void rwlock_acquire_writelock(rwlock_t *rw) {
30 sem_wait(&rw->writelock);
31 }
32
33 void rwlock_release_writelock(rwlock_t *rw) {
34 sem_post(&rw->writelock);
35 }

获取读锁时,读者首先要获取 lock,然后增加 reader变量,追踪目前有多少个读者在访问该数据结构。重要的步骤然后在 rwlock_acquire_readlock()内发生,当第一个读者获取该锁时。在这种情况下,读者也会获取写锁,即在 writelock 信号量上调用 sem_wait(),最后调用 sem_post()释放 lock。

一旦一个读者获得了读锁,其他的读者也可以获取这个读锁。但是,想要获取写锁的线程,就必须等到所有的读者都结束。最后一个退出的写者在“writelock”信号量上调用sem_post(),从而让等待的写者能够获取该锁。

读者很容易饿死写者。有写者等待时,如何能够避免更多的读者进入并持有锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
typedef struct _rwlock_t {
sem_t lock; // binary semaphore (basic lock)
sem_t writelock; // used to allow ONE writer or MANY readers
int readers; // count of readers reading in critical section
int writers_waiting; // count of writers waiting for the write lock
} rwlock_t;

void rwlock_init(rwlock_t *rw) {
rw->readers = 0;
rw->writers_waiting = 0;
sem_init(&rw->lock, 0, 1);
sem_init(&rw->writelock, 0, 1);
}

void rwlock_acquire_readlock(rwlock_t *rw) {
sem_wait(&rw->lock);
rw->readers++;
if (rw->readers == 1)
sem_wait(&rw->writelock); // first reader acquires writelock
sem_post(&rw->lock);
}

void rwlock_release_readlock(rwlock_t *rw) {
sem_wait(&rw->lock);
rw->readers--;
if (rw->readers == 0)
sem_post(&rw->writelock); // last reader releases writelock
sem_post(&rw->lock);
}

void rwlock_acquire_writelock(rwlock_t *rw) {
sem_wait(&rw->lock);
rw->writers_waiting++;
while (rw->readers > 0 || sem_trywait(&rw->writelock) != 0) {
sem_post(&rw->lock);
sem_wait(&rw->writelock);
sem_wait(&rw->lock);
}
rw->writers_waiting--;
sem_post(&rw->lock);
}

void rwlock_release_writelock(rwlock_t *rw) {
sem_post(&rw->writelock);
}

哲学家就餐问题

?
假定有 5 位“哲学家”围着一个圆桌。每两位哲学家之间有一把餐叉(一共 5 把)。哲学家有时要思考一会,不需要餐叉;有时又要就餐。而一位哲学家只有同时拿到了左手边和右手边的两把餐叉,才能吃到东西。

每个哲学家的基本循环:

1
2
3
4
5
6
while (1) {
think();
getforks();
eat();
putforks();
}

如何实现 getforks()和 putforks()函数,保证没有死锁,没有哲学家饿死,并且并发度更高(尽可能让更多哲学家同时吃东西)。

1
2
int left(int p) { return p; }
int right(int p) { return (p + 1) % 5; }

如果哲学家 p 希望用左手边的叉子,他们就调用 left(p)。类似地,右手边的叉子就用right(p)。模运算解决了最后一个哲学家(p = 4)右手边叉子的编号问题,就是餐叉 0。需要一些信号量来解决这个问题。假设需要 5 个,每个餐叉一个:sem_t forks[5]。

有问题的解决方案

假设我们把每个信号量(在 fork 数组中)都用 1 初始化。

1
2
3
4
5
6
7
8
9
1 void getforks() {
2 sem_wait(forks[left(p)]);
3 sem_wait(forks[right(p)]);
4 }
5
6 void putforks() {
7 sem_post(forks[left(p)]);
8 sem_post(forks[right(p)]);
9 }

问题是死锁(deadlock)。假设每个哲学家都拿到了左手边的餐叉,他们每个都会阻塞住,并且一直等待另一个餐叉。具体来说,哲学家 0 拿到了餐叉 0,哲学家 1 拿到了餐叉 1,哲学家 2 拿到餐叉 2,哲学家 3 拿到餐叉 3,哲学家 4 拿到餐叉 4。所有的餐叉都被占有了,所有的哲学家都阻塞着,并且等待另一个哲学家占有的餐叉。

一种方案:破除依赖

修改某个或者某些哲学家的取餐叉顺序

1
2
3
4
5
6
7
8
9
1 void getforks() {
2 if (p == 4) {
3 sem_wait(forks[right(p)]);
4 sem_wait(forks[left(p)]);
5 } else {
6 sem_wait(forks[left(p)]);
7 sem_wait(forks[right(p)]);
8 }
9 }

如何实现信号量

我们用底层的同步原语(锁和条件变量),来实现自己的信号量,名字叫作Zemaphore。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
1 typedef struct _Zem_t {
2 int value;
3 pthread_cond_t cond;
4 pthread_mutex_t lock;
5 } Zem_t;
6
7 // only one thread can call this
8 void Zem_init(Zem_t *s, int value) {
9 s->value = value;
10 Cond_init(&s->cond);
11 Mutex_init(&s->lock);
12 }
13
14 void Zem_wait(Zem_t *s) {
15 Mutex_lock(&s->lock);
16 while (s->value <= 0)
17 Cond_wait(&s->cond, &s->lock);
18 s->value--;
19 Mutex_unlock(&s->lock);
20 }
21
22 void Zem_post(Zem_t *s) {
23 Mutex_lock(&s->lock);
24 s->value++;
25 Cond_signal(&s->cond);
26 Mutex_unlock(&s->lock);
27 }

常见并发问题

有哪些类型的缺陷

研究集中在 4 个重要的开源应用:MySQL(流行的数据库管理系统)、Apache(著名的Web 服务器)、Mozilla(著名的 Web 浏览器)和 OpenOffice(微软办公套件的开源版本)。
?

非死锁缺陷

违反原子性(atomicity violation)缺陷和错误顺序(order violation)缺陷。

违反原子性缺陷

这是一个 MySQL 中出现的例子。

1
2
3
4
5
6
7
8
9
1 Thread 1::
2 if (thd->proc_info) {
3 ...
4 fputs(thd->proc_info, ...);
5 ...
6 }
7
8 Thread 2::
9 thd->proc_info = NULL;

这个例子中,两个线程都要访问 thd 结构中的成员 proc_info。第一个线程检查 proc_info非空,然后打印出值;第二个线程设置其为空。显然,当第一个线程检查之后,在 fputs()调用之前被中断,第二个线程把指针置为空;当第一个线程恢复执行时,由于引用空指针,导致程序奔溃。

违反原子性的定义是:“违反了多次内存访问中预期的可串行性(即代码段本意是原子的,但在执行中并没有强制实现原子性)”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1 pthread_mutex_t proc_info_lock = PTHREAD_MUTEX_INITIALIZER;
2
3 Thread 1::
4 pthread_mutex_lock(&proc_info_lock);
5 if (thd->proc_info) {
6 ...
7 fputs(thd->proc_info, ...);
8 ...
9 }
10 pthread_mutex_unlock(&proc_info_lock);
11
12 Thread 2::
13 pthread_mutex_lock(&proc_info_lock);
14 thd->proc_info = NULL;
15 pthread_mutex_unlock(&proc_info_lock);

违反顺序缺陷

另一种常见的非死锁问题叫作违反顺序(order violation)。

1
2
3
4
5
6
7
8
9
10
11
12
13
1 Thread 1::
2 void init() {
3 ...
4 mThread = PR_CreateThread(mMain, ...);
5 ...
6 }
7
8 Thread 2::
9 void mMain(...) {
10 ...
11 mState = mThread->State;
12 ...
13 }

线程 2 的代码中似乎假定变量 mThread 已经被初始化了(不为空)。然而,如果线程 1 并没有首先执行,线程 2 就可能因为引用空指针奔溃(假设 mThread初始值为空;否则,可能会产生更加奇怪的问题,因为线程 2 中会读到任意的内存位置并引用)。

违反顺序更正式的定义是:“两个内存访问的预期顺序被打破了(即 A 应该在 B 之前执行,但是实际运行中却不是这个顺序)”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
1 pthread_mutex_t mtLock = PTHREAD_MUTEX_INITIALIZER;
2 pthread_cond_t mtCond = PTHREAD_COND_INITIALIZER;
3 int mtInit = 0;
4
5 Thread 1::
6 void init() {
7 ...
8 mThread = PR_CreateThread(mMain, ...);
9
10 // signal that the thread has been created...
11 pthread_mutex_lock(&mtLock);
12 mtInit = 1;
13 pthread_cond_signal(&mtCond);
14 pthread_mutex_unlock(&mtLock);
15 ...
16 }
17
18 Thread 2::
19 void mMain(...) {
20 ...
21 // wait for the thread to be initialized...
22 pthread_mutex_lock(&mtLock);
23 while (mtInit == 0)
24 pthread_cond_wait(&mtCond, &mtLock);
25 pthread_mutex_unlock(&mtLock);
26
27 mState = mThread->State;
//mState = mThread->State; 在锁外部。这是因为这段代码没有修改任何共享资源或数据,所以没有必要在锁内部执行。
28 ...
29 }

在这段修复的代码中,我们增加了一个锁(mtLock)、一个条件变量(mtCond)以及状态的变量(mtInit)。初始化代码运行时,会将 mtInit 设置为 1,并发出信号表明它已做了这件事。如果线程 2 先运行,就会一直等待信号和对应的状态变化;如果后运行,线程 2 会检查是否初始化(即 mtInit 被设置为 1),然后正常运行。请注意,我们可以用 mThread 本身作为状态变量,但为了简洁,我们没有这样做。当线程之间的顺序很重要时,条件变量(或信号量)能够解决问题。

死锁缺陷

当线程 1 持有锁 L1,正在等待另外一个锁 L2,而线程 2 持有锁 L2,却在等待锁 L1 释放时,死锁就产生了。

1
2
3
Thread 1: Thread 2:
lock(L1); lock(L2);
lock(L2); lock(L1);

这段代码运行时,不是一定会出现死锁的。当线程 1 占有锁 L1,上下文切换到线程 2。线程 2 锁住 L2,试图锁住 L1。这时才产生了死锁,两个线程互相等待。如图 32.1 所示,其中的圈(cycle)表明了死锁。
?

为什么发生死锁

只要线程 1 和线程 2 都用相同的抢锁顺序,死锁就不会发生。那么,死锁为什么还会发生?

其中一个原因是在大型的代码库里,组件之间会有复杂的依赖。以操作系统为例。虚拟内存系统在需要访问文件系统才能从磁盘读到内存页;文件系统随后又要和虚拟内存交互,去申请一页内存,以便存放读到的块。因此,在设计大型系统的锁机制时,你必须要仔细地去避免循环依赖导致的死锁。

另一个原因是封装(encapsulation)。软件开发者一直倾向于隐藏实现细节,以模块化的方式让软件开发更容易。然而,模块化和锁不是很契合。

产生死锁的条件

互斥:线程对于需要的资源进行互斥的访问(例如一个线程抢到锁)。

持有并等待:线程持有了资源(例如已将持有的锁),同时又在等待其他资源(例如,需要获得的锁)。

非抢占:线程获得的资源(例如锁),不能被抢占。

循环等待:线程之间存在一个环路,环路上每个线程都额外持有一个资源,而这个资源又是下一个线程要申请的。

如果这 4 个条件的任何一个没有满足,死锁就不会产生。

预防

循环等待

获取锁时提供一个全序(total ordering)。假如系统共有两个锁(L1 和 L2),那么我们每次都先申请 L1 然后申请 L2,就可以避免死锁。这样严格的顺序避免了循环等待,也就不会产生死锁。

更复杂的系统中不会只有两个锁,锁的全序可能很难做到。因此,偏序(partial ordering)可能是一种有用的方法,安排锁的获取并避免死锁。Linux 中的内存映射代码就是一个偏序锁的好例子。代码开头的注释表明了 10 组不同的加锁顺序,包括简单的关系,比如 i_mutex 早于 i_mmap_mutex,也包括复杂的关系,比如 i_mmap_mutex 早于private_lock,早于 swap_lock,早于 mapping->tree_lock。

提示:通过锁的地址来强制锁的顺序
为了避免这种特殊问题,聪明的程序员根据锁的地址作为获取锁的顺序。按照地址从高到低,或者从低到高的顺序加锁,do_something()函数就可以保证不论传入参数是什么顺序,函数都会用固定的顺序加锁。具体的代码如下

1
2
3
4
5
6
7
8
if (m1 > m2) { // grab locks in high-to-low address order
pthread_mutex_lock(m1);
pthread_mutex_lock(m2);
} else {
pthread_mutex_lock(m2);
pthread_mutex_lock(m1);
}
// Code assumes that m1 != m2 (it is not the same lock)

持有并等待

死锁的持有并等待条件,可以通过原子地抢锁来避免。

1
2
3
4
5
1 lock(prevention);
2 lock(L1);
3 lock(L2);
4 ...
5 unlock(prevention);

先抢到 prevention 这个锁之后,代码保证了在抢锁的过程中,不会有不合时宜的线程切换,从而避免了死锁。当然,这需要任何线程在任何时候抢占锁时,先抢到全局的 prevention锁。例如,如果另一个线程用不同的顺序抢锁 L1 和 L2,也不会有问题,因为此时,线程已经抢到了 prevention 锁。

非抢占

在调用 unlock 之前,都认为锁是被占有的,多个抢锁操作通常会带来麻烦,因为我们等待一个锁时,同时持有另一个锁。很多线程库提供更为灵活的接口来避免这种情况。具体来说,trylock()函数会尝试获得锁,或者返回−1,表示锁已经被占有。

1
2
3
4
5
6
1 top:
2 lock(L1);
3 if (trylock(L2) == -1) {
4 unlock(L1);
5 goto top;
6 }

注意,另一个线程可以使用相同的加锁方式,但是不同的加锁顺序(L2 然后 L1),程序仍然不会产生死锁。但是会引来一个新的问题:活锁(livelock)。两个线程有可能一直重复这一序列,又同时都抢锁失败。这种情况下,系统一直在运行这段代码(因此不是死锁),但是又不会有进展,因此名为活锁。也有活锁的解决方法:例如,可以在循环结束
的时候,先随机等待一个时间,然后再重复整个动作,这样可以降低线程之间的重复互相干扰。

互斥

通过强大的硬件指令,我们可以构造出不需要锁的数据结构。
比较并交换(compare-and-swap)指令,是一种由硬件提供的原子指令

1
2
3
4
5
6
7
1 int CompareAndSwap(int *address, int expected, int new) {
2 if (*address == expected) {
3 *address = new;
4 return 1; // success
5 }
6 return 0; // failure
7 }

假定我们想原子地给某个值增加特定的数量。我们可以这样实现:

1
2
3
4
5
1 void AtomicIncrement(int *value, int amount) {
2 do {
3 int old = *value;
4 } while (CompareAndSwap(value, old, old + amount) == 0);
5 }

考虑一个更复杂的例子:链表插入。这是在链表头部插入元素的代码:

1
2
3
4
5
6
7
1 void insert(int value) {
2 node_t *n = malloc(sizeof(node_t));
3 assert(n != NULL);
4 n->value = value;
5 n->next = head;
6 head = n;
7 }

我们可以通过给相关代码加锁,来解决这个问题:

1
2
3
4
5
6
7
8
9
1 void insert(int value) {
2 node_t *n = malloc(sizeof(node_t));
3 assert(n != NULL);
4 n->value = value;
5 lock(listlock); // begin critical section
6 n->next = head;
7 head = n;
8 unlock(listlock); // end of critical section
9 }

用比较并交换指令(compare-and-swap)来实现插入操作。一种可能的实现是:

1
2
3
4
5
6
7
8
1 void insert(int value) {
2 node_t *n = malloc(sizeof(node_t));
3 assert(n != NULL);
4 n->value = value;
5 do {
6 n->next = head;
7 } while (CompareAndSwap(&head, n->next, n) == 0);
8 }

首先把 next 指针指向当前的链表头(head),然后试着把新节点交换到链表头。但是,如果此时其他的线程成功地修改了 head 的值,这里的交换就会失败,导致这个线程根据新的 head 值重试。

通过调度避免死锁

除了死锁预防,某些场景更适合死锁避免(avoidance)。
我们需要了解全局的信息,包括不同线程在运行中对锁的需求情况,从而使得后续的调度能够避免产生死锁。
?
一种比较聪明的调度方式是,只要 T1 和 T2 不同时运行,就不会产生死锁。下面就是这种方式:
?
请注意,T3 和 T1 重叠,或者和 T2 重叠都是可以的。虽然 T3 会抢占锁 L2,但是由于它只用到一把锁,和其他线程并发执行都不会产生死锁。

另一个竞争更多的例子。在这个例子中,对同样的资源(又是锁 L1 和 L2)有更多的竞争。
?
线程 T1、T2 和 T3 执行过程中,都需要持有锁 L1 和 L2。下面是一种不会产生死锁的可行方案:
?
这种保守的静态方案会明显增加完成任务的总时间。尽管有可能并发运行这些任务,但为了避免死锁,我们没有这样做,付出了性能的代价。

检查和恢复

很多数据库系统使用了死锁检测和恢复技术。死锁检测器会定期运行,通过构建资源图来检查循环。当循环(死锁)发生时,系统需要重启。如果还需要更复杂的数据结构相关的修复,那么需要人工参与。