metricbeat定制化开发入门教程:采集腾讯云clb的数据

2022-09-19 10:41:39 浏览数 (1)

beats组件在云原生的时代,已经是数据采集领域使用最广泛的开源工具集之一。特别是filebeat与metricbeat,被广泛用于各种环境的日志和指标采集。无论是传统的IDC机房中设备,还是基于云原生技术的容器或者k8s pods,或是公有云基础设施组件。beats由elastic提供了数百种用于不同数据源和目的的数据采集模块(modules),以方便我们便捷的采集数据,并以统一的格式和规范的数据发送到不同的数据存储和分析平台。

但无论beats能提供多少种常见的数据源的采集模块,在日常工作和环境中,总会碰到标准模块无法覆盖到的数据源。特别是国内公有云厂商的基础设施组件,beats目前的覆盖率几乎为0。另外还有国内场景的网络设备,比如华为、华三等的路由器,交换机等也不在支持的范围之内。

为了保证完整的全观测性,避免数据盲点,同时确保数据统一的格式与规范,我们需要以一种灵活的方式让beats支持自定义数据源模块的开发。

beats作为开源的工具集,已经在自定义开发方面给我们提供了充分的条件。而在Elastic的官方文档中,针对这个主题,也提供了详实的开发文档。

为了提供一个可落地的案例方便大家学习,本文我们选取了腾讯云的负载均衡作为目标数据源,从头建建一个metricbeat的模块,对其进行定期的数据采集。本案例的目标以快速上手为主,包括如何配置环境,如何通过配置文件提供必须的参数,如何抓取数据并发布。而对于其他生产环境必须的,比如文档、抓取的数据的格式化(遵循Elastic Common Schema)等,会在文中有提及,但不会详细介绍,具体可以参考开发文档中的描述,或者代码中的标准模块的方式进行构建。

前提准备

在进行开发之前,我们需要先准备好开发环境和代码环境。包括安装Go环境与克隆beats代码仓库。

安装与配置Go

Beats 是 Go 程序,因此请安装用于 Beats 开发的 1.18.6 版本的 Go 。

安装 Go后,将 GOPATH环境变量设置为指向您的工作区位置,并确保$GOPATH/bin位于您的 PATH 中。

代码语言:javascript复制
export GOPATH=/Users/lex.li/es_lab/go_project 
export PATH=$PATH:$GOPATH/bin

安装正确的 Go 版本以与 Beats 一起使用的一种确定方法是使用GVM作为Go 版本管理器。Mac 用户的示例是:

代码语言:javascript复制
gvm use 1.18.6
eval $(gvm 1.18.6)

下载beats的代码仓库

代码语言:javascript复制
git clone https://github.com/elastic/beats.git

目录结构如下:

代码语言:javascript复制
 beats$ tree -L 1
.
├── CHANGELOG-developer.asciidoc
├── CHANGELOG-developer.next.asciidoc
├── CHANGELOG.asciidoc
├── CHANGELOG.next.asciidoc
├── CONTRIBUTING.md
├── Jenkinsfile
├── Jenkinsfile.yml
├── LICENSE.txt
├── Makefile
├── NOTICE.txt
├── README.md
├── Vagrantfile
├── auditbeat
├── build
├── codecov.yml
├── deploy
├── dev-tools
├── docs
├── filebeat
├── go.mod
├── go.sum
├── heartbeat
├── libbeat
├── licenses
├── magefile.go
├── make.bat
├── metricbeat
├── packetbeat
├── pytest.ini
├── reviewdog.yml
├── script
├── setup.yml
├── testing
├── tools
├── winlogbeat
└── x-pack

编译工具

Beats 开发人员主要使用Mage进行开发。您可以使用 make 目标安装 mage:

代码语言:javascript复制
make mage

模块开发

安装好环境之后,我们将进入进入metricbeat子目录,进行腾讯云CLB模块的开发。

创建Metricset

首先,通过create-metricset命令,创建一个metricset。metricset——指标集,即我们最基础的数据集,它可以使来自同一个数据源的不同数据集。比如system module下,有多个数据集:

