基于socket.io构建即时通讯系统

基于socket.io构建即时通讯系统

本意在总结实现socket.io-app过程中的一些知识。

背景 #

现需要替换公司的即时通信框架(之前用的是阿里云的微消息队列,优点在于:简单易接入,问题在于:对于人数和客户端状态感知不够准确,原因后面细说)。在框架选型的时候,基于应用场景(客户端有:小程序/nodeJS/浏览器),有三种方案:

  1. 替换MQTT的架子,针对现有场景下的问题,选用一款更加可控的MQTT服务,如EMQX
  2. 基于现有的技术栈,选择一款golang开发的开源框架,在此基础上进行开发,如goim
  3. 一个大众且稳定的开源框架,语言不限,如socket.io

综合一系列因素(技术熟悉程度/star数/时间成本/金钱成本/运维成本)选择了socket.io / JS🐶~

这里想推荐一下goimgolang程序猿。个人看法:goim对团队技术栈更友好;对分布式更友好;架构合理易于扩展;如有兴趣可以去看下:https://juejin.im/post/5cbb9e68e51d456e51614aab @tsingson 的解析,结合源码更容易理解上手。


关于socket.io的介绍 #

文档在此socket.io

通信机制及特点 #

特点 摘要
可用性 提供 long-polling 和 WebSocket 两种方式,可以自动升级,基于engine.io
自动重连 客户端会一直重连,直到再次链接上服务器
断线检测 客户端和服务端通过心跳维持长链接
二进制消息 任意被序列化的数据结构都可以传输
多路复用 对不同的Namespace,复用底层链接
内置Room概念 理解为聊天室和即可
基于websocket,但不互通 https://github.com/socketio/socket.io-protocol

总结一下:socket.io 已经提供了即时通讯必要的基础,不用关心任何通讯相关的细节,开箱即用。

分布式 #

确认socket.io已经提供了通讯基础,那么问题来了:单机性能有限,如何扩展到分布式呢? socket.io的另一个优点adapter机制让socket.io易于扩展,官方提供了redis-adapter来支持消息分发。

By running socket.io with the socket.io-redis adapter you can run multiple socket.io instances in different processes or servers that can all broadcast and emit events to and from each other.

注意:redis-adapter会同步所有的广播和emit事件

const io = require('socket.io')(3000);
const redisAdapter = require('socket.io-redis');
io.adapter(redisAdapter({ host: 'localhost', port: 6379 }));

事件机制 #

如下:

var io = require('socket.io')(80);
var chat = io
  .of('/chat')
  .on('connection', function (socket) {
    socket.emit('event1', {
        that: 'only'
      , '/chat': 'will get'
    });
    chat.emit('event2', {
        everyone: 'in'
      , '/chat': 'will get'
    });
  });

var news = io
  .of('/news')
  .on('connection', function (socket) {
    socket.emit('item', { news: 'item' });
  });

除了内置保留的事件(connect message disconnect error ping pong等)外,可以自定义任何事件名,用于业务逻辑。

这里只是简单介绍了一下socket.io,可以前往官网获取更多细节。

事件机制对与客户端状态和人数的作用 #

前面说过使用阿里云微消息队列无法准确获取到客户端状态和数量的问题,在阿里云文档中提到了:异步上下线通知因为采用消息解耦,状态判断更加复杂,且误判可能性更大,但该方法可以基于事件分析多个客户端的运行状态轨迹。异步上下线正是之前用于统计在线人数和客户端状态选择的方案。在实际使用的时候,能很直观的发现,上下线消息不能保证正确的顺序,这也导致了计算人数和状态时候出错~


基于socket-io的APP设计 #

基于业务场景,要求app的功能有:

1. 客户端和服务端全双工方式通信
2. 同一Namesapce下,相同用户ID仅能存在一个
3. 服务端正常触发上线/下线,用于维系客户端状态
4. 提供RPC调用方式,用于业务服务器与客户端交互
5. 提供客户端鉴权,非法客户端和未鉴权客户端会被主动关闭连接
6. Namespace可动态创建
7. 要求一定的监控数据,如链接数量,房间数量,消息数量等等。

应用对接 #

客户端生存周期泳道图 #


实现过程中的问题 #

socket.io是对长连接服务的,在分布式服务的场景下,虽然有redis-adapter可以方便的做消息分发到不同的服务实例上去,如果不是emit相关的事件则不支持分布式。下面列举了一部分关于分布式时需要解决的问题:

Sticky Session #

因为socket.io同时支持了long-pollingwebsocket,并自带自动升级功能。因此存在升级过程中,存在HTTP协议升级时,链接到不同服务器,导致无法完成升级过程。解决办法有两种:

  1. 使用nginx / ip_hash或类似功能,如果有自己的LB或者网关,可以自定义这个规则,保证负载均衡的同时保证同一连接接在同一服务器上完成升级过程。
  2. 配置 transports 为仅 websocket,规避升级过程。
var io = require('socket.io-client')
var client = io('https://myhost.com', {transports: ['websocket']})

https://socket.io/docs/internals/#socket-io-client

