C语言实现基于信号的抢占式协程调度(单线程版本)

前言: linux调度以及最小调度单位

我们知道, linux能调度多个不同的进程。假如现在有一台单核心的cpu, 在cpu上的linux系统正在运行多个进程, 虽然某个时刻只能有一个进程正在运行, 但由于linux的调度能力(不同进程进行切换), 我们可以感知到好像有多个进程在同时运行。这种调度机制源自linux系统能在一个进程运行一段时间后执行上下文切换, 让另一个进程得以运行。

除了进程可以调度。那么线程呢? 答案是肯定的, 我们可以认为, 线程是内核的最小调度单位, 在pthread的经典实现中, pthread_create创建线程有赖于clone系统调用, 它会创建一个内核中的task_struct, 而task_struct正是linux的调度对象(每个进程对应一个task_struct, 每个pthread也对应一个task_struct, linux调度的正是task_struct!)。这也是为什么一般线程在linux中被成为轻量级进程的原因, 在man文档中, 也把线程和进程都统一用process(也就是进程)表述。

协程: 用户态的调度

除了可以依赖于linux的task_struct内核调度, 我们同样可以在用户态达到调度的目的, 可以说, 用户态做的调度就是协程。

在用户态中, 我们需要自己管理栈, 自己管理某个协程的上下文, 因此必然需要用某个结构体存储这些信息。在本文中, 我们用linux提供的内核链表(从此处下载)管理多个协程的信息。

除了需要保存每个协程的信息, 我们需要依赖于linux的信号机制在合适的时机中断某个协程, 并切换到下一个协程。

那么什么时候该发信号, 由谁发信号呢? 我们需要一个专门的线程(我们称之为monitor线程)来监控协程的运行, 当某个协程运行太久, 那么信号就会发出。信号将被专门负责协程运行的线程接收, 收到信号, 这个线程的信号处理函数就会被调用, 就在这个信号处理函数中进行协程的切换。

因此, 本文描述的协程设计涉及两个线程, 一个是monitor线程, 一个是协程的运行线程。

那么如何保存某个协程的运行上下文呢? 这是个很麻烦的话题, 我们将使用glibc提供的ucontext设施来解决这个问题, 下面来谈谈这个设施。

ucontext: 下上文的保存和恢复

下面通过一个简单的例子来说明ucontext的简单运用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <stdio.h>
#include <ucontext.h>

ucontext_t ctx;

int main() {
int i = 0;
getcontext(&ctx);
printf("%d\n", i++);
setcontext(&ctx);
}

/* output:
0
1
2
3
4
...
*/

ucontext提供了四个函数: getcontext, setcontext, makecontextswapcontext。其中getcontext是用来获得当前上下文的(用ucontext_t保存), 这其中包含了所有寄存器, 比如通用寄存器, ss, rsp, cs, rip等等。总之, 我们只用getcontext保存它们, 不用理会每个寄存器各自的作用。setcontext用来恢复上下文, 由于ucontext_t包含了指令指针寄存器rip, 因此setcontext将直接跳转且决不返回。

自己构造一个上下文, 并跳转到那里去, 是很正常的需求, makecontext提供了这种方法的封装。比如我们现在想凭空跳转到另外一个函数上去, 该怎么做? 下面的代码实现了这样的需求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#define _GNU_SOURCE

#include <stdio.h>
#include <ucontext.h>
#include <stdlib.h>
#include <stdarg.h>
#include <signal.h>

void call_me(int int1, double int2) {
printf("%d %f\n", int1, int2);
sleep(10);
}

ucontext_t ctx;

int main() {
getcontext(&ctx);
ctx.uc_stack.ss_flags = 0;
ctx.uc_stack.ss_size = 4096;
ctx.uc_stack.ss_sp = malloc(4096);
ctx.uc_link = NULL;
makecontext(&ctx,(void(*)()) call_me, 2, 10, 20.3);
setcontext(&ctx);
}

注意必须先getcontext, 然后再修改一些信息, 最后才能makecontext, 为什么必须这么做, 我也不清楚, 希望有知道的同学告诉我。注意, 我们这里必须指定栈相关信息(见上面代码的ctx.uc_stack...), 包含栈的大小, 栈底的地址。uc_link代表当call_me函数返回时, 跳转的上下文, 如果为NULL(不修改这个字段, 默认也就是NULL), 那么在执行完这个函数后调用exit(0)结束进程。

