揭秘数据湖长文详解Hudi从内核到实战(一)
Hudi入门与构建
Hudi介绍
Hudi将带来流式处理大数据,提供新数据集,同时比传统批处理效率高一个数据量级。
Hudi主要特性:快速upsert,可插入索引;以原子方式操作数据并具有回滚功能;写入器之间的快照隔离;savepoint用户数据恢复的保存点;管理文件大小,使用统计数据布局;数据行的异步压缩和柱状数据;时间轴数据跟踪血统。
Hudi快速构建安装环境准备Hadoop集群HiveSpark2。4。5(2。x)Maven安装
把apachemaven3。6。1bin。tar。gz上传到linux的optsoftware目录下。
解压apachemaven3。6。1bin。tar。gz到optmodule目录下面。〔atguiguhadoop102software〕tarzxvfapachemaven3。6。1bin。tar。gzCoptmodule
修改apachemaven3。6。1的名称为maven。〔atguiguhadoop102module〕mvapachemaven3。6。1maven
添加环境变量到etcprofile中。〔atguiguhadoop102module〕sudovimetcprofileMAVENHOMEexportMAVENHOMEoptmodulemavenexportPATHPATH:MAVENHOMEbin
测试安装结果。〔atguiguhadoop102module〕sourceetcprofile〔atguiguhadoop102module〕mvnv
修改setting。xml,指定为阿里云。〔atguiguhadoop102maven〕cdconf〔atguiguhadoop102maven〕vimsettings。xml!添加阿里云镜像mirroridnexusaliyunidmirrorOfcentralmirrorOfnameNexusaliyunnameurlhttp:maven。aliyun。comnexuscontentgroupspublicurlmirror
Git安装〔atguiguhadoop102software〕sudoyuminstallgit〔atguiguhadoop102software〕gitversion构建Hudi〔atguiguhadoop102software〕cdoptmodule〔atguiguhadoop102module〕gitclonehttps:github。comapachehudi。gitcdhudi〔atguiguhadoop102hudi〕vimpom。xmlrepositoryidnexusaliyunidnamenexusaliyunnameurlhttp:maven。aliyun。comnexuscontentgroupspublicurlreleasesenabledtrueenabledreleasessnapshotsenabledfalseenabledsnapshotsrepository〔atguiguhadoop102hudi〕mvncleanpackageDskipTestsDskipITs
通过Sparkshell快速开始Sparkshell启动
sparkshell启动,需要指定sparkavro模块,因为默认环境里没有,sparkavro模块版本好需要和spark版本对应,这里都是2。4。5。〔roothadoop102hudi〕sparkshellpackagesorg。apache。spark:sparkavro2。11:2。4。5confspark。serializerorg。apache。spark。serializer。KryoSerializerjarsoptmodulehudipackaginghudisparkbundletargethudisparkbundle2。110。6。0SNAPSHOT。jar设置表名
设置表名,基本路径和数据生成器。scalaimportorg。apache。hudi。QuickstartUtils。importorg。apache。hudi。QuickstartUtils。scalaimportscala。collection。JavaConversions。importscala。collection。JavaConversions。scalaimportorg。apache。spark。sql。SaveMode。importorg。apache。spark。sql。SaveMode。scalaimportorg。apache。hudi。DataSourceReadOptions。importorg。apache。hudi。DataSourceReadOptions。scalaimportorg。apache。hudi。DataSourceWriteOptions。importorg。apache。hudi。DataSourceWriteOptions。scalaimportorg。apache。hudi。config。HoodieWriteConfig。importorg。apache。hudi。config。HoodieWriteConfig。scalavaltableNamehuditripscowtableName:StringhuditripscowscalavalbasePathfile:tmphuditripscowbasePath:Stringfile:tmphuditripscowscalavaldataGennewDataGeneratordataGen:org。apache。hudi。QuickstartUtils。DataGeneratororg。apache。hudi。QuickstartUtilsDataGenerator5cdd5ff9插入数据
新增数据,生成一些数据,将其加载到DataFrame中,然后将DataFrame写入Hudi表。scalavalinsertsconvertToStringList(dataGen。generateInserts(10))scalavaldfspark。read。json(spark。sparkContext。parallelize(inserts,2))scaladf。write。format(hudi)。options(getQuickstartWriteConfigs)。option(PRECOMBINEFIELDOPTKEY,ts)。option(RECORDKEYFIELDOPTKEY,uuid)。option(PARTITIONPATHFIELDOPTKEY,partitionpath)。option(TABLENAME,tableName)。mode(Overwrite)。save(basePath)
Mode(overwrite)将覆盖重新创建表(如果已存在)。可以检查tmphuditrpscow路径下是否有数据生成。〔roothadoop102〕cdtmphuditripscow〔roothadoop102huditripscow〕lsamericasasia查询数据scalavaltripsSnapshotDFspark。read。format(hudi)。load(basePath)scalatripsSnapshotDF。createOrReplaceTempView(huditripssnapshot)scalaspark。sql(selectfare,beginlon,beginlat,tsfromhuditripssnapshotwherefare20。0)。show()farebeginlonbeginlatts64。276962958840160。49234796529120240。57318354079306340。033。922164839486430。96945864178483920。18564880850682720。027。794786885825960。62732122024896610。114883931570882610。093。560181152366180。142850512594661970。216241503676011360。043。49238112190140。87794022954277520。61000705621365870。066。620843664502460。038441044444459280。07505887600430350。034。1582847163828450。461578584504654830。47269058795696530。041。062909290463680。81928686877142240。6510585056607420。0scalaspark。sql(selecthoodiecommittime,hoodierecordkey,hoodiepartitionpath,rider,driver,farefromhuditripssnapshot)。show()hoodiecommittimehoodierecordkeyhoodiepartitionpathriderdriverfare202007011051446007a624d9424e0。。。americasuniteds。。。rider213driver21364。2769629588401620200701105144db7c63613f0548d。。。americasuniteds。。。rider213driver21333。9221648394864320200701105144dfd0e7d9f10c468。。。americasuniteds。。。rider213driver21319。17913910664360720200701105144e36365c85b3a415。。。americasuniteds。。。rider213driver21327。7947868858259620200701105144fb92c00edea248e。。。americasuniteds。。。rider213driver21393。560181152366182020070110514498be3080a05847d。。。americasbrazils。。。rider213driver21343。4923811219014202007011051443dd6ef724196469。。。americasbrazils。。。rider213driver21366。620843664502462020070110514420f9463f1c144e6。。。americasbrazils。。。rider213driver21334。158284716382845202007011051441585ad3a11c943c。。。asiaindiachennairider213driver21317。85113525509115520200701105144d40daa90cf1a4d1。。。asiaindiachennairider213driver21341。06290929046368
由于测试数据分区是区域国家城市,所以load(basePath)。修改数据
类似于插入新数据,使用数据生成器生成新数据对历史数据进行更新。将数据加载到DataFrame中并将DataFrame写入Hudi表中。scalavalupdatesconvertToStringList(dataGen。generateUpdates(10))scalavaldfspark。read。json(spark。sparkContext。parallelize(updates,2))scaladf。write。format(hudi)。options(getQuickstartWriteConfigs)。option(PRECOMBINEFIELDOPTKEY,ts)。option(RECORDKEYFIELDOPTKEY,uuid)。option(PARTITIONPATHFIELDOPTKEY,partitionpath)。option(TABLENAME,tableName)。mode(Append)。save(basePath)增量查询
Hudi还提供了获取自给定提交时间戳以来以更改记录流的功能。这可以通过使用Hudi的增量查询并提供开始流进行更改的开始时间来实现。scalaspark。read。format(hudi)。load(basePath)。createOrReplaceTempView(huditripssnapshot)scalavalcommitsspark。sql(selectdistinct(hoodiecommittime)ascommitTimefromhuditripssnapshotorderbycommitTime)。map(kk。getString(0))。take(50)scalavalbeginTimecommits(commits。length2)beginTime:String20200701105144scalavaltripsIncrementalDFspark。read。format(hudi)。option(QUERYTYPEOPTKEY,QUERYTYPEINCREMENTALOPTVAL)。option(BEGININSTANTTIMEOPTKEY,beginTime)。load(basePath)scalatripsIncrementalDF。createOrReplaceTempView(huditripsincremental)scalaspark。sql(selecthoodiecommittime,fare,beginlon,beginlat,tsfromhuditripsincrementalwherefare20。0)。show()hoodiecommittimefarebeginlonbeginlatts2020070111054649。5276942524320560。51421849379331810。73401339012547920。02020070111054690。90538095331540。199493233229220630。182940790590163660。02020070111054698。34281928179870。33499178332483270。47773950677073030。02020070111054690。257101090082390。40069831399892220。085286503476541650。02020070111054663。725049132799290。8884936036969270。65708574434233760。02020070111054629。476613701470790。0108723128705021650。15938676071885560。0
这将提供在beginTime提交后的数据,并且fare20的数据。时间点查询
根据特定时间查询,可以将endTime指向特定时间,beginTime指向000(表示最早提交时间)。scalavalbeginTime000beginTime:String000scalavalendTimecommits(commits。length2)endTime:String20200701105144scalavaltripsPointInTimeDFspark。read。format(hudi)。option(QUERYTYPEOPTKEY,QUERYTYPEINCREMENTALOPTVAL)。option(BEGININSTANTTIMEOPTKEY,beginTime)。option(ENDINSTANTTIMEOPTKEY,endTime)。load(basePath)scalatripsPointInTimeDF。createOrReplaceTempView(huditripspointintime)scalaspark。sql(selecthoodiecommittime,fare,beginlon,beginlat,tsfromhuditripspointintimewherefare20。0)。show()hoodiecommittimefarebeginlonbeginlatts2020070110514464。276962958840160。49234796529120240。57318354079306340。02020070110514433。922164839486430。96945864178483920。18564880850682720。02020070110514427。794786885825960。62732122024896610。114883931570882610。02020070110514493。560181152366180。142850512594661970。216241503676011360。02020070110514443。49238112190140。87794022954277520。61000705621365870。02020070110514466。620843664502460。038441044444459280。07505887600430350。02020070110514434。1582847163828450。461578584504654830。47269058795696530。02020070110514441。062909290463680。81928686877142240。6510585056607420。0删除数据scalaspark。sql(selectuuid,partitionPathfromhuditripssnapshot)。count()res12:Long10scalavaldsspark。sql(selectuuid,partitionPathfromhuditripssnapshot)。limit(2)scalavaldeletesdataGen。generateDeletes(ds。collectAsList())scalavaldfspark。read。json(spark。sparkContext。parallelize(deletes,2));scaladf。write。format(hudi)。options(getQuickstartWriteConfigs)。option(OPERATIONOPTKEY,delete)。option(PRECOMBINEFIELDOPTKEY,ts)。option(RECORDKEYFIELDOPTKEY,uuid)。option(PARTITIONPATHFIELDOPTKEY,partitionpath)。option(TABLENAME,tableName)。mode(Append)。save(basePath)scalavalroAfterDeleteViewDFspark。read。format(hudi)。load(basePath)scalaroAfterDeleteViewDF。registerTempTable(huditripssnapshot)scalaspark。sql(selectuuid,partitionPathfromhuditripssnapshot)。count()res15:Long8
只有append模式,才支持删除功能。
大数据技术生态体系
大数据的切片机制有哪些
大数据之Kafka集群部署
大数据JUC面试题
大数据学习之部署Hadoop
唐Dmi内饰方面看看唐唐的内饰,无滤镜。关于充电方面:用国网的电桩从馈电开始充满需要一个多小时。关于油耗:用电就不需要发动机工作,加200块钱油可以一直用到首次保养,一车两用……
这一交通工具逐渐淘汰,却轻松走红海外,你家也有科技改变生活,随着科学技术的不断进步,人们在出行上也越来越方便,出行工具也处在时刻创新懂得道路上。纵观交通工具的发展史,可以看出工具越来越人性化,出行门槛越来越低,一种交……
支持精密单点定位的高精度定位模块介绍SKYLAB精密单点定位指的是利用全球若干地面跟踪站的GPS观测数据计算出的精密卫星轨道和卫星钟差,对单台GPS接收机所采集的相位和伪距观测值进行定位解算。通常,卫星导航系统能够提供10米……
橇装加油站是设备还是装置?橇装如果应用在油的方面其实全名叫:阻隔防爆橇装式加油装置(是装置),也就是我们看到最多的撬装式加油站。这种装置是集:加油机,阻隔防爆双层储油罐,阻隔防爆油气回收,自……
曹县为什么这样红?文新经济沸点小新最近,网友总在开心地调侃北上广曹,这个曹,就是山东菏泽的曹县。截止5月19日数据,在某短视频平台,曹县、山东菏泽曹县两个话题的总播放量超过7亿,微博……
俯拍中国传统村落程家川村四面环山环境优美村民平均寿命87岁程家川村位于陕西省咸阳市彬县香庙镇南8公里处的泾河古道,三面环水,四面环山,素有风水第一村、养生最佳地之称,是泾河川道风水最好的村落。程家川村田园恬静、人杰地灵。最早有程……
冬天最能下饭的菜,山东名吃,教你做济宁辣子鸡,多吃一碗大米饭辣子鸡是山东的名菜之一,是山东家喻户晓的一道菜。一份辣子鸡,咸鲜香辣,超级下饭。吃完了鸡肉,还可以用汤汁来拌饭,一样很美味哦!主料:小笨鸡一只或肉食鸡鸡腿1个、土豆1个、……
联合音响界爱马仕调音,海量曲库的华为AI音箱搭载双人工智能AI智能音箱的诞生颠覆了人们对音响的观念,一直以来音箱只能单纯的播放有声媒体,人工智能的加持令人和音箱之间有了交互,可以语音点播网络音频,甚至可以吩咐音箱控制很多智能家居,为家……
排面or实用?捷达VS7全都满足随着生活质量越来越高,汽车已经彻底融入到每个人的生活当中,几乎所有家庭都有汽车代步,但因为每个人对车的需求不同,在选择汽车车型上也有着不同的想法,很多用户买车的要求都是空间充足……
5家LED芯片上市公司2020年度业绩及2021年一季度业绩截止目前,华灿光电、乾照光电、聚灿光电、兆驰股份以及三安光电均已披露2020年度业绩及2021年一季度业绩报告,业绩榜整理了这些报告,与大家分享。华灿光电2020年……
偏远山区能做撬装加油站吗新能源汽车做到快速普及难吗无论城市或偏僻的山区,撬装加油站(企业加油站)都是能够做的,但有一个前提,以企业为主体进行报建,并且仅供内部车辆自用,不作对外运营。橇装加油站随着农村的道路日益完善……
餐桌首选大理石材质?看完这些再说说到大理石,在脑海中情不自禁的浮现漂亮美观,时尚有档次的印象。而且装修预算充足的人往往会选择大理石餐桌,因为在多数人的心目中:大理石是品位、品质的象征。那么,今天从……