本文从数据倾斜的危害、现象、原因等方面,由浅入深阐述Spark数据倾斜及其解决方案。 一、什么是数据倾斜 对SparkHadoop这样的分布式大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。 对于分布式系统而言,理想情况下,随着系统规模(节点数量)的增加,应用整体耗时线性下降。如果一台机器处理一批大量数据需要120分钟,当机器数量增加到3台时,理想的耗时为120340分钟。但是,想做到分布式情况下每台机器执行时间是单机时的1N,就必须保证每台机器的任务量相等。不幸的是,很多时候,任务的分配是不均匀的,甚至不均匀到大部分任务被分配到个别机器上,其它大部分机器所分配的任务量只占总得的小部分。比如一台机器负责处理80的任务,另外两台机器各处理10的任务。 不患多而患不均,这是分布式环境下最大的问题。意味着计算能力不是线性扩展的,而是存在短板效应:一个Stage所耗费的时间,是由最慢的那个Task决定。 由于同一个Stage内的所有task执行相同的计算,在排除不同计算节点计算能力差异的前提下,不同task之间耗时的差异主要由该task所处理的数据量决定。所以,要想发挥分布式系统并行计算的优势,就必须解决数据倾斜问题。 二、数据倾斜的危害 当出现数据倾斜时,小量任务耗时远高于其它任务,从而使得整体耗时过大,未能充分发挥分布式系统的并行计算优势。 另外,当发生数据倾斜时,部分任务处理的数据量过大,可能造成内存不足使得任务失败,并进而引进整个应用失败。 三、数据倾斜的现象 当发现如下现象时,十有八九是发生数据倾斜了:绝大多数task执行得都非常快,但个别task执行极慢,整体任务卡在某个阶段不能结束。原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是我们写的业务代码造成的。这种情况比较少见。TIPS 在Sparkstreaming程序中,数据倾斜更容易出现,特别是在程序中包含一些类似sql的join、group这种操作的时候。因为SparkStreaming程序在运行的时候,我们一般不会分配特别多的内存,因此一旦在这个过程中出现一些数据倾斜,就十分容易造成OOM。 四、数据倾斜的原因 在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。比如大部分key对应10条数据,但是个别key却对应了100万条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。 因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出。 五、问题发现与定位 1、通过SparkWebUI 通过SparkWebUI来查看当前运行的stage各个task分配的数据量(ShuffleReadSizeRecords),从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。 知道数据倾斜发生在哪一个stage之后,接着我们就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类算子。可以通过countByKey查看各个key的分布。TIPS 数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。 2、通过key统计 也可以通过抽样统计key的出现次数验证。 由于数据量巨大,可以采用抽样的方式,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个:df。select(key)。sample(false,0。1)数据采样。(k(k,1))。reduceBykey()统计key出现的次数。map(k(k。2,k。1))。sortByKey(false)根据key出现次数进行排序。take(10)取前10个。 如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。 六、如何缓解数据倾斜 基本思路业务逻辑:我们从业务逻辑的层面上来优化数据倾斜,比如要统计不同城市的订单情况,那么我们单独对这一线城市来做count,最后和其它城市做整合。程序实现:比如说在Hive中,经常遇到count(distinct)操作,这样会导致最终只有一个reduce,我们可以先group再在外面包一层count,就可以了;在Spark中使用reduceByKey替代groupByKey等。参数调优:Hadoop和Spark都自带了很多的参数和机制来调节数据倾斜,合理利用它们就能解决大部分问题。 思路1。过滤异常数据 如果导致数据倾斜的key是异常数据,那么简单的过滤掉就可以了。 首先要对key进行分析,判断是哪些key造成数据倾斜。具体方法上面已经介绍过了,这里不赘述。 然后对这些key对应的记录进行分析:空值或者异常值之类的,大多是这个原因引起无效数据,大量重复的测试数据或是对结果影响不大的有效数据有效数据,业务导致的正常数据分布 解决方案 对于第1,2种情况,直接对数据进行过滤即可。 第3种情况则需要特殊的处理,具体我们下面详细介绍。 思路2。提高shuffle并行度 Spark在做Shuffle时,默认使用HashPartitioner(非HashShuffle)对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的Key对应的数据被分配到了同一个Task上,造成该Task所处理的数据远大于其它Task,从而造成数据倾斜。 如果调整Shuffle时的并行度,使得原本被分配到同一Task的不同Key发配到不同Task上处理,则可降低原Task所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。 (1)操作流程 RDD操作可在需要Shuffle的操作算子上直接设置并行度或者使用spark。default。parallelism设置。如果是SparkSQL,还可通过SETspark。sql。shuffle。partitions〔numtasks〕设置并行度。默认参数由不同的ClusterManager控制。 dataFrame和sparkSql可以设置spark。sql。shuffle。partitions〔numtasks〕参数控制shuffle的并发度,默认为200。 (2)适用场景 大量不同的Key被分配到了相同的Task造成该Task数据量过大。 (3)解决方案 调整并行度。一般是增大并行度,但有时如减小并行度也可达到效果。 (4)优势 实现简单,只需要参数调优。可用最小的代价解决问题。一般如果出现数据倾斜,都可以通过这种方法先试验几次,如果问题未解决,再尝试其它方法。 (5)劣势 适用场景少,只是让每个task执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些key的大小非常大,即使一个task单独执行它,也会受到数据倾斜的困扰。并且该方法一般只能缓解数据倾斜,没有彻底消除问题。从实践经验来看,其效果一般。 TIPS可以把数据倾斜类比为hash冲突。提高并行度就类似于提高hash表的大小。 思路3。自定义Partitioner (1)原理 使用自定义的Partitioner(默认为HashPartitioner),将原本被分配到同一个Task的不同Key分配到不同Task。 例如,我们在groupByKey算子上,使用自定义的Partitioner:。groupByKey(newPartitioner(){OverridepublicintnumPartitions(){return12;}OverridepublicintgetPartition(Objectkey){intidInteger。parseInt(key。toString());if(id9500000id9500084((id9500000)12)0){return(id9500000)12;}else{returnid12;}}}) TIPS这个做法相当于自定义hash表的哈希函数。 (2)适用场景 大量不同的Key被分配到了相同的Task造成该Task数据量过大。 (3)解决方案 使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀分配到不同的Task中。 (4)优势 不影响原有的并行度设计。如果改变并行度,后续Stage的并行度也会默认改变,可能会影响后续Stage。 (5)劣势 适用场景有限,只能将不同Key分散开,对于同一Key对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的Partitioner,不够灵活。 思路4。Reduce端Join转化为Map端Join 通过Spark的Broadcast机制,将Reduce端Join转化为Map端Join,这意味着Spark现在不需要跨节点做shuffle而是直接通过本地文件进行join,从而完全消除Shuffle带来的数据倾斜。 frompyspark。sql。functionsimportbroadcastresultbroadcast(A)。join(B,〔joincol〕,left) 其中A是比较小的dataframe并且能够整个存放在executor内存中。 (1)适用场景 参与Join的一边数据集足够小,可被加载进Driver并通过Broadcast方法广播到各个Executor中。 (2)解决方案 在JavaScala代码中将小数据集数据拉取到Driver,然后通过Broadcast方案将小数据集的数据广播到各Executor。或者在使用SQL前,将Broadcast的阈值调整得足够大,从而使Broadcast生效。进而将ReduceJoin替换为MapJoin。 (3)优势 避免了Shuffle,彻底消除了数据倾斜产生的条件,可极大提升性能。 (4)劣势 因为是先将小数据通过Broadcase发送到每个executor上,所以需要参与Join的一方数据集足够小,并且主要适用于Join的场景,不适合聚合的场景,适用条件有限。NOTES 使用SparkSQL时需要通过SETspark。sql。autoBroadcastJoinThreshold104857600将Broadcast的阈值设置得足够大,才会生效。 思路5。拆分join再union 思路很简单,就是将一个join拆分成倾斜数据集Join和非倾斜数据集Join,最后进行union:对包含少数几个数据量过大的key的那个RDD(假设是leftRDD),通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。具体方法上面已经介绍过了,这里不赘述。然后将这k个key对应的数据从leftRDD中单独过滤出来,并给每个key都打上1n以内的随机数作为前缀,形成一个单独的leftSkewRDD;而不会导致倾斜的大部分key形成另外一个leftUnSkewRDD。接着将需要join的另一个rightRDD,也过滤出来那几个倾斜key并通过flatMap操作将该数据集中每条数据均转换为n条数据(这n条数据都按顺序附加一个0n的前缀),形成单独的rightSkewRDD;不会导致倾斜的大部分key也形成另外一个rightUnSkewRDD。现在将leftSkewRDD与膨胀n倍的rightSkewRDD进行join,且在Join过程中将随机前缀去掉,得到倾斜数据集的Join结果skewedJoinRDD。注意到此时我们已经成功将原先相同的key打散成n份,分散到多个task中去进行join了。对leftUnSkewRDD与rightUnRDD进行Join,得到Join结果unskewedJoinRDD。通过union算子将skewedJoinRDD与unskewedJoinRDD进行合并,从而得到完整的Join结果集。TIPS rightRDD与倾斜Key对应的部分数据,需要与随机前缀集(1n)作笛卡尔乘积(即将数据量扩大n倍),从而保证无论数据倾斜侧倾斜Key如何加前缀,都能与之正常Join。skewRDD的join并行度可以设置为nk(k为topSkewkey的个数)。由于倾斜Key与非倾斜Key的操作完全独立,可并行进行。 (1)适用场景 两张表都比较大,无法使用Map端Join。其中一个RDD有少数几个Key的数据量过大,另外一个RDD的Key分布较为均匀。 (2)解决方案 将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join并去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。 (3)优势 相对于Map则Join,更能适应大数据集的Join。如果资源充足,倾斜部分数据集与非倾斜部分数据集可并行进行,效率提升明显。且只针对倾斜部分的数据做数据扩展,增加的资源消耗有限。 (4)劣势 如果倾斜Key非常多,则另一侧数据膨胀非常大,此方案不适用。而且此时对倾斜Key与非倾斜Key分开处理,需要扫描数据集两遍,增加了开销。 思路6。大表key加盐,小表扩大N倍jion 如果出现数据倾斜的Key比较多,上一种方法将这些大量的倾斜Key分拆出来,意义不大。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍)。 其实就是上一个方法的特例或者简化。少了拆分,也就没有union。 (1)适用场景 一个数据集存在的倾斜Key比较多,另外一个数据集数据分布比较均匀。 (2)优势 对大部分场景都适用,效果不错。 (3)劣势 需要将一个数据集整体扩大N倍,会增加资源消耗。 思路7。map端先局部聚合 在map端加个combiner函数进行局部聚合。加上combiner相当于提前进行reduce,就会把一个mapper中的相同key进行聚合,减少shuffle过程中数据量以及reduce端的计算量。这种方法可以有效的缓解数据倾斜问题,但是如果导致数据倾斜的key大量分布在不同的mapper的时候,这种方法就不是很有效了。 TIPS使用reduceByKey而不是groupByKey。 思路8。加盐局部聚合去盐全局聚合 这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key都打上一个1n的随机数,比如3以内的随机数,此时原先一样的key就变成不一样的了,比如(hello,1)(hello,1)(hello,1)(hello,1)(hello,1),就会变成(1hello,1)(3hello,1)(2hello,1)(1hello,1)(2hello,1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1hello,2)(2hello,2)(3hello,1)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2)(hello,1),再次进行全局聚合操作,就可以得到最终结果了,比如(hello,5)。defantiSkew():RDD〔(String,Int)〕{valSPLITvalprefixnewRandom()。nextInt(10)pairs。map(t(prefixSPLITt。1,1))。reduceByKey((v1,v2)v1v2)。map(t(t。1。split(SPLIT)(1),t2。2))。reduceByKey((v1,v2)v1v2)} 不过进行两次mapreduce,性能稍微比一次的差些。 七、Hadoop中的数据倾斜 Hadoop中直接贴近用户使用的是Mapreduce程序和Hive程序,虽说Hive最后也是用MR来执行(至少目前Hive内存计算并不普及),但是毕竟写的内容逻辑区别很大,一个是程序,一个是Sql,因此这里稍作区分。 Hadoop中的数据倾斜主要表现在ruduce阶段卡在99。99,一直99。99不能结束。 这里如果详细的看日志或者和监控界面的话会发现:有一个多几个reduce卡住各种container报错OOM读写的数据量极大,至少远远超过其它正常的reduce伴随着数据倾斜,会出现任务被kill等各种诡异的表现。 经验:Hive的数据倾斜,一般都发生在Sql中Group和On上,而且和数据逻辑绑定比较深。 优化方法 这里列出来一些方法和思路,具体的参数和用法在官网看就行了。mapjoin方式countdistinct的操作,先转成group,再count参数调优sethive。map。aggrtruesethive。groupby。skewindatatrueleftsemijion的使用设置map端输出、中间结果压缩。(不完全是解决数据倾斜的问题,但是减少了IO读写和网络传输,能提高很多效率) 说明hive。map。aggrtrue:在map中会做部分聚集操作,效率更高但需要更多的内存。 hive。groupby。skewindatatrue:数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupByKey有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupByKey分布到Reduce中(这个过程可以保证相同的GroupByKey被分布到同一个Reduce中),最后完成最终的聚合操作。 八、参考文章Spark性能优化之道解决Spark数据倾斜(DataSkew)的N种姿势漫谈千亿级数据优化实践:数据倾斜(纯干货)解决spark中遇到的数据倾斜问题