We use Apache Flume to collect data into Kafka for storage, and finally display it in ELK. Download Apache Flume from http://flume.apache.org and deploy it on the log server; after downloading, extract it and add it to the environment variables. The overall idea is to search Lagou.com for "test development engineer" positions, store the retrieved results in Kafka, and finally display them in ELK. The detailed configuration follows.

In the conf directory, edit flume-kafka.properties (the file referenced by the startup command below) with the following content:

```
# Set the agent component names
agent.sources = s1
agent.channels = c1
agent.sinks = k1

# Set the collection source
agent.sources.s1.type = exec
agent.sources.s1.command = tail -F /Applications/devOps/bigData/ELK/apache-flume/logs/apps.log
agent.sources.s1.channels = c1
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 100

# Set the Kafka sink
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Set the Kafka broker and port
agent.sinks.k1.brokerList = localhost:9092
# Set the Kafka topic
agent.sinks.k1.topic = laGou
# Set the serializer
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
# Specify the channel name
agent.sinks.k1.channel = c1
```

The topic used here is laGou; make sure Kafka is already running at this point. Next, start Apache Flume by running the following command in the apache-flume/bin directory:

```
flume-ng agent -n agent --conf conf --conf-file ../conf/flume-kafka.properties -Dflume.root.logger=DEBUG,CONSOLE
```

Once it runs, the Flume startup log is printed to the console.

Next, we display the data by splitting the stream: the collected data is stored in Kafka, Logstash then consumes the data stored in Kafka, and the consumed data is written to Elasticsearch. First, edit the logstash.yml file to configure the Logstash account and password.
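The original file content is shown only as a screenshot in the post. A minimal sketch for Logstash 6.3.x, assuming the credentials are the X-Pack monitoring account whose masked form (elastic:xxxxxx) appears in the startup log below, might look like this; the password value is a placeholder:

```
# Hypothetical logstash.yml excerpt -- reconstructed, not from the original post.
# "elastic" / "xxxxxx" match the masked credentials visible in the log below.
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: "elastic"
xpack.monitoring.elasticsearch.password: "xxxxxx"
xpack.monitoring.elasticsearch.url: ["http://localhost:9200"]
```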
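Next, configure kafkalaGou.conf. Its exact content is likewise not reproduced in the post, but a sketch consistent with the startup log below (topic laGou, consumer group console-consumer-83756, Elasticsearch accessed as user elastic) would be:

```
# Hypothetical sketch of kafkalaGou.conf -- reconstructed from the startup log
# below, not from the original post.
input {
  kafka {
    bootstrap_servers => "localhost:9092"          # Kafka broker, as in the Flume sink
    topics            => ["laGou"]                 # topic written by Flume
    group_id          => "console-consumer-83756"  # group id visible in the log
  }
}

output {
  elasticsearch {
    hosts    => ["localhost:9200"]
    user     => "elastic"                          # masked as elastic:xxxxxx in the log
    password => "xxxxxx"
  }
}
```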
After the configuration is complete, have Logstash consume the laGou topic from the Kafka cluster on the console. In the Logstash bin directory, execute:

```
./logstash -f ../config/kafkalaGou.conf
```

The Logstash agent then starts, consumes the data from the Kafka cluster, and stores the consumed data in the Elasticsearch cluster. Note that the first attempt below fails with a FATAL error because another Logstash instance was still using the configured data directory; the retry then starts normally. The output looks like this:

```
Sending Logstash's logs to /Applications/devOps/bigData/ELK/logstash/logs which is now configured via log4j2.properties
[2021-06-12T18:39:43,175][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-06-12T18:39:43,210][FATAL][logstash.runner] Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting.
[2021-06-12T18:39:43,221][ERROR][org.logstash.Logstash] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit
localhost:bin liwangping$ clear
localhost:bin liwangping$ ./logstash -f ../config/kafkalaGou.conf
Sending Logstash's logs to /Applications/devOps/bigData/ELK/logstash/logs which is now configured via log4j2.properties
[2021-06-12T18:40:31,712][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-06-12T18:40:32,136][INFO ][logstash.runner] Starting Logstash {"logstash.version"=>"6.3.2"}
[2021-06-12T18:40:33,674][INFO ][logstash.pipeline] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2021-06-12T18:40:34,092][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@localhost:9200/]}}
[2021-06-12T18:40:34,111][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@localhost:9200/, :path=>"/"}
[2021-06-12T18:40:34,426][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@localhost:9200/"}
[2021-06-12T18:40:34,505][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2021-06-12T18:40:34,508][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>6}
[2021-06-12T18:40:34,528][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2021-06-12T18:40:34,544][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2021-06-12T18:40:34,561][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2021-06-12T18:40:34,584][INFO ][logstash.pipeline] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x53f09319 run>"}
[2021-06-12T18:40:34,670][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash
[2021-06-12T18:40:34,676][INFO ][logstash.agent] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-06-12T18:40:34,691][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = latest
	bootstrap.servers = [localhost:9092]
	check.crcs = true
	client.id = logstash-0
	connections.max.idle.ms = 540000
	enable.auto.commit = true
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	group.id = console-consumer-83756
	heartbeat.interval.ms = 3000
	interceptor.classes = []
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	session.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[2021-06-12T18:40:34,797][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 1.1.0
[2021-06-12T18:40:34,798][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : fdcf75ea326b8e07
[2021-06-12T18:40:35,011][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: E0qvXyuTWrvZgZUV80w
[2021-06-12T18:40:35,024][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
[2021-06-12T18:40:35,029][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Revoking previously assigned partitions []
[2021-06-12T18:40:35,029][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] (Re-)joining group
[2021-06-12T18:40:35,047][INFO ][logstash.agent] Successfully started Logstash API endpoint {:port=>9600}
[2021-06-12T18:40:35,149][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Successfully joined group with generation 1
[2021-06-12T18:40:35,151][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Setting newly assigned partitions [laGou-0, laGou-1, laGou-2, laGou-3, laGou-4, laGou-5]
[2021-06-12T18:40:35,168][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-0 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-1 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-2 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-3 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-4 to offset 1.
[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-5 to offset 0.
```
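The tail of the log shows the consumer (client logstash-0, group console-consumer-83756) being assigned all six partitions of laGou. Besides the Kafka monitoring system mentioned next, you can inspect the same assignment and the consumer lag from the command line with Kafka's bundled tool; this is a sketch assuming Kafka's bin directory is on the PATH:

```
# Describe the consumer group that Logstash joined (group id taken from the log above)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group console-consumer-83756
```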
At this point, the Kafka monitoring system shows the detailed consumption information for the laGou topic.

Next comes data visualization: once the data is stored in the Elasticsearch cluster, it can be queried and analyzed through Kibana. Create an index pattern under Management, then open the Discover module, and the consumed test development position data from Lagou.com is displayed.

You can query on different fields, for example the message field, which displays the matching records. You can also view a complete record: click the right-facing arrow, and the document is shown in both table and JSON format.

Thanks for reading and following; more updates will come!