消息推送架构-Based-GOIM
April 2, 2020
本文的重点,主要梳理了GOIM的架构,消息流转和消息处理。本文没有提到Comet的具体逻辑,套接字编程和RingBuffer等,但是Comet的复杂度远高于其他两个网元,因此强烈建议阅读Comet源码,应该会对Go网络编程有更多认识。
GOIM 是Go实现的消息推送的分布式服务,易于扩容伸缩,使用了bilibili/discovery来支持服务发现。相较于我之前用Socket.IO做的信令服务,优点在于更优雅的扩容,将连接层和逻辑层分离,职责更清晰。当然缺点也有(没有和具体实现解耦,如MQ的选型,导致不够灵活;客户端非全双工通信,TCP利用率偏低,这点并不全是缺点,好处是:消息流转清晰,职责非常明确),这部分可以自己做定制(最后的参考文献2中讲很多)。
架构图 #
总的来说,整个应用的架构如下
这里省略了其中用于服务发现的“bilibili/discovery”。在整个GOIM中用到服务发现的主要是gRPC和消息推送关系查找。
如上图:
- Comet负责建立和维持客户端的长连接;
- Job负责消息的分发;
- Logic提供三种纬度的消息(全局,ROOM,用户)投递,还包括业务逻辑,Session管理。
消息流转 #
从上述的架构图中可以知道,消息是通过HTTP调用Logic产生的,然后用MQ来中转(存储,削峰);每个Job成员都从给队列中消费消息,投递给一个或者多个Comet,由Comet将消息发送给客户端。
生成消息 #
目前在Github上的GOIM版本,消息(除鉴权/心跳等基础数据包外)生成都是由Logic完成第一手处理,Logic提供了HTTP接口以支持消息发送能力,主要有三个纬度:用户,房间,全应用广播,如下:
curl -d 'mid message' http://api.goim.io:3111/goim/push/mids?operation=1000&mids=123
curl -d 'room message' http://api.goim.io:3111/goim/push/room?operation=1000&type=live&room=1000
curl -d 'broadcast message' http://api.goim.io:3111/goim/push/all?operation=1000
在Logic服务中会通过处理,将消息处理成**附#消息格式#任务队列消息**的格式,然后投递到MQ中。其中三种纬度的消息处理稍有不同:
用户
// goim/internal/logic/push.go
// mid => []PushMsg{op, server, keys, msg}
func (l *Logic) PushMids(c context.Context, op int32, mids []int64, msg []byte) (err error) {
// 根据用户ID获取所有的 key:server 对应关系;在redis中是一个hash
keyServers, _, err := l.dao.KeysByMids(c, mids)
// ...
keys := make(map[string][]string)
for key, server := range keyServers {
// ...
keys[server] = append(keys[server], key)
}
for server, keys := range keys {
// 通过DAO组装PushMsg投递给MQ
if err = l.dao.PushMsg(c, op, server, keys, msg); err != nil {
return
}
}
return
}
房间 没什么特别的处理
...