一文轻松搞定ELK日志实时采集分析平台

2022-04-07 10:44:05 浏览数 (1)

file

ELK简介

ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。

此文中使用了FileBeat进行日志采集,它是一个轻量级的日志收集处理工具(Agent),Filebeat占用资源少,适合于在各个服务器上搜集日志后传输给Logstash,官方也推荐此工具。

Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式零配置自动发现索引自动分片索引副本机制restful风格接口多数据源自动搜索负载等。主要负责将日志索引并存储起来,方便业务方检索查询。

Logstash 主要是用来日志的搜集分析过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。是一个日志收集、过滤、转发的中间件,主要负责将各条业务线的各类日志统一收集、过滤后,转发给 Elasticsearch 进行下一步处理。

Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志;最终将数据以直观的、图形化的方式展示出来

准备工作
虚拟机搭建

便于测试,环境的搭建

参考 基于VirtualBox搭建Linux(CentOS)虚拟机环境(学习必备技能)

Elasticsearch安装

参考 Elasticsearch 6.6.0 集群搭建:https://blog.lupf.cn/articles/2020/04/22/1587535463629.html

kafka安装

参考 并发(七)之分布式异步更新:Zookeeper Kafka实现数据分布式异步更新 : https://blog.lupf.cn/articles/2020/04/17/1587096190497.html

JDK安装

以上文章中都有包含JDK的安装

ELK架构图

file

ELK流程图

file

  • 日志生产;服务通过日志框架输出的日志,Nginx产生的日志;也可以是任何形式输出的日志文件。
  • 日志抓取(filebeat);通过配置,监控抓取符合规则的日志文件,并将抓取到的每条数据发送给kafka
  • kafka;主要起到削峰填谷,ELK高可用的关键作用;当流量过大,kafka可以起到很好的缓冲作用,降低下游的压力;当下游的logstash或ES故障,也可以很好保证数据的完整性
  • logstash;消费kafka的数据,并将数据持久化到ES或者其他持久化框架
  • Elasticsearch;持久化数据,并提供分布式检索功能
  • Kibana;展示ES中的数据
  • xpack watch;监控异常
  • 通知模块;将监控到的告警通知给相关责任人
项目日志规范

为什么要规范?只有规范之后的日志,在后续的抓取、整理同步至ES以及查看都会带来很多便利 , 请参考SpringBoot日志的链路追踪 : https://lupf.cn/articles/2020/06/04/1591273110824.html ; 建议优先阅读一下这篇文章 , 后续关于日志的拦截及解析都是基于这里的日志规则进行的; 具体格式如下:

代码语言:javascript复制
// 完整格式
[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%X{requestId}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{localIp}] [%X{clientIp}] [%X{applicationName}] [%X{requestUri}] [%F,%L,%C,%M] [%m] ## '%ex'%n

// 以下是详细说明
// 其中%X打头的都是自定义的日志  需要通过DMC设置
[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}]  //当前的时间
[%X{requestId}]  // 本次请求的唯一ID
[%level{length=5}]  // 日志级别
[%thread-%tid]  // 线程id
[%logger]  // loger对应的class
[%X{hostName}]  // 服务部署的主机名
[%X{localIp}]  // 服务部署的主机ip
[%X{clientIp}]  //客户端请求的ip
[%X{applicationName}]  //应用名称
[%X{requestUri}]  // 调用的地址
[%F,%L,%C,%M]  // 当前日志所处的类的信息 
[%m]  // 打印的消息
## '%ex'  // 异常信息  使用单引号包裹起来是够了方便后续的logstash的
%n  // 换行
filebeat安装
软件包安装

下载

代码语言:javascript复制
// 迅雷下载(比较的快 wget太慢了)
https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.6.0-linux-x86_64.tar.gz

上传至服务器解压

代码语言:javascript复制
cd /usr/local/src
tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz

mv filebeat-6.6.0-linux-x86_64 /usr/local/filebeat-6.6.0
cd /usr/local/filebeat-6.6.0

filebeat.yml配置

代码语言:javascript复制
#============== Filebeat prospectors ===========
filebeat.prospectors:
- type: log
  paths:
    - /usr/local/src/logs/log4j2-demo/all.log
