Apache Flink Table Api&SQL 介绍与使用

2020-04-14 09:21:21 浏览数 (1)

Apache Flink,Spark,Hadoop包括其他计算框架都趋向于使用SQL的方式对数据进行检索。很少再有通过代码的方式进行数据的操作。数据计算框架使用SQL解释器的方式对数据进行检索。Apache Flink提供了Table API 与SQL的方式实现统一的流处理与批处理的数据计算。使用DataFrame关系型编程接口,其强大且灵活的表达能力、丰富的接口有效降低用户的使用成本。

Apache Flink提供了关系型编程接口Table API以及基于Table API的SQL API,让用户能够基于Table API、SQL API实现Flink应用。Table API与SQL API能够统一的实现批处理与流处理的计算业务。能够通过一套代码实现批数据的处理与流数据的处理。做到真正的批流统一 (批处理与流处理使用相同的代码实现相同的处理逻辑)

TableEnviroment

TableEnviroment与DataStream API一样,在开发时首先需要创建TableEnviroment关系型编程环境。才能在的程序中使用Table API与SQL API。SQL API与Table API使用的都是相同的编程模型。而且两者可以在程序中同时使用。

Flink SQL基于Apache Calcite框架实现SQL标准协议。Apache Calcite是Java编写的开源SQL解析工具,当前较多的项目使用该框架。如:Hive、Drill、Flink、Phoenix 等。Apache Calcite的主要功能有SQL解析、SQL校验、查询优化、SQL生成器、数据连接等。

TableEnviroment是Table API与SQL API的核心,主要负责:

  1. 在内部 catelog 中注册表
  2. 注册外部 catelog
  3. 执行SQL查询
  4. 注册用户自定义函数
  5. 将 DataStream 或 DataSet 转换为表
  6. 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

Table API 与 SQL API的实现

代码语言:javascript复制
val env = ExecutionEnvironment.getExecutionEnvironment
    val tableEnv = BatchTableEnvironment.create(env)
    //注册DataSet
    val dataset:DataSet[User] = env.fromElements(
      User("CainGao",10),
      User("CainGao",1),
      User("CainGao",65)
    )
    
    val table:Table = tableEnv.fromDataSet(dataset)
    //输出Table名称
    table.printSchema()
    //Table API进行查询.
    table
      .groupBy("name")
      .select("name,age.sum as ages")
      .toDataSet[Row]
      .print()
      
    //注册Table表结构
    tableEnv.registerDataSet("USER",dataset,'name,'age)
    //执行SQL API进行查询
    val result = tableEnv.sqlQuery("SELECT name,sum(age) FROM `USER` GROUP BY name")
    
    result.toDataSet[Row].print()

以上代码通过Table API与SQL API分别对数据实现了对user表的计算。其中包含了 表注册、Table API查询、SQL API查询、DataSet与表转换等。

TableEnviroment中的Register接口完成表的注册,注册相应的数据源和数据表信息。所有数据库和表的元数据信息都存储在Flink Catalog内部目录结构中。

registerDataSet时,可以看到已经设置了Schema信息,如果不设置Schema信息Apache Flink会默认使用索引位置作为Table的字段名称:_1,_2。当然更好的方式还是使用字段名称进行映射,相对于字段索引位置的映射,名称映射显然更加的灵活。

今天大致了解这些,大致的先了解一下概念。Apache Flink利用其Table API与SQL API实现更灵活更加方便的对数据的操作。实现真正的批流统一。

0 人点赞