7、Logstash
7.1、简介
用途
7.2、部署安装
代码语言:javascript复制#检查jdk环境,要求jdk1.8
java -version
#上传压缩包
sudo rz
#解压安装包
tar -xvf logstash-6.5.4.tar.gz
#第一个logstash示例
bin/logstash -e 'input { stdin { } } output { stdout {} }'
执行效果如下:
7.3、配置详解
Logstash的配置有三部分,如下:
代码语言:javascript复制input { #输入
stdin { ... } #标准输入
}
filter { #过滤,对数据进行分割、截取等处理
...
}
output { #输出
stdout { ... } #标准输出
}
7.3.1、输入
- 采集各种样式、大小和来源的数据,数据往往以各种各样的形式,或分散或集中地存在于很多系统中。
- Logstash 支持各种输入选择 ,可以在同一时间从众多常用来源捕捉事件。能够以连续的流式传输方* 式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。
7.3.2、过滤
- 实时解析和转换数据
- 数据从源传输到存储库的过程中,Logstash 过滤器能够解析各个事件,识别已命名的字段以构建结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值。
7.3.3、输出
Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。
7.4、读取自定义日志
7.4.1、日志结构
2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1002 可以看到,日志中的内容是使用“|”进行分割的,使用,我们在处理的时候,也需要对数据做分割处理。
7.4.2、编写配置文件
代码语言:javascript复制#vim itcast-pipeline.conf
input {
file {
path => "/itcast/logstash/logs/app.log" #读取日志的入口
start_position => "beginning" #从头开始
}
}
filter {
mutate {
split => {"message"=>"|"} #分割的符号
}
}
output {
stdout { codec => rubydebug } #输出在控制台
}
7.4.3、启动测试
代码语言:javascript复制#启动
./bin/logstash -f ./itcast-pipeline.conf
#写日志到文件
echo "2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1002" >> app.log
#输出的结果
{
"@timestamp" => 2019-03-15T08:44:04.749Z,
"path" => "/itcast/logstash/logs/app.log",
"@version" => "1",
"host" => "node01",
"message" => [
[0] "2019-03-15 21:21:21",
[1] "ERROR",
[2] "读取数据出错",
[3] "参数:id=1002"
]
}
可以看到,数据已经被分割了。
7.4.5、输出到Elasticsearch
代码语言:javascript复制input {
file {
path => "/itcast/logstash/logs/app.log"
#type => "system"
start_position => "beginning"
}
}
filter {
mutate {
split => {"message"=>"|"}
}
}
output {
elasticsearch {
hosts => [ "master:9200","salve1:9200","salve2.135:9200","salve3.135:9200"]
}
}
#启动
./bin/logstash -f ./itcast-pipeline.conf
#写入数据
echo "2019-03-15 21:21:21|ERROR|读取数据出错|参数:id=1003" >> app.log
测试:
8、综合练习
将前面所学习到的Elasticsearch Logstash Beats Kibana整合起来做一个综合性的练习。
8.1、流程说明
* 应用APP生产日志,用来记录用户的操作 [INFO] 2019-03-15 22:55:20 [cn.itcast.dashboard.Main] - DAU|5206|使用优惠券|2019-03-15 03:37:20 [INFO] 2019-03-15 22:55:21 [cn.itcast.dashboard.Main] - DAU|3880|浏览页面|2019-03-15 07:25:09 * 通过Filebeat读取日志文件中的内容,并且将内容发送给Logstash,原因是需要对内容做处理 * Logstash接收到内容后,进行处理,如分割操作,然后将内容发送到Elasticsearch中 * Kibana会读取Elasticsearch中的数据,并且在Kibana中进行设计Dashboard,最后进行展示
8.2、APP介绍
APP在生产环境应该是真实的系统,为了简化操作,所以就做数据的模拟生成即可。 业务代码如下:
代码语言:javascript复制package cn.itcast.dashboard;
import org.apache.commons.lang3.RandomUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Main {
private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
public static final String[] VISIT = new String[]{"浏览页面", "评论商品", "加入收藏", "加入购物车", "提交订单", "使用优惠券", "领取优惠券", "搜索", "查看订单"};
public static void main(String[] args) throws Exception {
while(true){
Long sleep = RandomUtils.nextLong(200, 1000 * 5);
Thread.sleep(sleep);
Long maxUserId = 9999L;
Long userId = RandomUtils.nextLong(1, maxUserId);
String visit = VISIT[RandomUtils.nextInt(0, VISIT.length)];
DateTime now = new DateTime();
int maxHour = now.getHourOfDay();
int maxMillis = now.getMinuteOfHour();
int maxSeconds = now.getSecondOfMinute();
String date = now.plusHours(-(RandomUtils.nextInt(0, maxHour)))
.plusMinutes(-(RandomUtils.nextInt(0, maxMillis)))
.plusSeconds(-(RandomUtils.nextInt(0, maxSeconds)))
.toString("yyyy-MM-dd HH:mm:ss");
String result = "DAU|" userId "|" visit "|" date;
LOGGER.info(result);
}
}
}
运行结果:
代码在资料中可以找到,itcast-dashboard-generate.zip。也可以自己生成导出jar包。 部署:
代码语言:javascript复制#打包成jar包,在linux上运行
java -jar itcast-dashboard-generate-1.0-SNAPSHOT.jar
#运行之后,就可以将日志写入到/itcast/logs/app.log文件中
8.3、Filebeat
代码语言:javascript复制#vim itcast-dashboard.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /itcast/logs/*.log
setup.template.settings:
index.number_of_shards: 3
output.logstash:
hosts: ["master:5044"]
#启动
./filebeat -e -c itcast-dashboard.yml
8.4、Logstash
代码语言:javascript复制#vim itcast-dashboard.conf
input {
beats {
port => "5044"
}
}
filter {
mutate {
split => {"message"=>"|"}
}
mutate {
add_field => {
"userId" => "%{message[1]}"
"visit" => "%{message[2]}"
"date" => "%{message[3]}"
}
}
mutate {
convert => {
"userId" => "integer"
"visit" => "string"
"date" => "string"
}
}
}
#output {
# stdout { codec => rubydebug }
#}
output {
elasticsearch {
hosts => [ "master:9200","salve1:9200","salve2.135:9200","salve3.135:9200"]
}
}
#启动
./bin/logstash -f itcast-dashboard.conf
8.5、Kibana
启动Kibana:
代码语言:javascript复制#启动
./bin/kibana
#通过浏览器进行访问
http://master/app/kibana
注意:如果启动Kibana后服务器变得非常卡,可以换一台虚拟机启动Kibana,配置和之前一样。
添加Logstash索引到Kibana中:
8.5.1、时间间隔的柱形图
说明:x轴是时间,以天为单位,y轴是count数 保存:(my-dashboard-时间间隔的柱形图)
8.5.2、各个操作的饼图分布
统计各个操作的数量,形成饼图。 保存:(my-dashboard-各个操作的饼图)
8.5.3、数据表格
在数据探索中进行保存,并且保存,将各个操作的数据以表格的形式展现出来。 保存:(my-dashboard-表格)
8.5.4、制作Dashboard
8.5.5、其他操作
- 可以根据自己的需求,绘制不同的统计图
- 可以点击share,导出链接代码,可以直接使用
- 可以将制作的表格,或者监控到的数据导出为 csv
- 可以自己调整统计图的样式,和实时监控刷新的频率