full outer改写left anti join实践


背景
ods层数据同步时经常会遇到增全量合并的模型 , 即T-1天增量表 + T-2全量表 = T-1全量表 。 可以通过full outer join脚本来完成合并 , 但是数据量很大时非常消耗资源 。
insert overwrite table tb_test partition(ds='${bizdate}') select case when a.id is not null then a.id esle b.id end as id ,if(a.name is not null, a.name, b.name) as name ,coalesce(a.age, b.age) as age --这3种写法一样 , 都是优先取delta表的字段 from ( select * from tb_test_delta where ds='${bizdate}' ) a full outer join ( select * from tb_test where ds='${bizdate-1}' ) b on a.id =b.id;这种写法可实现新增和更新操作:

  • 新增是指增量表中新出现的数据 , 而全量表中没有;
  • 更新是指增量表和全量表中都有的数据 , 但优先取增量表的数据 , 覆盖历史表的数据 。 如下图所示 , R2_1是增量表当天去重后增量数据 , M3是全量表前一天的数据 , 而J4_2_3则是full outer join的执行图 。
【full outer改写left anti join实践】
full outer改写left anti join实践
本文插图
将J4_2_3展开会发现里面将增量和全量进行了merge join , 当数据量很大(1288亿条)时会产生很大的shuffle开销 。 此时优化方案就是将full outer join改成 union all , 从而避免join shuffle 。
优化模型
结论:full outer join改成hash cluster + left join +union all可以有效地降低计算成本 , 且有两种应用场景 。 先将模型进行抽象 , 假设有a和b两个表 , a是增量表 , b是全量表:
with a as ( select * from values(1,'111') ,(2,'two') ,(7,'777') as (id,name) ) --增量 ,b as ( select * from values(1,'') ,(2,'222') ,(3,'333') ,(4,'444') as (id,name) )--全量场景1:只合并新增数据到全量表
left anti join相当于not in , 增量not in全量,过滤后只剩下完全新增的id , 对全量中已有的id不修改:
--查询完全新增的id select * from a left anti join b on a.id=b.id ; --结果如下 +------------+------+ | id| name | +------------+------+ | 7| 777| +------------+------+--完全新增的合并全量表 select * froma --增量表 left anti join b on a.id=b.id union all select * from b--全量表 --结果如下 +------------+------+ | id| name | +------------+------+ | 1|| | 2| 222| | 3| 333| | 4| 444| | 7| 777| +------------+------+场景2:合并新增数据到全量表 , 且更新历史数据
全量not in增量,过滤后只剩下历史的id , 然后union all增量 , 既新增也修改
--查询历史全量数据 select * from b left anti join a on a.id=b.id; --结果如下 +------------+------+ | id| name | +------------+------+ | 3| 333| | 4| 444| +------------+------+--合并新增数据到全量表 , 且更新历史数据 select * fromb --全量表 left anti join a on a.id=b.id union all select * from a ; --增量表 --结果如下 +------------+------+ | id| name | +------------+------+ | 1| 111| | 2| two| | 7| 777| | 3| 333| | 4| 444| +------------+------+优化实践
步骤1:表属性修改
表、作业属性修改,对原来的表、作业进行属性优化 , 可以提升优化效果 。
set odps.sql.reducer.instances=3072;--可选 。 默认最大1111个reducer,1111哈希桶 。alter table table_name clustered by(contact_id) sorted by(contact_id) into 3072 buckets;--必选步骤2:按照上述模型的场景1 或者 场景2进行代码改造 。
这里先给出代码改造后的资源消耗对比: