logstash-output-file 离线日志统一存储

2021-08-23 16:46:16 浏览数 (1)

背景

日志审计要求需要将日志数据统一实时存储到 es 集群和离线存储到磁盘中,其中 es 集群用于实时对日志进行查看展示以及告警操作,统一存储到磁盘的日志用于日志审计和合规性审查。

数据流处理流程

logstash 插件

代码语言:javascript复制
#查看logstash-plugin插件列表
/usr/share/logstash/bin/logstash-plugin  list
#查看logstash-plugin插件版本信息
/usr/share/logstash/bin/logstash-plugin  list --verbose
#过滤logstash-plugin output 插件
/usr/share/logstash/bin/logstash-plugin  list |grep output
#更新logstash-output-file插件
/usr/share/logstash/bin/logstash-plugin  update logstash-output-file
#安装logstash-output-zabbix插件
/usr/share/logstash/bin/logstash-plugin  install logstash-output-zabbix

dpkg 安装 logstash

代码语言:javascript复制
sudo dpkg -i logstash-7.13.3-amd64.deb

移除特定字段

代码语言:javascript复制
filter {
      mutate {
        remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
      }
    }

logstash-output-file

默认情况下,此输出以json格式每行写入一个事件。使用line编解码器自定义行格式,例如

代码语言:javascript复制
file{
   path => "/data/logstashStorage/messages/%{hostip}-%{ YYYY.MM.dd}.log"
   codec => line { format => "custom format: %{ YYYY.MM.dd HH:mm:ss},%{hostip},%{message}"}
    }

启动配置文件

代码语言:javascript复制
#cat storage.conf 
input {
   kafka {
    codec => "json"
    topics => ["linuxos"]
    bootstrap_servers => ["xxxx:9092,xxxx:9092,xxxx:9092"] 
    auto_offset_reset => "latest"
    consumer_threads => 1
    group_id => "sys"        
  }
}
filter {
  if "secure" in [tags] {
    grok {
        match => {
           "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:host1} .*?: %{DATA:status} .*? for %{USER:user} from %{IP:clients} port %{NUMBER:port} .*?"
        }                                                                                                                                                             
    }
   }
  else if "messages" in [tags] {
     grok {
        match => {
           "message" => "%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:host1} %{DATA:syslog_prom} .*?" 
        }
    }
 }
mutate {
      remove_field => ["time","ecs","offset","path","host","@version","prospector","beat"] 
      }
}
output{
#stdout{codec => rubydebug}
  if "secure" in [tags]{
 file{
   path => "/data/logstashStorage/secure/%{hostip}-%{ YYYY.MM.dd}.log"
   codec => line { format => "custom format: %{ YYYY.MM.dd HH:mm:ss},%{hostip},%{message}"}
     }
  }
  if "messages" in [tags]{
 file{
   path => "/data/logstashStorage/messages/%{hostip}-%{ YYYY.MM.dd}.log"
   codec => line { format => "custom format: %{ YYYY.MM.dd HH:mm:ss},%{hostip},%{message}"}
    }
 }
 if "bashcommand" in [tags]{
 file{
   path => "/data/logstashStorage/bashcommand/%{hostip}-%{ YYYY.MM.dd}.log"
   codec => line { format => "custom format: %{ YYYY.MM.dd HH:mm:ss},%{hostip},%{message}"}
      }
 }
}

检测文件格式正确与否

代码语言:javascript复制
./logstash -f /etc/logstash/conf.d/system.conf  -t

创建存储文件目录(注意日志存储路径的目录权限)

代码语言:javascript复制
mkdir  -p /data/logstashStorage

查看离线日志

logstash 官方文档

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-file.html

https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html

0 人点赞