Logstash 篇之入门与运行机制

2021-02-04 10:25:13 浏览数 (1)

1、Logstash是一个数据收集引擎,相当于是ETL工具。截图来源慕课,尊重版本从你我做起。

  Logstash分为三个阶段,第一个阶段Input是数据采集、第二个阶段是Filter数据解析和转换,第三个阶段是Output数据输出。

2、Logstash中的Pipeline概念。

  1)、Pipeline是指Input-filter-output的三个阶段处理流程。

  2)、队列管理。

  3)、插件生命周期管理。

3、Logstash中的Logstash Event概念。

  1)、内部流转的数据表现形式,原始数据从Input进入之后,在内部流转的时候不是原始的数据,而是Logstash Event数据。Logstash Event就是一个Java Object,对外暴漏去修改或者获取内部字段的api

  2)、原始数据在input被转换为Event,在output event被转换为目标格式数据。

  3)、在配置文件中可以对Event中的属性进行增删改查。

关于数据流转,数据由Input进入之后,从Output输出。Codec(Code、Decode)将原始数据Data转换为Logstash Event,当数据输出的时候,Codec将Logstash Event转换为目标数据源需要的类型Data。

4、Logstash的安装,将包上传到服务器进行解压缩即可,如下所示:

代码语言:javascript复制
1 tar -zxvf logstash-6.7.1.tar.gz -C /usr/local/soft/

Logstash的简单案例,如下所示:

代码语言:javascript复制
 1 # 输入,stdin是标准输入,按照每一行切分数据
 2 input {
 3   stdin {
 4     codec => line
 5   }
 6 }
 7 
 8 # 过滤为空
 9 filter {}
10 
11 # 输出,stdout标准输出,将输出转换为json格式
12 output {
13   stdout {
14     codec => json
15   }
16 }

以每行进行切分数据,不过滤,然后输出为json格式的输入(Codec- Input Decoding)转换流程,如下所示:

当输入之后,要进行输出(Codec- Output Encoding),如下所示:

演示效果,如下所示:

代码语言:javascript复制
 1 [elsearch@k8s-master logstash-6.7.1]$ echo "foo
 2 > bar
 3 > " | bin/logstash -f config/line-to-json.conf
 4 Sending Logstash logs to /usr/local/soft/logstash-6.7.1/logs which is now configured via log4j2.properties
 5 [2021-01-30T19:11:13,330][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.queue", :path=>"/usr/local/soft/logstash-6.7.1/data/queue"}
 6 [2021-01-30T19:11:13,357][INFO ][logstash.setting.writabledirectory] Creating directory {:setting=>"path.dead_letter_queue", :path=>"/usr/local/soft/logstash-6.7.1/data/dead_letter_queue"}
 7 [2021-01-30T19:11:14,189][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
 8 [2021-01-30T19:11:14,258][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.7.1"}
 9 [2021-01-30T19:11:14,396][INFO ][logstash.agent           ] No persistent UUID file found. Generating new UUID {:uuid=>"e449e4d8-b26e-49fa-b12d-3d452b50aac4", :path=>"/usr/local/soft/logstash-6.7.1/data/uuid"}
10 [2021-01-30T19:11:24,816][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
11 [2021-01-30T19:11:24,943][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x6185f2c6 run>"}
12 [2021-01-30T19:11:25,080][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
13 {"@version":"1","host":"k8s-master","message":"foo","@timestamp":"2021-01-30T11:11:25.063Z"}{"@version":"1","host":"k8s-master","message":"bar","@timestamp":"2021-01-30T11:11:25.102Z"}{"@version":"1","host":"k8s-master","message":"","@timestamp":"2021-01-30T11:11:25.102Z"}[2021-01-30T19:11:25,394][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"main", :thread=>"#<Thread:0x6185f2c6 run>"}
14 [2021-01-30T19:11:25,516][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
15 [elsearch@k8s-master logstash-6.7.1]$ 

5、Logstash的架构简介,如下所示:

  Logstash的输入可以有多个的,数据流入之后,经过Codec之后,数据流入Queue,而Queue负责将流入的数据分发到Pipeline(是指Input-filter-output的三个阶段处理流程)中,而Batcher的作用是批量的从Queue中取数据,而Batcher有时间和数据量两种触发条件。

6、Logstash中Queue的分类。

  1)、In Memory,无法处理进程Crash,机器宕机等情况,会导致数据丢失。   2)、Persistent Queue In Disk,可处理进程Crash等情况,保证数据不丢失。保证数据至少消费一次。充当缓冲区,可以替代Kafka等消息队列的作用。

  3)、Persistent Queue的基本配置,第一个参数queue.type:persisted,但是这个参数默认是memory。第二个参数是queue.max_bytes:4gb,队列存储最大数据量,默认是1gb。 7、Logstash中的线程,包含Input Thread输入线程,Pipeline Workder Thread,每个Pipeline会在自己独立的线程中的,对Logtash线程调优,一般调整的就是Pipeline的线程个数。

  1)、pipeline.workers | -w,pipeline线程数,即filter_output的处理线程数,默认是cpu核数。   2)、pipeline.batch.size | -b,Batcher一次批量获取的待处理文档数,默认是125,可以根据输出进行调整,越大会占用越多的heap空间,可以通过jvm.options调整。   3)、pipeline.batch.delay | -u,Batch等待的时长,单位为ms。

8、logstash的配置文件。

  1)、logstash设置相关的配置文件,在config文件夹中。

    a、logstash.yml,这个是logstash相关的配置,比如node.name(节点名称)、path.data(持久化存储数据的文件夹,默认是logstash home目录下的data)、pipeline.works(设定pipeline的线程数,优化的常用项)、pipeline.batch.size/delay(设定批量处理数据的数目和延迟)、queue.type(设定队列类型,默认是memory)、queue.max_bytes(设定队列总容量,默认是1GB)、path.log(设定pipeline日志文件的目录)、path.config(设定pipeline配置文件的目录)等等,这其中的配置可以被命令行参数中的相关参数覆盖。

    b、jvm.options修改jvm的相关参数,比如修改heap size等等。

   2)、pipeline配置文件,定义数据处理流程的文件,以.conf结尾。

