1.文档编写目的
SQL Stream Builder(SSB)是Cloudera提供的基于Flink-SQL的实时流计算Web开发平台,它提供了一个交互式的Flink SQL编辑器,让用户可以方便的使用SQL访问一个source比如Kafka中的数据写入到一个sink比如Hive中,具体可以参考Fayson的上一篇文章《0877-1.6.2-SQL Stream Builder(SSB)概述》。本文主要介绍如何在CDP中安装SSB,SSB与Apache Flink同属于Cloudera Streaming Analytics(CSA)套件,而且安装包Parcel也是同一个,只是csd文件有区分,建议在安装SSB之前先提前安装好Flink,Flink安装文档参考Fayson之前的文章《0876-7.1.7-如何在CDP中部署Flink1.14》。
•测试环境
1.操作系统CentOS7.9
2.CDP7.1.7/CM7.4.4
3.Flink1.14已安装成功
4.使用root用户操作
5.集群未启用Kerberos
2.安装前置准备
2.1准备SSB的csd文件
SSB与Apache Flink同属于一个Parcel安装包,但是csd文件不一样,安装SSB之前建议先安装好Flink,然后我们只需要准备SSB的csd文件,让Cloudera Manager可以显示安装SSB。
1.下载SSB的csd文件,下载地址如下:
https://archive.cloudera.com/p/csa/1.6.2.0/csd/SQL_STREAM_BUILDER-1.14.0-csa1.6.2.0-cdh7.1.7.0-551-23013538.jar
2.将下载的csd文件放到Cloudera Manager Server节点的/opt/cloudera/csd目录,并且修改文件属组。
chown cloudera-scm:cloudera-scm/opt/cloudera/csd/SQL_STREAM_BUILDER-1.14.0-csa1.6.2.0-cdh7.1.7.0-551-23013538.jar
3.重启CM Server,这时添加服务页面可以看到添加SSB服务。
代码语言:javascript复制systemctl restart cloudera-scm-server
2.2 安装postgresql10
1.将Cloudera提供的postgresql10下载到本地,下载地址为:
代码语言:javascript复制https://archive.cloudera.com/postgresql10/redhat7/libicu-devel-50.2-4.el7_7.x86_64.rpm
https://archive.cloudera.com/postgresql10/redhat7/pgdg-redhat-repo-latest.noarch.rpm
https://archive.cloudera.com/postgresql10/redhat7/postgresql10-10.12-1PGDG.rhel7.x86_64.rpm
https://archive.cloudera.com/postgresql10/redhat7/postgresql10-libs-10.12-1PGDG.rhel7.x86_64.rpm
https://archive.cloudera.com/postgresql10/redhat7/postgresql10-server-10.12-1PGDG.rhel7.x86_64.rpm
https://archive.cloudera.com/postgresql10/redhat7/systemd-sysv-219-73.el7_8.5.x86_64.rpm
2.将下载好的6个rpm文件放到同一个postgresql10目录,然后创建repodata,并移动到/var/www/html目录。
代码语言:javascript复制createrepo .
mv postgresql10/ /var/www/html/
测试访问http地址能够看到文件。
3.制作postgresql10的repo文件
代码语言:javascript复制[postgresql10]
name = Postgresql 10
baseurl = http://cdh0001/postgresql10
enable = true
gpgcheck = false
4.安装postgresql
代码语言:javascript复制yum -y install postgresql10-server
5.创建数据与日志目录。
代码语言:javascript复制mkdir -p /data/pgsql_data
mkdir -p /var/log/pgsql_log
chown postgres:postgres /data/pgsql_data/
chown postgres:postgres /var/log/pgsql_log
6.初始化数据库
代码语言:javascript复制sudo -u postgres /usr/pgsql-10/bin/initdb -D /data/pgsql_data/
7.修改postgresql.conf文件中的listen_addresses,将localhost改为*,并反注释,保存文件退出。
代码语言:javascript复制vim /data/pgsql_data/postgresql.conf
8.修改pg_hba.conf文件在末尾增加以下内容,保存文件退出。
代码语言:javascript复制vim /data/pgsql_data/pg_hba.conf
host all all 0.0.0.0/0 md5
9.启动postgresql
代码语言:javascript复制sudo -u postgres /usr/pgsql-10/bin/pg_ctl -D /data/pgsql_data -l /var/log/pgsql_log/logfile start
10.为SQL Stream Builder元数据和Materialized View Engine创建数据库与用户
代码语言:javascript复制sudo -u postgres psql
CREATE ROLE ssb_admin LOGIN PASSWORD 'password';
CREATE DATABASE ssb_admin OWNER ssb_admin ENCODING 'UTF8';
CREATE ROLE ssb_mve LOGIN PASSWORD 'password';
CREATE DATABASE ssb_mve OWNER ssb_mve ENCODING 'UTF8';
11.准备SSB连接postgresql的jdbc驱动文件
代码语言:javascript复制mv postgresql-9.4.1212.jar /usr/share/java
cd /usr/share/java/
chmod 777 postgresql-9.4.1212.jar
ln -s postgresql-9.4.1212.jar postgresql-connector-java.jar
3.安装SSB
1.进入CM主页,选择“添加服务”。
2.选择SSB,点击“继续”
3.选择Streaming SQL Engine,Materialized View Engine,Streaming SQL Console所在的主机,并点击“继续”。
4.数据库设置页面类型选择PostgreSQL,点击“测试连接”,测试成功后,点击“继续。”
5.输入物化视图引擎的数据库连接地址,数据库登录用户名和密码,点击“继续”。
6.等待服务器启动成功。
全部启动成功以后,点击“继续”
12.点击“完成”,返回CM主页
4.SSB功能测试
1.首次登录Streaming SQL Console,使用admin/admin。
2.登录后可以直接见到一个Flink SQL的编辑器页面。
3.在SSB中创建用户fayson重新登录SSB,另外在集群各节点的OS中也创建同样的fayson用户。
4.1使用SSB读取Kafka中的数据
1.创建Kafka Topic
代码语言:javascript复制sudo -u fayson kafka-topics --create --zookeeper cdh0001:2181/kafka --replication-factor 3 --partitions 3 --topic MyTopic
2.写入数据到该Topic
代码语言:javascript复制sudo -u fayson kafka-console-producer --broker-list cdh0002:9092,cdh0003:9092,cdh0004:9092 --topic MyTopic
{"id": 1,"username": "zhangshan","password": "123qwe","lock": false}
{"id": 2,"username": "lisi","password": "123qwe","lock": false}
{"id": 3,"username": "wangwu","password": "123qwe","lock": false}
{"id": 4,"username": "fayson","password": "123@qwe","lock": true}
{"id": 5,"username": "zhaoliu","password": "1234qwe","lock": true}
3.消费数据进行测试
代码语言:javascript复制sudo -u fayson kafka-console-consumer --topic MyTopic --from-beginning --bootstrap-server cdh001:9092,cdh0003:9092,cdh0004:9092
4.在SSB中创建映射表
点击“Detect Schema”
导入schema成功,点击Save Changes
5.创建Kafka表成功
6.访问表中的数据进行测试,因为我们测试数据只有5条,要在设置页面进行设置,如下所示
然后回到Console执行Flink SQL,输入以下SQL,点击“Execute”
代码语言:javascript复制select id,username,password,lock from `ssb`.`ssb_default`.`MyTopicSource`
7.可以在Flink的Dashboard看到这个任务
4.2使用SSB将Kafka Topic中的数据写入到另外一个Topic
1.在Kafka中创建一个用于sink的topic
代码语言:javascript复制sudo -u fayson kafka-topics --create --zookeeper cdh0001:2181/kafka --replication-factor 3 --partitions 3 --topic MyTopicSink
2.在SSB中创建用于sink的topic的映射表,因为topic中还没有数据没办法自动侦测schema,可以自定义json格式。
点击“Save Changes”
3.可以在SSB的Tables页面看到sink表
4.使用Flink SQL将MyTopicSource表数据写入到MyTopicSink中,输入以下SQL,点击“Execute”
代码语言:javascript复制insert into `ssb`.`ssb_default`.`MyTopicSink`
select * from `ssb`.`ssb_default`.`MyTopicSource`
5.在Flink的Dashboard页面可以看到该任务
6.通过Kafka的消费命令进行测试,数据已经写入到sink的topic中
代码语言:javascript复制sudo -u fayson kafka-console-consumer --topic MyTopicSink --from-beginning --bootstrap-server cdh001:9092,cdh0003:9092,cdh0004:9092
发现数据已经写入到sink的topic中
4.3通过SSB将Kafka Topic中的数据写入到Hive
1.要sink到Hive表中,首先要在SSB中注册Hive服务,以方便SSB能看到Hive的数据库与表,进入“Data Providers”
点击“Validate”
2.在Hive中创建一个用于sink的表
代码语言:javascript复制create external table kafka_hive_sink (
id bigint,
username string,
password string,
lock boolean
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ","
stored as textfile location '/user/fayson/kafka_hive_sink';
3.将Kafka Topic的数据写入到Hive表中,输入以下SQL语句,比点击“Execute”
代码语言:javascript复制set execution.checkpointing.interval=10000;
insert into `CDP_Hive`.`default`.`kafka_hive_sink`
select id,username,password,lock from `ssb`.`ssb_default`.`MyTopicSource`;
4.查询Hive表可以从Kafka插入的数据。
5.从Flink Dashboard也能看到该作业
4.4 SSB中的UDF测试
1.进入SSB的Console页面,选择“Functions”,点击“Create Function”
2.创建HELLO_WORLD自定义函数,输入一个自定义函数的JavaScript代码如下:
代码语言:javascript复制function HELLO_WORLD(input){
return "Hello World" input;
}
HELLO_WORLD($p0); // this line must exist
3.点击“Save Changes”保存该自定义函数
4.测试自定义函数
代码语言:javascript复制select username,HELLO_WORLD(username) from MyTopicSource;
返回的结果符合预期。
4.5物化视图测试
1.在Console页面执行以下语句
代码语言:javascript复制select id,username,password,lock from MyTopicSource;
2.点击“Stop”停止该Flink SQL作业,然后选择“Materialized Views”,将Materialized View设置为“Enabled”,将id选择为Primary Key。
3.点击Add API Key,输入Key Name,然后点击“Save”
4.保存API Key以后,点击Add Query
13.输入API Endpoint pattern,然后Query Builder点击Select ALL,最后点击“Save Changes”
将URL Pattern复制到剪贴板
5.回到Console页面,将Flink SQL作业重新运行起来
6.在浏览器页面输入在之前步骤复制的RestfulAPI的地址到浏览器。
代码语言:javascript复制http://cdh0001:18131/api/v1/query/5200/helloworld?key=7523ea54-1393-4c4b-aee8-b22fd3202abf
发现能够正确返回Kafka Topic中的数据。
4.6SSB写入Kudu测试
1.先在Kudu中创建一张测试表用于从Kafka写入
代码语言:javascript复制CREATE TABLE my_first_table
(
id BIGINT,
username string,
password string,
lock boolean,
PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;
2.要sink到Kudu表中,首先要在SSB中注册Kudu服务,以方便SSB能看到Kudu的数据库与表,进入“Data Providers”
点击“Validate”
3.点击“Add Tables”,可以查看到刚刚在Kudu中创建的表。
4.进入Console,执行以下SQL,将Kafka中的数据写入Kudu
代码语言:javascript复制insert into `CDP_KUDU`.`default_database`.`default.my_first_table`
select id,username,password,lock from MyTopicSource;
5.使用Impala查询Kudu的数据,发现已经插入成功。
5.其他问题
1.SSB的Streaming SQL Console支持的数据库有Oracle,MariaDB,MySQL以及PostgreSQL,但是Materialized View Engine只支持PostgreSQL,所以建议要将SSB用起来为了方便直接使用PostgreSQL。
2.保证在Ranger中给ssb用户授予了Hive相关表的权限。
3.postgresql的驱动可以到官网下载
代码语言:javascript复制https://jdbc.postgresql.org/download.html
4.更多postgresql相关配置,可以参考Cloudera官网:
https://docs.cloudera.com/cdp-private-cloud-base/7.1.7/installation/topics/cdpdc-configuring-starting-postgresql-server.html
5.Fayson的测试环境因为没有开启Kerberos,但是安装了Ranger,在SSB中集成Kudu的时候存在问题,可以暂时取消Kudu与Ranger的集成来解决。
6.本文在测试从Kafka中将数据写入到Hive时,手动设置了execution.checkpointing.interval为10000,因为Flink Connector在sink数据到HDFS或者Hive的时候,是两阶段提交,是先写到临时文件,当checkpoint发生的时候才会真正写入到Hive或HDFS,所以为了快速看到效果,进行了手动设置,因为CDP中的Flink服务checkpoint配置默认是空没配置的。