深入分析Flink 时间窗口的起始时间
我们知道Flink window窗口触发的两个条件:
(1)watermark时间 >= window_end_time 时间 。
(2)在窗口[window_start_time, window_end_time)区间中还需要有数据存在 , 如果没有数据同样是不会触发的 。
那么Flink 时间窗口的起始时间和终止时间是怎么计算出来的呢?
在获得起始时间start time之后 , 终止时间很容易计算:end time=start time+size , 其中size是窗口大小 。
每一条记录来了以后会根据时间属性值采用不同的window assigner方法分配给一个或者多个窗口 。 对于滚动窗口而言 , 一条记录只属于一个窗口 。
下面 , 我们从滚动窗口开始 , 进入Flink的源码进行深入分析window窗口的起始时间的计算逻辑 。
一、定义窗口类型和大小在Flink应用开发中 , 我们window算子来定义一个窗口 。
下面代码定义了一个窗口大小为5s的滚动窗口 。
DataStream window = watermarkStream.keyBy(0)
// 使用滚动窗口 , 间隔5s
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
文章插图
window(TumblingEventTimeWindows.of(size))这段代码 , window利用TumblingEventTimeWindows来分配元素 , 所以我们要了解的核心是TumblingEventTimeWindows.of(size)的定义 。
二、TumblingEventTimeWindows的定义可以看到通过of方法 , 我们构建了一个offset为0 , size为5的TumblingEventTimeWindows对象 , 然后就是我们需要的核心方法:assignWindows , 窗口分配元素的核心方法 。
public static TumblingEventTimeWindows of(Time size) {
return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
}
public static TumblingEventTimeWindows of(Time size, Time offset) {
return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
}
给元素分配窗口的核心方法:assignWindows源码如下所示 。
@Override
public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
其中 , start 表示窗口的开始时间 , 调用了TimeWindow.getWindowStartWithOffset(timestamp, offset, size) 方法 。
在确定 window开始时间之后 , 窗口的 end time = start time + windowsSize 就好了 。
三、获取窗口的开始时间戳TimeWindow.getWindowStartWithOffset方法源码如下 , 该方法用来获取窗口的开始时间戳 。
/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start.
* @param offset The offset which window start would be shifted by.
* @param windowSize The size of the generated windows.
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
代码很简单 , 核心计算公式是:timestamp - (timestamp - offset + windowSize) % windowSize
文章插图
四、测试窗口的开始时间戳按照上面的计算公式 , 我们编写一段代码计算时间戳所对应的窗口 。 在给定窗口大小的情况下 , 每个元素所属的滚动窗口的 start time计算逻辑 。
示例代码如下所示:
import java.text.SimpleDateFormat;
public class TimeWindowVerfier {
public static void main(String[] args) {
// 注意是毫秒为单位
long windowsize = 5000L; // 5s
// 注意是毫秒为单位 , 滚动窗口 offset = 0L
long offset = 0L;
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
// 测试 5s window窗口划分
long a1 = 1603595291000L;
long a2 = 1603595292000L;
long a3 = 1603595395100L;
【深入分析Flink 时间窗口的起始时间】 long a4 = 1603595495000L;
long b5 = 1603595496000L;
long b6 = 1603595292000L;
System.out.println(a1 + " -> " + format.format(a1) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(a1, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(a1, offset, windowsize)));
System.out.println(a2 + " -> " + format.format(a2) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(a2, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(a2, offset, windowsize)));
- 广告点击|广告效果评估:30天的广告时间评估最全面
- 要来|折叠屏iPhone终于要来了!可惜发布的时间有点晚,你愿意等吗?
- 智能手机品|越南手机悄然崛起!创立短短2年时间,在当地接连击退苹果、小米
- Android|索尼Android 11更新时间表公布 最早本月开始
- 华为荣耀|荣耀V40发布时间确定,被华为出售后,最高只能搭载骁龙865?
- 时间|19824.66美元!比特币突破近三年高价 分析师:创新高不代表行情将持续上升
- 用最短的时间,赚最高的工资
- 更改计算机待机睡眠状态时间方法,电脑设置关闭显示器时间教程
- 3天时间,我是如何解决redis bigkey 删除问题的?
- ICPC--1200:数组的距离时间限制&1201:众数问题