0878-1.6.2-如何在CDP7.1.7中安装SSB

2022-04-18 11:47:23 浏览数 (1)

1.文档编写目的

SQL Stream Builder(SSB)是Cloudera提供的基于Flink-SQL的实时流计算Web开发平台,它提供了一个交互式的Flink SQL编辑器,让用户可以方便的使用SQL访问一个source比如Kafka中的数据写入到一个sink比如Hive中,具体可以参考Fayson的上一篇文章《0877-1.6.2-SQL Stream Builder(SSB)概述》。本文主要介绍如何在CDP中安装SSB,SSB与Apache Flink同属于Cloudera Streaming Analytics(CSA)套件,而且安装包Parcel也是同一个,只是csd文件有区分,建议在安装SSB之前先提前安装好Flink,Flink安装文档参考Fayson之前的文章《0876-7.1.7-如何在CDP中部署Flink1.14》。

•测试环境

1.操作系统CentOS7.9

2.CDP7.1.7/CM7.4.4

3.Flink1.14已安装成功

4.使用root用户操作

5.集群未启用Kerberos

2.安装前置准备

2.1准备SSB的csd文件

SSB与Apache Flink同属于一个Parcel安装包,但是csd文件不一样,安装SSB之前建议先安装好Flink,然后我们只需要准备SSB的csd文件,让Cloudera Manager可以显示安装SSB。

1.下载SSB的csd文件,下载地址如下:

代码语言:javascript复制
https://archive.cloudera.com/p/csa/1.6.2.0/csd/SQL_STREAM_BUILDER-1.14.0-csa1.6.2.0-cdh7.1.7.0-551-23013538.jar

2.将下载的csd文件放到Cloudera Manager Server节点的/opt/cloudera/csd目录,并且修改文件属组。

代码语言:javascript复制
chown cloudera-scm:cloudera-scm/opt/cloudera/csd/SQL_STREAM_BUILDER-1.14.0-csa1.6.2.0-cdh7.1.7.0-551-23013538.jar

3.重启CM Server,这时添加服务页面可以看到添加SSB服务。

代码语言:javascript复制
systemctl restart cloudera-scm-server

2.2 安装postgresql10

1.将Cloudera提供的postgresql10下载到本地,下载地址为:

代码语言:javascript复制
https://archive.cloudera.com/postgresql10/redhat7/libicu-devel-50.2-4.el7_7.x86_64.rpm
https://archive.cloudera.com/postgresql10/redhat7/pgdg-redhat-repo-latest.noarch.rpm
https://archive.cloudera.com/postgresql10/redhat7/postgresql10-10.12-1PGDG.rhel7.x86_64.rpm
https://archive.cloudera.com/postgresql10/redhat7/postgresql10-libs-10.12-1PGDG.rhel7.x86_64.rpm
https://archive.cloudera.com/postgresql10/redhat7/postgresql10-server-10.12-1PGDG.rhel7.x86_64.rpm
https://archive.cloudera.com/postgresql10/redhat7/systemd-sysv-219-73.el7_8.5.x86_64.rpm

2.将下载好的6个rpm文件放到同一个postgresql10目录,然后创建repodata,并移动到/var/www/html目录。

代码语言:javascript复制
createrepo .
mv postgresql10/ /var/www/html/

测试访问http地址能够看到文件。

3.制作postgresql10的repo文件

代码语言:javascript复制
[postgresql10]
name = Postgresql 10
baseurl = http://cdh0001/postgresql10
enable = true
gpgcheck = false

4.安装postgresql

代码语言:javascript复制
yum -y install postgresql10-server

5.创建数据与日志目录。

代码语言:javascript复制
mkdir -p /data/pgsql_data
mkdir -p /var/log/pgsql_log
chown postgres:postgres /data/pgsql_data/
chown postgres:postgres /var/log/pgsql_log

6.初始化数据库

代码语言:javascript复制
sudo -u postgres /usr/pgsql-10/bin/initdb -D /data/pgsql_data/

7.修改postgresql.conf文件中的listen_addresses,将localhost改为*,并反注释,保存文件退出。

代码语言:javascript复制
vim /data/pgsql_data/postgresql.conf

8.修改pg_hba.conf文件在末尾增加以下内容,保存文件退出。

代码语言:javascript复制
vim /data/pgsql_data/pg_hba.conf
host all all 0.0.0.0/0 md5

9.启动postgresql

代码语言:javascript复制
sudo -u postgres /usr/pgsql-10/bin/pg_ctl -D /data/pgsql_data -l /var/log/pgsql_log/logfile start

