AMQP

AMQP重连机制实现

文中代码基于 https://github.com/streadway/amqp 实现。此方式简单暴力,但没有做到最小成本迁移(可以选择分别包装Producer和Consumer)。

文中所有代码参见:https://github.com/yeqown/infrastructure/tree/master/framework/amqp

基本知识 #

AMQP #

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。用下图简单描述下AMQP模型:

AMQP MODEL

背景 #

-问题 #

生产环境中使用了RabbitMQ做异步消息分发,隔一段时间会出现:发送接口报错;发送成功后未被消费等情况。重启服务后恢复

-问题代码 #

生产者:

// producer.go

// NewClient .
func NewClient(cfg *types.RabbitMQConfig) *Client {
    // init MQ connection
	return &Client{
		ch:  ch, // *amqp.Channel
		cfg: cfg,
	}
}

// Client .
type Client struct {
	ch  *amqp.Channel
	cfg *types.RabbitMQConfig
}

// Send . send by routing 
func (c *Client) Send(payload *types.Payload) error {
	var routing string

	switch payload.Typ {
	case types.PayloadTypUsers:
		routing = c.cfg.MqRoutingUser
	default:
		routing = c.cfg.MqRoutingSys
	}

	dat, _ := json.Marshal(payload)
	publishing := amqp.Publishing{
		ContentType: "text/plain",
		Body:        dat,
	}

	return c.ch.Publish(c.cfg.MqExchagne, routing, false, false, publishing)
}

消费者:

...

访问量 访客数