链路追踪 SkyWalking 源码分析 —— Collector Storage 存储组件

2019-05-24 12:33:46 浏览数 (1)

做积极的人,而不是积极废人!

摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-storage-module/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 SkyWalking 3.2.6 正式版

  • 1. 概述
  • 2. apm-collector-core
  • 3. collector-storage-define
  • 4. collector-storage-h2-provider
  • 5. collector-storage-es-provider

1. 概述

本文主要分享 SkyWalking Collector Storage 存储组件。顾名思义,负责将调用链路、应用、应用实例等等信息存储到存储器,例如,ES 、H2 。

友情提示:建议先阅读 《SkyWalking 源码分析 —— Collector 初始化》 ,以了解 Collector 组件体系。 FROM https://github.com/apache/incubating-skywalking

下面我们来看看整体的项目结构,如下图所示 :

  • apm-collector-coredatadefine :数据的抽象。
  • collector-storage-define :定义存储组件接口。
  • collector-storage-h2-provider :基于 H2 的 存储组件实现。该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用
  • collector-storage-es-provider :基于 Elasticsearch 的集群管理实现。生产环境推荐使用

下面,我们从接口到实现的顺序进行分享。

2. apm-collector-core

apm-collector-coredatadefine ,如下图所示:

我们对类进行梳理分类,如下图:

  • Table :Data 和 TableDefine 之间的桥梁,每个 Table 定义了该表的表名字段名们
  • TableDefine :Table 的详细定义,包括表名字段定义( ColumnDefine )们。在下文中,StorageInstaller 会基于 TableDefine 初始化表的相关信息。
  • Data :数据,包括一条数据的数据值们和数据字段( Column )们。在下文中,Dao 会存储 Data 到存储器中。另外,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》 中,我们也会看到对 Data 的流式处理通用封装。

2.1 Table

org.skywalking.apm.collector.core.data.CommonTable ,通用表。

  • `TABLE_TYPE` 静态属性,表类型。目前只有 ES 存储组件使用到,下文详细解析。
  • `COLUMN_` 前缀的静态属性,通用的字段名。

collector-storage-definetable 下,我们可以看到所有 Table 类,以 "Table" 结尾。每个 Table 的表名,在每个实现类里,例如 ApplicationTable 。

2.2 TableDefine

org.skywalking.apm.collector.core.data.TableDefine ,表定义抽象类

  • `name` 属性,表名。
  • `columnDefines` 属性,ColumnDefine数组。
  • `#initialize()` 抽象方法,初始化表定义。例如:ApplicationEsTableDefine 。

不同的存储组件实现,有不同的 TableDefine 实现类,如下图:

  • ElasticSearchTableDefine :基于 Elasticsearch 的表定义抽象类,在 collector-storage-es-providerdefine 下,我们可以看到所有 ES 的 TableDefine 类。
  • H2TableDefine :基于 H2 的表定义抽象类,在 collector-storage-h2-provider 的 `define` 下,我们可以看到所有 H2 的 TableDefine 类。

2.2.1 ColumnDefine

org.skywalking.apm.collector.core.data.ColumnDefine ,字段定义抽象类

  • `name` 属性,字段名。
  • `type` 属性,字段类型。

collector-storage-xxx-provider 模块中,H2ColumnDefine 、ElasticSearchColumnDefine 实现 ColumnDefine 。

2.2.2 Loader

涉及到的类如下图所示:

org.skywalking.apm.collector.core.data.StorageDefineLoader ,调用 org.skywalking.apm.collector.core.define.DefinitionLoader ,从 org.skywalking.apm.collector.core.data.StorageDefinitionFile 中,加载 TableDefine 实现类数组。

另外,在 collector-storage-es-providercollector-storage-h2-provider 里都有 storage.define 文件,如下图:

  • StorageDefinitionFile 声明了读取该文件。
  • 注意,DefinitionLoader 在加载时,两个文件都会被读取,最终在 StorageInstaller#defineFilter(List) 方法,进行过滤。

代码比较简单,中文注释已加,胖友自己阅读理解下。

2.3 Data

org.skywalking.apm.collector.core.data.Data ,数据抽象类

  • [dataXXX]() 前缀的属性,字段值们。
    • `dataStrings` 属性的第一位,是 ID 属性。参见 构造方法的【第 51 行】 或者 `#setId(id)` 方法。
  • [xxxColumns]() 后缀的属性,字段( Column )们。
  • 通过上述两种属性 自身类,可以确定一条数据记录的表、字段类型、字段名、字段值。
  • 继承 `org.skywalking.apm.collector.core.data.EndOfBatchQueueMessage` ,带是否消息批处理的最后一条标记的消息抽象类,`endOfBatch` 属性,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「3. AggregationWorker」 详细解析。
    • 继承 `org.skywalking.apm.collector.core.data.AbstractHashMessage` ,带哈希码的消息抽象类,`hashCode` 属性,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「3. AggregationWorker」 详细解析。
  • `#mergeData(Data)` 方法,合并传入的数据到自身。该方法被 `AggregationWorker#aggregate(message)` 调用,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「3. AggregationWorker」 详细解析。

collector-storage-definetable 下,我们可以看到所有 Data 类, "Table" 结尾,例如 Application 。

2.3.1 Column

org.skywalking.apm.collector.core.data.Column ,字段。

  • `name` 属性,字段名。
  • `operation` 属性,操作( Operation )。

2.3.2 Operation

org.skywalking.apm.collector.core.data.Operation ,操作接口。用于两个值之间的操作,例如,相加等等。目前实现类有:

  • AddOperation :值相加操作。
  • CoverOperation :值覆盖操作,即以新值为返回。
  • NonOperation :空操作,即以老值为返回。

3. collector-storage-define

collector-cluster-define :定义存储组件接口。项目结构如下 :

3.1 StorageModule

org.skywalking.apm.collector.storage.StorageModule ,实现 Module 抽象类,集群管理 Module 。

#name() 实现方法,返回模块名为 "storage"

#services() 实现方法,返回 Service 类名:在 org.skywalking.apm.collector.storage.dao 下的所有类 和 IBatchDAO。

3.2 table 包

org.skywalking.apm.collector.storage.table 包下,定义了存储模块所有的 Table 和 Data 实现类。

3.3 StorageInstaller

org.skywalking.apm.collector.storage.StorageInstaller ,存储安装器抽象类,基于 TableDefine ,初始化存储组件的表。

  • `#defineFilter(List)` 抽象方法,过滤 TableDefine 数组中,非自身需要的。例如说,ElasticSearchStorageInstaller 过滤后,只保留 ElasticSearchTableDefine 对象。
  • `#isExists(Client, TableDefine)` 抽象方法,判断表是否存在。
  • `#deleteTable(Client, TableDefine)` 抽象方法,删除表。
  • `#createTable(Client, TableDefine)` 抽象方法,创建表。
  • `#install(Client)` 方法,基于 TableDefine ,初始化存储组件的表。
    • 该方法会被 StorageModuleH2Provider 或 StorageModuleEsProvider 启动时调用。

3.4 dao 包

collector-storage-define 项目结构图,我们看到一共有bao 包:

  • org.skywalking.apm.collector.storage.base.dao系统的 DAO 接口。
  • org.skywalking.apm.collector.storage.dao业务的 DAO 接口。
    • 继承系统的 DAO 接口。
    • 被 `collector-storage-xxx-provider` 的 `dao` 包实现

3.4.1 系统 DAO

org.skywalking.apm.collector.storage.base.dao.DAO ,继承 Service 接口,DAO 接口

无任何方法。

3.4.1.1 AbstractDAO

org.skywalking.apm.collector.storage.base.dao.AbstractDAO ,实现 DAO 接口,DAO 抽象基类。

  • `client` 属性,数据操作客户端。例如,H2Client 、ElasticSearchClient 。

collector-storage-xxx-provider 模块中,H2DAO 、EsDAO 实现 AbstractDAO 。

3.4.1.2 IPersistenceDAO

org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO ,实现 DAO 接口,持久化 DAO 接口,定义了 Data 的增删改查操作。

  • `#get(id)` 接口方法,根据 ID 查询一条 Data 。
  • `#deleteHistory(startTimestamp, endTimestamp)` 接口方法,删除时间范围内的 Data 们。
  • `#prepareBatchInsert(data)` 接口方法,准备批量插入操作对象。例如:`CpuMetricEsPersistenceDAO#prepareBatchInsert(CpuMetric)` 方法,返回的是 org.elasticsearch.action.index.IndexRequestBuilder 对象。注意:
    • 该方法不会发起具体的 DAO 操作,仅仅是创建插入操作对象,最终的执行在 `IBatchDAO#batchPersistence(List)`。
    • 该方法创建的是批量插入操作对象们中的一个。
  • `#prepareBatchUpdate(data)` 接口方法,准备批量更新操作对象。类似 #prepareBatchInsert(data)方法。
3.4.1.3 IBatchDAO

org.skywalking.apm.collector.storage.base.dao.IBatchDAO ,实现 DAO 接口,批量操作 DAO 接口

  • `#batchPersistence(List batchCollection)` 接口方法,通过执行批量操作对象数组,实现批量持久化数据。
    • `batchCollection` 方法参数,通过 `IPersistenceDAO#prepareBatchInsert` 或 `IPersistenceDAO#prepareBatchUpdate` 方法,生成每个操作数组元素。
    • 该方法会被 `PersistenceTimer#extractDataAndSave(…)` 或 `PersistenceWorker#onWork(…)` 方法调用,在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(二)》「4. PersistenceWorker」 详细解析。

collector-storage-xxx-provider 模块中,BatchH2DAO 、BatchEsDAO 实现 IBatchDAO 。

3.4.2 业务 DAO

StorageModule#services() 方法里,我们可以看到,业务 DAO 按照用途可以拆分成四种

  • Cache :缓存应用、应用实例、服务名
  • Register :注册应用、应用实例、服务名
  • Persistence :持久化,实际可以理解成批量持久化
  • UI :SkyWaling UI 查询使用。

那么整理如下:

Package

Data

Cache / Register

Persistence

UI

关联文章

register

Application

register

Instance

register

ServiceName

jvm

CpuMetric

jvm

CMetric

jvm

MemoryMetric

jvm

MemoryPoolMetric

global

GlobalTrace

instance

InstPerformance

node

NodeComponent

node

NodeMapping

noderef

NodeReference

segment

SegmentCost

segment

Segment

service

ServiceEntry

serviceref

ServiceReference

4. collector-storage-h2-provider

collector-storage-h2-provider ,基于 H2 的存储组件实现。项目结构如下 :

该实现是单机版,建议仅用于 SkyWalking 快速上手,生产环境不建议使用

由于生产环境主要使用 ES 的存储组件实现,所以本文暂不解析相关实现,感兴趣的胖友自己嗨起来。

5. collector-storage-es-provider

collector-storage-es-provider ,基于 ES 的存储组件实现。项目结构如下 :

实际使用时,通过 application.yml 配置如下:

JSON storage: elasticsearch: cluster_name: elasticsearch cluster_transport_sniffer: true cluster_nodes: 127.0.0.1:9300 index_shards_number: 2 index_replicas_number: 0 ttl: 7

  • 生产环境下,推荐 Elasticsearch 配置成集群。
  • cluster_namecluster_transport_sniffercluster_nodesindex_shards_numberindex_replicas_number 参数,Elasticsearch 相关参数。
  • ttl :保留 N 天内的数据。超过 N 天的数据,将被自动滚动删除。
    • 该功能目前版本暂未发布,需要等到 5.0 版本后。
  • 《部署集群collector》

5.1 StorageModuleEsProvider

org.skywalking.apm.collector.storage.es.StorageModuleEsProvider ,实现 ModuleProvider抽象类,基于 ES 的存储组件服务提供者。

#name() 实现方法,返回组件服务提供者名为 "elasticsearch"

module() 实现方法,返回组件类为 StorageModule 。

#requiredModules() 实现方法,返回依赖组件为 "cluster"


#prepare(Properties) 实现方法,执行准备阶段逻辑。

  • 第 71 至 75 行 :创建 `org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient` 对象。
  • 第 77 至 82 行 :创建 DAO 对象们,并调用 #registerServiceImplementation() 父类方法,注册到 services

