Elasticsearch + Kafka Integration

2018-06-11 14:42:15


Following on from the previous article: synchronize the node clocks and start ZooKeeper and Kafka first. That setup needs no detailed walkthrough here, but a quick sketch is included below.
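For completeness, a minimal sketch of bringing up a broker and creating the gamelog topic. The $KAFKA_HOME path, the number of partitions/replicas, and starting a local ZooKeeper are assumptions; the ZooKeeper addresses match the zk_connect setting in the configuration below.

# Start ZooKeeper and a Kafka broker (paths are assumptions; skip the local
# ZooKeeper start if the zk-01..zk-03 ensemble is already running).
$KAFKA_HOME/bin/zookeeper-server-start.sh -daemon $KAFKA_HOME/config/zookeeper.properties
$KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# Create the topic that Logstash will consume from.
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper zk-01:2181,zk-02:2181,zk-03:2181 \
  --replication-factor 1 --partitions 3 --topic gamelog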

1. The Kafka-to-ES Logstash configuration file, kafka-es.conf

input {
  kafka {
    type => "level-one"
    auto_offset_reset => "smallest"
    codec => plain {
      charset => "GB2312"
    }
    group_id => "es"
    topic_id => "gamelog"   # message topic
    zk_connect => "zk-01:2181,zk-02:2181,zk-03:2181"
  }
}

filter {
  mutate {
    split => { "message" => "   " }
    add_field => {
      "event_type" => "%{message[3]}"
      "current_map" => "%{message[4]}"
      "current_X" => "%{message[5]}"
      "current_y" => "%{message[6]}"
      "user" => "%{message[7]}"
      "item" => "%{message[8]}"
      "item_id" => "%{message[9]}"
      "current_time" => "%{message[12]}"
    }
    remove_field => [ "message" ]
  }
}

output {
    elasticsearch {
      index => "level-one-%{+YYYY.MM.dd}"
      codec => plain {
        charset => "utf-16be"   # work around garbled Chinese characters
      }
      # ES hosts and ports
      hosts => ["elk-01:9200", "elk-02:9200", "elk-03:9200"]
    } 
}
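Before pointing the pipeline at Kafka, the split rule can be verified with a throwaway Logstash config that reads from stdin and prints the parsed event. This is only a debugging sketch, not part of kafka-es.conf; the GB2312 charset mirrors the Kafka input above.

input {
  stdin {
    codec => plain { charset => "GB2312" }
  }
}

filter {
  mutate {
    # same delimiter as kafka-es.conf
    split => { "message" => "   " }
  }
}

output {
  # print the whole event, including the split message array
  stdout { codec => rubydebug }
}

Paste a sample gamelog line into the console and check which array index holds each field before relying on the add_field mappings.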

2. Start the service on the Logstash server:

C:\Users\dummy\Downloads\logstash-2.4.1\bin>.\logstash -f ..\conf\kafka-es.conf
Settings: Default pipeline workers: 8
Pipeline main started
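To confirm that events actually reach the cluster, publish a test line to the topic and check for the daily index; the kafka-01:9092 broker address is an assumption, while the elk-01 host comes from the output section above.

# Type one or more test lines into the console producer (broker address is an assumption).
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list kafka-01:9092 --topic gamelog

# The daily level-one index should appear shortly afterwards.
curl 'http://elk-01:9200/_cat/indices?v'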

3. Install the head plugin on ES offline. From the ES installation directory, run: $ELK_HOME/bin/plugin install file:///home/username/software/elasticsearch-head-master.zip

4. Open the head page to browse the cleaned log records that have been pulled into ES.
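With ES 2.x site plugins, the head UI is usually reachable at http://elk-01:9200/_plugin/head/. If the page is not available, the same cleaned documents can be inspected with a plain search; the wildcard index pattern is an assumption matching the output index name above.

curl 'http://elk-01:9200/level-one-*/_search?pretty&size=5'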

