flink消费kafka的offset与checkpoint( 三 )

观察topic "test"的“消费组积压数量” , 发现LAG还是1:
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtest1341---test0330---test2660---2020年10月18日 星期日 20时28分39秒 CSTRdeMacBook-Pro:kafka r$ flink使用savepoint启动作业 , 注意参数"-s":
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d -s 'hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60' ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID d6cb6e1a6f9c0816ac4b61a1df38ddeb[econ@dev-hadoop-node-c ~]$ 观察"kafka-console-consumer.sh"消费topic "test2"的情况 , 没有新的消息被打印:
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1a2再观察“消费组积压数量” , 发现LAG值已经全部是0 。
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtest1440---test0330---test2660---2020年10月18日 星期日 20时31分43秒 CSTRdeMacBook-Pro:kafka r$ 证明:flink使用savepoint启动作业 , 不会重复消费kafka数据 , 也会正确更新kafka的offset 。
重申 , 以上试验证明:

  1. flink消费了kafka数据后 , 不会更新offset到kafka , 直到checkpoint完成 。
  2. flink在没有使用savepoint重启作业的时候 , 消费kafka的offset还是从kafka自身获取 , 存在重复消费数据的情况 。
  3. flink使用savepoint重启作业 , 不会重复消费kafka数据 , 也会正确更新kafka的offset 。