Flink中的State概念及其扩容算法 state是什么意思

在流量计算场景下,数据会源源不断地流入Apache Flink系统,每一次数据进入Apache Flink系统都会触发计算 。如果我们要进行计数聚合计算,那么每次触发计算是重新计算历史上所有的流入数据,还是每次计算都是基于之前计算结果的增量计算?答案是肯定的,Apache Flink在上次计算结果的基础上做增量计算 。那么问题来了:“最后的计算结果保存在哪里?留在记忆里可以吗?"答案是否定的,如果保存在内存中,如果某个计算节点因为 、硬件等原因出现故障,最后的计算结果就会丢失 。当节点恢复后,需要重新计算历史上的所有数据(可能是十天或者几百天的数据) 。为了避免这种灾难性的问题,Apache Flink将使用状态来存储计算结果 。本文将向您介绍Apache Flink State的相关内容 。

什么是状态这个问题好像有点“智障”?不管问题的答案是否明显,我还是想简单说一下Flink中的State是什么?状态是指流程计算过程中计算节点的中间计算结果或元数据属性 。例如,在汇总过程中,中间汇总结果应记录在状态中 。例如,当使用Apache Kafka作为数据源时,我们还应该记录已经读取和记录的偏移量 。这些状态数据将在计算过程中保持(插入或更新) 。因此,Flink中的状态是与时间相关的Flink义务的内部数据(计算数据和元数据属性)的快照 。
你为什么需要国家?与批量计算相比,状态是流量计算所特有的 。批处理计算没有故障转移机制,它要么获胜,要么重新计算 。流量计算在大多数场景下是增量计算,数据是一个一个处理的(大多数场景) 。每个计算都在最后一个计算结果之上处理 。这样的机制必然会存储最后的计算结果(生产模式应该是持久的) 。另外,由于机器、 、脏数据等引起的程序问题 。,重新启动作业时,需要从成功的检查点(checkpoint,后面章节会具体介绍)恢复状态 。计算、故障转移这些机制都需要国家的支持 。
状态存储实现Flink中有三种状态存储实现,如下所示:
基于内存的heapstateback end-在调试模式下应用,不建议在生产模式下使用;
基于HDFS的fsstate back end——分布式文件持久化,每次读写都操作内存,还需要考虑OOM问题;
基于RocksDB的Rocksdbstatebackend本地文件+异步HDFS持久化;
状态存储模式Apache Flink默认使用RocksDB+HDFS 来存储状态 。状态存储分为两个阶段,首先本地存储到RocksDB,然后异步同步到远程HDFS 。这样的设计不仅消除了HeapStateBackend的限制(内存大小,坏机等 。),还减少了纯分布式存储的 IO开销 。

状态分类keyed state——这里的key是我们在SQL语句中对应的groupby/PartitionBy中的字段,红豆博客key的值是由group by/partition by字段组成的R红豆博客ow的字节数组 。每个键都有自己的状态,键之间的状态是不可见的;
OperatorState将用于记录OperatorState-Flink内部源连接器实现中源数据读取的偏移量 。
国家扩张时的再分配Flink是一个大规模并行分布式系统,允许大范围的条件流处理 。为了可扩展性,Flink作业在逻辑上分解为算子图,每个算子的性能在物理上分解为多个并行的算子实例 。从概念上讲,Flink中的每个并行操作符实例都是一个独立的义务,它可以调度其他连接到 的机器在自己的机器上运行 。
在Flink的DAG图中,只有边连接的节点有 通信,即所有DAG在垂直方向上都有 IO,有状态节点之间没有 通信,度方向如下图 。该模型还确保每个操作符实例保护自己的状态,并将其保存在本地磁盘上(远程异步同步) 。通过这种设计,所有义务的状态数据都是本地的,状态访问不需要义务之间的 通信 。避免这种流量对于Flink这样的大规模并行分布式系统的可伸缩性至关重要 。
我们知道Flink中有operator State和KeyedState,那么在扩展(增加并发)时如何分配状态呢?例如,外部源有五个分区,在Flink上从源的一个并发扩展为两个并发,中间有状态操作节点从两个并发扩展为三个并发,如下图所示:

在Flink中,不同类型的状态有不同的扩展 。接下来我们就分别介绍一下 。
运营商对扩容的处理我们选取Flink中一个具体的连接器实现实例进行介绍 。以MetaQ为例 。MetaQ通过topic 订阅数据,每个topic会有n >: 0个分区,以上图为例 。假设我们订阅的MetaQ的话题有5个分区,当我们的源从1并发调整为2并发时,状态如何恢复?
状态的恢复方式与OperatorState在Source中的存储结构有一定的关系 。我们先来看看MetaQSource的实现是如何存储状态的 。首先,MetaQSource实现了listcheckpointed