做积极的人,而不是积极废人!
摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-storage-module/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 SkyWalking 3.2.6 正式版
- 1. 概述
- 2. apm-collector-core
- 3. collector-storage-define
- 4. collector-storage-h2-provider
- 5. collector-storage-es-provider
1. 概述
本文主要分享 SkyWalking Collector Storage 存储组件。顾名思义,负责将调用链路、应用、应用实例等等信息存储到存储器,例如,ES 、H2 。
友情提示:建议先阅读 《SkyWalking 源码分析 —— Collector 初始化》 ,以了解 Collector 组件体系。 FROM https://github.com/apache/incubating-skywalking
下面我们来看看整体的项目结构,如下图所示 :
apm-collector-core
的data
和define
包 :数据的抽象。collector-storage-define
:定义存储组件接口。collector-storage-h2-provider
:基于 H2 的 存储组件实现。该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用。collector-storage-es-provider
:基于 Elasticsearch 的集群管理实现。生产环境推荐使用。
下面,我们从接口到实现的顺序进行分享。
2. apm-collector-core
apm-collector-core
的 data
和 define
包,如下图所示:
我们对类进行梳理分类,如下图:
- Table :Data 和 TableDefine 之间的桥梁,每个 Table 定义了该表的表名,字段名们。
- TableDefine :Table 的详细定义,包括表名,字段定义( ColumnDefine )们。在下文中,StorageInstaller 会基于 TableDefine 初始化表的相关信息。
- Data :数据,包括一条数据的数据值们和数据字段( Column )们。在下文中,Dao 会存储 Data 到存储器中。另外,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》 中,我们也会看到对 Data 的流式处理通用封装。
2.1 Table
org.skywalking.apm.collector.core.data.CommonTable
,通用表。
- `TABLE_TYPE` 静态属性,表类型。目前只有 ES 存储组件使用到,下文详细解析。
- `COLUMN_` 前缀的静态属性,通用的字段名。
在 collector-storage-define
的 table
包下,我们可以看到所有 Table 类,以 "Table"
结尾。每个 Table 的表名,在每个实现类里,例如 ApplicationTable 。
2.2 TableDefine
org.skywalking.apm.collector.core.data.TableDefine
,表定义抽象类。
- `name` 属性,表名。
- `columnDefines` 属性,ColumnDefine数组。
- `#initialize()` 抽象方法,初始化表定义。例如:ApplicationEsTableDefine 。
不同的存储组件实现,有不同的 TableDefine 实现类,如下图:
- ElasticSearchTableDefine :基于 Elasticsearch 的表定义抽象类,在
collector-storage-es-provider
的define
包下,我们可以看到所有 ES 的 TableDefine 类。 - H2TableDefine :基于 H2 的表定义抽象类,在
collector-storage-h2-provider
的 `define` 包下,我们可以看到所有 H2 的 TableDefine 类。
2.2.1 ColumnDefine
org.skywalking.apm.collector.core.data.ColumnDefine
,字段定义抽象类。
- `name` 属性,字段名。
- `type` 属性,字段类型。
在 collector-storage-xxx-provider
模块中,H2ColumnDefine 、ElasticSearchColumnDefine 实现 ColumnDefine 。
2.2.2 Loader
涉及到的类如下图所示:
org.skywalking.apm.collector.core.data.StorageDefineLoader
,调用 org.skywalking.apm.collector.core.define.DefinitionLoader
,从 org.skywalking.apm.collector.core.data.StorageDefinitionFile
中,加载 TableDefine 实现类数组。
另外,在 collector-storage-es-provider
和 collector-storage-h2-provider
里都有 storage.define
文件,如下图:
- StorageDefinitionFile 声明了读取该文件。
- 注意,DefinitionLoader 在加载时,两个文件都会被读取,最终在
StorageInstaller#defineFilter(List)
方法,进行过滤。
代码比较简单,中文注释已加,胖友自己阅读理解下。
2.3 Data
org.skywalking.apm.collector.core.data.Data
,数据抽象类。
- [
dataXXX
]() 前缀的属性,字段值们。- `dataStrings` 属性的第一位,是 ID 属性。参见 构造方法的【第 51 行】 或者 `#setId(id)` 方法。
- [
xxxColumns
]() 后缀的属性,字段( Column )们。 - 通过上述两种属性 自身类,可以确定一条数据记录的表、字段类型、字段名、字段值。
- 继承 `org.skywalking.apm.collector.core.data.EndOfBatchQueueMessage` ,带是否消息批处理的最后一条标记的消息抽象类,`endOfBatch` 属性,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「3. AggregationWorker」 详细解析。
- 继承 `org.skywalking.apm.collector.core.data.AbstractHashMessage` ,带哈希码的消息抽象类,`hashCode` 属性,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「3. AggregationWorker」 详细解析。
- `#mergeData(Data)` 方法,合并传入的数据到自身。该方法被 `AggregationWorker#aggregate(message)` 调用,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「3. AggregationWorker」 详细解析。
在 collector-storage-define
的 table
包下,我们可以看到所有 Data 类,非 "Table"
结尾,例如 Application 。
2.3.1 Column
org.skywalking.apm.collector.core.data.Column
,字段。
- `name` 属性,字段名。
- `operation` 属性,操作( Operation )。
2.3.2 Operation
org.skywalking.apm.collector.core.data.Operation
,操作接口。用于两个值之间的操作,例如,相加等等。目前实现类有:
- AddOperation :值相加操作。
- CoverOperation :值覆盖操作,即以新值为返回。
- NonOperation :空操作,即以老值为返回。
3. collector-storage-define
collector-cluster-define
:定义存储组件接口。项目结构如下 :
3.1 StorageModule
org.skywalking.apm.collector.storage.StorageModule
,实现 Module 抽象类,集群管理 Module 。
#name()
实现方法,返回模块名为 "storage"
。
#services()
实现方法,返回 Service 类名:在 org.skywalking.apm.collector.storage.dao 包下的所有类 和 IBatchDAO。
3.2 table 包
在 org.skywalking.apm.collector.storage.table
包下,定义了存储模块所有的 Table 和 Data 实现类。
3.3 StorageInstaller
org.skywalking.apm.collector.storage.StorageInstaller
,存储安装器抽象类,基于 TableDefine ,初始化存储组件的表。
- `#defineFilter(List)` 抽象方法,过滤 TableDefine 数组中,非自身需要的。例如说,ElasticSearchStorageInstaller 过滤后,只保留 ElasticSearchTableDefine 对象。
- `#isExists(Client, TableDefine)` 抽象方法,判断表是否存在。
- `#deleteTable(Client, TableDefine)` 抽象方法,删除表。
- `#createTable(Client, TableDefine)` 抽象方法,创建表。
- `#install(Client)` 方法,基于 TableDefine ,初始化存储组件的表。
- 该方法会被 StorageModuleH2Provider 或 StorageModuleEsProvider 启动时调用。
3.4 dao 包
在 collector-storage-define
项目结构图,我们看到一共有两个 bao
包:
org.skywalking.apm.collector.storage.base.dao
,系统的 DAO 接口。org.skywalking.apm.collector.storage.dao
,业务的 DAO 接口。- 继承系统的 DAO 接口。
- 被 `collector-storage-xxx-provider` 的 `dao` 包实现。
3.4.1 系统 DAO
org.skywalking.apm.collector.storage.base.dao.DAO
,继承 Service 接口,DAO 接口。
无任何方法。
3.4.1.1 AbstractDAO
org.skywalking.apm.collector.storage.base.dao.AbstractDAO
,实现 DAO 接口,DAO 抽象基类。
- `client` 属性,数据操作客户端。例如,H2Client 、ElasticSearchClient 。
在 collector-storage-xxx-provider
模块中,H2DAO 、EsDAO 实现 AbstractDAO 。
3.4.1.2 IPersistenceDAO
org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO
,实现 DAO 接口,持久化 DAO 接口,定义了 Data 的增删改查操作。
- `#get(id)` 接口方法,根据 ID 查询一条 Data 。
- `#deleteHistory(startTimestamp, endTimestamp)` 接口方法,删除时间范围内的 Data 们。
- `#prepareBatchInsert(data)` 接口方法,准备批量插入操作对象。例如:`CpuMetricEsPersistenceDAO#prepareBatchInsert(CpuMetric)` 方法,返回的是
org.elasticsearch.action.index.IndexRequestBuilder
对象。注意:- 该方法不会发起具体的 DAO 操作,仅仅是创建插入操作对象,最终的执行在 `IBatchDAO#batchPersistence(List)`。
- 该方法创建的是批量插入操作对象们中的一个。
- `#prepareBatchUpdate(data)` 接口方法,准备批量更新操作对象。类似
#prepareBatchInsert(data)
方法。
3.4.1.3 IBatchDAO
org.skywalking.apm.collector.storage.base.dao.IBatchDAO
,实现 DAO 接口,批量操作 DAO 接口。
- `#batchPersistence(List batchCollection)` 接口方法,通过执行批量操作对象数组,实现批量持久化数据。
- `batchCollection` 方法参数,通过 `IPersistenceDAO#prepareBatchInsert` 或 `IPersistenceDAO#prepareBatchUpdate` 方法,生成每个操作数组元素。
- 该方法会被 `PersistenceTimer#extractDataAndSave(…)` 或 `PersistenceWorker#onWork(…)` 方法调用,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「4. PersistenceWorker」 详细解析。
在 collector-storage-xxx-provider
模块中,BatchH2DAO 、BatchEsDAO 实现 IBatchDAO 。
3.4.2 业务 DAO
在 StorageModule#services()
方法里,我们可以看到,业务 DAO 按照用途可以拆分成四种:
- Cache :缓存应用、应用实例、服务名
- Register :注册应用、应用实例、服务名
- Persistence :持久化,实际可以理解成批量持久化
- UI :SkyWaling UI 查询使用。
那么整理如下:
Package | Data | Cache / Register | Persistence | UI | 关联文章 |
---|---|---|---|---|---|
register | Application | √ | |||
register | Instance | √ | √ | √ | |
register | ServiceName | √ | |||
jvm | CpuMetric | √ | √ | ||
jvm | CMetric | √ | √ | ||
jvm | MemoryMetric | √ | √ | ||
jvm | MemoryPoolMetric | √ | √ | ||
global | GlobalTrace | √ | √ | ||
instance | InstPerformance | √ | √ | ||
node | NodeComponent | √ | √ | ||
node | NodeMapping | √ | √ | ||
noderef | NodeReference | √ | √ | ||
segment | SegmentCost | √ | √ | ||
segment | Segment | √ | √ | ||
service | ServiceEntry | √ | √ | ||
serviceref | ServiceReference | √ | √ |
4. collector-storage-h2-provider
collector-storage-h2-provider
,基于 H2 的存储组件实现。项目结构如下 :
该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用。
由于生产环境主要使用 ES 的存储组件实现,所以本文暂不解析相关实现,感兴趣的胖友自己嗨起来。
5. collector-storage-es-provider
collector-storage-es-provider
,基于 ES 的存储组件实现。项目结构如下 :
实际使用时,通过 application.yml
配置如下:
JSON storage: elasticsearch: cluster_name: elasticsearch cluster_transport_sniffer: true cluster_nodes: 127.0.0.1:9300 index_shards_number: 2 index_replicas_number: 0 ttl: 7
- 生产环境下,推荐 Elasticsearch 配置成集群。
cluster_name
、cluster_transport_sniffer
、cluster_nodes
、index_shards_number
、index_replicas_number
参数,Elasticsearch 相关参数。ttl
:保留 N 天内的数据。超过 N 天的数据,将被自动滚动删除。- 该功能目前版本暂未发布,需要等到 5.0 版本后。
- 《部署集群collector》
5.1 StorageModuleEsProvider
org.skywalking.apm.collector.storage.es.StorageModuleEsProvider
,实现 ModuleProvider抽象类,基于 ES 的存储组件服务提供者。
#name()
实现方法,返回组件服务提供者名为 "elasticsearch"
。
module()
实现方法,返回组件类为 StorageModule 。
#requiredModules()
实现方法,返回依赖组件为 "cluster"
。
#prepare(Properties)
实现方法,执行准备阶段逻辑。
- 第 71 至 75 行 :创建 `org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient` 对象。
- 第 77 至 82 行 :创建 DAO 对象们,并调用
#registerServiceImplementation()
父类方法,注册到services
。
#start()
实现方法,执行启动阶段逻辑。
- 第 90 行 :调用
ElasticSearchClient#initialize()
方法,初始化 ZookeeperClient 。 - 第 93 至 94 行 :创建 ElasticSearchStorageInstaller 对象,初始化存储组件的表。在 「5.2.4 ElasticSearchStorageInstaller」 详细解析。
- 第 100 至 102 行 :创建 `org.skywalking.apm.collector.storage.es.StorageModuleEsRegistration` 对象,并注册信息到集群管理。在 《SkyWalking 源码分析 —— Collector Cluster 集群管理》 有详细解析。
- 第 105 至 107 行 :创建 `org.skywalking.apm.collector.storage.es.StorageModuleEsNamingListener`对象,并注册信息到集群管理。在 《SkyWalking 源码分析 —— Collector Cluster 集群管理》 有详细解析。
- 第 110 至 111 行 :创建 DataTTLKeeperTimer 对象。在 「5.4 DataTTLKeeperTimer」 详细解析。
#notifyAfterCompleted()
实现方法,执行启动完成逻辑。
- 第 115 行 :调用
DataTTLKeeperTimer#start()
方法,启动 DataTTLKeeperTimer 。在本文 「5.4 DataTTLKeeperTimer」 详细解析。
5.2 define 包
在 collector-storage-es-provider
项目结构图,我们看到一共有两个 define
包:
org.skywalking.apm.collector.storage.es.base.define
,系统的 TableDefine 抽象类。org.skywalking.apm.collector.storage.es.define
,业务的 TableDefine 实现类。- 继承系统的 TableDefine 抽象类。
5.2.1 ElasticSearchTableDefine
org.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine
,实现 TableDefine 接口,基于 Elasticsearch 的表定义抽象类。
- `#type()` 方法,文档元数据
_type
字段,参见 《Elasticsearch学习笔记》「_type」 。 - `#refreshInterval()` 抽象方法,文档索引刷新频率,参见 《Elasticsearch: 权威指南 » 基础入门 » 分片内部原理 » 近实时搜索》「refresh API」。
5.2.2 ElasticSearchColumnDefine
org.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine
,实现 ColumnDefine 抽象类,基于 ES 的字段定义。
- Type 枚举类:枚举 ES 字段类型。
5.2.3 业务 TableDefine 实现类
在 org.apache.skywalking.apm.collector.storage.es.define
包里,我们可以看到,所有基于 ES 的业务 TableDefine 实现类。例如:ApplicationEsTableDefine 。
整体 #refreshInterval()
方法返回的结果如下:
- 1 s
- CpuMetricEsTableDefine
- GCMetricEsTableDefine
- MemoryMetricEsTableDefine
- MemoryPoolMetricEsTableDefine
- 2 s
- InstPerformanceEsTableDefine
- NodeComponentEsTableDefine
- NodeMappingEsTableDefine
- NodeReferenceEsTableDefine
- ServiceEntryEsTableDefine
- ServiceReferenceEsTableDefine
- 2 s && WriteRequest.RefreshPolicy.IMMEDIATE
- 【WriteRequest.RefreshPolicy.IMMEDIATE】参见 `ApplicationEsRegisterDAO#save(Application)` 方法
- ApplicationEsTableDefine
- InstanceEsTableDefine
- ServiceNameEsTableDefine
- 5 s
- GlobalTraceEsTableDefine
- SegmentCostEsTableDefine
- 10 s
- SegmentEsTableDefine
5.2.4 ElasticSearchStorageInstaller
友情提示:ElasticSearchStorageInstaller 主要是对 Elasticsearch Java API 的使用,所以不熟悉的胖友,可以 Google 下。
org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller
,实现 StorageInstaller 抽象类, 基于 ES 存储安装器实现类。
- `#defineFilter(List)` 实现方法,过滤数组中,非 ElasticSearchTableDefine 的元素。
- `#createTable(Client, TableDefine)` 实现方法,创建 Elasticsearch 索引。
- SkyWalking 彭勇升 :`_index`和 `_type` 是 ES 特有的,考虑其他数据库接入,所以没有用他这个特性。
- SkyWalking QQ交流群( 392443393 ) ,小心 群友 :`_type` 本来就没做物理隔离,Lucene 层面也不存在,ES 6.x 已经废弃了。
- 《Elasticsearch 6.0 将移除 Type》
- `_id` :数据编号,String 类型。
- `_type` :`"type"` 。
- `_index` :TableDefine 定义的表名。
- `source` :Data 数据。
- 文档数据结构如下:
- 了解 Elasticsearch 的胖友可能有和笔者一样的疑惑,网络上很多文章把 `_index` 类比成关系数据库的 DB ,`_type` 类比成关系数据库的 Table ,和 SkyWalking 目前使用的方式不一致?
- `#deleteTable(Client, TableDefine)` 实现方法,删除 Elasticsearch 索引。
- `#isExists(Client, TableDefine)` 实现方法,判断 Elasticsearch 索引是否存在。
- 在方法里,笔者添加了一些 API 的说明,不熟悉的胖友,可以仔细阅读理解。
5.3 dao 包
在 collector-storage-es-provider
项目结构图,我们看到一共有两个 dao
包:
org.skywalking.apm.collector.storage.es.base.dao
,系统的 DAO 抽象类。org.skywalking.apm.collector.storage.es.dao
,业务的 DAO 实现类。- 继承系统的 DAO 抽象类。
5.3.1 EsDAO
org.skywalking.apm.collector.storage.es.base.dao.EsDAO
,实现 AbstractDAO 抽象类,基于 ES 的 DAO 抽象类。
- `#getMaxId(indexName, columnName)` 方法,获得索引名的指定字段的最大值。
- `#getMinId(indexName, columnName)` 方法,获得索引名的指定字段的最小值。
5.3.2 BatchEsDAO
org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO
,实现 IBatchDAO 接口,继承 EsDAO 抽象类,基于 ES 批量操作 DAO 实现类。
- `#batchPersistence(List)` 实现方法,将
org.elasticsearch.action.index.IndexRequestBuilder
和org.elasticsearch.action.index.UpdateRequestBuilder
数组,创建成org.elasticsearch.action.bulk.BulkRequestBuilder
对象,批量持久化。- IndexRequestBuilder 和 UpdateRequestBuilder 的创建,在 「5.3.3 业务 DAO 实现类」 会看到。
5.3.3 业务 DAO 实现类
在 org.apache.skywalking.apm.collector.storage.es.dao
包里,我们可以看到,所有基于 ES 的业务 DAO 实现类。
实现代码易懂,胖友可以自己阅读。良心如我们,按照 DAO 的业务用途,推荐例子如下:
- Cache :ApplicationEsCacheDAO
- Register :ApplicationEsRegisterDAO
- Persistence :SegmentEsPersistenceDAO
- 此处可见 IndexRequestBuilder 和 UpdateRequestBuilder 的创建。
- UI :SegmentEsUIDAO
5.4 DataTTLKeeperTimer
org.skywalking.apm.collector.storage.es.DataTTLKeeperTimer
,过期数据删除定时器。通过该定时器,只保留 N 天内的数据。
- `#start()` 方法,启动定时任务。
- 第 49 行:创建延迟 1 小时,每 8 小时执行一次 `#delete()` 方法的定时任务。目前该行代码被注释,胖友可以等待 SkyWallking 5.0 版本的发布。
- `#delete()` 方法,删除过期数据。
- 第 54 至 66 行:计算删除的开始与结束时间,即指定时间的前一天。例如,2017-12-23 执行时,删除 2017-12-16 那天的数据。
- 第 69 行:调用 `#deleteJVMRelatedData(startTimestamp, endTimestamp)` 方法,删除 JVM 相关的数据。
- 第 70 行:调用 `#deleteTraceRelatedData(startTimestamp, endTimestamp)` 方法,删除 Trace 相关的数据。
如下是不会删除的数据的表:
- Application
- Instance
- ServiceName
- ServiceEntry