Logstash整合Kafka
前面我们已经实现通过Logstash读取track.log日志文件 , 然后写入到ES中 。 现在我们为了完善我们的日志收集系统架构 , 需要在中间添加Kafka消息队列做缓冲 。 这里我们使用了Logstash的Kafka插件来集成Kafka的 。 具体插件的官方地址如下:
环境说明
- kafka_2.11-0.9.0.0
- zookeeper-3.4.8
- logstash-2.3.4
- elasticsearch-2.3.5
这里我们自己配置了一个logstash-shipper用来从track.log日志文件读取日志 , 并且写入到Kafka中 。 当然这里也可以由其他生产者来代替Logstash收集日志并且写入Kafka(比如:Flume等等) 。 这里我们是本地测试所以简单点直接使用Logstash读取本机的日志文件 , 然后写入到Kafka消息队列中 。
logstash-shipper-kafka.conf配置
1234567891011121314151617181920212223
input {file {path => ["/home/yunyu/Downloads/track.log"]type => "api"codec => "json"start_position => "beginning"# 设置是否忽略太旧的日志的# 如果没设置该属性可能会导致读取不到文件内容 , 因为我们的日志大部分是好几个月前的 , 所以这里设置为不忽略ignore_older => 0}}output {stdout {codec => rubydebug}kafka {# 指定Kafka集群地址bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"# 指定Kafka的Topictopic_id => "logstash_test"}}
官网给出的注释- ignore_older
logstash-indexer-kafka.conf配置
input {kafka {# 指定Zookeeper集群地址zk_connect => "hadoop1:2181,hadoop2:2181,hadoop3:2181"# 指定当前消费者的group_idgroup_id => "logstash"# 指定消费的Topictopic_id => "logstash_test"# 指定消费的内容类型(默认是json)codec => "json"# 设置Consumer消费者从Kafka最开始的消息开始消费 , 必须结合"auto_offset_reset => smallest"一起使用reset_beginning => true# 设置如果Consumer消费者还没有创建offset或者offset非法 , 从最开始的消息开始消费还是从最新的消息开始消费auto_offset_reset => "smallest"}}filter {# 将logs数组对象进行拆分split {field => "logs"}date {match => ["timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"]target => "@timestamp"}}output {stdout {codec => rubydebug}elasticsearch {codec => "json"hosts => ["hadoop1:9200", "hadoop2:9200", "hadoop3:9200"]index => "api_logs_index"workers => 1flush_size => 20000idle_flush_time => 10}}
官网给出的注释- auto_offset_resetValue can be any of: largest, smallestDefault value is “largest”smallest or largest - (optional, default largest) If the consumer does not already have an established offset or offset is invalid, start with the earliest message present in the log (smallest) or after the last message in the log (largest).
- reset_beginningValue type is booleanDefault value is falseReset the consumer group to start at the earliest message present in the log by clearing any offsets for the group stored in Zookeeper. This is destructive! Must be used in conjunction with auto_offset_reset ? smallest
- 人脸识别设备主板如何选型 软硬整合大幅缩短开发时间
- 三星公布2021年款电视阵容:屏幕技术大升级 整合Google Duo等服务
- 整合零代码+AI+云原生技术,「速优云」布局智慧教培和智慧社区
- 整合K12业务 在线教育企业跟谁学升级旗下高途课堂
- 全力推进手机×AIoT战略 小米宣布整合成立三大部门:直接向雷军汇报
- 互联网企业都在用的Kafka为什么可以这么快?
- Kafka支持的分布式架构超越经典软件设计的五个原因
- flink消费kafka的offset与checkpoint
- 使用Kafka和Kafka Stream设计高可用任务调度
- 微软已经完成将Pinterest整合到Edge收藏夹的工作