(下)史上最全干货!Flink SQL 成神之路(全文 18 万字、138 个案例、42 张图)

2022-04-04 11:30:02 浏览数 (1)

4.SQL UDF 篇

Flink TableSQL API 允许用户使用函数进行数据处理、字段标准化等处理。

4.1.SQL 函数的归类

Flink 中的函数有两个维度的归类标准。

  1. ⭐ 一个归类标准是:系统(内置)函数和 Catalog 函数。系统函数没有命名空间,只能通过其名称来进行引用。Catalog 函数属于 Catalog 和数据库,因此它们拥有 Catalog 和数据库的命名空间。用户可以通过全/部分限定名(catalog.db.func 或 db.func)或者函数来对 Catalog 函数进行引用。
  2. ⭐ 另一个归类标准是:临时函数和持久化函数。临时函数由用户创建,它仅在会话的生命周期(也就是一个 Flink 任务的一次运行生命周期内)内有效。持久化函数不是由系统提供的,是存储在 Catalog 中,它在不同会话的生命周期内都有效。

这两个维度归类标准组合下,Flink SQL 总共提供了 4 种函数:

  1. ⭐ 临时性系统内置函数
  2. ⭐ 系统内置函数
  3. ⭐ 临时性 Catalog 函数(例如:Create Temporary Function)
  4. ⭐ Catalog 函数(例如:Create Function)

请注意,在用户使用函数时,系统函数始终优先于 Catalog 函数解析,临时函数始终优先于持久化函数解析。

4.2.SQL 函数的引用方式

用户在 Flink 中可以通过精确、模糊两种引用方式引用函数。

4.2.1.精确函数

精确函数引用是让用户限定 Catalog,数据库名称进行精准定位一个 UDF 然后调用。

例如:select mycatalog.mydb.myfunc(x) from mytable 或者 select mydb.myfunc(x) from mytable。

4.2.2.模糊函数

在模糊函数引用中,用户只需在 SQL 查询中指定函数名就可以引用 UDF,例如:select myfunc(x) from mytable。

当然小伙伴萌问到,如果系统函数和 Catalog 函数的名称是重复的,Flink 体系是会使用哪一个函数呢?这就是下文要介绍的 UDF 解析顺序

4.3.SQL 函数的解析顺序

4.3.1.精确函数

由于精确函数应用一定会带上 Catalog 或者数据库名称,所以 Flink 中的精确函数引用一定是指向临时性 Catalog 函数或 Catalog 函数的。

比如:select mycatalog.mydb.myfunc(x) from mytable

那么 Flink 对其解析顺序以及使用顺序如下:

  1. ⭐ 临时性 catalog 函数
  2. ⭐ Catalog 函数

4.3.2.模糊函数

比如 select myfunc(x) from mytable

解析顺序以及使用顺序如下:

  1. ⭐ 临时性系统内置函数
  2. ⭐ 系统内置函数
  3. ⭐ 临时性 Catalog 函数, 只会在当前会话的当前 Catalog 和当前数据库中查找函数及解析函数
  4. ⭐ Catalog 函数, 在当前 Catalog 和当前数据库中查找函数及解析函数

4.4.系统内置函数

系统内置函数小伙伴萌可以直接在 Flink 官网进行查询,博主这里就不多进行介绍。

https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/functions/systemfunctions/#hash-functions

注意: 在目前 1.13 版本的 Flink 体系中,内置的系统函数没有像 Hive 内置的函数那么丰富,比如 Hive 中常见的 get_json_object 之类的,Flink 都是没有的,但是 Flink 提供了插件化 Module 的能力,能扩充一些 UDF,下文会进行介绍。

4.5.SQL 自定义函数(UDF)

!!!Flink 体系也提供了类似于其他大数据引擎的 UDF 体系。

自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用 SQL 进行 直接 表达的频繁使用或自定义的逻辑。

目前 Flink 自定义函数可以基于 JVM 语言(例如 Java 或 Scala)或 Python 实现,实现者可以在 UDF 中使用任意第三方库,本章聚焦于使用 Java 语言开发自定义函数。

当前 Flink 提供了一下几种 UDF 能力:

  1. 标量函数(Scalar functions 或 UDAF):输入一条输出一条,将标量值转换成一个新标量值,对标 Hive 中的 UDF;
  2. 表值函数(Table functions 或 UDTF):输入一条条输出多条,对标 Hive 中的 UDTF;
  3. 聚合函数(Aggregate functions 或 UDAF):输入多条输出一条,对标 Hive 中的 UDAF;
  4. 表值聚合函数(Table aggregate functions 或 UDTAF):仅仅支持 Table API,不支持 SQL API,其可以将多行转为多行;
  5. 异步表值函数(Async table functions):这是一种特殊的 UDF,支持异步查询外部数据系统,用在前文介绍到的 lookup join 中作为查询外部系统的函数。

先直接给一个案例看看,怎么创建并在 Flink SQL 中使用一个 UDF:

代码语言:javascript复制
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

// 定义一个标量函数
public static class SubstringFunction extends ScalarFunction {
  public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, end);
  }
}

TableEnvironment env = TableEnvironment.create(...);

// 在 Table API 可以直接以引用 class 方式使用 UDF
env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));

// 注册 UDF
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);

// Table API 调用 UDF
env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));

// SQL API 调用 UDF
env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");

注意:如果你的函数在初始化时,是有入参的,那么需要你的入参是 Serializable 的。即 Java 中需要继承 Serializable 接口。

案例如下:

代码语言:javascript复制
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

// 定义一个带有输入参数的标量函数
public static class SubstringFunction extends ScalarFunction {

  -- boolean 默认就是 Serializable 的
  private boolean endInclusive;

  public SubstringFunction(boolean endInclusive) {
    this.endInclusive = endInclusive;
  }

  public String eval(String s, Integer begin, Integer end) {
    return s.substring(begin, endInclusive ? end   1 : end);
  }
}

TableEnvironment env = TableEnvironment.create(...);

// Table API 调用 UDF
env.from("MyTable").select(call(new SubstringFunction(true), $("myField"), 5, 12));

// 注册 UDF
env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true));

4.6.开发 UDF 之前的需知事项

总结这几个事项主要包含以下步骤:

  1. 首先需要继承 Flink SQL UDF 体系提供的基类,每种 UDF 实现都有不同的基类
  2. 实现 UDF 执行逻辑函数,不同类型的 UDF 需要实现不同的执行逻辑函数
  3. 注意 UDF 入参、出参类型推导,Flink 在一些基础类型上的是可以直接推导出类型信息的,但是一些复杂类型就无能为力了,这里需要用户主动介入
  4. 明确 UDF 输出结果是否是定值,如果是定值则 Flink 会在生成计划时就执行一遍,得出结果,然后使用这个定值的结果作为后续的执行逻辑的参数,这样可以做到不用在 Flink SQL 任务运行时每次都执行一次,会有性能优化
  5. 巧妙运用运行时上下文,可以在任务运行前加载到一些外部资源、上下文配置信息,扩展 UDF 能力

4.6.1.继承 UDF 基类

和 Hive UDF 实现思路类似,在 Flink UDF 体系中,需要注意一下事项:

  1. ⭐ Flink UDF 要继承一个基类(比如标量 UDF 要继承 org.apache.flink.table.functions.ScalarFunction)。
  2. ⭐ 类必须声明为 public,不能是 abstract 类,不能使用非静态内部类或匿名类。
  3. ⭐ 为了在 Catalog 中存储此类,该类必须要有默认构造函数并且在运行时可以进行实例化。

4.6.2.实现 UDF 执行逻辑函数

基类提供了一组可以被重写的方法,来给用户进行使用,这些可被重写的方法就是主要承担 UDF 自定义执行逻辑的地方。

举例在 ScalarFunction 中:

  1. open():用于初始化资源(比如连接外部资源),程序初始化时进行调用
  2. close():用于关闭资源,程序结束时进行调用
  3. isDeterministic():用于判断返回结果是否是确定的,如果是确定的,结果会被直接执行
  4. eval(xxx):Flink 用于处理每一条数据的主要处理逻辑函数

你可以自定义 eval 的入参,比如:

  • eval(Integer) 和 eval(LocalDateTime);
  • 使用变长参数,例如 eval(Integer...);
  • 使用对象,例如 eval(Object) 可接受 LocalDateTime、Integer 作为参数,只要是 Object 都可以;
  • 也可组合使用,例如 eval(Object...) 可接受所有类型的参数。

并且你可以在一个 UDF 中重载 eval 函数来实现不同的逻辑,比如:

代码语言:javascript复制
import org.apache.flink.table.functions.ScalarFunction;

// 有多个重载求和方法的函数
public static class SumFunction extends ScalarFunction {

  // 入参为 Integer
  public Integer eval(Integer a, Integer b) {
    return a   b;
  }

  // 入参为 String
  public Integer eval(String a, String b) {
    return Integer.valueOf(a)   Integer.valueOf(b);
  }

  // 入参为多个 Double
  public Integer eval(Double... d) {
    double result = 0;
    for (double value : d)
      result  = value;
    return (int) result;
  }
}

注意:由于 Flink 在运行时会调用这些方法,所以这些方法必须声明为 public,并且包含明确的输入和输出参数。

4.6.3.注意 UDF 入参、出参类型推导

从两个角度来说,为什么函数的入参、出参类型会对 UDF 这么重要。

  1. ⭐ 从开发人员角度讲,在设计 UDF 的时候,肯定会涉及到 UDF 预期的入参、出参类型信息、也包括一些数据的精度、小数位数等信息
  2. ⭐ 从程序运行角度讲,Flink SQL 程序运行时,肯定也需要知道怎么将 SQL 中的类型数据与 UDF 的入参、出参类型,这样才能做数据序列化等操作

而 Flink 也提供了三种方式帮助 Flink 程序获取参数类型信息。

  1. ⭐ 自动类型推导功能:Flink 具备 UDF 自动类型推导功能,该功能可以通过反射从函数的类及其求值方法派生数据类型。比如如果你的 UDF 的方法或者类的签名中已经有了对应的入参、出参的类型,Flink 一般都可以推导并获取到这些类型信息。
  2. ⭐ 添加类型注解:当 1 中的隐式反射提取方法不成功,则可以通过使用 Flink 提供的 @DataTypeHint@FunctionHint 注解对应的参数、类或方法来显示的支持 Flink 参数类型提取。
  3. ⭐ 重写 getTypeInference():你可以使用 Flink 提供的更高级的类型推导方法,你可以在 UDF 实现类中重写 getTypeInference() 方法去显示声明函数的参数类型信息

接下来介绍几个例子。

  1. ⭐ 自动类型推导案例:

自动类型推导会检查函数的 签名和 eval 方法签名,从而推导出函数入参和出参的数据类型,@DataTypeHint@FunctionHint 注解也可以辅助支持自动类型推导。

关于自动类型推导具体将 Java 的对象会映射成 SQL 的具体哪个数据类型,可以参考 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/types/#data-type-extraction

案例如下:

代码语言:javascript复制
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

// 有多个重载求值方法的函数
public static class OverloadedFunction extends ScalarFunction {

  // 不需要任何声明,可以直接推导出类型信息,即入参和出参对应到 SQL 中的 bigint 类型
  public Long eval(long a, long b) {
    return a   b;
  }

  // 使用 @DataTypeHint("DECIMAL(12, 3)") 定义 decimal 的精度和小数位
  public @DataTypeHint("DECIMAL(12, 3)") BigDecimal eval(double a, double b) {
    return BigDecimal.valueOf(a   b);
  }

