「埃尔法哥哥」Spark 数据倾斜的 8 大实用方法,解决( 二 )


由于数据量巨大 , 可以采用抽样的方式 , 对数据进行抽样 , 统计出现的次数 , 根据出现次数大小排序取出前几个:
df.select("key").sample(false,0.1)//数据采样.(k=>(k,1)).reduceBykey(_+_)//统计key出现的次数.map(k=>(k._2,k._1)).sortByKey(false)//根据key出现次数进行排序.take(10)//取前10个 。
如果发现多数数据分布都较为平均 , 而个别数据比其他数据大上若干个数量级 , 则说明发生了数据倾斜 。
如何缓解数据倾斜?
基本思路:
业务逻辑:我们从业务逻辑的层面上来优化数据倾斜 , 比如要统计不同城市的订单情况 , 那么我们单独对这一线城市来做count , 最后和其它城市做整合 。
程序实现:比如说在Hive中 , 经常遇到count(distinct)操作 , 这样会导致最终只有一个reduce , 我们可以先group再在外面包一层count , 就可以了;在Spark中使用reduceByKey替代groupByKey等 。
参数调优:Hadoop和Spark都自带了很多的参数和机制来调节数据倾斜 , 合理利用它们就能解决大部分问题 。
思路1.过滤异常数据
如果导致数据倾斜的key是异常数据 , 那么简单的过滤掉就可以了 。
首先要对key进行分析 , 判断是哪些key造成数据倾斜 。 具体方法上面已经介绍过了 , 这里不赘述 。
然后对这些key对应的记录进行分析:
空值或者异常值之类的 , 大多是这个原因引起
无效数据 , 大量重复的测试数据或是对结果影响不大的有效数据
有效数据 , 业务导致的正常数据分布
解决方案:
对于第1 , 2种情况 , 直接对数据进行过滤即可 。
第3种情况则需要特殊的处理 , 具体我们下面详细介绍 。
思路2.提高shuffle并行度
Spark在做Shuffle时 , 默认使用HashPartitioner(非HashShuffle)对数据进行分区 。 如果并行度设置的不合适 , 可能造成大量不相同的Key对应的数据被分配到了同一个Task上 , 造成该Task所处理的数据远大于其它Task , 从而造成数据倾斜 。
如果调整Shuffle时的并行度 , 使得原本被分配到同一Task的不同Key发配到不同Task上处理 , 则可降低原Task所需处理的数据量 , 从而缓解数据倾斜问题造成的短板效应 。
(1)操作流程
RDD操作可在需要Shuffle的操作算子上直接设置并行度或者使用spark.default.parallelism设置 。 如果是SparkSQL , 还可通过SETspark.sql.shuffle.partitions=[num_tasks]设置并行度 。 默认参数由不同的ClusterManager控制 。
dataFrame和sparkSql可以设置spark.sql.shuffle.partitions=[num_tasks]参数控制shuffle的并发度 , 默认为200 。
(2)适用场景
大量不同的Key被分配到了相同的Task造成该Task数据量过大 。
(3)解决方案
调整并行度 。 一般是增大并行度 , 但有时如减小并行度也可达到效果 。
(4)优势
实现简单 , 只需要参数调优 。 可用最小的代价解决问题 。 一般如果出现数据倾斜 , 都可以通过这种方法先试验几次 , 如果问题未解决 , 再尝试其它方法 。
(5)劣势
适用场景少 , 只是让每个task执行更少的不同的key 。 无法解决个别key特别大的情况造成的倾斜 , 如果某些key的大小非常大 , 即使一个task单独执行它 , 也会受到数据倾斜的困扰 。 并且该方法一般只能缓解数据倾斜 , 没有彻底消除问题 。 从实践经验来看 , 其效果一般 。
TIPS:
可以把数据倾斜类比为hash冲突 。 提高并行度就类似于提高hash表的大小 。
思路3.自定义Partitioner
(1)原理
使用自定义的Partitioner(默认为HashPartitioner) , 将原本被分配到同一个Task的不同Key分配到不同Task 。
例如 , 我们在groupByKey算子上 , 使用自定义的Partitioner: