前言: linux调度以及最小调度单位
我们知道, linux能调度多个不同的进程。假如现在有一台单核心的cpu, 在cpu上的linux系统正在运行多个进程, 虽然某个时刻只能有一个进程正在运行, 但由于linux的调度能力(不同进程进行切换), 我们可以感知到好像有多个进程在同时运行。这种调度机制源自linux系统能在一个进程运行一段时间后执行上下文切换, 让另一个进程得以运行。
除了进程可以调度。那么线程呢? 答案是肯定的, 我们可以认为, 线程是内核的最小调度单位, 在pthread的经典实现中, pthread_create
创建线程有赖于clone
系统调用, 它会创建一个内核中的task_struct
, 而task_struct
正是linux的调度对象(每个进程对应一个task_struct
, 每个pthread也对应一个task_struct
, linux调度的正是task_struct
!)。这也是为什么一般线程在linux中被成为轻量级进程的原因, 在man
文档中, 也把线程和进程都统一用process
(也就是进程)表述。
协程: 用户态的调度
除了可以依赖于linux的task_struct
内核调度, 我们同样可以在用户态达到调度的目的, 可以说, 用户态做的调度就是协程。
在用户态中, 我们需要自己管理栈, 自己管理某个协程的上下文, 因此必然需要用某个结构体存储这些信息。在本文中, 我们用linux提供的内核链表(从此处下载)管理多个协程的信息。
除了需要保存每个协程的信息, 我们需要依赖于linux的信号机制在合适的时机中断某个协程, 并切换到下一个协程。
那么什么时候该发信号, 由谁发信号呢? 我们需要一个专门的线程(我们称之为monitor
线程)来监控协程的运行, 当某个协程运行太久, 那么信号就会发出。信号将被专门负责协程运行的线程接收, 收到信号, 这个线程的信号处理函数就会被调用, 就在这个信号处理函数中进行协程的切换。
因此, 本文描述的协程设计涉及两个线程, 一个是monitor
线程, 一个是协程的运行线程。
那么如何保存某个协程的运行上下文呢? 这是个很麻烦的话题, 我们将使用glibc提供的ucontext
设施来解决这个问题, 下面来谈谈这个设施。
ucontext: 下上文的保存和恢复
下面通过一个简单的例子来说明ucontext
的简单运用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| #include <stdio.h> #include <ucontext.h>
ucontext_t ctx;
int main() { int i = 0; getcontext(&ctx); printf("%d\n", i++); setcontext(&ctx); }
|
ucontext
提供了四个函数: getcontext
, setcontext
, makecontext
和swapcontext
。其中getcontext
是用来获得当前上下文的(用ucontext_t
保存), 这其中包含了所有寄存器, 比如通用寄存器, ss
, rsp
, cs
, rip
等等。总之, 我们只用getcontext
保存它们, 不用理会每个寄存器各自的作用。setcontext
用来恢复上下文, 由于ucontext_t
包含了指令指针寄存器rip
, 因此setcontext
将直接跳转且决不返回。
自己构造一个上下文, 并跳转到那里去, 是很正常的需求, makecontext
提供了这种方法的封装。比如我们现在想凭空跳转到另外一个函数上去, 该怎么做? 下面的代码实现了这样的需求:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| #define _GNU_SOURCE
#include <stdio.h> #include <ucontext.h> #include <stdlib.h> #include <stdarg.h> #include <signal.h>
void call_me(int int1, double int2) { printf("%d %f\n", int1, int2); sleep(10); }
ucontext_t ctx;
int main() { getcontext(&ctx); ctx.uc_stack.ss_flags = 0; ctx.uc_stack.ss_size = 4096; ctx.uc_stack.ss_sp = malloc(4096); ctx.uc_link = NULL; makecontext(&ctx,(void(*)()) call_me, 2, 10, 20.3); setcontext(&ctx); }
|
注意必须先getcontext
, 然后再修改一些信息, 最后才能makecontext
, 为什么必须这么做, 我也不清楚, 希望有知道的同学告诉我。注意, 我们这里必须指定栈相关信息(见上面代码的ctx.uc_stack...
), 包含栈的大小, 栈底的地址。uc_link
代表当call_me
函数返回时, 跳转的上下文, 如果为NULL
(不修改这个字段, 默认也就是NULL
), 那么在执行完这个函数后调用exit(0)
结束进程。
最后, 介绍swapcontext
函数, 它用于切换上下文(即为保存当前上下文, 并跳转到新的上下文, 同时也会设置信号屏蔽字)。在调度中, 这个函数非常重要, 下面我们来利用它实现两个协程的切换, 来加深理解:
不要迷信ucontext, 它是不完美的。切换上下文时, 无法原子地设置信号屏蔽字并跳转。在切换上下文时, ucontext先调用sigprocmask设置信号屏蔽字, 再进行跳转。极端情况下这可能重入, 因此这种切换很可能出问题。要解决这个问题参阅信号是如何返回的一文提到的sigreturn系统调用, 这样能完美解决信号处理重入的bug。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| #define _GNU_SOURCE
#include <stdio.h> #include <ucontext.h> #include <stdlib.h> #include <signal.h>
ucontext_t ctx1, ctx2;
void func_ctx1() { while(1) { printf("ctx1\n"); swapcontext(&ctx1, &ctx2); }
}
void func_ctx2() { while(1) { printf("ctx2\n"); swapcontext(&ctx2, &ctx1); } }
int main() { getcontext(&ctx1); ctx1.uc_stack.ss_flags = 0; ctx1.uc_stack.ss_size = 4096; ctx1.uc_stack.ss_sp = malloc(4096); makecontext(&ctx1, func_ctx1, 0); getcontext(&ctx2); ctx2.uc_stack.ss_flags = 0; ctx2.uc_stack.ss_size = 4096; ctx2.uc_stack.ss_sp = malloc(4096); makecontext(&ctx2, func_ctx2, 0);
setcontext(&ctx1); }
|
本文的协程设计:
下面看看这个设计的api, 来直观感受它的使用, 在我们的设计中, 协程分为主协程和次协程, 其中主协程终止, 整个进程随即调用exit(0)
停止:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| void my_new_func2(void *p) { while(1) { printf("789\n"); } }
void my_new_func1(void *p) { while(1) { printf("456\n"); } }
void my_main_func(void *p) { co_create(my_new_func1, NULL); co_create(my_new_func2, NULL); while(1) { printf("123\n"); } }
int main() { co_run(my_main_func, NULL); }
|
我们将用main
代表的线程作为monitor
线程, 它内部有一个无限循环, 在sleep
一段时间后检查某个线程是否运行太久, 如果是, 那么就向运行协程的线程发送信号, 相关数据结构和代码如下(不要思考某个地方的意义, 这个代码是我从写好的代码上扣下来的, 很难看明白, 只需要了解大概的思路就行了):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
#define _CO_NOTIFY_SIGNO SIGURG
#define _CO_EXIT_STACK_SIZE 4096
#define _CO_STACK_SIZE 128*1024
#define _CO_SLICE_NSEC 1000
#define _CO_SCHEDULE_N_SLICE 10
typedef void (*co_func_t)(void *);
typedef enum _co_state { _CO_STATE_RUNNING, _CO_STATE_EXIT } _co_state_t;
typedef struct _co_task_node { struct list_head node; int co_id; ucontext_t co_ctx; ucontext_t co_exit_ctx; _co_state_t co_state; void *co_alloc_stack; void *exit_alloc_stack; } _co_task_node_t;
typedef struct _co_schedule { _co_task_node_t task_main;
_co_task_node_t *cur_running_task; int n_task_running;
unsigned long id_inc;
pthread_mutex_t mutex; } _co_schedule_t;
_co_schedule_t _co_sche;
void co_run(co_func_t func, void *arg) {
pthread_mutex_init(&_co_sche.mutex, NULL); _co_sche.n_task_running = 1; _co_sche.id_inc = 2;
void *main_stack = malloc(_CO_STACK_SIZE); void *main_exit_stack = malloc(_CO_EXIT_STACK_SIZE); _co_sche.task_main.co_alloc_stack = main_stack; _co_sche.task_main.exit_alloc_stack = main_exit_stack;
_co_sche.task_main.co_id = 1; _co_sche.task_main.co_state = _CO_STATE_RUNNING; _co_sche.task_main.node.prev = _co_sche.task_main.node.next = &_co_sche.task_main.node; _co_sche.task_main.co_alloc_stack = main_stack; _co_sche.task_main.exit_alloc_stack = main_exit_stack;
getcontext(&_co_sche.task_main.co_exit_ctx); sigfillset(&_co_sche.task_main.co_exit_ctx.uc_sigmask); sigdelset(&_co_sche.task_main.co_exit_ctx.uc_sigmask, SIGURG); _co_sche.task_main.co_exit_ctx.uc_stack.ss_flags = 0; _co_sche.task_main.co_exit_ctx.uc_stack.ss_size = _CO_EXIT_STACK_SIZE; _co_sche.task_main.co_exit_ctx.uc_stack.ss_sp = main_exit_stack; makecontext(&_co_sche.task_main.co_exit_ctx, _co_main_exit, 0, NULL);
getcontext(&_co_sche.task_main.co_ctx); sigfillset(&_co_sche.task_main.co_exit_ctx.uc_sigmask); sigdelset(&_co_sche.task_main.co_exit_ctx.uc_sigmask, SIGURG); _co_sche.task_main.co_ctx.uc_link = &_co_sche.task_main.co_exit_ctx; _co_sche.task_main.co_ctx.uc_stack.ss_flags = 0; _co_sche.task_main.co_ctx.uc_stack.ss_size = _CO_STACK_SIZE; _co_sche.task_main.co_ctx.uc_stack.ss_sp = main_stack; makecontext(&_co_sche.task_main.co_ctx, (void(*)())func, 1, arg);
_co_sche.cur_running_task = &_co_sche.task_main;
pthread_t tid; pthread_attr_t attr; pthread_attr_init(&attr); sigset_t set; sigfillset(&set); pthread_attr_setsigmask_np(&attr, &set); pthread_create(&tid, &attr, _main_thread_setup, NULL);
int cur_running_co_id = 1; int slice_spent = 0; unsigned long sleep_time_nsec = _CO_SLICE_NSEC; struct timespec sleep_ts = { .tv_sec = 0, .tv_nsec = sleep_time_nsec }; while(1) { nanosleep(&sleep_ts, NULL); pthread_mutex_lock(&_co_sche.mutex); if(_co_sche.n_task_running == 1 && _co_sche.cur_running_task->co_id == 1) { pthread_mutex_unlock(&_co_sche.mutex); continue; } if (cur_running_co_id != _co_sche.cur_running_task->co_id) { cur_running_co_id = _co_sche.cur_running_task->co_id; slice_spent = 0; pthread_mutex_unlock(&_co_sche.mutex); continue; } slice_spent ++; if (slice_spent > _CO_SCHEDULE_N_SLICE) { sigval_t v; int n = pthread_sigqueue(tid, _CO_NOTIFY_SIGNO, v); } pthread_mutex_unlock(&_co_sche.mutex); } }
|
观察上面代码片段, co_run
内部的pthread_create
创造了一个供协程运行线程, 并且内部有个无限循环, 用于自旋, 监视一个协程的运行时间。如果协程用光了它的时间片, 信号就会发出。为了保证数据的一致性, 两个线程间的通信必须用到pthread_mutex
进行保证。在co_run
中初始化了主协程的上下文, 并指定了它的uc_link
。也就是说, _co_main_exit
将在主协程返回后被执行。下面是_co_main_exit
的代码:
1 2 3
| void _co_main_exit() { exit(0); }
|
来看_main_thread_setup
函数, 它是协程运行线程, 在创建此线程时, 我们有意阻塞了所有信号, 以便运行线程后首先设置好信号处理函数。下面是_main_pthread_setup
的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| void *_main_thread_setup(void * data) { struct sigaction action; action.sa_flags = SA_SIGINFO|SA_RESTART; sigfillset(&action.sa_mask); action.sa_sigaction = _co_signal_handler; sigaction(_CO_NOTIFY_SIGNO, &action, NULL);
setcontext(&_co_sche.task_main.co_ctx); return NULL; }
|
可知, 它做了两件事: 1.设置信号处理函数, 2.跳转到主协程的上下文
现在基本能让主协程运行了, 我们需要创建新的协程, 下面是co_create
的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| void co_create(co_func_t func, void *arg) { sigset_t newmask; sigset_t oldmask; sigfillset(&newmask); pthread_sigmask(SIG_SETMASK, &newmask, &oldmask);
pthread_mutex_lock(&_co_sche.mutex); _co_task_node_t *newtask = malloc(sizeof(_co_task_node_t)); newtask->co_state = _CO_STATE_RUNNING; newtask->co_id = _co_sche.id_inc; _co_sche.id_inc ++; _co_sche.n_task_running ++;
void *co_stack = malloc(_CO_STACK_SIZE); void *exit_stack = malloc(_CO_EXIT_STACK_SIZE); newtask->co_alloc_stack = co_stack; newtask->exit_alloc_stack = exit_stack;
getcontext(&newtask->co_exit_ctx); newtask->co_exit_ctx.uc_stack.ss_flags = 0; sigfillset(&newtask->co_exit_ctx.uc_sigmask); sigdelset(&newtask->co_exit_ctx.uc_sigmask, _CO_NOTIFY_SIGNO); newtask->co_exit_ctx.uc_link = NULL; newtask->co_exit_ctx.uc_stack.ss_flags = 0; newtask->co_exit_ctx.uc_stack.ss_size = _CO_EXIT_STACK_SIZE; newtask->co_exit_ctx.uc_stack.ss_sp = exit_stack; makecontext(&newtask->co_exit_ctx, _co_exit, 0);
getcontext(&newtask->co_ctx); newtask->co_ctx.uc_stack.ss_flags = 0; sigfillset(&newtask->co_ctx.uc_sigmask); sigdelset(&newtask->co_ctx.uc_sigmask, _CO_NOTIFY_SIGNO); newtask->co_ctx.uc_link = &newtask->co_exit_ctx; newtask->co_ctx.uc_stack.ss_flags = 0; newtask->co_ctx.uc_stack.ss_size = _CO_STACK_SIZE; newtask->co_ctx.uc_stack.ss_sp = co_stack; makecontext(&newtask->co_ctx, (void (*) (void))func, 1, arg); list_add_tail(&newtask->node, &_co_sche.task_main.node);
pthread_mutex_unlock(&_co_sche.mutex);
pthread_sigmask(SIG_SETMASK, &oldmask, NULL); }
void _co_exit() { sigset_t newmask; sigset_t oldmask; sigfillset(&newmask); pthread_sigmask(SIG_SETMASK, &newmask, &oldmask);
pthread_mutex_lock(&_co_sche.mutex); _co_sche.n_task_running --; _co_sche.cur_running_task->co_state = _CO_STATE_EXIT; pthread_mutex_unlock(&_co_sche.mutex);
pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
while(1); }
|
这里有个很重要的一点, 必须在上锁之前禁止信号响应, 否则就会发生死锁。试想, 如果此协程锁上了, 然后发送了调度信号, 跳转到信号处理函数中, 此处又执行上锁, 那么就死锁了。然后有个巧妙的点就是在协程结束后调用的_co_exit
, 修改好数据结构后使用while(1)
阻塞在那来等待一个信号, 进入中断调度。
最后附上信号处理函数的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| void _co_signal_handler() { pthread_mutex_lock(&_co_sche.mutex);
_co_task_node_t *cur_task = _co_sche.cur_running_task; _co_task_node_t *iter_task = container_of(_co_sche.cur_running_task->node.next, _co_task_node_t, node); while(1) { if(iter_task->co_state == _CO_STATE_RUNNING) { break; } else { free(iter_task->co_alloc_stack); free(iter_task->exit_alloc_stack); free(iter_task); list_del(&iter_task->node);
iter_task = container_of(_co_sche.cur_running_task->node.next, _co_task_node_t, node); continue; } }
_co_sche.cur_running_task = iter_task; pthread_mutex_unlock(&_co_sche.mutex);
swapcontext(&cur_task->co_ctx, &iter_task->co_ctx); }
|
在扫描下一个RUNNNIG协程的途中, 我们通过判断co_state == _CO_STATE_RUNNING
来判断是否进行垃圾回收, 这里可以结合_co_exit
看看。
附完整代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
| #define _GNU_SOURCE
#include <pthread.h> #include "list.h" #include <ucontext.h> #include <stdatomic.h> #include <signal.h> #include <stdlib.h> #include <stdio.h>
#define _CO_NOTIFY_SIGNO SIGURG
#define _CO_EXIT_STACK_SIZE 4096
#define _CO_STACK_SIZE 128*1024
#define _CO_SLICE_NSEC 1000
#define _CO_SCHEDULE_N_SLICE 10
typedef void (*co_func_t)(void *);
typedef enum _co_state { _CO_STATE_RUNNING, _CO_STATE_EXIT } _co_state_t;
typedef struct _co_task_node { struct list_head node; int co_id; ucontext_t co_ctx; ucontext_t co_exit_ctx; _co_state_t co_state; void *co_alloc_stack; void *exit_alloc_stack; } _co_task_node_t;
typedef struct _co_schedule { _co_task_node_t task_main;
_co_task_node_t *cur_running_task; int n_task_running;
unsigned long id_inc;
pthread_mutex_t mutex; } _co_schedule_t;
_co_schedule_t _co_sche;
void _co_main_exit() { exit(0); }
void _co_signal_handler() { pthread_mutex_lock(&_co_sche.mutex);
_co_task_node_t *cur_task = _co_sche.cur_running_task; _co_task_node_t *iter_task = container_of(_co_sche.cur_running_task->node.next, _co_task_node_t, node); while(1) { if(iter_task->co_state == _CO_STATE_RUNNING) { break; } else { free(iter_task->co_alloc_stack); free(iter_task->exit_alloc_stack); free(iter_task); list_del(&iter_task->node);
iter_task = container_of(_co_sche.cur_running_task->node.next, _co_task_node_t, node); continue; } }
_co_sche.cur_running_task = iter_task; pthread_mutex_unlock(&_co_sche.mutex);
swapcontext(&cur_task->co_ctx, &iter_task->co_ctx); }
void *_main_thread_setup(void * data) { struct sigaction action; action.sa_flags = SA_SIGINFO|SA_RESTART; sigfillset(&action.sa_mask); action.sa_sigaction = _co_signal_handler; sigaction(_CO_NOTIFY_SIGNO, &action, NULL);
setcontext(&_co_sche.task_main.co_ctx); return NULL; }
void co_run(co_func_t func, void *arg) {
pthread_mutex_init(&_co_sche.mutex, NULL); _co_sche.n_task_running = 1; _co_sche.id_inc = 2;
void *main_stack = malloc(_CO_STACK_SIZE); void *main_exit_stack = malloc(_CO_EXIT_STACK_SIZE); _co_sche.task_main.co_alloc_stack = main_stack; _co_sche.task_main.exit_alloc_stack = main_exit_stack;
_co_sche.task_main.co_id = 1; _co_sche.task_main.co_state = _CO_STATE_RUNNING; _co_sche.task_main.node.prev = _co_sche.task_main.node.next = &_co_sche.task_main.node; _co_sche.task_main.co_alloc_stack = main_stack; _co_sche.task_main.exit_alloc_stack = main_exit_stack;
getcontext(&_co_sche.task_main.co_exit_ctx); sigfillset(&_co_sche.task_main.co_exit_ctx.uc_sigmask); sigdelset(&_co_sche.task_main.co_exit_ctx.uc_sigmask, SIGURG); _co_sche.task_main.co_exit_ctx.uc_stack.ss_flags = 0; _co_sche.task_main.co_exit_ctx.uc_stack.ss_size = _CO_EXIT_STACK_SIZE; _co_sche.task_main.co_exit_ctx.uc_stack.ss_sp = main_exit_stack; makecontext(&_co_sche.task_main.co_exit_ctx, _co_main_exit, 0, NULL);
getcontext(&_co_sche.task_main.co_ctx); sigfillset(&_co_sche.task_main.co_exit_ctx.uc_sigmask); sigdelset(&_co_sche.task_main.co_exit_ctx.uc_sigmask, SIGURG); _co_sche.task_main.co_ctx.uc_link = &_co_sche.task_main.co_exit_ctx; _co_sche.task_main.co_ctx.uc_stack.ss_flags = 0; _co_sche.task_main.co_ctx.uc_stack.ss_size = _CO_STACK_SIZE; _co_sche.task_main.co_ctx.uc_stack.ss_sp = main_stack; makecontext(&_co_sche.task_main.co_ctx, (void(*)())func, 1, arg);
_co_sche.cur_running_task = &_co_sche.task_main;
pthread_t tid; pthread_attr_t attr; pthread_attr_init(&attr); sigset_t set; sigfillset(&set); pthread_attr_setsigmask_np(&attr, &set); pthread_create(&tid, &attr, _main_thread_setup, NULL);
int cur_running_co_id = 1; int slice_spent = 0; unsigned long sleep_time_nsec = _CO_SLICE_NSEC; struct timespec sleep_ts = { .tv_sec = 0, .tv_nsec = sleep_time_nsec }; while(1) { nanosleep(&sleep_ts, NULL); pthread_mutex_lock(&_co_sche.mutex); if(_co_sche.n_task_running == 1 && _co_sche.cur_running_task->co_id == 1) { pthread_mutex_unlock(&_co_sche.mutex); continue; } if (cur_running_co_id != _co_sche.cur_running_task->co_id) { cur_running_co_id = _co_sche.cur_running_task->co_id; slice_spent = 0; pthread_mutex_unlock(&_co_sche.mutex); continue; } slice_spent ++; if (slice_spent > _CO_SCHEDULE_N_SLICE) { sigval_t v; int n = pthread_sigqueue(tid, _CO_NOTIFY_SIGNO, v); } pthread_mutex_unlock(&_co_sche.mutex); } }
void _co_exit() { sigset_t newmask; sigset_t oldmask; sigfillset(&newmask); pthread_sigmask(SIG_SETMASK, &newmask, &oldmask);
pthread_mutex_lock(&_co_sche.mutex); _co_sche.n_task_running --; _co_sche.cur_running_task->co_state = _CO_STATE_EXIT; pthread_mutex_unlock(&_co_sche.mutex);
pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
while(1); }
void co_create(co_func_t func, void *arg) { sigset_t newmask; sigset_t oldmask; sigfillset(&newmask); pthread_sigmask(SIG_SETMASK, &newmask, &oldmask);
pthread_mutex_lock(&_co_sche.mutex); _co_task_node_t *newtask = malloc(sizeof(_co_task_node_t)); newtask->co_state = _CO_STATE_RUNNING; newtask->co_id = _co_sche.id_inc; _co_sche.id_inc ++; _co_sche.n_task_running ++;
void *co_stack = malloc(_CO_STACK_SIZE); void *exit_stack = malloc(_CO_EXIT_STACK_SIZE); newtask->co_alloc_stack = co_stack; newtask->exit_alloc_stack = exit_stack;
getcontext(&newtask->co_exit_ctx); newtask->co_exit_ctx.uc_stack.ss_flags = 0; sigfillset(&newtask->co_exit_ctx.uc_sigmask); sigdelset(&newtask->co_exit_ctx.uc_sigmask, _CO_NOTIFY_SIGNO); newtask->co_exit_ctx.uc_link = NULL; newtask->co_exit_ctx.uc_stack.ss_flags = 0; newtask->co_exit_ctx.uc_stack.ss_size = _CO_EXIT_STACK_SIZE; newtask->co_exit_ctx.uc_stack.ss_sp = exit_stack; makecontext(&newtask->co_exit_ctx, _co_exit, 0);
getcontext(&newtask->co_ctx); newtask->co_ctx.uc_stack.ss_flags = 0; sigfillset(&newtask->co_ctx.uc_sigmask); sigdelset(&newtask->co_ctx.uc_sigmask, _CO_NOTIFY_SIGNO); newtask->co_ctx.uc_link = &newtask->co_exit_ctx; newtask->co_ctx.uc_stack.ss_flags = 0; newtask->co_ctx.uc_stack.ss_size = _CO_STACK_SIZE; newtask->co_ctx.uc_stack.ss_sp = co_stack; makecontext(&newtask->co_ctx, (void (*) (void))func, 1, arg); list_add_tail(&newtask->node, &_co_sche.task_main.node);
pthread_mutex_unlock(&_co_sche.mutex);
pthread_sigmask(SIG_SETMASK, &oldmask, NULL); }
void my_new_func2(void *p) { while(1) { printf("789\n"); } }
void my_new_func1(void *p) { while(1) { printf("456\n"); } }
void my_main_func(void *p) { co_create(my_new_func1, NULL); co_create(my_new_func2, NULL); while(1) { printf("123\n"); } }
int main() { co_run(my_main_func, NULL); }
|
更多思考(QA)
Q: 如果信号中断了系统调用?
A: 以SA_RESTART
设置的信号处理函数, 在返回时可以重启系统调用。然而并不是所有的函数都能被重启, 比如nanosleep
, select
, sendto
, sendmsg
等等。为了稳妥, 我们最好对每个系统调用做一层系统调用封装, 检查其返回值是否为EINTR
, 如果是, 那么重新执行。这是一个很庞大的工作量, 本文没有做这些封装(懒)。值得一提的是, go也对每个syscall
做了这种封装, 使得信号发生时系统调用能被正确的处理
Q: 协程间通信以及锁等基础设施?
A: 以后单独写篇文章来做这个
Q: 利用多线程, 像go一样?
A: 现在只实现了单线程版本, 以后会做多线程的协程调度