知识分享-线程, 协程以及网络模型

本文目的

聚焦线程, 协程, 有栈协程, 无栈协程, reactor, 网络框架, netpoll, 阻塞等名词, 了解比较主流的网络编程逻辑。

背景

多核时代, 使用多线程能加速运算。然而网络/磁盘io的阻塞会白白浪费时间, 浪费线程资源。但是线程资源是宝贵的, 开多了浪费性能(切换会耗费很多时间), 开少了无法利用多核的优势, 因此有多少核开多少个线程是主流认知。

然而阻塞的io会让线程浪费掉cpu时间, 所以linux推出了非阻塞的io, 通过O_NONBLOCK打开的文件描述符读写都不会阻塞, 配合epoll可以实现单线程处理海量io。

传统reactor

目前新语言陆续推出协程, 配套的网络设施也发生了大量变化。例如在”线程资源”稀缺的传统语言中(java和cpp的低版本), epoll(或其它的多路复用系统调用)实现的reactor事件回调模型是主流。我不了解proactor, 所以不讨论它。

在没有协程的传统语言中, 为了尽量减少系统调度的频率, 用少量线程+epoll处理海量连接是高并发服务器的基本思路。陈硕提到, 一个好的reactor框架会减少程序员遇到的”偶发复杂性”(意思就是说不用自己手写epoll), 让程序员更加专注于业务。

陈硕说的好, 那我就不用epoll开搞了, 而是直接聚焦传统语言最常用的reactor模型写个简单的demo。c++比较好的reactor框架可能是muduo, libhv(字节的大佬做的, 我没用过), java的则是netty(据说很多开源轮子都用它做网络)。笔者对muduo比较熟悉, 这里抄一段代码来做演示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include "muduo/base/Logging.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
#include "muduo/net/TcpConnection.h"

int main() {
muduo::net::EventLoop loop;
muduo::net::InetAddress listenAddr("127.0.0.1", 8080);
muduo::net::TcpServer server(&loop, listenAddr, "My Echo Server", muduo::net::TcpServer::Option::kReusePort);
server.setConnectionCallback([](const muduo::net::TcpConnectionPtr &conn){
LOG_INFO << "一个新连接的到来";
conn->send("你好新连接\n");
});

server.setMessageCallback([](const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buf, muduo::Timestamp){
std::string msg(buf->retrieveAllAsString());
LOG_INFO << "收到一条消息" << msg.size();
conn->send(msg);
});

server.start();
loop.loop();
}

其实写reactor就是在写回调, 注意回调里面不能有阻塞的代码, 如果要每隔两秒打印一条信息, 必须注册定时器回调(而不是sleep, sleep会使这个线程停止调度, 是极其错误的)。任何阻塞线程的操作(甚至是耗时的计算操作, 极端点, 查询本地数据库也都是)都不应该出现在回调代码中。

在这样的编程模型下, 如果要为每个连接提供服务, 则需要用内存保存每个连接的当前状态做一个状态机(其实也可以用数据库嘛, 不过reactor的精髓就是不要在回调里进行耗时操作, 所以用数据库不好)。

reactor的优点是做一些复杂的业务写状态机, 控制力极强, 如果用全局变量存储每个连接的上下文, 在回调代码里锁都不用加就能随便访问所有连接的上下文, 也可以向每个连接发送消息。

reactor的缺点也很明显, 那就是回调会割裂逻辑, 每次回调都需要查找当前连接的状态以确定下一步该干啥, 我认为用reactor写业务其实就做状态机。

举个简单的例子, 我这里有个counter服务, 服务器为每一个连接维护一个计数器, 用户每次发一个记数请求分包, 服务器就把它的计数器加1并且返回。那么就需要为这个连接开辟一块内存, 存储当前值, 每次message回调先解析请求包, 确认是计数请求后, 就访问对应的内存, 递增counter并返回。在这个简单的场景下, counter保存的就是连接的状态。针对这个场景, 我们给一个伪代码, 说明这个简单的”状态机”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 为了给每个连接给一个id, 用一个全局变量
int globalConnID = 0