最后, 介绍swapcontext函数, 它用于切换上下文(即为保存当前上下文, 并跳转到新的上下文, 同时也会设置信号屏蔽字)。在调度中, 这个函数非常重要, 下面我们来利用它实现两个协程的切换, 来加深理解:

不要迷信ucontext, 它是不完美的。切换上下文时, 无法原子地设置信号屏蔽字并跳转。在切换上下文时, ucontext先调用sigprocmask设置信号屏蔽字, 再进行跳转。极端情况下这可能重入, 因此这种切换很可能出问题。要解决这个问题参阅信号是如何返回的一文提到的sigreturn系统调用, 这样能完美解决信号处理重入的bug。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#define _GNU_SOURCE

#include <stdio.h>
#include <ucontext.h>
#include <stdlib.h>
#include <signal.h>

ucontext_t ctx1, ctx2;

void func_ctx1() {
while(1) {
printf("ctx1\n");
swapcontext(&ctx1, &ctx2);
}

}

void func_ctx2() {
while(1) {
printf("ctx2\n");
swapcontext(&ctx2, &ctx1);
}
}

int main() {
getcontext(&ctx1);
ctx1.uc_stack.ss_flags = 0;
ctx1.uc_stack.ss_size = 4096;
ctx1.uc_stack.ss_sp = malloc(4096);
makecontext(&ctx1, func_ctx1, 0);

getcontext(&ctx2);
ctx2.uc_stack.ss_flags = 0;
ctx2.uc_stack.ss_size = 4096;
ctx2.uc_stack.ss_sp = malloc(4096);
makecontext(&ctx2, func_ctx2, 0);

setcontext(&ctx1);
}

/* output:
ctx1
ctx2
ctx1
ctx2
...
*/

本文的协程设计:

下面看看这个设计的api, 来直观感受它的使用, 在我们的设计中, 协程分为主协程和次协程, 其中主协程终止, 整个进程随即调用exit(0)停止:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void my_new_func2(void *p) {
while(1) {
printf("789\n");
}
}

void my_new_func1(void *p) {
while(1) {
printf("456\n");
}
}

void my_main_func(void *p) {
co_create(my_new_func1, NULL);
co_create(my_new_func2, NULL);
while(1) {
printf("123\n");
}
}

int main() {
co_run(my_main_func, NULL);
}

/* output:
123
123
123
...
456
456
456
...
789
789
789
...
*/

我们将用main代表的线程作为monitor线程, 它内部有一个无限循环, 在sleep一段时间后检查某个线程是否运行太久, 如果是, 那么就向运行协程的线程发送信号, 相关数据结构和代码如下(不要思考某个地方的意义, 这个代码是我从写好的代码上扣下来的, 很难看明白, 只需要了解大概的思路就行了):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// 进行切换通知的信号, 最好为不可靠信号, 避免信号堵住
// 这里和go保持一致性, 都是SIGURG
#define _CO_NOTIFY_SIGNO SIGURG
// main_exit栈的大小
#define _CO_EXIT_STACK_SIZE 4096
// 协程的栈大小
#define _CO_STACK_SIZE 128*1024
// 时间片大小
#define _CO_SLICE_NSEC 1000
// 最多执行多少个时间片
#define _CO_SCHEDULE_N_SLICE 10

// 协程函数的签名
typedef void (*co_func_t)(void *);

typedef enum _co_state {
// 仅有两个状态, running和exit
// exit状态代表此协程主动请求退出
_CO_STATE_RUNNING,
_CO_STATE_EXIT
} _co_state_t;

// 单个调度体
typedef struct _co_task_node {
// 链表节点
struct list_head node;
// 单个协程的id
int co_id;
// 单个协程的上下文
ucontext_t co_ctx;
// 退出协程函数的上下文
ucontext_t co_exit_ctx;
// 协程当前状态
_co_state_t co_state;
// 为该协程分配的栈地址
void *co_alloc_stack;
void *exit_alloc_stack;
} _co_task_node_t;

// 调度相关的数据
typedef struct _co_schedule {
// 被调度的协程, 这个协程是主协程
_co_task_node_t task_main;

// 当前正在运行的任务
_co_task_node_t *cur_running_task;
int n_task_running;

// id号, 每次create的时候都递增它, 保证每个协程都有不同的id
unsigned long id_inc;

// monitor线程和程序运行线程之间的临界数据
// 这里用锁保护
pthread_mutex_t mutex;

} _co_schedule_t;

_co_schedule_t _co_sche;

void co_run(co_func_t func, void *arg) {
// 下面进行一些初始化工作

pthread_mutex_init(&_co_sche.mutex, NULL);
_co_sche.n_task_running = 1;
// id从2开始递增, 1以及被主协程使用
_co_sche.id_inc = 2;

void *main_stack = malloc(_CO_STACK_SIZE);
void *main_exit_stack = malloc(_CO_EXIT_STACK_SIZE);
_co_sche.task_main.co_alloc_stack = main_stack;
_co_sche.task_main.exit_alloc_stack = main_exit_stack;

_co_sche.task_main.co_id = 1;
_co_sche.task_main.co_state = _CO_STATE_RUNNING;
_co_sche.task_main.node.prev = _co_sche.task_main.node.next = &_co_sche.task_main.node;
_co_sche.task_main.co_alloc_stack = main_stack;
_co_sche.task_main.exit_alloc_stack = main_exit_stack;



// 生成main协程的上下文
getcontext(&_co_sche.task_main.co_exit_ctx);
sigfillset(&_co_sche.task_main.co_exit_ctx.uc_sigmask);
sigdelset(&_co_sche.task_main.co_exit_ctx.uc_sigmask, SIGURG);
_co_sche.task_main.co_exit_ctx.uc_stack.ss_flags = 0;
_co_sche.task_main.co_exit_ctx.uc_stack.ss_size = _CO_EXIT_STACK_SIZE;
_co_sche.task_main.co_exit_ctx.uc_stack.ss_sp = main_exit_stack;
makecontext(&_co_sche.task_main.co_exit_ctx, _co_main_exit, 0, NULL);

getcontext(&_co_sche.task_main.co_ctx);
sigfillset(&_co_sche.task_main.co_exit_ctx.uc_sigmask);
sigdelset(&_co_sche.task_main.co_exit_ctx.uc_sigmask, SIGURG);
_co_sche.task_main.co_ctx.uc_link = &_co_sche.task_main.co_exit_ctx;
_co_sche.task_main.co_ctx.uc_stack.ss_flags = 0;
_co_sche.task_main.co_ctx.uc_stack.ss_size = _CO_STACK_SIZE;
_co_sche.task_main.co_ctx.uc_stack.ss_sp = main_stack;
makecontext(&_co_sche.task_main.co_ctx, (void(*)())func, 1, arg);

_co_sche.cur_running_task = &_co_sche.task_main;

pthread_t tid;
pthread_attr_t attr;
pthread_attr_init(&attr);
// 得阻塞所有的信号, 然后在setup中设置新的信号处理函数
sigset_t set;
sigfillset(&set);
pthread_attr_setsigmask_np(&attr, &set);
pthread_create(&tid, &attr, _main_thread_setup, NULL);

// 循环进行monitor
int cur_running_co_id = 1;
int slice_spent = 0;
unsigned long sleep_time_nsec = _CO_SLICE_NSEC;
struct timespec sleep_ts = {
.tv_sec = 0,
.tv_nsec = sleep_time_nsec
};
while(1) {
nanosleep(&sleep_ts, NULL);
pthread_mutex_lock(&_co_sche.mutex);
// 确定此时只有一个线程在被调度
if(_co_sche.n_task_running == 1 && _co_sche.cur_running_task->co_id == 1) {
pthread_mutex_unlock(&_co_sche.mutex);
continue;
}
if (cur_running_co_id != _co_sche.cur_running_task->co_id) {
cur_running_co_id = _co_sche.cur_running_task->co_id;
slice_spent = 0;
pthread_mutex_unlock(&_co_sche.mutex);
continue;
}
slice_spent ++;
if (slice_spent > _CO_SCHEDULE_N_SLICE) {
sigval_t v;
int n = pthread_sigqueue(tid, _CO_NOTIFY_SIGNO, v);
}
pthread_mutex_unlock(&_co_sche.mutex);
}
}

观察上面代码片段, co_run内部的pthread_create创造了一个供协程运行线程, 并且内部有个无限循环, 用于自旋, 监视一个协程的运行时间。如果协程用光了它的时间片, 信号就会发出。为了保证数据的一致性, 两个线程间的通信必须用到pthread_mutex进行保证。在co_run中初始化了主协程的上下文, 并指定了它的uc_link。也就是说, _co_main_exit将在主协程返回后被执行。下面是_co_main_exit的代码:

