从未如此简单:10分钟带你逆袭Kafka!( 十 )
手动同步提交方式需要等待 Broker 的成功响应 , 效率太低 , 影响消费者的吞吐量 。
异步提交方式是 , 消费者向 Broker 提交 Offset 后不用等待成功响应 , 所以其增加了消费者的吞吐量 。
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Collections; import java.util.Properties;/*** @ClassName MyKafkaConsumer* @Description TODO* @Author lingxiangxiang* @Date 4:12 PM* @Version 1.0**/ public class MyKafkaConsumer extends ShutdownableThread {private KafkaConsumer
Spring Boot 使用 Kafka
现在大家的开发过程中 , 很多都用的是 Spring Boot 的项目 , 直接启动了 , 如果还是用原生的 API , 就是有点 Low 了啊 , 那 Kafka 是如何和 Spring Boot 进行联合的呢?
maven 配置:
添加配置文件 , 在 application.properties 中加入如下配置信息:
Kafka 连接地址:
spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094
生产者:
spring.kafka.producer.acks = 0 spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.retries = 3 spring.kafka.producer.batch-size = 4096 spring.kafka.producer.buffer-memory = 33554432 spring.kafka.producer.compression-type = gzip
消费者:
spring.kafka.consumer.group-id = mygroup spring.kafka.consumer.auto-commit-interval = 5000 spring.kafka.consumer.heartbeat-interval = 3000 spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.auto-offset-reset = earliest spring.kafka.consumer.enable-auto-commit = true # listenner, 标识消费者监听的个数 spring.kafka.listener.concurrency = 8 # topic的名字 kafka.topic1 = topic1
生产者:
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate;@Service @Slf4j public class MyKafkaProducerServiceImpl implements MyKafkaProducerService {@Resourceprivate KafkaTemplate kafkaTemplate;// 读取配置文件@Value("${kafka.topic1}")private String topic;@Overridepublic void sendKafka() {kafkaTemplate.send(topic, "hell world");} }
消费者:
@Component @Slf4j public class MyKafkaConsumer {@KafkaListener(topics = "${kafka.topic1}")public void listen(ConsumerRecord, ?> record) {Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {log.info("----------------- record =" + record);log.info("------------------ message =" + kafkaMessage.get()); }
- 页面|如何简单、快速制作流程图?上班族的画图技巧get
- 公式|?有人把 5G 讲得这么简单明了
- 简单|互联网巨头夺走菜贩生计?未必那么简单
- 简单|密码太难记不住,太简单不安全,怎么办?
- 手机|OPPO手机该如何截屏?四种最简单的方法已汇总!
- 加拿大|上演戏剧性一幕!iPhone12最新售价确定,苹果也没想到降价如此快
- 车机|未来汽车,先从未来的"车机"开始
- 动手做|动手做一个最简单的加法计算器
- 简单|行车记录仪数据丢失怎么办?想要简单恢复,那还不试一下这个?
- 打印机的共享设置方法,简单6部搞定!