Flink 实践教程-进阶(10):自定义聚合函数(UDAF)

2022-03-14 11:40:06 浏览数 (1)

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。

本文将为您详细介绍如何使用自定义聚合函数(UDAF),将处理后的存入 MySQL 中。

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建 MySQL 实例

进入 MySQL 控制台 [3],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [4]。进入实例后,单击右上角【登陆】即可登陆 MySQL 数据库。

创建 MySQL 表
代码语言:javascript复制
-- 建表语句,用于向 Source 提供数据CREATE TABLE `udaf_input` ( `id`       int(10) NOT NULL, `product`  varchar(50) DEFAULT '', `value`    int(10) DEFAULT NULL, `weight`   int(10) DEFAULT NULL, PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8
-- 插入数据INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (1, 'oceanus-1', 2, 2);INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (2, 'oceanus-1', 3, 3);INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (3, 'oceanus-2', 5, 4);INSERT INTO `udaf_input` (`id`, `product`, `value`, `weight`) VALUES (5, 'oceanus-2', 6, 5);
-- 建表语句,用于接收 Sink 端数据CREATE TABLE `udaf_output` ( `product`  varchar(50) NOT NULL DEFAULT '', `sum`      double(11,0) DEFAULT NULL, PRIMARY KEY (`product`)) ENGINE=InnoDB DEFAULT CHARSET=utf8

开发 UDAF

我们自定义一个 UDAF,继承 AggregateFunction,对算子输入的两个字段计算加权平均值。

1. 代码编写

WeightedAvgAccumulator类:

代码语言:javascript复制
package demos.UDAF;
public class WeightedAvgAccumulator{   public long sum = 0;   public int count = 0;}

WeightedAvg 类:

代码语言:javascript复制
package demos.UDAF;
import org.apache.flink.table.functions.AggregateFunction;
public class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccumulator> {
   @Override   public WeightedAvgAccumulator createAccumulator() {       return new WeightedAvgAccumulator();  }
   @Override   public Long getValue(WeightedAvgAccumulator acc) {       if (acc.count == 0) {           return null;      } else {           return acc.sum / acc.count;      }  }
   public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {       acc.sum  = iValue * iWeight;       acc.count  = iWeight;  }
   public void retract(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {       acc.sum -= iValue * iWeight;       acc.count -= iWeight;  }
   public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {       for (WeightedAvgAccumulator a : it) {           acc.count  = a.count;           acc.sum  = a.sum;      }  }
   public void resetAccumulator(WeightedAvgAccumulator acc) {       acc.count = 0;       acc.sum = 0L;  }}

2. 打包 Jar

使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。命令行打包命令:

代码语言:javascript复制
mvn clean package

命令行打包后生成的 Jar 包可以在项目 target 目录下找到。

流计算 Oceanus 作业

上传依赖

在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 Jar 包。

创建 SQL 作业

在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 SQL 作业,点击【开发调试】进入作业编辑页面。单击【作业参数】,在【引用程序包】处选择刚才上传的 Jar 包。

1. 创建 Function
代码语言:javascript复制
CREATE TEMPORARY SYSTEM FUNCTION WeightedAvg  AS 'demos.UDAF.WeightedAvg';

WeightedAvg代表创建的函数名,demos.UDAF.WeightedAvg代表代码所在路径。

2. 创建 Source
代码语言:javascript复制
CREATE TABLE `mysql_cdc_source_table` ( `id`        INT, `product`   VARCHAR, `value`     INT, `weight`    INT, PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ( 'connector' = 'mysql-cdc',  -- 固定值 'mysql-cdc' 'hostname' = 'xx.xx.xx.xx',      -- 数据库的 IP 'port' = 'xxxx',                -- 数据库的访问端口 'username' = 'root',            -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限) 'password' = 'xxxxxxxxx',       -- 数据库访问的密码 'database-name' = 'testdb',     -- 需要同步的数据库 'table-name' = 'udaf_input'     -- 需要同步的数据表名);

3. 创建 Sink

代码语言:javascript复制
CREATE TABLE `jdbc_source_table` (   `product`  VARCHAR,   `sum`      DOUBLE,   PRIMARY KEY(`product`) NOT ENFORCED) WITH (   -- 指定数据库连接参数   'connector' = 'jdbc',   'url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',   -- 请替换为您的实际 MySQL 连接参数   'table-name' = 'udaf_output',    -- 需要写入的数据表   'username' = 'root',             -- 数据库访问的用户名(需要提供 INSERT 权限)   'password' = 'xxxxxxxxx',        -- 数据库访问的密码   'sink.buffer-flush.max-rows' = '200',  -- 批量输出的条数   'sink.buffer-flush.interval' = '2s'    -- 批量输出的间隔);

4. 编写业务 SQL

代码语言:javascript复制
INSERT INTO jdbc_source_tableSELECTproduct,CAST(WeightedAvg(`value`,`weight`) AS DOUBLE) AS `sum`FROM mysql_cdc_source_table GROUP BY `product`;

总结

本文首先在本地开发 UDAF 函数,将其打成 Jar 包后上传到 Oceanus 平台引用。接下来使用 MySQL CDC 连接器获取udaf_input表数据,调用 UDAF 函数对输入的两个字段计算加权平均值后存入 MySQL 中。其他的自定义函数,例如自定义标量函数(UDF)和自定义表值函数(UDTF)的使用方法和视频教程可以参考之前的文章 Flink 实践教程:进阶8-自定义标量函数(UDF) [5]、Flink 实践教程:进阶9-自定义表值函数(UDTF) [6]

  • 自定义聚合函数(UDAF)可以将多条记录聚合成 1 条记录。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview

[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298

[3] MySQL 控制台:https://console.cloud.tencent.com/cdb

[4] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433

[5] Flink 实践教程:进阶8-自定义标量函数(UDF):https://cloud.tencent.com/developer/article/1946320

[6] Flink 实践教程:进阶9-自定义表值函数(UDTF):https://cloud.tencent.com/developer/article/1951900

流计算 Oceanus 限量秒杀专享活动火爆进行中↓↓

点击文末「阅读原文」,了解腾讯云流计算 Oceanus 更多信息~

腾讯云大数据

长按二维码 关注我们

0 人点赞