代码语言:javascript复制
 metricsets:
    - cpu
    - load
    - memory
    - network
    - process
    - process_summary
    - socket_summary
    - entropy
    - core
    - diskio
    - socket
    - service
    - users

而每个数据集,有专门的Go协程进行定期的数据采集。

在本例中,运行create-metricset命令后,根据提示,输入Module nameMetricset name

代码语言:shell复制
metricbeat$ make create-metricset
mage createMetricset
Module name: tencent-cloud
Metricset name: clb
Module tencent-cloud created.
Metricset clb created.

执行以上命令后,会在module目录,创建一个tencent-cloud子目录(metricbeat/module/tencent-cloud)

生成的目录结构如下:

代码语言:shell复制
tencent-cloud$ tree
.
├── _meta
│   ├── config.yml
│   ├── docs.asciidoc
│   └── fields.yml
├── clb
│   ├── _meta
│   │   ├── data.json
│   │   ├── docs.asciidoc
│   │   └── fields.yml
│   └── clb.go
└── doc.go

其中,最重要的两个文件是:_meta/config.ymlclb/clb.go

配置文件

_meta/config.yml在编译后,会变成modules.d下的,对应的配置文件。默认生成为:

代码语言:javascript复制
- module: tencent-cloud
 metricsets: ["clb"]
 enabled: false
 period: 10s
 hosts: ["localhost"]

假如我们在上面的内容中添加两个配置项:

  • accessKeyId
  • accessKeySecret

变为:

代码语言:javascript复制
- module: tencent-cloud
 metricsets: ["clb"]
 enabled: false
 period: 10s
 hosts: ["localhost"]
 accessKeyId: "abcd"
 accessKeySecret: "efg"

clb/clb.go文件中,可以通过以下方式读取内容:

代码语言:go复制
type Config struct {
    AcessKeyId     string `config:"accessKeyId"`
    AcessKeySecret string `config:"accessKeySecret"`
}

 var config Config
 if err := base.Module().UnpackConfig(&config); err != nil {
 return nil, err
 }
 
 fmt.Printf("%s, %s", config.AcessKeyId, config.AcessKeySecret)

具体代码

启动一个模块的主要逻辑实现都是在这个metricset(指标集)的同名go文件中实现的。在这个例子中,我们的metricset

名为clb,所以,脚本为我们生成了clb/clb.go。这个生成的文件中,默认包含以下三个函数与一个指标结构:

  • func init()
  • func New(base mb.BaseMetricSet) (mb.MetricSet, error)
  • func (m *MetricSet) Fetch(report mb.ReporterV2)
  • type MetricSet struct
init()

init 方法将指标集注册到中央注册表。在 Go 中,该init()函数在执行所有其他代码之前被调用。这意味着模块将自动注册到全局注册表。

New传递给的方法MustAddMetricSet将在模块设置之后和开始获取数据之前调用。您通常不需要更改文件的这一部分:

代码语言:javascript复制
func init() {
	mb.Registry.MustAddMetricSet("{module}", "{metricset}", New)
}

指标定义

MetricSet 类型定义了metricset 的所有字段。至少它必须包含mb.BaseMetricSet字段,但可以通过附加字段进行扩展。这些变量可用于在多个 fetch 调用之间持久化数据或配置。

您可以向 MetricSet 类型添加更多字段,如以下示例中添加的usernamepassword字符串字段所示:

代码语言:go复制
type MetricSet struct {
	mb.BaseMetricSet
	username    string
	password    string
}

在我们的例子中,因为腾讯云需要accessKeyId和accessKeySecret,我们添加的是一个与此有关的配置结构体。

创建实例

New函数创建一个新的 MetricSet 实例。MetricSet 的设置过程也是通过New. 此方法将在Fetch第一次调用之前调用。

如果需要,该New函数还通过处理其他配置条目来设置配置。

代码语言:go复制
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

	config := struct{}{}

	if err := base.Module().UnpackConfig(&config); err != nil {
		return nil, err
	}

	return &MetricSet{
		BaseMetricSet: base,
	}, nil
}

