区分理解Flink水印延迟与窗口允许延迟的概念

导读:Flink 在开窗处理事件时间(Event Time) 数据时 , 可设置水印延迟以及设置窗口允许延迟(allowedLateness)以保证数据的完整性 。 这两者因都是设置延迟时间所以刚接触时容易混淆 。 本文接下将展开讨论分析“水印延迟”与“窗口允许延迟”概念及区别 。

  • 水印延迟
  • 窗口允许延迟
  • 一个Demo 两个猜想
  • 总结
水印延迟(WaterMark)水印
由于采用了事件时间 , 脱离了物理挂钟 。 窗口不知道什么时候需要关闭并进行计算 , 这个时候需要借助水印来解决该问题 。 当窗口遇到水位标识时就默认是窗口时间段内的数据都到齐了 , 可以触发窗口计算 。
水印延迟
设置水印延迟时间的目的是让水印延迟到达 , 从而可以解决乱序问题 。 通过水印延迟到达让在延迟时间范围内到达的迟到数据可以加入到窗口计算中 , 保证了数据的完整性 。 当水印到达后就会触发窗口计算 , 在水印之后到达的迟到数据则会被丢弃 。
区分理解Flink水印延迟与窗口允许延迟的概念文章插图
窗口允许延迟(allowedLateness)
区分理解Flink水印延迟与窗口允许延迟的概念文章插图
使用 StreamAPI 时 , 在进行开窗后可设置 allowedLateness 窗口延迟 。 官网中对其解释如下:
默认情况下 , 当水印到达窗口末端时 , 迟到元素将会被删除 。 但Flink允许为window operators指定允许的最大延迟 。 允许延迟指定元素在被删除之前延迟的时间 , 默认值为0 。 当元素在水印经过窗口末端后到达 , 且它的到达时间在窗口末端加上运行延迟的时间之内 , 其仍会被添加到窗口中 。 根据所使用的触发器 , 延迟但未被丢弃的元素可能会再次触发窗口计算 。 EventTimeTrigger就是这种情况 。 为了做到这一点 , Flink保持窗口的状态 , 直到它们允许的延迟到期 。 一旦发生这种情况 , Flink将删除窗口并删除其状态 , 正如窗口生命周期部分中所描述的那样 。
区分理解Flink水印延迟与窗口允许延迟的概念文章插图
简单理解:通常在水印到达之后迟到数据将会被删除 , 而窗口的延迟则是指数据在被删除之前的允许保留时间 。 也就是说 , 在水印达到之后迟到数据本该被删除 , 但是如果设置了窗口延迟 , 那么在水印之后到窗口延迟时间段内到达的迟到数据还是会被加入到窗口计算中 , 并再次触发窗口计算 。
一个Demo 两个猜想下面我用一个 Demo 和两个猜想来帮助大家加深理解这两个概念 。
例子:接收 Kafka 数据 , 数据为 JSON 格式如:{"word":"a","count":1,"time":1604286564} 。 我们开一个 5 秒的 tumbling windows 滚动窗口 , 以 word 作为 key 在窗口内对 count 值进行累加 。 同时设置水印延迟 2 秒 , 窗口延迟 2 秒 。 代码如下:
public class MyExample {public static void main(String[] args) throws Exception {// 创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 设置时间特性为env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 水印策略 , 其需要注入Timestamp Assigner(描述了如何访问事件时间戳)和 Watermark Generator (事件流显示的超出正常范围的程度)WatermarkStrategy watermarkStrategy = WatermarkStrategy// forBoundedOutOfOrderness 属于(periodic周期性) , 周期生成器通常通过onEvent()观察传入的事件 , 然后在框架调用onPeriodicEmit()时发出水印 。.forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner() {@Overridepublic long extractTimestamp(WC wc, long l) {return wc.getEventTime() * 1000;}});// Kafka 配置Properties properties = new Properties();properties.setProperty("bootstrap.servers", "Kafka地址:9092");properties.setProperty("group.id", "test");// Flink 需要知道如何转换Kafka消息为Java对象(反序列化) , 默认提供了 KafkaDeserializationSchema(序列化需要自己编写)、JsonDeserializationSchema、AvroDeserializationSchema、TypeInformationSerializationSchemaenv.addSource(new FlinkKafkaConsumer<>("flinktest1", new JSONKeyValueDeserializationSchema(true), properties).setStartFromLatest())// map 构建 WC 对象.map(new MapFunction() {@Overridepublic WC map(ObjectNode jsonNode) throws Exception {JsonNode valueNode = jsonNode.get("value");WC wc = new WC(valueNode.get("word").asText(),valueNode.get("count").asInt(),valueNode.get("time").asLong());return wc;}})// 设定水印策略.assignTimestampsAndWatermarks(watermarkStrategy).keyBy(WC::getWord)// 窗口设置 , 这里设置为滚动窗口.window(TumblingEventTimeWindows.of(Time.seconds(5)))// 设置窗口延迟.allowedLateness(Time.seconds(2)).reduce(new ReduceFunction