锁的基本原理(2)

cas自旋锁引出的问题

评价锁的一个重要因素就是公平性, 我们可以看到上节我们提供了cas自旋锁:

1
2
3
4
5
6
7
8
9
10
11
12
void mutex_lock(atomic_int *addr) {
while(1) {
int expected = 0;
if(atomic_compare_exchange_strong(&lock, &expected, 1)) {
break;
}
}
}

void mutex_unlock(atomic_int *addr) {
atomic_store(&lock, 0);
}

然而它很大的问题就是公平性, 多个线程抢占锁的过程是一个无限循环的原子操作, 所以谁能抢到锁全靠运气, 很可能有线程很长时间也抢不到锁, 它会在抢到锁前经历大量的失败重试。所以这种锁可能让某个线程”饿死”, 为了公平性, 引入票锁(ticket lock)

票锁: 依赖两个变量的锁结构

首先来看一个新的原子操作, 它原子地取变量的当前值并让它加上1, 对应x86架构的xadd指令:

1
2
3
4
5
6
7
8
atomic_fetch_add(object, operand)

// 伪代码
int atomic_fetch_add(int *obj, int operand) {
int k = *obj;
*obj ++;
retrun k;
}

用它来实现一个锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#define _GNU_SOURCE

#include <linux/futex.h>
#include <sys/syscall.h>
#include <stdint.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <stdatomic.h>
atomic_int ticket = 0;
atomic_int current = 0;

int share_variable = 0;

int ticket_mutex_lock(atomic_int *addr_ticket, atomic_int *addr_current) {
int t = atomic_fetch_add(addr_ticket, 1);
while(1) {
int k = atomic_load(addr_current);
if(k == t) {
break;
}
}
return t;
}

void ticket_mutex_unlock(atomic_int *addr) {
atomic_store(addr, *addr+1);
}

void *new_routine(void*) {
for (size_t i = 0; i < 20000; i++)
{
// 自旋上锁
ticket_mutex_lock(&ticket, &current);

share_variable ++;

// 解锁
ticket_mutex_unlock(&current);
}
}

int main() {
pthread_t pid1, pid2;
pthread_create(&pid1, NULL, new_routine, NULL);
pthread_create(&pid2, NULL, new_routine, NULL);

void *ret;
pthread_join(pid1, &ret);
pthread_join(pid2, &ret);


printf("%d\n", share_variable);
}

这种锁需要两个变量, 一个是依次递增的”售票口”(ticket), 每个线程要拿锁时, 先从这里拿票。然后等待current等于自己拿到的票。解锁时只需要将current递增就行了。这种票锁做的自旋锁很稳定, 不会让某个线程饿死, linux内核的里面的自旋锁就是票锁