当为应用程序的数据选择一个存储系统时,我们通常会选择一个最适合我们业务场景的存储系统。对于快速更新和实时分析工作较多的场景,我们可能希望使用Apache Kudu
,但是对于低成本的大规模可伸缩性场景,我们可能希望使用HDFS
。因此,需要一种解决方案使我们能够利用多个存储系统的最佳特性。本文介绍了如何使用Apache Impala
的滑动窗口模式,操作存储在Apache Kudu
和Apache HDFS
中的数据,使用此模式,我们可以以对用户透明的方式获得多个存储层的所有优点。
Apache Kudu
旨在快速分析、快速变化的数据。Kudu
提供快速插入/更新和高效列扫描的组合,以在单个存储层上实现多个实时分析工作负载。因此,Kudu
非常适合作为存储需要实时查询的数据的仓库。此外,Kudu
支持实时更新和删除行,以支持延迟到达的数据和数据更正。
Apache HDFS
旨在以低成本实现无限的可扩展性。它针对数据不可变的面向批处理的场景进行了优化,与Apache Parquet
文件格式配合使用时,可以以极高的吞吐量和效率访问结构化数据。
对于数据小且不断变化的情况,如维度表,通常将所有数据保存在Kudu
中。当数据符合Kudu
的扩展限制并且可以从Kudu
的特性中受益时,在Kudu
中保留大表是很常见的。如果数据量大,面向批处理且不太可能发生变化,则首选使用Parquet
格式将数据存储在HDFS
中。当我们需要利用两个存储层的优点时,滑动窗口模式是一个有用的解决方案。
滑动窗口模式
在此模式中,我们使用Impala
创建匹配的Kudu
表和Parquet
格式的HDFS
表。根据Kudu
和HDFS
表之间数据移动的频率,这些表按时间单位分区,通常使用每日、每月或每年分区。然后创建一个统一视图,并使用WHERE
子句定义边界,该边界分隔从Kudu
表中读取的数据以及从HDFS
表中读取的数据。定义的边界很重要,这样我们就可以在Kudu
和HDFS
之间移动数据,而不会将重复的记录暴露给视图。移动数据后,可以使用原子的ALTER VIEW
语句向前移动边界。
注意:此模式最适用于组织到范围分区(range partitions
)中的某些顺序数据,因为在此情况下,按时间滑动窗口和删除分区操作会非常有效。
该模式实现滑动时间窗口,其中可变数据存储在Kudu
中,不可变数据以HDFS
上的Parquet
格式存储。通过Impala
操作Kudu
和HDFS
来利用两种存储系统的优势:
- 流数据可立即查询
(Streaming data is immediately queryable)
- 可以对更晚到达的数据或手动更正进行更新
(Updates for late arriving data or manual corrections can be made)
- 存储在
HDFS
中的数据具有最佳大小,可提高性能并防止出现小文件(Data stored in HDFS is optimally sized increasing performance and preventing small files)
- 降低成本
(Reduced cost)
Impala
还支持S3和ADLS等云存储方式。此功能允许方便地访问远程管理的存储系统,可从任何位置访问,并与各种基于云的服务集成。由于这些数据是远程的,因此针对S3
数据的查询性能较差,使得S3
适合于保存仅偶尔查询的“冷”数据。通过创建第三个匹配表并向统一视图添加另一个边界,可以扩展此模式以将冷数据保存在云存储系统中。
注意:为简单起见,下面的示例中仅说明了Kudu
和HDFS
。
将数据从Kudu
移动到HDFS
的过程分为两个阶段。第一阶段是数据移动,第二阶段是元数据更改,最后定义一些定期自动运行的数据任务来辅助我们维护滑动窗口。
在第一阶段,将当前不可变数据从Kudu
复制到HDFS
。即使数据从Kudu
复制到HDFS
,视图中定义的边界也会阻止向用户显示重复数据。此步骤可以包括根据需要进行的任何验证和重试,以确保数据卸载(data offload)
成功。
在第二阶段,现在数据被安全地复制到HDFS
,需要更改元数据以对分区进行调整。这包括向前移动边界,为下一个时段添加新的Kudu
分区,以及删除旧的Kudu
分区。
实现步骤
为了实现滑动窗口模式,需要一些Impala
基础,下面介绍实现滑动窗口模式的基本步骤。
移动数据
只要我们使用每种存储格式定义匹配表,就可以通过Impala
在存储系统之间移动数据。为简洁起见,未描述创建Impala
表时可用的所有选项,可以参考Impala
的CREATE TABLE文档来查找创建Kudu
、HDFS
和云存储表的正确语法。下面列出了一些示例,其中包括滑动窗口模式。
创建表后,移动数据就像INSERT ... SELECT语句一样简单:
代码语言:javascript复制INSERT INTO table_foo SELECT * FROM table_bar;
SELECT语句的所有功能都可用于选择要移动的特定数据。
注意:如果将数据移动到Kudu
,可以使用UPSERT INTO
语句来处理重复键。
统一查询
在Impala
中查询来自多个表和数据源的数据也很简单。为简洁起见,未描述创建Impala
视图时可用的所有选项,可以参考Impala
的CREATE VIEW文档。
创建统一查询的视图就像使用两个SELECT
子句和UNION ALL
的CREATE VIEW
语句一样简单:
CREATE VIEW foo_view AS
SELECT col1, col2, col3 FROM foo_parquet
UNION ALL
SELECT col1, col2, col3 FROM foo_kudu;
警告:确保使用UNION ALL
而不是UNION
。UNION
关键字本身与UNION DISTINCT
相同,可能会对性能产生重大影响,可以在Impala UNION文档中找到更多信息。
SELECT
语句的所有功能都可用于公开每个基础表中的正确数据和列,使用WHERE
子句传递和下推任何需要特殊处理或转换的谓词非常重要。下面将在滑动窗口模式的讨论中进行更多示例。
此外,可以通过ALTER VIEW
语句更改视图,当与SELECT
语句结合使用时,这很有用,因为它可以用于原子地更新视图正在访问的数据。
示例
下面是使用滑动窗口模式来操作具有三个月活动可变的月度周期数据的实现示例,超过三个月的数据将使用Parquet
格式卸载到HDFS
。
创建Kudu表
首先,创建一个Kudu
表,该表将保存三个月的活动可变数据。该表由时间列分区,每个范围包含一个数据周期。拥有与时间周期匹配的分区很重要,因为删除Kudu
分区比通过DELETE
子句删除数据更有效。该表还由另一个键列进行散列分区,以确保所有数据都不会写入单个分区。
注意:模式设计(schema design)
应根据我们的数据和读/写性能考虑因素而有所不同。此示例模式仅用于演示目的,而不是“最佳”模式。有关选择模式的更多指导,请参考Kudu模式设计文档(schema design documentation)。例如,如果数据输入速率较低,则可能不需要任何散列分区,如果数据输入速率非常高,则可能需要更多散列桶。
CREATE TABLE my_table_kudu
(
name STRING,
time TIMESTAMP,
message STRING,
PRIMARY KEY(name, time)
)
PARTITION BY
HASH(name) PARTITIONS 4,
RANGE(time) (
PARTITION '2018-01-01' <= VALUES < '2018-02-01', --January
PARTITION '2018-02-01' <= VALUES < '2018-03-01', --February
PARTITION '2018-03-01' <= VALUES < '2018-04-01', --March
PARTITION '2018-04-01' <= VALUES < '2018-05-01' --April
)
STORED AS KUDU;
注意:有一个额外的月分区(2018-04-01至2018-05-01
)可以为数据提供一个时间缓冲区,以便将数据移动到不可变表中。
创建HDFS表
创建Parquet
格式的HDFS
表,该表将保存较旧的不可变数据。此表按年、月和日进行分区,以便进行有效访问,即使我们无法按时间列本身进行分区,这将在下面的视图步骤中进一步讨论。有关更多详细信息,请参考Impala的分区文档。
CREATE TABLE my_table_parquet
(
name STRING,
time TIMESTAMP,
message STRING
)
PARTITIONED BY (year int, month int, day int)
STORED AS PARQUET;
创建统一视图
现在创建统一视图,用于无缝地查询所有数据:
代码语言:javascript复制CREATE VIEW my_table_view AS
SELECT name, time, message
FROM my_table_kudu
WHERE time >= "2018-01-01"
UNION ALL
SELECT name, time, message
FROM my_table_parquet
WHERE time < "2018-01-01"
AND year = year(time)
AND month = month(time)
AND day = day(time);
每个SELECT
子句都明确列出要公开的所有列,这可确保不会公开Parquet
表所特有的年、月和日列。如果需要,它还允许处理任何必要的列或类型映射。
应用于my_table_kudu
和my_table_parquet
的初始WHERE
子句定义了Kudu
和HDFS
之间的边界,以确保在卸载数据的过程中不会读取重复数据。
应用于my_table_parquet
的附加AND
子句用于确保单个年、月和日列的良好谓词下推(good predicate pushdown)
。
警告:如前所述,请务必使用UNION ALL
而不是UNION
。UNION
关键字本身与UNION DISTINCT
相同,可能会对性能产生重大影响,可以在Impala UNION文档中找到更多信息。
创建定时任务
现在已创建基表和视图,接着创建定时任务以维护滑动窗口,下面定时任务中使用的SQL
文件可以接收从脚本和调度工具传递的变量。
创建window_data_move.sql
文件以将数据从最旧的分区移动到HDFS
:
INSERT INTO ${var:hdfs_table} PARTITION (year, month, day)
SELECT *, year(time), month(time), day(time)
FROM ${var:kudu_table}
WHERE time >= add_months("${var:new_boundary_time}", -1)
AND time < "${var:new_boundary_time}";
COMPUTE INCREMENTAL STATS ${var:hdfs_table};
注意:COMPUTE INCREMENTAL STATS子句不是必需的,但可帮助我们对Impala
查询进行优化。
要运行SQL
语句,请使用Impala shell
并传递所需的变量,示例如下:
impala-shell -i <impalad:port> -f window_data_move.sql
--var=kudu_table=my_table_kudu
--var=hdfs_table=my_table_parquet
--var=new_boundary_time="2018-02-01"
注意:可以调整WHERE
子句以匹配给定的数据周期和卸载的粒度,这里,add_months
函数的参数为-1
,用于从新的边界时间移动过去一个月的数据。
创建window_view_alter.sql
文件以通过更改统一视图来调整时间边界:
ALTER VIEW ${var:view_name} AS
SELECT name, time, message
FROM ${var:kudu_table}
WHERE time >= "${var:new_boundary_time}"
UNION ALL
SELECT name, time, message
FROM ${var:hdfs_table}
WHERE time < "${var:new_boundary_time}"
AND year = year(time)
AND month = month(time)
AND day = day(time);
要运行SQL
语句,请使用Impala shell
并传递所需的变量,示例如下:
impala-shell -i <impalad:port> -f window_view_alter.sql
--var=view_name=my_table_view
--var=kudu_table=my_table_kudu
--var=hdfs_table=my_table_parquet
--var=new_boundary_time="2018-02-01"
创建window_partition_shift.sql
文件以调整Kudu
分区:
ALTER TABLE ${var:kudu_table}
ADD RANGE PARTITION add_months("${var:new_boundary_time}",
${var:window_length}) <= VALUES < add_months("${var:new_boundary_time}",
${var:window_length} 1);
ALTER TABLE ${var:kudu_table}
DROP RANGE PARTITION add_months("${var:new_boundary_time}", -1)
<= VALUES < "${var:new_boundary_time}";
要运行SQL
语句,请使用Impala shell
并传递所需的变量,示例如下:
impala-shell -i <impalad:port> -f window_partition_shift.sql
--var=kudu_table=my_table_kudu
--var=new_boundary_time="2018-02-01"
--var=window_length=3
注意:应该定期在Kudu
表上运行COMPUTE STATS,以确保Impala
的查询性能最佳。
试验
我们已经创建了表、视图和脚本实现了滑动窗口模式,现在可以通过插入不同时间范围的数据并运行脚本来向前移动窗口来进行试验。
将一些示例值插入Kudu
表:
INSERT INTO my_table_kudu VALUES
('joey', '2018-01-01', 'hello'),
('ross', '2018-02-01', 'goodbye'),
('rachel', '2018-03-01', 'hi');
在每个表/视图中显示数据:
代码语言:javascript复制SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;
将1月数据移动到HDFS
:
impala-shell -i <impalad:port> -f window_data_move.sql
--var=kudu_table=my_table_kudu
--var=hdfs_table=my_table_parquet
--var=new_boundary_time="2018-02-01"
确认数据在两个位置,但在视图中不重复:
代码语言:javascript复制SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;
改变视图将时间边界向前移至2月:
代码语言:javascript复制impala-shell -i <impalad:port> -f window_view_alter.sql
--var=view_name=my_table_view
--var=kudu_table=my_table_kudu
--var=hdfs_table=my_table_parquet
--var=new_boundary_time="2018-02-01"
确认数据仍在两个位置,但在视图中不重复:
代码语言:javascript复制SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;
调整Kudu
分区:
impala-shell -i <impalad:port> -f window_partition_shift.sql
--var=kudu_table=my_table_kudu
--var=new_boundary_time="2018-02-01"
--var=window_length=3
确认1月数据现在仅在HDFS
中:
SELECT * FROM my_table_kudu;
SELECT * FROM my_table_parquet;
SELECT * FROM my_table_view;
使用Impala
的EXPLAIN
语句确认谓词下推:
EXPLAIN SELECT * FROM my_table_view;
EXPLAIN SELECT * FROM my_table_view WHERE time < "2018-02-01";
EXPLAIN SELECT * FROM my_table_view WHERE time > "2018-02-01";
在explain
输出中,我们应该看到“kudu
谓词”,其中包括“SCAN KUDU
”部分中的时间列过滤器和“谓词”,其中包括“SCAN HDFS
”部分中的时间、日、月和年列。
参考资料:
- 实时性和完整性兼得,使用 Kudu 和 Impala 实现透明的分层存储管理
- 神策分析的技术选型与架构实现
- 大数据架构如何做到流批一体?
编译自:Transparent Hierarchical Storage Management with Apache Kudu and Impala