Elastic Stack——Logstash基本使用、实时数据监控和可视化分析

2022-09-24 10:00:04 浏览数 (1)

7、Logstash

7.1、简介

用途

7.2、部署安装

代码语言:javascript复制
#检查jdk环境,要求jdk1.8 
java -version
#上传压缩包
sudo rz
#解压安装包
tar -xvf logstash-6.5.4.tar.gz
#第一个logstash示例
bin/logstash -e 'input { stdin { } } output { stdout {} }'

执行效果如下:

7.3、配置详解

Logstash的配置有三部分,如下:

代码语言:javascript复制
input { #输入
stdin { ... } #标准输入
}
filter { #过滤,对数据进行分割、截取等处理
...
}
output { #输出
stdout { ... } #标准输出
}
7.3.1、输入
  • 采集各种样式、大小和来源的数据,数据往往以各种各样的形式,或分散或集中地存在于很多系统中。
  • Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方* 式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
7.3.2、过滤
  • 实时解析和转换数据
  • 数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。
7.3.3、输出

Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。

7.4、读取自定义日志

7.4.1、日志结构

2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1002 可以看到,日志中的内容是使用“|”进行分割的,使用,我们在处理的时候,也需要对数据做分割处理。

7.4.2、编写配置文件
代码语言:javascript复制
#vim itcast-pipeline.conf
input {
file {
    path => "/itcast/logstash/logs/app.log"     #读取日志的入口
    start_position => "beginning"                       #从头开始
   }
}
filter {
   mutate {
        split => {"message"=>"|"}     #分割的符号
     }
}
output {
      stdout { codec => rubydebug }         #输出在控制台
}
7.4.3、启动测试
代码语言:javascript复制
#启动
./bin/logstash -f ./itcast-pipeline.conf
#写日志到文件
echo "2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1002" >> app.log
#输出的结果
{
    "@timestamp" => 2019-03-15T08:44:04.749Z,
    "path" => "/itcast/logstash/logs/app.log",
    "@version" => "1",
    "host" => "node01",
    "message" => [
        [0] "2019-03-15 21:21:21",
        [1] "ERROR",
        [2] "读取数据出错",
        [3] "参数:id=1002"
    ]
}

可以看到,数据已经被分割了。

7.4.5、输出到Elasticsearch
代码语言:javascript复制
input {
    file {
        path => "/itcast/logstash/logs/app.log"
        #type => "system"
        start_position => "beginning"
    }
}
filter {
    mutate {
        split => {"message"=>"|"}
    }
}
output {
    elasticsearch {
        hosts => [ "master:9200","salve1:9200","salve2.135:9200","salve3.135:9200"]
    }
}


#启动
./bin/logstash -f ./itcast-pipeline.conf
#写入数据
echo "2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1003" >> app.log

测试:

8、综合练习

将前面所学习到的Elasticsearch Logstash Beats Kibana整合起来做一个综合性的练习。

8.1、流程说明

* 应用APP生产日志,用来记录用户的操作 [INFO] 2019-03-15 22:55:20 [cn.itcast.dashboard.Main] - DAU|5206|使用优惠券|2019-03-15 03:37:20 [INFO] 2019-03-15 22:55:21 [cn.itcast.dashboard.Main] - DAU|3880|浏览页面|2019-03-15 07:25:09 * 通过Filebeat读取日志文件中的内容,并且将内容发送给Logstash,原因是需要对内容做处理 * Logstash接收到内容后,进行处理,如分割操作,然后将内容发送到Elasticsearch中 * Kibana会读取Elasticsearch中的数据,并且在Kibana中进行设计Dashboard,最后进行展示

8.2、APP介绍

APP在生产环境应该是真实的系统,为了简化操作,所以就做数据的模拟生成即可。 业务代码如下:

代码语言:javascript复制
package cn.itcast.dashboard;

import org.apache.commons.lang3.RandomUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Main {

    private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);

    public static final String[] VISIT = new String[]{"浏览页面", "评论商品", "加入收藏", "加入购物车", "提交订单", "使用优惠券", "领取优惠券", "搜索", "查看订单"};

    public static void main(String[] args) throws Exception {
        while(true){
            Long sleep = RandomUtils.nextLong(200, 1000 * 5);
            Thread.sleep(sleep);
            Long maxUserId = 9999L;
            Long userId = RandomUtils.nextLong(1, maxUserId);
            String visit = VISIT[RandomUtils.nextInt(0, VISIT.length)];
            DateTime now = new DateTime();
            int maxHour = now.getHourOfDay();
            int maxMillis = now.getMinuteOfHour();
            int maxSeconds = now.getSecondOfMinute();
            String date = now.plusHours(-(RandomUtils.nextInt(0, maxHour)))
                    .plusMinutes(-(RandomUtils.nextInt(0, maxMillis)))
                    .plusSeconds(-(RandomUtils.nextInt(0, maxSeconds)))
                    .toString("yyyy-MM-dd HH:mm:ss");

            String result = "DAU|"   userId   "|"   visit   "|"   date;
            LOGGER.info(result);
        }

    }
}

运行结果:

代码在资料中可以找到,itcast-dashboard-generate.zip。也可以自己生成导出jar包。 部署:

代码语言:javascript复制
#打包成jar包,在linux上运行
java -jar itcast-dashboard-generate-1.0-SNAPSHOT.jar
#运行之后,就可以将日志写入到/itcast/logs/app.log文件中

8.3、Filebeat

代码语言:javascript复制
#vim itcast-dashboard.yml
filebeat.inputs:
    - type: log
    enabled: true
    paths:
        - /itcast/logs/*.log
setup.template.settings:
    index.number_of_shards: 3
output.logstash:
    hosts: ["master:5044"]

#启动
./filebeat -e -c itcast-dashboard.yml

8.4、Logstash

代码语言:javascript复制
#vim itcast-dashboard.conf
input {
    beats {
        port => "5044"
    }
}
filter {
    mutate {
        split => {"message"=>"|"}
    }
    mutate {
        add_field => {
        "userId" => "%{message[1]}"
        "visit" => "%{message[2]}"
        "date" => "%{message[3]}"
        }
    }
    mutate {
        convert => {
        "userId" => "integer"
        "visit" => "string"
        "date" => "string"
        }
    }
}
#output {
# stdout { codec => rubydebug }
#}
output {
    elasticsearch {
        hosts => [ "master:9200","salve1:9200","salve2.135:9200","salve3.135:9200"]
    }
}


#启动
./bin/logstash -f itcast-dashboard.conf

8.5、Kibana

启动Kibana:

代码语言:javascript复制
#启动
./bin/kibana
#通过浏览器进行访问
http://master/app/kibana

注意:如果启动Kibana后服务器变得非常卡,可以换一台虚拟机启动Kibana,配置和之前一样。

添加Logstash索引到Kibana中:

8.5.1、时间间隔的柱形图

说明:x轴是时间,以天为单位,y轴是count数 保存:(my-dashboard-时间间隔的柱形图)

8.5.2、各个操作的饼图分布

统计各个操作的数量,形成饼图。 保存:(my-dashboard-各个操作的饼图)

8.5.3、数据表格

在数据探索中进行保存,并且保存,将各个操作的数据以表格的形式展现出来。 保存:(my-dashboard-表格)

8.5.4、制作Dashboard
8.5.5、其他操作
  1. 可以根据自己的需求,绘制不同的统计图
  2. 可以点击share,导出链接代码,可以直接使用
  1. 可以将制作的表格,或者监控到的数据导出为 csv
  2. 可以自己调整统计图的样式,和实时监控刷新的频率

0 人点赞