onConnection(conn) {
int connID = globalConnID++

// 开辟一块内存, 我这里用new来表示它被分配在堆内存上
connCtx = new ConnContext{
ConnID: connID
Counter: 0
}
// 每个连接都有一个槽给程序员放东西, 想用就用, 其实一般人拿来放连接的上下文信息
// 这就像gin的Set/Get一样, 也给每个请求维护一个槽, 一个意思嘛
conn.setContext(connCtx)
}

// 采用4字节包头+包体的分包协议
onMessage(conn, buf, timestamp) {
// 连四字节包头都还没有
if (buf.readableBytes < 4) {
return
}

// 包体还没收全
bodySize = buf.peekInt64();
if (buf.readableBytes < bodySize + 4) {
return
}

// 丢弃包头的四个字节
buf.retrieve(4)
bodyBytes = buf.retrieveAsString(bodySize)

// unmarshal请求包
var myRequest Request
unmarshal(bodyBytes, &myRequest)

if (myRequest.reqType == "counter") {
c = ctx.getContext.Counter;
// 下面省略一系列封包的代码, 需要把数据打包成 4字节包大小前缀+包体的形式
// 然后调用send把消息给reactor模型发送
// conn.send(......)
ctx.getContext.Counter++
}
}

虽然reactor模型写代码很繁琐, 却能提供高并发和很强的控制力(单个reactor下能在一个eventloop里面操作所有连接)。陈硕认为reactor框架的控制力和适用场景比proactor好, 主要是写代码更简单, proactor的性能虽然好, 但是写代码却超级复杂。

eventloop是什么? 这是reactor最基本的组件, 中文名字叫事件循环, 跑eventloop的线程被成为io线程, 它会执行epoll操作。如果有事件发生就触发我们写好的回调, 所以我们的回调是被eventloop所处的线程直接调用的, 不能在回调中写耗时操作, 不然就耽误了epoll, 胜任不了高并发的任务了。下面我展示一个基本的事件循环应该有的东西, 见我演示。

考虑一个复杂的聊天协议, 一个用户发送广播包, 需要把消息转发给所有连接。如果用go写, 那么一般是一个goroutine负责一个连接, 要做消息的广播其实很复杂。但是如果用reactor来写, 自己只需要维持所有连接的上下文, 就能不加锁地操作所有连接, 其实是相当舒适的, 这也是我说reactor控制力强的原因。

绝对不要在回调里面进行任何耗时操作(我们把跑eventloop的线程叫做io线程),即使是打开读写文件也要考虑再三, 如果读写文件确实耗时, 那么就放弃在io线程进行。那么如果确实要进行耗时操作怎么办呢? 正确的做法是把cpu密集的耗时操作通过队列丢给工作线程, 然后再把运算结果”打回”io线程让io把执行结果发送给用户。为什么java面试强调用线程池? 因为很多人会写netty代码, 那么线程池就是标配, 程序员必须了解。

此外, 操作数据库的时候, 如果轮子只提供了阻塞的函数, 那么也可以开一个数据库线程, io线程把查询任务丢给数据库线程, 数据库线程执行完/拿到响应后再告知io线程执行结果。实际上很多程序也是这么做的, 提供n个线程跑reactor, 一个线程做异步日志, 一个线程做数据库线程……

reactor框架提供了runInLoop的方法, 能够从其它线程把代码”打入”io线程进行执行。一般而言, 我倾向于只让io线程读写连接的上下文, 不暴露给其它线程, 这样可以避免加锁, 控制力极强。

为什么java的程序员强调线程池, go的程序员却根本不care? 那是因为对于java的好多业务框架都是基于reactor回调的, 谁能容忍在io线程的回调里写耗时代码呢? 这种情况把耗时任务分发给线程池才是对的。

多reactor模型的优势与代价

简单而言, 我上面提到的例子只利用了一个线程, 是一个单reactor的例子, 虽然只用了一个线程, 但是能通过状态机+回调的方式处理大量连接, 这就是reactor(或者说epoll)的魅力。

利用起一个线程远远不够, io线程就算是io密集的, 也会伴随大量的拷贝操作, 那么利用多个线程就自然能加速拷贝操作, 降低整体延迟。很多场景下其实单reactor是极其合适的, 比如redis, 这种单reactor的模式控制力极强。我没有背过八股, 我认为redis用单reactor的原因是它的并发并不是那么大, 拷贝操作也不是很耗时(就是io线程编码解码很简单, 耗时很短), 单reactor完全能胜任。

