Flink Practice (20): How to Use Parallelism
1. Code implementation

package one;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * program: Flinklearn
 * description: Setting parallelism. Parallelism set on an individual operator
 *              takes precedence over the global parallelism.
 *              This program needs two task slots.
 * author: Mr.逗
 * create: 2021-09-14 15:40
 */
public class Example3 {
    public static void main(String[] args) {
        // Get the runtime environment for stream processing
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the global parallelism to 1 (needs 1 task slot)
        env.setParallelism(1);

        // Read the data source; parallelism set to 1
        DataStreamSource<String> stream = env
                .fromElements("hello world", "hello world")
                .setParallelism(1);

        // Map step, implemented here with flatMap
        // map: emits exactly one element for each element in the stream
        // flatMap: emits zero, one, or many elements for each element in the stream
        // Parallelism set to 2
        SingleOutputStreamOperator<WordWithCount> mappedStream = stream
                // Input type: String; output type: WordWithCount
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String v, Collector<WordWithCount> out) throws Exception {
                        String[] words = v.split(" ");
                        for (String w : words) {
                            // Use collect to send data downstream
                            out.collect(new WordWithCount(w, 1L));
                        }
                    }
                })
                .setParallelism(2);

        // Grouping (shuffle)
        // First type parameter: type of the elements in the stream
        // Second type parameter: type of the key
        KeyedStream<WordWithCount, String> keyedStream = mappedStream
                .keyBy(new KeySelector<WordWithCount, String>() {
                    @Override
                    public String getKey(WordWithCount v) throws Exception {
                        return v.word;
                    }
                });

        // reduce maintains one accumulator per key.
        // The first record to arrive becomes the accumulator and is emitted.
        // Each later record is aggregated with the accumulator, and the updated accumulator is emitted.
        // The accumulator has the same type as the elements in the stream.
        SingleOutputStreamOperator<WordWithCount> reduce = keyedStream
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount v1, WordWithCount v2) throws Exception {
                        return new WordWithCount(v1.word, v1.count + v2.count);
                    }
                });

        // Output
        reduce.print();

        String name = Example3.class.getName();
        try {
            env.execute(name);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // POJO class, playing the role of a Scala case class:
    // 1. It must be a public class
    // 2. All fields must be public (or be reachable through public getters and setters)
    // 3. It must have a no-argument constructor
    public static class WordWithCount {
        public String word;
        public Long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, Long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" + "word='" + word + '\'' + ", count=" + count + '}';
        }
    }
}

2. Results

WordWithCount{word='hello', count=1}
WordWithCount{word='world', count=1}
WordWithCount{word='hello', count=2}
WordWithCount{word='world', count=2}
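
The header comment of the program says that parallelism set on an operator overrides the global value, and that the job needs two task slots. The reasoning behind that number can be sketched in plain Java, without calling Flink. The class `ParallelismSketch` and its methods are hypothetical names introduced only for this illustration. The slot count assumes Flink's default slot sharing, under which subtasks of different operators in one job may share a slot, so the operator with the highest parallelism determines how many slots are needed.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.OptionalInt;

/** Plain-Java model (no Flink dependency) of how the example's parallelism settings resolve. */
final class ParallelismSketch {

    /** An operator-level setting wins; otherwise the global (environment) value applies. */
    static int effectiveParallelism(OptionalInt operatorLevel, int globalParallelism) {
        return operatorLevel.orElse(globalParallelism);
    }

    /** Assumption: default slot sharing, so the widest operator decides the slot count. */
    static int slotsRequired(Map<String, OptionalInt> operators, int globalParallelism) {
        int max = 0;
        for (OptionalInt p : operators.values()) {
            max = Math.max(max, effectiveParallelism(p, globalParallelism));
        }
        return max;
    }

    /** The operators of Example3 with the parallelism each one sets explicitly, if any. */
    static Map<String, OptionalInt> exampleJob() {
        Map<String, OptionalInt> ops = new LinkedHashMap<>();
        ops.put("source", OptionalInt.of(1));     // fromElements(...).setParallelism(1)
        ops.put("flatMap", OptionalInt.of(2));    // flatMap(...).setParallelism(2)
        ops.put("reduce", OptionalInt.empty());   // not set, inherits the global value
        ops.put("print", OptionalInt.empty());    // not set, inherits the global value
        return ops;
    }

    public static void main(String[] args) {
        int global = 1; // env.setParallelism(1)
        exampleJob().forEach((name, p) ->
                System.out.println(name + " -> " + effectiveParallelism(p, global)));
        System.out.println("slots required: " + slotsRequired(exampleJob(), global));
    }
}
```

With the global value at 1, only `flatMap` runs with two parallel subtasks, so the model reports two slots, which matches the statement in the header comment. Because `print` inherits parallelism 1, the result lines carry no subtask prefix.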