LogStash的安装部署与应用

2019-09-19 15:25:07 浏览数 (1)

LogStash的安装部署与应用

介绍

1、Logstash是一个接收,处理,转发日志的工具; 2、Logstash支持网络日志、系统日志、应用日志、apache日志等等,总之可以处理所有日志类型; 3、典型应用场景ELK:logstash负责采集、解析日志,elasticsearch负责数据存储,kibana负责前端报表展示。

下载

https://www.elastic.co/cn/downloads/logstash

安装部署

上传服务器、解压即可使用。

配置

根据需要修改配置文件config/logstash.yml,默认不修改即可运行

  • 节点名称:node.name
  • 日志级别:log.level(默认debug,如果想要看详细日志改为trace)

根据需要调整jvm.options配置文件:

  • -Xms256m
  • -Xmx1g

测试安装是否成功: 快速启动,标准输入输出作为input和output

代码语言:javascript复制
./bin/logstash -e 'input { stdin {} } output { stdout {} }'

使用

主要组件

主要组件

  • Input组件:负责采集日志数据,包括文件、syslog、collectd、kafka、redis等等;
  • Filter:负责解析日志数据,包括解析、加工、转换数据等;
  • Output:负责输出日志数据,对接到redis、kafka、elasticsearch、hdfs等存储组件;

常用启动参数

代码语言:javascript复制
-e    立即执行,使用命令行里的配置参数启动实例    
    ./bin/logstash -e ‘input {stdin {}} output {stdout {}}'
-f    指定启动实例的配置文件 
    ./bin/logstash -f config/test.conf
-t    测试配置文件的正确性  
    ./bin/logstash-f config/test.conf -t
-l    指定日志文件名称    
    ./bin/logstash-f config/test.conf -l logs/test.log
-w    指定filter线程数量,默认线程数是5    
    ./bin/logstash-f config/test.conf -w

常用input配置

File

文件读取插件主要用来抓取文件的变化信息,将变化信息封装成Event进程处理或者传递。

代码语言:javascript复制
input
  file {
        #监听文件的路径
        path => ["E:/software/logstash-1.5.4/logstash-1.5.4/data/*","F:/test.txt"]
        #排除不想监听的文件
        exclude => "1.log"

        #添加自定义的字段
        add_field => {"test"=>"test"}
        #增加标签
        tags => "tag1"

        #设置新事件的标志
        delimiter => "n"

        #设置多长时间扫描目录,发现新文件
        discover_interval => 15
        #设置多长时间检测文件是否修改
        stat_interval => 1

        #监听文件的起始位置,默认是end
        start_position => beginning

        #监听文件读取信息记录的位置
        sincedb_path => "E:/software/logstash-1.5.4/logstash-1.5.4/test.txt"
        #设置多长时间会写入读取的位置信息
        sincedb_write_interval => 15
  }
}
kafka
代码语言:javascript复制
input
  kafka {
        #集群地址
        bootstrap_servers => ["10.142.134.179:9092"]
        #消费组id
        group_id => "ete_serv_common"
        #数组类型,可配置多个topic topics_pattern => ["BO_TOPO_d{1,}$"]正则匹配
        topics => ["logq","loge"]
        #从最早的偏移量开始消费latest从最新的开始消费
        auto_offset_reset => "earliest"
        #起多少个input线程数
        consumer_threads => 5
        #此属性会将当前topic、offset、group、partition等信息也带到message中
        decorate_events => true
    }
}
Beats

Beats插件用于建立监听服务,接收Filebeat或者其他beat发送的Events;

配置示例

代码语言:javascript复制
input {
    beats {
        port => 5044
    }
}
TCP

TCP插件有两种工作模式,"Client"和"Server",分别用于发送网络数据和监听网络数据。

配置示例

代码语言:javascript复制
tcp {
    port => 41414
}
Redis
代码语言:javascript复制
input {
  redis {
    host => "127.0.0.1"
    port => 6379
    data_type => "list"
    key => "logstash-list"
  }
}

常用的Filter配置

丰富的过滤器插件的是 logstash威力如此强大的重要因素,过滤器插件主要处理流经当前Logstash的事件信息,可以添加字段、移除字段、转换字段类型,通过正则表达式切分数据等,也可以根据条件判断来进行不同的数据处理方式。

grok 过滤器

grok 是Logstash中将非结构化数据解析成结构化数据以便于查询的最好工具,非常适合解析syslog logs,apache log, mysql log,以及一些其他的web log Logstash提供120个常用正则表达式可供安装使用,安装之后你可以通过名称调用它们 语法如下:%{SYNTAX:SEMANTIC}

  • SYNTAX:表示已经安装的正则表达式的名称
  • SEMANTIC:表示从Event中匹配到的内容的名称

例如:Event的内容为"[debug] 127.0.0.1 - test log content",匹配%{IP:client}将获得"client: 127.0.0.1"的结果,前提安装了IP表达式;

通过配置grok可以把 [debug] 127.0.0.1 - test log content 这样的非结构化数据转为: "cllient":"127.0.0.1".