  // 使用注解定义嵌套数据类型
  @DataTypeHint("ROW<s STRING, t TIMESTAMP_LTZ(3)>")
  public Row eval(int i) {
    return Row.of(String.valueOf(i), Instant.ofEpochSecond(i));
  }

  // 允许任意类型的输入,并输出序列化定制后的值
  @DataTypeHint(value = "RAW", bridgedTo = ByteBuffer.class)
  public ByteBuffer eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
    return MyUtils.serializeToByteBuffer(o);
  }
}
  1. ⭐ 根据 @FunctionHint 注解自动推导类型案例:

使用 @DataTypeHint 注解虽好,但是有些场景下,使用起来比较复杂,比如:

  • ⭐ 我们不希望 eval 函数的入参和出参都是一个非常具体的类型,比如 long,int,double 等。我们希望它是一个通用的类型,比如 Object。这样的话就不用重载那么多的函数,可以直接使用一个 eval 函数实现不同的处理逻辑,返回不同类型的结果
  • ⭐ 多个 eval 方法的返回结果类型都是相同的,我们懒得写多次 @DataTypeHint

那么就可以使用 @FunctionHint 实现,@FunctionHint 是声明在类上面的,举例如下:

代码语言:javascript复制
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// 1. 解耦类型推导与 eval 方法,类型推导根据 FunctionHint 注解中的信息来,下面的案例说明当前这个 UDF 有三种输入输出类型信息组合
@FunctionHint(
  input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
  output = @DataTypeHint("INT")
)
@FunctionHint(
  input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
  output = @DataTypeHint("BIGINT")
)
@FunctionHint(
  input = {},
  output = @DataTypeHint("BOOLEAN")
)
public static class OverloadedFunction extends TableFunction<Object> {

  public void eval(Object... o) {
    if (o.length == 0) {
      collect(false);
    }
    collect(o[0]);
  }
}

// 2. 为函数类的所有 eval 方法指定同一个输出类型
@FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
public static class OverloadedFunction extends TableFunction<Row> {

  public void eval(int a, int b) {
    collect(Row.of("Sum", a   b));
  }

  public void eval() {
    collect(Row.of("Empty args", -1));
  }
}
  1. ⭐ getTypeInference()

getTypeInference() 可以做到根据小伙伴萌自定义的方式去定义类型推导过程及结果,具有高度自定义的能力。举例如下:

代码语言:javascript复制
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;

public static class LiteralFunction extends ScalarFunction {
  public Object eval(String s, String type) {
    switch (type) {
      case "INT":
        return Integer.valueOf(s);
      case "DOUBLE":
        return Double.valueOf(s);
      case "STRING":
      default:
        return s;
    }
  }

  // 如果实现了 getTypeInference,则会禁用自动的反射式类型推导,使用如下逻辑进行类型推导
  @Override
  public TypeInference getTypeInference(DataTypeFactory typeFactory) {
    return TypeInference.newBuilder()
      // 指定输入参数的类型,必要时参数会被隐式转换
      .typedArguments(DataTypes.STRING(), DataTypes.STRING())
      // 用户高度自定义的类型推导逻辑
      .outputTypeStrategy(callContext -> {
        if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) {
          throw callContext.newValidationError("Literal expected for second argument.");
        }
        // 基于第一个入参决定具体的返回数据类型
        final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING");
        switch (literal) {
          case "INT":
            return Optional.of(DataTypes.INT().notNull());
          case "DOUBLE":
            return Optional.of(DataTypes.DOUBLE().notNull());
          case "STRING":
          default:
            return Optional.of(DataTypes.STRING());
        }
      })
      .build();
  }
}

4.6.4.明确 UDF 输出结果是否是定值

用户可以通过重写 isDeterministic() 函数来声明这个 UDF 产出的结果是否是一个定值。

对于纯函数(即没有入参的函数,比如 random(), date(), or now() 等)来说,默认情况下 isDeterministic() 返回 true,小伙伴萌可以自定义返回 false。

如果函数不是一个纯函数(即没有入参的函数,比如 random(), date(), or now() 等),这个方法必须返回 false

那么 isDeterministic() 方法的返回值到底影响什么呢?

答案:影响 Flink 任务在什么时候就直接执行这个 UDF。主要在以下两个方面体现:

  1. ⭐ Flink 在生成计划期间直接执行 UDF 获得结果:如果使用常量表达式调用函数,或者使用常量作为函数的入参,则 Flink 任务可能不会在任务正式运行时执行该函数。举个例子,SELECT ABS(-1) FROM tSELECT ABS(field) FROM t WHERE field = -1,这两种都会被 Flink 进行优化,直接把 ABS(-1) 的结果在客户端生成执行计划时就将结果运行出来。如果不想在生成执行计划阶段直接将结果运行出来,可以实现 isDeterministic() 返回 false。
  2. ⭐ Flink 在程序运行期间执行 UDF 获得结果:如果 UDF 的入参不是常量表达式,或者 isDeterministic() 返回 false,则 Flink 会在程序运行期间执行 UDF。

那么小伙伴会问到,有些场景下 Flink SQL 是做了各种优化之后然后推断出表达式是否是常量,我怎么判断能够更加方便的判断出这个 Flink 是否将这个 UDF 的优化为固定结果了呢?

结论:这些都是可以在 Flink SQL 生成的算子图中看到,在 Flink web ui 中,每一个算子上面都可以详细看到 Flink 最终生成的算子执行逻辑。

4.6.5.巧妙运用运行时上下文

有时候我们想在 UDF 需要获取一些 Flink 任务运行的全局信息,或者在 UDF 真正处理数据之前做一些配置(setup)/清理(clean-up)的工作。UDF 为我们提供了 open() 和 close() 方法,你可以重写这两个方法做到类似于 DataStream API 中 RichFunction 的功能。

  1. open() 方法:在任务初始化时被调用,常常用于加载一些外部资源;
  2. close() 方法:在任务结束时被调用,常常用于关闭一些外部资源;

其中 open() 方法提供了一个 FunctionContext,它包含了一些 UDF 被执行时的上下文信息,比如 metric group、分布式文件缓存,或者是全局的作业参数等。

比如可以获取到下面的信息:

  1. ⭐ getMetricGroup():执行该函数的 subtask 的 Metric Group
  2. ⭐ getCachedFile(name):分布式文件缓存的本地临时文件副本
  3. ⭐ getJobParameter(name, defaultValue):获取 Flink 任务的全局作业参数
  4. ⭐ getExternalResourceInfos(resourceName):获取一些外部资源

下面的例子展示了如何在一个标量函数中通过 FunctionContext 来获取一个全局的任务参数:

代码语言:javascript复制
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public static class HashCodeFunction extends ScalarFunction {

    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        // 4. 在 UDF 中获取全局参数 hashcode_factor
        // 用户可以配置全局作业参数 "hashcode_factor"
        // 获取参数 "hashcode_factor"
        // 如果不存在,则使用默认值 "12"
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}

TableEnvironment env = TableEnvironment.create(...);

// 1. 设置任务参数
env.getConfig().addJobParameter("hashcode_factor", "31");

// 2. 注册函数
env.createTemporarySystemFunction("hashCode", HashCodeFunction.class);

// 3. 调用函数
env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable");

以上就是关于开发一个 UDF 之前,你需要注意的一些事项,这些内容不但包含了一些基础必备知识,也包含了一些扩展知识,帮助我们开发更强大的 UDF。

4.7.SQL 标量函数(Scalar Function)

标量函数即 UDF,常常用于进一条数据出一条数据的场景。

使用 JavaScala 开发一个 Scalar Function 必须包含以下几点:

  1. ⭐ 实现 org.apache.flink.table.functions.ScalarFunction 接口
  2. ⭐ 实现一个或者多个自定义的 eval 函数,名称必须叫做 eval,eval 方法签名必须是 public 的
  3. ⭐ eval 方法的入参、出参都是直接体现在 eval 函数的签名中

举例:

代码语言:javascript复制
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;

public static class HashFunction extends ScalarFunction {

  // 接受任意类型输入,返回 INT 型输出
  public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
    return o.hashCode();
  }
}

TableEnvironment env = TableEnvironment.create(...);

// 在 Table API 里不经注册直接调用函数
env.from("MyTable").select(call(HashFunction.class, $("myField")));

// 注册函数
env.createTemporarySystemFunction("HashFunction", HashFunction.class);

// 在 Table API 里调用注册好的函数
env.from("MyTable").select(call("HashFunction", $("myField")));

// 在 SQL 里调用注册好的函数
env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");

4.8.SQL 表值函数(Table Function)

表值函数即 UDTF,常用于进一条数据,出多条数据的场景。

使用 JavaScala 开发一个 Table Function 必须包含以下几点:

  1. ⭐ 实现 org.apache.flink.table.functions.TableFunction 接口
  2. ⭐ 实现一个或者多个自定义的 eval 函数,名称必须叫做 eval,eval 方法签名必须是 public 的
  3. ⭐ eval 方法的入参是直接体现在 eval 函数签名中,出参是体现在 TableFunction 类的泛型参数 T 中,eval 是没有返回值的,这一点是和标量函数不同的,Flink TableFunction 接口提供了 collect(T) 来发送输出的数据。这一点也比较好理解,如果都体现在函数签名上,那就成了标量函数了,而使用 collect(T) 才能体现出 进一条数据 出多条数据

在 SQL 中是用 SQL 中的 LATERAL TABLE(<TableFunction>) 配合 JOINLEFT JOIN xxx ON TRUE 使用。

举例:

代码语言:javascript复制
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*;

@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {

  public void eval(String str) {
    for (String s : str.split(" ")) {
      // 输出结果
      collect(Row.of(s, s.length()));
    }
  }
}

TableEnvironment env = TableEnvironment.create(...);

// 在 Table API 里可以直接调用 UDF
env
  .from("MyTable")
  .joinLateral(call(SplitFunction.class, $("myField")))
  .select($("myField"), $("word"), $("length"));

env
  .from("MyTable")
  .leftOuterJoinLateral(call(SplitFunction.class, $("myField")))
  .select($("myField"), $("word"), $("length"));

// 在 Table API 里重命名 UDF 的结果字段
env
  .from("MyTable")
  .leftOuterJoinLateral(call(SplitFunction.class, $("myField")).as("newWord", "newLength"))
  .select($("myField"), $("newWord"), $("newLength"));

// 注册函数
env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

// 在 Table API 里调用注册好的 UDF
env
  .from("MyTable")
  .joinLateral(call("SplitFunction", $("myField")))
  .select($("myField"), $("word"), $("length"));

env
  .from("MyTable")
  .leftOuterJoinLateral(call("SplitFunction", $("myField")))
  .select($("myField"), $("word"), $("length"));

// 在 SQL 里调用注册好的 UDF
env.sqlQuery(
  "SELECT myField, word, length "  
  "FROM MyTable, LATERAL TABLE(SplitFunction(myField))");

env.sqlQuery(
  "SELECT myField, word, length "  
  "FROM MyTable "  
  "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");

// 在 SQL 里重命名 UDF 字段
env.sqlQuery(
  "SELECT myField, newWord, newLength "  
  "FROM MyTable "  
  "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");

注意: 如果你是使用 Scala 实现函数,不要使用 Scala 中 object 实现 UDF,Scala object 是单例的,有可能会导致并发问题。

4.9.SQL 聚合函数(Aggregate Function)

聚合函数即 UDAF,常用于进多条数据,出一条数据的场景。

UDAF

上面的图片展示了一个聚合函数的例子以及聚合函数包含的几个重要方法。

假设你有一个关于饮料的表。表里面有三个字段,分别是 id、name、price,表里有 5 行数据。

