线上 Logstash 配置文件,特此记录。
# pipelines.yml 文件
$ egrep -v '^$|^#' pipelines.yml
# Logstash multi-pipeline definitions. Each list entry declares one pipeline:
# `pipeline.id` names it and `path.config` points at the config file it loads.
# `path.config` must be indented under its `- pipeline.id` entry so that both
# keys belong to the same list item.
- pipeline.id: feature
  path.config: "/data/elk/logstash-7.5.0/conf.d/kafka-feature.conf"
- pipeline.id: feature-log
  path.config: "/data/elk/logstash-7.5.0/conf.d/kafka-feature-log.conf"
- pipeline.id: report
  path.config: "/data/elk/logstash-7.5.0/conf.d/kafka-report.conf"
- pipeline.id: device
  path.config: "/data/elk/logstash-7.5.0/conf.d/kafka-devicelog.conf"
- pipeline.id: main
  path.config: "/data/elk/logstash-7.5.0/conf.d/kafka-reqlog.conf"
# logstash.yml 配置文件
$ egrep -v '^$|^#' logstash.yml
# Pipeline tuning, written in the hierarchical form of the flat
# `pipeline.workers` / `pipeline.batch.size` / `pipeline.batch.delay` keys.
pipeline:
  # Number of pipeline worker threads.
  workers: 32
  batch:
    # Maximum number of events a worker collects per batch.
    size: 1000
    # Batch wait time; per the Logstash settings reference this is in
    # milliseconds.
    delay: 50
# 任意一个.conf 文件
$ cat kafka-devicelog.conf
input {
  # Consume the "deviceRequestLog" topic through the three listed bootstrap brokers.
  kafka {
    bootstrap_servers => "kafka01:9092,kafka02:9092,kafka03:9092"
    topics => ["deviceRequestLog"]
    # Consumer group and client identity used by this pipeline.
    group_id => "logstash-devlog"
    client_id => "dev-no01"
    # Offset reset policy passed to the Kafka consumer.
    auto_offset_reset => "latest"
    # Attach Kafka metadata to each event; the filter section reads
    # [@metadata][kafka][timestamp] from it.
    decorate_events => true
  }
}
filter {
  # Parse the JSON document carried in "message" into event fields.
  json {
    source => "message"
  }
  # Drop the raw payload and create @kafka_timestamp as a placeholder that the
  # date filter below overwrites.
  mutate {
    remove_field => ["message"]
    add_field => { "@kafka_timestamp" => "" }
  }
  # Convert the Kafka record timestamp (epoch milliseconds) into @kafka_timestamp.
  date {
    match => ["[@metadata][kafka][timestamp]", "UNIX_MS"]
    target => "@kafka_timestamp"
  }
  # Work around the 8-hour time difference: shift req_time forward by
  # 8 hours (8*60*60*1000 ms) into a temporary "timestamp" field.
  # The "+" operator is required; without it the Ruby code does not parse.
  # NOTE(review): assumes req_time is a numeric epoch-millisecond value — confirm.
  ruby {
    code => "event.set('timestamp', event.get('req_time') + 8*60*60*1000)"
  }
  # Use the shifted value as the event's @timestamp.
  date {
    match => ["timestamp", "UNIX_MS"]
    target => "@timestamp"
  }
  # The temporary field is no longer needed once @timestamp is set.
  mutate {
    remove_field => ["timestamp"]
  }
}
output {
  # Ship events to the Elasticsearch cluster, one index per day.
  elasticsearch {
    hosts => ["es01:9200", "es02:9200", "es03:9200"]
    # "%{+YYYY-MM-dd}" formats the event's @timestamp; the leading "+" is what
    # marks it as a date format rather than a field reference.
    index => "device-log-%{+YYYY-MM-dd}"
    user => "elastic"
    # NOTE(review): plaintext credential in the config file — consider moving
    # it to the Logstash keystore or an environment variable.
    password => "ppasswd"
  }
}