抓取数据

Fetch方法是指标集的核心部分。Fetch每次检索新数据时都会调用。如果定义了多个主机,Fetch则为每个主机调用一次。Fetch的调用频率基于配置文件中period的定义。

Fetch必须使用该mb.ReporterV2.Event方法发布事件。如果发生错误,Fetch可以返回错误,或者如果Event在循环中被调用,则使用mb.ReporterV2.Error方法发布。这意味着 Metricbeat 始终会发送事件,即使发生故障也是如此。您必须确保错误消息有助于识别实际错误。

以下示例显示了一个度量集Fetch方法,该方法具有一个计数器,每次Fetch调用都会递增:

代码语言:javascript复制
func (m *MetricSet) Fetch(report mb.ReporterV2) error {

	report.Event(mb.Event{
		MetricSetFields: common.MapStr{
			"counter": m.counter,
		}
	})
	m.counter  

	return nil
}

多次抓取

对于可能暴露多个事件的度量集,Event可以在Fetch方法内部多次调用。Event返回一个布尔值,指示度量集是否已经关闭并且不能处理进一步的事件,在这种情况下Fetch应该立即返回。如果在处理多个事件之一时出现错误,则可以使用该mb.ReporterV2.Error方法发布它,而不是返回错误值。

简单的示例代码

可以看到,要开发一个metricbeat的模块,实际上我们只需要完成指标的定义配置文件的设置以及抓取函数的编写。完成这几步骨干是非常简单的事情。

完成以上方法的定义和实现,我们基本就可以做到数据的抓取与发布了,这里给出一个完整的配置文件和实现文件的示例:

配置文件:

代码语言:javascript复制
- module: tencent
  metricsets: ["clb"]
  enabled: true
  period: 10s
  accessKeyId: elastic
  accessKeySecret: changeme
  clbConfig:
    nameSpace: "QCE/LB_PUBLIC"
    region: "ap-chengdu"
    metrics: ["ClientConnum","ClientInactiveConn","ClientConcurConn","ClientNewConn","InTraffic","OutTraffic"]

实现文件:

代码语言:javascript复制
package clb

import (
	"fmt"
	"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
	"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/errors"
	"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile"
	"github.com/elastic/elastic-agent-libs/mapstr"
	"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
	"github.com/elastic/beats/v7/metricbeat/mb"
	monitor "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/monitor/v20180724"
)


// init registers the MetricSet with the central registry as soon as the program
// starts. The New function will be called later to instantiate an instance of
// the MetricSet for each host defined in the module's configuration. After the
// MetricSet has been created then Fetch will begin to be called periodically.
func init() {
	mb.Registry.MustAddMetricSet("tencent", "clb", New)
}

type ClbData struct {
    Vip    *string
    Metric  string
    Timestamps []*float64
    Values   []*float64
}

// Config defines all required and optional parameters for tencent metricsets
type Config struct {
	AcessKeyId     string `config:"accessKeyId"`
	AcessKeySecret string `config:"accessKeySecret"`
	Namespace      string `config:"clbConfig.nameSpace"` 
	Region         string `config:"clbConfig.region"` 
}

// MetricSet holds any configuration or state information. It must implement
// the mb.MetricSet interface. And this is best achieved by embedding
// mb.BaseMetricSet because it implements all of the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
	mb.BaseMetricSet
	Config
}


// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
	cfgwarn.Beta("The tencent clb metricset is beta.")

	var config Config
	if err := base.Module().UnpackConfig(&config); err != nil {
		return nil, err
	}
	fmt.Println("read the config:")
	fmt.Println(config)

	return &MetricSet{
		BaseMetricSet: base,
		Config: config,
		// clb_data,
	}, nil
}