假设你需要找到所有饮料里最贵的饮料的价格,即执行一个 max() 聚合就能拿到结果。那么 max() 聚合的执行旧需要遍历所有 5 行数据,最终结果就只有一个数值。

使用 JavaScala 开发一个 Aggregate Function 必须包含以下几点:

  1. ⭐ 实现 AggregateFunction 接口,其中所有的方法必须是 public 的、非 static 的
  2. ⭐ 必须实现以下几个方法:
  • Acc聚合中间结果 createAccumulator():为当前 Key 初始化一个空的 accumulator,其存储了聚合的中间结果,比如在执行 max() 时会存储当前的 max 值
  • accumulate(Acc accumulator, Input输入参数):对于每一行数据,都会调用 accumulate() 方法来更新 accumulator,这个方法就是用于处理每一条输入数据;这个方法必须声明为 public 和非 static 的。accumulate 方法可以重载,每个方法的参数类型可以不同,并且支持变长参数。
  • Output输出参数 getValue(Acc accumulator):通过调用 getValue 方法来计算和返回最终的结果
  1. ⭐ 还有几个方法是在某些场景下才必须实现的:
  • retract(Acc accumulator, Input输入参数):在回撤流的场景下必须要实现,Flink 在计算回撤数据时需要进行调用,如果没有实现则会直接报错
  • merge(Acc accumulator, Iterable<Acc> it):在许多批式聚合以及流式聚合中的 Session、Hop 窗口聚合场景下都是必须要实现的。除此之外,这个方法对于优化也很多帮助。例如,如果你打开了两阶段聚合优化,就需要 AggregateFunction 实现 merge 方法,从而可以做到在数据进行 shuffle 前先进行一次聚合计算。
  • resetAccumulator():在批式聚合中是必须实现的。
  1. ⭐ 还有几个关于入参、出参数据类型信息的方法,默认情况下,用户的 Input输入参数accumulate(Acc accumulator, Input输入参数) 的入参 Input输入参数)、accumulator(Acc聚合中间结果 createAccumulator() 的返回结果)、Output输出参数 数据类型(Output输出参数 getValue(Acc accumulator)Output输出参数)都会被 Flink 使用反射获取到。但是对于 accumulatorOutput输出参数 类型来说,Flink SQL 的类型推导在遇到复杂类型的时候可能会推导出错误的结果(注意:Input输入参数 因为是上游算子传入的,所以类型信息是确认的,不会出现推导错误的情况),比如那些非基本类型 POJO 的复杂类型。所以跟 ScalarFunction 和 TableFunction 一样,AggregateFunction 提供了 AggregateFunction#getResultType()AggregateFunction#getAccumulatorType() 来分别指定最终返回值类型和 accumulator 的类型,两个函数的返回值类型都是 TypeInformation,所以熟悉 DataStream 的小伙伴很容易上手。
  • getResultType():即 Output输出参数 getValue(Acc accumulator) 的输出结果数据类型
  • getAccumulatorType():即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型

这个时候,我们直接来举一个加权平均值的例子看下,总共 3 个步骤:

  • ⭐ 定义一个聚合函数来计算某一列的加权平均
  • ⭐ 在 TableEnvironment 中注册函数
  • ⭐ 在查询中使用函数

为了计算加权平均值,accumulator 需要存储加权总和以及数据的条数。在我们的例子里,我们定义了一个类 WeightedAvgAccumulator 来作为 accumulator。

Flink 的 checkpoint 机制会自动保存 accumulator,在失败时进行恢复,以此来保证精确一次的语义。

我们的 WeightedAvg(聚合函数)的 accumulate 方法有三个输入参数。第一个是 WeightedAvgAccum accumulator,另外两个是用户自定义的输入:输入的值 ivalue 和 输入的权重 iweight。

尽管 retract()、merge()、resetAccumulator() 这几个方法在大多数聚合类型中都不是必须实现的,博主也在样例中提供了他们的实现。并且定义了 getResultType() 和 getAccumulatorType()。

代码语言:javascript复制
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.AggregateFunction;
import static org.apache.flink.table.api.Expressions.*;

// 自定义一个计算权重 avg 的 accmulator
public static class WeightedAvgAccumulator {
  public long sum = 0;
  public int count = 0;
}

// 输入:Long iValue, Integer iWeight
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccumulator> {

  @Override 
  // 创建一个 accumulator
  public WeightedAvgAccumulator createAccumulator() {
    return new WeightedAvgAccumulator();
  }

  public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {
    acc.sum  = iValue * iWeight;
    acc.count  = iWeight;
  }

  public void retract(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {
    acc.sum -= iValue * iWeight;
    acc.count -= iWeight;
  }

  @Override
  // 获取返回结果
  public Long getValue(WeightedAvgAccumulator acc) {
    if (acc.count == 0) {
      return null;
    } else {
      return acc.sum / acc.count;
    }
  }

  // Session window 可以使用这个方法将几个单独窗口的结果合并
  public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {
    for (WeightedAvgAccumulator a : it) {
      acc.count  = a.count;
      acc.sum  = a.sum;
    }
  }

  public void resetAccumulator(WeightedAvgAccumulator acc) {
    acc.count = 0;
    acc.sum = 0L;
  }
}

TableEnvironment env = TableEnvironment.create(...);

env
  .from("MyTable")
  .groupBy($("myField"))
  .select($("myField"), call(WeightedAvg.class, $("value"), $("weight")));

// 注册函数
env.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);

// Table API 调用函数
env
  .from("MyTable")
  .groupBy($("myField"))
  .select($("myField"), call("WeightedAvg", $("value"), $("weight")));

// SQL API 调用函数
env.sqlQuery(
  "SELECT myField, WeightedAvg(`value`, weight) FROM MyTable GROUP BY myField"
);

4.10.SQL 表值聚合函数(Table Aggregate Function)

表值聚合函数即 UDTAF。首先说明这个函数目前只能在 Table API 中进行使用,不能在 SQL API 中使用。那么这个函数有什么作用呢,为什么被创建出来?

因为在 SQL 表达式中,如果我们想对数据先分组再进行聚合取值,能选择的就是 select max(xxx) from source_table group by key1, key2。但是上面这个 SQL 的 max 语义最后产出的结果只有一条最终结果,如果我想取聚合结果最大的 n 条数据,并且 n 条数据,每一条都要输出一次结果数据,上面的 SQL 就没有办法实现了(因为在聚合的情况下还输出多条,从上述 SQL 语义上来说就是不正确的)。

所以 UDTAF 就是为了处理这种场景,他可以让我们自定义 怎么去取多少条 最终的聚合结果。所以可以看到 UDTAF 和 UDAF 是类似的。如下图所示:

UDTAF

上图展示了一个表值聚合函数的例子。

假设你有一个饮料的表,这个表有 3 列,分别是 id、name 和 price,一共有 5 行。

假设你需要找到价格最高的两个饮料,类似于 top2() 表值聚合函数。你需要遍历所有 5 行数据,输出结果为 2 行数据的一个表。

使用 JavaScala 开发一个 Table Aggregate Function 必须包含以下几点:

  1. ⭐ 实现 TableAggregateFunction 接口,其中所有的方法必须是 public 的、非 static 的
  2. ⭐ 必须实现以下几个方法:
  • Acc聚合中间结果 createAccumulator():为当前 Key 初始化一个空的 accumulator,其存储了聚合的中间结果,比如在执行 max() 时会存储每一条中间结果的 max 值
  • accumulate(Acc accumulator, Input输入参数):对于每一行数据,都会调用 accumulate() 方法来更新 accumulator,这个方法就是对每一条输入数据进行执行,比如执行 max() 时,遍历每一条数据执行;在实现这个方法是必须声明为 public 和非 static 的。accumulate 方法可以重载,每个方法的参数类型不同,并且支持变长参数。
  • emitValue(Acc accumulator, Collector<OutPut> collector) 或者 emitUpdateWithRetract(Acc accumulator, RetractableCollector<OutPut> collector):当遍历所有的数据,当所有的数据都处理完了之后,通过调用 emit 方法来计算和输出最终的结果,在这里你就可以自定义到底输出多条少以及怎么样去输出结果。那么对于 emitValue 以及 emitUpdateWithRetract 的区别来说,拿 TopN 实现来说,emitValue 每次都会发送所有的最大的 n 个值,而这在流式任务中可能会有一些性能问题。为了提升性能,用户可以实现 emitUpdateWithRetract 方法。这个方法在 retract 模式下会增量的输出结果,比如只在有数据更新时,可以做到撤回老的数据,然后再发送新的数据,而不需要每次都发出全量的最新数据。如果我们同时定义了 emitUpdateWithRetract、emitValue 方法,那 emitUpdateWithRetract 会优先于 emitValue 方法被使用,因为引擎会认为 emitUpdateWithRetract 会更加高效,因为它的输出是增量的。
  1. ⭐ 还有几个方法是在某些场景下才必须实现的:
  • retract(Acc accumulator, Input输入参数):在回撤流的场景下必须要实现,Flink 在计算回撤数据时需要进行调用,如果没有实现则会直接报错
  • merge(Acc accumulator, Iterable<Acc> it):在许多批式聚合以及流式聚合中的 Session、Hop 窗口聚合场景下都是必须要实现的。除此之外,这个方法对于优化也很多帮助。例如,如果你打开了两阶段聚合优化,就需要 AggregateFunction 实现 merge 方法,从而在第一阶段先进行数据聚合。
  • resetAccumulator():在批式聚合中是必须实现的。
  1. ⭐ 还有几个关于入参、出参数据类型信息的方法,默认情况下,用户的 Input输入参数accumulate(Acc accumulator, Input输入参数) 的入参 Input输入参数)、accumulator(Acc聚合中间结果 createAccumulator() 的返回结果)、Output输出参数 数据类型(emitValue(Acc acc, Collector<Output输出参数> out)Output输出参数)都会被 Flink 使用反射获取到。但是对于 accumulatorOutput输出参数 类型来说,Flink SQL 的类型推导在遇到复杂类型的时候可能会推导出错误的结果(注意:Input输入参数 因为是上游算子传入的,所以类型信息是确认的,不会出现推导错误的情况),比如那些非基本类型 POJO 的复杂类型。所以跟 ScalarFunction 和 TableFunction 一样,AggregateFunction 提供了 TableAggregateFunction#getResultType()TableAggregateFunction#getAccumulatorType() 来分别指定最终返回值类型和 accumulator 的类型,两个函数的返回值类型都是 TypeInformation,所以熟悉 DataStream 的小伙伴很容易上手。
  • getResultType():即 emitValue(Acc acc, Collector<Output输出参数> out) 的输出结果数据类型
  • getAccumulatorType():即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型

这个时候,我们直接来举一个 Top2 的例子看下吧:

  • ⭐ 定义一个 TableAggregateFunction 来计算给定列的最大的 2 个值
  • ⭐ 在 TableEnvironment 中注册函数
  • ⭐ 在 Table API 查询中使用函数(当前只在 Table API 中支持 TableAggregateFunction)

为了计算最大的 2 个值,accumulator 需要保存当前看到的最大的 2 个值。

在我们的例子中,我们定义了类 Top2Accum 来作为 accumulator。

Flink 的 checkpoint 机制会自动保存 accumulator,并且在失败时进行恢复,来保证精确一次的语义。

我们的 Top2 表值聚合函数(TableAggregateFunction)的 accumulate() 方法有两个输入,第一个是 Top2Accum accumulator,另一个是用户定义的输入:输入的值 v。尽管 merge() 方法在大多数聚合类型中不是必须的,我们也在样例中提供了它的实现。并且定义了 getResultType() 和 getAccumulatorType() 方法。

