不同于Go的goroutine和Java 21的virtual threads这类“有栈协程”模型,C#和Python更常使用async/await的“无栈协程”模型来处理并发。本文探索C#的async/await并发模型,以及它与Go有栈协程的差异。

先看几段经典的C#异步代码

  1. 并发请求多个数据源,常用于微服务架构,并发从多个内容服务拉取信息,可以优化响应RT。这个例子展示了如何并发请求多个服务,并用Task.WhenAll等待所有结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

using System.Diagnostics;

Stopwatch totalSw = Stopwatch.StartNew();

Task<HttpResponseMessage> resp1 = GetWithTimingAsync("https://www.baidu.com", "百度");
Task<HttpResponseMessage> resp2 = GetWithTimingAsync("https://www.qq.com", "QQ");
Task<HttpResponseMessage> resp3 = GetWithTimingAsync("https://bing.com", "Bing");

await Task.WhenAll(resp1, resp2, resp3);
totalSw.Stop();
Console.WriteLine($"总耗时: {totalSw.ElapsedMilliseconds} ms");

static async Task<HttpResponseMessage> GetWithTimingAsync(string url, string name)
{
Stopwatch sw = Stopwatch.StartNew();
HttpClient client = new HttpClient();
try
{
HttpResponseMessage response = await client.GetAsync(url);
sw.Stop();
Console.WriteLine($"{name} 请求耗时: {sw.ElapsedMilliseconds} ms, 状态码: {response.StatusCode}");
return response;
}
catch (Exception ex)
{
sw.Stop();
Console.WriteLine($"{name} 请求失败,耗时: {sw.ElapsedMilliseconds} ms, 异常: {ex.Message}");
throw;
}
finally
{
client.Dispose();
}
}
  1. 这个例子展示了一个最基础的TCP Echo Server:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

using System.Net;
using System.Net.Sockets;

await RunTCPEchoServer();

static async Task RunTCPEchoServer()
{
var listener = new TcpListener(IPAddress.Loopback, 5000);
listener.Start();
Console.WriteLine("Server started on 127.0.0.1:5000");
while (true)
{
var client = await listener.AcceptTcpClientAsync();

_ = HandleClient(client);
}
}

static async Task HandleClient(TcpClient client)
{
using var c = client;
var stream = c.GetStream();

var buffer = new byte[1024];

while (true)
{
int n = await stream.ReadAsync(buffer);

if (n == 0)
break;

await stream.WriteAsync(buffer.AsMemory(0, n));
}
}
  1. 这个例子展示了后台任务fire-and-forget的写法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
_ = RunMyIntervalTaskAsync(); // just ignore Task handle

// wait forever
TaskCompletionSource src = new TaskCompletionSource();
await src.Task;

async static Task RunMyIntervalTaskAsync()
{
while (true)
{
Console.WriteLine("Do task");
await Task.Delay(3000);
}
}

总结:

  • 命名规约:要给async修饰的异步函数名加上Async后缀。
  • 调用异步函数给人的感觉是启动了一个异步流程,可以同时不阻塞地启动多个这样的流程。
  • async方法调用后会先同步执行,直到遇到第一个未完成的await才返回给调用方。更准确说法是:调用返回一个代表异步操作最终结果的Task,该操作可能已经开始,也可能已经完成,具体取决于方法实现。
  • 如果想要 fire and forget,类似Go启动一个背景goroutine,直接_ = XxxAsync()抛掉awaitable句柄即可。生产上真正使用时要捕获异常、接入日志、取消令牌,服务端长期后台任务优先用BackgroundService/队列消费者。

Go有栈协程序概述

看完上面的代码,可以知道无栈协程的使用相当简单,但从概念以及原理上来说它比有栈协程复杂很多。

使用Go协程,没有Task<>,也没有async/await这类关键字;启动一个协程就像启动一个线程一样自然;之后的Read、Write socket也全部都是同步API,用起来就像在C语言里使用线程 + 阻塞IO一样简单自然。自然可见,Go的协程模型比C#简单很多。

Go协程简单来说就是语言把每一个执行流抽象成了一个goroutine,也就是协程,这个goroutine里面有一条执行流的所有上下文信息,包括寄存器、当前执行指令的位置、栈等等信息。

实际上,main函数的那条执行流也是一个goroutine,从main goroutine可以创建其它goroutine,下面是一个简单的go echo server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package main

import (
"bufio"
"fmt"
"io"
"log"
"net"
)

func main() {
listener, err := net.Listen("tcp", ":9000")
if err != nil {
log.Fatal(err)
}

fmt.Println("Echo server listening on :9000")

for {
conn, err := listener.Accept()
if err != nil {
log.Println("accept error:", err)
continue
}

go handleConn(conn)
}
}

func handleConn(conn net.Conn) {
defer conn.Close()

addr := conn.RemoteAddr().String()
fmt.Println("client connected:", addr)

reader := bufio.NewReader(conn)

for {
line, err := reader.ReadString('\n')
if err != nil {
if err != io.EOF {
log.Println("read error:", err)
}
fmt.Println("client disconnected:", addr)
return
}

_, err = conn.Write([]byte(line))
if err != nil {
log.Println("write error:", err)
return
}
}
}

可见,使用goroutine就像使用线程一样自然,非常好用。

Go的并发模型是GMP模型。进程启动的时候会启动多个OS线程(被称为M,Machine的缩写),每个线程只有绑定调度器(被称为P,Processor的缩写)后才能调度协程(被称为G, Goroutine的缩写)。调度器的代码抽象可以理解为不停地取状态为runnable的goroutine进行执行。每次启动一个goroutine都创建了一个“调度体”对象,也就是goroutine。

当某个goroutine阻塞在某个read socket上时,这时其实并没有阻塞线程(Golang的网络通常会接入runtime netpoll,所以network fd都是用的non-blocking模式)。fd此时被挂在了epoll树上并且该goroutine被runtime park挂起,等到这个socket真正可read的时候,goroutine才恢复runnable状态,被“调度器”重新拿出来跑。

总结来说,这里有几个关键的名词需要理解:

  • 调度器:理解为一个不断poll runnable goroutine的for循环,抢goroutine运行。
  • 调度体/goroutine:保存goroutine上下文,这就是一条goroutine执行流的抽象。
  • epoller(仅限linux): 实现阻塞挂起,Go的runtime会把阻塞的goroutine挂起,等事件到来后再恢复。例如当read socket读到空(non-blocking模式返回EAGAIN)时,就把当前goroutine的状态改为挂起,并且在epoll上注册这个socket。
  • 有栈协程:保存了执行流完整的栈信息的协程。