RPC调用如何扩展到分布式 #

需RPC提供的功能有踢人断开与客户端的连接清空房间之类。这一类命令不会调用emit / broadcast,因此也不能通过redis-adapter来实现分发。这里可以借鉴redis-adapter消息分发的思想,通过 redis PUB/SUB 来分发RPC命令,由此扩展到分布式。举例如下:

interface IRpcCommand {
    evt: rpcCommandEvt
    meta: any
}

class grpcService {
    /*
    * 订阅redis channel,并分发命令
    */
    private _subscribeCommandsToRedis() {
        this._sub.on("message", (ch: string, message: string) => {
            logger.info("recv an command from: ", ch, message)
            try {
                let command: IRpcCommand = JSON.parse(message)
                this._call(command)
            } catch (error) {
                logger.error("could not execute message command: ", error)
            }
        })

        this._sub.SUBSCRIBE(gRPCService.pubsubTopic())
    }

     private _call(command: IRpcCommand) {
        switch (command.evt) {
            /* ignore other cases */
            case rpcCommandEvt.clearRooms:
                let d6: clearRoomMeta = command.meta as clearRoomMeta
                this._clearRooms(d6.nspName, d6.roomIds)
                break
            default:
                logger.error("undefined command evt")
        }
    }

    /* ignore some codes */

    private _clearRooms = (nspName: string, roomIds: string[]) => {
        this._socketioSrv.clearRooms(nspName, roomIds)
    }

    /**
     * clearRoom
     */
    clearRooms = (call: grpc.ServerUnaryCall<api_pb.ClearRoomsReq>, cb: grpc.requestCallback<api_pb.ClearRoomsResp>) => {
        let roomIds = call.request.getRoomidsList()
        let nspName = call.request.getNspname()
        let resp = new api_pb.ClearRoomsResp()

        let meta: clearRoomMeta = { nspName, roomIds }
        this._publishCommandsToRedis(rpcCommandEvt.clearRooms, meta)

        resp.setErrcode(codes.OK)
        resp.setErrmsg(getMessage(codes.OK))
        cb(null, resp)
    }
}

Namespace下唯一用户ID如何保证 #

这个需求简单的就是说,业务逻辑上想对客户端加以限制,不能让同一个用户开启多个客户端来交互。逻辑很简单:每个连接鉴权的时候,去查询是否该用户ID已经存在Session了,如果存在则把之前的连接断开。同样的在分布式场景下:用户A分配到了S1服务器,用户A在其他地方S2服务器又建立一个新的连接,那么S2如何把S1上的连接断开呢?

说过上面RPC的命令分发之后,这里简单了:只需要触发一次RPC的disconnect事件就行了。

NodeJS + gRPC + TypeScript 实战 #

😭😭😭先哭为敬!想起来就心酸。核心问题是: 1. 用什么工具给proto文件生成ts文件;2. ts 中如何使用gRPC;3. ts 配置问题;其他;这里直接祭出最终结果:

生成工具 #
# generate js code for grpc proto file
gen-js:
	./node_modules/.bin/grpc_tools_node_protoc \
		--js_out=import_style=commonjs,binary:./src/codegen/api/ \
		--grpc_out=./src/codegen/api/ \
		--plugin=protoc-gen-grpc=./node_modules/.bin/grpc_tools_node_protoc_plugin \
		--proto_path=./src/api/ \
		api.proto

# generate ts code for grpc messages and service
gen-ts:
	protoc \
		--plugin=protoc-gen-ts=./node_modules/.bin/protoc-gen-ts \
		--ts_out=./src/codegen/api \
		--proto_path=./src/api/ \
		api.proto
ts中grpc.Server绑定service #
// https://github.com/yeqown/socket.io-app/blob/master/src/server/grpc.ts
serve = () => {
    // const _protoPath = '../api/api.proto'
    // const _protoDescriptor: GrpcObject = grpc.load(_protoPath);
    // const service = ;
    let _srv = new grpc.Server()
    _srv.addService(grpc_pb.SocketMServiceService, {
        nspBroadcast: this.nspBroadcast,
        nspRoomsBroadcast: this.nspRoomsBroadcast,
        nspUsersBroadcast: this.nspUsersBroadcast,
        disconnect: this.disconnect,
        knockoutFromRoom: this.knockoutFromRoom,
        clearRooms: this.clearRooms,
    })

    /* ignore some codes */

    _srv.bind("0.0.0.0:" + this.port.toString(), grpc.ServerCredentials.createInsecure())
    _srv.start()
}

总结 #

  1. socket.io 已经很全面了,在此基础上只需要接入自己的业务逻辑就可以了。
  2. redis 真好用
  3. TS 真好用,只是部分工具跟不上~
  4. 开源了一个基于socket.io的app架子,socket.io-app,欢迎交流
  5. redis-adapter 会主动分发emit, broadcast指令
  6. socket.io的协议升级期间需要保证在同一台服务器上完成

水平有限,如有错误,欢迎勘误指正🙏。

参考资料 #

访问量 访客数