游戏电视苹果数码历史美丽
投稿投诉
美丽时装
彩妆资讯
历史明星
乐活安卓
数码常识
驾车健康
苹果问答
网络发型
电视车载
室内电影
游戏科学
音乐整形

Kafka基于Windows的Kafka有关环境搭建以及使用

  前言:基于Windows系统下的Kafka环境搭建;以及使用。NET6环境进行开发简单的生产者与消费者的演示。
  一、环境部署
  Kafka是使用Java语言和Scala语言开发的,所以需要有对应的Java环境,以及Scala语言环境。
  Java环境配置,如果不清楚的,可以查看鄙人的另一篇博客:
  https:www。cnblogs。comweskynetp14852471。html
  https:www。scalalang。orgdownloadscala2。html
  要选择Binaries版本的环境,否则需要自己编译:
  2、Kafka基于Zookeeper环境运行,zookeeper提供给kafka一系列的功能支持,所以还需要安装Zookeeper有关的环境。下载zookeeper地址:
  https:zookeeper。apache。orgreleases。htmldownload
  3、同样,Zookeeper也需要下载带bin的链接,没有带bin的链接,可能是源码,需要自己编译:
  https:kafka。apache。orgdownloads。html
  5、同样需要选择下载binary版本,然后根据scala的版本选择对应的版本。
  6、下载的三个安装包,如图所示:
  7、先安装Scala语言包环境:
  8、验证Scala语言包是否安装成功:
  控制台窗口,输入:scalaversion
  如果提示类似如下有关版本信息,则代表安装成功。
  9、然后是安装zookeeper环境。必须先启动zookeeper,才可以使用kafka。
  安装zookeeper环境,先解压下载的包,然后在解压后的目录下新增data文件夹
  10、然后复制data文件夹的绝对路径,备用。在conf文件夹下,编辑cfg文件
  11、在cfg文件内,修改dataDir指定为上面新建的data文件夹的绝对路径。注意路径是斜杠,如果要使用反斜杆,需要写双反斜杠
  12、也要更改cfg格式的文件名称为zoo。cfg否则zookeeper无法识别配置文件。Zoo。cfg文件是zookeeper启动时候自动关联的默认配置文件名称。
  13、然后新建环境变量ZOOKEEPERHOME:
  14、环境变量path新增:ZOOKEEPERHOMEbin
  15、启动zookeeper,直接任意打开控制台,输入zkServer
  16、如果都没有报错,一般是启动成功了的。再次验证下,可以任意开个控制台,输入JPS进行查看,如下图所示,有JPS、也有QuorumPeerMain,代表zookeeper启动成功了。
  17、Kafka环境安装。先解压,然后在解压后的目录下,新增logs文件夹
  18、然后在Config文件夹下,修改server。properties文件,修改log。dirs的值为新增的logs文件夹的绝对路径
  19、进入到解压后的kafka目录下,在路径栏输入cmd,快速打开当前文件夹下的控制台窗口:
  20、输入命令:
  。binwindowskafkaserverstart。bat。configserver。properties
  进行启动Kafka服务:
  21、启动Kafka报错了,可能是版本问题,kafka一般新版本对windows环境不友好,所以降级一下。此处我把kafka3。0降级为2。8:
  22、此处我下载的版本为2。132。8。1,各位大佬们可以按照自己意愿选择版本。可能2。x版本和3。x版本跨度比较大,所以3。0版本没法玩。
  23、然后是重复以上配置kafka有关的动作,修改有关配置文件以及新增logs文件夹等。此处省略。
  24、接着在低版本的kafka目录下,快速进入当前解压缩的目录下,再次输入有关命令尝试一下:
  25、没有提示错误,根据提示信息,代表是启动成功了。任意打开控制台,再输入JPS查看下,可以看到Kafka,确认是启动OK了。
  https:www。kafkatool。comdownload。html
  27、安装可视化工具,默认可以一直下一步:
  28、可以在安装目录下把可执行程序发送到桌面快捷方式,方便打开。
  29、一些配置,包括名称、kafka版本、端口号、服务地址等
  30、连接以后的效果图,如下。Topic是空的,接下来写点代码。
  二、代码开发与测试
  31、新建类库项目,当作kafka服务类库
  32、此处选择标准库2。1,用于可以给多种。netcore版本使用,方便兼容。
  33、引用Confluent。Kafka包。
  34、此处新增发布服务类和订阅服务类:
  35、新增的生产者发布服务方法代码如下:
  代码:
  Description:Kafka生产者发布服务
  CreateTime:202212119:35:27
  Author:Wesky
  summary
  publicclassPublishService:IPublishService
  {
  publicasyncTaskPublishAsync(stringbroker,stringtopicName,TMessagemessage)whereTMessage:class
  {
  varconfignewProducerConfig
  {
  BootstrapServersbroker,kafka服务集群,例如192。168。0。1:9092,192。168。0。2:9092或者单机192。168。0。1:9092
  AcksAcks。All,
  MessageSendMaxRetries3,发送失败重试的次数
  };
  using(varproducernewProducerBuilderspanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspan,string(config)。Build())
  {
  try
  {
  stringdataNewtonsoft。Json。JsonConvert。SerializeObject(message);
  varsendDatanewMessagespanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspan,string{KeyGuid。NewGuid()。ToString(N),Valuedata};
  varreportawaitproducer。ProduceAsync(topicName,sendData);
  Console。WriteLine(消息:{data}r发送到:{report。TopicPartitionOffset});
  }
  catch(ProduceExceptionspanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspan,stringex)
  {
  Console。WriteLine(消息发送失败:rCode{ex。Error。Code}rError{ex。Message});
  }
  }
  }
  }
  36、新增的消费者接收服务方法代码如下:
  代码:
  Description:kafka消费者订阅服务
  CreateTime:202212119:36:25
  Author:Wesky
  summary
  publicclassSubscribeService:ISubscribeService
  {
  消费者服务核心代码
  summary
  消费者配置信息param
  主题集合param
  param
  param
  publicasyncTaskSubscribeAsync(ConsumerConfigconfig,IEnumerablespanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspantopics,Actionfunc,CancellationTokencancellationToken)whereTMessage:class
  {
  constintcommitPeriod1;
  using(varconsumernewConsumerBuilderIgnore,string(config)
  。SetErrorHandler((,e)
  {
  Console。WriteLine(消费错误:{e。Reason});
  })
  。SetStatisticsHandler((,json)
  {
  Console。WriteLine();
  })
  。SetPartitionsAssignedHandler((c,partitionList)
  {
  stringpartitionsstring。Join(,,partitionList);
  Console。WriteLine(分配的分区:{partitions});
  })
  。SetPartitionsRevokedHandler((c,partitionList)
  {
  stringpartitionsstring。Join(,,partitionList);
  Console。WriteLine(回收的分区:{partitions});
  })
  。Build())
  {
  consumer。Subscribe(topics);
  try
  {
  while(true)
  {
  try
  {
  varconsumeResultconsumer。Consume(cancellationToken);
  if(consumeResult。IsPartitionEOF)
  {
  continue;
  }
  if(consumeResult?。OffsetcommitPeriod0){
  try
  {
  varresultJsonConvert。DeserializeObject(consumeResult。Message?。Value);
  func(result);消费消息
  }
  catch(Exceptionex)
  {
  Console。WriteLine(消费业务处理失败:{ex。Message});
  }
  try
  {
  consumer。Commit(consumeResult);手动提交
  Console。WriteLine(消费者消费完成,已提交);
  }
  catch(KafkaExceptione)
  {
  Console。WriteLine(提交错误:{e。Error。Reason});
  }
  }
  }
  catch(ConsumeExceptione)
  {
  Console。WriteLine(消费错误:{e。Error。Reason});
  }
  }
  }
  catch(Exceptione)
  {
  Console。WriteLine(其他错误:{e。Message});
  consumer。Close();
  }
  }
  awaitTask。CompletedTask;
  }
  }pre
  37、并且提供对应的接口服务,用于开放给外部调用,或者提供依赖注入使用:
  38、新建一个控制台项目,用来当作消费者端的测试,并且新增一个方法,用来当作消费者接收到消息以后的业务处理方法体。此处控制台环境版本为。NET6
  39、消费客户端代码如下。其中,BootstrapServers也可以提供集群地址,例如ip1:port,ip2:port服务之间以半角逗号隔开。
  40、再新增一个webapi项目,用来当作生产者的客户端进行发送数据。以及对kafka服务类部分进行依赖注入注册,此处使用单例。该webapi此处使用。NET6环境,带有控制器的模式。
  41、新增的控制器里面,进行生产者的注入与实现。注意:topicName参数对应上边的topicwesky,通过主题绑定,否则消费者不认识就没办法消费到了。
  控制器代码:
  〔Route(api〔controller〕〔action〕)〕
  〔ApiController〕
  publicclassProducerController:ControllerBase
  {
  IPublishServiceservice;
  publicProducerController(IPublishServicepublishService)
  {
  servicepublishService;
  }
  〔HttpPost〕
  publicIActionResultSendMessage(stringbroker,stringtopicName,stringmessage)
  {
  service。PublishAsync(broker,topicName,message);
  returnOk();
  }
  }
  42、接下来是一些测试,如图所示:
  43、最后,使用可视化管理工具Offset进行查看,可以看到对应的主题。选中主题,可以设置数据类型,这里我设置为字符串,就可以查看到对应的消息内容了。如果没有设置,默认是16进制的数据。
  44、查看刚刚测试时候收发的消息队列里面的数据,如下所示:
  45、一些额外补充:
  Kafka也是消息队列的一种,用于在高吞吐量场景下使用比较适合。如果是轻量级的,只需要用于削峰,可以使用RabbitMQ。
  以上只是简单的操作演示,至于要用得溜,观众朋友们可以自行补充所需的相关理论知识。
  可视化工具还有一款yahoo提供的开源的工具,叫kafkamanager,有兴趣的大佬们可以自行玩玩,开源地址:
  https:github。comyahooCMAK
  还有一款滴滴平台做的开源的kafka运维管理平台,有兴趣的大佬们也可以自行了解,地址:
  https:github。comdidiLogiKM
  以上就是该博客的全部内容,感谢各位大佬们的观看

冬奥奇闻时来运转天无奈,倒数第一摘金牌《奇闻》在冬奥速滑预赛中,澳大利亚选手柏利在最后一圈还是倒数第一,冲线时前二名选手互相碰撞摔出赛道,柏利以第二名进入决赛。决赛时从起跑到全程他还是倒数第一,但到最后……指南针和俺搜的故事进入大数据时代,数字经济与实体经济深度融合,数据不但成为重要的生产要素,也成为贯彻新发展理念、构建新发展格局的必然选择。成立于2015年的俺搜就已搭上数字经济的东风,立足于化塑……员工守则竞赛演讲稿范文尊敬的各位领导、各位评委、各位同仁:大家下午好!我是来自三元支行的张珂,我演讲的题目是《修心与修行》。我们每一位员工的行为不仅直接影响农行的对外形象,更是关系农行能……工厂转正个人自我鉴定三篇篇一透过实习,我们既对中发厂的安全管理有一个初步了解,对化工行业的特殊性有一个初步认识,加强安全生产意识及自我防护的潜质,又能了解中发化工厂的生产状况和工艺流程,对所学专……关于员工责任心的心得3篇所谓责任心,就是一个人对自己的所作所为负责,是对他人、对集体、对企业、对社会承担责任和履行义务的自觉态度。下面是品学网带来的关于员工责任心的心得,希望大家喜欢。篇一:关于……初三学生寒假学习计划寒假过后,初三学生即将迎来人生第一个转折点中考!上一所好的高中,对未来考上一所好的大学非常有帮助,所以在寒假,同学们不要浪费时间,好好复习。具体的学习计划安排参考如下:篇一:初……小学暑假教师集训动员会领导讲话发言稿老师们:根据教育局统一安排部署,2010年暑假教师集训今天正式开始。本次集训的目的是通过学习交流和反思讨论,提升师德师风水平,增强法律法规意识,切实转换思想,更新观念,增……工作表扬信范文3篇对一些好人好事或是有传播意义的事迹进行表彰,表扬信主要用于作者在日常工作、生活中受益于被表扬者的高尚品行(或被其品行所感动),特向被表扬者所在单位或其上级领导致信,以期使其受到……个人向公司借款合同实用版我们有很多人没有钱会选择和公司借款,今天小编就给大家分享一下借款合同,但是也要签合同哦简单个人向公司借款合同合同编号:出借方(甲方)投资有限公司法定代表……办公室主任述职述廉报告精选三篇【篇一】时间过得真快,回顾过去,对自己所从事的工作加以梳理、盘点和反思,这既是一种自我激励和加压,也是一种理性的思考,更是接受领导指导、同仁相助的极好机会。我,自2……如何招待外国人当今社会,我们在工作中,遇到外国人已经是经常的事情,如何招待外国人?下面是品学网小编搜集整理的一些内容,希望对你有帮助。招待外国人的方法1。客人来到公司的时候,我们带他到……商务活动中的西方就餐礼仪和禁忌随着我国改革开放的迅速发展,我们与国际间的交流越来越多,商务活动中的一些礼仪,特别是就餐礼仪越来越重要。下面是小编给大家搜集整理的商务活动中的西方就餐礼仪和禁忌文章内容。……
即将复航!时刻表来了为满足广大旅客的赴日本出行需求,南航计划在3月26日夏秋航季开启后,恢复哈尔滨东京、哈尔滨大阪航线。这是继3月15日恢复哈尔滨首尔国际直达航线后,南航再次在哈尔滨市场投放运力,……内心话,DK胜率肯定高点37开吧(一)四强赛EDG坚强取胜刚刚结束的四强比赛中,我们可以看到哪怕一片唱衰,连自己赛区的观众都不看好自己的情况下,EDG还是艰难取胜jiejie的皇子第二第三局……感恩节搞怪创意祝福短信1、今天是感恩节,别忘了我的好。经济这么紧张,我还勒紧裤带花费一毛钱的巨资给你发来短信,为你祝福。感恩节快乐!2、天空为感谢大地而变蓝,花朵为感谢雨露而娇艳,赤子为感谢母……关于暑期家乡养鸡社会实践报告在现代社会主义新农村,种地现在已经不是农民的唯一收入来源了。许多人选择外出打工,也有很多人在政策的鼓励下自己创办个体经营。像一些养家禽、家畜的,养貂的专业户等,还有许多种树的,……2021双拥工作总结及2022工作计划三篇篇一20xx年,我局双拥工作在区委、区政府正确领导下,按照区双拥办的安排部署,认真开展各项双拥工作,为巩固双拥成果,增进军民团结,促进双拥创建作出了积极的贡献,较好地完成……法院系统思想作风整顿心得体会学习体会人民法院作为国家的审判机关,其执法水平如何,直接影响着整个政法机关的执法形象。在执法过程中,如果不能体现公正司法、一心为民的宗旨,不仅损害政法机关的形象,而且损害党和国家的形象……听见颜色的女孩读后感范文《听见颜色的女孩》是接力出版社出版的图书,作者是莎朗德蕾珀。在读完这本书之后,相信很多人一定会有不一样的感受。下面是小编整理的听见颜色的女孩读后感范文,欢迎阅读参考!听见颜色的……关于煤矿隐患排查治理行动情况工作汇报下面是品学网小编精心为大家整理收集的关于煤矿隐患排查治理行动情况工作汇报,供大家阅读参考。篇一:自去年11月开展煤矿隐患排查治理行动以来,在市委、市政府的正确领导下……最新医务人员思想工作总结医务人员是我们健康的看护者,是关乎发展顺利与否的核心,医务人员的总结便是对这方面的认识和思考,以及完善。以下是小编整理的医务人员思想工作总结,请参考,上CNFLA,发现学习。……公司租房协议合同模板公司的租房协议很重要,下面小编整理了公司租房协议合同模板,欢迎阅读!公司租房协议合同模板甲方(出租方):乙方(承租方):甲、乙双方就租用甲方房屋一事:经……寒假法院社会实践报告三篇篇一这个寒假在xx法院进行了一个月的实习,最让我印象深刻,或者说是受益匪浅的,就是对于沟通技能的锻炼。与之前暑期在联通的实习经验不同,在联通时由于很多是技术层面的活,所以……副园长个人述职报告范本三篇【篇一】20xx年对幼儿园及我本人而言,是不寻常的一年。六月底,在幼儿园新老管理班子顺利交接的同时,自工作以来,我第一次走上了管理者岗位,担任幼儿园副园长一职,主管教务、……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网