简介
IM系统可以简单理解为聊天系统,用户感知到的QQ聊天,电商客服,AI客服都是典型的im。
这类产品的用户体验是:实时性收发消息。在很多大厂,这类业务可能由很多不同部门共同配合才能做到。例如长连网关可能是“统一接入组”负责的,im业务则可能是社交工程组负责的。

本文聚焦最上面的长连网关。
统一接入部门:外网接入一个公司的内部api有两种方式:http(短链) or tcp/websocket(双向长连)。统一接入部门会负责做这两类接入,分别实现长连网关+http api网关。其它业务部门需要follow统一接入部门的sop来接入他们,实现自己的业务需求,比如私信群聊。
最近面试滴滴,单车实际上会和长连网关建连,然后业务数据会通过长连网关“上行”到业务接入侧,然后业务侧可以“下行”信息单播给某个单车。
前言
长连是客户端和服务端进行通讯的双工通道,多业务可基于长连接进行数据上下行的实时收发。
本文剖析长连网关能力。在目前的所有app中长连网关是一个“中台能力”, 不同业务可以复用同一条长连接, 可以实现非常多功能,包括但不限于:
- 聊天业务的站内push:例如一个新的im消息到来,需要推送给手机端显示,让客户端感知到新消息
- 踢人下线推送:多设备管理可以踢出另外一个用户,此时可以invalidate另外一些设备的token,并且做长连kick操作,让客户端”主动下线”(token失效+长连断开)
- 消息红点的站内push:xhs会有消息红点服务,也就是消息服务,这里消息红点的维护实际上也是推拉结合,msg_detect接口定时纠偏,然后长连站内push实时增加红点计数
- 直播间聊天: 实际上大家进入直播间后,还可以在聊天室里面刷礼物聊天,可以接入长连,这个可以接入长连room能力(本文没有提到)
- 客服IM:电商单独接入,自己做ai agent或者座席聊天,可以复用长连中台能力
- app热修fix: 让客户端下载热修patch包修复线上问题
- ai chatbot: ai聊天实时推送,如chatgpt可以选用sse,但是也可以接入app的通用长连
中台能力/中台化:中台能力实际上指的是”通用能力”,我们搭建一个中台系统,大伙都可以接入。例如在小红书,会有评论中台,地图点位评论和书影音评论都可以以较低的成本接入中台,迅速提供评论能力。
显然长连接系统也是可以“中台化”的,一条长连接,提供给不同业务域使用。

站外push/站内push?
站外push:走的运营商厂商push(不同的机型/品牌一一适配): 给定device fingerprint,然后推送指定内容。app可以注册一个回调,即使app被挂起也能显示出这样的推送物料,作为长连网关,我们不感知站外push,站外push可以由别的团队支持和维护,对于业务方,发一个mq即可。点击此处查看介绍。
站内push: 也就是长连下行,可以简单理解为,向某个设备的长连接发消息即可。
长连的通用能力
长连通用能力:
- 基本的建立长连接,维护设备状态: 维护设备状态: offline(建立连但未登录态)/online(登录态)/background(后台挂起态)
1
| 客户端发起连接 -> sdk选长连网关ipport,建立tcp连接 -> 登录信令进行登录
|
- 查询在线设备信息: 可以按用户维度查询在线状态
- 客户端上行: 带业务方ack的, 或者不带业务方ack的
- 不可靠下行: 不可靠推送, 如果uid/device_id在线,直接推,反之直接丢弃
- 可靠下行: 无论uid/device_id是否存在,都存储下推数据,用户连接存在下推后等待,直到端侧ACK到达后,删除基于端的下推数据;若用户连接不存在,等用户在线后拉取增量数据, 做消息ack
可靠推送要先定序,然后存储,再下行,客户端收到gap后直接拉齐消息(可能客户端侧会3s定时检测连续,然后拉空洞)。定seq可以用redis做。
- 保序推送/非保序请求: 客户端收到的下行消息的顺序和服务端收到的上行消息顺序是否一致
目前比较常见的系统,上行网关调用业务是异步的,所以客户端要接受这样的事实:向长连连续发两个data packet 1和2,data packet2先被处理。或者data packet2先被处理,但是ack1却先到达客户端。
保序的必要性?
先讨论几种需求
- 客户端的上行消息需要ack, 但是ack不要求保序(目前我所在公司所用的)
这种情况需要做一个上行网关up router(因为下游阻塞会有用光线程池的风险, 不能在transmission server直接调用下游业务,接入服务风险降低)
这种情况, 最好transmission server调用up router, 投递上行任务, 然后up router向业务投递业务消息, 业务干完了回调up router
这样可以发给up router ack, 然后up router再将ack写回给transmission server, transmission server将消息写回socket给客户端。

