Elasticsearch系列组件:Logstash强大的日志管理和数据分析工具

2023-10-23 15:28:12 浏览数 (1)

Elasticsearch 是一个开源的、基于 Lucene 的分布式搜索和分析引擎,设计用于云计算环境中,能够实现实时的、可扩展的搜索、分析和探索全文和结构化数据。它具有高度的可扩展性,可以在短时间内搜索和分析大量数据。 Elasticsearch 不仅仅是一个全文搜索引擎,它还提供了分布式的多用户能力,实时的分析,以及对复杂搜索语句的处理能力,使其在众多场景下,如企业搜索,日志和事件数据分析等,都有广泛的应用。 本文将介绍 Elastic Stack 组件 Logstash 的介绍、原理、安装与简单使用。

1、Logstash介绍与原理
1.1、Logstash简介

Logstash 是一个开源的数据收集引擎,它具有实时管道功能,可以用来统一处理来自不同源的数据,并将其发送到你选择的目标。Logstash 支持多种类型的输入数据,包括日志文件、系统消息队列、数据库等,可以对数据进行各种转换和处理,然后将数据发送到各种目标,如 Elasticsearch、Kafka、邮件通知等。

Logstash 的主要特点包括:

  1. 多输入源:Logstash 支持多种类型的输入数据,包括日志文件、系统消息队列、数据库等。
  2. 数据处理:Logstash 可以对数据进行各种转换和处理,如过滤、解析、格式化等。
  3. 多输出目标:Logstash 可以将数据发送到各种目标,如 Elasticsearch、Kafka、邮件通知等。
  4. 插件机制:Logstash 提供了丰富的插件,可以方便地扩展其功能。
  5. 与 Elasticsearch 和 Kibana 的集成:Logstash 是 Elastic Stack(前称 ELK Stack)的一部分,与 Elasticsearch 和 Kibana 有很好的集成,可以方便地进行数据搜索、存储和可视化。
1.2、Logstash工作原理

Logstash 的工作原理可以分为三个主要步骤:输入(Input)、过滤(Filter)和输出(Output)。

  1. 输入(Input):Logstash 支持多种类型的输入数据,包括日志文件、系统消息队列、数据库等。在配置文件中,你可以指定一个或多个输入源。
  2. 过滤(Filter):输入数据被收集后,Logstash 可以对数据进行各种转换和处理。例如,你可以使用 grok 插件来解析非结构化的日志数据,将其转换为结构化的数据。你也可以使用 mutate 插件来修改数据,如添加新的字段、删除字段、更改字段的值等。
  3. 输出(Output):处理后的数据可以被发送到一个或多个目标。Logstash 支持多种类型的输出目标,包括 Elasticsearch、Kafka、邮件通知等。

这三个步骤是在 Logstash 的事件处理管道中顺序执行的。每个事件(例如,一行日志数据)都会经过输入、过滤和输出这三个步骤。在过滤阶段,如果一个事件被过滤器丢弃,那么它将不会被发送到输出目标。

以上就是 Logstash 的基本工作原理。需要注意的是,Logstash 的配置是非常灵活的,你可以根据实际需求来选择合适的输入源、过滤器和输出目标。

1.3、Logstash执行模型

是的,你的理解是正确的。Logstash 的执行模型主要包括以下几个步骤:

  1. 每个 Input 启动一个线程:Logstash 会为每个输入插件启动一个线程,这些线程并行运行,从各自的数据源获取数据。
  2. 数据写入队列:输入插件获取的数据会被写入一个队列。默认情况下,这是一个存储在内存中的有界队列,如果 Logstash 意外停止,队列中的数据会丢失。为了防止数据丢失,Logstash 提供了两个特性:
    • Persistent Queues:这个特性会将队列存储在磁盘上,即使 Logstash 意外停止,队列中的数据也不会丢失。
    • Dead Letter Queues:这个特性会保存无法处理的事件。需要注意的是,这个特性只支持 Elasticsearch 作为输出源。
  3. 多个 Pipeline Worker 处理数据:Logstash 会启动多个 Pipeline Worker,每个 Worker 会从队列中取出一批数据,然后执行过滤器和输出插件。Worker 的数量和每次处理的数据量可以在配置文件中设置。

这种模型使得 Logstash 能够高效地处理大量的数据,并且可以通过调整配置来优化性能。

