AMQP重连机制实现
October 10, 2019
文中代码基于 https://github.com/streadway/amqp 实现。此方式简单暴力,但没有做到最小成本迁移(可以选择分别包装Producer和Consumer)。
文中所有代码参见:https://github.com/yeqown/infrastructure/tree/master/framework/amqp
基本知识 #
AMQP #
AMQP
,即Advanced Message Queuing Protocol
,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。用下图简单描述下AMQP模型:
背景 #
-问题 #
生产环境中使用了RabbitMQ
做异步消息分发,隔一段时间会出现:发送接口报错;发送成功后未被消费等情况。重启服务后恢复。
-问题代码 #
生产者:
// producer.go
// NewClient .
func NewClient(cfg *types.RabbitMQConfig) *Client {
// init MQ connection
return &Client{
ch: ch, // *amqp.Channel
cfg: cfg,
}
}
// Client .
type Client struct {
ch *amqp.Channel
cfg *types.RabbitMQConfig
}
// Send . send by routing
func (c *Client) Send(payload *types.Payload) error {
var routing string
switch payload.Typ {
case types.PayloadTypUsers:
routing = c.cfg.MqRoutingUser
default:
routing = c.cfg.MqRoutingSys
}
dat, _ := json.Marshal(payload)
publishing := amqp.Publishing{
ContentType: "text/plain",
Body: dat,
}
return c.ch.Publish(c.cfg.MqExchagne, routing, false, false, publishing)
}
消费者:
...