#start() 实现方法,执行启动阶段逻辑。

  • 第 90 行 :调用 ElasticSearchClient#initialize() 方法,初始化 ZookeeperClient 。
  • 第 93 至 94 行 :创建 ElasticSearchStorageInstaller 对象,初始化存储组件的表。在 「5.2.4 ElasticSearchStorageInstaller」 详细解析。
  • 第 100 至 102 行 :创建 `org.skywalking.apm.collector.storage.es.StorageModuleEsRegistration` 对象,并注册信息到集群管理。在 《SkyWalking 源码分析 —— Collector Cluster 集群管理》 有详细解析。
  • 第 105 至 107 行 :创建 `org.skywalking.apm.collector.storage.es.StorageModuleEsNamingListener`对象,并注册信息到集群管理。在 《SkyWalking 源码分析 —— Collector Cluster 集群管理》 有详细解析。
  • 第 110 至 111 行 :创建 DataTTLKeeperTimer 对象。在 「5.4 DataTTLKeeperTimer」 详细解析。

#notifyAfterCompleted() 实现方法,执行启动完成逻辑。

  • 第 115 行 :调用 DataTTLKeeperTimer#start() 方法,启动 DataTTLKeeperTimer 。在本文 「5.4 DataTTLKeeperTimer」 详细解析。

5.2 define 包

collector-storage-es-provider 项目结构图,我们看到一共有define 包:

  • org.skywalking.apm.collector.storage.es.base.define系统的 TableDefine 抽象类。
  • org.skywalking.apm.collector.storage.es.define业务的 TableDefine 实现类。
    • 继承系统的 TableDefine 抽象类。

5.2.1 ElasticSearchTableDefine

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine ,实现 TableDefine 接口,基于 Elasticsearch 的表定义抽象类

  • `#type()` 方法,文档元数据 _type 字段,参见 《Elasticsearch学习笔记》「_type」 。
  • `#refreshInterval()` 抽象方法,文档索引刷新频率,参见 《Elasticsearch: 权威指南 » 基础入门 » 分片内部原理 » 近实时搜索》「refresh API」。

5.2.2 ElasticSearchColumnDefine

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine ,实现 ColumnDefine 抽象类,基于 ES 的字段定义。

  • Type 枚举类:枚举 ES 字段类型。

5.2.3 业务 TableDefine 实现类

org.apache.skywalking.apm.collector.storage.es.define 里,我们可以看到,所有基于 ES 的业务 TableDefine 实现类。例如:ApplicationEsTableDefine 。

整体 #refreshInterval() 方法返回的结果如下:

  • 1 s
    • CpuMetricEsTableDefine
    • GCMetricEsTableDefine
    • MemoryMetricEsTableDefine
    • MemoryPoolMetricEsTableDefine
  • 2 s
    • InstPerformanceEsTableDefine
    • NodeComponentEsTableDefine
    • NodeMappingEsTableDefine
    • NodeReferenceEsTableDefine
    • ServiceEntryEsTableDefine
    • ServiceReferenceEsTableDefine
  • 2 s && WriteRequest.RefreshPolicy.IMMEDIATE
    • 【WriteRequest.RefreshPolicy.IMMEDIATE】参见 `ApplicationEsRegisterDAO#save(Application)` 方法
    • ApplicationEsTableDefine
    • InstanceEsTableDefine
    • ServiceNameEsTableDefine
  • 5 s
    • GlobalTraceEsTableDefine
    • SegmentCostEsTableDefine
  • 10 s
    • SegmentEsTableDefine

5.2.4 ElasticSearchStorageInstaller

友情提示:ElasticSearchStorageInstaller 主要是对 Elasticsearch Java API 的使用,所以不熟悉的胖友,可以 Google 下。

