flink消费kafka的offset与checkpoint( 二 )
<>(topic, new SimpleStringSchema(), properties);DataStreamSource stringDataStreamSource = env.addSource(stringFlinkKafkaConsumer);String producerTopic = "test2";FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer<>(producerTopic, new KafkaSerializationSchema() {@Overridepublic ProducerRecord serialize(String element, @Nullable Long timestamp) {return new ProducerRecord<>(producerTopic, element.getBytes());}}, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);stringDataStreamSource.addSink(kafkaProducer);env.execute("TestKafkaOffsetCheckpointJob");}}提交作业:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID 5fdd14f7fd3c93287635c9d61180d8a6[econ@dev-hadoop-node-c ~]$
使用"kafka-console-producer.sh"往topic "test"生成消息"a1":
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:>a1>
使用"kafka-console-consumer.sh"消费topic "test2"的消息:
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1
证明作业逻辑本身没有问题 , 实现' 从topic "test"读取数据 , 然后写入topic "test2" ' 。
【flink消费kafka的offset与checkpoint】使用"kafka-consumer-groups.sh"观察消费组"prod-econ-flink-TestKafkaOffsetCheckpointJob-local"的积压数量 , 重点观察指标"LAG" , 可以看到LAG为1 :
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtest1330---test0330---test2561---2020年10月18日 星期日 20时09分45秒 CSTRdeMacBook-Pro:kafka r$
证明flink消费了kafka数据后 , 不会更新offset到kafka 。
停止作业:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 5fdd14f7fd3c93287635c9d61180d8a6Suspending job "5fdd14f7fd3c93287635c9d61180d8a6" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-5fdd14-53dfd9f8eccd[econ@dev-hadoop-node-c ~]$
再次启动作业 , 但是 , 不使用上面生成的savepoint:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink run -m dev-hadoop-node-c:8081 -c "com.econ.powercloud.jobsTest.TestKafkaOffsetCheckpointJob" -d ~/powercloud-flink-1.0.20201016.jar --bootstrap.servers localhost:9092Job has been submitted with JobID 130568a2eeec96296237ed3e1f280f83[econ@dev-hadoop-node-c ~]$
观察topic "test2" , 发现 , 同样的数据"a1"被生产进入:
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1
证明:flink在没有使用savepoint的时候 , 消费kafka的offset还是从kafka自身获取 。
再仔细观察topic "test"的“消费组积压数量” , 注意在"20时10分05秒"还观察到积压数值1 , 但是在"20时10分08秒"就发现积压数值都是0.
RdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtest1330---test0330---test2561---2020年10月18日 星期日 20时10分05秒 CSTRdeMacBook-Pro:kafka r$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group prod-econ-flink-TestKafkaOffsetCheckpointJob-local --describe; date; Consumer group 'prod-econ-flink-TestKafkaOffsetCheckpointJob-local' has no active members.TOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAGCONSUMER-IDHOSTCLIENT-IDtest1330---test0330---test2660---2020年10月18日 星期日 20时10分08秒 CSTRdeMacBook-Pro:kafka r$
这是因为 , 在"20:10:06"完成了一次checkpoint , 把offset更新回kafka 。
文章插图
Flink Checkpoint History
下面接着测试flink使用savepoint的情况下 , 是否会重复消费kafka数据 。
使用"kafka-console-producer.sh"往topic "test"生成消息"a2":
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-producer.sh --topic test --broker-list localhost:9092>a1>a2>
使用"kafka-console-consumer.sh"消费topic "test2"的消息:
RdeMacBook-Pro:kafka r$ ./bin/kafka-console-consumer.sh --topic test2 --bootstrap-server localhost:9092a1a1a2
停止作业:
[econ@dev-hadoop-node-c ~]$ /opt/flink-1.11.1/bin/flink stop -m dev-hadoop-node-c:8081 bb8b4ba7ddaad869c6469fab5e81d179Suspending job "bb8b4ba7ddaad869c6469fab5e81d179" with a savepoint.Savepoint completed. Path: hdfs://nameservice1/flink1.11/flink-savepoints/savepoint-bb8b4b-99016a1c3e60[econ@dev-hadoop-node-c ~]$
- 会员|美容院使用会员管理软件给顾客更好的消费体验!
- 死亡|这届年轻人不讲武德,旧消费主义社会性死亡
- 好消息|好消息!双十二实体店消费券已经开领
- 借贷消费|花呗该为网贷背锅吗
- 消费|在日活6亿+的抖音上,新消费品牌如何玩好流量生意?
- 市场|高明市场监管部门发布重要消费警示!
- 外卖|美团将增加一个“新功能”,外卖员迎来新时代,消费者会面临什么
- 上涨|美团营收全面上涨
- 消费|宿言:早看早应用~提升零售产品销售额的9个手段!
- 投诉|巴西“黑五”活动投诉增多 消费者最不满误导性广告