RabbitMQ中的消息长期未被消费会过期吗?用过RabbitMQ的小伙伴可能都有这样的疑问,今天松哥就来和大家捋一捋这个问题。1。默认情况 首先我们来看看默认情况。 默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。 这种情况具体代码就不用我再演示了吧,松哥之前的文章凡是涉及到RabbitMQ的,基本上都是这样的。2。TTL TTL(TimeToLive),消息存活的时间,即消息的有效期。如果我们希望消息能够有一个存活时间,那么我们可以通过设置TTL来实现这一需求。如果消息的存活时间超过了TTL并且还没有被消息,此时消息就会变成死信,关于死信以及死信队列,松哥后面再和大家介绍。 TTL的设置有两种不同的方式:在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期。在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期。 那如果两个都设置了呢? 以时间短的为准。 当我们设置了消息有效期后,消息过期了就会被从队列中删除了(进入到死信队列,后文一样,不再标注),但是两种方式对应的删除时机有一些差异:对于第一种方式,当消息队列设置过期时间的时候,那么消息过期了就会被删除,因为消息进入RabbitMQ后是存在一个消息队列中,队列的头部是最早要过期的消息,所以RabbitMQ只需要一个定时任务,从头部开始扫描是否有过期消息,有的话就直接删除。对于第二种方式,当消息过期后并不会立马被删除,而是当消息要投递给消费者的时候才会去删除,因为第二种方式,每条消息的过期时间都不一样,想要知道哪条消息过期,必须要遍历队列中的所有消息才能实现,当消息比较多时这样就比较耗费性能,因此对于第二种方式,当消息要投递给消费者的时候才去删除。 介绍完TTL之后,接下来我们来看看具体用法。 接下来所有代码松哥都以SpringBoot中封装的AMPQ为例来讲解。2。1单条消息过期 我们先来看单条消息的过期时间。 首先创建一个SpringBoot项目,引入Web和RabbitMQ依赖,如下: 然后在application。properties中配置一下RabbitMQ的连接信息,如下:spring。rabbitmq。host127。0。0。1spring。rabbitmq。port5672spring。rabbitmq。usernameguestspring。rabbitmq。passwordguestspring。rabbitmq。virtualhost 接下来稍微配置一下消息队列:ConfigurationpublicclassQueueConfig{publicstaticfinalStringJAVABOYQUEUEDEMOpublicstaticfinalStringJAVABOYEXCHANGEDEMOpublicstaticfinalStringHELLOROUTINGKEYBeanQueuequeue(){returnnewQueue(JAVABOYQUEUEDEMO,true,false,false);}BeanDirectExchangedirectExchange(){returnnewDirectExchange(JAVABOYEXCHANGEDEMO,true,false);}BeanBindingbinding(){returnBindingBuilder。bind(queue())。to(directExchange())。with(HELLOROUTINGKEY);}} 这个配置类主要干了三件事:配置消息队列、配置交换机以及将两者绑定在一起。首先配置一个消息队列,new一个Queue:第一个参数是消息队列的名字;第二个参数表示消息是否持久化;第三个参数表示消息队列是否排他,一般我们都是设置为false,即不排他;第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般适用于临时队列。配置一个DirectExchange交换机。将交换机和队列绑定到一起。 这段配置应该很简单,没啥好解释的,有一个排他性,松哥这里稍微多说两句: 关于排他性,如果设置为true,则该消息队列只有创建它的Connection才能访问,其他的Connection都不能访问该消息队列,如果试图在不同的连接中重新声明或者访问排他性队列,那么系统会报一个资源被锁定的错误。另一方面,对于排他性队列而言,当连接断掉的时候,该消息队列也会自动删除(无论该队列是否被声明为持久性队列都会被删除)。 接下来提供一个消息发送接口,如下:RestControllerpublicclassHelloController{AutowiredRabbitTemplaterabbitTGetMapping(hello)publicvoidhello(){MessagemessageMessageBuilder。withBody(hellojavaboy。getBytes())。setExpiration(10000)。build();rabbitTemplate。convertAndSend(QueueConfig。JAVABOYQUEUEDEMO,message);}} 在创建Message对象的时候我们可以设置消息的过期时间,这里设置消息的过期时间为10秒。 这就可以啦! 接下来我们启动项目,进行消息发送测试。当消息发送成功之后,由于没有消费者,所以这条消息并不会被消费。打开RabbitMQ管理页面,点击到Queues选项卡,10s之后,我们会发现消息已经不见了: 很简单吧! 单条消息设置过期时间,就是在消息发送的时候设置一下消息有效期即可。2。2队列消息过期 给队列设置消息过期时间,方式如下:BeanQueuequeue(){MapString,ObjectargsnewHashMap();args。put(xmessagettl,10000);returnnewQueue(JAVABOYQUEUEDEMO,true,false,false,args);} 设置完成后,我们修改消息的发送逻辑,如下:RestControllerpublicclassHelloController{AutowiredRabbitTemplaterabbitTGetMapping(hello)publicvoidhello(){MessagemessageMessageBuilder。withBody(hellojavaboy。getBytes())。build();rabbitTemplate。convertAndSend(QueueConfig。JAVABOYQUEUEDEMO,message);}} 可以看到,消息正常发送即可,不用设置消息过期时间。 OK,启动项目,发送一条消息进行测试。查看RabbitMQ管理页面,如下: 可以看到,消息队列的Features属性为D和TTL,D表示消息队列中消息持久化,TTL则表示消息会过期。 10s之后刷新页面,发现消息数量已经恢复为0。 这就是给消息队列设置消息过期时间,一旦设置了,所有进入到该队列的消息都有一个过期时间了。2。3特殊情况 还有一种特殊情况,就是将消息的过期时间TTL设置为0,这表示如果消息不能立马消费则会被立即丢掉,这个特性可以部分替代RabbitMQ3。0以前支持的immediate参数,之所以所部分代替,是因为immediate参数在投递失败会有basic。return方法将消息体返回(这个功能可以利用死信队列来实现)。 具体代码松哥就不演示了,这个应该比较容易。3。死信队列 有小伙伴不禁要问,被删除的消息去哪了?真的被删除了吗?非也非也!这就涉及到死信队列了,接下来我们来看看死信队列。3。1死信交换机 死信交换机,DeadLetterExchange即DLX。 死信交换机用来接收死信消息(DeadMessage)的,那什么是死信消息呢?一般消息变成死信消息有如下几种情况:消息被拒绝(Basic。RejectBasic。Nack),井且设置requeue参数为false消息过期队列达到最大长度 当消息在一个队列中变成了死信消息后,此时就会被发送到DLX,绑定DLX的消息队列则称为死信队列。 DLX本质上也是一个普普通通的交换机,我们可以为任意队列指定DLX,当该队列中存在死信时,RabbitMQ就会自动的将这个死信发布到DLX上去,进而被路由到另一个绑定了DLX的队列上(即死信队列)。3。2死信队列 这个好理解,绑定了死信交换机的队列就是死信队列。3。3实践 我们来看一个简单的例子。 首先我们来创建一个死信交换机,接着创建一个死信队列,再将死信交换机和死信队列绑定到一起:publicstaticfinalStringDLXEXCHANGENAMEpublicstaticfinalStringDLXQUEUENAMEpublicstaticfinalStringDLXROUTINGKEY配置死信交换机returnBeanDirectExchangedlxDirectExchange(){returnnewDirectExchange(DLXEXCHANGENAME,true,false);}配置死信队列returnBeanQueuedlxQueue(){returnnewQueue(DLXQUEUENAME);}绑定死信队列和死信交换机returnBeanBindingdlxBinding(){returnBindingBuilder。bind(dlxQueue())。to(dlxDirectExchange())。with(DLXROUTINGKEY);} 这其实跟普通的交换机,普通的消息队列没啥两样。 接下来为消息队列配置死信交换机,如下:BeanQueuequeue(){MapString,ObjectargsnewHashMap();设置消息过期时间args。put(xmessagettl,0);设置死信交换机args。put(xdeadletterexchange,DLXEXCHANGENAME);设置死信routingkeyargs。put(xdeadletterroutingkey,DLXROUTINGKEY);returnnewQueue(JAVABOYQUEUEDEMO,true,false,false,args);} 就两个参数:xdeadletterexchange:配置死信交换机。xdeadletterroutingkey:配置死信routingkey。 这就配置好了。 将来发送到这个消息队列上的消息,如果发生了nack、reject或者过期等问题,就会被发送到DLX上,进而进入到与DLX绑定的消息队列上。 死信消息队列的消费和普通消息队列的消费并无二致:RabbitListener(queuesQueueConfig。DLXQUEUENAME)publicvoiddlxHandle(Stringmsg){System。out。println(dlxmsgmsg);} 很容易吧~4。小结 好啦,今天就和小伙伴们聊一聊RabbitMQ中的消息过期问题,感兴趣的小伙伴可以去试试哦~原文链接:https:mp。weixin。qq。comsfFqzuN2AnLazoCyahL3EFw 原作者:江南一点雨