NATS设计与实现 #
NATS 就是一个消息中间件,提供了 Pub/Sub 核心数据流,并基于此构建了 Request/Reply API 和 JetStream 用来提供可靠的分布式存储能力,和更高的 QoS(至少一次 + ACK)。
1. 核心概念 #
序号 | 名词(ENG) | 名词(zh-CN) | 解释 |
---|---|---|---|
1 | Publish | 发布 | 发布动作,往 subject 中投递一条消息 |
2 | Subscribe | 订阅 | 订阅动作,表示想要接受发布相应 subject 的消息 |
3 | Subject | 主题 | 唯一标识,用于标识一种或者一类事件的概念。订阅时,可以使用 nats 约定的通匹配符来接收一类 subjects,如 orders.> 。 |
4 | Core Nats | NATS 核心 | CORE NATS 提供了以下能力: - PUB / SUB https://docs.nats.io/nats-concepts/core-nats/pubsub - Request / Reply https://docs.nats.io/nats-concepts/core-nats/reqreply - Queue Groups https://docs.nats.io/nats-concepts/core-nats/queue 提供 最多一次 的消息传递保证 |
5 | Request / Reply | 请求 / 回复 | NATS 基于 PUB / SUB 实现的对 request 异步回复的功能,依赖于消息中的 reply 字段。reply 是内置实现,随机生成一个 “inbox” subject。 订阅者中间也是存在 Queue Group 概念的。 注意:发起 request 如果没有reply,那么服务端会返回 No Responders 消息。 |
6 | Queue groups | 队列组 | 订阅者使用同一个队列名称,它们就会成为一个队列组。每次队列组接收到消息时,只有队列组中_随机选择_的一个订阅者会消费一条消息,内置的负载均衡。 |
7 | Message | 消息 or 事件 | 一条消息包含了以下内容: - Subject. - A payload in the form of a byte array. - Any number of header fields. - An optional ‘reply’ address field. |
8 | Subject wildcards https://docs.nats.io/nats-concepts/subjects#wildcards |
主题通配符 | 订阅者可以使用这些通配符_通过单个订阅收听多个主题。_ 注意:但发布者将始终使用完全指定的主题,而不使用通配符。 |
9 | JetStream https://docs.nats.io/nats-concepts/jetstream |
无 / 流 | NATS 中一个功能特性,它是一个内置的分布式存储,在 CORE nats 的基础上扩展了更多的功能和更高的 QoS。功能上: - Stream:将 NATS 消息存储在流中,提供多种保留策略、限制、丢弃策略和主题映射功能。 - Consumer: 让客户端应用订阅或拉取流中的消息,支持多种重放策略、确认机制和流控功能 - Persistence: 将流的数据复制到多个 NATS 服务器,提供容错能力和加密存储功能 - KV Store: 可以将消息与键关联,提供存储、检索、删除和监听键值变化的功能。 - Object Store: JetStream 可以存储任意大小的对象(如文件),提供分块、校验和、元数据等功能。 其中最核心(开发常用)的概念就是:stream 和 consumer。 |
10 | Stream https://docs.nats.io/nats-concepts/jetstream/streams |
流 | 流即是消息存储,它定义了消息的存储方式以及保留的限制。 更具体的内容参见 #5.1 Stream 配置解析 |
11 | Consumer | 消费者 | 消费者作为客户端的接口,使用存储在流中的消息,跟踪客户端传递和确认的消息。Nats 同时支持 pull 和 push 两种消费模式;consumer 还提供了 durable 配置,用于持久化 consumer 消费信息(除非设置了 InactiveThreshold) 更具体的内容参见 #5.2 Consumer 配置解析 |
12 | Replay / Redelivery | ||
13 | Raft Group https://docs.nats.io/running-a-nats-service/configuration/clustering/jetstream_clustering |
RAFT 组 | 对于特定内容达成一致的分组,nats 中有 meta, stream, consumer 几种组。 Meta: 全部节点都是组成员。负责:JetStream API + 集群管理。 Stream: (根据 replicas 配置选择组内的服务器成员)。负责:stream 数据 Consumer: (根据 stream group的成员来确定消费者组内的成员)。负责:消费者状态 |
14 | KV Store | 键值存储 | |
15 | Object Store | 对象存储 |
2. 软件架构(集群模式) #
Nats 自身是提供了多种运行方式:
-
单机 只运行了一个 nats 实例。
-
普通集群: 运行了 3 / 5 个实例,其中一个为集群 leader,此时集群如下:
-
超级集群: 存在多个集群,集群之间可以通过 网关 来传播消息_。_网关提供了三种传播机制:这里 A / B 分别代表两个集群。
- Optimistic Mode 乐观模式
当 A 中的发布者发布“foo”时,A 网关将检查集群 B 是否已注册对“foo”没有兴趣。如果没有,则将“foo”转发给B。如果B收到“foo”后,在“foo”上没有订阅者,则B将向A发送一条网关协议消息,表示它对“foo”没有兴趣,从而阻止将来的消息关于“foo”被转发。
当后续 B 中有订阅者关注 “foo”, 那么 B 会发送一条网关协议,用来取消对 “foo” 没兴趣的设置。
- Interest-only Mode 兴趣(关注)模式
当 A 上的网关发送许多关于 B 不感兴趣的各种主题的消息时。 B 发送一条网关协议消息,要求 A 乐观地停止发送,如果已知对该主题感兴趣,则发送。随着 B 上的订阅不断增多,B 将向 A 更新其主题兴趣。
- Queue Subscriptions 队列订阅模式
服务器将始终首先尝试为本地队列订阅者提供服务,并且仅在未找到本地队列订阅者时进行故障转移。服务器将选择 RTT 最低的集群。
-
叶子结点/集群(代理模式):
透明地从本地客户端路由消息到一个或多个远程 NATS 系统。叶子结点采用本地的认证进行认证,连接远程 NATS 结点时,采用远程 NATS 系统的认证。通常用来降低 local 服务的延迟和流量。 注意:如果集群中一个节点配置为叶子结点,那么其余结点也要配置为叶子结点。
一般场景下最常用的就是集群模式。
Qs:
运维时怎么保证集群各节点的负载均衡?怎么保证集群的高可用?
3. Nats 客户端协议 #
https://docs.nats.io/reference/reference-protocols/nats-protocol
Nats 协议是一个_文本协议_ 这意味着,我们通过 telnet 就可以与之交互,通过抓包工具也可以很轻松的分析客户端和服务器之间的交互行为
Op 操作 | 发送方 | 操作场景描述 | 注意事项 |
---|---|---|---|
INFO | 服务器 | 当客户端建立连接时,或者服务器集群拓扑发生变化时,服务器发送自身的信息,配置和安全要求给客户端 | |
CONNECT | 客户端 | 当客户端收到服务器的 INFO 消息后,客户端发送自身的信息和安全信息给服务器,以完成连接 | verbose 字段默认为 false,表示服务器不会对每个消息回复 +OK |
PUB | 客户端 | 当客户端想要发布一个消息给指定的主题时,客户端发送 PUB 消息,可选地提供一个回复主题 | 消息内容是可选的,如果没有内容,需要把内容大小设置为 0,并且仍然需要第二个 CRLF |
HPUB | 客户端 | 和 PUB 相同,但是消息内容包含了 NATS 头部信息 | 消息内容是可选的,如果没有内容,需要把总消息大小设置为头部大小,并且仍然需要第二个 CRLF |
SUB | 客户端 | 当客户端想要订阅一个主题时,客户端发送 SUB 消息,可选地加入一个分布式队列组 | 主题名称必须是合法的,不能包含空格或者分隔符 |
UNSUB | 客户端 | 当客户端想要取消订阅一个主题时,客户端发送 UNSUB 消息,可选地指定一个消息数量,达到后自动取消订阅 | |
MSG | 服务器 | 当服务器向客户端发送一个应用消息时,服务器发送 MSG 消息,包含主题,sid,可选的回复主题和内容 | |
HMSG | 服务器 | 和 MSG 相同,但是消息内容包含了 NATS 头部信息 | |
PING | 服务器或客户端 | 当服务器或客户端想要检测对方是否存活时,发送 PING 消息 | 服务器会定期发送 PING 消息给客户端,如果客户端没有及时回复 PONG,服务器会断开连接 |
PONG | 服务器或客户端 | 当服务器或客户端收到 PING 消息时,回复 PONG 消息 | 服务器会把正常的流量当作 PING/PONG 的代理,所以如果客户端有消息流动,可能不会收到服务器的 PING |
+OK | 服务器 | 当服务器收到客户端的合法消息时,如果 verbose 字段为 true,服务器回复 +OK 消息 | 大多数客户端会把 verbose 字段设置为 false |
-ERR | 服务器 | 当服务器遇到协议错误,授权错误,或者其他运行时错误时,服务器发送 -ERR 消息给客户端 | 大多数这些错误会导致服务器关闭连接,客户端需要异步处理这些错误 |
从表里没有看到 Request 对不对?那是因为 Request 是基于 Pub / Sub 实现的 API,因此不在通信协议中,属于应用层的功能。
另外也没有看到 JetStream 相关的 Op 对不对?翻下代码就可以发现,它是基于 Request 机制实现的 pub,但通过前面的介绍,我们已经知道 CORE Nats 的 Reply 其实是 Subscriber 回复的,而 JetStream 是提供可靠存储的 ACK 并不能依赖于 Consumer ,所以这里服务器一定是有特殊处理。我们在 #6.5 JetStream 消息的投递和消费 一节中再详细展开。
4. NATS-Server 配置解析 #
https://docs.nats.io/running-a-nats-service/configuration
Nats 端口列表
默认端口号 | 作用 | 补充说明 |
443 | WebSocket 协议端口,通过 websocket 来进行交互 | |
1883 | MQTT 协议支持 | |
4222 | NATS 自身端口 | |
4111 | 叶节点允许本地客户端通过端口 4111 连接,且不需要任何认证 | |
6222 | 集群路由监听端口 | |
7222 | 网关监听端口 | |
7422 | 叶子结点监听端口 | |
8222 | 监控服务端口 |
5. JetStream 配置解析 #
Nats 的 jetstream 是通过 raft 来解决分布式一致性问题,以此实现 stream 相关的功能。
上图中展示了 jetstream 的一些关键点:
- Stream 可以存储多个 subject的消息。
- 消费者有多种消费模式(pull/push),还可以过滤stream 中的 subject 进行消费。
- 消费有多种确认模式(ack)
5.1 Stream 配置清单 #
https://docs.nats.io/nats-concepts/jetstream/streams#configuration
我们当前使用的 nats 版本对应 2.9.21 下表配置中,Metadata,compression, FirstSeq 和 SubjectTransform 暂不可用。
配置选项 | 描述 | 版本 | 可编辑 |
Name | 流的名称,必须是唯一的 | 2.2.0 | ❌ |
Storage | 流的存储类型,可以是 File(默认)或 Memory | 2.2.0 | ❌ |
Subjects | 流订阅的主题列表,可以使用通配符 | 2.2.0 | ✅ |
Replicas | 流的副本数量,必须大于等于 1 | 2.2.0 | ✅ |
Retention | 流的保留策略,决定了何时删除消息 | 2.2.0 | ✅ |
MaxAge | 流允许的最大消息存活时间,0 表示无限制 | 2.2.0 | ✅ |
MaxBytes | 流允许的最大字节数,-1 表示无限制 | 2.2.0 | ✅ |
MaxMsgs | 流允许的最大消息数量,-1 表示无限制 | 2.2.0 | ✅ |
MaxMsgSize | 流允许接收的最大消息体 | 2.2.0 | ✅ |
MaxConsumers | 流允许的最大消费者数量,-1 表示无限制 | 2.2.0 | ❌ |
NoAck | 流是否禁用确认机制,如果为 true,则不需要消费者确认消息。 | 2.2.0 | ✅ |
Retention | 声明流的保留策略 | 2.2.0 | ❌ |
Discard | 流达到限制时的丢弃策略,可以是 DiscardOld(默认)、DiscardNew 或 DiscardNewPerSubject | 2.2.0 | ✅ |
DuplicateWindow | 流的去重窗口,用于检测和删除重复的消息,0 表示禁用去重 | 2.2.0 | ✅ |
Placement | 流的放置选项,可以指定集群和标签 | 2.2.0 | ✅ |
Mirror | 流的镜像选项,可以指定源流和过滤条件 | 2.2.0 | 已设置不可修改 |
Sources | 流的源选项,可以指定一个或多个源流和过滤条件 | 2.2.0 | ✅ |
MaxMsgsPerSubject | 流允许的每个主题的最大消息数量,0 表示无限制 | 2.3.0 | ✅ |
Description | 描述信息 | 2.3.3 | ✅ |
Sealed | stream 封存,不允许删除。 | 2.6.2 | 可修改一次 |
DenyDelete | 限制通过 API 从 stream 中删除消息。 | 2.6.2 | ❌ |
DenyPurge | 限制通过 API 从 stream 中清除消息。 | 2.6.2 | ❌ |
AllowRollup | 流是否允许滚动更新,如果为 true,则可以使用 Nats-Rollup 头部删除旧的消息 | 2.6.2 | ✅ |
RePublish | 存储在 stream 会马上重新往配置的 subject 发布消息 | 2.8.3 | ✅ |
AllowDirect | ??? | ✅ | |
MirroDirect | ??? | 2.9.0 | ✅ |
DiscardNewPerSubject | 如果为 true, 丢弃策略为 DiscardNew 时,则丢弃每个 subject 新来的消息。 | 2.9.0 | ✅ |
Metadata | ???? | 2.10.0 | ✅ |
Compression | 文件存储压缩算法,s2 = snappy | 2.10.0 | ✅ |
FirstSeq | 指定 stream 的初始序列号 | 2.10.0 | ❌ |
SubjectTransform | 流的主题转换选项,可以指定源主题和目标主题,用于在存储消息时修改主题 | 2.10.0 | ✅ |
5.2 Consumer 配置清单 #
https://docs.nats.io/nats-concepts/jetstream/consumers
5.2.1 通用配置 #
配置选项 | 描述 | 版本 | 是否可编辑 |
Durable | 订阅绑定到使用者,直到被显示删除 | 2.2.0 | ❌ |
FilterSubject | 过滤 consumer 消费的主题,不能和 FilterSubjects 同时使用 | 2.2.0 | ✅ |
AckPolicy | 客户端确认策略:AckExplicit : 默认策略,每条消息单独确认。AckNone : 不缺人任何消息,服务器发送时即确认。AckAll :收到系列消息,只需要确认最后一条。 |
2.2.0 | ❌ |
AckWait | 一旦任何单个消息被传递给消费者,服务器将等待其确认的持续时间。如果没有及时收到确认,消息将被重新发送。 | 2.2.0 | ✅ |
DeliverPolicy | 消费者接受消息的策略:DeliverAll :默认策略,将从最早可用的消息开始消费DeliverLast : 开始消费后,消费加入到 stream 中的最新一条消息(如果过滤,则是匹配的最后一条)DeliverNew : 开始消费后,只会开始接收在消费者创建之后创建的消息。DeliverByStartSequence : 匹配序列号的第一条或者下一条seq >= OptStartSeq DeliverByStartTime : 匹配时间的第一条或者下一条 time >= OptStartTime DeliverLastPerSubject : 开始消费后,消费(匹配的)每个 subject 的最新一条消息。 |
2.2.0 | ❌ |
OptStartSeq | DeliverByStartSequence 配合使用 |
2.2.0 | ❌ |
OptStartTime | DeliverByStartTime 配合使用 |
2.2.0 | ❌ |
Description | 消费者描述 | ||
InactiveThreshold | Consumer 超过这个时间未活动会被清理,在 2.9 以前仅对 临时消费者生效。 | 2.2.0 | ✅ |
MaxAckPending | 定义为确认的消息的最大数量,一旦到达这个限制,那么将暂定投递消息。 | 2.2.0 | ✅ |
MaxDeliver | 尝试传送消息的最大次数。消费者未确认(NAck或者未发送确认)时会重新传递。 | 2.2.0 | ✅ |
ReplayPolicy | ReplayOriginal :模拟接收到消息的时间向 consumer 推送。ReplayInstant :消息将尽快推送到客户端。 |
2.2.0 | ❌ |
Replicas | 设置 consumer group 的副本数,默认继承 stream。 | 2.8.3 | ✅ |
MemoryStorage | 设置将 Consumer 的状态保留在内存中,而不是继承 stream 的存储类型。 | 2.8.3 | ✅ |
SampleFrequency | 采样率:??? | 2.2.0 | ✅ |
Metadata | 用于关联消费者的元数据 | 2.10.0 | ✅ |
FilterSubjects | 类似于 FilterSubject,但是多个 | 2.10.0 | ✅ |
5.2.2 拉模式 - 专属配置 #
配置选项 | 描述 | 版本 | 是否可编辑 |
MaxWaiting | 最大拉取请求数 | 2.2.0 | ❌ |
MaxRequestExpires | 单个拉取请求等待消息可供拉取的最长时间 | 2.2.0 | ❌ |
MaxRequestBatch | 单个请求的最大数量 (MaxRequestMaxBytes 共同作用,先到先限制) | 2.7.0 | ✅ |
MaxRequestMaxBytes | 单个请求的最大字节数 | 2.8.3 | ✅ |
5.2.3 推模式 - 专属配置 #
配置选项 | 描述 | 版本 | 是否可编辑 |
DeliverSubject | 设置服务器推送的主题,将隐式的设置消费者是基于推送。 | 2.2.0 | ❌ |
DeliverGroup | 类似于 queue group | 2.2.0 | ✅ |
FlowControl | 启用滑动窗口协议(服务端和客户端通信交换),用于控制服务器向客户端推送多少消息。 | 2.2.0 | ✅ |
IdleHeartbeat | 服务器定时向客户端发送状态消息(在心跳周期内没有新消息发送),客户端可以感知到服务器的状态。 | 2.2.0 | ✅ |
RateLimit | 限制向消费者发送消息的的速率 bits Per Second | 2.2.0 | ✅ |
HeadersOnly | 仅传送流中的消息标头,而不传送正文。其中会携带 Nats-Msg-Size 来标识负载的大小。 | 2.6.2 | ✅ |
6. 关键流程 #
这里梳理了一些 Nats 的关键流程,用来理解 nats 中的设计和实现原理。
6.1 服务启动 / 集群 #
集群 gossip 流程示意图:
nats-server -D -p 4222 -cluster nats://localhost:6222
nats-server -D -p 4333 -cluster nats://localhost:6333 -routes nats://localhost:6222
nats-server -D -p 4444 -cluster nats://localhost:6444 -routes nats://localhost:6333
流程解释说明:
- nats-1 启动
- nats-2 连接到 nats-1, nats-1 回复 INFO 消息,并将 nats-2 添加到自己的路由,同时建立到 nats-2 的连接。
- nats-3 连接到 nats-2, nats-2 回复 INFO 消息,并将 nats-3 添加到自己的路由,同时建立到 nats-3 的连接。这里 nats-2 会将新路由( nats-3 )传播(INFO 消息)到自己已知的服务节点,也就是 nats-1, 而 nats-1 收到 INFO 消息后,会向 nats-3 建立连接,与前面的流程类似。
6.2 PUB / SUB #
整体流程概览如下:
6.2.1 SUB #
当客户端新增订阅时,会向服务器发送一条 SUB 消息,服务器则会更新客户端(及对应账户)的订阅关系,同时向其余集群中的 route (服务节点) 发送 RS+ (集群通讯协议)消息以更新订阅。
6.2.2 PUB #
客户端发布一条消息,服务器内部匹配(有缓存设计以提高匹配效率)相关订阅客户端。将消息加入到客户端的发送缓冲区(注意:订阅中区分 普通订阅和 队列,队列模式则需要随机选中一个客户端),当相应的 route 节点收到消息后,会再从本地的订阅列表中匹配当前节点中的客户端推送消息。
6.3 Request / Reply #
从 Nats 设计的协议已经知道,Request 和 Reply 两个 API 是基于 PUB / SUB 机制。PUB 参数中可以携带一个 reply 参数,该 reply 参数是发布时自动创建的一个 _INBOX subject。服务端在推送消息时,会将 reply 放到 MSG 消息头传递给订阅者。
// This processes the sublist results for a given message.
// Returns if the message was delivered to at least target and queue filters.
func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, subject, reply []byte, flags int) (bool, [][]byte) {
// ...
var creply = reply
// ...
// Loop over all normal subscriptions that match.
for _, sub := range r.psubs {
...
// Normal delivery
mh := c.msgHeader(dsubj, creply, sub)
c.deliverMsg(prodIsMQTT, sub, acc, dsubj, creply, mh, msg, rplyHasGWPrefix)
}
}
6.4 新建 jetstream #
在开始之前,我们需要对 multi-raft 有个大致的概念,示意图如下:
stream raft group 就对应上图中的一个 raft group。与此同时,集群中所有开启 jetstream 功能的节点还会组成一个 meta raft group, 用来管理 jestream API 的执行和 jetstream 集群拓扑(节点上线离线)。
从 nats-cli 的源码入手,我们可以找到:
// NewStreamFromDefault creates a new stream based on a supplied template and optionsfunc (m *Manager) NewStreamFromDefault(name string, dflt api.StreamConfig, opts ...StreamOption) (stream *Stream, err error) {
// ...
var resp api.JSApiStreamCreateResponse
err = m.jsonRequest(fmt.Sprintf(api.JSApiStreamCreateT, name), &cfg, &resp)
if err != nil {
return nil, err
}
return m.streamFromConfig(&resp.Config, resp.StreamInfo), nil
}
Manager 的含义是 JetStreamManager,代表与服务端 JS API 交互,其中 api. JSApiStreamCreateT string = "$JS.API.STREAM.CREATE.%s"
可以发现底层还是通过 PUB / SUB 机制实现的交互。
在 nats-server 中检索可以发现,在 nats-server 内部定义了 JetStream 相关API 的 msgHandler。
func (s *Server) setJetStreamExportSubs() error {
// Start the go routine that will process API requests received by the
// subscription below when they are coming from routes, etc.. s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests")
s.startGoRoutine(s.processJSAPIRoutedRequests)
// This is the catch all now for all JetStream API calls.
if _, err := s.sysSubscribe(jsAllAPI, js.apiDispatch); err != nil {
return err
}
// ...
// API handles themselves.
pairs := []struct {
subject string
handler msgHandler
}{
{JSApiAccountInfo, s.jsAccountInfoRequest},
{JSApiTemplateCreate, s.jsTemplateCreateRequest},
{JSApiTemplates, s.jsTemplateNamesRequest},
{JSApiTemplateInfo, s.jsTemplateInfoRequest},
{JSApiTemplateDelete, s.jsTemplateDeleteRequest},
{JSApiStreamCreate, s.jsStreamCreateRequest},
{JSApiStreamUpdate, s.jsStreamUpdateRequest},
{JSApiStreams, s.jsStreamNamesRequest},
{JSApiStreamList, s.jsStreamListRequest},
{JSApiStreamInfo, s.jsStreamInfoRequest},
{JSApiStreamDelete, s.jsStreamDeleteRequest},
{JSApiStreamPurge, s.jsStreamPurgeRequest},
{JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
{JSApiStreamRestore, s.jsStreamRestoreRequest},
{JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
{JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
{JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
{JSApiMsgDelete, s.jsMsgDeleteRequest},
{JSApiMsgGet, s.jsMsgGetRequest},
{JSApiConsumerCreateEx, s.jsConsumerCreateRequest},
{JSApiConsumerCreate, s.jsConsumerCreateRequest},
{JSApiDurableCreate, s.jsConsumerCreateRequest},
{JSApiConsumers, s.jsConsumerNamesRequest},
{JSApiConsumerList, s.jsConsumerListRequest},
{JSApiConsumerInfo, s.jsConsumerInfoRequest},
{JSApiConsumerDelete, s.jsConsumerDeleteRequest},
}
js.mu.Lock()
defer js.mu.Unlock()
for _, p := range pairs {
sub := &subscription{subject: []byte(p.subject), icb: p.handler}
if err := js.apiSubs.Insert(sub); err != nil {
return err
}
}
return nil
}
在 client 消息分发(deliverMsg) 的逻辑中会对匹配的 subscription 执行 icb (msgHandler)。
在集群模式下,这些 JetStream API 的订阅也是会通过 route 客户端传导到集群中的每个节点。相当于每个节点都订阅/处理,JS API 的消息,但是只有 leader 节点可以处理。
jsStreamCreateRequest
后续代码逻辑(集群模式下, 单机只需要在本地操作即可):
jsStreamCluster(jetstream meta group) 的启动路径: Reload / Start -> enableJetStream -> enableJetStreamClustering -> startRaftNode/setupMetaGroup -> monitorCluster
- 通过 meta raft group 提交一条新增 stream 的日志消息(其中已经设置 raftgroup 的基本信息)参见
jsClusteredStreamRequest
函数 - jetstream 集群中的节点接收到该日志后(monitorCluster),会执行(applyMetaEntries)其中的操作 (assignStreamOp)添加 stream 执行逻辑参见 processStreamAssignment -> processClusterCreateStream
- 新建一个 stream raft group 启动 raft 节点
- 内部创建一个 stream 数据结构(mset= stream)
- 启动监视协程 monitorStream,处理 raft group 中的事件(快照,领导者变更,日志应用)
6.5 JetStream 消息的投递和消费 #
最简单的开始使用 JetStream 消费者的代码如下:
// connect to nats server
nc, _ := nats.Connect(nats.DefaultURL)
// create jetstream context from nats connection
js, _ := jetstream.New(nc)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// get existing stream handle
stream, _ := js.Stream(ctx, "foo")
// retrieve consumer handle from a stream
cons, _ := stream.Consumer(ctx, "cons")
// consume messages from the consumer in callback
cc, _ := cons.Consume(func(msg jetstream.Msg) {
fmt.Println("Received jetstream message: ", string(msg.Data()))
msg.Ack()
})
defer cc.Stop()
6.5.1 消费者创建 #
消费者可以显式或者隐式的创建,如果隐式创建往往都代表临时,也就是如果消费者不活跃则会被服务端删除。之后,客户端开始订阅消费者下发消息的 subject,push 方式等服务端推送,而 pull 模式则需要定时往服务端发送一条消息,触发消息下推。
从 nats.go 的源码出发,StreamConsumerManager 接口定义了一个 JetStream 中 Consumer 的相关操作。我们首先看下 consumer 的创建,同样的客户端是通过 JS API 来完成调用:
服务端统一使用了 jsConsumerCreateRequest 进行处理,其中逻辑与添加 stream 类似:jetstream集群模式下会创建一个raft group 设置好 leader, 并通过 meta group 来提交新增 consumer 的日志,每个节点会处理该消息(processConsumerAssignment->processClusterCreateConsumer):
- 获取到流
- 通过 addConsumerWithAssignment 添加一个 consumer
- (leader)发送响应
这个操作会在 stream raft group 的 部分/全部(取决于 consumer 配置的 Replicas)节点中创建一个 raft group (consumer) 用来保存 consumer 的相关信息(消费点位)。同时在 stream 对应的节点中添加 consumer 实例。
再回到客户端,consumer 创建(查询)后,获取到 consume 的信息,从中获取到 delivery subject(代表 consumer 消费的消息会从这个 subject 推送出来),后续客户端会订阅这个这个 subject。
这里可以说明服务端 consumer 和 客户端 consumer 没有必然的的联系,后续另外的客户端使用同一个 consumer 信息时,也只是共享 consumer 的配置,订阅相同的 subject,回到了 CORE NATS 提供的数据模型。
6.5.2 消息消费 #
consumer 提供了两种模式 pull (PullSubscribe) 和 push(Subscribe) 模式,虽然叫做 pull 和 push,但是这两种机制的底层实现还是基于 nats 的 pub / sub 机制。push 等价于 subscribe,服务端会将消息直接推送到客户端;pull 则是客户端主动调用 JS API ”请求“,服务端 “响应” 数据给客户端消费。参见下图抓包数据分析截图(提前创建了 stream 和 consumer,指定了拉模式)
nats.go 的 jetstream 包针对 pull consumer 提供了丰富的API(尤其是 Consume)让 pull consumer 可以和 push consumer 用相似的方式来持续的处理消息。
6.5.2.1 pull 模式 #
pullConsumer Consume 方法实现如下(已精简),与 PullSubscribe.Fetch 的原理类似(请求 JS API),区别在于 API 的形式表现不同:
func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) {
// 解析消费选项
consumeOpts, err := parseConsumeOpts(false, opts...)
if err != nil {
return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err)
}
// pull subject 是 JS.API 下的一个 subject
// apiRequestNextT string = "CONSUMER.MSG.NEXT.%s.%s"
subject := apiSubj(p.jetStream.apiPrefix, fmt.Sprintf(apiRequestNextT, p.stream, p.name))
sub := &pullSubscription{
consumer: p,
errs: make(chan error, 1),
done: make(chan struct{}, 1),
fetchNext: make(chan *pullRequest, 1),
consumeOpts: consumeOpts,
}
// 创建一个 inbox subject 作为 pull 请求的 reply,internalHandler 就是 MsgHandler
inbox := p.jetStream.conn.NewInbox()
sub.subscription, err = p.jetStream.conn.Subscribe(inbox, internalHandler)
if err != nil {
return nil, err
}
go func() {
for {
select {
case status, ok := <-sub.connStatusChanged:
case err := <-sub.errs:
if errors.Is(err, ErrNoHeartbeat) {
sub.fetchNext <- &pullRequest{
Expires: sub.consumeOpts.Expires,
Batch: batchSize,
MaxBytes: sub.consumeOpts.MaxBytes,
Heartbeat: sub.consumeOpts.Heartbeat,
}
}
}
}
}()
// 根据 fetchNext 通知向服务端发送 pull 请求
// fetchNext 是由一个 consumeOpts 的心跳设置的一个 timer 触发 ErrNoHeartbeat 来激活
go sub.pullMessages(subject)
return sub, nil
}
6.5.2.2 push 模式 #
使用 DeliverSubject (没有设置会自动生成一个) 来设置消费者订阅的主题,同时创建一个对该主题的订阅,用于接收处理消息。push 模式同样会在服务器创建消费者。
小结:对于客户端来说消费 stream 中的消息,其实就是产生一个订阅主题,客户端会如同普通的 SUB 客户端一样消费这里的消息。但在此之前,需要根据场景和选项在服务器创建一个 consumer,告诉服务端自己的消费场景(是否持久化,关注stream中的哪些 subject, 消息的分发策略 等等)
6.5.3 消息投递推送 #
通过前面的消息消费我们可以知道,pullConsumer 拉取消息时会往一个形如 CONSUMER.MSG.NEXT 的主题中发起请求,通过在服务端代码检索可以发现,consumer 结构内保存了这么一个 subject, 并关联了对应的 icb(processNextMsgReq),而这个动作是在 consumer 创建时,在 leader 节点上设置的(consumer.setLeader)。
这里要注意,这个请求只能被 consumer group 中的 leader 处理。
与此同时 consumer leader 节点还设置了(与消费相关的操作):
- ack订阅、请求订阅 和 流控订阅等
- 如果消费者是推模式,那么会注册“兴趣通知”
- 同时会启动定时器来清除不活跃的消费者
- 启动消息推送的 loopAndGatherMsgs 逻辑,往 consumer.outq -> stream.outq 发送消息, stream.outq 在 stream.internalLoop 中执行往客户端推送的逻辑(通过 stream 的内部客户端往 subject 推送一条消息,与 pub 流程类似再由 CORE NATS 完成消息的分发)。
到这里总结下这个环节的问题:
- jetstream publish 的消息经过了什么样的流程?
- 消息到服务端后会检查 subject 相关的订阅,而 stream(leader)设置时已经配置了一个内部订阅。订阅会传播到集群中。 processInboundJetStreamMsg 的目的就是将将消息提交到 raft group, 最终通过 processJetStreamMsg 执行。
- stream 组提交后,各个副本保存,同时通知相关的 consumer
- server consumer(leader) 通过 loopAndGatherMsgs 往 subject 中推送消息
- server consumer 订阅了客户端 ack 消息,用于更新消费者位点等数据
- 是先保存还是先推送? 在stream 组内部提交保存后推送
- consumer 的消息是从 leader 来还是可以从 follower 直接读取? leader
- 服务端中 stream 和 consumer 这两个结构在其中承担什么样的职责?
- stream 代表 stream (包括 store / raft group / stream config)
- 负责 jetstream 相关API的请求处理
- stream 消息 复制/存储
- 通知/唤醒等待的 consumer
- consumer 代表 消费者(包括消费配置 / raft group ) 注意它不等于客户端实例
- consumer 组内的数据保存(raft group)
- API 处理(拉消费的 API)
- 消息推送
- 消费确认处理
- stream 代表 stream (包括 store / raft group / stream config)
JetStream 消息投递API 也是基于 PUB / SUB 机制实现的,但是存在的区别是: JetStream 是存在确认机制的,而 Core Nats 的 Publish API 并没有。因此要严格保证消息发布到 stream 中,需要使用 JetStream 的 API 来发送。
7. 扩展 #
7.1 RAFT Consensus Protocol #
Consensus is a fundamental problem in fault-tolerant distributed systems. Consensus involves multiple servers agreeing on values. Once they reach a decision on a value, that decision is final.
共识问题是分布式系统容错中的基础性问题,它描述的是:在多个服务节点间对某个值达成一致,一旦达成一致那么这个值就是最终的结果。RAFT 就是解决共识问题的一种算法,它以简单著名。
常见的一种实现是,通过 复制/多副本 的方式来解决分布式问题中的 可靠性 问题。常见的,我们的系统提供了3个副本,当其中一个节点宕机时,也不影响系统对外提供服务。而 RAFT 则提供相应的机制(通过当选的领导者达成共识)来解决 复制/多副本 过程中的一致性问题。
Raft 可视化介绍:https://thesecretlivesofdata.com/raft/
它其中有几个概念:
- Leader Election(选举)产生唯一的一个 leader 角色。
- Leader
- Candidate
- Follower
- Vote
- Term
- Log Replication(日志复制)用于服务器之间保持一致要素。
- Replicated State Machine(复制状态机)_每个服务器存储一个包含一系列指令的日志,并且按顺序执行指令。由于日志都包含相同顺序的指令,状态机会按照相同的顺序执行指令,由于状态机是确定的(deterministic),因此状态机会产生相同的结果。
7.2 Gossip Protocol #
它基于流行病传播方式的节点或者进程之间信息交换的协议。以给定的频率,每台计算机随机选择另一台计算机,并共享任何消息。
它的定义如下:
- 如果有某一项信息需要在整个网络中所有节点中传播,那从信息源开始,选择一个固定的传播周期(譬如 1 秒),随机选择它相连接的 k 个节点(称为 Fan-Out)来传播消息。
- 每一个节点收到消息后,如果这个消息是它之前没有收到过的,将在下一个周期内,选择除了发送消息给它的那个节点外的其他相邻 k 个节点发送相同的消息,直到最终网络中所有节点都收到了消息,尽管这个过程需要一定时间,但是理论上最终网络的所有节点都会拥有相同的消息。
它对网络节点的 连通性和稳定性 几乎没有任何要求,它一开始就将网络某些节点只能与一部分节点_部分连通(Partially Connected Network)而不是以全连通网络_(Fully Connected Network)作为前提;没有任何中心化节点或者主节点的概念。
相应的它的缺点是:无法准确地预计到需要多长时间才能达成全网一致;也存在重复传播的概率,消息在网路中冗余。
由此,Gossip 设计了两种可能的消息传播模式:反熵(Anti-Entropy)和传谣(Rumor-Mongering):
- 反熵:会同步节点的全部数据,以消除各节点之间的差异,目标是整个网络各节点完全的一致。
- 传谣:仅仅发送新到达节点的数据,即只对外发送变更信息。
在 Nats 中,gossip 被用于实现 集群服务发现 https://docs.nats.io/reference/reference-protocols/nats-server-protocol。routes 里指定了一个种子服务器,新启动的服务器连接上种子服务器后,就可以获取到全部的服务器列表。在服务器的配置里,可以不用知道所有的节点,而只用配置一个即可,但通常为了配置简单,会统一使用一个种子服务器。如下:
nats-server -D -p 4222 -cluster nats://localhost:6222
nats-server -D -p 4333 -cluster nats://localhost:6333 -routes nats://localhost:6222
nats-server -D -p 4444 -cluster nats://localhost:6444 -routes nats://localhost:6222
// 使用下面的命令启动也是可行的
nats-server -D -p 4444 -cluster nats://localhost:6444 -routes nats://localhost:6333
可以参见服务启动 / 集群的流程。
7.3 Zero allocation byte parser #
https://www.youtube.com/watch?v=ylRKac5kSOk&t=646s 零分配 的含义是:nats 在解析协议时,避免不必要的内存分配:
-
使用局部变量,在栈上分配
-
使用 slice 复用底层数组
-
采用状态机来解析协议的各个部分,而不是构建临时对象来存储中间状态。如: 一般情况下解析如下协议时,最常规的思路就是,先把 Command(PUB) 读出来,然后根据这个command 决定要后面的操作,这里再读去两个参数 subject, payload length, 有了 length 之后再读去 payload,这样我们就创建了_4个临时变量_在表示下面的命令。
PUB foo.bar 7 goodbye
在 nats 中则是通过状态机,一步一步的解析中间没有使用临时变量来存储命令中的数据。
-
避免使用字符串,网络IO场景中,操作的对象几乎都是字节,因此 nats parser 也避免使用字符串以减少内存分配和拷贝。
7.4 Nats CLI #
https://docs.nats.io/using-nats/nats-tools/nats_cli 能方便快捷的与 nats 交互 / 模拟 / 测试。