Problem Description

When using Spark to read Hive partitioned tables during development (or through Hive on Spark, notepad-like development tools, etc.), some developers neglect to add a filter on the partition columns. The job then loads the full table, causing slow execution, heavy disk I/O, and similar problems.

Solution

1. Define a custom rule class CheckPartitionTable that implements Rule, and create the SparkSession as follows, injecting the rule into the Optimizer through SparkSessionExtensions:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}

type ExtensionsBuilder = SparkSessionExtensions => Unit
// append the CheckPartitionTable rule executor to the Optimizer
val extBuilder: ExtensionsBuilder = { e => e.injectOptimizerRule(CheckPartitionTable) }

val conf = new SparkConf()
  .setMaster("local[*]")
  .set("spark.table.check.partition", "true")    // enable the partition check
  .set("spark.table.check.partition.num", "30")  // max partitions one query may read
  .setAppName("SQL")

val spark = SparkSession.builder()
  .config(conf)
  .withExtensions(extBuilder)
  .enableHiveSupport()
  .getOrCreate()
```

2. Alternatively, define the same CheckPartitionTable rule class and append it to the optimizer's rule batches (Optimizer.batches: Seq[Batch]); a sketch of this wiring appears after the rule implementation below.

Rule Implementation

1. The CheckPartitionTable rule executor takes a SparkSession so it can read the configuration set above, and it must extend Rule[LogicalPlan] (here together with PredicateHelper):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet, Expression, Or, PredicateHelper}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.StructType

case class CheckPartitionTable(sparkSession: SparkSession)
  extends Rule[LogicalPlan] with PredicateHelper {

  // config key: whether to run the partition check at all
  val checkpartition = "spark.table.check.partition"
  // config key: upper bound on the number of partitions a query may read
  val checknumpartition = "spark.table.check.partition.num"

  val conf = sparkSession.conf
```

2. splitPredicates separates out the partition predicates from a filter condition. During SQL parsing, predicates are represented as a TreeNode, so the method walks the tree recursively:

```scala
  // recursively collect the predicates that reference only partition columns
  def splitPredicates(condition: Expression, partitionSet: AttributeSet): Seq[Expression] = {
    condition match {
      // AND: keep the partition predicates found on either side
      case And(cond1, cond2) =>
        splitPredicates(cond1, partitionSet) ++ splitPredicates(cond2, partitionSet)
      // OR: usable for pruning only if BOTH sides contain partition predicates
      case Or(cond1, cond2) =>
        val leftSeq  = splitPredicates(cond1, partitionSet)
        val rightSeq = splitPredicates(cond2, partitionSet)
        if (leftSeq.nonEmpty && rightSeq.nonEmpty)
          Or(leftSeq.reduceLeft(And), rightSeq.reduceLeft(And)) :: Nil
        else Nil
      // leaf predicate: keep it only if it references partition columns exclusively
      case other =>
        if (other.references.subsetOf(partitionSet)) other :: Nil else Nil
    }
  }
```

3. isPartitionTable decides whether the scan targets a partitioned table and, if so, whether a partition filter was supplied and how many partitions it selects:

```scala
  def isPartitionTable(filter: Filter, numPartition: Int): Boolean = {
    var boolean = false
    filter.child match {
      // match a LogicalRelation: a DataSource table backed by files
      case logicalRelation @ LogicalRelation(
          fsRelation @ HadoopFsRelation(
            location: CatalogFileIndex, partitionSchema: StructType, _, _, _, _),
          _, catalogTable) =>
        val table = catalogTable.get
        // check whether the table being read has partition columns
        if (table.partitionColumnNames.nonEmpty) {
          val sparkSession = fsRelation.sparkSession
          // resolve the Attributes of the table's partition columns
          val partitionColumns = logicalRelation.resolve(
            partitionSchema, sparkSession.sessionState.analyzer.resolver)
          log.info("partitionColumns: " + partitionColumns)
          val partitionSet = AttributeSet(partitionColumns)
          // extract the partition filter expressions
          val partitionKeyFilters = splitPredicates(filter.condition, partitionSet)
          var partitionsize = -1L
          if (partitionKeyFilters.nonEmpty) {
            log.info("partitionKeyFilters Expression: " + partitionKeyFilters)
            // prune the partition paths on HDFS using the extracted filters
            val prunedFileIndex = location.filterPartitions(partitionKeyFilters)
            val partitions = prunedFileIndex.partitionSpec().partitions
            partitionsize = partitions.size
            log.info("partitions: " + partitions)
          }
          // flag the scan if no partition filter exists, or it still selects too many partitions
          boolean = partitionKeyFilters.isEmpty || partitionsize > numPartition
        }
      // match a CatalogRelation: a Hive table
      case catalogRelation: CatalogRelation =>
        val partitionSet = AttributeSet(catalogRelation.partitionCols)
        val partitionKeyFilters = splitPredicates(filter.condition, partitionSet)
        // flag the scan if no predicate references the partition columns
        boolean = !partitionKeyFilters.exists(_.references.subsetOf(partitionSet))
      case _ => log.warn("no table information found for this filter")
    }
    boolean
  }
```

4. Implement Rule's apply method: when the check is enabled and a filter fails it, abort the query before any data is read:

```scala
  def apply(plan: LogicalPlan): LogicalPlan =
    if (!conf.get(checkpartition, "true").toBoolean) {
      log.warn("partition check is not enabled")
      plan
    } else plan transform {
      case j @ Filter(condition: Expression, child: LogicalPlan)
          if isPartitionTable(j, conf.get(checknumpartition, s"${Int.MaxValue}").toInt) =>
        throw new Exception(
          s"""${condition.sql}
             |${child.treeString}
             |No partition information is added to the partition table""".stripMargin)
    }
}
```
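As referenced under option 2 above, here is a minimal sketch of attaching the rule without SparkSessionExtensions. Rather than subclassing Optimizer and editing batches: Seq[Batch] by hand (which also requires replacing the session state), this sketch uses Spark's experimental hook, which the optimizer runs as an extra batch of user-provided rules; it is an alternative wiring, not necessarily the one the original code used:

```scala
// Sketch for option 2: register the rule through ExperimentalMethods.
// The optimizer applies extraOptimizations as a "User Provided Optimizers" batch.
// Assumes `spark` is a SparkSession built WITHOUT withExtensions(extBuilder),
// otherwise the rule would run twice.
spark.experimental.extraOptimizations ++= Seq(CheckPartitionTable(spark))
```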
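To see the rule in action, a minimal usage sketch follows. The table test_db.user_log and its partition column dt are hypothetical names, and spark is the session created in option 1 above:

```scala
import scala.util.{Failure, Success, Try}

// A query with no partition predicate is rejected at optimization time.
Try(spark.sql("SELECT count(*) FROM test_db.user_log").collect()) match {
  case Failure(e) => println("rejected as expected: " + e.getMessage)
  case Success(_) => println("unexpected: full-table scan was allowed")
}

// A query filtering on the partition column passes the check, provided it
// selects at most spark.table.check.partition.num = 30 partitions.
spark.sql("SELECT count(*) FROM test_db.user_log WHERE dt = '2024-06-01'").show()
```

Because the rule fails inside the optimizer, an offending query is stopped before any HDFS data is touched, which is exactly the point of the check.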