什么是原子性?
在多线程的场景下, 我们会遇到多个线程访问同一个变量的场景。如果多个线程只读一个全局变量, 那没有任何问题, 任何线程都能读到期望的”定值”。但是如果使用多线程进行并发的读写, 就会发生一致性问题。看下面的程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| #define _GNU_SOURCE
#include <linux/futex.h> #include <sys/syscall.h> #include <stdint.h> #include <unistd.h> #include <pthread.h> #include <stdio.h>
int share_variable = 0;
void *new_routine(void*) { for (size_t i = 0; i < 20000; i++) { share_variable ++; } }
int main() { pthread_t pid1, pid2; pthread_create(&pid1, NULL, new_routine, NULL); pthread_create(&pid2, NULL, new_routine, NULL);
void *ret; pthread_join(pid1, &ret); pthread_join(pid2, &ret);
printf("%d\n", share_variable); }
|
我们预计结果应该是40000, 但结果确都小于40000, 为什么会发生这种情况? 原因很简单, 因为我们认为share_variable ++
是原子的, 然而简单如递增操作, 它也不是原子操作。它包含了以下汇编:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| share_variable: .zero 4 new_routine(void*): pushq %rbp movq %rsp, %rbp movq %rdi, -24(%rbp) movq $0, -8(%rbp) jmp .L4 .L5: movl share_variable(%rip), %eax addl $1, %eax movl %eax, share_variable(%rip) addq $1, -8(%rbp) .L4: cmpq $19999, -8(%rbp) jbe .L5 nop popq %rbp ret
|
可以看到, 先取了share_variable
的值, 再将此值加一, 然后写回了share_variable
。然而当我们写下这些代码的时候, 我们指望此线程是原子地加上1, 而不是先读取它到寄存器, 然后再递增这个寄存器, 最后将临时变量写回原处。显然, 多个线程并发执行此处代码的时候, 必然发生不一致的问题。
试想内核调度线程是不确定的, 它可以在任何时刻停止调度某个线程, 比如线程A在标记1处的代码执行完后很长时间得不到调度, 然后调度线程B, 让B跑很久, 那么当线程A又被调度执行时, 它将使用之前的share_variable
的保存值(因为是很久之前的值, 所以较小)+1, 然后赋值给share_variable
, 那么最终值就必然小了
除了执行语句的原子性问题, 还有另外一个不确定的因素, 那就是cpu的乱序执行, 下面的内容简要提到了这个概念
原子操作: 内存屏障的保证
我们通常有个疑问, 为啥golang或c提供了AtomicLoad
和AtomicStore
, 难道直接用*addr
取值和直接用*addr = val
不是原子操作吗? 看下面的汇编:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| int i; int *p = &i;
void func2(int n) {
}
void func() { *p = 30; func2(*p); }
i: .zero 4 p: .quad i func2(int): pushq %rbp movq %rsp, %rbp movl %edi, -4(%rbp) nop popq %rbp ret func(): pushq %rbp movq %rsp, %rbp movq p(%rip), %rax movl $30, (%rax) movq p(%rip), %rax movl (%rax), %eax movl %eax, %edi call func2(int) nop popq %rbp ret
|
上面的代码展示了直接取值和赋值操作其实是”原子”的操作。可见, 存取某个地址的值, 确实最终用一条汇编完成, 可以说, 这样的指令确实是”原子的”。但是, 即使它们是一条汇编指令, 在多线程条件下却不能用直接用*pointer
并发读写同一个地址的对象, 在特别的情况下会发生一致性问题! 下面来阐述原因
问题就在cpu乱序执行的优化上。在某个cpu操作内存的时候, 由于cpu有多极流水线, 一核连续执行load
, store
操作时, cpu将分析它们的依赖关系进行乱序执行, 它们的操作顺序可能被打乱。那么此时另外一个核心同时观察这块内存, 就会出现薛定谔的玄学现象, 比如下面的伪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| var x = 0 var y = 0
Thread A in cpu0: x=1 y=1
Thread B in cpu1: a := x b := y print(a, b)
// 可能的打印结果: (0,0), (0,1), (1,0), (1,1)
|
我们把(0,0)
,(1,0)
, (1,1)
定义为正常, 因为它们是确实可能发生的。那(0,1)
就是特别诡异的问题了: 明明是先修改x
, 再修改y
, 为什么会发生y
先被观察到修改, x
被观察到后修改?
其实这就是cpu的乱序执行优化, cpu采用流水线技术, 它会根据指令间的依赖顺序打乱连续执行的指令, 以获得更快的处理速度。比如上面的程序, 两次连续load
明显没有依赖关系, 那么cpu就可能打乱它们的执行顺序, 从而让另一个线程看到诡异的现象
显然乱序执行这种优化对于单线程执行的程序没有任何影响, 写单线程程序的程序员无法感知到这种优化, 对单线程, 它是没有任何副作用的。但是对多线程, 对共享变量的读写, 必须保证这种顺序, 那么就得用内存屏障了(除了乱序执行, 由于多核cpu具有缓存, 内存一致性模型也需要考虑, 但是这里不再多做说明, 自行百度)
内存屏障像一道墙, 保证在越过这道墙时, cpu前面的所有内存操作已经被刷入内存了, 在合适的地方插入内存屏障(它是与架构有关一条指令), 就像下面的伪代码能解决上面的”薛定谔问题”:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| var x = 0 var y = 0
Thread A in cpu0: x=1 mb(); y=1
Thread B in cpu1: a := x mb(); b := y print(a, b)
(0,0), (1,0), (1,1)
|
内存屏障是的强有力保证, 这个回答体现了这个问题
简而言之, 同一个地址在被多个线程并发读写时, 必须做内存屏障
然而对代码编写者来说, 我们不需要显式使用内存屏障, 原子操作就提供了内存屏障的要求, 就是说必须使用原子操作共享的变量。此外, 锁和golang的管道提供了隐士的内存屏障, 因此我们在正常使用同步原语(锁, 信号量, 条件变量, 管道)的时候, 代码总是能按照我们的预期跑。原子操作是为了防止乱序执行或多核cpu cache不一致时做同步时使用的。当我们要自己实现锁设施的时候, 需要用原子操作。
我们上面说了这么多, 对程序员来说其实都是废话。这么多内容就是来提示你, 操作多线程读写的变量无论何时都请使用编程语言提供的原子操作!!!即使你不懂什么是内存屏障, 什么是内存一致性模型, 用原子操作能保证你不出错, 来保证程序的正确性
下面展示了一种使用原子操作的场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| #include <stdio.h> #include <unistd.h> #include <pthread.h> #include <stdatomic.h>
volatile atomic_int a = 0;
void *thread_b(void *) { sleep(3); atomic_store(&a, 1); a = 1; }
void *thread_a(void *) { while(1) { if (atomic_load(&a) != 1) { continue; } break; }
printf("芜湖\n"); }
int main() { pthread_t tid1, tid2;
pthread_create(&tid1, NULL, thread_a, NULL); pthread_create(&tid2, NULL, thread_b, NULL);
void *ret; pthread_join(tid1, &ret); pthread_join(tid2, &ret); }
|
compare and swap(cas): 实现锁的关键原子操作
cas
操作在x86下对应一条指令(cmpxchg
), 在glibc的封装及其伪代码如下:
1 2 3 4 5 6 7 8 9 10
| atomic_compare_exchange_strong(object, expected, desired)
bool atomic_compare_exchange_strong(int *obj, int *exp, int desired) { if (*obj == *exp) { *exp = desired; return true; } retrun false; }
|
它原子的判断某个原子变量的值是否等于expected
, 如果相等就替换为desired
, 返回1。如果原子变量的值不等于expected
那么返回值就是当时这个变量的实际值, 那么返回0
对于共享的变量, 我们只需要用cas操作, 在想操作这个变量之前, 先将一个原子变量由0原子地设置1即可, 下面使用了cas操作实现了一个”自旋锁”解决了之前的问题:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| #define _GNU_SOURCE
#include <linux/futex.h> #include <sys/syscall.h> #include <stdint.h> #include <unistd.h> #include <pthread.h> #include <stdio.h> #include <stdatomic.h>
int share_variable = 0;
atomic_int lock = 0;
void *new_routine(void*) { for (size_t i = 0; i < 20000; i++) { while(1) { int expected = 0; if(atomic_compare_exchange_strong(&lock, &expected, 1)) { break; } }
share_variable ++;
atomic_store(&lock, 0); } }
int main() { pthread_t pid1, pid2; pthread_create(&pid1, NULL, new_routine, NULL); pthread_create(&pid2, NULL, new_routine, NULL);
void *ret; pthread_join(pid1, &ret); pthread_join(pid2, &ret);
printf("%d\n", share_variable); }
|
所谓自旋, 就是无限循环重试, 直到拿到锁。所以我们可以看到, 上锁的过程就是原子地尝试将变量从0变为1的过程, 谁能成功那么谁就拥有了锁, 此时就有权读写这个共享变量!
最佳实践?
编写多线程程序, 内存一致性必须考虑, 原子操作和同步原语(mutex, sem, cond)都做好了隐式的内存屏障, 应该使用它们来实现一切。