多reactor的主流模式是: 一个线程一直accept新的连接, 然后递交给不同线程的reactor eventloop, 把连接均匀的分给不同的线程处理。

虽然多reactor能降低延迟, 提供均质的服务, 但是有得必有失。我个人很喜欢单reactor, 原因是状态信息都仅仅被io线程独享, 如果要做聊天这样的场景, 那么单reactor将没有任何加锁成本, 直接查表send就能实现广播聊天。

使用多reactor有个弊端, 业务如果涉及多个连接的广播就很麻烦。如果连接被分散给多个线程, 那么每个线程都维护自己的连接的状态, 中间要进行信息交互, 必然会需要一些机制, 这中间必然有额外的同步开销。

在多reactor下的对于聊天的这种广播场景, 我能想到的一个方法就是每个eventloop都保存其它eventloop的引用, 当一个连接发送消息时, 直接挨个对其它eventloop进行runInLoop, 其他的eventloop也能收到这个广播信息咯。

有栈协程和线程

go用的io都是阻塞的, goroutine给人一种在用线程的感觉。然而并不是每个goroutine都享有一个线程, 魔法在于go的io是自己用epoll实现的, 每个看起来会阻塞的操作都被golang重新封装, 而绝对不使用操作系统会阻塞的io, 例如go绝对不会调用nanosleep(2)。

java23的虚拟线程只感觉自己好像突然能无限开线程执行阻塞的网络操作也不会出问题了, 这种协程的好处是程序员对此无感, 只觉得自己在写线程。

go的程序员一般写网络代码就是一个goroutine循环accept, 然后直接开goroutine操作这个连接, 更离谱的是, 甚至对于一个连接, 读写都单独开goroutine。通常一个连接可以有读/写/心跳三个goroutine处理。

go通常情况下会开一定数量的系统线程来跑协程, 但是实际运行过程中线程数目有时候会变多, 接下来我将通过一个问题说明原因:

如何避免真正进行系统调用时阻塞线程? 例如go提供了syscall, 我们用syscall.Write写入一个将会阻塞的fd, 为毛没有任何副作用, 即使开很多个gorouine让它们阻塞在那里最后go也能正常调度大量goroutine? 我这里说的副作用的意思是, 我知道go底层是一定数量的系统线程调度很多的goroutine代码, 但是我广泛的使用syscall阻塞线程, 也不会耽搁golang调度其它协程。

这是由于在调用系统调用前会新开一个新的线程来执行系统调用, 也就是说syscall.Write直接新开一个线程避免阻塞(即使有的系统调用不会阻塞, go也选择新开线程, 这是一种保险, 也是一种加速手段, 试想一个线程等待磁盘io, 其实是这部分时间线程完全没有用来跑代码, 如果go始终只使用N个线程跑代码, 那么等待磁盘io的时间就被白白浪费了), syscall.XXX在语言层面封装了新开线程的逻辑。

在java中, 虚拟线程(协程)被放在一个容量一定的线程池(和cpu数量有关)里不断调度, 使用java的库尝试进行系统调用时, java也会首先判断这个执行流是不是虚拟线程, 如果是, 就新开一个线程进行系统调用, 防止把那个池子里的线程全搞阻塞住了。

go里的epoll

os.OpenFile在open打开文件时, go语言底部也是使用了O_NONBLOCK的, 虽然go提供给语言用户的网络库看起来是阻塞的, 但是打开的文件描述符其实都是非阻塞的。简单看看代码, 瞅瞅什么情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
// 这是os.OpenFile的接口
func OpenFile(name string, flag int, perm FileMode) (*File, error) {
testlog.Open(name)
f, err := openFileNolog(name, flag, perm) // 我们来看看这个
if err != nil {
return nil, err
}
f.appendMode = flag&O_APPEND != 0

return f, nil
}

