Logstash整合Kafka
前面我们已经实现通过Logstash读取track.log日志文件 , 然后写入到ES中 。 现在我们为了完善我们的日志收集系统架构 , 需要在中间添加Kafka消息队列做缓冲 。 这里我们使用了Logstash的Kafka插件来集成Kafka的 。 具体插件的官方地址如下:
环境说明
- kafka_2.11-0.9.0.0
- zookeeper-3.4.8
- logstash-2.3.4
- elasticsearch-2.3.5
这里我们自己配置了一个logstash-shipper用来从track.log日志文件读取日志 , 并且写入到Kafka中 。 当然这里也可以由其他生产者来代替Logstash收集日志并且写入Kafka(比如:Flume等等) 。 这里我们是本地测试所以简单点直接使用Logstash读取本机的日志文件 , 然后写入到Kafka消息队列中 。
logstash-shipper-kafka.conf配置
1234567891011121314151617181920212223
input {file {path => ["/home/yunyu/Downloads/track.log"]type => "api"codec => "json"start_position => "beginning"# 设置是否忽略太旧的日志的# 如果没设置该属性可能会导致读取不到文件内容 , 因为我们的日志大部分是好几个月前的 , 所以这里设置为不忽略ignore_older => 0}}output {stdout {codec => rubydebug}kafka {# 指定Kafka集群地址bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"# 指定Kafka的Topictopic_id => "logstash_test"}}
官网给出的注释- ignore_older
logstash-indexer-kafka.conf配置
input {kafka {# 指定Zookeeper集群地址zk_connect => "hadoop1:2181,hadoop2:2181,hadoop3:2181"# 指定当前消费者的group_idgroup_id => "logstash"# 指定消费的Topictopic_id => "logstash_test"# 指定消费的内容类型(默认是json)codec => "json"# 设置Consumer消费者从Kafka最开始的消息开始消费 , 必须结合"auto_offset_reset => smallest"一起使用reset_beginning => true# 设置如果Consumer消费者还没有创建offset或者offset非法 , 从最开始的消息开始消费还是从最新的消息开始消费auto_offset_reset => "smallest"}}filter {# 将logs数组对象进行拆分split {field => "logs"}date {match => ["timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"]target => "@timestamp"}}output {stdout {codec => rubydebug}elasticsearch {codec => "json"hosts => ["hadoop1:9200", "hadoop2:9200", "hadoop3:9200"]index => "api_logs_index"workers => 1flush_size => 20000idle_flush_time => 10}}
官网给出的注释- auto_offset_resetValue can be any of: largest, smallestDefault value is “largest”smallest or largest - (optional, default largest) If the consumer does not already have an established offset or offset is invalid, start with the earliest message present in the log (smallest) or after the last message in the log (largest).
- reset_beginningValue type is booleanDefault value is falseReset the consumer group to start at the earliest message present in the log by clearing any offsets for the group stored in Zookeeper. This is destructive! Must be used in conjunction with auto_offset_reset ? smallest
{"mappings": {"_default_": {"_all": {"enabled": true},"dynamic_templates": [{"my_template": {"match_mapping_type": "string","mapping": {"type": "string","fields": {"raw": {"type": "string","index": "not_analyzed"}}}}}]},"api": {"properties": {"timestamp": {"format": "strict_date_optional_time||epoch_millis","type": "date"},"message": {"type": "string","index": "not_analyzed"},"level": {"type": "string"},"host": {"type": "string"},"logs": {"properties": {"uid": {"type": "long"},"status": {"type": "string"},"did": {"type": "long"},"device-id": {"type": "string"},"device_id": {"type": "string"},"errorMsg": {"type": "string"},"rpid": {"type": "string"},"url": {"type": "string"},"errorStatus": {"type": "long"},"ip": {"type": "string"},"timestamp": {"type": "string","index": "not_analyzed"},"hb_uid": {"type": "long"},"duid": {"type": "string"},"request": {"type": "string"},"name": {"type": "string"},"errorCode": {"type": "string"},"ua": {"type": "string"},"server_timestamp": {"type": "long"},"bid": {"type": "long"}}},"path": {"type": "string","index": "not_analyzed"},"type": {"type": "string","index": "not_analyzed"},"@timestamp": {"format": "strict_date_optional_time||epoch_millis","type": "date"},"@version": {"type": "string","index": "not_analyzed"}}}}}
Elasticsearch 会自动使用自己的默认分词器(空格 , 点 , 斜线等分割)来分析字段 。 分词器对于搜索和评分是非常重要的 , 但是大大降低了索引写入和聚合请求的性能 。 所以 logstash 模板定义了一种叫”多字段”(multi-field)类型的字段 。 这种类型会自动添加一个 “.raw” 结尾的字段 , 并给这个字段设置为不启用分词器 。 简单说 , 你想获取 url 字段的聚合结果的时候 , 不要直接用 “url”, 而是用 “url.raw” 作为字段名 。
- 专注于扩展|Yearn整合Sushiswap,AC的“DeFi宇宙”已初露端倪
- 互联网|60亿引战整合重组互联网平台业务,苏宁易购能否弯道超车?
- 资源|共享门店系统如何整合商家周边资源助力获客引流?
- flink消费kafka的offset与checkpoint
- 测试|iPhone或将推出可折叠 已由富士康整合成完整测试机
- 直播|玩家福利来了,斗鱼成为虎牙子公司,腾讯大刀阔斧整合游戏直播
- 华立科技整合全球优质IP,科技联动引爆沉浸式设备特色
- 美国陆军|美国陆军成立新办公室整合数据和传感器
- 官同|LINE年度大会首席技术官同框 为雅虎日本整合案暖身
- 跨境电商出海如何实现IT技术开发模块能力资源供应链整合优化?