9、logstash命令行配置项。

  1)、--node.name,指定节点名称。   2)、-f --path.config,pipeline路径,可以是文件或者文件夹。   3)、--path.settings,logstash配置文件夹路径,其中要包含logstash.yml。   4)、-e --config.string,指明pipeline内容,多用于测试使用。   5)、-w --pipeline.workers,设定pipeline的线程数,优化的常用项。   6)、-b --pipeline.batch.szie,设定批量处理数据的数目和延迟。   7)、--path.data,指定logstash的数据目录。   8)、--debug,打开调试日志。   9)、-t config.test_and_exit,打开测试,检查Logstash加载进来的pipeline是否有错,有错就报错。

10、logstash配置方式建议。

  1)、线上环境推荐采用配置文件的方式来设定Logstash的相关配置,这样可以减少犯错的机会,而且文件便于进行版本化管理。   2)、命令行形式多用来进行快速的配置测试、验证、检查等。

11、logstash多实例运行方式,在一台机器运行多个logstash的实例。

代码语言:javascript复制
1 bin/logstash --path.settings instance1
2 bin/logstash --path.settings instance2

不同的instance实例中修改logstash.yml,自定义path.path,确保其不相同即可。

12、pipeline的配置,用于配置input、filter、output的插件,框架是input{}、filter{}、output{}。pipeline配置语法,主要有如下的数据类型,如下所示:

代码语言:javascript复制
1 a、布尔类型Boolean,例如isFailed => true。
2 b、数值类型Number,例如port => 33。
3 c、字符串类型String,例如name => "hello world"。
4 d、数组Array或者List,例如,users => [{id => 1,name => bob},{id => 2, name => jane}]。
5 e、哈希类型hash,match => {
6     "field1" => "value1"
7     "field2" => "value2"
8 }
9 f、注释,使用井号#。

在配置中可以引用Logstash Event的属性字段,主要有如下两种方式。

1)、第一种,是直接引用字段值Field Reference,使用[]中括号即可,嵌套字段写多层[]中括号即可。   2)、第二种,是在字符串以sprintf方式引用,使用%{}来实现。   3)、支持条件判断语法,从而扩展了配置的多样性,语法格式if 表达式 else if 表达式。

代码语言:javascript复制
1 表达式主要包含如下的操作符。
2 1)、比较运算符,==、!=、<、>、<=、>=。
3 2)、正则是否匹配,=~、!~。
4 3)、包含字符串或者数据,in、not in。
5 4)、布尔操作符,and、or、nand、xor、!。
6 5)、分组操作符,()。

0 人点赞