" + format.format(a3) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(a3, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(a3, offse。深入分析Flink 时间窗口的起始时间( 二 )。" />

深入分析Flink 时间窗口的起始时间( 二 )


System.out.println(a3 + " -> " + format.format(a3) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(a3, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(a3, offset, windowsize)));
System.out.println(a4 + " -> " + format.format(a4) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(a4, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(a4, offset, windowsize)));
System.out.println(b5 + " -> " + format.format(b5) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(b5, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(b5, offset, windowsize)));
System.out.println(b6 + " -> " + format.format(b6) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(b6, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(b6, offset, windowsize)));
}
private static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
}
输出结果如下:
深入分析Flink 时间窗口的起始时间文章插图
通过上面的示例 , 首先 , 我们理解window的时间范围是一个自然时间范围 , 比如你定义了一个窗口:timeWindow(Time.seconds(5));那么其window会将窗口中的事件按照5s进行划分(左闭右开) 。
[10:10:00,10:10:05)
[10:10:05,10:10:10)
… …
[10:10:20,10:10:25)
当一个Event Time = 10:10:22的记录到来时就会生成如下窗口 , 此时这条消息就存放在这个窗口中:
[10:10:20,10:11:25)
五、窗口和时区一般情况下按小时、分钟、秒开窗时间都是对的 。
比如按小时 , EventTime:2020-2-15 21:57:40
窗口开始时间:2020-2-15 21:00:00
窗口结束时间:2020-2-15 22:00:00
但是按天开窗的时候由于国内时区问题可能会和设想的不一样 , 窗口默认开始时间是每天八点 。
窗口的开始时间是按照 TimeWindow 类的getWindowStartWithOffset方法计算 , 参数单位都是ms , windowSize是窗口长度 。
如果想要让窗口按一天滚动 , 0点到24点 , 需要设置第二个参数offset为16小时(或者-8小时) 。 如果不设置的话窗口默认是每天八点到第二天八点 。
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16)))
这样设置之后窗口就是按0点到0点开的 , 之后的ProcessFunction里面就可以取window的start、end了 。
下面仍然沿用之前的代码 , 简单调整一下参数:
import java.text.SimpleDateFormat;
public class TimeWindowVerfier {
public static void main(String[] args) {
// 注意是毫秒为单位 ,windowsize=1day
long windowsize = 86400000L;
// long windowsize = 5000L; // 5s
// 注意是毫秒为单位 , 滚动窗口 offset = 0L
long offset = 0L;
// 设置offset = -8小时
// long offset = -28800000L;
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
// 测试 1day window窗口划分
long a1 = 1577808000000L;
long a2 = 1577822400000L;
long a3 = 1577836799000L;
long a4 = 1577836801000L;
long b5 = 1577876400000L;
long b6 = 1577890800000L;
System.out.println(a1 + " -> " + format.format(a1) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(a1, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(a1, offset, windowsize)));
System.out.println(a2 + " -> " + format.format(a2) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(a2, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(a2, offset, windowsize)));
System.out.println(a3 + " -> " + format.format(a3) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(a3, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(a3, offset, windowsize)));
System.out.println(a4 + " -> " + format.format(a4) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(a4, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(a4, offset, windowsize)));
System.out.println(b5 + " -> " + format.format(b5) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(b5, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(b5, offset, windowsize)));
System.out.println(b6 + " -> " + format.format(b6) + "\t所属窗口的起始时间是: " + getWindowStartWithOffset(b6, offset, windowsize) + " -> " + format.format(getWindowStartWithOffset(b6, offset, windowsize)));
}
private static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
}
其中 long offset = 0L , 输出结果如下:
深入分析Flink 时间窗口的起始时间文章插图