文章大纲
大数据领域,SQL 的重要性无需多言,甚至称得上是“万物皆可 SQL 化”。
不管是做平台的,还是做应用的,都免不了跟 SQL 打交道。一句“SQL Boy”,虽然是大家的自嘲,但也能说明大数据工程师们跟 SQL 的关系之紧密。
从 Hive/Spark SQL 等最原始、最普及的 SQL 查询引擎,到 Kylin/ClickHouse 等 OLAP 引擎,再到流式的 Flink SQL/Kafka SQL,大数据的各条技术栈,都在或多或少地往 SQL 方向靠拢。
可以说,缺乏对 SQL 的支持会让自身的技术架构逊色不少,同时也会影响使用的便利性。
而在所有这些引擎中,Spark SQL 对 SQL 的优化是做得最深、最好的!
1
Spark SQL 是什么
Spark SQL 是 Spark 中用来处理结构化数据的一个模块,它提供了一个编程抽象(DataFrame),并且可以作为分布式 SQL 的查询引擎。
Spark SQL 可以将数据的计算任务通过 SQL 的形式转换成 RDD再提交到集群执行计算,类似于 Hive 通过 SQL 的形式将数据的计算任务转换成 MapReduce,大大简化了编写 Spark 数据计算操作程序的复杂性,且执行效率比 MapReduce 这种计算模型高。
在 Spark 中,Spark SQL 并不仅仅是狭隘的 SQL,而是作为 Spark 程序优化、执行的核心组件。
流计算、机器学习、图计算、深度学习等应用都可以转化为 DataFrame/Dataset 的 API。这些 API 和通常的 SQL 一样,共享优化层、执行层,共享访问多种数据源的能力。
可以说,Spark SQL 是让 Spark 应用程序拥有高效性、高可容错性和丰富生态的“幕后英雄”。
2
Spark SQL 特点
Spark SQL 有以下特点:
- 和 Spark Core 的无缝集成,可以在写整个 RDD 应用程序时,配置 Spark SQL 来完成逻辑实现。
- 统一的数据访问方式,Spark SQL 提供标准化的 SQL 查询。
- Hive 的继承,Spark SQL 通过内嵌的 Hive 或者连接外部已经部署好的 Hive 案例,实现了对 Hive 语法的继承和操作。
- 标准化的连接方式,Spark SQL 可以通过启动 Thrift Server 来支持 JDBC、ODBC 的访问,将自己作为一个 BI Server 使用。
3
Spark SQL 运行原理
在了解 Spark SQL 的运行原理前,我们需要先认识 Spark SQL 的架构:
3.1
Spark SQL 架构
Spark SQL 由 Core,Catalyst,Hive 和 Hive-Thriftserver 共 4 个部分组成。
Spark SQL 架构
- Core:负责处理数据的输入/输出,从不同的数据源获取数据(如 RDD,HDFS,Parquet 文件和 JSON 文件等),然后将查询结果输出成 Data Frame。
- Catalyst:负责处理查询语句的整个执行过程,包括解析、绑定、优化、生成物理计划等。
- Hive:负责对 Hive 数据的处理。
- Hive-Thriftserver:提供 Client 和 JDBC/ODBC 等接口。
Spark SQL 核心:Catalyst 查询编译器
Spark SQL 的核心是一个叫做 Catalyst 的查询编译器,它将用户程序中的 SQL/DataFrame/Dataset 经过一系列的操作,最终转化为 Spark 系统中执行的 RDD。
Catalyst 查询编译器组成
Catalyst 有以下几个重要的组成部分:
1. Parser
将 SQL/DataFrame/Dataset 转化成一棵未经解析(Unresolved)的树,在 Spark 中称为逻辑计划(Logical Plan),它是用户程序的一种抽象。
兼容 ANSI SQL 2003 标准和 HiveQL。
2. Analyzer
利用目录(Catalog)中的信息,对 Parser 中生成的树进行解析。
Analyzer 有一系列规则(Rule)组成,每个规则负责某项检查或者转换操作,如解析 SQL 中的表名、列名,同时判断它们是否存在。
通过 Analyzer,我们可以得到解析后的逻辑计划。
3. Optimizer
对解析完的逻辑计划进行树结构的优化,以获得更高的执行效率。
优化过程也是通过一系列的规则来完成,常用的规则如谓词下推(Predicate Pushdown)、列裁剪(Column Pruning)、连接重排序(Join Reordering)等。
此外,Spark SQL 中还有一个基于成本的优化器(Cost-based Optimizer),是由 DLI 内部开发并贡献给开源社区的重要组件。该优化器可以基于数据分布情况,自动生成最优的计划。
4. Planner
将优化后的逻辑计划转化成物理执行计划(Physical Plan)。
由一系列的策略(Strategy)组成,每个策略将某个逻辑算子转化成对应的物理执行算子,并最终变成 RDD 的具体操作。
注意在转化过程中,一个逻辑算子可能对应多个物理算子的实现,如 join 可以实现成 SortMergeJoin 或者 BroadcastHashJoin,这时候需要基于成本模型(Cost Model)来选择较优的算子。
上面提到的基于成本的优化器在这个选择过程中也能起到关键的作用。
整个 Catalyst 框架拥有良好的可扩展性,开发者可以根据不同的需求,灵活地添加自己的语法、解析规则、优化规则和转换策略。
3.2
基本 SQL 运行原理
理解传统关系型数据库中的基本 SQL 运行原理,有助于对 Spark SQL 运行原理更好地进行理解。
基本 SQL 运行流程
传统关系型数据库中 ,最基本的 SQL 查询语句由 Projection (a1, a2, a3) 、DataSource (table A) 和 Filter (condition) 三部分组成。
分别对应了 SQL 查询过程中的 Result、DataSource 和 Operation,也就是按照 Result --> DataSource --> Operation 的顺序来描述。
但是 SQL 的实际执行过程是按照 Operation --> DataSource --> Result 的顺序来执行的,这与 SQL 的语法正好相反。
具体的执行过程如下:
- 词法和语法解析(Parse):对写入的 SQL 语句进行词法和语法解析,分辨出 SQL 语句中哪些是关键词(如 select、from 和 where)、哪些是表达式、哪些是 Projection、哪些是 DataSource 等,判断 SQL 语法是否规范,并形成逻辑计划。
- 绑定(Bind):将 SQL 语句和数据库的数据字典(列、表、视图等)进行绑定,如果相关的 Projection 和 DataSource 等都在的话,则表示这个 SQL 语句是可以执行的,并生成可执行计划。
- 优化(Optimize):一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优的计划,生成最优执行计划。
- 执行(Execute):执行前面的步骤获取到的最优执行计划,返回实际查询得到的数据集。
3.3
Spark SQL 运行流程
Spark SQL 对 SQL 语句的处理采用了与关系型数据库类似的方法,并且跟 Catalyst 的组成结构对应。
Spark SQL 运行流程
下面以 SQL 例子及图解辅助进行说明:
3.3.1. 使用 SessionCatalog 保存元数据
在解析 SQL 语句前需要初始化 SQLContext,它定义 Spark SQL 上下文,在输入 SQL 语句前会加载 SessionCatalog。
初始化 SQLContext 时会把元数据保存在 SessionCatalog 中,包括数据库名、表名、字段名、字段类型等。这些数据将在解析未绑定的逻辑计划上使用。
3.3.2. 使用 Antlr 生成未绑定的逻辑计划
Spark2.0 起使用 Antlr 进行词法和语法解析,Antlr 会构建一个按照关键字生成的语法树,也就是未绑定的逻辑执行计划(Unresolved Logical Plan),包含 Unresolved Relation、Unresolved Function 和 Unresolved Attribute。
▲ 解析 SQL,生成抽象语法树(未绑定的逻辑执行计划)
3.3.3. 使用 Analyzer 绑定逻辑计划
在这个阶段 Analyzer 使用 Analysis Rules,结合 SessionCatalog 元数据,对未绑定的逻辑计划进行解析,生成已绑定的逻辑计划(Analyzed Logical Plan)。
具体流程是:
实例化一个 Simple Analyzer,然后遍历预定义好的 Batch,通过父类 Rule Executor 的执行方法运行 Batch 里的 Rules,每个 Rule 会对未绑定的逻辑计划进行处理。
有些可以通过一次解析处理,有些需要多次迭代,迭代直到达到 FixedPoint 次数或前后两次的树结构没有变化才停止操作。
▲ 在语法树中加入元数据信息,生成绑定的逻辑计划
3.3.4. 使用 Optimizer 优化逻辑计划
Optimizer 的实现和处理方式跟 Analyzer 类似,在该类中定义一系列 Optimization Rules,利用这些 Rules 将绑定的逻辑计划进行迭代处理,完成合并、列裁剪和谓词下推等优化工作后生成优化的逻辑计划(Optimized Logical Plan)。
▲ Predicate Pushdown(谓词下推),Filter 下推到 Scan 的位置,将符合条件的数据筛选出来后再进行 join 操作,减少操作的数据量
▲ Column Pruning(列裁剪),只保留查询用到的列,其它列裁剪掉,减少处理的数据量, 提升速度
3.3.5. 使用 SparkPlanner 生成可执行计划的物理计划
SparkPlanner 使用 Planning Strategies,对优化的逻辑计划进行转换,生成可以执行的物理计划(Physical Plan)。
根据过去的性能统计数据,选择最佳的物理执行计划 Cost Model,最后生成可以执行的物理执行计划树,得到 SparkPlan。
▲ 使用 queryExecution 方法查看逻辑执行计划
▲ 使用 explain 方法查看物理执行计划
3.3.6. 使用 execute 执行物理计划
在最终真正执行物理执行计划之前,还要进行 Preparations 规则处理,最后调用 SparkPlan 的 execute()
,执行物理计划计算 RDD。
▲ 执行物理计划,返回结果数据
经过上述的一整个流程,就完成了从用户编写的 SQL 语句(或 DataFrame/Dataset),到 Spark 内部 RDD 的具体操作逻辑的转化。
后面会另起章节,带大家实操 Spark SQL,敬请关注!
4
Spark SQL 数据抽象
在 Spark SQL 中有两种数据抽象:DataFrame 和 DataSet。
4.1
DataFrame
在 Spark 中,DataFrame 是一种以 RDD 为基础的的分布式数据集,类似于传统数据库的二维表格。
DataFrame 与 RDD 的主要区别在于,前者带有 Schema 元信息,即DataFrame 所表示的二维表数据集的每一列都带有名称和类型的数据结构信息。同时,与 Hive 类似,DataFrame 也支持嵌套数据类型(Struct、Array 和 Map)。
这就使得 Spark SQL 得以洞察更多的结构信息,从而对藏于 DataFrame 背后的数据源以及作用于 DataFrame 之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。
反观 RDD,由于无从得知所存数据元素的具体内部结构,Spark Core 只能在 Stage 层面进行简单、通用的流水线优化。
另外,从 API 易用性的角度上看,DataFrame API 提供的是一套高层的关系操作,比函数式的 RDD API 要更加友好、门槛更低。
DataFrame 具有如下特性:
- RDD 是分布式的 Java 对象的集合;DataFrame 是分布式的 Row 对象的集合。
- DataFrame 除了提供了比 RDD 更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如谓词下推、列裁剪等。
- DataFrame 为数据提供了 Schema 的视图,可以把它当做数据库中的一张表来对待。
- DataFrame 也是采用惰性执行机制,但性能上比 RDD 要高,主要原因是能通过 Catalyst 中的 Optimizer 对执行计划进行优化。
- DataFrame 是由 R、Pandas 处理小数据集的经验应用到处理分布式大数据集上的。
- 在 Spark 1.3 版本之前,DataFrame 叫 SchemaRDD。
4.2
DataSet
DataFrame 有以下的限制:
- 编译时类型不安全:DataFrame API 不支持编译时安全性,这限制了在结构不知道时操纵数据,使得在编译期间有效,但执行代码时出现运行时异常。
- 无法对域对象(丢失域对象)进行操作:将域对象转换为 DataFrame 后,无法从中重新生成它,就是说无法重新生成原始 RDD。
基于上述的两点,从 Spark 1.6 开始出现 DataSet,作为 DataFrame API 的一个扩展,是一个强类型的特定领域的对象,这种对象可以函数式或者关系操作并行地转换,结合了 RDD 和 DataFrame 的优点,至 Spark 2.0 中将 DataFrame 与 DataSet 合并。
每个 DataSet 也有一个被称为 DataFrame 的类型化视图,这种 DataFrame 是 Row 类型的 DataSet,即 Dataset[Row]。因此,DataFrame 可看作是特殊类型的 DataSet,类型为 Row。
RDD、DataFrame、DataSet 的关系
DataSet API 是 DataFrames 的扩展,它提供了一种类型安全的、面向对象的编程接口,它是一个强类型、不可变的对象集合,映射到关系模式。
DataSet 的优势:
- 针对 RDD、DataFrame 与 DataSet 三者编程比较来说,DataSet API 无论语法错误和分析错误在编译时都能发现,而 RDD 和 DataFrame 有时需要在运行时才能发现。
- DataSet 与 RDD 相比较而言,由于 DataSet 数据使用特殊编码,所以在存储数据时更加节省内存。
- 与 RDD 相比,DataSet 保存了更多的描述信息,概念上等同于关系型数据库中的二维表。
- 与 DataFrame 相比,DataSet 保存了类型信息,是强类型的,提供了编译时类型检查。
4.3
RDD、DataFrame、DataSet 的区别
4.3.1. 结构上的区别
RDD、DataFrame、DataSet 在结构上的区别
- RDD[Person]:以 Person 为类型参数,但不了解其内部结构。
- DataFrame:提供了详细的结构信息 Schema,包括列的名称和数据类型,类似于传统数据库的二维表。
- DataSet[Person]:不单有 Schema 信息,还有类型信息。
4.3.2. 数据上的区别
假设 RDD[Person] 中有两行数据:
则 DataFrame 中的数据为:
代码语言:javascript复制DataFrame = RDD[Person] - 泛型 Schema SQL 优化
而 Dataset[Person] 中的数据为:
代码语言:javascript复制Dataset[Person] = DataFrame 泛型 = RDD[Person] Schema SQL 优化
Dataset[Row] 中的数据为:
代码语言:javascript复制DataFrame = DataSet[Row]
从数据上能更直观地看出 RDD、DataFrame、DataSet 之间的区别。
5
SparkSession
Spark 2.0 中引入了 SparkSession,其为用户提供了一个统一的切入点来学习和使用 Spark 的各项功能,并且允许用户通过它调用 DataFrame 和 DataSet 的相关 API 来编写 Spark 程序。
最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与 Spark 进行交互。
Spark Shell 中可直接使用 SparkSession
在 Spark 早期的版本中,SparkContext 是 Spark 的主要切入点,由于 RDD 是主要的 API,与 Spark 交互之前必须先创建 SparkConf 和 SparkContext,通过 SparkContext 来创建和操作 RDD。
对于其他的 API,需要使用不同的 Context。例如,对于 Streaming,需要使用 StreamingContext;对于 SQL,需要使用 SQLContext;对于 Hive,需要使用 HiveContext。
但是随着 DataSet 和 DataFrame 的 API 逐渐成为标准的 API,就需要为他们建立接入点。
所以在 Spark2.0 中,引入了 SparkSession 作为 DataSet 和 DataFrame API 的切入点,SparkSession 封装了 SparkConf 和 SparkContext。为了向后兼容,SQLContext 和 HiveContext 也被保存下来,封装在 SparkSession 中。
因此使用 SparkSession,不需要显式地创建 SparkConf、SparkContext 以及 SQLContext。
SparkSession 实质上是 SQLContext 和 HiveContext 的组合(未来可能还会加上 StreamingContext),所以在 SQLContext 和 HiveContext 上可用的 API 在 SparkSession 上同样是可以使用的。
SparkSession 内部封装了 SparkContext,所以计算实际上是由 SparkContext 完成的。
版权信息:© Olaf Protze / Alamy Stock Photo