Flink 表值聚合操作在 Dlink 的实践

2021-12-27 12:53:51 浏览数 (1)

一、背景

Flink 具有强大的自定义函数功能,最新的 1.13 版本新增了 Async Table Functions。而其已有的 Table Aggregate Functions 只能在 Table API 中使用,无法 FlinkSQL 中进行定义。于是,成为 Apache Flink 的最佳伙伴的目标,正推动着 Dlink 试图实现该特性。

二、准备工作

部署 Dlink-0.2.2

获取安装包

百度网盘链接:https://pan.baidu.com/s/1gHZPGMhYUcpZZgOHta3Csw 提取码:0202

应用结构
代码语言:javascript复制
config/ -- 配置文件
|- application.yml
lib/ -- 外部依赖及Connector
|- dlink-client-1.12.jar -- 必需
|- dlink-connector-jdbc.jar
|- dlink-function-0.2.2.jar
|- flink-connector-jdbc_2.11-1.12.4.jar
|- flink-csv-1.12.4.jar
|- flink-json-1.12.4.jar
|- mysql-connector-java-8.0.21.jar
sql/ 
|- dlink.sql -- Mysql初始化脚本
auto.sh -- 启动停止脚本
dlink-admin.jar -- 程序包
修改配置文件

配置数据库地址。

代码语言:javascript复制
spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/dlink?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: dlink
    password: dlink
    driver-class-name: com.mysql.cj.jdbc.Driver
执行与停止
代码语言:javascript复制
# 启动
sh auto.sh start
# 停止
sh auto.sh stop
# 重启
sh auto.sh restart
# 状态
sh auto.sh status

准备测试表

学生表(student)
代码语言:javascript复制
DROP TABLE IF EXISTS `student`;
CREATE TABLE `student`  (
  `sid` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`sid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `student` VALUES (1, '小明');
INSERT INTO `student` VALUES (2, '小红');
INSERT INTO `student` VALUES (3, '小黑');
INSERT INTO `student` VALUES (4, '小白');
一维成绩表(score)
代码语言:javascript复制
DROP TABLE IF EXISTS `score`;
CREATE TABLE `score`  (
  `cid` int(11) NOT NULL,
  `sid` int(11) NULL DEFAULT NULL,
  `cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `score` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`cid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

INSERT INTO `score` VALUES (1, 1, 'chinese', 90);
INSERT INTO `score` VALUES (2, 1, 'math', 100);
INSERT INTO `score` VALUES (3, 1, 'english', 95);
INSERT INTO `score` VALUES (4, 2, 'chinese', 98);
INSERT INTO `score` VALUES (5, 2, 'english', 99);
INSERT INTO `score` VALUES (6, 3, 'chinese', 99);
INSERT INTO `score` VALUES (7, 3, 'english', 100);
前二排名表(scoretop2)
代码语言:javascript复制
DROP TABLE IF EXISTS `scoretop2`;
CREATE TABLE `scoretop2`  (
  `cls` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `score` int(11) NULL DEFAULT NULL,
  `rank` int(11) NOT NULL,
  PRIMARY KEY (`cls`, `rank`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
二维成绩单表(studentscore)
代码语言:javascript复制
DROP TABLE IF EXISTS `studentscore`;
CREATE TABLE `studentscore`  (
  `sid` int(11) NOT NULL,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `chinese` int(11) NULL DEFAULT NULL,
  `math` int(11) NULL DEFAULT NULL,
  `english` int(11) NULL DEFAULT NULL,
  PRIMARY KEY (`sid`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

问题提出

输出各科成绩前二的分数

要求输出已有学科排名前二的分数到scoretop2表中。

输出二维成绩单

要求将一维成绩表转化为二维成绩单,其中不存在的成绩得分为0,并输出至studentscore表中。

三、Dlink 的 AGGTABLE

本文以 Flink 官方的 Table Aggregate Functions 示例 Top2 为例进行比较说明,传送门 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/#table-aggregate-functions

官方 Table API 实现

代码语言:javascript复制
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;
import static org.apache.flink.table.api.Expressions.*;

// mutable accumulator of structured type for the aggregate function
public static class Top2Accumulator {
  public Integer first;
  public Integer second;
}

// function that takes (value INT), stores intermediate results in a structured
// type of Top2Accumulator, and returns the result as a structured type of Tuple2<Integer, Integer>
// for value and rank
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> {

  @Override
  public Top2Accumulator createAccumulator() {
    Top2Accumulator acc = new Top2Accumulator();
    acc.first = Integer.MIN_VALUE;
    acc.second = Integer.MIN_VALUE;
    return acc;
  }

  public void accumulate(Top2Accumulator acc, Integer value) {
    if (value > acc.first) {
      acc.second = acc.first;
      acc.first = value;
    } else if (value > acc.second) {
      acc.second = value;
    }
  }

  public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
    for (Top2Accumulator otherAcc : it) {
      accumulate(acc, otherAcc.first);
      accumulate(acc, otherAcc.second);
    }
  }

  public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) {
    // emit the value and rank
    if (acc.first != Integer.MIN_VALUE) {
      out.collect(Tuple2.of(acc.first, 1));
    }
    if (acc.second != Integer.MIN_VALUE) {
      out.collect(Tuple2.of(acc.second, 2));
    }
  }
}

TableEnvironment env = TableEnvironment.create(...);

// call function "inline" without registration in Table API
env
  .from("MyTable")
  .groupBy($("myField"))
  .flatAggregate(call(Top2.class, $("value")))
  .select($("myField"), $("f0"), $("f1"));

// call function "inline" without registration in Table API
// but use an alias for a better naming of Tuple2's fields
env
  .from("MyTable")
  .groupBy($("myField"))
  .flatAggregate(call(Top2.class, $("value")).as("value", "rank"))
  .select($("myField"), $("value"), $("rank"));

// register function
env.createTemporarySystemFunction("Top2", Top2.class);

// call registered function in Table API
env
  .from("MyTable")
  .groupBy($("myField"))
  .flatAggregate(call("Top2", $("value")).as("value", "rank"))
  .select($("myField"), $("value"), $("rank"));

Dlink FlinkSql 实现

示例
代码语言:javascript复制
CREATE AGGTABLE aggdemo AS 
SELECT myField,value,rank
FROM MyTable
GROUP BY myField
AGG BY TOP2(value) as (value,rank);
优势

可以通过 FlinkSQL 来实现表值聚合的需求,降低了开发与维护成本。

缺点

语法固定,示例关键字必须存在并进行描述,where 可以加在 FROM 和 GROUP BY 之间。

四、Dlink 本地实现分组 Top2

本示例通过 Dlink 的本地环境进行演示实现。

进入Dlink

只有版本号大于等于 0.2.2-rc1 的 Dlink 才支持本文 AGGTABLE 的使用。

编写 FlinkSQL

代码语言:javascript复制
jdbcconfig:='connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:3306/data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',
    'username'='dlink',
    'password'='dlink',;
CREATE TABLE student (
    sid INT,
    name STRING,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
    ${jdbcconfig}
    'table-name' = 'student'
);
CREATE TABLE score (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
    ${jdbcconfig}
    'table-name' = 'score'
);
CREATE TABLE scoretop2 (
    cls STRING,
    score INT,
    `rank` INT,
    PRIMARY KEY (cls,`rank`) NOT ENFORCED
) WITH (
    ${jdbcconfig}
    'table-name' = 'scoretop2'
);
CREATE AGGTABLE aggscore AS 
SELECT cls,score,rank
FROM score
GROUP BY cls
AGG BY TOP2(score) as (score,rank);

insert into scoretop2
select 
b.cls,b.score,b.`rank`
from aggscore b

本 Sql 使用了 Dlink 的增强特性 Fragment 机制,对 jdbc的配置进行了定义。

维护 FlinkSQL 及配置

编写 FlinkSQL ,配置开启 Fragment 机制,设置 Flink 集群为本地执行。点击保存。

同步执行INSERT

点击同步执行按钮运行当前编辑器中的 FlinkSQL 语句集。弹出提示信息,等待执行完成后自动关闭并刷新信息和结果。

当前版本使用异步提交功能将直接提交任务到集群,Studio 不负责执行结果的记录。提交任务前请保存 FlinkSQL 和配置,否则将提交修改前的语句和配置。

执行反馈

本地执行成功,“0_admin” 为本地会话,里面存储了 Catalog。

同步执行SELECT查看中间过程

由于当前会话中已经存储了表的定义,此时直接选中 select 语句点击同步执行可以重新计算并展示其计算过程中产生的结果,由于 Flink 表值聚合操作机制,该结果非最终结果。

同步执行SELECT查看最终结果

在草稿的页面使用相同的会话可以共享 Catalog,此时只需要执行 select 查询 sink 表就可以预览最终的统计结果。

查看Mysql表的数据

sink 表中只有五条数据,结果是正确的。

五、Dlink 远程集群实现分组多行转列

本示例通过 Dlink 控制远程集群来实现。

远程集群的 lib 中需要上传 dlink-function.jar 。

编写FlinkSQL

代码语言:javascript复制
jdbcconfig:='connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:3306/data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true',
    'username'='dlink',
    'password'='dlink',;
CREATE TABLE student (
    sid INT,
    name STRING,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
     ${jdbcconfig}
    'table-name' = 'student'
);
CREATE TABLE score (
    cid INT,
    sid INT,
    cls STRING,
    score INT,
    PRIMARY KEY (cid) NOT ENFORCED
) WITH (
     ${jdbcconfig}
    'table-name' = 'score'
);
CREATE TABLE studentscore (
    sid INT,
    name STRING,
    chinese INT,
    math INT,
    english INT,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
     ${jdbcconfig}
    'table-name' = 'studentscore'
);
CREATE AGGTABLE aggscore2 AS 
SELECT sid,data
FROM score
GROUP BY sid
AGG BY TO_MAP(cls,score) as (data);

insert into studentscore
select 
a.sid,a.name,
cast(GET_KEY(b.data,'chinese','0') as int),
cast(GET_KEY(b.data,'math','0') as int),
cast(GET_KEY(b.data,'english','0') as int)
from student a
left join aggscore2 b on a.sid=b.sid 

本实例通过表值聚合将分组后的多行转单列然后通过 GET_KEY 取值的思路来实现。同时,也使用了 Fragment 机制。

同步执行

与示例一相似,不同点在于需要更改集群配置为 远程集群。远程集群的注册在集群中心注册,Hosts 需要填写 JobManager 的地址,HA模式则使用英文逗号分割可能出现的地址,如“127.0.0.1:8081,127.0.0.2:8081,127.0.0.3:8081”。心跳监测正常的集群实例即可用于任务执行或提交。

Flink UI

打开集群的 Flink UI 可以发现刚刚提交的批任务,此时可以发现集群版本号为 1.12.2 ,而 Dlink 默认版本为 1.12.4 ,所以一般大版本内可以互相兼容。

查看Mysql表的数据

查看 Mysql 表的最终数据,发现存在四条结果,且也符合问题的要求,不存在的为 0,故结果正确。

六、未来

未来,Dlink 将紧跟 Flink 官方社区发展,为推广及发展 Flink 的应用而奋斗。

Dlink 将后续不断完善 Studio,打造 FlinkSQL 的最佳搭档的形象。 当然元数据、时间与依赖调度、血缘分析、影响分析、数据地图、监控预警等核心功能将逐步上线。

与此同时,DataLink 数据中台将同步发展,未来将提供开源的企业级数据中台解决方案

0 人点赞