消息推送架构-Based-GOIM
GOIM 是Go实现的消息推送的分布式服务,易于扩容伸缩,使用了bilibili/discovery来支持服务发现。
本文的重点,主要梳理了GOIM的架构,消息流转和消息处理。本文没有提到Comet的具体逻辑,套接字编程和RingBuffer等,但是Comet的复杂度远高于其他两个网元,因此强烈建议阅读Comet源码,应该会对Go网络编程有更多认识。
GOIM 是Go实现的消息推送的分布式服务,易于扩容伸缩,使用了bilibili/discovery来支持服务发现。相较于我之前用Socket.IO做的信令服务,优点在于更优雅的扩容,将连接层和逻辑层分离,职责更清晰。当然缺点也有(没有和具体实现解耦,如MQ的选型,导致不够灵活;客户端非全双工通信,TCP利用率偏低,这点并不全是缺点,好处是:消息流转清晰,职责非常明确),这部分可以自己做定制(最后的参考文献2中讲很多)。
架构图
总的来说,整个应用的架构如下
这里省略了其中用于服务发现的“bilibili/discovery”。在整个GOIM中用到服务发现的主要是gRPC和消息推送关系查找。
如上图:
- Comet负责建立和维持客户端的长连接;
- Job负责消息的分发;
- Logic提供三种纬度的消息(全局,ROOM,用户)投递,还包括业务逻辑,Session管理。
消息流转
从上述的架构图中可以知道,消息是通过HTTP调用Logic产生的,然后用MQ来中转(存储,削峰);每个Job成员都从给队列中消费消息,投递给一个或者多个Comet,由Comet将消息发送给客户端。
生成消息
目前在Github上的GOIM版本,消息(除鉴权/心跳等基础数据包外)生成都是由Logic完成第一手处理,Logic提供了HTTP接口以支持消息发送能力,主要有三个纬度:用户,房间,全应用广播,如下:
curl -d 'mid message' http://api.goim.io:3111/goim/push/mids?operation=1000&mids=123
curl -d 'room message' http://api.goim.io:3111/goim/push/room?operation=1000&type=live&room=1000
curl -d 'broadcast message' http://api.goim.io:3111/goim/push/all?operation=1000
在Logic服务中会通过处理,将消息处理成**附#消息格式#任务队列消息**的格式,然后投递到MQ中。其中三种纬度的消息处理稍有不同:
用户
// goim/internal/logic/push.go
// mid => []PushMsg{op, server, keys, msg}
func (l *Logic) PushMids(c context.Context, op int32, mids []int64, msg []byte) (err error) {
// 根据用户ID获取所有的 key:server 对应关系;在redis中是一个hash
keyServers, _, err := l.dao.KeysByMids(c, mids)
// ...
keys := make(map[string][]string)
for key, server := range keyServers {
// ...
keys[server] = append(keys[server], key)
}
for server, keys := range keys {
// 通过DAO组装PushMsg投递给MQ
if err = l.dao.PushMsg(c, op, server, keys, msg); err != nil {
return
}
}
return
}
房间 没什么特别的处理
// goim/internal/logic/push.go
func (l *Logic) PushRoom(c context.Context, op int32, typ, room string, msg []byte) (err error) {
return l.dao.BroadcastRoomMsg(c, op, model.EncodeRoomKey(typ, room), msg)
}
// // goim/internal/logic/dao
func (d *Dao) BroadcastRoomMsg(c context.Context, op int32, room string, msg []byte) (err error) {
pushMsg := &pb.PushMsg{
Type: pb.PushMsg_ROOM,
Operation: op,
Room: room,
Msg: msg,
}
b, err := proto.Marshal(pushMsg)
// ...
if err := d.nsqProducer.Publish(d.c.Nsq.Topic, b); err != nil {
log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)
}
return
}
广播 没什么特别的处理
// goim/internal/logic/push.go
func (l *Logic) PushAll(c context.Context, op, speed int32, msg []byte) (err error) {
return l.dao.BroadcastMsg(c, op, speed, msg)
}
// goim/internal/logic/dao
func (d *Dao) BroadcastMsg(c context.Context, op, speed int32, msg []byte) (err error) {
pushMsg := &pb.PushMsg{
Type: pb.PushMsg_BROADCAST,
Operation: op,
Speed: speed, // 这里需要去到Job才知道speed的具体功效
Msg: msg,
}
b, err := proto.Marshal(pushMsg)
if err != nil {
return
}
if err := d.nsqProducer.Publish(d.c.Nsq.Topic, b); err != nil {
log.Errorf("PushMsg.send(push pushMsg:%v) error(%v)", pushMsg, err)
}
return
}
小结:
- 针对用户单发时,会获取到具体的sever和keys组装到PushMsg
- 房间消息,没有server和keys, 但是多一个room是通过typ和roomID组装而成的 “live://1000”
- 广播消息,除了消息体之外,另外有一个speed字段
传输消息
由Logic处理好的消息会放在MQ中,Job任务会自动消费消息,然后通过gRPC调用Comet单元。相比其他两个网元,Job就简单多了。从MQ中消费到消息后会调用c.job.push(ctx, pushMsg)
。
// job 发送消息(普通消息,房间消息,广播)
func (j *Job) push(ctx context.Context, pushMsg *pb.PushMsg) (err error) {
switch pushMsg.Type {
case pb.PushMsg_PUSH:
err = j.pushKeys(pushMsg.Operation, pushMsg.Server, pushMsg.Keys, pushMsg.Msg)
case pb.PushMsg_ROOM:
// 获取一个job中的Room缓存,用于房间内“定时,定量”发送消息,减少请求次数
// 这里调用的Push并不会立即发送,而是放在Room.proto这个channel中
// 实际放松是由Room.pushproc来定时
err = j.getRoom(pushMsg.Room).Push(pushMsg.Operation, pushMsg.Msg)
case pb.PushMsg_BROADCAST:
err = j.broadcast(pushMsg.Operation, pushMsg.Msg, pushMsg.Speed)
default:
err = fmt.Errorf("no match push type: %s", pushMsg.Type)
}
return
}
// 根据serverID发送给特定的Comet服务,避免广播
// cometServers 是由discovery服务发现维护的comet列表。
func (j *Job) pushKeys(operation int32, serverID string, subKeys []string, body []byte) (err error) {
buf := bytes.NewWriterSize(len(body) + 64)
p := &comet.Proto{
Ver: 1,
Op: operation,
Body: body,
}
p.WriteTo(buf)
p.Body = buf.Buffer()
p.Op = comet.OpRaw
var args = comet.PushMsgReq{
Keys: subKeys,
ProtoOp: operation,
Proto: p,
}
if c, ok := j.cometServers[serverID]; ok {
if err = c.Push(&args); err != nil {
log.Errorf("c.Push(%v) serverID:%s error(%v)", args, serverID, err)
}
log.Infof("pushKey:%s comets:%d", serverID, len(j.cometServers))
}
return
}
// 处理成一个BroadcastReq,并广播给所有的Comet
func (j *Job) broadcast(operation int32, body []byte, speed int32) (err error) {
// ... 与pushKeys一致,生成一个p
comets := j.cometServers
// 如 speed = 64, len(comets) = 2, speed = 32
speed /= int32(len(comets))
var args = comet.BroadcastReq{
ProtoOp: operation,
Proto: p,
Speed: speed, // 是被传递给Comet处理,继续跟踪
}
for serverID, c := range comets {
if err = c.Broadcast(&args); err != nil {
log.Errorf("c.Broadcast(%v) serverID:%s error(%v)", args, serverID, err)
}
}
log.Infof("broadcast comets:%d", len(comets))
return
}
房间消息处理:
getRoom(roomID) -> room.Push() -> p -> room.proto
|
|---> NewRoom(batch, duration)
|
|---> go room.pushproc() -> p <- room.proto
// goim/internal/job/room.go
type Room struct {
c *conf.Room // 关于房间的配置
job *Job // 绑定job,为了追溯Room所属的Job
id string // 房间ID
proto chan *comet.Proto // 有缓冲channel
}
// pushproc merge proto and push msgs in batch.
// 默认batch = 20, sigTime = 1s
func (r *Room) pushproc(batch int, sigTime time.Duration) {
var (
n int
last time.Time
p *comet.Proto
buf = bytes.NewWriterSize(int(comet.MaxBodySize)) // 4096B = 4KB
)
// 设置了一个定时器,在一定时间后往room.proto放送一个roomReadyProto信号。
td := time.AfterFunc(sigTime, func() {
select {
case r.proto <- roomReadyProto:
default:
}
})
defer td.Stop()
for {
if p = <-r.proto; p == nil {
// 如果创建了room,但是读到空包
break // exit
} else if p != roomReadyProto {
// 读取room.proto 如果是正常的数据包,则合并到buf中去,如果满了怎么办?
p.WriteTo(buf)
// 如果是第一个数据包,则重置定时器,并继续读取后续数据包
if n++; n == 1 {
last = time.Now()
td.Reset(sigTime)
continue
} else if n < batch {
// 后续的数据包,不会重置定时器,但是如果时间仍在第一个数据包的 sigTime 时间间隔内
// 简单说,定时器还没到时间
if sigTime > time.Since(last) {
continue
}
}
// 累计的数据包数量已经超过了batch, 执行发送动作
} else {
// 定时器到读到了roomReadyProto
// 如果buf已经被重置了,则跳出循环执行清理动作;否则执行发送消息
if n == 0 {
break
}
}
// 发送房间内的消息
_ = r.job.broadcastRoomRawBytes(r.id, buf.Buffer())
buf = bytes.NewWriterSize(buf.Size())
n = 0
// 如果配置了房间最大闲置时间,则重新设定定时器
// 也就是说,如果房间被创建后,处理完了该房间的消息,并不是直接跳出循环清理房间
// 而是,会阻塞等待下一次的消息再来,如果在 “1m / r.c.Idle” 时间内没有来,则会跳出循环清理掉该房间
// 如果在 “1m / r.c.Idle” 内有消息,则会重新设定定时器为sigTime,并为proto计数
if r.c.Idle != 0 {
td.Reset(time.Duration(r.c.Idle)) // 默认15分钟
} else {
td.Reset(time.Minute)
}
}
// 清理动作
r.job.delRoom(r.id)
}
总结如下图:
小结:
- 针对用户单发时,会直接发送到对应的comet服务,根据key再去给特定的channel发送消息
- 房间消息,这个会特殊一些,Job持有一个特殊的Room结构,用于合并发送到该房间的消息,定时定量发送房间消息(好处是,减少了gRPC调用次数降低系统负载,缺点增加时消息延迟)
- 广播消息,将消息封装到
BroadcastPushReq
中,然后直接发送给所有的Comet
投递消息
Comet接收到Job单元的gRPC调用之后,会将消息通过Websocket套接字按照GOIM约定的协议格式发送给指定客户端。从Job那边传输过来的消息,依旧是分为用户消息,房间消息,全局消息。这里得先说明下Comet是如何管理用户端的长连接,如下图:
Bucket是在一个管理Channel和Room的数据结构,它的作用在于使用了hash来将Channel做分片管理,相较于集中管理,这样channel分布在不同的bucket中而不是一个map,可以降低冲突,减小锁的粒度。
有了上述结构,那么消息发送在忽略传输层的情况下:
针对用户单发
调用链路为:comet.Bucket(key).Channel(key).Push(proto),这里Push也只是将proto放在了channel的队列中(10缓冲),消息的下发在goim/internal/comet/server_websocket.go#dispatchWebsocket
。
房间消息
在Comet内部遍历Buckets并调用Bucket.BroadcastRoom(),但是这里也只是把消息放到了“某处”,并没有直接发送。实际发送代码在goim/internal/comet/bucket.go#roomproc
。
// BroadcastRoom broadcast a message to specified room
func (b *Bucket) BroadcastRoom(arg *grpc.BroadcastRoomReq) {
// 这里取模选中一个goroutine来执行任务
num := atomic.AddUint64(&b.routinesNum, 1) % b.c.RoutineAmount
// b.routines 是 b.c.RoutineAmount 数量的 有 b.c.RoutineSize 缓冲大小的 chan *grpc.BroadcastRoomReq
b.routines[num] <- arg
}
// 在创建Bucket时,便开启了goroutine来处理
func (b *Bucket) roomproc(c chan *grpc.BroadcastRoomReq) {
for {
arg := <-c
if room := b.Room(arg.RoomID); room != nil {
room.Push(arg.Proto)
}
}
}
// 遍历房间内的channel的链表,将消息放到channel的发送队列中,又回到了channel消息单发的逻辑。
func (r *Room) Push(p *grpc.Proto) {
r.rLock.RLock()
for ch := r.next; ch != nil; ch = ch.Next {
_ = ch.Push(p)
}
r.rLock.RUnlock()
}
广播消息
在Comet内部遍历Buckets并向Bucket中的所有Channel发送消息。这里终于用到了speed,上文提到过,如果设定speed = 64, len(comets) = 2, 那么这里用到的 speed = 32。
// Broadcast broadcast msg to all user.
func (s *server) Broadcast(ctx context.Context, req *pb.BroadcastReq) (*pb.BroadcastReply, error) {
if req.Proto == nil {
return nil, errors.ErrBroadCastArg
}
go func() {
for _, bucket := range s.srv.Buckets() {
bucket.Broadcast(req.GetProto(), req.ProtoOp)
if req.Speed > 0 {
//该bucket
// 有0个channel时,t = 0 / 32 = 0
// 有2个channel时, t = 2 / 32 = 0.0625
// 有32个channel时, t = 32 / 32 = 1
// 有64个channel时,t = 64 / 32 = 2
// 由此可得,(comet)speed 的含义是 每个bucket每秒最多发送的消息数量
t := bucket.ChannelCount() / int(req.Speed)
time.Sleep(time.Duration(t) * time.Second)
}
}
}()
return &pb.BroadcastReply{}, nil
}
// 广播,直接从bucket.chs中遍历
func (b *Bucket) Broadcast(p *grpc.Proto, op int32) {
var ch *Channel
b.cLock.RLock()
for _, ch = range b.chs {
// 如果不在该channel的监听队列中,那么该消息不会发给该客户端
// 这个监听队列,是在建立连接是发送的 “accepts” 字段中取得的
// 譬如accpets = [1000, 1001, 1002], 但是op = 1003, 那么就不会发送
//
// 值得注意的是,这个op是从BroadcastReq.ProtoOp取得,BroadcastReq.ProtoOp又是从pushMsg.Operation取得
// 也就是说 op = grpc.BroadcastReq.ProtoOp = proto.Op = PushMsg.Operation = 从发送消息接口产生。
//
if !ch.NeedPush(op) {
continue
}
_ = ch.Push(p)
}
b.cLock.RUnlock()
}
小结:
- 针对用户单发时,直接利用key定位到Bucket和Channel,将消息放到队列中。
- 房间消息,将消息分配到房间协程之一的队列中,在该协程中会持续不断的消费消费消息并处理,处理动作是将消息分发到Channel的消息队列(buffered chan)上。
- 广播消息,直接使用了bucket的chs遍历,来为每一个Channel推送一条消息到消息队列上。
附
这里会展示GOIM中必要的数据结构,帮助理解GOIM中的数据流转过程。 这里会出现几个名词:
- server: comet服务的hostname (string)
- mid: 用户在业务中的ID (int64)
- key: 用户在GOIM中的唯一ID (string)
Session结构
Session由Redis管理,维持了客户端MID,Server,Key的关系,这部分是在Logic中gRPC服务的Connect
方法中设置。如下图所示:
// goim/internal/logic/conn.go
func (l *Logic) Connect(c context.Context, server, cookie string, token []byte) (mid int64, key, roomID string, accepts []int32, hb int64, err error) {
var params struct {
Mid int64 `json:"mid"` // 用户ID
Key string `json:"key"` // 客户端标识别,如果为空则自动生成UUID
RoomID string `json:"room_id"` // 客户端加入房间
Platform string `json:"platform"`// 客户端所在平台
Accepts []int32 `json:"accepts"` // 监听房间
}
if err = json.Unmarshal(token, ¶ms); err != nil {
log.Errorf("json.Unmarshal(%s) error(%v)", token, err)
return
}
mid = params.Mid
roomID = params.RoomID
accepts = params.Accepts
hb = int64(l.c.Node.Heartbeat) * int64(l.c.Node.HeartbeatMax)
if key = params.Key; key == "" {
key = uuid.New().String()
}
if err = l.dao.AddMapping(c, mid, key, server); err != nil {
log.Errorf("l.dao.AddMapping(%d,%s,%s) error(%v)", mid, key, server, err)
}
log.Infof("conn connected key:%s server:%s mid:%d token:%s", key, server, mid, token)
return
}
// goim/internal/logic/dao/redis.go
//
func (d *Dao) AddMapping(c context.Context, mid int64, key, server string) (err error) {
// ...
if mid > 0 {
if err = conn.Send("HSET", keyMidServer(mid), key, server); err != nil {
log.Errorf("conn.Send(HSET %d,%s,%s) error(%v)", mid, server, key, err)
return
}
if err = conn.Send("EXPIRE", keyMidServer(mid), d.redisExpire); err != nil {
log.Errorf("conn.Send(EXPIRE %d,%s,%s) error(%v)", mid, key, server, err)
return
}
// ...
}
if err = conn.Send("SET", keyKeyServer(key), server); err != nil {
log.Errorf("conn.Send(HSET %d,%s,%s) error(%v)", mid, server, key, err)
return
}
if err = conn.Send("EXPIRE", keyKeyServer(key), d.redisExpire); err != nil {
log.Errorf("conn.Send(EXPIRE %d,%s,%s) error(%v)", mid, key, server, err)
return
}
// ...
}
从AddMapping
方法中,总结下得到:
如果(mid=1, key=69dafe8b58066478aea48f3d0f384820,server=comet.001)
mid_1 = {69dafe8b58066478aea48f3d0f384820: comet.001}
key_69dafe8b58066478aea48f3d0f384820 = "comet.001"
也就是说,同一个用户可以在多个地方同时连入系统;同时也能看出来,Session管理并不包括用户所在的房间,用户需要接受哪些房间的消息,这部分是在是在Logic.Connect
处理好了之后通过gRPC响应,交给Comet处理的。
// goim/internal/comet/server_websocket.go
func (s *Server) ServeWebsocket(conn net.Conn, rp, wp *bytes.Pool, tr *xtime.Timer) {
// ...
if p, err = ch.CliProto.Set(); err == nil {
if ch.Mid, ch.Key, rid, accepts, hb, err = s.authWebsocket(ctx, ws, p, req.Header.Get("Cookie")); err == nil {
ch.Watch(accepts...) // 监听房间列表
b = s.Bucket(ch.Key) // 根据用户key选择一个bucket (对key做cityhash再取模)
err = b.Put(rid, ch) // 将用户ID和连接Channel维护到Bucket中
if conf.Conf.Debug {
log.Infof("websocket connnected key:%s mid:%d proto:%+v", ch.Key, ch.Mid, p)
}
}
}
// ...
}
// auth for goim handshake with client, use rsa & aes.
func (s *Server) authWebsocket(ctx context.Context, ws *websocket.Conn, p *grpc.Proto, cookie string) (mid int64, key, rid string, accepts []int32, hb time.Duration, err error) {
for {
if err = p.ReadWebsocket(ws); err != nil {
return
}
if p.Op == grpc.OpAuth {
break
} else {
log.Errorf("ws request operation(%d) not auth", p.Op)
}
}
// rid roomID
if mid, key, rid, accepts, hb, err = s.Connect(ctx, p, cookie); err != nil {
return
}
p.Op = grpc.OpAuthReply
p.Body = nil
if err = p.WriteWebsocket(ws); err != nil {
return
}
err = ws.Flush()
return
}
消息格式
1 - 任务队列消息:
不管是个人消息,还是房间消息和广播消息,都是用的如下结构;其中Op和Type可以帮助Job单元可以针对消息上做差异化的处理。
type PushMsg struct {
Type PushMsg_Type // 消息类型,个人,房间广播,广播
Operation int32 // 指令 goim/api/comet/grpc/operation.go
Speed int32 // 广播时用 TODO:
Server string // Comet的Hostname, 个人消息时指定
Room string // 房间号
Keys []string // bucket key
Msg []byte // 消息体
}
2 - GOIM消息协议:
区别于任务队列消息,这个条消息是客户端实际收到的消息(对比可以发现,其中只有Op和Body是从Logic单元传递过来的,其他字段很大一部分用于分流(定位Comet/Bucket/Room/Channel),或者系统字段用于差异化处理消息):
type Proto struct {
Ver int32 // 版本号
Op int32 // 消息类型,如Ping,Pong, Text
Seq int32 // 序列号 TODO:
Body []byte // 消息体 等于 PushMsg.Msg
}
服务发现
服务发现可以帮助整个应用发现Comet单元和Logic单元,而Job单元并不需要注册自己(不需要被发现)。当然可以没有服务发现功能,直接在代码和配置中配置好(Comet/Logic)服务地址,但是也就失去了动态扩容的能力。另外,如果是K8S部署,这里的服务发现功能就有点冗余了,因此需要做一些调整再用K8S部署,调整包括(服务注册和发现抽象即于discovery结耦,可选开启;对于Comet的gRPC调用,在针对用户单发消息时,需要从定向单播变成广播)。
总结
- GOIM将整个应用职责,分配给三个单个独立网元来承担相应的工作,让流程更清晰,应用也易于扩展(动态)。
- 在用户和长连接的映射上,使用了Key来区别应用用户和业务用户,得以支持单个用户同时登陆(多平台)的场景;另外key也作为了Comet网元定位用户的唯一标识;利用了bucket + cityhash来降低竞争,加速用户定位并发送消息。
- 在房间消息的处理上,出于消息频繁和业务场景的考虑:在Job上为房间消息增加了数据包合并机制;在Comet层为每一个Bucket都创建了一定数量的goroutine来持续处理房间消息。这两个动作,都能提高整个应用对于房间消息处理能力,提升吞吐量。
- 从Job端将消息分发到Comet时,除了单个用户的消息能够指定Comet以外,其他的消息都只能广播给所有的Comet处理。
水平有限,如有错误,欢迎勘误指正🙏。