关于 epoll 挂起,这里有一些代码可以加以佐证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (fd *FD) Read(p []byte) (int, error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if len(p) == 0 {
// If the caller wanted a zero byte read, return immediately
// without trying (but after acquiring the readLock).
// Otherwise syscall.Read returns 0, nil which looks like
// io.EOF.
// TODO(bradfitz): make it wait for readability? (Issue 15735)
return 0, nil
}
if err := fd.pd.prepareRead(fd.isFile); err != nil {
return 0, err
}
if fd.IsStream && len(p) > maxRW {
p = p[:maxRW]
}
for {
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}

此处waitRead就是把goroutine挂起到epoll上的相关逻辑,将fd的readable事件注册,并且挂起goroutine。

C#无栈协程是怎么实现的

结合我前面提到的几个例子很快就能理解C#异步的用法,这里我讲讲它的实现原理。

C#的async/await并不是靠”真·线程切换”来实现协程,而是通过编译器生成状态机,在await处将自身注册到所等待的awaiter上,保存执行上下文。在任务完成时,执行恢复回调,继续之前的流程。

下面先看Task究竟是什么,见下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Program
{
public async static Task Main()
{
int val = await GetNumberAsync();
Console.WriteLine(val);
}

static Task<int> GetNumberAsync()
{
TaskCompletionSource<int> src = new();
Thread thread = new Thread(() =>
{
Thread.Sleep(3000);
src.SetResult(1);
})
{ IsBackground = true };
thread.Start();
return src.Task;
}
}
// 运行结果: 3s后输出1

可以看到,只要给Task SetResult,之前await它的执行流就会恢复了,我们可以直接使用TaskCompletionSource来创建一个source。

实际上,我们平时使用的很多 Task.Delay()ReadAsync() 等方法,底层都会返回类似这样的Task给调用方。调用方await它时,执行流会挂起;等Task完成后,再恢复执行。

接下来我们来看async函数有什么“魔法”,实际上,async函数会被编译器直接翻译成一个“状态机”,它类似这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
static async Task<int> FooAsync()
{
Console.WriteLine("A");

int x = await GetNumberAsync();

Console.WriteLine("B");

return x + 1;
}

static Task<int> GetNumberAsync()
{
TaskCompletionSource<int> src = new();
Thread thread = new Thread(() =>
{
Thread.Sleep(3000);
src.SetResult(1);
})
{ IsBackground = true };
thread.Start();
return src.Task;
}

// 上面是用户代码,下面是编译器伪代码,看个意思,代码并不严谨

static Task<int> FooAsyncAfterInterpreted()
{
FooAsyncStateMachine stateMachine = new FooAsyncStateMachine();

stateMachine.builder = AsyncTaskMethodBuilder<int>.Create();
// 初始值-1
stateMachine.state = -1;

// 执行一次MoveNext
stateMachine.builder.Start(ref stateMachine);

return stateMachine.builder.Task;
}

private struct FooAsyncStateMachine : IAsyncStateMachine
{
public int state;
public AsyncTaskMethodBuilder<int> builder;

// 会把局部变量的上下文存储在状态机里面
private int x;
private TaskAwaiter<int> awaiter;

private void MoveNext()
{
int result;

try
{
TaskAwaiter<int> localAwaiter;
// state == -1,执行第一个await之前的代码
if (state == -1)
{
Console.WriteLine("A");

Task<int> task = GetNumberAsync();
localAwaiter = task.GetAwaiter();

if (!localAwaiter.IsCompleted)
{
// state == 0: 等待第一个await中
state = 0;
awaiter = localAwaiter;

builder.AwaitUnsafeOnCompleted(
ref localAwaiter,
ref this);

return;
}
}
// 从state == 0,也就是等待第一个await中恢复过来
else if (state == 0)
{
localAwaiter = awaiter;
awaiter = default;
state = -1;
}

x = localAwaiter.GetResult();

Console.WriteLine("B");

result = x + 1;
}
// state=-2 -> 已完成
catch (Exception ex)
{
state = -2;
builder.SetException(ex);
return;
}

state = -2;
builder.SetResult(result);
}

public void SetStateMachine(IAsyncStateMachine stateMachine)
{
builder.SetStateMachine(stateMachine);
}
}

下面总结几个关键点:

  • FooAsync被编译器翻译成了一个返回Task的普通函数,并且真实运行逻辑以及上下文被打包进了一个状态机struct。
  • FooAsync的核心逻辑在StateMachine的MoveNext中,state定位当前运行状态(await位置以及是否执行完毕)。
  • 异步调用的局部变量上下文被保存在了StateMachine对象当中。
  • await处编译器做了改动,检查awaiter是否完成,没有完成就把continuation(指接下来要继续执行的那段代码)注册到对应的Task上去。底层Task完成后,触发之前注册的continuation,也就是调用自己列表里面所有continuation的MoveNext。

C#线程模型与异步的关系

创建TaskCompletionSource可以使用(几乎总是)一个参数RunContinuationsAsynchronously。观察下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class Program
{
public async static Task Main()
{
Console.WriteLine($"1: {Environment.CurrentManagedThreadId}");
int val = await GetNumberAsync();
Console.WriteLine($"2: {Environment.CurrentManagedThreadId}");
}

static Task<int> GetNumberAsync()
{
Console.WriteLine($"3: {Environment.CurrentManagedThreadId}");
TaskCompletionSource<int> src = new(TaskCreationOptions.RunContinuationsAsynchronously);
Thread thread = new Thread(() =>
{
Console.WriteLine($"4: {Environment.CurrentManagedThreadId}");
Thread.Sleep(3000);
src.SetResult(1);
})
{ IsBackground = true };
thread.Start();
return src.Task;
}
}
// 输出:
// 1: 1
// 3: 1
// 4: 4
// 2: 5

// 当不使用RunContinuationsAsynchronously参数的时候输出:
// 1: 1
// 3: 1
// 4: 4
// 2: 4

可以看到await结束,之前运行在thread1的执行流实际上换到了thread5执行,若不指定参数,执行流可能(受SynchronizationContext和非默认TaskScheduler的影响,这点以后再说)直接在SetResult线程恢复执行。

若指定了RunContinuationsAsynchronously参数,恢复continuation具体流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
await 发现 Task 尚未完成,状态机注册 continuation(若Task已经完成则在当前线程继续跑)

TaskCompletionSource.SetResult 完成 Task

如果有 RunContinuationsAsynchronously,Task 完成时会要求 continuation 不要同步内联运行在当前调用栈里,而是投递到对应调度器

continuation 被投递给调度器

调度器通常是 ThreadPool / UI SynchronizationContext / 自定义 TaskScheduler

某个线程取出 continuation

调用状态机 MoveNext()

典型的异步 I/O也通常倾向于避免在底层I/O完成回调里直接执行过重的用户continuation,否则 I/O 完成路径的耗时会变得不可控。因此有可能,你await一个异步IO回来,发现执行流在别的线程上了。需要注意的是这些 API 底层不一定是通过TaskCompletionSource + RunContinuationsAsynchronously实现的,底层可能是别的东西,但我这里就不感知了,但想必是类似的。

在普通 Console/服务端场景中,异步 I/O 完成后的 continuation 通常会被调度到.NET ThreadPool上执行。ThreadPool也有本地队列、全局队列和work stealing机制,这一点和Go调度器中的work stealing思路有相似之处。

在写一般的异步代码时,实际上是用的多线程,下面是echo server静默时候的线程情况(ps -T -p pid):

1
2
3
4
5
6
dotnet              // 父进程
.NET Finalizer // GC finalizer 线程
.NET SigHandler // Linux 信号处理线程
.NET Sockets // epoll挂起相关
.NET Timer // Timer相关
.NET TP Wait // ThreadPool 的等待/注册等待相关线程

此刻有一个预备的线程,若某个Task完成进入MoveNext代码的时候,也是此线程poll到了MoveNext任务进行执行。当任务的队列开始积压的时候就会建立更多Thread来处理,有动态扩/缩容策略,下面是代码中CPU较为密集(CPU Bound)的时候,ThreadPool消费速率不足时的线程状态:

1
2
3
4
5
6
7
8
9
10
dotnet
.NET Finalizer
.NET SigHandler
.NET TP Worker
.NET TP Gate // 线程池的调度控制器,专门管 “什么时候加线程、加多少、要不要缩容”
.NET TP Worker
.NET TP Worker
.NET Sockets
.NET Timer
.NET TP Wait // 还在等任务,线程池工作线程处于等待状态

CPU Bound/IO Bound

CPU Bound的意思是消耗大量CPU时间的计算型任务,如:

  • 大量数学计算
  • 耗时的加密解密
  • 图像处理

IO Bound的意思是不怎么消耗CPU时间,主要是等待IO的等待外部资源返回数据的任务,如:

  • 网络
  • 磁盘
  • 数据库访问
  • 用户/设备输入

观察到上述C#的协程是被运行在ThreadPool中的,也就是当Task完成时,continuation被投递到ThreadPool中运行(此处暂时就这么认为,后续介绍上下文和调度器会推翻这个简单的假设)。新手常见的错误是在async函数里面运行长时间消耗CPU时间的任务,例如大量计算,加密,图像处理等等,下面是两个负面例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private async Task BadHandleClientAsync1(TcpClient client)
{
using var stream = client.GetStream();

while (true)
{
byte[] requestBytes = await ReadFrameAsync(stream);

byte[] decrypted = Decrypt(requestBytes);
byte[] decompressed = Decompress(decrypted);
Request request = DeserializeLargeJson(decompressed);
// CPU-heavy: 复杂业务计算,里面可能有非常复杂的数学计算,图像处理,耗时很长
Response response = RunComplexBusinessRules(request);
byte[] responseJson = SerializeLargeJson(response);
byte[] compressed = Compress(responseJson);
byte[] encrypted = Encrypt(compressed);
byte[] responseBytes = BuildFrame(encrypted);

await stream.WriteAsync(responseBytes, 0, responseBytes.Length);
}
}

private async Task BadHandleClientAsync2(TcpClient client)
{
using var stream = client.GetStream();

while (true)
{
byte[] requestBytes = await ReadFrameAsync(stream);

byte[] responseBytes = await Task.Run(() =>
{
byte[] decrypted = Decrypt(requestBytes);
byte[] decompressed = Decompress(decrypted);
Request request = DeserializeLargeJson(decompressed);
Response response = RunComplexBusinessRules(request);
byte[] responseJson = SerializeLargeJson(response);
byte[] compressed = Compress(responseJson);
byte[] encrypted = Encrypt(compressed);
return BuildFrame(encrypted);
});
await stream.WriteAsync(responseBytes, 0, responseBytes.Length);
}
}

在极端的高并发场景下,上面两个例子,不论是否使用Task.Run都不太是正确的选择,原因如下:

  • 关于例子1,协程是在ThreadPool线程里运行的,如果在里面执行CPU耗时操作,其实会把这个线程给占住,导致该线程没法消费处理其它已恢复的continuation。当所有线程被用完的时候(均处于CPU计算而阻塞),Gate线程就会创建新的线程来跑这个continuation。因此有很大的创建过量线程的风险。
  • 关于例子2,Task.Run的语义是把任务投递给ThreadPool,也是同样的问题,如果线程池里面的任务长时间占据线程,也会面临Gate启动过多线程的问题。

那么到底怎么解决呢?首先明确一个问题,对于CPU密集任务增加线程数目不能提高执行效率,一个核心同时只能跑一个执行流,所以可以考虑将IO Threads和Worker Threads分开,其中Worker Threads的数量为CPU核心数。下面为一个典型的CPU/IO分离实现思路:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
using System.Collections.Concurrent;

public class Program
{
public static async Task Main()
{
var cpuQueue = new CpuWorkQueue(
workerCount: 2,
capacity: 10,
name: "dedicated-cpu-worker");

var tasks = new List<Task<int>>();

Console.WriteLine($"Main thread: {Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine("Start enqueue CPU tasks...");
Console.WriteLine();

for (int i = 0; i < 6; i++)
{
int taskId = i;

Task<int> task = cpuQueue.Enqueue(() =>
{
Console.WriteLine(
$"Task {taskId} started on thread {Thread.CurrentThread.ManagedThreadId}");

// 模拟 CPU 密集型计算
var result = HeavyCpuCompute(taskId);

Console.WriteLine(
$"Task {taskId} finished on thread {Thread.CurrentThread.ManagedThreadId}");

return result;
});

tasks.Add(task);
}

Console.WriteLine("All tasks have been enqueued.");
Console.WriteLine("Main thread is not blocked while CPU workers are running.");
Console.WriteLine();

int[] results = await Task.WhenAll(tasks);

Console.WriteLine();
Console.WriteLine("All CPU tasks finished.");
Console.WriteLine("Results:");

foreach (var result in results)
{
Console.WriteLine(result);
}
}

private static int HeavyCpuCompute(int input)
{
// 真实场景里这里通常是复杂计算、压缩、加密、图像处理、规则引擎等。
// Sleep这里只是演示工作队列调度效果
Thread.Sleep(2000);

return input * input;
}
}

public sealed class CpuWorkQueue
{
private readonly BlockingCollection<Action> _queue;
private readonly Thread[] _workers;

public CpuWorkQueue(int workerCount, int capacity = 1024, string name = "cpu-worker")
{
if (workerCount <= 0)
throw new ArgumentOutOfRangeException(nameof(workerCount));

if (capacity <= 0)
throw new ArgumentOutOfRangeException(nameof(capacity));

_queue = new BlockingCollection<Action>(boundedCapacity: capacity);
_workers = new Thread[workerCount];

for (int i = 0; i < workerCount; i++)
{
var index = i;

_workers[i] = new Thread(WorkerLoop)
{
IsBackground = true,
Name = $"{name}-{index}"
};

_workers[i].Start();
}
}

public Task<T> Enqueue<T>(Func<T> work)
{
var tcs = new TaskCompletionSource<T>(
TaskCreationOptions.RunContinuationsAsynchronously);

var item = new Action(() =>
{
try
{
var result = work();
tcs.TrySetResult(result);
}
catch (Exception ex)
{
tcs.TrySetException(ex);
}
});

// 不要无限排队。队列满了就明确失败,形成背压。
if (!_queue.TryAdd(item))
{
tcs.TrySetException(
new InvalidOperationException("CPU work queue is full."));
}

return tcs.Task;
}

private void WorkerLoop()
{
foreach (var work in _queue.GetConsumingEnumerable())
{
work();
}
}
}

协程调度

上面提到了SynchronizationContextTaskScheduler,它们不但难以理解而且还很重要。它们直接决定了await Task结束后,此协程在哪里恢复运行。这里有个调度的优先级规则,如下:

1
2
3
4
5
6
7
8
如果 SynchronizationContext.Current != null
用 SynchronizationContext.Post(...) 调度 continuation

否则如果 TaskScheduler.Current != TaskScheduler.Default
用当前 TaskScheduler 调度 continuation

否则
ThreadPool

默认情况下,普通Console程序,SynchronizationContext.Current是null,且使用TaskScheduler.Default(也就是ThreadPool)作为调度器。

SynchronizationContext是一个线程维度的属性,在ThreadPool中它是null,它用来决定我await一个async函数后,接下来自己的continuation在哪里执行。下面代码展示了如何自定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
private readonly BlockingCollection<(SendOrPostCallback Callback, object? State)> _queue = new();

private readonly int _ownerThreadId;

public SingleThreadSynchronizationContext()
{
_ownerThreadId = Thread.CurrentThread.ManagedThreadId;
}

public override void Post(SendOrPostCallback d, object? state)
{
// 异步投递,不阻塞调用方
_queue.Add((d, state));
}

public override void Send(SendOrPostCallback d, object? state)
{
// 如果已经在目标线程上,直接执行
if (Thread.CurrentThread.ManagedThreadId == _ownerThreadId)
{
d(state);
return;
}

using var done = new ManualResetEventSlim(false);
Exception? exception = null;

Post(s =>
{
try
{
d(s);
}
catch (Exception ex)
{
exception = ex;
}
finally
{
done.Set();
}
}, state);

done.Wait();

if (exception != null)
{
throw exception;
}
}

public void RunOnCurrentThread()
{
foreach (var workItem in _queue.GetConsumingEnumerable())
{
workItem.Callback(workItem.State);
}
}

public void Complete()
{
_queue.CompleteAdding();
}
}

class Program
{
static async Task Main()
{
var context = new SingleThreadSynchronizationContext();

SynchronizationContext.SetSynchronizationContext(context);

Console.WriteLine($"Main start thread: {Thread.CurrentThread.ManagedThreadId}");

var task = DoAsync();

task.ContinueWith(_ => context.Complete());

context.RunOnCurrentThread();

await task;

Console.WriteLine("Program finished.");
}

static async Task DoAsync()
{
Console.WriteLine($"Before await thread: {Thread.CurrentThread.ManagedThreadId}");

await Task.Delay(1000);

Console.WriteLine($"After await thread: {Thread.CurrentThread.ManagedThreadId}");
}
}

// 输出
/*
Main start thread: 1
Before await thread: 1
After await thread: 1
Program finished.
*/

理解上面代码有以下要点:

  • SetSynchronizationContext用来设置当前执行线程的SynchronizationContext,简称上下文。
  • await task时,如果任务未完成、需要挂起,则会注册continuation,并默认按优先级捕获当前调度环境:
    优先捕获当前SynchronizationContext;
    如果没有合适的SynchronizationContext,则可能使用当前非默认TaskScheduler;
    如果也没有非默认TaskScheduler,则走默认continuation调度逻辑。
  • await注册continuation时捕获当前执行点的调度环境,即SynchronizationContext OR TaskScheduler,最后决定task完成时恢复该continuation的位置。
    如果捕获到了SynchronizationContext,continuation的调度逻辑通常会通过context.Post执行,让自定义SynchronizationContext决定后续代码在哪里运行。
    如果没有捕获到合适的SynchronizationContext,则可能使用当前非默认TaskScheduler;
    如果当前TaskScheduler也是默认调度器,则走默认continuation调度逻辑。
  • await task时,如果任务已经完成,则此处通常会直接继续执行,不会挂起,也不会把continuation注册到该task上,因此不会经过SynchronizationContext.Post或TaskScheduler调度。
  • 如果确实想强制把后续代码重新排队调度一次,可以使用 await Task.Yield()。
    它返回的是一个特殊awaitable,它的awaiter通常会让IsCompleted == false,所以它会强制异步让出当前执行流,然后把后续continuation交给当前SynchronizationContext或TaskScheduler调度。
    但这不是每次await都要配套使用的东西,业务代码中比较罕见,也通常不推荐。
    1
    2
    3
    4
    5
    public static async Task AwaitWithYieldAsync(Task task)
    {
    await task.ConfigureAwait(true);
    await Task.Yield();
    }
  • await task.ConfigureAwait(false) 会禁止 continuation 捕获当前 SynchronizationContext 和当前非默认 TaskScheduler,因此不会强制回到原上下文/原调度器执行。

TaskScheduler决定了一个task continuation被安排到哪里执行,默认的TaskScheduler是TaskScheduler.Default也就是ThreadPool,使用它可以做到这些事情:

  • 所有任务只在一个线程上执行;
  • 限制最大并发数;
  • 给任务排优先级;
  • 把任务投递到某个专用线程。

下面是一个自定义TaskScheduler的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
using System.Collections.Concurrent;

public sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
private readonly BlockingCollection<Task> _tasks = new();
private readonly Thread _thread;

public SingleThreadTaskScheduler(string name = "SingleThreadTaskScheduler")
{
_thread = new Thread(Run)
{
IsBackground = true,
Name = name
};

_thread.Start();
}

private void Run()
{
foreach (var task in _tasks.GetConsumingEnumerable())
{
TryExecuteTask(task);
}
}

protected override void QueueTask(Task task)
{
_tasks.Add(task);
}

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// 为了保证所有 Task 都在专用线程执行,这里禁止 inline
return false;
}

