基于流计算 Oceanus 和 Elasticsearch Service 实现实时监控系统

2021-11-18 13:30:55 浏览数 (1)

本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其 App 应用的 CPU 和内存等资源消耗数据,高效地保障系统稳健运行。运用云化的 Kafka、Flink、ES 等组件,大大减少了开发运维人员的投入。

方案描述

1. 概述

本方案结合腾讯云 CKafka、流计算 Oceanus (Flink)、 Grafana 等,通过 Beats 中的 Filebeat 工具实时采集系统和应用监控数据,并传输到 CKafka,再将 CKafka 数据接入流计算 Oceanus (Flink),经过简单的业务逻辑处理输出到 Elasticsearch,最后通过 Kibana 页面查询结果,并利用云 Grafana 监控 CVM 或业务应用指标。

Beats 是一款轻量级日志采集器,目前 Beats 包含多种工具:Metricbeat、Filebeat、Heartbeat 等。每个 Beat 都有一个简单的任务:采集日志或数据并发送到输出目的地。

实时监控场景.png实时监控场景.png

2. 方案架构

架构图.png架构图.png

前置准备

在实现本方案前,请确保已创建并配置了相应的大数据组件。

1. 创建私有网络 VPC

私有网络(VPC)是一块您在腾讯云上自定义的逻辑隔离网络空间,在构建 CKafka、流计算 Oceanus,Elasticsearch 集群等服务时选择建议同一个 VPC。具体创建步骤请参考帮助文档。

2. 创建 Ckafka 实例

Kafka 建议选择最新的 2.4.1 版本,和 Filebeat 采集工具兼容性较好。

购买完成后,再创建 Kafka topic: topic-app-info

3. 创建流计算 Oceanus 集群

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

在流计算 Oceanus 控制台的【集群管理】->【新建集群】页面创建集群,具体步骤请参考帮助文档。

4. 创建 Elasticsearch 实例

在 Elasticsearch 控制台,点击左上角【新建】创建集群,具体步骤请参考帮助文档。

5. 创建独立 Grafana 资源

独立的 Grafana 在内测中,需在 Grafana 管理页面 进行单独购买实现业务监控指标的展示。购买时仍需选择与其他资源同一 VPC 网络。

6. 安装配置 Filebeat

Filebeat 是一款轻量级日志数据采集工具,通过监控 CVM 机器中指定位置的文件来收集日志信息。可以通过以下两种方式安装 Filebeat。

  • 安装方式一:下载 Filebeat 并安装 Filebeat下载地址 ;
  • 安装方式二:采用【Elasticsearch 管理页面】-->【beats 管理】中提供的 Filebeat。

本示例中采用了方式一,下载到 CVM 中并配置 Filebeat,在 filebeat.yml 文件中提添加如下配置项:

代码语言:txt复制
# 监控日志文件配置
- type: log
  enabled: true
  paths:
    - /tmp/test.log
代码语言:txt复制
# 监控数据输出项配置
output.kafka:
  version: 2.0.0                           # kafka版本号
  hosts: ["xx.xx.xx.xx:xxxx"]              # 请填写实际的IP地址 端口
  topic: 'topic-app-info'                  # 请填写实际的topic

请根据实际业务需求配置相对应的 filebeat.yml 文件,参考 Filebeat官方文档。

注意:示例选用 2.4.1 的 Ckafka 版本,这里配置 version: 2.0.0。版本对应不上可能出现“ERROR kafka kafka/client.go:341 Kafka (topic=topic-app-info): dropping invalid message” 错误

方案实现

接下来通过案例介绍如何通过流计算 Oceanus 实现个性化监控。

1. Filebeat 采集数据

(1) 进入到 Filebeat 根目录下,并启动 Filebeat 进行数据采集。示例中采集了 top 命令中显示的 CPU、内存等信息,也可以采集 jar 应用的日志、JVM 使用情况、监听端口等,详情参考 Filebeat 官网。

