“榨干”EMR开销!AWS EMR在搭建大数据平台ETL的应用实践( 二 )


在批处理ETLpipeline中有两个重要的组件分别叫Optimus和JetFire , 就是大家耳熟能详的擎天柱和天火 , 是由我司Transformerteam建立的一套基于Spark的数据建模和处理框架(所以team的产品都以Transformer中的角色来命名) 。 Optimus主要针对的是数据仓库层的建设 , 主要功能是将广告系统产生的log经前端模块收集起来后 , 根据商业逻辑的需求进行抽取转换 , 统一建模 , 并做了大量业务的enrichment , 将原始数据转换成方便下游应用端进行分析使用的ContextData , 最终由SparkSQL生成宽表落盘 。 JetFire更偏向于一个灵活通用的基于Spark的ETLFramework , 能让用户更简单方便的将基于宽表之上的数据加工需求进行快速实现 。 这些pipelines都是由Airflow进行任务的编排和调度 。

“榨干”EMR开销!AWS EMR在搭建大数据平台ETL的应用实践
文章图片

“榨干”EMR开销!AWS EMR在搭建大数据平台ETL的应用实践
文章图片
任务特点
基于Optimus的批处理ETLpipeline
ETLpipeline每小时一个batch , 由于客户分布在全球各个时区 , 所以数据的发布需求是按照客户所在时区的零点之后发布当前时区客户前24小时的数据;另外有些产品也需要支持每个小时的数据都不能延迟 , 每个小时的数据处理需要控制在30min以内;数据量不稳定;客户在不同时区分布的密度 , 各个小时的流量也不同 , 以及区域性事件的发生 , 以及内部上游模块可能会发生delay等都会造成每小时数据量不均匀 。 虽然每天24个小时的数据量分布虽然大致趋势相同 , 但是不能准确预测 , 只有将要开始处理这个batch的时候 , 才能从上游拿到这个batch的数据量信息;数据在ETL的中间过程中在HDFS上没有持久化需求;对于HDFS的需求是支撑Spark任务以及EMR集群的访问 , 保证一次批任务内部的事务性即可 。 需要持久化的数据会由后续的模块load到clickhouse , 以及同步发布到S3上交由hive来管理;Spark任务对于集群资源的需求:Optimus中由于存在大量的计算(如数据序列化反序列化 , metric的计算 , 数据的排序聚合 , Hyperloglog计算等)和缓存(分层建模中 , 在DAG中被反复用到的数据) , Spark任务在不同阶段对集群资源的需求点是不同的:从数据load进内存到对数据进行transform进行建模的过程 , 是计算密集型的 , 需要消耗大量的CPU , 同时由于有些dataframe需要被更上层的模型复用 , 需要cache起来 , 这里需要大量的memory;而最终在支撑大量并发SparkSQL的数据抽取和聚合的运算中 , 网络和CPU都是很大性能瓶颈 。 我们针对Spark应用做了非常精细的调优工作 , 具体可以参考文章《ApacheSpark3.0新特性在FreeWheel核心业务数据团队的应用与实战》 。 在处理一个batch的过程中 , 集群资源使用情况可以对应比较下面三个图 。
“榨干”EMR开销!AWS EMR在搭建大数据平台ETL的应用实践
文章图片

“榨干”EMR开销!AWS EMR在搭建大数据平台ETL的应用实践
文章图片

“榨干”EMR开销!AWS EMR在搭建大数据平台ETL的应用实践
文章图片
基于JetFire的DataFeedpipeline
Datafeed中的任务具有不同的schedule , 不同的input数据量 , 提交到EMR集群的任务负载无法提前预知 。 这种场景下 , EMR更像是一个共享的计算平台 , 我们很难从单独一个应用的角度对整体资源进行规划 。 考虑到基于Optimus的ETLpipeline是众多下游应用的共同依赖 , 需要保证很高的稳定性 , 我们期望能够在独占一个EMR集群的前提下尽量降低开销 。 而Datafeedpipeline任务零碎且多 , 绝大部分任务轻量需要较快完成 。我们来各个击破上述需求
EMR集群配置
基于前文对EMR集群的介绍以及实际应用需求 , 总的来说 , 我们使用了在Longterm的EMR集群 , 运行单点Ondemand机型的Masternode+单点Ondemand机型的Corenode+动态伸缩的由Spot&Ondemand机型的InstanceFleet组成的Worker 。
LongtermEMRVSOndemand创建EMR
我们不选择在每次需要EMR集群的时候去重新创建一个集群的原因是 , 除了机器实例provision的时间外 , EMR还需要额外运行较长时间的bootstraping脚本去启动很多服务才能让整个集群ready 。 而Masternode和Corenode我们在上文介绍过 , 相较于Workernode , 他们不仅需要支撑Hadoop服务 , 还需要下载并配置Spark , Ganglia等环境 , 以及更多服务(根据用户勾选的application决定) , 这样频繁创建和销毁带来的时间开销相对于hourlyschedule的任务而言 , 是不能忽略的 。 所以我们选择创建维护一个longtermEMR集群 , 通过scaleworkernode来控制负载的方案 。 而对于使用间隔较长 , 如daily甚至weeklyjob来讲 , 在每次需要使用EMR的时候进行临时创建是一个更合理的方案 。