Dinky 扩展 Phoenix 连接器使用分享

2022-05-19 10:35:57 浏览数 (1)

摘要:本文介绍了在 Dinky 中扩展 Phoenix 的 Flink 连接器使用分享。内容包括:

  1. Phoenix 连接器编译
  2. Phoenix 连接器部署
  3. Phoenix 连接器使用
  4. Phoenix 连接器 Demo

Tips:历史传送门~

《Dinky 0.6.1 已发布,优化 Flink 应用体验》

《Dinky在Kubernetes的实践分享》

《Dinky在IDEA远程调试实践分享》

《Dlink 在 FinkCDC 流式入湖 Hudi 的实践分享》

GitHub 地址

https://github.com/DataLinkDC/dlink

https://gitee.com/DataLinkDC/Dinky

欢迎大家关注 Dinky 的发展~

一、Phoenix 连接器编译

1.下载源码

https://github.com/DataLinkDC/dlink

2.参考官网文档进行编译打包

http://www.dlink.top/#/zh-CN/deploy/build

3.找到 connector 包

二、Phoenix 连接器部署

使用方式:

  • 2.1 Flink 中使用 通过 flink 启动的 flink 任务,例如 flink session 任务,需要将 dlink-connector-phoenix-1.13-xxx.jar 和 phoenix-4.14.2-HBase-1.4-client.jar 、phoenix-core-4.14.2-HBase-1.4.jar 放入flink/lib 目录下,可以直接在sql中建表使用。
  • 2.2 dinky 中使用集成 可用于 yarn-perjob 等方式集群任务提交等,也就是通过 dinky 提交任务,此方式需要排除 phoenix-4.14.2-HBase-1.4-client.jar 与 dinky 中冲突的依赖 servlet , gson类。

1.排除冲突依赖

参考官网进行 http://www.dlink.top/#/zh-CN/deploy/deploy?id=加载依赖

将phoenix-4.14.2-HBase-1.4-client.jar 中的 servlet ,gson 依赖项排除。

flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar 也是同样删除 servlet 包中的类。

如果遇到新版本其他依赖冲突问题,可参考此方式进行排除。

在这里直接删除了 phoenix-4.14.2-HBase-1.4-client.jar 包中的上述冲突类的包。如下所示:

将排除冲突依赖的 phoenix-4.14.2-HBase-1.4-client.jar 和 phoenix-core-4.14.2-HBase-1.4.jar 放入 Dinky/plugins 目录下。

并确认 Dinky/lib 目录下存在 dlink-connector-phoenix-1.13-0.6.1-SNAPSHOT.jar 的 connector 包

这里使用的Flink版本为1.13.5 dinky plugins 目录下的依赖如下所示:

三、Phoenix 连接器的使用

1.Dinky 中使用

新建 flink sql studio

创建 flink phoenix 表

代码语言:javascript复制
CREATE TABLE pv (
    sid INT,
    ucount BIGINT,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
  'connector.type' = 'phoenix', 
  'connector.url' = 'jdbc:phoenix:xxxxxx:2181', 
  'connector.table' = 'TEST.PV',  
  'connector.driver' = 'org.apache.phoenix.jdbc.PhoenixDriver', 
  'connector.username' = '', 
  'connector.password' = '',
  'phoenix.schema.isnamespacemappingenabled' = 'true',
  'phoenix.schema.mapsystemtablestonamespace' = 'true',
  'connector.write.flush.max-rows' = '1'
 );

参数解释:

phoenix-connector中拓展了

  • 'phoenix.schema.isnamespacemappingenabled' = 'true',
  • 'phoenix.schema.mapsystemtablestonamespace' = 'true'两个参数 用于连接开启 schema 配置的 phoenix ,如果未开启则设置为 false。
  • 'connector.write.flush.max-rows' 参数为写入的数据批次条数,如果想提升写入 phoenix 性能可以设置较大。
  • 'connector.table' = 'TEST.PV', TEST为 phoenix 中 schema 名 PV 为phoenix 表名 3.2 原生 Flink 使用**

2.原生 Flink 使用

在Flink中只需要将打包后的phoenix connector 和 原生的phoenix-client , phoenix-core包放入 flink/lib 目录下即可在 sql-client中 使用。

lib下文件如下图所示:

四、Phoenix 连接器 Demo

1.Demo1 求实时 PV 数据

通过模拟数据源,将关联 mysql 维表数据,然后将数据写入 phoenix 中。

由于 phonix 中的 insert 语义是 upsert ,相同的主键数据会覆盖。实现了实时 pv 数据的变动。

代码语言:javascript复制
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '5 s';
SET 'table.exec.mini-batch.size' = '5000';
SET 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';

--模拟数据源
CREATE TABLE datagen (
 userid int,
 proctime as PROCTIME()
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='10',
 'fields.userid.kind'='random',
 'fields.userid.min'='1',
 'fields.userid.max'='100'
);

--创建mysql lookup维表
CREATE TABLE student (
    sid INT,
    name STRING,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://xxxx:3306/test',
   'table-name' = 'student',
   'username' = 'xxxx',
   'password' = 'xxxxx',
   'lookup.max-retries' = '1',
   'lookup.cache.max-rows' = '1000',
   'lookup.cache.ttl' = '60s' 
);

--创建 phoenix pv表
CREATE TABLE pv (
    sid INT,
    ucount BIGINT,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
  'connector.type' = 'phoenix', 
  'connector.url' = 'jdbc:phoenix:zxbd-test-hbase:2181', 
  'connector.table' = 'TEST.PV',  
  'connector.driver' = 'org.apache.phoenix.jdbc.PhoenixDriver', 
  'connector.username' = '', 
  'connector.password' = '',
  'phoenix.schema.isnamespacemappingenabled' = 'true',
  'phoenix.schema.mapsystemtablestonamespace' = 'true',
  'connector.write.flush.max-rows' = '30'
 );

insert into pv select student.sid as sid ,count(student.sid) as ucount from datagen left join student FOR SYSTEM_TIME AS OF datagen.proctime on student.sid = datagen.userid group by student.sid having student.sid is not null;

保存提交任务,在 sql-client 同样可以执行。

dinky任务提交方式,可以参考官网使用文档:http://www.dlink.top/#/zh-CN/administrator_guide/studio/job_dev/flinksql_guide/flinksql_job_submit

2.Demo2 Mysql CDC 数据实时同步 Phoenix

通过 flink 的 cdc 能力,将 mysql 中的数据实时同步至 phoenix 中。

代码语言:javascript复制
SET 'execution.checkpointing.interval' = '30s'; 

CREATE TABLE student (
    sid INT,
    name STRING,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'xxxxx',
'port' = 'xxx',
'username' = 'xxxx',
'password' = 'xxxxx',
'database-name' = 'test',
'table-name' = 'student');


CREATE TABLE pstudent (
    sid INT,
    name STRING,
    PRIMARY KEY (sid) NOT ENFORCED
) WITH (
  'connector.type' = 'phoenix', 
  'connector.url' = 'jdbc:phoenix:xxxx:2181', 
  'connector.table' = 'TEST.PSTUDENT',  
  'connector.driver' = 'org.apache.phoenix.jdbc.PhoenixDriver', 
  'connector.username' = '', 
  'connector.password' = '',
  'phoenix.schema.isnamespacemappingenabled' = 'true',
  'phoenix.schema.mapsystemtablestonamespace' = 'true',
  'connector.write.flush.max-rows' = '1'
 );

insert into pstudent select * from student;

保存提交任务。

0 人点赞