导读 : - flink sql 介绍 - flink sql 使用
Flink sql 是什么
❝sql 的诞生就是为了简化我们对数据开发,可以使用少量的 sql 代码,帮助我完成对数据的查询,分析等功能 ❞
声明式 & 易于理解
对于用户只需要表达我想要什么,具体处理逻辑交给框架,系统处理,用户无需关心,对于一些非专业的开发人员有了解 sql,并且 sql 相对我们学习 java,c 等语言更简单,学习成本更低,如果跨团队,或者非大数据开发人员,也可以通过 sql 来进行 flink 任务的开发
自动调优
查询优化器,会对我们编写的 sql 进行优化,生成效率更好的执行计划,所以用户不需要了解底层细节,即高效的获取结果
稳定
sql 语义发展几十年是一个很稳定的语言,少有变动,当我们引擎的升级,甚至替换成另一个引擎,都可以做到兼容地,平滑地升级,无需更改我们的已经编写好的 sql 代码
流批统一的基础
对于 flink 通过 sql 的表达式,来完成流批的统一,一套 sql 代码,既可以跑流任务,也可以跑批任务,减少我们开发的成本
Flink sql 使用
数据类型
代码语言:javascript复制-- 字符串类型
# char类型
CHAR
CHAR(n) -- n在 1 和 2147483647 之间 未设置n=1
# 字符串类型
VARCHAR
VARCHAR(n) -- n在 1 和 2147483647 之间 未设置n=1
STRING -- 等于最大的varchar(max)
# 二进制类型
BINARY
BINARY(n) -- 范围同上
# 可变长度二进制类型
VARBINARY
VARBINARY(n) -- 类似于string
BYTES
-- 数字类型
# 带有精度的十进制数字类型 -- 类似于java中的
DECIMAL
DECIMAL(p)
DECIMAL(p, s)
DEC
DEC(p)
DEC(p, s)
NUMERIC
NUMERIC(p)
NUMERIC(p, s)
# 带符号
TINYINT -- -128 to 127
SMALLINT -- -32768 to 32767
# 不带符号的
INT -- 2147483,648 to 2147483647
INTEGER
BIGINT -- -9223372036854775808 to 9223372036854775807
# 带小数的
FLOAT
DOUBLE
-- 时间类型
#日期
DATE -- 2020-10-12
#时间
TIME
TIME(p) -- 10:10:12.p 不指定p,p= 0
#时间戳
TIMESTAMP
TIMESTAMP(p) -- 2020-12-12 12:10:11.p
-- 其他类型
#
ARRAY<t>
t ARRAY
#map类型
MAP<kt, vt>
-- 对应java的类型
Class Type
java.lang.String STRING
java.lang.Boolean BOOLEAN
boolean BOOLEAN NOT NULL
java.lang.Byte TINYINT
byte TINYINT NOT NULL
java.lang.Short SMALLINT
short SMALLINT NOT NULL
java.lang.Integer INT
int INT NOT NULL
java.lang.Long BIGINT
long BIGINT NOT NULL
java.lang.Float FLOAT
float FLOAT NOT NULL
java.lang.Double DOUBLE
double DOUBLE NOT NULL
java.sql.Date DATE
java.time.LocalDate DATE
java.sql.Time TIME(0)
java.time.LocalTime TIME(9)
java.sql.Timestamp TIMESTAMP(9)
java.time.LocalDateTime TIMESTAMP(9)
java.time.OffsetDateTime TIMESTAMP(9) WITH TIME ZONE
java.time.Instant TIMESTAMP(9) WITH LOCAL TIME ZONE
java.time.Duration INVERVAL SECOND(9)
java.time.Period I NTERVAL YEAR(4) TO MONTH
byte[] BYTES
T[] ARRAY<T>
java.util.Map<K, V> MAP<K, V>
系统函数 & 自定义函数
代码语言:javascript复制/*
下面是1.12版本的系统内置的函数,具体我们可以到官网查看,根据需求使用即可
https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html
*/
// TODO 主要介绍自定义函数
/*
udf 和 udaf 需要定义eval方法,实现自己的逻辑,具体系统会调用对应的方法
udf : 传入一个值/多个/或者不传入,返回一个新的值,可以重载该方法,具体会根据传入的参数调用对应eval烦恼歌发 类似`map`算子,作用于sql
udaf : 自定义聚合函数,根据自己的逻辑定义累加器
udtf : 用作与表中,可返回一个或多个值,
*/
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import java.sql.SQLException;
public class UDFDemo {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
EnvironmentSettings.newInstance().build());
// 注册函数
tEnv.registerFunction("customFunc1", new CustomUDF());
tEnv.registerFunction("customFunc2", new CustomUDAF());
tEnv.registerFunction("customFunc3", new CustomUDTF());
}
static class Acc {
int result;
public Integer gerResult() {
return result;
}
public Acc merge(Acc acc) {
result = acc.gerResult() result;
return this;
}
public void incr() {
result ;
}
}
static class CustomUDF extends ScalarFunction {
// UDF 需要定义该方法
public int eval(String str) {
int hc = 0;
for (char c : str.toUpperCase().toCharArray()) {
hc = hashCode() >> c;
}
hc = hc - 1 - str.length();
hc = hc >> 7;
return hc;
}
}
static class CustomUDTF extends TableFunction<Row> {
// udtf 需要定义该方法,在该方法实现逻辑
public void eval(String str) throws SQLException {
if (str != null) {
for (String s : str.split(",")) {
Row row = new Row(2);
row.setField(0, s);
row.setField(1, 1);
collect(row);
}
}
}
@Override
public TypeInformation<Row> getResultType() {
return new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
}
}
static class CustomUDAF extends AggregateFunction<Integer, Acc> {
@Override
public Integer getValue(Acc accumulator) {
return accumulator.gerResult();
}
@Override
public Acc createAccumulator() {
return new Acc();
}
// 累加
public void accumulate(Acc acc,String input){
if("*".equals(input)){
return;
}
acc.incr();
}
public void accumulate(Acc acc){
acc.incr();
}
}
}
简单案例
代码
❝flink sql 中时间机制本质与 dataStream api 相同,只不过使用少于区别,稍加注意即可,注意指定 watermark 需要使用 sql 中 timestamp(3)类型(具体对应 java 类型可根据上面类型自行判断),设置 watermark 后可使用 ROWTIEM 字段(具体看 sql 代码),没有设置可直接使用 PROCTIME 字段
注意 : 不同的时间语义要严格对应环境配置的时间语义,否则可能出现异常
❝时间字段为两种,属于非用户指定字段,设置完时间语义后,根据需求使用具体的时间字段 ❞ ROWTIME : 事件时间 PROCTIME : 处理时间字段 场景 :
- join : 场景与双流 join 或者 维表 join,目前 flink 支持的不是很好
- topN & 去重 : 语法基本相同,row_num > 1 即 topN , 当=1 则是去重操作
topN 场景一些热搜,排名等内容 去重顾名思义,就是为了去重,去重会涉及到 retract 流(以后会详细讲)内容,会更新之前已经存在的结果 ❞
代码语言:javascript复制// TODO 下面代码仅供参考,具体测试根据自己时间环境来
// 以下只是一些简单的案例,后面会逐步深入复杂sql和原理层面
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author 857hub
*/
public class ClickhouseSinkApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().
// useBlinkPlanner().
build()
);
tEnv.getConfig().getConfiguration().setString(PipelineOptions.NAME, "sql test");
// sources
String source = "CREATE TABLE source (n"
" `id` int,n"
" `name` varchar.n"
" `ts` timestamp(3),n"
// 指定watermark 允许延迟5s
"WATERMARK FOR ts AS ts - INTERVAL '5' SECOND"
") WITH (n"
" 'connector' = 'kafka',n"
" 'topic' = 'test1',n"
" 'properties.bootstrap.servers' = '172.16.100.109:9092',n"
" 'properties.group.id' = 'xzw',n"
" 'scan.startup.mode' = 'latest-offset',n"
" 'format' = 'json'n"
")";
String source2 = "CREATE TABLE source2 (n"
" `id` int,n"
" `name` varchar,n"
" `ts` timestamp(3)n"
") WITH (n"
" 'connector' = 'kafka',n"
" 'topic' = 'test2',n"
" 'properties.bootstrap.servers' = '172.16.100.109:9092',n"
" 'properties.group.id' = 'xzw',n"
" 'scan.startup.mode' = 'latest-offset',n"
" 'format' = 'json'n"
")";
// clickhouse sink 由我自己定义,后面会对sql自定义source和sink进行讲解
String sink = "CREATE TABLE sink (n"
" `id` INT,n"
" `name` VARCHARn"
") WITH (n"
// 需要自定义接信息参数 -- option
" 'connector' = 'xzw_ck',n"
" 'url' = 'jdbc:clickhouse://localhost:8123/default',n"
" 'table-name' = 'test',n"
" 'username' = 'default',n"
" 'password' = '123456'n"
" )";
// 执行 source sink sql
tEnv.executeSql(source);
tEnv.executeSql(source2);
tEnv.executeSql(sink);
/*
由于是简单使用,没有在场景应用,简单介绍一下区别,可以根据们不同的区别在自己项目中使用
left json : 无论是否join上都返回左表的数据
inner join : 只有join上才会返回匹配后的结果
full outer join : 两边的数据都会返回,无论是否join上,没有的则为null
interval join : 基于时间范围内的join,在指定的时间范围内返回join上的数据
*/
String joinSql = "select * from source1 s1"
"left join source2 s2"
// 内连接
// "inner join source2" || "join source2"
// 全连接
// "full outer join source2"
// 时间范围join
// "s1.ts >= s2.ts AND s1.ts < s2.ts INTERVAL '10' MINUTE"
" on s1.id =s2.id "
;
Table joinTable = tEnv.sqlQuery(joinSql);
// 分组排序,取topN, 如果要是去重 rnum=1即可实现去重操作
String insertSql = "insert into sink select id,name from("
"select *,"
"row_number() over(partition by id order by ts) as rnum "
"from " joinTable " where rnum < 5 "
")";
// add insert sql
TableResult tableResult = executeSql(tEnv, "insert into sink select * from source", "*",insertSql);
// 随意使用
// Optional<JobClient> jobClient = tableResult.getJobClient();
}
// 添加多个sql具体执行
private static TableResult executeSql(StreamTableEnvironment tEnv, final String... sqls) {
StatementSet statementSet = tEnv.createStatementSet();
for (String sql : sqls) {
if ("*".equals(sql) || sql.length()>=27) {
continue;
}
statementSet.addInsertSql(sql);
}
return statementSet.execute();
}
}
maven 依赖
代码语言:javascript复制 <properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.12.2</flink.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!---->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.4</version>
</dependency>
</dependencies>