埃尔法哥哥@Spark 数据倾斜的 8 大实用方法,解决( 三 )


例如 , 我们在groupByKey算子上 , 使用自定义的Partitioner:
.groupByKey(newPartitioner(){@OverridepublicintnumPartitions(){return12;}@OverridepublicintgetPartition(Objectkey){intid=Integer.parseInt(key.toString());if(id>=9500000&&id<=9500084&&((id-9500000)%12)==0){return(id-9500000)/12;}else{returnid%12;}}})
TIPS:
这个做法相当于自定义hash表的哈希函数 。
(2)适用场景
大量不同的Key被分配到了相同的Task造成该Task数据量过大 。
(3)解决方案
使用自定义的Partitioner实现类代替默认的HashPartitioner , 尽量将所有不同的Key均匀分配到不同的Task中 。
(4)优势
不影响原有的并行度设计 。 如果改变并行度 , 后续Stage的并行度也会默认改变 , 可能会影响后续Stage 。
(5)劣势
适用场景有限 , 只能将不同Key分散开 , 对于同一Key对应数据集非常大的场景不适用 。 效果与调整并行度类似 , 只能缓解数据倾斜而不能完全消除数据倾斜 。 而且需要根据数据特点自定义专用的Partitioner , 不够灵活 。
思路4.Reduce端Join转化为Map端Join
通过Spark的Broadcast机制 , 将Reduce端Join转化为Map端Join , 这意味着Spark现在不需要跨节点做shuffle而是直接通过本地文件进行join , 从而完全消除Shuffle带来的数据倾斜 。
埃尔法哥哥@Spark 数据倾斜的 8 大实用方法,解决
文章图片
frompyspark.sql.functionsimportbroadcastresult=broadcast(A).join(B,["join_col"],"left")
其中A是比较小的dataframe并且能够整个存放在executor内存中 。
(1)适用场景
参与Join的一边数据集足够小 , 可被加载进Driver并通过Broadcast方法广播到各个Executor中 。
(2)解决方案
在Java/Scala代码中将小数据集数据拉取到Driver , 然后通过Broadcast方案将小数据集的数据广播到各Executor 。 或者在使用SQL前 , 将Broadcast的阈值调整得足够大 , 从而使Broadcast生效 。 进而将ReduceJoin替换为MapJoin 。
(3)优势
避免了Shuffle , 彻底消除了数据倾斜产生的条件 , 可极大提升性能 。
(4)劣势
因为是先将小数据通过Broadcase发送到每个executor上 , 所以需要参与Join的一方数据集足够小 , 并且主要适用于Join的场景 , 不适合聚合的场景 , 适用条件有限 。
NOTES:
使用SparkSQL时需要通过SETspark.sql.autoBroadcastJoinThreshold=104857600将Broadcast的阈值设置得足够大 , 才会生效 。
思路5.拆分join再union
思路很简单 , 就是将一个join拆分成倾斜数据集Join和非倾斜数据集Join , 最后进行union:
对包含少数几个数据量过大的key的那个RDD(假设是leftRDD) , 通过sample算子采样出一份样本来 , 然后统计一下每个key的数量 , 计算出来数据量最大的是哪几个key 。 具体方法上面已经介绍过了 , 这里不赘述 。
然后将这k个key对应的数据从leftRDD中单独过滤出来 , 并给每个key都打上1~n以内的随机数作为前缀 , 形成一个单独的leftSkewRDD;而不会导致倾斜的大部分key形成另外一个leftUnSkewRDD 。
接着将需要join的另一个rightRDD , 也过滤出来那几个倾斜key并通过flatMap操作将该数据集中每条数据均转换为n条数据(这n条数据都按顺序附加一个0~n的前缀) , 形成单独的rightSkewRDD;不会导致倾斜的大部分key也形成另外一个rightUnSkewRDD 。
现在将leftSkewRDD与膨胀n倍的rightSkewRDD进行join , 且在Join过程中将随机前缀去掉 , 得到倾斜数据集的Join结果skewedJoinRDD 。 注意到此时我们已经成功将原先相同的key打散成n份 , 分散到多个task中去进行join了 。