CenttOS 6.6下部署ELK日志告警系统

2022-07-13 15:18:52 浏览数 (1)

前言

运维故障排障速度往往与监控系统体系颗粒度成正比,监控到位才能快速排障

在部署这套系统之前,平台所有系统日志都由Graylog Zabbix,针对日志出现的错误关键字进行告警,这种做法在运维工作开展过程中暴露出多个不足点,不详述;在考虑多方面原因后,最终对日志告警系统进行更换,选用的方案是:ELK Kafka Filebeat Elastalert

本文主要以两个需求为主轴做介绍

  • 非工作时间服务器异常登录告警
  • 系统日志出现错误关键字告警

架构

服务选型

name

version

info

Amazon Elasticsearch Service

v6.2

AWK官网部署教程

Logstash

v6.2.3

选用与ES相同版本

Filebeat

v6.2.3

选用与ES相同版本

Confluent(Kafka)

v4.0

这里推荐 Confluent 的版本,Confluent 是 kafka 作者 Neha Narkhede 从 Linkedin 出来之后联合 LinkedIn 前员工创建的大数据公司,专注于 kafka 的企业应用。

Elastalert

v0.1.29

原先考虑采用X-Pack但由于AWS目前还不支持

部署

本文采用的操作系统 :CentOS release 6.6

Filebeat

# 下载源 $ curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.2.3-x86_64.rpm

# 安装 $ sudo rpm -vi filebeat-6.2.3-x86_64.rpm

Logstash

# 导入Yum源

$ rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

$ cat <<EOF > /etc/yum.repos.d/logstash.repo

[logstash-6.x]

name=Elastic repository for 6.x packages

baseurl=https://artifacts.elastic.co/packages/6.x/yum

gpgcheck=1

gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch

enabled=1

autorefresh=1

type=rpm-md

EOF

# 安装 yum install logstash -y

Elastalert

# pip直接安装 $ pip install elastalert

# 如果出现依赖包报错,以下为常用开发所需依赖包 $ yum install -y zlib openssl openssl-devel gcc gcc-c Xvfb libXfont Xorg libffi libffi-devel Python-cffi python-devel libxslt-devel libxml2-devel zlib-devel bzip2-devel xz-libs wget

配置

Filebeat /etc/filebeat/filebeat.yml

filebeat.config:   prospectors:     path: /etc/filebeat/conf/*.yml     reload.enabled: true     reload.period: 10s

output.kafka:   # kafkaNode为Kafaka服务所在服务器   hosts: ["kafkaNode:9092"]

  # 索引取fields.out_topic   topic: "%{[fields][out_topic]}"   partition.round_robin:     reachable_only: false

/etc/filebeat/conf/base.yml

# 收集系统日志 - type: log   paths:     - /var/log/messages     - /var/log/syslog*   exclude_files: [".gz$"]   exclude_lines: ["ssh_host_dsa_key"]   tags: ["system_log"]   scan_frequency: 1s   fields:     # 新增字段用于辨别来源客户端     server_name: client01

    # 索引     out_topic: "system_log"   multiline:     pattern: "^\s"     match: after

# 收集登录日志 - type: log   paths:     - /var/log/secure*     - /var/log/auth.log*   tags: ["system_secure"]   exclude_files: [".gz$"]   scan_frequency: 1s   fields:     server_name: client01     out_topic: "system_secure"   multiline:     pattern: "^\s"     match: after

Logstash

/etc/logstash/conf.d/system_log.conf

input {     kafka {         bootstrap_servers => "kafkaNode:9092"         consumer_threads => 3         topics => ["system_log"]         auto_offset_reset => "latest"         codec => "json"     } }

filter {

    # 排除logstash日志     if [source] == "/var/log/logstash-stdout.log" {         drop {}     }

    if [fields][out_topic] == "system_log" {         date {match => [ "[system][syslog][timestamp]", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]}         grok {             match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:[%{POSINT:[system][syslog][pid]}])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }             pattern_definitions => { "GREEDYMULTILINE" => "(.|n)*" }             remove_field => "message"         }     } }

output {         elasticsearch {             hosts => ["<亚马逊ES地址>"]             index => "%{[fields][out_topic]}_%{ YYYYMMdd}"             document_type => "%{[@metadata][type]}"         } }

/etc/logstash/conf.d/secure_log.conf

input {     kafka {         bootstrap_servers => "kafkaNode:9092"         consumer_threads => 3         topics => ["system_secure"]         auto_offset_reset => "latest"         codec => "json"     } }

filter {     if [fields][out_topic] == "system_secure" {         grok {           match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:[%{POSINT:[system][auth][pid]}])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",                   "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:[%{POSINT:[system][auth][pid]}])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",                   "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:[%{POSINT:[system][auth][pid]}])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",                   "%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:[%{POSINT:[system][auth][pid]}])?: s*%{DATA:[system][auth][user]}

0 人点赞