org.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller,实现 StorageInstaller 抽象类, 基于 ES 存储安装器实现类。

  • `#defineFilter(List)` 实现方法,过滤数组中,非 ElasticSearchTableDefine 的元素。
  • `#createTable(Client, TableDefine)` 实现方法,创建 Elasticsearch 索引。
    • SkyWalking 彭勇升 :`_index`和 `_type` 是 ES 特有的,考虑其他数据库接入,所以没有用他这个特性。
    • SkyWalking QQ交流群( 392443393 ) ,小心 群友 :`_type` 本来就没做物理隔离,Lucene 层面也不存在,ES 6.x 已经废弃了。
    • 《Elasticsearch 6.0 将移除 Type》
    • `_id` :数据编号,String 类型。
    • `_type` :`"type"` 。
    • `_index` :TableDefine 定义的表名
    • `source` :Data 数据。
    • 文档数据结构如下:
    • 了解 Elasticsearch 的胖友可能有和笔者一样的疑惑,网络上很多文章把 `_index` 类比成关系数据库的 DB ,`_type` 类比成关系数据库的 Table ,和 SkyWalking 目前使用的方式不一致
  • `#deleteTable(Client, TableDefine)` 实现方法,删除 Elasticsearch 索引。
  • `#isExists(Client, TableDefine)` 实现方法,判断 Elasticsearch 索引是否存在。
  • 在方法里,笔者添加了一些 API 的说明,不熟悉的胖友,可以仔细阅读理解。

5.3 dao 包

collector-storage-es-provider 项目结构图,我们看到一共有dao 包:

  • org.skywalking.apm.collector.storage.es.base.dao系统的 DAO 抽象类。
  • org.skywalking.apm.collector.storage.es.dao业务的 DAO 实现类。
    • 继承系统的 DAO 抽象类。

5.3.1 EsDAO

org.skywalking.apm.collector.storage.es.base.dao.EsDAO ,实现 AbstractDAO 抽象类,基于 ES 的 DAO 抽象类

  • `#getMaxId(indexName, columnName)` 方法,获得索引名的指定字段的最大值
  • `#getMinId(indexName, columnName)` 方法,获得索引名的指定字段的最小值

5.3.2 BatchEsDAO

org.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO ,实现 IBatchDAO 接口,继承 EsDAO 抽象类,基于 ES 批量操作 DAO 实现类。

  • `#batchPersistence(List)` 实现方法,将 org.elasticsearch.action.index.IndexRequestBuilderorg.elasticsearch.action.index.UpdateRequestBuilder 数组,创建成 org.elasticsearch.action.bulk.BulkRequestBuilder 对象,批量持久化。
    • IndexRequestBuilder 和 UpdateRequestBuilder 的创建,在 「5.3.3 业务 DAO 实现类」 会看到。

5.3.3 业务 DAO 实现类

org.apache.skywalking.apm.collector.storage.es.dao 里,我们可以看到,所有基于 ES 的业务 DAO 实现类。

实现代码易懂,胖友可以自己阅读。良心如我们,按照 DAO 的业务用途,推荐例子如下:

  • Cache :ApplicationEsCacheDAO
  • Register :ApplicationEsRegisterDAO
  • Persistence :SegmentEsPersistenceDAO
    • 此处可见 IndexRequestBuilder 和 UpdateRequestBuilder 的创建。
  • UI :SegmentEsUIDAO

5.4 DataTTLKeeperTimer

org.skywalking.apm.collector.storage.es.DataTTLKeeperTimer ,过期数据删除定时器。通过该定时器,只保留 N 天内的数据。

  • `#start()` 方法,启动定时任务。
    • 第 49 行:创建延迟 1 小时,每 8 小时执行一次 `#delete()` 方法的定时任务。目前该行代码被注释,胖友可以等待 SkyWallking 5.0 版本的发布。
  • `#delete()` 方法,删除过期数据。
    • 第 54 至 66 行:计算删除的开始与结束时间,即指定时间的前一天。例如,2017-12-23 执行时,删除 2017-12-16 那天的数据。
    • 第 69 行:调用 `#deleteJVMRelatedData(startTimestamp, endTimestamp)` 方法,删除 JVM 相关的数据。
    • 第 70 行:调用 `#deleteTraceRelatedData(startTimestamp, endTimestamp)` 方法,删除 Trace 相关的数据。

如下是不会删除的数据的表:

  • Application
  • Instance
  • ServiceName
  • ServiceEntry

0 人点赞