Kafka Has Never Been This Simple: Master Kafka in 10 Minutes! (Part 10)

```java
        this.consumer = new KafkaConsumer<>(properties);
    }

    @Override
    public void doWork() {
        consumer.subscribe(Arrays.asList("test2"));
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
            // Manual synchronous commit
            consumer.commitSync();
        }
    }
}
```

③ Manual asynchronous commit by the consumer
A manual synchronous commit has to wait for a success response from the broker, which is inefficient and hurts consumer throughput.

With an asynchronous commit, the consumer submits the offset to the broker without waiting for a success response, which increases consumer throughput.
```java
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @ClassName MyKafkaConsumer
 * @Description TODO
 * @Author lingxiangxiang
 * @Date 4:12 PM
 * @Version 1.0
 **/
public class MyKafkaConsumer extends ShutdownableThread {

    private KafkaConsumer<Integer, String> consumer;

    public MyKafkaConsumer() {
        super("KafkaConsumerTest", false);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");
        properties.put("group.id", "mygroup");
        // Switch this to manual commits
        properties.put("enable.auto.commit", "false");
        // properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("heartbeat.interval.ms", "10000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(properties);
    }

    @Override
    public void doWork() {
        consumer.subscribe(Arrays.asList("test2"));
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
            // Manual synchronous commit
            // consumer.commitSync();
            // Manual asynchronous commit
            // consumer.commitAsync();
            // Manual asynchronous commit with a callback
            consumer.commitAsync((offsets, e) -> {
                if (e != null) {
                    System.out.println("commit failed, offsets = " + offsets);
                    System.out.println("exception = " + e);
                }
            });
        }
    }
}
```

Using Kafka with Spring Boot
Most projects nowadays are built with Spring Boot and just start up out of the box; sticking with the raw client API there feels a bit low-level. So how does Kafka integrate with Spring Boot?
Maven configuration:
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.1</version>
</dependency>
```

(Note: the `KafkaTemplate` and `@KafkaListener` used below come from Spring for Apache Kafka, so the `org.springframework.kafka:spring-kafka` dependency is also required.)

Then add the configuration: put the following settings into application.properties.
Kafka connection address:
```properties
spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094
```

Producer:
```properties
spring.kafka.producer.acks = 0
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.retries = 3
spring.kafka.producer.batch-size = 4096
spring.kafka.producer.buffer-memory = 33554432
spring.kafka.producer.compression-type = gzip
```

Consumer:
```properties
spring.kafka.consumer.group-id = mygroup
spring.kafka.consumer.auto-commit-interval = 5000
spring.kafka.consumer.heartbeat-interval = 3000
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.enable-auto-commit = true
# listener: number of concurrent listener threads
spring.kafka.listener.concurrency = 8
# topic name
kafka.topic1 = topic1
```

Producer:
```java
import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MyKafkaProducerServiceImpl implements MyKafkaProducerService {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    // Read the topic name from the configuration file
    @Value("${kafka.topic1}")
    private String topic;

    @Override
    public void sendKafka() {
        kafkaTemplate.send(topic, "hello world");
    }
}
```

Consumer:
```java
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyKafkaConsumer {

    @KafkaListener(topics = "${kafka.topic1}")
    public void listen(ConsumerRecord<String, String> record) {
        Optional<String> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            log.info("----------------- record = " + record);
            log.info("------------------ message = " + kafkaMessage.get());
        }
    }
}
```
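The listener above wraps `record.value()` in `Optional.ofNullable` because a record's value can be null (for example, tombstone records in a compacted topic), and logging should only happen for real messages. Here is a minimal, standard-library-only sketch of that null-guard pattern so you can try it without a broker; the class and method names are illustrative and not part of the article's code:

```java
import java.util.Optional;

public class NullGuardDemo {
    // Same guard as in the @KafkaListener: wrap a possibly-null value
    // so the "present" branch only runs for real messages.
    static String describe(String value) {
        Optional<String> message = Optional.ofNullable(value);
        return message.isPresent() ? "message = " + message.get() : "skipped null value";
    }

    public static void main(String[] args) {
        System.out.println(describe("hello world")); // message = hello world
        System.out.println(describe(null));          // skipped null value
    }
}
```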