锁的基本原理(3)

如何降低无意义的自旋损耗?

前两节, 我们实现了自旋锁, 然而自旋锁很拉跨, 它会占用线程的执行时间去做没有意义的自旋操作。也就是说, 自旋属于没有意义的性能损耗, 如果可以, 我们更希望内核此时停止调度这个线程, 转而去执行其它线程, 等待锁空闲的时候, 再回来调度此线程, 借此提高cpu利用率

实现这种锁, 需要用来内核提供的系统调用设施futex(fast user mutex), 这种系统调用需要用户态和内核态相互协调, 实现高性能锁

futex

futex只是实现了陷入内核休眠与唤醒机制, 真正的锁还是要通过cas操作来做, 我们仍然需要在用户态代码写cas原子操作来进行抢锁, 下面先展示cas+futex实现的互斥锁, 这种锁很公平, 且不会发生”惊群”问题。我们先展示代码, 再一一讲解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
atomic_int lock;

void mutex_lock(atomic_int *lock) {
while(1) {
int expected = 0;
if(atomic_compare_exchange_strong(lock, &expected, 1)) {
break;
}
syscall(SYS_futex, lock, FUTEX_WAIT, 0, NULL, NULL, 0);
}
}

void mutex_unlock(atomic_int *lock) {
atomic_store(lock, 0);
syscall(SYS_futex, lock, FUTEX_WAKE, 1, NULL, NULL, 0);
}

int share_variable = 0;

void *new_routine(void*) {
for (size_t i = 0; i < 20000; i++)
{
// 自旋上锁
mutex_lock(&lock);

share_variable ++;

// 解锁
mutex_unlock(&lock);
}
}

int main() {
pthread_t pid1, pid2, pid3, pid4, pid5, pid6, pid7, pid8;

pthread_create(&pid1, NULL, new_routine, NULL);
pthread_create(&pid2, NULL, new_routine, NULL);
pthread_create(&pid3, NULL, new_routine, NULL);
pthread_create(&pid4, NULL, new_routine, NULL);
pthread_create(&pid5, NULL, new_routine, NULL);
pthread_create(&pid6, NULL, new_routine, NULL);
pthread_create(&pid7, NULL, new_routine, NULL);
pthread_create(&pid8, NULL, new_routine, NULL);


void *ret;
pthread_join(pid1, &ret);
pthread_join(pid2, &ret);
pthread_join(pid3, &ret);
pthread_join(pid4, &ret);
pthread_join(pid5, &ret);
pthread_join(pid6, &ret);
pthread_join(pid7, &ret);
pthread_join(pid8, &ret);



printf("%d\n", share_variable);
}

首先是futex系统调用的调用方式:

1
2
3
long syscall(SYS_futex, uint32_t *uaddr, int futex_op, uint32_t val,
const struct timespec *timeout, /* or: uint32_t val2 */
uint32_t *uaddr2, uint32_t val3);

其中futex_op指定了要进行的操作, FUTEX_WAIT代表陷入等待, FUTEX_WAKE代表唤醒, 我们没有用到后面的三个参数, 故不做解释

其中WAIT操作在内核中的操作是: 首先用原子load测试addr指向的值是否等于val。如果等于, 那么就进入睡眠。如果不等于, 那么此系统调用返回EAGAIN。为什么要测试相等呢, 明明之前没拿到锁, 就不能让我痛痛快快地休眠吗? 这是为了防止”丢失唤醒”的情况发生。下面的时序图展示了如果不进行相等测试的”丢失唤醒”的例子

1
2
3
4
5
6
线程1               线程2
cas操作上锁成功
CAS上锁失败
解锁
调用syscall唤醒别人
调用syscall请求休眠, 永久地休眠了...

如果没有测试相等, 那线程2就没法被唤醒了, 相反如果测试了相等, 丢失休眠就不会发生:

1
2
3
4
5
6
线程1               线程2
cas操作上锁成功
CAS上锁失败
解锁
调用syscall唤醒别人
调用syscall请求休眠, 但内核测试后发现*addr!=val, 退出系统调用, 不休眠

WAKE操作的val指定了需要唤醒的线程最大个数, 根据手册, 此系统调用要么在这里填1(只最多唤醒1个线程), 要么填INT_MAX(唤醒队列里全部线程)

我们填1的原因是为了避免”惊群”问题。想想, 如果有大量线程都在等待这个锁, 某个持有锁的线程释放后, 将唤醒其他所有线程, 然后一起来抢锁, 这合理吗?反正只有一个线程能拿到被释放的锁, 因此最好只唤醒一个就够了。可见, 所谓惊群, 那就是看到一点风吹草动, 所有线程都来竞争这个锁了

futex的排队问题

前面提到, 锁需要公平, 此处的唤醒机制就很公平, 内核对休眠的线程采取的唤醒机制就是先到先得(先被唤醒)。也就是说, 用futex, 不会出现某个线程被饿死的情况

两阶段锁

c提供的锁设施, 先自旋一定次数尝试获取锁, 然后用futex陷入内核, 防止cpu空转, 这是当前阶段最流行的锁的设计思路。