『InfoQ』是如何实现每秒 200 万次的数据处理?,Netflix( 二 )


『InfoQ』是如何实现每秒 200 万次的数据处理?,Netflix
文章图片
摄取
这个数据库的数据插入是实时的 , 不是将单个记录插入到数据源中 , 而是从Kafka流读取事件(就是我们的度量) 。 每个数据源使用一个主题 。 在Druid中 , 我们使用Kafka索引任务 , 它创建了多个分布在实时节点(中间管理器)上的索引工作器 。
这些索引器都订阅主题 , 并从流中读取其事件 。 索引器根据摄取规范从事件消息中提取值 , 并将创建的行累积到内存中 。 一旦创建了一行 , 就可以查询它 。 对于索引器正在填充的段的时间块进行查询 , 将由索引器本身提供服务 。 由于索引任务本质上是执行两项工作 , 即摄取和处理查询 , 所以及时将数据发送到历史节点 , 以更优化的方式将查询工作卸载给它们是很重要的 。
Druid可以在摄取时汇总数据 , 以尽量减少需要存储的原始数据量 。 Rollup是一种汇总或预聚合的形式 。 在某些情况下 , 汇总数据可以极大地减少需要存储的数据的大小 , 可能会减少行数数量级 。 然而 , 这种存储减少是有代价的:我们失去了查询单个事件的能力 , 只能在预定义的查询粒度上进行查询 。 对于我们的用例 , 我们选择了1分钟的查询粒度 。
在摄取期间 , 如果任何行具有相同的维度 , 并且它们的时间戳在同一分钟内(我们的查询粒度) , 则将这些行汇总 。 这意味着 , 通过将所有度量值相加合并行并增加计数器 , 我们就可以知道有多少事件对这一行的值有贡献 。 这种形式的Rollup可以显著地减少数据库中的行数 , 从而加快查询速度 。
一旦累积的行数达到某个阈值 , 或者段打开的时间太长 , 这些行就被写入段文件并被卸载到深层存储中 。 然后 , 索引器通知协调器片段已经做好准备 , 以便协调器可以告诉一个或多个历史节点来加载它 。 一旦段被成功地加载到历史节点中 , 它就会从索引器中卸载 , 任何针对该数据的查询现在都将由历史节点提供服务 。
数据管理
可以想象 , 随着维度基数的增加 , 在同一分钟内发生相同事件的可能性会降低 。 管理基数(以便汇总)是实现良好查询性能的强大手段 。
为了达到我们需要的摄取速度 , 可以运行许多索引器实例 。 即使索引任务使用Rollup合并相同的行 , 在一个索引任务的同一个实例中获得这些相同行的机会也
预定的压缩任务从深度存储中获取时间块的所有段 , 并运行map/reduce作业来重新创建段并实现完美的汇总 。 然后 , 由历史节点加载和发布新的段 , 替换和取代原来的、未充分汇总的段 。 在我们的例子中 , 通过使用这个额外的压缩任务 , 行数减少到了1/2 。
知道何时收到给定时间块的所有事件并不是一件小事 。 Kafka上可能有延迟到达的数据 , 或者索引器将片段传递给历史节点可能需要花些时间 。 为了解决这个问题 , 我们会在运行压缩之前执行一些限制和检查 。
首先 , 我们丢弃所有非常晚才到达的数据 。 我们认为 , 这些数据在我们的实时系统已经过时 。 这设置了数据延迟的界限 。 其次 , 压缩任务被延迟调度 , 这使得段有足够的时间可以卸载到正常流中的历史节点 。 最后 , 当给定时间块的预定压缩任务启动时 , 它将查询段元数据 , 检查是否仍然有相关的段被写入或传递 。 如果有 , 它将等待几分钟后再试一次 。 这将确保所有数据都由压缩作业处理 。
没有这些措施 , 我们发现有时会丢失数据 。 在开始压缩时仍有写入的段将被新压缩的段所覆盖 , 这些段具有更高的版本 , 因此会优先 。 这可以有效地删除包含在那些尚未完成传递的段中的数据 。
查询
Druid支持两种查询语言:DruidSQL和原生查询 。 在底层 , DruidSQL查询会被转换成原生查询 。 原生查询以JSON格式提交给REST端点 , 这是我们使用的主要机制 。