今天来聊一聊RocketMQ的延时消息是怎么实现的。 延时消息是指发送到RocketMQ后不会马上被消费者拉取到,而是等待固定的时间,才能被消费者拉取到。 延时消息的使用场景很多,比如电商场景下关闭超时未支付的订单,某些场景下需要在固定时间后发送提示消息。1。生产者 首先看一个生产者发送延时消息的官方示例代码:publicstaticvoidmain(String〔〕args)throwsException{InstantiateaproducertosendscheduledmessagesDefaultMQProducerproducernewDefaultMQProducer(ExampleProducerGroup);Launchproducerproducer。start();inttotalMessagesToSend100;for(inti0;itotalMessagesToSend;i){MessagemessagenewMessage(TestTopic,(Helloscheduledmessagei)。getBytes());Thismessagewillbedeliveredtoconsumer10secondslater。message。setDelayTimeLevel(3);Sendthemessageproducer。send(message);}Shutdownproducerafteruse。producer。shutdown();} 从上面的代码可以看到,跟普通消息不一样的是,消息设置setDelayTimeLevel属性值,这里设置为3,这里最终将3这个延时级别复制给了DELAY属性。 关于延时级别,可以看下面这个定义:MessageStoreConfig类privateStringmessageDelayLevel1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h; 这里延时级别有18个,上面的示例代码中延迟级别是3,消息会延迟10s后消费者才能拉取。2。Broker处理2。1写入消息 Broker收到消息后,会将消息写入CommitLog。在写入时,会判断消息DELAY属性是否大于0。代码如下:CommitLog类if(msg。getDelayTimeLevel()0){if(msg。getDelayTimeLevel()this。defaultMessageStore。getScheduleMessageService()。getMaxDelayLevel()){msg。setDelayTimeLevel(this。defaultMessageStore。getScheduleMessageService()。getMaxDelayLevel());}topicTopicValidator。RMQSYSSCHEDULETOPIC;intqueueIdScheduleMessageService。delayLevel2QueueId(msg。getDelayTimeLevel());Backuprealtopic,queueIdMessageAccessor。putProperty(msg,MessageConst。PROPERTYREALTOPIC,msg。getTopic());MessageAccessor。putProperty(msg,MessageConst。PROPERTYREALQUEUEID,String。valueOf(msg。getQueueId()));msg。setPropertiesString(MessageDecoder。messageProperties2String(msg。getProperties()));msg。setTopic(topic);msg。setQueueId(queueId);} 从上面的代码可以看到,CommitLog写入时并没有直接写入,而是把Topic改为SCHEDULETOPICXXXX,把queueId改为延时级别减1。因为延时级别有18个,所以这里有18个队列。如下图: 2。2调度消息 延时消息写入后,会有一个调度任务不停地拉取这些延时消息,这个逻辑在类ScheduleMessageService。这个类的初始化代码如下:publicvoidstart(){if(started。compareAndSet(false,true)){this。load();this。deliverExecutorServicenewScheduledThreadPoolExecutor(this。maxDelayLevel,newThreadFactoryImpl(ScheduleMessageTimerThread));省略部分逻辑for(Map。EntryInteger,Longentry:this。delayLevelTable。entrySet()){Integerlevelentry。getKey();LongtimeDelayentry。getValue();Longoffsetthis。offsetTable。get(level);if(nulloffset){offset0L;}if(timeDelay!null){省略部分逻辑this。deliverExecutorService。schedule(newDeliverDelayedMessageTimerTask(level,offset),FIRSTDELAYTIME,TimeUnit。MILLISECONDS);}}省略持久化的逻辑}} 上面的load()方法会加载一个delayLevelTable(ConcurrentHashMap类型),key保存延时级别(从1开始),value保存延时时间(单位是ms)。 load()方法结束后,创建了一个有18个核心线程的定时线程池,然后遍历delayLevelTable,创建18个任务(DeliverDelayedMessageTimerTask)进行每个延时级别的任务调度。任务调度的代码逻辑如下:publicvoidexecuteOnTimeup(){ConsumeQueuecqScheduleMessageService。this。defaultMessageStore。findConsumeQueue(TopicValidator。RMQSYSSCHEDULETOPIC,delayLevel2QueueId(delayLevel));if(cqnull){this。scheduleNextTimerTask(this。offset,DELAYFORAWHILE);return;}SelectMappedBufferResultbufferCQcq。getIndexBuffer(this。offset);if(bufferCQnull){省略部分逻辑this。scheduleNextTimerTask(resetOffset,DELAYFORAWHILE);return;}longnextOffsetthis。offset;try{inti0;ConsumeQueueExt。CqExtUnitcqExtUnitnewConsumeQueueExt。CqExtUnit();for(;ibufferCQ。getSize()isStarted();iConsumeQueue。CQSTOREUNITSIZE){longoffsetPybufferCQ。getByteBuffer()。getLong();intsizePybufferCQ。getByteBuffer()。getInt();longtagsCodebufferCQ。getByteBuffer()。getLong();省略部分逻辑longnowSystem。currentTimeMillis();longdeliverTimestampthis。correctDeliverTimestamp(now,tagsCode);nextOffsetoffset(iConsumeQueue。CQSTOREUNITSIZE);longcountdowndeliverTimestampnow;if(countdown0){this。scheduleNextTimerTask(nextOffset,DELAYFORAWHILE);return;}MessageExtmsgExtScheduleMessageService。this。defaultMessageStore。lookMessageByOffset(offsetPy,sizePy);if(msgExtnull){continue;}MessageExtBrokerInnermsgInnerScheduleMessageService。this。messageTimeup(msgExt);事务消息判断省略booleandeliverSuc;只保留同步deliverSucthis。syncDeliver(msgInner,msgExt。getMsgId(),nextOffset,offsetPy,sizePy);if(!deliverSuc){this。scheduleNextTimerTask(nextOffset,DELAYFORAWHILE);return;}}nextOffsetthis。offset(iConsumeQueue。CQSTOREUNITSIZE);}catch(Exceptione){log。error(ScheduleMessageService,messageTimeupexecuteerror,offset{},nextOffset,e);}finally{bufferCQ。release();}this。scheduleNextTimerTask(nextOffset,DELAYFORAWHILE);} 这段代码可以参考下面的流程图来进行理解: 上面有一个修正投递时间的函数,这个函数的意义是如果已经过了投递时间,那么立即投递。代码如下:privatelongcorrectDeliverTimestamp(finallongnow,finallongdeliverTimestamp){longresultdeliverTimestamp;longmaxTimestampnowScheduleMessageService。this。delayLevelTable。get(this。delayLevel);if(deliverTimestampmaxTimestamp){resultnow;}returnresult;} 注意:消息从CommitLog转发到ConsumeQueue时,会判断是否是延时消息(TopicSCHEDULETOPICXXXX并且延时级别大于0),如果是延时消息,就会修改tagsCode值为消息投递的时间戳,而tagsCode原值是tag的HashCode。代码如下:CommitLog类checkMessageAndReturnSize方法if(delayLevel0){tagsCodethis。defaultMessageStore。getScheduleMessageService()。computeDeliverTimestamp(delayLevel,storeTimestamp);} 如下图: 而ScheduleMessageService调度线程将消息从ConsumeQueue重新投递到原始队列中时,会把tagsCode再次修改为tag的HashCode,代码如下:类MessageExtBrokerInner,这个方法被messageTimeup方法调用。publicstaticlongtagsString2tagsCode(finalTopicFilterTypefilter,finalStringtags){if(nulltagstags。length()0){return0;}returntags。hashCode();} 如下图: 2。3一个问题 如果有一个业务场景,要求延时消息3小时才能消费,而RocketMQ的延时消息最大延时级别只支持延时2小时,怎么处理? 这里提供两个思路供大家参考: 在Broker上修改messageDelayLevel的默认配置; 在客户端缓存msgId,先设置延时级别是18(2h),当客户端拉取到消息后首先判断有没有缓存,如果有缓存则再次发送延时消息,这次延时级别是17(1h),如果没有缓存则进行消费。3总结 经过上面的讲解,延时消息的处理流程如下: 最后,延时消息的延时时间并不精确,这个时间是Broker调度线程把消息重新投递到原始的MessageQueue的时间,如果发生消息积压或者RocketMQ客户端发生流量管控,客户端拉取到消息后进行处理的时间可能会超出预设的延时时间