使用java开发logstash的filter插件

2023-03-04 15:34:15 浏览数 (1)

0. 前言

在工作中遇到,logstash 中的 filter 中写了大量的解析逻辑,解析性能遇到瓶颈,所以希望将该部分的逻辑转换成 java 开发的插件,以提高解析速度。

本文主要记录我开发插件的过程。

1. 准备开发环境

下载 logstash 源码

直接可以去 logstash github (opens new window) 中选择自己使用的版本进行下载即可。

构建 logstash

将下载的 logstash 压缩包解压出来,进入 logstash 根目录下,当前路径下有 gradlew 和 gradlew.bat 两个脚本文件,前者是在 linux 下执行的,后者是在 windows 执行的脚本。

假设当前环境是 windows,执行 gradlew.bat assemble 命令可以对当前模块进行构建。在这个过程中会去下载所有的依赖包到本地。等待构建完成,直至输出 BUILD SUCCESSFUL 代表构建成功。

gradlew.bat 脚本是对 gradle 的封装,在执行该命令时,会主动根据 gradle/wrapper/ 下的配置去下载 gradle 工具,然后再调用 gradle 进行构建模块

2. 编写 logstash java filter 插件

2.1 准备官方 demo

下载 java 插件官方模板

将 logstash-filter-java_filter_example (opens new window) 下载到本地使用,自定义开发的插件是基于该 example 进行修改的。

构建插件

在该项目的根目录下,创建 gradle.properties 文件,需要添加变量指定 logstash 下的 logstash-core 目录路径,使用绝对路径即可。

代码语言:javascript复制
LOGSTASH_CORE_PATH=<target_folder>/logstash-core

该变量是给 build.gradle 文件中使用的。

2.2 开发 Filter 代码

首先来看官方提供的 demo Filter 代码,代码路径在:srcmainjavaorglogstashpluginsJavaFilterExample.java,我们开发的插件基本是按照这个例子进行修改实现的。

  • 设置 pipeline 中的插件名称

首先可以看到有一个注解 @LogstashPlugin(name = "java_filter_example") name 的值是指我们在 pipeline 中填写的插件名称。

  • 在 pipeline 中传参到插件中

通过 PluginConfigSpec.stringSetting 定义变量

代码语言:javascript复制
public static final PluginConfigSpec<String> SOURCE_CONFIG = PluginConfigSpec.stringSetting("source", "message");

再通过在构造方法中调用 get 方法即可获取到传入的值

代码语言:javascript复制
this.sourceField = config.get(SOURCE_CONFIG);

并且需要将新增的字段添加到 configSchema 方法中并返回出去。

代码语言:javascript复制
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
	// should return a list of all configuration options for this plugin
	return Collections.singletonList(SOURCE_CONFIG);
}
  • filter 主体编码

该插件的主体是 filter 方法,也就是数据的过滤走的 filter 方法,我们将想要做的解析规则实现在该方法中即可。

可以看到该方法中有一个对 events 遍历的处理,每一个 Event 都是进来的每一条数据,然后对该条数据进行处理转换,最后再将转换好的 events 传出去。

可以看到官方的案例是将传入的 message 字符串翻转。

代码语言:javascript复制
@Override
public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener) {
	for (Event e : events) {
		Object f = e.getField(sourceField);
		if (f instanceof String) {
			e.setField(sourceField, StringUtils.reverse((String)f));
			matchListener.filterMatched(e);
		}
	}

	return events;
}

3. 单元测试

单测对插件来说至关重要,插件的规则转换流程、判断逻辑都非常多,各种类型的数据都可能导致插件出错,而插件验证需要编译、打包、安装再测试,流程较长,所以我们可以通过单测来减少以上流程的进行,在单测中就把所有的可能性都验证到,节省大量的时间。并且在后续迭代修改中,可以减少改动引发。

建议可以使用 junit 的参数化单测方式,可以提高单测的效率和数量。这个需要在 build.gradle 文件中的 dependencies 添加支持参数化的库来支持。

4. 打包部署 Filter 插件

4.1 元数据信息

我们需要在 build.gradle 文件中修改部分的插件元数据信息,像 description、authors 和 email 等字段都可以随意填写,以下字段需要注意:

  • group,需要和包名相同
  • pluginClass,需要和插件 Filter 的类名相同
  • pluginName,需要和 @LogstashPlugin 中的 name 相同

4.2 打包任务

通过执行 gradlew.bat gem 进行插件打包任务,最后会在插件根目下生成 .gem 的插件安装包文件。

4.3 安装

安装有在线安装和离线安装两种方式。

注意:我们需要去官网下载可以直接使用的 logstash,而不能使用上面自己下载的 logstash 源码。

在线安装

在线安装会去访问 Elastic 的官网,所以需要是在线的环境。

通过执行 logstash/bin 路径下的 logstash-plugin 命令进行安装,等待片刻即可安装成功。

代码语言:javascript复制
logstash-plugin install /path/javaPlugin.gem

离线安装

在某些场景下,环境是不能连接外网的,所以需要使用离线安装的方式。

将生成的 gem 插件压缩到 zip 包中,然后再使用 logstash-plugin 命令进行安装。

代码语言:javascript复制
logstash-plugin install file:///tmp/plugin.zip

5. 验证

官方的插件 example 的功能是翻转字符串的功能,所以我们只需要验证该功能即可。

  1. 创建一个 pipeline.conf
代码语言:javascript复制
input {
    # 输入一个字符串
    generator { message => "Hello world!" count => 1 }
}

filter {
	# 在插件中@LogstashPlugin配置的插件名称
    java_filter_example {}
}

output {
    # 直接打印到控制台
    stdout { }
}
  1. 启动 logstash 加载上面的 pipeline.conf
代码语言:javascript复制
logstash -f pipeline.conf

输出如下,可以看到 message 字段中的 Hello world!被翻转了。

代码语言:javascript复制
{
	"host" => {
		"name" => "4-sip0060"
	},
	"event" => {
		"original" => "Hello world!",
		"sequence" => 0
	},
	"@timestamp" => 2022-12-20T07:27:46.634166300Z,
	"@version" => "1",
	"message" => "!dlrow olleH"
}

6. 相关链接

  • How to write a Java filter plugin (opens new window)

0 人点赞