从未如此简单:10分钟带你逆袭Kafka!( 九 )
生产者测试类:在生产者测试类中 , 自己遇到一个坑 , 就是最后自己没有加 sleep , 就是怎么检查自己的代码都没有问题 , 但是最后就是没法发送成功消息 , 最后加了一个 sleep 就可以了 。
因为主函数 main 已经执行完退出 , 但是消息并没有发送完成 , 需要进行等待一下 。 当然 , 你在生产环境中可能不会遇到这样问题 , 呵呵!
代码如下:
import static java.lang.Thread.sleep;/*** @ClassName MyKafkaProducerTest* @Description TODO* @Author lingxiangxiang* @Date 3:46 PM* @Version 1.0**/ public class MyKafkaProducerTest {public static void main(String[] args) throws InterruptedException {MyKafkaProducer producer = new MyKafkaProducer();boolean result = producer.sendMsg();System.out.println("send msg " + result);sleep(1000);} }
消费者类:
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Collections; import java.util.Properties;/*** @ClassName MyKafkaConsumer* @Description TODO* @Author lingxiangxiang* @Date 4:12 PM* @Version 1.0**/ public class MyKafkaConsumer extends ShutdownableThread {private KafkaConsumer
消费者的测试类:
/*** @ClassName MyConsumerTest* @Description TODO* @Author lingxiangxiang* @Date 4:23 PM* @Version 1.0**/ public class MyConsumerTest {public static void main(String[] args) {MyKafkaConsumer consumer = new MyKafkaConsumer();consumer.start();System.out.println("==================");} }
文章插图
②消费者同步手动提交
前面的消费者都是以自动提交 Offset 的方式对 Broker 中的消息进行消费的 , 但自动提交 可能会出现消息重复消费的情况 。
所以在生产环境下 , 很多时候需要对 Offset 进行手动提交 ,以解决重复消费的问题 。
手动提交又可以划分为同步提交、异步提交 , 同异步联合提交 。 这些提交方式仅仅是 doWork() 方法不相同 , 其构造器是相同的 。
所以下面首先在前面消费者类的基础上进行构造器的修改 , 然后再分别实现三种不同的提交方式 。
同步提交方式是 , 消费者向 Broker 提交 Offset 后等待 Broker 成功响应 。 若没有收到响应 , 则会重新提交 , 直到获取到响应 。
而在这个等待过程中 , 消费者是阻塞的 。 其严重影响了消费者的吞吐量 。
修改前面的 MyKafkaConsumer.java, 主要修改下面的配置:
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Collections; import java.util.Properties;/*** @ClassName MyKafkaConsumer* @Description TODO* @Author lingxiangxiang* @Date 4:12 PM* @Version 1.0**/ public class MyKafkaConsumer extends ShutdownableThread {private KafkaConsumer
- 页面|如何简单、快速制作流程图?上班族的画图技巧get
- 公式|?有人把 5G 讲得这么简单明了
- 简单|互联网巨头夺走菜贩生计?未必那么简单
- 简单|密码太难记不住,太简单不安全,怎么办?
- 手机|OPPO手机该如何截屏?四种最简单的方法已汇总!
- 加拿大|上演戏剧性一幕!iPhone12最新售价确定,苹果也没想到降价如此快
- 车机|未来汽车,先从未来的"车机"开始
- 动手做|动手做一个最简单的加法计算器
- 简单|行车记录仪数据丢失怎么办?想要简单恢复,那还不试一下这个?
- 打印机的共享设置方法,简单6部搞定!