环境flink1。13。6hudi0。11。0mergeonread表 代码示例tEnv。executeSql(CREATETABLEtbpersonhudi(idBIGINT,ageINT,nameSTRING,createtimeTIMESTAMP(3),timestampTIMESTAMP(3),PRIMARYKEY(id)NOTENFORCED)WITH(connectorhudi,table。typeMERGEONREAD,pathfile:D:datahadoop3。2。1warehousetbpersonhudi,read。startcommit20220722103000,read。endcommit20220722104000,read。task1,read。streaming。enabledtrue,read。streaming。checkinterval30));TabletabletEnv。sqlQuery(selectfromtbpersonhudi);tEnv。toChangelogStream(table)。print()。setParallelism(1);env。execute(test); 流程分析 hudi源入口(HoodieTableSource) HoodieTableSource实现ScanTableSource,SupportsPartitionPushDown,SupportsProjectionPushDown,SupportsLimitPushDown,SupportsFilterPushDown接口,后4个接口主要是支持对查询计划的优化。ScanTableSource则提供了读取hudi表的具体实现,核心方法为org。apache。hudi。table。HoodieTableSourcegetScanRuntimeProvider:if(conf。getBoolean(FlinkOptions。READASSTREAMING)){开启了流式读(read。streaming。enabled)StreamReadMonitoringFunctionmonitoringFunctionnewStreamReadMonitoringFunction(conf,FilePathUtils。toFlinkPath(path),maxCompactionMemoryInBytes,getRequiredPartitionPaths());InputFormatRowData,?inputFormatgetInputFormat(true);OneInputStreamOperatorFactoryMergeOnReadInputSplit,RowDatafactoryStreamReadOperator。factory((MergeOnReadInputFormat)inputFormat);SingleOutputStreamOperatorRowDatasourceexecEnv。addSource(monitoringFunction,getSourceOperatorName(splitmonitor))。setParallelism(1)。transform(splitreader,typeInfo,factory)。setParallelism(conf。getInteger(FlinkOptions。READTASKS));returnnewDataStreamSource(source);} 上面代码在流环境中创建了一个SourceFunction(StreamReadMonitoringFunction)和一个自定义的转换(StreamReadOperator)StreamReadMonitoringFunction:监控hudi表元数据目录(。hoodie)获取需要被读取的文件分片(MergeOnReadInputSplit,一个baseparquet文件和一组log文件),然后把分片递给下游的转换算子StreamReadOperator进行文件读取;固定一个线程去监控,名称为splitmonitorxxxxx。StreamReadOperator:将按timeline升序收到的MergeOnReadInputSplit一个一个地读取分片数据;算子名称为splitreaderxxxxx,可以通过设置read。tasks进行设置并行度 定时监控元数据获得增量分片(StreamReadMonitoringFunction) StreamReadMonitoringFunction负责定时(read。streaming。checkinterval)扫描hudi表的元数据目录。hoodie,如果发现在activetimeline上有新增的instant〔actioncommit,deltacommit,compaction,replaceactivecompleted〕,从这些instant信息中可以知道数据变更写到了哪些文件(parquet,log),然后构建成分片对象(MergeOnReadInputSplit)。核心属性:issuedInstant,这个是增量查询的依据,记录着当前已经消费的数据的最新instant,类似于kafka的offset,但是hudi是基于timeline。该值是有状态的,维护在ListState中,所以flinkjob重启依然可以做到增量。核心方法:StreamReadMonitoringFunctionmonitorDirAndForwardSplits,很简单,就做了两件事,调用IncrementalInputSplitsinputSplits获取到增量分片(有序),然后传递给下游的算子(StreamReadOperator)publicvoidmonitorDirAndForwardSplits(SourceContextMergeOnReadInputSplitcontext){HoodieTableMetaClientmetaClientgetOrCreateMetaClient();IncrementalInputSplits。ResultresultincrementalInputSplits。inputSplits(metaClient,this。hadoopConf,this。issuedInstant);for(MergeOnReadInputSplitsplit:result。getInputSplits()){context。collect(split);}} 获取增量分片(IncrementalInputSplits) 主要逻辑在方法IncrementalInputSplitsinputSplits(metaClient,hadoopConf,issuedInstant),需要先了解hudi关于timeline和instant的一些基本概念,详细的流程如下图所示: 如果flinkjob首次运行指定了read。startcommit和read。endcommit,但是该范围是比较久以前,instant已经被归档,那么流作业将永远不能消费到数据 https:github。comapachehudiissues6167 读取数据文件(StreamReadOperator) StreamReadOperator算子接收分片后会缓存在队列Queuesplits,然后不停从队列中poll分片放到线程池中执行 privatevoidprocessSplits()throwsIOException{format。open(split);consumeAsMiniBatch(split);enqueueProcessSplits();} 主要有三个步骤:从队列中peek分片,调用MergeOnReadInputFormat。open构建迭代器,迭代器是用来进行文件的数据读取,一个迭代器对应一个分片(多个物理文件,baselog),对应不同读取的场景,有几种迭代器:BaseFileOnlyFilteringIterator,BaseFileOnlyIterator,LogFileOnlyIterator,MergeIterator,SkipMergeIterator微批量消费,每批只读2048记录,将把记录传递给下游的算子消费同时标记消费的总数,如果该分片读到了尾,则将该分片从队列中弹出,并关闭MergeOnReadInputFormat继续处理队列中的分片,回到步骤1,如果上一次的分片没消费完,那么本次循环将继续消费,只不过是由另一个线程处理。