代码语言:javascript复制
/**
 * Accumulator for Top2.
 */
public class Top2Accum {
    public Integer first;
    public Integer second;
}

public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        return acc;
    }


    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
        for (Top2Accum otherAcc : iterable) {
            accumulate(acc, otherAcc.first);
            accumulate(acc, otherAcc.second);
        }
    }

    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        // emit the value and rank
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}

// 注册函数
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("top2", new Top2());

// 初始化表
Table tab = ...;

// 使用函数
tab.groupBy("key")
    .flatAggregate("top2(a) as (v, rank)")
    .select("key, v, rank");

下面的例子展示了如何使用 emitUpdateWithRetract 方法来只发送更新的数据。

为了只发送更新的结果,accumulator 保存了上一次的最大的 2 个值,也保存了当前最大的 2 个值。

代码语言:javascript复制
/**
 * Accumulator for Top2.
 */
public class Top2Accum {
    public Integer first;
    public Integer second;
    public Integer oldFirst;
    public Integer oldSecond;
}

public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        acc.oldFirst = Integer.MIN_VALUE;
        acc.oldSecond = Integer.MIN_VALUE;
        return acc;
    }

    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    public void emitUpdateWithRetract(Top2Accum acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
        if (!acc.first.equals(acc.oldFirst)) {
            // if there is an update, retract old value then emit new value.
            if (acc.oldFirst != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldFirst, 1));
            }
            out.collect(Tuple2.of(acc.first, 1));
            acc.oldFirst = acc.first;
        }

        if (!acc.second.equals(acc.oldSecond)) {
            // if there is an update, retract old value then emit new value.
            if (acc.oldSecond != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldSecond, 2));
            }
            out.collect(Tuple2.of(acc.second, 2));
            acc.oldSecond = acc.second;
        }
    }
}

// 注册函数
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("top2", new Top2());

// 初始化表
Table tab = ...;

// 使用函数
tab.groupBy("key")
    .flatAggregate("top2(a) as (v, rank)")
    .select("key, v, rank");

5.SQL 能力扩展篇

5.1.SQL UDF 扩展 - Module

源码公众号后台回复1.13.2 最全 flink sql获取。

在介绍 Flink Module 具体能力之前,我们先来聊聊博主讲述的思路:

  1. ⭐ 背景及应用场景介绍
  2. ⭐ Flink Module 功能介绍
  3. ⭐ 应用案例:Flink SQL 支持 Hive UDF

5.1.1.Flink SQL Module 应用场景

兄弟们,想想其实大多数公司都是从离线数仓开始建设的。相信大家必然在自己的生产环境中开发了非常多的 Hive UDF。随着需求对于时效性要求的增高,越来越多的公司也开始建设起实时数仓。很多场景下实时数仓的建设都是随着离线数仓而建设的。实时数据使用 Flink 产出,离线数据使用 Hive/Spark 产出。

那么回到我们的问题:为什么需要给 Flink UDF 做扩展呢?可能这个问题比较大,那么博主分析的具体一些,如果 Flink 扩展支持 Hive UDF 对我们有哪些好处呢?

博主分析了下,结论如下:

站在数据需求的角度来说,一般会有以下两种情况:

  1. ⭐ 以前已经有了离线数据链路,需求方也想要实时数据。如果直接能用已经开发好的 hive udf,则不用将相同的逻辑迁移到 flink udf 中,并且后续无需费时费力维护两个 udf 的逻辑一致性。
  2. ⭐ 实时和离线的需求都是新的,需要新开发。如果只开发一套 UDF,则事半功倍。

因此在 Flink 中支持 Hive UDF(也即扩展 Flink 的 UDF 能力)这件事对开发人员提效来说是非常有好处的。

5.1.2.Flink SQL Module 功能介绍

Module 允许 Flink 扩展函数能力。它是可插拔的,Flink 官方本身已经提供了一些 Module,用户也可以编写自己的 Module。

例如,用户可以定义自己的函数,并将其作为加载进入 Flink,以在 Flink SQL 和 Table API 中使用。

再举一个例子,用户可以加载官方已经提供的的 Hive Module,将 Hive 已有的内置函数作为 Flink 的内置函数。

目前 Flink 包含了以下三种 Module:

  1. ⭐ CoreModule:CoreModule 是 Flink 内置的 Module,其包含了目前 Flink 内置的所有 UDF,Flink 默认开启的 Module 就是 CoreModule,我们可以直接使用其中的 UDF
  2. ⭐ HiveModule:HiveModule 可以将 Hive 内置函数作为 Flink 的系统函数提供给 SQLTable API 用户进行使用,比如 get_json_object 这类 Hive 内置函数(Flink 默认的 CoreModule 是没有的)
  3. ⭐ 用户自定义 Module:用户可以实现 Module 接口实现自己的 UDF 扩展 Module

在 Flink 中,Module 可以被 加载启用禁用卸载 Module,当 TableEnvironment 加载(见 SQL 语法篇的 Load Module) Module 之后,默认就是开启的。

Flink 是同时支持多个 Module 的,并且根据加载 Module 的顺序去按顺序查找和解析 UDF,先查到的先解析使用。

此外,Flink 只会解析已经启用了的 Module。那么当两个 Module 中出现两个同名的函数时,会有以下三种情况:

  1. ⭐ 如果两个 Module 都启用的话,Flink 会根据加载 Module 的顺序进行解析,结果就是会使用顺序为第一个的 Module 的 UDF
  2. ⭐ 如果只有一个 Module 启用的话,Flink 就只会从启用的 Module 解析 UDF
  3. ⭐ 如果两个 Module 都没有启用,Flink 就无法解析这个 UDF

当然如果出现第一种情况时,用户也可以改变使用 Module 的顺序。比如用户可以使用 USE MODULE hive, core 语句去将 Hive Module 设为第一个使用及解析的 Module。

另外,用户可以使用 USE MODULES hive 去禁用默认的 core Module,注意,禁用不是卸载 Module,用户之后还可以再次启用 Module,并且使用 USE MODULES core 去将 core Module 设置为启用的。如果使用未加载的 Module,则会直接抛出异常。

禁用和卸载 Module 的区别在于禁用依然会在 TableEnvironment 保留 Module,用户依然可以使用使用 list 命令看到禁用的 Module。

注意: 由于 Module 的 UDF 是被 Flink 认为是 Flink 系统内置的,它不和任何 Catalog,数据库绑定,所以这部分 UDF 没有对应的命名空间,即没有 Catalog,数据库命名空间。

  1. ⭐ 使用 SQL API 加载、卸载、使用、列出 Module
代码语言:javascript复制
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// 展示加载和启用的 Module
tableEnv.executeSql("SHOW MODULES").print();
//  ------------- 
// | module name |
//  ------------- 
// |        core |
//  ------------- 
tableEnv.executeSql("SHOW FULL MODULES").print();
//  ------------- ------ 
// | module name | used |
//  ------------- ------ 
// |        core | true |
//  ------------- ------ 

// 加载 hive module
tableEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '...')");

// 展示所有启用的 module
tableEnv.executeSql("SHOW MODULES").print();
//  ------------- 
// | module name |
//  ------------- 
// |        core |
// |        hive |
//  ------------- 

// 展示所有加载的 module 以及它们的启用状态
tableEnv.executeSql("SHOW FULL MODULES").print();
//  ------------- ------ 
// | module name | used |
//  ------------- ------ 
// |        core | true |
// |        hive | true |
//  ------------- ------ 

// 改变 module 解析顺序
tableEnv.executeSql("USE MODULES hive, core");
tableEnv.executeSql("SHOW MODULES").print();
//  ------------- 
// | module name |
//  ------------- 
// |        hive |
// |        core |
//  ------------- 
tableEnv.executeSql("SHOW FULL MODULES").print();
//  ------------- ------ 
// | module name | used |
//  ------------- ------ 
// |        hive | true |
// |        core | true |
//  ------------- ------ 

// 禁用 core module
tableEnv.executeSql("USE MODULES hive");
tableEnv.executeSql("SHOW MODULES").print();
//  ------------- 
// | module name |
//  ------------- 
// |        hive |
//  ------------- 
tableEnv.executeSql("SHOW FULL MODULES").print();
//  ------------- ------- 
// | module name |  used |
//  ------------- ------- 
// |        hive |  true |
// |        core | false |
//  ------------- ------- 

// 卸载 hive module
tableEnv.executeSql("UNLOAD MODULE hive");
tableEnv.executeSql("SHOW MODULES").print();
// Empty set
tableEnv.executeSql("SHOW FULL MODULES").print();
//  ------------- ------- 
// | module name |  used |
//  ------------- ------- 
// |        hive | false |
//  ------------- ------- 
  1. ⭐ 使用 Java API 加载、卸载、使用、列出 Module
代码语言:javascript复制
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Show initially loaded and enabled modules
tableEnv.listModules();
//  ------------- 
// | module name |
//  ------------- 
// |        core |
//  ------------- 
tableEnv.listFullModules();
//  ------------- ------ 
// | module name | used |
//  ------------- ------ 
// |        core | true |
//  ------------- ------ 

// Load a hive module
tableEnv.loadModule("hive", new HiveModule());

// Show all enabled modules
tableEnv.listModules();
//  ------------- 
// | module name |
//  ------------- 
// |        core |
// |        hive |
//  ------------- 

// Show all loaded modules with both name and use status
tableEnv.listFullModules();
//  ------------- ------ 
// | module name | used |
//  ------------- ------ 
// |        core | true |
// |        hive | true |
//  ------------- ------ 

// Change resolution order
tableEnv.useModules("hive", "core");
tableEnv.listModules();
//  ------------- 
// | module name |
//  ------------- 
// |        hive |
// |        core |
//  ------------- 
tableEnv.listFullModules();
//  ------------- ------ 
// | module name | used |
//  ------------- ------ 
// |        hive | true |
// |        core | true |
//  ------------- ------ 

// Disable core module
tableEnv.useModules("hive");
tableEnv.listModules();
//  ------------- 
// | module name |
//  ------------- 
// |        hive |
//  ------------- 
tableEnv.listFullModules();
//  ------------- ------- 
// | module name |  used |
//  ------------- ------- 
// |        hive |  true |
// |        core | false |
//  ------------- ------- 

// Unload hive module
tableEnv.unloadModule("hive");
tableEnv.listModules();
// Empty set
tableEnv.listFullModules();
//  ------------- ------- 
// | module name |  used |
//  ------------- ------- 
// |        hive | false |
//  ------------- ------- 

5.1.3.应用案例:Flink SQL 支持 Hive UDF

Flink 支持 hive UDF 这件事分为两个部分。

  1. ⭐ Flink 扩展支持 hive 内置 UDF
  2. ⭐ Flink 扩展支持用户自定义 hive UDF

第一部分:Flink 扩展支持 Hive 内置 UDF,比如 get_json_objectrlike 等等。

有同学问了,这么基本的 UDF,Flink 都没有吗?

确实没有。关于 Flink SQL 内置的 UDF 见如下链接,大家可以看看 Flink 支持了哪些 UDF:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions/

那么如果我如果强行使用 get_json_object 这个 UDF,会发生啥呢?结果如下图。

直接报错找不到 UDF。

error

第二部分:Flink 扩展支持用户自定义 Hive UDF。

内置函数解决不了用户的复杂需求,用户就需要自己写 Hive UDF,并且这部分自定义 UDF 也想在 flink sql 中使用。

下面看看怎么在 Flink SQL 中进行这两种扩展。

  1. ⭐ flink 扩展支持 hive 内置 udf

