Oceanus使用自定义Connector指南

2021-09-26 16:49:16 浏览数 (3)

实时即未来,最近在腾讯云Oceanus进行Flink实时计算服务,以下为使用自定义Connector的实践。分享给大家~

支持自定义conncetor操作说明

  1. 在腾讯云Oceanus页面-->程序包管理页面--> 选择新建程序包,上传自己jar包。 上传时选择与自己环境对应的区域。 如:flink-connector-kudu.jar
  2. 在作业管理页面新建作业 --> 开发调试 --> 作业参数 -->引用程序包,选择刚刚上传的jar包,注意选择对应的版本,并保存。
  3. 在开发调试页面编写对应的Flink SQL --> 发布运行。

SQL示例

代码语言:txt复制
-- Datagen Connector 可以随机生成一些数据用于测试
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html
CREATE TABLE random_source ( 
  f_sequence INT, 
  f_random INT, 
  f_random_str VARCHAR 
  ) WITH ( 
  'connector' = 'datagen', 
  'rows-per-second'='1',  -- 每秒产生的数据条数
      
  'fields.f_sequence.kind'='sequence',   -- 有界序列(结束后自动停止输出)
  'fields.f_sequence.start'='1',         -- 序列的起始值
  'fields.f_sequence.end'='10000',       -- 序列的终止值
      
  'fields.f_random.kind'='random',       -- 无界的随机数
  'fields.f_random.min'='1',             -- 随机数的最小值
  'fields.f_random.max'='1000',          -- 随机数的最大值
      
  'fields.f_random_str.length'='10'      -- 随机字符串的长度
);

CREATE TABLE Data_Output (
  `id` BIGINT,
  `name` STRING
) WITH (
  'connector.type' = 'kudu'
  ,'kudu.masters' = 'master01:7051,master02:7051,master03:7051'
  ,'kudu.table' = 'Data_Output'
  ,'kudu.hash-columns' = 'id'
  ,'kudu.primary-key-columns' = 'id'
  ,'kudu.max-buffer-size' = '5000'
  ,'kudu.flush-interval' = '1000'
);

INSERT INTO `Data_Output`
SELECT f_sequence, f_random_str FROM random_source;

注:首次使用Kudu表时,kudu表用impala-shell查询时需要在Impala中创建对应的外表才能查到kudu的表数据。

自定义Conector可参考开源代码自己进行修改,打包。

开发自定义SQL Connector指南

0 人点赞