异步ack的好处是: 下游劣化的风险小, 因为ack都异步了, 不用阻塞rpc等业务回ack。
但是坏处也是有的: 就是客户端连续发req1, req2, 可能req2先被执行, 且ack回复的顺序也是不确定的。im场景, 用户会有感知: 先发消息1, 再发消息2, 可能后续消息2在前(先执行, 先被定序)。
- 客户端的上行消息需要ack, 但是ack要求保序
这种情况也需要一个上行网关up router, 且需要一致性hash, 将一个uid+deviceId+bizName定位到一个内存队列上, 一个个调用下游业务方, 然后一个个回调给transmission server
- 客户端的上行消息不需要ack
这种情况给业务方提供一个消息队列, 将一个uid+deviceId+bizName做partition即可
这样ack只能业务方自行发下行rpc了, 想要保序就利用下行网关的顺序下行能力。
能够管理多端设备,kick其它设备,或者能够实现 android/ipad/电脑 各终端仅能登录一个设备。例如两个android设备登录,实际上后者会让前者被kick掉。

网关信令:
- login: 登录
- offline: 断连
- ping: 心跳
- foregroud: 切换前台模式
- background: 切换后台模式
- logout: 长连登录态且登出
- kickout: 踢出设备
- addtag: 加入tag(见本文其它部分的长连tag能力)
- remtag: 删除tag
业务数据belike:
1 2 3
| bizName: chat-im cmd: send_msg body: data...
|
名词解释:
- 上行:消息由端侧发给服务端
- 下行:消息由服务端发给端侧
- 信令:也就是命令,分为网关信令,和业务数据,网关信令是网关处理的,业务数据是转发给业务域的
- 可靠推送: 需要网关侧进行存储,客户端收到了ack手动删除,比如一些推荐朋友,或者app热修信息
- 不可靠推送: 查完register服务,直接对设备进行下行,不要求客户端回复ack
- 热修: 针对某个出问题的app包,下发一个带有修复代码/资源/so的热修包,用户成功加载patch包后,及时生效,就实现了问题修复
长连网关的架构
- 大体架构图

- tansmission-service: 接长连接的,netty实现的外围服务,解析信令。仅仅作为上下行数据的收发,通道侧
- up-router: 上行网关,收到uid_deviceId_bizName维度的mq消费,异步调用,不阻塞,不等待,异步回复ack。
- register-service: 登录/kick/注销/在线状态查询相关
- sync-server: 下行网关,收到uid_deviceId_bizName维度的mq消费,一个个调用transmission-service进行下行
- auth-service: 实际上就是封装了一些鉴权逻辑,这些逻辑丢register-service也合理
下行多mq/rpc是为什么: 高低优先级,rpc对时延很敏感。
上行网关的上行做异步: 例如一个长连同时写cmd1 + cmd2,实际上cmd1 和 cmd2的执行先后顺序不能确定。
所以此处常见的用户体验问题是这样的: 一个用户先后做发两条消息的动作: 来北京,来上海,实际上最后可能业务放定序会定成 来上海,来北京。业务应该感知到这个异步上行的问题。
为什么需要上行网关? 原因是rpc上行,它会调用下游rpc,下游实际上会劣化,可能让线程池被用完,我们希望将这种风险大的东西给到网关做。下游劣化的时候直接扩容网关。
当然直接给业务方发uid+deviceId+bizName的mq也是合理的,能保证上行的顺序,但是问题就是不能给到ack,需要业务域自行push下行。
- 如何顺序上行
上面说到router层实际上是异步上行,且异步ack的,那么如何顺序上行呢?
这里提供一个思路,长连网关直接按uid+deviceId+bizName维度找mq partition写入,给到业务方,就能给到业务方顺序的上行了,但是坏处是没法ack回调,这对业务其实是没毛病的,可以让业务方和客户端自行商量,业务方自行下行ack。
- 如何顺序可靠下行