步骤如下:

  • ⭐ 引入 hive 的 connector。其中包含了 flink 官方提供的一个 HiveModule。在 HiveModule 中包含了 hive 内置的 udf。
代码语言:javascript复制
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
  • ⭐ 在 StreamTableEnvironment 中加载 HiveModule
代码语言:javascript复制
String name = "default";
String version = "3.1.2";
tEnv.loadModule(name, new HiveModule(version));

然后在控制台打印一下目前有的 module。

代码语言:javascript复制
String[] modules = tEnv.listModules();
Arrays.stream(modules).forEach(System.out::println);

然后可以看到除了 core module,还有我们刚刚加载进去的 default module。

代码语言:javascript复制
default
core
  • ⭐ 查看所有 module 的所有 udf。在控制台打印一下。
代码语言:javascript复制
String[] functions = tEnv.listFunctions();
Arrays.stream(functions).forEach(System.out::println);

就会将 default 和 core module 中的所有包含的 udf 给列举出来,当然也就包含了 hive module 中的 get_json_object。

get_json_object

然后我们再去在 Flink SQL 中使用 get_json_object 这个 UDF,就没有报错,能正常输出结果了。

使用 Flink Hive connector 自带的 HiveModule,已经能够解决很大一部分常见 UDF 使用的问题了。

  1. ⭐ Flink 扩展支持用户自定义 Hive UDF

原本博主是直接想要使用 Flink SQL 中的 create temporary function 去执行引入自定义 Hive UDF 的。

举例如下:

代码语言:javascript复制
CREATE TEMPORARY FUNCTION test_hive_udf as 'flink.examples.sql._09.udf._02_stream_hive_udf.TestGenericUDF';

发现在执行这句 SQL 时,是可以执行成功,将 UDF 注册进去的。

但是在后续 UDF 初始化时就报错了。具体错误如下图。直接报错 ClassCastException。

ddl hive udf error

看了下源码,Flink 流任务模式下(未连接 Hive MetaStore 时)在创建 UDF 时会认为这个 UDF 是 Flink 生态体系中的 UDF。

所以在初始化我们引入的 TestGenericUDF 时,默认会按照 Flink 的 UserDefinedFunction 强转,因此才会报强转错误。

那么我们就不能使用 Hive UDF 了吗?

错误,小伙伴萌岂敢有这种想法。博主都把这个标题列出来了(牛逼都吹出去了),还能给不出解决方案嘛。

思路见下一节。

  1. ⭐ Flink 扩展支持用户自定义 Hive UDF 的增强 module

其实思路很简单。

使用 Flink SQL 中的 create temporary function 虽然不能执行,但是 Flink 提供了插件化的自定义 module。

我们可以扩展一个支持用户自定义 Hive UDF 的 module,使用这个 module 来支持自定义的 Hive UDF。

实现的代码也非常简单。简单的把 Flink Hive connector 提供的 HiveModule 做一个增强即可,即下图中的 HiveModuleV2。使用方式如下图所示:

源码公众号后台回复1.13.2 sql hive udf获取。

hive module enhance

然后程序就正常跑起来了。

肥肠滴好用!

5.2.SQL 元数据扩展 - Catalog

5.2.1.Flink Catalog 功能介绍

数据处理最关键的方面之一是管理元数据。元数据可以是临时的,例如临时表、UDF。元数据也可以是持久化的,例如 Hive MetaStore 中的元数据。

Flink SQL 中是由 Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。对标 Hive 去理解就是 Hive 的 MetaStore,都是用于存储计算引擎涉及到的元数据信息。

Catalog 允许用户引用其数据存储系统中现有的元数据,并自动将其映射到 Flink 的相应元数据。例如,Flink 可以直接使用 Hive MetaStore 中的表的元数据,也可以将 Flink SQL 中的元数据存储到 Hive MetaStore 中。Catalog 极大地简化了用户开始使用 Flink 的步骤,提升了用户体验。

目前 Flink 包含了以下四种 Catalog:

  1. ⭐ GenericInMemoryCatalog:GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期(即一个 Flink 任务一次运行生命周期内)内可用。
  2. ⭐ JdbcCatalog:JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog,即可以将 Flink SQL 的预案数据存储在 Postgres 中。
代码语言:javascript复制
// PostgresCatalog 方法支持的方法
PostgresCatalog.databaseExists(String databaseName)
PostgresCatalog.listDatabases()
PostgresCatalog.getDatabase(String databaseName)
PostgresCatalog.listTables(String databaseName)
PostgresCatalog.getTable(ObjectPath tablePath)
PostgresCatalog.tableExists(ObjectPath tablePath)
  1. ⭐ HiveCatalog:HiveCatalog 有两个用途,作为 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。注意:Hive MetaStore 以小写形式存储所有元数据对象名称。而 GenericInMemoryCatalog 会区分大小写。
代码语言:javascript复制
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir     = "/opt/hive-conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
  1. ⭐ 用户自定义 Catalog:用户可以实现 Catalog 接口实现自定义 Catalog

下面看看 Flink Catalog 提供了什么 API,以及对应 API 的使用案例:

  1. ⭐ 使用 SQL API 将表创建注册进 Catalog
代码语言:javascript复制
TableEnvironment tableEnv = ...

// 创建 HiveCatalog 
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");

// 注册 catalog
tableEnv.registerCatalog("myhive", catalog);

// 在 catalog 中创建 database
tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");

// 在 catalog 中创建表
tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");

tableEnv.listTables(); // 列出当前 myhive.mydb 中的所有表
  1. ⭐ 使用 Java API 将表创建注册进 Catalog
代码语言:javascript复制
import org.apache.flink.table.api.*;
import org.apache.flink.table.catalog.*;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.descriptors.Kafka;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());

// 创建 HiveCatalog 
Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");

// 注册 catalog
tableEnv.registerCatalog("myhive", catalog);

// 在 catalog 中创建 database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));

// 在 catalog 中创建表
TableSchema schema = TableSchema.builder()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT())
    .build();

catalog.createTable(
        new ObjectPath("mydb", "mytable"), 
        new CatalogTableImpl(
            schema,
            new Kafka()
                .version("0.11")
                ....
                .startFromEarlist()
                .toProperties(),
            "my comment"
        ),
        false
    );
    
List<String> tables = catalog.listTables("mydb"); // 列出当前 myhive.mydb 中的所有表

5.2.2.操作 Catalog 的 API

这里只列出了 Java 的 Catalog API,用户也可以使用 SQL DDL API 实现相同的功能。关于 DDL 的详细信息请参考之前介绍到的 SQL CREATE DDL 章节。

  1. ⭐ Catalog 操作
代码语言:javascript复制
// 注册 Catalog
tableEnv.registerCatalog(new CustomCatalog("myCatalog"));

// 切换 Catalog 和 Database
tableEnv.useCatalog("myCatalog");
tableEnv.useDatabase("myDb");
// 也可以通过以下方式访问对应的表
tableEnv.from("not_the_current_catalog.not_the_current_db.my_table");
  1. ⭐ 数据库操作
代码语言:javascript复制
// create database
catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);

// drop database
catalog.dropDatabase("mydb", false);

// alter database
catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);

// get databse
catalog.getDatabase("mydb");

// check if a database exist
catalog.databaseExists("mydb");

// list databases in a catalog
catalog.listDatabases("mycatalog");
  1. ⭐ 表操作
代码语言:javascript复制
// create table
catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// drop table
catalog.dropTable(new ObjectPath("mydb", "mytable"), false);

// alter table
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);

// rename table
catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");

// get table
catalog.getTable("mytable");

// check if a table exist or not
catalog.tableExists("mytable");

// list tables in a database
catalog.listTables("mydb");
  1. ⭐ 视图操作
代码语言:javascript复制
// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);

// alter view
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);

// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);

// get view
catalog.getTable("myview");

// check if a view exist or not
catalog.tableExists("mytable");

// list views in a database
catalog.listViews("mydb");
  1. ⭐ 分区操作
代码语言:javascript复制
// create view
catalog.createPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// drop partition
catalog.dropPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...), false);

// alter partition
catalog.alterPartition(
    new ObjectPath("mydb", "mytable"),
    new CatalogPartitionSpec(...),
    new CatalogPartitionImpl(...),
    false);