1
2
3
void _co_main_exit() {
exit(0);
}

来看_main_thread_setup函数, 它是协程运行线程, 在创建此线程时, 我们有意阻塞了所有信号, 以便运行线程后首先设置好信号处理函数。下面是_main_pthread_setup的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 在进入这个线程时, 它的所有信号都是屏蔽的
void *_main_thread_setup(void * data) {
// 设置信号处理函数, 在信号处理函数中阻塞所有信号
struct sigaction action;
action.sa_flags = SA_SIGINFO|SA_RESTART;
sigfillset(&action.sa_mask);
action.sa_sigaction = _co_signal_handler;
sigaction(_CO_NOTIFY_SIGNO, &action, NULL);

// 现在可以处理信号了, 将接收调度信号, setcontext里面已经设置好
// 了sigmask
setcontext(&_co_sche.task_main.co_ctx);
// unreachable
return NULL;
}

可知, 它做了两件事: 1.设置信号处理函数, 2.跳转到主协程的上下文

现在基本能让主协程运行了, 我们需要创建新的协程, 下面是co_create的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
void co_create(co_func_t func, void *arg) {
// 在锁之前设置信号阻塞, 否则会发生死锁
sigset_t newmask;
sigset_t oldmask;
sigfillset(&newmask);
pthread_sigmask(SIG_SETMASK, &newmask, &oldmask);

pthread_mutex_lock(&_co_sche.mutex);
// 此处增加一个协程
_co_task_node_t *newtask = malloc(sizeof(_co_task_node_t));
newtask->co_state = _CO_STATE_RUNNING;
newtask->co_id = _co_sche.id_inc;
_co_sche.id_inc ++;
_co_sche.n_task_running ++;

void *co_stack = malloc(_CO_STACK_SIZE);
void *exit_stack = malloc(_CO_EXIT_STACK_SIZE);
newtask->co_alloc_stack = co_stack;
newtask->exit_alloc_stack = exit_stack;

// 获得新的上下文
getcontext(&newtask->co_exit_ctx);
newtask->co_exit_ctx.uc_stack.ss_flags = 0;
sigfillset(&newtask->co_exit_ctx.uc_sigmask);
sigdelset(&newtask->co_exit_ctx.uc_sigmask, _CO_NOTIFY_SIGNO);
newtask->co_exit_ctx.uc_link = NULL;
newtask->co_exit_ctx.uc_stack.ss_flags = 0;
newtask->co_exit_ctx.uc_stack.ss_size = _CO_EXIT_STACK_SIZE;
newtask->co_exit_ctx.uc_stack.ss_sp = exit_stack;
makecontext(&newtask->co_exit_ctx, _co_exit, 0);

getcontext(&newtask->co_ctx);
newtask->co_ctx.uc_stack.ss_flags = 0;
sigfillset(&newtask->co_ctx.uc_sigmask);
sigdelset(&newtask->co_ctx.uc_sigmask, _CO_NOTIFY_SIGNO);
newtask->co_ctx.uc_link = &newtask->co_exit_ctx;
newtask->co_ctx.uc_stack.ss_flags = 0;
newtask->co_ctx.uc_stack.ss_size = _CO_STACK_SIZE;
newtask->co_ctx.uc_stack.ss_sp = co_stack;
makecontext(&newtask->co_ctx, (void (*) (void))func, 1, arg);

// 将新的任务加入到链表
list_add_tail(&newtask->node, &_co_sche.task_main.node);

pthread_mutex_unlock(&_co_sche.mutex);

// 恢复信号处理
pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
}

void _co_exit() {
// 先阻塞信号
sigset_t newmask;
sigset_t oldmask;
sigfillset(&newmask);
pthread_sigmask(SIG_SETMASK, &newmask, &oldmask);

pthread_mutex_lock(&_co_sche.mutex);
_co_sche.n_task_running --;
_co_sche.cur_running_task->co_state = _CO_STATE_EXIT;
pthread_mutex_unlock(&_co_sche.mutex);

pthread_sigmask(SIG_SETMASK, &oldmask, NULL);

while(1);
}

这里有个很重要的一点, 必须在上锁之前禁止信号响应, 否则就会发生死锁。试想, 如果此协程锁上了, 然后发送了调度信号, 跳转到信号处理函数中, 此处又执行上锁, 那么就死锁了。然后有个巧妙的点就是在协程结束后调用的_co_exit, 修改好数据结构后使用while(1)阻塞在那来等待一个信号, 进入中断调度。

最后附上信号处理函数的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void _co_signal_handler() {
pthread_mutex_lock(&_co_sche.mutex);


// 先一个一个找, 直到找到下一个
_co_task_node_t *cur_task = _co_sche.cur_running_task;
_co_task_node_t *iter_task = container_of(_co_sche.cur_running_task->node.next, _co_task_node_t, node);
while(1) {
if(iter_task->co_state == _CO_STATE_RUNNING) {
break;
} else {
// 进行垃圾回收, 拿下一个对象
free(iter_task->co_alloc_stack);
free(iter_task->exit_alloc_stack);
free(iter_task);
list_del(&iter_task->node);

iter_task = container_of(_co_sche.cur_running_task->node.next, _co_task_node_t, node);
continue;
}
}

// 找到了一个合适的切换对象, 现在进行切换

_co_sche.cur_running_task = iter_task;

pthread_mutex_unlock(&_co_sche.mutex);

swapcontext(&cur_task->co_ctx, &iter_task->co_ctx);
}

在扫描下一个RUNNNIG协程的途中, 我们通过判断co_state == _CO_STATE_RUNNING来判断是否进行垃圾回收, 这里可以结合_co_exit看看。

附完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
#define _GNU_SOURCE

#include <pthread.h>
#include "list.h"
#include <ucontext.h>
#include <stdatomic.h>
#include <signal.h>
#include <stdlib.h>
#include <stdio.h>

// 进行切换通知的信号, 最好为不可靠信号, 避免信号堵住
// 这里和go保持一致性, 都是SIGURG
#define _CO_NOTIFY_SIGNO SIGURG
// main_exit栈的大小
#define _CO_EXIT_STACK_SIZE 4096
// 协程的栈大小
#define _CO_STACK_SIZE 128*1024
// 时间片大小
#define _CO_SLICE_NSEC 1000
// 最多执行多少个时间片
#define _CO_SCHEDULE_N_SLICE 10

// 协程函数的签名
typedef void (*co_func_t)(void *);

typedef enum _co_state {
// 仅有两个状态, running和exit
// exit状态代表此协程主动请求退出
_CO_STATE_RUNNING,
_CO_STATE_EXIT
} _co_state_t;

// 单个调度体
typedef struct _co_task_node {
// 链表节点
struct list_head node;
// 单个协程的id
int co_id;
// 单个协程的上下文
ucontext_t co_ctx;
// 退出协程函数的上下文
ucontext_t co_exit_ctx;
// 协程当前状态
_co_state_t co_state;
// 为该协程分配的栈地址
void *co_alloc_stack;
void *exit_alloc_stack;
} _co_task_node_t;

// 调度相关的数据
typedef struct _co_schedule {
// 被调度的协程, 这个协程是主协程
_co_task_node_t task_main;

// 当前正在运行的任务
_co_task_node_t *cur_running_task;
int n_task_running;

// id号, 每次create的时候都递增它, 保证每个协程都有不同的id
unsigned long id_inc;

// monitor线程和程序运行线程之间的临界数据
// 这里用锁保护
pthread_mutex_t mutex;

} _co_schedule_t;

_co_schedule_t _co_sche;

void _co_main_exit() {
exit(0);
}

void _co_signal_handler() {
pthread_mutex_lock(&_co_sche.mutex);


// 先一个一个找, 直到找到下一个
_co_task_node_t *cur_task = _co_sche.cur_running_task;
_co_task_node_t *iter_task = container_of(_co_sche.cur_running_task->node.next, _co_task_node_t, node);
while(1) {
if(iter_task->co_state == _CO_STATE_RUNNING) {
break;
} else {
// 进行垃圾回收, 拿下一个对象
free(iter_task->co_alloc_stack);
free(iter_task->exit_alloc_stack);
free(iter_task);
list_del(&iter_task->node);

iter_task = container_of(_co_sche.cur_running_task->node.next, _co_task_node_t, node);
continue;
}
}

// 找到了一个合适的切换对象, 现在进行切换

_co_sche.cur_running_task = iter_task;

pthread_mutex_unlock(&_co_sche.mutex);

swapcontext(&cur_task->co_ctx, &iter_task->co_ctx);
}