代码语言:txt复制
# filebeat 启动
./filebeat -e -c filebeat.yml

# 监控系统信息写入 test.log 文件
top -d 10 >>/tmp/test.log

(2) 进入CKafka页面,点击左侧【消息查询】,查询对应 topic 消息,验证是否采集到数据。

Filebeat 采集到的数据格式:

代码语言:txt复制
{
	"@timestamp": "2021-08-30T10:22:52.888Z",
	"@metadata": {
		"beat": "filebeat",
		"type": "_doc",
		"version": "7.14.0"
	},
	"input": {
		"type": "log"
	},
	"host": {
		"ip": ["xx.xx.xx.xx", "xx::xx:xx:xx:xx"],
		"mac": ["xx:xx:xx:xx:xx:xx"],
		"hostname": "xx.xx.xx.xx",
		"architecture": "x86_64",
		"os": {
			"type": "linux",
			"platform": "centos",
			"version": "7(Core)",
			"family": "redhat",
			"name": "CentOSLinux",
			"kernel": "3.10.0-1062.9.1.el7.x86_64",
			"codename": "Core"
		},
		"id": "0ea734564f9a4e2881b866b82d679dfc",
		"name": "xx.xx.xx.xx",
		"containerized": false
	},
	"agent": {
		"name": "xx.xx.xx.xx",
		"type": "filebeat",
		"version": "7.14.0",
		"hostname": "xx.xx.xx.xx",
		"ephemeral_id": "6c0922a6-17af-4474-9e88-1fc3b1c3b1a9",
		"id": "6b23463c-0654-4f8b-83a9-84ec75721311"
	},
	"ecs": {
		"version": "1.10.0"
	},
	"log": {
		"offset": 2449931,
		"file": {
			"path": "/tmp/test.log"
		}
	},
	"message": "(B16root0-20000S0.00.00:00.00kworker/1:0H(B"
}

2. 创建流计算 Oceanus 作业

在 Oceanus 中,对 Kafka 接入的数据进行加工处理,并存入 Elasticsearch 中。

(1) 定义 Source

按照 Filebeat 中 json 消息的格式,构造 Flink Source。

代码语言:txt复制
 CREATE TABLE DataInput (
     `@timestamp` VARCHAR,
     `host`       ROW<id VARCHAR,ip ARRAY<VARCHAR>>,
     `log`        ROW<`offset` INTEGER,file ROW<path VARCHAR>>,
     `message`    VARCHAR
 ) WITH (
     'connector' = 'kafka',         -- 可选 'kafka','kafka-0.11'. 注意选择对应的内置  Connector
     'topic' = 'topic-app-info',    -- 替换为您要消费的 Topic
     'scan.startup.mode' = 'earliest-offset',            -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种
     'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替换为您的 Kafka 连接地址
     'properties.group.id' = 'oceanus_group2',           -- 必选参数, 一定要指定 Group ID
     -- 定义数据格式 (JSON 格式)
     'format' = 'json',
     'json.ignore-parse-errors' = 'true',     -- 忽略 JSON 结构解析异常
     'json.fail-on-missing-field' = 'false'   -- 如果设置为 true, 则遇到缺失字段会报错 设置为 false 则缺失字段设置为 null
 );
(2) 定义 Sink
代码语言:txt复制
CREATE TABLE es_output (
    `id`         VARCHAR,
    `ip`         ARRAY<VARCHAR>,
    `path`       VARCHAR,
    `num`        INTEGER,
    `message`    VARCHAR,
    `createTime` VARCHAR
) WITH (
    'connector.type' = 'elasticsearch',             -- 输出到 Elasticsearch
    'connector.version' = '6',                      -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 
    'connector.hosts' = 'http://10.0.0.175:9200',   -- Elasticsearch 的连接地址
    'connector.index' = 'oceanus_test2',            -- Elasticsearch 的 Index 名
    'connector.document-type' = '_doc',             -- Elasticsearch 的 Document 类型
    'connector.username' = 'elastic',  
    'connector.password' = 'yourpassword', 
    'update-mode' = 'upsert',                       -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式     
    'connector.key-delimiter' = '$',                -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3)
    'connector.key-null-literal' = 'n/a',           -- 主键为 null 时的替代字符串,默认是 'null'
    'connector.failure-handler' = 'retry-rejected', -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试)

    'connector.flush-on-checkpoint' = 'true',       -- 可选参数, 快照时不允许批量写入(flush), 默认为 true
    'connector.bulk-flush.max-actions' = '42',      -- 可选参数, 每批次最多的条数
    'connector.bulk-flush.max-size' = '42 mb',      -- 可选参数, 每批次的累计最大大小 (只支持 mb)
    'connector.bulk-flush.interval' = '60000',      -- 可选参数, 批量写入的间隔 (ms)
    'connector.connection-max-retry-timeout' = '1000',    -- 每次请求的最大超时时间 (ms)                                                  
    'format.type' = 'json'                          -- 输出数据格式, 目前只支持 'json'
);
(3) 业务逻辑
代码语言:txt复制
INSERT INTO es_output
SELECT 
  host.id       AS `id`,
  host.ip       AS `ip`,
  log.file.path AS `path`,
  log.`offset`  AS `num`,
  message,
  `@timestamp`  AS `createTime`
from DataInput;
(4) 作业参数

【内置 connector】选择flink-connector-elasticsearch6flink-connector-kafka

注意: 新版 Flink 1.13 集群无需用户自己选择内置 Connector,旧版集群请用户选择对应版本的 Connector

(5) ES 数据查询

在 ES 控制台的 Kibana 页面查询数据,或者进入某台相同子网的 CVM 下,使用以下命令进行查询:

代码语言:txt复制
# 查询索引  username:password 请替换为实际账号密码
curl -XGET -u username:password http://xx.xx.xx.xx:xxxx/oceanus_test2/_search -H 'Content-Type: application/json' -d'
{
    "query": { "match_all": {}},
    "size":  10
}
'

更多访问方式请参考 访问 ES 集群。

3. 业务指标监控

通过 Filebeat 采集到应用业务数据,经过 Oceanus 服务的加工处理已经被存入 ES,可以通过 ES Grafana 来实现业务数据的监控。

(1) Grafana 配置 ES 数据源。进入灰度发布中的 Grafana 控制台,进入刚刚创建的 Grafana 服务,找到外网地址打开并登录,Grafana 账号为 admin,登录后点击【Configuration】,点击【Add Source】,搜索elasticsearch,填写相关 ES 实例信息,添加数据源。

添加数据源2.png添加数据源2.png

(2) 点击左侧【Dashboards】,点击【Manage】,点击右上角【New Dashboard】,新建面板,编辑面板。

编辑Dashboard.png编辑Dashboard.png

(3) 展现效果如下:

  • 总数据量写入实时监控:对写入数据源的总数据量进行监控;
  • 数据来源实时监控:对来源于某个特定 log 的数据写入量进行监控;
  • 字段平均值监控:对某个字段的平均值进行监控;
  • num 字段最大值监控:对 num 字段的最大值进行监控;
ES-个性化监控.pngES-个性化监控.png

注意:本处只做示例,无实际业务含义

总结

本方案中利用 Filebeat 工具实时采集 CVM 系统数据存入 CKafka,经过流计算 Oceanus进行数据的提取、清洗和转换后存储在 ES 中,最后利用 Grafana 对 ES 中的数据进行实时监控和展示。需要注意的是:

  • Ckafka的版本和开源版本Kafka并没有严格对应,方案中 CKafka 2.4.1 和开源 Filebeat-1.14.1 版本能够调试成功。
  • 云监控中的 Promethus 服务已经嵌入了 Grafana 监控服务。但不支持自定义数据源,该嵌入的 Grafana 只能接入 Promethus,需使用独立内测的 Grafana 才能完成 ES 数据接入 Grafana。

0 人点赞