从未如此简单:10分钟带你逆袭Kafka!( 八 )

  • --topic
  • --zookeeper
  • 示例:
    cd /data/servers/kafka_2.11-2.4.0/bin # 创建topictest1 kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test1 # 创建topic test2 kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test2 # 查看topic kafka-topics.sh --list --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094②自动创建 Topic
    我们在工作中 , 如果我们不想去管理 Topic , 可以通过 Kafka 的配置文件来管理 。
    我们可以让 Kafka 自动创建 Topic , 需要在我们的 Kafka 配置文件中加入如下配置文件:
    auto.create.topics.enable=true 如果删除 Topic 想达到物理删除的目的 , 也是需要配置的:
    delete.topic.enable=true ③发送消息
    他们可以通过客户端的命令生产消息 , 先来看看 kafka-console-producer.sh 常用的几个参数吧:
    • --topic :指定 topic
    • --timeout :超时时间
    • --sync:异步发送消息
    • --broker-list :官网提示: REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
    这个参数是必须的:
    kafka-console-producer.sh --broker-list 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ④消费消息
    我们也还是先来看看 kafka-console-consumer.sh 的参数吧:
    • --topic :指定 topic
    • --group :指定消费者组
    • --from-beginning:指定从开始进行消费, 如果不指定, 就从当前进行消费
    • --bootstrap-server:Kafka 的连接地址
    kafka-console-consumer.sh --bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ---beginning
    从未如此简单:10分钟带你逆袭Kafka!文章插图
    Kafka 的日志
    Kafka 的日志分两种:
    • 第一种日志是我们的 Kafka 的启动日志 , 就是我们排查问题 , 查看报错信息的日志 。
    • 第二种日志就是我们的数据日志 , Kafka 是我们的数据是以日志的形式存在存盘中的 , 我们第二种所说的日志就是我们的 Partiton 与 Segment 。
    那我们就来说说备份和分区吧:我们创建一个分区 , 一个备份 , 那么 test 就应该在三台机器上或者三个数据目录只有一个 test-0 。 (分区的下标是从 0 开始的)
    如果我们创建 N 个分区 , 我们就会在三个服务器上发现 , test_0-n , 如果我们创建 M 个备份 , 我们就会在发现 , test_0 到 test_n 每一个都是 M 个 。
    Kafka API
    使用 Kafka 原生的 API
    ①消费者自动提交
    定义自己的生产者:
    import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;import java.util.Properties;/*** @ClassName MyKafkaProducer* @Description TODO* @Author lingxiangxiang* @Date 3:37 PM* @Version 1.0**/ public class MyKafkaProducer {private org.apache.kafka.clients.producer.KafkaProducer producer;public MyKafkaProducer() {Properties properties = new Properties();properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094");properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 设置批量发送properties.put("batch.size", 16384);// 批量发送的等待时间50ms, 超过50ms, 不足批量大小也发送properties.put("linger.ms", 50);this.producer = new org.apache.kafka.clients.producer.KafkaProducer(properties);}public boolean sendMsg() {boolean result = true;try {// 正常发送, test2是topic, 0代表的是分区, 1代表的是key, hello world是发送的消息内容final ProducerRecord record = new ProducerRecord("test2", 0, 1, "hello world");producer.send(record);// 有回调函数的调用producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println(recordMetadata.topic());System.out.println(recordMetadata.partition());System.out.println(recordMetadata.offset());}});// 自己定义一个类producer.send(record, new MyCallback(record));} catch (Exception e) {result = false;}return result;} } 定义生产者发送成功的回调函数:
    import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata;/*** @ClassName MyCallback* @Description TODO* @Author lingxiangxiang* @Date 3:51 PM* @Version 1.0**/ public class MyCallback implements Callback {private Object msg;public MyCallback(Object msg) {this.msg = msg;}@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {System.out.println("topic = " + metadata.topic());System.out.println("partiton = " + metadata.partition());System.out.println("offset = " + metadata.offset());System.out.println(msg);} }