I. Prepare the environment
1. Install the Flink client as described in the product documentation.
2. Copy sql-client-defaults.yaml into /opt/client/Flink/flink/conf.
3. Copy jaas.conf into /opt/client/Flink/flink/conf, with the following content:
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=false
  useTicketCache=true
  debug=false;
};
Note: because useKeyTab=false and useTicketCache=true, Krb5LoginModule reads an existing Kerberos ticket from the ticket cache, so run kinit to obtain a ticket before starting the SQL client.
4. Add the following line to sql-client.sh so that the JVM_ARGS variable picks up the JAAS configuration:
JVM_ARGS="-Djava.security.auth.login.config=/opt/client/Flink/flink/conf/jaas.conf $JVM_ARGS"
II. Start the Flink cluster
For example: yarn-session.sh -t ssl -d
III. Start the SQL client
./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml
IV. Run SQL
CREATE TABLE kafkaSourceTable (
  order_id VARCHAR,
  shop_id VARCHAR,
  member_id VARCHAR,
  trade_amt DOUBLE
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'order_sql',
  'connector.properties.bootstrap.servers' = '10.162.147.217:21005',
  'connector.properties.zookeeper.connect' = '10.162.147.217:24002',
  'connector.properties.group.id' = 'test-consumer-group',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'json'
);
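Since 'format.type' = 'json', each Kafka message on the order_sql topic must be a JSON object whose keys match the declared column names. A minimal sketch of a conforming record (all field values are illustrative):

```python
import json

# Illustrative record matching kafkaSourceTable's schema; the JSON keys
# must match the column names declared in the DDL above.
record = {
    "order_id": "202306120001",  # VARCHAR
    "shop_id": "shop_01",        # VARCHAR
    "member_id": "member_42",    # VARCHAR
    "trade_amt": 199.5,          # DOUBLE
}
message = json.dumps(record)
print(message)
```

A message with missing or renamed keys would produce NULLs (or a parse error, depending on the format's error-handling settings) on the Flink side.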
CREATE TABLE kafkaSinkTable (
  shop_id VARCHAR,
  member_id VARCHAR
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'order_sql',
  'connector.properties.bootstrap.servers' = '10.162.147.217:21005',
  'connector.properties.zookeeper.connect' = '10.162.147.217:24002',
  'update-mode' = 'append',
  'format.type' = 'json'
);

INSERT INTO kafkaSinkTable SELECT shop_id, member_id FROM kafkaSourceTable;

SELECT shop_id, member_id FROM kafkaSourceTable;
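The INSERT above keeps only shop_id and member_id from each source record. In plain Python terms, the projection the job performs behaves roughly like this (sample records are illustrative):

```python
# Each source record loses order_id and trade_amt on the way to the sink.
source_records = [
    {"order_id": "o-1", "shop_id": "s-01", "member_id": "m-42", "trade_amt": 99.5},
    {"order_id": "o-2", "shop_id": "s-02", "member_id": "m-7", "trade_amt": 10.0},
]
sink_records = [
    {"shop_id": r["shop_id"], "member_id": r["member_id"]}
    for r in source_records
]
print(sink_records)
# [{'shop_id': 's-01', 'member_id': 'm-42'}, {'shop_id': 's-02', 'member_id': 'm-7'}]
```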
V. Integrate with Hive
1) Edit sql-client-defaults.yaml and register the Hive catalog:
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /opt/clienrc5/Hive/config
    hive-version: 3.1.0
2) Add the following property to /opt/clienrc5/Hive/config/hive-site.xml:
<property>
  <name>hive.metastore.sasl.enabled</name>
  <value>true</value>
</property>
3) Start sql-client and run:
use catalog myhive;
SET table.sql-dialect=hive;
CREATE TABLE IF NOT EXISTS hive_dialect_tbl (
  `id` int,
  `name` string,
  `age` int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

SET table.sql-dialect=default;
CREATE TABLE datagen (
  `id` int,
  `name` string,
  `age` int
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

INSERT INTO hive_dialect_tbl SELECT * FROM datagen;

SELECT * FROM hive_dialect_tbl;
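The datagen connector synthesizes a random value for every column at the configured rate (here one row per second), which makes it handy for smoke-testing the Hive sink without a real source. A rough Python analogue of one generated row — the value ranges and string length below are assumptions for illustration, not the connector's exact defaults:

```python
import random
import string

def datagen_row():
    # One synthetic (id, name, age) row, loosely mimicking Flink's
    # datagen connector; actual default ranges and string lengths differ.
    return {
        "id": random.randint(0, 2**31 - 1),
        "name": "".join(random.choices(string.ascii_lowercase, k=8)),
        "age": random.randint(0, 2**31 - 1),
    }

row = datagen_row()
print(row)
```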