背景
日志审计要求需要将日志数据统一实时存储到 es 集群和离线存储到磁盘中,其中 es 集群用于实时对日志进行查看展示以及告警操作,统一存储到磁盘的日志用于日志审计和合规性审查。
数据流处理流程
logstash 插件
代码语言:javascript复制#查看logstash-plugin插件列表
/usr/share/logstash/bin/logstash-plugin list
#查看logstash-plugin插件版本信息
/usr/share/logstash/bin/logstash-plugin list --verbose
#过滤logstash-plugin output 插件
/usr/share/logstash/bin/logstash-plugin list |grep output
#更新logstash-output-file插件
/usr/share/logstash/bin/logstash-plugin update logstash-output-file
#安装logstash-output-zabbix插件
/usr/share/logstash/bin/logstash-plugin install logstash-output-zabbix
dpkg 安装 logstash
代码语言:javascript复制sudo dpkg -i logstash-7.13.3-amd64.deb
移除特定字段
代码语言:javascript复制filter {
mutate {
remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
}
}
logstash-output-file
默认情况下,此输出以json格式每行写入一个事件。使用line编解码器自定义行格式,例如
代码语言:javascript复制file{
path => "/data/logstashStorage/messages/%{hostip}-%{ YYYY.MM.dd}.log"
codec => line { format => "custom format: %{ YYYY.MM.dd HH:mm:ss},%{hostip},%{message}"}
}
启动配置文件
代码语言:javascript复制#cat storage.conf
input {
kafka {
codec => "json"
topics => ["linuxos"]
bootstrap_servers => ["xxxx:9092,xxxx:9092,xxxx:9092"]
auto_offset_reset => "latest"
consumer_threads => 1
group_id => "sys"
}
}
filter {
if "secure" in [tags] {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:time} %{DATA:host1} .*?: %{DATA:status} .*? for %{USER:user} from %{IP:clients} port %{NUMBER:port} .*?"
}
}
}
else if "messages" in [tags] {
grok {
match => {
"message" => "%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:host1} %{DATA:syslog_prom} .*?"
}
}
}
mutate {
remove_field => ["time","ecs","offset","path","host","@version","prospector","beat"]
}
}
output{
#stdout{codec => rubydebug}
if "secure" in [tags]{
file{
path => "/data/logstashStorage/secure/%{hostip}-%{ YYYY.MM.dd}.log"
codec => line { format => "custom format: %{ YYYY.MM.dd HH:mm:ss},%{hostip},%{message}"}
}
}
if "messages" in [tags]{
file{
path => "/data/logstashStorage/messages/%{hostip}-%{ YYYY.MM.dd}.log"
codec => line { format => "custom format: %{ YYYY.MM.dd HH:mm:ss},%{hostip},%{message}"}
}
}
if "bashcommand" in [tags]{
file{
path => "/data/logstashStorage/bashcommand/%{hostip}-%{ YYYY.MM.dd}.log"
codec => line { format => "custom format: %{ YYYY.MM.dd HH:mm:ss},%{hostip},%{message}"}
}
}
}
检测文件格式正确与否
代码语言:javascript复制./logstash -f /etc/logstash/conf.d/system.conf -t
创建存储文件目录(注意日志存储路径的目录权限)
代码语言:javascript复制mkdir -p /data/logstashStorage
查看离线日志
logstash 官方文档
https://www.elastic.co/guide/en/logstash/current/plugins-outputs-file.html
https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html