从未如此简单:10分钟带你逆袭Kafka!( 九 )

生产者测试类:在生产者测试类中 , 自己遇到一个坑 , 就是最后自己没有加 sleep , 就是怎么检查自己的代码都没有问题 , 但是最后就是没法发送成功消息 , 最后加了一个 sleep 就可以了 。
因为主函数 main 已经执行完退出 , 但是消息并没有发送完成 , 需要进行等待一下 。 当然 , 你在生产环境中可能不会遇到这样问题 , 呵呵!
代码如下:
import static java.lang.Thread.sleep;/*** @ClassName MyKafkaProducerTest* @Description TODO* @Author lingxiangxiang* @Date 3:46 PM* @Version 1.0**/ public class MyKafkaProducerTest {public static void main(String[] args) throws InterruptedException {MyKafkaProducer producer = new MyKafkaProducer();boolean result = producer.sendMsg();System.out.println("send msg " + result);sleep(1000);} } 消费者类:
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Collections; import java.util.Properties;/*** @ClassName MyKafkaConsumer* @Description TODO* @Author lingxiangxiang* @Date 4:12 PM* @Version 1.0**/ public class MyKafkaConsumer extends ShutdownableThread {private KafkaConsumer consumer;public MyKafkaConsumer() {super("KafkaConsumerTest", false);Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");properties.put("group.id", "mygroup");properties.put("enable.auto.commit", "true");properties.put("auto.commit.interval.ms", "1000");properties.put("session.timeout.ms", "30000");properties.put("heartbeat.interval.ms", "10000");properties.put("auto.offset.reset", "earliest");properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");this.consumer = new KafkaConsumer(properties);}@Overridepublic void doWork() {consumer.subscribe(Arrays.asList("test2"));ConsumerRecordsrecords = consumer.poll(1000);for (ConsumerRecord record : records) {System.out.println("topic = " + record.topic());System.out.println("partition = " + record.partition());System.out.println("key = " + record.key());System.out.println("value = "http://kandian.youth.cn/index/+ record.value());}} } 消费者的测试类:
/*** @ClassName MyConsumerTest* @Description TODO* @Author lingxiangxiang* @Date 4:23 PM* @Version 1.0**/ public class MyConsumerTest {public static void main(String[] args) {MyKafkaConsumer consumer = new MyKafkaConsumer();consumer.start();System.out.println("==================");} }
从未如此简单:10分钟带你逆袭Kafka!文章插图
②消费者同步手动提交
前面的消费者都是以自动提交 Offset 的方式对 Broker 中的消息进行消费的 , 但自动提交 可能会出现消息重复消费的情况 。
所以在生产环境下 , 很多时候需要对 Offset 进行手动提交 ,以解决重复消费的问题 。
手动提交又可以划分为同步提交、异步提交 , 同异步联合提交 。 这些提交方式仅仅是 doWork() 方法不相同 , 其构造器是相同的 。
所以下面首先在前面消费者类的基础上进行构造器的修改 , 然后再分别实现三种不同的提交方式 。
同步提交方式是 , 消费者向 Broker 提交 Offset 后等待 Broker 成功响应 。 若没有收到响应 , 则会重新提交 , 直到获取到响应 。
而在这个等待过程中 , 消费者是阻塞的 。 其严重影响了消费者的吞吐量 。
修改前面的 MyKafkaConsumer.java, 主要修改下面的配置:
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Collections; import java.util.Properties;/*** @ClassName MyKafkaConsumer* @Description TODO* @Author lingxiangxiang* @Date 4:12 PM* @Version 1.0**/ public class MyKafkaConsumer extends ShutdownableThread {private KafkaConsumer consumer;public MyKafkaConsumer() {super("KafkaConsumerTest", false);Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");properties.put("group.id", "mygroup");// 这里要修改成手动提交properties.put("enable.auto.commit", "false");// properties.put("auto.commit.interval.ms", "1000");properties.put("session.timeout.ms", "30000");properties.put("heartbeat.interval.ms", "10000");properties.put("auto.offset.reset", "earliest");properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");this.consumer = new KafkaConsumer