10.为SQL Stream Builder元数据和Materialized View Engine创建数据库与用户

代码语言:javascript复制
sudo -u postgres psql

CREATE ROLE ssb_admin LOGIN PASSWORD 'password';
CREATE DATABASE ssb_admin OWNER ssb_admin ENCODING 'UTF8';


CREATE ROLE ssb_mve LOGIN PASSWORD 'password';
CREATE DATABASE ssb_mve OWNER ssb_mve ENCODING 'UTF8';

11.准备SSB连接postgresql的jdbc驱动文件

代码语言:javascript复制
mv postgresql-9.4.1212.jar /usr/share/java
cd /usr/share/java/
chmod 777 postgresql-9.4.1212.jar
ln -s postgresql-9.4.1212.jar postgresql-connector-java.jar

3.安装SSB

1.进入CM主页,选择“添加服务”。

2.选择SSB,点击“继续”

3.选择Streaming SQL Engine,Materialized View Engine,Streaming SQL Console所在的主机,并点击“继续”。

4.数据库设置页面类型选择PostgreSQL,点击“测试连接”,测试成功后,点击“继续。”

5.输入物化视图引擎的数据库连接地址,数据库登录用户名和密码,点击“继续”。

6.等待服务器启动成功。

全部启动成功以后,点击“继续”

12.点击“完成”,返回CM主页

4.SSB功能测试

1.首次登录Streaming SQL Console,使用admin/admin。

2.登录后可以直接见到一个Flink SQL的编辑器页面。

3.在SSB中创建用户fayson重新登录SSB,另外在集群各节点的OS中也创建同样的fayson用户。

4.1使用SSB读取Kafka中的数据

1.创建Kafka Topic

代码语言:javascript复制
sudo -u fayson kafka-topics --create --zookeeper cdh0001:2181/kafka --replication-factor 3 --partitions 3 --topic MyTopic

2.写入数据到该Topic

代码语言:javascript复制
sudo -u fayson kafka-console-producer --broker-list cdh0002:9092,cdh0003:9092,cdh0004:9092 --topic MyTopic

{"id": 1,"username": "zhangshan","password": "123qwe","lock": false}
{"id": 2,"username": "lisi","password": "123qwe","lock": false}
{"id": 3,"username": "wangwu","password": "123qwe","lock": false}
{"id": 4,"username": "fayson","password": "123@qwe","lock": true}
{"id": 5,"username": "zhaoliu","password": "1234qwe","lock": true}

3.消费数据进行测试

代码语言:javascript复制
sudo -u fayson kafka-console-consumer --topic MyTopic --from-beginning --bootstrap-server cdh001:9092,cdh0003:9092,cdh0004:9092

4.在SSB中创建映射表

点击“Detect Schema”

导入schema成功,点击Save Changes

5.创建Kafka表成功

6.访问表中的数据进行测试,因为我们测试数据只有5条,要在设置页面进行设置,如下所示

然后回到Console执行Flink SQL,输入以下SQL,点击“Execute”

代码语言:javascript复制
select id,username,password,lock from `ssb`.`ssb_default`.`MyTopicSource`

7.可以在Flink的Dashboard看到这个任务

4.2使用SSB将Kafka Topic中的数据写入到另外一个Topic

1.在Kafka中创建一个用于sink的topic

代码语言:javascript复制
sudo -u fayson kafka-topics --create --zookeeper cdh0001:2181/kafka --replication-factor 3 --partitions 3 --topic MyTopicSink

2.在SSB中创建用于sink的topic的映射表,因为topic中还没有数据没办法自动侦测schema,可以自定义json格式。

点击“Save Changes”

3.可以在SSB的Tables页面看到sink表

4.使用Flink SQL将MyTopicSource表数据写入到MyTopicSink中,输入以下SQL,点击“Execute”

代码语言:javascript复制
insert into `ssb`.`ssb_default`.`MyTopicSink`
select * from `ssb`.`ssb_default`.`MyTopicSource`

5.在Flink的Dashboard页面可以看到该任务

6.通过Kafka的消费命令进行测试,数据已经写入到sink的topic中

代码语言:javascript复制
sudo -u fayson kafka-console-consumer --topic MyTopicSink --from-beginning --bootstrap-server cdh001:9092,cdh0003:9092,cdh0004:9092

发现数据已经写入到sink的topic中

4.3通过SSB将Kafka Topic中的数据写入到Hive

