Flink中Table语法的聚合操作

2023-04-27 15:16:27 浏览数 (1)

常用方法

Flink Table 内置的聚合方法包括:

  1. sum():求和
  2. count():计数
  3. avg():平均值
  4. min():最小值
  5. max():最大值
  6. stddevPop():计算总体标准差
  7. stddevSamp():计算样本标准差
  8. varPop():计算总体方差
  9. varSamp():计算样本方差

另外,Flink Table 还支持自定义聚合方法。

示例

示例:

代码语言:Scala
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.api.scala._
import org.apache.flink.types.Row
import org.apache.flink.table.functions.AggregateFunction

/**
 * Example program showing grouped aggregations with the Flink Table API.
 *
 * A small in-memory stream of `(id, num, str)` tuples is turned into a table,
 * grouped by `str`, and aggregated over `id` with the built-in `count`, `sum`,
 * `max`, `min` and `avg` expressions, and finally with the user-defined
 * [[TableAggregationsExample.MyCount]] aggregate function. Every result is
 * converted to a retract stream and printed.
 */
object TableAggregationsExample {

  /**
   * Builds all aggregation pipelines and runs the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, settings)

    // Sample input: (id, num, str)
    val stream = env.fromElements(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (3L, 2, "Hello"),
      (4L, 3, "World"))

    val table = tEnv.fromDataStream(stream, $"id", $"num", $"str")

    // count
    table
      .groupBy($"str")
      .select($"str", $"id".count())
      .toRetractStream[Row]
      .print()

    // sum
    table
      .groupBy($"str")
      .select($"str", $"id".sum())
      .toRetractStream[Row]
      .print()

    // max
    table
      .groupBy($"str")
      .select($"str", $"id".max())
      .toRetractStream[Row]
      .print()

    // min
    table
      .groupBy($"str")
      .select($"str", $"id".min())
      .toRetractStream[Row]
      .print()

    // avg
    table
      .groupBy($"str")
      .select($"str", $"id".avg())
      .toRetractStream[Row]
      .print()

    // Custom Aggregate Function
    val myCount = new MyCount
    tEnv.createTemporaryView("myTable", table)
    // The function is registered under the name that call(...) looks up below.
    tEnv.registerFunction("myCount", myCount)

    table
      .groupBy($"str")
      .select($"str", call("myCount", $"id"))
      .toRetractStream[Row]
      .print()

    env.execute()
  }

  /**
   * User-defined aggregate function that counts the rows of each group.
   *
   * The running total is kept in a [[MyCountAccumulator]].
   */
  class MyCount extends AggregateFunction[Long, MyCountAccumulator] {

    /** Creates a fresh accumulator whose count starts at zero. */
    override def createAccumulator(): MyCountAccumulator = new MyCountAccumulator

    /**
     * Returns the aggregation result for the given accumulator.
     *
     * @param accumulator accumulator holding the running count
     * @return the number of rows accumulated so far
     */
    override def getValue(accumulator: MyCountAccumulator): Long = accumulator.count

    /**
     * Adds one input row to the running count.
     *
     * The count must be incremented, not assigned: assigning a constant would
     * make every non-empty group report the same value.
     *
     * @param acc accumulator to update in place
     * @param id  value of the aggregated column; only the presence of the row
     *            matters, the value itself is not used
     */
    def accumulate(acc: MyCountAccumulator, id: Long): Unit = acc.count += 1

  }

  /** Mutable accumulator for [[MyCount]]; holds the number of rows seen so far. */
  class MyCountAccumulator {
    var count: Long = 0L
  }
}

该示例中展示了Flink Table内置的count/sum/max/min/avg等聚合方法的使用,并在最后展示了如何使用自定义聚合函数。

0 人点赞