#    - /usr/local/src/logs/*/*.log
  document_type: "all-log"
  fields:
    logbiz: log4j2-demo            # 项目名称
    log_topic: all-log-log4j2-demo     # kafka的topic
    evn: dev
  multiline:
    pattern: '^['   #指定多行匹配的表达式 以[开头的标识一行新的数据
    negate: true     # 是否匹配到 
    match: after     # 没有匹配上正则合并到上一行末尾
    max_lines: 1000  # 最大未匹配上的行数
    timeout: 2s      # 指定时间没有新的日志 就不等待后面的日志输入

- type: log          #定义多个输入
  paths:
    - /usr/local/src/logs/log4j2-demo/error.log
#    - /usr/local/src/logs/*/*.log
  document_type: "err-log"
  fields:
    logbiz: log4j2-demo
    log_topic: err-log-log4j2-demo     # kafka的topic
    evn: dev
  multiline:
    pattern: '^['   #指定多行匹配的表达式 以[开头的标识一行新的数据
    negate: true     # 是否匹配到 
    match: after     # 没有匹配上正则合并到上一行末尾
    max_lines: 1000  # 最大未匹配上的行数
    timeout: 2s      # 指定时间没有新的日志 就不等待后面的日志输入

## 定义输出的方式
output.kafka:
  enabled: true    # 启动
  hosts: ["192.168.1.160:9092"]    #kafka的ip和port
  topic: '%{[fields.log_topic]}'   #指定输出到的topicname 也就是上面定义的变量
  partition.hash:                  # 分区规则 hash           
    reachable_only: true
  compression: gzip                # 数据压缩
  max_message_bytes: 1000000       # 最大的消息字节数
  required_acks: 1                 # kafka ack的方式 0:丢出去就好了  1:有一个响应就好了  -1:所有节点响应才算成功
logging.to_files: true             #

测试配置

代码语言:javascript复制
cd /usr/local/filebeat-6.6.0
./filebeat -c filebeat.yml -configtest

file

kafka创建topic

代码语言:javascript复制
cd /usr/local/kafka/
bin/kafka-topics.sh --zookeeper cache1000:2181,cache1001:2181,cache1002:2181 --topic all-log-log4j2-demo --replication-factor 1 --partitions 1 --create
bin/kafka-topics.sh --zookeeper cache1000:2181,cache1001:2181,cache1002:2181 --topic err-log-log4j2-demo --replication-factor 1 --partitions 1 --create
// 查看topic的详情
bin/kafka-topics.sh --zookeeper cache1000:2181,cache1001:2181,cache1002:2181 --topic  all-log-log4j2-demo --describe

file

启动filebeat

代码语言:javascript复制
cd /usr/local/filebeat-6.6.0
./filebeat 1>/dev/null 2>&1 &

ps -ef | grep filebeat

file

产生日志数据并确定kafka是否拿到了数据

代码语言:javascript复制
cd /tmp/kafka-logs/err-log-log4j2-demo-0
ll
cd /tmp/kafka-logs/all-log-log4j2-demo-0
ll
// 看到文件不为空,说明filebeat生产的数据kafka已经收到

file

Docker安装

第一步,创建一个同上的filebeat.yml

第二步

代码语言:javascript复制
docker run --name filebeat -v /你的路径/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro -v /日志所在的目录:/日志所在的目录 -d elastic/filebeat:6.8.9
如 docker run --name filebeat -v /var/local/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro -v /usr/local/src/logs:/usr/local/src/logs -d elastic/filebeat:6.8.9
docker compose安装

创建docker compose文件

代码语言:javascript复制
# Use root/example as user/password credentials
version: '3.1'

services:
  filebeat:
    image: elastic/filebeat:6.8.9
    user: root
    container_name: filebeat-6.8.9
    restart: always
    volumes:
      - /var/local/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
      - /usr/local/src/logs:/usr/local/src/logs

启动

代码语言:javascript复制
docker-compose up -d
logstash

下载

代码语言:javascript复制
// 迅雷下载
https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz

// 上传至任意主机 /usr/local/src 目录下

解压

代码语言:javascript复制
cd /usr/local/src
tar -zxvf logstash-6.6.0.tar.gz
mv logstash-6.6.0 /usr/local/logstash-6.6.0

添加配置

代码语言:javascript复制
mkdir /usr/local/logstash-6.6.0/script
vim logstash-script.conf

