从未如此简单:10分钟带你逆袭Kafka!( 七 )


从未如此简单:10分钟带你逆袭Kafka!文章插图
方案三:设置前提条件
为更新的数据设置前置条件另外一种实现幂等的思路是 , 给数据变更设置一个前置条件 , 如果满足条件就更新数据 , 否则拒绝更新数据 , 在更新数据的时候 , 同时变更前置条件中需要判断的数据 。
这样 , 重复执行这个操作时 , 由于第一次更新数据的时候已经变更了前置条件中需要判断的数据 , 不满足前置条件 , 则不会重复执行更新数据操作 。
比如 , 刚刚我们说过 , “将 X 老师的账户的余额增加 100 万元”这个操作并不满足幂等性 , 我们可以把这个操作加上一个前置条件 , 变为:“如果 X 老师的账户当前的余额为 500 万元 , 将余额加 100 万元” , 这个操作就具备了幂等性 。
对应到消息队列中的使用时 , 可以在发消息时在消息体中带上当前的余额 , 在消费的时候进行判断数据库中 , 当前余额是否与消息中的余额相等 , 只有相等才执行变更操作 。
但是 , 如果我们要更新的数据不是数值 , 或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?
更加通用的方法是 , 给你的数据增加一个版本号属性 , 每次更数据前 , 比较当前数据的版本号是否和消息中的版本号一致 , 如果不一致就拒绝更新数据 , 更新数据的同时将版本号 +1 , 一样可以实现幂等 。
从未如此简单:10分钟带你逆袭Kafka!文章插图
Kafka 集群搭建
我们在工作中 , 为了保证环境的高可用 , 防止单点 , Kafka 都是以集群的方式出现的 , 下面就带领大家一起搭建一套 Kafka 集群环境 。
我们在官网下载 Kafka , 下载地址为:downloads , 下载我们需要的版本 , 推荐使用稳定的版本 。
搭建集群
①下载并解压
cd /usr/local/src wgetmkdir /data/servers tar xzvf kafka_2.11-2.4.0.tgz -C /data/servers/ cd /data/servers/kafka_2.11-2.4.0 ②修改配置文件
Kafka 的配置文件 $KAFKA_HOME/config/server.properties , 主要修改一下下面几项:
确保每个机器上的id不一样broker.id=0配置服务端的监控地址listeners=PLAINTEXT://192.168.51.128:9092kafka 日志目录log.dirs=/data/servers/kafka_2.11-2.4.0/logs#kafka设置的partitons的个数num.partitions=1zookeeper的连接地址, 如果有自己的zookeeper集群, 请直接使用自己搭建的zookeeper集群zookeeper.connect=192.168.51.128:2181 因为我自己是本机做实验 , 所有使用的是一个主机的不同端口 , 在线上 , 就是不同的机器 , 大家参考即可 。
我们这里使用 Kafka 的 Zookeeper , 只启动一个节点 , 但是正真的生产过程中 , 是需要 Zookeeper 集群 , 自己搭建就好 , 后期我们也会出 Zookeeper 的教程 , 大家请关注就好了 。
③拷贝 3 份配置文件
#创建对应的日志目录 mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9092 mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9093 mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9094#拷贝三份配置文件 cp server.properties server_9092.propertiescp server.properties server_9093.propertiescp server.properties server_9094.properties④修改不同端口对应的文件
#9092的id为0, 9093的id为1, 9094的id为2broker.id=0# 配置服务端的监控地址, 分别在不通的配置文件中写入不同的端口listeners=PLAINTEXT://192.168.51.128:9092# kafka 日志目录, 目录也是对应不同的端口log.dirs=/data/servers/kafka_2.11-2.4.0/logs/9092# kafka设置的partitons的个数num.partitions=1# zookeeper的连接地址, 如果有自己的zookeeper集群, 请直接使用自己搭建的zookeeper集群zookeeper.connect=192.168.51.128:2181 修改 Zookeeper 的配置文件:
dataDir=/data/servers/zookeeper server.1=192.168.51.128:2888:3888 然后创建 Zookeeper 的 myid 文件:
echo "1"> /data/servers/zookeeper/myid ⑤启动 Zookeeper
使用 Kafka 内置的 Zookeeper:
cd /data/servers/kafka_2.11-2.4.0/bin zookeeper-server-start.sh -daemon ../config/zookeeper.propertiesnetstat -anp |grep 2181 启动 Kafka:
./kafka-server-start.sh -daemon ../config/server_9092.properties./kafka-server-start.sh -daemon ../config/server_9093.properties./kafka-server-start.sh -daemon ../config/server_9094.propertiesKafka 的操作
①Topic
我们先来看一下创建 Topic 常用的参数吧: