Cluster Host Environment
Hostname | Server IP | Software Version | Role |
---|---|---|---|
elk-node1 | 192.168.99.185 | elasticsearch-6.8.4-1.noarch, logstash-6.8.4-1.noarch, kibana-6.8.4-1.x86_64, openjdk version "1.8.0_242" | es master/data node, kibana web, logstash |
elk-node2 | 192.168.99.186 | elasticsearch-6.8.4-1.noarch, logstash-6.8.4-1.noarch, openjdk version "1.8.0_242" | es data node, logstash |
kafka-node1 | 192.168.99.233 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk version "1.8.0_242" | kafka/zookeeper |
kafka-node2 | 192.168.99.232 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk version "1.8.0_242" | kafka/zookeeper |
kafka-node3 | 192.168.99.221 | kafka_2.12-2.5.0, zookeeper-3.5.7, openjdk version "1.8.0_242" | kafka/zookeeper |
zabbix-server | 192.168.99.50 | filebeat-6.8.4-1.x86_64 | filebeat |
Log Collection and Analysis System Architecture
ELK Cluster Configuration
For ELK cluster deployment and configuration, see the article 《ELK集群部署》 in this official account's ELK column.
Kafka Cluster Configuration
For the Kafka/ZooKeeper cluster configuration, see the article 《KafKa 工作原理 && 集群部署(一)》 in this official account's ELK column.
Note: starting with ZooKeeper 3.5.5, the packages whose names contain "bin.tar.gz" are pre-compiled binary packages that can be used directly; the plain "tar.gz" packages contain only source code and cannot be used as-is. If you install from the "apache-zookeeper-3.5.7.tar.gz" package, starting the ZooKeeper service with zkServer.sh start will fail and client connections will also error out. The correct package is "apache-zookeeper-3.5.7-bin.tar.gz".
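For reference, a quick sketch of fetching and smoke-testing the correct package (the Apache archive URL here is an assumption; any Apache mirror will do):

# Fetch the pre-built binary package -- note the "-bin" suffix
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz
tar -zxf apache-zookeeper-3.5.7-bin.tar.gz -C /opt
cd /opt/apache-zookeeper-3.5.7-bin
cp conf/zoo_sample.cfg conf/zoo.cfg   # minimal standalone config, enough for a smoke test
bin/zkServer.sh start                 # should now start cleanly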
Network Device Log Server Configuration
For the Rsyslog network log server configuration, see the article 《ELK 部署可视化网络日志分析监控平台》 in this official account's ELK column.
Filebeat config
Filebeat acts as the Kafka message producer. On the Filebeat host the logs fall into two groups, network device logs and Linux system logs, and tags are used to distinguish the different kinds of network device logs and the different kinds of Linux system logs, so that Logstash can match on the tags and apply a different field-parsing pipeline to each. Each group is also published to the Kafka cluster under its own log_topic: network device logs use log_topic=network and Linux system logs use log_topic=linuxos.
egrep -v "*#|^$" /etc/filebeat/filebeat.yml filebeat.inputs:- type: log enabled: true paths: - /mnt/huawei/* fields: log_topic: network tags: ["huawei"] include_lines: ['Failed','failed','error','ERROR','bDOWNb','bdownb','bUPb','bupb']- type: log paths: - /mnt/h3c/* fields: log_topic: network tags: ["h3c"] include_lines: ['Failed','failed','error','ERROR','bDOWNb','bdownb','bUPb','bupb']- type: log paths: - /mnt/ruijie/* fields: log_topic: network tags: ["ruijie"] include_lines: ['Failed','failed','error','ERROR','bDOWNb','bdownb','bUPb','bupb']- type: log enabled: true tags: ["secure"] paths: - /var/log/secure fields: log_topic: linuxos include_lines: [".*Failed.*",".*Accepted.*"]- type: log enabled: true paths: - /var/log/messages fields: log_topic: linuxos tags: ["messages"] include_lines: ['Failed','error','ERROR'] filebeat.config.modules: path: ${path.config}/modules.d/*.yml reload.enabled: false name: 192.168.99.185setup.template.settings: index.number_of_shards: 3setup.kibana:output.kafka: enabled: true hosts: ["192.168.99.233:9092","192.168.99.223:9092","192.168.99.221:9092"] topic: '%{[fields][log_topic]}' partition.round_robin: reachable_only: true worker: 2 required_acks: 1 compression: gzip max_message_bytes: 10000000processors: - drop_fields: fields: ["beat", "input","host","log","source","name","os"] - add_host_metadata: ~ - add_cloud_metadata: ~
Note: Filebeat may fail to connect to the Kafka cluster when producing. Watch the Filebeat log, whose path is /var/log/filebeat/filebeat, with the command "tail -f /var/log/filebeat/filebeat".
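Before digging through the log, Filebeat's built-in self-tests can confirm both the configuration syntax and the reachability of the Kafka brokers; a minimal sketch:

# Validate the configuration file syntax
filebeat test config -c /etc/filebeat/filebeat.yml
# Probe the configured output (here the Kafka brokers) for connectivity
filebeat test output -c /etc/filebeat/filebeat.yml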
logstash config
The two Logstash instances act as consumers of the Kafka cluster: host 192.168.99.185 parses the network device logs and host 192.168.99.186 parses the Linux system logs, although both parsing pipelines could of course run on a single Logstash instance. The two configurations below include the output to Zabbix for alerting; if you do not need to integrate with a Zabbix alerting platform, simply remove the Zabbix part of the configuration.
The Logstash log path is "/var/log/logstash/logstash-plain.log"; view it with the command "tail -f /var/log/logstash/logstash-plain.log".
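Before starting the pipelines below, a pipeline file can be checked for syntax errors without processing any events; a sketch, assuming the default RPM install paths:

# Test the pipeline configuration and exit
/usr/share/logstash/bin/logstash --path.settings /etc/logstash -f /etc/logstash/conf.d/network.conf --config.test_and_exit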
network logstash
[root@elk-node1 conf.d]# cat network.conf
input {
  kafka {
    codec => "json"
    topics => ["network"]
    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
    group_id => "logstash"
  }
}

filter {
  if "huawei" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}" }
    }
  }
  else if "h3c" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{YEAR:year} %{DATA:hostname} %{GREEDYDATA:info}" }
    }
  }
  else if "ruijie" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}" }
    }
  }
  mutate {
    add_field => [ "[zabbix_key]", "networklogs" ]
    add_field => [ "[zabbix_host]", "192.168.99.185" ]
    add_field => [ "count", "%{hostname}:%{info}" ]
    remove_field => ["message","time","year","offset","tags","path","host","@version","[log]","[prospector]","[beat]","[input][type]","[source]"]
  }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    index => "networklogs-%{+YYYY.MM.dd}"
    hosts => ["192.168.99.185:9200"]
    user => "elastic"
    password => "qZXo7E"
    sniffing => false
  }
  if [count] =~ /(ERR|error|ERROR|Failed|failed)/ {
    zabbix {
      zabbix_host => "[zabbix_host]"
      zabbix_key => "[zabbix_key]"
      zabbix_server_host => "192.168.99.200"
      zabbix_server_port => "10051"
      zabbix_value => "count"
    }
  }
}
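Note that the zabbix output is a community plugin and, as far as I know, is not bundled with Logstash by default; if the pipeline fails to load with an unknown-plugin error, install it first:

# Install the Zabbix output plugin into Logstash
/usr/share/logstash/bin/logstash-plugin install logstash-output-zabbix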
linuxos logstash
[root@elk-node2 ~]# cat /etc/logstash/conf.d/system.conf
input {
  kafka {
    codec => "json"
    topics => ["linuxos"]
    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]
  }
}

filter {
  if "secure" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:host1} .*?: %{DATA:status} .*? for %{USER:user} from %{IP:clients} port %{NUMBER:port} .*?" }
    }
    mutate {
      add_field => [ "[zabbix_key]", "securelogs" ]
      add_field => [ "[zabbix_host]", "192.168.99.186" ]
      add_field => [ "count1", "%{host1}--%{message}" ]
    }
  }
  else if "messages" in [tags] {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:host1} %{DATA:syslog_prom} .*?" }
    }
  }
  mutate {
    remove_field => ["time","offset","path","host","@version","[log]","[prospector]","[beat]","[input][type]","[source]"]
  }
}

output {
  stdout { codec => rubydebug }
  if "secure" in [tags] {
    elasticsearch {
      index => "secure-%{+YYYY.MM.dd}"
      hosts => ["192.168.99.186:9200"]
      user => "elastic"
      password => "qZXo7E"
    }
  }
  if "messages" in [tags] {
    elasticsearch {
      index => "messages-%{+YYYY.MM.dd}"
      hosts => ["192.168.99.186:9200"]
      user => "elastic"
      password => "qZXo7E"
    }
  }
  if [count1] =~ /(Failed|Accepted)/ {
    zabbix {
      zabbix_host => "[zabbix_host]"
      zabbix_key => "[zabbix_key]"
      zabbix_server_host => "192.168.99.200"
      zabbix_server_port => "10051"
      zabbix_value => "count1"
    }
  }
}
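To confirm that the Zabbix server side accepts values before wiring up Logstash, zabbix_sender can push a test value by hand; a sketch, assuming trapper items with the keys securelogs/networklogs already exist on the corresponding Zabbix hosts:

# -z Zabbix server, -p port, -s monitored host name, -k item key, -o value
zabbix_sender -z 192.168.99.200 -p 10051 -s 192.168.99.186 -k securelogs -o "zabbix_sender test"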
Kafka Cluster Verification and Testing
View the topics created by Filebeat
[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-topics.sh --list --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181
__consumer_offsets
linuxos
network
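Beyond listing the topics, the same tool can show how each topic's partitions and replicas are spread across the three brokers; a sketch:

# Show partition count, leader, replicas and ISR for the network topic
./bin/kafka-topics.sh --describe --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181 --topic network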
Verify that consumers are consuming messages
# Consume the network topic
[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092 --topic network --from-beginning
# Consume the linuxos topic
[root@kafka-node1 kafka_2.12-2.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092 --topic linuxos --from-beginning
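Once the Logstash pipelines are running, their consumption progress (and any lag) can be checked against the consumer group configured above ("logstash"); a sketch:

# Per-partition current offset, log-end offset and lag for the logstash group
./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092 --describe --group logstash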
View the parsed field output from Logstash
With Logstash running as a systemd service, the rubydebug output of the stdout plugin is captured by the journal and forwarded to /var/log/messages, so the parsed fields can be watched with:

tail -f /var/log/messages
Kibana Web UI
For the Kibana login authentication configuration, see the article 《Elastic Stack 6.8 X-Pack 安全功能部署》 in this official account's ELK column.
user authentication
Discover networklogs index
Discover secure index
Discover messages index
Network Device Log Dashboard