logstash有什么用
logstash这个工具在我们这里的使用方式是从kafka消费信息并且将信息整理发送给es中。logstash对数据的处理很强大,插件特别多,但是过程可能比想的简单。
- 看一下你的数据源是啥,好,装插件。
- 输出目标是啥,好,装插件。
- 需要处理过滤信息,好,装插件。
- 但是处理过程还是需要自己编写的,但是遇到大家都一样的数据格式(比如log4j),复制网上一份就可以用了。(但事实上数据类型多种多样,针对多种数据还是感觉特别麻烦)
其实,logstash就是一个管道,但是送的数据来源多种多样,可以自己去采集服务器上面的日志(比如nginx,Apache这类web服务,可以接受我们日常的日志,可以接受kafka,rabbitmq这类消息队列,获取到的数据我们按照一定的形式去编排,可以送到es,关系型或者非关系型数据库,想怎么用这些数据就跟logstash没关系了。
logstash充当管道
logstash的作用就介绍到这里,接下来进入正题,logstash的使用。
logstash
首先是安装,推荐是通过二进制直接安装,因为logstash是商用软件,所以是没有直接公开源代码的,也就是通过源码安装是不行的(其实通常我们装软件都是用二进制的吧,源码容易出错而且可能要编译很久),但是虽然是商用,但是也有一些开源的和免费的商用功能我们可以直接使用。
大家可以去官网下载,https://www.elastic.co/downloads/logstash,要是嫌弃速度太慢的话,我这里也有一个linux64位的二进制的包可以直接使用百度网盘 提取码:5o40 下载,解压缩之后得到logstash-7.6.0文件夹,进入文件夹里。执行logstash的helloworld命令:bin/logstash -e 'input { stdin { } } output { stdout {} }'
代码语言:javascript复制root@DESKTOP-3JK8RKR:~/logstash-7.6.0/logstash-7.6.0# bin/logstash -e 'input { stdin { } } output { stdout {} }'
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.headius.backport9.modules.Modules (file:/root/logstash-7.6.0/logstash-7.6.0/logstash-core/lib/jars/jruby-complete-9.2.9.0.jar) to method sun.nio.ch.NativeThread.signal(long)
WARNING: Please consider reporting this to the maintainers of com.headius.backport9.modules.Modules
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Sending Logstash logs to /root/logstash-7.6.0/logstash-7.6.0/logs which is now configured via log4j2.properties
[2020-03-08T23:39:04,108][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-03-08T23:39:04,229][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.6.0"}
[2020-03-08T23:39:05,433][INFO ][org.reflections.Reflections] Reflections took 37 ms to scan 1 urls, producing 20 keys and 40 values
[2020-03-08T23:39:05,875][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.RubyArray) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team.
[2020-03-08T23:39:05,896][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x43221229 run>"}
[2020-03-08T23:39:06,525][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2020-03-08T23:39:06,577][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-03-08T23:39:06,771][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
得到一些启动信息,其中可以看到一些版本,配置,线程信息(虽然不知道这个有什么用),还看到起了一个api端口,9600,我们打开9600看看是啥。
logstash起了啥
可以看到这个端口里,有一些版本的信息。相当的友好,并且我们还可以通过restfulapi和logstash进行通讯配置。
回到命令本身。这个命令可以很明显的看到,把命令行输入,输出到命令行。 我们试试是不是就这么简单。
代码语言:javascript复制huangzelin
/root/logstash-7.6.0/logstash-7.6.0/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{
"@timestamp" => 2020-03-08T15:39:51.388Z,
"host" => "DESKTOP-3JK8RKR",
"message" => "huangzelin",
"@version" => "1"
}
当我们输入huangzelin的时候,确实是输出内容带有huangzelin,但是还带有了其他的东西,比如时间戳,host,版本,这也正是logstash的强大之处,我们使用其他的插件的时候,也可以使输出内容更加丰富,比如你想做一个全国各地的访问情况,通过logstash可以把ip地址转为geo(地理信息),再通过一些前端的数据可视化工具(比如antv)去展示。
虽然我们的第一个例子很简单,但是那个例子缺少了logstash的灵魂,插件。接下来我们尝试使用插件,将我们的日志信息展示到控制台上面。
首先写个简单的打日志的代码
代码语言:javascript复制package main
import (
"github.com/golang/glog"
"time"
)
func main(){
timer := time.NewTicker(1 * time.Second)
for {
select {
case timeInfo:=<-timer.C:
glog.Infof("打个日志,%s",timeInfo.Format("2006-01-02 15:04:05"))
}
}
}
产生的效果如下
代码语言:javascript复制root@DESKTOP-3JK8RKR:/mnt/e/all_my_love/all-my-love/cron/log_product# go run log_producer.go
ERROR: logging before flag.Parse: I0310 00:06:19.898846 249 log_producer.go:13] 打个日志,2020-03-10 00:06:19
ERROR: logging before flag.Parse: I0310 00:06:20.898558 249 log_producer.go:13] 打个日志,2020-03-10 00:06:20
ERROR: logging before flag.Parse: I0310 00:06:21.898074 249 log_producer.go:13] 打个日志,2020-03-10 00:06:21
ERROR: logging before flag.Parse: I0310 00:06:22.898155 249 log_producer.go:13] 打个日志,2020-03-10 00:06:22
然后我们把日志内容输出到一个文件里面,执行
./log_producer >> test.log 2>&1
然后我们利用logstash从test.log采集数据。 ps:这里是单机操作,我们就不弄很复杂,要是真实的需要在多台机子上采集日志,就需要用到FileBeat采集日志,到特定的端口,然后由logstash使用logstash自带的Beats输入插件填写FileBeat指定的ip端口去获得数据源。 如果我们把插件信息和配置全部写到命令行里,那要是很复杂就很长很难看了,所以logstash可以像nginx一样,把我们的命令放到一个配置文件去读取,logstash为我们准备了一个简单的模板:/logstash-7.6.0/config/logstash-sample.conf。我们打开这个文件可以看到
代码语言:javascript复制# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{ YYYY.MM.dd}"
#user => "elastic"
#password => "changeme"
}
}
这是从利用beats插件从端口拉取数据的配置。我们把这个配置改一下。让他读取文件的新增内容之后,显示在控制台上。
代码语言:javascript复制input {
file {
path => ["/mnt/e/all_my_love/all-my-love/cron/log_producer/test.log"]
type => "test"
}
}
output {
stdout {
codec => rubydebug //以非常友好的方式显示出来
}
}
接下来我们执行这个配置文件。
代码语言:javascript复制root@DESKTOP-3JK8RKR:~/logstash-7.6.0/logstash-7.6.0# ./bin/logstash -f config/logstash-sample.conf
可以看到每秒都会有这样的输出
代码语言:javascript复制{
"@version" => "1",
"path" => "/mnt/e/all_my_love/all-my-love/cron/log_producer/test.log",
"host" => "DESKTOP-3JK8RKR",
"type" => "test",
"message" => "ERROR: logging before flag.Parse: I0317 21:56:41.828445 253 log_producer.go:13] 打个日志,2020-03-17 21:56:41",
"@timestamp" => 2020-03-17T13:56:41.943Z
}
{
"@version" => "1",
"path" => "/mnt/e/all_my_love/all-my-love/cron/log_producer/test.log",
"host" => "DESKTOP-3JK8RKR",
"type" => "test",
"message" => "ERROR: logging before flag.Parse: I0317 21:56:42.828521 253 log_producer.go:13] 打个日志,2020-03-17 21:56:42",
"@timestamp" => 2020-03-17T13:56:42.947Z
}
{
"@version" => "1",
"path" => "/mnt/e/all_my_love/all-my-love/cron/log_producer/test.log",
"host" => "DESKTOP-3JK8RKR",
"type" => "test",
"message" => "ERROR: logging before flag.Parse: I0317 21:56:43.828683 253 log_producer.go:13] 打个日志,2020-03-17 21:56:43",
"@timestamp" => 2020-03-17T13:56:43.952Z
}
可以看到我们成功的将日志文件输出到控制台上了。
有的同学就会将,搞这么多,就输出个控制台,没什么用,是的,如果要这样子说的话,确实是没有什么用的,但是这里也只是抛砖引玉,在现实的工作环境中,需要这种管道的场景很多的。 比如刚刚提到的多服务器日志采集,还有采集数据去类似es的存储数据库中,或者mysql等等,如果说这些暂时用不到,那可以先mark一下,以后有场景可以使用的时候想到有这个工具就可以,也可以自己开发属于自己的管道,毕竟管道这个传输,业务比较简单,但是logstash的插件兼容很好,我们也要享受开源的好处不是嘛。
最后附上我们用来采集kafka广播到es的配置文件给大家参考参考。
代码语言:javascript复制input{
kafka {
bootstrap_servers => "kafka监听地址"
group_id => "kafka消费者名"
topics => ["监听的topic名"]
consumer_threads => 10
decorate_events => true
}
}
filter {
grok {
//过滤消息内容,把消息的信息提取成一个个属性,比如有个"ip":"127.0.0.1"
match => {"ip":"(?:%{IPORHOST:ip}|)""}
}
}
}
output {
elasticsearch {
hosts => ["广播的es服务地址"]
user => es用户名
password => "es密码"
index => "es的存储索引"
flush_size => 60000
idle_flush_time => 10
}
}