2021年大数据Flink(三十二):Table与SQL案例准备 API

2021-10-11 14:55:38 浏览数 (1)


API

获取环境

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#create-a-tableenvironment

代码语言:java
// **********************

// FLINK STREAMING QUERY

// **********************

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;



// Build the environment settings: old planner, streaming mode.
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();

// Obtain the streaming execution environment that the table environment will be created on.
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// Create the stream table environment from the execution environment and the settings above.
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

// Alternative that needs only the settings object:
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);



// ******************

// FLINK BATCH QUERY

// ******************

import org.apache.flink.api.java.ExecutionEnvironment;

import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;



// Obtain the batch execution environment.
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();

// Create the batch table environment on top of it; no EnvironmentSettings are passed in this variant.
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);



// **********************

// BLINK STREAMING QUERY

// **********************

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;



// Obtain the streaming execution environment.
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();

// Build the environment settings: Blink planner, streaming mode.
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

// Create the stream table environment from the execution environment and the settings above.
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

// Alternative that needs only the settings object:
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);



// ******************

// BLINK BATCH QUERY

// ******************

import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.TableEnvironment;



// Build the environment settings: Blink planner, batch mode.
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();

// Create the table environment directly from the settings; no execution environment is passed here.
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

​​​​​​​创建表

代码语言:java
// Obtain a TableEnvironment; `...` is a placeholder for one of the creation snippets.

TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section



// projTable is the result of a simple projection query over the table named "X";
// the projected expressions are elided (`...`).

Table projTable = tableEnv.from("X").select(...);



// Register projTable as a temporary view named "projectedTable".

tableEnv.createTemporaryView("projectedTable", projTable);
代码语言:java
tableEnvironment

  .connect(...)

  .withFormat(...)

  .withSchema(...)

  .inAppendMode()

  .createTemporaryTable("MyTable")

​​​​​​​查询表

Table API

代码语言:java
// get a TableEnvironment

TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// register Orders table

// scan registered Orders table

Table orders = tableEnv.from("Orders");// compute revenue for all customers from France

Table revenue = orders

  .filter($("cCountry")

.isEqual("FRANCE"))

  .groupBy($("cID"), $("cName")

  .select($("cID"), $("cName"), $("revenue")

.sum()

.as("revSum"));

// emit or convert Table

// execute query

SQL

代码语言:java
// get a TableEnvironment

TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section



// register Orders table

// compute revenue for all customers from France

Table revenue = tableEnv.sqlQuery(

    "SELECT cID, cName, SUM(revenue) AS revSum "  

    "FROM Orders "  

    "WHERE cCountry = 'FRANCE' "  

    "GROUP BY cID, cName"

  );

// emit or convert Table

// execute query
代码语言:java
// get a TableEnvironment

TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section



// register "Orders" table

// register "RevenueFrance" output table

// compute revenue for all customers from France and emit to "RevenueFrance"

tableEnv.executeSql(

    "INSERT INTO RevenueFrance "  

    "SELECT cID, cName, SUM(revenue) AS revSum "  

    "FROM Orders "  

    "WHERE cCountry = 'FRANCE' "  

    "GROUP BY cID, cName"

  );

​​​​​​​写出表

代码语言:java
// Obtain a TableEnvironment; `...` is a placeholder for one of the creation snippets.

TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// Describe the output table's schema: three fields a (INT), b (STRING), c (BIGINT).

final Schema schema = new Schema()

    .field("a", DataTypes.INT())

    .field("b", DataTypes.STRING())

    .field("c", DataTypes.BIGINT());

// Register a temporary table "CsvSinkTable" backed by the file system path below,
// using the CSV format with '|' as the field delimiter and the schema defined above.

tableEnv.connect(new FileSystem().path("/path/to/file"))

    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())

    .withSchema(schema)

    .createTemporaryTable("CsvSinkTable");

// Compute a result Table using Table API operators and/or SQL queries (elided here).

Table result = ...

// Emit the result Table into the table registered as "CsvSinkTable".

result.executeInsert("CsvSinkTable");

​​​​​​​与DataSet/DataStream集成

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api

  • Create a View from a DataStream or DataSet
代码语言:java
// get StreamTableEnvironment

// registration of a DataSet in a BatchTableEnvironment is equivalent

StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section



// the source stream of (Long, String) tuples; its construction is elided

DataStream<Tuple2<Long, String>> stream = ...



// register the DataStream as View "myTable" with fields "f0", "f1"

tableEnv.createTemporaryView("myTable", stream);



// register the DataStream as View "myTable2" with fields "myLong", "myString"

tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));

Convert a DataStream or DataSet into a Table

Convert a Table into a DataStream or DataSet

Convert a Table into a DataStream

Append Mode: This mode can only be used if the dynamic Table is only modified by INSERT changes, i.e., it is append-only and previously emitted results are never updated.

追加模式:只有当动态表仅通过 INSERT 更改进行修改时,才能使用此模式,即该表是仅追加的,之前已发出的结果永远不会被更新。

Retract Mode: This mode can always be used. It encodes INSERT and DELETE changes with a boolean flag.

撤回模式:此模式始终可用。它使用布尔标志对插入和删除更改进行编码。

代码语言:java
// get StreamTableEnvironment.

StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section



// Table with two fields (String name, Integer age)

Table table = ...



// convert the Table into an append DataStream of Row by specifying the class

DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);



// convert the Table into an append DataStream of Tuple2<String, Integer>

// via a TypeInformation: the tuple type is described explicitly as (STRING, INT)

TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(

  Types.STRING(),

  Types.INT());

DataStream<Tuple2<String, Integer>> dsTuple = 

  tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.

//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.

//   The boolean field indicates the type of the change.

//   True is INSERT, false is DELETE.

DataStream<Tuple2<Boolean, Row>> retractStream = 

  tableEnv.toRetractStream(table, Row.class);

Convert a Table into a DataSet

代码语言:java
// get BatchTableEnvironment

BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);



// Table with two fields (String name, Integer age)

Table table = ...



// convert the Table into a DataSet of Row by specifying a class

DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);



// convert the Table into a DataSet of Tuple2<String, Integer> via a TypeInformationTupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(

  Types.STRING(),

  Types.INT());

DataSet<Tuple2<String, Integer>> dsTuple = 

  tableEnv.toDataSet(table, tupleType);

​​​​​​​TableAPI

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html

​​​​​​​SQLAPI

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/

0 人点赞