Flink操练(十三)之DS简介(13)读写外部系统
0简介
数据可以存储在不同的系统中,例如:文件系统,对象存储系统(OSS),关系型数据库,KeyValue存储,搜索引擎索引,日志系统,消息队列,等等。每一种系统都是给特定的应用场景设计的,在某一个特定的目标上超越了其他系统。今天的数据架构,往往包含着很多不同的存储系统。在将一个组件加入到我们的系统中时,我们需要问一个问题:这个组件和架构中的其他组件能多好的一起工作?
添加一个像Flink这样的数据处理系统,需要仔细的考虑。因为Flink没有自己的存储层,而是读取数据和持久化数据都需要依赖外部存储。所以,对于Flink,针对外部系统提供良好的读取和写入的连接器就很重要了。尽管如此,仅仅能够读写外部系统对于Flink这样想要提供任务故障情况下一致性保证的流处理器来讲,是不够的。
在本章中,我们将会讨论source和sink的连接器。这些连接器影响了Flink的一致性保证,也提供了对于最流行的一些外部系统的读写的连接器。我们还将学习如何实现自定义source和sink连接器,以及如何实现可以向外部系统发送异步读写请求的函数。1应用的一致性保证
Flink的检查点和恢复机制定期的会保存应用程序状态的一致性检查点。在故障的情况下,应用程序的状态将会从最近一次完成的检查点恢复,并继续处理。尽管如此,可以使用检查点来重置应用程序的状态无法完全达到令人满意的一致性保证。相反,source和sink的连接器需要和Flink的检查点和恢复机制进行集成才能提供有意义的一致性保证。
为了给应用程序提供恰好处理一次语义的状态一致性保证,应用程序的source连接器需要能够将source的读位置重置到之前保存的检查点位置。当处理一次检查点时,source操作符将会把source的读位置持久化,并在恢复的时候从这些读位置开始重新读取。支持读位置的检查点的source连接器一般来说是基于文件的存储系统,如:文件流或者Kafkasource(检查点会持久化某个正在消费的topic的读偏移量)。如果一个应用程序从一个无法存储和重置读位置的source连接器摄入数据,那么当任务出现故障的时候,数据就会丢失。也就是说我们只能提供atmostonce)的一致性保证。
Fink的检查点和恢复机制和可以重置读位置的source连接器结合使用,可以保证应用程序不会丢失任何数据。尽管如此,应用程序可能会发出两次计算结果,因为从上一次检查点恢复的应用程序所计算的结果将会被重新发送一次(一些结果已经发送出去了,这时任务故障,然后从上一次检查点恢复,这些结果将被重新计算一次然后发送出去)。所以,可重置读位置的source和Flink的恢复机制不足以提供端到端的恰好处理一次语义,即使应用程序的状态是恰好处理一次一致性级别。
一个志在提供端到端恰好处理一次语义一致性的应用程序需要特殊的sink连接器。sink连接器可以在不同的情况下使用两种技术来达到恰好处理一次一致性语义:幂等性写入和事务性写入。1。1幂等性写入
一个幂等操作无论执行多少次都会返回同样的结果。例如,重复的向hashmap中插入同样的keyvalue对就是幂等操作,因为头一次插入操作之后所有的插入操作都不会改变这个hashmap,因为hashmap已经包含这个keyvalue对了。另一方面,append操作就不是幂等操作了,因为多次append同一个元素将会导致列表每次都会添加一个元素。在流处理程序中,幂等写入操作是很有意思的,因为幂等写入操作可以执行多次但不改变结果。所以它们可以在某种程度上缓和Flink检查点机制带来的重播计算结果的效应。
需要注意的是,依赖于幂等性sink来达到exactlyonce语义的应用程序,必须保证在从检查点恢复以后,它将会覆盖之前已经写入的结果。例如,一个包含有sink操作的应用在sink到一个keyvalue存储时必须保证它能够确定的计算出将要更新的key值。同时,从Flink程序sink到的keyvalue存储中读取数据的应用,在Flink从检查点恢复的过程中,可能会看到不想看到的结果。当重播开始时,之前已经发出的计算结果可能会被更早的结果所覆盖(因为在恢复过程中)。所以,一个消费Flink程序输出数据的应用,可能会观察到时间回退,例如读到了比之前小的计数。也就是说,当流处理程序处于恢复过程中时,流处理程序的结果将处于不稳定的状态,因为一些结果被覆盖掉,而另一些结果还没有被覆盖。一旦重播完成,也就是说应用程序已经通过了之前出故障的点,结果将会继续保持一致性。1。2事务性写入
第二种实现端到端的恰好处理一次一致性语义的方法基于事务性写入。其思想是只将最近一次成功保存的检查点之前的计算结果写入到外部系统中去。这样就保证了在任务故障的情况下,端到端恰好处理一次语义。应用将被重置到最近一次的检查点,而在这个检查点之后并没有向外部系统发出任何计算结果。通过只有当检查点保存完成以后再写入数据这种方法,事务性的方法将不会遭受幂等性写入所遭受的重播不一致的问题。尽管如此,事务性写入却带来了延迟,因为只有在检查点完成以后,我们才能看到计算结果。
Flink提供了两种构建模块来实现事务性sink连接器:writeaheadlog(WAL,预写式日志)sink和两阶段提交sink。WAL式sink将会把所有计算结果写入到应用程序的状态中,等接到检查点完成的通知,才会将计算结果发送到sink系统。因为sink操作会把数据都缓存在状态后段,所以WAL可以使用在任何外部sink系统上。尽管如此,WAL还是无法提供刀枪不入的恰好处理一次语义的保证,再加上由于要缓存数据带来的状态后段的状态大小的问题,WAL模型并不十分完美。
与之形成对比的,2PCsink需要sink系统提供事务的支持或者可以模拟出事务特性的模块。对于每一个检查点,sink开始一个事务,然后将所有的接收到的数据都添加到事务中,并将这些数据写入到sink系统,但并没有提交(commit)它们。当事务接收到检查点完成的通知时,事务将被commit,数据将被真正的写入sink系统。这项机制主要依赖于一次sink可以在检查点完成之前开始事务,并在应用程序从一次故障中恢复以后再commit的能力。
2PC协议依赖于Flink的检查点机制。检查点屏障是开始一个新的事务的通知,所有操作符自己的检查点成功的通知是它们可以commit的投票,而作业管理器通知一个检查点成功的消息是commit事务的指令。于WALsink形成对比的是,2PCsinks依赖于sink系统和sink本身的实现可以实现恰好处理一次语义。更多的,2PCsink不断的将数据写入到sink系统中,而WAL写模型就会有之前所述的问题。
不可重置的源
可重置的源
anysink
atmostonce
atleastonce
幂等性sink
atmostonce
exactlyonce(当从任务失败中恢复时,存在暂时的不一致性)
预写式日志sink
atmostonce
atleastonce
2PCsink
atmostonce
exactlyonce2Flink提供的连接器
Flink提供了读写很多存储系统的连接器。消息队列,日志系统,例如ApacheKafka,Kinesis,RabbitMQ等等这些是常用的数据源。在批处理环境中,数据流很可能是监听一个文件系统,而当新的数据落盘的时候,读取这些新数据。
在sink一端,数据流经常写入到消息队列中,以供接下来的流处理程序消费。数据流也可能写入到文件系统中做持久化,或者交给批处理程序来进行分析。数据流还可能被写入到keyvalue存储或者关系型数据库中,例如Cassandra,ElasticSearch或者MySQL中,这样数据可供查询,还可以在仪表盘中显示出来。
不幸的是,对于大多数存储系统并没有标准接口,除了针对DBMS的JDBC。相反,每一个存储系统都需要有自己的特定的连接器。所以,Flink需要维护针对不同存储系统(消息队列,日志系统,文件系统,kv数据库,关系型数据库等等)的连接器实现。
Flink提供了针对ApacheKafka,Kinesis,RabbitMQ,ApacheNifi,各种文件系统,Cassandra,Elasticsearch,还有JDBC的连接器。除此之外,ApacheBahir项目还提供了额外的针对例如ActiveMQ,Akka,Flume,Netty,和Redis等的连接器。2。1ApacheKafkaSource连接器
ApacheKafka是一个分布式流式平台。它的核心是一个分布式的发布订阅消息系统。
Kafka将事件流组织为所谓的topics。一个主题就是一个事件日志系统,Kafka可以保证主题中的数据在被读取时和这些数据在被写入时相同的顺序。为了扩大读写的规模,主题可以分裂为多个分区,这些分区分布在一个集群上面。这时,读写顺序的保证就限制到了分区这个粒度,Kafka并没有提供从不同分区读取数据时的顺序保证。Kafka分区的读位置称为偏移量(offset)。
Kafka的依赖引入如下:dependencygroupIdorg。apache。flinkgroupIdflinkconnectorkafka2。12artifactIdversion1。7。1versiondependency
FlinkKafka连接器并行的摄入事件流。每一个并行source任务可以从一个或者多个分区中读取数据。任务将会跟踪每一个分区当前的读偏移量,然后将读偏移量写入到检查点数据中。当从任务故障恢复时,读偏移量将被恢复,而source任务将从检查点保存的读偏移量开始重新读取数据。FlinkKafka连接器并不依赖Kafka自己的offsettracking机制(基于消费者组实现)。下图展示了分区如何分配给source实例。
Kafkasource连接器使用如下代码创建valpropertiesnewProperties()properties。setProperty(bootstrap。servers,localhost:9092)properties。setProperty(group。id,test)valstream:DataStream〔String〕env。addSource(newFlinkKafkaConsumer〔String〕(topic,newSimpleStringSchema(),properties))
构造器接受三个参数。第一个参数定义了从哪些topic中读取数据,可以是一个topic,也可以是topic列表,还可以是匹配所有想要读取的topic的正则表达式。当从多个topic中读取数据时,Kafka连接器将会处理所有topic的分区,将这些分区的数据放到一条流中去。
第二个参数是一个DeserializationSchema或者KeyedDeserializationSchema。Kafka消息被存储为原始的字节数据,所以需要反序列化成Java或者Scala对象。上例中使用的SimpleStringSchema,是一个内置的DeserializationSchema,它仅仅是简单的将字节数组反序列化成字符串。DeserializationSchema和KeyedDeserializationSchema是公共的接口,所以我们可以自定义反序列化逻辑。
第三个参数是一个Properties对象,设置了用来读写的Kafka客户端的一些属性。
为了抽取事件时间的时间戳然后产生水印,我们可以通过调用valpropertiesnewProperties()properties。setProperty(bootstrap。servers,localhost:9092)properties。setProperty(group。id,test)valstream:DataStream〔String〕env。addSource(newFlinkKafkaConsumer〔String〕(topic,newSimpleStringSchema(),properties))
方法为Kafka消费者提供AssignerWithPeriodicWatermark或者AssignerWithPucntuatedWatermark。每一个assigner都将被应用到每个分区,来利用每一个分区的顺序保证特性。source实例将会根据水印的传播协议聚合所有分区的水印。2。2ApacheKafkaSink连接器
添加依赖:dependencygroupIdorg。apache。flinkgroupIdflinkconnectorkafka2。12artifactIdversion1。7。1versiondependency
下面的例子展示了如何创建一个Kafkasinkvalstream:DataStream〔String〕。。。valmyProducernewFlinkKafkaProducer〔String〕(localhost:9092,brokerlisttopic,targettopicnewSimpleStringSchema)serializationschemastream。addSink(myProducer)2。3KakfaSink的atleastonce保证
Flink的Kafkasink提供了基于配置的一致性保证。Kafkasink使用下面的条件提供了至少处理一次保证:Flink检查点机制开启,所有的数据源都是可重置的。当写入失败时,sink连接器将会抛出异常,使得应用程序挂掉然后重启。这是默认行为。应用程序内部的Kafka客户端还可以配置为重试写入,只要提前声明当写入失败时,重试几次这样的属性(retriesproperty)。sink连接器在完成它的检查点之前会等待Kafka发送已经将数据写入的通知。2。4KafkaSink的恰好处理一次语义保证
Kafka0。11版本引入了事务写特性。由于这个新特性,FlinkKafkasink可以为输出结果提供恰好处理一次语义的一致性保证,只要经过合适的配置就行。Flink程序必须开启检查点机制,并从可重置的数据源进行消费。FlinkKafkaProducer还提供了包含Semantic参数的构造器来控制sink提供的一致性保证。可能的取值如下:Semantic。NONE,不提供任何一致性保证。数据可能丢失或者被重写多次。Semantic。ATLEASTONCE,保证无数据丢失,但可能被处理多次。这个是默认设置。Semantic。EXACTLYONCE,基于Kafka的事务性写入特性实现,保证每条数据恰好处理一次。2。5文件系统source连接器
ApacheFlink针对文件系统实现了一个可重置的source连接器,将文件看作流来读取数据。如下面的例子所示:vallineReadernewTextInputFormat(null)vallineStream:DataStream〔String〕env。readFile〔String〕(lineReader,TheFileInputFormathdfs:pathtomydata,ThepathtoreadFileProcessingMode。PROCESSCONTINUOUSLY,Theprocessingmode30000L)Themonitoringintervalinms
StreamExecutionEnvironment。readFile()接收如下参数:FileInputFormat参数,负责读取文件中的内容。文件路径。如果文件路径指向单个文件,那么将会读取这个文件。如果路径指向一个文件夹,FileInputFormat将会扫描文件夹中所有的文件。PROCESSCONTINUOUSLY将会周期性的扫描文件,以便扫描到文件新的改变。30000L表示多久扫描一次监听的文件。
FileInputFormat是一个特定的InputFormat,用来从文件系统中读取文件。FileInputFormat分两步读取文件。首先扫描文件系统的路径,然后为所有匹配到的文件创建所谓的inputsplits。一个inputsplit将会定义文件上的一个范围,一般通过读取的开始偏移量和读取长度来定义。在将一个大的文件分割成一堆小的splits以后,这些splits可以分发到不同的读任务,这样就可以并行的读取文件了。FileInputFormat的第二步会接收一个inputsplit,读取被split定义的文件范围,然后返回对应的数据。
DataStream应用中使用的FileInputFormat需要实现CheckpointableInputFormat接口。这个接口定义了方法来做检查点和重置文件片段的当前的读取位置。
在Flink1。7中,Flink提供了一些类,这些类继承了FileInputFormat,并实现了CheckpointableInputFormat接口。TextInputFormat一行一行的读取文件,而CsvInputFormat使用逗号分隔符来读取文件。2。6文件系统sink连接器
在将流处理应用配置成exactlyonce检查点机制,以及配置成所有源数据都能在故障的情况下可以重置,Flink的StreamingFileSink提供了端到端的恰好处理一次语义保证。下面的例子展示了StreamingFileSink的使用方式。valinput:DataStream〔String〕valsink:StreamingFileSink〔String〕StreamingFileSink。forRowFormat(newPath(basepath),newSimpleStringEncoder〔String〕(UTF8))。build()input。addSink(sink)
当StreamingFileSink接到一条数据,这条数据将被分配到一个桶(bucket)中。一个桶是我们配置的basepath的子目录。
Flink使用BucketAssigner来分配桶。BucketAssigner是一个公共的接口,为每一条数据返回一个BucketId,BucketId决定了数据被分配到哪个子目录。如果没有指定BucketAssigner,Flink将使用DateTimeBucketAssigner来将每条数据分配到每个一个小时所产生的桶中去,基于数据写入的处理时间(机器时间,墙上时钟)。
StreamingFileSink提供了exactlyonce输出的保证。sink通过一个commit协议来达到恰好处理一次语义的保证。这个commit协议会将文件移动到不同的阶段,有以下状态:inprogress,pending,finished。这个协议基于Flink的检查点机制。当Flink决定rollafile时,这个文件将被关闭并移动到pending状态,通过重命名文件来实现。当下一个检查点完成时,pending文件将被移动到finished状态,同样是通过重命名来实现。
一旦任务故障,sink任务需要将处于inprogress状态的文件重置到上一次检查点的写偏移量。这个可以通过关闭当前inprogress的文件,并将文件结尾无效的部分丢弃掉来实现。3实现自定义源函数
DataStreamAPI提供了两个接口来实现source连接器:SourceFunction和RichSourceFunction可以用来定义非并行的source连接器,source跑在单任务上。ParallelSourceFunction和RichParallelSourceFunction可以用来定义跑在并行实例上的source连接器。
除了并行于非并行的区别,这两种接口完全一样。就像processfunction的rich版本一样,RichSourceFunction和RichParallelSourceFunction的子类可以overrideopen()和close()方法,也可以访问RuntimeContext,RuntimeContext提供了并行任务实例的数量,当前任务实例的索引,以及一些其他信息。
SourceFunction和ParallelSourceFunction定义了两种方法:voidrun(SourceContextctx)cancel()
run()方法用来读取或者接收数据然后将数据摄入到Flink应用中。根据接收数据的系统,数据可能是推送的也可能是拉取的。Flink仅仅在特定的线程调用run()方法一次,通常情况下会是一个无限循环来读取或者接收数据并发送数据。任务可以在某个时间点被显式的取消,或者由于流是有限流,当数据被消费完毕时,任务也会停止。
当应用被取消或者关闭时,cancel()方法会被Flink调用。为了优雅的关闭Flink应用,run()方法需要在cancel()被调用以后,立即终止执行。下面的例子显示了一个简单的源函数的例子:从0数到Long。MaxValue。classCountSourceextendsSourceFunction〔Long〕{varisRunning:Booleantrueoverridedefrun(ctx:SourceFunction。SourceContext〔Long〕){varcnt:Long1while(isRunningcntLong。MaxValue){cnt1ctx。collect(cnt)}}overridedefcancel()isRunningfalse}3。1可重置的源函数
之前我们讲过,应用程序只有使用可以重播输出数据的数据源时,才能提供令人满意的一致性保证。如果外部系统暴露了获取和重置读偏移量的API,那么source函数就可以重播源数据。这样的例子包括一些能够提供文件流的偏移量的文件系统,或者提供seek方法用来移动到文件的特定位置的文件系统。或者ApacheKafka这种可以为每一个主题的分区提供偏移量并且可以设置分区的读位置的系统。一个反例就是source连接器连接的是socket,socket将会立即丢弃已经发送过的数据。
支持重播输出的源函数需要和Flink的检查点机制集成起来,还需要在检查点被处理时,持久化当前所有的读取位置。当应用从一个保存点(savepoint)恢复或者从故障恢复时,Flink会从最近一次的检查点或者保存点中获取读偏移量。如果程序开始时并不存在状态,那么读偏移量将会被设置到一个默认值。一个可重置的源函数需要实现CheckpointedFunction接口,还需要能够存储读偏移量和相关的元数据,例如文件的路径,分区的ID。这些数据将被保存在liststate或者unionliststate中。
下面的例子将CountSource重写为可重置的数据源。
scalaversionclassResettableCountSourceextendsSourceFunction〔Long〕withCheckpointedFunction{varisRunning:Booleantruevarcnt:LongvaroffsetState:ListState〔Long〕overridedefrun(ctx:SourceFunction。SourceContext〔Long〕){while(isRunningcntLong。MaxValue){synchronizedataemissionandcheckpointsctx。getCheckpointLock。synchronized{cnt1ctx。collect(cnt)}}}overridedefcancel()isRunningfalseoverridedefsnapshotState(snapshotCtx:FunctionSnapshotContext):Unit{removepreviouscntoffsetState。clear()addcurrentcntoffsetState。add(cnt)}overridedefinitializeState(initCtx:FunctionInitializationContext):Unit{valdescnewListStateDescriptor〔Long〕(offset,classOf〔Long〕)offsetStateinitCtx。getOperatorStateStore。getListState(desc)initializecntvariablevalitoffsetState。get()cntif(nullit!it。iterator()。hasNext){1L}else{it。iterator()。next()}}}4实现自定义sink函数
DataStreamAPI中,任何运算符或者函数都可以向外部系统发送数据。DataStream不需要最终流向sink运算符。例如,我们可能实现了一个FlatMapFunction,这个函数将每一个接收到的数据通过HTTPPOST请求发送出去,而不使用Collector发送到下一个运算符。DataStreamAPI也提供了SinkFunction接口以及对应的rich版本RichSinkFunction抽象类。SinkFunction接口提供了一个方法:voidinvode(INvalue,Contextctx)
SinkFunction的Context可以访问当前处理时间,当前水位线,以及数据的时间戳。
下面的例子展示了一个简单的SinkFunction,可以将传感器读数写入到socket中去。需要注意的是,我们需要在启动Flink程序前启动一个监听相关端口的进程。否则将会抛出ConnectException异常。可以运行ncllocalhost9191命令。valreadings:DataStream〔SensorReading〕。。。writethesensorreadingstoasocketreadings。addSink(newSimpleSocketSink(localhost,9191))setparallelismto1becauseonlyonethreadcanwritetoasocket。setParallelism(1)classSimpleSocketSink(valhost:String,valport:Int)extendsRichSinkFunction〔SensorReading〕{varsocket:Socketvarwriter:PrintStreamoverridedefopen(config:Configuration):Unit{opensocketandwritersocketnewSocket(InetAddress。getByName(host),port)writernewPrintStream(socket。getOutputStream)}overridedefinvoke(value:SensorReading,ctx:SinkFunction。Context〔〕):Unit{writesensorreadingtosocketwriter。println(value。toString)writer。flush()}overridedefclose():Unit{closewriterandsocketwriter。close()socket。close()}}
之前我们讨论过,端到端的一致性保证建立在sink连接器的属性上面。为了达到端到端的恰好处理一次语义的目的,应用程序需要幂等性的sink连接器或者事务性的sink连接器。上面例子中的SinkFunction既不是幂等写入也不是事务性的写入。由于socket具有只能添加(appendonly)这样的属性,所以不可能实现幂等性的写入。又因为socket不具备内置的事务支持,所以事务性写入就只能使用Flink的WALsink特性来实现了。接下来我们将学习如何实现幂等sink连接器和事务sink连接器。4。1幂等sink连接器
对于大多数应用,SinkFunction接口足以实现一个幂等性写入的sink连接器了。需要以下两个条件:结果数据必须具有确定性的key,在这个key上面幂等性更新才能实现。例如一个计算每分钟每个传感器的平均温度值的程序,确定性的key值可以是传感器的ID和每分钟的时间戳。确定性的key值,对于在故障恢复的场景下,能够正确的覆盖结果非常的重要。外部系统支持针对每个key的更新,例如关系型数据库或者keyvalue存储。
下面的例子展示了如何实现一个针对JDBC数据库的幂等写入sink连接器,这里使用的是ApacheDerby数据库。valreadings:DataStream〔SensorReading〕。。。writethesensorreadingstoaDerbytablereadings。addSink(newDerbyUpsertSink)classDerbyUpsertSinkextendsRichSinkFunction〔SensorReading〕{varconn:ConnectionvarinsertStmt:PreparedStatementvarupdateStmt:PreparedStatementoverridedefopen(parameters:Configuration):Unit{connecttoembeddedinmemoryDerbyconnDriverManager。getConnection(jdbc:derby:memory:flinkExample,newProperties())prepareinsertandupdatestatementsinsertStmtconn。prepareStatement(INSERTINTOTemperatures(sensor,temp)VALUES(?,?))updateStmtconn。prepareStatement(UPDATETemperaturesSETtemp?WHEREsensor?)}overridedefinvoke(SensorReadingr,context:Context〔〕):Unit{setparametersforupdatestatementandexecuteitupdateStmt。setDouble(1,r。temperature)updateStmt。setString(2,r。id)updateStmt。execute()executeinsertstatementifupdatestatementdidnotupdateanyrowif(updateStmt。getUpdateCount0){setparametersforinsertstatementinsertStmt。setString(1,r。id)insertStmt。setDouble(2,r。temperature)executeinsertstatementinsertStmt。execute()}}overridedefclose():Unit{insertStmt。close()updateStmt。close()conn。close()}}
由于ApacheDerby并没有提供内置的UPSERT方法,所以这个sink连接器实现了UPSERT写。具体实现方法是首先去尝试更新一行数据,如果这行数据不存在,则插入新的一行数据。4。2事务性sink连接器
事务写入sink连接器需要和Flink的检查点机制集成,因为只有在检查点成功完成以后,事务写入sink连接器才会向外部系统commit数据。
为了简化事务性sink的实现,Flink提供了两个模版用来实现自定义sink运算符。这两个模版都实现了CheckpointListener接口。CheckpointListener接口将会从作业管理器接收到检查点完成的通知。GenericWriteAheadSink模版会收集检查点之前的所有的数据,并将数据存储到sink任务的运算符状态中。状态保存到了检查点中,并在任务故障的情况下恢复。当任务接收到检查点完成的通知时,任务会将所有的数据写入到外部系统中。TwoPhaseCommitSinkFunction模版利用了外部系统的事务特性。对于每一个检查点,任务首先开始一个新的事务,并将接下来所有的数据都写到外部系统的当前事务上下文中去。当任务接收到检查点完成的通知时,sink连接器将会commit这个事务。
GENERICWRITEAHEADSINK
GenericWriteAheadSink使得sink运算符可以很方便的实现。这个运算符和Flink的检查点机制集成使用,目标是将每一条数据恰好一次写入到外部系统中去。需要注意的是,在发生故障的情况下,writeaheadlogsink可能会不止一次的发送相同的数据。所以GenericWriteAheadSink无法提供完美无缺的恰好处理一次语义的一致性保证,而是仅能提供atleastonce这样的保证。我们接下来详细的讨论这些场景。
GenericWriteAheadSink的原理是将接收到的所有数据都追加到有检查点分割好的预写式日志中去。每当sink运算符碰到检查点屏障,运算符将会开辟一个新的section,并将接下来的所有数据都追加到新的section中去。WAL(预写式日志)将会保存到运算符状态中。由于log能被恢复,所有不会有数据丢失。
当GenericWriteAheadSink接收到检查点完成的通知时,将会发送对应检查点的WAL中存储的所有数据。当所有数据发送成功,对应的检查点必须在内部提交。
检查点的提交分两步。第一步,sink持久化检查点被提交的信息。第二步,删除WAL中所有的数据。我们不能将commit信息保存在Flink应用程序状态中,因为状态不是持久化的,会在故障恢复时重置状态。相反,GenericWriteAheadSink依赖于可插拔的组件在一个外部持久化存储中存储和查找提交信息。这个组件就是CheckpointCommitter。
继承GenericWriteAheadSink的运算符需要提供三个构造器函数。CheckpointCommitterTypeSerializer,用来序列化输入数据。一个jobID,传给CheckpointCommitter,当应用重启时可以识别commit信息。
还有,writeahead运算符需要实现一个单独的方法:booleansendValues(IterableINvalues,longchkpntId,longtimestamp)
当检查点完成时,GenericWriteAheadSink调用sendValues()方法来将数据写入到外部存储系统中。这个方法接收一个检查点对应的所有数据的迭代器,检查点的ID,检查点被处理时的时间戳。当数据写入成功时,方法必须返回true,写入失败返回false。
下面的例子展示了如何实现一个写入到标准输出的writeaheadsink。它使用了FileCheckpointCommitter。valreadings:DataStream〔SensorReading〕。。。writethesensorreadingstothestandardoutviaawriteaheadlogreadings。transform(WriteAheadSink,newSocketWriteAheadSink)classStdOutWriteAheadSinkextendsGenericWriteAheadSink〔SensorReading〕(CheckpointCommitterthatcommitscheckpointstothelocalfilesystemnewFileCheckpointCommitter(System。getProperty(java。io。tmpdir)),SerializerforrecordscreateTypeInformation〔SensorReading〕。createSerializer(newExecutionConfig),RandomJobIDusedbytheCheckpointCommitterUUID。randomUUID。toString){overridedefsendValues(readings:Iterable〔SensorReading〕,checkpointId:Long,timestamp:Long):Boolean{for(rreadings。asScala){writerecordtostandardoutprintln(r)}true}}
之前我们讲过,GenericWriteAheadSink无法提供完美的exactlyonce保证。有两个故障状况会导致数据可能被发送不止一次。当任务执行sendValues()方法时,程序挂掉了。如果外部系统无法原子性的写入所有数据(要么都写入要么都不写),一些数据可能会写入,而另一些数据并没有被写入。由于checkpoint还没有commit,所以在任务恢复的过程中一些数据可能会被再次写入。所有数据都写入成功了,sendValues()方法也返回true了;但在CheckpointCommitter方法被调用之前程序挂了,或者CheckpointCommitter在commit检查点时失败了。那么在恢复的过程中,所有未被提交的检查点将会被重新写入。
TWOPHASECOMMITSINKFUNCTION
Flink提供了TwoPhaseCommitSinkFunction接口来简化sink函数的实现。这个接口保证了端到端的exactlyonce语义。2PCsink函数是否提供这样的一致性保证取决于我们的实现细节。我们需要讨论一个问题:2PC协议是否开销太大?
通常来讲,为了保证分布式系统的一致性,2PC是一个非常昂贵的方法。尽管如此,在Flink的语境下,2PC协议针对每一个检查点只运行一次。TwoPhaseCommitSinkFunction和WALsink很相似,不同点在于前者不会将数据收集到state中,而是会写入到外部系统事务的上下文中。
TwoPhaseCommitSinkFunction实现了以下协议。在sink任务发送出第一条数据之前,任务将在外部系统中开始一个事务,所有接下来的数据将被写入这个事务的上下文中。当作业管理器初始化检查点并将检查点屏障插入到流中的时候,2PC协议的投票阶段开始。当运算符接收到检查点屏障,运算符将保存它的状态,当保存完成时,运算符将发送一个acknowledgement信息给作业管理器。当sink任务接收到检查点屏障时,运算符将会持久化它的状态,并准备提交当前的事务,以及acknowledgeJobManager中的检查点。发送给作业管理器的acknowledgement信息类似于2PC协议中的commit投票。sink任务还不能提交事务,因为它还没有保证所有的任务都已经完成了它们的检查点操作。sink任务也会为下一个检查点屏障之前的所有数据开始一个新的事务。
当作业管理器成功接收到所有任务实例发出的检查点操作成功的通知时,作业管理器将会把检查点完成的通知发送给所有感兴趣的任务。这里的通知对应于2PC协议的提交命令。当sink任务接收到通知时,它将commit所有处于开启状态的事务。一旦sink任务acknowledge了检查点操作,它必须能够commit对应的事务,即使任务发生故障。如果commit失败,数据将会丢失。
让我们总结一下外部系统需要满足什么样的要求:外部系统必须提供事务支持,或者sink的实现能在外部系统上模拟事务功能。在检查点操作期间,事务必须处于open状态,并接收这段时间数据的持续写入。事务必须等到检查点操作完成的通知到来才可以提交。在恢复周期中,可能需要一段时间等待。如果sink系统关闭了事务(例如超时了),那么未被commit的数据将会丢失。sink必须在进程挂掉后能够恢复事务。一些sink系统会提供事务ID,用来commit或者abort一个开始的事务。commit一个事务必须是一个幂等性操作。sink系统或者外部系统能够观察到事务已经被提交,或者重复提交并没有副作用。
下面的例子可能会让上面的一些概念好理解一些。classTransactionalFileSink(valtargetPath:String,valtempPath:String)extendsTwoPhaseCommitSinkFunction〔(String,Double),String,Void〕(createTypeInformation〔String〕。createSerializer(newExecutionConfig),createTypeInformation〔Void〕。createSerializer(newExecutionConfig)){vartransactionWriter:BufferedWriterCreatesatemporaryfileforatransactionintowhichtherecordsarewritten。overridedefbeginTransaction():String{pathoftransactionfileisbuiltfromcurrenttimeandtaskindexvaltimeNowLocalDateTime。now(ZoneId。of(UTC))。format(DateTimeFormatter。ISOLOCALDATETIME)valtaskIdxthis。getRuntimeContext。getIndexOfThisSubtaskvaltransactionFilestimeNowtaskIdxcreatetransactionfileandwritervaltFilePathPaths。get(stempPathtransactionFile)Files。createFile(tFilePath)this。transactionWriterFiles。newBufferedWriter(tFilePath)println(sCreatingTransactionFile:tFilePath)nameoftransactionfileisreturnedtolateridentifythetransactiontransactionFile}Writerecordintothecurrenttransactionfile。overridedefinvoke(transaction:String,value:(String,Double),context:Context〔〕):Unit{transactionWriter。write(value。toString)transactionWriter。write()}Flushandclosethecurrenttransactionfile。overridedefpreCommit(transaction:String):Unit{transactionWriter。flush()transactionWriter。close()}Commitatransactionbymovingtheprecommittedtransactionfiletothetargetdirectory。overridedefcommit(transaction:String):Unit{valtFilePathPaths。get(stempPathtransaction)checkifthefileexiststoensurethatthecommitisidempotentif(Files。exists(tFilePath)){valcFilePathPaths。get(stargetPathtransaction)Files。move(tFilePath,cFilePath)}}Abortsatransactionbydeletingthetransactionfile。overridedefabort(transaction:String):Unit{valtFilePathPaths。get(stempPathtransaction)if(Files。exists(tFilePath)){Files。delete(tFilePath)}}}
TwoPhaseCommitSinkFunction〔IN,TXN,CONTEXT〕包含如下三个范型参数:IN表示输入数据的类型。TXN定义了一个事务的标识符,可以用来识别和恢复事务。CONTEXT定义了自定义的上下文。
TwoPhaseCommitSinkFunction的构造器需要两个TypeSerializer。一个是TXN的类型,另一个是CONTEXT的类型。
最后,TwoPhaseCommitSinkFunction定义了五个需要实现的方法:beginTransaction():TXN开始一个事务,并返回事务的标识符。invoke(txn:TXN,value:IN,context:Context〔〕):Unit将值写入到当前事务中。preCommit(txn:TXN):Unit预提交一个事务。一个预提交的事务不会接收新的写入。commit(txn:TXN):Unit提交一个事务。这个操作必须是幂等的。abort(txn:TXN):Unit终止一个事务。
数据决定体验软件定义汽车,化身科技企业的上汽竟如此硬核近日,国家多个相关部门联合印发了《国家车联网产业标准体系建设指南》。该指南的发布,旨在推进先进技术在智能交通领域的应用,并促进自动驾驶和车路协同技术的应用和发展。可以预见,在国……
借力科创板,圆上市梦想中国资本策划研究院、著名融资实战策划专家朱耿洲博士专访6月27日,随着科创板第一股华兴源创启动申购,标志着科创板正式开板。截至7月4日,上交所已受理141家公司提交的发行……
荣耀Earbuds2SE上手黑白分明!降噪音质续航全在线6月16日,伴随着首款搭载高通芯片的荣耀50发布,全新TWS主动降噪耳机荣耀Earbuds2SE也正式亮相,并将于6月25日开启首销。作为入局TWS市场较早的品牌,荣耀已推出多……
试驾在野外玩泥巴,雪佛兰开拓者轻混版能行吗?在被城市这个钢筋牢笼困了太久之后,很多人都有了走出城市,前往大自然的想法。不得不说,跟亲朋好友在假期出游是最好的放松方式。可惜的是,想要追求这样的生活方式并非一件易事。因为我们……
为什么兰花养护不能暴晒?兰花不能长期在太阳下暴晒,一般在冬季和早春,宜放在阳光充足的地方养护;在春季3~4月,遮阳50~60左右。夏秋季,宜遮阳80~90左右。兰花,怕强烈阳光直射,一般应在晴天……
关键时刻的见证者70迈智能记录仪Pro体验爱车买来一年多了,觉得很对不起她,既没有爬过高山和大海,也没有追求过诗和远方,就是平时上班通勤,真真是浪费了她那17寸的大脚。开车嘛,免不了的都会买一些周边产品,像是脚垫……
疑似领克09售价及配置曝光!年内上市,29。98万买入门款超近日,网友曝出一张疑似领克09售价表,从最置顶的可以看到印有LynkCo09车型产品配置的字样,可以看得出该价格表来自于国内某一家4S店中。并且标配2。0TT5发动机爱信8AT……
RedmiK305GOPPOReno3Pro,一千多差值什么依旧是这个话题:一千四的差价,值什么?上一篇我们说了OPPOReno3Pro相比于RedmiK305G的优势,这一次我们来说说RedmiK305G相比于OPPOReno3Pro……
每次10分钟跟我学Python(第十一次课)大家好!我是幻化意识流。今天继续跟我学Python。看过我发布课的同学一定能感觉到,我的课程里没有太多的理论,上来就是实践。是的,经过我以往学习的经历、经验,……
功能手机没有微信抖音?不,AGM三防4G手机做到了引言功能机,相信很多人都不陌生,功能机给我们的生活曾经也带来了非常多的便利。像我到如今为止,用智能手机快十年了,但我还会考虑入手一部功能机作为备用机。为什么?因为功能机有……
新车出厂LucidAir有望近期开启交付《车市零距离》是车市物语旗下自媒体,每天提供最新鲜的汽车零部件和智能网联相关资讯。日前,美国电动车企LucidMotors发布了一张载有六辆LucidAir电动车的卡车照……
翱翔海南三亚(一)畅游蜈支洲岛翱翔海南三亚畅游蜈支洲岛及时起飞海南三亚,躲过北京黄沙。当飞机起飞的那一刻,北京的天空已经是黄沙漫天,随着高度的提升,天空逐渐清晰,慢慢呈现出蓝天白云,随之而来的是身心愉……
注意,这些原因可能导致商标注册失败相信大家都希望能早日拿到商标注册证书,那么一些容易导致商标注册失败的情况就要尽量避免,进而提升商标注册成功率。哪些原因容易导致商标注册失败呢?(1)申请商标违规……
客厅阳台到底封不封,看完就不纠结了买了新房准备装修的伙伴往往会纠结一件事,那就是客厅阳台到底封不封,要是封上了都有哪些好处?不封又有何影响?那么客厅阳台究竟是封好还是不封好呢?现在绝大多数商品房的阳台都是……
ins风巧来搭,装网红似的家现如今的网红在互联网上已经风起云涌,拥有众多的粉丝来追捧,甚至就连他们拍的那些生活照片看了也会让人羡慕不已,这些网红不仅有很高的颜值而且在家居背景的布置上还非常有情调。其……
腾讯这座大山正在慢慢的土崩瓦解相当长的一段时间内,腾讯是中国互联网企业中一个不可超越的高度,坐拥QQ和微信两个庞大的用户群,可以轻松的获得低成本的流量,是腾讯成为中国互联网的三大山之一,巨大的低成本的流量池……
明星夫妻店将上市,吴奇隆的10亿聘礼总算要来了爱奇艺不仅是稻草熊影业的第二大股东,也是后者的第一大客户,两者深度捆绑下,稻草熊影业隐患重重。本文由无冕财经(wumiancaijing)原创首发作者:海棠葉……
男版小红书在哪?小红书是中国最近几年新崛起的明星互联网企业,相信已经成为大部分年轻人手机必装的App之一,小红书以分享购物社区起家,截至2019年1月,小红书App用户数已经超过2亿,估值超过……
达内教育Q2财报营收大增75。5,毛利增长2119月24日,达内教育发布了2021年第二季度财报。财报显示,2021年第二季度,达内教育总净收入为5。823亿元,同比增长75。5;毛利润从2020年同期的9467万元增……
快速生成小程序,怎样做才能使小程序店铺更吸睛?小程序在搭建时,商家都想搭建一个体验感较好的小程序店铺,但是对于不懂小程序的商家其实还是很难的,可以说是一头雾水。那么,针对这种情况,在搭建小程序时,我们应该注意哪些问题呢?商……
耗时两年,倾注百亿资金,雪松控股为何要放手信托?在接盘中江信托后,雪松控股解决及兑付历史逾期项目总额超100亿,如今风险出清接近尾声,雪松为何作出这样的决定?本文由无冕财经(wumiancaijing)原创首发作……
Redmi在天门山上演灯光秀,999级台阶全部点亮,网友太好最近红米Note11系列可以说是出尽了风头,尤其是下放了只有旗舰机才有的120W快充,仅15分钟即可充满电,体验相当梦幻。红米Note11系列首销1个小时就卖掉了50万台手机,……
无线远程遥控空开品牌及生产厂家有哪些?无线远程遥控空开品牌及生产厂家有哪些?卓文科技是一家专业生产无线远程控制空开的生产厂家,主要产品有无线远程遥控空开、无线远程控制电源等系列产品,产品质量稳定可靠,售后完善……
透过产业带,揭开京东的底色借助三驾马车,京东大商超在产业带布局钟逐渐步入深水区。撰文蓝洞商业郭朝飞在湖北神农架,孔祥彬将自己裹得严严实实,戴着采蜜专用的防护帽,一圈面纱直接武装到脖子以下,上……