一个完整的例子

代码语言:javascript复制
 日志文件http.log内容:192.168.1.11 GET /index.html 15994 0.053
 表达式:%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

配置文件内容:

input {
  file {
    path => "/var/log/http.log"
  }
}
filter {
  grok {
    match => {"message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
  }
}

输出结果:
client: 192.168.1.11 
method: GET
request: /index.html
bytes: 15994
duration: 0.053

自定义表达式 与预定义表达式相同,你也可以将自定义的表达式配置到Logstash中,然后就可以像于定义的表达式一样使用;

代码语言:javascript复制
语法:(?<field_name>the pattern here)

举例:捕获10或11和长度的十六进制queue_id可以使用表达式(?<queue_id>[0-9A-F]{10,11})

安装自定义表达式
  1、在Logstash根目录下创建文件夹"patterns",在"patterns"文件夹中创建文件"extra"(文件名称无所谓,可自己选择有意义的文件名称);
  2、在文件"extra"中添加表达式,格式:patternName regexp,名称与表达式之间用空格隔开即可,如下:

            # contents of ./patterns/postfix:
            POSTFIX_QUEUEID [0-9A-F]{10,11}

  3、使用自定义的表达式时需要指定"patterns_dir"变量,变量内容指向表达式文件所在的目录

举例如下:

    ## 日志内容
Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=20130101142543.5828399CCAF@mailserver14.example.com>

    ## Logstash配置
    filter {
      grok {
        patterns_dir => ["./patterns"]
        match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
      }
    }

    ## 运行结果 ##
    timestamp: Jan 1 06:25:43
    logsource: mailserver14
    program: postfix/cleanup
    pid: 21403
    queue_id: BEF25A72965

-Grok表达式在线debug地址:
    http://grokdebug.herokuapp.com 
-预定义正则表达式参考地址:
    https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
date 时间处理过滤器

该插件用于时间字段的格式转换,比如将"Apr 17 09:32:01"(MMM dd HH:mm:ss)转换为"MM-dd HH:mm:ss"。 而且通常情况下,Logstash会为自动给Event打上时间戳,但是这个时间戳是Event的处理时间(主要是input接收数据的时间),和日志记录时间会存在偏差(主要原因是buffer),我们可以使用此插件用日志发生时间替换掉默认是时间戳的值。

代码语言:javascript复制
#日志内容
{
    "agent":"Windows 7",
    "client_time":"2017-11-20 12:00:00",
    "client_ip":"123.10.91.106"
}

# Filter 配置
filter {
   date {
       match => ["client_time", "yyyy-MM-dd HH:mm:ss"] 
       #这里是如果client_time跟后面的格式匹配上了就会去替换,替换什么呢?
       #target默认指的就是@timestamp,所以就是以client_time的时间更新@timestamp的时间
   }
}
mutate数据修改过滤器

mutate 插件是 Logstash另一个重要插件。它提供了丰富的基础类型数据处理能力。 可以重命名,删除,替换和修改事件中的字段。

  • 重命名 -- rename 对于已经存在的字段,重命名其字段名称
代码语言:javascript复制
filter {
    mutate {
        rename => ["syslog_host", "host"]
    }
}
  • 更新字段内容 -- update 更新字段内容,如果字段不存在,不会新建
代码语言:javascript复制
filter {
    mutate {
        update => { "sample" => "My new message" }
    }
}
  • 替换字段内容 -- replace 与 update 功能相同,区别在于如果字段不存在则会新建字段
代码语言:javascript复制
filter {
    mutate {
        replace => { "message" => "%{source_host}: My new message" }
    }
}
  • 数据类型转换 -- convert
代码语言:javascript复制
filter {
    mutate {
        convert => ["request_time", "float"]
    }
}
JSON过滤器

JSON插件用于解码JSON格式的字符串,一般是一堆日志信息中,部分是JSON格式,部分不是的情况下

代码语言:javascript复制
配置示例

json {
    source => ...
}

## 示例配置,message是JSON格式的字符串:"{"uid":3081609001,"type":"signal"}" ##

filter {
    json {
        source => "message"
        target => "jsoncontent"
    }
}

## 输出结果 ##
{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{"uid":333333333,"type":"signal"}",
    "jsoncontent": {
        "uid": 333333333,
        "type": "signal"
    }
}

## 如果从示例配置中删除`target`,输出结果如下 ##
{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{"uid":333333333,"type":"signal"}",
    "uid": 333333333,
    "type": "signal"
}

Output

ElasticSearch输出插件

用于将事件信息写入到Elasticsearch中,官方推荐插件,ELK必备插件

配置事例

代码语言:javascript复制
output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "filebeat-%{type}-%{ yyyy.MM.dd}"
        user => "upuptop" 
        password => "upuptop"
        template => "/dd/c/caaaa.json"
        template_name => "index_scene_control"
                 template_overwrite => true
            }
}
File输出插件

用于将Event输出到文件内

配置事例

代码语言:javascript复制
output {
    file {
        path => "/home/upuptop/test.txt "

    }
}

0 人点赞