Logstash整合Kafka

前面我们已经实现通过Logstash读取track.log日志文件 , 然后写入到ES中 。 现在我们为了完善我们的日志收集系统架构 , 需要在中间添加Kafka消息队列做缓冲 。 这里我们使用了Logstash的Kafka插件来集成Kafka的 。 具体插件的官方地址如下:

新版本的Logstash已经默认安装好大部分的插件了 , 所以无需像1.x版本的Logstash还需要手动修改Gemfile的source , 然后手动安装插件了 。
环境说明
  • kafka_2.11-0.9.0.0
  • zookeeper-3.4.8
  • logstash-2.3.4
  • elasticsearch-2.3.5
具体环境的安装这里不做重点介绍 。
这里我们自己配置了一个logstash-shipper用来从track.log日志文件读取日志 , 并且写入到Kafka中 。 当然这里也可以由其他生产者来代替Logstash收集日志并且写入Kafka(比如:Flume等等) 。 这里我们是本地测试所以简单点直接使用Logstash读取本机的日志文件 , 然后写入到Kafka消息队列中 。
logstash-shipper-kafka.conf配置1234567891011121314151617181920212223input {file {path => ["/home/yunyu/Downloads/track.log"]type => "api"codec => "json"start_position => "beginning"# 设置是否忽略太旧的日志的# 如果没设置该属性可能会导致读取不到文件内容 , 因为我们的日志大部分是好几个月前的 , 所以这里设置为不忽略ignore_older => 0}}output {stdout {codec => rubydebug}kafka {# 指定Kafka集群地址bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"# 指定Kafka的Topictopic_id => "logstash_test"}}官网给出的注释
  • ignore_older
The default behavior of the file input plugin is to ignore files whose last modification is greater than 86400s. To change this default behavior and process the tutorial file (which date can be much older than a day), we need to specify to not ignore old files.
logstash-indexer-kafka.conf配置input {kafka {# 指定Zookeeper集群地址zk_connect => "hadoop1:2181,hadoop2:2181,hadoop3:2181"# 指定当前消费者的group_idgroup_id => "logstash"# 指定消费的Topictopic_id => "logstash_test"# 指定消费的内容类型(默认是json)codec => "json"# 设置Consumer消费者从Kafka最开始的消息开始消费 , 必须结合"auto_offset_reset => smallest"一起使用reset_beginning => true# 设置如果Consumer消费者还没有创建offset或者offset非法 , 从最开始的消息开始消费还是从最新的消息开始消费auto_offset_reset => "smallest"}}filter {# 将logs数组对象进行拆分split {field => "logs"}date {match => ["timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"]target => "@timestamp"}}output {stdout {codec => rubydebug}elasticsearch {codec => "json"hosts => ["hadoop1:9200", "hadoop2:9200", "hadoop3:9200"]index => "api_logs_index"workers => 1flush_size => 20000idle_flush_time => 10}}官网给出的注释