func fetch_clb_data(d *ClbData, c Config) {
	// 实例化一个认证对象,入参需要传入腾讯云账户secretId,secretKey,此处还需注意密钥对的保密
	// 密钥可前往https://console.cloud.tencent.com/cam/capi网站进行获取
	credential := common.NewCredential(
			c.AcessKeyId,
			c.AcessKeySecret,
	)
	// 实例化一个client选项,可选的,没有特殊需求可以跳过
	cpf := profile.NewClientProfile()
	cpf.HttpProfile.Endpoint = "monitor.tencentcloudapi.com"
	// 实例化要请求产品的client对象,clientProfile是可选的
	client, _ := monitor.NewClient(credential, c.Region, cpf)

	// 实例化一个请求对象,每个接口都会对应一个request对象
	request := monitor.NewGetMonitorDataRequest()
	
	request.Namespace = common.StringPtr(c.Namespace)
	request.MetricName = common.StringPtr("ClientConnum")
	request.Instances = []*monitor.Instance {
			&monitor.Instance {
					Dimensions: []*monitor.Dimension {
							&monitor.Dimension {
									Name: common.StringPtr("vip"),
									Value: common.StringPtr("111.231.213.19"),
							},
					},
			},
	}

	// 返回的resp是一个GetMonitorDataResponse的实例,与请求对象对应
	response, err := client.GetMonitorData(request)
	if _, ok := err.(*errors.TencentCloudSDKError); ok {
			fmt.Printf("An API error has returned: %s", err)
			return
	}
	if err != nil {
			panic(err)
	}
	// 输出json格式的字符串回包
	// fmt.Printf("%s", response.ToJsonString())

	d.Vip = response.Response.DataPoints[0].Dimensions[0].Value
	d.Metric= "ClientConnum"
	d.Timestamps = response.Response.DataPoints[0].Timestamps
	d.Values = response.Response.DataPoints[0].Values

} 

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
	var clb_data ClbData
	fetch_clb_data(&clb_data,m.Config)
	report.Event(mb.Event{
		MetricSetFields: mapstr.M{
			"data": clb_data,
		},
	})

	return nil
}

上面的代码中,从腾讯云获取云负载均衡指标数据的代码,可以直接从腾讯云的控制台上API Explorer中,通过代码生成功能获得:

但因为是入门教程,这里只列出了最基本的实现,在抓取数据的时候,只读取了腾讯云cloud monitor API下的与CLB公网指标(QCE/LB_PUBLIC)相关的ClientConnum数据。并且 dimension, instance等都是写死的。

完备的逻辑,应该是在配置文件里面提供选项由客户配置,region, 指标等信息。

然后像上文提到的多次抓取一样,先获取目前账户下,特定region所有的clb的实例,在向每一个实例发送特定指标的获取命令,最后再将数据进行处理,转换和合并。

而要在开发一个用于生产的模块,我们还需要考虑类似配置文件的完备性(config.yml)、字段定义的规范性(fields.yml)、文档的可读性(docs.asciidoc)、测试的全面性等问题({module}_test.go)。具体可以参考:

https://www.elastic.co/guide/en/beats/devguide/current/metricset-details.html

https://www.elastic.co/guide/en/beats/devguide/current/creating-metricbeat-module.html

调试

完成代码的编写好,我们需要对代码进行生成和编译,在metricbeat的根目录下执行以下命令:

代码语言:javascript复制
mage update
mage build

以上命令,将会扫描metricbeat中的变化的文件,并进行编译。编译之后,会更新对应的module中的配置文件和代码,并将其编译到metricbeat的二进制文件当中。

在官方文中,我们可以通过单元测试,集成测试,和系统测试的方式进行校验。在本教程中,我们不详细描述测试的方法和实现方式。可参考文档中的测试介绍。

其实我们简单的通过启动metricbeat的方式进行测试,就可查看效果:

代码语言:shell复制
./metricbeat modules disable system
./metricbeat modules enable tencent
./metricbeat -e

这里,记得修改metricbeat.yml文件,将output改为stdout:

代码语言:yaml复制
output.console:
 pretty: true
# ---------------------------- Elasticsearch Output ----------------------------
# output.elasticsearch:
#   # Array of hosts to connect to.
#   hosts: ["localhost:9200"]
#   allow_older_versions: true

运行./metricbeat -e之后,我们就可以看到具体的输出:

