0简介 ListState〔T〕保存一个列表,列表里的元素的数据类型为T。基本操作如下:ListState。add(value:T)ListState。addAll(values:java。util。List〔T〕)ListState。get()返回Iterable〔T〕ListState。update(values:java。util。List〔T〕) ListState需要将某些值存到一个List中(Iterable),意味着缓存的数据不只是一个而是多个值。很多情况下都可以使用,例如计算的数值要包含全天的每一个记录,此时只有将每个记录的值存成一个列表才可以计算。1。实例 1。1实例一 首先需要先定义一个ListState,然后再重写KeyedProcessFunction中的open方法:privatevaritemState:ListState〔ItemViewCount〕overridedefopen(parameters:Configuration):Unit{命名状态变量的名字和类型valitemStateDescription:ListStateDescriptor〔ItemViewCount〕newListStateDescriptor〔ItemViewCount〕(itemState,classOf〔ItemViewCount〕)itemStategetRuntimeContext。getListState(itemStateDescription)} ListStateDescriptor提供了几种不同的定义方式: 两个参数分别是ListStateDescriptor的名字和typeClass 1。2实例二packageqiuhua;importorg。apache。flink。api。common。functions。RichFlatMapFunction;importorg。apache。flink。api。common。state。ListState;importorg。apache。flink。api。common。state。ListStateDescriptor;importorg。apache。flink。api。common。typeinfo。Types;importorg。apache。flink。api。java。tuple。Tuple2;importorg。apache。flink。configuration。Configuration;importorg。apache。flink。shaded。guava18。com。google。common。collect。Lists;importorg。apache。flink。util。Collector;importjava。util。Collections;importjava。util。List;program:bigdatalearndescription:通过ListState求key出现了3次,则需要计算平均值author:Mr。逗create:2021090816:18publicclassCountAverageWithListStateextendsRichFlatMapFunctionTuple2Long,Long,Tuple2Long,Double{ValueState:里面只能存一条元素ListState:里面可以存很多数据privateListStateTuple2Long,LongelementsByKey;Overridepublicvoidopen(Configurationparameters)throwsException{super。open(parameters);注册状态ListStateDescriptorTuple2Long,LongdescriptornewListStateDescriptor(liststate状态名字,Types。TUPLE(Types。LONG,Types。LONG)状态存储的数据类型);elementsByKeygetRuntimeContext()。getListState(descriptor);}OverridepublicvoidflatMap(Tuple2Long,Longvalue,CollectorTuple2Long,Doubleout)throwsException{IterableTuple2Long,LongcurrentStateelementsByKey。get();拿到当前key的状态值如果状态值没有初始化,则初始化if(currentStatenull){elementsByKey。addAll(Collections。emptyList());}更新状态elementsByKey。update((ListTuple2Long,Long)value);判断,如果当前key出现了3次,则需要计算平均值,并且输出ListTuple2Long,LongallElementsLists。newArrayList(currentState);if(allElements。size()3){longcount0;longsum0;for(Tuple2Long,Longele:allElements){count;sumele。f1;}doubleavg(double)sumcount;out。collect(Tuple2。of(value。f0,avg));清除状态elementsByKey。clear();}}} 总结 Flink提供了三种基于keyvalue的state接口,ListState接口适用于缓存多个值的计算。具体实现之前,因为state必须是基于key,且必须获取getRuntimeContext,state必须同时满足两个条件:直接基于keyedStream或者由keyedStream转换来的windowedStream必须继承RichFunction 实际实现时候,因为windowedStream在scala中不能实现RichWindowFunction,因此在main中使用flatmap间接实现了windowFunction中的功能:valfromTransactionDataStreamwatermarkTransaction。keyBy(。code)。window(TumblingEventTimeWindows。of(Time。seconds(10)))valtransactionfromTransactionDataStream。apply(newStockTransactionApply)。keyBy(。3)。flatMap(newTransactionStateFlatMapFunction)