首先先下载我们需要的安装文件,wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.4.tar.gz
这里因为我们的elasticsearch用的是6.2.4,所以我们logstash使用相同的版本。
- Logstash是一个数据收集处理引擎。
- ETL工具
它整个的处理流程如下
这里input{}的stdin{}是标准输入,再到filter{},filter{}可以有多个,首先是grok{},它是利用了正则匹配的方法,将信息中的内容匹配出一些独立的字段;date{}是专门处理日期类型的;geoip{}是处理地理位置信息的;useragent{}是把信息中的agent提取出来;output{}是把数据输出到elasticsearch中。
- Pipeline
- input-filter-output的3阶段处理流程
- 队列管理
- 插件生命周期管理
- Logstash Event
- 内部流转的数据表现形式
- 原始数据在input被转换为Event,在output event被转换为目标格式数据
- 在配置文件中可以对Event中的属性进行增删改查
原始数据从Input进来,通过Output输出符合目标数据源的数据。Input通过Codec的decode(解码)把原始数据转换成Event,Event通过Output的encode(编码)转换为目标数据源的数据格式。整体流程如下。
我们来看一个最简单的配置
这里Input的codec是line,意思就是说对每一行进行切割数据,就是把每一行都变成一个Logstash Event;ouput的codec是json,意思就是说把Event转换成json对象输出。假设有一个输出为"foonbar",它的解析过程如下所示
我们可以看到它最终被解析成了两个Event,一个是"foo",一个是"bar"。这两个Event再通过output变成json对象
现在我们来实现一下上面的过程,先建立一个codec.conf的文件,文件内容为
代码语言:javascript复制input {
stdin {
codec => line
}
}
output {
stdout {
codec => json
}
}
执行
代码语言:javascript复制[root@ config]# echo "foo
> bar
> "|../bin/logstash -f ./codec.conf
运行结果如下
代码语言:javascript复制Sending Logstash's logs to /home/soft/logstash-6.2.4/logs which is now configured via log4j2.properties
[2021-06-02T18:53:45,304][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/home/soft/logstash-6.2.4/modules/netflow/configuration"}
[2021-06-02T18:53:45,333][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/home/soft/logstash-6.2.4/modules/fb_apache/configuration"}
[2021-06-02T18:53:45,429][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/home/soft/logstash-6.2.4/data/queue"}
[2021-06-02T18:53:45,436][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/home/soft/logstash-6.2.4/data/dead_letter_queue"}
[2021-06-02T18:53:45,804][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-06-02T18:53:45,837][INFO ][logstash.agent ] No persistent UUID file found. Generating new UUID {:uuid=>"b63c49fd-9164-435f-9579-e01cf97cc025", :path=>"/home/soft/logstash-6.2.4/data/uuid"}
[2021-06-02T18:53:46,638][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.4"}
[2021-06-02T18:53:47,095][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2021-06-02T18:53:48,736][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2021-06-02T18:53:48,896][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x26912ae5 sleep>"}
[2021-06-02T18:53:49,021][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
{"@version":"1","host":"ecs-linge-hangye-1","@timestamp":"2021-06-02T10:53:48.967Z","message":"foo"}{"@version":"1","host":"ecs-linge-hangye-1","@timestamp":"2021-06-02T10:53:48.988Z","message":"bar"}{"@version":"1","host":"ecs-linge-hangye-1","@timestamp":"2021-06-02T10:53:48.989Z","message":""}[2021-06-02T18:53:49,271][INFO ][logstash.pipeline ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x26912ae5 run>"}
我们可以看到获得了{"@version":"1","host":"ecs-linge-hangye-1","@timestamp":"2021-06-02T10:53:48.967Z","message":"foo"}{"@version":"1","host":"ecs-linge-hangye-1","@timestamp":"2021-06-02T10:53:48.988Z","message":"bar"}两个json对象。
Event的生命周期
这里从Input进入的原始数据,经过Codec解码后,变成Event进入Queue,Queue会把进入的Event分发给不同的Pipeline(管道)中,Pipeline包含了后面的Batcher、Filter、Output。Batcher的作用是批量的从Queue中取数据。
这里每一个Pipeline都是一个线程。Batcher会收集很多的数据,当数据达到一定的数量的时候,会发送给Filter往后传递。