func openFileNolog(name string, flag int, perm FileMode) (*File, error) {
// 这个sticky和文件的sticky位有关, sticky是作用给目录的一个特殊的位, linux支持CreateWithStickyBit
// 所以下面的setSticky没用
// setSticky := false
// if !supportsCreateWithStickyBit && flag&O_CREATE != 0 && perm&ModeSticky != 0 {
// if _, err := Stat(name); IsNotExist(err) {
// setSticky = true
// }
// }

var r int
// 为毛for循环, 因为打开fuse file system居然可以被信号中断, 这里要无限尝试, 见#11180
for {
var e error
// 真正调用syscall.Open
r, e = syscall.Open(name, flag|syscall.O_CLOEXEC, syscallMode(perm))
if e == nil {
break
}

// 可能会非常不巧地被信号中断, 需要重试
// We have to check EINTR here, per issues 11180 and 39237.
if e == syscall.EINTR {
continue
}

return nil, &PathError{Op: "open", Path: name, Err: e}
}

// 别忘了, linux支持create file的同时设置sticky位, 所以下面是废代码
// open(2) itself won't handle the sticky bit on *BSD and Solaris
// if setSticky {
// setStickyBit(name)
// }

// 支持create file的时候设置closeOnExec, 所以下面是废代码
// There's a race here with fork/exec, which we are
// content to live with. See ../syscall/exec_unix.go.
// if !supportsCloseOnExec {
// syscall.CloseOnExec(r)
// }

// 继续探究newFile
return newFile(uintptr(r), name, kindOpenFile), nil
}

func newFile(fd uintptr, name string, kind newFileKind) *File {
fdi := int(fd)
if fdi < 0 {
return nil
}
// 封装成File
// 注意下面的poll.FD, 俗称PollFD, 是一个很重要的结构, 可以借助它注册文件描述符给netpoll
/*
type FD struct {
// PollFD拥有一个读写锁, 防止读写同时进行, 想象一个*os.File是可以共享的
// 如果多个执行流同时找netpoll读数据, 那读到的数据必然可能串话
// 写也是一样的, netpoll的写缓冲不大, 如果写一半再继续写且不加锁, 一个write也将不会是原子的
// 协程1 write 1111111....11111(很多的1), 协程2 write 222222(很少的2), 可能最后写入的是
// 1111111...222222....11111, 这就是我说的串话, 所以加读写锁是必要的
// Lock sysfd and serialize access to Read and Write methods.
fdmu fdMutex

// System file descriptor. Immutable until Close.
Sysfd int

// I/O poller.
pd pollDesc

// Writev cache.
iovecs *[]syscall.Iovec

// Semaphore signaled when file is closed.
csema uint32

// Non-zero if this file has been set to blocking mode.
isBlocking uint32

// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool

// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool

// Whether this is a file rather than a network socket.
isFile bool
}
*/
f := &File{&file{
pfd: poll.FD{
Sysfd: fdi,
IsStream: true,
ZeroReadIsEOF: true,
},
name: name,
stdoutOrErr: fdi == 1 || fdi == 2,
}}

// 是否可以poll, 能poll就注册给netpoll
pollable := kind == kindOpenFile || kind == kindPipe || kind == kindNonBlock

// If the caller passed a non-blocking filedes (kindNonBlock),
// we assume they know what they are doing so we allow it to be
// used with kqueue.
if kind == kindOpenFile {
switch runtime.GOOS {
case "darwin", "ios", "dragonfly", "freebsd", "netbsd", "openbsd":
var st syscall.Stat_t
err := ignoringEINTR(func() error {
return syscall.Fstat(fdi, &st)
})
typ := st.Mode & syscall.S_IFMT
// Don't try to use kqueue with regular files on *BSDs.
// On FreeBSD a regular file is always
// reported as ready for writing.
// On Dragonfly, NetBSD and OpenBSD the fd is signaled
// only once as ready (both read and write).
// Issue 19093.
// Also don't add directories to the netpoller.
if err == nil && (typ == syscall.S_IFREG || typ == syscall.S_IFDIR) {
pollable = false
}

// In addition to the behavior described above for regular files,
// on Darwin, kqueue does not work properly with fifos:
// closing the last writer does not cause a kqueue event
// for any readers. See issue #24164.
if (runtime.GOOS == "darwin" || runtime.GOOS == "ios") && typ == syscall.S_IFIFO {
pollable = false
}
}
}

// 我们来研究FD.Init是啥
if err := f.pfd.Init("file", pollable); err != nil {
// An error here indicates a failure to register
// with the netpoll system. That can happen for
// a file descriptor that is not supported by
// epoll/kqueue; for example, disk files on
// Linux systems. We assume that any real error
// will show up in later I/O.
} else if pollable {
// We successfully registered with netpoll, so put
// the file into nonblocking mode.
// 再次保证文件是NonBlock的
if err := syscall.SetNonblock(fdi, true); err == nil {
f.nonblock = true
}
}

runtime.SetFinalizer(f.file, (*file).close)
return f
}