// 在进入这个线程时, 它的所有信号都是屏蔽的
void *_main_thread_setup(void * data) {
// 设置信号处理函数, 在信号处理函数中阻塞所有信号
struct sigaction action;
action.sa_flags = SA_SIGINFO|SA_RESTART;
sigfillset(&action.sa_mask);
action.sa_sigaction = _co_signal_handler;
sigaction(_CO_NOTIFY_SIGNO, &action, NULL);

// 现在可以处理信号了, 将接收调度信号, setcontext里面已经设置好
// 了sigmask
setcontext(&_co_sche.task_main.co_ctx);
// unreachable
return NULL;
}

void co_run(co_func_t func, void *arg) {
// 下面进行一些初始化工作

pthread_mutex_init(&_co_sche.mutex, NULL);
_co_sche.n_task_running = 1;
// id从2开始递增, 1以及被主协程使用
_co_sche.id_inc = 2;

void *main_stack = malloc(_CO_STACK_SIZE);
void *main_exit_stack = malloc(_CO_EXIT_STACK_SIZE);
_co_sche.task_main.co_alloc_stack = main_stack;
_co_sche.task_main.exit_alloc_stack = main_exit_stack;

_co_sche.task_main.co_id = 1;
_co_sche.task_main.co_state = _CO_STATE_RUNNING;
_co_sche.task_main.node.prev = _co_sche.task_main.node.next = &_co_sche.task_main.node;
_co_sche.task_main.co_alloc_stack = main_stack;
_co_sche.task_main.exit_alloc_stack = main_exit_stack;



// 生成main协程的上下文
getcontext(&_co_sche.task_main.co_exit_ctx);
sigfillset(&_co_sche.task_main.co_exit_ctx.uc_sigmask);
sigdelset(&_co_sche.task_main.co_exit_ctx.uc_sigmask, SIGURG);
_co_sche.task_main.co_exit_ctx.uc_stack.ss_flags = 0;
_co_sche.task_main.co_exit_ctx.uc_stack.ss_size = _CO_EXIT_STACK_SIZE;
_co_sche.task_main.co_exit_ctx.uc_stack.ss_sp = main_exit_stack;
makecontext(&_co_sche.task_main.co_exit_ctx, _co_main_exit, 0, NULL);

getcontext(&_co_sche.task_main.co_ctx);
sigfillset(&_co_sche.task_main.co_exit_ctx.uc_sigmask);
sigdelset(&_co_sche.task_main.co_exit_ctx.uc_sigmask, SIGURG);
_co_sche.task_main.co_ctx.uc_link = &_co_sche.task_main.co_exit_ctx;
_co_sche.task_main.co_ctx.uc_stack.ss_flags = 0;
_co_sche.task_main.co_ctx.uc_stack.ss_size = _CO_STACK_SIZE;
_co_sche.task_main.co_ctx.uc_stack.ss_sp = main_stack;
makecontext(&_co_sche.task_main.co_ctx, (void(*)())func, 1, arg);

_co_sche.cur_running_task = &_co_sche.task_main;

pthread_t tid;
pthread_attr_t attr;
pthread_attr_init(&attr);
// 得阻塞所有的信号, 然后在setup中设置新的信号处理函数
sigset_t set;
sigfillset(&set);
pthread_attr_setsigmask_np(&attr, &set);
pthread_create(&tid, &attr, _main_thread_setup, NULL);

// 循环进行monitor
int cur_running_co_id = 1;
int slice_spent = 0;
unsigned long sleep_time_nsec = _CO_SLICE_NSEC;
struct timespec sleep_ts = {
.tv_sec = 0,
.tv_nsec = sleep_time_nsec
};
while(1) {
nanosleep(&sleep_ts, NULL);
pthread_mutex_lock(&_co_sche.mutex);
// 确定此时只有一个线程在被调度
if(_co_sche.n_task_running == 1 && _co_sche.cur_running_task->co_id == 1) {
pthread_mutex_unlock(&_co_sche.mutex);
continue;
}
if (cur_running_co_id != _co_sche.cur_running_task->co_id) {
cur_running_co_id = _co_sche.cur_running_task->co_id;
slice_spent = 0;
pthread_mutex_unlock(&_co_sche.mutex);
continue;
}
slice_spent ++;
if (slice_spent > _CO_SCHEDULE_N_SLICE) {
sigval_t v;
int n = pthread_sigqueue(tid, _CO_NOTIFY_SIGNO, v);
}
pthread_mutex_unlock(&_co_sche.mutex);
}
}

// 仅仅将自己标记为_CO_STATE_EXIT, 真正的内存回收工作在下次调度时扫描进行
// 标记完成后, 释放锁, 原地while(1)等待
void _co_exit() {
// 先阻塞信号
sigset_t newmask;
sigset_t oldmask;
sigfillset(&newmask);
pthread_sigmask(SIG_SETMASK, &newmask, &oldmask);

pthread_mutex_lock(&_co_sche.mutex);
_co_sche.n_task_running --;
_co_sche.cur_running_task->co_state = _CO_STATE_EXIT;
pthread_mutex_unlock(&_co_sche.mutex);

pthread_sigmask(SIG_SETMASK, &oldmask, NULL);

while(1);
}

void co_create(co_func_t func, void *arg) {
// 在锁之前设置信号阻塞, 否则会发生死锁
sigset_t newmask;
sigset_t oldmask;
sigfillset(&newmask);
pthread_sigmask(SIG_SETMASK, &newmask, &oldmask);

pthread_mutex_lock(&_co_sche.mutex);
// 此处增加一个协程
_co_task_node_t *newtask = malloc(sizeof(_co_task_node_t));
newtask->co_state = _CO_STATE_RUNNING;
newtask->co_id = _co_sche.id_inc;
_co_sche.id_inc ++;
_co_sche.n_task_running ++;

void *co_stack = malloc(_CO_STACK_SIZE);
void *exit_stack = malloc(_CO_EXIT_STACK_SIZE);
newtask->co_alloc_stack = co_stack;
newtask->exit_alloc_stack = exit_stack;

// 获得新的上下文
getcontext(&newtask->co_exit_ctx);
newtask->co_exit_ctx.uc_stack.ss_flags = 0;
sigfillset(&newtask->co_exit_ctx.uc_sigmask);
sigdelset(&newtask->co_exit_ctx.uc_sigmask, _CO_NOTIFY_SIGNO);
newtask->co_exit_ctx.uc_link = NULL;
newtask->co_exit_ctx.uc_stack.ss_flags = 0;
newtask->co_exit_ctx.uc_stack.ss_size = _CO_EXIT_STACK_SIZE;
newtask->co_exit_ctx.uc_stack.ss_sp = exit_stack;
makecontext(&newtask->co_exit_ctx, _co_exit, 0);

getcontext(&newtask->co_ctx);
newtask->co_ctx.uc_stack.ss_flags = 0;
sigfillset(&newtask->co_ctx.uc_sigmask);
sigdelset(&newtask->co_ctx.uc_sigmask, _CO_NOTIFY_SIGNO);
newtask->co_ctx.uc_link = &newtask->co_exit_ctx;
newtask->co_ctx.uc_stack.ss_flags = 0;
newtask->co_ctx.uc_stack.ss_size = _CO_STACK_SIZE;
newtask->co_ctx.uc_stack.ss_sp = co_stack;
makecontext(&newtask->co_ctx, (void (*) (void))func, 1, arg);

// 将新的任务加入到链表
list_add_tail(&newtask->node, &_co_sche.task_main.node);

pthread_mutex_unlock(&_co_sche.mutex);

// 恢复信号处理
pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
}


// -----------------------
// 上面是库函数

void my_new_func2(void *p) {
while(1) {
printf("789\n");
}
}

void my_new_func1(void *p) {
while(1) {
printf("456\n");
}
}

void my_main_func(void *p) {
co_create(my_new_func1, NULL);
co_create(my_new_func2, NULL);
while(1) {
printf("123\n");
}
}

int main() {
co_run(my_main_func, NULL);
}

更多思考(QA)

Q: 如果信号中断了系统调用?
A: 以SA_RESTART设置的信号处理函数, 在返回时可以重启系统调用。然而并不是所有的函数都能被重启, 比如nanosleep, select, sendto, sendmsg等等。为了稳妥, 我们最好对每个系统调用做一层系统调用封装, 检查其返回值是否为EINTR, 如果是, 那么重新执行。这是一个很庞大的工作量, 本文没有做这些封装(懒)。值得一提的是, go也对每个syscall做了这种封装, 使得信号发生时系统调用能被正确的处理

Q: 协程间通信以及锁等基础设施?
A: 以后单独写篇文章来做这个

Q: 利用多线程, 像go一样?
A: 现在只实现了单线程版本, 以后会做多线程的协程调度