代码语言:javascript复制
{"log.level":"info","@timestamp":"2022-09-18T13:27:27.617 0800","log.logger":"add_cloud_metadata","log.origin":{"file.name":"add_cloud_metadata/add_cloud_metadata.go","file.line":102},"message":"add_cloud_metadata: hosting provider type not detected.","service.name":"metricbeat","ecs.version":"1.6.0"}
{
  "@timestamp": "2022-09-18T05:27:24.681Z",
  "@metadata": {
    "beat": "metricbeat",
    "type": "_doc",
    "version": "8.5.0"
  },
  "event": {
    "dataset": "tencent.clb",
    "module": "tencent",
    "duration": 335221416
  },
  "metricset": {
    "name": "clb",
    "period": 10000
  },
  "service": {
    "type": "tencent"
  },
  "tencent": {
    "clb": {
      "data": {
        "Values": [
          0,
          。。。
          0
        ],
        "Vip": "111.231.213.19",
        "Metric": "ClientConnum",
        "Timestamps": [
          1.6634304e 09,
         。。。
          1.6634787e 09
        ]
      }
    }
  },
  "host": {
    "hostname": "Lex-M1X.local",
    "name": "Lex-M1X.local",
    "architecture": "arm64",
    "os": {
      "name": "macOS",
      "kernel": "21.6.0",
      "build": "21G83",
      "type": "macos",
      "platform": "darwin",
      "version": "12.5.1",
      "family": "darwin"
    },
    "id": "C89AB38E-3EF1-5898-B58F-ABD82D7C2BDF",
    "ip": [
      "fe80::6ce8:8fff:fef7:c688",
      "fe80::6ce8:8fff:fef7:c687",
      "fe80::6ce8:8fff:fef7:c686",
      "fe80::84d:576b:b708:f1f0",
      "192.168.3.46",
      "fe80::98e9:b4ff:fe23:588",
      "fe80::98e9:b4ff:fe23:588",
      "fe80::aee9:4fad:7f27:d6c5",
      "fe80::7d96:c4db:30bd:ca82",
      "fe80::ce81:b1c:bd2c:69e",
      "fe80::5419:b4e3:56fb:dd5e",
      "fe80::6162:9f1d:d92a:71",
      "fe80::7841:7008:e141:469e"
    ],
    "mac": [
      "36-CB-89-29-1C-00",
      "36-CB-89-29-1C-04",
      "36-CB-89-29-1C-08",
      "50-1F-C6-F2-1C-55",
      "6E-E8-8F-F7-C6-66",
      "6E-E8-8F-F7-C6-67",
      "6E-E8-8F-F7-C6-68",
      "6E-E8-8F-F7-C6-86",
      "6E-E8-8F-F7-C6-87",
      "6E-E8-8F-F7-C6-88",
      "72-1F-C6-F2-1C-55",
      "9A-E9-B4-23-05-88"
    ]
  },
  "agent": {
    "type": "metricbeat",
    "version": "8.5.0",
    "ephemeral_id": "99d59478-71ef-4a85-a922-df5e7b426631",
    "id": "2e14d4d4-90c5-4aaf-a9ba-37797406b303",
    "name": "Lex-M1X.local"
  },
  "ecs": {
    "version": "8.0.0"
  }
}

通过该输出,我们就可以确定写入Elasticsearch中的数据的形式,然后定制具体的ingest pipeline和dashboard,构建完整的dodule了。

总结

本文中,我们简单介绍了如何快速的构建一个metricbeat的腾讯云的负载均衡的数据采集模块。文中介绍了如何配置环境,如何通过配置文件提供必须的参数,如何抓取数据并发布。但对于一个完整的模块来说,这只是最基础的部分我们需要参考其他标准模块,才能够实现一个具备数据采集、数据标准化、数据可视化、连同数据生命周期,数据告警等功能一起的完整的metricbeat module。

但至少,这是一个很好的起步,如果我们有需要,不仅是公有云的基础组件,对于其他IDC中现在beats尚未支持的数据源,我们也可以以类似的方法自己开发模块,并可以回馈社区,方便所有有共同需求的人。

0 人点赞