# 添加以下配置
# 输入从kafka
input {
  kafka {
    topics_pattern => "all-log-.*"         # 也可以模糊匹配 如:all-log.*
    bootstrap_servers => "192.168.1.160:9092"   # kafka的ip 端口
    codec => json                               # 数据格式
    consumer_threads => 1                       # 对应partition的数量
    decorate_events => true
    #auto_offset_rest => "latest"               # 默认值就是这个
    group_id => "all-logs-group"                # kafka的消费组
  }

  kafka {
    topics_pattern => "err-log-.*"               # 也可以模糊匹配 如:err-log-*   这样就可以匹配到 err-log-product err-log-user
    bootstrap_servers => "192.168.1.160:9092"   # kafka的ip 端口
    codec => json                               # 数据格式
    consumer_threads => 1                       # 对应partition的数量
    decorate_events => true
    #auto_offset_rest => "latest"               # 默认值就是这个
    group_id => "err-logs-group"                # kafka的消费组
  }
}

filter {
  ruby {
    code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }

  ## 这里匹配的是filebeat中的fields.log_topic字段
  if "all-log" in [fields][log_topic] {
    grok {
      ## 表达式
      ## 这里的表达式的定义需要和真实的lo4j2中定义的规则保持一致;否则将无法匹配上
      match => ["message","[%{NOTSPACE:currentDataTime}] [%{NOTSPACE:requestId}] [%{NOTSPACE:level}] [%{NOTSPACE:thread-id}] [%{NOTSPACE:logger}] [%{DATA:hostName}] [%{DATA:localIp}] [%{DATA:clientIp}] [%{DATA:applicationName}] [%{DATA:requestUrl}] [%{DATA:location}] [%{DATA:loginfo}] ## (''|%{QUOTEDSTRING:throwable})"]
    }
  }

  if "err-log" in [fields][log_topic] {
    grok {
      ## 表达式
      match => ["message","[%{NOTSPACE:currentDataTime}] [%{NOTSPACE:requestId}] [%{NOTSPACE:level}] [%{NOTSPACE:thread-id}] [%{NOTSPACE:logger}] [%{DATA:hostName}] [%{DATA:localIp}] [%{DATA:clientIp}] [%{DATA:applicationName}] [%{DATA:requestUrl}] [%{DATA:location}] [%{DATA:loginfo}] ## (''|%{QUOTEDSTRING:throwable})"]
    }
  }
}

## 输出方式
## 到控制台 当测试无误之后,可以将此输出关闭
output {
  stdout { codec => rubydebug }
}

output {
  if "all-log" in [fields][log_topic] {
    elasticsearch {
      hosts => ["192.168.1.170:9200"]        # es的ip 端口
      # 用户名 密码
      # user => "admin"
      # password => "admin"

      # 索引名称(index)
      # 索引格式:all-log-应用名称-按天分组
      # [fields][logbiz]为filebeat中定义的变量
      index => "all-log-%{[fields][logbiz]}-%{index_time}"
      # 是否嗅探集群
      # 通过嗅探机制解析es集群的负载均衡发送日志
      sniffing => true 

      # logstash默认自带一个mapping模版,进行模版覆盖
      template_overwrite => true
    }
  }

  if "err-log" in [fields][log_topic] {
    elasticsearch {
      hosts => ["192.168.1.170:9200"]        # es的ip 端口
      # 用户名 密码
      # user => "admin"
      # password => "admin"

      # 索引名称(index)
      # 索引格式:all-log-应用名称-按天分组
      # [fields][logbiz]为filebeat中定义的变量
      index => "err-log-%{[fields][logbiz]}-%{index_time}"
      # 是否嗅探集群
      # 通过嗅探机制解析es集群的负载均衡发送日志
      sniffing => true

      # logstash默认自带一个mapping模版,进行模版覆盖
      template_overwrite => true
    }
  }
}

启动

代码语言:javascript复制
cd /usr/local/logstash-6.6.0
// 前台启动 方便观察数据及日志
./bin/logstash -f ./script/logstash-script.conf &

// 后台启动
nohup ./bin/logstash -f ./script/logstash-script.conf 1>/dev/null 2>&1 &
// jps查看是否有Logstash的进程
jps

