`
October 10, 2019 本文阅读量

AMQP重连机制实现

在生产测试过程中频繁遇到Producer和Consumer无法发送到RabbitMQ的队列中去,查阅日志后发现错误‘channel/connection is not open’。结合阅读AMQP协议,因此实现了一个 Wrapper 来提供重连机制。

文中代码基于 https://github.com/streadway/amqp 实现。此方式简单暴力,但没有做到最小成本迁移(可以选择分别包装Producer和Consumer)。

文中所有代码参见:https://github.com/yeqown/infrastructure/tree/master/framework/amqp

基本知识

AMQP

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。用下图简单描述下AMQP模型:

AMQP MODEL

背景

-问题

生产环境中使用了RabbitMQ做异步消息分发,隔一段时间会出现:发送接口报错;发送成功后未被消费等情况。重启服务后恢复

-问题代码

生产者:

// producer.go

// NewClient .
func NewClient(cfg *types.RabbitMQConfig) *Client {
    // init MQ connection
	return &Client{
		ch:  ch, // *amqp.Channel
		cfg: cfg,
	}
}

// Client .
type Client struct {
	ch  *amqp.Channel
	cfg *types.RabbitMQConfig
}

// Send . send by routing 
func (c *Client) Send(payload *types.Payload) error {
	var routing string

	switch payload.Typ {
	case types.PayloadTypUsers:
		routing = c.cfg.MqRoutingUser
	default:
		routing = c.cfg.MqRoutingSys
	}

	dat, _ := json.Marshal(payload)
	publishing := amqp.Publishing{
		ContentType: "text/plain",
		Body:        dat,
	}

	return c.ch.Publish(c.cfg.MqExchagne, routing, false, false, publishing)
}

消费者:

// consumer.go
// ...
func (c *Client) Consume() error {
    err := c.initRabbitmqResource()
	if err != nil {
		logger.Std.Errorf("could init rabbitmq resource: %v", err)
		return err
	}

	msgs, err := c.rabbitmqChannel.Consume(
		c.rabbitmqQ, // queue
		"",          // consumer
		false,       // auto ack
		false,       // exclusive
		false,       // no local
		false,       // no wait
		nil,         // args
	)
	if err != nil {
		logger.Std.Errorf("could not consume messages: %v", err)
		return err
	}

	logger.Std.Infof("start consumming messages")
	for d := range msgs {
		logger.Std.Debugf("consumming msg: body=%s, ex=%s, routing=%s, content-type=%s",
			d.Body, d.Exchange, d.RoutingKey, d.ContentType)
		if err := c.sendout(d.Body); err != nil {
			continue
		}

		if err := d.Ack(false); err != nil {
			logger.Std.Errorf("ack message error: %v", err)
		}
	}

	// 队列被删除时会被执行到
	logger.Std.Infof("this should not be executed !!!!!")
	return nil
}
// ...

-分析问题

服务故障后,首先查看了生产者的系统日志,发现了 channel/connection is not open。从github中检索后发现https://github.com/streadway/amqp/blob/75d898a42a/channel.go#L155 有一个逻辑是处理已经关闭的channel, 会触发ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"},恰恰channel被关闭是上述代码没有考虑到的问题

在消费者这边的问题,在channel被关闭的情况下,就更显眼了:channel被关闭,消费者也就被关闭了,消费的goroutine自然也退出了,也就出现了消息无法被消费的情况。具体代码参见:

如果还有兴趣可以翻看下connection.shutdown, channel.shutdown

func (subs *consumers) buffer(in chan *Delivery, out chan Delivery) {
	// 退出时关闭消费channel
	defer close(out)
	defer subs.Done()

	var inflight = in
	var queue []*Delivery

	for delivery := range in {
		queue = append(queue, delivery)

		for len(queue) > 0 {
			select {
			// 很明显,协程会在消费者被关闭的时候退出
			case <-subs.closed:
				// closed before drained, drop in-flight
				return

			case delivery, consuming := <-inflight:
				if consuming {
					queue = append(queue, delivery)
				} else {
					inflight = nil
				}

			case out <- *queue[0]:
				queue = queue[1:]
			}
		}
	}
}

而为什么channel会被关闭,在网上检索后并结合代码:

Seq Description Reason Type
1 应用程序主动断开连接,网络异常被动断开连接 连接中断 连接关闭
2 重新声明现有队列或具有不匹配属性的交换将失败 406 PRECONDITION_FAILED 协议异常
3 访问不允许用户访问的资源将失败 403 ACCESS_REFUSED错误 协议异常
4 绑定不存在的队列或不存在的交换将失败 404 NOT_FOUND错误 协议异常
5 从不存在的队列中使用将失败 404 NOT_FOUND错误 协议异常
6 发布到不退出的交换将失败 404 NOT_FOUND错误 协议异常
7 从除声明连接以外的连接访问独占队列将失败 405 RESOURCE_LOCKED 协议异常

得到结论:由于网络波动、生产者消费者使用不当会导致channel被关闭.

重连原理

参考:

简而言之,channel.NotifyCloseconnection.NotifyClose可以接收到错误消息,那就以此为重连的触发器。函数原型:func NotifyClose(c chan *Error) chan *Error。实现思路如图:

AMQP RECONNECT PROCESS

结合***CODE***食用,效果更佳哦。handleReconnect代码如下:

func (w *Wrapper) handleReconnect() {
	for {
		// 未连接上的情况下一直尝试连接
		if !w.isConnected {
			log.Println("Attempting to connect")
			var (
				connected = false
				err       error
			)

			for cnt := 0; !connected; cnt++ {
				// connect 尝试连接,并在成功后触发changeConn事件,也会更新isConnected
				if connected, err = w.connect(); err != nil {
					log.Printf("Failed to connect: %s.\n", err)
				}
				if !connected {
					log.Printf("Retrying... %d\n", cnt)
				}
				time.Sleep(reconnectDelay)
			}
		}

		// 连接正常的情况下,处理channel.NotifyClose 和 conn.NotifyClose
		// 注:在任意一个notify事件通知后,应完全处理两个channel中的消息,否则会造成无缓冲阻塞。
		// https://ms2008.github.io/2019/06/16/golang-rabbitmq/
		select {
		case <-w.done:
			println("w.done")
			return
		case err := <-w.chNotify:
			log.Printf("channel close notify: %v", err)
			w.isConnected = false
		case err := <-w.connNotify:
			log.Printf("conn close notify: %v", err)
			w.isConnected = false
		}
		time.Sleep(reconnectDetectDur)
	}
}

以上。

水平有限,如有错误,欢迎勘误指正🙏。

参考