游戏电视苹果数码历史美丽
投稿投诉
美丽时装
彩妆资讯
历史明星
乐活安卓
数码常识
驾车健康
苹果问答
网络发型
电视车载
室内电影
游戏科学
音乐整形

五张图带你理解RocketMQ延时消息机制

  今天来聊一聊RocketMQ的延时消息是怎么实现的。
  延时消息是指发送到RocketMQ后不会马上被消费者拉取到,而是等待固定的时间,才能被消费者拉取到。
  延时消息的使用场景很多,比如电商场景下关闭超时未支付的订单,某些场景下需要在固定时间后发送提示消息。1。生产者
  首先看一个生产者发送延时消息的官方示例代码:publicstaticvoidmain(String〔〕args)throwsException{InstantiateaproducertosendscheduledmessagesDefaultMQProducerproducernewDefaultMQProducer(ExampleProducerGroup);Launchproducerproducer。start();inttotalMessagesToSend100;for(inti0;itotalMessagesToSend;i){MessagemessagenewMessage(TestTopic,(Helloscheduledmessagei)。getBytes());Thismessagewillbedeliveredtoconsumer10secondslater。message。setDelayTimeLevel(3);Sendthemessageproducer。send(message);}Shutdownproducerafteruse。producer。shutdown();}
  从上面的代码可以看到,跟普通消息不一样的是,消息设置setDelayTimeLevel属性值,这里设置为3,这里最终将3这个延时级别复制给了DELAY属性。
  关于延时级别,可以看下面这个定义:MessageStoreConfig类privateStringmessageDelayLevel1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h;
  这里延时级别有18个,上面的示例代码中延迟级别是3,消息会延迟10s后消费者才能拉取。2。Broker处理2。1写入消息
  Broker收到消息后,会将消息写入CommitLog。在写入时,会判断消息DELAY属性是否大于0。代码如下:CommitLog类if(msg。getDelayTimeLevel()0){if(msg。getDelayTimeLevel()this。defaultMessageStore。getScheduleMessageService()。getMaxDelayLevel()){msg。setDelayTimeLevel(this。defaultMessageStore。getScheduleMessageService()。getMaxDelayLevel());}topicTopicValidator。RMQSYSSCHEDULETOPIC;intqueueIdScheduleMessageService。delayLevel2QueueId(msg。getDelayTimeLevel());Backuprealtopic,queueIdMessageAccessor。putProperty(msg,MessageConst。PROPERTYREALTOPIC,msg。getTopic());MessageAccessor。putProperty(msg,MessageConst。PROPERTYREALQUEUEID,String。valueOf(msg。getQueueId()));msg。setPropertiesString(MessageDecoder。messageProperties2String(msg。getProperties()));msg。setTopic(topic);msg。setQueueId(queueId);}
  从上面的代码可以看到,CommitLog写入时并没有直接写入,而是把Topic改为SCHEDULETOPICXXXX,把queueId改为延时级别减1。因为延时级别有18个,所以这里有18个队列。如下图:
  2。2调度消息
  延时消息写入后,会有一个调度任务不停地拉取这些延时消息,这个逻辑在类ScheduleMessageService。这个类的初始化代码如下:publicvoidstart(){if(started。compareAndSet(false,true)){this。load();this。deliverExecutorServicenewScheduledThreadPoolExecutor(this。maxDelayLevel,newThreadFactoryImpl(ScheduleMessageTimerThread));省略部分逻辑for(Map。EntryInteger,Longentry:this。delayLevelTable。entrySet()){Integerlevelentry。getKey();LongtimeDelayentry。getValue();Longoffsetthis。offsetTable。get(level);if(nulloffset){offset0L;}if(timeDelay!null){省略部分逻辑this。deliverExecutorService。schedule(newDeliverDelayedMessageTimerTask(level,offset),FIRSTDELAYTIME,TimeUnit。MILLISECONDS);}}省略持久化的逻辑}}
  上面的load()方法会加载一个delayLevelTable(ConcurrentHashMap类型),key保存延时级别(从1开始),value保存延时时间(单位是ms)。
  load()方法结束后,创建了一个有18个核心线程的定时线程池,然后遍历delayLevelTable,创建18个任务(DeliverDelayedMessageTimerTask)进行每个延时级别的任务调度。任务调度的代码逻辑如下:publicvoidexecuteOnTimeup(){ConsumeQueuecqScheduleMessageService。this。defaultMessageStore。findConsumeQueue(TopicValidator。RMQSYSSCHEDULETOPIC,delayLevel2QueueId(delayLevel));if(cqnull){this。scheduleNextTimerTask(this。offset,DELAYFORAWHILE);return;}SelectMappedBufferResultbufferCQcq。getIndexBuffer(this。offset);if(bufferCQnull){省略部分逻辑this。scheduleNextTimerTask(resetOffset,DELAYFORAWHILE);return;}longnextOffsetthis。offset;try{inti0;ConsumeQueueExt。CqExtUnitcqExtUnitnewConsumeQueueExt。CqExtUnit();for(;ibufferCQ。getSize()isStarted();iConsumeQueue。CQSTOREUNITSIZE){longoffsetPybufferCQ。getByteBuffer()。getLong();intsizePybufferCQ。getByteBuffer()。getInt();longtagsCodebufferCQ。getByteBuffer()。getLong();省略部分逻辑longnowSystem。currentTimeMillis();longdeliverTimestampthis。correctDeliverTimestamp(now,tagsCode);nextOffsetoffset(iConsumeQueue。CQSTOREUNITSIZE);longcountdowndeliverTimestampnow;if(countdown0){this。scheduleNextTimerTask(nextOffset,DELAYFORAWHILE);return;}MessageExtmsgExtScheduleMessageService。this。defaultMessageStore。lookMessageByOffset(offsetPy,sizePy);if(msgExtnull){continue;}MessageExtBrokerInnermsgInnerScheduleMessageService。this。messageTimeup(msgExt);事务消息判断省略booleandeliverSuc;只保留同步deliverSucthis。syncDeliver(msgInner,msgExt。getMsgId(),nextOffset,offsetPy,sizePy);if(!deliverSuc){this。scheduleNextTimerTask(nextOffset,DELAYFORAWHILE);return;}}nextOffsetthis。offset(iConsumeQueue。CQSTOREUNITSIZE);}catch(Exceptione){log。error(ScheduleMessageService,messageTimeupexecuteerror,offset{},nextOffset,e);}finally{bufferCQ。release();}this。scheduleNextTimerTask(nextOffset,DELAYFORAWHILE);}
  这段代码可以参考下面的流程图来进行理解:
  上面有一个修正投递时间的函数,这个函数的意义是如果已经过了投递时间,那么立即投递。代码如下:privatelongcorrectDeliverTimestamp(finallongnow,finallongdeliverTimestamp){longresultdeliverTimestamp;longmaxTimestampnowScheduleMessageService。this。delayLevelTable。get(this。delayLevel);if(deliverTimestampmaxTimestamp){resultnow;}returnresult;}
  注意:消息从CommitLog转发到ConsumeQueue时,会判断是否是延时消息(TopicSCHEDULETOPICXXXX并且延时级别大于0),如果是延时消息,就会修改tagsCode值为消息投递的时间戳,而tagsCode原值是tag的HashCode。代码如下:CommitLog类checkMessageAndReturnSize方法if(delayLevel0){tagsCodethis。defaultMessageStore。getScheduleMessageService()。computeDeliverTimestamp(delayLevel,storeTimestamp);}
  如下图:
  而ScheduleMessageService调度线程将消息从ConsumeQueue重新投递到原始队列中时,会把tagsCode再次修改为tag的HashCode,代码如下:类MessageExtBrokerInner,这个方法被messageTimeup方法调用。publicstaticlongtagsString2tagsCode(finalTopicFilterTypefilter,finalStringtags){if(nulltagstags。length()0){return0;}returntags。hashCode();}
  如下图:
  2。3一个问题
  如果有一个业务场景,要求延时消息3小时才能消费,而RocketMQ的延时消息最大延时级别只支持延时2小时,怎么处理?
  这里提供两个思路供大家参考:
  在Broker上修改messageDelayLevel的默认配置;
  在客户端缓存msgId,先设置延时级别是18(2h),当客户端拉取到消息后首先判断有没有缓存,如果有缓存则再次发送延时消息,这次延时级别是17(1h),如果没有缓存则进行消费。3总结
  经过上面的讲解,延时消息的处理流程如下:
  最后,延时消息的延时时间并不精确,这个时间是Broker调度线程把消息重新投递到原始的MessageQueue的时间,如果发生消息积压或者RocketMQ客户端发生流量管控,客户端拉取到消息后进行处理的时间可能会超出预设的延时时间

