Integrating Logstash with Kafka

Previously, we set up Logstash to read the track.log log file and write it into Elasticsearch. To round out the log collection architecture, we now add a Kafka message queue in the middle as a buffer. We use Logstash's Kafka plugins to integrate with Kafka; see the plugins' official documentation for the details.

Newer versions of Logstash ship with most plugins pre-installed, so there is no need, as with Logstash 1.x, to manually change the Gemfile source and install the plugins by hand.
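As a quick sanity check, you can confirm that the Kafka input/output plugins are bundled. This is only a sketch, assuming a Logstash 2.x install (the directory path below is illustrative) and its bin/plugin tool:

```bash
# From the Logstash installation directory (path is illustrative)
cd /usr/local/logstash-2.3.4

# List the bundled plugins and filter for the Kafka ones;
# expect to see logstash-input-kafka and logstash-output-kafka
bin/plugin list | grep kafka

# If either plugin were missing, it could be installed with:
# bin/plugin install logstash-output-kafka
```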
Environment
  • kafka_2.11-0.9.0.0
  • zookeeper-3.4.8
  • logstash-2.3.4
  • elasticsearch-2.3.5
Installation of these components is not covered here.
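One prerequisite worth calling out (not an installation guide, just a hedged sketch assuming Kafka 0.9's bundled scripts and the Zookeeper quorum on the hadoop hosts used below): the logstash_test topic used in the following configs must exist, unless the brokers allow automatic topic creation.

```bash
# From the Kafka installation directory, create the topic used by the configs below.
# The replication factor and partition count are illustrative choices.
bin/kafka-topics.sh --create \
    --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 \
    --replication-factor 3 \
    --partitions 3 \
    --topic logstash_test

# Verify that the topic exists
bin/kafka-topics.sh --list \
    --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181
```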
Here we configure a logstash-shipper that reads logs from the track.log file and writes them into Kafka. Other producers (for example Flume) could of course collect the logs and write them to Kafka instead of Logstash. Since this is a local test, we keep it simple and have Logstash read the local log file directly and write it into the Kafka message queue.
logstash-shipper-kafka.conf configuration

```conf
input {
    file {
        path => ["/home/yunyu/Downloads/track.log"]
        type => "api"
        codec => "json"
        start_position => "beginning"
        # Whether to ignore log files that are too old.
        # Without this setting the file content may never be read, because most of
        # our logs are several months old, so we set it to not ignore old files.
        ignore_older => 0
    }
}

output {
    stdout {
        codec => rubydebug
    }
    kafka {
        # Kafka cluster (broker) addresses
        bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
        # Kafka topic to write to
        topic_id => "logstash_test"
    }
}
```

Notes from the official documentation
  • ignore_older
The default behavior of the file input plugin is to ignore files whose last modification is greater than 86400s. To change this default behavior and process the tutorial file (which date can be much older than a day), we need to specify to not ignore old files.
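To try the shipper locally, one way (a sketch, assuming default working directories for Logstash and Kafka on the hosts above) is to start Logstash with this config and watch the topic with Kafka's console consumer:

```bash
# Start the shipper; the stdout { codec => rubydebug } output echoes each parsed event
bin/logstash -f logstash-shipper-kafka.conf

# In another terminal, consume the logstash_test topic from the beginning
# to confirm that the events really reach Kafka (Kafka 0.9 console consumer)
bin/kafka-console-consumer.sh \
    --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 \
    --topic logstash_test \
    --from-beginning
```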
logstash-indexer-kafka.conf configuration

```conf
input {
    kafka {
        # Zookeeper cluster addresses
        zk_connect => "hadoop1:2181,hadoop2:2181,hadoop3:2181"
        # group_id of this consumer
        group_id => "logstash"
        # Topic to consume from
        topic_id => "logstash_test"
        # Codec for the consumed content (the default is json)
        codec => "json"
        # Make the consumer start from the earliest message in Kafka;
        # must be used together with auto_offset_reset => "smallest"
        reset_beginning => true
        # If the consumer has no committed offset yet, or the offset is invalid,
        # controls whether to start from the earliest or the latest message
        auto_offset_reset => "smallest"
    }
}

filter {
    # Split the logs array into separate events
    split {
        field => "logs"
    }
    date {
        match => ["timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"]
        target => "@timestamp"
    }
}

output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        codec => "json"
        hosts => ["hadoop1:9200", "hadoop2:9200", "hadoop3:9200"]
        index => "api_logs_index"
        workers => 1
        flush_size => 20000
        idle_flush_time => 10
    }
}
```

Notes from the official documentation
  • auto_offset_reset
    Value can be any of: largest, smallest
    Default value is "largest"
    smallest or largest - (optional, default largest) If the consumer does not already have an established offset or offset is invalid, start with the earliest message present in the log (smallest) or after the last message in the log (largest).
  • reset_beginning
    Value type is boolean
    Default value is false
    Reset the consumer group to start at the earliest message present in the log by clearing any offsets for the group stored in Zookeeper. This is destructive! Must be used in conjunction with auto_offset_reset => smallest.
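Once the shipper has pushed some events into Kafka, the indexer can be started the same way and its results spot-checked in Elasticsearch. Again, only a sketch; the curl targets simply reuse the hosts and index name from the config above:

```bash
# Start the indexer: it consumes from the logstash_test topic and bulk-writes into ES
bin/logstash -f logstash-indexer-kafka.conf

# Count the documents that have reached the api_logs_index index
curl -s 'http://hadoop1:9200/api_logs_index/_count?pretty'

# Peek at a couple of indexed events
curl -s 'http://hadoop1:9200/api_logs_index/_search?pretty&size=2'
```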
Mapping configuration

```json
{
    "mappings": {
        "_default_": {
            "_all": {
                "enabled": true
            },
            "dynamic_templates": [
                {
                    "my_template": {
                        "match_mapping_type": "string",
                        "mapping": {
                            "type": "string",
                            "fields": {
                                "raw": {
                                    "type": "string",
                                    "index": "not_analyzed"
                                }
                            }
                        }
                    }
                }
            ]
        },
        "api": {
            "properties": {
                "timestamp": {
                    "format": "strict_date_optional_time||epoch_millis",
                    "type": "date"
                },
                "message": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "level": {
                    "type": "string"
                },
                "host": {
                    "type": "string"
                },
                "logs": {
                    "properties": {
                        "uid": { "type": "long" },
                        "status": { "type": "string" },
                        "did": { "type": "long" },
                        "device-id": { "type": "string" },
                        "device_id": { "type": "string" },
                        "errorMsg": { "type": "string" },
                        "rpid": { "type": "string" },
                        "url": { "type": "string" },
                        "errorStatus": { "type": "long" },
                        "ip": { "type": "string" },
                        "timestamp": { "type": "string", "index": "not_analyzed" },
                        "hb_uid": { "type": "long" },
                        "duid": { "type": "string" },
                        "request": { "type": "string" },
                        "name": { "type": "string" },
                        "errorCode": { "type": "string" },
                        "ua": { "type": "string" },
                        "server_timestamp": { "type": "long" },
                        "bid": { "type": "long" }
                    }
                },
                "path": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "type": {
                    "type": "string",
                    "index": "not_analyzed"
                },
                "@timestamp": {
                    "format": "strict_date_optional_time||epoch_millis",
                    "type": "date"
                },
                "@version": {
                    "type": "string",
                    "index": "not_analyzed"
                }
            }
        }
    }
}
```

Elasticsearch automatically analyzes string fields with its default analyzer (splitting on spaces, dots, slashes, and so on). Analyzers are very important for search and scoring, but they significantly reduce indexing and aggregation performance. That is why the Logstash template defines so-called "multi-field" fields: each string field gets a companion field ending in ".raw" with the analyzer disabled. In short, when you want aggregation results for the url field, do not use "url" directly; use "url.raw" as the field name.
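For completeness, here is a rough sketch of how this mapping could be applied and how a non-analyzed field is then aggregated. Assumptions: the JSON above is saved as api_logs_mapping.json (the file name is illustrative), the index is created before the indexer starts writing to it, and the host and index name come from the configs above:

```bash
# Create the index with the mapping above (done before the indexer starts writing)
curl -XPUT 'http://hadoop1:9200/api_logs_index' -d @api_logs_mapping.json

# Terms aggregation on the not_analyzed "type" field from the mapping above.
# For string fields that are mapped dynamically through my_template, aggregate
# on the ".raw" sub-field instead (e.g. "somefield.raw"), as described above.
curl -s 'http://hadoop1:9200/api_logs_index/_search?pretty' -d '{
  "size": 0,
  "aggs": {
    "by_type": {
      "terms": { "field": "type" }
    }
  }
}'
```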