func (fd *FD) Init(net string, pollable bool) error {
// 注意下面的英文注释, 就算是socket, 也其实可以当成普通文件的文件描述符对待
// linux的抽象就是这么好
// We don't actually care about the various network types.
if net == "file" {
fd.isFile = true
}
if !pollable {
fd.isBlocking = 1
return nil
}
err := fd.pd.init(fd)
if err != nil {
// If we could not initialize the runtime poller,
// assume we are using blocking mode.
fd.isBlocking = 1
}
return err
}

func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
// runtime的代码看不到了
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}

// runtime_pollOpen其实的代码就追溯不到了

看情况是os.OpenFile把fd注册给了netpoll, 那么Read, Write会是什么情况?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
func (f *File) Read(b []byte) (n int, err error) {
if err := f.checkValid("read"); err != nil {
return 0, err
}
n, e := f.read(b)
return n, f.wrapErr("read", e)
}

func (f *File) read(b []byte) (n int, err error) {
n, err = f.pfd.Read(b)
runtime.KeepAlive(f)
return n, err
}

// 我们需要看的就是这个, PollFD的实现
func (fd *FD) Read(p []byte) (int, error) {
// 加读锁, 防止我之前提到的"串话"
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
/* ignoringEINTERIO就是一个小加工具, 看下面的代码
func ignoringEINTRIO(fn func(fd int, p []byte) (int, error), fd int, p []byte) (int, error) {
for {
n, err := fn(fd, p)
if err != syscall.EINTR {
return n, err
}
}
}
不得不说go在linux下的实现其实可以选择更好的signalfd, 把信号融入事件循环, 不至于每个syscall都来检查一下EINTR
*/
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
// 如果fd没有任何数据到来, fild.Read将会返回EAGAIN, n = 0, 那么就waitRead
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}

// 下面来看waitRead, 这是很神奇的东西, netpoll的调度也就体现在这了
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
// runtime_pollWait是内部代码, 我们看不到了
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}

看了这么多, 其实根本就在一点, 语言封装的是NONBLOCK的io, 并netpoll提供的就是阻塞/唤醒goroutine的机制, netpoll就是一个epoll, 只要把fd注册给netpoll, 我们就能在可以读写的时候唤醒协程。

作为go语言用户, 我们怎么利用netpoll? 那就是使用poll.FD了。如果我们想自己造一个tcp轮子, 完全可以用poll.FD自己写一套(不过没有意义), go的netpoll是基础设置, net是建立在它之上的, 我们是完全可以自己用poll.FD实现net包的。

netpoll到底干了啥

我们提到, 作为go语言用户, 如果想使用netpoll的阻塞/唤醒机制, 只能通过poll.FD注册自己的文件描述符, 然后使用FD提供的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
func (*FD).Accept() (int, syscall.Sockaddr, string, error)
func (*FD).Close() error
func (*FD).Dup() (int, string, error)
func (*FD).Fchdir() error
func (*FD).Fchmod(mode uint32) error
func (*FD).Fchown(uid int, gid int) error
func (*FD).Fstat(s *syscall.Stat_t) error
func (*FD).Fsync() error
func (*FD).Ftruncate(size int64) error
func (*FD).Init(net string, pollable bool) error
func (*FD).Pread(p []byte, off int64) (int, error)
func (*FD).Pwrite(p []byte, off int64) (int, error)
func (*FD).RawControl(f func(uintptr)) error
func (*FD).RawRead(f func(uintptr) bool) error
func (*FD).RawWrite(f func(uintptr) bool) error
func (*FD).Read(p []byte) (int, error)
func (*FD).ReadDirent(buf []byte) (int, error)
func (*FD).ReadFrom(p []byte) (int, syscall.Sockaddr, error)
func (*FD).ReadFromInet4(p []byte, from *syscall.SockaddrInet4) (int, error)
func (*FD).ReadFromInet6(p []byte, from *syscall.SockaddrInet6) (int, error)
func (*FD).ReadMsg(p []byte, oob []byte, flags int) (int, int, int, syscall.Sockaddr, error)
func (*FD).ReadMsgInet4(p []byte, oob []byte, flags int, sa4 *syscall.SockaddrInet4) (int, int, int, error)
func (*FD).ReadMsgInet6(p []byte, oob []byte, flags int, sa6 *syscall.SockaddrInet6) (int, int, int, error)
func (*FD).Seek(offset int64, whence int) (int64, error)
func (*FD).SetBlocking() error
func (*FD).SetDeadline(t time.Time) error
func (*FD).SetReadDeadline(t time.Time) error
func (*FD).SetWriteDeadline(t time.Time) error
func (*FD).SetsockoptByte(level int, name int, arg byte) error
func (*FD).SetsockoptIPMreq(level int, name int, mreq *syscall.IPMreq) error
func (*FD).SetsockoptIPMreqn(level int, name int, mreq *syscall.IPMreqn) error
func (*FD).SetsockoptIPv6Mreq(level int, name int, mreq *syscall.IPv6Mreq) error
func (*FD).SetsockoptInet4Addr(level int, name int, arg [4]byte) error
func (*FD).SetsockoptInt(level int, name int, arg int) error
func (*FD).SetsockoptLinger(level int, name int, l *syscall.Linger) error
func (*FD).Shutdown(how int) error
func (*FD).WaitWrite() error
func (*FD).Write(p []byte) (int, error)
func (*FD).WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, error)
func (*FD).WriteMsgInet4(p []byte, oob []byte, sa *syscall.SockaddrInet4) (int, int, error)
func (*FD).WriteMsgInet6(p []byte, oob []byte, sa *syscall.SockaddrInet6) (int, int, error)
func (*FD).WriteOnce(p []byte) (int, error)
func (*FD).WriteTo(p []byte, sa syscall.Sockaddr) (int, error)
func (*FD).WriteToInet4(p []byte, sa *syscall.SockaddrInet4) (int, error)
func (*FD).WriteToInet6(p []byte, sa *syscall.SockaddrInet6) (int, error)
func (*FD).Writev(v *[][]byte) (int64, error)

提供给语言用户的就这些, 我们用起来特别简单, 但是实现呢? netpoll究竟干了啥? 哪个线程负责跑epoll?

我查阅了相关博客, 发现其实每一个m的调度代码都会有一个epoll。举个例子, 现在只有一个goroutine, 它正要waitRead, 把这个协程放入netpoll的等待队列, 然后进入调度代码, go发现, workout, 居然没有别的协程要调度了, 我也不能让cpu空转啊, 于是就会立马调用netpoll(-1)。一旦epoll发现文件描述符可读, 相关代码就会把g放入可执行队列, 调度器就又有g可以执行咯。

1
2
3
4
5
6
7

// netpoll 轮询网络并返回可运行的准备就绪的 goroutines 列表,传入的参数会决定他的行为
// - 参数 < 0: 无限期阻塞等待文件就绪
// - 参数 == 0: 非阻塞轮询
// - 参数 > 0: 阻塞定期轮询
// netpoll里面有个epoll_wait
func netpoll(delay int64) gList

也就是说go调度器负责决定什么时候epoll_wait, 我是个小白, 对此完全不懂, 只能通过这个”cpu可能空转的例子”加深一下理解, 基本了解下是go的调度器在跑epoll就行了。

有栈协程解决了什么问题? 为什么不用epoll或者reactor了?

一般而言, go的http框架做tcp读写都是写阻塞io而不是回调了(忽略字节的那个)。根本原因就是go通过语言层面的epoll解决了”线程是稀缺资源”这个问题, 赋予用户开N多执行流的能力。如果线程的切换的价格是低廉的, 就没人会用epoll。此外go封装了自己的epoll, 这种情况也没必要用epoll自己造了, 但是字节自己写了个reactor, 我不了解具体原因, 但是说法是可以减少切换的开销, 有不少的性能提升。