logstash的输出

代码语言:javascript复制
// 以下是产生的日志数据
[2020-04-22T21:35:28.164 08:00] [4562f064-a59f-420d-8673-f9810ef1361c] [INFO] [http-nio-8080-exec-3-21] [com.lupf.log4j2demo.controller.Log4j2Controller] [elasticsearch0000] [192.168.1.170] [192.168.1.82] [log4j2-demo] [/info] [Log4j2Controller.java,14,com.lupf.log4j2demo.controller.Log4j2Controller,info] [我是info日志!!!] ## ''
// logstash 接收到kafka的消息并输出的对象
{
                "log" => {
        "file" => {
            "path" => "/usr/local/src/logs/log4j2-demo/all.log"
        }
    },
           "@version" => "1",
    "currentDataTime" => "2020-04-23T13:51:58.396 08:00",
              "input" => {
        "type" => "log"
    },
               "host" => {
        "name" => "elasticsearch0000"
    },
               "beat" => {
            "name" => "elasticsearch0000",
        "hostname" => "elasticsearch0000",
         "version" => "6.6.0"
    },
             "source" => "/usr/local/src/logs/log4j2-demo/all.log",
         "index_time" => "2020.04.23",
         "requestUrl" => "/info",
             "offset" => 69870,
            "message" => "[2020-04-23T13:51:58.396 08:00] [54452aa5-8a5c-4518-8e43-be347c55ff09] [INFO] [http-nio-8080-exec-7-25] [com.lupf.log4j2demo.controller.Log4j2Controller] [elasticsearch0000] [192.168.1.170] [192.168.1.82] [log4j2-demo] [/info] [Log4j2Controller.java,14,com.lupf.log4j2demo.controller.Log4j2Controller,info] [我是info日志!!!] ## ''",
    "applicationName" => "log4j2-demo",
             "logger" => "com.lupf.log4j2demo.controller.Log4j2Controller",
         "prospector" => {
        "type" => "log"
    },
            "loginfo" => "我是info日志!!!",
          "thread-id" => "http-nio-8080-exec-7-25",
         "@timestamp" => 2020-04-23T05:51:59.763Z,
           "clientIp" => "192.168.1.82",
              "level" => "INFO",
           "location" => "Log4j2Controller.java,14,com.lupf.log4j2demo.controller.Log4j2Controller,info",
           "hostName" => "elasticsearch0000",
             "fields" => {
        "log_topic" => "all-log-log4j2-demo",
           "logbiz" => "log4j2-demo",
              "evn" => "dev"
    },
          "requestId" => "54452aa5-8a5c-4518-8e43-be347c55ff09",
            "localIp" => "192.168.1.170"
}

查看kafka的消费情况

代码语言:javascript复制
cd /usr/local/kafka/
bin/kafka-consumer-groups.sh --bootstrap-server cache1000:9092,cache1001:9092,cache1002:9092 --describe --group err-logs-group
bin/kafka-consumer-groups.sh --bootstrap-server cache1000:9092,cache1001:9092,cache1002:9092 --describe --group all-logs-group

file

logstash输入、过滤器,输出测试

当如果上面的 logstash配置导致输入、过滤器、输出的不对,就可以通过以下的方式进行配置测试,快速定位问题或者配置匹配调整;

定义一个手动输入的配置文件

代码语言:javascript复制
cd /usr/local/logstash-6.6.0
vim script/test.conf

// 加入以下配置
# 手动输入
input {
  stdin {
  }
}

# 过滤器
filter{
  grok{
    # 测试匹配IP
    match => {"message" => "%{IPV4:ip}"}
  }
}

# 输出
output {
  stdout {
  }
}

通过以上的配置文件启动logstash

代码语言:javascript复制
cd /usr/local/logstash-6.6.0
./bin/logstash -f ./script/test.conf

// 然后直接输入想要测试的文本,回车就可以看见结果;如:
192.168.1.123 aabbcc 11223344

file

Kibana日志信息查看

进入kibana

代码语言:javascript复制
// kibana监听的是5601端口
http://elasticsearch:5601

优先查看索引是否是正常的

代码语言:javascript复制
GET /_cat/indices?v

file

配置索引管理

日志查看

到此!一个从0搭建的ELK技术栈即完成!!!

0 人点赞