游戏电视苹果数码历史美丽
投稿投诉
美丽时装
彩妆资讯
历史明星
乐活安卓
数码常识
驾车健康
苹果问答
网络发型
电视车载
室内电影
游戏科学
音乐整形

还在担心消息积压问题难解决?思路代码优化细节全公开

  前言
  之前就遇到的一个问题吧,发生过一两次了,每次都是重启等着慢慢消费,但这样肯定是不行的啦,还是得上点魔法。按照既往惯例,我都会在文章选题开始时,看看网上大家都是怎么写的,看了不少文章吧,感觉大家写的还是比较抽象。(),不过可能也是因为消息积压这一块吧,这个解决方案属实是太过简单,确实也没啥好说的,导致大家这个不怎么贴代码,老是讲思路,我也能够理解。但是作为好家伙棒小伙,我肯定得给大家端上一碗热腾腾的鸡汤,最近心情略差,压力过大,所以皮不动了。那么直接开始吧,鸡汤来咯!事件背景
  之前我不是自己维护了一套日志收集系统嘛,是FilebeatKafka数据处理服务ElasticsearchKibanaSkywalking这样的架构,其中数据处理服务是一个springboot项目部署在公司的DevOps平台上。因为是我自己弄的,我就自己建了一个项目空间,专门放我写的组件的源码以及这个数据处理服务,平时我也是在上面自己部署的,因为稳定运行一年多嘛,我平时也不关注。
  几个周前的时候,呃,不知道咋回事生产的那个节点挂了。周四晚上挂的,发的邮件通知我没看到(邮件太多就不看了),周五没人看日志,就这么到了周一下午,有同事过来问我为啥没有这几天的日志。我先登录Kibana看了下,确实少了这几天的日志,然后我就登录DevOps发现节点没了,上CMAK(Kafka监控)发现积压了八百多万数据,嚯嚯,小一千万,大的要来了。宕机后的处理
  首先告诉发现的小伙伴都别慌,小BUG小场面,先用DevOps上面自带的K8S日志输出看实时日志或者写命令看日志文件。
  点击容器日志可查看实时输出的日志,如下图。我个人觉得这个其实也蛮方便的,但是要定位历史问题,或者要进行搜索、链路追踪啥的,就不如一套日志收集系统了。
  因为我这个自己做的日志收集系统不是公司级的项目(架构组也在做,但是有技术问题,技术问题场景太多了,所以还没有完全推广),大概就我们大组几十个开发在使用,应用到了二十多个子系统上,目前日志量已经增长到了月度亿级。
  因为我会在代码规范中明令禁止测试或者正式环境打印Sql,所以日志量总体还算是可控。再加上我之前在数据处理服务中写了定时,定期删除半年以上日志,因此磁盘表示它状态良好。万幸的是,暂时没有遇到太大问题,但是后续可能要考虑压缩历史日志归档的问题。
  扯远了,说了一些正确的废话,成功让大家摸鱼十秒钟,哈哈。回到正题,我是马上重启了数据处理服务进行消费,由于原Topic只有3个分区,所以并发度是3。可能有的人会说为啥消费的时候不开多线程呢,开多线程批量插入ES就不受分区并发度限制了?对的,这是一个手段,但是我开启了手动提交offset,为了确保日志不丢失,所以这并发度没法提升了。
  正式的环境不能乱动,但是测试的可以随便玩,那么这个消息积压的问题场景就由我收下了!解决思路如下,非常简单,但问题就在于细节非常多,网上的实践内容也相对偏少,所以我就想着通过文章的方式来复盘下我全部的操作。修复现有consumer的问题,并将其停掉,再不停就真G了。重新创建一个容量更大的Topic,比如patition是原来的N倍,大大大。编写一个临时consumer程序,消费原来积压的队列(注意该consumer不做任何耗时的操作,仅作为中转将消息快速写入新创建的Topic里)。将修复好的consumer部署到原来N倍的机器上消费新Topic。消息积压解决后,恢复原有架构。真实的操作场景复盘新建一个中转Topic
  这里无论是通过命令还是在可视化工具上创建,都可以啦,不管黑猫白猫,抓到耗子就是好猫。因为原来的Topic分区是3,这里可以选择提升10倍,改成30。但是我是选择了相对保守的12作为分区数,因为我这资源有限啦。临时消费者,中转消息
  这里我们就在原数据处理服务直接写一个监听加中转就OK了。AutowiredQualifier(performanceKafkaTemplate)privateKafkaTemplateString,StringkafkaTemplate;KafkaListener(topics{logzero{systemProperties〔env〕}},containerFactorytestFactory)publicvoidforwardTopic(ListConsumerRecordString,Stringrecords,Acknowledgmentack){ListLogMsgDTOlogMsgListnewArrayList(1024);records。forEach(record{ListenableFutureSendResultString,StringsendListenerkafkaTemplate。send(logzerotest,record。value());sendListener。addCallback(success{},err{log。error(消息发送失败,err);});});ack。acknowledge();}复制代码
  这里针对Kafka做了一个封装,用的是自己封装的高性能版生产者和测试版消费者。消费新Topic
  首先改写一下原有的消费逻辑,因为之前是按月动态索引,所以消费消息时会先从redis中获取是否存在当月索引,现在我可以直接写死这个索引,新增即可,消费的代码如下。KafkaListener(topics{logzerotest},containerFactorytestFactory)publicvoidtestLogListen(ListConsumerRecordString,Stringrecords,Acknowledgmentack){ListLogMsgDTOlogMsgListnewArrayList(1024);records。forEach(record{Stringvaluerecord。value();String〔〕splitvalue。split();Stringsystemsplit〔2〕;if(split。length13){Stringenvsplit〔3〕;LogInfoDTOinfonewLogInfoDTO();info。setCreateTime(split〔0〕);info。setLogType(split〔1〕);info。setSystem(system);info。setEnv(env);info。setLocalIp(split〔4〕);info。setRequestIp(split〔5〕);info。setTriceId(split〔6〕);info。setRequestUrl(split〔7〕);info。setRequestType(split〔8〕);info。setRequestMethod(split〔9〕);info。setUserId(null。equalsIgnoreCase(split〔10〕)?:split〔10〕);info。setUserName(split〔11〕);info。setThreadName(split〔12〕);info。setMsg(split〔13〕);logMsgList。add(newLogMsgDTO(testlogzerouatre,JSON。toJSONString(info)));}});if(!CollectionUtils。isEmpty(logMsgList)){esService。bulkAddRequest(logMsgList);}ack。acknowledge();}批量新增ahrefhttps:www。bs178。comrizhitargetblankclassinfotextkey日志apublicvoidbulkAddRequest(ListLogMsgDTOmsgList){BulkRequestrequestnewBulkRequest();等待批量请求作为执行的超时设置为2分钟request。timeout(TimeValue。timeValueMinutes(3));msgList。forEach(msg{request。add(newIndexRequest(msg。getTopic())。source(msg。getMsg(),XContentType。JSON));});zeroClient。bulkAsync(request,RequestOptions。DEFAULT,newActionListenerBulkResponse(){OverridepublicvoidonResponse(BulkResponsebulkResponse){for(BulkItemResponsebulkItemResponse:bulkResponse){booleanfailedbulkItemResponse。isFailed();if(failed){log。error(批量插入消息失败,详细信息{},bulkItemResponse。getFailureMessage());}DocWriteResponseitemResponsebulkItemResponse。getResponse();switch(bulkItemResponse。getOpType()){caseINDEX:caseCREATE:IndexResponseindexResponse(IndexResponse)itemResponse;handleAddDocSuccess(indexResponse);break;caseUPDATE:UpdateResponseupdateResponse(UpdateResponse)itemResponse;break;caseDELETE:DeleteResponsedeleteResponse(DeleteResponse)itemResponse;break;default:break;}}}OverridepublicvoidonFailure(Exceptione){log。error(批量插入失败,e);}});}复制代码
  消费消息这个场景做的很简单啊,就是转换日志信息,然后拼接ES批量插入的请求体,几乎没什么费事的动作。这里要注意一下,如果消息早已发送到Topic,并且选择了一个全新的消费者组,那么将auto。offset。reset改成earliest从头读起。积压消息消费之后,切回原来的架构
  因为我这里使用了DevOps平台,所以切换很简单,选之前的容器变更部署即可。年轻人,听说你也懂优化
  我是真的栓Q,大乌鱼事件发生,现在不懂优化是真的不行的辣!生产者优化
  众所周知,Kafka不懂优化也没有太大关系,因为通用版已经够好用了,但是作为卷王怎么能够接受通用,必然是狠狠地定制,定在墙上那种。这里简单说一下高性能版生产者参数的调整,顺道解释下调整意义。batch。size从16384(16KB)提升到163840(160KB),简单粗暴的提升10倍批量大小,攒够这个大小才允许发送消息,从而减少请求次数。linger。ms默认是0ms,也就是无延迟推送消息,因为batch。size调大了所以这里同步增加到20ms,注意这里需要根据实际情况进行调整。
  batch。size和linger。ms是决定吞吐量和延时的重要参数,两个条件任一满足即可发送消息。batch。size过大会导致发送消息难以攒够batch。size大小导致消息发不出去,因此需要linger。ms保底,linger。ms时间一到也能发送消息。buffer。memory默认是32MB,这个缓冲大小要随着batch。size提升而提升到67108864(64MB)。避免过小导致消息无法写入,从而阻塞后续的生产消息。acks使用默认值1,保证分区leader副本写入即可。max。request。size默认1048576B(1MB),因为日志消息,比如异常堆栈信息会超过1MB,所以这里调整到5MB,注意Topic的message。max。bytes也要同步调整大小。request。timeout。ms默认3000ms,增大了max。request。size也要适当提升下请求超时参数,比如60000ms,暴力加参是这样的。compression。type默认无,因为日志场景会有较大量的输入,所以最好开一下压缩。我这里综合考虑压缩比和压缩吞吐量选择了lz4,注意这里broker的compression。type默认是跟随producer的,如果自己有修改,一定要改回去,避免压缩算法不一致导致的额外开销。
  以上便是高性能通用版生产者的重要参数调整,接下来针对生产速度做一个对比。在本地笔记本8核16G环境下(远低于服务器性能,仅作对比参考),并发消费3分区向12分区(副本数为1)新Topic生产消息,另一对比者reliableHighKafkaTemplate(仅开启ACK为1,调大请求消息体),结果峰值如下。高性能版的生产者是要比普通的版本要高出不少的,如果放在服务器上这个速度还会暴增,因此生产消息的速度我们并不需要太在意,肯定是比消费更快的。
  高性能版
  高可靠版
  6045。41条s
  3925。80条s消费者优化
  既然使用了SpringKafka就要老老实实的接受人家的设定(这里是妄图通过Kafkaclient写出消费者,但是被生命周期管理代码劝退,从而老老实实用Spring真香桶的败犬菜恐龙),根据Spring官方给的两条路,选择并发消费。相比写配置文件调参,我更喜欢通过bean加载,除了可以更自由的选择之外,还可以塞到组件里即插即用。接下来重点介绍下高性能模式下,如何调整消费者参数,以及如何定制SpringKafka的消费者工厂。authorWangZYdate20233618:52description测试用手动BeanpublicConcurrentKafkaListenerContainerFactoryString,StringtestFactory(){ConcurrentKafkaListenerContainerFactoryString,StringcontainernewConcurrentKafkaListenerContainerFactory();MapString,ObjectpropsnewHashMap(16);props。put(ConsumerConfig。BOOTSTRAPSERVERSCONFIG,prop。getKafkaServers());props。put(ConsumerConfig。KEYDESERIALIZERCLASSCONFIG,StringDeserializer。class);props。put(ConsumerConfig。VALUEDESERIALIZERCLASSCONFIG,StringDeserializer。class);props。put(ConsumerConfig。GROUPIDCONFIG,StringUtils。isEmpty(prop。getConsumerGroupId())?testConsumerGroup:prop。getConsumerGroupId());props。put(ConsumerConfig。FETCHMINBYTESCONFIG,1048576);props。put(ConsumerConfig。MAXPOLLRECORDSCONFIG,1000);props。put(ConsumerConfig。MAXPOLLINTERVALMSCONFIG,60000);props。put(ConsumerConfig。ENABLEAUTOCOMMITCONFIG,false);props。put(ConsumerConfig。AUTOOFFSETRESETCONFIG,earliest);container。setConsumerFactory(newDefaultKafkaConsumerFactory(props));container。setConcurrency(3);container。setBatchListener(true);container。getContainerProperties()。setAckMode(ContainerProperties。AckMode。MANUAL);returncontainer;}复制代码fetch。min。bytes该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka中拉取的最小数据量,默认值为1B。为了减少网络IO,也是配合前面生产者的高性能配置,这里果断上调到1048576(1MB),对应fetch。max。bytes为50MB,这个不用调。max。poll。records用来配置Consumer在一次拉取请求中拉取的最大消息数,默认值为500(条),略小略小,配合生产者调整至1000。这里是预测值,要根据实际情况调整,下文有提,看到最后哟。max。poll。interval。ms这个参数定义了两次poll()之间的最大间隔,默认值为3000(5分钟)。如果超过这个间隔同样会触发rebalance。在多数情况下这个参数是导致rebalance消息重复的关键,即业务处理消息耗时太长。这里一般来说不用调整,如果出现,也是优先调整代码,而不是修改该参数。
  这里SpringKafka的消费者工厂定义,除了调参之外,重要还有开启并发监听setBatchListener(true)。只有开启了这个参数,才能以publicvoidtestLogListen(Listrecords)这种入参为List的方式监听消费。同理这里getContainerProperties()。setAckMode(ContainerProperties。AckMode。MANUAL)则是为了让入参中的Acknowledgmentack生效,改为手动提交offset后,虽然降低了消费速度,但是避免了因消费报错导致的消息丢失问题。ES配置优化
  前面ElasticSearch不停机重建索引引申来的优化与思考掘金(juejin。cn)也讲过这个问题,这一次当然还是类似的配置副本数可以先设置为0,刷新间隔禁用,刷盘设置为异步即可。别的一些歪门邪道经测试没大用,除了前面三项最好的提升就是加内存,这个堆硬件最好使。插一个官方推荐的优化插入速度的链接TuneforindexingspeedElasticsearchGuide〔8。6〕Elastic,顺道总结下ES官方推荐的提高插入速度的方法使用Bulk批量插入多线程请求插入ES取消或者增加索引刷新间隔插入时禁用索引初始副本服务器禁用swap索引文档使用自动生成的ID使用更快的硬件,比如SSD加内存(这一点是我加的,因为ES很多配置是内存百分比的,提高内存一般是性能提升的最优解。懂吗朋友,加钱啊)不同架构的消费速度对比
  目前测试节点是三节点集群,都是mixed节点,服务器都是8核16G1T机械磁盘(用SSD会更快)。ES除了一个节点因机器上服务过多(Kibana,SkywalkingUI,rediscluster等),设置为4G(Xms4GXmx4G),另两台设置为6G。接下来我会以表格的方式记录不同配置下,在Kibana上观测到的极限插入速度。
  贴一张Kibana的图,其他就不截图了,不然成灌水了。这个实测结果其实是符合我的预期的,不过让我意外的是1master(4G)2data(66)的速度高于3mixed(466)。不知道是不是我的机器配置原因导致了这一点,但是我手里也没有空闲的服务器供我测试了,就留待读者们自己实验了,有测试过的大佬可以在评论区留言。简单来说,是符合上述ES官方推荐的优化配置的说法的,多线程的提升是巨量的,禁用副本和刷新间隔也会产生不小的提升。
  消费的单次拉取值是需要根据实际情况调整的,我第一次调整是单次3000,后来调成2000,都发生了同一个问题,速度没上去,服务器负载倒是飙升,如下图
  刺激的BUG战场发送消息体过大
  新建了Topic后,写好中转生产者后,刚调用就G了,快如闪电。原因很简单,消息体超过message。max。bytes默认值1MB了,调大即可。ES插入获取请求连接超时java。util。concurrent。TimeoutException:Connectionleaserequesttimeoutatorg。apache。http。nio。pool。AbstractNIOConnPool。processPendingRequest(AbstractNIOConnPool。java:411)〔httpcorenio4。4。13。jar:4。4。13〕atorg。apache。http。nio。pool。AbstractNIOConnPool。processNextPendingRequest(AbstractNIOConnPool。java:391)〔httpcorenio4。4。13。jar:4。4。13〕atorg。apache。http。nio。pool。AbstractNIOConnPool。release(AbstractNIOConnPool。java:355)〔httpcorenio4。4。13。jar:4。4。13〕atorg。apache。http。impl。nio。conn。PoolingNHttpClientConnectionManager。releaseConnection(PoolingNHttpClientConnectionManager。java:391)〔httpasyncclient4。1。4。jar:4。1。4〕复制代码
  我是在springboot数据处理服务中使用了elasticsearchresthighlevelclient客户端,其中封装了HttpClient来向ES发送请求。这里的问题来源于使用了bulkAsync(后根据场景换成了同步请求bulk),这是个异步请求,底层是AbstractNIOConnPool提供了请求连接池。发生这个报错的原因是新的异步请求在获取连接时超过了获取连接超时时间,解决方案最直接的就是调大连接池大小和请求连接超时时间。httpClientBuilder。setMaxConnTotal(zeroProperties。getMaxConnectTotal());httpClientBuilder。setMaxConnPerRoute(zeroProperties。getMaxConnectPerRoute());复制代码ES配置超时时间无效RestClientBuilderbuilderRestClient。builder(httpHostZeroList。toArray(newHttpHost〔0〕));builder。setHttpClientConfigCallback(httpClientBuilder{httpClientBuilder。setMaxConnTotal(zeroProperties。getMaxConnectTotal());httpClientBuilder。setMaxConnPerRoute(zeroProperties。getMaxConnectPerRoute());returnhttpClientBuilder;});if(!StringUtils。isEmpty(username)!StringUtils。isEmpty(password)){CredentialsProvidercredentialsProvidernewBasicCredentialsProvider();credentialsProvider。setCredentials(AuthScope。ANY,newUsernamePasswordCredentials(username,password));builder。setHttpClientConfigCallback(httpClientBuilder{httpClientBuilder。setDefaultCredentialsProvider(credentialsProvider);returnhttpClientBuilder;});}复制代码
  在解决上面问题时Debug发现一个很神奇的现象,就是这里的配置超时时间的setMaxConnTotal和setMaxConnPerRoute,如果后面有相同的setHttpClientConfigCallback方法时,就会走最后一个set方法,前面的不会生效。在没有debug之前,我设置的参数一直不能生效,始终是默认值,我百思不得其解,最后通过debug,发现了这个神奇的现象。最后解决方法如下,就是放在一起进行配置,希望有大佬能解答我这个问题。RestClientBuilderbuilderRestClient。builder(httpHostZeroList。toArray(newHttpHost〔0〕));if(!StringUtils。isEmpty(username)!StringUtils。isEmpty(password)){CredentialsProvidercredentialsProvidernewBasicCredentialsProvider();credentialsProvider。setCredentials(AuthScope。ANY,newUsernamePasswordCredentials(username,password));builder。setHttpClientConfigCallback(httpClientBuilder{httpClientBuilder。setDefaultCredentialsProvider(credentialsProvider);httpClientBuilder。setMaxConnTotal(zeroProperties。getMaxConnectTotal());httpClientBuilder。setMaxConnPerRoute(zeroProperties。getMaxConnectPerRoute());returnhttpClientBuilder;});}else{builder。setHttpClientConfigCallback(httpClientBuilder{httpClientBuilder。setMaxConnTotal(zeroProperties。getMaxConnectTotal());httpClientBuilder。setMaxConnPerRoute(zeroProperties。getMaxConnectPerRoute());returnhttpClientBuilder;});}复制代码ES过量插入导致OOM
  org。elasticsearch。client。ResponseException:method〔POST〕,host〔http:192。168。158。115:9200〕,URI〔bulk?timeout3m〕,statusline〔HTTP1。1429TooManyRequests〕
  {error:{rootcause:〔{type:circuitbreakingexception,reason:〔parent〕Datatoolarge,datafor〔〕wouldbe〔20770611941。9gb〕,whichislargerthanthelimitof〔20401094651。8gb〕,realusage:〔20742843521。9gb〕,newbytesreserved:〔27768422。6mb〕,usages〔request00b,fielddata3712136。2kb,inflightrequests1074997410。2mb,modelinference00b,accounting81006007。7mb〕,byteswanted:2077061194,byteslimit:2040109465,durability:TRANSIENT}〕,type:circuitbreakingexception,reason:〔parent〕Datatoolarge,datafor〔〕wouldbe〔20770611941。9gb〕,whichislargerthanthelimitof〔20401094651。8gb〕,realusage:〔20742843521。9gb〕,newbytesreserved:〔27768422。6mb〕,usages〔request00b,fielddata3712136。2kb,inflightrequests1074997410。2mb,modelinference00b,accounting81006007。7mb〕,byteswanted:2077061194,byteslimit:2040109465,durability:TRANSIENT},status:429}
  ES默认是1G,我这里调整到2G,上面这段话的意思就是说,ES的堆内存满了,放不下新的数据,也就是标准OOM了。重点观察报错日志的几个参数whichislargerthanthelimitof〔20401094651。8gb〕,这里提示的是内存上限,默认是ES内存最大值的95realusage:〔20742843521。9gb〕,这里是当前已使用内存大小newbytesreserved:〔27768422。6mb〕,这里是本次请求需求的内存空间大小datafor〔〕wouldbe〔20770611941。9gb〕,这里指的是已使用内存大小本次请求大小,这个值大于内存上限,因此报错
  此时ES此节点挂掉,查看日志可以得到下面这段。
  〔20230309T17:45:22,338〕〔INFO〕〔o。e。i。b。HierarchyCircuitBreakerService〕〔node2〕attemptingtotriggerG1GCduetohighheapusage〔2049729624〕
  〔20230309T17:45:22,343〕〔INFO〕〔o。e。i。b。HierarchyCircuitBreakerService〕〔node2〕GCdidbringmemoryusagedown,before〔2049729624〕,after〔2029398072〕,allocations〔1〕,duration〔5〕
  〔20230309T17:45:22,332〕〔WARN〕〔o。e。h。AbstractHttpServerTransport〕〔node2〕caughtexceptionwhilehandlingclienthttptraffic,closingconnectionNetty4HttpChannel{localAddress192。168。158。114:9200,remoteAddress172。27。136。31:6096}
  java。lang。Exception:java。lang。OutOfMemoryError:Javaheapspace
  这个解决办法其实很简单,因为调任何参数都不治本了,所以唯一能做的就是加内存,我在测试阶段是加到4G就没有问题了。微醺码头
  代码场景设计实践云雨雪的专栏掘金(juejin。cn),突然发现我这个专栏已经有26人订阅了,系列文章均3K阅读量40赞60收藏。本期也将收录至该专栏,依照传统,必然要登上微醺码头,为大家带来一些散碎的知识点,坐稳了,开船啦!更换节点
  一般分为四种节点类型,分别是主节点、数据节点、客户端节点和混合节点。通过elasticsearch。yml中的node。master:true和node。data:true(默认值)来配置,默认节点是混合节点。主节点(master),配置node。master:true、node。data:false。主要功能是维护元数据,管理集群节点状态,不负责数据写入和查询。此机器相对内存配置要求低一些,但是必须稳定,因为是master,挂了就都没得玩了。数据节点(data),配置node。master:false、node。data:true。主要功能是负责数据写入和查询,因此要求内存必须够大。客户端节点(client),配置node。master:false、node。data:false。主要功能是负责任务分发和结果汇聚,分担数据节点压力,计算用的,内存也得大大大。混合节点(mixed),配置node。master:true、node。data:true。全能冠军,(),就和全栈工程师一样,啥都会但也不会特别厉害,压力一大就容易G了。
  当集群达到一定规模之后,不建议使用mixed,而是应该对各节点进行角色划分,可以按照mixedmasterdatamasterdataclient逐步划分。
  接下来介绍下如何从mixed进行转换,当然我的建议是一开始就要划分出来明确的角色,别到一半再转。以下是从mixed转换到master的步骤,其他同理了。修改elasticsearch。yml中的node。master:true和node。data:false下线该节点,在es安装目录下的bin文件夹下执行。elasticsearchnoderepurpose清除分区数据最后重启该节点就成功
  注意这里有几个问题,首先是在当前节点拥有分片数据的时候是不允许直接设置node。data:false的,会提示以下异常
  我是使用的ES7。9版本,因此官方在报错信息后贴心的提示了可以使用工具www。elastic。coguideenel进行分片信息的删除。
  这里bin目录下执行命令后,会有个确认环节,再次让你判断是否要清除分片数据。这里我的建议是三个,一是手动通过ES的API迁移分片(一次只能迁移一个索引,折磨),二是一开始就弄好节点角色定位避免二次变更,三是所有索引都加上副本。
  从上面这个图中可以看到,当选择清除主节点分片数据后,有副本的索引的数据不会丢,而是将副本升级为分片,体现在未分配分片中。因为我的索引大部分设置为3分片,所以这时候索引大部分状态都是Yellow,小部分没有副本的则是变成了Red。(上面这个图截的不好,应该截索引那个界面的)
  当我重新将master节点变成mixed节点后,未分配的分片会慢慢填充到该节点,节点和集群状态也会转Green,不要害怕,让子弹飞一会儿。Kafka各类型通用生产者配置
  按照功能区分为三种特化型生产者和一种普适生产者。三种特化型分别是高性能(吞吐量)、高可靠性、顺序性。高性能上面说过了,就不复述了。高可靠性提升重试次数和重试时间以及ack设置为all来保障消息发送的可靠性顺序性则使用max。in。flight。requests。per。connection设置为1保证消息不会因为重试而顺序混乱,同时topic设置分区为1或者生产者发送指定分区来协同保证顺序性importcom。xxx。transfer。properties。TransferProperties;importorg。apache。kafka。clients。producer。ProducerConfig;importorg。apache。kafka。common。serialization。StringSerializer;importorg。springframework。beans。factory。annotation。Autowired;importorg。springframework。boot。context。properties。EnableConfigurationProperties;importorg。springframework。context。annotation。Bean;importorg。springframework。context。annotation。Configuration;importorg。springframework。context。annotation。Primary;importorg。springframework。kafka。core。DefaultKafkaProducerFactory;importorg。springframework。kafka。core。KafkaTemplate;importjava。util。HashMap;importjava。util。Map;AuthorWangZYDate202231514:51Description多种生产者配置EnableConfigurationProperties(value{TransferProperties。class})ConfigurationpublicclassKafkaProducerConfig{AutowiredprivateTransferPropertiesprop;authorWangZYdate202271411:38description高性能BeanpublicKafkaTemplateString,StringperformanceKafkaTemplate(){MapString,ObjectpropsnewHashMap(16);props。put(ProducerConfig。BOOTSTRAPSERVERSCONFIG,prop。getKafkaServers());props。put(ProducerConfig。KEYSERIALIZERCLASSCONFIG,StringSerializer。class);props。put(ProducerConfig。VALUESERIALIZERCLASSCONFIG,StringSerializer。class);props。put(ProducerConfig。BATCHSIZECONFIG,163840);props。put(ProducerConfig。LINGERMSCONFIG,20);props。put(ProducerConfig。BUFFERMEMORYCONFIG,67108864);props。put(ProducerConfig。ACKSCONFIG,1);props。put(ProducerConfig。MAXREQUESTSIZECONFIG,10485760);props。put(ProducerConfig。REQUESTTIMEOUTMSCONFIG,60000);props。put(ProducerConfig。COMPRESSIONTYPECONFIG,lz4);config。getInt,自动强转这里不用在意是字符串还是数字props。put(ProducerConfig。RETRIESCONFIG,1);props。put(ProducerConfig。RETRYBACKOFFMSCONFIG,1000);returnnewKafkaTemplate(newDefaultKafkaProducerFactory(props));}authorWangZYdate202271411:40description高可靠性BeanpublicKafkaTemplateString,StringreliableHighKafkaTemplate(){MapString,ObjectpropsnewHashMap(16);props。put(ProducerConfig。BOOTSTRAPSERVERSCONFIG,prop。getKafkaServers());props。put(ProducerConfig。KEYSERIALIZERCLASSCONFIG,StringSerializer。class);props。put(ProducerConfig。VALUESERIALIZERCLASSCONFIG,StringSerializer。class);props。put(ProducerConfig。MAXREQUESTSIZECONFIG,10485760);props。put(ProducerConfig。REQUESTTIMEOUTMSCONFIG,60000);props。put(ProducerConfig。RETRIESCONFIG,3);props。put(ProducerConfig。RETRYBACKOFFMSCONFIG,1000);props。put(ProducerConfig。ACKSCONFIG,1);returnnewKafkaTemplate(newDefaultKafkaProducerFactory(props));}authorWangZYdate202271411:40description顺序性BeanpublicKafkaTemplateString,StringtimeKafkaTemplate(){MapString,ObjectpropsnewHashMap(16);props。put(ProducerConfig。BOOTSTRAPSERVERSCONFIG,prop。getKafkaServers());props。put(ProducerConfig。KEYSERIALIZERCLASSCONFIG,StringSerializer。class);props。put(ProducerConfig。VALUESERIALIZERCLASSCONFIG,StringSerializer。class);props。put(ProducerConfig。MAXREQUESTSIZECONFIG,10485760);props。put(ProducerConfig。REQUESTTIMEOUTMSCONFIG,60000);props。put(ProducerConfig。RETRIESCONFIG,1);props。put(ProducerConfig。RETRYBACKOFFMSCONFIG,1000);props。put(ProducerConfig。MAXINFLIGHTREQUESTSPERCONNECTION,1);returnnewKafkaTemplate(newDefaultKafkaProducerFactory(props));}authorWangZYdate202271411:41description普通PrimaryBeanpublicKafkaTemplateString,StringnormalKafkaTemplate(){MapString,ObjectpropsnewHashMap(16);props。put(ProducerConfig。BOOTSTRAPSERVERSCONFIG,prop。getKafkaServers());props。put(ProducerConfig。KEYSERIALIZERCLASSCONFIG,StringSerializer。class);props。put(ProducerConfig。VALUESERIALIZERCLASSCONFIG,StringSerializer。class);props。put(ProducerConfig。MAXREQUESTSIZECONFIG,10485760);props。put(ProducerConfig。REQUESTTIMEOUTMSCONFIG,60000);props。put(ProducerConfig。RETRIESCONFIG,1);props。put(ProducerConfig。RETRYBACKOFFMSCONFIG,1000);props。put(ProducerConfig。BATCHSIZECONFIG,16384);props。put(ProducerConfig。LINGERMSCONFIG,5);returnnewKafkaTemplate(newDefaultKafkaProducerFactory(props));}}复制代码Kafka各类型通用消费者配置
  这个就相对朴实了,区分比较少,主要是是否自动提交offset的对比,高性能部分也在前面提到过,直接贴代码吧。importcom。xxx。transfer。properties。TransferProperties;importorg。apache。kafka。clients。consumer。ConsumerConfig;importorg。apache。kafka。common。serialization。StringDeserializer;importorg。springframework。beans。factory。annotation。Autowired;importorg。springframework。boot。context。properties。EnableConfigurationProperties;importorg。springframework。context。annotation。Bean;importorg。springframework。context。annotation。Configuration;importorg。springframework。context。annotation。Primary;importorg。springframework。kafka。config。ConcurrentKafkaListenerContainerFactory;importorg。springframework。kafka。core。DefaultKafkaConsumerFactory;importorg。springframework。kafka。listener。ContainerProperties;importorg。springframework。util。StringUtils;importjava。util。HashMap;importjava。util。Map;AuthorWangZYDate202231514:51Description多种消费者配置EnableConfigurationProperties(value{TransferProperties。class})ConfigurationpublicclassKafkaConsumerConfig{AutowiredprivateTransferPropertiesprop;authorWangZYdate202271411:35description高性能BeanpublicConcurrentKafkaListenerContainerFactoryString,StringperformanceFactory(){ConcurrentKafkaListenerContainerFactoryString,StringcontainernewConcurrentKafkaListenerContainerFactory();MapString,ObjectpropsnewHashMap(16);props。put(ConsumerConfig。BOOTSTRAPSERVERSCONFIG,prop。getKafkaServers());props。put(ConsumerConfig。KEYDESERIALIZERCLASSCONFIG,StringDeserializer。class);props。put(ConsumerConfig。VALUEDESERIALIZERCLASSCONFIG,StringDeserializer。class);props。put(ConsumerConfig。GROUPIDCONFIG,StringUtils。isEmpty(prop。getConsumerGroupId())?performanceConsumerGroup:prop。getConsumerGroupId());props。put(ConsumerConfig。FETCHMINBYTESCONFIG,1048576);props。put(ConsumerConfig。MAXPOLLRECORDSCONFIG,1000);props。put(ConsumerConfig。MAXPOLLINTERVALMSCONFIG,60000);container。setConsumerFactory(newDefaultKafkaConsumerFactory(props));container。setConcurrency(3);container。setBatchListener(true);returncontainer;}authorWangZYdate202271411:36description高可靠BeanpublicConcurrentKafkaListenerContainerFactoryString,StringreliableHighFactory(){ConcurrentKafkaListenerContainerFactoryString,StringcontainernewConcurrentKafkaListenerContainerFactory();MapString,ObjectpropsnewHashMap(16);props。put(ConsumerConfig。BOOTSTRAPSERVERSCONFIG,prop。getKafkaServers());props。put(ConsumerConfig。KEYDESERIALIZERCLASSCONFIG,StringDeserializer。class);props。put(ConsumerConfig。VALUEDESERIALIZERCLASSCONFIG,StringDeserializer。class);props。put(ConsumerConfig。GROUPIDCONFIG,StringUtils。isEmpty(prop。getConsumerGroupId())?reliableHighConsumerGroup:prop。getConsumerGroupId());props。put(ConsumerConfig。ENABLEAUTOCOMMITCONFIG,false);props。put(ConsumerConfig。MAXPOLLINTERVALMSCONFIG,60000);container。setConsumerFactory(newDefaultKafkaConsumerFactory(props));container。setConcurrency(3);container。setBatchListener(true);container。getContainerProperties()。setAckMode(ContainerProperties。AckMode。MANUAL);returncontainer;}authorWangZYdate202271411:38description普通PrimaryBeanpublicConcurrentKafkaListenerContainerFactoryString,StringnormalFactory(){ConcurrentKafkaListenerContainerFactoryString,StringcontainernewConcurrentKafkaListenerContainerFactory();MapString,ObjectpropsnewHashMap(16);props。put(ConsumerConfig。BOOTSTRAPSERVERSCONFIG,prop。getKafkaServers());props。put(ConsumerConfig。KEYDESERIALIZERCLASSCONFIG,StringDeserializer。class);props。put(ConsumerConfig。VALUEDESERIALIZERCLASSCONFIG,StringDeserializer。class);props。put(ConsumerConfig。GROUPIDCONFIG,StringUtils。isEmpty(prop。getConsumerGroupId())?normalConsumerGroup:prop。getConsumerGroupId());props。put(ConsumerConfig。MAXPOLLINTERVALMSCONFIG,60000);container。setConsumerFactory(newDefaultKafkaConsumerFactory(props));container。setConcurrency(3);container。setBatchListener(true);returncontainer;}}复制代码写在最后
  在我看来消息积压这个问题场景的解决思路是很简单的,确实没啥好讲的,但同时细节又十分丰富,因此我补充了一些优化知识和实际遇到的问题。解决问题的过程还是蛮有意思的。