protected override IEnumerable<Task>? GetScheduledTasks()
{
return _tasks.ToArray();
}

public void Dispose()
{
_tasks.CompleteAdding();
}
}


class Program
{
static void Main()
{
using var scheduler = new SingleThreadTaskScheduler();

var factory = new TaskFactory(scheduler);

Console.WriteLine($"Main thread: {Thread.CurrentThread.ManagedThreadId}");

Task task1 = factory.StartNew(() =>
{
Console.WriteLine($"Task1 thread: {Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine($"Task1 scheduler is custom: {TaskScheduler.Current == scheduler}");
});

Task<int> task2 = factory.StartNew(() =>
{
Console.WriteLine($"Task2 thread: {Thread.CurrentThread.ManagedThreadId}");
Console.WriteLine($"Task2 scheduler is custom: {TaskScheduler.Current == scheduler}");
return 123;
});

Task.WaitAll(task1, task2);
}
}

可以使用TaskFactory自定义一个新的环境,设置这个环境的TaskScheduler。值得一提的是如果当前同时设置了上下文和调度器,调度器其实是没有用的,形同虚设。

这里解答下为什么没有指定异步参数的TaskCompletionSource.SetResult为什么仅仅是可能在原地跑(更确切的说是内联执行),原因就是Task完成时,会看被注册在自己身上的continuation的运行环境(上下文或者调度器),如果它和SetResult处的运行环境相同,就没必要丢队列了,自己执行也OK。出于性能的目的,才有了内联执行的概念。

UI协程

在很多游戏框架里面,允许使用协程让代码更好读,避免回调地狱。下面是一段回调地狱代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private void Button_Click(object sender, EventArgs e)
{
button.Enabled = false;
textBox.Text = "Loading...";

DoHttpGetAsync("https://example.com/user", userResult =>
{
// 回到 UI 线程
this.BeginInvoke(new Action(() =>
{
textBox.Text = "User loaded: " + userResult;

DoHttpGetAsync("https://example.com/orders?user=" + userResult, ordersResult =>
{
// 再回到 UI 线程
this.BeginInvoke(new Action(() =>
{
textBox.Text += Environment.NewLine + "Orders loaded: " + ordersResult;

DoHttpGetAsync("https://example.com/detail?order=" + ordersResult, detailResult =>
{
// 再回到 UI 线程
this.BeginInvoke(new Action(() =>
{
textBox.Text += Environment.NewLine + "Detail loaded: " + detailResult;

DoHttpPostAsync("https://example.com/log", detailResult, logResult =>
{
// 再回到 UI 线程
this.BeginInvoke(new Action(() =>
{
textBox.Text += Environment.NewLine + "Log saved: " + logResult;
button.Enabled = true;
}));
},
ex =>
{
this.BeginInvoke(new Action(() =>
{
MessageBox.Show("Save log failed: " + ex.Message);
button.Enabled = true;
}));
});
}));
},
ex =>
{
this.BeginInvoke(new Action(() =>
{
MessageBox.Show("Load detail failed: " + ex.Message);
button.Enabled = true;
}));
});
}));
},
ex =>
{
this.BeginInvoke(new Action(() =>
{
MessageBox.Show("Load orders failed: " + ex.Message);
button.Enabled = true;
}));
});
}));
},
ex =>
{
this.BeginInvoke(new Action(() =>
{
MessageBox.Show("Load user failed: " + ex.Message);
button.Enabled = true;
}));
});
}

但是如果有了协程,生活就更简单了,整个流程看起来就是同步的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private async void Button_Click(object sender, EventArgs e)
{
button.Enabled = false;
textBox.Text = "Loading...";

try
{
string user = await DoHttpGetAsync("https://example.com/user");
textBox.Text = "User loaded: " + user;

string orders = await DoHttpGetAsync("https://example.com/orders?user=" + user);
textBox.Text += Environment.NewLine + "Orders loaded: " + orders;

string detail = await DoHttpGetAsync("https://example.com/detail?order=" + orders);
textBox.Text += Environment.NewLine + "Detail loaded: " + detail;

string log = await DoHttpPostAsync("https://example.com/log", detail);
textBox.Text += Environment.NewLine + "Log saved: " + log;
}
catch (Exception ex)
{
MessageBox.Show(ex.Message);
}
finally
{
button.Enabled = true;
}
}

上面的做法避免的回调地狱,看起来是同步代码,实际上底层是异步的。

UI协程的原理

一个典型游戏程序通常有一个主线程上的帧循环,它被称为帧循环即frame loop。

这个循环不断poll外部事件(比如定时器,窗口系统事件,输入事件…),然后根据上一帧到当前帧的时间差 deltaTime 更新游戏世界,最后渲染当前帧并提交给显示系统。

下面是伪代码,可以看个意思:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
var gameContext = new GameSynchronizationContext();
SynchronizationContext.SetSynchronizationContext(gameContext);

while (running)
{
// 尽早处理窗口消息,避免系统认为程序无响应
PollEvents();

// 处理投递回主线程的回调:
// - async/await continuation
// - RunInLoop/Post 回调
// - 其他用户态主线程任务
gameContext.ExecutePendingCallbacks();

// 计算时间
double dt = timer.GetDeltaTime();

// 更新世界
Update(dt);

// 尽量靠近渲染前再采样关键输入
PollInputLate();

// 渲染当前帧
Render();

// 提交帧,可能阻塞在 VSync
Present();
}

public sealed class GameSynchronizationContext : SynchronizationContext
{
private readonly ConcurrentQueue<(SendOrPostCallback Callback, object? State)> _queue = new();

public override void Post(SendOrPostCallback d, object? state)
{
_queue.Enqueue((d, state));
}

public void ExecutePendingCallbacks()
{
while (_queue.TryDequeue(out var item))
{
try
{
item.Callback(item.State);
}
catch (Exception ex)
{
Console.WriteLine("Async continuation exception: " + ex);
}
}
}
}

private void PollEvents()
{
// 操作系统来的事件:键盘/鼠标/手柄输入,窗口状态(resize/关闭/最小化/移动/失去获得焦点/DPI变化),文件拖拽,窗口重绘
PollWindowEvents();
// linux有timerfd,会比较方便
PollTimerEvents();
}

关于窗口重绘事件:游戏是主动去绘制然后调用GPU进行Swap Buffer的,并不是窗口重绘事件来了才进行绘制并进行Swap,因此这个窗口重绘事件其实在游戏中不重要。

像记事本这类传统GUI程序通常是事件驱动的,不会像游戏一样每帧主动重绘。它们主要在窗口内容失效、窗口大小变化、被遮挡后重新显示、文本变化等情况下,由系统发送重绘/绘制事件。程序收到绘制事件后,需要在系统指定的绘制上下文中把失效区域重新画出来,并在绘制结束后通知系统该区域已经有效。

network async底层

现在有的同学肯定很好奇,这么好用的network async库底层是怎么实现的呢,这里我给出一个linux的实现方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
await stream.ReadAsync(buffer)

NetworkStream.ReadAsync

Socket.ReceiveAsync / ReceiveAsync(SocketAsyncEventArgs)

进入该 socket 对应的 SocketAsyncContext

构造或复用一个 receive/read operation

进入读方向 operation queue 的调度逻辑

如果前面没有更早的 pending read,并且 non-blocking recv 立即读到数据:
同步完成本次 operation
返回已完成的 Task/ValueTask

如果当前 fd 暂时不可读,例如 recv 返回 EAGAIN/EWOULDBLOCK,
或前面已经有更早的 pending read:
把本次 operation 留在 read operation queue 中
返回尚未完成的 Task/ValueTask

await 发现 Task/ValueTask 尚未完成

当前 async 方法状态机挂起,continuation 注册到这个异步结果上

SocketAsyncEngine 通过 epoll 等待该 fd 的 readable 事件

.NET Sockets 事件线程收到 epoll 事件

SocketAsyncEngine 根据事件里的索引/数据找到对应 SocketAsyncContext

默认把对应 I/O 事件投递给 ThreadPool 处理

ThreadPool 线程推进 SocketAsyncContext 的读队列

按顺序取出 pending read/receive operation

再次尝试 non-blocking recv

如果成功读到数据或读到 EOF:
完成该 operation
设置读取字节数 / socket 状态
完成对应 Task/ValueTask

如果仍然返回 EAGAIN/EWOULDBLOCK:
operation 继续保持 pending
等待后续 readable 事件

Task/ValueTask 完成后

await continuation 按捕获到的 SynchronizationContext / TaskScheduler / ThreadPool 规则恢复执行

每个socket都有自己的SocketAsyncContext,其中包含read operation queuewrite operation queue两个操作队列。read队列里面可以存储类读事件比如ReceiveAccept。write队列里面存储的事件比如Write/Send/SendFile/Connect。需要对读写做排队是因为读/写讲究一个顺序问题,需要按先后顺序进行read操作,至少不能让后ReadAsync的人拿到靠前的数据。

SocketAsyncEngine 根据事件里的索引/数据找到对应 SocketAsyncContext,这里的索引指的是这样的数据结构:

1
2
private static SocketAsyncContext?[] s_registeredContexts = [];
private static readonly Queue<int> s_registeredContextsFreeList = [];

epoll_event.data里面有一个index,可以通过s_registeredContexts[index]拿到真正的SocketAsyncContext。