// get partition
catalog.getPartition(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// check if a partition exist or not
catalog.partitionExists(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table
catalog.listPartitions(new ObjectPath("mydb", "mytable"));

// list partitions of a table under a give partition spec
catalog.listPartitions(new ObjectPath("mydb", "mytable"), new CatalogPartitionSpec(...));

// list partitions of a table by expression filter
catalog.listPartitionsByFilter(new ObjectPath("mydb", "mytable"), Arrays.asList(epr1, ...));
  1. ⭐ 函数操作
代码语言:javascript复制
// create function
catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// drop function
catalog.dropFunction(new ObjectPath("mydb", "myfunc"), false);

// alter function
catalog.alterFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);

// get function
catalog.getFunction("myfunc");

// check if a function exist or not
catalog.functionExists("myfunc");

// list functions in a database
catalog.listFunctions("mydb");

5.3.SQL 任务参数配置

关于 Flink SQL 详细的配置项及功能如下链接所示,详细内容大家可以点击链接去看,博主下面只介绍常用的性能优化参数及其功能:

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/config/

5.3.1.参数设置方式

Flink SQL 相关参数需要在 TableEnvironment 中设置。如下案例:

代码语言:javascript复制
// instantiate table environment
TableEnvironment tEnv = ...

// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");

具体参数分为以下 3 类:

  1. ⭐ 运行时参数:优化 Flink SQL 任务在执行时的任务性能
  2. ⭐ 优化器参数:Flink SQL 任务在生成执行计划时,经过优化器优化生成更优的执行计划
  3. ⭐ 表参数:用于调整 Flink SQL table 的执行行为

5.3.2.运行时参数

用于优化 Flink SQL 任务在执行时的任务性能。

代码语言:javascript复制
// 默认值:100
// 值类型:Integer
// 流批任务:流、批任务都支持
// 用处:异步 lookup join 中最大的异步 IO 执行数目
table.exec.async-lookup.buffer-capacity: 100

// 默认值:false
// 值类型:Boolean
// 流批任务:流任务支持
// 用处:MiniBatch 优化是一种专门针对 unbounded 流任务的优化(即非窗口类应用),其机制是在 `允许的延迟时间间隔内` 以及 `达到最大缓冲记录数` 时触发以减少 `状态访问` 的优化,从而节约处理时间。下面两个参数一个代表 `允许的延迟时间间隔`,另一个代表 `达到最大缓冲记录数`。
table.exec.mini-batch.enabled: false

// 默认值:0 ms
// 值类型:Duration
// 流批任务:流任务支持
// 用处:此参数设置为多少就代表 MiniBatch 机制最大允许的延迟时间。注意这个参数要配合 `table.exec.mini-batch.enabled` 为 true 时使用,而且必须大于 0 ms
table.exec.mini-batch.allow-latency: 0 ms

// 默认值:-1
// 值类型:Long
// 流批任务:流任务支持
// 用处:此参数设置为多少就代表 MiniBatch 机制最大缓冲记录数。注意这个参数要配合 `table.exec.mini-batch.enabled` 为 true 时使用,而且必须大于 0
table.exec.mini-batch.size: -1

// 默认值:-1
// 值类型:Integer
// 流批任务:流、批任务都支持
// 用处:可以用此参数设置 Flink SQL 中算子的并行度,这个参数的优先级 `高于` StreamExecutionEnvironment 中设置的并行度优先级,如果这个值设置为 -1,则代表没有设置,会默认使用 StreamExecutionEnvironment 设置的并行度
table.exec.resource.default-parallelism: -1

// 默认值:ERROR
// 值类型:Enum【ERROR, DROP】
// 流批任务:流、批任务都支持
// 用处:表上的 NOT NULL 列约束强制不能将 NULL 值插入表中。Flink 支持 `ERROR`(默认)和 `DROP` 配置。默认情况下,当 NULL 值写入 NOT NULL 列时,Flink 会产生运行时异常。用户可以将行为更改为 `DROP`,直接删除此类记录,而不会引发异常。
table.exec.sink.not-null-enforcer: ERROR

// 默认值:false
// 值类型:Boolean
// 流批任务:流任务
// 用处:接入了 CDC 的数据源,上游 CDC 如果产生重复的数据,可以使用此参数在 Flink 数据源算子进行去重操作,去重会引入状态开销
table.exec.source.cdc-events-duplicate: false

// 默认值:0 ms
// 值类型:Duration
// 流批任务:流任务
// 用处:如果此参数设置为 60 s,当 Source 算子在 60 s 内未收到任何元素时,这个 Source 将被标记为临时空闲,此时下游任务就不依赖此 Source 的 Watermark 来推进整体的 Watermark 了。
// 默认值为 0 时,代表未启用检测源空闲。
table.exec.source.idle-timeout: 0 ms

// 默认值:0 ms
// 值类型:Duration
// 流批任务:流任务
// 用处:指定空闲状态(即未更新的状态)将保留多长时间。尤其是在 unbounded 场景中很有用。默认 0 ms 为不清除空闲状态
table.exec.state.ttl: 0 ms

其中上述参数中最常被用到为一下两种:

  1. ⭐ MiniBatch 聚合
代码语言:javascript复制
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 60 s
table.exec.mini-batch.size: 1000000000

具体使用场景如下链接:

https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/tuning/#minibatch-aggregation

  1. ⭐ state ttl 状态过期
代码语言:javascript复制
-- 状态清除如下流 SQL 案例场景很有用,随着实时任务的运行,前几天(即前几天的 p_date)的 state 不会被更新的情况下,就可以使用空闲状态删除机制把 state 给删除
select 
  p_date
  , count(distinct user_id) as uv
from source_table
group 
  p_date

5.3.3.优化器参数

Flink SQL 任务在生成执行计划时,优化生成更优的执行计划

代码语言:javascript复制
// 默认值:AUTO
// 值类型:String
// 流批任务:流、批任务都支持
// 用处:聚合阶段的策略。和 MapReduce 的 Combiner 功能类似,可以在数据 shuffle 前做一些提前的聚合,可以选择以下三种方式
// TWO_PHASE:强制使用具有 localAggregate 和 globalAggregate 的两阶段聚合。请注意,如果聚合函数不支持优化为两个阶段,Flink 仍将使用单阶段聚合。
// 两阶段优化在计算 count,sum 时很有用,但是在计算 count distinct 时需要注意,key 的稀疏程度,如果 key 不稀疏,那么很可能两阶段优化的效果会适得其反
// ONE_PHASE:强制使用只有 CompleteGlobalAggregate 的一个阶段聚合。
// AUTO:聚合阶段没有特殊的执行器。选择 TWO_PHASE 或者 ONE_PHASE 取决于优化器的成本。
// 
// 注意!!!:此优化在窗口聚合中会自动生效,但是在 unbounded agg 中需要与 minibatch 参数相结合使用才会生效
table.optimizer.agg-phase-strategy: AUTO

// 默认值:false
// 值类型:Boolean
// 流批任务:流任务
// 用处:避免 group by 计算 count distinctsum distinct 数据时的 group by 的 key 较少导致的数据倾斜,比如 group by 中一个 key 的 distinct 要去重 500w 数据,而另一个 key 只需要去重 3 个 key,那么就需要先需要按照 distinct 的 key 进行分桶。将此参数设置为 true 之后,下面的 table.optimizer.distinct-agg.split.bucket-num 可以用于决定分桶数是多少
// 后文会介绍具体的案例
table.optimizer.distinct-agg.split.enabled: false

// 默认值:1024
// 值类型:Integer
// 流批任务:流任务
// 用处:避免 group by 计算 count distinct 数据时的 group by 较少导致的数据倾斜。加了此参数之后,会先根据 group by key 结合 hash_code(distinct_key)进行分桶,然后再自动进行合桶。
// 后文会介绍具体的案例
table.optimizer.distinct-agg.split.bucket-num: 1024

// 默认值:true
// 值类型:Boolean
// 流批任务:流任务
// 用处:如果设置为 true,Flink 优化器将会尝试找出重复的自计划并重用。默认为 true 不需要改动
table.optimizer.reuse-sub-plan-enabled: true

// 默认值:true
// 值类型:Boolean
// 流批任务:流任务
// 用处:如果设置为 true,Flink 优化器会找出重复使用的 table source 并且重用。默认为 true 不需要改动
table.optimizer.reuse-source-enabled: true

// 默认值:true
// 值类型:Boolean
// 流批任务:流任务
// 用处:如果设置为 true,Flink 优化器将会做谓词下推到 FilterableTableSource 中,将一些过滤条件前置,提升性能。默认为 true 不需要改动
table.optimizer.source.predicate-pushdown-enabled: true

其中上述参数中最常被用到为以下两种:

  1. ⭐ 两阶段优化:
代码语言:javascript复制
table.optimizer.agg-phase-strategy: AUTO

在计算 count(1),sum(col) 场景汇总提效很高,因为 count(1),sum(col) 在经过本地 localAggregate 之后,每个 group by 的 key 就一个结果值。

注意!!!:此优化在窗口聚合中会自动生效,但是在 unbounded agg 中需要与 minibatch 参数相结合使用才会生效。

  1. ⭐ split 分桶:
代码语言:javascript复制
table.optimizer.distinct-agg.split.enabled: true
table.optimizer.distinct-agg.split.bucket-num: 1024
代码语言:javascript复制
INSERT INTO sink_table
SELECT
    count(distinct user_id) as uv,
    max(cast(server_timestamp as bigint)) as server_timestamp
FROM source_table

-- 上述 SQL 打开了 split 分桶之后的效果等同于以下 SQL

INSERT INTO sink_table
SELECT 
    sum(bucket_uv) as uv
    , max(server_timestamp) as server_timestamp
FROM (
    SELECT
        count(distinct user_id) as bucket_uv,
        max(cast(server_timestamp as bigint)) as server_timestamp
    FROM source_table
    group by
        mod(hash_code(user_id), 1024)
)

注意!!!:如果有多个 distinct key,则多个 distinct key 都会被作为分桶 key。

5.3.4.表参数

代码语言:javascript复制
// 默认值:false
// 值类型:Boolean
// 流批任务:流、批任务都支持
// 用处:DML SQL(即执行 insert into 操作)是异步执行还是同步执行。默认为异步(false),即可以同时提交多个 DML SQL 作业,如果设置为 true,则为同步,第二个 DML 将会等待第一个 DML 操作执行结束之后再执行
table.dml-sync: false

// 默认值:64000
// 值类型:Integer
// 流批任务:流、批任务都支持
// 用处:Flink SQL 会通过生产 java 代码来执行具体的 SQL 逻辑,但是 jvm 限制了一个 java 方法的最大长度不能超过 64KB,但是某些场景下 Flink SQL 生产的 java 代码会超过 64KB,这时 jvm 就会直接报错。因此此参数可以用于限制生产的 java 代码的长度来避免超过 64KB,从而避免 jvm 报错。
table.generated-code.max-length: 64000

// 默认值:default
// 值类型:String
// 流批任务:流、批任务都支持
// 用处:在使用天级别的窗口时,通常会遇到时区问题。举个例子,Flink 开一天的窗口,默认是按照 UTC 零时区进行划分,那么在北京时区划分出来的一天的窗口是第一天的早上 8:00 到第二天的早上 8:00,但是实际场景中想要的效果是第一天的早上 0:00 到第二天的早上 0:00 点。因此可以将此参数设置为 GMT 08:00 来解决这个问题。
table.local-time-zone: default

// 默认值:default
// 值类型:Enum【BLINK、OLD】
// 流批任务:流、批任务都支持
// 用处:Flink SQL planner,默认为 BLINK planner,也可以选择 old planner,但是推荐使用 BLINK planner
table.planner: BLINK

// 默认值:default
// 值类型:String
// 流批任务:流、批任务都支持
// 用处:Flink 解析一个 SQL 的解析器,目前有 Flink SQL 默认的解析器和 Hive SQL 解析器,其区别在于两种解析器支持的语法会有不同,比如 Hive SQL 解析器支持 between and、rlike 语法,Flink SQL 不支持
table.sql-dialect: default

5.4.SQL 性能调优

本小节主要介绍 Flink SQL 中的聚合算子的优化,在某些场景下应用这些优化后,性能提升会非常大。本小节主要包含以下四种优化:

  1. (常用)MiniBatch 聚合:unbounded group agg 中,可以使用 minibatch 聚合来做到微批计算、访问状态、输出结果,避免每来一条数据就计算、访问状态、输出一次结果,从而减少访问 state 的时长(尤其是 Rocksdb)提升性能。
  2. (常用)两阶段聚合:类似 MapReduce 中的 Combiner 的效果,可以先在 shuffle 数据之前先进行一次聚合,减少 shuffle 数据量
  3. (不常用)split 分桶:在 count distinct、sum distinct 的去重的场景中,如果出现数据倾斜,任务性能会非常差,所以如果先按照 distinct key 进行分桶,将数据打散到各个 TM 进行计算,然后将分桶的结果再进行聚合,性能就会提升很大
  4. (常用)去重 filter 子句:在 count distinct 中使用 filter 子句于 Hive SQL 中的 count(distinct if(xxx, user_id, null)) 子句,但是 state 中同一个 key 会按照 bit 位会进行复用,这对状态大小优化非常有用

上面简单介绍了聚合场景的四种优化,下面详细介绍一下其最终效果以及实现原理。

5.4.1.MiniBatch 聚合

  1. ⭐ 问题场景:默认情况下,unbounded agg 算子是逐条处理输入的记录,其处理流程如下:
  • ⭐ 从状态中读取 accumulator;
  • ⭐ 累加/撤回的数据记录至 accumulator;
  • ⭐ 将 accumulator 写回状态;
  • ⭐ 下一条记录将再次从流程 1 开始处理。

但是上述处理流程的问题在于会增加 StateBackend 的访问性能开销(尤其是对于 RocksDB StateBackend)。

  1. ⭐ MiniBatch 聚合如何解决上述问题:其核心思想是将一组输入的数据缓存在聚合算子内部的缓冲区中。当输入的数据被触发处理时,每个 key 只需要访问一次状态后端,这样可以大大减少访问状态的时间开销从而获得更好的吞吐量。但是,其会增加一些数据产出的延迟,因为它会缓冲一些数据再去处理。因此如果你要做这个优化,需要提前做一下吞吐量和延迟之间的权衡,但是大多数情况下,buffer 数据的延迟都是可以被接受的。所以非常建议在 unbounded agg 场景下使用这项优化。

下图说明了 MiniBatch 聚合如何减少状态访问的。

MiniBatch

上图展示了加 MiniBatch 和没加 MiniBatch 之前的执行区别。

  1. ⭐ 启用 MiniBatch 聚合的参数:
代码语言:javascript复制
TableEnvironment tEnv = ...

Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.setString("table.exec.mini-batch.enabled", "true"); // 启用 MiniBatch 聚合
configuration.setString("table.exec.mini-batch.allow-latency", "5 s"); // buffer 最多 5s 的输入数据记录
configuration.setString("table.exec.mini-batch.size", "5000"); // buffer 最多的输入数据记录数目

注意!!!

  1. table.exec.mini-batch.allow-latencytable.exec.mini-batch.size 两者只要其中一项满足条件就会执行 batch 访问状态操作。
  2. ⭐ 上述 MiniBatch 配置不会对 Window TVF 生效,因为!!!Window TVF 默认就会启用小批量优化,Window TVF 会将 buffer 的输入记录记录在托管内存中,而不是 JVM 堆中,因此 Window TVF 不会有 GC 过高或者 OOM 的问题。

5.4.2.两阶段聚合

  1. ⭐ 问题场景:在聚合数据处理场景中,很可能会由于热点数据导致数据倾斜,如下 SQL 所示,当 color = RED 为 50000w 条,而 color = BLUE 为 5 条,就产生了数据倾斜,而器数据处理的算子产生性能瓶颈。
代码语言:javascript复制
SELECT color, sum(id)
FROM T
GROUP BY color
  1. ⭐ 两阶段聚合如何解决上述问题:其核心思想类似于 MapReduce 中的 Combiner Reduce,先将聚合操作在本地做一次 local 聚合,这样 shuffle 到下游的数据就会变少。

还是上面的 SQL 案例,如果在 50000w 条的 color = RED 的数据 shuffle 之前,在本地将 color = RED 的数据聚合成为 1 条结果,那么 shuffle 给下游的数据量就被极大地减少了。

下图说明了两阶段聚合是如何处理热点数据的:

两阶段聚合

  1. ⭐ 启用两阶段聚合的参数:
代码语言:javascript复制
TableEnvironment tEnv = ...

Configuration configuration = tEnv.getConfig().getConfiguration();
configuration.setString("table.exec.mini-batch.enabled", "true"); // 打开 minibatch
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); // 打开两阶段聚合