小白也能玩的搬砖游戏大更新,赶紧冲一款骨灰级搬砖玩家都安利的游戏,非常适合小白快速上手,它就是《创造与魔法》,7月14日即将更新月宫空战版本,小伙伴们赶紧冲!这是款3D沙盒类自由探索游戏,画风符合大众胃口……火箭队管理层为什么这两年把球队运营如此糟糕,换帅首当其中火箭队目前的运营乌烟瘴气,充满混乱,从上到下一团糟,18赛季的火箭真的是太遗憾了,但是从争冠球队到争夺状元的球队新老板费尔蒂塔淋淋雨用了两年就把火箭从天堂带到了地狱,火箭……国产手游厉害了,奥斯卡最佳艺术指导亲自动手,把时装复刻到现实对于游戏时装,相信是不少玩家的心头爱,平时玩游戏时,想方设法都要弄一套漂亮的时装穿在身上。甚至有些狂热爱好者,还会自己动手,将游戏中比较好看的时装复刻到现实之中来穿在身上,可见……脚和脸的攀比争斗作为一个人,我们都有自己的审美观和喜好,有些人喜欢美女的脚,有些人则更喜欢美女的脸。在我看来,美女的脚和脸各有其不同的特点和魅力。首先,美女的脸是她们最为重要的资本之一,……广西南宁12315中心提示谨防直播带货诱导场外交易来源:中国新闻网本报南宁讯(记者顾艳伟)3月7日,广西壮族自治区南宁市市场监管局12315中心发布消费提示,提醒消费者在选择直播带货购物时,认真查看经营者在直播平台公示情……奥巴梅扬事件发生后的几小时,现皇马球星的家里惨遭入侵者的袭击皇家马德里球星丹尼卡瓦哈尔成为最新一名武装入侵者目标的足球运动员。西班牙警方正在调查两名蒙面男子企图闯入他位于马德里首都附近的家中,据称他们携带的武器被称为铁棍。现皇马当……诀症患者头等苍人狂喜!苍兰诀将上线番外纵观今年6月到8月的暑期档剧集,各平台上新了超过30部,但真正能引发全民关注的并不多。在古装剧方面,《苍兰诀》异军突起,从播出前的不被看好,到一路逆袭成为黑马,最终收获了……中超赛程调整第12轮5场比赛延期,第16轮2场比赛提前在本周北京时间8月8日,中足联举行了线上会议,对中超赛程进行了调整。第12轮有5场比赛延期,同时第16轮的2场比赛,提前到原定12轮的比赛日内进行,也就是本周进行。由于海南的突……对话TikTok女装品牌Go。G。G上线5个月如何做到英国销TikTok英国小店虽然一度被后起的东南亚市场抢占风头,但开通英国小店一直是很多商家进入美国、甚至更广泛成熟市场之前的必然选择,因此英国小店始终不乏出海卖家,而随着经验成熟,也……3比0,中国女排四连胜!10月1日晚将迎来第二个劲敌北京时间9月30日晚,在荷兰阿纳姆举行的2022年第十九届女排世锦赛D组第四轮比赛中,中国女排以3比0战胜捷克女排,迎来世锦赛四连胜。北京时间10月1日晚8时,中国女排将迎来第……平心而论,27岁的威金斯,还有可能成为一个巨星吗威金斯在天王山之战的表现,让我不经发出了一个提问,现在已经27岁的嘴哥,他还有能去成长为一名巨星球员吗,虽然威金斯在G5的数据上,不能说有那么的夸张,砍下40还是50,但是对于……京东手机816战略升级发布会加速全渠道布局开拓手机服务新阵地二十年前的世纪交替之际,全球向荣,互联网也随之腾飞。国人开始接纳并逐渐习惯于在线上完成购物、娱乐、工作的生活模式,而成就这一切的基础是智能科技推动的万物互联。作为占据消费者生活……
雷神首款客制化机械键盘粒子上市Gasket结构三模无线,59IT之家1月1日消息,今日雷神发布了2022年首款新品:客制化机械键盘【粒子】。这款键盘采用独特的外壳,支持三模无线有线连接,采用Gasket结构,配备PBT键帽,首发售价59……双奥园区里的双奥人杨洪福这儿每寸土地像家一样亲切4月8日,北京冬奥会、冬残奥会总结表彰大会在人民大会堂举行,北京奥林匹克中心区管委会副主任杨洪福被授予突出贡献个人称号。在聆听总书记的讲话时,我的心情非常激动,筹办举办冬……南芯推出AC双口快充协议控制芯片,支持PD3。1IT之家1月12日消息,据南芯半导体官方消息,最新推出的AC口单颗快充协议控制芯片SC2021A支持了TypeCPD3。1。官方表示,南芯最新推出的SC2021A是一颗高……紫米ZMI20号移动电源(200W)宣布涨价,此前为369元IT之家11月19日消息,今日紫米公司宣布,由于目前3C配件原材料价格持续大幅波动,且未来较长一段时间核心部件、材料价格仍会持续走高。迫于材料成本大幅波动,官方决定于2021年……消息称华为本月将举行两次发布会,将推出Matebook13S感谢IT之家网友络世的线索投递!IT之家9月3日消息数码博主菊厂影业Fans透露,华为将于9月13日左右发布Matebook13S和14S,采用英特尔处理器。此外,她还透……46秒86!泳坛史上第一飞鱼诞生,打破尘封13年世界纪录,全2022年欧洲游泳锦标赛男子100米自由泳决赛,罗马尼亚小将大卫波波维奇不仅夺冠,并且以46秒86的成绩打破世界纪录,这项纪录已经尘封了13年之久。看到自己的成绩,波波维奇激动……95号汽油将迎9元时代对居民生活将有哪些影响?俄乌事件持续发酵,国际油价持续暴涨。而从目前我国来看,我国油价将迎来9元时代。如此暴涨的油价,也让很多人纷纷感叹,现在是真的开不起车。那么,若油价上涨将对我国居民生活构成哪些影……华为P50Pocket折叠屏真机上手曝光背部双环设计,小巧便感谢IT之家网友肖战割割的线索投递!IT之家12月23日消息,华为已官宣年度重磅新机华为P50Pocket(中文名P50宝盒)将于今晚发布,届时IT之家将为大家带来更多报……女篮全明星欢乐闹元宵王丽丽荣膺MVP昨天是中国传统佳节元宵节,WCBA全明星正赛也于昨晚在深圳龙华体育中心上演。在经历了一场堪称联赛史上最激烈的全明星过招后,北区明星队以84比79力克南区明星队,王丽丽凭借着24……中邮Hinova9Pro今日开售骁龙778G5G120Hz超IT之家12月9日消息,中邮Hinova9Pro于12月2日发布,搭载骁龙778G5G处理器,售价2999元起,将于今日上午10:08正式开售。Hinova9售价为(无充……RedmiAX6000路由器发布8数据流4核处理器,首发48感谢IT之家网友heheyuo的线索投递!IT之家4月5日消息,小米今晚开卖新款Redmi路由器AX6000,售价499元,首发489元。IT之家了解到,Redmi……799元,京东云无线宝AX6600路由器开售能赚京豆,高通5IT之家4月1日消息,近期,京东云对外发布无线宝AX6600路由器、无线宝AX3000路由器、无线宝移动固态硬盘等新品。无线宝AX6600路由器采用高通五核WiFi6芯片,支持……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网