1.1 添加Rollup
Rollup 可以理解为 Table 的一个物化索引结构。物化 是因为其数据在物理上独立存储,而 索引 的意思是,Rollup可以调整列顺序以增加前缀索引的命中率,也可以减少key列以增加数据的聚合度。
以下举例说明。
原表table1的Schema如下:
代码语言:javascript复制 ---------- ------------- ------ ------- --------- -------
| Field | Type | Null | Key | Default | Extra |
---------- ------------- ------ ------- --------- -------
| siteid | int(11) | No | true | 10 | |
| citycode | smallint(6) | No | true | N/A | |
| username | varchar(32) | No | true | | |
| pv | bigint(20) | No | false | 0 | SUM |
| uv | bigint(20) | No | false | 0 | SUM |
---------- ------------- ------ ------- --------- -------
对于 table1 明细数据是 siteid, citycode, username 三者构成一组 key,从而对 pv 字段进行聚合;如果业务方经常有看城市 pv 总量的需求,可以建立一个只有 citycode, pv 的rollup。
代码语言:javascript复制ALTER TABLE table1 ADD ROLLUP rollup_city(citycode, pv);
1.2 Broadcast/Shuffle Join
系统默认实现 Join 的方式,是将小表进行条件过滤后,将其广播到大表所在的各个节点上,形成一个内存 Hash 表,然后流式读出大表的数据进行Hash Join。但是如果当小表过滤后的数据量无法放入内存的话,此时 Join 将无法完成,通常的报错应该是首先造成内存超限。
如果遇到上述情况,建议显式指定 Shuffle Join,也被称作 Partitioned Join。即将小表和大表都按照 Join 的 key 进行 Hash,然后进行分布式的 Join。这个对内存的消耗就会分摊到集群的所有计算节点上。
Doris会自动尝试进行 Broadcast Join,如果预估小表过大则会自动切换至 Shuffle Join。注意,如果此时显式指定了 Broadcast Join 也会自动切换至 Shuffle Join。
使用 Broadcast Join(默认):
代码语言:javascript复制mysql> select sum(table1.pv) from table1 join table2 where table1.siteid = 2;
--------------------
| sum(`table1`.`pv`) |
--------------------
| 10 |
--------------------
1 row in set (0.20 sec)
使用 Broadcast Join(显式指定):
代码语言:javascript复制mysql> select sum(table1.pv) from table1 join [broadcast] table2 where table1.siteid = 2;
--------------------
| sum(`table1`.`pv`) |
--------------------
| 10 |
--------------------
1 row in set (0.20 sec)
使用 Shuffle Join:
代码语言:javascript复制mysql> select sum(table1.pv) from table1 join [shuffle] table2 where table1.siteid = 2;
--------------------
| sum(`table1`.`pv`) |
--------------------
| 10 |
--------------------
1 row in set (0.15 sec)
1.3 Colocation Join
1.3.1 名词解释
- FE:Frontend,Doris 的前端节点。负责元数据管理和请求接入。
- BE:Backend,Doris 的后端节点。负责查询执行和数据存储。
- Colocation Group(CG):一个 CG 中会包含一张及以上的 Table。在同一个 Group 内的 Table 有着相同的 Colocation Group Schema,并且有着相同的数据分片分布。
- Colocation Group Schema(CGS):用于描述一个 CG 中的 Table,和 Colocation 相关的通用 Schema 信息。包括分桶列类型,分桶数以及副本数等。
1.3.2 原理
doris 除了支持Broadcast/Shuffle Join 之外,Colocation Join更是一大特色。Colocation Join 功能,是将一组拥有相同 CGS 的 Table 组成一个 CG。并保证这些 Table 对应的数据分片会落在同一个 BE 节点上。使得当 CG 内的表进行分桶列上的 Join 操作时,可以通过直接进行本地数据 Join,减少数据在节点间的传输耗时。
为了使得 Table 能够有相同的数据分布,同一 CG 内的 Table 必须保证以下属性相同:
- 分桶列和分桶数 分桶列,即在建表语句中 DISTRIBUTED BY HASH(col1, col2, ...) 中指定的列。分桶列决定了一张表的数据通过哪些列的值进行 Hash 划分到不同的 Tablet 中。同一 CG 内的 Table 必须保证分桶列的类型和数量完全一致,并且桶数一致,才能保证多张表的数据分片能够一一对应的进行分布控制。
- 副本数 同一个 CG 内所有表的所有分区(Partition)的副本数必须一致。如果不一致,可能出现某一个 Tablet 的某一个副本,在同一个 BE 上没有其他的表分片的副本对应。
同一个 CG 内的表,分区的个数、范围以及分区列的类型不要求一致。
1.3.3 举例说明
代码语言:javascript复制CREATE TABLE `tbl1` (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
PARTITION p1 VALUES LESS THAN ('2019-05-31'),
PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
"colocate_with" = "group1"
);
代码语言:javascript复制CREATE TABLE `tbl2` (
`k1` datetime NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
"colocate_with" = "group1"
);
查看查询计划,如果 Colocation Join 生效,则 Hash Join 节点会显示 colocate: true。
代码语言:javascript复制DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
----------------------------------------------------
| Explain String |
----------------------------------------------------
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN |
| | hash predicates: |
| | colocate: true |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----1:OlapScanNode |
| | TABLE: tbl2 |
| | PREAGGREGATION: OFF. Reason: null |
| | partitions=0/1 |
| | rollup: null |
| | buckets=0/0 |
| | cardinality=-1 |
| | avgRowSize=0.0 |
| | numNodes=0 |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
----------------------------------------------------
1.4 动态分区
1.4.1 原理
在某些使用场景下,用户会将表按照天进行分区划分,每天定时执行例行任务,这时需要使用方手动管理分区,否则可能由于使用方没有创建分区导致数据导入失败,这给使用方带来了额外的维护成本。
在实现方式上, FE会启动一个后台线程,根据fe.conf中dynamic_partition_enable 及 dynamic_partition_check_interval_seconds参数决定该线程是否启动以及该线程的调度频率。每次调度时,会在注册表中读取动态分区表的属性,并根据动态分区属性动态添加及删除分区。
1.4.2 举例说明
建表时,可以在 PROPERTIES 中指定以下dynamic_partition属性,表示这个表是一个动态分区表。
代码语言:javascript复制CREATE TABLE example_db.dynamic_partition
(
k1 DATE,
k2 INT,
k3 SMALLINT,
v1 VARCHAR(2048),
v2 DATETIME DEFAULT "2014-02-04 15:36:00"
)
ENGINE=olap
DUPLICATE KEY(k1, k2, k3)
PARTITION BY RANGE (k1)
(
PARTITION p20200321 VALUES LESS THAN ("2020-03-22"),
PARTITION p20200322 VALUES LESS THAN ("2020-03-23"),
PARTITION p20200323 VALUES LESS THAN ("2020-03-24"),
PARTITION p20200324 VALUES LESS THAN ("2020-03-25")
)
DISTRIBUTED BY HASH(k2) BUCKETS 32
PROPERTIES(
"storage_medium" = "SSD",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-3",
"dynamic_partition.end" = "3",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "32"
);
创建一张动态分区表,指定开启动态分区特性,以当天为2020-03-25为例,在每次调度时,会删除分区上界小于 2020-03-22 的分区,为了避免删除非动态创建的分区,动态删除分区只会删除分区名符合动态创建分区规则的分区,例如分区名为a1, 则即使分区范围在待删除的分区范围内,也不会被删除。同时在调度时会提前创建今天以及以后3天(总共4天)的分区(若分区已存在则会忽略),分区名根据指定前缀分别为p20200325 p20200326p20200327 p20200328,每个分区的分桶数量为32。同时会删除 p20200321 的分区。
1.4.3 分区属性参数
dynamic_partition.enable: 是否开启动态分区特性,可指定为 TRUE 或 FALSE。如果不填写,默认为 TRUE。
dynamic_partition.time_unit: 动态分区调度的单位,可指定为 DAY WEEK MONTH,当指定为 DAY时,动态创建的分区名后缀格式为yyyyMMdd,例如20200325。当指定为 WEEK 时,动态创建的分区名后缀格式为yyyy_ww即当前日期属于这一年的第几周,例如 2020-03-25 创建的分区名后缀为 2020_13, 表明目前为2020年第13周。当指定为 MONTH 时,动态创建的分区名后缀格式为 yyyyMM,例如 202003。
dynamic_partition.start: 动态分区的开始时间, 以当天为基准,超过该时间范围的分区将会被删除。如果不填写,则默认为Integer.MIN_VALUE 即 -2147483648。
dynamic_partition.end: 动态分区的结束时间, 以当天为基准,会提前创建N个单位的分区范围。
dynamic_partition.prefix: 动态创建的分区名前缀。
dynamic_partition.buckets: 动态创建的分区所对应的分桶数量。
1.5 支持Bitmap
使用 Roaring Bitmap 数据结构,现场查询时的 IO,CPU,内存,网络资源会显著减少,并且不会随着数据规模线性增加。
代码语言:javascript复制CREATE TABLE `pv_bitmap` (
`dt` int,
`page` varchar(10),
`user_id` bitmap bitmap_union
)
AGGREGATE KEY(`dt`, page)
DISTRIBUTED BY HASH(`dt`) BUCKETS 2;
代码语言:javascript复制select bitmap_count(bitmap_union(user_id)) from pv_bitmap;
select bitmap_union_count(user_id) from pv_bitmap;
select bitmap_union_int(id) from pv_bitmap;
BITMAP_UNION(expr) : 计算两个 Bitmap 的并集,返回值是序列化后的 Bitmap 值
BITMAP_COUNT(expr) : 计算 Bitmap 的基数值
BITMAP_UNION_COUNT(expr): 和 BITMAP_COUNT(BITMAP_UNION(expr)) 等价
BITMAP_UNION_INT(expr) : 和 COUNT(DISTINCT expr) 等价 (仅支持 TINYINT,SMALLINT 和 INT)
1.6 物化视图
物化视图是将预先计算(根据定义好的 SELECT 语句)好的数据集,存储在 Doris 中的一个特殊的表。
物化视图的出现主要是为了满足用户,既能对原始明细数据的任意维度分析,也能快速的对固定维度进行分析查询。
在没有物化视图功能之前,用户一般都是使用 Rollup 功能通过预聚合方式提升查询效率的。但是 Rollup 具有一定的局限性,他不能基于明细模型做预聚合。
物化视图则在覆盖了 Rollup 的功能的同时,还能支持更丰富的聚合函数。所以物化视图其实是 Rollup 的一个超集。
也就是说,之前 ALTER TABLE ADD ROLLUP 语法支持的功能现在均可以通过 CREATE MATERIALIZED VIEW 实现。
代码语言:javascript复制create materialized view store_amt as
select store_id, sum(sale_amt) from sales_records group by store_id;