一、ReducingState的方法

ReducingState需要和ReduceFunction配合使用:
get():获取状态的值
add(IN value):添加一个元素,触发ReduceFunction计算一次

二、ReducingState的描述器

ReducingState的描述器和之前ValueState、ListState不同,它得和一个ReduceFunction配合使用。

三、统计单词

1、RichFlatMapFunction处理类

package test;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * @Description: 求和
 * @Param:
 * @return:
 * @Author: Mr.逗
 * @Date: 2021/9/9
 */
public class CountSumWithReduceState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private ReducingState<Long> reducingState;

    /**
     * 状态初始化
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
                "ReducingDescriptor",
                new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long v1, Long v2) throws Exception {
                        return v1 + v2;
                    }
                },
                Long.class);
        reducingState = getRuntimeContext().getReducingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Long>> collector) throws Exception {
        // 将状态放入
        reducingState.add(element.f1);
        collector.collect(Tuple2.of(element.f0, reducingState.get()));
    }
}

2、主体类

package test;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @program: bigdatalearn
 * @description: 测试reduceState
 * @author: Mr.逗
 * @create: 2021-09-08 17:43
 */
public class TestKeyedReduceStateMain {

    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(16);
        // 获取数据源
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource = env.fromElements(
                Tuple2.of(1L, 3L),
                Tuple2.of(1L, 7L),
                Tuple2.of(2L, 4L),
                Tuple2.of(1L, 5L),
                Tuple2.of(2L, 2L),
                Tuple2.of(2L, 6L));
        // 输出:
        // (1,5.0)
        // (2,4.0)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountSumWithReduceState())
                .print();
        String name = TestKeyedReduceStateMain.class.getName();
        env.execute(name);
    }
}

3、结果展示

四、计算最高温度

1、处理类

package test;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * @Description: 求最大值
 * @Param:
 * @return:
 * @Author: Mr.逗
 * @Date: 2021/9/9
 */
public class CountMaxWithReduceState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private ReducingState<Long> reducingState;

    /**
     * 状态初始化
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
                "ReducingDescriptor",
                new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long v1, Long v2) throws Exception {
                        return v1 > v2 ? v1 : v2;
                    }
                },
                Long.class);
        reducingState = getRuntimeContext().getReducingState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Long>> collector) throws Exception {
        // 将状态放入
        reducingState.add(element.f1);
        collector.collect(Tuple2.of(element.f0, reducingState.get()));
    }
}

2、主体类

package test;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @program: bigdatalearn
 * @description: 测试reduceState
 * @author: Mr.逗
 * @create: 2021-09-08 17:43
 */
public class TestKeyedReduceStateMain {

    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(16);
        // 获取数据源
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource = env.fromElements(
                Tuple2.of(1L, 3L),
                Tuple2.of(1L, 7L),
                Tuple2.of(2L, 4L),
                Tuple2.of(1L, 5L),
                Tuple2.of(2L, 2L),
                Tuple2.of(2L, 6L));
        // 输出:
        // (1,5.0)
        // (2,4.0)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountMaxWithReduceState())
                .print();
        String name = TestKeyedReduceStateMain.class.getName();
        env.execute(name);
    }
}

3、结果展示