1.4、Logstash下载安装

Elastic 公司的官方下载页面的链接。在这个页面上,你可以下载 Elastic Stack 的各个组件,包括 Elasticsearch、Kibana、Logstash、Beats 等。这个页面提供了各个组件的最新版本下载链接,以及历史版本的下载链接:Past Releases of Elastic Stack Software | Elastic

在这里,我们将选择 Logstash,并确保所选的 Logstash 版本与我们正在使用的 Elasticsearch 版本一致:

选择后选择「Download」开始下载,并在下载成功后解压到指定位置即可。


2、Logstash配置说明
2.1、Logstash配置介绍

Logstash 的配置主要分为两部分:Pipeline 配置文件和 Settings 配置文件。

  1. Pipeline 配置文件:这是 Logstash 的核心配置,用于定义数据处理的流程,包括输入(input)、过滤(filter)和输出(output)三个部分。每个部分都可以使用多种插件来完成特定的任务。例如,输入部分可以使用 file 插件从文件中读取数据,过滤部分可以使用 grok 插件解析日志,输出部分可以使用 elasticsearch 插件将数据发送到 Elasticsearch。
  2. Settings 配置文件:这是 Logstash 的全局配置,通常在 logstash.yml 文件中设置。这些配置包括 Logstash 实例的名称、数据存储路径、配置文件路径、自动重载配置、工作线程数量等。

这两部分的配置都是以 YAML 格式编写的,可以使用文本编辑器进行编辑。在 Logstash 启动时,它会首先读取 Settings 配置文件,然后加载并执行 Pipeline 配置文件。

2.2、Pipeline配置文件-输入

在 Logstash 的 Pipeline 配置文件中,输入(input)部分定义了数据的来源。Logstash 提供了多种输入插件,可以从各种数据源读取数据。

以下是一些常用的输入插件:

file:从文件中读取数据。常用的配置项包括 path(文件路径)和 start_position(开始读取的位置)。

代码语言:javascript复制
input {
  file {
    path => "/path/to/your/logfile"
    start_position => "beginning"
  }
}

beats:从 Beats 客户端(如 Filebeat、Metricbeat 等)接收数据。常用的配置项包括 port(监听的端口号)。

代码语言:javascript复制
input {
  beats {
    port => 5044
  }
}

http:通过 HTTP 请求接收数据。常用的配置项包括 port(监听的端口号)。

代码语言:javascript复制
input {
  http {
    port => 8080
  }
}

jdbc:从数据库中读取数据。常用的配置项包括 jdbc_driver_library(JDBC 驱动的路径)、jdbc_driver_class(JDBC 驱动的类名)、jdbc_connection_string(数据库连接字符串)、jdbc_user(数据库用户名)和 jdbc_password(数据库密码)。

代码语言:javascript复制
input {
  jdbc {
    jdbc_driver_library => "/path/to/your/jdbc/driver"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/yourdatabase"
    jdbc_user => "yourusername"
    jdbc_password => "yourpassword"
  }
}

kafka:在这个配置中,bootstrap_servers 参数指定了 Kafka 服务器的地址和端口,topics 参数指定了你想从哪个主题读取数据。

代码语言:javascript复制
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["your_topic"]
  }
}

kafka 输入插件还有许多其他的配置项,你可以根据实际需求进行设置。例如,你可以设置 group_id 参数来指定消费者组,设置 auto_offset_reset 参数来指定在没有初始偏移量或当前偏移量不存在时该如何定位消费位置等。

具体的配置项和可能的值,你可以在 Logstash 的官方文档中找到。

以上就是一些常用的输入插件及其配置。你可以根据实际需求选择合适的插件和配置。需要注意的是,你可以在一个配置文件中定义多个输入,Logstash 会并行处理所有的输入。

2.3、Pipeline配置文件-过滤

在 Logstash 的 Pipeline 配置文件中,过滤(filter)部分定义了数据处理的规则。过滤器插件可以对数据进行各种操作,如解析、转换、添加和删除字段等。

以下是一些常用的过滤插件及其操作:

grok:grok 过滤器用于解析非结构化的日志数据,将其转换为结构化的数据。它使用模式匹配的方式来解析文本,每个模式是一个名字和正则表达式的组合。例如:

代码语言:javascript复制
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }
  }
}

在这个配置中,grok 过滤器会尝试将 message 字段的内容匹配为 COMBINEDAPACHELOG 模式,这是一个预定义的模式,用于解析 Apache 日志。

mutate:mutate 过滤器用于修改事件数据,如添加新的字段、删除字段、更改字段的值等。例如:

代码语言:javascript复制
filter {
  mutate {
    add_field => { "new_field" => "new_value" }
  }
}

在这个配置中,mutate 过滤器会向每个事件添加一个名为 new_field 的新字段,字段的值为 new_value。

date:date 过滤器用于解析日期和时间信息,将其转换为 Logstash 的 @timestamp 字段。例如:

代码语言:javascript复制
filter {
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}

在这个配置中,date 过滤器会尝试将 timestamp 字段的内容匹配为指定的日期和时间格式。

以上就是一些常用的过滤插件及其操作。你可以根据实际需求选择合适的插件和配置。需要注意的是,你可以在一个配置文件中定义多个过滤器,Logstash 会按照配置文件中的顺序依次执行这些过滤器。

2.4、Pipeline配置文件-输出

在 Logstash 的 Pipeline 配置文件中,输出(output)部分定义了处理后的数据应该发送到哪里。Logstash 提供了多种输出插件,可以将数据发送到各种目标。

以下是一些常用的输出插件:

elasticsearch:将数据发送到 Elasticsearch。常用的配置项包括 hosts(Elasticsearch 服务器的地址和端口)和 index(索引名称)。

代码语言:javascript复制
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "your_index"
  }
}

file:将数据写入到文件。常用的配置项包括 path(文件路径)。

代码语言:javascript复制
output {
  file {
    path => "/path/to/your/file"
  }
}

stdout:将数据输出到标准输出。常用的配置项包括 codec(编码格式),常用的值有 rubydebug(以 Ruby 的调试格式输出)。

代码语言:javascript复制
output {
  stdout {
    codec => rubydebug
  }
}

kafka:将数据发送到 Kafka。常用的配置项包括 bootstrap_servers(Kafka 服务器的地址和端口)和 topic_id(主题名称)。

代码语言:javascript复制
output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => "your_topic"
  }
}

以上就是一些常用的输出插件及其配置。你可以根据实际需求选择合适的插件和配置。需要注意的是,你可以在一个配置文件中定义多个输出,Logstash 会将每个事件发送到所有的输出。

2.4、Settings配置文件

Logstash 的 Settings 配置文件通常是 logstash.yml,这是 Logstash 的全局配置文件,用于设置 Logstash 运行的一些基本参数。

以下是一些常见的配置项:

node.name:设置 Logstash 实例的名称,默认值为当前主机的主机名

代码语言:javascript复制
node.name: test

path.data:设置 Logstash 存储持久化数据的路径,默认值为 Logstash 安装目录下的 data 文件夹

代码语言:javascript复制
path.data: /var/lib/logstash

path.config:设置 Pipeline 配置文件的路径。

