[埃尔法哥哥]解决 Spark 数据倾斜的 8 大实用方法( 四 )


现在将 leftSkewRDD 与 膨胀 n 倍的 rightSkewRDD 进行 join , 且在 Join 过程中将随机前缀去掉 , 得到倾斜数据集的 Join 结果 skewedJoinRDD 。 注意到此时我们已经成功将原先相同的 key 打散成 n 份 , 分散到多个 task 中去进行 join 了 。
对 leftUnSkewRDD 与 rightUnRDD 进行Join , 得到 Join 结果 unskewedJoinRDD 。
通过 union 算子将 skewedJoinRDD 与 unskewedJoinRDD 进行合并 , 从而得到完整的 Join 结果集 。
TIPS:
rightRDD 与倾斜 Key 对应的部分数据 , 需要与随机前缀集 (1~n) 作笛卡尔乘积 (即将数据量扩大 n 倍) , 从而保证无论数据倾斜侧倾斜 Key 如何加前缀 , 都能与之正常 Join 。
skewRDD 的 join 并行度可以设置为 n * k (k 为 topSkewkey 的个数) 。
由于倾斜Key与非倾斜Key的操作完全独立 , 可并行进行 。
(1)适用场景
两张表都比较大 , 无法使用 Map 端 Join 。 其中一个 RDD 有少数几个 Key 的数据量过大 , 另外一个 RDD 的 Key 分布较为均匀 。
(2)解决方案
将有数据倾斜的 RDD 中倾斜 Key 对应的数据集单独抽取出来加上随机前缀 , 另外一个 RDD 每条数据分别与随机前缀结合形成新的RDD(相当于将其数据增到到原来的N倍 , N即为随机前缀的总个数) , 然后将二者Join并去掉前缀 。 然后将不包含倾斜Key的剩余数据进行Join 。 最后将两次Join的结果集通过union合并 , 即可得到全部Join结果 。
(3)优势
相对于 Map 则 Join , 更能适应大数据集的 Join 。 如果资源充足 , 倾斜部分数据集与非倾斜部分数据集可并行进行 , 效率提升明显 。 且只针对倾斜部分的数据做数据扩展 , 增加的资源消耗有限 。
(4)劣势
如果倾斜 Key 非常多 , 则另一侧数据膨胀非常大 , 此方案不适用 。 而且此时对倾斜 Key 与非倾斜 Key 分开处理 , 需要扫描数据集两遍 , 增加了开销 。
思路6. 大表 key 加盐 , 小表扩大 N 倍 jion
如果出现数据倾斜的 Key 比较多 , 上一种方法将这些大量的倾斜 Key 分拆出来 , 意义不大 。 此时更适合直接对存在数据倾斜的数据集全部加上随机前缀 , 然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大N倍) 。
其实就是上一个方法的特例或者简化 。 少了拆分 , 也就没有 union 。
(1)适用场景
一个数据集存在的倾斜 Key 比较多 , 另外一个数据集数据分布比较均匀 。
(2)优势
对大部分场景都适用 , 效果不错 。
(3)劣势
需要将一个数据集整体扩大 N 倍 , 会增加资源消耗 。
思路7. map 端先局部聚合
在 map 端加个 combiner 函数进行局部聚合 。 加上 combiner 相当于提前进行 reduce ,就会把一个 mapper 中的相同 key 进行聚合 , 减少 shuffle 过程中数据量 以及 reduce 端的计算量 。 这种方法可以有效的缓解数据倾斜问题 , 但是如果导致数据倾斜的 key 大量分布在不同的 mapper 的时候 , 这种方法就不是很有效了 。
TIPS:
使用 reduceByKey 而不是 groupByKey 。
思路8. 加盐局部聚合 + 去盐全局聚合
这个方案的核心实现思路就是进行两阶段聚合 。 第一次是局部聚合 , 先给每个 key 都打上一个 1~n 的随机数 , 比如 3 以内的随机数 , 此时原先一样的 key 就变成不一样的了 , 比如 (hello, 1) (hello, 1) (hello, 1) (hello, 1) (hello, 1) , 就会变成 (1_hello, 1) (3_hello, 1) (2_hello, 1) (1_hello, 1) (2_hello, 1) 。 接着对打上随机数后的数据 , 执行 reduceByKey 等聚合操作 , 进行局部聚合 , 那么局部聚合结果 , 就会变成了 (1_hello, 2) (2_hello, 2) (3_hello, 1) 。 然后将各个 key 的前缀给去掉 , 就会变成 (hello, 2) (hello, 2) (hello, 1) , 再次进行全局聚合操作 , 就可以得到最终结果了 , 比如 (hello, 5) 。