Flink操练(十八)之流处理单词统计
1、pom.xml文件

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Flink-learn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.21</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```

2、代码实现

```java
package one;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @program: Flink-learn
 * @description: 从socket读取数据然后处理
 * @author: Mr.逗
 * @create: 2021-09-14 15:20
 **/
public class Example1 {
    public static void main(String[] args) {
        // 获取流处理的运行时环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度为1
        env.setParallelism(1);
        // 读取数据源(先在终端启动 nc -lk 9999)
        DataStreamSource<String> source = env.socketTextStream("172.17.0.50", 9999);
        // map操作(这里使用的flatMap方法)
        // map: 针对流中的每一个元素,输出一个元素
        // flatMap: 针对流中的每一个元素,输出0个,1个或者多个元素
        DataStream<WordWithCount> mappedStream = source
                // 输入泛型: String; 输出泛型: WordWithCount
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String v, Collector<WordWithCount> out) throws Exception {
                        String[] words = v.split(",");
                        // 使用collect方法向下游发送数据
                        for (String word : words) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                });
        // 分组shuffle
        KeyedStream<WordWithCount, String> keyedStream = mappedStream.keyBy(new KeySelector<WordWithCount, String>() {
            @Override
            public String getKey(WordWithCount v) throws Exception {
                return v.word;
            }
        });
        // reduce操作
        // reduce会维护一个累加器
        // 第一条数据到来,作为累加器输出
        // 第二条数据到来,和累加器进行聚合操作,然后输出累加器
        // 累加器和流中元素的类型是一样的
        SingleOutputStreamOperator<WordWithCount> reduce = keyedStream.reduce(new ReduceFunction<WordWithCount>() {
            @Override
            public WordWithCount reduce(WordWithCount v1, WordWithCount v2) throws Exception {
                return new WordWithCount(v1.word, v1.count + v2.count);
            }
        });
        // 输出
        reduce.print();
        String name = Example1.class.getName();
        // 执行程序
        try {
            env.execute(name);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // POJO类
    // 1. 必须是公有类
    // 2. 所有字段必须是public
    // 3. 必须有空构造器
    // 模拟了case class
    public static class WordWithCount {
        public String word;
        public Long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, Long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}
```

3、终端输入(nc -lk 9999)

```shell
[xgs@User-fraud-50]$ nc -lk 9999
a,b
a,c
a,d
a,a
```

4、结果展示

```
WordWithCount{word='a', count=1}
WordWithCount{word='b', count=1}
WordWithCount{word='a', count=1}
WordWithCount{word='c', count=1}
WordWithCount{word='a', count=2}
WordWithCount{word='d', count=1}
WordWithCount{word='a', count=3}
WordWithCount{word='a', count=4}
```