0. 前言
在工作中遇到,logstash 中的 filter 中写了大量的解析逻辑,解析性能遇到瓶颈,所以希望将该部分的逻辑转换成 java 开发的插件,以提高解析速度。
本文主要记录我开发插件的过程。
1. 准备开发环境
下载 logstash 源码
直接可以去 logstash github (opens new window) 中选择自己使用的版本进行下载即可。
构建 logstash
将下载的 logstash 压缩包解压出来,进入 logstash 根目录下,当前路径下有 gradlew 和 gradlew.bat 两个脚本文件,前者是在 linux 下执行的,后者是在 windows 执行的脚本。
假设当前环境是 windows,执行 gradlew.bat assemble
命令可以对当前模块进行构建。在这个过程中会去下载所有的依赖包到本地。等待构建完成,直至输出 BUILD SUCCESSFUL
代表构建成功。
gradlew.bat
脚本是对 gradle 的封装,在执行该命令时,会主动根据gradle/wrapper/
下的配置去下载 gradle 工具,然后再调用 gradle 进行构建模块
2. 编写 logstash java filter 插件
2.1 准备官方 demo
下载 java 插件官方模板
将 logstash-filter-java_filter_example (opens new window) 下载到本地使用,自定义开发的插件是基于该 example 进行修改的。
构建插件
在该项目的根目录下,创建 gradle.properties
文件,需要添加变量指定 logstash 下的 logstash-core 目录路径,使用绝对路径即可。
LOGSTASH_CORE_PATH=<target_folder>/logstash-core
该变量是给 build.gradle
文件中使用的。
2.2 开发 Filter 代码
首先来看官方提供的 demo Filter 代码,代码路径在:srcmainjavaorglogstashpluginsJavaFilterExample.java
,我们开发的插件基本是按照这个例子进行修改实现的。
- 设置 pipeline 中的插件名称
首先可以看到有一个注解 @LogstashPlugin(name = "java_filter_example")
name 的值是指我们在 pipeline 中填写的插件名称。
- 在 pipeline 中传参到插件中
通过 PluginConfigSpec.stringSetting
定义变量
public static final PluginConfigSpec<String> SOURCE_CONFIG = PluginConfigSpec.stringSetting("source", "message");
再通过在构造方法中调用 get 方法即可获取到传入的值
代码语言:javascript复制this.sourceField = config.get(SOURCE_CONFIG);
并且需要将新增的字段添加到 configSchema
方法中并返回出去。
@Override
public Collection<PluginConfigSpec<?>> configSchema() {
// should return a list of all configuration options for this plugin
return Collections.singletonList(SOURCE_CONFIG);
}
- filter 主体编码
该插件的主体是 filter 方法,也就是数据的过滤走的 filter 方法,我们将想要做的解析规则实现在该方法中即可。
可以看到该方法中有一个对 events 遍历的处理,每一个 Event 都是进来的每一条数据,然后对该条数据进行处理转换,最后再将转换好的 events 传出去。
可以看到官方的案例是将传入的 message 字符串翻转。
代码语言:javascript复制@Override
public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener) {
for (Event e : events) {
Object f = e.getField(sourceField);
if (f instanceof String) {
e.setField(sourceField, StringUtils.reverse((String)f));
matchListener.filterMatched(e);
}
}
return events;
}
3. 单元测试
单测对插件来说至关重要,插件的规则转换流程、判断逻辑都非常多,各种类型的数据都可能导致插件出错,而插件验证需要编译、打包、安装再测试,流程较长,所以我们可以通过单测来减少以上流程的进行,在单测中就把所有的可能性都验证到,节省大量的时间。并且在后续迭代修改中,可以减少改动引发。
建议可以使用 junit 的参数化单测方式,可以提高单测的效率和数量。这个需要在 build.gradle 文件中的 dependencies 添加支持参数化的库来支持。
4. 打包部署 Filter 插件
4.1 元数据信息
我们需要在 build.gradle
文件中修改部分的插件元数据信息,像 description、authors 和 email 等字段都可以随意填写,以下字段需要注意:
- group,需要和包名相同
- pluginClass,需要和插件 Filter 的类名相同
- pluginName,需要和
@LogstashPlugin
中的 name 相同
4.2 打包任务
通过执行 gradlew.bat gem
进行插件打包任务,最后会在插件根目下生成 .gem
的插件安装包文件。
4.3 安装
安装有在线安装和离线安装两种方式。
注意:我们需要去官网下载可以直接使用的 logstash,而不能使用上面自己下载的 logstash 源码。
在线安装
在线安装会去访问 Elastic 的官网,所以需要是在线的环境。
通过执行 logstash/bin 路径下的 logstash-plugin 命令进行安装,等待片刻即可安装成功。
代码语言:javascript复制logstash-plugin install /path/javaPlugin.gem
离线安装
在某些场景下,环境是不能连接外网的,所以需要使用离线安装的方式。
将生成的 gem 插件压缩到 zip 包中,然后再使用 logstash-plugin 命令进行安装。
代码语言:javascript复制logstash-plugin install file:///tmp/plugin.zip
5. 验证
官方的插件 example 的功能是翻转字符串的功能,所以我们只需要验证该功能即可。
- 创建一个 pipeline.conf
input {
# 输入一个字符串
generator { message => "Hello world!" count => 1 }
}
filter {
# 在插件中@LogstashPlugin配置的插件名称
java_filter_example {}
}
output {
# 直接打印到控制台
stdout { }
}
- 启动 logstash 加载上面的 pipeline.conf
logstash -f pipeline.conf
输出如下,可以看到 message 字段中的 Hello world!
被翻转了。
{
"host" => {
"name" => "4-sip0060"
},
"event" => {
"original" => "Hello world!",
"sequence" => 0
},
"@timestamp" => 2022-12-20T07:27:46.634166300Z,
"@version" => "1",
"message" => "!dlrow olleH"
}
6. 相关链接
- How to write a Java filter plugin (opens new window)