“ Apache Flink,Spark,Hadoop包括其他计算框架都趋向于使用SQL的方式对数据进行检索。很少再有通过代码的方式进行数据的操作。数据计算框架使用SQL解释器的方式对数据进行检索。Apache Flink提供了Table API 与SQL的方式实现统一的流处理与批处理的数据计算。使用DataFrame关系型编程接口,其强大且灵活的表达能力、丰富的接口有效降低用户的使用成本。”
Apache Flink提供了关系型编程接口Table API以及基于Table API的SQL API,让用户能够基于Table API、SQL API实现Flink应用。Table API与SQL API能够统一的实现批处理与流处理的计算业务。能够通过一套代码实现批数据的处理与流数据的处理。做到真正的批流统一 (批处理与流处理使用相同的代码实现相同的处理逻辑)。
TableEnviroment
TableEnviroment与DataStream API一样,在开发时首先需要创建TableEnviroment关系型编程环境。才能在的程序中使用Table API与SQL API。SQL API与Table API使用的都是相同的编程模型。而且两者可以在程序中同时使用。
Flink SQL基于Apache Calcite框架实现SQL标准协议。Apache Calcite是Java编写的开源SQL解析工具,当前较多的项目使用该框架。如:Hive、Drill、Flink、Phoenix 等。Apache Calcite的主要功能有SQL解析、SQL校验、查询优化、SQL生成器、数据连接等。
TableEnviroment是Table API与SQL API的核心,主要负责:
- 在内部 catelog 中注册表
- 注册外部 catelog
- 执行SQL查询
- 注册用户自定义函数
- 将 DataStream 或 DataSet 转换为表
- 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
Table API 与 SQL API的实现
代码语言:javascript复制val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)
//注册DataSet
val dataset:DataSet[User] = env.fromElements(
User("CainGao",10),
User("CainGao",1),
User("CainGao",65)
)
val table:Table = tableEnv.fromDataSet(dataset)
//输出Table名称
table.printSchema()
//Table API进行查询.
table
.groupBy("name")
.select("name,age.sum as ages")
.toDataSet[Row]
.print()
//注册Table表结构
tableEnv.registerDataSet("USER",dataset,'name,'age)
//执行SQL API进行查询
val result = tableEnv.sqlQuery("SELECT name,sum(age) FROM `USER` GROUP BY name")
result.toDataSet[Row].print()
以上代码通过Table API与SQL API分别对数据实现了对user表的计算。其中包含了 表注册、Table API查询、SQL API查询、DataSet与表转换等。
TableEnviroment中的Register接口完成表的注册,注册相应的数据源和数据表信息。所有数据库和表的元数据信息都存储在Flink Catalog内部目录结构中。
registerDataSet时,可以看到已经设置了Schema信息,如果不设置Schema信息Apache Flink会默认使用索引位置作为Table的字段名称:_1,_2。当然更好的方式还是使用字段名称进行映射,相对于字段索引位置的映射,名称映射显然更加的灵活。
今天大致了解这些,大致的先了解一下概念。Apache Flink利用其Table API与SQL API实现更灵活更加方便的对数据的操作。实现真正的批流统一。