游戏电视苹果数码历史美丽
投稿投诉
美丽时装
彩妆资讯
历史明星
乐活安卓
数码常识
驾车健康
苹果问答
网络发型
电视车载
室内电影
游戏科学
音乐整形

Flink操练(七)之DS简介(7)FlinkDataStr

  1产生传感器读数代码编写(读取数据源)1。1从批读取数据
  scalaversionvalstreamenv。fromElements(SensorReading(sensor1,1547718199,35。80018327300259),SensorReading(sensor6,1547718199,15。402984393403084),SensorReading(sensor7,1547718199,6。720945201171228),SensorReading(sensor10,1547718199,38。101067604893444))
  javaversionDataStreamSensorReadingstreamenv。fromElements(newSensorReading(sensor1,1547718199,35。80018327300259),newSensorReading(sensor6,1547718199,15。402984393403084),newSensorReading(sensor7,1547718199,6。720945201171228),newSensorReading(sensor10,1547718199,38。101067604893444))1。2从文件读取数据
  scalaversionvalstreamenv。readTextFile(filePath)
  javaversionDataStreamStringstreamenv。readTextFile(filePath);1。3以Kafka消息队列的数据为数据来源
  scalaversionvalpropertiesnewProperties()properties。setProperty(bootstrap。servers,localhost:9092)properties。setProperty(group。id,consumergroup)properties。setProperty(key。deserializer,org。apache。kafka。common。serialization。StringDeserializer)properties。setProperty(value。deserializer,org。apache。kafka。common。serialization。StringDeserializer)properties。setProperty(auto。offset。reset,latest)valenvStreamExecutionEnvironment。getExecutionEnvironmentenv。setStreamTimeCharacteristic(TimeCharacteristic。EventTime)env。setParallelism(1)valstreamenvsource为来自Kafka的数据,这里我们实例化一个消费者,topic为hotitems。addSource(newFlinkKafkaConsumer011〔String〕(hotitems,newSimpleStringSchema(),properties))
  javaversionPropertiespropertiesnewProperties();properties。setProperty(bootstrap。servers,localhost:9092);properties。setProperty(group。id,consumergroup);properties。setProperty(key。deserializer,org。apache。kafka。common。serialization。StringDeserializer);properties。setProperty(value。deserializer,org。apache。kafka。common。serialization。StringDeserializer);properties。setProperty(auto。offset。reset,latest);StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment;env。setStreamTimeCharacteristic(TimeCharacteristic。EventTime);env。setParallelism(1);DataStreamStringstreamenvsource为来自Kafka的数据,这里我们实例化一个消费者,topic为hotitems。addSource(newFlinkKafkaConsumer011String(hotitems,newSimpleStringSchema(),properties));1。4自定义数据源
  scalaversionimportjava。util。Calendarimportorg。apache。flink。streaming。api。functions。source。SourceFunction。SourceContextimportorg。apache。flink。streaming。api。functions。source。{RichParallelSourceFunction,SourceFunction}importscala。util。Random泛型是SensorReading,表明产生的流中的事件的类型是SensorReadingclassSensorSourceextendsRichParallelSourceFunction〔SensorReading〕{表示数据源是否正常运行varrunning:Booleantrue上下文参数用来发出数据overridedefrun(ctx:SourceContext〔SensorReading〕):Unit{valrandnewRandomvarcurFTemp(1to10)。map(使用高斯噪声产生随机温度值i(sensori,(rand。nextGaussian()20)))产生无限数据流while(running){curFTempcurFTemp。map(t(t。1,t。2(rand。nextGaussian()0。5)))产生ms为单位的时间戳valcurTimeCalendar。getInstance。getTimeInMillis使用ctx参数的collect方法发射传感器数据curFTemp。foreach(tctx。collect(SensorReading(t。1,curTime,t。2)))每隔100ms发送一条传感器数据Thread。sleep(1000)}}定义当取消flink任务时,需要关闭数据源overridedefcancel():Unitrunningfalse}
  使用方法valsensorDataenv。addSource(newSensorSource)
  javaversionimportorg。apache。flink。streaming。api。functions。source。RichParallelSourceFunction;importjava。util。Calendar;importjava。util。Random;publicclassSensorSourceextendsRichParallelSourceFunctionSensorReading{privatebooleanrunningtrue;Overridepublicvoidrun(SourceContextSensorReadingsrcCtx)throwsException{RandomrandnewRandom();String〔〕sensorIdsnewString〔10〕;double〔〕curFTempnewdouble〔10〕;for(inti0;i10;i){sensorIds〔i〕sensori;curFTemp〔i〕65(rand。nextGaussian()20);}while(running){longcurTimeCalendar。getInstance()。getTimeInMillis();for(inti0;i10;i){curFTemp〔i〕rand。nextGaussian()0。5;srcCtx。collect(newSensorReading(sensorIds〔i〕,curTime,curFTemp〔i〕));}Thread。sleep(100);}}Overridepublicvoidcancel(){this。runningfalse;}}使用方法摄入数据流DataStreamSensorReadingsensorDataenv。addSource(newSensorSource());2转换算子
  在这一小节我们将大概看一下DataStreamAPI的基本转换算子。与时间有关的操作符(例如窗口操作符和其他特殊的转换算子)将会在后面的章节叙述。一个流的转换操作将会应用在一个或者多个流上面,这些转换操作将流转换成一个或者多个输出流。编写一个DataStreamAPI简单来说就是将这些转换算子组合在一起来构建一个数据流图,这个数据流图就实现了我们的业务逻辑。
  大部分的流转换操作都基于用户自定义函数UDF。UDF函数打包了一些业务逻辑并定义了输入流的元素如何转换成输出流的元素。像MapFunction这样的函数,将会被定义为类,这个类实现了Flink针对特定的转换操作暴露出来的接口。DataStreamStringsensorIdsfilteredReadings。map(rr。id);
  函数接口定义了需要由用户实现的转换方法,例如上面例子中的map()方法。
  大部分函数接口被设计为SingleAbstractMethod(单独抽象方法)接口,并且接口可以使用Java8匿名函数来实现。ScalaDataStreamAPI也内置了对匿名函数的支持。当讲解DataStreamAPI的转换算子时,我们展示了针对所有函数类的接口,但为了简洁,大部分接口的实现使用匿名函数而不是函数类的方式。
  DataStreamAPI针对大多数数据转换操作提供了转换算子。如果你很熟悉批处理API、函数式编程语言或者SQL,那么你将会发现这些API很容易学习。我们会将DataStreamAPI的转换算子分成四类:基本转换算子:将会作用在数据流中的每一条单独的数据上。KeyedStream转换算子:在数据有key的情况下,对数据应用转换算子。多流转换算子:合并多条流为一条流或者将一条流分割为多条流。分布式转换算子:将重新组织流里面的事件。2。1基本转换算子
  基本转换算子会针对流中的每一个单独的事件做处理,也就是说每一个输入数据会产生一个输出数据。单值转换,数据的分割,数据的过滤,都是基本转换操作的典型例子。我们将解释这些算子的语义并提供示例代码。
  MAP
  map算子通过调用DataStream。map()来指定。map算子的使用将会产生一条新的数据流。它会将每一个输入的事件传送到一个用户自定义的mapper,这个mapper只返回一个输出事件,这个输出事件和输入事件的类型可能不一样。图51展示了一个map算子,这个map将每一个正方形转化成了圆形。
  MapFunction的类型与输入事件和输出事件的类型相关,可以通过实现MapFunction接口来定义。接口包含map()函数,这个函数将一个输入事件恰好转换为一个输出事件。T:thetypeofinputelementsO:thetypeofoutputelementsMapFunction〔T,O〕map(T):O
  下面的代码实现了将SensorReading中的id字段抽取出来的功能。
  scalaversionvalreadings:DataStream〔SensorReading〕。。。valsensorIds:DataStream〔String〕readings。map(newIdExtractor)classIdExtractorextendsMapFunction〔SensorReading,String〕{overridedefmap(r:SensorReading):Stringr。id}
  当然我们更推荐匿名函数的写法。valsensorIds:DataStream〔String〕filteredReadings。map(rr。id)
  javaversionDataStreamSensorReadingreadings。。。DataStreamStringsensorIdsreadings。map(newIdExtractor());publicstaticclassIdExtractorimplementsMapFunctionSensorReading,String{OverridepublicStringmap(SensorReadingr)throwsException{returnr。id;}}
  当然我们更推荐匿名函数的写法。DataStreamStringsensorIdsfilteredReadings。map(rr。id);
  FILTER
  filter转换算子通过在每个输入事件上对一个布尔条件进行求值来过滤掉一些元素,然后将剩下的元素继续发送。一个true的求值结果将会把输入事件保留下来并发送到输出,而如果求值结果为false,则输入事件会被抛弃掉。我们通过调用DataStream。filter()来指定流的filter算子,filter操作将产生一条新的流,其类型和输入流中的事件类型是一样的。图52展示了只产生白色方框的filter操作。
  布尔条件可以使用函数、FilterFunction接口或者匿名函数来实现。FilterFunction中的泛型是输入事件的类型。定义的filter()方法会作用在每一个输入元素上面,并返回一个布尔值。T:thetypeofelementsFilterFunction〔T〕filter(T):Boolean
  下面的例子展示了如何使用filter来从传感器数据中过滤掉温度值小于25华氏温度的读数。
  scalaversionvalfilteredReadingsreadings。filter(rr。temperature25)
  javaversionDataStreamSensorReadingfilteredReadingsreadings。filter(rr。temperature25);
  FLATMAP
  flatMap算子和map算子很类似,不同之处在于针对每一个输入事件flatMap可以生成0个、1个或者多个输出元素。事实上,flatMap转换算子是filter和map的泛化。所以flatMap可以实现map和filter算子的功能。图53展示了flatMap如何根据输入事件的颜色来做不同的处理。如果输入事件是白色方框,则直接输出。输入元素是黑框,则复制输入。灰色方框会被过滤掉。
  flatMap算子将会应用在每一个输入事件上面。对应的FlatMapFunction定义了flatMap()方法,这个方法返回0个、1个或者多个事件到一个Collector集合中,作为输出结果。T:thetypeofinputelementsO:thetypeofoutputelementsFlatMapFunction〔T,O〕flatMap(T,Collector〔O〕):Unit
  下面的例子展示了在数据分析教程中经常用到的例子,我们用flatMap来实现。使用来切割传感器ID,比如sensor1。
  scalaversionclassIdSplitterextendsFlatMapFunction〔String,String〕{overridedefflatMap(id:String,out:Collector〔String〕):Unit{valarrid。split()arr。foreach(out。collect)}}
  匿名函数写法valsplitIdssensorIds。flatMap(rr。split())
  javaversionpublicstaticclassIdSplitterimplementsFlatMapFunctionString,String{OverridepublicvoidflatMap(Stringid,CollectorStringout){String〔〕splitsid。split();for(Stringsplit:splits){out。collect(split);}}}
  匿名函数写法:DataStreamStringsplitIdssensorIds。flatMap((FlatMapFunctionString,String)(id,out){for(Strings:id。split()){out。collect(s);}})provideresulttypebecauseJavacannotinferreturntypeoflambdafunction提供结果的类型,因为Java无法推断匿名函数的返回值类型。returns(Types。STRING);2。2键控流转换算子
  很多流处理程序的一个基本要求就是要能对数据进行分组,分组后的数据共享某一个相同的属性。DataStreamAPI提供了一个叫做KeyedStream的抽象,此抽象会从逻辑上对DataStream进行分区,分区后的数据拥有同样的Key值,分区后的流互不相关。
  针对KeyedStream的状态转换操作可以读取数据或者写入数据到当前事件Key所对应的状态中。这表明拥有同样Key的所有事件都可以访问同样的状态,也就是说所以这些事件可以一起处理。
  要小心使用状态转换操作和基于Key的聚合操作。如果Key的值越来越多,例如:Key是订单ID,我们必须及时清空Key所对应的状态,以免引起内存方面的问题。稍后我们会详细讲解。
  KeyedStream可以使用map,flatMap和filter算子来处理。接下来我们会使用keyBy算子来将DataStream转换成KeyedStream,并讲解基于key的转换操作:滚动聚合和reduce算子。
  KEYBY
  keyBy通过指定key来将DataStream转换成KeyedStream。基于不同的key,流中的事件将被分配到不同的分区中去。所有具有相同key的事件将会在接下来的操作符的同一个子任务槽中进行处理。拥有不同key的事件可以在同一个任务中处理。但是算子只能访问当前事件的key所对应的状态。
  如图54所示,把输入事件的颜色作为key,黑色的事件输出到了一个分区,其他颜色输出到了另一个分区。
  keyBy()方法接收一个参数,这个参数指定了key或者keys,有很多不同的方法来指定key。我们将在后面讲解。下面的代码声明了id这个字段为SensorReading流的key。
  scalaversionvalkeyed:KeyedStream〔SensorReading,String〕readings。keyBy(rr。id)
  匿名函数rr。id抽取了传感器读数SensorReading的id值。
  javaversionKeyedStreamSensorReading,Stringkeyedreadings。keyBy(rr。id);
  匿名函数rr。id抽取了传感器读数SensorReading的id值。
  滚动聚合
  滚动聚合算子由KeyedStream调用,并生成一个聚合以后的DataStream,例如:sum,minimum,maximum。一个滚动聚合算子会为每一个观察到的key保存一个聚合的值。针对每一个输入事件,算子将会更新保存的聚合结果,并发送一个带有更新后的值的事件到下游算子。滚动聚合不需要用户自定义函数,但需要接受一个参数,这个参数指定了在哪一个字段上面做聚合操作。DataStreamAPI提供了以下滚动聚合方法。
  滚动聚合算子只能用在滚动窗口,不能用在滑动窗口。sum():在输入流上对指定的字段做滚动相加操作。min():在输入流上对指定的字段求最小值。max():在输入流上对指定的字段求最大值。minBy():在输入流上针对指定字段求最小值,并返回包含当前观察到的最小值的事件。maxBy():在输入流上针对指定字段求最大值,并返回包含当前观察到的最大值的事件。
  滚动聚合算子无法组合起来使用,每次计算只能使用一个单独的滚动聚合算子。
  下面的例子根据第一个字段来对类型为Tuple3Int,Int,Int的流做分流操作,然后针对第二个字段做滚动求和操作。
  scalaversionvalinputStreamenv。fromElements((1,2,2),(2,3,1),(2,2,4),(1,5,3))valresultStreaminputStream。keyBy(0)。sum(1)
  javaversionDataStreamTuple3Integer,Integer,IntegerinputStreamenv。fromElements(newTuple3(1,2,2),newTuple3(2,3,1),newTuple3(2,2,4),newTuple3(1,5,3));DataStreamTuple3Integer,Integer,IntegerresultStreaminputStream。keyBy(0)keyonfirstfieldofthetuple。sum(1);sumthesecondfieldofthetupleinplace
  在这个例子里面,输入流根据第一个字段来分流,然后在第二个字段上做计算。对于key1,输出结果是(1,2,2),(1,7,2)。对于key2,输出结果是(2,3,1),(2,5,1)。第一个字段是key,第二个字段是求和的数值,第三个字段未定义。
  滚动聚合操作会对每一个key都保存一个状态。因为状态从来不会被清空,所以我们在使用滚动聚合算子时只能使用在含有有限个key的流上面。
  REDUCE
  reduce算子是滚动聚合的泛化实现。它将一个ReduceFunction应用到了一个KeyedStream上面去。reduce算子将会把每一个输入事件和当前已经reduce出来的值做聚合计算。reduce操作不会改变流的事件类型。输出流数据类型和输入流数据类型是一样的。
  reduce函数可以通过实现接口ReduceFunction来创建一个类。ReduceFunction接口定义了reduce()方法,此方法接收两个输入事件,输入一个相同类型的事件。T:theelementtypeReduceFunction〔T〕reduce(T,T):T
  下面的例子,流根据传感器ID分流,然后计算每个传感器的当前最大温度值。
  scalaversionvalmaxTempPerSensorkeyed。reduce((r1,r2)r1。temperature。max(r2。temperature))
  javaversionDataStreamSensorReadingmaxTempPerSensorkeyed。reduce((r1,r2){if(r1。temperaturer2。temperature){returnr1;}else{returnr2;}});
  reduce作为滚动聚合的泛化实现,同样也要针对每一个key保存状态。因为状态从来不会清空,所以我们需要将reduce算子应用在一个有限key的流上。2。3多流转换算子
  许多应用需要摄入多个流并将流合并处理,还可能需要将一条流分割成多条流然后针对每一条流应用不同的业务逻辑。接下来,我们将讨论DataStreamAPI中提供的能够处理多条输入流或者发送多条输出流的操作算子。
  UNION
  DataStream。union()方法将两条或者多条DataStream合并成一条具有与输入流相同类型的输出DataStream。接下来的转换算子将会处理输入流中的所有元素。图55展示了union操作符如何将黑色和白色的事件流合并成一个单一输出流。
  事件合流的方式为FIFO方式。操作符并不会产生一个特定顺序的事件流。union操作符也不会进行去重。每一个输入事件都被发送到了下一个操作符。
  下面的例子展示了如何将三条类型为SensorReading的数据流合并成一条流。
  scalaversionvalparisStream:DataStream〔SensorReading〕。。。valtokyoStream:DataStream〔SensorReading〕。。。valrioStream:DataStream〔SensorReading〕。。。valallCities:DataStream〔SensorReading〕parisStream。union(tokyoStream,rioStream)
  javaversionDataStreamSensorReadingparisStream。。。DataStreamSensorReadingtokyoStream。。。DataStreamSensorReadingrioStream。。。DataStreamSensorReadingallCitiesparisStream。union(tokyoStream,rioStream)
  CONNECT,COMAP和COFLATMAP
  联合两条流的事件是非常常见的流处理需求。例如监控一片森林然后发出高危的火警警报。报警的Application接收两条流,一条是温度传感器传回来的数据,一条是烟雾传感器传回来的数据。当两条流都超过各自的阈值时,报警。
  DataStreamAPI提供了connect操作来支持以上的应用场景。DataStream。connect()方法接收一条DataStream,然后返回一个ConnectedStreams类型的对象,这个对象表示了两条连接的流。
  scalaversionvalfirst。。。valsecond。。。valconnectedfirst。connect(second)
  javaversionfirststreamDataStreamIntegerfirst。。。secondstreamDataStreamStringsecond。。。connectstreamsConnectedStreamsInteger,Stringconnectedfirst。connect(second);
  ConnectedStreams提供了map()和flatMap()方法,分别需要接收类型为CoMapFunction和CoFlatMapFunction的参数。
  以上两个函数里面的泛型是第一条流的事件类型和第二条流的事件类型,以及输出流的事件类型。还定义了两个方法,每一个方法针对一条流来调用。map1()和flatMap1()会调用在第一条流的元素上面,map2()和flatMap2()会调用在第二条流的元素上面。IN1:第一条流的事件类型IN2:第二条流的事件类型OUT:输出流的事件类型CoMapFunction〔IN1,IN2,OUT〕map1(IN1):OUTmap2(IN2):OUTCoFlatMapFunction〔IN1,IN2,OUT〕flatMap1(IN1,Collector〔OUT〕):UnitflatMap2(IN2,Collector〔OUT〕):Unit
  函数无法选择读某一条流。我们是无法控制函数中的两个方法的调用顺序的。当一条流中的元素到来时,将会调用相对应的方法。
  对两条流做连接查询通常需要这两条流基于某些条件被确定性的路由到操作符中相同的并行实例里面去。在默认情况下,connect()操作将不会对两条流的事件建立任何关系,所以两条流的事件将会随机的被发送到下游的算子实例里面去。这样的行为会产生不确定性的计算结果,显然不是我们想要的。为了针对ConnectedStreams进行确定性的转换操作,connect()方法可以和keyBy()或者broadcast()组合起来使用。我们首先看一下keyBy()的示例。
  scalaversionvalone。。。valtwo。。。valkeyedConnect1one。connect(two)。keyBy(0,0)valkeyedConnect2one。keyBy(0)。connect(two。keyBy(0))
  javaversionDataStreamTuple2Integer,Longone。。。DataStreamTuple2Integer,Stringtwo。。。keyBytwoconnectedstreamsConnectedStreamsTuple2Int,Long,Tuple2Integer,StringkeyedConnect1one。connect(two)。keyBy(0,0);keybothinputstreamsonfirstattributealternative:connecttwokeyedstreamsConnectedStreamsTuple2Integer,Long,Tuple2Integer,StringkeyedConnect2one。keyBy(0)。connect(two。keyBy(0));
  无论使用keyBy()算子操作ConnectedStreams还是使用connect()算子连接两条KeyedStreams,connect()算子会将两条流的含有相同Key的所有事件都发送到相同的算子实例。两条流的key必须是一样的类型和值,就像SQL中的JOIN。在connected和keyedstream上面执行的算子有访问keyedstate的权限。
  下面的例子展示了如何连接一条DataStream和广播过的流。
  scalaversionvalone。。。valtwo。。。valkeyedConnectfirst。connect(second。broadcast())
  javaversionDataStreamTuple2Integer,Longone。。。DataStreamTuple2Int,Stringtwo。。。connectstreamswithbroadcastConnectedStreamsTuple2Int,Long,Tuple2Int,StringkeyedConnectfirstbroadcastsecondinputstream。connect(second。broadcast());
  一条被广播过的流中的所有元素将会被复制然后发送到下游算子的所有并行实例中去。未被广播过的流仅仅向前发送。所以两条流的元素显然会被连接处理。
  例子:
  警告类:
  scalaversioncaseclassAlert(message:String,timestamp:Long)
  javaversionpublicclassAlert{publicStringmessage;publiclongtimestamp;publicAlert(){}publicAlert(Stringmessage,longtimestamp){this。messagemessage;this。timestamptimestamp;}publicStringtoString(){return(message,timestamp);}}
  烟雾传感器读数类:publicenumSmokeLevel{LOW,HIGH}
  产生烟雾传感器读数的自定义数据源:publicclassSmokeLevelSourceimplementsSourceFunctionSmokeLevel{privatebooleanrunningtrue;Overridepublicvoidrun(SourceContextSmokeLevelsrcCtx)throwsException{RandomrandnewRandom();while(running){if(rand。nextGaussian()0。8){srcCtx。collect(SmokeLevel。HIGH);}else{srcCtx。collect(SmokeLevel。LOW);}Thread。sleep(1000);}}Overridepublicvoidcancel(){this。runningfalse;}}
  监控一片森林然后发出高危的火警警报。报警的Application接收两条流,一条是温度传感器传回来的数据,一条是烟雾传感器传回来的数据。当两条流都超过各自的阈值时,报警。
  scalaversionobjectMultiStreamTransformations{defmain(args:Array〔String〕):Unit{valenvStreamExecutionEnvironment。getExecutionEnvironmentvaltempReadingsenv。addSource(newSensorSource)valsmokeReadingsenv。addSource(newSmokeLevelSource)。setParallelism(1)valkeyedTempReadingstempReadings。keyBy(rr。id)valalertskeyedTempReadings。connect(smokeReadings。broadcast())。flatMap(newRaiseAlertFlatMap)alerts。print()env。execute(MultiStreamTransformationsExample)}classRaiseAlertFlatMapextendsCoFlatMapFunction〔SensorReading,SmokeLevel,Alert〕{privatevarsmokeLevelLOWoverridedefflatMap1(tempReading:SensorReading,out:Collector〔Alert〕):Unit{if(smokeLevelHIGHtempReading。temperature100){out。collect(Alert(Riskoffire!tempReading,tempReading。timestamp))}}overridedefflatMap2(sl:String,out:Collector〔Alert〕):Unit{smokeLevelsl}}}
  javaversionpublicclassMultiStreamTransformations{publicstaticvoidmain(String〔〕args)throwsException{StreamExecutionEnvironmentenvStreamExecutionEnvironment。getExecutionEnvironment();DataStreamSensorReadingtempReadingsenv。addSource(newSensorSource());DataStreamSmokeLevelsmokeReadingsenv。addSource(newSmokeLevelSource())。setParallelism(1);KeyedStreamSensorReading,StringkeyedTempReadingstempReadings。keyBy(rr。id);DataStreamalertskeyedTempReadings。connect(smokeReadings。broadcast())。flatMap(newRaiseAlertFlatMap());alerts。print();env。execute(MultiStreamTransformationsExample);}publicstaticclassRaiseAlertFlatMapimplementsCoFlatMapFunctionSensorReading,SmokeLevel,Alert{privateSmokeLevelsmokeLevelSmokeLevel。LOW;OverridepublicvoidflatMap1(SensorReadingtempReading,Collectorout)throwsException{highchanceoffiretrueif(this。smokeLevelSmokeLevel。HIGHtempReading。temperature100){out。collect(newAlert(Riskoffire!tempReading,tempReading。timestamp));}}OverridepublicvoidflatMap2(SmokeLevelsmokeLevel,Collectorout){updatesmokelevelthis。smokeLevelsmokeLevel;}}}2。4分布式转换算子
  分区操作对应于我们之前讲过的数据交换策略这一节。这些操作定义了事件如何分配到不同的任务中去。当我们使用DataStreamAPI来编写程序时,系统将自动的选择数据分区策略,然后根据操作符的语义和设置的并行度将数据路由到正确的地方去。有些时候,我们需要在应用程序的层面控制分区策略,或者自定义分区策略。例如,如果我们知道会发生数据倾斜,那么我们想要针对数据流做负载均衡,将数据流平均发送到接下来的操作符中去。又或者,应用程序的业务逻辑可能需要一个算子所有的并行任务都需要接收同样的数据。再或者,我们需要自定义分区策略的时候。在这一小节,我们将展示DataStream的一些方法,可以使我们来控制或者自定义数据分区策略。keyBy()方法不同于分布式转换算子。所有的分布式转换算子将产生DataStream数据类型。而keyBy()产生的类型是KeyedStream,它拥有自己的keyedstate。
  Random
  随机数据交换由DataStream。shuffle()方法实现。shuffle方法将数据随机的分配到下游算子的并行任务中去。
  RoundRobin
  rebalance()方法使用RoundRobin负载均衡算法将输入流平均分配到随后的并行运行的任务中去。图57为roundrobin分布式转换算子的示意图。
  Rescale
  rescale()方法使用的也是roundrobin算法,但只会将数据发送到接下来的并行运行的任务中的一部分任务中。本质上,当发送者任务数量和接收者任务数量不一样时,rescale分区策略提供了一种轻量级的负载均衡策略。如果接收者任务的数量是发送者任务的数量的倍数时,rescale操作将会效率更高。
  rebalance()和rescale()的根本区别在于任务之间连接的机制不同。rebalance()将会针对所有发送者任务和所有接收者任务之间建立通信通道,而rescale()仅仅针对每一个任务和下游算子的一部分子并行任务之间建立通信通道。rescale的示意图为图57。
  Broadcast
  broadcast()方法将输入流的所有数据复制并发送到下游算子的所有并行任务中去。
  Global
  global()方法将所有的输入流数据都发送到下游算子的第一个并行任务中去。这个操作需要很谨慎,因为将所有数据发送到同一个task,将会对应用程序造成很大的压力。
  Custom
  当Flink提供的分区策略都不适用时,我们可以使用partitionCustom()方法来自定义分区策略。这个方法接收一个Partitioner对象,这个对象需要实现分区逻辑以及定义针对流的哪一个字段或者key来进行分区。