1.要sink到Hive表中,首先要在SSB中注册Hive服务,以方便SSB能看到Hive的数据库与表,进入“Data Providers”

点击“Validate”

2.在Hive中创建一个用于sink的表

代码语言:javascript复制
create external table kafka_hive_sink (
id bigint,
username string,
password string,
lock boolean
) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY "," 
stored as textfile location '/user/fayson/kafka_hive_sink';

3.将Kafka Topic的数据写入到Hive表中,输入以下SQL语句,比点击“Execute”

代码语言:javascript复制
set execution.checkpointing.interval=10000;

insert into `CDP_Hive`.`default`.`kafka_hive_sink`

select id,username,password,lock from `ssb`.`ssb_default`.`MyTopicSource`;

4.查询Hive表可以从Kafka插入的数据。

5.从Flink Dashboard也能看到该作业

4.4 SSB中的UDF测试

1.进入SSB的Console页面,选择“Functions”,点击“Create Function”

2.创建HELLO_WORLD自定义函数,输入一个自定义函数的JavaScript代码如下:

代码语言:javascript复制
function HELLO_WORLD(input){ 
   return "Hello World"   input; 
}

HELLO_WORLD($p0);  // this line must exist

3.点击“Save Changes”保存该自定义函数

4.测试自定义函数

代码语言:javascript复制
select username,HELLO_WORLD(username) from MyTopicSource;

返回的结果符合预期。

4.5物化视图测试

1.在Console页面执行以下语句

代码语言:javascript复制
select id,username,password,lock from MyTopicSource;

2.点击“Stop”停止该Flink SQL作业,然后选择“Materialized Views”,将Materialized View设置为“Enabled”,将id选择为Primary Key。

3.点击Add API Key,输入Key Name,然后点击“Save”

4.保存API Key以后,点击Add Query

13.输入API Endpoint pattern,然后Query Builder点击Select ALL,最后点击“Save Changes”

将URL Pattern复制到剪贴板

5.回到Console页面,将Flink SQL作业重新运行起来

6.在浏览器页面输入在之前步骤复制的RestfulAPI的地址到浏览器。

代码语言:javascript复制
http://cdh0001:18131/api/v1/query/5200/helloworld?key=7523ea54-1393-4c4b-aee8-b22fd3202abf

发现能够正确返回Kafka Topic中的数据。

4.6SSB写入Kudu测试

1.先在Kudu中创建一张测试表用于从Kafka写入

代码语言:javascript复制
CREATE TABLE my_first_table
(
  id BIGINT,
  username string,
  password string,
  lock boolean,
  PRIMARY KEY(id)
)
PARTITION BY HASH PARTITIONS 16
STORED AS KUDU;

2.要sink到Kudu表中,首先要在SSB中注册Kudu服务,以方便SSB能看到Kudu的数据库与表,进入“Data Providers”

点击“Validate”

3.点击“Add Tables”,可以查看到刚刚在Kudu中创建的表。

4.进入Console,执行以下SQL,将Kafka中的数据写入Kudu

代码语言:javascript复制
insert into `CDP_KUDU`.`default_database`.`default.my_first_table`

select id,username,password,lock from MyTopicSource;

5.使用Impala查询Kudu的数据,发现已经插入成功。

5.其他问题

1.SSB的Streaming SQL Console支持的数据库有Oracle,MariaDB,MySQL以及PostgreSQL,但是Materialized View Engine只支持PostgreSQL,所以建议要将SSB用起来为了方便直接使用PostgreSQL。

2.保证在Ranger中给ssb用户授予了Hive相关表的权限。

3.postgresql的驱动可以到官网下载

代码语言:javascript复制
https://jdbc.postgresql.org/download.html

4.更多postgresql相关配置,可以参考Cloudera官网:

代码语言:javascript复制
https://docs.cloudera.com/cdp-private-cloud-base/7.1.7/installation/topics/cdpdc-configuring-starting-postgresql-server.html

5.Fayson的测试环境因为没有开启Kerberos,但是安装了Ranger,在SSB中集成Kudu的时候存在问题,可以暂时取消Kudu与Ranger的集成来解决。

6.本文在测试从Kafka中将数据写入到Hive时,手动设置了execution.checkpointing.interval为10000,因为Flink Connector在sink数据到HDFS或者Hive的时候,是两阶段提交,是先写到临时文件,当checkpoint发生的时候才会真正写入到Hive或HDFS,所以为了快速看到效果,进行了手动设置,因为CDP中的Flink服务checkpoint配置默认是空没配置的。

0 人点赞