顺序下行,核心思路是一个sync server消息红点的站内push:
a. 所有业务业务按照uid hash,保证相同用户的消息在同一台机器上
b. 同意机器上,消息队列按照bizName做内存队列排队
c. uid的消息在同意台机器上按顺序推送到端上,端上对受到的消息做连续性检测发现空洞
- 如何可靠推送
device/或者uid维度的写扩散队列,推拉结合,检测gap主动拉补偿,拉到了客户端手动ack删除。消息下行采用写扩散,seq连续且递增检测空洞,拉齐消息。
- 如何实现req/ack保序性: 也就是客户端给req1 + req2,服务端回复是ack1 + ack2
前面讨论保序,已经提到了。
长连协议头调研
- wx mars长连协议

- 百度长连协议

长连协议选型

- signal信令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| message SignalFrame { oneof stream { SignalStreamData data = 100; AckStreamData ack = 101; } }
message SignalStreamData { AckMode ackMode = 1; string cmd = 2; bytes body = 3; }
message AckStreamData { int32 code = 1; string message = 2; bytes body = 3; }
enum AckMode { NON_ACK = 0; ACK_IMMEDIATELY = 1; ACK_AFTER_RESPONSE = 2; }
|
- data信令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| message DataFrame { oneof stream { CSStreamData data = 100; AckStreamData ack = 101; } }
message CSStreamData { AckMode ackMode = 1; string cmd = 2; string bizName = 3; bytes body = 4; map<string,string> extInfo = 5; string serviceId = 6; string alias = 7; }
|
- sync信令
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| message SyncFrame { oneof stream { SCStreamData data = 100; AckFrameData ack = 101; } }
message SCStreamData { AckMode ackMode = 1; string bizName = 2; Event body = 3; int64 time = 4; }
message Event { bytes data = 1; string mid = 2; string cmd = 3; map<string,string> extInfo = 4; }
|
长连实现细节-登录
登录流程能够看到主要的存储和服务配合:
前面提到的transmission-service实际上是做消息上下行的,不处理设备信息管理,其实它只管理了socket的内存状态。连接状态是register-service那边的redis管理的。transmission-service调用了register的rpc。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| service RegisterService {
response.HeartResponse heart(1: required base.Context ctx,2: required string req);
response.LoginResponse login(1: required base.Context ctx,2: required string req);
response.OfflineResponse offline(1: required base.Context ctx,2: required string req);
response.BackgroundResponse background(1: required base.Context ctx,2: required string req);
response.ForegroundResponse foreground(1: required base.Context ctx,2: required string req);
response.BizRegisterResponse bizRegister(1: required base.Context ctx,2: required string req);
response.JoinResponse join(1: required base.Context ctx,2: required string req);
response.LeaveResponse leave(1: required base.Context ctx,2: required string req);
response.AddTagResponse addTag(1: required base.Context ctx,2: required string req);
response.RemoveTagResponse removeTag(1: required base.Context ctx,2: required string req);
response.TagHeartResponse tagHeart(1: required base.Context ctx,2: required string req);
}
|
这里简单看看java是如何处理长连信令的: java会使用netty注册信令, java在低版本没有协程,于是使用netty做长连网关,一个比较常见的写法是将包体解析出来然后publishEvent来处理, java是传统reactor epoll框架。
为了稳定性,不能在io事件循环里面做耗时的rpc调用(下游劣化了这个epoll会堵住,一个下游的劣化就能拖跨整个event loop)。所以这里必然用异步回调,但是这样执行命令的顺序性其实很有问题。如果要解决这两个问题(不阻塞eventloop + 顺序性),可以在内存中做一个uid+deviceId+bizname partition的内存队列, 调用下游服务,调用完后就runInLoop将ack写回。
go: 一个长连接用一个goroutine处理,调用下游纯用阻塞rpc,下游劣化问题不大,不会阻塞其它连接。
这里继续看看register-service的redis存储,按uid分片:
1 2 3 4 5 6 7 8 9 10 11
| type: hash key: {uid} field: {socketId:这个是auth服务生成的一个socketId,理解成一个唯一的id就行了} value: {json化的socket的更多信息,设备/app信息,userStatus,deviceId} expire: 300s
key: hash key: {uid}.{deviceId} field: {socketId} value: {json化的socket的更多信息,设备/app信息,userStatus,deviceId} 同上面的value expire: 300s
|
长连实现细节-心跳
实际上发生心跳包的时候直接调用register的heart rpc,此时可以刷新reset上面两个kv的过期时间。实际上任何包,都可以更新心跳,让register重新set这两个kv。
这里可以有一个优化,如果client.lastActiveTimePoint离当前小于50s,就不调用register了,防止频繁写redis。
实际上任何类型的包都会做lastActiveTimePoint的更新以及register heart rpc的调用,heart心跳包只是为了在没有进行交互的情况下“维持连接活性”,续上“连接租约”。
应用层的心跳是否是必要的? 是的。我这里从tcp和业务两个角度进行阐述:
- tcp层面: 实际上,现在中国的网络大多是nat,而nat的那个cache是比较短的,可能就十五分钟。此时tcp keepalive的配置也没啥用。