三星GalaxyNote20设计预览曝光前置中间打孔屏,后置IT之家5月19日消息三星将在今年下半年推出GalaxyNote20系列新品,现在三星Note20的大概设计预览图曝光,显示采用了前置中间打孔屏幕,后置相机矩阵模组。微博……三星入门新机GalaxyA11正式发布4000mAh电池13IT之家5月16日消息据外媒sammobile消息,三星在泰国官网正式发布了入门新机GalaxyA11,采用InfinityO屏幕设计,售价5199泰铢(约合人民币1151元)……V观财报中梁控股停付境外债务项下所有本息,未还本金约11。8中新经纬11月14日电中梁控股13日晚间公告,为确保公平对待所有境外债务持有人,经慎重及仔细考虑及向公司法律顾问咨询法律意见后,中梁控股决定暂停支付本公司境外债务项下所有应付的……三星中国推保值换新计划S20可半价换购S21感谢IT之家网友半沢直樹的线索投递!IT之家5月13日消息今天三星中国推出了保值换新计划,根据该计划,三星S20系列用户可享受半价折抵换购未来的下一代三星S系列新机。……三星支持MPEG5EVC编解码器,高效播放4K8K视频IT之家5月13日消息除了AV1编解码器外,三星还将在其设备上支持新发布的MPEG5EVC视频编解码器。新的视频编解码器旨在满足媒体和消费电子行业的业务和技术需求。IT之家此前……微软SurfaceDuo安卓手机桌面UI大曝光IT之家1月24日消息2019年10月2日,微软向我们简要介绍了SurfaceDuo安卓双屏设备,这是微软宣布放弃命途多舛的WindowsPhone后的第一款智能手机,虽然微软……三星电子将ExynosAP用于更多低端智能手机产品阵容IT之家4月18日消息,据BusinessKorea报道,三星电子已将Exynos850应用处理器(AP)用于北美市场的GalaxyA13LTE智能手机。GalaxyA13有两……三星滑屏手机专利公布,真正无打孔的真全面屏IT之家4月16日消息,随着折叠屏手机市场开始大放异彩,更多可能的未来产品逐渐浮现。例如早在2020年,OPPO就推出了首款卷轴屏概念手机,看起来非常酷,甚至比可折叠智能手机更……未来世界什么样?中兴通讯早就用全新的5G应用告诉我们了最近几天,中兴通讯的最新年度宣传片一上线,就引发各方关注。特别是在宣传片中,中兴通讯提到的5G云网连接世界万物,一下子让人想到近几年中国在5G技术方面取得的显著发展,每时每刻都……DSCC2022一季度折叠屏智能手机出货222万部同比增长5IT之家6月2日消息,据咨询机构DSCC数据显示,2022年第一季度,可折叠智能手机出货量同比增长571,达到222万部。但与21年第四季度420万部的记录高点相比,下降了47……31。5英寸4K显示器999元!HKC拿出杀手锏,这价格还要对于现在的主流PC设备来说,硬件性能部分已经能够很好的支持高分辨率显示了,而且不管是Windows10还是Windows11,系统层面对于1080p以上分辨率的支持也已经十分友……雷军预热小米12Ultra融合了徕卡影像方面一百多年来所积累感谢IT之家网友王的叔叔的线索投递!IT之家6月4日消息,小米上个月宣布与徕卡达成全球影像战略合作,百年传奇影像与创新科技相融合,一起开启移动影像新时代!首款作品将于7月……
九场比赛,五个问题,F1下半程我们能等来答案吗?本周末,2022赛季F1后半程较量即将拉开帷幕。最后的九场比赛中,有哪些关键的问题值得我们去等待答案呢?F1特约撰稿人、前F1车手乔林帕尔默给出了他的回答。梅赛德斯会保持……儿童牙刷也有国家标准,你选对了吗?众所周知,对牙齿的保护要从小做起工欲善其事必先利其器选择一把好牙刷也十分重要那么如何挑选一把适合孩子使用的牙刷呢?今天我们就跟着儿童牙刷的国家标准……NBA联盟现役高中生球员还有谁?唯有三棵独苗,一人是得分王自2005年NBA联盟禁止高中生球员进入联盟以来,也随着17年过去,原来的绝大多数高中生球员都已经退役,甚至是退役多年,科比,加内特。。。。。。但是我们还能在现役球员中找……8年没喝过汽水吃过披萨,你以为梅西容易?成球王的牺牲超级大2022世界杯文篮郭先生提及梅西,大家都知道,之前的他们,在首战中爆冷不敌沙特,一度引发极大的关注,也让人感叹与唏嘘。难道阿根廷男足就此要陨落了?幸好的是,之后与墨……今日油价2022年9月30日全国最新油价(9295汽油与0号今天的油价将在国庆假期后的10月10日晚调整,目前经过7个工作日的统计,原油变化率5。14,预计下调油价120元吨(0。09元升0。11元升),相互转告,油价有望接着下跌。……2022年2月27日(星期天)雁来湖小镇踏春骑行之旅朋友多了,路好走,本周六携手融创文旅,举行融创雁来湖小镇,踏春骑行之旅2022年2月27日周日早8:30到9:00,老安石路强林加油站集合,9:30准时出发【因迟到者,骑……10年后,价值150万的房子还能值多少钱?马光远和王健林说法买房已经不再赚钱,逐渐成为大家的共识。喋喋不休的楼市现状,已经让大多数人失去了信心,没有了信心,那么楼市才是真正的回归正轨。买房不再是一件投资,而是为了满足自己的居住需求。……再见日本,张本智和无球可打!世界第5被抛弃,樊振东威胁解除北京时间4月4日,日本国内乒乓比赛即将开打,日本乒协也公布相关的主力和替补球员名单,其中,现役日本第一男单选手张本智和没有确定效力的俱乐部,这意味着,张本智和暂时无球可打了。据……职场真抓实干类过渡句金句职场真抓实干类过渡句金句1。再美好的发展蓝图,唯有通过苦干才能变为现实;再正确的战略决策,唯有通过实干才能发挥作用。2。一把手抓是责任、是使命,不容含糊;抓一把手是……迈克尔贝亡命救护车首映主演杰克吉伦哈尔称其令人血脉喷张搜狐娱乐讯(哈麦文)4月19日,环球影业出品,迈克尔贝执导的犯罪动作片《亡命救护车》在北京首映。该片翻拍自2005年同名丹麦电影,讲述一个从阿富汗回来的老兵被兄弟拉下水去……西方国家对俄限价令生效在即,欧盟或将面临柴油危机去年12月5日,七国集团、欧盟及澳大利亚对俄罗斯海运出口原油的价格限制令生效。从今年2月5日开始,西方国家针对俄罗斯石油产品的限价令也将生效。对于西方国家的石油限价等制裁……紧急提醒注意血氧饱和度这是个医学术语,最近逝世的40多名北大清华教授中,好多就是(阳)转(阴)的教授。就是在人体感觉已经恢复,食欲也很强的情况下,他们在没有任何痛苦的情况下而丢掉了性命,其根本……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网