那么, 如果想用go实现reactor那种控制力很强的代码咋办? 就比如我想做一个tcp聊天室? 方法1: 使用go语言的reactor框架, 比较好的有字节的那个netpoll, 我之前考核写象棋用的是一个大佬写的gev(go当然也可以用syscall.Epoll的相关操作, 字节就用它自己造了一个reactor框架)。

方法2: 用goroutine+channel, 把消息发送给主的消息循环, 一个大的for select作为所有消息的接受者, 这样就能回归到原始的状态机了, 看看我考核的代码: 点击此处

处理单个的连接很有考究, 我用了读/写两个goroutine处理一个线程, 然后用了channel做同步, 用for select这个大循环接收多种channel维持状态, 我感觉channel用好了控制力的效果和reactor是一样的, 其实都是写状态机嘛。

无栈协程, 为什么我注意到了它?

在我尝试写这个分享的时候, 我其实还不知道无栈协程到底是什么, 但是基本上知道它能很大程度上被大佬封装成轮子, 然后提供给”调包侠”使用, 我们就能使用这个轮子做到单线程异步了, 并且写代码的难度还和go差不多。

下面是等疾风大佬做的一个轮子的echo server示例(等疾风做的工作就是使用c++最新提供的协程语法实现了一个调度器):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <co_context/net.hpp>
using namespace co_context;

task<> session(int sockfd) {
co_context::socket sock{sockfd};
char buf[8192];

while (true) {
int nr = co_await sock.recv(buf);
co_await sock.send({buf, (size_t)nr});
}
}

task<> server(const uint16_t port) {
acceptor ac{inet_address{port}};
for (int sock; (sock = co_await ac.accept()) >= 0;) {
// 就像go session(sock)
// 其实这也是one loop per thread模式, co_spawn通过线程局部变量
// 拿到了io_context本体, 然后使用co_spawn注册了一个协程
co_spawn(session(sock));
}
}

int main() {
io_context ctx;
ctx.co_spawn(server(1234));
ctx.start();
ctx.join();
return 0;
}

ctx.start是跑io线程, ctx.join会等待这个io线程结束。这是单线程异步哦, 利用一个单线程可以实现海量并发, 并且这个代码风格是同步代码哦。

可以看到其实和写阻塞代码没两样。据说无栈协程是一种语法糖, 只要库作者实现调度器, 就能把异步的代码改写为同步的代码, 不但兼顾性能并且代码风格优雅(没有回调)。

传统用reactor写的代码全是回调, 如果要发送一个dns查询, 可能会是这样的:

1
2
3
// 伪代码
1. send_dns_request(target_name, func_dns_callback)
2. 在func_dns_callback继续你的动作

但是用了协程, 代码可以变成这样:

1
2
3
// 伪代码
co_await send_dns_request(target_name = ..., timeout = 3)
// 继续你的逻辑

魔法在于, co_await后面跟一个协程函数, 它不会真正调用阻塞的gethostname这个阻塞操作系统的操作, 而是把执行流切换到调度器, 由调度器发送一个dns udp请求包, 然后调度器选择其它协程来跑。

调度器有个epoll, 每次调度前都会跑epoll, 把阻塞在某些io的协程重新恢复运行。一旦这个epoll收到了dns响应包, 就会将之前挂起的协程设置为runnable, 并设置好它的返回值。

弊端是协程代码里同样不能使用阻塞线程的操作或者复杂的计算操作, 它会阻塞调度器的调度, 调度器整个阻塞住了, 谈什么并发呢?

可以看出来这种协程还是和go的有很大的区别, 因为这种协程是”非抢占”的。如果要让调度器充分掌握控制权, 要么没有耗时任务(我指的是长时间耗时的计算任务或者阻塞的系统调用), 要么充分使用co_yield或者await把控制权交还给调度器(协程就是做io密集的工作的, 需要让调度器充分调度才能实现单线程大并发)。

如果要做线程耗时的任务, c++的协程还是得把任务打入工作线程, 然后工作线程再把计算结果打回到io线程(这里我提到的io线程是指跑协程调度器的那个线程)。这和reactor是一样的。不过这是个复杂的任务, 因为协程的各种同步设施和线程的是不一样的, 所以协程要和工作线程”交流”恐怕真的有点麻烦。

