文中代码主要以golang为示例,其他语言实现方式请自行Google RabbitMQ一条消息从生产端到消费端共经3个步骤:生产端发送消息到RabbitMQ;RabbitMQ发送消息到消费端;消费端消费这条消息。 如图所示,在每个步骤中都可能会发生消息丢失的情况,因而我们需要有措施来保障系统的可靠性。(磁盘损坏等极端情况就不在我们的考虑范围啦)一、生产端投递消息可靠性 生产端投递消息丢失的原因有很多,例如消息在传输过程中发生网络故障导致丢失,或者消息投递到RabbitMQ时RabbitMQ挂了,从而引发消息丢失,此类情况我们根本无法预估并知晓。 因而,我们可以利用RabbitMQ的一些机制来处理1。事务消息机制 事务处理方案所说能够保障消息的强一致性,但因此造成的性能损耗较大,面对大量消息推送时性能问题严重,故而我们可以考虑轻量级解决方案:confirm消息确认机制2。confirm消息确认机制 生产端投递的消息投递到RabbitMQ后,RabbitMQ将发送一个确认消息给到生产端,让生产端知晓我已收到消息,否则这条消息就可能丢失了,需要生产端再次发起消息投递。 开启确认机制:errchannel。Confirm(false) 以confirm模式发送消息示例:packagemainimport(fmtgithub。comstreadwayamqprmqdbrmq)var(channelamqp。Channelerrerrorqueueamqp。Queueconnamqp。Connection)funcmain(){conn,errrmq。GetConn()deferconn。Close()channel,errconn。Channel()iferr!nil{fmt。Printf(error:s,err。Error())return}deferchannel。Close()errchannel。Confirm(false)iferr!nil{fmt。Printf(error:s,err。Error())return}queue,errchannel。QueueDeclare(confirm:message,false,false,false,false,nil)iferr!nil{fmt。Printf(error:s,err。Error())return}confirms:channel。NotifyPublish(make(chanamqp。Confirmation,1))deferconfirmOne(confirms)errchannel。Publish(,queue。Name,false,false,amqp。Publishing{ContentType:textplain,Body:〔〕byte(confirmmessage),})iferr!nil{fmt。Printf(error:s,err。Error())return}fmt。Println(消息发送成功)}funcconfirmOne(confirmschanamqp。Confirmation){ifconfirmed:confirms;confirmed。Ack{fmt。Printf(带标志的消息投递确认:d,confirmed。DeliveryTag)}else{fmt。Printf(消息投递确认:d,confirmed。DeliveryTag)}} 采用此种方案就可以让生产端感知到消息是否投递到RabbitMQ中。二、消息持久化 RabbitMQ收到消息后是暂存到内存当中,此时若RabbitMQ挂了,重启服务将会导致数据丢失,所以我们应当将相关数据持久化到硬盘中,这样RabbitMQ重启后依然可以到硬盘中取数据恢复。 消息达到RabbitMQ后先到exchange交换机,然后路由到queue队列,最后发送给消费端。 因而需要对exchange、queue、message都进行持久化: exchange持久化:第三个参数true表示这个exchange持久化channel。ExchangeDeclare(exchange,direct,true,false,false,true,nil) queue持久化:第二个参数true标识这个queue持久化channel。QueueDeclare(confirm:message,true,false,false,false,nil) message持久化:errch。Publish(exchange,queuename,false,false,amqp。Publishing{DeliveryMode:amqp。Persistent,消息持久化ContentType:textplain,Body:〔〕byte(msgBody),}) 注意:如果需要消息持久化,Queue也是需要设定为持久化才有效。 这样,如果RabbitMQ收到消息后挂掉了,重启后会自行恢复消息。 思考一下,基于以上几种机制,我们还是不能完全保证消息可靠性投递到RabbitMQ中,比如极端情况,RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂掉了,此时消息依然是丢失了;或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障导致生产端没有收到确认消息,导致生产端不知道RabbitMQ是否收到消息,依然不好处理接下来的业务。 因而我们需要在上述机制的基础上,考虑一些消息补偿机制,以应对部分极端情况,比如消息入库。三、消息入库 我们可以考虑将要发送的消息保存到数据库中,标注一个状态字段status0,标识生产端将消息发送给RabbitMQ但还没收到确认回复。在生产端收到RabbitMQ确认回复后,将status设为1,表示RabbitMQ已收到消息。 考虑到前面提到的极端情况,我们可以在生产端开设一个定时器,定时检索消息表,将status0并且超过固定期限后还没收到确认的消息内容取出重发(此时消费端要考虑消息重复情况,提前做好幂等性设置),并设定重发最大次数,超限做单独的特殊处理。 到此,消息就可以可靠性投递到RabbitMQ中,生产端也可以正常感知了。四、消费端信息不丢失 正常情况下,以下三种情况会导致消息丢失在RabbitMQ将消息发出后,消费端还没有接收到消息前发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;在RabbitMQ将消息发出后,消费端还没有接收到消息前消费端挂了,此时消息会丢失;消费端准备接收到消息后,但在处理消息过程中发生异常或宕机,消息会丢失。 综合上述三种情况,都是因为RabbitMQ的自动ack机制,即RabbitMQ默认在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时,RabbitMQ也没有该消息了。 因此就需要将自动ack机制改为手动ack机制。 消费端手动确认消息:接收消息messages,err:r。channel。Consume(r。QueueName,队列名称,区分消费者false,设置关闭自动应答,每条消息必须手动ackfalse,false,false,nil)iferr!nil{fmt。Println(err)}消费消息channel:make(chanbool)gofunc(){fork:rangemessages{true表示确认所有未确认消息false表示确认当前消息k。Ack(false)}}()阻塞主程序channel 当autoAck设置为false时,对于RabbitMQ服务端而言,队列中的消息就分为两类:等待投递给消费端的消息已经投递给消费端,等待消费端发回确认信号的消息 如果RabbitMQ一直没有接收到消费端的确认信号,且消费端已经断开链接或宕机,此时RabbitMQ会将此消息重新放入队列,等待下次投递。故而消费端也需要做好幂等性设置,确保消息重复处理机制。 综合以上方案,即可保证生产端RabbitMQ消费端全链路数据不丢失啦!