技术编程|图解 DataX 核心设计原理( 二 )


bps 限速:needChannelNumber = 总 byteLimit / 单个 Channel byteLimit
tps 限速:needChannelNumber = 总 recordLimit / 单个 Channel recordLimit
如果以上都没有设置 , 则会根据用户在 job.setting.speed.channel 配置的并发数量设置 needChannelNumber 。
2)根据 needChannelNumber 将 Job 切分成多个 Task
这个步骤的具体切分逻辑交由相关插件去完成 , 例如 Rdb 对数据的拆分主要分成两类:
如果用户配置了具体的 Table 数量 , 那么就按照 Table 为最小单元进行拆分(即一个 Table 对应一个 Task) , 并生成对应的 querySql;
如果用户还配置了 splitPk , 则会根据 splitPk 进行切分 , 具体逻辑是根据 splitPk 区间对 Table 进行拆分 , 并生成对应的 querySql 。
2、公平分配策略
DataX 在执行调度之前 , 会调用 JobAssignUtil#assignFairly方法对切分好的 Task 公平分配给每个 TaskGroup 。
在分配之前 , 会计算 TaskGroup 的数量 , 具体公式:inttaskGroupNumber=(int)Math.ceil(1.0*channelNumber/channelsPerTaskGroup);
channelNumber 即为在切分策略中根据用户配置计算得到的 needChannelNumber 并发数量大小 , channelsPerTaskGroup 为每个 TaskGroup 需要的并发数量 , 默认为 5 。
求出 TaskGroup 的数量之后 , 就会执行公平分配策略 , 将 Task 平均分配个每个 TaskGroup , 最后执行调度 , 完成整个同步作业 。举个公平分配策略的例子:
假设 A 库有表 0、1、2 , B 库上有表 3、4 , C 库上有表 5、6、7 , 如果此时有 4 个 TaskGroup , 则 assign 后的结果为:taskGroup-0:0,4,taskGroup-1:3,6,taskGroup-2:5,2,taskGroup-3:1,7
举个例子来描述 Job、Task、Task Group 之间的关系:
用户构建了一个数据同步作业 , 该作业的目的是将 MySql 的 100 张表同步到 Oracle 库中 , 假设此时用户设置了 20 个并发(即 channelNumber=20):
DataX 根据表的数量切分成 100 个 Task;
DataX 默认给每个 TaskGroup 分配 5 个 Channel , 因此 taskGroupNumber = channelNumber / channelsPerTaskGroup = 20 / 5 = 4;
根据 DataX 的公平分配策略 , 会将 100 个 Task 平均分配给每个 TaskGroup , 因此每个 TaskGroup 处理 taskNumber / taskGroupNumber = 100 / 4 = 25 个 Task 。
以上的例子用如下图表示:
技术编程|图解 DataX 核心设计原理
文章图片
由于一个 Channel 对应一个线程执行 , 因此 DataX 的线程模型可以用如下图表示:
技术编程|图解 DataX 核心设计原理
文章图片