Preface
How quickly an operations team can troubleshoot a failure is usually proportional to the granularity of its monitoring system; only with monitoring in place can faults be isolated quickly.
Before this system was deployed, all platform system logs went through Graylog + Zabbix, which alerted on error keywords appearing in the logs. That approach exposed a number of shortcomings in day-to-day operations work (not detailed here). After weighing several factors, we ultimately replaced the log alerting system; the chosen stack is: ELK + Kafka + Filebeat + Elastalert.
This article is organized around two requirements:
- Alert on abnormal server logins outside working hours
- Alert on error keywords appearing in system logs
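As an aside on the first requirement, the "off-hours" condition the alerting rule must express can be sketched in Python. The 09:00-18:00, Monday-Friday window below is an assumption for illustration; in practice the condition lives in the Elastalert rule, and the window should match your site's schedule:

```python
from datetime import datetime

# Assumed working hours: 09:00-18:00, Monday-Friday (illustrative only).
WORK_START_HOUR = 9
WORK_END_HOUR = 18

def is_off_hours(ts):
    """Return True if a login at `ts` should trigger the off-hours alert."""
    if ts.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        return True
    return not (WORK_START_HOUR <= ts.hour < WORK_END_HOUR)

print(is_off_hours(datetime(2018, 4, 11, 3, 30)))   # Wednesday 03:30 -> True
print(is_off_hours(datetime(2018, 4, 11, 10, 0)))   # Wednesday 10:00 -> False
print(is_off_hours(datetime(2018, 4, 14, 10, 0)))   # Saturday  10:00 -> True
```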
Architecture
Service Selection
name | version | info |
---|---|---|
Amazon Elasticsearch Service | v6.2 | Deployed per the AWS official guide |
Logstash | v6.2.3 | Same version as ES |
Filebeat | v6.2.3 | Same version as ES |
Confluent (Kafka) | v4.0 | The Confluent distribution is recommended here. Confluent is a big-data company founded by Kafka co-creator Neha Narkhede together with former LinkedIn colleagues, focused on enterprise applications of Kafka. |
Elastalert | v0.1.29 | X-Pack was considered first, but AWS does not support it yet |
Deployment
Operating system used in this article: CentOS release 6.6
Filebeat
```shell
# Download the package
$ curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.3-x86_64.rpm
# Install
$ sudo rpm -vi filebeat-6.2.3-x86_64.rpm
```
Logstash
```shell
# Import the Yum repository key and repo definition
$ rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
$ cat <<EOF > /etc/yum.repos.d/logstash.repo
[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
# Install
$ yum install logstash -y
```
Elastalert
```shell
# Install directly with pip
$ pip install elastalert
# If dependency errors occur, the following commonly needed development packages can be installed
$ yum install -y zlib openssl openssl-devel gcc gcc-c++ Xvfb libXfont Xorg libffi libffi-devel python-cffi python-devel libxslt-devel libxml2-devel zlib-devel bzip2-devel xz-libs wget
```
Configuration
Filebeat /etc/filebeat/filebeat.yml
```yaml
filebeat.config:
  prospectors:
    path: /etc/filebeat/conf/*.yml
    reload.enabled: true
    reload.period: 10s

output.kafka:
  # kafkaNode is the host running the Kafka service
  hosts: ["kafkaNode:9092"]
  # The topic is taken from fields.out_topic
  topic: "%{[fields][out_topic]}"
  partition.round_robin:
    reachable_only: false
```
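To make the `topic: "%{[fields][out_topic]}"` setting concrete: Filebeat resolves the Kafka topic per event from the event's own fields. A minimal Python sketch of that lookup (the sample event below is hypothetical, shaped like Filebeat's JSON output):

```python
# Each event carries fields.out_topic (set in the prospector configs),
# and Filebeat substitutes it into the topic template per event.
def resolve_topic(event):
    return event["fields"]["out_topic"]

event = {
    "message": "Apr 11 03:30:01 client01 sshd[1234]: Failed password ...",
    "fields": {"server_name": "client01", "out_topic": "system_secure"},
}
print(resolve_topic(event))  # system_secure
```

This is why login events and general system logs land in separate Kafka topics without any routing logic on the broker side.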
/etc/filebeat/conf/base.yml
```yaml
# Collect system logs
- type: log
  paths:
    - /var/log/messages
    - /var/log/syslog*
  exclude_files: [".gz$"]
  exclude_lines: ["ssh_host_dsa_key"]
  tags: ["system_log"]
  scan_frequency: 1s
  fields:
    # Extra field used to identify the source client
    server_name: client01
    # Kafka topic / index
    out_topic: "system_log"
  multiline:
    pattern: '^\s'
    match: after

# Collect login logs
- type: log
  paths:
    - /var/log/secure*
    - /var/log/auth.log*
  tags: ["system_secure"]
  exclude_files: [".gz$"]
  scan_frequency: 1s
  fields:
    server_name: client01
    out_topic: "system_secure"
  multiline:
    pattern: '^\s'
    match: after
```
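The multiline setting above folds continuation lines (those starting with whitespace, per pattern `^\s` with `match: after`) into the preceding event. A rough Python sketch of that behavior, with made-up sample lines:

```python
import re

# Lines matching '^\s' (leading whitespace) are appended to the previous
# line; everything else starts a new event.
def merge_multiline(lines, pattern=r"^\s"):
    events = []
    for line in lines:
        if events and re.match(pattern, line):
            events[-1] += "\n" + line  # continuation of the previous event
        else:
            events.append(line)        # start of a new event
    return events

raw = [
    "Apr 11 10:00:01 client01 kernel: Call Trace:",
    "  [<ffffffff81234567>] dump_stack+0x19/0x1b",
    "Apr 11 10:00:02 client01 sshd[42]: Accepted publickey for root",
]
events = merge_multiline(raw)
print(len(events))  # 2 -- the indented trace line joined the first event
```

Without this setting, each indented line of a kernel stack trace would arrive in Kafka as its own event.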
Logstash
/etc/logstash/conf.d/system_log.conf
```
input {
  kafka {
    bootstrap_servers => "kafkaNode:9092"
    consumer_threads => 3
    topics => ["system_log"]
    auto_offset_reset => "latest"
    codec => "json"
  }
}

filter {
  # Drop Logstash's own log
  if [source] == "/var/log/logstash-stdout.log" {
    drop {}
  }

  if [fields][out_topic] == "system_log" {
    date {
      match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
    grok {
      match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
      pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
      remove_field => "message"
    }
  }
}

output {
  elasticsearch {
    hosts => ["<Amazon ES endpoint>"]
    index => "%{[fields][out_topic]}_%{+YYYYMMdd}"
    document_type => "%{[@metadata][type]}"
  }
}
```
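For reference, the grok pattern in the filter above can be approximated with a plain Python regex. This is only an illustration, not the actual grok engine, and the sample log line is made up:

```python
import re
from datetime import datetime

# Illustrative analogue of the grok pattern: timestamp, hostname,
# program, optional [pid], then the (possibly multi-line) message.
SYSLOG_RE = re.compile(
    r"(?P<timestamp>\w{3} +\d+ \d{2}:\d{2}:\d{2}) "
    r"(?P<hostname>\S+) "
    r"(?P<program>[\w./-]+)(?:\[(?P<pid>\d+)\])?: "
    r"(?P<message>.*)",
    re.DOTALL,
)

line = "Apr 11 10:00:01 client01 CROND[1234]: (root) CMD (run-parts /etc/cron.hourly)"
m = SYSLOG_RE.match(line)
print(m.group("program"), m.group("pid"))  # CROND 1234

# The index name combines fields.out_topic with the day, one index
# per topic per day, e.g.:
print("system_log_" + datetime(2018, 4, 11).strftime("%Y%m%d"))  # system_log_20180411
```

Per-day indices keep each index small and make retention cleanup a simple matter of dropping old indices.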
/etc/logstash/conf.d/secure_log.conf
```
input {
  kafka {
    bootstrap_servers => "kafkaNode:9092"
    consumer_threads => 3
    topics => ["system_secure"]
    auto_offset_reset => "latest"
    codec => "json"
  }
}

filter {
  if [fields][out_topic] == "system_secure" {
    grok {
      match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
        "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]}
```