AMQP重连机制实现
在生产测试过程中频繁遇到Producer和Consumer无法发送到RabbitMQ的队列中去,查阅日志后发现错误‘channel/connection is not open’。结合阅读AMQP协议,因此实现了一个 Wrapper 来提供重连机制。
文中代码基于 https://github.com/streadway/amqp 实现。此方式简单暴力,但没有做到最小成本迁移(可以选择分别包装Producer和Consumer)。
文中所有代码参见:https://github.com/yeqown/infrastructure/tree/master/framework/amqp
基本知识
AMQP
AMQP
,即Advanced Message Queuing Protocol
,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。用下图简单描述下AMQP模型:
背景
-问题
生产环境中使用了RabbitMQ
做异步消息分发,隔一段时间会出现:发送接口报错;发送成功后未被消费等情况。重启服务后恢复。
-问题代码
生产者:
// producer.go
// NewClient .
func NewClient(cfg *types.RabbitMQConfig) *Client {
// init MQ connection
return &Client{
ch: ch, // *amqp.Channel
cfg: cfg,
}
}
// Client .
type Client struct {
ch *amqp.Channel
cfg *types.RabbitMQConfig
}
// Send . send by routing
func (c *Client) Send(payload *types.Payload) error {
var routing string
switch payload.Typ {
case types.PayloadTypUsers:
routing = c.cfg.MqRoutingUser
default:
routing = c.cfg.MqRoutingSys
}
dat, _ := json.Marshal(payload)
publishing := amqp.Publishing{
ContentType: "text/plain",
Body: dat,
}
return c.ch.Publish(c.cfg.MqExchagne, routing, false, false, publishing)
}
消费者:
// consumer.go
// ...
func (c *Client) Consume() error {
err := c.initRabbitmqResource()
if err != nil {
logger.Std.Errorf("could init rabbitmq resource: %v", err)
return err
}
msgs, err := c.rabbitmqChannel.Consume(
c.rabbitmqQ, // queue
"", // consumer
false, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
if err != nil {
logger.Std.Errorf("could not consume messages: %v", err)
return err
}
logger.Std.Infof("start consumming messages")
for d := range msgs {
logger.Std.Debugf("consumming msg: body=%s, ex=%s, routing=%s, content-type=%s",
d.Body, d.Exchange, d.RoutingKey, d.ContentType)
if err := c.sendout(d.Body); err != nil {
continue
}
if err := d.Ack(false); err != nil {
logger.Std.Errorf("ack message error: %v", err)
}
}
// 队列被删除时会被执行到
logger.Std.Infof("this should not be executed !!!!!")
return nil
}
// ...
-分析问题
服务故障后,首先查看了生产者的系统日志,发现了 channel/connection is not open
。从github中检索后发现https://github.com/streadway/amqp/blob/75d898a42a/channel.go#L155 有一个逻辑是处理已经关闭的channel
, 会触发ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"}
,恰恰channel被关闭是上述代码没有考虑到的问题。
在消费者这边的问题,在channel
被关闭的情况下,就更显眼了:channel被关闭,消费者也就被关闭了,消费的goroutine自然也退出了,也就出现了消息无法被消费的情况。具体代码参见:
channel.Consume
https://github.com/streadway/amqp/blob/75d898a42a/channel.go#L1049consumers.buffer
https://github.com/streadway/amqp/blob/75d898a42a/consumers.go#L54
如果还有兴趣可以翻看下connection.shutdown
, channel.shutdown
。
func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
// 退出时关闭消费channel
defer close(out)
defer subs.Done()
var inflight = in
var queue []*Delivery
for delivery := range in {
queue = append(queue, delivery)
for len(queue) > 0 {
select {
// 很明显,协程会在消费者被关闭的时候退出
case <-subs.closed:
// closed before drained, drop in-flight
return
case delivery, consuming := <-inflight:
if consuming {
queue = append(queue, delivery)
} else {
inflight = nil
}
case out <- *queue[0]:
queue = queue[1:]
}
}
}
}
而为什么channel会被关闭,在网上检索后并结合代码:
Seq | Description | Reason | Type |
---|---|---|---|
1 | 应用程序主动断开连接,网络异常被动断开连接 | 连接中断 | 连接关闭 |
2 | 重新声明现有队列或具有不匹配属性的交换将失败 | 406 PRECONDITION_FAILED | 协议异常 |
3 | 访问不允许用户访问的资源将失败 | 403 ACCESS_REFUSED错误 | 协议异常 |
4 | 绑定不存在的队列或不存在的交换将失败 | 404 NOT_FOUND错误 | 协议异常 |
5 | 从不存在的队列中使用将失败 | 404 NOT_FOUND错误 | 协议异常 |
6 | 发布到不退出的交换将失败 | 404 NOT_FOUND错误 | 协议异常 |
7 | 从除声明连接以外的连接访问独占队列将失败 | 405 RESOURCE_LOCKED | 协议异常 |
得到结论:由于网络波动、生产者消费者使用不当会导致channel被关闭.
重连原理
参考:
简而言之,channel.NotifyClose
和connection.NotifyClose
可以接收到错误消息,那就以此为重连的触发器。函数原型:func NotifyClose(c chan *Error) chan *Error
。实现思路如图:
结合***CODE***食用,效果更佳哦。handleReconnect
代码如下:
func (w *Wrapper) handleReconnect() {
for {
// 未连接上的情况下一直尝试连接
if !w.isConnected {
log.Println("Attempting to connect")
var (
connected = false
err error
)
for cnt := 0; !connected; cnt++ {
// connect 尝试连接,并在成功后触发changeConn事件,也会更新isConnected
if connected, err = w.connect(); err != nil {
log.Printf("Failed to connect: %s.\n", err)
}
if !connected {
log.Printf("Retrying... %d\n", cnt)
}
time.Sleep(reconnectDelay)
}
}
// 连接正常的情况下,处理channel.NotifyClose 和 conn.NotifyClose
// 注:在任意一个notify事件通知后,应完全处理两个channel中的消息,否则会造成无缓冲阻塞。
// https://ms2008.github.io/2019/06/16/golang-rabbitmq/
select {
case <-w.done:
println("w.done")
return
case err := <-w.chNotify:
log.Printf("channel close notify: %v", err)
w.isConnected = false
case err := <-w.connNotify:
log.Printf("conn close notify: %v", err)
w.isConnected = false
}
time.Sleep(reconnectDetectDur)
}
}
以上。
水平有限,如有错误,欢迎勘误指正🙏。