代码语言:javascript复制
path.config: /etc/logstash/conf.d/*.conf

config.reload.automatic:如果设置为 true,Logstash 会自动检测 Pipeline 配置文件的更改,并重新加载配置。

代码语言:javascript复制
config.reload.automatic: true

pipeline.workers:设置处理事件的工作线程数量,通常设置为机器的 CPU 核心数。

代码语言:javascript复制
pipeline.workers: 2

pipeline.batch.size:设置每个批处理的事件数量,增大这个值可以提高吞吐量,但也会增加处理延迟。

代码语言:javascript复制
pipeline.batch.size: 125

pipeline.batch.delay:设置两个批处理之间的最大等待时间(以毫秒为单位)。

代码语言:javascript复制
pipeline.batch.delay: 50

以上就是一些常见的 Logstash Settings 配置项,你可以根据实际需求来修改这些配置。具体的配置项和可能的值,你可以在 Logstash 的官方文档中找到。


3、Logstash使用示例
3.1、Logstash Hello world

首先我们进行一个非常基础的 Logstash 使用示例。在这个示例中,Logstash 使用标准输入作为输入源,标准输出作为输出目标,且不指定任何过滤器。

  1. 在命令行中切换到 Logstash 的根目录,然后执行以下命令来启动 Logstash:
代码语言:javascript复制
cd logstash-8.10.2
bin/logstash -e 'input { stdin { } } output { stdout {} }'

在这个命令中,-e 参数用于指定 Pipeline 配置,input { stdin { } } 表示使用标准输入作为输入源,output { stdout {} } 表示使用标准输出作为输出目标。

  1. Logstash 启动成功后,你可以在控制台输入一些文本,如 “hello world”,然后 Logstash 会将这些文本作为事件数据处理。
  2. Logstash 会自动为每个事件添加一些字段,如 @versionhost@timestamp,然后将处理后的事件输出到标准输出。

例如,你在控制台输入 “hello world” 后,可能会看到如下的输出:

代码语言:javascript复制
{
    "@version": "1",
    "host": "localhost",
    "@timestamp": "2018-09-18T12:39:38.514Z",
    "message": "hello world"
}

在这个示例中,Logstash 仅仅是从标准输入获取数据,添加一些简单的字段,然后将数据输出到标准输出。这是最基础的使用方式,实际上 Logstash 还可以做很多复杂的数据处理和转换。

3.2、日志格式处理

我们可以看到虽然上面示例使用标准输入作为输入源,并将数据输出到标准输出,但是日志内容作为一个整体被存放在 message 字段中,这样对后续存储及查询都极为不便。可以为该 pipeline 指定一个 grok filter 来对日志格式进行处理。

  1. first-pipeline.conf 中增加 filter 配置如下
代码语言:javascript复制
input { stdin { } }
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}
output {
   stdout { codec => rubydebug }
}

其中 codec => rubydebug 用于美化输出

  1. 验证配置(注意指定配置文件的路径):
代码语言:javascript复制
./bin/logstash -f first-pipeline.conf --config.test_and_exit 
  1. 启动命令:
代码语言:javascript复制
./bin/logstash -f first-pipeline.conf --config.reload.automatic 

其中 --config.reload.automatic 选项启用动态重载配置功能

  1. 预期结果:

我们的配置中使用了 grok 过滤器来解析 COMBINEDAPACHELOG 格式的 Apache 日志。以下是一个符合这种格式的日志示例:

代码语言:javascript复制
127.0.0.1 - - [28/Sep/2021:10:00:00  0800] "GET /test.html HTTP/1.1" 200 2326 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"

这条日志记录了一个 HTTP 请求的信息,包括客户端 IP 地址、请求时间、请求方法和 URL、HTTP 版本、响应状态码、响应体的字节数、Referer 和 User-Agent 等。

我们可以将这个日志作为输入,然后 Logstash 会使用我们的配置来处理这个日志。处理后的结果会被输出到标准输出,格式为 Ruby 的调试格式。

3.3、将数据导入Elasticsearch

Logstash 作为 Elastic stack 的重要组成部分,其最常用的功能是将数据导入到 Elasticssearch 中。将 Logstash 中的数据导入到 Elasticsearch 中操作也非常的方便,只需要在 pipeline 配置文件中增加 Elasticsearch 的 output 即可。

  1. 首先要有一个已经部署好的 Logstash
  2. first-pipeline.conf 中增加 Elasticsearch 的配置,如下
代码语言:javascript复制
input { stdin { } }
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}
output {
    elasticsearch {
        hosts => [ "localhost:9200" ]
        topic_id => "logstash"
    }
}
  1. 启动命令:
代码语言:javascript复制
./bin/logstash -f first-pipeline.conf --config.reload.automatic 

其中 --config.reload.automatic 选项启用动态重载配置功能

  1. 预期结果:

我们的配置中使用了 grok 过滤器来解析 COMBINEDAPACHELOG 格式的 Apache 日志。以下是一个符合这种格式的日志示例:

代码语言:javascript复制
"127.0.0.1 - - [28/Sep/2021:10:00:00  0800] "GET /test.html HTTP/1.1" 200 2326 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"

这条日志记录了一个 HTTP 请求的信息,包括客户端 IP 地址、请求时间、请求方法和 URL、HTTP 版本、响应状态码、响应体的字节数、Referer 和 User-Agent 等。

我们可以将这个日志作为输入,然后 Logstash 会使用我们的配置来处理这个日志。处理后的结果会被输出到标准输出,格式为 Ruby 的调试格式。

查询 Elasticsearch 确认数据是否正常上传:

代码语言:javascript复制
curl -XGET 'http://localhost:9200/logstash/_search?pretty&q=response=200'

也可以使用 Kibana 查看:

0 人点赞