等疾风的轮子没有提供相关例子, 我也是个半调子, 但我想到一个简单(但低效)的方法能做到, 其实就是轮询:

1
2
3
4
5
6
7
8
9
10
11
递交任务流程
1. 使用spin_lock, 锁住递送队列
2. 把任务打入任务
为什么用spin_lock? 因为我希望等待锁的时间尽可能少, 以免耽误调度器调度

收回执行结果流程:
1. 使用spin_lock上锁, 锁住结果队列
2. 拿任务执行结果
3. 如果拿不到co_yield, 然后goto 1号步骤

这样的思路其实就是疯狂轮询, 轮询不到结果就放弃执行权, 让其它协程先跑。

要用起c++的协程写网络, 一个牛逼的调度器必不可少, 我上面用的调度器就是等疾风大佬做的, 据说他使用了io_uring, 性能爆表。要实现这样一个调度器, 需要实现各种阻塞原语, 比如像sleep这样的阻塞语句, 必须用其它非阻塞的法子做到, 然后在阻塞到期后恢复协程的调度。我目前没有实现这种调度器的经验。这位作者甚至实现了类似go的channel。

实现这个调度器, 要支持很多设施, 比如要实现协程锁, 要实现tcp/udp socket, 要实现sleep, 要实现协程条件变量……我认为基本和内核做的工作差不多了。

go的协程和上面的co_context的区别

  1. go是抢占式的

你肯定没有听说过在go语言里面开goroutine执行死循环会耽误调度器调度。事实上, 如果我们写for{}无限循环, 超过10ms没有切换新协程, go运行时就发信号强行抢占了。也就是说, 即使不调用netpoll的那些调度代码最后go也会有办法实现抢占的。

  1. go的调度器跑在固定多的线程池上, 且在启动的时候就分配这些线程作为goroutine的”运行池”

而上面c++的那个, 需要自己安排线程跑调度器

  1. go的协程会在不同的地方陷入调度

比如在消耗大量栈的函数之前插入陷入调度的代码。比如在运行系统调用之前(当然是使用go的syscall.XXX而不能调cgo)。比如执行”阻塞”操作的时候, 例如mutex, 信号量, 条件变量, netpoll的waitXXX都会陷入调度。长时间没有调度, 直接信号抢占, 就像操作系统的定时器抢占一样屌, 缺点是性能较拉, 因为信号抢占需要先陷入内核, 内核设置栈帧, 然后才能跑进抢占代码, 因此go语言有个编译选项可以指定是否关闭信号抢占(这通常会造成bug, 比如开和调度器一样多的协程序跑while(1), 将彻底堵死所有调度器)。

但是c++在哪里会被调度是程序员决定的, 比如co_await, co_yield, co_return, 程序员对自己在哪里陷入调度一清二楚。

go什么时候会创建新的线程?

  1. 使用系统调用

即使不是慢系统调用(参考apue, 意思是可能长时间阻塞的系统调用, 比如sleep, read, epoll_wait, select), 有的系统调用依然会有不少的等待时间, 如果在go中调用syscallXXX, 必然新创建一个线程来做。

其实我之前提到的写文件, 虽然不是慢系统调用, 但是等待io的时间必然被浪费, 创建新线程代劳那些等待时间是必要的。

  1. 调用cgo之前

想想如果调用cgo之前不创建线程代劳, cgo如果执行耗时操作就很耽误go调度器调度, 想想如果go使用8个线程跑调度器, 如果我开8个goroutine执行while(1);那该如何? 如果真的不创建新的线程代劳, 那么第九个正常的协程就永远没机会跑了。

需要知道的是go协程在跑线程的时候不会被调度(也不会被信号抢占), 回到go程序之后才能被正常调度。

如何实现一个reactor框架?

这点有很多细节, 推荐阅读陈硕的《Linux高性能并发服务器编程》, 其实虽然是c++书籍, 但是语言部分介绍很少, 基本在介绍思路, 比如双缓冲的高性能日志库, 比如比较不同的网络模式(例如one loop per thread, 其实就是reactor使用的模式, 此外才有一个线程一个连接的模式, 被他指出有性能问题)。