https://www.jianshu.com/writer#/notebooks/24576109/notes/27382605
接着前一篇文章, 同步节点时间,启动ZK,KAFKA自不必说
1.Kafka-ES
的配置文件,
kafka-es.conf
input {
kafka {
type => "level-one"
auto_offset_reset => "smallest"
codec => plain {
charset => "GB2312"
}
group_id => "es"
topic_id => "gamelog" //消息topic
zk_connect => "zk-01:2181,zk-02:2181,zk-03:2181"
}
}
filter {
mutate {
split => { "message" => " " }
add_field => {
"event_type" => "%{message[3]}"
"current_map" => "%{message[4]}"
"current_X" => "%{message[5]}"
"current_y" => "%{message[6]}"
"user" => "%{message[7]}"
"item" => "%{message[8]}"
"item_id" => "%{message[9]}"
"current_time" => "%{message[12]}"
}
remove_field => [ "message" ]
}
}
output {
elasticsearch {
index => "level-one-%{ YYYY.MM.dd}"
codec => plain {
charset => "utf-16be"//中文乱码问题
}
//ES的地址和端口
hosts => ["elk-01:9200", "elk-02:9200", "elk-03:9200"]
}
}
2.在Logstash服务器
上启动服务:
C:UsersdummyDownloadslogstash-2.4.1bin>.logstash -f ..confkafka-es.conf
Settings: Default pipeline workers: 8
Pipeline main started
3.ES离线安装head插件,进入ES安装目录
$ELK_HOME/bin/plugin install file:///home/username/software/elasticsearch-head-master.zip
4.访问head页面,即可查看拉取到ES的经过清洗过的日志信息
image.png