Impala基本原理

2023-11-21 15:10:19 浏览数 (2)

1 背景

Impala是Cloudera开源的实时查询项目,目标是基于统一的SQL快速查询各种存储系统,如HDFS、Kudu、HBase等。Impala原意为 高角羚 ,该项目的特点就是 快速 。Impala舍弃MapReduce,基于C 实现针对硬件做了很多的优化,支持数据本地性。

Impala跟其他的查询引擎系统(如presto、spark sql、hive sql)不同,Impala基于C 和Java编写,支持Hadoop生态下的多种组件集成(如HDFS、HBase、Metastore、YARN、Sentry等),支持多种文件格式的读写(如Parqeut、Avro、RCFile等)。

Impala 的优点:

Impala不需要把中间结果写入磁盘,省掉了大量的I/O开销。省掉了MapReduce作业启动的开销。MapReduce启动task的速度很慢(默认每个心跳间隔是3秒钟),Impala直接通过相应的服务进程来进行作业调度,速度快了很多。

Impala完全抛弃了MapReduce这个不太适合做SQL查询的范式,而是像Dremel一样借鉴了MPP并行数据库的思想另起炉灶,因此可做更多的查询优化,从而省掉不必要的shuffle、sort等开销。

通过使用LLVM来统一编译运行时代码,避免了为支持通用编译而带来的不必要开销。

用C 实现,做了很多有针对性的硬件优化,例如使用SSE指令。

使用了支持Data locality的I/O调度机制,尽可能地将数据和计算分配在同一台机器上进行,减少了网络开销。

2 组件角色

在Impala中有三种角色的组件:

Impalad:Impala的核心组件,用于sql的解析、任务分发、执行。

Statestore:检测节点是否故障,如果有故障,那么impalad在分发任务时会忽略该节点。

Catalog:同步元数据变化,正常每个impalad都缓存一份hive元数据,当数据变更时,通过该服务同步给所有的Imaplad。

3 Impala运行流程

1、 客户端提交任务: 客户端通过beeswax或者HiveServer2接口发送一个SQL查询请求到impalad节点,查询包括一条SQL和相关的configuration信息(只对本次查询生效),查询接口提供同步和异步的方式执行,两种接口都会返回一个queryId用于之后的客户端操作。

2、 查询解析和分析 :SQL提交到impalad节点之后交由FE模块处理,由Analyser依次执行SQL的词法分析、语法分析、语义分析、查询重写等操作,生成该SQL的Statement信息。

3、 单机执行计划生成: 根据上一步生成的Statement信息,由Planner生成单机的执行计划,该执行计划是有PlanNode组成的一棵树,这个过程中也会执行一些SQL优化,例如Join顺序改变、谓词下推等。

4、 分布式执行计划生成 :由Planner将单机执行计划转换成分布式并行物理执行计划,物理执行计划由一个个的Fragment组成,Fragment之间有数据依赖关系,处理过程中需要在原有的执行计划之上加入一些ExchangeNode和DataStreamSink信息等。

5、 任务调度和分发: 由BE处理生成的分布式物理执行计划,将Fragment根据数据分区信息发配到不同的Impalad节点上执行。Impalad节点接收到执行Fragment请求交由Backend模块处理Fragment的执行。

6、 子任务执行 :每一个Fragment的执行输出通过DataStreamSink发送到下一个Fragment,由下一个Fragment的ExchangeNode接收,Fragment运行过程中不断向coordinator节点汇报当前运行状态。

7、 结果汇总: 查询的SQL通常情况下需要有一个单独的Fragment用于结果的汇总,它只在coordinator节点运行,将多个backend的最终执行结果汇总,转换成ResultSet信息。

8、 客户端查询结果: 客户端调用获取ResultSet的接口,读取查询结果。

9、 关闭查询: 客户端调用CloseOperation关闭本次查询,标志着本次查询的结束。

4 架构设计

l Impalad

impalad是impala主要的工作计算进程,负责接收client的请求,变成协调者角色,然后解析查询请求,拆分成不同的任务分发给其他的Impalad节点进程。每个Impalad工作节点进程接收到请求后,开始执行本地查询(比如查询hdfs的datanode或者hbase的region

server),查询结果返回给协调者。协调者搜集到请求数据合并返回给client。

每个Impalad又包含三种角色,当接收到client的请求时,由planner解析查询sql,拆分成一个个可以并行的小任务;然后通过coordinator发送给其他的节点;其他的节点接收请求后,由excutor执行本地查询。

impalad 用于 接收查询请求并分解成查询任务、组织并完成集群中的数据查询、汇总完成数据的整合关联 。如果Impala用于调度查询请求的时候,一般会把它称为调度者(Coordinator)。在Impala中Impalad是对等的,也就是说每个进程内部的角色都一样,都可以作为调度者接收请求,这样即有助于容错,又可以做到负载均衡。 一般每个数据节点都要安装一个imapald进程,这样impala可以查询本机的数据,实现数据本地性,减少网络IO

l StatStore

状态管理进程,负责搜集各个节点的健康状况,当某个节点挂掉时,负责通知其他的节点不要往这个节点发送任务。statestore由于只负责状态通知,因此当这个进程挂掉并不影响impalad查询,只是可能会发送任务到挂掉的节点,集群的鲁棒性差一些而已。

statestored 进程用于 维护特定消息的发布订阅服务 ,用于在集群中广播一些消息。这个服务是单点的。

l Catalog

元数据变化同步进程,由于每个impalad都可以作为coordinator角色,那么当一个节点接收到数据变更,比如alter指令,其他的节点如何知晓呢?就可以通过catalog来同步,每个节点的变化都通知给catlog进程,它再同步给其他的节点,每个节点都维护一份最新的元数据信息,这样就不怕查询的数据不一致了。

catalogd ,它主要 负责维护元数据的读取查询 。当执行DDL操作时,会同步到catalog,然后通过statestore广播给其他的节点。

5 impala 操作

外部 shell

代码语言:javascript复制
```shell
impala-shell
-h 帮助
-v 版本
-V 详细输出
-queit 关闭详细输出
-p 显示执行计划
-i hostname 连接主机 (数据量较大时,可连接内存(128G)较大的主机执行)
-r 刷新所有元数据
-q query 从命令行执行,不进入impala-shell
-d default_db 指定数据库
-B 去格式化输出
--output_delimiter=character 指定分隔符
--print_header 打印列名
-f query_file 执行文件,逗号分隔
-o filename 输出到指定文件
-c 查询执行失败时继续执行
代码语言:javascript复制
help 查看帮助
connect <hostname:port> 链接到impalad的服务
refresh <tablename> 增量刷新元数据库
invalidate metadata 全量刷新元数据库
explain <sql> 显示查询执行计划、步骤信息
set explain_level 设置显示级别(0,1,2,3)
shell <shell> 不退出impala-shell执行Linux命令
profile(查询完成后执行) 查询最近一次查询的底层信息

  1. 创建数据库

create database db1;

删除数据库

代码语言:javascript复制
use default;
drop database db1;

创建表(内部表)

默认方式创建表:

代码语言:javascript复制
create table t_person1(
id int,
name string
);

指定存储方式:

代码语言:javascript复制
create table t_person2(
id int,
name string
)
row format delimited
fields terminated by '' (impala1.3.1版本以上支持'' )
stored as textfile;

其他方式创建内部表

使用现有表结构:

代码语言:javascript复制
create table tab_3 like tab_1;

指定文本表字段分隔符:

代码语言:javascript复制
alter table tab_3 set serdeproperties ('serialization.format'=',','field.delim'=',');

插入数据

直接插入值方式:

代码语言:javascript复制
insert into t_person values (1,hex(‘hello world’));

从其他表插入数据:

代码语言:javascript复制
insert (overwrite) into tab_3 select * from tab_2 ;

批量导入文件方式方式:

代码语言:javascript复制
load data local inpath ‘/xxx/xxx’ into table tab_1;

创建表(外部表)

默认方式创建表:

代码语言:javascript复制
create external table tab_p1(
id int,
name string
)
location '/user/xxx.txt';

指定存储方式:

代码语言:javascript复制
create external table tab_p2 like parquet_tab
'/user/xxx/xxx/1.dat'
partition (year int , month tinyint, day tinyint)
location '/user/xxx/xxx'
stored as parquet;

视图

创建视图:

代码语言:javascript复制
create view v1 as select count(id) as total from tab_3 ;

查询视图:

代码语言:javascript复制
select * from v1;

查看视图定义:

代码语言:javascript复制
describe formatted v1

添加分区方式

1、partitioned by 创建表时,添加该字段指定分区列表 createt table test (id int,name string) partitioned by (pt_d string);

2、使用alter table 进行分区的添加和删除操作 alter table test add partition (pt_d='20190424'); alter table test drop partition (pt_d='20190424'); alter table test drop partition (pt_d='20190424',pt_h='01');

分区内添加数据

insert into test partition (pt_d='20190424') values (1,'zhangsan'),(2,'lisi')

查询指定分区数据

代码语言:javascript复制
select id,name from test where pt_d='20190424';

DESCRIBE table_name格式的语句。如果要了解像数据文件位置,ROWFORMAT或 STORED AS对应的值这些详细的信息,可以使用 DESCRIBE FORMATTED table_name。可以看到表的注释信息。

compute stats统计表信息 (impala查看表统计信息)

analyze table 查看表统计信息 (hive 查看表统计信息)

加载数据:

1、insert语句:插入数据时每条数据产生一个数据文件,不建议用此方式加载批量数据

2、load data方式:在进行批量插入时使用这种方式比较合适

3、来自中间表:此种方式使用于从一个小文件较多的大表中读取文件并写入新的表生产少量的数据文件。也可以通过此种方式进行格式转换。

空值处理:

impala将“n”表示为NULL,在结合sqoop使用是注意做相应的空字段过滤,

也可以使用以下方式进行处理:

代码语言:javascript复制
alter table name set tblproperties("serialization.null.format"="null")

7 其他

HBase,Hive和Impala之间的比较

Impala对格式的支持

格式

支持操作

parquet

create table ,insert ,load data ,query

text

load data

avro

仅仅支持查询,在hive中通过load data加载数据

rcfile

仅仅支持查询,在hive中通过load data加载数据

sequencefile

仅仅支持查询,在hive中通过load data加载数据

我正在参与2023腾讯技术创作特训营第三期有奖征文,组队打卡瓜分大奖!

邀请人:岳涛,社区ID:7348459

0 人点赞