beats组件在云原生的时代,已经是数据采集领域使用最广泛的开源工具集之一。特别是filebeat与metricbeat,被广泛用于各种环境的日志和指标采集。无论是传统的IDC机房中设备,还是基于云原生技术的容器或者k8s pods,或是公有云基础设施组件。beats由elastic提供了数百种用于不同数据源和目的的数据采集模块(modules),以方便我们便捷的采集数据,并以统一的格式和规范的数据发送到不同的数据存储和分析平台。
但无论beats能提供多少种常见的数据源的采集模块,在日常工作和环境中,总会碰到标准模块无法覆盖到的数据源。特别是国内公有云厂商的基础设施组件,beats目前的覆盖率几乎为0。另外还有国内场景的网络设备,比如华为、华三等的路由器,交换机等也不在支持的范围之内。
为了保证完整的全观测性,避免数据盲点,同时确保数据统一的格式与规范,我们需要以一种灵活的方式让beats支持自定义数据源模块的开发。
beats作为开源的工具集,已经在自定义开发方面给我们提供了充分的条件。而在Elastic的官方文档中,针对这个主题,也提供了详实的开发文档。
为了提供一个可落地的案例方便大家学习,本文我们选取了腾讯云的负载均衡作为目标数据源,从头建建一个metricbeat的模块,对其进行定期的数据采集。本案例的目标以快速上手为主,包括如何配置环境,如何通过配置文件提供必须的参数,如何抓取数据并发布。而对于其他生产环境必须的,比如文档、抓取的数据的格式化(遵循Elastic Common Schema)等,会在文中有提及,但不会详细介绍,具体可以参考开发文档中的描述,或者代码中的标准模块的方式进行构建。
前提准备
在进行开发之前,我们需要先准备好开发环境和代码环境。包括安装Go环境与克隆beats代码仓库。
安装与配置Go
Beats 是 Go 程序,因此请安装用于 Beats 开发的 1.18.6 版本的 Go 。
安装 Go后,将 GOPATH环境变量设置为指向您的工作区位置,并确保$GOPATH/bin
位于您的 PATH 中。
export GOPATH=/Users/lex.li/es_lab/go_project
export PATH=$PATH:$GOPATH/bin
代码语言:javascript复制安装正确的 Go 版本以与 Beats 一起使用的一种确定方法是使用GVM作为Go 版本管理器。Mac 用户的示例是:
gvm use 1.18.6
eval $(gvm 1.18.6)
下载beats的代码仓库
代码语言:javascript复制git clone https://github.com/elastic/beats.git
目录结构如下:
代码语言:javascript复制 beats$ tree -L 1
.
├── CHANGELOG-developer.asciidoc
├── CHANGELOG-developer.next.asciidoc
├── CHANGELOG.asciidoc
├── CHANGELOG.next.asciidoc
├── CONTRIBUTING.md
├── Jenkinsfile
├── Jenkinsfile.yml
├── LICENSE.txt
├── Makefile
├── NOTICE.txt
├── README.md
├── Vagrantfile
├── auditbeat
├── build
├── codecov.yml
├── deploy
├── dev-tools
├── docs
├── filebeat
├── go.mod
├── go.sum
├── heartbeat
├── libbeat
├── licenses
├── magefile.go
├── make.bat
├── metricbeat
├── packetbeat
├── pytest.ini
├── reviewdog.yml
├── script
├── setup.yml
├── testing
├── tools
├── winlogbeat
└── x-pack
编译工具
Beats 开发人员主要使用Mage进行开发。您可以使用 make 目标安装 mage:
代码语言:javascript复制make mage
模块开发
安装好环境之后,我们将进入进入metricbeat子目录,进行腾讯云CLB模块的开发。
创建Metricset
首先,通过create-metricset
命令,创建一个metricset。metricset——指标集,即我们最基础的数据集,它可以使来自同一个数据源的不同数据集。比如system module下,有多个数据集:
metricsets:
- cpu
- load
- memory
- network
- process
- process_summary
- socket_summary
- entropy
- core
- diskio
- socket
- service
- users
而每个数据集,有专门的Go协程进行定期的数据采集。
在本例中,运行create-metricset
命令后,根据提示,输入Module name
和Metricset name
:
metricbeat$ make create-metricset
mage createMetricset
Module name: tencent-cloud
Metricset name: clb
Module tencent-cloud created.
Metricset clb created.
执行以上命令后,会在module目录,创建一个tencent-cloud子目录(metricbeat/module/tencent-cloud)
生成的目录结构如下:
代码语言:shell复制tencent-cloud$ tree
.
├── _meta
│ ├── config.yml
│ ├── docs.asciidoc
│ └── fields.yml
├── clb
│ ├── _meta
│ │ ├── data.json
│ │ ├── docs.asciidoc
│ │ └── fields.yml
│ └── clb.go
└── doc.go
其中,最重要的两个文件是:_meta/config.yml
和clb/clb.go
配置文件
_meta/config.yml
在编译后,会变成modules.d下的,对应的配置文件。默认生成为:
- module: tencent-cloud
metricsets: ["clb"]
enabled: false
period: 10s
hosts: ["localhost"]
假如我们在上面的内容中添加两个配置项:
- accessKeyId
- accessKeySecret
变为:
代码语言:javascript复制- module: tencent-cloud
metricsets: ["clb"]
enabled: false
period: 10s
hosts: ["localhost"]
accessKeyId: "abcd"
accessKeySecret: "efg"
在clb/clb.go
文件中,可以通过以下方式读取内容:
type Config struct {
AcessKeyId string `config:"accessKeyId"`
AcessKeySecret string `config:"accessKeySecret"`
}
var config Config
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}
fmt.Printf("%s, %s", config.AcessKeyId, config.AcessKeySecret)
具体代码
启动一个模块的主要逻辑实现都是在这个metricset(指标集)的同名go文件中实现的。在这个例子中,我们的metricset
名为clb,所以,脚本为我们生成了clb/clb.go
。这个生成的文件中,默认包含以下三个函数与一个指标结构:
func init()
func New(base mb.BaseMetricSet) (mb.MetricSet, error)
func (m *MetricSet) Fetch(report mb.ReporterV2)
type MetricSet struct
init()
init 方法将指标集注册到中央注册表。在 Go 中,该init()
函数在执行所有其他代码之前被调用。这意味着模块将自动注册到全局注册表。
New
传递给的方法MustAddMetricSet
将在模块设置之后和开始获取数据之前调用。您通常不需要更改文件的这一部分:
func init() {
mb.Registry.MustAddMetricSet("{module}", "{metricset}", New)
}
指标定义
MetricSet 类型定义了metricset 的所有字段。至少它必须包含mb.BaseMetricSet
字段,但可以通过附加字段进行扩展。这些变量可用于在多个 fetch 调用之间持久化数据或配置。
您可以向 MetricSet 类型添加更多字段,如以下示例中添加的username
和password
字符串字段所示:
type MetricSet struct {
mb.BaseMetricSet
username string
password string
}
在我们的例子中,因为腾讯云需要accessKeyId和accessKeySecret,我们添加的是一个与此有关的配置结构体。
创建实例
New
函数创建一个新的 MetricSet 实例。MetricSet 的设置过程也是通过New
. 此方法将在Fetch
第一次调用之前调用。
如果需要,该New
函数还通过处理其他配置条目来设置配置。
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := struct{}{}
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}
return &MetricSet{
BaseMetricSet: base,
}, nil
}
抓取数据
该Fetch
方法是指标集的核心部分。Fetch
每次检索新数据时都会调用。如果定义了多个主机,Fetch
则为每个主机调用一次。Fetch
的调用频率基于配置文件中period
的定义。
Fetch
必须使用该mb.ReporterV2.Event
方法发布事件。如果发生错误,Fetch
可以返回错误,或者如果Event
在循环中被调用,则使用mb.ReporterV2.Error
方法发布。这意味着 Metricbeat 始终会发送事件,即使发生故障也是如此。您必须确保错误消息有助于识别实际错误。
以下示例显示了一个度量集Fetch
方法,该方法具有一个计数器,每次Fetch
调用都会递增:
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
report.Event(mb.Event{
MetricSetFields: common.MapStr{
"counter": m.counter,
}
})
m.counter
return nil
}
多次抓取
对于可能暴露多个事件的度量集,Event
可以在Fetch
方法内部多次调用。Event
返回一个布尔值,指示度量集是否已经关闭并且不能处理进一步的事件,在这种情况下Fetch
应该立即返回。如果在处理多个事件之一时出现错误,则可以使用该mb.ReporterV2.Error
方法发布它,而不是返回错误值。
简单的示例代码
可以看到,要开发一个metricbeat的模块,实际上我们只需要完成指标的定义、配置文件的设置、以及抓取函数的编写。完成这几步骨干是非常简单的事情。
完成以上方法的定义和实现,我们基本就可以做到数据的抓取与发布了,这里给出一个完整的配置文件和实现文件的示例:
配置文件:
代码语言:javascript复制- module: tencent
metricsets: ["clb"]
enabled: true
period: 10s
accessKeyId: elastic
accessKeySecret: changeme
clbConfig:
nameSpace: "QCE/LB_PUBLIC"
region: "ap-chengdu"
metrics: ["ClientConnum","ClientInactiveConn","ClientConcurConn","ClientNewConn","InTraffic","OutTraffic"]
实现文件:
代码语言:javascript复制package clb
import (
"fmt"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/errors"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/metricbeat/mb"
monitor "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/monitor/v20180724"
)
// init registers the MetricSet with the central registry as soon as the program
// starts. The New function will be called later to instantiate an instance of
// the MetricSet for each host defined in the module's configuration. After the
// MetricSet has been created then Fetch will begin to be called periodically.
func init() {
mb.Registry.MustAddMetricSet("tencent", "clb", New)
}
type ClbData struct {
Vip *string
Metric string
Timestamps []*float64
Values []*float64
}
// Config defines all required and optional parameters for tencent metricsets
type Config struct {
AcessKeyId string `config:"accessKeyId"`
AcessKeySecret string `config:"accessKeySecret"`
Namespace string `config:"clbConfig.nameSpace"`
Region string `config:"clbConfig.region"`
}
// MetricSet holds any configuration or state information. It must implement
// the mb.MetricSet interface. And this is best achieved by embedding
// mb.BaseMetricSet because it implements all of the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
mb.BaseMetricSet
Config
}
// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The tencent clb metricset is beta.")
var config Config
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}
fmt.Println("read the config:")
fmt.Println(config)
return &MetricSet{
BaseMetricSet: base,
Config: config,
// clb_data,
}, nil
}
func fetch_clb_data(d *ClbData, c Config) {
// 实例化一个认证对象,入参需要传入腾讯云账户secretId,secretKey,此处还需注意密钥对的保密
// 密钥可前往https://console.cloud.tencent.com/cam/capi网站进行获取
credential := common.NewCredential(
c.AcessKeyId,
c.AcessKeySecret,
)
// 实例化一个client选项,可选的,没有特殊需求可以跳过
cpf := profile.NewClientProfile()
cpf.HttpProfile.Endpoint = "monitor.tencentcloudapi.com"
// 实例化要请求产品的client对象,clientProfile是可选的
client, _ := monitor.NewClient(credential, c.Region, cpf)
// 实例化一个请求对象,每个接口都会对应一个request对象
request := monitor.NewGetMonitorDataRequest()
request.Namespace = common.StringPtr(c.Namespace)
request.MetricName = common.StringPtr("ClientConnum")
request.Instances = []*monitor.Instance {
&monitor.Instance {
Dimensions: []*monitor.Dimension {
&monitor.Dimension {
Name: common.StringPtr("vip"),
Value: common.StringPtr("111.231.213.19"),
},
},
},
}
// 返回的resp是一个GetMonitorDataResponse的实例,与请求对象对应
response, err := client.GetMonitorData(request)
if _, ok := err.(*errors.TencentCloudSDKError); ok {
fmt.Printf("An API error has returned: %s", err)
return
}
if err != nil {
panic(err)
}
// 输出json格式的字符串回包
// fmt.Printf("%s", response.ToJsonString())
d.Vip = response.Response.DataPoints[0].Dimensions[0].Value
d.Metric= "ClientConnum"
d.Timestamps = response.Response.DataPoints[0].Timestamps
d.Values = response.Response.DataPoints[0].Values
}
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
var clb_data ClbData
fetch_clb_data(&clb_data,m.Config)
report.Event(mb.Event{
MetricSetFields: mapstr.M{
"data": clb_data,
},
})
return nil
}
上面的代码中,从腾讯云获取云负载均衡指标数据的代码,可以直接从腾讯云的控制台上API Explorer中,通过代码生成功能获得:
但因为是入门教程,这里只列出了最基本的实现,在抓取数据的时候,只读取了腾讯云cloud monitor API下的与CLB公网指标(QCE/LB_PUBLIC
)相关的ClientConnum
数据。并且 dimension, instance等都是写死的。
完备的逻辑,应该是在配置文件里面提供选项由客户配置,region, 指标等信息。
然后像上文提到的多次抓取一样,先获取目前账户下,特定region所有的clb的实例,在向每一个实例发送特定指标的获取命令,最后再将数据进行处理,转换和合并。
而要在开发一个用于生产的模块,我们还需要考虑类似配置文件的完备性(config.yml
)、字段定义的规范性(fields.yml
)、文档的可读性(docs.asciidoc
)、测试的全面性等问题({module}_test.go
)。具体可以参考:
https://www.elastic.co/guide/en/beats/devguide/current/metricset-details.html
https://www.elastic.co/guide/en/beats/devguide/current/creating-metricbeat-module.html
调试
完成代码的编写好,我们需要对代码进行生成和编译,在metricbeat的根目录下执行以下命令:
代码语言:javascript复制mage update
mage build
以上命令,将会扫描metricbeat中的变化的文件,并进行编译。编译之后,会更新对应的module中的配置文件和代码,并将其编译到metricbeat的二进制文件当中。
在官方文中,我们可以通过单元测试,集成测试,和系统测试的方式进行校验。在本教程中,我们不详细描述测试的方法和实现方式。可参考文档中的测试介绍。
其实我们简单的通过启动metricbeat的方式进行测试,就可查看效果:
代码语言:shell复制./metricbeat modules disable system
./metricbeat modules enable tencent
./metricbeat -e
这里,记得修改metricbeat.yml文件,将output改为stdout:
代码语言:yaml复制output.console:
pretty: true
# ---------------------------- Elasticsearch Output ----------------------------
# output.elasticsearch:
# # Array of hosts to connect to.
# hosts: ["localhost:9200"]
# allow_older_versions: true
运行./metricbeat -e
之后,我们就可以看到具体的输出:
{"log.level":"info","@timestamp":"2022-09-18T13:27:27.617 0800","log.logger":"add_cloud_metadata","log.origin":{"file.name":"add_cloud_metadata/add_cloud_metadata.go","file.line":102},"message":"add_cloud_metadata: hosting provider type not detected.","service.name":"metricbeat","ecs.version":"1.6.0"}
{
"@timestamp": "2022-09-18T05:27:24.681Z",
"@metadata": {
"beat": "metricbeat",
"type": "_doc",
"version": "8.5.0"
},
"event": {
"dataset": "tencent.clb",
"module": "tencent",
"duration": 335221416
},
"metricset": {
"name": "clb",
"period": 10000
},
"service": {
"type": "tencent"
},
"tencent": {
"clb": {
"data": {
"Values": [
0,
。。。
0
],
"Vip": "111.231.213.19",
"Metric": "ClientConnum",
"Timestamps": [
1.6634304e 09,
。。。
1.6634787e 09
]
}
}
},
"host": {
"hostname": "Lex-M1X.local",
"name": "Lex-M1X.local",
"architecture": "arm64",
"os": {
"name": "macOS",
"kernel": "21.6.0",
"build": "21G83",
"type": "macos",
"platform": "darwin",
"version": "12.5.1",
"family": "darwin"
},
"id": "C89AB38E-3EF1-5898-B58F-ABD82D7C2BDF",
"ip": [
"fe80::6ce8:8fff:fef7:c688",
"fe80::6ce8:8fff:fef7:c687",
"fe80::6ce8:8fff:fef7:c686",
"fe80::84d:576b:b708:f1f0",
"192.168.3.46",
"fe80::98e9:b4ff:fe23:588",
"fe80::98e9:b4ff:fe23:588",
"fe80::aee9:4fad:7f27:d6c5",
"fe80::7d96:c4db:30bd:ca82",
"fe80::ce81:b1c:bd2c:69e",
"fe80::5419:b4e3:56fb:dd5e",
"fe80::6162:9f1d:d92a:71",
"fe80::7841:7008:e141:469e"
],
"mac": [
"36-CB-89-29-1C-00",
"36-CB-89-29-1C-04",
"36-CB-89-29-1C-08",
"50-1F-C6-F2-1C-55",
"6E-E8-8F-F7-C6-66",
"6E-E8-8F-F7-C6-67",
"6E-E8-8F-F7-C6-68",
"6E-E8-8F-F7-C6-86",
"6E-E8-8F-F7-C6-87",
"6E-E8-8F-F7-C6-88",
"72-1F-C6-F2-1C-55",
"9A-E9-B4-23-05-88"
]
},
"agent": {
"type": "metricbeat",
"version": "8.5.0",
"ephemeral_id": "99d59478-71ef-4a85-a922-df5e7b426631",
"id": "2e14d4d4-90c5-4aaf-a9ba-37797406b303",
"name": "Lex-M1X.local"
},
"ecs": {
"version": "8.0.0"
}
}
通过该输出,我们就可以确定写入Elasticsearch中的数据的形式,然后定制具体的ingest pipeline和dashboard,构建完整的dodule了。
总结
本文中,我们简单介绍了如何快速的构建一个metricbeat的腾讯云的负载均衡的数据采集模块。文中介绍了如何配置环境,如何通过配置文件提供必须的参数,如何抓取数据并发布。但对于一个完整的模块来说,这只是最基础的部分,我们需要参考其他标准模块,才能够实现一个具备数据采集、数据标准化、数据可视化、连同数据生命周期,数据告警等功能一起的完整的metricbeat module。
但至少,这是一个很好的起步,如果我们有需要,不仅是公有云的基础组件,对于其他IDC中现在beats尚未支持的数据源,我们也可以以类似的方法自己开发模块,并可以回馈社区,方便所有有共同需求的人。