ELK 日志分析系统整合 KafKa Zookeeper 集群

2020-04-27 10:18:28 浏览数 (1)

集群主机环境

Hostname

Server IP

Software Version

Role

elk-node1

192.168.99.185

elasticsearch-6.8.4-1.noarchlogstash-6.8.4-1.noarchkibana-6.8.4-1.x86_64openjdk version "1.8.0_242"

es master data nodekibana weblogstash

elk-node2

192.168.99.186

elasticsearch-6.8.4-1.noarchlogstash-6.8.4-1.noarchopenjdk version "1.8.0_242"

es data nodelogstash

kafka-node1

192.168.99.233

kafka_2.12-2.5.0zookeeper-3.5.7openjdk version "1.8.0_242"

kafka/zookeeper

kafka-node2

192.168.99.232

kafka_2.12-2.5.0zookeeper-3.5.7openjdk version "1.8.0_242"

kafka/zookeeper

kafka-node3

192.168.99.221

kafka_2.12-2.5.0zookeeper-3.5.7openjdk version "1.8.0_242"

kafka/zookeeper

zabbix-server

192.168.99.50

filebeat-6.8.4-1.x86_64

filebeat

日志采集分析系统架构

ELK集群配置

ELK集群部署配置请参考公众号ELK专栏《ELK集群部署》的文章。

kafka集群配置

kafka/zookeeper 集群配置请参考公众号ELK专栏《KafKa 工作原理 && 集群部署(一)》的文章。

注意:zookeeper版本从3.5.5开始带有"bin.tar.gz"名称的软件包是直接可以使用的编译好的二进制包,之前的"tar.gz"的软件包是只有源码的包,无法直接使用。使用"apache-zookeeper-3.5.7.tar.gz"软件包会出现使用zkServer.sh start启动 zookeeper服务失败,客户端连接也会出错!!!正确的安装包为"apache-zookeeper-3.5.7.tar.gz"软件包。

网络设备日志服务器配置

Rsyslog 网络日志服务器配置请参考公众号ELK专栏《ELK 部署可视化网络日志分析监控平台》的文章。

Filebeat config

filebeat作为kafka生产消息者,在filebeat 主机中日志分为网络设备日志和系统日志,对不同的网络设备日志和linux 系统的不同种类的日志使用tags标签的方式进行区分,以便于在logstash中使用tags进行匹配进行不同方式的字段清洗。同时分别使用不同的log_topic输出到kafka集群中,其中网络设备日志的log_topic=network,linux系统的log_topic=linuxos。

代码语言:javascript复制
egrep -v  "*#|^$" /etc/filebeat/filebeat.yml filebeat.inputs:- type: log  enabled: true  paths:    - /mnt/huawei/*  fields:   log_topic: network  tags: ["huawei"]  include_lines: ['Failed','failed','error','ERROR','bDOWNb','bdownb','bUPb','bupb']- type: log  paths:    - /mnt/h3c/*  fields:   log_topic: network  tags: ["h3c"]  include_lines: ['Failed','failed','error','ERROR','bDOWNb','bdownb','bUPb','bupb']- type: log  paths:    - /mnt/ruijie/*  fields:   log_topic: network  tags: ["ruijie"]  include_lines: ['Failed','failed','error','ERROR','bDOWNb','bdownb','bUPb','bupb']- type: log  enabled: true  tags: ["secure"]  paths:    - /var/log/secure  fields:   log_topic: linuxos  include_lines: [".*Failed.*",".*Accepted.*"]- type: log  enabled: true  paths:    - /var/log/messages  fields:   log_topic: linuxos  tags: ["messages"]  include_lines: ['Failed','error','ERROR'] filebeat.config.modules:  path: ${path.config}/modules.d/*.yml  reload.enabled: false  name: 192.168.99.185setup.template.settings:  index.number_of_shards: 3setup.kibana:output.kafka:  enabled: true   hosts: ["192.168.99.233:9092","192.168.99.223:9092","192.168.99.221:9092"]  topic: '%{[fields][log_topic]}'   partition.round_robin:    reachable_only: true  worker: 2  required_acks: 1  compression: gzip  max_message_bytes: 10000000processors:  - drop_fields:     fields: ["beat", "input","host","log","source","name","os"]  - add_host_metadata: ~  - add_cloud_metadata: ~

注意:filebeat output kafka集群可能会出现连接kafka失败的错误,请观察filebeat日志日志路径为/var/log//filebeat/filebeat。使用"tail -f /var/log//filebeat/filebeat"命令查看。

logstash config

两台logstash分别作为kafka集群的消费消息者,192.168.99.185主机负责网络设备日志的清洗,192.168.99.186主机负责linux系统日志的清洗,当然网络设备日志的清洗和linux系统日志的清洗可以运行在一台logstash上。以下两台logstash 的配置包含了output至Zabbix 告警的部分,如不需要对接Zabbix告警平台可删掉对接Zabbix部分的配置。

logstash日志路径"/var/log/logstash/logstash-plain.log",使用"tail -f /var/log/logstash/logstash-plain.log"命令查看。

network logstash

代码语言:javascript复制
[root@elk-node1 conf.d]# cat network.conf input {  kafka {    codec => "json"    topics => ["network"]    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]    group_id => "logstash"  }   }
filter {  if "huawei" in [tags] {    grok{      match => {"message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}"}        }  }
   else if "h3c" in [tags] {    grok{      match => {"message" => "%{SYSLOGTIMESTAMP:time} %{YEAR:year} %{DATA:hostname} %{GREEDYDATA:info}"}        }  }  else if "ruijie" in [tags] {    grok{      match => {"message" => "%{SYSLOGTIMESTAMP:time} %{DATA:hostname} %{GREEDYDATA:info}"}        }  }mutate {      add_field => [ "[zabbix_key]", "networklogs" ]      add_field => [ "[zabbix_host]", "192.168.99.185" ]      add_field => [ "count","%{hostname}:%{info}" ]      remove_field => ["message","time","year","offset","tags","path","host","@version","[log]","[prospector]","[beat]","[input][type]","[source]"]    }} 
output{stdout{codec => rubydebug}elasticsearch{    index => "networklogs-%{ YYYY.MM.dd}"    hosts => ["192.168.99.185:9200"]    user => "elastic"    password => "qZXo7E"        sniffing => false    }if [count]  =~ /(ERR|error|ERROR|Failed|failed)/ {        zabbix {                zabbix_host => "[zabbix_host]"                zabbix_key => "[zabbix_key]"                zabbix_server_host => "192.168.99.200"                zabbix_server_port => "10051"                 zabbix_value => "count"    }  }}

linuxos logstash

代码语言:javascript复制
[root@elk-node2 ~]# cat  /etc/logstash/conf.d/system.conf input {   kafka {    codec => "json"    topics => ["linuxos"]    bootstrap_servers => ["192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092"]  }   }
filter {  if "secure" in [tags] {    grok {        match => {           "message" => "%{SYSLOGTIMESTAMP:time} %{DATA:host1} .*?: %{DATA:status} .*? for %{USER:user} from %{IP:clients} port %{NUMBER:port} .*?"        }    }mutate {      add_field => [ "[zabbix_key]", "securelogs" ]      add_field => [ "[zabbix_host]", "192.168.99.186" ]      add_field => [ "count1","%{host1}--%{message}" ]   } }  else if "messages" in [tags] {     grok {        match => {           "message" => "%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:host1} %{DATA:syslog_prom} .*?"        }    } }mutate {      remove_field => ["time","offset","path","host","@version","[log]","[prospector]","[beat]","[input][type]","[source]"]    } }
output{stdout{codec => rubydebug}  if "secure" in [tags]{elasticsearch{    index => "secure-%{ YYYY.MM.dd}"    hosts => ["192.168.99.186:9200"]    user => "elastic"    password => "qZXo7E"    }  } if "messages" in [tags]{elasticsearch{    index => "messages-%{ YYYY.MM.dd}"    hosts => ["192.168.99.186:9200"]     user => "elastic"    password => "qZXo7E"    }  }if [count1]  =~ /(Failed|Accepted)/ {zabbix {   zabbix_host => "[zabbix_host]"   zabbix_key => "[zabbix_key]"   zabbix_server_host => "192.168.99.200"   zabbix_server_port => "10051"    zabbix_value => "count1"    }  }}

KafKa 集群验证测试

查看filebeat 生成的topic

代码语言:javascript复制
[root@kafka-node1 kafka_2.12-2.5.0]#./bin/kafka-topics.sh --list --zookeeper 192.168.99.232:2181,192.168.99.233:2181,192.168.99.221:2181__consumer_offsetslinuxosnetwork

验证消费者消费消息

代码语言:javascript复制
#network 主题消费[root@kafka-node1 kafka_2.12-2.5.0]#./bin/kafka-console-consumer.sh --bootstrap-server  192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092 --topic network  --from-beginning#linuxos 主题消费[root@kafka-node1 kafka_2.12-2.5.0]#./bin/kafka-console-consumer.sh --bootstrap-server  192.168.99.233:9092,192.168.99.232:9092,192.168.99.221:9092 --topic linuxos  --from-beginning

查看logstash清洗字段输出

代码语言:javascript复制
tail -f /var/log/messages

Kibana Web UI

kibana 登陆认证部署的配置请查看公众号ELK专栏《Elastic Stack 6.8 X-Pack 安全功能部署》的文章。

user authentication

Discover networklogs index

Discover secure index

Discover messages index

网络设备日志仪表盘

0 人点赞