注意!!!

  1. ⭐ 此优化在窗口聚合中会自动生效,大家在使用 Window TVF 时可以看到 localagg globalagg 两部分
  2. ⭐ 但是在 unbounded agg 中需要与 MiniBatch 参数相结合使用才会生效。

5.4.3.split 分桶

  1. ⭐ 问题场景:使用两阶段聚合虽然能够很好的处理 count,sum 等常规聚合算子,但是在 count distinct,sum distinct 等算子的两阶段聚合效果在大多数场景下都不太满足预期。

因为 100w 条数据的 count 聚合能够在 local 算子聚合为 1 条数据,但是 count distinct 聚合 100w 条在 local 聚合之后的结果和可能是 90w 条,那么依然会有数据倾斜,如下 SQL 案例所示:

代码语言:javascript复制
SELECT color, COUNT(DISTINCT user_id)
FROM T
GROUP BY color
  1. ⭐ split 分桶如何解决上述问题:其核心思想在于按照 distinct 的 key,即 user_id,先做数据的分桶,将数据打散,分散到 Flink 的多个 TM 上进行计算,然后再将数据合桶计算。打开 split 分桶之后的效果就等同于以下 SQL:
代码语言:javascript复制
SELECT color, SUM(cnt)
FROM (
    SELECT color, COUNT(DISTINCT user_id) as cnt
    FROM T
    GROUP BY color, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY color

下图说明了 split 分桶的处理流程:

split 聚合

  1. ⭐ 启用 split 分桶的参数:
代码语言:javascript复制
TableEnvironment tEnv = ...

tEnv.getConfig()
  .getConfiguration()
  .setString("table.optimizer.distinct-agg.split.enabled", "true");  // 打开 split 分桶

注意!!!

  1. ⭐ 如果有多个 distinct key,则多个 distinct key 都会被作为分桶 key。比如 count(distinct a),sum(distinct b) 这种多个 distinct key 也支持。
  2. ⭐ 小伙伴萌自己写的 UDAF 不支持!
  3. ⭐ 其实此种优化很少使用,因为大家直接自己按照分桶的写法自己就可以写了,而且最后生成的算子图和自己写的 SQL 的语法也能对应的上

5.4.4.去重 filter 子句

  1. ⭐ 问题场景:在一些场景下,用户可能需要从不同维度计算 UV,例如 Android 的 UV、iPhone 的 UV、Web 的 UV 和总 UV。许多用户会选择 CASE WHEN 支持此功能,如下 SQL 所示:
代码语言:javascript复制
SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
 COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day

但是如果你想实现类似的效果,Flink SQL 提供了更好性能的写法,就是本小节的 filter 子句。

  1. ⭐ Filter 子句重写上述场景:
代码语言:javascript复制
SELECT
 day,
 COUNT(DISTINCT user_id) AS total_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
 COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('web', 'other')) AS web_uv
FROM T
GROUP BY day

Filter 子句的优化点在于,Flink 会识别出三个去重的 key 都是 user_id,因此会把三个去重的 key 存在一个共享的状态中。而不是上文 case when 中的三个状态中。其具体实现区别在于:

  • ⭐ case when:total_uv、app_uv、web_uv 在去重时,state 是存在三个 MapState 中的,MapState key 为 user_id,value 为默认值,判断是否重复直接按照 key 是在 MapState 中的出现过进行判断。如果总 uv 为 1 亿,'android', 'iphone' uv 为 5kw,'wap', 'other' uv 为 5kw,则 3 个 state 要存储总共 2 亿条数据
  • ⭐ filter:total_uv、app_uv、web_uv 在去重时,state 是存在一个 MapState 中的,MapState key 为 user_id,value 为 long,其中 long 的第一个 bit 位标识在计算总 uv 时此 user_id 是否来光顾哦,第二个标识 'android', 'iphone',第三个标识 'wap', 'other',因此在上述 case when 相同的数据量的情况下,总共只需要存储 1 亿条数据,state 容量减小了几乎 50%

或者下面的场景也可以使用 filter 子句进行替换。

  1. ⭐ 优化前:
代码语言:javascript复制
select
    day
    , app_typp
    , count(distinct user_id) as uv
from source_table
group by
    day
    , app_type

如果能够确定 app_type 是可以枚举的,比如为 android、iphone、web 三种,则可以使用 filter 子句做性能优化:

代码语言:javascript复制
select
    day
    , count(distinct user_id) filter (where app_type = 'android') as android_uv
    , count(distinct user_id) filter (where app_type = 'iphone') as iphone_uv
    , count(distinct user_id) filter (where app_type = 'web') as web_uv
from source_table
group by
    day

经过上述优化之后,state 大小的优化效果也会是成倍提升的。

5.5.SQL Connector 扩展 - 自定义 SourceSink

5.5.1.自定义 SourceSink

flink sql 知其所以然(一)| sourcesink 原理

5.5.2.自定义 SourceSink 的扩展接口

Flink SQL 中除了自定义的 Source 的基础接口之外,还提供了一部分扩展接口用于性能的优化、能力扩展,接下来详细进行介绍。在 SourceSink 中主要包含了以下接口:

  1. ⭐ Source 算子的接口:
  • SupportsFilterPushDown:将过滤条件下推到 Source 中提前过滤,减少下游处理的数据量。案例可见 org.apache.flink.table.filesystem.FileSystemTableSource
  • SupportsLimitPushDown:将 limit 条目数下推到 Source 中提前限制处理的条目数。案例可见 org.apache.flink.table.filesystem.FileSystemTableSource
  • SupportsPartitionPushDown:(常用于批处理场景)将带有 Partition 属性的 Source,将所有的 Partition 数据获取到之后,然后在 Source 决定哪个 Source 读取哪些 Partition 的数据,而不必在 Source 后过滤。比如 Hive 表的 Partition,将所有 Partition 获取到之后,然后决定某个 Source 应该读取哪些 Partition,详细可见 org.apache.flink.table.filesystem.FileSystemTableSource
  • SupportsProjectionPushDown:将下游用到的字段下推到 Source 中,然后 Source 中只取这些字段,不使用的字段就不往下游发。案例可见 org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource
  • SupportsReadingMetadata:支持读取 Source 的 metadata,比如在 Kafka Source 中读取 Kafka 的 offset,写入时间戳等数据。案例可见 org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource
  • SupportsWatermarkPushDown:支持将 Watermark 的分配方式下推到 Source 中,比如 Kafka Source 中一个 Source Task 可以读取多个 Partition,然后为每个 Partition 单独分配 Watermark Generator,这样 Watermark 的生成粒度就是单 Partition,在事件时间下数据计算会更加准确。案例可见 org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource
  • SupportsSourceWatermark:支持自定义的 Source Watermark 分配方式,比如目前已有的 Watermark 分配方式不满足需求,需要自定义 Source 的 Watermark 生成方式,则可以实现此接口 在 DDL 中声明 SOURCE_WATERMARK() 来声明使用自定义 Source 的 Watermark 生成方式。案例可见 org.apache.flink.table.planner.connectors.ExternalDynamicSource
  1. ⭐ Sink 算子的接口:
  • SupportsOverwrite:(常用于批处理场景)支持类似于 Hive SQL 的 insert overwrite table xxx 的能力,将已有分区内的数据进行覆盖。案例可见 org.apache.flink.connectors.hive.HiveTableSink
  • SupportsPartitioning:(常用于批处理场景)支持类似于 Hive SQL 的 insert INTO xxx partition(key = 'A') xxx 的能力,支持将结果数据写入某个静态分区。案例可见 org.apache.flink.connectors.hive.HiveTableSink
  • SupportsWritingMetadata:支持将 metadata 写入到 Sink 中,比如可以往 Kafka Sink 中写入 Kafka 的 timestamp、header 等。案例可见 org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink

5.5.3.Source:SupportsFilterPushDown

  1. ⭐ 应用场景:将 where 中的一些过滤条件下推到 Source 中进行处理,这样不需要的数据就可以不往下游发送了,性能会有提升。
  2. ⭐ 优化前:如下图 web ui 算子图,过滤条件都在 Source 节点之后有单独的 filter 算子进行承接

filter 前

  1. ⭐ 优化方案及实现:在 DynamicTableSource 中实现 SupportsFilterPushDown 接口的方法,具体实现方案如下:
代码语言:javascript复制
public class Abilities_TableSource implements ScanTableSource
        , SupportsFilterPushDown // 过滤条件下推 {
    private List<ResolvedExpression> filters;

    // 方法输入参数:List<ResolvedExpression> filters:引擎下推过来的过滤条件,然后在此方法中来决定哪些条件需要被下推
    // 方法输出参数:Result:Result 记录哪些过滤条件在 Source 中应用,哪些条件不能在 Source 中应用
    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        this.filters = new LinkedList<>(filters);

        // 1.不上推任何过滤条件
        // Result.of(上推的 filter, 没有做上推的 filter)
//        return Result.of(Lists.newLinkedList(), filters);
        // 2.将所有的过滤条件都上推到 source
        return Result.of(filters, Lists.newLinkedList());
    }
}
  1. ⭐ 优化效果:如下图 web ui 算子图,过滤条件在 Source 节点执行

filter 后

5.5.4.Source:SupportsLimitPushDown

  1. ⭐ 应用场景:将 limit 子句下推到 Source 中,在批场景中可以过滤大部分不需要的数据
  2. ⭐ 优化前:如下图 web ui 算子图,limit 条件都在 Source 节点之后有单独的 Limit 算子进行承接

limit 前

  1. ⭐ 优化方案及实现:在 DynamicTableSource 中实现 SupportsLimitPushDown 接口的方法,具体实现方案如下:
代码语言:javascript复制
public class Abilities_TableSource implements ScanTableSource
        , SupportsLimitPushDown // limit 条件下推 {

    private long limit = -1;

    @Override
    // 方法输入参数:long limit:引擎下推过来的 limit 条目数
    public void applyLimit(long limit) {
        // 将 limit 数接收到之后,然后在 SourceFunction 中可以进行过滤
        this.limit = limit;
    }
}
  1. ⭐ 优化效果:如下图 web ui 算子图,limit 条件在 Source 节点执行

limit 后

5.5.5.Source:SupportsProjectionPushDown

  1. ⭐ 应用场景:将下游用到的字段下推到 Source 中,然后 Source 中可以做到只取这些字段,不使用的字段就不往下游发
  2. ⭐ 优化前:如下图 web ui 算子图,limit 条件都在 Source 节点之后有单独的 Limit 算子进行承接

project 前

  1. ⭐ 优化方案及实现:在 DynamicTableSource 中实现 SupportsProjectionPushDown 接口的方法,具体实现方案如下:
代码语言:javascript复制
public class Abilities_TableSource implements ScanTableSource
        , SupportsProjectionPushDown // select 字段下推 {

    private TableSchema tableSchema;

    @SneakyThrows
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {

        // create runtime classes that are shipped to the cluster

        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
                runtimeProviderContext,
                getSchemaWithMetadata(this.tableSchema).toRowDataType());

        ...
    }

    @Override
    // 方法输入参数:
    // int[][] projectedFields:下游算子 `使用到的那些字段` 的下标,可以通过 projectSchemaWithMetadata 方法结合 table schema 信息生成 Source 新的需要写出 schema 信息
    public void applyProjection(int[][] projectedFields) {
        this.tableSchema = projectSchemaWithMetadata(this.tableSchema, projectedFields);
    }
}
  1. ⭐ 优化效果:如下图 web ui 算子图,下游没有用到的字段直接在 Source 节点过滤掉,不输出

project 后

5.5.6.Source:SupportsReadingMetadata

  1. ⭐ 应用场景:支持读取 Source 的 metadata,比如在 Kafka Source 中读取 Kafka 的 offset,写入时间戳等数据
  2. ⭐ 支持之前:比如想获取 Kafka 中的 offset 字段,在之前是不支持的
  3. ⭐ 支持方案及实现:在 DynamicTableSource 中实现 SupportsReadingMetadata 接口的方法,我们来看看 Flink Kafka Consumer 的具体实现方案:
代码语言:javascript复制
// 注意!!!先执行 listReadableMetadata(),然后执行 applyReadableMetadata(xxx, xxx) 方法

// 方法输出参数:列出 Kafka Source 可以从 Kafka 中读取的 metadata 数据
@Override
public Map<String, DataType> listReadableMetadata() {
    final Map<String, DataType> metadataMap = new LinkedHashMap<>();

    // add value format metadata with prefix
    valueDecodingFormat
            .listReadableMetadata()
            .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX   key, value));

    // add connector metadata
    Stream.of(ReadableMetadata.values())
            .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));

    return metadataMap;
}

// 方法输入参数:
// List<String> metadataKeys:用户 SQL 中写入到 Sink 表的的 metadata 字段名称(metadataKeys)
// DataType producedDataType:将用户 SQL 写入到 Sink 表的所有字段的类型信息传进来,包括了 metadata 字段的类型信息
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
    final List<String> formatMetadataKeys =
            metadataKeys.stream()
                    .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
                    .collect(Collectors.toList());
    final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
    connectorMetadataKeys.removeAll(formatMetadataKeys);

    final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
    if (formatMetadata.size() > 0) {
        final List<String> requestedFormatMetadataKeys =
                formatMetadataKeys.stream()
                        .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
                        .collect(Collectors.toList());
        valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
    }

    this.metadataKeys = connectorMetadataKeys;
    this.producedDataType = producedDataType;
}
  1. ⭐ 支持之后的效果:
代码语言:javascript复制
CREATE TABLE KafkaTable (
   // METADATA 字段用于声明可以从 Source 读取的 metadata
   // 关于 Flink Kafka Source 可以读取的 metadata 见以下链接
   // https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#available-metadata
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

在后续的 DML SQL 语句中就可以正常使用这些 metadata 字段的数据了。

5.5.7.Source:SupportsWatermarkPushDown

  1. ⭐ 应用场景:支持将 Watermark 的分配方式下推到 Source 中,比如 Kafka Source 中一个 Source Task 可以读取多个 Partition,Watermark 分配器下推到 Source 算子中后,就可以为每个 Partition 单独分配 Watermark Generator,这样 Watermark 的生成粒度就是 Kafka 的单 Partition,在事件时间下数据乱序会更小。
  2. ⭐ 支持之前:可以看到下图,Watermark 的分配是在 Source 节点之后的。

watermark 前

  1. ⭐ 支持方案及实现:在 DynamicTableSource 中实现 SupportsWatermarkPushDown 接口的方法,我们来看看 Flink Kafka Consumer 的具体实现方案:
代码语言:javascript复制
// 方法输入参数:
// WatermarkStrategy<RowData> watermarkStrategy:将用户 DDL 中的 watermark 生成方式传入
@Override
public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
    this.watermarkStrategy = watermarkStrategy;
}
  1. ⭐ 支持之后的效果:

watermark 前

5.5.8.Sink:SupportsOverwrite

  1. ⭐ 应用场景:(常用于批处理场景)支持类似于 Hive SQL 的 insert overwrite table xxx 的能力,将已有分区内的数据进行覆盖。
  2. ⭐ 支持方案及实现:在 DynamicTableSink 中实现 SupportsOverwrite 接口的方法,我们来看看 HiveTableSink 的具体实现方案:
代码语言:javascript复制
private DataStreamSink<Row> createBatchSink(
    DataStream<RowData> dataStream,
    DataStructureConverter converter,
    StorageDescriptor sd,
    HiveWriterFactory recordWriterFactory,
    OutputFileConfig fileNaming,
    final int parallelism)
    throws IOException {
FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
...
--- 2. 将 overwrite 字段设置到 FileSystemOutputFormat 中,在后续写入数据到 Hive 表时,如果 overwrite = true,则会覆盖直接覆盖已有数据
builder.setOverwrite(overwrite);
builder.setStaticPartitions(staticPartitionSpec);
...
return dataStream
        .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
        .writeUsingOutputFormat(builder.build())
        .setParallelism(parallelism);
}

// 1. 方法输入参数:
// boolean overwrite:用户写的 SQL 中如果包含了 overwrite 关键字,则方法入参 overwrite = true
// 如果不包含 overwrite 关键字,则方法入参 overwrite = false
@Override
public void applyOverwrite(boolean overwrite) {
    this.overwrite = overwrite;
}
  1. ⭐ 支持之后的效果:

支持在批任务中 insert overwrite xxx。

代码语言:javascript复制
insert overwrite hive_sink_table
select
    user_id
    , order_amount
    , server_timestamp_bigint
    , server_timestamp 
from hive_source_table

5.5.9.Sink:SupportsPartitioning

  1. ⭐ 应用场景:(常用于批处理场景)支持类似于 Hive SQL 的 insert INTO xxx partition(key = 'A') 的能力,支持将结果数据写入某个静态分区。
  2. ⭐ 支持方案及实现:在 DynamicTableSink 中实现 SupportsPartitioning 接口的方法,我们来看看 HiveTableSink 的具体实现方案:
代码语言:javascript复制
private DataStreamSink<Row> createBatchSink(
    DataStream<RowData> dataStream,
    DataStructureConverter converter,
    StorageDescriptor sd,
    HiveWriterFactory recordWriterFactory,
    OutputFileConfig fileNaming,
    final int parallelism)
    throws IOException {
FileSystemOutputFormat.Builder<Row> builder = new FileSystemOutputFormat.Builder<>();
...
builder.setMetaStoreFactory(msFactory());
builder.setOverwrite(overwrite);
--- 2. 将 staticPartitionSpec 字段设置到 FileSystemOutputFormat 中,在后续写入数据到 Hive 表时,如果有静态分区,则会将数据写入到对应的静态分区中
builder.setStaticPartitions(staticPartitionSpec);
...
return dataStream
        .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
        .writeUsingOutputFormat(builder.build())
        .setParallelism(parallelism);
}

// 1. 方法输入参数:
// Map<String, String> partitionMap:用户写的 SQL 中如果包含了 partition(partition_key = 'A') 关键字
// 则方法入参 Map<String, String> partitionMap 的输入值转为 JSON 后为:{"partition_key": "A"}
// 用户可以自己将方法入参的 partitionMap 保存到自定义变量中,后续写出到 Hive 表时进行使用
@Override
public void applyStaticPartition(Map<String, String> partitionMap) {
    staticPartitionSpec = new LinkedHashMap<>();
    for (String partitionCol : getPartitionKeys()) {
        if (partitionMap.containsKey(partitionCol)) {
            staticPartitionSpec.put(partitionCol, partitionMap.get(partitionCol));
        }
    }
}
  1. ⭐ 支持之后的效果:
代码语言:javascript复制
insert overwrite hive_sink_table partition(date = '2022-01-01')
select
    user_id
    , order_amount
    , server_timestamp_bigint
    , server_timestamp 
from hive_source_table

5.5.9.Sink:SupportsWritingMetadata

  1. ⭐ 应用场景:支持将 metadata 写入到 Sink 中。举例:可以往 Kafka Sink 中写入 Kafka 的 timestamp、header 等。案例可见 org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink
  2. ⭐ 支持方案及实现:在 DynamicTableSink 中实现 SupportsWritingMetadata 接口的方法,我们来看看 KafkaDynamicSink 的具体实现方案:
代码语言:javascript复制
// 注意!!!先执行 listWritableMetadata(),然后执行 applyWritableMetadata(xxx, xxx) 方法

// 1. 方法返回参数 Map<String, DataType>:Flink 会获取到可以写入到 Kafka Sink 中的 metadata 都有哪些
@Override
public Map<String, DataType> listWritableMetadata() {
    final Map<String, DataType> metadataMap = new LinkedHashMap<>();
    Stream.of(WritableMetadata.values())
            .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
    return metadataMap;
}

// 2. 方法输入参数:
// List<String> metadataKeys:通过解析用户的 SQL 语句,得出用户写出到 Sink 的 metadata 列信息,是 listWritableMetadata() 返回结果的子集
// DataType consumedDataType:写出到 Sink 字段的 DataType 类型信息,包括了写出的 metadata 列的类型信息(注意!!!metadata 列会被添加到最后一列)。
// 用户可以将这两个信息获取到,然后传入构造的 SinkFunction 中实现将对应字段写入 metadata 流程。
@Override
public void applyWritableMetadata(List<String> metadataKeys, DataType consumedDataType) {
    this.metadataKeys = metadataKeys;
    this.consumedDataType = consumedDataType;
}
  1. ⭐ 支持之后的效果:
代码语言:javascript复制
CREATE TABLE KafkaSourceTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'source_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json'
);

CREATE TABLE KafkaSinkTable (
  -- 1. 定义 kafka 中 metadata 的 timestamp 列
  `timestamp` TIMESTAMP_LTZ(3) METADATA,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json'
);

insert into KafkaSinkTable
select
    -- 2. 写入到 kafka 的 metadata 中的 timestamp
    cast(CURRENT_TIMESTAMP as TIMESTAMP_LTZ(3)) as `timestamp`
    , user_id
    , item_id
    , behavior
from KafkaSourceTable

5.6.SQL Format 扩展

关于怎么实现一个自定义的 Format 可以参考一下文章。

flink sql 知其所以然(五)| 自定义 protobuf format

0 人点赞