触摸春天第二课时教学设计的范文【文本解读】《触摸春天》选自人教版实验教材四年级下册第五组。本组以热爱生命为主题。选有四篇美文,让学生体会生命的美好,从而思考如何对待生命,热爱生命。本文文笔细腻,……有关太阳教学反思范文《太阳》这篇文章是一篇说明文,本文的教学重点是理解课文内容,初步了解太阳的特点以及太阳和人类的密切关系,激发学生学习自然科学的兴趣;了解用数字、比较等方法来说明事物的写法,并能……高中说课稿范例说课是教学改革中涌现出来的新生事物,是进行教学研究、教学交流和教学探讨的一种新的教学研究形式,下面是小编整理的高中说课稿范例,希望对你有帮助。高中说课稿范例一一、说课部分……第一单元ampnbsp认识国情ampnbsp理解国策ampn第一单元认识国情理解国策第三课适合国情的政治制度教学内容第一课时人民代表大会制度民族区域自治与基层民主管理制度教学目标知识与能力理解人民代表大会制度是我……人教版八年级语文上册背影教学反思范文背影教学反思一朱自清和他的父亲分别用眼泪和背影演绎了人间亲情,这不是演戏,但更感染人。本课聚集背影、眼泪父亲的语言,引导学生体验和感受、理解和反思,基本上做到了理性与人性……课文我不是最弱小的教学设计范文教学目标:1知识:会认个生字,会写12个字。2能力:正确、流利、有感情地朗读课文,体会问好、感叹号表达的不同语气。3情感:有不甘为弱者、应该保护弱小者的意识。……六年级美术教学中的色彩理论知识的教学反思色彩的基本属性,所谓色彩三要素即:色相、明度、纯度。它是决定画面色彩的主要原因。我们平时看到的色彩世界,正是由于这三要素的变化与组合,形成了五彩斑斓的景象。人们对色彩的不同感受……亲子游戏2则亲子游戏:默数目的:发展计数能力。前提:数数后说出总数。方法:家长随意拿几个玩具、水果、食物或扣子、珠子等,让孩子不用手点数,只能心中默默数数。然后说出总数,……初中七年级英语教学反思新课程改革是一次深刻的改革,强调课程要促进每个学生的身心健康发展。下面是由小编为大家带来的关于初中七年级英语教学反思,希望能够帮到您!篇一:初中七年级英语教学反思这……Unit1Womenofachievementlisteniunit1womenofachievementlistening,speakingamp;writing教案(agoodexampleforme)aimstoreadan……有关难过的造句1)奶奶要回乡下去了,送别时我难过地流下了眼泪。2)他丢了钱袋,他妻子安慰他别难过。3)旧社会,劳动人民的苦日子非常难过。4)前些年,用难过怎么造句家里人多收……2018年幼儿园中班亲子同乐会园长发言稿尊敬的各位家长,亲爱的小朋友们:大家好!感谢各位家长来参加xx元旦亲子同乐会,在此我代表亲亲宝贝幼儿园欢迎各位家长的到来,今天是xx年的最后一天,明天将迎接崭新的x……
信息技术认识Windows98窗口教学目的和要求1、认识Windows窗口,了解窗口的组成。2、掌握窗口的基本操作。3、初识菜单命令。4、进一步掌握鼠标器的基本操作方法。教学重点与……坚强的同义词成语导语:人活着就要坚强,勇敢的去接纳你身边所发生的事情。一切都会过去,一切都会好起来。坚强的近义词有哪些:顽强,坚固,坚韧,刚毅刚强,刚劲,坚贞,坚定四个……神鸟优秀教学反思《神鸟》是一篇蒙古族的民间传说,讲的是一只神鸟三次机智地从可汗手中逃脱的故事。课文篇幅较长,内容浅显易懂。教学中我采用长文短教、抓重点的方法引导学生进行学习。在学生初步掌握了生……实用教学设计方案汇总八篇为了确保工作或事情有序地进行,我们需要提前开始方案制定工作,方案是书面计划,具有内容条理清楚、步骤清晰的特点。方案应该怎么制定才好呢?以下是小编帮大家整理的教学设计方案8篇,欢……曹刿论战的教学反思按以往传统的教学,教学《曹刿论战》这篇文章,就是让同学们读读,译译,我再串讲全文,把曹刿的远谋充分挖掘一番,再与鲁庄公的鄙对照一下,学生知道课文通过对比手法刻画曹刿和鲁庄公这两……录音新闻教案4录音新闻教学目标1了解录音新闻的特点以及它与其他新闻形式的异同。2体会口语与书面语的自然结合。3理解本文报道的安排顺序。教学重难点1体会口语与书面……蟋蟀的住宅教学片断及反思课堂教学中的许多情况往往有着很大程度上的随机性。课标中指出:尊重学生在学习过程中独特的体验。阅读教学是个性化的行为。今天教学《蟋蟀的住宅一课》,学完后,我总结到:蟋蟀只是……石油大王哈默课程教案第一课时一、导入同学们,古人有饿死不吃嗟来之食的名言,讲做人要有尊严。今天老师介绍给大家的是一位外国的年轻人,从他的行为和做法中,会给我们很多新的启示。二、学……有关访城西友人别墅的阅读赏析与参考答案访城西友人别墅雍陶澧水桥西小路斜,日高犹未到君家。村园门巷多相似,处处春风枳壳花。8这首诗二、三、四句曲折而有层次地表现了诗人心情的变化,请结合作品分析……四年级上册我们是怎样听到声音的优秀教案【教学目标】科学概念:人的耳朵是由外耳、中耳和内耳构成的,外耳的耳廓把收集到的声音通过耳道传到鼓膜,引起鼓膜的振动,这种振动信号传递到大脑,通过大脑的加工,我们就能……大班歌曲爷爷为我打月饼日期xx活动名称爷爷为我打月饼执教者xxx活动目标1知道歌曲名字,理解歌词的情绪,对歌曲有一个完整的印象。2喜爱学唱革命歌曲。。环境创设歌曲的录音重点与难点知道歌曲……培养良好的学习习惯教案范文第一课时教学目标:1。知道处处皆学问,遇到不理解的地方,不清楚的问题,都应虚心向人请教,争取弄懂弄通。2。培养学生勤学好问的好习惯。教学重点:知道处处皆……
友情链接:易事利快生活快传网聚热点七猫云快好知快百科中准网快好找文好找中准网快软网