- 业务上: 假如我们把keep alive的时间设置的很小很小(远小于nat cache过期值),其实也不行。因为tcp正常只能说是协议栈正常,但是不能说应用正常(比如进程死锁了)。且tcp层面的keep alive心跳包没法触发业务逻辑。
长连实现细节-下行
目前我们register的存储很简单,两个hash见上,存了用户的设备信息,我们可以直接按uid,deviceId下行消息。通常我们可以发mq进行下行,要进行消息下行的业务方可以衡量触达速率,申请不同的mq(消费速率不同,衡量业务方的发送速率,看看部署多少下行网关)。

长连实现细节-tag
有一些场景,如给订阅某个关注页的人下推数据,给订阅某个设备属性(cpu+builder+xxx)进行热修等等此类场景。只要用户长连连上,然后端上触发了订阅tag,相当于在长连通道上添加了标签属性。
业务方可以按照这个标签属性,给订阅这部分tag的设备下推数据,在register redis存储那边看来,它看起来是一个hash:
1 2 3 4 5
| type: hash key: {bizName}.{tag} field: ip(这是长连网关的ip) value: time expire: 300s
|
加tag/remove tag就是写这个hash。并且长连网关transmission那里也会记录tag信息,使用了下面的结构(下面的信息存储在transmission server):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
| public class NamespaceClientBox { private final Map<String, BaseClient> ALL_CLIENTS = PlatformDependent.newConcurrentHashMap(); private final Map<MulticastIdInfo, ConcurrentSkipListSet<BaseClient>> MULTICAST_CLIENTS = PlatformDependent.newConcurrentHashMap(); private final Map<String, Map<String, ConcurrentSkipListSet<BaseClient>>> TAG_CLIENTS = PlatformDependent.newConcurrentHashMap();
public NamespaceClientBox() { }
public Map<String, BaseClient> getClient() { return this.ALL_CLIENTS; }
public Map<MulticastIdInfo, ConcurrentSkipListSet<BaseClient>> getMulticastClient() { return this.MULTICAST_CLIENTS; }
public Map<String, Map<String, ConcurrentSkipListSet<BaseClient>>> getAllTagClient() { return this.TAG_CLIENTS; }
public void addTagClient(String bizName, String tagId, BaseClient client) { client.getTagInfos().computeIfAbsent(bizName, (k) -> new ArrayList()); List<String> tagIds = (List)client.getTagInfos().get(bizName); if (!tagIds.contains(tagId)) { ((ConcurrentSkipListSet)((Map)this.TAG_CLIENTS.computeIfAbsent(bizName, (k) -> new ConcurrentHashMap())).computeIfAbsent(tagId, (k) -> new ConcurrentSkipListSet(Comparator.comparing(BaseClient::getSocketId, String::compareToIgnoreCase)))).add(client); tagIds.add(tagId); } }
public void removeTagClient(String bizName, String tagId, BaseClient client) { List<String> tagIds = (List)client.getTagInfos().get(bizName); if (tagIds != null && tagIds.remove(tagId)) { Map<String, ConcurrentSkipListSet<BaseClient>> tagClientMap = (Map)this.TAG_CLIENTS.get(bizName); if (tagClientMap != null) { ConcurrentSkipListSet<BaseClient> clients = (ConcurrentSkipListSet)tagClientMap.get(tagId); if (clients != null && clients.remove(client) && clients.isEmpty()) { tagClientMap.remove(tagId); }
} } }
public ConcurrentSkipListSet<BaseClient> getTagClients(String bizName, String tagId) { if (CollectionUtils.isEmpty(this.TAG_CLIENTS)) { return null; } else { Map<String, ConcurrentSkipListSet<BaseClient>> clientByTags = (Map)this.TAG_CLIENTS.get(bizName); return CollectionUtils.isEmpty(clientByTags) ? null : (ConcurrentSkipListSet)clientByTags.get(tagId); } }
public ConcurrentSkipListSet<BaseClient> delTagClients(String bizName, String tagId) { Map<String, ConcurrentSkipListSet<BaseClient>> tagIdClients = (Map)this.TAG_CLIENTS.get(bizName); return CollectionUtils.isEmpty(tagIdClients) ? null : (ConcurrentSkipListSet)tagIdClients.remove(tagId); }
public void addMulticastClient(MulticastIdInfo multicastIdInfo, BaseClient client) { if (!client.getMulticastIdInfos().contains(multicastIdInfo)) { ((ConcurrentSkipListSet)this.MULTICAST_CLIENTS.computeIfAbsent(multicastIdInfo, (k) -> new ConcurrentSkipListSet(Comparator.comparing(BaseClient::getSocketId, String::compareToIgnoreCase)))).add(client); client.getMulticastIdInfos().add(multicastIdInfo); } }
public void removeMulticastClient(MulticastIdInfo multicastIdInfo, BaseClient client) { if (client.getMulticastIdInfos().contains(multicastIdInfo)) { ConcurrentSkipListSet<BaseClient> clients = (ConcurrentSkipListSet)this.MULTICAST_CLIENTS.get(multicastIdInfo); if (!CollectionUtils.isEmpty(clients)) { boolean removed = clients.remove(client); if (removed) { client.getMulticastIdInfos().remove(multicastIdInfo); } }
} }
public ConcurrentSkipListSet<BaseClient> delMulticastClient(MulticastIdInfo multicastId) { return (ConcurrentSkipListSet)this.MULTICAST_CLIENTS.remove(multicastId); }
public ConcurrentSkipListSet<BaseClient> getMulticastClient(MulticastIdInfo multicastId) { return (ConcurrentSkipListSet)this.MULTICAST_CLIENTS.get(multicastId); }
public Collection<BaseClient> getAllClient() { return this.ALL_CLIENTS.values(); }
public void addClient(String socketId, BaseClient baseClient) { this.ALL_CLIENTS.put(socketId, baseClient); }
public void removeClient(String socketId) { this.ALL_CLIENTS.remove(socketId); }
public BaseClient getClient(String socketId) { return (BaseClient)this.ALL_CLIENTS.get(socketId); } }
|
数据架构图:

addTag步骤:
- rpc调用registerRemoteService.addTag(req): 也就是写上面提到的redis hash
- namespaceBox.addTagClient(tagId, client): transmission server存储本地tag信息
此外客户端应该定期tagHeart,re expire 续期 tag的这个过期时间,防止上面的300s过期,这里具体实现可以这样,收到客户端包的时候,做tagHeart的续期(下面的update在每次收到客户端报文后被调用,和heart一样有个防频繁调用redis的逻辑):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Override public void update(BaseClient client) { if (CollectionUtils.isEmpty(client.getTagInfos())) { return; }
if (System.currentTimeMillis() - client.getDeviceInfo().getTs() <= apolloSwitch.getTagHeartbeatInterval()) { return; }
try { tagHeartExecutorService.execute(() -> { TagRequestModel tagRequestModel = TagRequestModel.create(client, client.getTagInfos()); rtpCallbackRegisterService.tagHeart(tagRequestModel, new ICallback<Void>() { @Override public void onSuccess(Void unused) {
}
@Override public void onFail(int code, String message) {
} }); }); } catch (Exception ex) { log.warn("update tagHeart error, error:{}", ex.getMessage()); } }
|
removeTag有很细的细节,当客户端来removeTag时,要不要hdel呢? 实际上有个小trick:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
public boolean del(String key, String field, String value) { try { String luaScript = "local currentValue = redis.call('hget', KEYS[1], ARGV[1]) " + "if currentValue and ARGV[2] >= currentValue then " + " redis.call('hdel', KEYS[1], ARGV[1]) " + "end";
cacheClient.eval(luaScript, 1, key, field, value); return true; } catch (Exception ex) { log.warn("del error, key:{}, field:{}", key, field, ex); return false; } }
|
sync服务有一个按tag下行的接口:逻辑是从redis拿到tansmission server的ipport,然后调用transmission下行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| private void syncByTag(NamespaceClientBox namespaceClientBox, MulticastByTagDto dto) { try { ConcurrentSkipListSet<BaseClient> clients = namespaceClientBox.getTagClients(dto.getBizName(), dto.getTagId()); if (CollectionUtils.isEmpty(clients)) { MetricsUtils.counter("multicast_tag_sync").addTag("result", "client_empty").increment(dto.getSize()); multicastDataService.removeTagByIp(dto.getDomain(), namespaceClientBox, dto.getBizName(), dto.getTagId());
ApplicationContextProvider.getSyncStateService().record( dto.getSyncStateKeys(), null, null, null, EnumAckStatus.NO_CHANNEL, dto.getTime(), false); return; }
clients.forEach(client -> { if (FilterUtil.selectPush(client, dto.getFilter())) { EnumAckStatus state = syncManager.sync(client, dto.getBizName(), dto.getPackets()); if (state == EnumAckStatus.WRITE_TO_CHANNEL) { MetricsUtils.counter("multicast_tag_sync").addTag("result", EnumAckStatus.WRITE_TO_CHANNEL.name()).increment(dto.getSize()); return; }
MetricsUtils.counter("multicast_tag_sync").addTag("result", state.name()).increment(dto.getSize()); ApplicationContextProvider.getSyncStateService().record( dto.getSyncStateKeys(), client.getSocketId(), client.getLoginUser().getAlias(), client.getPlatformType(), state, dto.getTime(), false); return; }
MetricsUtils.counter("multicast_tag_sync").addTag("result", EnumAckStatus.FILTER.name()).increment(dto.getSize()); ApplicationContextProvider.getSyncStateService().record( dto.getSyncStateKeys(), client.getSocketId(), client.getLoginUser().getAlias(), client.getPlatformType(), EnumAckStatus.FILTER, dto.getTime(), false); }); } catch (Exception ex) { log.warn("process tagId:{} error", dto.getTagId(), ex); } }
|
下一章
讲im业务层如何接入长连网关这个“中台能力”,实现一个私信群聊。